source: wokkel/component.py @ 205:e861fc4596ed

Last change on this file since 205:e861fc4596ed was 205:e861fc4596ed, checked in by Ralph Meijer <ralphm@…>, 4 years ago

imported patch py3-component.patch

  • Property exe set to *
File size: 12.9 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities.
8"""
9
10from __future__ import division, absolute_import
11
12from twisted.application import service
13from twisted.internet import reactor
14from twisted.python import log
15from twisted.python.compat import unicode
16from twisted.words.protocols.jabber.jid import internJID as JID
17from twisted.words.protocols.jabber import component, error, xmlstream
18from twisted.words.xish import domish
19
20from wokkel.generic import XmlPipe
21from wokkel.subprotocols import StreamManager
22
# Default XML namespace of streams that use the component accept protocol;
# ListenComponentAuthenticator compares incoming stream headers against it.
NS_COMPONENT_ACCEPT = "jabber:component:accept"
24
class Component(StreamManager, service.Service):
    """
    Service that runs an XMPP External Component.

    This stream manager opens a TCP connection to an XMPP server and acts
    as an External Component, following
    U{XEP-0114<http://xmpp.org/extensions/xep-0114.html>}.
    """

    def __init__(self, host, port, jid, password):
        """
        @param host: Host of the server to connect to.
        @param port: TCP port of the server to connect to.
        @param jid: JID handed to the component stream factory.
        @param password: Password handed to the component stream factory.
        """
        self.host, self.port = host, port
        StreamManager.__init__(self,
                               component.componentFactory(jid, password))


    def _authd(self, xs):
        """
        Hook that runs once the stream has been initialized.

        The C{send} method of C{xs} is wrapped so that every outgoing
        element lacking a C{from} attribute gets the full JID of this
        component filled in before it is handed to the original C{send}.
        """
        plainSend = xs.send

        def stampedSend(obj):
            # Only elements can carry attributes; raw strings pass untouched.
            lacksSender = (domish.IElement.providedBy(obj)
                           and not obj.getAttribute('from'))
            if lacksSender:
                obj['from'] = self.xmlstream.thisEntity.full()
            plainSend(obj)

        xs.send = stampedSend
        StreamManager._authd(self, xs)


    def initializationFailed(self, reason):
        """
        Hook that runs when stream initialization did not succeed.

        The service is stopped first, which disconnects the current stream,
        and then the exception held by C{reason} is raised.
        """
        self.stopService()
        reason.raiseException()


    def startService(self):
        """
        Mark the service as started and initiate the server connection.
        """
        service.Service.startService(self)
        self._connection = self._getConnection()


    def stopService(self):
        """
        Mark the service as stopped, disable reconnects and disconnect.
        """
        service.Service.stopService(self)
        # Tell the factory to stop retrying before dropping the connection.
        self.factory.stopTrying()
        self._connection.disconnect()


    def _getConnection(self):
        """
        Build and return the TCP connector towards the configured server.
        """
        return reactor.connectTCP(self.host, self.port, self.factory)
97
98
99
class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
    """
    Component service that is wired straight into a router.

    Unlike L{Component}, which opens a socket to reach a router, a component
    of this type attaches to a router living in the same process. This makes
    one-process XMPP servers possible.

    @ivar domains: Domains (as L{unicode}) this component will handle traffic
        for.
    @type domains: L{set}
    """

    def __init__(self, router, domain=None):
        """
        @param router: Router that routes are registered with on start.
        @param domain: Optional initial domain to handle traffic for.
        """
        xmlstream.XMPPHandlerCollection.__init__(self)

        self._router = router
        self.domains = {domain} if domain else set()
        # No stream exists until the service has been started.
        self.xmlstream = None


    def startService(self):
        """
        Set up an XML pipe, register routes and connect all handlers.
        """
        service.Service.startService(self)

        pipe = XmlPipe()
        self._pipe = pipe
        self.xmlstream = pipe.source

        # The router gets the sink end of the pipe for each handled domain.
        for domain in self.domains:
            self._router.addRoute(domain, pipe.sink)

        for handler in self:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def stopService(self):
        """
        Unregister the routes and tell all handlers the connection is gone.
        """
        service.Service.stopService(self)

        for domain in self.domains:
            self._router.removeRoute(domain, self._pipe.sink)

        self._pipe = self.xmlstream = None

        for handler in self:
            handler.connectionLost(None)


    def addHandler(self, handler):
        """
        Register a handler; connect it right away if a stream is present.
        """
        xmlstream.XMPPHandlerCollection.addHandler(self, handler)

        if not self.xmlstream:
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()


    def send(self, obj):
        """
        Pass C{obj} to the XML stream so that it reaches the router.
        """
        self.xmlstream.send(obj)
172
173
174
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting components.

    @ivar secret: The shared secret used to authorize incoming component
        connections.
    @type secret: L{unicode}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: The shared secret that connecting components must
            prove knowledge of during the handshake.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Associate the authenticator with a stream.

        This sets the stream's version to 0.0, because the XEP-0114 component
        protocol was not designed for XMPP 1.0.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream when it has started.

        This examines the default namespace of the incoming stream and whether
        there is a requested hostname for the component. Then it generates a
        stream identifier, sends a response header and adds an observer for
        the first incoming element, triggering L{onElement}.

        A stream error is sent, and no header, when the namespace does not
        match L{namespace} (C{invalid-namespace}) or when the stream has no
        C{thisEntity} (C{improper-addressing}).
        """

        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not self.xmlstream.sid:
            from binascii import hexlify
            from twisted.python import randbytes
            # bytes.encode('hex') exists on Python 2 only; hexlify works on
            # both Python 2 and 3, and decoding gives a text identifier.
            self.xmlstream.sid = hexlify(
                randbytes.secureRandom(8)).decode('ascii')

        if rootElement.defaultUri != self.namespace:
            exc = error.StreamError('invalid-namespace')
            self.xmlstream.sendStreamError(exc)
            return

        # self.xmlstream.thisEntity is set to the address the component
        # wants to assume.
        if not self.xmlstream.thisEntity:
            exc = error.StreamError('improper-addressing')
            self.xmlstream.sendStreamError(exc)
            return

        self.xmlstream.sendHeader()
        self.xmlstream.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called on incoming XML Stanzas.

        The very first element received should be a request for handshake.
        Otherwise, the stream is dropped with a 'not-authorized' error. If a
        handshake request was received, the hash is extracted and passed to
        L{onHandshake}.
        """
        if (element.uri, element.name) == (self.namespace, 'handshake'):
            self.onHandshake(unicode(element))
        else:
            exc = error.StreamError('not-authorized')
            self.xmlstream.sendStreamError(exc)


    def onHandshake(self, handshake):
        """
        Called upon receiving the handshake request.

        This checks that the given hash in C{handshake} is equal to a
        calculated hash, responding with a handshake reply or a stream error.
        If the handshake was ok, the stream is authorized, and XML Stanzas may
        be exchanged.

        @param handshake: The hash sent by the connecting component.
        """
        # The expected hash is derived from the stream id and shared secret.
        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
                                                unicode(self.secret))
        if handshake != calculatedHash:
            exc = error.StreamError('not-authorized', text='Invalid hash')
            self.xmlstream.sendStreamError(exc)
        else:
            self.xmlstream.send('<handshake/>')
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)
268
269
270
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of L{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
        L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}s that
        should receive the traffic. A key of L{None} means the default route.
    @type routes: L{dict}
    """

    def __init__(self):
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream. An existing route for
        C{destination} is replaced.

        @param destination: Destination of the route to be added as a host name
                            or L{None} for the default route.
        @type destination: L{unicode} or L{NoneType}

        @param xs: XML Stream to register the route for.
        @type xs:
            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The observer that routes outgoing traffic of C{xs} is always removed.
        The routing table entry is only deleted when C{destination} is
        currently routed to C{xs}; a destination that has no route (for
        example because a newer stream took it over and was removed already)
        or that is routed to another stream is left untouched.

        @param destination: Destination of the route that should be removed.
        @type destination: L{unicode} or L{NoneType}

        @param xs: XML Stream to remove the route for.
        @type xs:
            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
        """
        xs.removeObserver('/*', self.route)
        # Guard the lookup: the entry may be gone already when several
        # streams were registered for the same destination.
        if destination in self.routes and self.routes[destination] == xs:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The host part of the stanza's C{to} address selects the route. When
        no specific route exists, the default route is used if present.
        Otherwise, unless the stanza is of type C{result} or C{error}, an
        error response is generated and routed in turn.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        """
        destination = JID(stanza['to'])

        if destination.host in self.routes:
            log.msg("Routing to %s: %r" % (destination.full(),
                                           stanza.toXml()))
            self.routes[destination.host].send(stanza)
        elif None in self.routes:
            log.msg("Routing to %s (default route): %r" % (destination.full(),
                                                           stanza.toXml()))
            self.routes[None].send(stanza)
        else:
            log.msg("No route to %s: %r" % (destination.full(),
                                            stanza.toXml()))
            # Responses are never answered with an error, which also keeps
            # the recursive call below from looping.
            if stanza.getAttribute('type') not in ('result', 'error'):
                # No route, send back error
                exc = error.StanzaError('remote-server-timeout', type='wait')
                exc.code = '504'
                response = exc.toResponse(stanza)
                self.route(response)
357
358
359
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
    """
    Server factory for XMPP external component connections.

    Streams created by this factory authenticate with
    L{ListenComponentAuthenticator}. Once a component has authenticated, the
    router is told to route traffic for the component's bound domain to that
    component's stream.

    @ivar logTraffic: Whether raw incoming and outgoing data gets logged.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: Router that routes are added to and removed from.
        @param secret: Shared secret given to each new authenticator.
        """
        self.router = router
        self.secret = secret

        xmlstream.XmlStreamServerFactory.__init__(
            self, lambda: ListenComponentAuthenticator(self.secret))

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.makeConnection),
            (xmlstream.STREAM_AUTHD_EVENT, self.connectionInitialized),
        )
        for event, observer in bootstraps:
            self.addBootstrap(event, observer)

        # Counter used to number connections in the traffic log.
        self.serial = 0


    def makeConnection(self, xs):
        """
        Called when a component connection was made.

        The stream is given the next serial number, raw traffic logging is
        hooked up when L{logTraffic} is set, and stream errors are observed.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        The host of the stream's C{thisEntity} is added to the routing table,
        and L{connectionLost} is registered for the end of the stream.
        """
        host = xs.thisEntity.host
        self.router.addRoute(host, xs)

        # The host and stream are passed along as extra observer arguments.
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                       host, xs)


    def onError(self, reason):
        """
        Log a stream error.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Remove the route for C{destination} that was registered for C{xs}.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.