source: wokkel/subprotocols.py @ 194:f1b586df427b

Last change on this file since 194:f1b586df427b was 194:f1b586df427b, checked in by Ralph Meijer <ralphm@…>, 4 years ago

imported patch py3-subprotocols.patch

  • Property exe set to *
File size: 15.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from __future__ import division, absolute_import
11
12from zope.interface import implementer
13
14from twisted.internet import defer
15from twisted.internet.error import ConnectionDone
16from twisted.python import failure, log
17from twisted.python.compat import iteritems, itervalues
18from twisted.python.deprecate import deprecatedModuleAttribute
19from twisted.python.versions import Version
20from twisted.words.protocols.jabber import error, ijabber, xmlstream
21from twisted.words.protocols.jabber.xmlstream import toResponse
22from twisted.words.protocols.jabber.xmlstream import XMPPHandlerCollection
23from twisted.words.xish import xpath
24from twisted.words.xish.domish import IElement
25
# Declare this module's XMPPHandlerCollection attribute deprecated as of
# Wokkel 0.7.0; the message points users at the Twisted location it is
# imported from above.
deprecatedModuleAttribute(
    Version("Wokkel", 0, 7, 0),
    "Use twisted.words.protocols.jabber.xmlstream.XMPPHandlerCollection "
    "instead.",
    __name__,
    "XMPPHandlerCollection")
32
@implementer(ijabber.IXMPPHandler)
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol implementations.

    Subclasses implement (part of) one or more XMPP extension protocols. A
    handler is attached to a parent through L{setHandlerParent}, receives the
    XML stream through L{makeConnection}, and hands outgoing traffic to its
    parent through L{send} and L{request}.

    @ivar parent: The object this handler is registered with, or C{None}
        when it has no parent.
    @ivar xmlstream: The XML stream passed to L{makeConnection}, or C{None}
        before a connection was made and after L{connectionLost}.
    """

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Adopt C{parent} as this handler's parent and register with it.

        The parent is stored before its C{addHandler} is called, so
        C{self.parent} is already set while the parent processes the
        registration.

        @param parent: The object to register with; must offer C{addHandler}.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        @param parent: Accepted for interface compatibility but not
            consulted: C{removeHandler} is called on C{self.parent}.
        """
        formerParent = self.parent
        formerParent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and signal that a connection was made.

        @param xs: The XML stream to keep in C{self.xmlstream}.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook invoked by L{makeConnection} once C{self.xmlstream} is set.

        Does nothing by default. Override to perform work before stream
        initialization.
        """


    def connectionInitialized(self):
        """
        Hook invoked when the XML stream has been initialized.

        Does nothing by default. Override to perform work after stream
        initialization, e.g. to set up observers and start exchanging XML
        stanzas.
        """


    def connectionLost(self, reason):
        """
        Hook invoked when the XML stream has been closed.

        The base implementation only drops the reference to the stream.
        Extend it to inspect C{reason} and act on it.

        @param reason: Why the stream was closed. Not used here.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data by delegating to the parent's C{send}.

        @note: When the parent is a L{StreamManager}, data sent while there
               is no initialized XML stream is queued and flushed once a
               stream has been initialized, after which
               L{connectionInitialized} is called again. If this queueing is
               not desired, use C{send} on C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML.
                    See L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)


    def request(self, request):
        """
        Send an IQ request and track the response via the parent.

        @param request: The request to hand to the parent's C{request}.
        @return: Whatever the parent's C{request} returns.

        @see: L{StreamManager.request}.
        """
        return self.parent.request(request)
