source: wokkel/subprotocols.py @ 101:d88462a49f86

Last change on this file since 101:d88462a49f86 was 101:d88462a49f86, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Add request method to XMPPHandler, passing up to its StreamManager.

  • Property exe set to *
File size: 15.6 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.internet.error import ConnectionDone
14from twisted.python import failure, log
15from twisted.words.protocols.jabber import error, xmlstream
16from twisted.words.protocols.jabber.xmlstream import toResponse
17from twisted.words.xish import xpath
18from twisted.words.xish.domish import IElement
19
20from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
21
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol implementations.

    A subclass implements (part of) one or more XMPP extension protocols.
    A handler is registered with a parent collection through
    L{setHandlerParent}; sending and IQ request tracking are delegated to
    that parent.

    @ivar parent: The collection this handler is currently registered with,
                  or C{None} when it has no parent.
    @ivar xmlstream: The XML stream last passed to L{makeConnection}, or
                     C{None} before a connection was made and after
                     L{connectionLost} was called.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Remember C{parent} and register this handler with it.

        The C{parent} attribute is assigned before C{parent.addHandler} is
        called, so it is already available to anything the parent invokes on
        this handler during registration.

        @param parent: The collection to register with. Its C{addHandler}
                       method is called with this handler.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        @param parent: Not used; the handler is always removed from the
                       collection stored in C{self.parent}.
        """
        currentParent = self.parent
        currentParent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and signal that a connection was made.

        @param xs: The XML stream to keep in C{self.xmlstream}.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook called by L{makeConnection} once C{self.xmlstream} is set.

        Does nothing by default. Override it to perform work before stream
        initialization.
        """


    def connectionInitialized(self):
        """
        Hook called when the XML stream has been initialized.

        Does nothing by default. Override it to perform work after stream
        initialization, e.g. to set up observers and start exchanging XML
        stanzas.
        """


    def connectionLost(self, reason):
        """
        Hook called when the XML stream has been closed.

        The default implementation only drops the reference to the stream.
        Extend it to inspect C{reason} and act on it.

        @param reason: Why the stream was closed. Ignored here.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data by handing it to the parent's C{send} method.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)


    def request(self, request):
        """
        Send an IQ request by handing it to the parent's C{request} method.

        Sending and response tracking are entirely up to the parent.

        @param request: The request to pass on.
        @return: Whatever the parent's C{request} method returns.

        @see: L{StreamManager.request}.
        """
        return self.parent.request(request)
107
108
109
class XMPPHandlerCollection(object):
    """
    A flat group of XMPP subprotocol handlers.

    The collection keeps its handlers in a plain list and can be iterated
    over to reach them. It is not an L{XMPPHandler} itself, so collections
    do not nest.

    @ivar handlers: List of protocol handlers.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Iterate over the handlers, in the order they were added.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Append C{handler} to the collection.

        @param handler: The protocol handler to add. Handlers are expected
                        to provide L{IXMPPHandler}; this is not checked.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Remove C{handler} from the collection.

        @param handler: The protocol handler to remove.
        @raise ValueError: If C{handler} is not in the collection.
        """
        self.handlers.remove(handler)
149
150
151
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar factory: The stream factory passed to the constructor.
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    @ivar _iqDeferreds: Deferreds of outstanding IQ requests, keyed by stanza
                        identifier. See L{request}.
    @type _iqDeferreds: L{dict}
    @ivar timeout: Default IQ request timeout in seconds.
    @type timeout: C{int}
    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    """
    timeout = None
    _reactor = None

    logTraffic = False

    def __init__(self, factory, reactor=None):
        """
        Construct a stream manager.

        Registers bootstrap observers on C{factory} for the connected,
        authenticated, initialization-failed and stream-end events.

        @param factory: The stream factory to connect with.
        @param reactor: A provider of L{IReactorTime} to track timeouts.
            If not provided, the global reactor will be used.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory

        # Import lazily so that merely constructing a manager with an
        # explicit reactor does not pull in the global one.
        if reactor is None:
            from twisted.internet import reactor
        self._reactor = reactor

        # Set up IQ response tracking
        self._iqDeferreds = {}


    def addHandler(self, handler):
        """
        Add protocol handler.

        When an XML stream has already been established, the handler's
        C{makeConnection} is called with it, and when that stream has also
        been initialized, the handler's C{connectionInitialized} is called
        as well, to get it up to speed.

        @param handler: The protocol handler to add.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # get protocol handler up to speed when a connection has already
        # been established
        if self.xmlstream:
            handler.makeConnection(self.xmlstream)
        if self._initialized:
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.

        @param xs: The XML stream that was connected.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a copy: a handler may add or remove handlers while
        # being notified.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Set up IQ response observers, send out cached stanzas and call each
        handler's C{connectionInitialized} method.

        @param xs: The XML stream that was initialized.
        """

        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
        xs.addObserver('/iq[@type="error"]', self._onIQResponse)

        # Flush all pending packets. L{_initialized} is still false here, so
        # anything passed to L{send} while flushing is appended to the list
        # being iterated and goes out in order as part of this loop.
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers that the stream is ready for use.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        Does nothing by default.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method. Afterwards, the
        deferreds of all outstanding IQ requests are errbacked.

        @param reason: The reason the stream was closed. Anything without a
                       C{trap} attribute is replaced by a failure wrapping
                       L{ConnectionDone}.
        """
        self.xmlstream = None
        self._initialized = False

        # Twisted versions before 11.0 passed an XmlStream here.
        if not hasattr(reason, 'trap'):
            reason = failure.Failure(ConnectionDone())

        # Notify all handlers that the connection is gone.
        for e in list(self):
            e.connectionLost(reason)

        # This errbacks all deferreds of iq's for which no response has
        # been received with the C{reason} failure. Otherwise, the
        # deferreds will never be fired. The tracking dict is swapped out
        # first, so requests made from within an errback are tracked in the
        # fresh dict and left alone by this loop.
        iqDeferreds = self._iqDeferreds
        self._iqDeferreds = {}
        for d in iqDeferreds.values():
            d.errback(reason)


    def _onIQResponse(self, iq):
        """
        Handle iq response by firing associated deferred.

        Responses without a tracked identifier are left untouched for
        other observers. A tracked response is marked as handled; a result
        fires the callback with the stanza, an error fires the errback with
        the exception derived from the stanza.

        @param iq: The incoming iq stanza of type C{'result'} or C{'error'}.
        """
        # A missing 'id' attribute raises KeyError too, and is ignored just
        # like an unknown identifier.
        try:
            d = self._iqDeferreds[iq["id"]]
        except KeyError:
            return

        del self._iqDeferreds[iq["id"]]
        iq.handled = True
        if iq['type'] == 'error':
            d.errback(error.exceptionFromStanza(iq))
        else:
            d.callback(iq)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)


    def request(self, request):
        """
        Send an IQ request and track the response.

        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
        will have its C{toElement} called to render to a L{domish.Element}
        which is then sent out over the current stream. If there is no such
        stream (yet), it is queued and sent whenever a connection is
        established and initialized, just like L{send}.

        If the request doesn't have an identifier, it will be assigned a fresh
        one, so the response can be tracked.

        The deferred that is returned will fire with the L{domish.Element}
        representation of the response if it is a result iq. If the response
        is an error iq, a corresponding L{error.StanzaError} will be errbacked.

        If the connection is closed before a response was received, the deferred
        will be errbacked with the reason failure.

        A request may also have a timeout, either by setting a default timeout
        in L{StreamManager.timeout} or on the C{timeout} attribute of the
        request. When the timeout expires before a response was received, the
        deferred is errbacked with L{xmlstream.TimeoutError}.

        @note: The default in L{StreamManager.timeout} is only consulted when
               the request has no C{timeout} attribute at all. A request
               whose C{timeout} attribute is C{None} never times out.

        @param request: The IQ request.
        @type request: L{generic.Request}

        @return: A deferred as described above. For anything that is not an
                 iq of type C{'get'} or C{'set'}, an already failed deferred
                 with a L{ValueError}.
        @rtype: L{defer.Deferred}
        """
        if (request.stanzaKind != 'iq' or
            request.stanzaType not in ('get', 'set')):
            return defer.fail(ValueError("Not a request"))

        element = request.toElement()

        # Make sure we have a trackable id on the stanza
        if not request.stanzaID:
            element.addUniqueId()
            request.stanzaID = element['id']

        # Capture the identifier once, so that tracking and timeout cleanup
        # are guaranteed to refer to the same key.
        stanzaID = element['id']

        # Set up iq response tracking
        d = defer.Deferred()
        self._iqDeferreds[stanzaID] = d

        timeout = getattr(request, 'timeout', self.timeout)

        if timeout is not None:
            def onTimeout():
                # Only stop tracking when the entry still belongs to this
                # request. If another request reused the identifier, its
                # entry must stay, and a missing entry must not prevent this
                # deferred from being fired.
                if self._iqDeferreds.get(stanzaID) is d:
                    del self._iqDeferreds[stanzaID]
                d.errback(xmlstream.TimeoutError("IQ timed out"))

            call = self._reactor.callLater(timeout, onTimeout)

            def cancelTimeout(result):
                # Runs on any outcome; the call is no longer active when the
                # outcome is the timeout itself.
                if call.active():
                    call.cancel()

                return result

            d.addBoth(cancelTimeout)
        self.send(element)
        return d
408
409
410
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in its C{xmlstream} attribute. Responses are sent by calling C{self.send},
    so derivatives must provide that method, too (as L{XMPPHandler} does).

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      When left at C{None}, no request matches and every
                      request is answered with C{feature-not-implemented}.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The handler's result (or the result of the deferred it returns) is
        turned into a result response; failures are turned into error
        responses. The response is passed to C{self.send} and the request is
        marked as handled.

        @param iq: The incoming iq request.
        """
        def toResult(result, iq):
            # Build the result response, adding the handler's payload: a
            # single element, or an iterable of children.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(reason):
            # Translate NotImplementedError into the matching stanza error;
            # anything else is passed on to the next errback.
            reason.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(reason, iq):
            # Stanza errors know how to render themselves as a response.
            reason.trap(error.StanzaError)
            return reason.value.toResponse(iq)

        def fromOtherError(reason, iq):
            # Last resort: log the failure and hide the details from the
            # requesting entity.
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(reason)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # A class that never set iqHandlers has no handlers at all; treat
        # that as an empty mapping so the requester still gets a reply.
        iqHandlers = self.iqHandlers or {}

        # There is deliberately no early exit: when several queries match,
        # the last one iterated over wins.
        handler = None
        for queryString, method in iqHandlers.items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.