source: wokkel/subprotocols.py @ 99:2c8dc93fbef4

Last change on this file since 99:2c8dc93fbef4 was 99:2c8dc93fbef4, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Fix race condition and nesting errors when adding a new subprotocol handler.

This fixes two related issues with adding a new subprotocol handler:

  • Adding a handler when the stream is not yet initialized (authenticated) does not cause connectionMade to be called.
  • Adding a handler in connectionMade, connectionInitialized, or connectionLost modifies the lists of handlers iterated over, causing some methods to be called too often.

Author: ff, kandaurovoleg, ralphm.
Fixes: #48.

  • Property exe set to *
File size: 11.7 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.internet.error import ConnectionDone
14from twisted.python import failure, log
15from twisted.words.protocols.jabber import error, xmlstream
16from twisted.words.protocols.jabber.xmlstream import toResponse
17from twisted.words.xish import xpath
18from twisted.words.xish.domish import IElement
19
20from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
21
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol handlers.

    A subclass implements (part of) one or more XMPP extension protocols;
    such a subclass is referred to as a subprotocol implementation.

    @ivar parent: The handler collection this handler was added to through
                  L{setHandlerParent}, or C{None} when it has no parent.
    @ivar xmlstream: The XML stream handed to L{makeConnection}, or C{None}
                     before a connection was made and after it was lost.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Adopt C{parent} as the parent and register this handler with it.

        @param parent: The collection to join; its C{addHandler} is called
                       with this handler.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        @param parent: Not consulted; the handler is always removed from the
                       collection stored in C{self.parent}.
        """
        formerParent = self.parent
        formerParent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Remember the XML stream and fire L{connectionMade}.

        @param xs: The XML stream this handler will be working with.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook called by L{makeConnection} once C{self.xmlstream} is set.

        Does nothing by default. Override it to perform work before stream
        initialization.
        """


    def connectionInitialized(self):
        """
        Hook called when the XML stream has been initialized.

        Does nothing by default. Override it to perform work after stream
        initialization, e.g. to set up observers and start exchanging XML
        stanzas.
        """


    def connectionLost(self, reason):
        """
        Hook called when the XML stream has been closed.

        The default implementation only drops the reference to the stream.
        Extend it to inspect the C{reason} argument and act on it.

        @param reason: Why the connection was lost.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data by delegating to the C{send} method of the parent.

        @note: When the parent is a L{StreamManager}, data sent while there
               is no initialized XML stream is queued, and flushed as soon as
               a stream has been initialized. After that flush,
               L{connectionInitialized} is called again. If this queueing is
               not desired, use C{send} on C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)
95
96
97
class XMPPHandlerCollection(object):
    """
    Plain container of XMPP subprotocol handlers.

    Handlers can be grouped with it, but the collection is not an
    L{XMPPHandler} itself, so it cannot be nested in another collection as a
    handler.

    @ivar handlers: The handlers in this collection, in the order they were
                    added.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = list()


    def __iter__(self):
        """
        Iterate over the contained handlers.

        The iterator runs over the live L{handlers} list; take a copy first
        (e.g. C{list(collection)}) when handlers may be added or removed
        while iterating.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Append C{handler} to the collection.

        @param handler: The protocol handler to add; expected to provide
                        L{IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Take C{handler} out of the collection.

        @param handler: The protocol handler to remove.
        @raise ValueError: If C{handler} is not in the collection.
        """
        self.handlers.remove(handler)
