source: wokkel/component.py @ 246:388bfe3e4ea7

Last change on this file since 246:388bfe3e4ea7 was 205:e861fc4596ed, checked in by Ralph Meijer <ralphm@…>, 6 years ago

imported patch py3-component.patch

File size: 12.9 KB
RevLine 
[35]1# -*- test-case-name: wokkel.test.test_component -*-
2#
[96]3# Copyright (c) Ralph Meijer.
[1]4# See LICENSE for details.
5
6"""
[169]7XMPP External Component utilities.
[1]8"""
9
[205]10from __future__ import division, absolute_import
11
[1]12from twisted.application import service
13from twisted.internet import reactor
[35]14from twisted.python import log
[205]15from twisted.python.compat import unicode
[35]16from twisted.words.protocols.jabber.jid import internJID as JID
17from twisted.words.protocols.jabber import component, error, xmlstream
[1]18from twisted.words.xish import domish
19
[39]20from wokkel.generic import XmlPipe
[1]21from wokkel.subprotocols import StreamManager
22
[35]23NS_COMPONENT_ACCEPT = 'jabber:component:accept'
24
[1]25class Component(StreamManager, service.Service):
[169]26    """
27    XMPP External Component service.
28
29    This service is a XMPP stream manager that connects as an External
30    Component to an XMPP server, as described in
31    U{XEP-0114<http://xmpp.org/extensions/xep-0114.html>}.
32    """
[1]33    def __init__(self, host, port, jid, password):
34        self.host = host
35        self.port = port
36
37        factory = component.componentFactory(jid, password)
38
39        StreamManager.__init__(self, factory)
40
[169]41
[1]42    def _authd(self, xs):
[169]43        """
44        Called when stream initialization has completed.
45
46        This replaces the C{send} method of the C{XmlStream} instance
47        that represents the current connection so that outgoing stanzas
48        always have a from attribute set to the JID of the component.
49        """
[1]50        old_send = xs.send
51
52        def send(obj):
53            if domish.IElement.providedBy(obj) and \
54                    not obj.getAttribute('from'):
[6]55                obj['from'] = self.xmlstream.thisEntity.full()
[1]56            old_send(obj)
57
58        xs.send = send
59        StreamManager._authd(self, xs)
60
[169]61
[1]62    def initializationFailed(self, reason):
63        """
64        Called when stream initialization has failed.
65
66        Stop the service (thereby disconnecting the current stream) and
67        raise the exception.
68        """
69        self.stopService()
70        reason.raiseException()
71
[169]72
[1]73    def startService(self):
[169]74        """
75        Start the service and connect to the server.
76        """
[1]77        service.Service.startService(self)
78
79        self._connection = self._getConnection()
80
[169]81
[1]82    def stopService(self):
[169]83        """
84        Stop the service, close the connection and prevent reconnects.
85        """
[1]86        service.Service.stopService(self)
87
[169]88        self.factory.stopTrying()
[1]89        self._connection.disconnect()
90
[169]91
[1]92    def _getConnection(self):
[169]93        """
94        Create a connector that connects to the server.
95        """
[1]96        return reactor.connectTCP(self.host, self.port, self.factory)
[35]97
98
99
[165]100class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
[39]101    """
102    Component service that connects directly to a router.
103
104    Instead of opening a socket to connect to a router, like L{Component},
105    components of this type connect to a router in the same process. This
106    allows for one-process XMPP servers.
[54]107
[205]108    @ivar domains: Domains (as L{unicode}) this component will handle traffic
109        for.
110    @type domains: L{set}
[39]111    """
112
[54]113    def __init__(self, router, domain=None):
[165]114        xmlstream.XMPPHandlerCollection.__init__(self)
[54]115
116        self._router = router
117        self.domains = set()
118        if domain:
119            self.domains.add(domain)
[39]120
121        self.xmlstream = None
122
123    def startService(self):
124        """
125        Create a XML pipe, connect to the router and setup handlers.
126        """
127        service.Service.startService(self)
128
[54]129        self._pipe = XmlPipe()
130        self.xmlstream = self._pipe.source
131
132        for domain in self.domains:
133            self._router.addRoute(domain, self._pipe.sink)
[39]134
135        for e in self:
136            e.makeConnection(self.xmlstream)
137            e.connectionInitialized()
138
139
140    def stopService(self):
141        """
142        Disconnect from the router and handlers.
143        """
144        service.Service.stopService(self)
145
[54]146        for domain in self.domains:
147            self._router.removeRoute(domain, self._pipe.sink)
148
149        self._pipe = None
[39]150        self.xmlstream = None
151
152        for e in self:
153            e.connectionLost(None)
154
155
156    def addHandler(self, handler):
157        """
158        Add a new handler and connect it to the stream.
159        """
[165]160        xmlstream.XMPPHandlerCollection.addHandler(self, handler)
[39]161
162        if self.xmlstream:
163            handler.makeConnection(self.xmlstream)
164            handler.connectionInitialized()
165
166
167    def send(self, obj):
168        """
169        Send data to the XML stream, so it ends up at the router.
170        """
171        self.xmlstream.send(obj)
172
173
174
[35]175class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
176    """
177    Authenticator for accepting components.
[40]178
179    @ivar secret: The shared used to authorized incoming component connections.
[205]180    @type secret: L{unicode}.
[35]181    """
[40]182
[35]183    namespace = NS_COMPONENT_ACCEPT
184
185    def __init__(self, secret):
186        self.secret = secret
187        xmlstream.ListenAuthenticator.__init__(self)
188
189
190    def associateWithStream(self, xs):
[40]191        """
192        Associate the authenticator with a stream.
193
194        This sets the stream's version to 0.0, because the XEP-0114 component
195        protocol was not designed for XMPP 1.0.
196        """
[35]197        xs.version = (0, 0)
198        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
199
200
201    def streamStarted(self, rootElement):
[40]202        """
203        Called by the stream when it has started.
204
205        This examines the default namespace of the incoming stream and whether
206        there is a requested hostname for the component. Then it generates a
207        stream identifier, sends a response header and adds an observer for
208        the first incoming element, triggering L{onElement}.
209        """
210
[35]211        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
212
[41]213        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
[40]214        if not self.xmlstream.sid:
215            from twisted.python import randbytes
216            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
217
[35]218        if rootElement.defaultUri != self.namespace:
219            exc = error.StreamError('invalid-namespace')
220            self.xmlstream.sendStreamError(exc)
221            return
222
223        # self.xmlstream.thisEntity is set to the address the component
[40]224        # wants to assume.
[35]225        if not self.xmlstream.thisEntity:
226            exc = error.StreamError('improper-addressing')
227            self.xmlstream.sendStreamError(exc)
228            return
229
230        self.xmlstream.sendHeader()
231        self.xmlstream.addOnetimeObserver('/*', self.onElement)
232
233
234    def onElement(self, element):
[40]235        """
236        Called on incoming XML Stanzas.
237
238        The very first element received should be a request for handshake.
239        Otherwise, the stream is dropped with a 'not-authorized' error. If a
240        handshake request was received, the hash is extracted and passed to
241        L{onHandshake}.
242        """
[35]243        if (element.uri, element.name) == (self.namespace, 'handshake'):
244            self.onHandshake(unicode(element))
245        else:
[40]246            exc = error.StreamError('not-authorized')
[35]247            self.xmlstream.sendStreamError(exc)
248
249
250    def onHandshake(self, handshake):
[40]251        """
252        Called upon receiving the handshake request.
253
254        This checks that the given hash in C{handshake} is equal to a
255        calculated hash, responding with a handshake reply or a stream error.
256        If the handshake was ok, the stream is authorized, and  XML Stanzas may
257        be exchanged.
258        """
[76]259        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
260                                                unicode(self.secret))
[35]261        if handshake != calculatedHash:
262            exc = error.StreamError('not-authorized', text='Invalid hash')
263            self.xmlstream.sendStreamError(exc)
264        else:
265            self.xmlstream.send('<handshake/>')
266            self.xmlstream.dispatch(self.xmlstream,
267                                    xmlstream.STREAM_AUTHD_EVENT)
268
269
270
[40]271class Router(object):
[35]272    """
[40]273    XMPP Server's Router.
[35]274
[40]275    A router connects the different components of the XMPP service and routes
276    messages between them based on the given routing table.
[35]277
278    Connected components are trusted to have correct addressing in the
279    stanzas they offer for routing.
280
[205]281    A route destination of L{None} adds a default route. Traffic for which no
[35]282    specific route exists, will be routed to this default route.
283
284    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
[166]285        L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}s that
[205]286        should receive the traffic. A key of L{None} means the default route.
287    @type routes: L{dict}
[35]288    """
289
290    def __init__(self):
291        self.routes = {}
292
293
294    def addRoute(self, destination, xs):
295        """
296        Add a new route.
297
298        The passed XML Stream C{xs} will have an observer for all stanzas
299        added to route its outgoing traffic. In turn, traffic for
300        C{destination} will be passed to this stream.
301
302        @param destination: Destination of the route to be added as a host name
[205]303                            or L{None} for the default route.
304        @type destination: L{unicode} or L{NoneType}
[166]305
[35]306        @param xs: XML Stream to register the route for.
[166]307        @type xs:
308            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
[35]309        """
310        self.routes[destination] = xs
311        xs.addObserver('/*', self.route)
312
313
314    def removeRoute(self, destination, xs):
315        """
316        Remove a route.
317
318        @param destination: Destination of the route that should be removed.
[205]319        @type destination: L{unicode}
[166]320
[35]321        @param xs: XML Stream to remove the route for.
[166]322        @type xs:
323            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
[35]324        """
325        xs.removeObserver('/*', self.route)
326        if (xs == self.routes[destination]):
327            del self.routes[destination]
328
329
330    def route(self, stanza):
331        """
332        Route a stanza.
333
334        @param stanza: The stanza to be routed.
335        @type stanza: L{domish.Element}.
336        """
337        destination = JID(stanza['to'])
338
339
340        if destination.host in self.routes:
[176]341            log.msg("Routing to %s: %r" % (destination.full(),
342                                           stanza.toXml()))
[35]343            self.routes[destination.host].send(stanza)
[176]344        elif None in self.routes:
345            log.msg("Routing to %s (default route): %r" % (destination.full(),
346                                                           stanza.toXml()))
347            self.routes[None].send(stanza)
[35]348        else:
[176]349            log.msg("No route to %s: %r" % (destination.full(),
350                                            stanza.toXml()))
351            if stanza.getAttribute('type') not in ('result', 'error'):
352                # No route, send back error
353                exc = error.StanzaError('remote-server-timeout', type='wait')
354                exc.code = '504'
355                response = exc.toResponse(stanza)
356                self.route(response)
[35]357
358
359
[165]360class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
[35]361    """
[40]362    XMPP Component Server factory.
[35]363
[40]364    This factory accepts XMPP external component connections and makes
[35]365    the router service route traffic for a component's bound domain
366    to that component.
367    """
368
369    logTraffic = False
370
[40]371    def __init__(self, router, secret='secret'):
[35]372        self.router = router
373        self.secret = secret
374
375        def authenticatorFactory():
376            return ListenComponentAuthenticator(self.secret)
377
[165]378        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
[40]379        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
380                          self.makeConnection)
381        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
382                          self.connectionInitialized)
[35]383
384        self.serial = 0
385
386
387    def makeConnection(self, xs):
388        """
389        Called when a component connection was made.
390
391        This enables traffic debugging on incoming streams.
392        """
393        xs.serial = self.serial
394        self.serial += 1
395
396        def logDataIn(buf):
397            log.msg("RECV (%d): %r" % (xs.serial, buf))
398
399        def logDataOut(buf):
400            log.msg("SEND (%d): %r" % (xs.serial, buf))
401
402        if self.logTraffic:
403            xs.rawDataInFn = logDataIn
404            xs.rawDataOutFn = logDataOut
405
406        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
407
408
409    def connectionInitialized(self, xs):
410        """
411        Called when a component has succesfully authenticated.
412
413        Add the component to the routing table and establish a handler
414        for a closed connection.
415        """
416        destination = xs.thisEntity.host
417
418        self.router.addRoute(destination, xs)
419        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
420                                                   destination, xs)
421
422
423    def onError(self, reason):
424        log.err(reason, "Stream Error")
425
426
427    def connectionLost(self, destination, xs, reason):
428        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.