source: wokkel/component.py @ 35:eb020b49a77d

Last change on this file since 35:eb020b49a77d was 35:eb020b49a77d, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add XMPP router and server side component authenticator.

Author: ralphm.
Fixed #30.

File size: 8.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.subprotocols import StreamManager
18
19NS_COMPONENT_ACCEPT = 'jabber:component:accept'
20
21class Component(StreamManager, service.Service):
22    def __init__(self, host, port, jid, password):
23        self.host = host
24        self.port = port
25
26        factory = component.componentFactory(jid, password)
27
28        StreamManager.__init__(self, factory)
29
30    def _authd(self, xs):
31        old_send = xs.send
32
33        def send(obj):
34            if domish.IElement.providedBy(obj) and \
35                    not obj.getAttribute('from'):
36                obj['from'] = self.xmlstream.thisEntity.full()
37            old_send(obj)
38
39        xs.send = send
40        StreamManager._authd(self, xs)
41
42    def initializationFailed(self, reason):
43        """
44        Called when stream initialization has failed.
45
46        Stop the service (thereby disconnecting the current stream) and
47        raise the exception.
48        """
49        self.stopService()
50        reason.raiseException()
51
52    def startService(self):
53        service.Service.startService(self)
54
55        self.factory.stopTrying()
56        self._connection = self._getConnection()
57
58    def stopService(self):
59        service.Service.stopService(self)
60
61        self._connection.disconnect()
62
63    def _getConnection(self):
64        return reactor.connectTCP(self.host, self.port, self.factory)
65
66
67
68class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
69    """
70    Authenticator for accepting components.
71    """
72    namespace = NS_COMPONENT_ACCEPT
73
74    def __init__(self, secret):
75        self.secret = secret
76        xmlstream.ListenAuthenticator.__init__(self)
77
78
79    def associateWithStream(self, xs):
80        xs.version = (0, 0)
81        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
82
83
84    def streamStarted(self, rootElement):
85        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
86
87        if rootElement.defaultUri != self.namespace:
88            exc = error.StreamError('invalid-namespace')
89            self.xmlstream.sendStreamError(exc)
90            return
91
92        # self.xmlstream.thisEntity is set to the address the component
93        # wants to assume. This should probably be checked.
94        if not self.xmlstream.thisEntity:
95            exc = error.StreamError('improper-addressing')
96            self.xmlstream.sendStreamError(exc)
97            return
98
99        self.xmlstream.sid = 'random' # FIXME
100
101        self.xmlstream.sendHeader()
102        self.xmlstream.addOnetimeObserver('/*', self.onElement)
103
104
105    def onElement(self, element):
106        if (element.uri, element.name) == (self.namespace, 'handshake'):
107            self.onHandshake(unicode(element))
108        else:
109            exc = error.streamError('not-authorized')
110            self.xmlstream.sendStreamError(exc)
111
112
113    def onHandshake(self, handshake):
114        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret)
115        if handshake != calculatedHash:
116            exc = error.StreamError('not-authorized', text='Invalid hash')
117            self.xmlstream.sendStreamError(exc)
118        else:
119            self.xmlstream.send('<handshake/>')
120            self.xmlstream.dispatch(self.xmlstream,
121                                    xmlstream.STREAM_AUTHD_EVENT)
122
123
124
125class RouterService(service.Service):
126    """
127    XMPP Server's Router Service.
128
129    This service connects the different components of the XMPP service and
130    routes messages between them based on the given routing table.
131
132    Connected components are trusted to have correct addressing in the
133    stanzas they offer for routing.
134
135    A route destination of C{None} adds a default route. Traffic for which no
136    specific route exists, will be routed to this default route.
137
138    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
139                  L{EventDispatcher<utility.EventDispatcher>}s that should
140                  receive the traffic. A key of C{None} means the default
141                  route.
142    @type routes: C{dict}
143    """
144
145    def __init__(self):
146        self.routes = {}
147
148
149    def addRoute(self, destination, xs):
150        """
151        Add a new route.
152
153        The passed XML Stream C{xs} will have an observer for all stanzas
154        added to route its outgoing traffic. In turn, traffic for
155        C{destination} will be passed to this stream.
156
157        @param destination: Destination of the route to be added as a host name
158                            or C{None} for the default route.
159        @type destination: C{str} or C{NoneType}.
160        @param xs: XML Stream to register the route for.
161        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
162        """
163        self.routes[destination] = xs
164        xs.addObserver('/*', self.route)
165
166
167    def removeRoute(self, destination, xs):
168        """
169        Remove a route.
170
171        @param destination: Destination of the route that should be removed.
172        @type destination: C{str}.
173        @param xs: XML Stream to remove the route for.
174        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
175        """
176        xs.removeObserver('/*', self.route)
177        if (xs == self.routes[destination]):
178            del self.routes[destination]
179
180
181    def route(self, stanza):
182        """
183        Route a stanza.
184
185        @param stanza: The stanza to be routed.
186        @type stanza: L{domish.Element}.
187        """
188        if not list(stanza.elements()):
189            return
190
191        destination = JID(stanza['to'])
192
193        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
194
195        if destination.host in self.routes:
196            self.routes[destination.host].send(stanza)
197        else:
198            self.routes[None].send(stanza)
199
200
201
202class ComponentServer(service.Service):
203    """
204    XMPP Component Server service.
205
206    This service accepts XMPP external component connections and makes
207    the router service route traffic for a component's bound domain
208    to that component.
209    """
210
211    logTraffic = False
212
213    def __init__(self, router, port=5347, secret='secret'):
214        self.router = router
215        self.port = port
216        self.secret = secret
217
218        def authenticatorFactory():
219            return ListenComponentAuthenticator(self.secret)
220
221        self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory)
222        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
223                                  self.makeConnection)
224        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
225                                  self.connectionInitialized)
226
227        self.serial = 0
228
229
230    def startService(self):
231        service.Service.startService(self)
232        reactor.listenTCP(self.port, self.factory)
233
234
235    def makeConnection(self, xs):
236        """
237        Called when a component connection was made.
238
239        This enables traffic debugging on incoming streams.
240        """
241        xs.serial = self.serial
242        self.serial += 1
243
244        def logDataIn(buf):
245            log.msg("RECV (%d): %r" % (xs.serial, buf))
246
247        def logDataOut(buf):
248            log.msg("SEND (%d): %r" % (xs.serial, buf))
249
250        if self.logTraffic:
251            xs.rawDataInFn = logDataIn
252            xs.rawDataOutFn = logDataOut
253
254        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
255
256
257    def connectionInitialized(self, xs):
258        """
259        Called when a component has succesfully authenticated.
260
261        Add the component to the routing table and establish a handler
262        for a closed connection.
263        """
264        destination = xs.thisEntity.host
265
266        self.router.addRoute(destination, xs)
267        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
268                                                   destination, xs)
269
270
271    def onError(self, reason):
272        log.err(reason, "Stream Error")
273
274
275    def connectionLost(self, destination, xs, reason):
276        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.