source: wokkel/component.py @ 95:2d57501b5c68

Last change on this file since 95:2d57501b5c68 was 76:2919e60d3588, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Sync with Twisted changes involving hashPassword.

See http://twistedmatrix.com/trac/ticket/3847.

File size: 11.7 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17try:
18    #from twisted.words.protocols.jabber.xmlstream import XMPPHandler
19    from twisted.words.protocols.jabber.xmlstream import XMPPHandlerCollection
20except ImportError:
21    #from wokkel.subprotocols import XMPPHandler
22    from wokkel.subprotocols import XMPPHandlerCollection
23
24try:
25    from twisted.words.protocols.jabber.xmlstream import XmlStreamServerFactory
26except ImportError:
27    from wokkel.compat import XmlStreamServerFactory
28
29from wokkel.generic import XmlPipe
30from wokkel.subprotocols import StreamManager
31
# Default stream namespace that ListenComponentAuthenticator requires on
# incoming component streams.
NS_COMPONENT_ACCEPT = 'jabber:component:accept'
33
class Component(StreamManager, service.Service):
    """
    Service for a component that reaches its router over a TCP connection.

    @ivar host: Host handed to the reactor when connecting.
    @ivar port: Port handed to the reactor when connecting.
    """

    def __init__(self, host, port, jid, password):
        """
        Store the endpoint and build the stream factory.

        @param host: Host to connect to.
        @param port: Port to connect to.
        @param jid: Passed on to C{component.componentFactory}.
        @param password: Passed on to C{component.componentFactory}.
        """
        self.host = host
        self.port = port

        StreamManager.__init__(self,
                               component.componentFactory(jid, password))


    def _authd(self, xs):
        """
        Wrap C{xs.send} so that outgoing elements always carry a sender.

        Elements without a (non-empty) C{from} attribute are stamped with
        the full JID of C{self.xmlstream.thisEntity} before being passed to
        the stream's original C{send}. Anything that is not an element is
        passed through unchanged.
        """
        plainSend = xs.send

        def send(obj):
            isElement = domish.IElement.providedBy(obj)
            if isElement and not obj.getAttribute('from'):
                obj['from'] = self.xmlstream.thisEntity.full()
            plainSend(obj)

        xs.send = send
        StreamManager._authd(self, xs)


    def initializationFailed(self, reason):
        """
        Handle a failed stream initialization.

        The service is stopped, which disconnects the current stream, and
        the exception wrapped by C{reason} is raised again.
        """
        self.stopService()
        reason.raiseException()


    def startService(self):
        """
        Mark the service as started and open the connection.
        """
        service.Service.startService(self)

        # NOTE(review): presumably keeps the factory from reconnecting on
        # its own -- confirm against the factory implementation.
        self.factory.stopTrying()
        connector = self._getConnection()
        self._connection = connector


    def stopService(self):
        """
        Mark the service as stopped and drop the connection.
        """
        service.Service.stopService(self)

        connector = self._connection
        connector.disconnect()


    def _getConnection(self):
        """
        Start a TCP connection to C{host}:C{port} using the stream factory.

        @return: Whatever C{reactor.connectTCP} returns.
        """
        connector = reactor.connectTCP(self.host, self.port, self.factory)
        return connector