117
118
119
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    The manager keeps track of one XML stream at a time, relays stream
    lifecycle events to the handlers it collects (objects providing
    L{IXMPPHandler}, such as L{XMPPHandler} subclasses, added with
    L{addHandler}), queues outgoing data while no initialized stream is
    available, and tracks responses to IQ requests.

    @ivar xmlstream: currently managed XML stream, or C{None}.
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: C{bool}
    @ivar _initialized: Whether the stream held in L{xmlstream} has been
                        initialized. Decides whether L{send} transmits or
                        queues.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send}.
    @type _packetQueue: L{list}
    @ivar _iqDeferreds: Deferreds of IQ requests awaiting a response, keyed
                        by stanza id.
    @type _iqDeferreds: C{dict}
    @ivar timeout: Default IQ request timeout in seconds. C{None} means
                   no default timeout.
    @type timeout: C{int}
    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    """
    timeout = None
    _reactor = None

    logTraffic = False

    def __init__(self, factory, reactor=None):
        """
        Construct a stream manager and subscribe to the factory's events.

        @param factory: The stream factory to connect with. Bootstraps are
            added for the stream connected, authenticated, initialization
            failed and stream end events.
        @param reactor: A provider of L{IReactorTime} to track timeouts.
            If not provided, the global reactor will be used.
        """
        XMPPHandlerCollection.__init__(self)

        self.xmlstream = None
        self._initialized = False
        self._packetQueue = []
        # IQ response tracking: stanza id -> Deferred.
        self._iqDeferreds = {}

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self._connected),
            (xmlstream.STREAM_AUTHD_EVENT, self._authd),
            (xmlstream.INIT_FAILED_EVENT, self.initializationFailed),
            (xmlstream.STREAM_END_EVENT, self._disconnected),
        )
        for event, callback in bootstraps:
            factory.addBootstrap(event, callback)
        self.factory = factory

        if reactor is None:
            # Only fall back to the global reactor when none was supplied.
            from twisted.internet import reactor
        self._reactor = reactor


    def addHandler(self, handler):
        """
        Add protocol handler and bring it up to date with the stream state.

        If a stream is currently held, the handler's C{makeConnection} is
        called with it; if that stream has also been initialized, the
        handler's C{connectionInitialized} is called as well.

        @param handler: The handler to add.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        stream = self.xmlstream
        if stream:
            handler.makeConnection(stream)
        if self._initialized:
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Optionally installs raw traffic logging on the stream (depending on
        L{logTraffic}), remembers the stream, and passes it to each
        handler's C{makeConnection}.

        @param xs: The newly connected L{XmlStream}.
        """
        if self.logTraffic:
            def logIncoming(buf):
                log.msg("RECV: %r" % buf)

            def logOutgoing(buf):
                log.msg("SEND: %r" % buf)

            xs.rawDataInFn = logIncoming
            xs.rawDataOutFn = logOutgoing

        self.xmlstream = xs

        # Iterate over a snapshot so handlers may be added or removed while
        # being notified.
        for handler in list(self):
            handler.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Starts observing IQ responses, sends out the queued data, marks the
        stream as initialized and calls each handler's
        C{connectionInitialized} method.

        @param xs: The initialized L{XmlStream}.
        """
        for query in ('/iq[@type="result"]', '/iq[@type="error"]'):
            xs.addObserver(query, self._onIQResponse)

        # Flush everything that was queued while there was no initialized
        # stream, then start with a fresh queue.
        for packet in self._packetQueue:
            xs.send(packet)
        self._packetQueue = []
        self._initialized = True

        # Notify a snapshot of the current handlers.
        for handler in list(self):
            handler.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.
        This implementation does nothing.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        The manager drops its reference to the L{XmlStream}, tells each
        handler the connection was lost by calling its C{connectionLost}
        method, and errbacks the deferreds of all IQ requests that are still
        waiting for a response.

        @param reason: Why the stream ended. Anything without a C{trap}
            attribute is replaced by a L{ConnectionDone} failure.
        """
        self.xmlstream = None
        self._initialized = False

        # Twisted versions before 11.0 passed an XmlStream here.
        if not hasattr(reason, 'trap'):
            reason = failure.Failure(ConnectionDone())

        # Notify a snapshot of the current handlers.
        for handler in list(self):
            handler.connectionLost(reason)

        # Errback every outstanding iq deferred with the disconnect reason;
        # otherwise those deferreds would never fire. The mapping is swapped
        # out first so the manager starts clean.
        pending, self._iqDeferreds = self._iqDeferreds, {}
        for d in itervalues(pending):
            d.errback(reason)


    def _onIQResponse(self, iq):
        """
        Handle iq response by firing associated deferred.

        Responses without an id, or with an id that is not being tracked,
        are ignored. Otherwise the stanza is marked as handled and the
        deferred is fired: with the stanza itself for a result, or with the
        exception derived from it for an error.

        @param iq: The incoming iq stanza of type C{'result'} or C{'error'}.
        """
        try:
            d = self._iqDeferreds.pop(iq["id"])
        except KeyError:
            return

        iq.handled = True
        if iq['type'] != 'error':
            d.callback(iq)
            return
        d.errback(error.exceptionFromStanza(iq))


    def send(self, obj):
        """
        Send data over the XML stream.

        While there is no initialized XML stream, the data is queued and
        sent out once a stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if not self._initialized:
            self._packetQueue.append(obj)
            return
        self.xmlstream.send(obj)


    def request(self, request):
        """
        Send an IQ request and track the response.

        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}.
        Its C{toElement} is called to render it to a
        L{Element<twisted.words.xish.domish.Element>}, which is passed to
        L{send}: transmitted right away on an initialized stream, queued
        otherwise.

        If the request doesn't have an identifier, it will be assigned a
        fresh one, so the response can be tracked.

        The returned deferred fires with the
        L{Element<twisted.words.xish.domish.Element>} representation of the
        response if it is a result iq. If the response is an error iq, a
        corresponding L{error.StanzaError} will be errbacked.

        If the connection is closed before a response was received, the
        deferred will be errbacked with the reason failure.

        A request may also have a timeout, taken from the C{timeout}
        attribute of the request or, when the request has no such attribute,
        from L{StreamManager}'s C{timeout} attribute. On expiry the deferred
        is errbacked with L{xmlstream.TimeoutError}.

        @param request: The IQ request.
        @type request: L{generic.Request}
        @return: A deferred for the response; already failed with
            C{ValueError} when C{request} is not an iq get or set.
        """
        isRequest = (request.stanzaKind == 'iq' and
                     request.stanzaType in ('get', 'set'))
        if not isRequest:
            return defer.fail(ValueError("Not a request"))

        element = request.toElement()

        # Make sure we have a trackable id on the stanza
        if not request.stanzaID:
            element.addUniqueId()
            request.stanzaID = element['id']
        requestID = element['id']

        # Set up iq response tracking
        d = defer.Deferred()
        self._iqDeferreds[requestID] = d

        # NOTE(review): the manager default only applies when the request
        # has no C{timeout} attribute at all; an attribute set to C{None}
        # disables the timeout. Confirm this is intended.
        timeout = getattr(request, 'timeout', self.timeout)

        if timeout is not None:
            def expire():
                del self._iqDeferreds[requestID]
                d.errback(xmlstream.TimeoutError("IQ timed out"))

            delayedCall = self._reactor.callLater(timeout, expire)

            def stopTimer(result):
                # Runs for results and failures alike; passes them through.
                if delayedCall.active():
                    delayedCall.cancel()

                return result

            d.addBoth(stopTimer)

        self.send(element)
        return d
