source: wokkel/component.py @ 165:76a61f5aa343

Last change on this file since 165:76a61f5aa343 was 165:76a61f5aa343, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Cleanups leading up to Wokkel 0.7.0.

As we now depend on Twisted 10.0.0 or higher, the following classes and
interfaces were deprecated:

This also resolves all Pyflakes warnings, changes links for www.xmpp.org to
xmpp.org and fixes the copyright notice in LICENSE to include 2012.

  • Property exe set to *
File size: 11.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.generic import XmlPipe
18from wokkel.subprotocols import StreamManager
19
# Default stream namespace expected from connecting components; used as
# ListenComponentAuthenticator.namespace.
NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
class Component(StreamManager, service.Service):
    """
    External component service that connects to a router over TCP.

    @ivar host: Host name handed to C{reactor.connectTCP}.
    @ivar port: Port number handed to C{reactor.connectTCP}.
    """

    # Connector returned by L{_getConnection}. Stays C{None} until
    # L{startService} has run, so that L{stopService} can be called safely
    # on a service that was never started.
    _connection = None

    def __init__(self, host, port, jid, password):
        """
        @param host: Host to connect to.
        @param port: Port to connect to.
        @param jid: Component address, passed on to
                    C{component.componentFactory}.
        @param password: Shared secret, passed on to
                         C{component.componentFactory}.
        """
        self.host = host
        self.port = port

        factory = component.componentFactory(jid, password)

        StreamManager.__init__(self, factory)

    def _authd(self, xs):
        """
        Called when the stream has been authenticated.

        Wraps C{xs.send} so that every outgoing object that provides
        C{domish.IElement} and has no C{from} attribute gets the full JID of
        this stream's own entity as sender, then defers to
        L{StreamManager._authd}.

        @param xs: The authenticated XML stream.
        """
        old_send = xs.send

        def send(obj):
            # Only stamp elements; other objects (e.g. raw strings) are
            # passed through untouched.
            if domish.IElement.providedBy(obj) and \
                    not obj.getAttribute('from'):
                obj['from'] = self.xmlstream.thisEntity.full()
            old_send(obj)

        xs.send = send
        StreamManager._authd(self, xs)

    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stop the service (thereby disconnecting the current stream) and
        raise the exception.

        @param reason: Object describing the failure; its C{raiseException}
                       method is called to re-raise it.
        """
        self.stopService()
        reason.raiseException()

    def startService(self):
        """
        Start the service and open the connection.
        """
        service.Service.startService(self)

        # NOTE(review): stopTrying() is called before connecting, which
        # presumably switches off the factory's automatic reconnects for the
        # lifetime of this service -- confirm this is intended.
        self.factory.stopTrying()
        self._connection = self._getConnection()

    def stopService(self):
        """
        Stop the service and close the connection, if one was opened.
        """
        service.Service.stopService(self)

        # Nothing to disconnect when the service was never started.
        if self._connection is not None:
            self._connection.disconnect()

    def _getConnection(self):
        """
        Open a TCP connection to C{host}:C{port} using the stream factory.

        @return: Whatever C{reactor.connectTCP} returns.
        """
        return reactor.connectTCP(self.host, self.port, self.factory)
66
67
68
class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
    """
    Component service that connects directly to a router.

    Instead of opening a socket to connect to a router, like L{Component},
    components of this type connect to a router in the same process. This
    allows for one-process XMPP servers.

    @ivar domains: Domains (as C{str}) this component will handle traffic for.
    @type domains: L{set}
    @ivar xmlstream: Source end of the XML pipe while the service is running,
                     C{None} otherwise.
    """

    # XML pipe to the router. C{None} while the service is not running, so
    # that L{stopService} is safe on a service that was never started.
    _pipe = None

    def __init__(self, router, domain=None):
        """
        @param router: Router to register routes with. Must offer
                       C{addRoute} and C{removeRoute}.
        @param domain: Optional initial domain to handle traffic for.
        """
        xmlstream.XMPPHandlerCollection.__init__(self)

        self._router = router
        self.domains = set()
        if domain:
            self.domains.add(domain)

        self.xmlstream = None


    def startService(self):
        """
        Create a XML pipe, connect to the router and setup handlers.
        """
        service.Service.startService(self)

        self._pipe = XmlPipe()
        self.xmlstream = self._pipe.source

        for domain in self.domains:
            self._router.addRoute(domain, self._pipe.sink)

        # Iterate over a snapshot. A handler may add another handler from
        # within connectionInitialized(); L{addHandler} already initializes
        # such a handler because self.xmlstream is set, so looping over the
        # live collection would initialize it a second time.
        for e in list(self):
            e.makeConnection(self.xmlstream)
            e.connectionInitialized()


    def stopService(self):
        """
        Disconnect from the router and handlers.
        """
        service.Service.stopService(self)

        # Routes only exist once startService() has created the pipe.
        pipe = self._pipe
        if pipe is not None:
            for domain in self.domains:
                self._router.removeRoute(domain, pipe.sink)

        self._pipe = None
        self.xmlstream = None

        # Snapshot, so handlers may add or remove handlers while being
        # notified.
        for e in list(self):
            e.connectionLost(None)


    def addHandler(self, handler):
        """
        Add a new handler and connect it to the stream.

        When the service is already running, the handler is connected and
        initialized right away.

        @param handler: The handler to add.
        """
        xmlstream.XMPPHandlerCollection.addHandler(self, handler)

        if self.xmlstream:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def send(self, obj):
        """
        Send data to the XML stream, so it ends up at the router.

        This requires the service to be running: C{self.xmlstream} is C{None}
        before L{startService} and after L{stopService}.

        @param obj: Object to hand to the stream's C{send}.
        """
        self.xmlstream.send(obj)
140
141
142
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting components.

    @ivar secret: The shared secret used to authorize incoming component
                  connections.
    @type secret: C{unicode}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: The shared secret components must prove knowledge of.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Associate the authenticator with a stream.

        The stream's version is forced to 0.0 first, because the XEP-0114
        component protocol was not designed for XMPP 1.0.

        @param xs: The XML stream to associate with.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream when it has started.

        Checks the default namespace of the incoming stream and whether the
        component requested an address to assume. On a mismatch a stream
        error is sent and nothing else happens. Otherwise the response header
        is sent and a one-time observer for the first incoming element is
        installed, triggering L{onElement}.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
        xs = self.xmlstream

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not xs.sid:
            from twisted.python import randbytes
            xs.sid = randbytes.secureRandom(8).encode('hex')

        # xs.thisEntity is set to the address the component wants to assume.
        condition = None
        if rootElement.defaultUri != self.namespace:
            condition = 'invalid-namespace'
        elif not xs.thisEntity:
            condition = 'improper-addressing'

        if condition is not None:
            xs.sendStreamError(error.StreamError(condition))
            return

        xs.sendHeader()
        xs.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called on the first incoming element.

        Anything other than a handshake request in this authenticator's
        namespace is answered with a 'not-authorized' stream error. For a
        handshake request, its character data is passed to L{onHandshake}.

        @param element: The received element.
        """
        isHandshake = (element.uri, element.name) == (self.namespace,
                                                      'handshake')
        if not isHandshake:
            self.xmlstream.sendStreamError(
                error.StreamError('not-authorized'))
            return

        self.onHandshake(unicode(element))


    def onHandshake(self, handshake):
        """
        Called upon receiving the handshake request.

        Compares C{handshake} to the hash calculated from the stream
        identifier and the shared secret. When they match, a handshake reply
        is sent and the stream is marked as authenticated; otherwise a
        'not-authorized' stream error is sent.

        @param handshake: The hash offered by the component.
        """
        xs = self.xmlstream
        expected = xmlstream.hashPassword(xs.sid, unicode(self.secret))

        if handshake == expected:
            xs.send('<handshake/>')
            xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
        else:
            xs.sendStreamError(
                error.StreamError('not-authorized', text='Invalid hash'))
