source: wokkel/component.py @ 39:a9e354f69018

Last change on this file since 39:a9e354f69018 was 39:a9e354f69018, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add Internal Component service that connects to a router in-process.

Author: ralphm.
Fixes #31.

File size: 9.9 KB
Line 
1# -*- test-case-name: wokkel.test.test_component -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP External Component utilities
8"""
9
10from twisted.application import service
11from twisted.internet import reactor
12from twisted.python import log
13from twisted.words.protocols.jabber.jid import internJID as JID
14from twisted.words.protocols.jabber import component, error, xmlstream
15from twisted.words.xish import domish
16
17from wokkel.generic import XmlPipe
18from wokkel.subprotocols import StreamManager
19
20NS_COMPONENT_ACCEPT = 'jabber:component:accept'
21
class Component(StreamManager, service.Service):
    """
    XMPP external component service that connects to a router over TCP.

    The service combines a L{StreamManager} with a C{service.Service}: when
    started, it opens a TCP connection to C{host}:C{port} using the
    component factory created for the given JID and password.

    @ivar host: Host passed to C{reactor.connectTCP}.
    @ivar port: Port passed to C{reactor.connectTCP}.
    """

    def __init__(self, host, port, jid, password):
        """
        @param host: Host of the router to connect to.
        @param port: Port of the router to connect to.
        @param jid: JID handed to C{component.componentFactory}.
        @param password: Password handed to C{component.componentFactory}.
        """
        self.host = host
        self.port = port

        StreamManager.__init__(self,
                               component.componentFactory(jid, password))

    def _authd(self, xs):
        """
        Wrap the C{send} method of the authenticated stream.

        Outgoing elements that do not carry a C{from} attribute get one,
        set to the full JID of this entity, before being handed to the
        stream's original C{send}. Objects that are not elements are passed
        through untouched.
        """
        originalSend = xs.send

        def send(obj):
            isElement = domish.IElement.providedBy(obj)
            if isElement and not obj.getAttribute('from'):
                # Stamp the sender address on unaddressed elements.
                obj['from'] = self.xmlstream.thisEntity.full()
            originalSend(obj)

        xs.send = send
        StreamManager._authd(self, xs)

    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        The service is stopped, which disconnects the current stream, after
        which the exception held by C{reason} is raised.
        """
        self.stopService()
        reason.raiseException()

    def startService(self):
        """
        Start the service and initiate the connection to the router.

        C{stopTrying} is called on the factory before the connection is
        made, and the resulting connector is kept so it can be disconnected
        in L{stopService}.
        """
        service.Service.startService(self)

        self.factory.stopTrying()
        connector = self._getConnection()
        self._connection = connector

    def stopService(self):
        """
        Stop the service and disconnect the connection made on start.
        """
        service.Service.stopService(self)

        self._connection.disconnect()

    def _getConnection(self):
        """
        Connect to the configured host and port using this service's factory.

        @return: The result of C{reactor.connectTCP}.
        """
        return reactor.connectTCP(self.host, self.port, self.factory)
