source: wokkel/subprotocols.py @ 166:d9c10a5b5c0d

Last change on this file since 166:d9c10a5b5c0d was 166:d9c10a5b5c0d, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Documentation fixes for pydoctor.

  • Property exe set to *
File size: 15.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10__all__ = ['XMPPHandler', 'XMPPHandlerCollection', 'StreamManager',
11           'IQHandlerMixin']
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.internet.error import ConnectionDone
17from twisted.python import failure, log
18from twisted.python.deprecate import deprecatedModuleAttribute
19from twisted.python.versions import Version
20from twisted.words.protocols.jabber import error, ijabber, xmlstream
21from twisted.words.protocols.jabber.xmlstream import toResponse
22from twisted.words.protocols.jabber.xmlstream import XMPPHandlerCollection
23from twisted.words.xish import xpath
24from twisted.words.xish.domish import IElement
25
26deprecatedModuleAttribute(
27        Version("Wokkel", 0, 7, 0),
28        "Use twisted.words.protocols.jabber.xmlstream.XMPPHandlerCollection "
29                "instead.",
30        __name__,
31        "XMPPHandlerCollection")
32
33class XMPPHandler(object):
34    """
35    XMPP protocol handler.
36
37    Classes derived from this class implement (part of) one or more XMPP
38    extension protocols, and are referred to as a subprotocol implementation.
39    """
40
41    implements(ijabber.IXMPPHandler)
42
43    def __init__(self):
44        self.parent = None
45        self.xmlstream = None
46
47
48    def setHandlerParent(self, parent):
49        self.parent = parent
50        self.parent.addHandler(self)
51
52
53    def disownHandlerParent(self, parent):
54        self.parent.removeHandler(self)
55        self.parent = None
56
57
58    def makeConnection(self, xs):
59        self.xmlstream = xs
60        self.connectionMade()
61
62
63    def connectionMade(self):
64        """
65        Called after a connection has been established.
66
67        Can be overridden to perform work before stream initialization.
68        """
69
70
71    def connectionInitialized(self):
72        """
73        The XML stream has been initialized.
74
75        Can be overridden to perform work after stream initialization, e.g. to
76        set up observers and start exchanging XML stanzas.
77        """
78
79
80    def connectionLost(self, reason):
81        """
82        The XML stream has been closed.
83
84        This method can be extended to inspect the C{reason} argument and
85        act on it.
86        """
87        self.xmlstream = None
88
89
90    def send(self, obj):
91        """
92        Send data over the managed XML stream.
93
94        @note: The stream manager maintains a queue for data sent using this
95               method when there is no current initialized XML stream. This
96               data is then sent as soon as a new stream has been established
97               and initialized. Subsequently, L{connectionInitialized} will be
98               called again. If this queueing is not desired, use C{send} on
99               C{self.xmlstream}.
100
101        @param obj: data to be sent over the XML stream. This is usually an
102                    object providing L{domish.IElement}, or serialized XML. See
103                    L{xmlstream.XmlStream} for details.
104        """
105        self.parent.send(obj)
106
107
108    def request(self, request):
109        """
110        Send an IQ request and track the response.
111
112        This passes the request to the parent for sending and response
113        tracking.
114
115        @see: L{StreamManager.request}.
116        """
117        return self.parent.request(request)
118
119
120
121class StreamManager(XMPPHandlerCollection):
122    """
123    Business logic representing a managed XMPP connection.
124
125    This maintains a single XMPP connection and provides facilities for packet
126    routing and transmission. Business logic modules are objects providing
127    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
128    using L{addHandler}.
129
130    @ivar xmlstream: currently managed XML stream
131    @type xmlstream: L{XmlStream}
132    @ivar logTraffic: if true, log all traffic.
133    @type logTraffic: C{bool}
134    @ivar _initialized: Whether the stream represented by L{xmlstream} has
135                        been initialized. This is used when caching outgoing
136                        stanzas.
137    @type _initialized: C{bool}
138    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
139    @type _packetQueue: L{list}
140    @ivar timeout: Default IQ request timeout in seconds.
141    @type timeout: C{int}
142    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
143    """
144    timeout = None
145    _reactor = None
146
147    logTraffic = False
148
149    def __init__(self, factory, reactor=None):
150        """
151        Construct a stream manager.
152
153        @param factory: The stream factory to connect with.
154        @param reactor: A provider of L{IReactorTime} to track timeouts.
155            If not provided, the global reactor will be used.
156        """
157        XMPPHandlerCollection.__init__(self)
158        self.xmlstream = None
159        self._packetQueue = []
160        self._initialized = False
161
162        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
163        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
164        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
165                             self.initializationFailed)
166        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
167        self.factory = factory
168
169        if reactor is None:
170            from twisted.internet import reactor
171        self._reactor = reactor
172
173        # Set up IQ response tracking
174        self._iqDeferreds = {}
175
176
177    def addHandler(self, handler):
178        """
179        Add protocol handler.
180
181        When an XML stream has already been established, the handler's
182        C{connectionInitialized} will be called to get it up to speed.
183        """
184        XMPPHandlerCollection.addHandler(self, handler)
185
186        # get protocol handler up to speed when a connection has already
187        # been established
188        if self.xmlstream:
189            handler.makeConnection(self.xmlstream)
190        if self._initialized:
191            handler.connectionInitialized()
192
193
194    def _connected(self, xs):
195        """
196        Called when the transport connection has been established.
197
198        Here we optionally set up traffic logging (depending on L{logTraffic})
199        and call each handler's C{makeConnection} method with the L{XmlStream}
200        instance.
201        """
202        def logDataIn(buf):
203            log.msg("RECV: %r" % buf)
204
205        def logDataOut(buf):
206            log.msg("SEND: %r" % buf)
207
208        if self.logTraffic:
209            xs.rawDataInFn = logDataIn
210            xs.rawDataOutFn = logDataOut
211
212        self.xmlstream = xs
213
214        for e in list(self):
215            e.makeConnection(xs)
216
217
218    def _authd(self, xs):
219        """
220        Called when the stream has been initialized.
221
222        Send out cached stanzas and call each handler's
223        C{connectionInitialized} method.
224        """
225
226        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
227        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
228
229        # Flush all pending packets
230        for p in self._packetQueue:
231            xs.send(p)
232        self._packetQueue = []
233        self._initialized = True
234
235        # Notify all child services which implement
236        # the IService interface
237        for e in list(self):
238            e.connectionInitialized()
239
240
241    def initializationFailed(self, reason):
242        """
243        Called when stream initialization has failed.
244
245        Stream initialization has halted, with the reason indicated by
246        C{reason}. It may be retried by calling the authenticator's
247        C{initializeStream}. See the respective authenticators for details.
248
249        @param reason: A failure instance indicating why stream initialization
250                       failed.
251        @type reason: L{failure.Failure}
252        """
253
254
255    def _disconnected(self, reason):
256        """
257        Called when the stream has been closed.
258
259        From this point on, the manager doesn't interact with the
260        L{XmlStream} anymore and notifies each handler that the connection
261        was lost by calling its C{connectionLost} method.
262        """
263        self.xmlstream = None
264        self._initialized = False
265
266        # Twisted versions before 11.0 passed an XmlStream here.
267        if not hasattr(reason, 'trap'):
268            reason = failure.Failure(ConnectionDone())
269
270        # Notify all child services which implement
271        # the IService interface
272        for e in list(self):
273            e.connectionLost(reason)
274
275        # This errbacks all deferreds of iq's for which no response has
276        # been received with a L{ConnectionLost} failure. Otherwise, the
277        # deferreds will never be fired.
278        iqDeferreds = self._iqDeferreds
279        self._iqDeferreds = {}
280        for d in iqDeferreds.itervalues():
281            d.errback(reason)
282
283
284    def _onIQResponse(self, iq):
285        """
286        Handle iq response by firing associated deferred.
287        """
288        try:
289            d = self._iqDeferreds[iq["id"]]
290        except KeyError:
291            return
292
293        del self._iqDeferreds[iq["id"]]
294        iq.handled = True
295        if iq['type'] == 'error':
296            d.errback(error.exceptionFromStanza(iq))
297        else:
298            d.callback(iq)
299
300
301    def send(self, obj):
302        """
303        Send data over the XML stream.
304
305        When there is no established XML stream, the data is queued and sent
306        out when a new XML stream has been established and initialized.
307
308        @param obj: data to be sent over the XML stream. See
309                    L{xmlstream.XmlStream.send} for details.
310        """
311        if self._initialized:
312            self.xmlstream.send(obj)
313        else:
314            self._packetQueue.append(obj)
315
316
317    def request(self, request):
318        """
319        Send an IQ request and track the response.
320
321        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
322        will have its C{toElement} called to render to a
323        L{Element<twisted.words.xish.domish.Element>} which is then sent out
324        over the current stream. If there is no such stream (yet), it is queued
325        and sent whenever a connection is established and initialized, just
326        like L{send}.
327
328        If the request doesn't have an identifier, it will be assigned a fresh
329        one, so the response can be tracked.
330
331        The deferred that is returned will fire with the
332        L{Element<twisted.words.xish.domish.Element>} representation of the
333        response if it is a result iq. If the response is an error iq, a
334        corresponding L{error.StanzaError} will be errbacked.
335
336        If the connection is closed before a response was received, the deferred
337        will be errbacked with the reason failure.
338
339        A request may also have a timeout, either by setting a default timeout
340        in L{StreamManager}'s C{timeout} attribute or on the C{timeout}
341        attribute of the request.
342
343        @param request: The IQ request.
344        @type request: L{generic.Request}
345        """
346        if (request.stanzaKind != 'iq' or
347            request.stanzaType not in ('get', 'set')):
348            return defer.fail(ValueError("Not a request"))
349
350        element = request.toElement()
351
352        # Make sure we have a trackable id on the stanza
353        if not request.stanzaID:
354            element.addUniqueId()
355            request.stanzaID = element['id']
356
357        # Set up iq response tracking
358        d = defer.Deferred()
359        self._iqDeferreds[element['id']] = d
360
361        timeout = getattr(request, 'timeout', self.timeout)
362
363        if timeout is not None:
364            def onTimeout():
365                del self._iqDeferreds[element['id']]
366                d.errback(xmlstream.TimeoutError("IQ timed out"))
367
368            call = self._reactor.callLater(timeout, onTimeout)
369
370            def cancelTimeout(result):
371                if call.active():
372                    call.cancel()
373
374                return result
375
376            d.addBoth(cancelTimeout)
377        self.send(element)
378        return d
379
380
381
382class IQHandlerMixin(object):
383    """
384    XMPP subprotocol mixin for handle incoming IQ stanzas.
385
386    This matches up the iq with XPath queries to call methods on itself,
387    wrapping the call so that exceptions result in proper error responses,
388    or, when succesful will reply with a response with optional payload.
389
390    Derivatives of this class must provide an
391    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
392    in its C{xmlstream} attribute.
393
394    The optional payload is taken from the result of the handler and is
395    expected to be a child or a list of childs.
396
397    If an exception is raised, or the deferred has its errback called,
398    the exception is checked for being a L{error.StanzaError}. If so,
399    an error response is sent. Any other exception will cause a error
400    response of C{internal-server-error} to be sent.
401
402    A typical way to use this mixin, is to set up L{xpath} observers on the
403    C{xmlstream} to call handleRequest, for example in an overridden
404    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
405    incoming iq get and/org iq set requests, and not for any iq, to prevent
406    hijacking incoming responses to outgoing iq requests. An example:
407
408        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
409        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
410        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
411        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
412        ...    def connectionMade(self):
413        ...        self.xmlstream.addObserver(
414        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
415        ...          self.handleRequest)
416        ...    def onRosterGet(self, iq):
417        ...        pass
418        ...    def onRosterSet(self, iq):
419        ...        pass
420
421    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
422                      name that will handle requests that match the query.
423    @type iqHandlers: C{dict}
424    """
425
426    iqHandlers = None
427
428    def handleRequest(self, iq):
429        """
430        Find a handler and wrap the call for sending a response stanza.
431        """
432        def toResult(result, iq):
433            response = toResponse(iq, 'result')
434
435            if result:
436                if IElement.providedBy(result):
437                    response.addChild(result)
438                else:
439                    for element in result:
440                        response.addChild(element)
441
442            return response
443
444        def checkNotImplemented(failure):
445            failure.trap(NotImplementedError)
446            raise error.StanzaError('feature-not-implemented')
447
448        def fromStanzaError(failure, iq):
449            failure.trap(error.StanzaError)
450            return failure.value.toResponse(iq)
451
452        def fromOtherError(failure, iq):
453            log.msg("Unhandled error in iq handler:", isError=True)
454            log.err(failure)
455            return error.StanzaError('internal-server-error').toResponse(iq)
456
457        handler = None
458        for queryString, method in self.iqHandlers.iteritems():
459            if xpath.internQuery(queryString).matches(iq):
460                handler = getattr(self, method)
461
462        if handler:
463            d = defer.maybeDeferred(handler, iq)
464        else:
465            d = defer.fail(NotImplementedError())
466
467        d.addCallback(toResult, iq)
468        d.addErrback(checkNotImplemented)
469        d.addErrback(fromStanzaError, iq)
470        d.addErrback(fromOtherError, iq)
471
472        d.addCallback(self.send)
473
474        iq.handled = True
Note: See TracBrowser for help on using the repository browser.