source: wokkel/subprotocols.py @ 45:125f9d902a20

Last change on this file since 45:125f9d902a20 was 45:125f9d902a20, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Bring subprotocols code in line with Twisted trunk.

File size: 11.5 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.python import log
14from twisted.words.protocols.jabber import error, xmlstream
15from twisted.words.xish import xpath
16from twisted.words.xish.domish import IElement
17
18try:
19    from twisted.words.protocols.jabber.xmlstream import toResponse
20except ImportError:
21    from wokkel.compat import toResponse
22
23from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
24
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol implementations.

    A subclass implements (part of) one or more XMPP extension protocols.
    A handler is attached to a parent collection, through which outgoing
    data is sent, and is handed the XML stream once a connection exists.

    @ivar parent: the collection this handler is attached to, or C{None}
                  when it is not attached.
    @ivar xmlstream: the XML stream passed to L{makeConnection}, or C{None}
                     when there is no connection.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Attach this handler to C{parent} and register it there.

        @param parent: the collection to attach to; its C{addHandler} is
                       called with this handler.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Detach this handler from its current parent.

        The handler is removed from whatever is stored in C{self.parent};
        the C{parent} argument itself is not consulted.
        """
        current = self.parent
        current.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Remember the XML stream and signal that a connection was made.

        @param xs: the XML stream that has just been connected.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook invoked once a connection has been established.

        Override this to do work before the stream is initialized. The
        default implementation does nothing.
        """


    def connectionInitialized(self):
        """
        Hook invoked once the XML stream has been initialized.

        Override this to do work after stream initialization, for example
        setting up observers and starting to exchange XML stanzas. The
        default implementation does nothing.
        """


    def connectionLost(self, reason):
        """
        Hook invoked when the XML stream has been closed.

        The default implementation drops the reference to the stream.
        Subclasses may extend this to inspect C{reason} and act on it.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Hand data to the parent for sending over the managed XML stream.

        @note: The stream manager queues data passed in here while there is
               no initialized XML stream, and sends it as soon as a new
               stream has been established and initialized. After that,
               L{connectionInitialized} is called again. To bypass this
               queueing, call C{send} on C{self.xmlstream} instead.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML.
                    See L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)
98
99
100
class XMPPHandlerCollection(object):
    """
    Container that groups XMPP subprotocol handlers.

    The collection is not an L{XMPPHandler} itself, so collections cannot
    be nested inside one another.

    @ivar handlers: List of protocol handlers.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Iterate over the handlers held by this collection.
        """
        held = self.handlers
        return iter(held)


    def addHandler(self, handler):
        """
        Append a protocol handler to the collection.

        Handlers are expected to provide L{IXMPPHandler}; this is not
        checked here.
        """
        held = self.handlers
        held.append(handler)


    def removeHandler(self, handler):
        """
        Remove a protocol handler from the collection.

        Uses C{list.remove}, so removing a handler that was never added
        raises C{ValueError}.
        """
        held = self.handlers
        held.remove(handler)
140
141
142
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager and hook it into the stream factory.

        @param factory: the XML stream factory; bootstrap observers for the
                        connected, authenticated, init-failed and stream-end
                        events are registered on it.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        The handler is brought up to speed with the current connection
        state: when an XML stream has been connected, the handler's
        C{makeConnection} is called with it, and when that stream has also
        been initialized, the handler's C{connectionInitialized} is called
        as well.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # A handler added between connect and authentication must still be
        # handed the stream; otherwise _authd would later call its
        # connectionInitialized while its xmlstream is still unset.
        if self.xmlstream is not None:
            handler.makeConnection(self.xmlstream)
        if self._initialized:
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: handlers may add or remove handlers from
        # within their callbacks. Handlers added meanwhile are connected by
        # addHandler itself.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. A snapshot is used so that a handler added
        # from within a callback (and already notified by addHandler) is
        # not notified a second time by this loop.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, _):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method. The event argument
        is ignored; handlers are passed C{None} as the reason.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers. A snapshot is used so that a handler removing
        # itself (or another handler) from within connectionLost does not
        # cause a later handler to be skipped.
        for e in list(self):
            e.connectionLost(None)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
284
285
286
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in its C{xmlstream} attribute, and a C{send} method that is used to send
    out the response.

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      When left at C{None}, no request matches and every
                      request is answered with C{feature-not-implemented}.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The matching handler method is called with C{iq}; its (possibly
        deferred) result is turned into a result response, and failures are
        turned into error responses. The response is passed to C{self.send}
        and C{iq.handled} is set to C{True}.

        @param iq: the incoming iq request stanza.
        """
        def toResult(result, iq):
            # Build the result response, adding the handler's payload (a
            # single element or an iterable of elements) when there is one.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(failure):
            # Map a missing or unimplemented handler onto the stanza error
            # that fromStanzaError turns into a response.
            failure.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(failure, iq):
            # Other failure types are re-raised by trap and fall through to
            # fromOtherError.
            failure.trap(error.StanzaError)
            return failure.value.toResponse(iq)

        def fromOtherError(failure, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(failure)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # Treat an unset mapping as empty, so the request is answered with
        # feature-not-implemented instead of raising AttributeError here.
        iqHandlers = self.iqHandlers or {}

        # When several queries match, the last one iterated wins.
        handler = None
        for queryString, method in iqHandlers.items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.