source: wokkel/component.py @ 169:bb939a909750

Last change on this file since 169:bb939a909750 was 169:bb939a909750, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Let wokkel.component.Component reconnect if first attempt fails.

Author: ralphm.
Fixes #75.

  • Property exe set to *
File size: 12.2 KB
RevLine 
[35]1# -*- test-case-name: wokkel.test.test_component -*-
2#
[96]3# Copyright (c) Ralph Meijer.
[1]4# See LICENSE for details.
5
6"""
[169]7XMPP External Component utilities.
[1]8"""
9
10from twisted.application import service
11from twisted.internet import reactor
[35]12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
[1]15from twisted.words.xish import domish
16
[39]17from wokkel.generic import XmlPipe
[1]18from wokkel.subprotocols import StreamManager
19
# Namespace that an incoming component stream must declare as its default
# namespace; ListenComponentAuthenticator compares the stream's root element
# against this value and rejects the stream on a mismatch.
NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
class Component(StreamManager, service.Service):
    """
    Service that maintains an XMPP External Component connection.

    The service is a stream manager whose stream is established towards an
    XMPP server using the External Component protocol specified in
    U{XEP-0114<http://xmpp.org/extensions/xep-0114.html>}.

    @ivar host: Host of the server to connect to, as passed to the
        initializer.
    @ivar port: Port of the server to connect to, as passed to the
        initializer.
    """

    def __init__(self, host, port, jid, password):
        """
        Remember the server address and set up the stream factory.

        @param host: Host of the server to connect to.
        @param port: Port of the server to connect to.
        @param jid: JID handed to the component stream factory.
        @param password: Password handed to the component stream factory.
        """
        self.host, self.port = host, port

        StreamManager.__init__(self,
                               component.componentFactory(jid, password))


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        The C{send} method of the passed C{XmlStream} is wrapped, so that
        every outgoing element that lacks a C{from} attribute gets the JID of
        this component as its sender. Afterwards the regular stream manager
        handling takes place.

        @param xs: The XML stream that finished initialization.
        """
        originalSend = xs.send

        def sendWithSender(obj):
            # Only XML elements can carry addressing; anything else (like
            # serialized strings) is passed through untouched.
            lacksSender = (domish.IElement.providedBy(obj) and
                           not obj.getAttribute('from'))
            if lacksSender:
                obj['from'] = self.xmlstream.thisEntity.full()
            originalSend(obj)

        xs.send = sendWithSender
        StreamManager._authd(self, xs)


    def initializationFailed(self, reason):
        """
        Called when the stream could not be initialized.

        The service is stopped, which disconnects the current stream, after
        which the exception held by C{reason} is raised.

        @param reason: Failure describing why initialization failed.
        """
        self.stopService()
        reason.raiseException()


    def startService(self):
        """
        Mark the service as started and initiate the server connection.
        """
        service.Service.startService(self)

        # Keep the connector around, stopService needs it to disconnect.
        self._connection = self._getConnection()


    def stopService(self):
        """
        Mark the service as stopped and tear down the connection.

        The factory is told to stop trying to reconnect before the current
        connection is closed.
        """
        service.Service.stopService(self)

        self.factory.stopTrying()
        self._connection.disconnect()


    def _getConnection(self):
        """
        Set up a TCP connector to the configured host and port.

        @return: The connector returned by the reactor.
        """
        connector = reactor.connectTCP(self.host, self.port, self.factory)
        return connector
[35]94
95
96
class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
    """
    Component service that is hooked up to a router in the same process.

    Where L{Component} opens a socket to reach a router, this type of
    component talks to the router through an in-process XML pipe. This makes
    it possible to run an XMPP server as a single process.

    @ivar domains: Domains (as C{str}) this component will handle traffic for.
    @type domains: C{set}
    """

    def __init__(self, router, domain=None):
        """
        @param router: The router to register routes with.
        @param domain: Optional initial domain to handle traffic for.
        """
        xmlstream.XMPPHandlerCollection.__init__(self)

        self._router = router
        self.domains = set([domain]) if domain else set()

        # Only set while the service is running.
        self.xmlstream = None


    def startService(self):
        """
        Start the service by wiring a fresh XML pipe to the router.

        One end of the pipe becomes the stream of this component, the other
        end is registered with the router for each of the domains. Finally,
        all handlers are connected to the stream.
        """
        service.Service.startService(self)

        pipe = XmlPipe()
        self._pipe = pipe
        self.xmlstream = pipe.source

        for domain in self.domains:
            self._router.addRoute(domain, pipe.sink)

        for handler in self:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def stopService(self):
        """
        Stop the service by unhooking from the router and the handlers.

        The routes are removed first, then the pipe and stream are dropped
        and only after that the handlers are told the connection is lost.
        """
        service.Service.stopService(self)

        for domain in self.domains:
            self._router.removeRoute(domain, self._pipe.sink)

        self._pipe = self.xmlstream = None

        for handler in self:
            handler.connectionLost(None)


    def addHandler(self, handler):
        """
        Register a handler, connecting it right away if there is a stream.

        @param handler: The handler to be added.
        """
        xmlstream.XMPPHandlerCollection.addHandler(self, handler)

        if not self.xmlstream:
            # Not running; startService connects the handler later.
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()


    def send(self, obj):
        """
        Pass data to the XML stream, from where it reaches the router.

        @param obj: The data to be sent.
        """
        self.xmlstream.send(obj)
168
169
170
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for the accepting side of component connections.

    @ivar secret: The shared secret used to authorize incoming component
        connections.
    @type secret: C{unicode}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: The shared secret components have to prove knowledge
            of.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Hook this authenticator up to a stream.

        As the XEP-0114 component protocol was not designed for XMPP 1.0,
        the stream's version is forced to 0.0 first.

        @param xs: The stream to associate with.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream once it has started.

        The default namespace of the incoming stream is checked, as well as
        the presence of a requested hostname for the component. If either
        check fails, a stream error is sent. Otherwise the response header
        is sent and L{onElement} is set up to receive the first incoming
        element.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        xs = self.xmlstream

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        # that do not generate a stream identifier themselves.
        if not xs.sid:
            from twisted.python import randbytes
            xs.sid = randbytes.secureRandom(8).encode('hex')

        # xs.thisEntity is set to the address the component wants to assume.
        if rootElement.defaultUri != self.namespace:
            condition = 'invalid-namespace'
        elif not xs.thisEntity:
            condition = 'improper-addressing'
        else:
            condition = None

        if condition is not None:
            xs.sendStreamError(error.StreamError(condition))
            return

        xs.sendHeader()
        xs.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called for the first element received on the stream.

        That element is expected to be a handshake request, in which case
        its character data is handed to L{onHandshake}. Anything else
        results in a 'not-authorized' stream error.

        @param element: The received element.
        """
        isHandshake = (element.uri == self.namespace and
                       element.name == 'handshake')
        if not isHandshake:
            self.xmlstream.sendStreamError(
                error.StreamError('not-authorized'))
            return

        self.onHandshake(unicode(element))


    def onHandshake(self, handshake):
        """
        Called when a handshake request has been received.

        The hash in C{handshake} is compared to the hash calculated from the
        stream identifier and the shared secret. On a match, a handshake
        reply is sent and the stream is marked as authenticated, so XML
        Stanzas may be exchanged. On a mismatch, a stream error is sent.

        @param handshake: The hash offered by the component.
        """
        xs = self.xmlstream
        expectedHash = xmlstream.hashPassword(xs.sid, unicode(self.secret))

        if handshake == expectedHash:
            xs.send('<handshake/>')
            xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
        else:
            xs.sendStreamError(
                error.StreamError('not-authorized', text='Invalid hash'))