78
79
80
class InternalComponent(XMPPHandlerCollection, service.Service):
    """
    Component service wired directly to a router in the same process.

    Where L{Component} opens a socket to reach its router, this kind of
    component is attached to a router object through an XML pipe. That makes
    one-process XMPP servers possible.

    @ivar domains: Domains (as C{str}) this component will handle traffic for.
    @type domains: L{set}
    """

    def __init__(self, router, domain=None):
        """
        @param router: Router that routes are registered with.
        @param domain: Optional first domain to handle traffic for.
        """
        XMPPHandlerCollection.__init__(self)

        self._router = router
        self.xmlstream = None

        self.domains = set()
        if domain:
            self.domains.add(domain)


    def startService(self):
        """
        Create an XML pipe, register routes and connect all handlers.
        """
        service.Service.startService(self)

        pipe = XmlPipe()
        self._pipe = pipe
        self.xmlstream = pipe.source

        # The router gets the sink end of the pipe for every domain.
        for name in self.domains:
            self._router.addRoute(name, pipe.sink)

        for handler in self:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def stopService(self):
        """
        Unregister the routes, drop the pipe and notify all handlers.
        """
        service.Service.stopService(self)

        for name in self.domains:
            self._router.removeRoute(name, self._pipe.sink)

        self._pipe = self.xmlstream = None

        for handler in self:
            handler.connectionLost(None)


    def addHandler(self, handler):
        """
        Register a handler, connecting it right away if a stream exists.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        if not self.xmlstream:
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()


    def send(self, obj):
        """
        Hand C{obj} to the XML stream, so that it ends up at the router.
        """
        stream = self.xmlstream
        stream.send(obj)
152
153
154
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting components.

    @ivar secret: The shared secret used to authorize incoming component
                  connections.
    @type secret: C{unicode}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: Shared secret that handshakes are checked against.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Bind this authenticator to the stream C{xs}.

        The stream's version is forced to 0.0 first, because the XEP-0114
        component protocol was not designed for XMPP 1.0.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream when it has started.

        Checks the default namespace of the incoming stream and whether the
        component requested an address. On success the response header is
        sent and a one-time observer is installed that passes the first
        incoming element to L{onElement}; otherwise a stream error is sent.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        xs = self.xmlstream

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not xs.sid:
            from twisted.python import randbytes
            xs.sid = randbytes.secureRandom(8).encode('hex')

        # Pick the stream error condition, if any. The namespace is checked
        # before the addressing, as only the first failure is reported.
        condition = None
        if rootElement.defaultUri != self.namespace:
            condition = 'invalid-namespace'
        elif not xs.thisEntity:
            # thisEntity is set to the address the component wants to
            # assume.
            condition = 'improper-addressing'

        if condition is not None:
            xs.sendStreamError(error.StreamError(condition))
            return

        xs.sendHeader()
        xs.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called with the first element received on the stream.

        That element has to be a handshake request in this authenticator's
        namespace; its text is passed to L{onHandshake}. Anything else gets
        a 'not-authorized' stream error.
        """
        expected = (self.namespace, 'handshake')
        if (element.uri, element.name) != expected:
            self.xmlstream.sendStreamError(
                error.StreamError('not-authorized'))
            return

        self.onHandshake(unicode(element))


    def onHandshake(self, handshake):
        """
        Called upon receiving the handshake request.

        The hash in C{handshake} is compared with one calculated from the
        stream identifier and the shared secret. When they match, an empty
        handshake element is sent and the stream's authenticated event is
        dispatched; when they differ, a 'not-authorized' stream error is
        sent.
        """
        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
                                                unicode(self.secret))
        if handshake == calculatedHash:
            self.xmlstream.send('<handshake/>')
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)
        else:
            failure = error.StreamError('not-authorized',
                                        text='Invalid hash')
            self.xmlstream.sendStreamError(failure)
248
249
250
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of C{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
                  L{EventDispatcher<utility.EventDispatcher>}s that should
                  receive the traffic. A key of C{None} means the default
                  route.
    @type routes: C{dict}
    """

    def __init__(self):
        """
        Start out with an empty routing table.
        """
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream. An existing route for
        C{destination} is replaced.

        @param destination: Destination of the route to be added as a host name
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}.
        @param xs: XML Stream to register the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The routing observer is always removed from C{xs}. The entry in the
        routing table is only dropped when it still points at C{xs}; a
        destination that has no route (any more), or whose route has been
        taken over by another stream, is left alone instead of raising
        C{KeyError}.

        @param destination: Destination of the route that should be removed.
        @type destination: C{str}.
        @param xs: XML Stream to remove the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        xs.removeObserver('/*', self.route)

        # Check membership first: callers tearing down several routes must
        # not be interrupted by one destination that was never registered.
        if destination in self.routes and xs == self.routes[destination]:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The stanza is sent to the stream registered for the host part of its
        C{to} address, falling back to the default route.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        @raise KeyError: If there is neither a route for the destination host
                         nor a default route.
        """
        destination = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))

        if destination.host in self.routes:
            self.routes[destination.host].send(stanza)
        else:
            # No specific route: hand the stanza to the default route.
            self.routes[None].send(stanza)
322
323
324
class XMPPComponentServerFactory(XmlStreamServerFactory):
    """
    XMPP Component Server factory.

    Accepts XMPP external component connections and has the router send
    traffic for a component's bound domain to that component.

    @ivar logTraffic: Whether raw incoming and outgoing data is logged.
    @ivar serial: Serial number given to the next incoming stream.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: Router that authenticated components are added to.
        @param secret: Shared secret handed to each new
                       L{ListenComponentAuthenticator}.
        """
        self.router = router
        self.secret = secret

        XmlStreamServerFactory.__init__(
            self, lambda: ListenComponentAuthenticator(self.secret))

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.makeConnection),
            (xmlstream.STREAM_AUTHD_EVENT, self.connectionInitialized),
        )
        for event, observer in bootstraps:
            self.addBootstrap(event, observer)

        self.serial = 0


    def makeConnection(self, xs):
        """
        Called when a component connection was made.

        Numbers the stream, enables traffic debugging when C{logTraffic} is
        set and starts observing stream errors.
        """
        serial = self.serial
        xs.serial = serial
        self.serial = serial + 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        Adds the component to the routing table under the host of its
        address, and arranges for L{connectionLost} to be called with that
        host and the stream when the stream ends.
        """
        domain = xs.thisEntity.host

        self.router.addRoute(domain, xs)
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                       domain, xs)


    def onError(self, reason):
        """
        Log a stream error.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Remove the route of a component whose stream has ended.

        @param destination: Host the route was registered for.
        @param xs: The stream that ended.
        @param reason: Passed by the stream end event; not used.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.