source: wokkel/subprotocols.py @ 98:c3b20ad2bb70

Last change on this file since 98:c3b20ad2bb70 was 98:c3b20ad2bb70, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Track change in Twisted Words, to pass a reason Failure with STREAM_END_EVENT.

  • Property exe set to *
File size: 11.7 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.internet.error import ConnectionDone
14from twisted.python import failure, log
15from twisted.words.protocols.jabber import error, xmlstream
16from twisted.words.protocols.jabber.xmlstream import toResponse
17from twisted.words.xish import xpath
18from twisted.words.xish.domish import IElement
19
20from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
21
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol implementations.

    A subclass implements (part of) one or more XMPP extension protocols.
    Instances are attached to a handler collection, which is stored as
    C{parent} and used by L{send} to transmit data.

    @ivar parent: The collection this handler was attached to with
                  L{setHandlerParent}, or C{None} when unattached.
    @ivar xmlstream: The XML stream last passed to L{makeConnection}, or
                     C{None} when there is no stream.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Attach this handler to C{parent}.

        The parent is remembered and this handler is registered with it
        through its C{addHandler} method.

        @param parent: The handler collection to attach to.
        """
        self.parent = parent
        self.parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Detach this handler from its current parent.

        The handler is removed from the parent recorded by
        L{setHandlerParent}; the C{parent} argument itself is not consulted.

        @param parent: The handler collection being detached from.
        """
        self.parent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Record the XML stream and announce the connection.

        @param xs: The XML stream that was connected. It is stored in
                   C{self.xmlstream} before L{connectionMade} is called.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook called once a connection has been established.

        Override this to do work that has to happen before the stream is
        initialized. The default implementation does nothing.
        """


    def connectionInitialized(self):
        """
        Hook called once the XML stream has been initialized.

        Override this to do work after stream initialization, such as
        setting up observers and starting to exchange XML stanzas. The
        default implementation does nothing.
        """


    def connectionLost(self, reason):
        """
        Hook called when the XML stream has been closed.

        The default implementation forgets the stream. Extend this method
        to inspect C{reason} and act on it.

        @param reason: Why the stream was closed.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data through the parent of this handler.

        @note: The stream manager queues data sent with this method while
               there is no initialized XML stream, and sends it out once a
               new stream has been established and initialized. After that,
               L{connectionInitialized} is called again. To bypass the
               queue, call C{send} on C{self.xmlstream} instead.

        @param obj: Data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML.
                    See L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)
95
96
97
class XMPPHandlerCollection(object):
    """
    A group of XMPP subprotocol handlers.

    The collection only groups handlers. It is not an L{XMPPHandler}
    itself, so collections do not nest.

    @ivar handlers: The protocol handlers in this collection, in the order
                    they were added.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Iterate over the handlers held by this collection.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Append a protocol handler to the collection.

        @param handler: The handler to add. It is expected to provide
                        L{IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Drop a protocol handler from the collection.

        @param handler: The handler to remove.
        @raise ValueError: If C{handler} is not in the collection.
        """
        self.handlers.remove(handler)
137
138
139
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar factory: the factory passed to the initializer, on which the
                   stream event bootstraps were registered.
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager and hook it into the stream events of C{factory}.

        @param factory: The factory whose C{addBootstrap} method is used to
                        subscribe to the connected, authenticated,
                        initialization-failed and stream-end events.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        When an XML stream has already been established, the handler's
        C{makeConnection} is called with it. When that stream has also been
        initialized, the handler's C{connectionInitialized} is called as
        well, to get it up to speed.

        @param handler: The handler to add.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # Get the protocol handler up to speed with the current stream
        # state. A handler that is added between connecting and stream
        # initialization must still be given the stream, because L{_authd}
        # will later call its C{connectionInitialized}.
        if self.xmlstream:
            handler.makeConnection(self.xmlstream)
            if self._initialized:
                handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.

        @param xs: The XML stream that was connected.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: handlers may add or remove handlers from
        # within the callback. Handlers added meanwhile are connected by
        # L{addHandler}, since C{self.xmlstream} is already set.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.

        @param xs: The XML stream that was initialized.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. Iterate over a snapshot: a handler added from
        # within a callback has already been notified by L{addHandler} and
        # must not be notified a second time by this loop.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method.

        @param reason: The reason the stream ended. Anything that does not
                       look like a failure (no C{trap} attribute) is replaced
                       by a L{failure.Failure} wrapping L{ConnectionDone}.
        """
        self.xmlstream = None
        self._initialized = False

        # Twisted versions before 11.0 passed an XmlStream here.
        if not hasattr(reason, 'trap'):
            reason = failure.Failure(ConnectionDone())

        # Notify all handlers. Iterate over a snapshot so that a handler
        # removing itself from within C{connectionLost} does not cause the
        # next handler to be skipped.
        for e in list(self):
            e.connectionLost(reason)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
285
286
287
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in its C{xmlstream} attribute.

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      When left at C{None}, no request matches and every
                      request is answered with C{feature-not-implemented}.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The handler's result (or the result of its deferred) is turned into
        a C{result} response. A L{NotImplementedError}, or the absence of a
        matching handler, yields a C{feature-not-implemented} error; a
        L{error.StanzaError} is turned into its own error response; anything
        else is logged and answered with C{internal-server-error}. The
        response is passed to C{self.send} and C{iq.handled} is set.

        @param iq: The incoming iq request.
        """
        def toResult(result, iq):
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(reason):
            reason.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(reason, iq):
            reason.trap(error.StanzaError)
            return reason.value.toResponse(iq)

        def fromOtherError(reason, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(reason)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # Treat an unset mapping as empty, so that the request is answered
        # with feature-not-implemented instead of raising here. When several
        # queries match, the last one iterated over wins.
        handler = None
        for queryString, method in (self.iqHandlers or {}).items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        # The errbacks are chained on purpose: NotImplementedError is first
        # converted to a StanzaError, which the next errback turns into a
        # response; whatever is left ends up as internal-server-error.
        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.