source: wokkel/server.py @ 96:8e6130587088

Last change on this file since 96:8e6130587088 was 96:8e6130587088, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Remove copyright dates from individual source files, only update LICENSE.

  • Property exe set to *
File size: 23.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_server -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP Server-to-Server protocol.
8
9This module implements several aspects of XMPP server-to-server communications
10as described in XMPP Core (RFC 3920). Refer to that document for the meaning
11of the used terminology.
12"""
13
14# hashlib is new in Python 2.5, try that first.
15try:
16    from hashlib import sha256
17    digestmod = sha256
18except ImportError:
19    import Crypto.Hash.SHA256 as digestmod
20    sha256 = digestmod.new
21
22import hmac
23
24from zope.interface import implements
25
26from twisted.application import service
27from twisted.internet import defer, reactor
28from twisted.names.srvconnect import SRVConnector
29from twisted.python import log, randbytes
30from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
31from twisted.words.xish import domish
32
33from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
34from wokkel.compat import XmlStreamServerFactory
35
36NS_DIALBACK = 'jabber:server:dialback'
37
def generateKey(secret, receivingServer, originatingServer, streamID):
    """
    Generate a dialback key for server-to-server XMPP Streams.

    The dialback key is generated using the algorithm described in
    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
    terminology for the parameters is described in RFC-3920.

    Each argument may be either a byte string or a text (unicode) string.
    Text is encoded as UTF-8 before it is hashed, byte strings are hashed
    unchanged. For plain ASCII input the result is the same regardless of
    which of the two string types is passed.

    @param secret: the shared secret known to the Originating Server and
                   Authoritative Server.
    @type secret: C{str} or C{unicode}
    @param receivingServer: the Receiving Server host name.
    @type receivingServer: C{str} or C{unicode}
    @param originatingServer: the Originating Server host name.
    @type originatingServer: C{str} or C{unicode}
    @param streamID: the Stream ID as generated by the Receiving Server.
    @type streamID: C{str} or C{unicode}
    @return: hexadecimal digest of the generated key.
    @rtype: C{str}
    """

    def toBytes(value):
        # The hash functions operate on bytes. Relying on the implicit
        # conversion fails for non-ASCII text, so encode explicitly.
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    # The HMAC key is the hexadecimal SHA-256 digest of the secret, not the
    # secret itself.
    hashObject = sha256()
    hashObject.update(toBytes(secret))
    hashedSecret = hashObject.hexdigest()

    # The message is the three identifiers separated by single spaces.
    message = b" ".join([toBytes(receivingServer),
                         toBytes(originatingServer),
                         toBytes(streamID)])

    mac = hmac.HMAC(toBytes(hashedSecret), message, digestmod=digestmod)
    return mac.hexdigest()
65
66
def trapStreamError(xs, observer):
    """
    Trap stream errors.

    This wraps an observer to catch exceptions. In case of a
    L{error.StreamError}, it is sent over the given XML stream. All other
    exceptions yield a C{'internal-server-error'} stream error, that is
    sent over the stream, while the exception is logged.

    Only exceptions deriving from C{Exception} are trapped, so that
    interpreter-exit signals such as C{SystemExit} and C{KeyboardInterrupt}
    propagate instead of being turned into a stream error.

    @param xs: The XML stream that stream errors are sent over.
    @param observer: The observer to wrap. It is called with the observed
                     element as its only argument.
    @return: Wrapped observer
    """

    def wrappedObserver(element):
        try:
            observer(element)
        except error.StreamError as exc:
            # The observer asked for a specific stream error: pass it on.
            xs.sendStreamError(exc)
        except Exception:
            # Unexpected failure: log the active exception and report a
            # generic error to the peer.
            log.err()
            xs.sendStreamError(error.StreamError('internal-server-error'))

    return wrappedObserver
90
91
class XMPPServerConnector(SRVConnector):
    """
    SRV based connector for the C{'xmpp-server'} service of a domain.
    """

    def __init__(self, reactor, domain, factory):
        """
        Set up an L{SRVConnector} for the C{'xmpp-server'} service.

        @param reactor: The reactor passed on to L{SRVConnector}.
        @param domain: The domain to connect to.
        @param factory: The factory passed on to L{SRVConnector}.
        """
        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)


    def pickServer(self):
        """
        Pick the host and port to connect to.

        The choice of L{SRVConnector.pickServer} is used, except that the
        port is replaced by 5269 when there are no SRV records to choose
        from.

        @return: Tuple of host and port.
        """
        host, port = SRVConnector.pickServer(self)

        haveRecords = self.servers or self.orderedServers
        if haveRecords:
            return host, port

        # no SRV record, fall back..
        return host, 5269
105
106
class DialbackFailed(Exception):
    """
    Exception used to signal that server dialback did not succeed.
    """
109
110
111
class OriginatingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Originating Server.

    On initialization a dialback key is generated and sent in a
    C{<result/>} element to the other host. The deferred returned from
    L{initialize} fires when the reply to that element, or a stream error,
    is received.

    @ivar xmlstream: The XML stream dialback is performed on.
    @ivar thisHost: The domain of this (the Originating) server.
    @ivar otherHost: The domain of the other server.
    @ivar secret: The secret the dialback key is generated from.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, secret):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret


    def initialize(self):
        """
        Send the dialback key and wait for the outcome.

        @return: Deferred that fires with C{None} when the other host
                 reported the key as valid. It fails with L{DialbackFailed}
                 for any other reply, or with the stream error event data
                 if a stream error was received first.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        key = generateKey(self.secret, self.otherHost,
                          self.thisHost, self.xmlstream.sid)

        result = domish.Element((NS_DIALBACK, 'result'))
        result['from'] = self.thisHost
        result['to'] = self.otherHost
        result.addContent(key)

        self.xmlstream.send(result)

        return self._deferred


    def _removeObservers(self):
        """
        Remove both observers that were set up by L{initialize}.

        The deferred can only fire once, so neither handler may run again
        after the first outcome has been processed.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                      self.onResult)


    def onResult(self, result):
        """
        Handle the reply to the dialback key.

        Only a C{type} attribute of C{'valid'} counts as success. A reply
        with another type, or without a type, fails the initialization.

        @param result: The received C{<result/>} element.
        """
        self._removeObservers()

        # getAttribute yields None for a missing attribute, so a reply
        # without a type is treated as a failure instead of raising.
        if result.getAttribute('type') == 'valid':
            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail the initialization because of a stream error.

        @param failure: The data of the stream error event, passed on to
                        the errback of the deferred.
        """
        self._removeObservers()
        self._deferred.errback(failure)