236
237
238
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of C{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
                  L{EventDispatcher<utility.EventDispatcher>}s that should
                  receive the traffic. A key of C{None} means the default
                  route.
    @type routes: C{dict}
    """

    def __init__(self):
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream. An existing route for
        C{destination} is replaced.

        @param destination: Destination of the route to be added as a host name
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}.
        @param xs: XML Stream to register the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The routing observer is always removed from C{xs}. The routing table
        entry is only dropped when it still points at C{xs}; an entry that
        was replaced by another stream, or that no longer exists, is left
        alone.

        @param destination: Destination of the route that should be removed.
        @type destination: C{str}.
        @param xs: XML Stream to remove the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        xs.removeObserver('/*', self.route)
        # Check membership first: the route may already be gone (for example
        # when it is removed twice), which must not raise KeyError.
        if destination in self.routes and xs == self.routes[destination]:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The stanza is sent to the stream registered for the host part of its
        C{to} address, or to the default route when there is none.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        @raise KeyError: When there is neither a specific nor a default
                         route. A stanza without a C{to} attribute presumably
                         fails the same way on the attribute lookup.
        """
        destination = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))

        if destination.host in self.routes:
            self.routes[destination.host].send(stanza)
        else:
            self.routes[None].send(stanza)
310
311
312
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
    """
    XMPP Component Server factory.

    Accepts XMPP external component connections and has the router route
    traffic for a component's bound domain to that component.

    @ivar logTraffic: Whether raw incoming and outgoing data is logged.
    @ivar router: The router that routes are registered with.
    @ivar secret: Shared secret handed to each new
                  L{ListenComponentAuthenticator}.
    @ivar serial: Serial number assigned to the next incoming stream.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: Router to add and remove component routes on.
        @param secret: Shared secret components authenticate with. Defaults
                       to the literal string C{'secret'}.
        """
        self.router = router
        self.secret = secret

        # The secret is looked up for every new authenticator, so changing
        # self.secret later affects subsequent connections.
        xmlstream.XmlStreamServerFactory.__init__(
            self, lambda: ListenComponentAuthenticator(self.secret))

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.makeConnection),
            (xmlstream.STREAM_AUTHD_EVENT, self.connectionInitialized),
        )
        for event, observer in bootstraps:
            self.addBootstrap(event, observer)

        self.serial = 0


    def makeConnection(self, xs):
        """
        Called when a component connection was made.

        Tags the stream with a serial number, enables traffic logging when
        C{logTraffic} is set, and starts observing stream errors.

        @param xs: The newly connected XML stream.
        """
        current = self.serial
        xs.serial = current
        self.serial = current + 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        Adds a route for the host of the component's address and arranges
        for L{connectionLost} to be called when the stream ends.

        @param xs: The authenticated XML stream.
        """
        host = xs.thisEntity.host

        self.router.addRoute(host, xs)
        # The extra arguments are passed to connectionLost ahead of the event
        # payload.
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                       host, xs)


    def onError(self, reason):
        """
        Log a stream error.

        @param reason: The error to log.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Remove the route of a component whose stream has ended.

        @param destination: Host name the route was registered for.
        @param xs: The XML stream that ended.
        @param reason: Event payload; not used.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.