source: wokkel/server.py @ 178:37f36ea93838

Last change on this file since 178:37f36ea93838 was 178:37f36ea93838, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Properly encode IDN domain names for SRV lookups.

Before Twisted 12.3.0, the SRV lookup done for outgoing
connections succeeded if passed a unicode string with all-ASCII
code points. A recent change made the DNS code more strict, and
only byte strings are accepted as domain names. See
http://twistedmatrix.com/trac/ticket/6245 for details.

This change makes sure domain names are encoded to their ASCII
Compatible Encoding (ACE) version before passing the resulting
byte string to twisted.names.srvconnect.SRVConnector, as per
RFC 3490.

Note that while connecting to servers with an IDN domain name
now works properly, authentication using the DIGEST-MD5 SASL mechanism
will fail until http://twistedmatrix.com/trac/ticket/5066 has been
resolved.

Fixes: #77.

  • Property exe set to *
File size: 23.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_server -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP Server-to-Server protocol.
8
9This module implements several aspects of XMPP server-to-server communications
10as described in XMPP Core (RFC 3920). Refer to that document for the meaning
11of the used terminology.
12"""
13
14# hashlib is new in Python 2.5, try that first.
15try:
16    from hashlib import sha256
17    digestmod = sha256
18except ImportError:
19    import Crypto.Hash.SHA256 as digestmod
20    sha256 = digestmod.new
21
22import hmac
23
24from zope.interface import implements
25
26from twisted.internet import defer, reactor
27from twisted.names.srvconnect import SRVConnector
28from twisted.python import log, randbytes
29from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
30from twisted.words.xish import domish
31
32from wokkel.generic import DeferredXmlStreamFactory, XmlPipe, prepareIDNName
33
34NS_DIALBACK = 'jabber:server:dialback'
35
def generateKey(secret, receivingServer, originatingServer, streamID):
    """
    Generate a dialback key for server-to-server XMPP Streams.

    The dialback key is generated using the algorithm described in
    U{XEP-0185<http://xmpp.org/extensions/xep-0185.html>}. The used
    terminology for the parameters is described in RFC-3920.

    Every argument may be a byte string or a unicode string. Unicode strings
    are encoded as UTF-8 before they are hashed, so values with non-ASCII
    code points (such as IDN host names) no longer raise
    C{UnicodeEncodeError}. For all-ASCII input the generated key is the same
    as before.

    @param secret: the shared secret known to the Originating Server and
                   Authoritative Server.
    @type secret: C{str} or C{unicode}
    @param receivingServer: the Receiving Server host name.
    @type receivingServer: C{str} or C{unicode}
    @param originatingServer: the Originating Server host name.
    @type originatingServer: C{str} or C{unicode}
    @param streamID: the Stream ID as generated by the Receiving Server.
    @type streamID: C{str} or C{unicode}
    @return: hexadecimal digest of the generated key.
    @rtype: C{str}
    """

    def toBytes(value):
        # Hash functions operate on bytes; only text needs encoding.
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    # The HMAC key is the hex digest of the hashed secret, not the secret
    # itself.
    hashObject = sha256()
    hashObject.update(toBytes(secret))
    hashedSecret = hashObject.hexdigest()

    message = b" ".join([toBytes(receivingServer),
                         toBytes(originatingServer),
                         toBytes(streamID)])
    mac = hmac.HMAC(toBytes(hashedSecret), message, digestmod=digestmod)
    return mac.hexdigest()
63
64
def trapStreamError(xs, observer):
    """
    Wrap an observer so that exceptions are turned into stream errors.

    When the observer raises an L{error.StreamError}, that error is sent over
    the given XML stream. Any other exception is logged, and a
    C{'internal-server-error'} stream error is sent over the stream instead.

    @param xs: The XML stream that stream errors are sent over.
    @param observer: The callable to wrap. It is called with the observed
                     element as its only argument.
    @return: Wrapped observer
    """

    def wrappedObserver(element):
        try:
            observer(element)
        except error.StreamError as streamError:
            xs.sendStreamError(streamError)
        except:
            # Anything else is unexpected: record it, then report a generic
            # error to the peer.
            log.err()
            xs.sendStreamError(error.StreamError('internal-server-error'))

    return wrappedObserver
88
89
class XMPPServerConnector(SRVConnector):
    """
    SRV connector for the C{xmpp-server} service of a domain.
    """

    def __init__(self, reactor, domain, factory):
        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)


    def pickServer(self):
        """
        Pick the host and port to connect to.

        The choice is delegated to L{SRVConnector.pickServer}. If both
        C{servers} and C{orderedServers} are empty afterwards, the port is
        replaced by 5269.

        @return: Tuple of host and port.
        """
        host, port = SRVConnector.pickServer(self)

        if self.servers or self.orderedServers:
            return host, port

        # no SRV record, fall back..
        return host, 5269
103
104
class DialbackFailed(Exception):
    """
    Server dialback did not yield a valid result.
    """
107
108
109
class OriginatingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Originating Server.

    @ivar xmlstream: The XML stream dialback is performed on.
    @ivar thisHost: The host name of this (the Originating) server.
    @ivar otherHost: The host name of the Receiving Server.
    @ivar secret: The shared secret used for generating the dialback key.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, secret):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret


    def initialize(self):
        """
        Send the dialback key and wait for the result.

        @return: Deferred that fires with C{None} when the other side
                 reports the key as valid. It fails with L{DialbackFailed}
                 for any other result, or with the stream error if one is
                 observed first.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        key = generateKey(self.secret, self.otherHost,
                          self.thisHost, self.xmlstream.sid)

        result = domish.Element((NS_DIALBACK, 'result'))
        result['from'] = self.thisHost
        result['to'] = self.otherHost
        result.addContent(key)

        self.xmlstream.send(result)

        return self._deferred


    def onResult(self, result):
        """
        Handle the dialback result sent by the other side.

        @param result: The received C{result} element.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        # A result without a type attribute must count as a failure rather
        # than raise KeyError, which would leave the deferred unfired.
        if result.getAttribute('type') == 'valid':
            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail initialization when a stream error is observed.

        @param failure: The object dispatched with the stream error event. It
                        is passed on to the errback of the deferred.
        """
        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                      self.onResult)
        self._deferred.errback(failure)
