source: ralphm-patches/s2s.patch @ 11:8294ad7253bd

Last change on this file since 11:8294ad7253bd was 11:8294ad7253bd, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Allow no default domain, generate a secret if not provided.

File size: 39.7 KB
  • new file wokkel/server.py

    diff -r 313d45b505a7 wokkel/server.py
    - +  
     1# -*- test-case-name: wokkel.test.test_server -*-
     2#
     3# Copyright (c) 2003-2008 Ralph Meijer
     4# See LICENSE for details.
     5
     6"""
     7XMPP Server-to-Server protocol.
     8
     9This module implements several aspects of XMPP server-to-server communications
     10as described in XMPP Core (RFC 3920). Refer to that document for the meaning
     11of the used terminology.
     12"""
     13
     14# hashlib is new in Python 2.5, try that first.
     15try:
     16    from hashlib import sha256
     17    digestmod = sha256
     18except ImportError:
     19    import Crypto.Hash.SHA256 as digestmod
     20    sha256 = digestmod.new
     21
     22import hmac
     23
     24from zope.interface import implements
     25
     26from twisted.application import service
     27from twisted.internet import defer, reactor
     28from twisted.names.srvconnect import SRVConnector
     29from twisted.python import log, randbytes
     30from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
     31from twisted.words.xish import domish
     32
     33from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
     34from wokkel.compat import XmlStreamServerFactory
     35
     36NS_DIALBACK = 'jabber:server:dialback'
     37
     38def generateKey(secret, receivingServer, originatingServer, streamID):
     39    """
     40    Generate a dialback key for server-to-server XMPP Streams.
     41
     42    The dialback key is generated using the algorithm described in
     43    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
     44    terminology for the parameters is described in RFC-3920.
     45
     46    @param secret: the shared secret known to the Originating Server and
     47                   Authoritive Server.
     48    @type secret: C{str}
     49    @param receivingServer: the Receiving Server host name.
     50    @type receivingServer: C{str}
     51    @param originatingServer: the Originating Server host name.
     52    @type originatingServer: C{str}
     53    @param streamID: the Stream ID as generated by the Receiving Server.
     54    @type streamID: C{str}
     55    @return: hexadecimal digest of the generated key.
     56    @type: C{str}
     57    """
     58
     59    hashObject = sha256()
     60    hashObject.update(secret)
     61    hashedSecret = hashObject.hexdigest()
     62    message = " ".join([receivingServer, originatingServer, streamID])
     63    hash = hmac.HMAC(hashedSecret, message, digestmod=digestmod)
     64    return hash.hexdigest()
     65
     66
     67def trapStreamError(xs, observer):
     68    """
     69    Trap stream errors.
     70
     71    This wraps an observer to catch exceptions. In case of a
     72    L{error.StreamError}, it is send over the given XML stream. All other
     73    exceptions yield a C{'internal-server-error'} stream error, that is
     74    sent over the stream, while the exception is logged.
     75
     76    @return: Wrapped observer
     77    """
     78
     79    def wrappedObserver(element):
     80        try:
     81            observer(element)
     82        except error.StreamError, exc:
     83            xs.sendStreamError(exc)
     84        except:
     85            log.err()
     86            exc = error.StreamError('internal-server-error')
     87            xs.sendStreamError(exc)
     88
     89    return wrappedObserver
     90
     91
     92class XMPPServerConnector(SRVConnector):
     93    def __init__(self, reactor, domain, factory):
     94        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)
     95
     96
     97    def pickServer(self):
     98        host, port = SRVConnector.pickServer(self)
     99
     100        if not self.servers and not self.orderedServers:
     101            # no SRV record, fall back..
     102            port = 5269
     103
     104        return host, port
     105
     106
     107class DialbackFailed(Exception):
     108    pass
     109
     110
     111
     112class OriginatingDialbackInitializer(object):
     113    """
     114    Server Dialback Initializer for the Orginating Server.
     115    """
     116
     117    implements(ijabber.IInitiatingInitializer)
     118
     119    _deferred = None
     120
     121    def __init__(self, xs, thisHost, otherHost, secret):
     122        self.xmlstream = xs
     123        self.thisHost = thisHost
     124        self.otherHost = otherHost
     125        self.secret = secret
     126
     127
     128    def initialize(self):
     129        self._deferred = defer.Deferred()
     130        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
     131                                   self.onStreamError)
     132        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
     133                                   self.onResult)
     134
     135        key = generateKey(self.secret, self.otherHost,
     136                          self.thisHost, self.xmlstream.sid)
     137
     138        result = domish.Element((NS_DIALBACK, 'result'))
     139        result['from'] = self.thisHost
     140        result['to'] = self.otherHost
     141        result.addContent(key)
     142
     143        self.xmlstream.send(result)
     144
     145        return self._deferred
     146
     147
     148    def onResult(self, result):
     149        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
     150                                      self.onStreamError)
     151        if result['type'] == 'valid':
     152            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
     153            self._deferred.callback(None)
     154        else:
     155            self._deferred.errback(DialbackFailed())
     156
     157
     158    def onStreamError(self, failure):
     159        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
     160                                      self.onResult)
     161        self._deferred.errback(failure)
     162
     163
     164
     165class ReceivingDialbackInitializer(object):
     166    """
     167    Server Dialback Initializer for the Receiving Server.
     168    """
     169
     170    implements(ijabber.IInitiatingInitializer)
     171
     172    _deferred = None
     173
     174    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
     175        self.xmlstream = xs
     176        self.thisHost = thisHost
     177        self.otherHost = otherHost
     178        self.originalStreamID = originalStreamID
     179        self.key = key
     180
     181
     182    def initialize(self):
     183        self._deferred = defer.Deferred()
     184        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
     185                                   self.onStreamError)
     186        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
     187                                   self.onVerify)
     188
     189        verify = domish.Element((NS_DIALBACK, 'verify'))
     190        verify['from'] = self.thisHost
     191        verify['to'] = self.otherHost
     192        verify['id'] = self.originalStreamID
     193        verify.addContent(self.key)
     194
     195        self.xmlstream.send(verify)
     196        return self._deferred
     197
     198
     199    def onVerify(self, verify):
     200        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
     201                                      self.onStreamError)
     202        if verify['id'] != self.originalStreamID:
     203            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
     204            self._deferred.errback(DialbackFailed())
     205        elif verify['to'] != self.thisHost:
     206            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
     207            self._deferred.errback(DialbackFailed())
     208        elif verify['from'] != self.otherHost:
     209            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
     210            self._deferred.errback(DialbackFailed())
     211        elif verify['type'] == 'valid':
     212            self._deferred.callback(None)
     213        else:
     214            self._deferred.errback(DialbackFailed())
     215
     216
     217    def onStreamError(self, failure):
     218        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
     219                                      self.onVerify)
     220        self._deferred.errback(failure)
     221
     222
     223
     224class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
     225    """
     226    Authenticator for an outgoing XMPP server-to-server connection.
     227
     228    This authenticator connects to C{otherHost} (the Receiving Server) and then
     229    initiates dialback as C{thisHost} (the Originating Server) using
     230    L{OriginatingDialbackInitializer}.
     231
     232    @ivar thisHost: The domain this server connects from (the Originating
     233                    Server) .
     234    @ivar otherHost: The domain of the server this server connects to (the
     235                     Receiving Server).
     236    @ivar secret: The shared secret that is used for verifying the validity
     237                  of this new connection.
     238    """
     239    namespace = 'jabber:server'
     240
     241    def __init__(self, thisHost, otherHost, secret):
     242        self.thisHost = thisHost
     243        self.otherHost = otherHost
     244        self.secret = secret
     245        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
     246
     247
     248    def connectionMade(self):
     249        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
     250        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     251                                   NS_DIALBACK: 'db'}
     252        xmlstream.ConnectAuthenticator.connectionMade(self)
     253
     254
     255    def associateWithStream(self, xs):
     256        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
     257        init = OriginatingDialbackInitializer(xs, self.thisHost,
     258                                              self.otherHost, self.secret)
     259        xs.initializers = [init]
     260
     261
     262
     263class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
     264    """
     265    Authenticator for an outgoing connection to verify an incoming connection.
     266
     267    This authenticator connects to C{otherHost} (the Authoritative Server) and
     268    then initiates dialback as C{thisHost} (the Receiving Server) using
     269    L{ReceivingDialbackInitializer}.
     270
     271    @ivar thisHost: The domain this server connects from (the Receiving
     272                    Server) .
     273    @ivar otherHost: The domain of the server this server connects to (the
     274                     Authoritative Server).
     275    @ivar originalStreamID: The stream ID of the incoming connection that is
     276                            being verified.
     277    @ivar key: The key provided by the Receving Server to be verified.
     278    """
     279    namespace = 'jabber:server'
     280
     281    def __init__(self, thisHost, otherHost, originalStreamID, key):
     282        self.thisHost = thisHost
     283        self.otherHost = otherHost
     284        self.originalStreamID = originalStreamID
     285        self.key = key
     286        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
     287
     288
     289    def connectionMade(self):
     290        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
     291        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     292                                   NS_DIALBACK: 'db'}
     293        xmlstream.ConnectAuthenticator.connectionMade(self)
     294
     295
     296    def associateWithStream(self, xs):
     297        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
     298        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
     299                                            self.originalStreamID, self.key)
     300        xs.initializers = [init]
     301
     302
     303
     304class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
     305    """
     306    Authenticator for an incoming XMPP server-to-server connection.
     307
     308    This authenticator handles two types of incoming connections. Regular
     309    server-to-server connections are from the Originating Server to the
     310    Receiving Server, where this server is the Receiving Server. These
     311    connections start out by receiving a dialback key, verifying the
     312    key with the Authoritative Server, and then accept normal XMPP stanzas.
     313
     314    The other type of connections is from a Receiving Server to an
     315    Authoritative Server, where this server acts as the Authoritative Server.
     316    These connections are used to verify the validity of an outgoing connection
     317    from this server. In this case, this server receives a verification
     318    request, checks the key and then returns the result.
     319
     320    @ivar service: The service that keeps the list of domains we accept
     321                   connections for.
     322    """
     323    namespace = 'jabber:server'
     324
     325    def __init__(self, service):
     326        xmlstream.ListenAuthenticator.__init__(self)
     327        self.service = service
     328
     329
     330    def streamStarted(self, rootElement):
     331        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     332
     333        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
     334        if not self.xmlstream.sid:
     335            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
     336
     337        if self.xmlstream.thisEntity:
     338            targetDomain = self.xmlstream.thisEntity.host
     339        else:
     340            targetDomain = self.service.defaultDomain
     341
     342        def prepareStream(domain):
     343            self.xmlstream.namespace = self.namespace
     344            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     345                                       NS_DIALBACK: 'db'}
     346            if domain:
     347                self.xmlstream.thisEntity = jid.internJID(domain)
     348
     349        try:
     350            if xmlstream.NS_STREAMS != rootElement.uri or \
     351               self.namespace != self.xmlstream.namespace or \
     352               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
     353                raise error.StreamError('invalid-namespace')
     354
     355            if not targetDomain or targetDomain not in self.service.domains:
     356                raise error.StreamError('host-unknown')
     357        except error.StreamError, exc:
     358            prepareStream(self.service.defaultDomain)
     359            self.xmlstream.sendStreamError(exc)
     360            return
     361
     362        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
     363                                   trapStreamError(self.xmlstream,
     364                                                   self.onVerify))
     365        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
     366                                   self.onResult)
     367
     368        prepareStream(targetDomain)
     369        self.xmlstream.sendHeader()
     370
     371        if self.xmlstream.version >= (1, 0):
     372            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     373            self.xmlstream.send(features)
     374
     375
     376    def onVerify(self, verify):
     377        try:
     378            receivingServer = jid.JID(verify['from']).host
     379            originatingServer = jid.JID(verify['to']).host
     380        except (KeyError, jid.InvalidFormat):
     381            raise error.StreamError('improper-addressing')
     382
     383        if originatingServer not in self.service.domains:
     384            raise error.StreamError('host-unknown')
     385
     386        if (self.xmlstream.otherEntity and
     387            receivingServer != self.xmlstream.otherEntity.host):
     388            raise error.StreamError('invalid-from')
     389
     390        streamID = verify.getAttribute('id', '')
     391        key = unicode(verify)
     392
     393        calculatedKey = generateKey(self.service.secret, receivingServer,
     394                                    originatingServer, streamID)
     395        validity = (key == calculatedKey) and 'valid' or 'invalid'
     396
     397        reply = domish.Element((NS_DIALBACK, 'verify'))
     398        reply['from'] = originatingServer
     399        reply['to'] = receivingServer
     400        reply['id'] = streamID
     401        reply['type'] = validity
     402        self.xmlstream.send(reply)
     403
     404
     405    def onResult(self, result):
     406        def reply(validity):
     407            reply = domish.Element((NS_DIALBACK, 'result'))
     408            reply['from'] = result['to']
     409            reply['to'] = result['from']
     410            reply['type'] = validity
     411            self.xmlstream.send(reply)
     412
     413        def valid(xs):
     414            reply('valid')
     415            self.xmlstream.otherEntity = jid.internJID(originatingServer)
     416            self.xmlstream.dispatch(self.xmlstream,
     417                                    xmlstream.STREAM_AUTHD_EVENT)
     418
     419        def invalid(failure):
     420            log.err(failure)
     421            reply('invalid')
     422
     423        receivingServer = result['to']
     424        originatingServer = result['from']
     425        key = unicode(result)
     426
     427        d = self.service.validateConnection(receivingServer, originatingServer,
     428                                            self.xmlstream.sid, key)
     429        d.addCallbacks(valid, invalid)
     430        return d
     431
     432
     433
     434class DeferredS2SClientFactory(DeferredXmlStreamFactory):
     435    """
     436    Deferred firing factory for initiating XMPP server-to-server connection.
     437
     438    The deferred has its callbacks called upon succesful authentication with
     439    the other server. In case of failed authentication or connection, the
     440    deferred will have its errbacks called instead.
     441    """
     442
     443    logTraffic = False
     444
     445    def __init__(self, authenticator):
     446        DeferredXmlStreamFactory.__init__(self, authenticator)
     447
     448        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     449                          self.onConnectionMade)
     450
     451        self.serial = 0
     452
     453
     454    def onConnectionMade(self, xs):
     455        xs.serial = self.serial
     456        self.serial += 1
     457
     458        def logDataIn(buf):
     459            log.msg("RECV (%d): %r" % (xs.serial, buf))
     460
     461        def logDataOut(buf):
     462            log.msg("SEND (%d): %r" % (xs.serial, buf))
     463
     464        if self.logTraffic:
     465            xs.rawDataInFn = logDataIn
     466            xs.rawDataOutFn = logDataOut
     467
     468
     469
     470def initiateS2S(factory):
     471    domain = factory.authenticator.otherHost
     472    c = XMPPServerConnector(reactor, domain, factory)
     473    c.connect()
     474    return factory.deferred
     475
     476
     477
     478class XMPPS2SServerFactory(XmlStreamServerFactory):
     479    """
     480    XMPP Server-to-Server Server factory.
     481
     482    This factory accepts XMPP server-to-server connections.
     483    """
     484
     485    logTraffic = False
     486
     487    def __init__(self, service):
     488        self.service = service
     489
     490        def authenticatorFactory():
     491            return XMPPServerListenAuthenticator(service)
     492
     493        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     494        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     495                          self.onConnectionMade)
     496        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     497                          self.onAuthenticated)
     498
     499        self.serial = 0
     500
     501
     502    def onConnectionMade(self, xs):
     503        """
     504        Called when a server-to-server connection was made.
     505
     506        This enables traffic debugging on incoming streams.
     507        """
     508        xs.serial = self.serial
     509        self.serial += 1
     510
     511        def logDataIn(buf):
     512            log.msg("RECV (%d): %r" % (xs.serial, buf))
     513
     514        def logDataOut(buf):
     515            log.msg("SEND (%d): %r" % (xs.serial, buf))
     516
     517        if self.logTraffic:
     518            xs.rawDataInFn = logDataIn
     519            xs.rawDataOutFn = logDataOut
     520
     521        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     522
     523
     524    def onAuthenticated(self, xs):
     525        thisHost = xs.thisEntity.host
     526        otherHost = xs.otherEntity.host
     527
     528        log.msg("Incoming connection %d from %r to %r established" %
     529                (xs.serial, otherHost, thisHost))
     530
     531        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     532                                                   0, xs)
     533        xs.addObserver('/*', self.onElement, 0, xs)
     534
     535
     536    def onConnectionLost(self, xs, reason):
     537        thisHost = xs.thisEntity.host
     538        otherHost = xs.otherEntity.host
     539
     540        log.msg("Incoming connection %d from %r to %r disconnected" %
     541                (xs.serial, otherHost, thisHost))
     542
     543
     544    def onError(self, reason):
     545        log.err(reason, "Stream Error")
     546
     547
     548    def onElement(self, xs, element):
     549        """
     550        Called when an element was received from one of the connected streams.
     551
     552        """
     553        if element.handled:
     554            return
     555        else:
     556            self.service.dispatch(xs, element)
     557
     558
     559
     560class ServerService(object):
     561    """
     562    Service for managing XMPP server to server connections.
     563    """
     564
     565    logTraffic = False
     566
     567    def __init__(self, router, domain=None, secret=None):
     568        self.router = router
     569
     570        self.defaultDomain = domain
     571        self.domains = set()
     572        if self.defaultDomain:
     573            self.domains.add(self.defaultDomain)
     574
     575        if secret is not None:
     576            self.secret = secret
     577        else:
     578            self.secret = randbytes.secureRandom(16).encode('hex')
     579
     580        self._outgoingStreams = {}
     581        self._outgoingQueues = {}
     582        self._outgoingConnecting = set()
     583        self.serial = 0
     584
     585        pipe = XmlPipe()
     586        self.xmlstream = pipe.source
     587        self.router.addRoute(None, pipe.sink)
     588        self.xmlstream.addObserver('/*', self.send)
     589
     590
     591    def outgoingInitialized(self, xs):
     592        thisHost = xs.thisEntity.host
     593        otherHost = xs.otherEntity.host
     594
     595        log.msg("Outgoing connection %d from %r to %r established" %
     596                (xs.serial, thisHost, otherHost))
     597
     598        self._outgoingStreams[thisHost, otherHost] = xs
     599        xs.addObserver(xmlstream.STREAM_END_EVENT,
     600                       lambda _: self.outgoingDisconnected(xs))
     601
     602        if (thisHost, otherHost) in self._outgoingQueues:
     603            for element in self._outgoingQueues[thisHost, otherHost]:
     604                xs.send(element)
     605            del self._outgoingQueues[thisHost, otherHost]
     606
     607
     608    def outgoingDisconnected(self, xs):
     609        thisHost = xs.thisEntity.host
     610        otherHost = xs.otherEntity.host
     611
     612        log.msg("Outgoing connection %d from %r to %r disconnected" %
     613                (xs.serial, thisHost, otherHost))
     614
     615        del self._outgoingStreams[thisHost, otherHost]
     616
     617
     618    def initiateOutgoingStream(self, thisHost, otherHost):
     619        """
     620        Initiate an outgoing XMPP server-to-server connection.
     621        """
     622
     623        def resetConnecting(_):
     624            self._outgoingConnecting.remove((thisHost, otherHost))
     625
     626        if (thisHost, otherHost) in self._outgoingConnecting:
     627            return
     628
     629        authenticator = XMPPServerConnectAuthenticator(thisHost,
     630                                                       otherHost,
     631                                                       self.secret)
     632        factory = DeferredS2SClientFactory(authenticator)
     633        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     634                             self.outgoingInitialized)
     635        factory.logTraffic = self.logTraffic
     636
     637        self._outgoingConnecting.add((thisHost, otherHost))
     638
     639        d = initiateS2S(factory)
     640        d.addBoth(resetConnecting)
     641        return d
     642
     643
     644    def validateConnection(self, thisHost, otherHost, sid, key):
     645        """
     646        Validate an incoming XMPP server-to-server connection.
     647        """
     648
     649        def connected(xs):
     650            # Set up stream for immediate disconnection.
     651            def disconnect(_):
     652                xs.transport.loseConnection()
     653            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
     654            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)
     655
     656        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
     657                                                      sid, key)
     658        factory = DeferredS2SClientFactory(authenticator)
     659        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
     660        factory.logTraffic = self.logTraffic
     661
     662        d = initiateS2S(factory)
     663        return d
     664
     665
     666    def send(self, stanza):
     667        """
     668        Send stanza to the proper XML Stream.
     669
     670        This uses addressing embedded in the stanza to find the correct stream
     671        to forward the stanza to.
     672        """
     673
     674        otherHost = jid.internJID(stanza["to"]).host
     675        thisHost = jid.internJID(stanza["from"]).host
     676
     677        if (thisHost, otherHost) not in self._outgoingStreams:
     678            # There is no connection with the destination (yet). Cache the
     679            # outgoing stanza until the connection has been established.
     680            # XXX: If the connection cannot be established, the queue should
     681            #      be emptied at some point.
     682            if (thisHost, otherHost) not in self._outgoingQueues:
     683                self._outgoingQueues[(thisHost, otherHost)] = []
     684            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
     685            self.initiateOutgoingStream(thisHost, otherHost)
     686        else:
     687            self._outgoingStreams[(thisHost, otherHost)].send(stanza)
     688
     689
     690    def dispatch(self, xs, stanza):
     691        """
     692        Send on element to be routed within the server.
     693        """
     694        stanzaFrom = stanza.getAttribute('from')
     695        stanzaTo = stanza.getAttribute('to')
     696
     697        if not stanzaFrom or not stanzaTo:
     698            xs.sendStreamError(error.StreamError('improper-addressing'))
     699        else:
     700            try:
     701                sender = jid.internJID(stanzaFrom)
     702                recipient = jid.internJID(stanzaTo)
     703            except jid.InvalidFormat:
     704                log.msg("Dropping error stanza with malformed JID")
     705
     706            if sender.host != xs.otherEntity.host:
     707                xs.sendStreamError(error.StreamError('invalid-from'))
     708            else:
     709                self.xmlstream.send(stanza)
  • new file wokkel/test/test_server.py

    diff -r 313d45b505a7 wokkel/test/test_server.py
    - +  
     1# Copyright (c) 2003-2008 Ralph Meijer
     2# See LICENSE for details.
     3
     4"""
     5Tests for L{wokkel.server}.
     6"""
     7
     8from twisted.internet import defer
     9from twisted.python import failure
     10from twisted.test.proto_helpers import StringTransport
     11from twisted.trial import unittest
     12from twisted.words.protocols.jabber import error, jid, xmlstream
     13from twisted.words.xish import domish
     14
     15from wokkel import component, server
     16
     17NS_STREAMS = 'http://etherx.jabber.org/streams'
     18NS_DIALBACK = "jabber:server:dialback"
     19
     20class GenerateKeyTest(unittest.TestCase):
     21    """
     22    Tests for L{server.generateKey}.
     23    """
     24
     25    def testBasic(self):
     26        originating = "example.org"
     27        receiving = "xmpp.example.com"
     28        sid = "D60000229F"
     29        secret = "s3cr3tf0rd14lb4ck"
     30
     31        key = server.generateKey(secret, receiving, originating, sid)
     32
     33        self.assertEqual(key,
     34            '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643')
     35
     36
     37
     38class XMPPServerListenAuthenticatorTest(unittest.TestCase):
     39    """
     40    Tests for L{server.XMPPServerListenAuthenticator}.
     41    """
     42
     43    secret = "s3cr3tf0rd14lb4ck"
     44    originating = "example.org"
     45    receiving = "xmpp.example.com"
     46    sid = "D60000229F"
     47    key = '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643'
     48
     49    def setUp(self):
     50        self.output = []
     51
     52        class MyService(object):
     53            pass
     54
     55        self.service = MyService()
     56        self.service.defaultDomain = self.receiving
     57        self.service.domains = [self.receiving, 'pubsub.'+self.receiving]
     58        self.service.secret = self.secret
     59
     60        self.authenticator = server.XMPPServerListenAuthenticator(self.service)
     61        self.xmlstream = xmlstream.XmlStream(self.authenticator)
     62        self.xmlstream.send = self.output.append
     63        self.xmlstream.transport = StringTransport()
     64
     65
     66    def test_attributes(self):
     67        """
     68        Test attributes of authenticator and stream objects.
     69        """
     70        self.assertEqual(self.service, self.authenticator.service)
     71        self.assertEqual(self.xmlstream.initiating, False)
     72
     73
     74    def test_streamStartedVersion0(self):
     75        """
     76        The authenticator supports pre-XMPP 1.0 streams.
     77        """
     78        self.xmlstream.connectionMade()
     79        self.xmlstream.dataReceived(
     80            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     81                           "xmlns:db='jabber:server:dialback' "
     82                           "xmlns='jabber:server' "
     83                           "to='xmpp.example.com'>")
     84        self.assertEqual((0, 0), self.xmlstream.version)
     85
     86
     87    def test_streamStartedVersion1(self):
     88        """
     89        The authenticator supports XMPP 1.0 streams.
     90        """
     91        self.xmlstream.connectionMade()
     92        self.xmlstream.dataReceived(
     93            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     94                           "xmlns:db='jabber:server:dialback' "
     95                           "xmlns='jabber:server' "
     96                           "to='xmpp.example.com' "
     97                           "version='1.0'>")
     98        self.assertEqual((1, 0), self.xmlstream.version)
     99
     100
     101    def test_streamStartedSID(self):
     102        """
     103        The response stream will have a stream ID.
     104        """
     105        self.xmlstream.connectionMade()
     106        self.assertIdentical(None, self.xmlstream.sid)
     107
     108        self.xmlstream.dataReceived(
     109            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     110                           "xmlns:db='jabber:server:dialback' "
     111                           "xmlns='jabber:server' "
     112                           "to='xmpp.example.com' "
     113                           "version='1.0'>")
     114        self.assertNotIdentical(None, self.xmlstream.sid)
     115
     116
     117    def test_streamStartedSentResponseHeader(self):
     118        """
     119        A stream header is sent in response to the incoming stream header.
     120        """
     121        self.xmlstream.connectionMade()
     122        self.assertFalse(self.xmlstream._headerSent)
     123
     124        self.xmlstream.dataReceived(
     125            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     126                           "xmlns:db='jabber:server:dialback' "
     127                           "xmlns='jabber:server' "
     128                           "to='xmpp.example.com'>")
     129        self.assertTrue(self.xmlstream._headerSent)
     130
     131
     132    def test_streamStartedNotSentFeatures(self):
     133        """
     134        No features are sent in response to an XMPP < 1.0 stream header.
     135        """
     136        self.xmlstream.connectionMade()
     137        self.xmlstream.dataReceived(
     138            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     139                           "xmlns:db='jabber:server:dialback' "
     140                           "xmlns='jabber:server' "
     141                           "to='xmpp.example.com'>")
     142        self.assertEqual(1, len(self.output))
     143
     144
     145    def test_streamStartedSentFeatures(self):
     146        """
     147        Features are sent in response to an XMPP >= 1.0 stream header.
     148        """
     149        self.xmlstream.connectionMade()
     150        self.xmlstream.dataReceived(
     151            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     152                           "xmlns:db='jabber:server:dialback' "
     153                           "xmlns='jabber:server' "
     154                           "to='xmpp.example.com' "
     155                           "version='1.0'>")
     156        self.assertEqual(2, len(self.output))
     157        features = self.output[-1]
     158        self.assertEqual(NS_STREAMS, features.uri)
     159        self.assertEqual('features', features.name)
     160
     161
     162    def test_streamRootElement(self):
     163        """
     164        Test stream error on wrong stream namespace.
     165        """
     166        self.xmlstream.connectionMade()
     167        self.xmlstream.dataReceived(
     168            "<stream:stream xmlns:stream='badns' "
     169                           "xmlns:db='jabber:server:dialback' "
     170                           "xmlns='jabber:server' "
     171                           "to='xmpp.example.com'>")
     172
     173        self.assertEqual(3, len(self.output))
     174        exc = error.exceptionFromStreamError(self.output[1])
     175        self.assertEqual('invalid-namespace', exc.condition)
     176
     177
     178    def test_streamDefaultNamespace(self):
     179        """
     180        Test stream error on missing dialback namespace.
     181        """
     182        self.xmlstream.connectionMade()
     183        self.xmlstream.dataReceived(
     184            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     185                           "xmlns:db='jabber:server:dialback' "
     186                           "xmlns='badns' "
     187                           "to='xmpp.example.com'>")
     188
     189        self.assertEqual(3, len(self.output))
     190        exc = error.exceptionFromStreamError(self.output[1])
     191        self.assertEqual('invalid-namespace', exc.condition)
     192
     193
     194    def test_streamNoDialbackNamespace(self):
     195        """
     196        Test stream error on missing dialback namespace.
     197        """
     198        self.xmlstream.connectionMade()
     199        self.xmlstream.dataReceived(
     200            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     201                           "xmlns='jabber:server' "
     202                           "to='xmpp.example.com'>")
     203
     204        self.assertEqual(3, len(self.output))
     205        exc = error.exceptionFromStreamError(self.output[1])
     206        self.assertEqual('invalid-namespace', exc.condition)
     207
     208
     209    def test_streamBadDialbackNamespace(self):
     210        """
     211        Test stream error on missing dialback namespace.
     212        """
     213        self.xmlstream.connectionMade()
     214        self.xmlstream.dataReceived(
     215            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     216                           "xmlns:db='badns' "
     217                           "xmlns='jabber:server' "
     218                           "to='xmpp.example.com'>")
     219
     220        self.assertEqual(3, len(self.output))
     221        exc = error.exceptionFromStreamError(self.output[1])
     222        self.assertEqual('invalid-namespace', exc.condition)
     223
     224
     225    def test_streamToUnknownHost(self):
     226        """
     227        Test stream error on stream's to attribute having unknown host.
     228        """
     229        self.xmlstream.connectionMade()
     230        self.xmlstream.dataReceived(
     231            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     232                           "xmlns:db='jabber:server:dialback' "
     233                           "xmlns='jabber:server' "
     234                           "to='badhost'>")
     235
     236        self.assertEqual(3, len(self.output))
     237        exc = error.exceptionFromStreamError(self.output[1])
     238        self.assertEqual('host-unknown', exc.condition)
     239
     240
     241    def test_streamToOtherLocalHost(self):
     242        """
     243        The authenticator supports XMPP 1.0 streams.
     244        """
     245        self.xmlstream.connectionMade()
     246        self.xmlstream.dataReceived(
     247            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     248                           "xmlns:db='jabber:server:dialback' "
     249                           "xmlns='jabber:server' "
     250                           "to='pubsub.xmpp.example.com' "
     251                           "version='1.0'>")
     252
     253        self.assertEqual(2, len(self.output))
     254        self.assertEqual(jid.JID('pubsub.xmpp.example.com'),
     255                         self.xmlstream.thisEntity)
     256
     257    def test_onResult(self):
     258        def cb(result):
     259            self.assertEqual(1, len(self.output))
     260            reply = self.output[0]
     261            self.assertEqual(self.originating, reply['to'])
     262            self.assertEqual(self.receiving, reply['from'])
     263            self.assertEqual('valid', reply['type'])
     264
     265        def validateConnection(thisHost, otherHost, sid, key):
     266            self.assertEqual(thisHost, self.receiving)
     267            self.assertEqual(otherHost, self.originating)
     268            self.assertEqual(sid, self.sid)
     269            self.assertEqual(key, self.key)
     270            return defer.succeed(None)
     271
     272        self.xmlstream.sid = self.sid
     273        self.service.validateConnection = validateConnection
     274
     275        result = domish.Element((NS_DIALBACK, 'result'))
     276        result['to'] = self.receiving
     277        result['from'] = self.originating
     278        result.addContent(self.key)
     279
     280        d = self.authenticator.onResult(result)
     281        d.addCallback(cb)
     282        return d
     283
     284
     285    def test_onResultFailure(self):
     286        class TestError(Exception):
     287            pass
     288
     289        def cb(result):
     290            reply = self.output[0]
     291            self.assertEqual('invalid', reply['type'])
     292            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
     293
     294
     295        def validateConnection(thisHost, otherHost, sid, key):
     296            return defer.fail(TestError())
     297
     298        self.xmlstream.sid = self.sid
     299        self.service.validateConnection = validateConnection
     300
     301        result = domish.Element((NS_DIALBACK, 'result'))
     302        result['to'] = self.receiving
     303        result['from'] = self.originating
     304        result.addContent(self.key)
     305
     306        d = self.authenticator.onResult(result)
     307        d.addCallback(cb)
     308        return d
     309
     310
     311
     312class FakeService(object):
     313    domains = set(['example.org', 'pubsub.example.org'])
     314    defaultDomain = 'example.org'
     315    secret = 'mysecret'
     316
     317    def __init__(self):
     318        self.dispatched = []
     319
     320    def dispatch(self, xs, element):
     321        self.dispatched.append(element)
     322
     323
     324
     325class XMPPS2SServerFactoryTest(unittest.TestCase):
     326    """
     327    Tests for L{component.XMPPS2SServerFactory}.
     328    """
     329
     330    def setUp(self):
     331        self.service = FakeService()
     332        self.factory = server.XMPPS2SServerFactory(self.service)
     333        self.xmlstream = self.factory.buildProtocol(None)
     334        self.transport = StringTransport()
     335        self.xmlstream.thisEntity = jid.JID('example.org')
     336        self.xmlstream.otherEntity = jid.JID('example.com')
     337
     338
     339    def test_makeConnection(self):
     340        """
     341        A new connection increases the stream serial count. No logs by default.
     342        """
     343        self.xmlstream.makeConnection(self.transport)
     344        self.assertEqual(0, self.xmlstream.serial)
     345        self.assertEqual(1, self.factory.serial)
     346        self.assertIdentical(None, self.xmlstream.rawDataInFn)
     347        self.assertIdentical(None, self.xmlstream.rawDataOutFn)
     348
     349
     350    def test_makeConnectionLogTraffic(self):
     351        """
     352        Setting logTraffic should set up raw data loggers.
     353        """
     354        self.factory.logTraffic = True
     355        self.xmlstream.makeConnection(self.transport)
     356        self.assertNotIdentical(None, self.xmlstream.rawDataInFn)
     357        self.assertNotIdentical(None, self.xmlstream.rawDataOutFn)
     358
     359
     360    def test_onError(self):
     361        """
     362        An observer for stream errors should trigger onError to log it.
     363        """
     364        self.xmlstream.makeConnection(self.transport)
     365
     366        class TestError(Exception):
     367            pass
     368
     369        reason = failure.Failure(TestError())
     370        self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT)
     371        self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
     372
     373
     374    def test_connectionInitialized(self):
     375        """
     376        """
     377        self.xmlstream.makeConnection(self.transport)
     378        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     379
     380
     381    def test_connectionLost(self):
     382        """
     383        """
     384        self.xmlstream.makeConnection(self.transport)
     385        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     386        self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT)
     387
     388
     389    def test_Element(self):
     390        self.xmlstream.makeConnection(self.transport)
     391        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     392
     393        stanza = domish.Element((None, "presence"))
     394        self.xmlstream.dispatch(stanza)
     395        self.assertEqual(1, len(self.service.dispatched))
     396        self.assertIdentical(stanza, self.service.dispatched[-1])
     397
     398
     399    def test_ElementNotAuthenticated(self):
     400        self.xmlstream.makeConnection(self.transport)
     401
     402        stanza = domish.Element((None, "presence"))
     403        self.xmlstream.dispatch(stanza)
     404        self.assertEqual(0, len(self.service.dispatched))
     405
     406
     407
     408class ServerServiceTest(unittest.TestCase):
     409
     410    def setUp(self):
     411        self.output = []
     412
     413        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
     414        self.xmlstream.thisEntity = jid.JID('example.org')
     415        self.xmlstream.otherEntity = jid.JID('example.com')
     416        self.xmlstream.send = self.output.append
     417
     418        self.router = component.Router()
     419        self.service = server.ServerService(self.router,
     420                                            secret='mysecret',
     421                                            domain='example.org')
     422        self.service.xmlstream = self.xmlstream
     423
     424
     425    def test_defaultDomainInDomains(self):
     426        """
     427        The default domain is part of the domains considered local.
     428        """
     429        self.assertIn(self.service.defaultDomain, self.service.domains)
     430
     431
     432    def test_dispatch(self):
     433        stanza = domish.Element((None, "presence"))
     434        stanza['to'] = 'user@example.org'
     435        stanza['from'] = 'other@example.com'
     436        self.service.dispatch(self.xmlstream, stanza)
     437
     438        self.assertEqual(1, len(self.output))
     439        self.assertIdentical(stanza, self.output[-1])
     440
     441
     442    def test_dispatchNoTo(self):
     443        errors = []
     444        self.xmlstream.sendStreamError = errors.append
     445
     446        stanza = domish.Element((None, "presence"))
     447        stanza['from'] = 'other@example.com'
     448        self.service.dispatch(self.xmlstream, stanza)
     449
     450        self.assertEqual(1, len(errors))
Note: See TracBrowser for help on using the repository browser.