source: wokkel/component.py @ 169:bb939a909750

Last change on this file since 169:bb939a909750 was 169:bb939a909750, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Let wokkel.component.Component reconnect if first attempt fails.

Author: ralphm.
Fixes #75.

  • Property exe set to *
File size: 12.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities.
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.generic import XmlPipe
18from wokkel.subprotocols import StreamManager
19
20NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
22class Component(StreamManager, service.Service):
23    """
24    XMPP External Component service.
25
26    This service is a XMPP stream manager that connects as an External
27    Component to an XMPP server, as described in
28    U{XEP-0114<http://xmpp.org/extensions/xep-0114.html>}.
29    """
30    def __init__(self, host, port, jid, password):
31        self.host = host
32        self.port = port
33
34        factory = component.componentFactory(jid, password)
35
36        StreamManager.__init__(self, factory)
37
38
39    def _authd(self, xs):
40        """
41        Called when stream initialization has completed.
42
43        This replaces the C{send} method of the C{XmlStream} instance
44        that represents the current connection so that outgoing stanzas
45        always have a from attribute set to the JID of the component.
46        """
47        old_send = xs.send
48
49        def send(obj):
50            if domish.IElement.providedBy(obj) and \
51                    not obj.getAttribute('from'):
52                obj['from'] = self.xmlstream.thisEntity.full()
53            old_send(obj)
54
55        xs.send = send
56        StreamManager._authd(self, xs)
57
58
59    def initializationFailed(self, reason):
60        """
61        Called when stream initialization has failed.
62
63        Stop the service (thereby disconnecting the current stream) and
64        raise the exception.
65        """
66        self.stopService()
67        reason.raiseException()
68
69
70    def startService(self):
71        """
72        Start the service and connect to the server.
73        """
74        service.Service.startService(self)
75
76        self._connection = self._getConnection()
77
78
79    def stopService(self):
80        """
81        Stop the service, close the connection and prevent reconnects.
82        """
83        service.Service.stopService(self)
84
85        self.factory.stopTrying()
86        self._connection.disconnect()
87
88
89    def _getConnection(self):
90        """
91        Create a connector that connects to the server.
92        """
93        return reactor.connectTCP(self.host, self.port, self.factory)
94
95
96
97class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
98    """
99    Component service that connects directly to a router.
100
101    Instead of opening a socket to connect to a router, like L{Component},
102    components of this type connect to a router in the same process. This
103    allows for one-process XMPP servers.
104
105    @ivar domains: Domains (as C{str}) this component will handle traffic for.
106    @type domains: C{set}
107    """
108
109    def __init__(self, router, domain=None):
110        xmlstream.XMPPHandlerCollection.__init__(self)
111
112        self._router = router
113        self.domains = set()
114        if domain:
115            self.domains.add(domain)
116
117        self.xmlstream = None
118
119    def startService(self):
120        """
121        Create a XML pipe, connect to the router and setup handlers.
122        """
123        service.Service.startService(self)
124
125        self._pipe = XmlPipe()
126        self.xmlstream = self._pipe.source
127
128        for domain in self.domains:
129            self._router.addRoute(domain, self._pipe.sink)
130
131        for e in self:
132            e.makeConnection(self.xmlstream)
133            e.connectionInitialized()
134
135
136    def stopService(self):
137        """
138        Disconnect from the router and handlers.
139        """
140        service.Service.stopService(self)
141
142        for domain in self.domains:
143            self._router.removeRoute(domain, self._pipe.sink)
144
145        self._pipe = None
146        self.xmlstream = None
147
148        for e in self:
149            e.connectionLost(None)
150
151
152    def addHandler(self, handler):
153        """
154        Add a new handler and connect it to the stream.
155        """
156        xmlstream.XMPPHandlerCollection.addHandler(self, handler)
157
158        if self.xmlstream:
159            handler.makeConnection(self.xmlstream)
160            handler.connectionInitialized()
161
162
163    def send(self, obj):
164        """
165        Send data to the XML stream, so it ends up at the router.
166        """
167        self.xmlstream.send(obj)
168
169
170
171class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
172    """
173    Authenticator for accepting components.
174
175    @ivar secret: The shared used to authorized incoming component connections.
176    @type secret: C{unicode}.
177    """
178
179    namespace = NS_COMPONENT_ACCEPT
180
181    def __init__(self, secret):
182        self.secret = secret
183        xmlstream.ListenAuthenticator.__init__(self)
184
185
186    def associateWithStream(self, xs):
187        """
188        Associate the authenticator with a stream.
189
190        This sets the stream's version to 0.0, because the XEP-0114 component
191        protocol was not designed for XMPP 1.0.
192        """
193        xs.version = (0, 0)
194        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
195
196
197    def streamStarted(self, rootElement):
198        """
199        Called by the stream when it has started.
200
201        This examines the default namespace of the incoming stream and whether
202        there is a requested hostname for the component. Then it generates a
203        stream identifier, sends a response header and adds an observer for
204        the first incoming element, triggering L{onElement}.
205        """
206
207        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
208
209        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
210        if not self.xmlstream.sid:
211            from twisted.python import randbytes
212            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
213
214        if rootElement.defaultUri != self.namespace:
215            exc = error.StreamError('invalid-namespace')
216            self.xmlstream.sendStreamError(exc)
217            return
218
219        # self.xmlstream.thisEntity is set to the address the component
220        # wants to assume.
221        if not self.xmlstream.thisEntity:
222            exc = error.StreamError('improper-addressing')
223            self.xmlstream.sendStreamError(exc)
224            return
225
226        self.xmlstream.sendHeader()
227        self.xmlstream.addOnetimeObserver('/*', self.onElement)
228
229
230    def onElement(self, element):
231        """
232        Called on incoming XML Stanzas.
233
234        The very first element received should be a request for handshake.
235        Otherwise, the stream is dropped with a 'not-authorized' error. If a
236        handshake request was received, the hash is extracted and passed to
237        L{onHandshake}.
238        """
239        if (element.uri, element.name) == (self.namespace, 'handshake'):
240            self.onHandshake(unicode(element))
241        else:
242            exc = error.StreamError('not-authorized')
243            self.xmlstream.sendStreamError(exc)
244
245
246    def onHandshake(self, handshake):
247        """
248        Called upon receiving the handshake request.
249
250        This checks that the given hash in C{handshake} is equal to a
251        calculated hash, responding with a handshake reply or a stream error.
252        If the handshake was ok, the stream is authorized, and  XML Stanzas may
253        be exchanged.
254        """
255        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
256                                                unicode(self.secret))
257        if handshake != calculatedHash:
258            exc = error.StreamError('not-authorized', text='Invalid hash')
259            self.xmlstream.sendStreamError(exc)
260        else:
261            self.xmlstream.send('<handshake/>')
262            self.xmlstream.dispatch(self.xmlstream,
263                                    xmlstream.STREAM_AUTHD_EVENT)
264
265
266
267class Router(object):
268    """
269    XMPP Server's Router.
270
271    A router connects the different components of the XMPP service and routes
272    messages between them based on the given routing table.
273
274    Connected components are trusted to have correct addressing in the
275    stanzas they offer for routing.
276
277    A route destination of C{None} adds a default route. Traffic for which no
278    specific route exists, will be routed to this default route.
279
280    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
281        L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}s that
282        should receive the traffic. A key of C{None} means the default route.
283    @type routes: C{dict}
284    """
285
286    def __init__(self):
287        self.routes = {}
288
289
290    def addRoute(self, destination, xs):
291        """
292        Add a new route.
293
294        The passed XML Stream C{xs} will have an observer for all stanzas
295        added to route its outgoing traffic. In turn, traffic for
296        C{destination} will be passed to this stream.
297
298        @param destination: Destination of the route to be added as a host name
299                            or C{None} for the default route.
300        @type destination: C{str} or C{NoneType}
301
302        @param xs: XML Stream to register the route for.
303        @type xs:
304            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
305        """
306        self.routes[destination] = xs
307        xs.addObserver('/*', self.route)
308
309
310    def removeRoute(self, destination, xs):
311        """
312        Remove a route.
313
314        @param destination: Destination of the route that should be removed.
315        @type destination: C{str}.
316
317        @param xs: XML Stream to remove the route for.
318        @type xs:
319            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
320        """
321        xs.removeObserver('/*', self.route)
322        if (xs == self.routes[destination]):
323            del self.routes[destination]
324
325
326    def route(self, stanza):
327        """
328        Route a stanza.
329
330        @param stanza: The stanza to be routed.
331        @type stanza: L{domish.Element}.
332        """
333        destination = JID(stanza['to'])
334
335        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
336
337        if destination.host in self.routes:
338            self.routes[destination.host].send(stanza)
339        else:
340            self.routes[None].send(stanza)
341
342
343
344class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
345    """
346    XMPP Component Server factory.
347
348    This factory accepts XMPP external component connections and makes
349    the router service route traffic for a component's bound domain
350    to that component.
351    """
352
353    logTraffic = False
354
355    def __init__(self, router, secret='secret'):
356        self.router = router
357        self.secret = secret
358
359        def authenticatorFactory():
360            return ListenComponentAuthenticator(self.secret)
361
362        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
363        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
364                          self.makeConnection)
365        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
366                          self.connectionInitialized)
367
368        self.serial = 0
369
370
371    def makeConnection(self, xs):
372        """
373        Called when a component connection was made.
374
375        This enables traffic debugging on incoming streams.
376        """
377        xs.serial = self.serial
378        self.serial += 1
379
380        def logDataIn(buf):
381            log.msg("RECV (%d): %r" % (xs.serial, buf))
382
383        def logDataOut(buf):
384            log.msg("SEND (%d): %r" % (xs.serial, buf))
385
386        if self.logTraffic:
387            xs.rawDataInFn = logDataIn
388            xs.rawDataOutFn = logDataOut
389
390        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
391
392
393    def connectionInitialized(self, xs):
394        """
395        Called when a component has succesfully authenticated.
396
397        Add the component to the routing table and establish a handler
398        for a closed connection.
399        """
400        destination = xs.thisEntity.host
401
402        self.router.addRoute(destination, xs)
403        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
404                                                   destination, xs)
405
406
407    def onError(self, reason):
408        log.err(reason, "Stream Error")
409
410
411    def connectionLost(self, destination, xs, reason):
412        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.