160
161
162
class ReceivingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Receiving Server.

    @ivar xmlstream: The XML stream the verification is performed on.
    @ivar thisHost: The host name of this (the Receiving) server.
    @ivar otherHost: The host name of the server the key is verified with.
    @ivar originalStreamID: The stream ID of the connection being verified.
    @ivar key: The dialback key to be verified.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key


    def initialize(self):
        """
        Send the verification request and wait for the response.

        @return: Deferred that fires with C{None} when the key is reported
                 as valid. It fails with L{DialbackFailed} otherwise, or with
                 the stream error if one is observed first.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                   self.onVerify)

        verify = domish.Element((NS_DIALBACK, 'verify'))
        verify['from'] = self.thisHost
        verify['to'] = self.otherHost
        verify['id'] = self.originalStreamID
        verify.addContent(self.key)

        self.xmlstream.send(verify)
        return self._deferred


    def onVerify(self, verify):
        """
        Handle the verification response.

        The response must carry the stream ID and addressing that match the
        request. A mismatching or missing attribute results in a stream error
        being sent and the deferred failing with L{DialbackFailed}.

        @param verify: The received C{verify} element.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)

        # getAttribute yields None for a missing attribute, which then falls
        # into the matching error branch below instead of raising KeyError.
        if verify.getAttribute('id') != self.originalStreamID:
            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('to') != self.thisHost:
            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('from') != self.otherHost:
            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('type') == 'valid':
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail initialization when a stream error is observed.

        @param failure: The object dispatched with the stream error event. It
                        is passed on to the errback of the deferred.
        """
        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                      self.onVerify)
        self._deferred.errback(failure)
219
220
221
class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing XMPP server-to-server connection.

    After connecting to C{otherHost} (the Receiving Server), dialback is
    initiated as C{thisHost} (the Originating Server) by means of
    L{OriginatingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Originating
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Receiving Server).
    @ivar secret: The shared secret that is used for verifying the validity
                  of this new connection.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, secret):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes on the stream.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)
        xs.prefixes = {xmlstream.NS_STREAMS: 'stream', NS_DIALBACK: 'db'}
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Make dialback the only initializer of the stream.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            OriginatingDialbackInitializer(xs, self.thisHost,
                                           self.otherHost, self.secret)]
258
259
260
class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing connection to verify an incoming connection.

    After connecting to C{otherHost} (the Authoritative Server), dialback is
    initiated as C{thisHost} (the Receiving Server) by means of
    L{ReceivingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Receiving
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Authoritative Server).
    @ivar originalStreamID: The stream ID of the incoming connection that is
                            being verified.
    @ivar key: The key provided by the Receiving Server to be verified.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, originalStreamID, key):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes on the stream.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)
        xs.prefixes = {xmlstream.NS_STREAMS: 'stream', NS_DIALBACK: 'db'}
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Make key verification the only initializer of the stream.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
                                         self.originalStreamID, self.key)]
299
300
301
class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for an incoming XMPP server-to-server connection.

    This authenticator handles two types of incoming connections. Regular
    server-to-server connections are from the Originating Server to the
    Receiving Server, where this server is the Receiving Server. These
    connections start out by receiving a dialback key, verifying the
    key with the Authoritative Server, and then accept normal XMPP stanzas.

    The other type of connections is from a Receiving Server to an
    Authoritative Server, where this server acts as the Authoritative Server.
    These connections are used to verify the validity of an outgoing connection
    from this server. In this case, this server receives a verification
    request, checks the key and then returns the result.

    @ivar service: The service that keeps the list of domains we accept
                   connections for.
    """
    namespace = 'jabber:server'

    def __init__(self, service):
        xmlstream.ListenAuthenticator.__init__(self)
        self.service = service


    def streamStarted(self, rootElement):
        """
        Check the incoming stream header and send our own.

        A stream error is sent, without a header of our own being sent
        first, when the namespaces of the header are not as expected
        (C{'invalid-namespace'}) or when the addressed domain is not one of
        the domains of the service (C{'host-unknown'}). Otherwise observers
        for dialback elements are set up and the stream header is sent,
        followed by an empty features element for streams of version 1.0 or
        higher.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not self.xmlstream.sid:
            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')

        if self.xmlstream.thisEntity:
            targetDomain = self.xmlstream.thisEntity.host
        else:
            targetDomain = self.service.defaultDomain

        def prepareStream(domain):
            self.xmlstream.namespace = self.namespace
            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                       NS_DIALBACK: 'db'}
            if domain:
                self.xmlstream.thisEntity = jid.internJID(domain)

        try:
            if xmlstream.NS_STREAMS != rootElement.uri or \
               self.namespace != self.xmlstream.namespace or \
               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
                raise error.StreamError('invalid-namespace')

            if targetDomain and targetDomain not in self.service.domains:
                raise error.StreamError('host-unknown')
        except error.StreamError as exc:
            prepareStream(self.service.defaultDomain)
            self.xmlstream.sendStreamError(exc)
            return

        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
                                   trapStreamError(self.xmlstream,
                                                   self.onVerify))
        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        prepareStream(targetDomain)
        self.xmlstream.sendHeader()

        if self.xmlstream.version >= (1, 0):
            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
            self.xmlstream.send(features)


    def onVerify(self, verify):
        """
        Handle a key verification request.

        The key in the request is compared to the key generated from our
        secret, the addressing and the stream ID in the request. The outcome
        is sent back in a C{verify} element of type C{'valid'} or
        C{'invalid'}.

        @param verify: The received C{verify} element.
        @raise error.StreamError: with condition C{'improper-addressing'} if
            the addressing is missing or malformed, C{'host-unknown'} if the
            request is not for one of the domains of the service, or
            C{'invalid-from'} if the sender does not match the other entity
            of the stream.
        """
        try:
            receivingServer = jid.JID(verify['from']).host
            originatingServer = jid.JID(verify['to']).host
        except (KeyError, jid.InvalidFormat):
            raise error.StreamError('improper-addressing')

        if originatingServer not in self.service.domains:
            raise error.StreamError('host-unknown')

        if (self.xmlstream.otherEntity and
            receivingServer != self.xmlstream.otherEntity.host):
            raise error.StreamError('invalid-from')

        streamID = verify.getAttribute('id', '')
        key = unicode(verify)

        calculatedKey = generateKey(self.service.secret, receivingServer,
                                    originatingServer, streamID)
        if key == calculatedKey:
            validity = 'valid'
        else:
            validity = 'invalid'

        reply = domish.Element((NS_DIALBACK, 'verify'))
        reply['from'] = originatingServer
        reply['to'] = receivingServer
        reply['id'] = streamID
        reply['type'] = validity
        self.xmlstream.send(reply)


    def onResult(self, result):
        """
        Handle a dialback key offered by an Originating Server.

        The key is passed to the C{validateConnection} method of the service.
        Once that succeeds, a C{'valid'} result is sent back, the entities
        of the stream are set and the stream authenticated event is
        dispatched. On failure, the failure is logged and an C{'invalid'}
        result is sent back.

        If the element lacks the C{to} or C{from} attribute, an
        C{'improper-addressing'} stream error is sent and no validation is
        attempted.

        @param result: The received C{result} element.
        @return: The deferred of the validation, or C{None} if the element
                 was improperly addressed.
        """
        try:
            receivingServer = result['to']
            originatingServer = result['from']
        except KeyError:
            # This observer is not wrapped by trapStreamError, so report the
            # missing addressing to the peer here instead of raising.
            self.xmlstream.sendStreamError(
                error.StreamError('improper-addressing'))
            return None

        key = unicode(result)

        def reply(validity):
            response = domish.Element((NS_DIALBACK, 'result'))
            response['from'] = receivingServer
            response['to'] = originatingServer
            response['type'] = validity
            self.xmlstream.send(response)

        def valid(xs):
            reply('valid')
            if not self.xmlstream.thisEntity:
                self.xmlstream.thisEntity = jid.internJID(receivingServer)
            self.xmlstream.otherEntity = jid.internJID(originatingServer)
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)

        def invalid(failure):
            log.err(failure)
            reply('invalid')

        d = self.service.validateConnection(receivingServer, originatingServer,
                                            self.xmlstream.sid, key)
        d.addCallbacks(valid, invalid)
        return d
431
432
433
class DeferredS2SClientFactory(DeferredXmlStreamFactory):
    """
    Deferred firing factory for initiating XMPP server-to-server connection.

    The deferred has its callbacks called upon successful authentication with
    the other server. In case of failed authentication or connection, the
    deferred will have its errbacks called instead.

    @cvar logTraffic: Whether the raw data of new streams is logged.
    @ivar serial: The serial number handed to the next connected stream.
    """

    logTraffic = False

    def __init__(self, authenticator):
        DeferredXmlStreamFactory.__init__(self, authenticator)
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                          self.onConnectionMade)
        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Number the connected stream and optionally log its traffic.
        """
        xs.serial = self.serial
        self.serial += 1

        if not self.logTraffic:
            return

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        xs.rawDataInFn = logDataIn
        xs.rawDataOutFn = logDataOut
