source: wokkel/subprotocols.py @ 53:4aec395fda5b

Last change on this file since 53:4aec395fda5b was 53:4aec395fda5b, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Release Wokkel 0.5.0.

File size: 11.5 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.python import log
14from twisted.words.protocols.jabber import error, xmlstream
15from twisted.words.protocols.jabber.xmlstream import toResponse
16from twisted.words.xish import xpath
17from twisted.words.xish.domish import IElement
18
19from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
20
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol implementations.

    A subclass implements (part of) one or more XMPP extension protocols
    and is attached to a handler collection, which hands it the XML stream
    and notifies it of connection state changes.

    @ivar parent: The collection this handler was attached to through
                  L{setHandlerParent}, or C{None} when detached.
    @ivar xmlstream: The XML stream received in L{makeConnection}, or
                     C{None} when there is no connection.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.xmlstream = None
        self.parent = None


    def setHandlerParent(self, parent):
        """
        Remember C{parent} and register this handler with it.

        @param parent: The collection to attach to. Its C{addHandler} is
                       called with this handler.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        @param parent: Unused; the parent stored by L{setHandlerParent} is
                       the one this handler is removed from.
        """
        owner = self.parent
        owner.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and signal that a connection was made.

        @param xs: The XML stream that has just been connected. It is kept
                   in C{self.xmlstream} before L{connectionMade} is called.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Hook called once a connection has been established.

        Override this to do work before the stream is initialized. The
        default implementation does nothing.
        """


    def connectionInitialized(self):
        """
        Hook called once the XML stream has been initialized.

        Override this to do work after stream initialization, such as
        setting up observers and starting to exchange XML stanzas. The
        default implementation does nothing.
        """


    def connectionLost(self, reason):
        """
        Hook called once the XML stream has been closed.

        Extend this method to inspect C{reason} and act on it. The default
        implementation only drops the reference to the XML stream.

        @param reason: Why the stream was closed, as passed by the caller.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data through the parent of this handler.

        @note: The stream manager keeps a queue for data sent through this
               method while there is no initialized XML stream. Queued data
               is sent as soon as a new stream has been established and
               initialized, after which L{connectionInitialized} is called
               again. To bypass the queue, call C{send} on
               C{self.xmlstream} instead.

        @param obj: Data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML.
                    See L{xmlstream.XmlStream} for details.
        """
        manager = self.parent
        manager.send(obj)
94
95
96
class XMPPHandlerCollection(object):
    """
    Container that groups XMPP subprotocol handlers.

    The collection is not an L{XMPPHandler} itself, so collections cannot
    be nested.

    @ivar handlers: The registered protocol handlers, in the order in which
                    they were added.
    @type handlers: L{list} of objects providing L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Iterate over the registered handlers.

        The iterator runs over the live C{handlers} list, not over a copy.
        """
        registered = self.handlers
        return iter(registered)


    def addHandler(self, handler):
        """
        Append a protocol handler to the collection.

        @param handler: The handler to register. It is expected to provide
                        L{IXMPPHandler}; this is not checked here.
        """
        registered = self.handlers
        registered.append(handler)


    def removeHandler(self, handler):
        """
        Remove a protocol handler from the collection.

        @param handler: The handler to unregister.
        @raise ValueError: If C{handler} is not in the collection.
        """
        registered = self.handlers
        registered.remove(handler)
136
137
138
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream, or C{None} when there is
                     no connection.
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar factory: the factory this manager registered its bootstrap
                   observers with.
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager and hook it into the stream life cycle.

        @param factory: The XML stream factory whose C{addBootstrap} is used
                        to observe the connected, authenticated,
                        initialization-failed and stream-end events.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        The handler is brought up to speed with the current stream state:
        when a connection has already been established, its
        C{makeConnection} is called with the current XML stream, and when
        that stream has also been initialized, its C{connectionInitialized}
        is called as well.

        @param handler: The handler to add; expected to provide
                        L{IXMPPHandler}.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # A handler added between connect and initialization must still
        # receive the stream. Otherwise _authd would later call its
        # connectionInitialized while its xmlstream is still unset.
        if self.xmlstream:
            handler.makeConnection(self.xmlstream)
            if self._initialized:
                handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.

        @param xs: The XML stream that has just been connected.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: a handler may add or remove handlers
        # from within its callback. Handlers added now are connected by
        # addHandler itself and must not be connected a second time here.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.

        @param xs: The XML stream that has just been initialized.
        """
        # Flush all pending packets. The stream is only marked as
        # initialized afterwards, so anything passed to send() during the
        # flush is appended to the queue and goes out in order.
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify a snapshot of the handlers: handlers added from within a
        # callback are already notified by addHandler.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        The default implementation does nothing.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, _):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method with C{None}.

        @param _: The object dispatched with the stream end event; ignored.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify a snapshot of the handlers, so that a handler removing
        # itself (or another handler) in connectionLost does not cause
        # other handlers to be skipped.
        for e in list(self):
            e.connectionLost(None)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established and initialized XML stream, the data is
        queued and sent out when a new XML stream has been established and
        initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
280
281
282
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in its C{xmlstream} attribute, and a C{send} method that is used to send
    out the response (L{XMPPHandler} provides both).

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      When left at C{None}, no request matches and every
                      request is answered with C{feature-not-implemented}.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The handler method is looked up through L{iqHandlers}. Its result,
        which may be a deferred, is turned into a result response; failures
        are turned into error responses. The response is passed to
        C{self.send} and C{iq} is marked as handled.

        @param iq: The incoming iq request stanza.
        """
        def toResult(result, iq):
            # Build the result response, adding the handler's payload, which
            # is either a single element or an iterable of elements.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(failure):
            # Translate NotImplementedError into the matching stanza error;
            # any other failure is passed on to the next errback.
            failure.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(failure, iq):
            # Stanza errors know how to render themselves as a response.
            failure.trap(error.StanzaError)
            return failure.value.toResponse(iq)

        def fromOtherError(failure, iq):
            # Anything else is unexpected: log it and report a generic error.
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(failure)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # iqHandlers defaults to None on the class; treat that as having no
        # handlers, so the request gets a proper error response instead of
        # an AttributeError being raised out of the observer.
        handlerNames = self.iqHandlers or {}

        # There is no early exit: when several queries match, the last
        # matching entry in iteration order is the one that is used.
        handler = None
        for queryString, method in handlerNames.items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.