Changeset 35:eb020b49a77d


Ignore:
Timestamp:
Oct 10, 2008, 5:24:28 PM (13 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@90
Message:

Add XMPP router and server side component authenticator.

Author: ralphm.
Fixed #30.

Location:
wokkel
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • wokkel/component.py

    r6 r35  
    1 # Copyright (c) 2003-2007 Ralph Meijer
     1# -*- test-case-name: wokkel.test.test_component -*-
     2#
     3# Copyright (c) 2003-2008 Ralph Meijer
    24# See LICENSE for details.
    35
     
    810from twisted.application import service
    911from twisted.internet import reactor
    10 from twisted.words.protocols.jabber import component
     12from twisted.python import log
     13from twisted.words.protocols.jabber.jid import internJID as JID
     14from twisted.words.protocols.jabber import component, error, xmlstream
    1115from twisted.words.xish import domish
    1216
    1317from wokkel.subprotocols import StreamManager
     18
     19NS_COMPONENT_ACCEPT = 'jabber:component:accept'
    1420
    1521class Component(StreamManager, service.Service):
     
    5763    def _getConnection(self):
    5864        return reactor.connectTCP(self.host, self.port, self.factory)
     65
     66
     67
     68class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
     69    """
     70    Authenticator for accepting components.
     71    """
     72    namespace = NS_COMPONENT_ACCEPT
     73
     74    def __init__(self, secret):
     75        self.secret = secret
     76        xmlstream.ListenAuthenticator.__init__(self)
     77
     78
     79    def associateWithStream(self, xs):
     80        xs.version = (0, 0)
     81        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     82
     83
     84    def streamStarted(self, rootElement):
     85        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     86
     87        if rootElement.defaultUri != self.namespace:
     88            exc = error.StreamError('invalid-namespace')
     89            self.xmlstream.sendStreamError(exc)
     90            return
     91
     92        # self.xmlstream.thisEntity is set to the address the component
     93        # wants to assume. This should probably be checked.
     94        if not self.xmlstream.thisEntity:
     95            exc = error.StreamError('improper-addressing')
     96            self.xmlstream.sendStreamError(exc)
     97            return
     98
     99        self.xmlstream.sid = 'random' # FIXME
     100
     101        self.xmlstream.sendHeader()
     102        self.xmlstream.addOnetimeObserver('/*', self.onElement)
     103
     104
     105    def onElement(self, element):
     106        if (element.uri, element.name) == (self.namespace, 'handshake'):
     107            self.onHandshake(unicode(element))
     108        else:
     109            exc = error.streamError('not-authorized')
     110            self.xmlstream.sendStreamError(exc)
     111
     112
     113    def onHandshake(self, handshake):
     114        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret)
     115        if handshake != calculatedHash:
     116            exc = error.StreamError('not-authorized', text='Invalid hash')
     117            self.xmlstream.sendStreamError(exc)
     118        else:
     119            self.xmlstream.send('<handshake/>')
     120            self.xmlstream.dispatch(self.xmlstream,
     121                                    xmlstream.STREAM_AUTHD_EVENT)
     122
     123
     124
     125class RouterService(service.Service):
     126    """
     127    XMPP Server's Router Service.
     128
     129    This service connects the different components of the XMPP service and
     130    routes messages between them based on the given routing table.
     131
     132    Connected components are trusted to have correct addressing in the
     133    stanzas they offer for routing.
     134
     135    A route destination of C{None} adds a default route. Traffic for which no
     136    specific route exists, will be routed to this default route.
     137
     138    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
     139                  L{EventDispatcher<utility.EventDispatcher>}s that should
     140                  receive the traffic. A key of C{None} means the default
     141                  route.
     142    @type routes: C{dict}
     143    """
     144
     145    def __init__(self):
     146        self.routes = {}
     147
     148
     149    def addRoute(self, destination, xs):
     150        """
     151        Add a new route.
     152
     153        The passed XML Stream C{xs} will have an observer for all stanzas
     154        added to route its outgoing traffic. In turn, traffic for
     155        C{destination} will be passed to this stream.
     156
     157        @param destination: Destination of the route to be added as a host name
     158                            or C{None} for the default route.
     159        @type destination: C{str} or C{NoneType}.
     160        @param xs: XML Stream to register the route for.
     161        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
     162        """
     163        self.routes[destination] = xs
     164        xs.addObserver('/*', self.route)
     165
     166
     167    def removeRoute(self, destination, xs):
     168        """
     169        Remove a route.
     170
     171        @param destination: Destination of the route that should be removed.
     172        @type destination: C{str}.
     173        @param xs: XML Stream to remove the route for.
     174        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
     175        """
     176        xs.removeObserver('/*', self.route)
     177        if (xs == self.routes[destination]):
     178            del self.routes[destination]
     179
     180
     181    def route(self, stanza):
     182        """
     183        Route a stanza.
     184
     185        @param stanza: The stanza to be routed.
     186        @type stanza: L{domish.Element}.
     187        """
     188        if not list(stanza.elements()):
     189            return
     190
     191        destination = JID(stanza['to'])
     192
     193        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
     194
     195        if destination.host in self.routes:
     196            self.routes[destination.host].send(stanza)
     197        else:
     198            self.routes[None].send(stanza)
     199
     200
     201
     202class ComponentServer(service.Service):
     203    """
     204    XMPP Component Server service.
     205
     206    This service accepts XMPP external component connections and makes
     207    the router service route traffic for a component's bound domain
     208    to that component.
     209    """
     210
     211    logTraffic = False
     212
     213    def __init__(self, router, port=5347, secret='secret'):
     214        self.router = router
     215        self.port = port
     216        self.secret = secret
     217
     218        def authenticatorFactory():
     219            return ListenComponentAuthenticator(self.secret)
     220
     221        self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory)
     222        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     223                                  self.makeConnection)
     224        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     225                                  self.connectionInitialized)
     226
     227        self.serial = 0
     228
     229
     230    def startService(self):
     231        service.Service.startService(self)
     232        reactor.listenTCP(self.port, self.factory)
     233
     234
     235    def makeConnection(self, xs):
     236        """
     237        Called when a component connection was made.
     238
     239        This enables traffic debugging on incoming streams.
     240        """
     241        xs.serial = self.serial
     242        self.serial += 1
     243
     244        def logDataIn(buf):
     245            log.msg("RECV (%d): %r" % (xs.serial, buf))
     246
     247        def logDataOut(buf):
     248            log.msg("SEND (%d): %r" % (xs.serial, buf))
     249
     250        if self.logTraffic:
     251            xs.rawDataInFn = logDataIn
     252            xs.rawDataOutFn = logDataOut
     253
     254        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     255
     256
     257    def connectionInitialized(self, xs):
     258        """
     259        Called when a component has succesfully authenticated.
     260
     261        Add the component to the routing table and establish a handler
     262        for a closed connection.
     263        """
     264        destination = xs.thisEntity.host
     265
     266        self.router.addRoute(destination, xs)
     267        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
     268                                                   destination, xs)
     269
     270
     271    def onError(self, reason):
     272        log.err(reason, "Stream Error")
     273
     274
     275    def connectionLost(self, destination, xs, reason):
     276        self.router.removeRoute(destination, xs)
  • wokkel/generic.py

    r30 r35  
    1313from twisted.words.protocols.jabber import error
    1414from twisted.words.protocols.jabber.xmlstream import toResponse
    15 from twisted.words.xish import domish
     15from twisted.words.xish import domish, utility
    1616
    1717from wokkel import disco
     
    121121    def getDiscoItems(self, requestor, target, node):
    122122        return defer.succeed([])
     123
     124
     125
     126class XmlPipe(object):
     127    """
     128    XML stream pipe.
     129
     130    Connects two objects that communicate stanzas through an XML stream like
     131    interface. Each of the ends of the pipe (sink and source) can be used to
     132    send XML stanzas to the other side, or add observers to process XML stanzas
     133    that were sent from the other side.
     134
     135    XML pipes are usually used in place of regular XML streams that are
     136    transported over TCP. This is the reason for the use of the names source
     137    and sink for both ends of the pipe. The source side corresponds with the
     138    entity that initiated the TCP connection, whereas the sink corresponds with
     139    the entity that accepts that connection. In this object, though, the source
     140    and sink are treated equally.
     141
     142    Unlike Jabber
     143    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
     144    and source objects are assumed to represent an eternal connected and
     145    initialized XML stream. As such, events corresponding to connection,
     146    disconnection, initialization and stream errors are not dispatched or
     147    processed.
     148
     149    @ivar source: Source XML stream.
     150    @ivar sink: Sink XML stream.
     151    """
     152
     153    def __init__(self):
     154        self.source = utility.EventDispatcher()
     155        self.sink = utility.EventDispatcher()
     156        self.source.send = lambda obj: self.sink.dispatch(obj)
     157        self.sink.send = lambda obj: self.source.dispatch(obj)
  • wokkel/test/test_generic.py

    r20 r35  
    4949        self.assertEquals(1, len(elements))
    5050        self.assertEquals('0.1.0', unicode(elements[0]))
     51
     52
     53
     54class XmlPipeTest(unittest.TestCase):
     55    """
     56    Tests for L{wokkel.generic.XmlPipe}.
     57    """
     58
     59    def setUp(self):
     60        self.pipe = generic.XmlPipe()
     61
     62
     63    def test_sendFromSource(self):
     64        """
     65        Send an element from the source and observe it from the sink.
     66        """
     67        def cb(obj):
     68            called.append(obj)
     69
     70        called = []
     71        self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
     72        element = domish.Element(('testns', 'test'))
     73        self.pipe.source.send(element)
     74        self.assertEquals([element], called)
     75
     76
     77    def test_sendFromSink(self):
     78        """
     79        Send an element from the sink and observe it from the source.
     80        """
     81        def cb(obj):
     82            called.append(obj)
     83
     84        called = []
     85        self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
     86        element = domish.Element(('testns', 'test'))
     87        self.pipe.sink.send(element)
     88        self.assertEquals([element], called)
Note: See TracChangeset for help on using the changeset viewer.