source: wokkel/component.py @ 47:895c67e2ed9f

Last change on this file since 47:895c67e2ed9f was 41:72fa7b817767, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Compatibility fixes for Twisted 8.0 and 8.1.

File size: 11.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17try:
18    #from twisted.words.protocols.jabber.xmlstream import XMPPHandler
19    from twisted.words.protocols.jabber.xmlstream import XMPPHandlerCollection
20except ImportError:
21    #from wokkel.subprotocols import XMPPHandler
22    from wokkel.subprotocols import XMPPHandlerCollection
23
24try:
25    from twisted.words.protocols.jabber.xmlstream import XmlStreamServerFactory
26except ImportError:
27    from wokkel.compat import XmlStreamServerFactory
28
29from wokkel.generic import XmlPipe
30from wokkel.subprotocols import StreamManager
31
32NS_COMPONENT_ACCEPT = 'jabber:component:accept'
33
class Component(StreamManager, service.Service):
    """
    Service that connects to a router over TCP as an external component.

    The stream factory is obtained from C{component.componentFactory} and
    handed to L{StreamManager}; the TCP connection itself is only opened
    when the service is started.

    @ivar host: Host name passed to C{reactor.connectTCP}.
    @ivar port: Port number passed to C{reactor.connectTCP}.
    """

    def __init__(self, host, port, jid, password):
        """
        Remember the connection endpoint and set up the stream manager.

        @param host: Host of the router to connect to.
        @param port: Port of the router to connect to.
        @param jid: Passed on to C{component.componentFactory}.
        @param password: Passed on to C{component.componentFactory}.
        """
        self.host, self.port = host, port
        StreamManager.__init__(self,
                               component.componentFactory(jid, password))

    def _authd(self, xs):
        """
        Called when the stream has been authenticated.

        Replaces C{xs.send} with a wrapper that fills in a missing C{from}
        attribute on outgoing elements, then defers to
        L{StreamManager._authd}.

        @param xs: The authenticated XML stream.
        """
        plainSend = xs.send

        def send(obj):
            # Only elements can carry attributes; anything else (such as
            # raw strings) is passed through untouched.
            isElement = domish.IElement.providedBy(obj)
            if isElement and not obj.getAttribute('from'):
                obj['from'] = self.xmlstream.thisEntity.full()
            plainSend(obj)

        xs.send = send
        StreamManager._authd(self, xs)

    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        The service is stopped, which disconnects the current connection,
        and the exception held by C{reason} is raised again.

        @param reason: The failure; only its C{raiseException} method is
            used here.
        """
        self.stopService()
        reason.raiseException()

    def startService(self):
        """
        Start the service and open the connection to the router.
        """
        service.Service.startService(self)

        # NOTE(review): presumably this keeps the factory from reconnecting
        # on its own -- confirm against the factory returned by
        # componentFactory.
        self.factory.stopTrying()
        connection = self._getConnection()
        self._connection = connection

    def stopService(self):
        """
        Stop the service and disconnect the connection made on start.
        """
        service.Service.stopService(self)

        self._connection.disconnect()

    def _getConnection(self):
        """
        Open a TCP connection to C{host}:C{port} using the stream factory.

        @return: Whatever C{reactor.connectTCP} returns.
        """
        endpoint = (self.host, self.port)
        return reactor.connectTCP(endpoint[0], endpoint[1], self.factory)
78
79
80
class InternalComponent(XMPPHandlerCollection, service.Service):
    """
    Component service that is wired directly to a router in this process.

    Where L{Component} opens a socket to reach its router, a component of
    this type is attached to a router object living in the same process by
    means of an L{XmlPipe}. This allows for one-process XMPP servers.

    @ivar router: The router this component registers its route with.
    @ivar domain: The destination the route is registered for.
    @ivar xmlstream: The C{source} end of the pipe while the service is
        running, C{None} otherwise.
    """

    def __init__(self, router, domain):
        """
        @param router: Router to attach to when the service starts.
        @param domain: Destination to register with the router.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self.domain = domain
        self.router = router


    def startService(self):
        """
        Create an XML pipe, hook it up to the router and set up handlers.

        The pipe's C{sink} end is registered as the route for C{domain};
        every handler in this collection is connected to the C{source} end.
        """
        service.Service.startService(self)

        pipe = XmlPipe()
        self.pipe = pipe
        self.xmlstream = pipe.source
        self.router.addRoute(self.domain, pipe.sink)

        for handler in self:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def stopService(self):
        """
        Detach from the router and tell the handlers the connection is gone.

        The route is removed and C{pipe} and C{xmlstream} are reset to
        C{None} before the handlers are notified.
        """
        service.Service.stopService(self)

        pipe = self.pipe
        self.router.removeRoute(self.domain, pipe.sink)
        self.pipe = self.xmlstream = None

        for handler in self:
            handler.connectionLost(None)


    def addHandler(self, handler):
        """
        Add a new handler, connecting it right away if a stream is present.

        @param handler: The handler to add to this collection.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        if not self.xmlstream:
            # Service not running; startService will connect the handler.
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()


    def send(self, obj):
        """
        Send data into the XML stream, so that it ends up at the router.

        @param obj: Object to hand to C{self.xmlstream.send}.
        """
        stream = self.xmlstream
        stream.send(obj)
142
143
144
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting incoming component connections.

    @ivar secret: The shared secret used to authorize incoming component
        connections.
    @type secret: C{str}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: The shared secret components must prove knowledge of.
        @type secret: C{str}.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Associate the authenticator with a stream.

        The stream's version is forced to 0.0 first, because the XEP-0114
        component protocol was not designed for XMPP 1.0.

        @param xs: The XML stream to associate with.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream when it has started.

        The default namespace of the incoming stream must match
        L{namespace} and the stream must carry the address the component
        wants to assume; otherwise a stream error is sent and nothing else
        happens. On success the response header is sent and a one-time
        observer is installed so that the first incoming element is passed
        to L{onElement}.

        @param rootElement: The root element of the incoming stream.
        """

        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        xs = self.xmlstream

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not xs.sid:
            from twisted.python import randbytes
            xs.sid = randbytes.secureRandom(8).encode('hex')

        # xs.thisEntity is set to the address the component wants to
        # assume; it is only inspected when the namespace checks out.
        if rootElement.defaultUri != self.namespace:
            condition = 'invalid-namespace'
        elif not xs.thisEntity:
            condition = 'improper-addressing'
        else:
            condition = None

        if condition is not None:
            xs.sendStreamError(error.StreamError(condition))
            return

        xs.sendHeader()
        xs.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called on the first incoming XML Stanza.

        That element has to be a handshake request in L{namespace}; its
        character data is then handed to L{onHandshake}. Anything else
        results in a 'not-authorized' stream error.

        @param element: The first element received on the stream.
        """
        expected = (self.namespace, 'handshake')
        if (element.uri, element.name) != expected:
            self.xmlstream.sendStreamError(
                error.StreamError('not-authorized'))
            return

        self.onHandshake(unicode(element))


    def onHandshake(self, handshake):
        """
        Called upon receiving the handshake request.

        The hash in C{handshake} is compared with the one calculated from
        the stream identifier and the shared secret. A match is answered
        with a handshake reply, after which the stream counts as authorized
        and the authenticated event is dispatched; a mismatch is answered
        with a 'not-authorized' stream error.

        @param handshake: The hash offered by the connecting component.
        """
        xs = self.xmlstream
        wanted = xmlstream.hashPassword(xs.sid, self.secret)
        if handshake == wanted:
            xs.send('<handshake/>')
            xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
        else:
            xs.sendStreamError(
                error.StreamError('not-authorized', text='Invalid hash'))
237
238
239
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of C{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
                  L{EventDispatcher<utility.EventDispatcher>}s that should
                  receive the traffic. A key of C{None} means the default
                  route.
    @type routes: C{dict}
    """

    def __init__(self):
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream. An existing route for
        C{destination} is replaced.

        @param destination: Destination of the route to be added as a host name
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}.
        @param xs: XML Stream to register the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The stanza observer is always removed from C{xs}. The routing table
        entry is only deleted when it still points at C{xs}: if the route
        has meanwhile been taken over by another stream, or has already been
        removed, the table is left untouched and no exception is raised.

        @param destination: Destination of the route that should be removed.
        @type destination: C{str}.
        @param xs: XML Stream to remove the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        xs.removeObserver('/*', self.route)

        # A stale stream may be torn down after the route for its
        # destination was replaced and then removed again; looking up the
        # destination unconditionally would raise KeyError in that case.
        if destination in self.routes and xs == self.routes[destination]:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The host part of the stanza's C{to} address selects the route; when
        there is no route for that host the default route (key C{None}) is
        used.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        @raise KeyError: If the stanza has no C{to} attribute, or if there
            is neither a specific nor a default route for it.
        """
        destination = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))

        if destination.host in self.routes:
            self.routes[destination.host].send(stanza)
        else:
            self.routes[None].send(stanza)
311
312
313
class XMPPComponentServerFactory(XmlStreamServerFactory):
    """
    XMPP Component Server factory.

    Accepts XMPP external component connections and has the router route
    the traffic for a component's bound domain to that component.

    @ivar logTraffic: When true, raw incoming and outgoing data of every
        accepted stream is written to the log.
    @ivar router: The router that routes are registered with.
    @ivar secret: Shared secret handed to each
        L{ListenComponentAuthenticator}.
    @ivar serial: Number assigned to the next accepted stream.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: Router to add authenticated components to.
        @param secret: Shared secret components have to authenticate with.
        """
        self.router = router
        self.secret = secret

        # The secret is looked up at call time, so each new authenticator
        # gets the value of self.secret current at that moment.
        XmlStreamServerFactory.__init__(
            self, lambda: ListenComponentAuthenticator(self.secret))

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.makeConnection),
            (xmlstream.STREAM_AUTHD_EVENT, self.connectionInitialized),
        )
        for event, callback in bootstraps:
            self.addBootstrap(event, callback)

        self.serial = 0


    def makeConnection(self, xs):
        """
        Called when a component connection was made.

        Tags the stream with a serial number, installs raw traffic logging
        when L{logTraffic} is set and starts observing stream errors.

        @param xs: The XML stream of the new connection.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        The host of the stream's C{thisEntity} is added to the routing
        table, and an observer for the end of the stream is set up that
        calls L{connectionLost} with that destination and the stream.

        @param xs: The authenticated XML stream.
        """
        domain = xs.thisEntity.host

        self.router.addRoute(domain, xs)
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                       domain, xs)


    def onError(self, reason):
        """
        Log a stream error.

        @param reason: The error to pass to C{log.err}.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Remove the route of a component whose stream has ended.

        @param destination: Destination the route was registered for.
        @param xs: The XML stream that has ended.
        @param reason: Unused here.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.