467
468
469
def initiateS2S(factory):
    """
    Start connecting to the other host of the authenticator of a factory.

    The host name is passed through L{prepareIDNName} before it is handed to
    L{XMPPServerConnector}.

    @param factory: The factory to connect with. Its authenticator provides
                    the host name in C{otherHost}.
    @return: The C{deferred} attribute of the factory.
    """
    connector = XMPPServerConnector(
        reactor, prepareIDNName(factory.authenticator.otherHost), factory)
    connector.connect()
    return factory.deferred
475
476
477
class XMPPS2SServerFactory(xmlstream.XmlStreamServerFactory):
    """
    XMPP Server-to-Server Server factory.

    This factory accepts XMPP server-to-server connections.

    @cvar logTraffic: Whether the raw data of new streams is logged.
    @ivar service: The service that authenticated streams dispatch their
                   elements to.
    @ivar serial: The serial number handed to the next connected stream.
    """

    logTraffic = False

    def __init__(self, service):
        self.service = service

        def authenticatorFactory():
            return XMPPServerListenAuthenticator(service)

        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)

        for event, handler in (
                (xmlstream.STREAM_CONNECTED_EVENT, self.onConnectionMade),
                (xmlstream.STREAM_AUTHD_EVENT, self.onAuthenticated)):
            self.addBootstrap(event, handler)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Called when a server-to-server connection was made.

        This enables traffic debugging on incoming streams.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def onAuthenticated(self, xs):
        """
        Called when an incoming stream has been authenticated.

        From here on, the end of the stream and all received elements are
        observed.
        """
        log.msg("Incoming connection %d from %r to %r established" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))

        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
                       0, xs)
        xs.addObserver('/*', self.onElement, 0, xs)


    def onConnectionLost(self, xs, reason):
        """
        Called when an authenticated incoming stream has ended.
        """
        log.msg("Incoming connection %d from %r to %r disconnected" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))


    def onError(self, reason):
        """
        Log a stream error.
        """
        log.err(reason, "Stream Error")


    def onElement(self, xs, element):
        """
        Called when an element was received from one of the connected streams.

        Elements that have not been handled yet are passed to the service.
        """
        if not element.handled:
            self.service.dispatch(xs, element)