162
163
164
class ReceivingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Receiving Server.

    On initialization a C{<verify/>} element holding the key and the
    original stream ID is sent to the other host. The deferred returned
    from L{initialize} fires when the reply to that element, or a stream
    error, is received.

    @ivar xmlstream: The XML stream the verification is performed on.
    @ivar thisHost: The domain of this (the Receiving) server.
    @ivar otherHost: The domain of the other server.
    @ivar originalStreamID: The stream ID sent along with the key.
    @ivar key: The key to be verified.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key


    def initialize(self):
        """
        Send the verification request and wait for the outcome.

        @return: Deferred that fires with C{None} when the key was reported
                 to be valid. It fails with L{DialbackFailed} for any other
                 reply, or with the stream error event data if a stream
                 error was received first.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                   self.onVerify)

        verify = domish.Element((NS_DIALBACK, 'verify'))
        verify['from'] = self.thisHost
        verify['to'] = self.otherHost
        verify['id'] = self.originalStreamID
        verify.addContent(self.key)

        self.xmlstream.send(verify)
        return self._deferred


    def _removeObservers(self):
        """
        Remove both observers that were set up by L{initialize}.

        The deferred can only fire once, so neither handler may run again
        after the first outcome has been processed.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                      self.onVerify)


    def onVerify(self, verify):
        """
        Handle the reply to the verification request.

        The C{id}, C{to} and C{from} attributes of the reply have to match
        the request. If one of them does not match, or is missing, a stream
        error is sent and the initialization fails. Otherwise the outcome
        depends on the C{type} attribute being C{'valid'}.

        @param verify: The received C{<verify/>} element.
        """
        self._removeObservers()

        # getAttribute yields None for a missing attribute, which never
        # matches the expected value, so incomplete replies are rejected
        # instead of raising.
        condition = None
        if verify.getAttribute('id') != self.originalStreamID:
            condition = 'invalid-id'
        elif verify.getAttribute('to') != self.thisHost:
            condition = 'host-unknown'
        elif verify.getAttribute('from') != self.otherHost:
            condition = 'invalid-from'

        if condition is not None:
            self.xmlstream.sendStreamError(error.StreamError(condition))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('type') == 'valid':
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail the initialization because of a stream error.

        @param failure: The data of the stream error event, passed on to
                        the errback of the deferred.
        """
        self._removeObservers()
        self._deferred.errback(failure)
221
222
223
class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing XMPP server-to-server connection.

    This authenticator connects to C{otherHost} (the Receiving Server) and then
    initiates dialback as C{thisHost} (the Originating Server) using
    L{OriginatingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Originating
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Receiving Server).
    @ivar secret: The shared secret that is used for verifying the validity
                  of this new connection.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, secret):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes on the stream, then
        continue with the connection handling of the base class.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)

        prefixes = {}
        prefixes[xmlstream.NS_STREAMS] = 'stream'
        prefixes[NS_DIALBACK] = 'db'
        xs.prefixes = prefixes

        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Make dialback as Originating Server the only initializer of C{xs}.

        @param xs: The XML stream this authenticator is associated with.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            OriginatingDialbackInitializer(xs, self.thisHost,
                                           self.otherHost, self.secret),
        ]
260
261
262
class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing connection to verify an incoming connection.

    This authenticator connects to C{otherHost} (the Authoritative Server) and
    then initiates dialback as C{thisHost} (the Receiving Server) using
    L{ReceivingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Receiving
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Authoritative Server).
    @ivar originalStreamID: The stream ID of the incoming connection that is
                            being verified.
    @ivar key: The dialback key that is to be verified.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, originalStreamID, key):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes on the stream, then
        continue with the connection handling of the base class.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)

        prefixes = {}
        prefixes[xmlstream.NS_STREAMS] = 'stream'
        prefixes[NS_DIALBACK] = 'db'
        xs.prefixes = prefixes

        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Make key verification as Receiving Server the only initializer of
        C{xs}.

        @param xs: The XML stream this authenticator is associated with.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
                                         self.originalStreamID, self.key),
        ]
301
302
303
class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for an incoming XMPP server-to-server connection.

    This authenticator handles two types of incoming connections. Regular
    server-to-server connections are from the Originating Server to the
    Receiving Server, where this server is the Receiving Server. These
    connections start out by receiving a dialback key, verifying the
    key with the Authoritative Server, and then accept normal XMPP stanzas.

    The other type of connections is from a Receiving Server to an
    Authoritative Server, where this server acts as the Authoritative Server.
    These connections are used to verify the validity of an outgoing connection
    from this server. In this case, this server receives a verification
    request, checks the key and then returns the result.

    @ivar service: The service that keeps the list of domains we accept
                   connections for.
    """
    namespace = 'jabber:server'

    def __init__(self, service):
        xmlstream.ListenAuthenticator.__init__(self)
        self.service = service


    def streamStarted(self, rootElement):
        """
        Check the received stream header and send our own.

        A stream error is sent, and nothing else is set up, if the header
        does not use the expected namespaces (C{'invalid-namespace'}) or
        addresses a domain the service does not know (C{'host-unknown'}).
        Otherwise observers for dialback elements are installed and the
        stream header is sent, followed by an empty features element for
        streams with version 1.0 or higher.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not self.xmlstream.sid:
            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')

        if self.xmlstream.thisEntity:
            targetDomain = self.xmlstream.thisEntity.host
        else:
            targetDomain = self.service.defaultDomain

        def prepareStream(domain):
            self.xmlstream.namespace = self.namespace
            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                       NS_DIALBACK: 'db'}
            if domain:
                self.xmlstream.thisEntity = jid.internJID(domain)

        # The header must be in the streams namespace, have our namespace as
        # its default and declare the dialback namespace with prefix 'db'.
        badNamespace = (
            xmlstream.NS_STREAMS != rootElement.uri or
            self.namespace != self.xmlstream.namespace or
            ('db', NS_DIALBACK) not in rootElement.localPrefixes.items())

        condition = None
        if badNamespace:
            condition = 'invalid-namespace'
        elif targetDomain and targetDomain not in self.service.domains:
            condition = 'host-unknown'

        if condition is not None:
            prepareStream(self.service.defaultDomain)
            self.xmlstream.sendStreamError(error.StreamError(condition))
            return

        # Both handlers raise StreamError for malformed requests, so both
        # are wrapped to have such errors sent over the stream.
        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
                                   trapStreamError(self.xmlstream,
                                                   self.onVerify))
        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
                                   trapStreamError(self.xmlstream,
                                                   self.onResult))

        prepareStream(targetDomain)
        self.xmlstream.sendHeader()

        if self.xmlstream.version >= (1, 0):
            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
            self.xmlstream.send(features)


    def onVerify(self, verify):
        """
        Handle a key verification request.

        The key in the request is compared to the key generated from the
        secret of the service, and the outcome is sent back as the C{type}
        of a C{<verify/>} reply.

        @param verify: The received C{<verify/>} element.
        @raise error.StreamError: C{'improper-addressing'} if an address is
            missing or malformed, C{'host-unknown'} if the C{to} domain is
            not one of the domains of the service, C{'invalid-from'} if the
            C{from} domain differs from the already known other entity.
        """
        try:
            receivingServer = jid.JID(verify['from']).host
            originatingServer = jid.JID(verify['to']).host
        except (KeyError, jid.InvalidFormat):
            raise error.StreamError('improper-addressing')

        if originatingServer not in self.service.domains:
            raise error.StreamError('host-unknown')

        if (self.xmlstream.otherEntity and
            receivingServer != self.xmlstream.otherEntity.host):
            raise error.StreamError('invalid-from')

        streamID = verify.getAttribute('id', '')
        key = unicode(verify)

        calculatedKey = generateKey(self.service.secret, receivingServer,
                                    originatingServer, streamID)
        if key == calculatedKey:
            validity = 'valid'
        else:
            validity = 'invalid'

        reply = domish.Element((NS_DIALBACK, 'verify'))
        reply['from'] = originatingServer
        reply['to'] = receivingServer
        reply['id'] = streamID
        reply['type'] = validity
        self.xmlstream.send(reply)


    def onResult(self, result):
        """
        Handle a dialback key offered by an Originating Server.

        The key is passed to the C{validateConnection} method of the
        service. Depending on the outcome, a C{<result/>} reply with type
        C{'valid'} or C{'invalid'} is sent. On success, the entities of the
        stream are set and L{xmlstream.STREAM_AUTHD_EVENT} is dispatched.

        @param result: The received C{<result/>} element.
        @return: The deferred of the validation, with the reply handling
                 attached.
        @raise error.StreamError: C{'improper-addressing'} if the C{to} or
            C{from} attribute is missing.
        """
        try:
            receivingServer = result['to']
            originatingServer = result['from']
        except KeyError:
            # Without both addresses there is nothing to validate and
            # nobody to address a reply to.
            raise error.StreamError('improper-addressing')

        key = unicode(result)

        def sendReply(validity):
            reply = domish.Element((NS_DIALBACK, 'result'))
            reply['from'] = receivingServer
            reply['to'] = originatingServer
            reply['type'] = validity
            self.xmlstream.send(reply)

        def valid(xs):
            sendReply('valid')
            if not self.xmlstream.thisEntity:
                self.xmlstream.thisEntity = jid.internJID(receivingServer)
            self.xmlstream.otherEntity = jid.internJID(originatingServer)
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)

        def invalid(failure):
            log.err(failure)
            sendReply('invalid')

        d = self.service.validateConnection(receivingServer, originatingServer,
                                            self.xmlstream.sid, key)
        d.addCallbacks(valid, invalid)
        return d
433
434
435
class DeferredS2SClientFactory(DeferredXmlStreamFactory):
    """
    Deferred firing factory for initiating XMPP server-to-server connection.

    The deferred has its callbacks called upon successful authentication with
    the other server. In case of failed authentication or connection, the
    deferred will have its errbacks called instead.

    @ivar logTraffic: Whether raw incoming and outgoing data is logged.
    @ivar serial: The serial number that is assigned to the next stream.
    """

    logTraffic = False

    def __init__(self, authenticator):
        DeferredXmlStreamFactory.__init__(self, authenticator)

        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                          self.onConnectionMade)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Number the new stream and set up traffic logging, if enabled.

        @param xs: The XML stream that got connected.
        """
        xs.serial = self.serial
        self.serial += 1

        if not self.logTraffic:
            return

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        xs.rawDataInFn = logDataIn
        xs.rawDataOutFn = logDataOut