137
138
139
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    The manager looks after a single XMPP connection, relays the stream's
    life-cycle events to its handlers and offers a C{send} that queues data
    until the stream is ready. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and are added using
    L{addHandler}.

    @ivar xmlstream: currently managed XML stream, or C{None} when there is
                     none.
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar factory: the factory passed to the initializer, on which the
                   bootstrap observers were registered.
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up an idle manager and hook it into the stream events of
        C{factory}.

        @param factory: The XML stream factory whose C{addBootstrap} is used
                        to observe connection, authentication, initialization
                        failure and stream end events.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        # Stream event -> manager callback, registered in this order.
        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self._connected),
            (xmlstream.STREAM_AUTHD_EVENT, self._authd),
            (xmlstream.INIT_FAILED_EVENT, self.initializationFailed),
            (xmlstream.STREAM_END_EVENT, self._disconnected),
        )
        for event, callback in bootstraps:
            factory.addBootstrap(event, callback)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add a protocol handler and bring it up to date with the stream state.

        When a transport connection already exists, the handler's
        C{makeConnection} is called with the current XML stream. When that
        stream has also been initialized already, the handler's
        C{connectionInitialized} is called right after.

        @param handler: The protocol handler to add.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # A late joiner missed the events the other handlers already got,
        # so replay them in the order they would have arrived.
        currentStream = self.xmlstream
        if currentStream:
            handler.makeConnection(currentStream)
        if self._initialized:
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Optionally installs traffic logging on the stream (depending on
        L{logTraffic}), stores the stream and passes it to the
        C{makeConnection} method of every handler.

        @param xs: The newly connected L{XmlStream}.
        """
        if self.logTraffic:
            def logIncoming(buf):
                log.msg("RECV: %r" % buf)

            def logOutgoing(buf):
                log.msg("SEND: %r" % buf)

            xs.rawDataInFn = logIncoming
            xs.rawDataOutFn = logOutgoing

        self.xmlstream = xs

        # Walk a snapshot: handlers may add or remove handlers from within
        # their callbacks.
        for handler in list(self):
            handler.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Sends out the stanzas that were queued by L{send}, marks the stream
        as initialized and calls the C{connectionInitialized} method of
        every handler.

        @param xs: The initialized L{XmlStream}.
        """
        # Drain the backlog first, so queued data precedes anything the
        # handlers send from connectionInitialized.
        for packet in self._packetQueue:
            xs.send(packet)
        self._packetQueue = []
        self._initialized = True

        # Walk a snapshot: handlers may add or remove handlers from within
        # their callbacks.
        for handler in list(self):
            handler.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        This implementation does nothing; override it to act on the failure.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        From here on the manager no longer interacts with the L{XmlStream}.
        Every handler is told about the loss through its C{connectionLost}
        method.

        @param reason: Why the stream ended. Anything lacking a C{trap}
                       attribute is replaced by a failure wrapping
                       L{ConnectionDone}.
        """
        self.xmlstream = None
        self._initialized = False

        # Twisted versions before 11.0 passed an XmlStream here.
        if hasattr(reason, 'trap'):
            lostReason = reason
        else:
            lostReason = failure.Failure(ConnectionDone())

        # Walk a snapshot: handlers may add or remove handlers from within
        # their callbacks.
        for handler in list(self):
            handler.connectionLost(lostReason)


    def send(self, obj):
        """
        Send data over the XML stream.

        While there is no initialized XML stream, the data is queued instead
        and sent out once a stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if not self._initialized:
            self._packetQueue.append(obj)
            return

        self.xmlstream.send(obj)
286
287
288
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    The response is delivered through C{self.send}, so the class this is
    mixed into must provide such a method (as L{XMPPHandler} does).

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. A L{NotImplementedError}, also used when no
    handler matches, results in an error response of
    C{feature-not-implemented}. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      C{None} (the default) is treated as an empty mapping.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        Every query in L{iqHandlers} is matched against C{iq}; when more than
        one matches, the last match in the mapping's iteration order is used.
        The response built from the handler's outcome is passed to
        C{self.send}, and C{iq.handled} is set to C{True}.

        @param iq: The incoming iq request stanza.
        """
        def toResult(result, iq):
            # Build the result response and attach the optional payload:
            # either a single element or an iterable of elements.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(reason):
            # Translate NotImplementedError into a stanza error, so that the
            # next errback turns it into an error response.
            reason.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(reason, iq):
            reason.trap(error.StanzaError)
            return reason.value.toResponse(iq)

        def fromOtherError(reason, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(reason)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # iqHandlers defaults to None; treat that as "no handlers" so the
        # requester gets a feature-not-implemented reply instead of this
        # method failing with an AttributeError. items() is used because it
        # exists on both Python 2 and Python 3 dictionaries.
        handler = None
        for queryString, method in (self.iqHandlers or {}).items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler is not None:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        # By now the deferred always carries a response stanza.
        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.