264
265
266
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of C{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
        L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}s that
        should receive the traffic. A key of C{None} means the default route.
    @type routes: C{dict}
    """

    def __init__(self):
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream. An existing route for
        C{destination} is replaced.

        @param destination: Destination of the route to be added as a host name
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}

        @param xs: XML Stream to register the route for.
        @type xs:
            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The observer that was added for C{xs} is always removed. The routing
        table entry is only removed if C{destination} is still routed to
        C{xs}. If the route has been replaced by another stream, or has
        already been removed, the routing table is left alone.

        @param destination: Destination of the route that should be removed,
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}

        @param xs: XML Stream to remove the route for.
        @type xs:
            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
        """
        xs.removeObserver('/*', self.route)

        # The entry may be gone already: when two streams claimed the same
        # destination, the first one to leave deletes the shared entry (if it
        # was the current one), and the other stream still calls in here when
        # it disconnects. Looking up the key unconditionally would raise
        # KeyError in that case.
        if destination in self.routes and xs == self.routes[destination]:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The stanza is sent to the stream registered for the host part of the
        stanza's C{to} address. If there is no such route, the default route
        is used.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.

        @raise KeyError: If there is neither a route for the destination host
            nor a default route.
        """
        destination = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))

        if destination.host in self.routes:
            self.routes[destination.host].send(stanza)
        else:
            # No specific route; fall back to the default route.
            self.routes[None].send(stanza)
341
342
343
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
    """
    Server factory for XMPP external component connections.

    Connections of external components are accepted by this factory. Once a
    component has authenticated, the router is made to route traffic for the
    component's bound domain to that component.

    @cvar logTraffic: Whether raw incoming and outgoing data is logged.
    @ivar serial: Sequence number handed to the next connection.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: The router that routes are registered with.
        @param secret: The shared secret that components authenticate with.
        """
        self.router = router
        self.secret = secret

        xmlstream.XmlStreamServerFactory.__init__(
            self, lambda: ListenComponentAuthenticator(self.secret))

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.makeConnection),
            (xmlstream.STREAM_AUTHD_EVENT, self.connectionInitialized),
        )
        for event, observer in bootstraps:
            self.addBootstrap(event, observer)

        self.serial = 0


    def makeConnection(self, xs):
        """
        Called when a component has connected.

        The stream gets a serial number, traffic logging is hooked up if
        enabled through C{logTraffic}, and stream errors are observed.

        @param xs: The stream of the new connection.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        A route for the host of the component's address is added to the
        router, and an observer is set up to remove it again when the stream
        ends.

        @param xs: The stream of the authenticated component.
        """
        host = xs.thisEntity.host

        self.router.addRoute(host, xs)
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                       host, xs)


    def onError(self, reason):
        """
        Log a stream error.

        @param reason: The error that occurred on the stream.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Remove the route of a component whose stream has ended.

        @param destination: The host the route was added for.
        @param xs: The stream that has ended.
        @param reason: The object passed along with the stream end event;
            not used here.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.