557
558
559
class ServerService(object):
    """
    Service for managing XMPP server to server connections.

    @cvar logTraffic: Whether the raw data of outgoing streams is logged.
    @ivar router: The router this service has added a route to.
    @ivar defaultDomain: The domain passed at construction, if any.
    @ivar domains: The set of domains of this service.
    @ivar secret: The secret used for generating dialback keys.
    @ivar xmlstream: The source end of the pipe that is connected to the
                     router.
    """

    logTraffic = False

    def __init__(self, router, domain=None, secret=None):
        """
        @param router: The router to add a route to.
        @param domain: Optional default domain. It is added to C{domains}.
        @param secret: Optional dialback secret. If C{None}, a random secret
                       is generated.
        """
        self.router = router

        self.defaultDomain = domain
        self.domains = set()
        if self.defaultDomain:
            self.domains.add(self.defaultDomain)

        if secret is not None:
            self.secret = secret
        else:
            self.secret = randbytes.secureRandom(16).encode('hex')

        # All three are keyed on (thisHost, otherHost) tuples.
        self._outgoingStreams = {}
        self._outgoingQueues = {}
        self._outgoingConnecting = set()
        self.serial = 0

        pipe = XmlPipe()
        self.xmlstream = pipe.source
        self.router.addRoute(None, pipe.sink)
        self.xmlstream.addObserver('/*', self.send)


    def outgoingInitialized(self, xs):
        """
        Called when an outgoing stream has been authenticated.

        The stream is registered for its pair of hosts and all stanzas that
        were queued for that pair are sent.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r established" %
                (xs.serial, thisHost, otherHost))

        self._outgoingStreams[thisHost, otherHost] = xs
        xs.addObserver(xmlstream.STREAM_END_EVENT,
                       lambda _: self.outgoingDisconnected(xs))

        if (thisHost, otherHost) in self._outgoingQueues:
            for element in self._outgoingQueues[thisHost, otherHost]:
                xs.send(element)
            del self._outgoingQueues[thisHost, otherHost]


    def outgoingDisconnected(self, xs):
        """
        Called when an authenticated outgoing stream has ended.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r disconnected" %
                (xs.serial, thisHost, otherHost))

        del self._outgoingStreams[thisHost, otherHost]


    def initiateOutgoingStream(self, thisHost, otherHost):
        """
        Initiate an outgoing XMPP server-to-server connection.

        Nothing is done if a connection attempt for this pair of hosts is
        already in progress.

        @return: C{None} if an attempt is already in progress. Otherwise a
                 deferred that fires when the attempt has finished. As
                 C{resetConnecting} returns C{None}, the outcome of the
                 attempt is not passed on to callbacks added later.
        """

        def resetConnecting(_):
            self._outgoingConnecting.remove((thisHost, otherHost))

        if (thisHost, otherHost) in self._outgoingConnecting:
            return

        authenticator = XMPPServerConnectAuthenticator(thisHost,
                                                       otherHost,
                                                       self.secret)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                             self.outgoingInitialized)
        factory.logTraffic = self.logTraffic

        self._outgoingConnecting.add((thisHost, otherHost))

        d = initiateS2S(factory)
        d.addBoth(resetConnecting)
        return d


    def validateConnection(self, thisHost, otherHost, sid, key):
        """
        Validate an incoming XMPP server-to-server connection.

        A new connection to C{otherHost} is set up to have the key verified.

        @param thisHost: The domain of this server.
        @param otherHost: The domain to verify the key with.
        @param sid: The stream ID of the incoming connection.
        @param key: The dialback key received on the incoming connection.
        @return: The deferred of the factory used for the connection.
        """

        def connected(xs):
            # Set up stream for immediate disconnection.
            def disconnect(_):
                xs.transport.loseConnection()
            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)

        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
                                                      sid, key)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
        factory.logTraffic = self.logTraffic

        d = initiateS2S(factory)
        return d


    def send(self, stanza):
        """
        Send stanza to the proper XML Stream.

        This uses addressing embedded in the stanza to find the correct stream
        to forward the stanza to.
        """

        otherHost = jid.internJID(stanza["to"]).host
        thisHost = jid.internJID(stanza["from"]).host

        if (thisHost, otherHost) not in self._outgoingStreams:
            # There is no connection with the destination (yet). Cache the
            # outgoing stanza until the connection has been established.
            # XXX: If the connection cannot be established, the queue should
            #      be emptied at some point.
            if (thisHost, otherHost) not in self._outgoingQueues:
                self._outgoingQueues[(thisHost, otherHost)] = []
            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
            self.initiateOutgoingStream(thisHost, otherHost)
        else:
            self._outgoingStreams[(thisHost, otherHost)].send(stanza)


    def dispatch(self, xs, stanza):
        """
        Send on element to be routed within the server.

        A stanza without sender or recipient results in an
        C{'improper-addressing'} stream error. A stanza with a malformed
        address is dropped. A stanza whose sender is not at the host of the
        other entity of the stream results in an C{'invalid-from'} stream
        error.

        @param xs: The stream the stanza was received on.
        @param stanza: The received stanza.
        """
        stanzaFrom = stanza.getAttribute('from')
        stanzaTo = stanza.getAttribute('to')

        if not stanzaFrom or not stanzaTo:
            xs.sendStreamError(error.StreamError('improper-addressing'))
            return

        try:
            sender = jid.internJID(stanzaFrom)
            jid.internJID(stanzaTo)
        except jid.InvalidFormat:
            log.msg("Dropping error stanza with malformed JID")
            # Stop here: without a parsed sender, the check below would
            # fail on the unbound name.
            return

        if sender.host != xs.otherEntity.host:
            xs.sendStreamError(error.StreamError('invalid-from'))
        else:
            self.xmlstream.send(stanza)
Note: See TracBrowser for help on using the repository browser.