source: wokkel/component.py @ 166:d9c10a5b5c0d

Last change on this file since 166:d9c10a5b5c0d was 166:d9c10a5b5c0d, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Documentation fixes for pydoctor.

  • Property exe set to *
File size: 11.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) Ralph Meijer.
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.generic import XmlPipe
18from wokkel.subprotocols import StreamManager
19
20NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
22class Component(StreamManager, service.Service):
23    def __init__(self, host, port, jid, password):
24        self.host = host
25        self.port = port
26
27        factory = component.componentFactory(jid, password)
28
29        StreamManager.__init__(self, factory)
30
31    def _authd(self, xs):
32        old_send = xs.send
33
34        def send(obj):
35            if domish.IElement.providedBy(obj) and \
36                    not obj.getAttribute('from'):
37                obj['from'] = self.xmlstream.thisEntity.full()
38            old_send(obj)
39
40        xs.send = send
41        StreamManager._authd(self, xs)
42
43    def initializationFailed(self, reason):
44        """
45        Called when stream initialization has failed.
46
47        Stop the service (thereby disconnecting the current stream) and
48        raise the exception.
49        """
50        self.stopService()
51        reason.raiseException()
52
53    def startService(self):
54        service.Service.startService(self)
55
56        self.factory.stopTrying()
57        self._connection = self._getConnection()
58
59    def stopService(self):
60        service.Service.stopService(self)
61
62        self._connection.disconnect()
63
64    def _getConnection(self):
65        return reactor.connectTCP(self.host, self.port, self.factory)
66
67
68
69class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
70    """
71    Component service that connects directly to a router.
72
73    Instead of opening a socket to connect to a router, like L{Component},
74    components of this type connect to a router in the same process. This
75    allows for one-process XMPP servers.
76
77    @ivar domains: Domains (as C{str}) this component will handle traffic for.
78    @type domains: C{set}
79    """
80
81    def __init__(self, router, domain=None):
82        xmlstream.XMPPHandlerCollection.__init__(self)
83
84        self._router = router
85        self.domains = set()
86        if domain:
87            self.domains.add(domain)
88
89        self.xmlstream = None
90
91    def startService(self):
92        """
93        Create a XML pipe, connect to the router and setup handlers.
94        """
95        service.Service.startService(self)
96
97        self._pipe = XmlPipe()
98        self.xmlstream = self._pipe.source
99
100        for domain in self.domains:
101            self._router.addRoute(domain, self._pipe.sink)
102
103        for e in self:
104            e.makeConnection(self.xmlstream)
105            e.connectionInitialized()
106
107
108    def stopService(self):
109        """
110        Disconnect from the router and handlers.
111        """
112        service.Service.stopService(self)
113
114        for domain in self.domains:
115            self._router.removeRoute(domain, self._pipe.sink)
116
117        self._pipe = None
118        self.xmlstream = None
119
120        for e in self:
121            e.connectionLost(None)
122
123
124    def addHandler(self, handler):
125        """
126        Add a new handler and connect it to the stream.
127        """
128        xmlstream.XMPPHandlerCollection.addHandler(self, handler)
129
130        if self.xmlstream:
131            handler.makeConnection(self.xmlstream)
132            handler.connectionInitialized()
133
134
135    def send(self, obj):
136        """
137        Send data to the XML stream, so it ends up at the router.
138        """
139        self.xmlstream.send(obj)
140
141
142
143class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
144    """
145    Authenticator for accepting components.
146
147    @ivar secret: The shared used to authorized incoming component connections.
148    @type secret: C{unicode}.
149    """
150
151    namespace = NS_COMPONENT_ACCEPT
152
153    def __init__(self, secret):
154        self.secret = secret
155        xmlstream.ListenAuthenticator.__init__(self)
156
157
158    def associateWithStream(self, xs):
159        """
160        Associate the authenticator with a stream.
161
162        This sets the stream's version to 0.0, because the XEP-0114 component
163        protocol was not designed for XMPP 1.0.
164        """
165        xs.version = (0, 0)
166        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
167
168
169    def streamStarted(self, rootElement):
170        """
171        Called by the stream when it has started.
172
173        This examines the default namespace of the incoming stream and whether
174        there is a requested hostname for the component. Then it generates a
175        stream identifier, sends a response header and adds an observer for
176        the first incoming element, triggering L{onElement}.
177        """
178
179        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
180
181        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
182        if not self.xmlstream.sid:
183            from twisted.python import randbytes
184            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')
185
186        if rootElement.defaultUri != self.namespace:
187            exc = error.StreamError('invalid-namespace')
188            self.xmlstream.sendStreamError(exc)
189            return
190
191        # self.xmlstream.thisEntity is set to the address the component
192        # wants to assume.
193        if not self.xmlstream.thisEntity:
194            exc = error.StreamError('improper-addressing')
195            self.xmlstream.sendStreamError(exc)
196            return
197
198        self.xmlstream.sendHeader()
199        self.xmlstream.addOnetimeObserver('/*', self.onElement)
200
201
202    def onElement(self, element):
203        """
204        Called on incoming XML Stanzas.
205
206        The very first element received should be a request for handshake.
207        Otherwise, the stream is dropped with a 'not-authorized' error. If a
208        handshake request was received, the hash is extracted and passed to
209        L{onHandshake}.
210        """
211        if (element.uri, element.name) == (self.namespace, 'handshake'):
212            self.onHandshake(unicode(element))
213        else:
214            exc = error.StreamError('not-authorized')
215            self.xmlstream.sendStreamError(exc)
216
217
218    def onHandshake(self, handshake):
219        """
220        Called upon receiving the handshake request.
221
222        This checks that the given hash in C{handshake} is equal to a
223        calculated hash, responding with a handshake reply or a stream error.
224        If the handshake was ok, the stream is authorized, and  XML Stanzas may
225        be exchanged.
226        """
227        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
228                                                unicode(self.secret))
229        if handshake != calculatedHash:
230            exc = error.StreamError('not-authorized', text='Invalid hash')
231            self.xmlstream.sendStreamError(exc)
232        else:
233            self.xmlstream.send('<handshake/>')
234            self.xmlstream.dispatch(self.xmlstream,
235                                    xmlstream.STREAM_AUTHD_EVENT)
236
237
238
239class Router(object):
240    """
241    XMPP Server's Router.
242
243    A router connects the different components of the XMPP service and routes
244    messages between them based on the given routing table.
245
246    Connected components are trusted to have correct addressing in the
247    stanzas they offer for routing.
248
249    A route destination of C{None} adds a default route. Traffic for which no
250    specific route exists, will be routed to this default route.
251
252    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
253        L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}s that
254        should receive the traffic. A key of C{None} means the default route.
255    @type routes: C{dict}
256    """
257
258    def __init__(self):
259        self.routes = {}
260
261
262    def addRoute(self, destination, xs):
263        """
264        Add a new route.
265
266        The passed XML Stream C{xs} will have an observer for all stanzas
267        added to route its outgoing traffic. In turn, traffic for
268        C{destination} will be passed to this stream.
269
270        @param destination: Destination of the route to be added as a host name
271                            or C{None} for the default route.
272        @type destination: C{str} or C{NoneType}
273
274        @param xs: XML Stream to register the route for.
275        @type xs:
276            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
277        """
278        self.routes[destination] = xs
279        xs.addObserver('/*', self.route)
280
281
282    def removeRoute(self, destination, xs):
283        """
284        Remove a route.
285
286        @param destination: Destination of the route that should be removed.
287        @type destination: C{str}.
288
289        @param xs: XML Stream to remove the route for.
290        @type xs:
291            L{EventDispatcher<twisted.words.xish.utility.EventDispatcher>}
292        """
293        xs.removeObserver('/*', self.route)
294        if (xs == self.routes[destination]):
295            del self.routes[destination]
296
297
298    def route(self, stanza):
299        """
300        Route a stanza.
301
302        @param stanza: The stanza to be routed.
303        @type stanza: L{domish.Element}.
304        """
305        destination = JID(stanza['to'])
306
307        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
308
309        if destination.host in self.routes:
310            self.routes[destination.host].send(stanza)
311        else:
312            self.routes[None].send(stanza)
313
314
315
316class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
317    """
318    XMPP Component Server factory.
319
320    This factory accepts XMPP external component connections and makes
321    the router service route traffic for a component's bound domain
322    to that component.
323    """
324
325    logTraffic = False
326
327    def __init__(self, router, secret='secret'):
328        self.router = router
329        self.secret = secret
330
331        def authenticatorFactory():
332            return ListenComponentAuthenticator(self.secret)
333
334        xmlstream.XmlStreamServerFactory.__init__(self, authenticatorFactory)
335        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
336                          self.makeConnection)
337        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
338                          self.connectionInitialized)
339
340        self.serial = 0
341
342
343    def makeConnection(self, xs):
344        """
345        Called when a component connection was made.
346
347        This enables traffic debugging on incoming streams.
348        """
349        xs.serial = self.serial
350        self.serial += 1
351
352        def logDataIn(buf):
353            log.msg("RECV (%d): %r" % (xs.serial, buf))
354
355        def logDataOut(buf):
356            log.msg("SEND (%d): %r" % (xs.serial, buf))
357
358        if self.logTraffic:
359            xs.rawDataInFn = logDataIn
360            xs.rawDataOutFn = logDataOut
361
362        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
363
364
365    def connectionInitialized(self, xs):
366        """
367        Called when a component has succesfully authenticated.
368
369        Add the component to the routing table and establish a handler
370        for a closed connection.
371        """
372        destination = xs.thisEntity.host
373
374        self.router.addRoute(destination, xs)
375        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
376                                                   destination, xs)
377
378
379    def onError(self, reason):
380        log.err(reason, "Stream Error")
381
382
383    def connectionLost(self, destination, xs, reason):
384        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.