source: wokkel/server.py @ 206:a32614b03670

Last change on this file since 206:a32614b03670 was 206:a32614b03670, checked in by Ralph Meijer <ralphm@…>, 4 years ago

imported patch py3-server.patch

  • Property exe set to *
File size: 23.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_server -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP Server-to-Server protocol.
8
9This module implements several aspects of XMPP server-to-server communications
10as described in XMPP Core (RFC 3920). Refer to that document for the meaning
11of the used terminology.
12"""
13
14from __future__ import division, absolute_import
15
16import binascii
17from hashlib import sha256
18import hmac
19
20from zope.interface import implementer
21
22from twisted.internet import defer, reactor
23from twisted.names.srvconnect import SRVConnector
24from twisted.python import log, randbytes
25from twisted.python.compat import iteritems, unicode
26from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
27from twisted.words.xish import domish
28
29from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
30
31NS_DIALBACK = 'jabber:server:dialback'
32
33def generateKey(secret, receivingServer, originatingServer, streamID):
34    """
35    Generate a dialback key for server-to-server XMPP Streams.
36
37    The dialback key is generated using the algorithm described in
38    U{XEP-0185<http://xmpp.org/extensions/xep-0185.html>}. The used
39    terminology for the parameters is described in RFC-3920.
40
41    @param secret: the shared secret known to the Originating Server and
42                   Authoritive Server.
43    @type secret: L{unicode}
44    @param receivingServer: the Receiving Server host name.
45    @type receivingServer: L{unicode}
46    @param originatingServer: the Originating Server host name.
47    @type originatingServer: L{unicode}
48    @param streamID: the Stream ID as generated by the Receiving Server.
49    @type streamID: L{unicode}
50    @return: hexadecimal digest of the generated key.
51    @type: C{str}
52    """
53
54    hashObject = sha256()
55    hashObject.update(secret.encode('ascii'))
56    hashedSecret = hashObject.hexdigest()
57    message = " ".join([receivingServer, originatingServer, streamID])
58    hash = hmac.HMAC(hashedSecret.encode('ascii'),
59                     message.encode('ascii'),
60                     digestmod=sha256)
61    return hash.hexdigest()
62
63
64def trapStreamError(xs, observer):
65    """
66    Trap stream errors.
67
68    This wraps an observer to catch exceptions. In case of a
69    L{error.StreamError}, it is send over the given XML stream. All other
70    exceptions yield a C{'internal-server-error'} stream error, that is
71    sent over the stream, while the exception is logged.
72
73    @return: Wrapped observer
74    """
75
76    def wrappedObserver(element):
77        try:
78            observer(element)
79        except error.StreamError as exc:
80            xs.sendStreamError(exc)
81        except:
82            log.err()
83            exc = error.StreamError('internal-server-error')
84            xs.sendStreamError(exc)
85
86    return wrappedObserver
87
88
89class XMPPServerConnector(SRVConnector):
90    def __init__(self, reactor, domain, factory):
91        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)
92
93
94    def pickServer(self):
95        host, port = SRVConnector.pickServer(self)
96
97        if not self.servers and not self.orderedServers:
98            # no SRV record, fall back..
99            port = 5269
100
101        return host, port
102
103
104class DialbackFailed(Exception):
105    pass
106
107
108
109@implementer(ijabber.IInitiatingInitializer)
110class OriginatingDialbackInitializer(object):
111    """
112    Server Dialback Initializer for the Orginating Server.
113    """
114
115    _deferred = None
116
117    def __init__(self, xs, thisHost, otherHost, secret):
118        self.xmlstream = xs
119        self.thisHost = thisHost
120        self.otherHost = otherHost
121        self.secret = secret
122
123
124    def initialize(self):
125        self._deferred = defer.Deferred()
126        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
127                                   self.onStreamError)
128        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
129                                   self.onResult)
130
131        key = generateKey(self.secret, self.otherHost,
132                          self.thisHost, self.xmlstream.sid)
133
134        result = domish.Element((NS_DIALBACK, 'result'))
135        result['from'] = self.thisHost
136        result['to'] = self.otherHost
137        result.addContent(key)
138
139        self.xmlstream.send(result)
140
141        return self._deferred
142
143
144    def onResult(self, result):
145        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
146                                      self.onStreamError)
147        if result['type'] == 'valid':
148            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
149            self._deferred.callback(None)
150        else:
151            self._deferred.errback(DialbackFailed())
152
153
154    def onStreamError(self, failure):
155        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
156                                      self.onResult)
157        self._deferred.errback(failure)
158
159
160
161@implementer(ijabber.IInitiatingInitializer)
162class ReceivingDialbackInitializer(object):
163    """
164    Server Dialback Initializer for the Receiving Server.
165    """
166
167    _deferred = None
168
169    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
170        self.xmlstream = xs
171        self.thisHost = thisHost
172        self.otherHost = otherHost
173        self.originalStreamID = originalStreamID
174        self.key = key
175
176
177    def initialize(self):
178        self._deferred = defer.Deferred()
179        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
180                                   self.onStreamError)
181        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
182                                   self.onVerify)
183
184        verify = domish.Element((NS_DIALBACK, 'verify'))
185        verify['from'] = self.thisHost
186        verify['to'] = self.otherHost
187        verify['id'] = self.originalStreamID
188        verify.addContent(self.key)
189
190        self.xmlstream.send(verify)
191        return self._deferred
192
193
194    def onVerify(self, verify):
195        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
196                                      self.onStreamError)
197        if verify['id'] != self.originalStreamID:
198            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
199            self._deferred.errback(DialbackFailed())
200        elif verify['to'] != self.thisHost:
201            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
202            self._deferred.errback(DialbackFailed())
203        elif verify['from'] != self.otherHost:
204            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
205            self._deferred.errback(DialbackFailed())
206        elif verify['type'] == 'valid':
207            self._deferred.callback(None)
208        else:
209            self._deferred.errback(DialbackFailed())
210
211
212    def onStreamError(self, failure):
213        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
214                                      self.onVerify)
215        self._deferred.errback(failure)
216
217
218
219class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
220    """
221    Authenticator for an outgoing XMPP server-to-server connection.
222
223    This authenticator connects to C{otherHost} (the Receiving Server) and then
224    initiates dialback as C{thisHost} (the Originating Server) using
225    L{OriginatingDialbackInitializer}.
226
227    @ivar thisHost: The domain this server connects from (the Originating
228                    Server) .
229    @ivar otherHost: The domain of the server this server connects to (the
230                     Receiving Server).
231    @ivar secret: The shared secret that is used for verifying the validity
232                  of this new connection.
233    """
234    namespace = 'jabber:server'
235
236    def __init__(self, thisHost, otherHost, secret):
237        self.thisHost = thisHost
238        self.otherHost = otherHost
239        self.secret = secret
240        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
241
242
243    def connectionMade(self):
244        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
245        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
246                                   NS_DIALBACK: 'db'}
247        xmlstream.ConnectAuthenticator.connectionMade(self)
248
249
250    def associateWithStream(self, xs):
251        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
252        init = OriginatingDialbackInitializer(xs, self.thisHost,
253                                              self.otherHost, self.secret)
254        xs.initializers = [init]
255
256
257
258class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
259    """
260    Authenticator for an outgoing connection to verify an incoming connection.
261
262    This authenticator connects to C{otherHost} (the Authoritative Server) and
263    then initiates dialback as C{thisHost} (the Receiving Server) using
264    L{ReceivingDialbackInitializer}.
265
266    @ivar thisHost: The domain this server connects from (the Receiving
267                    Server) .
268    @ivar otherHost: The domain of the server this server connects to (the
269                     Authoritative Server).
270    @ivar originalStreamID: The stream ID of the incoming connection that is
271                            being verified.
272    @ivar key: The key provided by the Receving Server to be verified.
273    """
274    namespace = 'jabber:server'
275
276    def __init__(self, thisHost, otherHost, originalStreamID, key):
277        self.thisHost = thisHost
278        self.otherHost = otherHost
279        self.originalStreamID = originalStreamID
280        self.key = key
281        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
282
283
284    def connectionMade(self):
285        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
286        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
287                                   NS_DIALBACK: 'db'}
288        xmlstream.ConnectAuthenticator.connectionMade(self)
289
290
291    def associateWithStream(self, xs):
292        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
293        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
294                                            self.originalStreamID, self.key)
295        xs.initializers = [init]
296
297
298
299class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
300    """
301    Authenticator for an incoming XMPP server-to-server connection.
302
303    This authenticator handles two types of incoming connections. Regular
304    server-to-server connections are from the Originating Server to the
305    Receiving Server, where this server is the Receiving Server. These
306    connections start out by receiving a dialback key, verifying the
307    key with the Authoritative Server, and then accept normal XMPP stanzas.
308
309    The other type of connections is from a Receiving Server to an
310    Authoritative Server, where this server acts as the Authoritative Server.
311    These connections are used to verify the validity of an outgoing connection
312    from this server. In this case, this server receives a verification
313    request, checks the key and then returns the result.
314
315    @ivar service: The service that keeps the list of domains we accept
316                   connections for.
317    """
318    namespace = 'jabber:server'
319
320    def __init__(self, service):
321        xmlstream.ListenAuthenticator.__init__(self)
322        self.service = service
323
324
325    def streamStarted(self, rootElement):
326        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
327
328        if self.xmlstream.thisEntity:
329            targetDomain = self.xmlstream.thisEntity.host
330        else:
331            targetDomain = self.service.defaultDomain
332
333        def prepareStream(domain):
334            self.xmlstream.namespace = self.namespace
335            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
336                                       NS_DIALBACK: 'db'}
337            if domain:
338                self.xmlstream.thisEntity = jid.internJID(domain)
339
340        try:
341            if xmlstream.NS_STREAMS != rootElement.uri or \
342               self.namespace != self.xmlstream.namespace or \
343               ('db', NS_DIALBACK) not in iteritems(rootElement.localPrefixes):
344                raise error.StreamError('invalid-namespace')
345
346            if targetDomain and targetDomain not in self.service.domains:
347                raise error.StreamError('host-unknown')
348        except error.StreamError as exc:
349            prepareStream(self.service.defaultDomain)
350            self.xmlstream.sendStreamError(exc)
351            return
352
353        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
354                                   trapStreamError(self.xmlstream,
355                                                   self.onVerify))
356        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
357                                   self.onResult)
358
359        prepareStream(targetDomain)
360        self.xmlstream.sendHeader()
361
362        if self.xmlstream.version >= (1, 0):
363            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
364            self.xmlstream.send(features)
365
366
367    def onVerify(self, verify):
368        try:
369            receivingServer = jid.JID(verify['from']).host
370            originatingServer = jid.JID(verify['to']).host
371        except (KeyError, jid.InvalidFormat):
372            raise error.StreamError('improper-addressing')
373
374        if originatingServer not in self.service.domains:
375            raise error.StreamError('host-unknown')
376
377        if (self.xmlstream.otherEntity and
378            receivingServer != self.xmlstream.otherEntity.host):
379            raise error.StreamError('invalid-from')
380
381        streamID = verify.getAttribute('id', '')
382        key = unicode(verify)
383
384        calculatedKey = generateKey(self.service.secret, receivingServer,
385                                    originatingServer, streamID)
386        validity = (key == calculatedKey) and 'valid' or 'invalid'
387
388        reply = domish.Element((NS_DIALBACK, 'verify'))
389        reply['from'] = originatingServer
390        reply['to'] = receivingServer
391        reply['id'] = streamID
392        reply['type'] = validity
393        self.xmlstream.send(reply)
394
395
396    def onResult(self, result):
397        def reply(validity):
398            reply = domish.Element((NS_DIALBACK, 'result'))
399            reply['from'] = result['to']
400            reply['to'] = result['from']
401            reply['type'] = validity
402            self.xmlstream.send(reply)
403
404        def valid(xs):
405            reply('valid')
406            if not self.xmlstream.thisEntity:
407                self.xmlstream.thisEntity = jid.internJID(receivingServer)
408            self.xmlstream.otherEntity = jid.internJID(originatingServer)
409            self.xmlstream.dispatch(self.xmlstream,
410                                    xmlstream.STREAM_AUTHD_EVENT)
411
412        def invalid(failure):
413            log.err(failure)
414            reply('invalid')
415
416        receivingServer = result['to']
417        originatingServer = result['from']
418        key = unicode(result)
419
420        d = self.service.validateConnection(receivingServer, originatingServer,
421                                            self.xmlstream.sid, key)
422        d.addCallbacks(valid, invalid)
423        return d
424
425
426
427class DeferredS2SClientFactory(DeferredXmlStreamFactory):
428    """
429    Deferred firing factory for initiating XMPP server-to-server connection.
430
431    The deferred has its callbacks called upon succesful authentication with
432    the other server. In case of failed authentication or connection, the
433    deferred will have its errbacks called instead.
434    """
435
436    logTraffic = False
437
438    def __init__(self, authenticator):
439        DeferredXmlStreamFactory.__init__(self, authenticator)
440
441        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
442                          self.onConnectionMade)
443
444        self.serial = 0
445
446
447    def onConnectionMade(self, xs):
448        xs.serial = self.serial
449        self.serial += 1
450
451        def logDataIn(buf):
452            log.msg("RECV (%d): %r" % (xs.serial, buf))
453
454        def logDataOut(buf):
455            log.msg("SEND (%d): %r" % (xs.serial, buf))
456
457        if self.logTraffic:
458            xs.rawDataInFn = logDataIn
459            xs.rawDataOutFn = logDataOut
460
461
462
463def initiateS2S(factory):
464    domain = factory.authenticator.otherHost.encode('idna')
465    c = XMPPServerConnector(reactor, domain, factory)
466    c.connect()
467    return factory.deferred
468
469
470
471class XMPPS2SServerFactory(xmlstream.XmlStreamServerFactory):
472    """
473    XMPP Server-to-Server Server factory.
474
475    This factory accepts XMPP server-to-server connections.
476    """
477
478    logTraffic = False
479
480    def __init__(self, service):
481        self.service = service
482
483        def authenticatorFactory():
484            return XMPPServerListenAuthenticator(service)
485
486        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
487        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
488                          self.onConnectionMade)
489        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
490                          self.onAuthenticated)
491
492        self.serial = 0
493
494
495    def onConnectionMade(self, xs):
496        """
497        Called when a server-to-server connection was made.
498
499        This enables traffic debugging on incoming streams.
500        """
501        xs.serial = self.serial
502        self.serial += 1
503
504        def logDataIn(buf):
505            log.msg("RECV (%d): %r" % (xs.serial, buf))
506
507        def logDataOut(buf):
508            log.msg("SEND (%d): %r" % (xs.serial, buf))
509
510        if self.logTraffic:
511            xs.rawDataInFn = logDataIn
512            xs.rawDataOutFn = logDataOut
513
514        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
515
516
517    def onAuthenticated(self, xs):
518        thisHost = xs.thisEntity.host
519        otherHost = xs.otherEntity.host
520
521        log.msg("Incoming connection %d from %r to %r established" %
522                (xs.serial, otherHost, thisHost))
523
524        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
525                                                   0, xs)
526        xs.addObserver('/*', self.onElement, 0, xs)
527
528
529    def onConnectionLost(self, xs, reason):
530        thisHost = xs.thisEntity.host
531        otherHost = xs.otherEntity.host
532
533        log.msg("Incoming connection %d from %r to %r disconnected" %
534                (xs.serial, otherHost, thisHost))
535
536
537    def onError(self, reason):
538        log.err(reason, "Stream Error")
539
540
541    def onElement(self, xs, element):
542        """
543        Called when an element was received from one of the connected streams.
544
545        """
546        if element.handled:
547            return
548        else:
549            self.service.dispatch(xs, element)
550
551
552
553class ServerService(object):
554    """
555    Service for managing XMPP server to server connections.
556    """
557
558    logTraffic = False
559
560    def __init__(self, router, domain=None, secret=None):
561        self.router = router
562
563        self.defaultDomain = domain
564        self.domains = set()
565        if self.defaultDomain:
566            self.domains.add(self.defaultDomain)
567
568        if secret is not None:
569            self.secret = secret
570        else:
571            self.secret = binascii.hexlify(randbytes.secureRandom(16))
572
573        self._outgoingStreams = {}
574        self._outgoingQueues = {}
575        self._outgoingConnecting = set()
576        self.serial = 0
577
578        pipe = XmlPipe()
579        self.xmlstream = pipe.source
580        self.router.addRoute(None, pipe.sink)
581        self.xmlstream.addObserver('/*', self.send)
582
583
584    def outgoingInitialized(self, xs):
585        thisHost = xs.thisEntity.host
586        otherHost = xs.otherEntity.host
587
588        log.msg("Outgoing connection %d from %r to %r established" %
589                (xs.serial, thisHost, otherHost))
590
591        self._outgoingStreams[thisHost, otherHost] = xs
592        xs.addObserver(xmlstream.STREAM_END_EVENT,
593                       lambda _: self.outgoingDisconnected(xs))
594
595        if (thisHost, otherHost) in self._outgoingQueues:
596            for element in self._outgoingQueues[thisHost, otherHost]:
597                xs.send(element)
598            del self._outgoingQueues[thisHost, otherHost]
599
600
601    def outgoingDisconnected(self, xs):
602        thisHost = xs.thisEntity.host
603        otherHost = xs.otherEntity.host
604
605        log.msg("Outgoing connection %d from %r to %r disconnected" %
606                (xs.serial, thisHost, otherHost))
607
608        del self._outgoingStreams[thisHost, otherHost]
609
610
611    def initiateOutgoingStream(self, thisHost, otherHost):
612        """
613        Initiate an outgoing XMPP server-to-server connection.
614        """
615
616        def resetConnecting(_):
617            self._outgoingConnecting.remove((thisHost, otherHost))
618
619        if (thisHost, otherHost) in self._outgoingConnecting:
620            return
621
622        authenticator = XMPPServerConnectAuthenticator(thisHost,
623                                                       otherHost,
624                                                       self.secret)
625        factory = DeferredS2SClientFactory(authenticator)
626        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
627                             self.outgoingInitialized)
628        factory.logTraffic = self.logTraffic
629
630        self._outgoingConnecting.add((thisHost, otherHost))
631
632        d = initiateS2S(factory)
633        d.addBoth(resetConnecting)
634        return d
635
636
637    def validateConnection(self, thisHost, otherHost, sid, key):
638        """
639        Validate an incoming XMPP server-to-server connection.
640        """
641
642        def connected(xs):
643            # Set up stream for immediate disconnection.
644            def disconnect(_):
645                xs.transport.loseConnection()
646            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
647            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)
648
649        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
650                                                      sid, key)
651        factory = DeferredS2SClientFactory(authenticator)
652        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
653        factory.logTraffic = self.logTraffic
654
655        d = initiateS2S(factory)
656        return d
657
658
659    def send(self, stanza):
660        """
661        Send stanza to the proper XML Stream.
662
663        This uses addressing embedded in the stanza to find the correct stream
664        to forward the stanza to.
665        """
666
667        otherHost = jid.internJID(stanza["to"]).host
668        thisHost = jid.internJID(stanza["from"]).host
669
670        if (thisHost, otherHost) not in self._outgoingStreams:
671            # There is no connection with the destination (yet). Cache the
672            # outgoing stanza until the connection has been established.
673            # XXX: If the connection cannot be established, the queue should
674            #      be emptied at some point.
675            if (thisHost, otherHost) not in self._outgoingQueues:
676                self._outgoingQueues[(thisHost, otherHost)] = []
677            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
678            self.initiateOutgoingStream(thisHost, otherHost)
679        else:
680            self._outgoingStreams[(thisHost, otherHost)].send(stanza)
681
682
683    def dispatch(self, xs, stanza):
684        """
685        Send on element to be routed within the server.
686        """
687        stanzaFrom = stanza.getAttribute('from')
688        stanzaTo = stanza.getAttribute('to')
689
690        if not stanzaFrom or not stanzaTo:
691            xs.sendStreamError(error.StreamError('improper-addressing'))
692        else:
693            try:
694                sender = jid.internJID(stanzaFrom)
695                jid.internJID(stanzaTo)
696            except jid.InvalidFormat:
697                log.msg("Dropping error stanza with malformed JID")
698
699            if sender.host != xs.otherEntity.host:
700                xs.sendStreamError(error.StreamError('invalid-from'))
701            else:
702                self.xmlstream.send(stanza)
Note: See TracBrowser for help on using the repository browser.