469
470
471
def initiateS2S(factory):
    """
    Start connecting to the other host of the authenticator of a factory.

    @param factory: The factory to connect with. The domain to connect to
                    is taken from C{factory.authenticator.otherHost}.
    @return: The C{deferred} attribute of C{factory}.
    """
    otherHost = factory.authenticator.otherHost
    connector = XMPPServerConnector(reactor, otherHost, factory)
    connector.connect()
    return factory.deferred
477
478
479
class XMPPS2SServerFactory(XmlStreamServerFactory):
    """
    XMPP Server-to-Server Server factory.

    This factory accepts XMPP server-to-server connections.

    @ivar service: The service that received elements are dispatched to.
    @ivar logTraffic: Whether raw incoming and outgoing data is logged.
    @ivar serial: The serial number that is assigned to the next stream.
    """

    logTraffic = False

    def __init__(self, service):
        self.service = service

        def authenticatorFactory():
            return XMPPServerListenAuthenticator(service)

        XmlStreamServerFactory.__init__(self, authenticatorFactory)

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.onConnectionMade),
            (xmlstream.STREAM_AUTHD_EVENT, self.onAuthenticated),
        )
        for event, handler in bootstraps:
            self.addBootstrap(event, handler)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Called when a server-to-server connection was made.

        This numbers the stream, enables traffic debugging on incoming
        streams if requested, and starts observing stream errors.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def onAuthenticated(self, xs):
        """
        Called when an incoming stream has been authenticated.

        This logs the new connection and starts observing the end of the
        stream and all received elements.
        """
        log.msg("Incoming connection %d from %r to %r established" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))

        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
                       0, xs)
        xs.addObserver('/*', self.onElement, 0, xs)


    def onConnectionLost(self, xs, reason):
        """
        Called when an authenticated stream has ended; logs the event.
        """
        log.msg("Incoming connection %d from %r to %r disconnected" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))


    def onError(self, reason):
        """
        Called for stream errors; logs the error.
        """
        log.err(reason, "Stream Error")


    def onElement(self, xs, element):
        """
        Called when an element was received from one of the connected streams.

        Elements not yet marked as handled are passed to the service.
        """
        if not element.handled:
            self.service.dispatch(xs, element)
