source: ralphm-patches/s2s.patch @ 12:fc40892815eb

Last change on this file since 12:fc40892815eb was 12:fc40892815eb, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Make sure empty default domains works.

File size: 39.8 KB
  • new file wokkel/server.py

    diff -r fd1dc58fe561 wokkel/server.py
    - +  
     1# -*- test-case-name: wokkel.test.test_server -*-
     2#
     3# Copyright (c) 2003-2008 Ralph Meijer
     4# See LICENSE for details.
     5
     6"""
     7XMPP Server-to-Server protocol.
     8
     9This module implements several aspects of XMPP server-to-server communications
     10as described in XMPP Core (RFC 3920). Refer to that document for the meaning
     11of the used terminology.
     12"""
     13
     14# hashlib is new in Python 2.5, try that first.
     15try:
     16    from hashlib import sha256
     17    digestmod = sha256
     18except ImportError:
     19    import Crypto.Hash.SHA256 as digestmod
     20    sha256 = digestmod.new
     21
     22import hmac
     23
     24from zope.interface import implements
     25
     26from twisted.application import service
     27from twisted.internet import defer, reactor
     28from twisted.names.srvconnect import SRVConnector
     29from twisted.python import log, randbytes
     30from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
     31from twisted.words.xish import domish
     32
     33from wokkel.generic import DeferredXmlStreamFactory, XmlPipe
     34from wokkel.compat import XmlStreamServerFactory
     35
     36NS_DIALBACK = 'jabber:server:dialback'
     37
     38def generateKey(secret, receivingServer, originatingServer, streamID):
     39    """
     40    Generate a dialback key for server-to-server XMPP Streams.
     41
     42    The dialback key is generated using the algorithm described in
     43    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
     44    terminology for the parameters is described in RFC-3920.
     45
     46    @param secret: the shared secret known to the Originating Server and
     47                   Authoritive Server.
     48    @type secret: C{str}
     49    @param receivingServer: the Receiving Server host name.
     50    @type receivingServer: C{str}
     51    @param originatingServer: the Originating Server host name.
     52    @type originatingServer: C{str}
     53    @param streamID: the Stream ID as generated by the Receiving Server.
     54    @type streamID: C{str}
     55    @return: hexadecimal digest of the generated key.
     56    @type: C{str}
     57    """
     58
     59    hashObject = sha256()
     60    hashObject.update(secret)
     61    hashedSecret = hashObject.hexdigest()
     62    message = " ".join([receivingServer, originatingServer, streamID])
     63    hash = hmac.HMAC(hashedSecret, message, digestmod=digestmod)
     64    return hash.hexdigest()
     65
     66
     67def trapStreamError(xs, observer):
     68    """
     69    Trap stream errors.
     70
     71    This wraps an observer to catch exceptions. In case of a
     72    L{error.StreamError}, it is send over the given XML stream. All other
     73    exceptions yield a C{'internal-server-error'} stream error, that is
     74    sent over the stream, while the exception is logged.
     75
     76    @return: Wrapped observer
     77    """
     78
     79    def wrappedObserver(element):
     80        try:
     81            observer(element)
     82        except error.StreamError, exc:
     83            xs.sendStreamError(exc)
     84        except:
     85            log.err()
     86            exc = error.StreamError('internal-server-error')
     87            xs.sendStreamError(exc)
     88
     89    return wrappedObserver
     90
     91
     92class XMPPServerConnector(SRVConnector):
     93    def __init__(self, reactor, domain, factory):
     94        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)
     95
     96
     97    def pickServer(self):
     98        host, port = SRVConnector.pickServer(self)
     99
     100        if not self.servers and not self.orderedServers:
     101            # no SRV record, fall back..
     102            port = 5269
     103
     104        return host, port
     105
     106
     107class DialbackFailed(Exception):
     108    pass
     109
     110
     111
     112class OriginatingDialbackInitializer(object):
     113    """
     114    Server Dialback Initializer for the Orginating Server.
     115    """
     116
     117    implements(ijabber.IInitiatingInitializer)
     118
     119    _deferred = None
     120
     121    def __init__(self, xs, thisHost, otherHost, secret):
     122        self.xmlstream = xs
     123        self.thisHost = thisHost
     124        self.otherHost = otherHost
     125        self.secret = secret
     126
     127
     128    def initialize(self):
     129        self._deferred = defer.Deferred()
     130        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
     131                                   self.onStreamError)
     132        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
     133                                   self.onResult)
     134
     135        key = generateKey(self.secret, self.otherHost,
     136                          self.thisHost, self.xmlstream.sid)
     137
     138        result = domish.Element((NS_DIALBACK, 'result'))
     139        result['from'] = self.thisHost
     140        result['to'] = self.otherHost
     141        result.addContent(key)
     142
     143        self.xmlstream.send(result)
     144
     145        return self._deferred
     146
     147
     148    def onResult(self, result):
     149        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
     150                                      self.onStreamError)
     151        if result['type'] == 'valid':
     152            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
     153            self._deferred.callback(None)
     154        else:
     155            self._deferred.errback(DialbackFailed())
     156
     157
     158    def onStreamError(self, failure):
     159        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
     160                                      self.onResult)
     161        self._deferred.errback(failure)
     162
     163
     164
     165class ReceivingDialbackInitializer(object):
     166    """
     167    Server Dialback Initializer for the Receiving Server.
     168    """
     169
     170    implements(ijabber.IInitiatingInitializer)
     171
     172    _deferred = None
     173
     174    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
     175        self.xmlstream = xs
     176        self.thisHost = thisHost
     177        self.otherHost = otherHost
     178        self.originalStreamID = originalStreamID
     179        self.key = key
     180
     181
     182    def initialize(self):
     183        self._deferred = defer.Deferred()
     184        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
     185                                   self.onStreamError)
     186        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
     187                                   self.onVerify)
     188
     189        verify = domish.Element((NS_DIALBACK, 'verify'))
     190        verify['from'] = self.thisHost
     191        verify['to'] = self.otherHost
     192        verify['id'] = self.originalStreamID
     193        verify.addContent(self.key)
     194
     195        self.xmlstream.send(verify)
     196        return self._deferred
     197
     198
     199    def onVerify(self, verify):
     200        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
     201                                      self.onStreamError)
     202        if verify['id'] != self.originalStreamID:
     203            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
     204            self._deferred.errback(DialbackFailed())
     205        elif verify['to'] != self.thisHost:
     206            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
     207            self._deferred.errback(DialbackFailed())
     208        elif verify['from'] != self.otherHost:
     209            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
     210            self._deferred.errback(DialbackFailed())
     211        elif verify['type'] == 'valid':
     212            self._deferred.callback(None)
     213        else:
     214            self._deferred.errback(DialbackFailed())
     215
     216
     217    def onStreamError(self, failure):
     218        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
     219                                      self.onVerify)
     220        self._deferred.errback(failure)
     221
     222
     223
     224class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
     225    """
     226    Authenticator for an outgoing XMPP server-to-server connection.
     227
     228    This authenticator connects to C{otherHost} (the Receiving Server) and then
     229    initiates dialback as C{thisHost} (the Originating Server) using
     230    L{OriginatingDialbackInitializer}.
     231
     232    @ivar thisHost: The domain this server connects from (the Originating
     233                    Server) .
     234    @ivar otherHost: The domain of the server this server connects to (the
     235                     Receiving Server).
     236    @ivar secret: The shared secret that is used for verifying the validity
     237                  of this new connection.
     238    """
     239    namespace = 'jabber:server'
     240
     241    def __init__(self, thisHost, otherHost, secret):
     242        self.thisHost = thisHost
     243        self.otherHost = otherHost
     244        self.secret = secret
     245        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
     246
     247
     248    def connectionMade(self):
     249        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
     250        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     251                                   NS_DIALBACK: 'db'}
     252        xmlstream.ConnectAuthenticator.connectionMade(self)
     253
     254
     255    def associateWithStream(self, xs):
     256        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
     257        init = OriginatingDialbackInitializer(xs, self.thisHost,
     258                                              self.otherHost, self.secret)
     259        xs.initializers = [init]
     260
     261
     262
     263class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
     264    """
     265    Authenticator for an outgoing connection to verify an incoming connection.
     266
     267    This authenticator connects to C{otherHost} (the Authoritative Server) and
     268    then initiates dialback as C{thisHost} (the Receiving Server) using
     269    L{ReceivingDialbackInitializer}.
     270
     271    @ivar thisHost: The domain this server connects from (the Receiving
     272                    Server) .
     273    @ivar otherHost: The domain of the server this server connects to (the
     274                     Authoritative Server).
     275    @ivar originalStreamID: The stream ID of the incoming connection that is
     276                            being verified.
     277    @ivar key: The key provided by the Receving Server to be verified.
     278    """
     279    namespace = 'jabber:server'
     280
     281    def __init__(self, thisHost, otherHost, originalStreamID, key):
     282        self.thisHost = thisHost
     283        self.otherHost = otherHost
     284        self.originalStreamID = originalStreamID
     285        self.key = key
     286        xmlstream.ConnectAuthenticator.__init__(self, otherHost)
     287
     288
     289    def connectionMade(self):
     290        self.xmlstream.thisEntity = jid.internJID(self.thisHost)
     291        self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     292                                   NS_DIALBACK: 'db'}
     293        xmlstream.ConnectAuthenticator.connectionMade(self)
     294
     295
     296    def associateWithStream(self, xs):
     297        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
     298        init = ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
     299                                            self.originalStreamID, self.key)
     300        xs.initializers = [init]
     301
     302
     303
     304class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
     305    """
     306    Authenticator for an incoming XMPP server-to-server connection.
     307
     308    This authenticator handles two types of incoming connections. Regular
     309    server-to-server connections are from the Originating Server to the
     310    Receiving Server, where this server is the Receiving Server. These
     311    connections start out by receiving a dialback key, verifying the
     312    key with the Authoritative Server, and then accept normal XMPP stanzas.
     313
     314    The other type of connections is from a Receiving Server to an
     315    Authoritative Server, where this server acts as the Authoritative Server.
     316    These connections are used to verify the validity of an outgoing connection
     317    from this server. In this case, this server receives a verification
     318    request, checks the key and then returns the result.
     319
     320    @ivar service: The service that keeps the list of domains we accept
     321                   connections for.
     322    """
     323    namespace = 'jabber:server'
     324
     325    def __init__(self, service):
     326        xmlstream.ListenAuthenticator.__init__(self)
     327        self.service = service
     328
     329
     330    def streamStarted(self, rootElement):
     331        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     332
     333        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
     334        if not self.xmlstream.sid:
     335            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
     336
     337        if self.xmlstream.thisEntity:
     338            targetDomain = self.xmlstream.thisEntity.host
     339        else:
     340            targetDomain = self.service.defaultDomain
     341
     342        def prepareStream(domain):
     343            self.xmlstream.namespace = self.namespace
     344            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
     345                                       NS_DIALBACK: 'db'}
     346            if domain:
     347                self.xmlstream.thisEntity = jid.internJID(domain)
     348
     349        try:
     350            if xmlstream.NS_STREAMS != rootElement.uri or \
     351               self.namespace != self.xmlstream.namespace or \
     352               ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems():
     353                raise error.StreamError('invalid-namespace')
     354
     355            if targetDomain and targetDomain not in self.service.domains:
     356                raise error.StreamError('host-unknown')
     357        except error.StreamError, exc:
     358            prepareStream(self.service.defaultDomain)
     359            self.xmlstream.sendStreamError(exc)
     360            return
     361
     362        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
     363                                   trapStreamError(self.xmlstream,
     364                                                   self.onVerify))
     365        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
     366                                   self.onResult)
     367
     368        prepareStream(targetDomain)
     369        self.xmlstream.sendHeader()
     370
     371        if self.xmlstream.version >= (1, 0):
     372            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     373            self.xmlstream.send(features)
     374
     375
     376    def onVerify(self, verify):
     377        try:
     378            receivingServer = jid.JID(verify['from']).host
     379            originatingServer = jid.JID(verify['to']).host
     380        except (KeyError, jid.InvalidFormat):
     381            raise error.StreamError('improper-addressing')
     382
     383        if originatingServer not in self.service.domains:
     384            raise error.StreamError('host-unknown')
     385
     386        if (self.xmlstream.otherEntity and
     387            receivingServer != self.xmlstream.otherEntity.host):
     388            raise error.StreamError('invalid-from')
     389
     390        streamID = verify.getAttribute('id', '')
     391        key = unicode(verify)
     392
     393        calculatedKey = generateKey(self.service.secret, receivingServer,
     394                                    originatingServer, streamID)
     395        validity = (key == calculatedKey) and 'valid' or 'invalid'
     396
     397        reply = domish.Element((NS_DIALBACK, 'verify'))
     398        reply['from'] = originatingServer
     399        reply['to'] = receivingServer
     400        reply['id'] = streamID
     401        reply['type'] = validity
     402        self.xmlstream.send(reply)
     403
     404
     405    def onResult(self, result):
     406        def reply(validity):
     407            reply = domish.Element((NS_DIALBACK, 'result'))
     408            reply['from'] = result['to']
     409            reply['to'] = result['from']
     410            reply['type'] = validity
     411            self.xmlstream.send(reply)
     412
     413        def valid(xs):
     414            reply('valid')
     415            if not self.xmlstream.thisEntity:
     416                self.xmlstream.thisEntity = jid.internJID(receivingServer)
     417            self.xmlstream.otherEntity = jid.internJID(originatingServer)
     418            self.xmlstream.dispatch(self.xmlstream,
     419                                    xmlstream.STREAM_AUTHD_EVENT)
     420
     421        def invalid(failure):
     422            log.err(failure)
     423            reply('invalid')
     424
     425        receivingServer = result['to']
     426        originatingServer = result['from']
     427        key = unicode(result)
     428
     429        d = self.service.validateConnection(receivingServer, originatingServer,
     430                                            self.xmlstream.sid, key)
     431        d.addCallbacks(valid, invalid)
     432        return d
     433
     434
     435
     436class DeferredS2SClientFactory(DeferredXmlStreamFactory):
     437    """
     438    Deferred firing factory for initiating XMPP server-to-server connection.
     439
     440    The deferred has its callbacks called upon succesful authentication with
     441    the other server. In case of failed authentication or connection, the
     442    deferred will have its errbacks called instead.
     443    """
     444
     445    logTraffic = False
     446
     447    def __init__(self, authenticator):
     448        DeferredXmlStreamFactory.__init__(self, authenticator)
     449
     450        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     451                          self.onConnectionMade)
     452
     453        self.serial = 0
     454
     455
     456    def onConnectionMade(self, xs):
     457        xs.serial = self.serial
     458        self.serial += 1
     459
     460        def logDataIn(buf):
     461            log.msg("RECV (%d): %r" % (xs.serial, buf))
     462
     463        def logDataOut(buf):
     464            log.msg("SEND (%d): %r" % (xs.serial, buf))
     465
     466        if self.logTraffic:
     467            xs.rawDataInFn = logDataIn
     468            xs.rawDataOutFn = logDataOut
     469
     470
     471
     472def initiateS2S(factory):
     473    domain = factory.authenticator.otherHost
     474    c = XMPPServerConnector(reactor, domain, factory)
     475    c.connect()
     476    return factory.deferred
     477
     478
     479
     480class XMPPS2SServerFactory(XmlStreamServerFactory):
     481    """
     482    XMPP Server-to-Server Server factory.
     483
     484    This factory accepts XMPP server-to-server connections.
     485    """
     486
     487    logTraffic = False
     488
     489    def __init__(self, service):
     490        self.service = service
     491
     492        def authenticatorFactory():
     493            return XMPPServerListenAuthenticator(service)
     494
     495        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     496        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     497                          self.onConnectionMade)
     498        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     499                          self.onAuthenticated)
     500
     501        self.serial = 0
     502
     503
     504    def onConnectionMade(self, xs):
     505        """
     506        Called when a server-to-server connection was made.
     507
     508        This enables traffic debugging on incoming streams.
     509        """
     510        xs.serial = self.serial
     511        self.serial += 1
     512
     513        def logDataIn(buf):
     514            log.msg("RECV (%d): %r" % (xs.serial, buf))
     515
     516        def logDataOut(buf):
     517            log.msg("SEND (%d): %r" % (xs.serial, buf))
     518
     519        if self.logTraffic:
     520            xs.rawDataInFn = logDataIn
     521            xs.rawDataOutFn = logDataOut
     522
     523        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     524
     525
     526    def onAuthenticated(self, xs):
     527        thisHost = xs.thisEntity.host
     528        otherHost = xs.otherEntity.host
     529
     530        log.msg("Incoming connection %d from %r to %r established" %
     531                (xs.serial, otherHost, thisHost))
     532
     533        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     534                                                   0, xs)
     535        xs.addObserver('/*', self.onElement, 0, xs)
     536
     537
     538    def onConnectionLost(self, xs, reason):
     539        thisHost = xs.thisEntity.host
     540        otherHost = xs.otherEntity.host
     541
     542        log.msg("Incoming connection %d from %r to %r disconnected" %
     543                (xs.serial, otherHost, thisHost))
     544
     545
     546    def onError(self, reason):
     547        log.err(reason, "Stream Error")
     548
     549
     550    def onElement(self, xs, element):
     551        """
     552        Called when an element was received from one of the connected streams.
     553
     554        """
     555        if element.handled:
     556            return
     557        else:
     558            self.service.dispatch(xs, element)
     559
     560
     561
     562class ServerService(object):
     563    """
     564    Service for managing XMPP server to server connections.
     565    """
     566
     567    logTraffic = False
     568
     569    def __init__(self, router, domain=None, secret=None):
     570        self.router = router
     571
     572        self.defaultDomain = domain
     573        self.domains = set()
     574        if self.defaultDomain:
     575            self.domains.add(self.defaultDomain)
     576
     577        if secret is not None:
     578            self.secret = secret
     579        else:
     580            self.secret = randbytes.secureRandom(16).encode('hex')
     581
     582        self._outgoingStreams = {}
     583        self._outgoingQueues = {}
     584        self._outgoingConnecting = set()
     585        self.serial = 0
     586
     587        pipe = XmlPipe()
     588        self.xmlstream = pipe.source
     589        self.router.addRoute(None, pipe.sink)
     590        self.xmlstream.addObserver('/*', self.send)
     591
     592
     593    def outgoingInitialized(self, xs):
     594        thisHost = xs.thisEntity.host
     595        otherHost = xs.otherEntity.host
     596
     597        log.msg("Outgoing connection %d from %r to %r established" %
     598                (xs.serial, thisHost, otherHost))
     599
     600        self._outgoingStreams[thisHost, otherHost] = xs
     601        xs.addObserver(xmlstream.STREAM_END_EVENT,
     602                       lambda _: self.outgoingDisconnected(xs))
     603
     604        if (thisHost, otherHost) in self._outgoingQueues:
     605            for element in self._outgoingQueues[thisHost, otherHost]:
     606                xs.send(element)
     607            del self._outgoingQueues[thisHost, otherHost]
     608
     609
     610    def outgoingDisconnected(self, xs):
     611        thisHost = xs.thisEntity.host
     612        otherHost = xs.otherEntity.host
     613
     614        log.msg("Outgoing connection %d from %r to %r disconnected" %
     615                (xs.serial, thisHost, otherHost))
     616
     617        del self._outgoingStreams[thisHost, otherHost]
     618
     619
     620    def initiateOutgoingStream(self, thisHost, otherHost):
     621        """
     622        Initiate an outgoing XMPP server-to-server connection.
     623        """
     624
     625        def resetConnecting(_):
     626            self._outgoingConnecting.remove((thisHost, otherHost))
     627
     628        if (thisHost, otherHost) in self._outgoingConnecting:
     629            return
     630
     631        authenticator = XMPPServerConnectAuthenticator(thisHost,
     632                                                       otherHost,
     633                                                       self.secret)
     634        factory = DeferredS2SClientFactory(authenticator)
     635        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     636                             self.outgoingInitialized)
     637        factory.logTraffic = self.logTraffic
     638
     639        self._outgoingConnecting.add((thisHost, otherHost))
     640
     641        d = initiateS2S(factory)
     642        d.addBoth(resetConnecting)
     643        return d
     644
     645
     646    def validateConnection(self, thisHost, otherHost, sid, key):
     647        """
     648        Validate an incoming XMPP server-to-server connection.
     649        """
     650
     651        def connected(xs):
     652            # Set up stream for immediate disconnection.
     653            def disconnect(_):
     654                xs.transport.loseConnection()
     655            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
     656            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)
     657
     658        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
     659                                                      sid, key)
     660        factory = DeferredS2SClientFactory(authenticator)
     661        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
     662        factory.logTraffic = self.logTraffic
     663
     664        d = initiateS2S(factory)
     665        return d
     666
     667
     668    def send(self, stanza):
     669        """
     670        Send stanza to the proper XML Stream.
     671
     672        This uses addressing embedded in the stanza to find the correct stream
     673        to forward the stanza to.
     674        """
     675
     676        otherHost = jid.internJID(stanza["to"]).host
     677        thisHost = jid.internJID(stanza["from"]).host
     678
     679        if (thisHost, otherHost) not in self._outgoingStreams:
     680            # There is no connection with the destination (yet). Cache the
     681            # outgoing stanza until the connection has been established.
     682            # XXX: If the connection cannot be established, the queue should
     683            #      be emptied at some point.
     684            if (thisHost, otherHost) not in self._outgoingQueues:
     685                self._outgoingQueues[(thisHost, otherHost)] = []
     686            self._outgoingQueues[(thisHost, otherHost)].append(stanza)
     687            self.initiateOutgoingStream(thisHost, otherHost)
     688        else:
     689            self._outgoingStreams[(thisHost, otherHost)].send(stanza)
     690
     691
     692    def dispatch(self, xs, stanza):
     693        """
     694        Send on element to be routed within the server.
     695        """
     696        stanzaFrom = stanza.getAttribute('from')
     697        stanzaTo = stanza.getAttribute('to')
     698
     699        if not stanzaFrom or not stanzaTo:
     700            xs.sendStreamError(error.StreamError('improper-addressing'))
     701        else:
     702            try:
     703                sender = jid.internJID(stanzaFrom)
     704                recipient = jid.internJID(stanzaTo)
     705            except jid.InvalidFormat:
     706                log.msg("Dropping error stanza with malformed JID")
     707
     708            if sender.host != xs.otherEntity.host:
     709                xs.sendStreamError(error.StreamError('invalid-from'))
     710            else:
     711                self.xmlstream.send(stanza)
  • new file wokkel/test/test_server.py

    diff -r fd1dc58fe561 wokkel/test/test_server.py
    - +  
     1# Copyright (c) 2003-2008 Ralph Meijer
     2# See LICENSE for details.
     3
     4"""
     5Tests for L{wokkel.server}.
     6"""
     7
     8from twisted.internet import defer
     9from twisted.python import failure
     10from twisted.test.proto_helpers import StringTransport
     11from twisted.trial import unittest
     12from twisted.words.protocols.jabber import error, jid, xmlstream
     13from twisted.words.xish import domish
     14
     15from wokkel import component, server
     16
     17NS_STREAMS = 'http://etherx.jabber.org/streams'
     18NS_DIALBACK = "jabber:server:dialback"
     19
     20class GenerateKeyTest(unittest.TestCase):
     21    """
     22    Tests for L{server.generateKey}.
     23    """
     24
     25    def testBasic(self):
     26        originating = "example.org"
     27        receiving = "xmpp.example.com"
     28        sid = "D60000229F"
     29        secret = "s3cr3tf0rd14lb4ck"
     30
     31        key = server.generateKey(secret, receiving, originating, sid)
     32
     33        self.assertEqual(key,
     34            '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643')
     35
     36
     37
     38class XMPPServerListenAuthenticatorTest(unittest.TestCase):
     39    """
     40    Tests for L{server.XMPPServerListenAuthenticator}.
     41    """
     42
     43    secret = "s3cr3tf0rd14lb4ck"
     44    originating = "example.org"
     45    receiving = "xmpp.example.com"
     46    sid = "D60000229F"
     47    key = '37c69b1cf07a3f67c04a5ef5902fa5114f2c76fe4a2686482ba5b89323075643'
     48
     49    def setUp(self):
     50        self.output = []
     51
     52        class MyService(object):
     53            pass
     54
     55        self.service = MyService()
     56        self.service.defaultDomain = self.receiving
     57        self.service.domains = [self.receiving, 'pubsub.'+self.receiving]
     58        self.service.secret = self.secret
     59
     60        self.authenticator = server.XMPPServerListenAuthenticator(self.service)
     61        self.xmlstream = xmlstream.XmlStream(self.authenticator)
     62        self.xmlstream.send = self.output.append
     63        self.xmlstream.transport = StringTransport()
     64
     65
     66    def test_attributes(self):
     67        """
     68        Test attributes of authenticator and stream objects.
     69        """
     70        self.assertEqual(self.service, self.authenticator.service)
     71        self.assertEqual(self.xmlstream.initiating, False)
     72
     73
     74    def test_streamStartedVersion0(self):
     75        """
     76        The authenticator supports pre-XMPP 1.0 streams.
     77        """
     78        self.xmlstream.connectionMade()
     79        self.xmlstream.dataReceived(
     80            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     81                           "xmlns:db='jabber:server:dialback' "
     82                           "xmlns='jabber:server' "
     83                           "to='xmpp.example.com'>")
     84        self.assertEqual((0, 0), self.xmlstream.version)
     85
     86
     87    def test_streamStartedVersion1(self):
     88        """
     89        The authenticator supports XMPP 1.0 streams.
     90        """
     91        self.xmlstream.connectionMade()
     92        self.xmlstream.dataReceived(
     93            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     94                           "xmlns:db='jabber:server:dialback' "
     95                           "xmlns='jabber:server' "
     96                           "to='xmpp.example.com' "
     97                           "version='1.0'>")
     98        self.assertEqual((1, 0), self.xmlstream.version)
     99
     100
     101    def test_streamStartedSID(self):
     102        """
     103        The response stream will have a stream ID.
     104        """
     105        self.xmlstream.connectionMade()
     106        self.assertIdentical(None, self.xmlstream.sid)
     107
     108        self.xmlstream.dataReceived(
     109            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     110                           "xmlns:db='jabber:server:dialback' "
     111                           "xmlns='jabber:server' "
     112                           "to='xmpp.example.com' "
     113                           "version='1.0'>")
     114        self.assertNotIdentical(None, self.xmlstream.sid)
     115
     116
     117    def test_streamStartedSentResponseHeader(self):
     118        """
     119        A stream header is sent in response to the incoming stream header.
     120        """
     121        self.xmlstream.connectionMade()
     122        self.assertFalse(self.xmlstream._headerSent)
     123
     124        self.xmlstream.dataReceived(
     125            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     126                           "xmlns:db='jabber:server:dialback' "
     127                           "xmlns='jabber:server' "
     128                           "to='xmpp.example.com'>")
     129        self.assertTrue(self.xmlstream._headerSent)
     130
     131
     132    def test_streamStartedNotSentFeatures(self):
     133        """
     134        No features are sent in response to an XMPP < 1.0 stream header.
     135        """
     136        self.xmlstream.connectionMade()
     137        self.xmlstream.dataReceived(
     138            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     139                           "xmlns:db='jabber:server:dialback' "
     140                           "xmlns='jabber:server' "
     141                           "to='xmpp.example.com'>")
     142        self.assertEqual(1, len(self.output))
     143
     144
     145    def test_streamStartedSentFeatures(self):
     146        """
     147        Features are sent in response to an XMPP >= 1.0 stream header.
     148        """
     149        self.xmlstream.connectionMade()
     150        self.xmlstream.dataReceived(
     151            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     152                           "xmlns:db='jabber:server:dialback' "
     153                           "xmlns='jabber:server' "
     154                           "to='xmpp.example.com' "
     155                           "version='1.0'>")
     156        self.assertEqual(2, len(self.output))
     157        features = self.output[-1]
     158        self.assertEqual(NS_STREAMS, features.uri)
     159        self.assertEqual('features', features.name)
     160
     161
     162    def test_streamRootElement(self):
     163        """
     164        Test stream error on wrong stream namespace.
     165        """
     166        self.xmlstream.connectionMade()
     167        self.xmlstream.dataReceived(
     168            "<stream:stream xmlns:stream='badns' "
     169                           "xmlns:db='jabber:server:dialback' "
     170                           "xmlns='jabber:server' "
     171                           "to='xmpp.example.com'>")
     172
     173        self.assertEqual(3, len(self.output))
     174        exc = error.exceptionFromStreamError(self.output[1])
     175        self.assertEqual('invalid-namespace', exc.condition)
     176
     177
     178    def test_streamDefaultNamespace(self):
     179        """
     180        Test stream error on missing dialback namespace.
     181        """
     182        self.xmlstream.connectionMade()
     183        self.xmlstream.dataReceived(
     184            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     185                           "xmlns:db='jabber:server:dialback' "
     186                           "xmlns='badns' "
     187                           "to='xmpp.example.com'>")
     188
     189        self.assertEqual(3, len(self.output))
     190        exc = error.exceptionFromStreamError(self.output[1])
     191        self.assertEqual('invalid-namespace', exc.condition)
     192
     193
     194    def test_streamNoDialbackNamespace(self):
     195        """
     196        Test stream error on missing dialback namespace.
     197        """
     198        self.xmlstream.connectionMade()
     199        self.xmlstream.dataReceived(
     200            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     201                           "xmlns='jabber:server' "
     202                           "to='xmpp.example.com'>")
     203
     204        self.assertEqual(3, len(self.output))
     205        exc = error.exceptionFromStreamError(self.output[1])
     206        self.assertEqual('invalid-namespace', exc.condition)
     207
     208
     209    def test_streamBadDialbackNamespace(self):
     210        """
     211        Test stream error on missing dialback namespace.
     212        """
     213        self.xmlstream.connectionMade()
     214        self.xmlstream.dataReceived(
     215            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     216                           "xmlns:db='badns' "
     217                           "xmlns='jabber:server' "
     218                           "to='xmpp.example.com'>")
     219
     220        self.assertEqual(3, len(self.output))
     221        exc = error.exceptionFromStreamError(self.output[1])
     222        self.assertEqual('invalid-namespace', exc.condition)
     223
     224
     225    def test_streamToUnknownHost(self):
     226        """
     227        Test stream error on stream's to attribute having unknown host.
     228        """
     229        self.xmlstream.connectionMade()
     230        self.xmlstream.dataReceived(
     231            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     232                           "xmlns:db='jabber:server:dialback' "
     233                           "xmlns='jabber:server' "
     234                           "to='badhost'>")
     235
     236        self.assertEqual(3, len(self.output))
     237        exc = error.exceptionFromStreamError(self.output[1])
     238        self.assertEqual('host-unknown', exc.condition)
     239
     240
     241    def test_streamToOtherLocalHost(self):
     242        """
     243        The authenticator supports XMPP 1.0 streams.
     244        """
     245        self.xmlstream.connectionMade()
     246        self.xmlstream.dataReceived(
     247            "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
     248                           "xmlns:db='jabber:server:dialback' "
     249                           "xmlns='jabber:server' "
     250                           "to='pubsub.xmpp.example.com' "
     251                           "version='1.0'>")
     252
     253        self.assertEqual(2, len(self.output))
     254        self.assertEqual(jid.JID('pubsub.xmpp.example.com'),
     255                         self.xmlstream.thisEntity)
     256
     257    def test_onResult(self):
     258        def cb(result):
     259            self.assertEqual(1, len(self.output))
     260            reply = self.output[0]
     261            self.assertEqual(self.originating, reply['to'])
     262            self.assertEqual(self.receiving, reply['from'])
     263            self.assertEqual('valid', reply['type'])
     264
     265        def validateConnection(thisHost, otherHost, sid, key):
     266            self.assertEqual(thisHost, self.receiving)
     267            self.assertEqual(otherHost, self.originating)
     268            self.assertEqual(sid, self.sid)
     269            self.assertEqual(key, self.key)
     270            return defer.succeed(None)
     271
     272        self.xmlstream.sid = self.sid
     273        self.service.validateConnection = validateConnection
     274
     275        result = domish.Element((NS_DIALBACK, 'result'))
     276        result['to'] = self.receiving
     277        result['from'] = self.originating
     278        result.addContent(self.key)
     279
     280        d = self.authenticator.onResult(result)
     281        d.addCallback(cb)
     282        return d
     283
     284
     285    def test_onResultFailure(self):
     286        class TestError(Exception):
     287            pass
     288
     289        def cb(result):
     290            reply = self.output[0]
     291            self.assertEqual('invalid', reply['type'])
     292            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
     293
     294
     295        def validateConnection(thisHost, otherHost, sid, key):
     296            return defer.fail(TestError())
     297
     298        self.xmlstream.sid = self.sid
     299        self.service.validateConnection = validateConnection
     300
     301        result = domish.Element((NS_DIALBACK, 'result'))
     302        result['to'] = self.receiving
     303        result['from'] = self.originating
     304        result.addContent(self.key)
     305
     306        d = self.authenticator.onResult(result)
     307        d.addCallback(cb)
     308        return d
     309
     310
     311
     312class FakeService(object):
     313    domains = set(['example.org', 'pubsub.example.org'])
     314    defaultDomain = 'example.org'
     315    secret = 'mysecret'
     316
     317    def __init__(self):
     318        self.dispatched = []
     319
     320    def dispatch(self, xs, element):
     321        self.dispatched.append(element)
     322
     323
     324
     325class XMPPS2SServerFactoryTest(unittest.TestCase):
     326    """
     327    Tests for L{component.XMPPS2SServerFactory}.
     328    """
     329
     330    def setUp(self):
     331        self.service = FakeService()
     332        self.factory = server.XMPPS2SServerFactory(self.service)
     333        self.xmlstream = self.factory.buildProtocol(None)
     334        self.transport = StringTransport()
     335        self.xmlstream.thisEntity = jid.JID('example.org')
     336        self.xmlstream.otherEntity = jid.JID('example.com')
     337
     338
     339    def test_makeConnection(self):
     340        """
     341        A new connection increases the stream serial count. No logs by default.
     342        """
     343        self.xmlstream.makeConnection(self.transport)
     344        self.assertEqual(0, self.xmlstream.serial)
     345        self.assertEqual(1, self.factory.serial)
     346        self.assertIdentical(None, self.xmlstream.rawDataInFn)
     347        self.assertIdentical(None, self.xmlstream.rawDataOutFn)
     348
     349
     350    def test_makeConnectionLogTraffic(self):
     351        """
     352        Setting logTraffic should set up raw data loggers.
     353        """
     354        self.factory.logTraffic = True
     355        self.xmlstream.makeConnection(self.transport)
     356        self.assertNotIdentical(None, self.xmlstream.rawDataInFn)
     357        self.assertNotIdentical(None, self.xmlstream.rawDataOutFn)
     358
     359
     360    def test_onError(self):
     361        """
     362        An observer for stream errors should trigger onError to log it.
     363        """
     364        self.xmlstream.makeConnection(self.transport)
     365
     366        class TestError(Exception):
     367            pass
     368
     369        reason = failure.Failure(TestError())
     370        self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT)
     371        self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
     372
     373
     374    def test_connectionInitialized(self):
     375        """
     376        """
     377        self.xmlstream.makeConnection(self.transport)
     378        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     379
     380
     381    def test_connectionLost(self):
     382        """
     383        """
     384        self.xmlstream.makeConnection(self.transport)
     385        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     386        self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT)
     387
     388
     389    def test_Element(self):
     390        self.xmlstream.makeConnection(self.transport)
     391        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
     392
     393        stanza = domish.Element((None, "presence"))
     394        self.xmlstream.dispatch(stanza)
     395        self.assertEqual(1, len(self.service.dispatched))
     396        self.assertIdentical(stanza, self.service.dispatched[-1])
     397
     398
     399    def test_ElementNotAuthenticated(self):
     400        self.xmlstream.makeConnection(self.transport)
     401
     402        stanza = domish.Element((None, "presence"))
     403        self.xmlstream.dispatch(stanza)
     404        self.assertEqual(0, len(self.service.dispatched))
     405
     406
     407
     408class ServerServiceTest(unittest.TestCase):
     409
     410    def setUp(self):
     411        self.output = []
     412
     413        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
     414        self.xmlstream.thisEntity = jid.JID('example.org')
     415        self.xmlstream.otherEntity = jid.JID('example.com')
     416        self.xmlstream.send = self.output.append
     417
     418        self.router = component.Router()
     419        self.service = server.ServerService(self.router,
     420                                            secret='mysecret',
     421                                            domain='example.org')
     422        self.service.xmlstream = self.xmlstream
     423
     424
     425    def test_defaultDomainInDomains(self):
     426        """
     427        The default domain is part of the domains considered local.
     428        """
     429        self.assertIn(self.service.defaultDomain, self.service.domains)
     430
     431
     432    def test_dispatch(self):
     433        stanza = domish.Element((None, "presence"))
     434        stanza['to'] = 'user@example.org'
     435        stanza['from'] = 'other@example.com'
     436        self.service.dispatch(self.xmlstream, stanza)
     437
     438        self.assertEqual(1, len(self.output))
     439        self.assertIdentical(stanza, self.output[-1])
     440
     441
     442    def test_dispatchNoTo(self):
     443        errors = []
     444        self.xmlstream.sendStreamError = errors.append
     445
     446        stanza = domish.Element((None, "presence"))
     447        stanza['from'] = 'other@example.com'
     448        self.service.dispatch(self.xmlstream, stanza)
     449
     450        self.assertEqual(1, len(errors))
Note: See TracBrowser for help on using the repository browser.