66
67
68
class InternalComponent(xmlstream.XMPPHandlerCollection, service.Service):
    """
    Component service that connects directly to a router.

    Unlike L{Component}, which opens a socket to reach its router, a
    component of this type is wired to a router living in the same process.
    This allows for one-process XMPP servers.

    @ivar router: The router this component registers its route with.
    @ivar domain: The domain for which a route is added to the router.
    @ivar xmlstream: The source end of the XML pipe while the service is
                     running, C{None} otherwise.
    """

    def __init__(self, router, domain):
        """
        @param router: Router to add the route to on start.
        @param domain: Destination used when adding and removing the route.
        """
        xmlstream.XMPPHandlerCollection.__init__(self)
        self.router = router
        self.domain = domain

        self.xmlstream = None

    def startService(self):
        """
        Create a XML pipe, connect to the router and setup handlers.

        The sink end of the pipe is registered with the router for
        C{self.domain}; the source end becomes C{self.xmlstream} and is
        handed to every handler currently in this collection.
        """
        service.Service.startService(self)

        pipe = XmlPipe()
        self.pipe = pipe
        self.xmlstream = pipe.source
        self.router.addRoute(self.domain, pipe.sink)

        for handler in self:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def stopService(self):
        """
        Disconnect from the router and handlers.

        The route is removed first, then the pipe and stream references are
        cleared, and finally every handler is told the connection was lost
        (with a reason of C{None}).
        """
        service.Service.stopService(self)

        self.router.removeRoute(self.domain, self.pipe.sink)
        self.pipe = None
        self.xmlstream = None

        for handler in self:
            handler.connectionLost(None)


    def addHandler(self, handler):
        """
        Add a new handler and connect it to the stream.

        If no stream is currently set up, the handler is only added to the
        collection.
        """
        xmlstream.XMPPHandlerCollection.addHandler(self, handler)

        if not self.xmlstream:
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()


    def send(self, obj):
        """
        Send data to the XML stream, so it ends up at the router.

        @param obj: Object passed on to the C{send} method of the stream.
        """
        stream = self.xmlstream
        stream.send(obj)
130
131
132
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting components.

    Incoming streams must use the C{jabber:component:accept} namespace and
    complete a handshake whose value matches the hash of the stream id and
    the shared secret.

    @ivar secret: The shared secret used to calculate the expected
                  handshake hash.
    """
    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: Shared secret that connecting components must prove
                       knowledge of.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Associate with the stream, forcing its version to C{(0, 0)}.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Validate the stream header and prepare for the handshake.

        A stream error is sent, and nothing else is done, when the default
        namespace of the root element is not L{namespace}
        (C{invalid-namespace}) or when the stream has no C{thisEntity}
        (C{improper-addressing}). Otherwise the stream header is sent and
        the first incoming element is passed to L{onElement}.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        if rootElement.defaultUri != self.namespace:
            exc = error.StreamError('invalid-namespace')
            self.xmlstream.sendStreamError(exc)
            return

        # self.xmlstream.thisEntity is set to the address the component
        # wants to assume. This should probably be checked.
        if not self.xmlstream.thisEntity:
            exc = error.StreamError('improper-addressing')
            self.xmlstream.sendStreamError(exc)
            return

        # FIXME: the stream id is a constant, so the expected handshake hash
        # is identical for every connection that uses the same secret.
        # NOTE(review): this should become an unpredictable per-stream value.
        self.xmlstream.sid = 'random'

        self.xmlstream.sendHeader()
        self.xmlstream.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Handle the first element received after the stream header.

        A C{handshake} element in the component namespace is passed, as
        text, to L{onHandshake}. Any other element results in a
        C{not-authorized} stream error.

        @param element: The first element received on the stream.
        """
        if (element.uri, element.name) == (self.namespace, 'handshake'):
            self.onHandshake(unicode(element))
        else:
            # The exception class is StreamError; the lowercase spelling
            # used previously does not match it and would fail here instead
            # of sending the stream error.
            exc = error.StreamError('not-authorized')
            self.xmlstream.sendStreamError(exc)


    def onHandshake(self, handshake):
        """
        Verify the handshake value offered by the component.

        On a mismatch a C{not-authorized} stream error is sent. On a match
        an empty C{handshake} element is sent back and the stream is
        announced as authenticated through C{STREAM_AUTHD_EVENT}.

        @param handshake: The text content of the received handshake
                          element.
        """
        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
                                                self.secret)
        if handshake != calculatedHash:
            exc = error.StreamError('not-authorized', text='Invalid hash')
            self.xmlstream.sendStreamError(exc)
        else:
            self.xmlstream.send('<handshake/>')
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)
187
188
189
class RouterService(service.Service):
    """
    XMPP Server's Router Service.

    The router ties the components of an XMPP service together: stanzas
    offered by one component are passed on to the component registered for
    the host of the stanza's destination address.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of C{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
                  L{EventDispatcher<utility.EventDispatcher>}s that should
                  receive the traffic. A key of C{None} means the default
                  route.
    @type routes: C{dict}
    """

    def __init__(self):
        self.routes = dict()


    def addRoute(self, destination, xs):
        """
        Add a new route.

        Traffic for C{destination} will be sent to C{xs}. In turn, an
        observer for all elements is added to C{xs} so that whatever it
        offers gets routed by L{route}.

        @param destination: Destination of the route to be added as a host name
                            or C{None} for the default route.
        @type destination: C{str} or C{NoneType}.
        @param xs: XML Stream to register the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The observer added by L{addRoute} is removed from C{xs}. The routing
        table entry is only deleted when it still refers to C{xs}.

        @param destination: Destination of the route that should be removed.
        @type destination: C{str}.
        @param xs: XML Stream to remove the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        xs.removeObserver('/*', self.route)

        registered = self.routes[destination]
        if xs == registered:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        Stanzas without child elements are ignored. Other stanzas are logged
        and sent to the route registered for the host of their C{to}
        address, or to the default route when there is no such route.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        """
        children = list(stanza.elements())
        if len(children) == 0:
            return

        recipient = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (recipient.full(), stanza.toXml()))

        # Fall back to the default route for unknown hosts.
        key = recipient.host
        if key not in self.routes:
            key = None

        self.routes[key].send(stanza)
