source: ralphm-patches/s2s.patch @ 11:8294ad7253bd

Last change on this file since 11:8294ad7253bd was 11:8294ad7253bd, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Allow no default domain, generate a secret if not provided.

File size: 39.7 KB
RevLine 
[9]1diff -r 313d45b505a7 wokkel/server.py
[6]2--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
[11]3+++ b/wokkel/server.py  Fri Apr 10 09:38:01 2009 +0200
4@@ -0,0 +1,709 @@
[6]5+# -*- test-case-name: wokkel.test.test_server -*-
6+#
7+# Copyright (c) 2003-2008 Ralph Meijer
8+# See LICENSE for details.
9+
10+"""
11+XMPP Server-to-Server protocol.
12+
13+This module implements several aspects of XMPP server-to-server communications
14+as described in XMPP Core (RFC 3920). Refer to that document for the meaning
15+of the used terminology.
16+"""
17+
18+# hashlib is new in Python 2.5, try that first.
19+try:
20+    from hashlib import sha256
[10]21+    digestmod = sha256
[6]22+except ImportError:
[10]23+    import Crypto.Hash.SHA256 as digestmod
24+    sha256 = digestmod.new
[6]25+
26+import hmac
27+
28+from zope.interface import implements
29+
30+from twisted.application import service
31+from twisted.internet import defer, reactor
32+from twisted.names.srvconnect import SRVConnector
[11]33+from twisted.python import log, randbytes
[6]34+from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
35+from twisted.words.xish import domish
36+
37+from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
38+from wokkel.compat import XmlStreamServerFactory
39+
40+NS_DIALBACK = 'jabber:server:dialback'
41+
42+def generateKey(secret, receivingServer, originatingServer, streamID):
43+    """
44+    Generate a dialback key for server-to-server XMPP Streams.
45+
46+    The dialback key is generated using the algorithm described in
47+    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
48+    terminology for the parameters is described in RFC-3920.
49+
50+    @param secret: the shared secret known to the Originating Server and
51+                   Authoritive Server.
52+    @type secret: C{str}
53+    @param receivingServer: the Receiving Server host name.
54+    @type receivingServer: C{str}
55+    @param originatingServer: the Originating Server host name.
56+    @type originatingServer: C{str}
57+    @param streamID: the Stream ID as generated by the Receiving Server.
58+    @type streamID: C{str}
59+    @return: hexadecimal digest of the generated key.
60+    @type: C{str}
61+    """
62+
63+    hashObject = sha256()
64+    hashObject.update(secret)
65+    hashedSecret = hashObject.hexdigest()
66+    message = " ".join([receivingServer, originatingServer, streamID])
[10]67+    hash = hmac.HMAC(hashedSecret, message, digestmod=digestmod)
[6]68+    return hash.hexdigest()
69+
70+
71+def trapStreamError(xs, observer):
72+    """
73+    Trap stream errors.
74+
75+    This wraps an observer to catch exceptions. In case of a
76+    L{error.StreamError}, it is send over the given XML stream. All other
77+    exceptions yield a C{'internal-server-error'} stream error, that is
78+    sent over the stream, while the exception is logged.
79+
80+    @return: Wrapped observer
81+    """
82+
83+    def wrappedObserver(element):
84+        try:
85+            observer(element)
86+        except error.StreamError, exc:
87+            xs.sendStreamError(exc)
88+        except:
89+            log.err()
90+            exc = error.StreamError('internal-server-error')
91+            xs.sendStreamError(exc)
92+
93+    return wrappedObserver
94+
95+
96+class XMPPServerConnector(SRVConnector):
97+    def __init__(self, reactor, domain, factory):
98+        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)
99+
100+
101+    def pickServer(self):
102+        host, port = SRVConnector.pickServer(self)
103+
104+        if not self.servers and not self.orderedServers:
105+            # no SRV record, fall back..
106+            port = 5269
107+
108+        return host, port
109+
110+
111+class DialbackFailed(Exception):
112+    pass
113+
114+
115+
116+class OriginatingDialbackInitializer(object):
117+    """
118+    Server Dialback Initializer for the Orginating Server.
119+    """
120+
121+    implements(ijabber.IInitiatingInitializer)
122+
123+    _deferred = None
124+
125+    def __init__(self, xs, thisHost, otherHost, secret):
126+        self.xmlstream = xs
127+        self.thisHost = thisHost
128+        self.otherHost = otherHost
129+        self.secret = secret
130+
131+
132+    def initialize(self):
133+        self._deferred = defer.Deferred()
134+        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
135+                                   self.onStreamError)
136+        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
137+                                   self.onResult)
138+
139+        key = generateKey(self.secret, self.otherHost,
140+                          self.thisHost, self.xmlstream.sid)
141+
142+        result = domish.Element((NS_DIALBACK, 'result'))
143+        result['from'] = self.thisHost
144+        result['to'] = self.otherHost
145+        result.addContent(key)
146+
147+        self.xmlstream.send(result)
148+
149+        return self._deferred
150+
151+
152+    def onResult(self, result):
153+        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
154+                                      self.onStreamError)
155+        if result['type'] == 'valid':
156+            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
157+            self._deferred.callback(None)
158+        else:
159+            self._deferred.errback(DialbackFailed())
160+
161+
162+    def onStreamError(self, failure):
163+        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
164+                                      self.onResult)
165+        self._deferred.errback(failure)
166+
167+
168+
169+class ReceivingDialbackInitializer(object):
170+    """
171+    Server Dialback Initializer for the Receiving Server.
172+    """
173+
174+    implements(ijabber.IInitiatingInitializer)
175+
176+    _deferred = None
177+
178+    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
179+        self.xmlstream = xs
180+        self.thisHost = thisHost
181+        self.otherHost = otherHost
182+        self.originalStreamID = originalStreamID
183+        self.key = key
184+
185+
186+    def initialize(self):
187+        self._deferred = defer.Deferred()
188+        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
189+                                   self.onStreamError)
190+        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
191+                                   self.onVerify)
192+
193+        verify = domish.Element((NS_DIALBACK, 'verify'))
194+        verify['from'] = self.thisHost
195+        verify['to'] = self.otherHost
196+        verify['id'] = self.originalStreamID
197+        verify.addContent(self.key)
198+
199+        self.xmlstream.send(verify)
200+        return self._deferred
201+
202+
203+    def onVerify(self, verify):
204+        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
205+                                      self.onStreamError)
206+        if verify['id'] != self.originalStreamID:
207+            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
208+            self._deferred.errback(DialbackFailed())
209+        elif verify['to'] != self.thisHost:
210+            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
211+            self._deferred.errback(DialbackFailed())
212+        elif verify['from'] != self.otherHost:
213+            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
214+            self._deferred.errback(DialbackFailed())
215+        elif verify['type'] == 'valid':
216+            self._deferred.callback(None)
217+        else:
218+            self._deferred.errback(DialbackFailed())
219+
220+
221+    def onStreamError(self, failure):
222+        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
223+                                      self.onVerify)
224+        self._deferred.errback(failure)
225+
226+
227+
228+class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
[7]229+    """
230+    Authenticator for an outgoing XMPP server-to-server connection.
231+
232+    This authenticator connects to C{otherHost} (the Receiving Server) and then
233+    initiates dialback as C{thisHost} (the Originating Server) using
234+    L{OriginatingDialbackInitializer}.
235+
236+    @ivar thisHost: The domain this server connects from (the Originating
237+                    Server) .
238+    @ivar otherHost: The domain of the server this server connects to (the
239+                     Receiving Server).
240+    @ivar secret: The shared secret that is used for verifying the validity
241+                  of this new connection.
242+    """
[6]243+    namespace = 'jabber:server'
244+
245+    def __init__(self, thisHost, otherHost, secret):
246+        self.thisHost = thisHost
247+        self.otherHost = otherHost
248+        self.secret = secret
249+        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
250+
251+
252+    def connectionMade(self):
253+        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
254+        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
255+                                   NS_DIALBACK: 'db'}
256+        xmlstream.ConnectAuthenticator.connectionMade(self)
257+
258+
259+    def associateWithStream(self, xs):
260+        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
261+        init = OriginatingDialbackInitializer(xs, self.thisHost,
262+                                              self.otherHost, self.secret)
263+        xs.initializers = [init]
264+
265+
266+
267+class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
[7]268+    """
269+    Authenticator for an outgoing connection to verify an incoming connection.
270+
271+    This authenticator connects to C{otherHost} (the Authoritative Server) and
272+    then initiates dialback as C{thisHost} (the Receiving Server) using
273+    L{ReceivingDialbackInitializer}.
274+
275+    @ivar thisHost: The domain this server connects from (the Receiving
276+                    Server) .
277+    @ivar otherHost: The domain of the server this server connects to (the
278+                     Authoritative Server).
279+    @ivar originalStreamID: The stream ID of the incoming connection that is
280+                            being verified.
281+    @ivar key: The key provided by the Receving Server to be verified.
282+    """
[6]283+    namespace = 'jabber:server'
284+
285+    def __init__(self, thisHost, otherHost, originalStreamID, key):
286+        self.thisHost = thisHost
287+        self.otherHost = otherHost
288+        self.originalStreamID = originalStreamID
289+        self.key = key
290+        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
291+
292+
293+    def connectionMade(self):
294+        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
295+        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
296+                                   NS_DIALBACK: 'db'}
297+        xmlstream.ConnectAuthenticator.connectionMade(self)
298+
299+
300+    def associateWithStream(self, xs):
301+        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
302+        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
303+                                            self.originalStreamID, self.key)
304+        xs.initializers = [init]
305+
306+
307+
308+class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
[7]309+    """
310+    Authenticator for an incoming XMPP server-to-server connection.
311+
312+    This authenticator handles two types of incoming connections. Regular
313+    server-to-server connections are from the Originating Server to the
314+    Receiving Server, where this server is the Receiving Server. These
315+    connections start out by receiving a dialback key, verifying the
316+    key with the Authoritative Server, and then accept normal XMPP stanzas.
317+
318+    The other type of connections is from a Receiving Server to an
319+    Authoritative Server, where this server acts as the Authoritative Server.
320+    These connections are used to verify the validity of an outgoing connection
321+    from this server. In this case, this server receives a verification
322+    request, checks the key and then returns the result.
323+
324+    @ivar service: The service that keeps the list of domains we accept
325+                   connections for.
326+    """
[6]327+    namespace = 'jabber:server'
328+
[7]329+    def __init__(self, service):
[6]330+        xmlstream.ListenAuthenticator.__init__(self)
[7]331+        self.service = service
[6]332+
333+
334+    def streamStarted(self, rootElement):
335+        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
336+
337+        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
338+        if not self.xmlstream.sid:
339+            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
340+
[7]341+        if self.xmlstream.thisEntity:
342+            targetDomain = self.xmlstream.thisEntity.host
343+        else:
344+            targetDomain = self.service.defaultDomain
345+
346+        def prepareStream(domain):
[6]347+            self.xmlstream.namespace = self.namespace
348+            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
349+                                       NS_DIALBACK: 'db'}
[11]350+            if domain:
351+                self.xmlstream.thisEntity = jid.internJID(domain)
[6]352+
353+        try:
354+            if xmlstream.NS_STREAMS != rootElement.uri or \
355+               self.namespace != self.xmlstream.namespace or \
356+               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
357+                raise error.StreamError('invalid-namespace')
358+
[11]359+            if not targetDomain or targetDomain not in self.service.domains:
[6]360+                raise error.StreamError('host-unknown')
361+        except error.StreamError, exc:
[7]362+            prepareStream(self.service.defaultDomain)
[6]363+            self.xmlstream.sendStreamError(exc)
364+            return
365+
366+        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
367+                                   trapStreamError(self.xmlstream,
368+                                                   self.onVerify))
369+        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
370+                                   self.onResult)
371+
[7]372+        prepareStream(targetDomain)
[6]373+        self.xmlstream.sendHeader()
374+
375+        if self.xmlstream.version >= (1, 0):
376+            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
377+            self.xmlstream.send(features)
378+
379+
380+    def onVerify(self, verify):
381+        try:
[7]382+            receivingServer = jid.JID(verify['from']).host
383+            originatingServer = jid.JID(verify['to']).host
[6]384+        except (KeyError, jid.InvalidFormat):
385+            raise error.StreamError('improper-addressing')
386+
[7]387+        if originatingServer not in self.service.domains:
[6]388+            raise error.StreamError('host-unknown')
389+
[7]390+        if (self.xmlstream.otherEntity and
391+            receivingServer != self.xmlstream.otherEntity.host):
[6]392+            raise error.StreamError('invalid-from')
393+
394+        streamID = verify.getAttribute('id', '')
395+        key = unicode(verify)
396+
[7]397+        calculatedKey = generateKey(self.service.secret, receivingServer,
[6]398+                                    originatingServer, streamID)
399+        validity = (key == calculatedKey) and 'valid' or 'invalid'
400+
401+        reply = domish.Element((NS_DIALBACK, 'verify'))
402+        reply['from'] = originatingServer
403+        reply['to'] = receivingServer
404+        reply['id'] = streamID
405+        reply['type'] = validity
406+        self.xmlstream.send(reply)
407+
408+
409+    def onResult(self, result):
410+        def reply(validity):
411+            reply = domish.Element((NS_DIALBACK, 'result'))
412+            reply['from'] = result['to']
413+            reply['to'] = result['from']
414+            reply['type'] = validity
415+            self.xmlstream.send(reply)
416+
417+        def valid(xs):
418+            reply('valid')
419+            self.xmlstream.otherEntity = jid.internJID(originatingServer)
420+            self.xmlstream.dispatch(self.xmlstream,
421+                                    xmlstream.STREAM_AUTHD_EVENT)
422+
423+        def invalid(failure):
[7]424+            log.err(failure)
[6]425+            reply('invalid')
426+
[7]427+        receivingServer = result['to']
[6]428+        originatingServer = result['from']
[7]429+        key = unicode(result)
[6]430+
[7]431+        d = self.service.validateConnection(receivingServer, originatingServer,
432+                                            self.xmlstream.sid, key)
433+        d.addCallbacks(valid, invalid)
434+        return d
[6]435+
436+
437+
438+class DeferredS2SClientFactory(DeferredXmlStreamFactory):
439+    """
440+    Deferred firing factory for initiating XMPP server-to-server connection.
441+
442+    The deferred has its callbacks called upon succesful authentication with
443+    the other server. In case of failed authentication or connection, the
444+    deferred will have its errbacks called instead.
445+    """
446+
[9]447+    logTraffic = False
448+
[7]449+    def __init__(self, authenticator):
[6]450+        DeferredXmlStreamFactory.__init__(self, authenticator)
451+
[7]452+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
453+                          self.onConnectionMade)
[6]454+
455+        self.serial = 0
456+
457+
[7]458+    def onConnectionMade(self, xs):
[6]459+        xs.serial = self.serial
460+        self.serial += 1
461+
462+        def logDataIn(buf):
463+            log.msg("RECV (%d): %r" % (xs.serial, buf))
464+
465+        def logDataOut(buf):
466+            log.msg("SEND (%d): %r" % (xs.serial, buf))
467+
468+        if self.logTraffic:
469+            xs.rawDataInFn = logDataIn
470+            xs.rawDataOutFn = logDataOut
471+
472+
473+
[7]474+def initiateS2S(factory):
475+    domain = factory.authenticator.otherHost
476+    c = XMPPServerConnector(reactor, domain, factory)
477+    c.connect()
478+    return factory.deferred
[6]479+
480+
481+
[7]482+class XMPPS2SServerFactory(XmlStreamServerFactory):
483+    """
484+    XMPP Server-to-Server Server factory.
[6]485+
[7]486+    This factory accepts XMPP server-to-server connections.
487+    """
[6]488+
[7]489+    logTraffic = False
[6]490+
[7]491+    def __init__(self, service):
492+        self.service = service
[6]493+
[7]494+        def authenticatorFactory():
495+            return XMPPServerListenAuthenticator(service)
[6]496+
[7]497+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
498+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
499+                          self.onConnectionMade)
500+        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
501+                          self.onAuthenticated)
[6]502+
[7]503+        self.serial = 0
[6]504+
505+
[7]506+    def onConnectionMade(self, xs):
507+        """
508+        Called when a server-to-server connection was made.
509+
510+        This enables traffic debugging on incoming streams.
511+        """
512+        xs.serial = self.serial
513+        self.serial += 1
514+
515+        def logDataIn(buf):
516+            log.msg("RECV (%d): %r" % (xs.serial, buf))
517+
518+        def logDataOut(buf):
519+            log.msg("SEND (%d): %r" % (xs.serial, buf))
520+
521+        if self.logTraffic:
522+            xs.rawDataInFn = logDataIn
523+            xs.rawDataOutFn = logDataOut
524+
525+        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
526+
527+
528+    def onAuthenticated(self, xs):
529+        thisHost = xs.thisEntity.host
530+        otherHost = xs.otherEntity.host
531+
532+        log.msg("Incoming connection %d from %r to %r established" %
533+                (xs.serial, otherHost, thisHost))
534+
535+        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
536+                                                   0, xs)
537+        xs.addObserver('/*', self.onElement, 0, xs)
538+
539+
540+    def onConnectionLost(self, xs, reason):
541+        thisHost = xs.thisEntity.host
542+        otherHost = xs.otherEntity.host
543+
544+        log.msg("Incoming connection %d from %r to %r disconnected" %
545+                (xs.serial, otherHost, thisHost))
546+
547+
548+    def onError(self, reason):
549+        log.err(reason, "Stream Error")
[6]550+
551+
[9]552+    def onElement(self, xs, element):
[6]553+        """
554+        Called when an element was received from one of the connected streams.
555+
556+        """
557+        if element.handled:
558+            return
559+        else:
[9]560+            self.service.dispatch(xs, element)
[7]561+
562+
563+
564+class ServerService(object):
565+    """
566+    Service for managing XMPP server to server connections.
567+    """
568+
569+    logTraffic = False
570+
[11]571+    def __init__(self, router, domain=None, secret=None):
[7]572+        self.router = router
[11]573+
[7]574+        self.defaultDomain = domain
[11]575+        self.domains = set()
576+        if self.defaultDomain:
577+            self.domains.add(self.defaultDomain)
578+
579+        if secret is not None:
580+            self.secret = secret
581+        else:
582+            self.secret = randbytes.secureRandom(16).encode('hex')
[7]583+
584+        self._outgoingStreams = {}
585+        self._outgoingQueues = {}
586+        self._outgoingConnecting = set()
587+        self.serial = 0
588+
589+        pipe = XmlPipe()
590+        self.xmlstream = pipe.source
591+        self.router.addRoute(None, pipe.sink)
592+        self.xmlstream.addObserver('/*', self.send)
593+
594+
595+    def outgoingInitialized(self, xs):
596+        thisHost = xs.thisEntity.host
597+        otherHost = xs.otherEntity.host
598+
599+        log.msg("Outgoing connection %d from %r to %r established" %
600+                (xs.serial, thisHost, otherHost))
601+
602+        self._outgoingStreams[thisHost, otherHost] = xs
603+        xs.addObserver(xmlstream.STREAM_END_EVENT,
604+                       lambda _: self.outgoingDisconnected(xs))
605+
606+        if (thisHost, otherHost) in self._outgoingQueues:
607+            for element in self._outgoingQueues[thisHost, otherHost]:
608+                xs.send(element)
609+            del self._outgoingQueues[thisHost, otherHost]
610+
611+
612+    def outgoingDisconnected(self, xs):
613+        thisHost = xs.thisEntity.host
614+        otherHost = xs.otherEntity.host
615+
616+        log.msg("Outgoing connection %d from %r to %r disconnected" %
617+                (xs.serial, thisHost, otherHost))
618+
619+        del self._outgoingStreams[thisHost, otherHost]
620+
621+
622+    def initiateOutgoingStream(self, thisHost, otherHost):
623+        """
624+        Initiate an outgoing XMPP server-to-server connection.
625+        """
626+
627+        def resetConnecting(_):
628+            self._outgoingConnecting.remove((thisHost, otherHost))
629+
630+        if (thisHost, otherHost) in self._outgoingConnecting:
631+            return
632+
633+        authenticator = XMPPServerConnectAuthenticator(thisHost,
634+                                                       otherHost,
635+                                                       self.secret)
636+        factory = DeferredS2SClientFactory(authenticator)
637+        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
638+                             self.outgoingInitialized)
[9]639+        factory.logTraffic = self.logTraffic
[7]640+
641+        self._outgoingConnecting.add((thisHost, otherHost))
642+
643+        d = initiateS2S(factory)
644+        d.addBoth(resetConnecting)
645+        return d
646+
647+
648+    def validateConnection(self, thisHost, otherHost, sid, key):
649+        """
650+        Validate an incoming XMPP server-to-server connection.
651+        """
652+
653+        def connected(xs):
654+            # Set up stream for immediate disconnection.
655+            def disconnect(_):
656+                xs.transport.loseConnection()
657+            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
658+            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)
659+
660+        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
661+                                                      sid, key)
662+        factory = DeferredS2SClientFactory(authenticator)
663+        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
[9]664+        factory.logTraffic = self.logTraffic
[7]665+
666+        d = initiateS2S(factory)
667+        return d
[6]668+
669+
670+    def send(self, stanza):
671+        """
672+        Send stanza to the proper XML Stream.
673+
674+        This uses addressing embedded in the stanza to find the correct stream
675+        to forward the stanza to.
676+        """
677+
[7]678+        otherHost = jid.internJID(stanza["to"]).host
679+        thisHost = jid.internJID(stanza["from"]).host
[6]680+
[7]681+        if (thisHost, otherHost) not in self._outgoingStreams:
[6]682+            # There is no connection with the destination (yet). Cache the
683+            # outgoing stanza until the connection has been established.
684+            # XXX: If the connection cannot be established, the queue should
685+            #      be emptied at some point.
[7]686+            if (thisHost, otherHost) not in self._outgoingQueues:
687+                self._outgoingQueues[(thisHost, otherHost)] = []
688+            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
689+            self.initiateOutgoingStream(thisHost, otherHost)
[6]690+        else:
[7]691+            self._outgoingStreams[(thisHost, otherHost)].send(stanza)
692+
693+
[9]694+    def dispatch(self, xs, stanza):
[7]695+        """
696+        Send on element to be routed within the server.
697+        """
[9]698+        stanzaFrom = stanza.getAttribute('from')
699+        stanzaTo = stanza.getAttribute('to')
700+
701+        if not stanzaFrom or not stanzaTo:
702+            xs.sendStreamError(error.StreamError('improper-addressing'))
703+        else:
704+            try:
705+                sender = jid.internJID(stanzaFrom)
706+                recipient = jid.internJID(stanzaTo)
707+            except jid.InvalidFormat:
708+                log.msg("Dropping error stanza with malformed JID")
709+
710+            if sender.host != xs.otherEntity.host:
711+                xs.sendStreamError(error.StreamError('invalid-from'))
712+            else:
713+                self.xmlstream.send(stanza)
714diff -r 313d45b505a7 wokkel/test/test_server.py
[6]715--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
[11]716+++ b/wokkel/test/test_server.py        Fri Apr 10 09:38:01 2009 +0200
[9]717@@ -0,0 +1,450 @@
[6]718+# Copyright (c) 2003-2008 Ralph Meijer
719+# See LICENSE for details.
720+
721+"""
722+Tests for L{wokkel.server}.
723+"""
724+
[7]725+from twisted.internet import defer
726+from twisted.python import failure
727+from twisted.test.proto_helpers import StringTransport
[6]728+from twisted.trial import unittest
[7]729+from twisted.words.protocols.jabber import error, jid, xmlstream
730+from twisted.words.xish import domish
[6]731+
[7]732+from wokkel import component, server
[6]733+
734+NS_STREAMS = 'http://etherx.jabber.org/streams'
[7]735+NS_DIALBACK = "jabber:server:dialback"
[6]736+
737+class GenerateKeyTest(unittest.TestCase):
738+    """
739+    Tests for L{server.generateKey}.
740+    """
741+
742+    def testBasic(self):
[7]743+        originating = "example.org"
744+        receiving = "xmpp.example.com"
745+        sid = "D60000229F"
[6]746+        secret = "s3cr3tf0rd14lb4ck"
747+
[7]748+        key = server.generateKey(secret, receiving, originating, sid)
[6]749+
750+        self.assertEqual(key,
[7]751+            '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643')
[6]752+
753+
754+
755+class XMPPServerListenAuthenticatorTest(unittest.TestCase):
756+    """
757+    Tests for L{server.XMPPServerListenAuthenticator}.
758+    """
759+
[7]760+    secret = "s3cr3tf0rd14lb4ck"
761+    originating = "example.org"
762+    receiving = "xmpp.example.com"
763+    sid = "D60000229F"
764+    key = '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643'
[6]765+
766+    def setUp(self):
767+        self.output = []
[7]768+
769+        class MyService(object):
770+            pass
771+
772+        self.service = MyService()
773+        self.service.defaultDomain = self.receiving
774+        self.service.domains = [self.receiving, 'pubsub.'+self.receiving]
775+        self.service.secret = self.secret
776+
777+        self.authenticator = server.XMPPServerListenAuthenticator(self.service)
[6]778+        self.xmlstream = xmlstream.XmlStream(self.authenticator)
779+        self.xmlstream.send = self.output.append
780+        self.xmlstream.transport = StringTransport()
781+
782+
783+    def test_attributes(self):
784+        """
785+        Test attributes of authenticator and stream objects.
786+        """
[7]787+        self.assertEqual(self.service, self.authenticator.service)
788+        self.assertEqual(self.xmlstream.initiating, False)
[6]789+
790+
791+    def test_streamStartedVersion0(self):
792+        """
793+        The authenticator supports pre-XMPP 1.0 streams.
794+        """
795+        self.xmlstream.connectionMade()
796+        self.xmlstream.dataReceived(
797+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
798+                           "xmlns:db='jabber:server:dialback' "
799+                           "xmlns='jabber:server' "
[7]800+                           "to='xmpp.example.com'>")
[6]801+        self.assertEqual((0, 0), self.xmlstream.version)
802+
803+
804+    def test_streamStartedVersion1(self):
805+        """
806+        The authenticator supports XMPP 1.0 streams.
807+        """
808+        self.xmlstream.connectionMade()
809+        self.xmlstream.dataReceived(
810+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
811+                           "xmlns:db='jabber:server:dialback' "
812+                           "xmlns='jabber:server' "
[7]813+                           "to='xmpp.example.com' "
[6]814+                           "version='1.0'>")
815+        self.assertEqual((1, 0), self.xmlstream.version)
816+
817+
818+    def test_streamStartedSID(self):
819+        """
820+        The response stream will have a stream ID.
821+        """
822+        self.xmlstream.connectionMade()
823+        self.assertIdentical(None, self.xmlstream.sid)
824+
825+        self.xmlstream.dataReceived(
826+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
827+                           "xmlns:db='jabber:server:dialback' "
828+                           "xmlns='jabber:server' "
[7]829+                           "to='xmpp.example.com' "
[6]830+                           "version='1.0'>")
831+        self.assertNotIdentical(None, self.xmlstream.sid)
832+
833+
834+    def test_streamStartedSentResponseHeader(self):
835+        """
836+        A stream header is sent in response to the incoming stream header.
837+        """
838+        self.xmlstream.connectionMade()
839+        self.assertFalse(self.xmlstream._headerSent)
840+
841+        self.xmlstream.dataReceived(
842+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
843+                           "xmlns:db='jabber:server:dialback' "
844+                           "xmlns='jabber:server' "
[7]845+                           "to='xmpp.example.com'>")
[6]846+        self.assertTrue(self.xmlstream._headerSent)
847+
848+
849+    def test_streamStartedNotSentFeatures(self):
850+        """
851+        No features are sent in response to an XMPP < 1.0 stream header.
852+        """
853+        self.xmlstream.connectionMade()
854+        self.xmlstream.dataReceived(
855+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
856+                           "xmlns:db='jabber:server:dialback' "
857+                           "xmlns='jabber:server' "
[7]858+                           "to='xmpp.example.com'>")
[6]859+        self.assertEqual(1, len(self.output))
860+
861+
862+    def test_streamStartedSentFeatures(self):
863+        """
864+        Features are sent in response to an XMPP >= 1.0 stream header.
865+        """
866+        self.xmlstream.connectionMade()
867+        self.xmlstream.dataReceived(
868+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
869+                           "xmlns:db='jabber:server:dialback' "
870+                           "xmlns='jabber:server' "
[7]871+                           "to='xmpp.example.com' "
[6]872+                           "version='1.0'>")
873+        self.assertEqual(2, len(self.output))
874+        features = self.output[-1]
875+        self.assertEqual(NS_STREAMS, features.uri)
876+        self.assertEqual('features', features.name)
877+
878+
879+    def test_streamRootElement(self):
880+        """
881+        Test stream error on wrong stream namespace.
882+        """
883+        self.xmlstream.connectionMade()
884+        self.xmlstream.dataReceived(
885+            "<stream:stream xmlns:stream='badns' "
886+                           "xmlns:db='jabber:server:dialback' "
887+                           "xmlns='jabber:server' "
[7]888+                           "to='xmpp.example.com'>")
[6]889+
[7]890+        self.assertEqual(3, len(self.output))
[6]891+        exc = error.exceptionFromStreamError(self.output[1])
[7]892+        self.assertEqual('invalid-namespace', exc.condition)
[6]893+
894+
895+    def test_streamDefaultNamespace(self):
896+        """
897+        Test stream error on missing dialback namespace.
898+        """
899+        self.xmlstream.connectionMade()
900+        self.xmlstream.dataReceived(
901+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
902+                           "xmlns:db='jabber:server:dialback' "
903+                           "xmlns='badns' "
[7]904+                           "to='xmpp.example.com'>")
[6]905+
[7]906+        self.assertEqual(3, len(self.output))
[6]907+        exc = error.exceptionFromStreamError(self.output[1])
[7]908+        self.assertEqual('invalid-namespace', exc.condition)
[6]909+
910+
911+    def test_streamNoDialbackNamespace(self):
912+        """
913+        Test stream error on missing dialback namespace.
914+        """
915+        self.xmlstream.connectionMade()
916+        self.xmlstream.dataReceived(
917+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
918+                           "xmlns='jabber:server' "
[7]919+                           "to='xmpp.example.com'>")
[6]920+
[7]921+        self.assertEqual(3, len(self.output))
[6]922+        exc = error.exceptionFromStreamError(self.output[1])
[7]923+        self.assertEqual('invalid-namespace', exc.condition)
[6]924+
925+
926+    def test_streamBadDialbackNamespace(self):
927+        """
928+        Test stream error on missing dialback namespace.
929+        """
930+        self.xmlstream.connectionMade()
931+        self.xmlstream.dataReceived(
932+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
933+                           "xmlns:db='badns' "
934+                           "xmlns='jabber:server' "
[7]935+                           "to='xmpp.example.com'>")
[6]936+
[7]937+        self.assertEqual(3, len(self.output))
[6]938+        exc = error.exceptionFromStreamError(self.output[1])
[7]939+        self.assertEqual('invalid-namespace', exc.condition)
[6]940+
941+
942+    def test_streamToUnknownHost(self):
943+        """
944+        Test stream error on stream's to attribute having unknown host.
945+        """
946+        self.xmlstream.connectionMade()
947+        self.xmlstream.dataReceived(
948+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
949+                           "xmlns:db='jabber:server:dialback' "
950+                           "xmlns='jabber:server' "
951+                           "to='badhost'>")
952+
[7]953+        self.assertEqual(3, len(self.output))
[6]954+        exc = error.exceptionFromStreamError(self.output[1])
[7]955+        self.assertEqual('host-unknown', exc.condition)
956+
957+
958+    def test_streamToOtherLocalHost(self):
959+        """
960+        The authenticator supports XMPP 1.0 streams.
961+        """
962+        self.xmlstream.connectionMade()
963+        self.xmlstream.dataReceived(
964+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
965+                           "xmlns:db='jabber:server:dialback' "
966+                           "xmlns='jabber:server' "
967+                           "to='pubsub.xmpp.example.com' "
968+                           "version='1.0'>")
969+
970+        self.assertEqual(2, len(self.output))
971+        self.assertEqual(jid.JID('pubsub.xmpp.example.com'),
972+                         self.xmlstream.thisEntity)
973+
974+    def test_onResult(self):
975+        def cb(result):
976+            self.assertEqual(1, len(self.output))
977+            reply = self.output[0]
978+            self.assertEqual(self.originating, reply['to'])
979+            self.assertEqual(self.receiving, reply['from'])
980+            self.assertEqual('valid', reply['type'])
981+
982+        def validateConnection(thisHost, otherHost, sid, key):
983+            self.assertEqual(thisHost, self.receiving)
984+            self.assertEqual(otherHost, self.originating)
985+            self.assertEqual(sid, self.sid)
986+            self.assertEqual(key, self.key)
987+            return defer.succeed(None)
988+
989+        self.xmlstream.sid = self.sid
990+        self.service.validateConnection = validateConnection
991+
992+        result = domish.Element((NS_DIALBACK, 'result'))
993+        result['to'] = self.receiving
994+        result['from'] = self.originating
995+        result.addContent(self.key)
996+
997+        d = self.authenticator.onResult(result)
998+        d.addCallback(cb)
999+        return d
1000+
1001+
1002+    def test_onResultFailure(self):
1003+        class TestError(Exception):
1004+            pass
1005+
1006+        def cb(result):
1007+            reply = self.output[0]
1008+            self.assertEqual('invalid', reply['type'])
1009+            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
1010+
1011+
1012+        def validateConnection(thisHost, otherHost, sid, key):
1013+            return defer.fail(TestError())
1014+
1015+        self.xmlstream.sid = self.sid
1016+        self.service.validateConnection = validateConnection
1017+
1018+        result = domish.Element((NS_DIALBACK, 'result'))
1019+        result['to'] = self.receiving
1020+        result['from'] = self.originating
1021+        result.addContent(self.key)
1022+
1023+        d = self.authenticator.onResult(result)
1024+        d.addCallback(cb)
1025+        return d
1026+
1027+
1028+
1029+class FakeService(object):
1030+    domains = set(['example.org', 'pubsub.example.org'])
1031+    defaultDomain = 'example.org'
1032+    secret = 'mysecret'
1033+
1034+    def __init__(self):
1035+        self.dispatched = []
1036+
[9]1037+    def dispatch(self, xs, element):
1038+        self.dispatched.append(element)
[7]1039+
1040+
1041+
1042+class XMPPS2SServerFactoryTest(unittest.TestCase):
1043+    """
1044+    Tests for L{component.XMPPS2SServerFactory}.
1045+    """
1046+
1047+    def setUp(self):
[9]1048+        self.service = FakeService()
[7]1049+        self.factory = server.XMPPS2SServerFactory(self.service)
1050+        self.xmlstream = self.factory.buildProtocol(None)
[9]1051+        self.transport = StringTransport()
[7]1052+        self.xmlstream.thisEntity = jid.JID('example.org')
1053+        self.xmlstream.otherEntity = jid.JID('example.com')
1054+
1055+
1056+    def test_makeConnection(self):
1057+        """
1058+        A new connection increases the stream serial count. No logs by default.
1059+        """
[9]1060+        self.xmlstream.makeConnection(self.transport)
[7]1061+        self.assertEqual(0, self.xmlstream.serial)
1062+        self.assertEqual(1, self.factory.serial)
1063+        self.assertIdentical(None, self.xmlstream.rawDataInFn)
1064+        self.assertIdentical(None, self.xmlstream.rawDataOutFn)
1065+
1066+
1067+    def test_makeConnectionLogTraffic(self):
1068+        """
1069+        Setting logTraffic should set up raw data loggers.
1070+        """
1071+        self.factory.logTraffic = True
[9]1072+        self.xmlstream.makeConnection(self.transport)
[7]1073+        self.assertNotIdentical(None, self.xmlstream.rawDataInFn)
1074+        self.assertNotIdentical(None, self.xmlstream.rawDataOutFn)
1075+
1076+
1077+    def test_onError(self):
1078+        """
1079+        An observer for stream errors should trigger onError to log it.
1080+        """
[9]1081+        self.xmlstream.makeConnection(self.transport)
[7]1082+
1083+        class TestError(Exception):
1084+            pass
1085+
1086+        reason = failure.Failure(TestError())
1087+        self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT)
1088+        self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
1089+
1090+
1091+    def test_connectionInitialized(self):
1092+        """
1093+        """
[9]1094+        self.xmlstream.makeConnection(self.transport)
[7]1095+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1096+
1097+
1098+    def test_connectionLost(self):
1099+        """
1100+        """
[9]1101+        self.xmlstream.makeConnection(self.transport)
[7]1102+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1103+        self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT)
1104+
1105+
[9]1106+    def test_Element(self):
1107+        self.xmlstream.makeConnection(self.transport)
1108+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1109+
1110+        stanza = domish.Element((None, "presence"))
1111+        self.xmlstream.dispatch(stanza)
1112+        self.assertEqual(1, len(self.service.dispatched))
1113+        self.assertIdentical(stanza, self.service.dispatched[-1])
1114+
1115+
[7]1116+    def test_ElementNotAuthenticated(self):
[9]1117+        self.xmlstream.makeConnection(self.transport)
1118+
1119+        stanza = domish.Element((None, "presence"))
1120+        self.xmlstream.dispatch(stanza)
1121+        self.assertEqual(0, len(self.service.dispatched))
1122+
[7]1123+
1124+
1125+class ServerServiceTest(unittest.TestCase):
1126+
1127+    def setUp(self):
[9]1128+        self.output = []
1129+
1130+        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
1131+        self.xmlstream.thisEntity = jid.JID('example.org')
1132+        self.xmlstream.otherEntity = jid.JID('example.com')
1133+        self.xmlstream.send = self.output.append
1134+
[7]1135+        self.router = component.Router()
1136+        self.service = server.ServerService(self.router,
1137+                                            secret='mysecret',
1138+                                            domain='example.org')
[9]1139+        self.service.xmlstream = self.xmlstream
[7]1140+
1141+
1142+    def test_defaultDomainInDomains(self):
1143+        """
1144+        The default domain is part of the domains considered local.
1145+        """
1146+        self.assertIn(self.service.defaultDomain, self.service.domains)
[9]1147+
1148+
1149+    def test_dispatch(self):
1150+        stanza = domish.Element((None, "presence"))
1151+        stanza['to'] = 'user@example.org'
1152+        stanza['from'] = 'other@example.com'
1153+        self.service.dispatch(self.xmlstream, stanza)
1154+
1155+        self.assertEqual(1, len(self.output))
1156+        self.assertIdentical(stanza, self.output[-1])
1157+
1158+
1159+    def test_dispatchNoTo(self):
1160+        errors = []
1161+        self.xmlstream.sendStreamError = errors.append
1162+
1163+        stanza = domish.Element((None, "presence"))
1164+        stanza['from'] = 'other@example.com'
1165+        self.service.dispatch(self.xmlstream, stanza)
1166+
1167+        self.assertEqual(1, len(errors))
Note: See TracBrowser for help on using the repository browser.