source: wokkel/subprotocols.py @ 100:e7199edcb6f5

Last change on this file since 100:e7199edcb6f5 was 100:e7199edcb6f5, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Add iq request (set/get) tracking for StreamManager?.

This adds a new request method to StreamManager? to serialize an iq get or iq
set request object (using toElement), send it over the stream (or queue it
until the next time a stream is initialized) and track the response stanza by
returning a Deferred.

Requests may have a timeout, so that the deferred errbacks after the timeout
(counting from the moment of calling request) has expired.

This implementation is similar to the response tracking done in
twisted.words.protocols.jabber.xmlstream.IQ, but slighly different:

  • Requests can be queued until the connection is initialized, even before a XmlStream? object is available. I.e. it is tied to the StreamManager?, not the stream.
  • Response timeout starts counting from the moment of calling request, not from the moment request is sent over the wire.
  • Property exe set to *
File size: 15.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.internet.error import ConnectionDone
14from twisted.python import failure, log
15from twisted.words.protocols.jabber import error, xmlstream
16from twisted.words.protocols.jabber.xmlstream import toResponse
17from twisted.words.xish import xpath
18from twisted.words.xish.domish import IElement
19
20from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
21
22class XMPPHandler(object):
23    """
24    XMPP protocol handler.
25
26    Classes derived from this class implement (part of) one or more XMPP
27    extension protocols, and are referred to as a subprotocol implementation.
28    """
29
30    implements(IXMPPHandler)
31
32    def __init__(self):
33        self.parent = None
34        self.xmlstream = None
35
36
37    def setHandlerParent(self, parent):
38        self.parent = parent
39        self.parent.addHandler(self)
40
41
42    def disownHandlerParent(self, parent):
43        self.parent.removeHandler(self)
44        self.parent = None
45
46
47    def makeConnection(self, xs):
48        self.xmlstream = xs
49        self.connectionMade()
50
51
52    def connectionMade(self):
53        """
54        Called after a connection has been established.
55
56        Can be overridden to perform work before stream initialization.
57        """
58
59
60    def connectionInitialized(self):
61        """
62        The XML stream has been initialized.
63
64        Can be overridden to perform work after stream initialization, e.g. to
65        set up observers and start exchanging XML stanzas.
66        """
67
68
69    def connectionLost(self, reason):
70        """
71        The XML stream has been closed.
72
73        This method can be extended to inspect the C{reason} argument and
74        act on it.
75        """
76        self.xmlstream = None
77
78
79    def send(self, obj):
80        """
81        Send data over the managed XML stream.
82
83        @note: The stream manager maintains a queue for data sent using this
84               method when there is no current initialized XML stream. This
85               data is then sent as soon as a new stream has been established
86               and initialized. Subsequently, L{connectionInitialized} will be
87               called again. If this queueing is not desired, use C{send} on
88               C{self.xmlstream}.
89
90        @param obj: data to be sent over the XML stream. This is usually an
91                    object providing L{domish.IElement}, or serialized XML. See
92                    L{xmlstream.XmlStream} for details.
93        """
94        self.parent.send(obj)
95
96
97
98class XMPPHandlerCollection(object):
99    """
100    Collection of XMPP subprotocol handlers.
101
102    This allows for grouping of subprotocol handlers, but is not an
103    L{XMPPHandler} itself, so this is not recursive.
104
105    @ivar handlers: List of protocol handlers.
106    @type handlers: L{list} of objects providing
107                      L{IXMPPHandler}
108    """
109
110    implements(IXMPPHandlerCollection)
111
112    def __init__(self):
113        self.handlers = []
114
115
116    def __iter__(self):
117        """
118        Act as a container for handlers.
119        """
120        return iter(self.handlers)
121
122
123    def addHandler(self, handler):
124        """
125        Add protocol handler.
126
127        Protocol handlers are expected to provide L{IXMPPHandler}.
128        """
129        self.handlers.append(handler)
130
131
132    def removeHandler(self, handler):
133        """
134        Remove protocol handler.
135        """
136        self.handlers.remove(handler)
137
138
139
140class StreamManager(XMPPHandlerCollection):
141    """
142    Business logic representing a managed XMPP connection.
143
144    This maintains a single XMPP connection and provides facilities for packet
145    routing and transmission. Business logic modules are objects providing
146    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
147    using L{addHandler}.
148
149    @ivar xmlstream: currently managed XML stream
150    @type xmlstream: L{XmlStream}
151    @ivar logTraffic: if true, log all traffic.
152    @type logTraffic: L{bool}
153    @ivar _initialized: Whether the stream represented by L{xmlstream} has
154                        been initialized. This is used when caching outgoing
155                        stanzas.
156    @type _initialized: C{bool}
157    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
158    @type _packetQueue: L{list}
159    @ivar timeout: Default IQ request timeout in seconds.
160    @type timeout: C{int}
161    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
162    """
163    timeout = None
164    _reactor = None
165
166    logTraffic = False
167
168    def __init__(self, factory, reactor=None):
169        """
170        Construct a stream manager.
171
172        @param factory: The stream factory to connect with.
173        @param reactor: A provider of L{IReactorTime} to track timeouts.
174            If not provided, the global reactor will be used.
175        """
176        XMPPHandlerCollection.__init__(self)
177        self.xmlstream = None
178        self._packetQueue = []
179        self._initialized = False
180
181        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
182        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
183        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
184                             self.initializationFailed)
185        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
186        self.factory = factory
187
188        if reactor is None:
189            from twisted.internet import reactor
190        self._reactor = reactor
191
192        # Set up IQ response tracking
193        self._iqDeferreds = {}
194
195
196    def addHandler(self, handler):
197        """
198        Add protocol handler.
199
200        When an XML stream has already been established, the handler's
201        C{connectionInitialized} will be called to get it up to speed.
202        """
203        XMPPHandlerCollection.addHandler(self, handler)
204
205        # get protocol handler up to speed when a connection has already
206        # been established
207        if self.xmlstream:
208            handler.makeConnection(self.xmlstream)
209        if self._initialized:
210            handler.connectionInitialized()
211
212
213    def _connected(self, xs):
214        """
215        Called when the transport connection has been established.
216
217        Here we optionally set up traffic logging (depending on L{logTraffic})
218        and call each handler's C{makeConnection} method with the L{XmlStream}
219        instance.
220        """
221        def logDataIn(buf):
222            log.msg("RECV: %r" % buf)
223
224        def logDataOut(buf):
225            log.msg("SEND: %r" % buf)
226
227        if self.logTraffic:
228            xs.rawDataInFn = logDataIn
229            xs.rawDataOutFn = logDataOut
230
231        self.xmlstream = xs
232
233        for e in list(self):
234            e.makeConnection(xs)
235
236
237    def _authd(self, xs):
238        """
239        Called when the stream has been initialized.
240
241        Send out cached stanzas and call each handler's
242        C{connectionInitialized} method.
243        """
244
245        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
246        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
247
248        # Flush all pending packets
249        for p in self._packetQueue:
250            xs.send(p)
251        self._packetQueue = []
252        self._initialized = True
253
254        # Notify all child services which implement
255        # the IService interface
256        for e in list(self):
257            e.connectionInitialized()
258
259
260    def initializationFailed(self, reason):
261        """
262        Called when stream initialization has failed.
263
264        Stream initialization has halted, with the reason indicated by
265        C{reason}. It may be retried by calling the authenticator's
266        C{initializeStream}. See the respective authenticators for details.
267
268        @param reason: A failure instance indicating why stream initialization
269                       failed.
270        @type reason: L{failure.Failure}
271        """
272
273
274    def _disconnected(self, reason):
275        """
276        Called when the stream has been closed.
277
278        From this point on, the manager doesn't interact with the
279        L{XmlStream} anymore and notifies each handler that the connection
280        was lost by calling its C{connectionLost} method.
281        """
282        self.xmlstream = None
283        self._initialized = False
284
285        # Twisted versions before 11.0 passed an XmlStream here.
286        if not hasattr(reason, 'trap'):
287            reason = failure.Failure(ConnectionDone())
288
289        # Notify all child services which implement
290        # the IService interface
291        for e in list(self):
292            e.connectionLost(reason)
293
294        # This errbacks all deferreds of iq's for which no response has
295        # been received with a L{ConnectionLost} failure. Otherwise, the
296        # deferreds will never be fired.
297        iqDeferreds = self._iqDeferreds
298        self._iqDeferreds = {}
299        for d in iqDeferreds.itervalues():
300            d.errback(reason)
301
302
303    def _onIQResponse(self, iq):
304        """
305        Handle iq response by firing associated deferred.
306        """
307        try:
308            d = self._iqDeferreds[iq["id"]]
309        except KeyError:
310            return
311
312        del self._iqDeferreds[iq["id"]]
313        iq.handled = True
314        if iq['type'] == 'error':
315            d.errback(error.exceptionFromStanza(iq))
316        else:
317            d.callback(iq)
318
319
320    def send(self, obj):
321        """
322        Send data over the XML stream.
323
324        When there is no established XML stream, the data is queued and sent
325        out when a new XML stream has been established and initialized.
326
327        @param obj: data to be sent over the XML stream. See
328                    L{xmlstream.XmlStream.send} for details.
329        """
330        if self._initialized:
331            self.xmlstream.send(obj)
332        else:
333            self._packetQueue.append(obj)
334
335
336    def request(self, request):
337        """
338        Send an IQ request and track the response.
339
340        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
341        will have its C{toElement} called to render to a L{domish.Element}
342        which is then sent out over the current stream. If there is no such
343        stream (yet), it is queued and sent whenever a connection is
344        established and initialized, just like L{send}.
345
346        If the request doesn't have an identifier, it will be assigned a fresh
347        one, so the response can be tracked.
348
349        The deferred that is returned will fire with the L{domish.Element}
350        representation of the response if it is a result iq. If the response
351        is an error iq, a corresponding L{error.StanzaError} will be errbacked.
352
353        If the connection is closed before a response was received, the deferred
354        will be errbacked with the reason failure.
355
356        A request may also have a timeout, either by setting a default timeout
357        in L{StreamManager.timeout} or on the C{timeout} attribute of the
358        request.
359
360        @param request: The IQ request.
361        @type request: L{generic.Request}
362        """
363        if (request.stanzaKind != 'iq' or
364            request.stanzaType not in ('get', 'set')):
365            return defer.fail(ValueError("Not a request"))
366
367        element = request.toElement()
368
369        # Make sure we have a trackable id on the stanza
370        if not request.stanzaID:
371            element.addUniqueId()
372            request.stanzaID = element['id']
373
374        # Set up iq response tracking
375        d = defer.Deferred()
376        self._iqDeferreds[element['id']] = d
377
378        timeout = getattr(request, 'timeout', self.timeout)
379
380        if timeout is not None:
381            def onTimeout():
382                del self._iqDeferreds[element['id']]
383                d.errback(xmlstream.TimeoutError("IQ timed out"))
384
385            call = self._reactor.callLater(timeout, onTimeout)
386
387            def cancelTimeout(result):
388                if call.active():
389                    call.cancel()
390
391                return result
392
393            d.addBoth(cancelTimeout)
394        self.send(element)
395        return d
396
397
398
399class IQHandlerMixin(object):
400    """
401    XMPP subprotocol mixin for handle incoming IQ stanzas.
402
403    This matches up the iq with XPath queries to call methods on itself,
404    wrapping the call so that exceptions result in proper error responses,
405    or, when succesful will reply with a response with optional payload.
406
407    Derivatives of this class must provide an
408    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
409    in its C{xmlstream} attribute.
410
411    The optional payload is taken from the result of the handler and is
412    expected to be a child or a list of childs.
413
414    If an exception is raised, or the deferred has its errback called,
415    the exception is checked for being a L{error.StanzaError}. If so,
416    an error response is sent. Any other exception will cause a error
417    response of C{internal-server-error} to be sent.
418
419    A typical way to use this mixin, is to set up L{xpath} observers on the
420    C{xmlstream} to call handleRequest, for example in an overridden
421    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
422    incoming iq get and/org iq set requests, and not for any iq, to prevent
423    hijacking incoming responses to outgoing iq requests. An example:
424
425        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
426        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
427        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
428        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
429        ...    def connectionMade(self):
430        ...        self.xmlstream.addObserver(
431        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
432        ...          self.handleRequest)
433        ...    def onRosterGet(self, iq):
434        ...        pass
435        ...    def onRosterSet(self, iq):
436        ...        pass
437
438    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
439                      name that will handle requests that match the query.
440    @type iqHandlers: L{dict}
441    """
442
443    iqHandlers = None
444
445    def handleRequest(self, iq):
446        """
447        Find a handler and wrap the call for sending a response stanza.
448        """
449        def toResult(result, iq):
450            response = toResponse(iq, 'result')
451
452            if result:
453                if IElement.providedBy(result):
454                    response.addChild(result)
455                else:
456                    for element in result:
457                        response.addChild(element)
458
459            return response
460
461        def checkNotImplemented(failure):
462            failure.trap(NotImplementedError)
463            raise error.StanzaError('feature-not-implemented')
464
465        def fromStanzaError(failure, iq):
466            e = failure.trap(error.StanzaError)
467            return failure.value.toResponse(iq)
468
469        def fromOtherError(failure, iq):
470            log.msg("Unhandled error in iq handler:", isError=True)
471            log.err(failure)
472            return error.StanzaError('internal-server-error').toResponse(iq)
473
474        handler = None
475        for queryString, method in self.iqHandlers.iteritems():
476            if xpath.internQuery(queryString).matches(iq):
477                handler = getattr(self, method)
478
479        if handler:
480            d = defer.maybeDeferred(handler, iq)
481        else:
482            d = defer.fail(NotImplementedError())
483
484        d.addCallback(toResult, iq)
485        d.addErrback(checkNotImplemented)
486        d.addErrback(fromStanzaError, iq)
487        d.addErrback(fromOtherError, iq)
488
489        d.addCallback(self.send)
490
491        iq.handled = True
Note: See TracBrowser for help on using the repository browser.