Ignore:
Timestamp:
Oct 10, 2008, 5:24:28 PM (14 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@90
Message:

Add XMPP router and server side component authenticator.

Author: ralphm.
Fixed #30.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/component.py

    r6 r35  
    1 # Copyright (c) 2003-2007 Ralph Meijer
     1# -*- test-case-name: wokkel.test.test_component -*-
     2#
     3# Copyright (c) 2003-2008 Ralph Meijer
    24# See LICENSE for details.
    35
     
    810from twisted.application import service
    911from twisted.internet import reactor
    10 from twisted.words.protocols.jabber import component
     12from twisted.python import log
     13from twisted.words.protocols.jabber.jid import internJID as JID
     14from twisted.words.protocols.jabber import component, error, xmlstream
    1115from twisted.words.xish import domish
    1216
    1317from wokkel.subprotocols import StreamManager
     18
     19NS_COMPONENT_ACCEPT = 'jabber:component:accept'
    1420
    1521class Component(StreamManager, service.Service):
     
    5763    def _getConnection(self):
    5864        return reactor.connectTCP(self.host, self.port, self.factory)
     65
     66
     67
     68class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
     69    """
     70    Authenticator for accepting components.
     71    """
     72    namespace = NS_COMPONENT_ACCEPT
     73
     74    def __init__(self, secret):
     75        self.secret = secret
     76        xmlstream.ListenAuthenticator.__init__(self)
     77
     78
     79    def associateWithStream(self, xs):
     80        xs.version = (0, 0)
     81        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     82
     83
     84    def streamStarted(self, rootElement):
     85        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     86
     87        if rootElement.defaultUri != self.namespace:
     88            exc = error.StreamError('invalid-namespace')
     89            self.xmlstream.sendStreamError(exc)
     90            return
     91
     92        # self.xmlstream.thisEntity is set to the address the component
     93        # wants to assume. This should probably be checked.
     94        if not self.xmlstream.thisEntity:
     95            exc = error.StreamError('improper-addressing')
     96            self.xmlstream.sendStreamError(exc)
     97            return
     98
     99        self.xmlstream.sid = 'random' # FIXME
     100
     101        self.xmlstream.sendHeader()
     102        self.xmlstream.addOnetimeObserver('/*', self.onElement)
     103
     104
     105    def onElement(self, element):
     106        if (element.uri, element.name) == (self.namespace, 'handshake'):
     107            self.onHandshake(unicode(element))
     108        else:
     109            exc = error.streamError('not-authorized')
     110            self.xmlstream.sendStreamError(exc)
     111
     112
     113    def onHandshake(self, handshake):
     114        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret)
     115        if handshake != calculatedHash:
     116            exc = error.StreamError('not-authorized', text='Invalid hash')
     117            self.xmlstream.sendStreamError(exc)
     118        else:
     119            self.xmlstream.send('<handshake/>')
     120            self.xmlstream.dispatch(self.xmlstream,
     121                                    xmlstream.STREAM_AUTHD_EVENT)
     122
     123
     124
     125class RouterService(service.Service):
     126    """
     127    XMPP Server's Router Service.
     128
     129    This service connects the different components of the XMPP service and
     130    routes messages between them based on the given routing table.
     131
     132    Connected components are trusted to have correct addressing in the
     133    stanzas they offer for routing.
     134
     135    A route destination of C{None} adds a default route. Traffic for which no
     136    specific route exists, will be routed to this default route.
     137
     138    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
     139                  L{EventDispatcher<utility.EventDispatcher>}s that should
     140                  receive the traffic. A key of C{None} means the default
     141                  route.
     142    @type routes: C{dict}
     143    """
     144
     145    def __init__(self):
     146        self.routes = {}
     147
     148
     149    def addRoute(self, destination, xs):
     150        """
     151        Add a new route.
     152
     153        The passed XML Stream C{xs} will have an observer for all stanzas
     154        added to route its outgoing traffic. In turn, traffic for
     155        C{destination} will be passed to this stream.
     156
     157        @param destination: Destination of the route to be added as a host name
     158                            or C{None} for the default route.
     159        @type destination: C{str} or C{NoneType}.
     160        @param xs: XML Stream to register the route for.
     161        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
     162        """
     163        self.routes[destination] = xs
     164        xs.addObserver('/*', self.route)
     165
     166
     167    def removeRoute(self, destination, xs):
     168        """
     169        Remove a route.
     170
     171        @param destination: Destination of the route that should be removed.
     172        @type destination: C{str}.
     173        @param xs: XML Stream to remove the route for.
     174        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
     175        """
     176        xs.removeObserver('/*', self.route)
     177        if (xs == self.routes[destination]):
     178            del self.routes[destination]
     179
     180
     181    def route(self, stanza):
     182        """
     183        Route a stanza.
     184
     185        @param stanza: The stanza to be routed.
     186        @type stanza: L{domish.Element}.
     187        """
     188        if not list(stanza.elements()):
     189            return
     190
     191        destination = JID(stanza['to'])
     192
     193        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
     194
     195        if destination.host in self.routes:
     196            self.routes[destination.host].send(stanza)
     197        else:
     198            self.routes[None].send(stanza)
     199
     200
     201
     202class ComponentServer(service.Service):
     203    """
     204    XMPP Component Server service.
     205
     206    This service accepts XMPP external component connections and makes
     207    the router service route traffic for a component's bound domain
     208    to that component.
     209    """
     210
     211    logTraffic = False
     212
     213    def __init__(self, router, port=5347, secret='secret'):
     214        self.router = router
     215        self.port = port
     216        self.secret = secret
     217
     218        def authenticatorFactory():
     219            return ListenComponentAuthenticator(self.secret)
     220
     221        self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory)
     222        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     223                                  self.makeConnection)
     224        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     225                                  self.connectionInitialized)
     226
     227        self.serial = 0
     228
     229
     230    def startService(self):
     231        service.Service.startService(self)
     232        reactor.listenTCP(self.port, self.factory)
     233
     234
     235    def makeConnection(self, xs):
     236        """
     237        Called when a component connection was made.
     238
     239        This enables traffic debugging on incoming streams.
     240        """
     241        xs.serial = self.serial
     242        self.serial += 1
     243
     244        def logDataIn(buf):
     245            log.msg("RECV (%d): %r" % (xs.serial, buf))
     246
     247        def logDataOut(buf):
     248            log.msg("SEND (%d): %r" % (xs.serial, buf))
     249
     250        if self.logTraffic:
     251            xs.rawDataInFn = logDataIn
     252            xs.rawDataOutFn = logDataOut
     253
     254        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     255
     256
     257    def connectionInitialized(self, xs):
     258        """
     259        Called when a component has succesfully authenticated.
     260
     261        Add the component to the routing table and establish a handler
     262        for a closed connection.
     263        """
     264        destination = xs.thisEntity.host
     265
     266        self.router.addRoute(destination, xs)
     267        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
     268                                                   destination, xs)
     269
     270
     271    def onError(self, reason):
     272        log.err(reason, "Stream Error")
     273
     274
     275    def connectionLost(self, destination, xs, reason):
     276        self.router.removeRoute(destination, xs)
Note: See TracChangeset for help on using the changeset viewer.