378
379
380
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    The response is passed to C{self.send}, so derivatives of this class
    must provide a C{send} method (as L{XMPPHandler} does). To receive
    requests they typically also have an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in their C{xmlstream} attribute on which observers are registered.

    The optional payload is taken from the result of the handler and is
    expected to be a child element or an iterable of child elements.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent. When no handler matches
    the request, or the handler raises C{NotImplementedError}, an error
    response of C{feature-not-implemented} is sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      C{None} (the default) is treated as an empty mapping.
    @type iqHandlers: C{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        Every query in L{iqHandlers} is matched against C{iq}; if several
        match, the one iterated last is used. The handler's (possibly
        deferred) result is turned into a result or error response that is
        passed to C{self.send}. The stanza is marked as handled before this
        method returns, regardless of whether the response has been sent yet.

        @param iq: The incoming iq request.
        """
        def toResult(result, iq):
            # Build the result response, attaching the payload if any.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(reason):
            # Translate NotImplementedError into the matching stanza error;
            # other failures pass through to the next errback.
            reason.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(reason, iq):
            reason.trap(error.StanzaError)
            return reason.value.toResponse(iq)

        def fromOtherError(reason, iq):
            # Last resort: log the failure and hide the details from the
            # requesting entity.
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(reason)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # The class default for iqHandlers is None; treat that as "no
        # handlers" so the requester gets feature-not-implemented instead of
        # this method raising before a response could be generated.
        handler = None
        for queryString, method in iteritems(self.iqHandlers or {}):
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
474
475
476
# Public names of this module.
__all__ = [
    'XMPPHandler',
    'XMPPHandlerCollection',
    'StreamManager',
    'IQHandlerMixin',
]
Note: See TracBrowser for help on using the repository browser.