559
560
561
class ServerService(object):
    """
    Service for managing XMPP server to server connections.

    @ivar router: The router this service adds its default route to.
    @ivar defaultDomain: The domain passed at construction, if any.
    @ivar domains: The set of domains connections are accepted for.
    @ivar secret: The secret used for generating dialback keys.
    @ivar logTraffic: Whether traffic logging is enabled on the factories
                      created for outgoing connections.
    @ivar xmlstream: The source end of the pipe to the router.
    """

    logTraffic = False

    def __init__(self, router, domain=None, secret=None):
        """
        @param router: The router to exchange stanzas with.
        @param domain: Optional default domain, that is also added to
                       C{domains}.
        @param secret: Optional dialback secret. If C{None}, a random one
                       is generated.
        """
        self.router = router

        self.defaultDomain = domain
        self.domains = set()
        if self.defaultDomain:
            self.domains.add(self.defaultDomain)

        if secret is not None:
            self.secret = secret
        else:
            self.secret = randbytes.secureRandom(16).encode('hex')

        # All three are keyed by (thisHost, otherHost) tuples.
        self._outgoingStreams = {}
        self._outgoingQueues = {}
        self._outgoingConnecting = set()
        self.serial = 0

        # Everything the router sends to the default route ends up in send.
        pipe = XmlPipe()
        self.xmlstream = pipe.source
        self.router.addRoute(None, pipe.sink)
        self.xmlstream.addObserver('/*', self.send)


    def outgoingInitialized(self, xs):
        """
        Called when an outgoing stream has been authenticated.

        The stream is registered for its pair of hosts, and the stanzas
        that were queued for that pair are sent.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r established" %
                (xs.serial, thisHost, otherHost))

        self._outgoingStreams[thisHost, otherHost] = xs
        xs.addObserver(xmlstream.STREAM_END_EVENT,
                       lambda _: self.outgoingDisconnected(xs))

        if (thisHost, otherHost) in self._outgoingQueues:
            for element in self._outgoingQueues[thisHost, otherHost]:
                xs.send(element)
            del self._outgoingQueues[thisHost, otherHost]


    def outgoingDisconnected(self, xs):
        """
        Called when an outgoing stream has ended; unregisters the stream.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r disconnected" %
                (xs.serial, thisHost, otherHost))

        del self._outgoingStreams[thisHost, otherHost]


    def initiateOutgoingStream(self, thisHost, otherHost):
        """
        Initiate an outgoing XMPP server-to-server connection.

        @return: C{None} if a connection attempt for this pair of hosts is
                 already in progress. Otherwise a deferred that fires with
                 C{None} once the attempt has finished, whether it
                 succeeded or not.
        """

        def resetConnecting(_):
            self._outgoingConnecting.remove((thisHost, otherHost))

        if (thisHost, otherHost) in self._outgoingConnecting:
            return

        authenticator = XMPPServerConnectAuthenticator(thisHost,
                                                       otherHost,
                                                       self.secret)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                             self.outgoingInitialized)
        factory.logTraffic = self.logTraffic

        self._outgoingConnecting.add((thisHost, otherHost))

        d = initiateS2S(factory)
        d.addBoth(resetConnecting)
        return d


    def validateConnection(self, thisHost, otherHost, sid, key):
        """
        Validate an incoming XMPP server-to-server connection.

        This connects to C{otherHost} to have the key verified. The
        connection is closed again when its initialization has completed
        or failed.

        @param thisHost: The domain of this server.
        @param otherHost: The domain of the server to verify the key with.
        @param sid: The stream ID of the incoming connection.
        @param key: The dialback key received on the incoming connection.
        @return: The deferred of the factory used for the verification
                 connection.
        """

        def connected(xs):
            # Set up stream for immediate disconnection.
            def disconnect(_):
                xs.transport.loseConnection()
            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)

        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
                                                      sid, key)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
        factory.logTraffic = self.logTraffic

        d = initiateS2S(factory)
        return d


    def send(self, stanza):
        """
        Send stanza to the proper XML Stream.

        This uses addressing embedded in the stanza to find the correct stream
        to forward the stanza to.
        """

        otherHost = jid.internJID(stanza["to"]).host
        thisHost = jid.internJID(stanza["from"]).host

        if (thisHost, otherHost) not in self._outgoingStreams:
            # There is no connection with the destination (yet). Cache the
            # outgoing stanza until the connection has been established.
            # XXX: If the connection cannot be established, the queue should
            #      be emptied at some point.
            if (thisHost, otherHost) not in self._outgoingQueues:
                self._outgoingQueues[(thisHost, otherHost)] = []
            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
            self.initiateOutgoingStream(thisHost, otherHost)
        else:
            self._outgoingStreams[(thisHost, otherHost)].send(stanza)


    def dispatch(self, xs, stanza):
        """
        Send on element to be routed within the server.

        A stanza lacking a sender or recipient address yields an
        C{'improper-addressing'} stream error. A stanza with an address
        that is not a valid JID is logged and dropped. A stanza of which
        the sender domain differs from the domain of the other side of the
        stream yields an C{'invalid-from'} stream error. All other stanzas
        are passed on to the router.

        @param xs: The XML stream the stanza was received on.
        @param stanza: The received stanza.
        """
        stanzaFrom = stanza.getAttribute('from')
        stanzaTo = stanza.getAttribute('to')

        if not stanzaFrom or not stanzaTo:
            xs.sendStreamError(error.StreamError('improper-addressing'))
            return

        try:
            sender = jid.internJID(stanzaFrom)
            # The recipient is only parsed to check that it is well-formed.
            jid.internJID(stanzaTo)
        except jid.InvalidFormat:
            log.msg("Dropping error stanza with malformed JID")
            # Nothing below can work without a parsed sender.
            return

        if sender.host != xs.otherEntity.host:
            xs.sendStreamError(error.StreamError('invalid-from'))
        else:
            self.xmlstream.send(stanza)
Note: See TracBrowser for help on using the repository browser.