264
265
266
class ComponentServer(service.Service):
    """
    XMPP Component Server service.

    This service accepts XMPP external component connections and makes
    the router service route traffic for a component's bound domain
    to that component.

    @ivar router: Router that routes are added to and removed from.
    @ivar port: Port number to listen on for component connections.
    @ivar secret: Shared secret handed to each
                  L{ListenComponentAuthenticator}.
    @ivar logTraffic: When true, raw incoming and outgoing data of each
                      connection is logged.
    @ivar serial: Counter used to number incoming connections.
    """

    logTraffic = False

    # Listening port returned by reactor.listenTCP while the service runs.
    _listeningPort = None

    def __init__(self, router, port=5347, secret='secret'):
        """
        @param router: Router to register authenticated components with.
        @param port: Port number to listen on.
        @param secret: Shared secret components must authenticate with.
        """
        self.router = router
        self.port = port
        self.secret = secret

        def authenticatorFactory():
            # Reads self.secret on each call, so every new connection gets
            # an authenticator built with the current secret.
            return ListenComponentAuthenticator(self.secret)

        self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory)
        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                                  self.makeConnection)
        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                                  self.connectionInitialized)

        self.serial = 0


    def startService(self):
        """
        Start the service and begin listening for component connections.

        The listening port is kept so that L{stopService} can close it.
        """
        service.Service.startService(self)
        self._listeningPort = reactor.listenTCP(self.port, self.factory)


    def stopService(self):
        """
        Stop the service and stop listening for new component connections.

        @return: The result of C{stopListening} on the listening port, or
                 C{None} if the service was not listening.
        """
        service.Service.stopService(self)

        listeningPort = self._listeningPort
        if listeningPort is None:
            return None

        self._listeningPort = None
        return listeningPort.stopListening()


    def makeConnection(self, xs):
        """
        Called when a component connection was made.

        The stream gets a serial number and an observer for stream errors.
        If C{logTraffic} is set, traffic debugging is enabled on the stream.
        """
        xs.serial = self.serial
        self.serial += 1

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def connectionInitialized(self, xs):
        """
        Called when a component has successfully authenticated.

        Add the component to the routing table and establish a handler
        for a closed connection.
        """
        destination = xs.thisEntity.host

        self.router.addRoute(destination, xs)
        # The extra arguments are passed on to connectionLost along with
        # the event object.
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0,
                                                   destination, xs)


    def onError(self, reason):
        """
        Log a stream error.
        """
        log.err(reason, "Stream Error")


    def connectionLost(self, destination, xs, reason):
        """
        Called when the stream of an authenticated component has ended.

        Removes the component's route from the router.

        @param destination: Host name the route was registered for.
        @param xs: The stream that ended.
        @param reason: The object dispatched with the stream end event.
        """
        self.router.removeRoute(destination, xs)
Note: See TracBrowser for help on using the repository browser.