source: ralphm-patches/s2s.patch @ 12:fc40892815eb

Last change on this file since 12:fc40892815eb was 12:fc40892815eb, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Make sure empty default domains works.

File size: 39.8 KB
RevLine 
[12]1diff -r fd1dc58fe561 wokkel/server.py
[6]2--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
[12]3+++ b/wokkel/server.py  Wed Apr 22 01:47:14 2009 -0700
4@@ -0,0 +1,711 @@
[6]5+# -*- test-case-name: wokkel.test.test_server -*-
6+#
7+# Copyright (c) 2003-2008 Ralph Meijer
8+# See LICENSE for details.
9+
10+"""
11+XMPP Server-to-Server protocol.
12+
13+This module implements several aspects of XMPP server-to-server communications
14+as described in XMPP Core (RFC 3920). Refer to that document for the meaning
15+of the used terminology.
16+"""
17+
18+# hashlib is new in Python 2.5, try that first.
19+try:
20+    from hashlib import sha256
[10]21+    digestmod = sha256
[6]22+except ImportError:
[10]23+    import Crypto.Hash.SHA256 as digestmod
24+    sha256 = digestmod.new
[6]25+
26+import hmac
27+
28+from zope.interface import implements
29+
30+from twisted.application import service
31+from twisted.internet import defer, reactor
32+from twisted.names.srvconnect import SRVConnector
[11]33+from twisted.python import log, randbytes
[6]34+from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
35+from twisted.words.xish import domish
36+
37+from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
38+from wokkel.compat import XmlStreamServerFactory
39+
40+NS_DIALBACK = 'jabber:server:dialback'
41+
42+def generateKey(secret, receivingServer, originatingServer, streamID):
43+    """
44+    Generate a dialback key for server-to-server XMPP Streams.
45+
46+    The dialback key is generated using the algorithm described in
47+    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
48+    terminology for the parameters is described in RFC-3920.
49+
50+    @param secret: the shared secret known to the Originating Server and
51+                   Authoritive Server.
52+    @type secret: C{str}
53+    @param receivingServer: the Receiving Server host name.
54+    @type receivingServer: C{str}
55+    @param originatingServer: the Originating Server host name.
56+    @type originatingServer: C{str}
57+    @param streamID: the Stream ID as generated by the Receiving Server.
58+    @type streamID: C{str}
59+    @return: hexadecimal digest of the generated key.
60+    @type: C{str}
61+    """
62+
63+    hashObject = sha256()
64+    hashObject.update(secret)
65+    hashedSecret = hashObject.hexdigest()
66+    message = " ".join([receivingServer, originatingServer, streamID])
[10]67+    hash = hmac.HMAC(hashedSecret, message, digestmod=digestmod)
[6]68+    return hash.hexdigest()
69+
70+
71+def trapStreamError(xs, observer):
72+    """
73+    Trap stream errors.
74+
75+    This wraps an observer to catch exceptions. In case of a
76+    L{error.StreamError}, it is send over the given XML stream. All other
77+    exceptions yield a C{'internal-server-error'} stream error, that is
78+    sent over the stream, while the exception is logged.
79+
80+    @return: Wrapped observer
81+    """
82+
83+    def wrappedObserver(element):
84+        try:
85+            observer(element)
86+        except error.StreamError, exc:
87+            xs.sendStreamError(exc)
88+        except:
89+            log.err()
90+            exc = error.StreamError('internal-server-error')
91+            xs.sendStreamError(exc)
92+
93+    return wrappedObserver
94+
95+
96+class XMPPServerConnector(SRVConnector):
97+    def __init__(self, reactor, domain, factory):
98+        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)
99+
100+
101+    def pickServer(self):
102+        host, port = SRVConnector.pickServer(self)
103+
104+        if not self.servers and not self.orderedServers:
105+            # no SRV record, fall back..
106+            port = 5269
107+
108+        return host, port
109+
110+
111+class DialbackFailed(Exception):
112+    pass
113+
114+
115+
116+class OriginatingDialbackInitializer(object):
117+    """
118+    Server Dialback Initializer for the Orginating Server.
119+    """
120+
121+    implements(ijabber.IInitiatingInitializer)
122+
123+    _deferred = None
124+
125+    def __init__(self, xs, thisHost, otherHost, secret):
126+        self.xmlstream = xs
127+        self.thisHost = thisHost
128+        self.otherHost = otherHost
129+        self.secret = secret
130+
131+
132+    def initialize(self):
133+        self._deferred = defer.Deferred()
134+        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
135+                                   self.onStreamError)
136+        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
137+                                   self.onResult)
138+
139+        key = generateKey(self.secret, self.otherHost,
140+                          self.thisHost, self.xmlstream.sid)
141+
142+        result = domish.Element((NS_DIALBACK, 'result'))
143+        result['from'] = self.thisHost
144+        result['to'] = self.otherHost
145+        result.addContent(key)
146+
147+        self.xmlstream.send(result)
148+
149+        return self._deferred
150+
151+
152+    def onResult(self, result):
153+        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
154+                                      self.onStreamError)
155+        if result['type'] == 'valid':
156+            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
157+            self._deferred.callback(None)
158+        else:
159+            self._deferred.errback(DialbackFailed())
160+
161+
162+    def onStreamError(self, failure):
163+        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
164+                                      self.onResult)
165+        self._deferred.errback(failure)
166+
167+
168+
169+class ReceivingDialbackInitializer(object):
170+    """
171+    Server Dialback Initializer for the Receiving Server.
172+    """
173+
174+    implements(ijabber.IInitiatingInitializer)
175+
176+    _deferred = None
177+
178+    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
179+        self.xmlstream = xs
180+        self.thisHost = thisHost
181+        self.otherHost = otherHost
182+        self.originalStreamID = originalStreamID
183+        self.key = key
184+
185+
186+    def initialize(self):
187+        self._deferred = defer.Deferred()
188+        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
189+                                   self.onStreamError)
190+        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
191+                                   self.onVerify)
192+
193+        verify = domish.Element((NS_DIALBACK, 'verify'))
194+        verify['from'] = self.thisHost
195+        verify['to'] = self.otherHost
196+        verify['id'] = self.originalStreamID
197+        verify.addContent(self.key)
198+
199+        self.xmlstream.send(verify)
200+        return self._deferred
201+
202+
203+    def onVerify(self, verify):
204+        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
205+                                      self.onStreamError)
206+        if verify['id'] != self.originalStreamID:
207+            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
208+            self._deferred.errback(DialbackFailed())
209+        elif verify['to'] != self.thisHost:
210+            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
211+            self._deferred.errback(DialbackFailed())
212+        elif verify['from'] != self.otherHost:
213+            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
214+            self._deferred.errback(DialbackFailed())
215+        elif verify['type'] == 'valid':
216+            self._deferred.callback(None)
217+        else:
218+            self._deferred.errback(DialbackFailed())
219+
220+
221+    def onStreamError(self, failure):
222+        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
223+                                      self.onVerify)
224+        self._deferred.errback(failure)
225+
226+
227+
228+class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
[7]229+    """
230+    Authenticator for an outgoing XMPP server-to-server connection.
231+
232+    This authenticator connects to C{otherHost} (the Receiving Server) and then
233+    initiates dialback as C{thisHost} (the Originating Server) using
234+    L{OriginatingDialbackInitializer}.
235+
236+    @ivar thisHost: The domain this server connects from (the Originating
237+                    Server) .
238+    @ivar otherHost: The domain of the server this server connects to (the
239+                     Receiving Server).
240+    @ivar secret: The shared secret that is used for verifying the validity
241+                  of this new connection.
242+    """
[6]243+    namespace = 'jabber:server'
244+
245+    def __init__(self, thisHost, otherHost, secret):
246+        self.thisHost = thisHost
247+        self.otherHost = otherHost
248+        self.secret = secret
249+        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
250+
251+
252+    def connectionMade(self):
253+        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
254+        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
255+                                   NS_DIALBACK: 'db'}
256+        xmlstream.ConnectAuthenticator.connectionMade(self)
257+
258+
259+    def associateWithStream(self, xs):
260+        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
261+        init = OriginatingDialbackInitializer(xs, self.thisHost,
262+                                              self.otherHost, self.secret)
263+        xs.initializers = [init]
264+
265+
266+
267+class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
[7]268+    """
269+    Authenticator for an outgoing connection to verify an incoming connection.
270+
271+    This authenticator connects to C{otherHost} (the Authoritative Server) and
272+    then initiates dialback as C{thisHost} (the Receiving Server) using
273+    L{ReceivingDialbackInitializer}.
274+
275+    @ivar thisHost: The domain this server connects from (the Receiving
276+                    Server) .
277+    @ivar otherHost: The domain of the server this server connects to (the
278+                     Authoritative Server).
279+    @ivar originalStreamID: The stream ID of the incoming connection that is
280+                            being verified.
281+    @ivar key: The key provided by the Receving Server to be verified.
282+    """
[6]283+    namespace = 'jabber:server'
284+
285+    def __init__(self, thisHost, otherHost, originalStreamID, key):
286+        self.thisHost = thisHost
287+        self.otherHost = otherHost
288+        self.originalStreamID = originalStreamID
289+        self.key = key
290+        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
291+
292+
293+    def connectionMade(self):
294+        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
295+        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
296+                                   NS_DIALBACK: 'db'}
297+        xmlstream.ConnectAuthenticator.connectionMade(self)
298+
299+
300+    def associateWithStream(self, xs):
301+        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
302+        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
303+                                            self.originalStreamID, self.key)
304+        xs.initializers = [init]
305+
306+
307+
308+class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
[7]309+    """
310+    Authenticator for an incoming XMPP server-to-server connection.
311+
312+    This authenticator handles two types of incoming connections. Regular
313+    server-to-server connections are from the Originating Server to the
314+    Receiving Server, where this server is the Receiving Server. These
315+    connections start out by receiving a dialback key, verifying the
316+    key with the Authoritative Server, and then accept normal XMPP stanzas.
317+
318+    The other type of connections is from a Receiving Server to an
319+    Authoritative Server, where this server acts as the Authoritative Server.
320+    These connections are used to verify the validity of an outgoing connection
321+    from this server. In this case, this server receives a verification
322+    request, checks the key and then returns the result.
323+
324+    @ivar service: The service that keeps the list of domains we accept
325+                   connections for.
326+    """
[6]327+    namespace = 'jabber:server'
328+
[7]329+    def __init__(self, service):
[6]330+        xmlstream.ListenAuthenticator.__init__(self)
[7]331+        self.service = service
[6]332+
333+
334+    def streamStarted(self, rootElement):
335+        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
336+
337+        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
338+        if not self.xmlstream.sid:
339+            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
340+
[7]341+        if self.xmlstream.thisEntity:
342+            targetDomain = self.xmlstream.thisEntity.host
343+        else:
344+            targetDomain = self.service.defaultDomain
345+
346+        def prepareStream(domain):
[6]347+            self.xmlstream.namespace = self.namespace
348+            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
349+                                       NS_DIALBACK: 'db'}
[11]350+            if domain:
351+                self.xmlstream.thisEntity = jid.internJID(domain)
[6]352+
353+        try:
354+            if xmlstream.NS_STREAMS != rootElement.uri or \
355+               self.namespace != self.xmlstream.namespace or \
356+               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
357+                raise error.StreamError('invalid-namespace')
358+
[12]359+            if targetDomain and targetDomain not in self.service.domains:
[6]360+                raise error.StreamError('host-unknown')
361+        except error.StreamError, exc:
[7]362+            prepareStream(self.service.defaultDomain)
[6]363+            self.xmlstream.sendStreamError(exc)
364+            return
365+
366+        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
367+                                   trapStreamError(self.xmlstream,
368+                                                   self.onVerify))
369+        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
370+                                   self.onResult)
371+
[7]372+        prepareStream(targetDomain)
[6]373+        self.xmlstream.sendHeader()
374+
375+        if self.xmlstream.version >= (1, 0):
376+            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
377+            self.xmlstream.send(features)
378+
379+
380+    def onVerify(self, verify):
381+        try:
[7]382+            receivingServer = jid.JID(verify['from']).host
383+            originatingServer = jid.JID(verify['to']).host
[6]384+        except (KeyError, jid.InvalidFormat):
385+            raise error.StreamError('improper-addressing')
386+
[7]387+        if originatingServer not in self.service.domains:
[6]388+            raise error.StreamError('host-unknown')
389+
[7]390+        if (self.xmlstream.otherEntity and
391+            receivingServer != self.xmlstream.otherEntity.host):
[6]392+            raise error.StreamError('invalid-from')
393+
394+        streamID = verify.getAttribute('id', '')
395+        key = unicode(verify)
396+
[7]397+        calculatedKey = generateKey(self.service.secret, receivingServer,
[6]398+                                    originatingServer, streamID)
399+        validity = (key == calculatedKey) and 'valid' or 'invalid'
400+
401+        reply = domish.Element((NS_DIALBACK, 'verify'))
402+        reply['from'] = originatingServer
403+        reply['to'] = receivingServer
404+        reply['id'] = streamID
405+        reply['type'] = validity
406+        self.xmlstream.send(reply)
407+
408+
409+    def onResult(self, result):
410+        def reply(validity):
411+            reply = domish.Element((NS_DIALBACK, 'result'))
412+            reply['from'] = result['to']
413+            reply['to'] = result['from']
414+            reply['type'] = validity
415+            self.xmlstream.send(reply)
416+
417+        def valid(xs):
418+            reply('valid')
[12]419+            if not self.xmlstream.thisEntity:
420+                self.xmlstream.thisEntity = jid.internJID(receivingServer)
[6]421+            self.xmlstream.otherEntity = jid.internJID(originatingServer)
422+            self.xmlstream.dispatch(self.xmlstream,
423+                                    xmlstream.STREAM_AUTHD_EVENT)
424+
425+        def invalid(failure):
[7]426+            log.err(failure)
[6]427+            reply('invalid')
428+
[7]429+        receivingServer = result['to']
[6]430+        originatingServer = result['from']
[7]431+        key = unicode(result)
[6]432+
[7]433+        d = self.service.validateConnection(receivingServer, originatingServer,
434+                                            self.xmlstream.sid, key)
435+        d.addCallbacks(valid, invalid)
436+        return d
[6]437+
438+
439+
440+class DeferredS2SClientFactory(DeferredXmlStreamFactory):
441+    """
442+    Deferred firing factory for initiating XMPP server-to-server connection.
443+
444+    The deferred has its callbacks called upon succesful authentication with
445+    the other server. In case of failed authentication or connection, the
446+    deferred will have its errbacks called instead.
447+    """
448+
[9]449+    logTraffic = False
450+
[7]451+    def __init__(self, authenticator):
[6]452+        DeferredXmlStreamFactory.__init__(self, authenticator)
453+
[7]454+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
455+                          self.onConnectionMade)
[6]456+
457+        self.serial = 0
458+
459+
[7]460+    def onConnectionMade(self, xs):
[6]461+        xs.serial = self.serial
462+        self.serial += 1
463+
464+        def logDataIn(buf):
465+            log.msg("RECV (%d): %r" % (xs.serial, buf))
466+
467+        def logDataOut(buf):
468+            log.msg("SEND (%d): %r" % (xs.serial, buf))
469+
470+        if self.logTraffic:
471+            xs.rawDataInFn = logDataIn
472+            xs.rawDataOutFn = logDataOut
473+
474+
475+
[7]476+def initiateS2S(factory):
477+    domain = factory.authenticator.otherHost
478+    c = XMPPServerConnector(reactor, domain, factory)
479+    c.connect()
480+    return factory.deferred
[6]481+
482+
483+
[7]484+class XMPPS2SServerFactory(XmlStreamServerFactory):
485+    """
486+    XMPP Server-to-Server Server factory.
[6]487+
[7]488+    This factory accepts XMPP server-to-server connections.
489+    """
[6]490+
[7]491+    logTraffic = False
[6]492+
[7]493+    def __init__(self, service):
494+        self.service = service
[6]495+
[7]496+        def authenticatorFactory():
497+            return XMPPServerListenAuthenticator(service)
[6]498+
[7]499+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
500+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
501+                          self.onConnectionMade)
502+        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
503+                          self.onAuthenticated)
[6]504+
[7]505+        self.serial = 0
[6]506+
507+
[7]508+    def onConnectionMade(self, xs):
509+        """
510+        Called when a server-to-server connection was made.
511+
512+        This enables traffic debugging on incoming streams.
513+        """
514+        xs.serial = self.serial
515+        self.serial += 1
516+
517+        def logDataIn(buf):
518+            log.msg("RECV (%d): %r" % (xs.serial, buf))
519+
520+        def logDataOut(buf):
521+            log.msg("SEND (%d): %r" % (xs.serial, buf))
522+
523+        if self.logTraffic:
524+            xs.rawDataInFn = logDataIn
525+            xs.rawDataOutFn = logDataOut
526+
527+        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
528+
529+
530+    def onAuthenticated(self, xs):
531+        thisHost = xs.thisEntity.host
532+        otherHost = xs.otherEntity.host
533+
534+        log.msg("Incoming connection %d from %r to %r established" %
535+                (xs.serial, otherHost, thisHost))
536+
537+        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
538+                                                   0, xs)
539+        xs.addObserver('/*', self.onElement, 0, xs)
540+
541+
542+    def onConnectionLost(self, xs, reason):
543+        thisHost = xs.thisEntity.host
544+        otherHost = xs.otherEntity.host
545+
546+        log.msg("Incoming connection %d from %r to %r disconnected" %
547+                (xs.serial, otherHost, thisHost))
548+
549+
550+    def onError(self, reason):
551+        log.err(reason, "Stream Error")
[6]552+
553+
[9]554+    def onElement(self, xs, element):
[6]555+        """
556+        Called when an element was received from one of the connected streams.
557+
558+        """
559+        if element.handled:
560+            return
561+        else:
[9]562+            self.service.dispatch(xs, element)
[7]563+
564+
565+
566+class ServerService(object):
567+    """
568+    Service for managing XMPP server to server connections.
569+    """
570+
571+    logTraffic = False
572+
[11]573+    def __init__(self, router, domain=None, secret=None):
[7]574+        self.router = router
[11]575+
[7]576+        self.defaultDomain = domain
[11]577+        self.domains = set()
578+        if self.defaultDomain:
579+            self.domains.add(self.defaultDomain)
580+
581+        if secret is not None:
582+            self.secret = secret
583+        else:
584+            self.secret = randbytes.secureRandom(16).encode('hex')
[7]585+
586+        self._outgoingStreams = {}
587+        self._outgoingQueues = {}
588+        self._outgoingConnecting = set()
589+        self.serial = 0
590+
591+        pipe = XmlPipe()
592+        self.xmlstream = pipe.source
593+        self.router.addRoute(None, pipe.sink)
594+        self.xmlstream.addObserver('/*', self.send)
595+
596+
597+    def outgoingInitialized(self, xs):
598+        thisHost = xs.thisEntity.host
599+        otherHost = xs.otherEntity.host
600+
601+        log.msg("Outgoing connection %d from %r to %r established" %
602+                (xs.serial, thisHost, otherHost))
603+
604+        self._outgoingStreams[thisHost, otherHost] = xs
605+        xs.addObserver(xmlstream.STREAM_END_EVENT,
606+                       lambda _: self.outgoingDisconnected(xs))
607+
608+        if (thisHost, otherHost) in self._outgoingQueues:
609+            for element in self._outgoingQueues[thisHost, otherHost]:
610+                xs.send(element)
611+            del self._outgoingQueues[thisHost, otherHost]
612+
613+
614+    def outgoingDisconnected(self, xs):
615+        thisHost = xs.thisEntity.host
616+        otherHost = xs.otherEntity.host
617+
618+        log.msg("Outgoing connection %d from %r to %r disconnected" %
619+                (xs.serial, thisHost, otherHost))
620+
621+        del self._outgoingStreams[thisHost, otherHost]
622+
623+
624+    def initiateOutgoingStream(self, thisHost, otherHost):
625+        """
626+        Initiate an outgoing XMPP server-to-server connection.
627+        """
628+
629+        def resetConnecting(_):
630+            self._outgoingConnecting.remove((thisHost, otherHost))
631+
632+        if (thisHost, otherHost) in self._outgoingConnecting:
633+            return
634+
635+        authenticator = XMPPServerConnectAuthenticator(thisHost,
636+                                                       otherHost,
637+                                                       self.secret)
638+        factory = DeferredS2SClientFactory(authenticator)
639+        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
640+                             self.outgoingInitialized)
[9]641+        factory.logTraffic = self.logTraffic
[7]642+
643+        self._outgoingConnecting.add((thisHost, otherHost))
644+
645+        d = initiateS2S(factory)
646+        d.addBoth(resetConnecting)
647+        return d
648+
649+
650+    def validateConnection(self, thisHost, otherHost, sid, key):
651+        """
652+        Validate an incoming XMPP server-to-server connection.
653+        """
654+
655+        def connected(xs):
656+            # Set up stream for immediate disconnection.
657+            def disconnect(_):
658+                xs.transport.loseConnection()
659+            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
660+            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)
661+
662+        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
663+                                                      sid, key)
664+        factory = DeferredS2SClientFactory(authenticator)
665+        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
[9]666+        factory.logTraffic = self.logTraffic
[7]667+
668+        d = initiateS2S(factory)
669+        return d
[6]670+
671+
672+    def send(self, stanza):
673+        """
674+        Send stanza to the proper XML Stream.
675+
676+        This uses addressing embedded in the stanza to find the correct stream
677+        to forward the stanza to.
678+        """
679+
[7]680+        otherHost = jid.internJID(stanza["to"]).host
681+        thisHost = jid.internJID(stanza["from"]).host
[6]682+
[7]683+        if (thisHost, otherHost) not in self._outgoingStreams:
[6]684+            # There is no connection with the destination (yet). Cache the
685+            # outgoing stanza until the connection has been established.
686+            # XXX: If the connection cannot be established, the queue should
687+            #      be emptied at some point.
[7]688+            if (thisHost, otherHost) not in self._outgoingQueues:
689+                self._outgoingQueues[(thisHost, otherHost)] = []
690+            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
691+            self.initiateOutgoingStream(thisHost, otherHost)
[6]692+        else:
[7]693+            self._outgoingStreams[(thisHost, otherHost)].send(stanza)
694+
695+
[9]696+    def dispatch(self, xs, stanza):
[7]697+        """
698+        Send on element to be routed within the server.
699+        """
[9]700+        stanzaFrom = stanza.getAttribute('from')
701+        stanzaTo = stanza.getAttribute('to')
702+
703+        if not stanzaFrom or not stanzaTo:
704+            xs.sendStreamError(error.StreamError('improper-addressing'))
705+        else:
706+            try:
707+                sender = jid.internJID(stanzaFrom)
708+                recipient = jid.internJID(stanzaTo)
709+            except jid.InvalidFormat:
710+                log.msg("Dropping error stanza with malformed JID")
711+
712+            if sender.host != xs.otherEntity.host:
713+                xs.sendStreamError(error.StreamError('invalid-from'))
714+            else:
715+                self.xmlstream.send(stanza)
[12]716diff -r fd1dc58fe561 wokkel/test/test_server.py
[6]717--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
[12]718+++ b/wokkel/test/test_server.py        Wed Apr 22 01:47:14 2009 -0700
[9]719@@ -0,0 +1,450 @@
[6]720+# Copyright (c) 2003-2008 Ralph Meijer
721+# See LICENSE for details.
722+
723+"""
724+Tests for L{wokkel.server}.
725+"""
726+
[7]727+from twisted.internet import defer
728+from twisted.python import failure
729+from twisted.test.proto_helpers import StringTransport
[6]730+from twisted.trial import unittest
[7]731+from twisted.words.protocols.jabber import error, jid, xmlstream
732+from twisted.words.xish import domish
[6]733+
[7]734+from wokkel import component, server
[6]735+
736+NS_STREAMS = 'http://etherx.jabber.org/streams'
[7]737+NS_DIALBACK = "jabber:server:dialback"
[6]738+
739+class GenerateKeyTest(unittest.TestCase):
740+    """
741+    Tests for L{server.generateKey}.
742+    """
743+
744+    def testBasic(self):
[7]745+        originating = "example.org"
746+        receiving = "xmpp.example.com"
747+        sid = "D60000229F"
[6]748+        secret = "s3cr3tf0rd14lb4ck"
749+
[7]750+        key = server.generateKey(secret, receiving, originating, sid)
[6]751+
752+        self.assertEqual(key,
[7]753+            '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643')
[6]754+
755+
756+
757+class XMPPServerListenAuthenticatorTest(unittest.TestCase):
758+    """
759+    Tests for L{server.XMPPServerListenAuthenticator}.
760+    """
761+
[7]762+    secret = "s3cr3tf0rd14lb4ck"
763+    originating = "example.org"
764+    receiving = "xmpp.example.com"
765+    sid = "D60000229F"
766+    key = '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643'
[6]767+
768+    def setUp(self):
769+        self.output = []
[7]770+
771+        class MyService(object):
772+            pass
773+
774+        self.service = MyService()
775+        self.service.defaultDomain = self.receiving
776+        self.service.domains = [self.receiving, 'pubsub.'+self.receiving]
777+        self.service.secret = self.secret
778+
779+        self.authenticator = server.XMPPServerListenAuthenticator(self.service)
[6]780+        self.xmlstream = xmlstream.XmlStream(self.authenticator)
781+        self.xmlstream.send = self.output.append
782+        self.xmlstream.transport = StringTransport()
783+
784+
785+    def test_attributes(self):
786+        """
787+        Test attributes of authenticator and stream objects.
788+        """
[7]789+        self.assertEqual(self.service, self.authenticator.service)
790+        self.assertEqual(self.xmlstream.initiating, False)
[6]791+
792+
793+    def test_streamStartedVersion0(self):
794+        """
795+        The authenticator supports pre-XMPP 1.0 streams.
796+        """
797+        self.xmlstream.connectionMade()
798+        self.xmlstream.dataReceived(
799+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
800+                           "xmlns:db='jabber:server:dialback' "
801+                           "xmlns='jabber:server' "
[7]802+                           "to='xmpp.example.com'>")
[6]803+        self.assertEqual((0, 0), self.xmlstream.version)
804+
805+
806+    def test_streamStartedVersion1(self):
807+        """
808+        The authenticator supports XMPP 1.0 streams.
809+        """
810+        self.xmlstream.connectionMade()
811+        self.xmlstream.dataReceived(
812+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
813+                           "xmlns:db='jabber:server:dialback' "
814+                           "xmlns='jabber:server' "
[7]815+                           "to='xmpp.example.com' "
[6]816+                           "version='1.0'>")
817+        self.assertEqual((1, 0), self.xmlstream.version)
818+
819+
820+    def test_streamStartedSID(self):
821+        """
822+        The response stream will have a stream ID.
823+        """
824+        self.xmlstream.connectionMade()
825+        self.assertIdentical(None, self.xmlstream.sid)
826+
827+        self.xmlstream.dataReceived(
828+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
829+                           "xmlns:db='jabber:server:dialback' "
830+                           "xmlns='jabber:server' "
[7]831+                           "to='xmpp.example.com' "
[6]832+                           "version='1.0'>")
833+        self.assertNotIdentical(None, self.xmlstream.sid)
834+
835+
836+    def test_streamStartedSentResponseHeader(self):
837+        """
838+        A stream header is sent in response to the incoming stream header.
839+        """
840+        self.xmlstream.connectionMade()
841+        self.assertFalse(self.xmlstream._headerSent)
842+
843+        self.xmlstream.dataReceived(
844+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
845+                           "xmlns:db='jabber:server:dialback' "
846+                           "xmlns='jabber:server' "
[7]847+                           "to='xmpp.example.com'>")
[6]848+        self.assertTrue(self.xmlstream._headerSent)
849+
850+
851+    def test_streamStartedNotSentFeatures(self):
852+        """
853+        No features are sent in response to an XMPP < 1.0 stream header.
854+        """
855+        self.xmlstream.connectionMade()
856+        self.xmlstream.dataReceived(
857+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
858+                           "xmlns:db='jabber:server:dialback' "
859+                           "xmlns='jabber:server' "
[7]860+                           "to='xmpp.example.com'>")
[6]861+        self.assertEqual(1, len(self.output))
862+
863+
864+    def test_streamStartedSentFeatures(self):
865+        """
866+        Features are sent in response to an XMPP >= 1.0 stream header.
867+        """
868+        self.xmlstream.connectionMade()
869+        self.xmlstream.dataReceived(
870+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
871+                           "xmlns:db='jabber:server:dialback' "
872+                           "xmlns='jabber:server' "
[7]873+                           "to='xmpp.example.com' "
[6]874+                           "version='1.0'>")
875+        self.assertEqual(2, len(self.output))
876+        features = self.output[-1]
877+        self.assertEqual(NS_STREAMS, features.uri)
878+        self.assertEqual('features', features.name)
879+
880+
881+    def test_streamRootElement(self):
882+        """
883+        Test stream error on wrong stream namespace.
884+        """
885+        self.xmlstream.connectionMade()
886+        self.xmlstream.dataReceived(
887+            "<stream:stream xmlns:stream='badns' "
888+                           "xmlns:db='jabber:server:dialback' "
889+                           "xmlns='jabber:server' "
[7]890+                           "to='xmpp.example.com'>")
[6]891+
[7]892+        self.assertEqual(3, len(self.output))
[6]893+        exc = error.exceptionFromStreamError(self.output[1])
[7]894+        self.assertEqual('invalid-namespace', exc.condition)
[6]895+
896+
897+    def test_streamDefaultNamespace(self):
898+        """
899+        Test stream error on missing dialback namespace.
900+        """
901+        self.xmlstream.connectionMade()
902+        self.xmlstream.dataReceived(
903+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
904+                           "xmlns:db='jabber:server:dialback' "
905+                           "xmlns='badns' "
[7]906+                           "to='xmpp.example.com'>")
[6]907+
[7]908+        self.assertEqual(3, len(self.output))
[6]909+        exc = error.exceptionFromStreamError(self.output[1])
[7]910+        self.assertEqual('invalid-namespace', exc.condition)
[6]911+
912+
913+    def test_streamNoDialbackNamespace(self):
914+        """
915+        Test stream error on missing dialback namespace.
916+        """
917+        self.xmlstream.connectionMade()
918+        self.xmlstream.dataReceived(
919+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
920+                           "xmlns='jabber:server' "
[7]921+                           "to='xmpp.example.com'>")
[6]922+
[7]923+        self.assertEqual(3, len(self.output))
[6]924+        exc = error.exceptionFromStreamError(self.output[1])
[7]925+        self.assertEqual('invalid-namespace', exc.condition)
[6]926+
927+
928+    def test_streamBadDialbackNamespace(self):
929+        """
930+        Test stream error on missing dialback namespace.
931+        """
932+        self.xmlstream.connectionMade()
933+        self.xmlstream.dataReceived(
934+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
935+                           "xmlns:db='badns' "
936+                           "xmlns='jabber:server' "
[7]937+                           "to='xmpp.example.com'>")
[6]938+
[7]939+        self.assertEqual(3, len(self.output))
[6]940+        exc = error.exceptionFromStreamError(self.output[1])
[7]941+        self.assertEqual('invalid-namespace', exc.condition)
[6]942+
943+
944+    def test_streamToUnknownHost(self):
945+        """
946+        Test stream error on stream's to attribute having unknown host.
947+        """
948+        self.xmlstream.connectionMade()
949+        self.xmlstream.dataReceived(
950+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
951+                           "xmlns:db='jabber:server:dialback' "
952+                           "xmlns='jabber:server' "
953+                           "to='badhost'>")
954+
[7]955+        self.assertEqual(3, len(self.output))
[6]956+        exc = error.exceptionFromStreamError(self.output[1])
[7]957+        self.assertEqual('host-unknown', exc.condition)
958+
959+
960+    def test_streamToOtherLocalHost(self):
961+        """
962+        The authenticator supports XMPP 1.0 streams.
963+        """
964+        self.xmlstream.connectionMade()
965+        self.xmlstream.dataReceived(
966+            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
967+                           "xmlns:db='jabber:server:dialback' "
968+                           "xmlns='jabber:server' "
969+                           "to='pubsub.xmpp.example.com' "
970+                           "version='1.0'>")
971+
972+        self.assertEqual(2, len(self.output))
973+        self.assertEqual(jid.JID('pubsub.xmpp.example.com'),
974+                         self.xmlstream.thisEntity)
975+
976+    def test_onResult(self):
977+        def cb(result):
978+            self.assertEqual(1, len(self.output))
979+            reply = self.output[0]
980+            self.assertEqual(self.originating, reply['to'])
981+            self.assertEqual(self.receiving, reply['from'])
982+            self.assertEqual('valid', reply['type'])
983+
984+        def validateConnection(thisHost, otherHost, sid, key):
985+            self.assertEqual(thisHost, self.receiving)
986+            self.assertEqual(otherHost, self.originating)
987+            self.assertEqual(sid, self.sid)
988+            self.assertEqual(key, self.key)
989+            return defer.succeed(None)
990+
991+        self.xmlstream.sid = self.sid
992+        self.service.validateConnection = validateConnection
993+
994+        result = domish.Element((NS_DIALBACK, 'result'))
995+        result['to'] = self.receiving
996+        result['from'] = self.originating
997+        result.addContent(self.key)
998+
999+        d = self.authenticator.onResult(result)
1000+        d.addCallback(cb)
1001+        return d
1002+
1003+
1004+    def test_onResultFailure(self):
1005+        class TestError(Exception):
1006+            pass
1007+
1008+        def cb(result):
1009+            reply = self.output[0]
1010+            self.assertEqual('invalid', reply['type'])
1011+            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
1012+
1013+
1014+        def validateConnection(thisHost, otherHost, sid, key):
1015+            return defer.fail(TestError())
1016+
1017+        self.xmlstream.sid = self.sid
1018+        self.service.validateConnection = validateConnection
1019+
1020+        result = domish.Element((NS_DIALBACK, 'result'))
1021+        result['to'] = self.receiving
1022+        result['from'] = self.originating
1023+        result.addContent(self.key)
1024+
1025+        d = self.authenticator.onResult(result)
1026+        d.addCallback(cb)
1027+        return d
1028+
1029+
1030+
1031+class FakeService(object):
1032+    domains = set(['example.org', 'pubsub.example.org'])
1033+    defaultDomain = 'example.org'
1034+    secret = 'mysecret'
1035+
1036+    def __init__(self):
1037+        self.dispatched = []
1038+
[9]1039+    def dispatch(self, xs, element):
1040+        self.dispatched.append(element)
[7]1041+
1042+
1043+
1044+class XMPPS2SServerFactoryTest(unittest.TestCase):
1045+    """
1046+    Tests for L{component.XMPPS2SServerFactory}.
1047+    """
1048+
1049+    def setUp(self):
[9]1050+        self.service = FakeService()
[7]1051+        self.factory = server.XMPPS2SServerFactory(self.service)
1052+        self.xmlstream = self.factory.buildProtocol(None)
[9]1053+        self.transport = StringTransport()
[7]1054+        self.xmlstream.thisEntity = jid.JID('example.org')
1055+        self.xmlstream.otherEntity = jid.JID('example.com')
1056+
1057+
1058+    def test_makeConnection(self):
1059+        """
1060+        A new connection increases the stream serial count. No logs by default.
1061+        """
[9]1062+        self.xmlstream.makeConnection(self.transport)
[7]1063+        self.assertEqual(0, self.xmlstream.serial)
1064+        self.assertEqual(1, self.factory.serial)
1065+        self.assertIdentical(None, self.xmlstream.rawDataInFn)
1066+        self.assertIdentical(None, self.xmlstream.rawDataOutFn)
1067+
1068+
1069+    def test_makeConnectionLogTraffic(self):
1070+        """
1071+        Setting logTraffic should set up raw data loggers.
1072+        """
1073+        self.factory.logTraffic = True
[9]1074+        self.xmlstream.makeConnection(self.transport)
[7]1075+        self.assertNotIdentical(None, self.xmlstream.rawDataInFn)
1076+        self.assertNotIdentical(None, self.xmlstream.rawDataOutFn)
1077+
1078+
1079+    def test_onError(self):
1080+        """
1081+        An observer for stream errors should trigger onError to log it.
1082+        """
[9]1083+        self.xmlstream.makeConnection(self.transport)
[7]1084+
1085+        class TestError(Exception):
1086+            pass
1087+
1088+        reason = failure.Failure(TestError())
1089+        self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT)
1090+        self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
1091+
1092+
1093+    def test_connectionInitialized(self):
1094+        """
1095+        """
[9]1096+        self.xmlstream.makeConnection(self.transport)
[7]1097+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1098+
1099+
1100+    def test_connectionLost(self):
1101+        """
1102+        """
[9]1103+        self.xmlstream.makeConnection(self.transport)
[7]1104+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1105+        self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT)
1106+
1107+
[9]1108+    def test_Element(self):
1109+        self.xmlstream.makeConnection(self.transport)
1110+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
1111+
1112+        stanza = domish.Element((None, "presence"))
1113+        self.xmlstream.dispatch(stanza)
1114+        self.assertEqual(1, len(self.service.dispatched))
1115+        self.assertIdentical(stanza, self.service.dispatched[-1])
1116+
1117+
[7]1118+    def test_ElementNotAuthenticated(self):
[9]1119+        self.xmlstream.makeConnection(self.transport)
1120+
1121+        stanza = domish.Element((None, "presence"))
1122+        self.xmlstream.dispatch(stanza)
1123+        self.assertEqual(0, len(self.service.dispatched))
1124+
[7]1125+
1126+
1127+class ServerServiceTest(unittest.TestCase):
1128+
1129+    def setUp(self):
[9]1130+        self.output = []
1131+
1132+        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
1133+        self.xmlstream.thisEntity = jid.JID('example.org')
1134+        self.xmlstream.otherEntity = jid.JID('example.com')
1135+        self.xmlstream.send = self.output.append
1136+
[7]1137+        self.router = component.Router()
1138+        self.service = server.ServerService(self.router,
1139+                                            secret='mysecret',
1140+                                            domain='example.org')
[9]1141+        self.service.xmlstream = self.xmlstream
[7]1142+
1143+
1144+    def test_defaultDomainInDomains(self):
1145+        """
1146+        The default domain is part of the domains considered local.
1147+        """
1148+        self.assertIn(self.service.defaultDomain, self.service.domains)
[9]1149+
1150+
1151+    def test_dispatch(self):
1152+        stanza = domish.Element((None, "presence"))
1153+        stanza['to'] = 'user@example.org'
1154+        stanza['from'] = 'other@example.com'
1155+        self.service.dispatch(self.xmlstream, stanza)
1156+
1157+        self.assertEqual(1, len(self.output))
1158+        self.assertIdentical(stanza, self.output[-1])
1159+
1160+
1161+    def test_dispatchNoTo(self):
1162+        errors = []
1163+        self.xmlstream.sendStreamError = errors.append
1164+
1165+        stanza = domish.Element((None, "presence"))
1166+        stanza['from'] = 'other@example.com'
1167+        self.service.dispatch(self.xmlstream, stanza)
1168+
1169+        self.assertEqual(1, len(errors))
Note: See TracBrowser for help on using the repository browser.