source: wokkel/component.py @ 40:2bcb10fb4da4

Last change on this file since 40:2bcb10fb4da4 was 40:2bcb10fb4da4, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Track changes to Twisted branch for the XMPP router.

File size: 11.0 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.generic import XmlPipe
18from wokkel.subprotocols import StreamManager
19
20NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
22class Component(StreamManager, service.Service):
23    def __init__(self, host, port, jid, password):
24        self.host = host
25        self.port = port
26
27        factory = component.componentFactory(jid, password)
28
29        StreamManager.__init__(self, factory)
30
31    def _authd(self, xs):
32        old_send = xs.send
33
34        def send(obj):
35            if domish.IElement.providedBy(obj) and \
36                    not obj.getAttribute('from'):
37                obj['from'] = self.xmlstream.thisEntity.full()
38            old_send(obj)
39
40        xs.send = send
41        StreamManager._authd(self, xs)
42
43    def initializationFailed(self, reason):
44        """
45        Called when stream initialization has failed.
46
47        Stop the service (thereby disconnecting the current stream) and
48        raise the exception.
49        """
50        self.stopService()
51        reason.raiseException()
52
53    def startService(self):
54        service.Service.startService(self)
55
56        self.factory.stopTrying()
57        self._connection = self._getConnection()
58
59    def stopService(self):
60        service.Service.stopService(self)
61
62        self._connection.disconnect()
63
64    def _getConnection(self):
65        return reactor.connectTCP(self.host, self.port, self.factory)
66
67
68
69class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
70    """
71    Component service that connects directly to a router.
72
73    Instead of opening a socket to connect to a router, like L{Component},
74    components of this type connect to a router in the same process. This
75    allows for one-process XMPP servers.
76    """
77
78    def __init__(self, router, domain):
79        xmlstream.XMPPHandlerCollection.__init__(self)
80        self.router = router
81        self.domain = domain
82
83        self.xmlstream = None
84
85    def startService(self):
86        """
87        Create a XML pipe, connect to the router and setup handlers.
88        """
89        service.Service.startService(self)
90
91        self.pipe = XmlPipe()
92        self.xmlstream = self.pipe.source
93        self.router.addRoute(self.domain, self.pipe.sink)
94
95        for e in self:
96            e.makeConnection(self.xmlstream)
97            e.connectionInitialized()
98
99
100    def stopService(self):
101        """
102        Disconnect from the router and handlers.
103        """
104        service.Service.stopService(self)
105
106        self.router.removeRoute(self.domain, self.pipe.sink)
107        self.pipe = None
108        self.xmlstream = None
109
110        for e in self:
111            e.connectionLost(None)
112
113
114    def addHandler(self, handler):
115        """
116        Add a new handler and connect it to the stream.
117        """
118        xmlstream.XMPPHandlerCollection.addHandler(self, handler)
119
120        if self.xmlstream:
121            handler.makeConnection(self.xmlstream)
122            handler.connectionInitialized()
123
124
125    def send(self, obj):
126        """
127        Send data to the XML stream, so it ends up at the router.
128        """
129        self.xmlstream.send(obj)
130
131
132
133class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
134    """
135    Authenticator for accepting components.
136
137    @ivar secret: The shared used to authorized incoming component connections.
138    @type secret: C{str}.
139    """
140
141    namespace = NS_COMPONENT_ACCEPT
142
143    def __init__(self, secret):
144        self.secret = secret
145        xmlstream.ListenAuthenticator.__init__(self)
146
147
148    def associateWithStream(self, xs):
149        """
150        Associate the authenticator with a stream.
151
152        This sets the stream's version to 0.0, because the XEP-0114 component
153        protocol was not designed for XMPP 1.0.
154        """
155        xs.version = (0, 0)
156        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
157
158
159    def streamStarted(self, rootElement):
160        """
161        Called by the stream when it has started.
162
163        This examines the default namespace of the incoming stream and whether
164        there is a requested hostname for the component. Then it generates a
165        stream identifier, sends a response header and adds an observer for
166        the first incoming element, triggering L{onElement}.
167        """
168
169        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
170
171        # Compatibility fix
172        if not self.xmlstream.sid:
173            from twisted.python import randbytes
174            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
175
176        if rootElement.defaultUri != self.namespace:
177            exc = error.StreamError('invalid-namespace')
178            self.xmlstream.sendStreamError(exc)
179            return
180
181        # self.xmlstream.thisEntity is set to the address the component
182        # wants to assume.
183        if not self.xmlstream.thisEntity:
184            exc = error.StreamError('improper-addressing')
185            self.xmlstream.sendStreamError(exc)
186            return
187
188        self.xmlstream.sendHeader()
189        self.xmlstream.addOnetimeObserver('/*', self.onElement)
190
191
192    def onElement(self, element):
193        """
194        Called on incoming XML Stanzas.
195
196        The very first element received should be a request for handshake.
197        Otherwise, the stream is dropped with a 'not-authorized' error. If a
198        handshake request was received, the hash is extracted and passed to
199        L{onHandshake}.
200        """
201        if (element.uri, element.name) == (self.namespace, 'handshake'):
202            self.onHandshake(unicode(element))
203        else:
204            exc = error.StreamError('not-authorized')
205            self.xmlstream.sendStreamError(exc)
206
207
208    def onHandshake(self, handshake):
209        """
210        Called upon receiving the handshake request.
211
212        This checks that the given hash in C{handshake} is equal to a
213        calculated hash, responding with a handshake reply or a stream error.
214        If the handshake was ok, the stream is authorized, and  XML Stanzas may
215        be exchanged.
216        """
217        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret)
218        if handshake != calculatedHash:
219            exc = error.StreamError('not-authorized', text='Invalid hash')
220            self.xmlstream.sendStreamError(exc)
221        else:
222            self.xmlstream.send('<handshake/>')
223            self.xmlstream.dispatch(self.xmlstream,
224                                    xmlstream.STREAM_AUTHD_EVENT)
225
226
227
228class Router(object):
229    """
230    XMPP Server's Router.
231
232    A router connects the different components of the XMPP service and routes
233    messages between them based on the given routing table.
234
235    Connected components are trusted to have correct addressing in the
236    stanzas they offer for routing.
237
238    A route destination of C{None} adds a default route. Traffic for which no
239    specific route exists, will be routed to this default route.
240
241    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
242                  L{EventDispatcher<utility.EventDispatcher>}s that should
243                  receive the traffic. A key of C{None} means the default
244                  route.
245    @type routes: C{dict}
246    """
247
248    def __init__(self):
249        self.routes = {}
250
251
252    def addRoute(self, destination, xs):
253        """
254        Add a new route.
255
256        The passed XML Stream C{xs} will have an observer for all stanzas
257        added to route its outgoing traffic. In turn, traffic for
258        C{destination} will be passed to this stream.
259
260        @param destination: Destination of the route to be added as a host name
261                            or C{None} for the default route.
262        @type destination: C{str} or C{NoneType}.
263        @param xs: XML Stream to register the route for.
264        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
265        """
266        self.routes[destination] = xs
267        xs.addObserver('/*', self.route)
268
269
270    def removeRoute(self, destination, xs):
271        """
272        Remove a route.
273
274        @param destination: Destination of the route that should be removed.
275        @type destination: C{str}.
276        @param xs: XML Stream to remove the route for.
277        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
278        """
279        xs.removeObserver('/*', self.route)
280        if (xs == self.routes[destination]):
281            del self.routes[destination]
282
283
284    def route(self, stanza):
285        """
286        Route a stanza.
287
288        @param stanza: The stanza to be routed.
289        @type stanza: L{domish.Element}.
290        """
291        destination = JID(stanza['to'])
292
293        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
294
295        if destination.host in self.routes:
296            self.routes[destination.host].send(stanza)
297        else:
298            self.routes[None].send(stanza)
299
300
301
302class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
303    """
304    XMPP Component Server factory.
305
306    This factory accepts XMPP external component connections and makes
307    the router service route traffic for a component's bound domain
308    to that component.
309    """
310
311    logTraffic = False
312
313    def __init__(self, router, secret='secret'):
314        self.router = router
315        self.secret = secret
316
317        def authenticatorFactory():
318            return ListenComponentAuthenticator(self.secret)
319
320        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
321        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
322                          self.makeConnection)
323        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
324                          self.connectionInitialized)
325
326        self.serial = 0
327
328
329    def makeConnection(self, xs):
330        """
331        Called when a component connection was made.
332
333        This enables traffic debugging on incoming streams.
334        """
335        xs.serial = self.serial
336        self.serial += 1
337
338        def logDataIn(buf):
339            log.msg("RECV (%d): %r" % (xs.serial, buf))
340
341        def logDataOut(buf):
342            log.msg("SEND (%d): %r" % (xs.serial, buf))
343
344        if self.logTraffic:
345            xs.rawDataInFn = logDataIn
346            xs.rawDataOutFn = logDataOut
347
348        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
349
350
351    def connectionInitialized(self, xs):
352        """
353        Called when a component has succesfully authenticated.
354
355        Add the component to the routing table and establish a handler
356        for a closed connection.
357        """
358        destination = xs.thisEntity.host
359
360        self.router.addRoute(destination, xs)
361        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
362                                                   destination, xs)
363
364
365    def onError(self, reason):
366        log.err(reason, "Stream Error")
367
368
369    def connectionLost(self, destination, xs, reason):
370        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.