source: wokkel/subprotocols.py @ 165:76a61f5aa343

Last change on this file since 165:76a61f5aa343 was 165:76a61f5aa343, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Cleanups leading up to Wokkel 0.7.0.

As we now depend on Twisted 10.0.0 or higher, the following classes and
interfaces were deprecated:

This also resolves all Pyflakes warnings, changes links for www.xmpp.org to
xmpp.org and fixes the copyright notice in LICENSE to include 2012.

  • Property exe set to *
File size: 15.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10__all__ = ['XMPPHandler', 'XMPPHandlerCollection', 'StreamManager',
11           'IQHandlerMixin']
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.internet.error import ConnectionDone
17from twisted.python import failure, log
18from twisted.python.deprecate import deprecatedModuleAttribute
19from twisted.python.versions import Version
20from twisted.words.protocols.jabber import error, ijabber, xmlstream
21from twisted.words.protocols.jabber.xmlstream import toResponse
22from twisted.words.protocols.jabber.xmlstream import XMPPHandlerCollection
23from twisted.words.xish import xpath
24from twisted.words.xish.domish import IElement
25
26deprecatedModuleAttribute(
27        Version("Wokkel", 0, 7, 0),
28        "Use twisted.words.protocols.jabber.xmlstream.XMPPHandlerCollection "
29                "instead.",
30        __name__,
31        "XMPPHandlerCollection")
32
33class XMPPHandler(object):
34    """
35    XMPP protocol handler.
36
37    Classes derived from this class implement (part of) one or more XMPP
38    extension protocols, and are referred to as a subprotocol implementation.
39    """
40
41    implements(ijabber.IXMPPHandler)
42
43    def __init__(self):
44        self.parent = None
45        self.xmlstream = None
46
47
48    def setHandlerParent(self, parent):
49        self.parent = parent
50        self.parent.addHandler(self)
51
52
53    def disownHandlerParent(self, parent):
54        self.parent.removeHandler(self)
55        self.parent = None
56
57
58    def makeConnection(self, xs):
59        self.xmlstream = xs
60        self.connectionMade()
61
62
63    def connectionMade(self):
64        """
65        Called after a connection has been established.
66
67        Can be overridden to perform work before stream initialization.
68        """
69
70
71    def connectionInitialized(self):
72        """
73        The XML stream has been initialized.
74
75        Can be overridden to perform work after stream initialization, e.g. to
76        set up observers and start exchanging XML stanzas.
77        """
78
79
80    def connectionLost(self, reason):
81        """
82        The XML stream has been closed.
83
84        This method can be extended to inspect the C{reason} argument and
85        act on it.
86        """
87        self.xmlstream = None
88
89
90    def send(self, obj):
91        """
92        Send data over the managed XML stream.
93
94        @note: The stream manager maintains a queue for data sent using this
95               method when there is no current initialized XML stream. This
96               data is then sent as soon as a new stream has been established
97               and initialized. Subsequently, L{connectionInitialized} will be
98               called again. If this queueing is not desired, use C{send} on
99               C{self.xmlstream}.
100
101        @param obj: data to be sent over the XML stream. This is usually an
102                    object providing L{domish.IElement}, or serialized XML. See
103                    L{xmlstream.XmlStream} for details.
104        """
105        self.parent.send(obj)
106
107
108    def request(self, request):
109        """
110        Send an IQ request and track the response.
111
112        This passes the request to the parent for sending and response
113        tracking.
114
115        @see: L{StreamManager.request}.
116        """
117        return self.parent.request(request)
118
119
120
121class StreamManager(XMPPHandlerCollection):
122    """
123    Business logic representing a managed XMPP connection.
124
125    This maintains a single XMPP connection and provides facilities for packet
126    routing and transmission. Business logic modules are objects providing
127    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
128    using L{addHandler}.
129
130    @ivar xmlstream: currently managed XML stream
131    @type xmlstream: L{XmlStream}
132    @ivar logTraffic: if true, log all traffic.
133    @type logTraffic: L{bool}
134    @ivar _initialized: Whether the stream represented by L{xmlstream} has
135                        been initialized. This is used when caching outgoing
136                        stanzas.
137    @type _initialized: C{bool}
138    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
139    @type _packetQueue: L{list}
140    @ivar timeout: Default IQ request timeout in seconds.
141    @type timeout: C{int}
142    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
143    """
144    timeout = None
145    _reactor = None
146
147    logTraffic = False
148
149    def __init__(self, factory, reactor=None):
150        """
151        Construct a stream manager.
152
153        @param factory: The stream factory to connect with.
154        @param reactor: A provider of L{IReactorTime} to track timeouts.
155            If not provided, the global reactor will be used.
156        """
157        XMPPHandlerCollection.__init__(self)
158        self.xmlstream = None
159        self._packetQueue = []
160        self._initialized = False
161
162        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
163        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
164        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
165                             self.initializationFailed)
166        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
167        self.factory = factory
168
169        if reactor is None:
170            from twisted.internet import reactor
171        self._reactor = reactor
172
173        # Set up IQ response tracking
174        self._iqDeferreds = {}
175
176
177    def addHandler(self, handler):
178        """
179        Add protocol handler.
180
181        When an XML stream has already been established, the handler's
182        C{connectionInitialized} will be called to get it up to speed.
183        """
184        XMPPHandlerCollection.addHandler(self, handler)
185
186        # get protocol handler up to speed when a connection has already
187        # been established
188        if self.xmlstream:
189            handler.makeConnection(self.xmlstream)
190        if self._initialized:
191            handler.connectionInitialized()
192
193
194    def _connected(self, xs):
195        """
196        Called when the transport connection has been established.
197
198        Here we optionally set up traffic logging (depending on L{logTraffic})
199        and call each handler's C{makeConnection} method with the L{XmlStream}
200        instance.
201        """
202        def logDataIn(buf):
203            log.msg("RECV: %r" % buf)
204
205        def logDataOut(buf):
206            log.msg("SEND: %r" % buf)
207
208        if self.logTraffic:
209            xs.rawDataInFn = logDataIn
210            xs.rawDataOutFn = logDataOut
211
212        self.xmlstream = xs
213
214        for e in list(self):
215            e.makeConnection(xs)
216
217
218    def _authd(self, xs):
219        """
220        Called when the stream has been initialized.
221
222        Send out cached stanzas and call each handler's
223        C{connectionInitialized} method.
224        """
225
226        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
227        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
228
229        # Flush all pending packets
230        for p in self._packetQueue:
231            xs.send(p)
232        self._packetQueue = []
233        self._initialized = True
234
235        # Notify all child services which implement
236        # the IService interface
237        for e in list(self):
238            e.connectionInitialized()
239
240
241    def initializationFailed(self, reason):
242        """
243        Called when stream initialization has failed.
244
245        Stream initialization has halted, with the reason indicated by
246        C{reason}. It may be retried by calling the authenticator's
247        C{initializeStream}. See the respective authenticators for details.
248
249        @param reason: A failure instance indicating why stream initialization
250                       failed.
251        @type reason: L{failure.Failure}
252        """
253
254
255    def _disconnected(self, reason):
256        """
257        Called when the stream has been closed.
258
259        From this point on, the manager doesn't interact with the
260        L{XmlStream} anymore and notifies each handler that the connection
261        was lost by calling its C{connectionLost} method.
262        """
263        self.xmlstream = None
264        self._initialized = False
265
266        # Twisted versions before 11.0 passed an XmlStream here.
267        if not hasattr(reason, 'trap'):
268            reason = failure.Failure(ConnectionDone())
269
270        # Notify all child services which implement
271        # the IService interface
272        for e in list(self):
273            e.connectionLost(reason)
274
275        # This errbacks all deferreds of iq's for which no response has
276        # been received with a L{ConnectionLost} failure. Otherwise, the
277        # deferreds will never be fired.
278        iqDeferreds = self._iqDeferreds
279        self._iqDeferreds = {}
280        for d in iqDeferreds.itervalues():
281            d.errback(reason)
282
283
284    def _onIQResponse(self, iq):
285        """
286        Handle iq response by firing associated deferred.
287        """
288        try:
289            d = self._iqDeferreds[iq["id"]]
290        except KeyError:
291            return
292
293        del self._iqDeferreds[iq["id"]]
294        iq.handled = True
295        if iq['type'] == 'error':
296            d.errback(error.exceptionFromStanza(iq))
297        else:
298            d.callback(iq)
299
300
301    def send(self, obj):
302        """
303        Send data over the XML stream.
304
305        When there is no established XML stream, the data is queued and sent
306        out when a new XML stream has been established and initialized.
307
308        @param obj: data to be sent over the XML stream. See
309                    L{xmlstream.XmlStream.send} for details.
310        """
311        if self._initialized:
312            self.xmlstream.send(obj)
313        else:
314            self._packetQueue.append(obj)
315
316
317    def request(self, request):
318        """
319        Send an IQ request and track the response.
320
321        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
322        will have its C{toElement} called to render to a L{domish.Element}
323        which is then sent out over the current stream. If there is no such
324        stream (yet), it is queued and sent whenever a connection is
325        established and initialized, just like L{send}.
326
327        If the request doesn't have an identifier, it will be assigned a fresh
328        one, so the response can be tracked.
329
330        The deferred that is returned will fire with the L{domish.Element}
331        representation of the response if it is a result iq. If the response
332        is an error iq, a corresponding L{error.StanzaError} will be errbacked.
333
334        If the connection is closed before a response was received, the deferred
335        will be errbacked with the reason failure.
336
337        A request may also have a timeout, either by setting a default timeout
338        in L{StreamManager.timeout} or on the C{timeout} attribute of the
339        request.
340
341        @param request: The IQ request.
342        @type request: L{generic.Request}
343        """
344        if (request.stanzaKind != 'iq' or
345            request.stanzaType not in ('get', 'set')):
346            return defer.fail(ValueError("Not a request"))
347
348        element = request.toElement()
349
350        # Make sure we have a trackable id on the stanza
351        if not request.stanzaID:
352            element.addUniqueId()
353            request.stanzaID = element['id']
354
355        # Set up iq response tracking
356        d = defer.Deferred()
357        self._iqDeferreds[element['id']] = d
358
359        timeout = getattr(request, 'timeout', self.timeout)
360
361        if timeout is not None:
362            def onTimeout():
363                del self._iqDeferreds[element['id']]
364                d.errback(xmlstream.TimeoutError("IQ timed out"))
365
366            call = self._reactor.callLater(timeout, onTimeout)
367
368            def cancelTimeout(result):
369                if call.active():
370                    call.cancel()
371
372                return result
373
374            d.addBoth(cancelTimeout)
375        self.send(element)
376        return d
377
378
379
380class IQHandlerMixin(object):
381    """
382    XMPP subprotocol mixin for handle incoming IQ stanzas.
383
384    This matches up the iq with XPath queries to call methods on itself,
385    wrapping the call so that exceptions result in proper error responses,
386    or, when succesful will reply with a response with optional payload.
387
388    Derivatives of this class must provide an
389    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
390    in its C{xmlstream} attribute.
391
392    The optional payload is taken from the result of the handler and is
393    expected to be a child or a list of childs.
394
395    If an exception is raised, or the deferred has its errback called,
396    the exception is checked for being a L{error.StanzaError}. If so,
397    an error response is sent. Any other exception will cause a error
398    response of C{internal-server-error} to be sent.
399
400    A typical way to use this mixin, is to set up L{xpath} observers on the
401    C{xmlstream} to call handleRequest, for example in an overridden
402    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
403    incoming iq get and/org iq set requests, and not for any iq, to prevent
404    hijacking incoming responses to outgoing iq requests. An example:
405
406        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
407        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
408        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
409        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
410        ...    def connectionMade(self):
411        ...        self.xmlstream.addObserver(
412        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
413        ...          self.handleRequest)
414        ...    def onRosterGet(self, iq):
415        ...        pass
416        ...    def onRosterSet(self, iq):
417        ...        pass
418
419    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
420                      name that will handle requests that match the query.
421    @type iqHandlers: L{dict}
422    """
423
424    iqHandlers = None
425
426    def handleRequest(self, iq):
427        """
428        Find a handler and wrap the call for sending a response stanza.
429        """
430        def toResult(result, iq):
431            response = toResponse(iq, 'result')
432
433            if result:
434                if IElement.providedBy(result):
435                    response.addChild(result)
436                else:
437                    for element in result:
438                        response.addChild(element)
439
440            return response
441
442        def checkNotImplemented(failure):
443            failure.trap(NotImplementedError)
444            raise error.StanzaError('feature-not-implemented')
445
446        def fromStanzaError(failure, iq):
447            failure.trap(error.StanzaError)
448            return failure.value.toResponse(iq)
449
450        def fromOtherError(failure, iq):
451            log.msg("Unhandled error in iq handler:", isError=True)
452            log.err(failure)
453            return error.StanzaError('internal-server-error').toResponse(iq)
454
455        handler = None
456        for queryString, method in self.iqHandlers.iteritems():
457            if xpath.internQuery(queryString).matches(iq):
458                handler = getattr(self, method)
459
460        if handler:
461            d = defer.maybeDeferred(handler, iq)
462        else:
463            d = defer.fail(NotImplementedError())
464
465        d.addCallback(toResult, iq)
466        d.addErrback(checkNotImplemented)
467        d.addErrback(fromStanzaError, iq)
468        d.addErrback(fromOtherError, iq)
469
470        d.addCallback(self.send)
471
472        iq.handled = True
Note: See TracBrowser for help on using the repository browser.