source: wokkel/subprotocols.py @ 22:9edffdf4aee4

Last change on this file since 22:9edffdf4aee4 was 22:9edffdf4aee4, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Clarify use of IQHandlerMixin, fix some doc strings.

File size: 9.8 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.python import log
14from twisted.words.protocols.jabber import error, xmlstream
15from twisted.words.xish import xpath
16from twisted.words.xish.domish import IElement
17
18try:
19    from twisted.words.protocols.jabber.xmlstream import toResponse
20except ImportError:
21    from wokkel.compat import toResponse
22
23from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection
24
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol handlers.

    A handler is attached to a parent that provides C{addHandler},
    C{removeHandler} and C{send} (for example a L{StreamManager}), and is
    told about the XML stream through L{makeConnection},
    L{connectionInitialized} and L{connectionLost}.

    @ivar parent: The object this handler has been added to, or C{None} when
                  the handler has no parent.
    @ivar xmlstream: The XML stream passed to L{makeConnection}, or C{None}
                     when there is no connection.
    """

    implements(IXMPPHandler)

    # Class-level defaults, so that these attributes can be inspected before
    # the handler has been given a parent or a connection, without requiring
    # subclasses to call an initializer.
    parent = None
    xmlstream = None

    def setHandlerParent(self, parent):
        """
        Remember C{parent} and register this handler with it.

        @param parent: The collection to add this handler to. Its
                       C{addHandler} is called with this handler.
        """
        self.parent = parent
        self.parent.addHandler(self)

    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent.

        @param parent: Not used by this implementation; the handler is
                       removed from the parent stored by
                       L{setHandlerParent}.
        """
        self.parent.removeHandler(self)
        self.parent = None

    def makeConnection(self, xs):
        """
        Store the XML stream and call L{connectionMade}.

        @param xs: The XML stream that has been connected.
        """
        self.xmlstream = xs
        self.connectionMade()

    def connectionMade(self):
        """
        Hook called by L{makeConnection} once C{self.xmlstream} is set.

        Does nothing by default; meant to be overridden.
        """
        pass

    def connectionInitialized(self):
        """
        Hook called when the XML stream has been initialized.

        Does nothing by default; meant to be overridden.
        """
        pass

    def connectionLost(self, reason):
        """
        Forget the XML stream.

        @param reason: Why the connection was lost. Not used by this
                       implementation.
        """
        self.xmlstream = None

    def send(self, obj):
        """
        Send data over the managed XML stream.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)
65
66
class XMPPHandlerCollection(object):
    """
    Container that groups XMPP subprotocol handlers.

    The collection is not itself an L{XMPPHandler}, so collections cannot be
    nested inside each other.

    @ivar xmlstream: Currently managed XML stream.
    @type xmlstream: L{XmlStream}
    @ivar handlers: List of protocol handlers.
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        # No stream is being managed and nothing is initialized yet.
        self._initialized = False
        self.xmlstream = None
        self.handlers = []

    def __iter__(self):
        """
        Iterate over the handlers held by this collection.
        """
        return iter(self.handlers)

    def addHandler(self, handler):
        """
        Add protocol handler.

        Protocol handlers are expected to provide L{IXMPPHandler}.

        If there is an XML stream and it has been initialized, the handler's
        C{makeConnection} and C{connectionInitialized} are called right away
        to get it up to speed.
        """
        self.handlers.append(handler)

        # Nothing to catch up on unless a stream is present and initialized.
        if not (self.xmlstream and self._initialized):
            return

        handler.makeConnection(self.xmlstream)
        handler.connectionInitialized()

    def removeHandler(self, handler):
        """
        Remove protocol handler.
        """
        self.handlers.remove(handler)
118
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager and hook it into the factory's stream events.

        @param factory: Object providing C{addBootstrap}; the manager
                        registers its callbacks for the connected,
                        authenticated, init-failed and stream-end events.
        """
        # Let the collection set up handlers, xmlstream and _initialized so
        # the two initializers cannot drift apart.
        XMPPHandlerCollection.__init__(self)
        self._packetQueue = []

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory

    def addHandler(self, handler):
        """
        Add protocol handler and bring it up to date with the stream state.

        The inherited implementation only catches a handler up once the
        stream has been initialized. A handler added after the stream was
        connected but before it was initialized would otherwise never have
        its C{makeConnection} called, while still receiving
        C{connectionInitialized} later on.

        @param handler: Object expected to provide L{IXMPPHandler}.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # Connected but not yet initialized: the base class did nothing, so
        # hand over the stream here. connectionInitialized follows in _authd.
        if self.xmlstream and not self._initialized:
            handler.makeConnection(self.xmlstream)

    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Optionally installs raw traffic loggers on the stream, stores the
        stream and passes it to every handler.

        @param xs: The newly connected XML stream.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: handlers may add or remove handlers from
        # within makeConnection, and added ones are caught up by addHandler.
        for e in list(self):
            e.makeConnection(xs)

    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Sends out everything queued by L{send} and tells every handler that
        the connection has been initialized.

        @param xs: The initialized XML stream.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. A snapshot is used so that a handler added
        # from within connectionInitialized (and already caught up by
        # addHandler) is not notified a second time.
        for e in list(self):
            e.connectionInitialized()

    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """

    def _disconnected(self, _):
        """
        Called when the stream has ended.

        Resets the connection state and tells every handler that the
        connection was lost. Handlers receive C{None} as the reason.

        @param _: Event payload; ignored.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers, iterating over a snapshot so that handlers may
        # remove themselves from within connectionLost.
        for e in list(self):
            e.xmlstream = None
            e.connectionLost(None)

    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """

        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
215
216
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>} instance
    in its C{xmlstream} attribute, and a C{send} method that is used to send
    out the response.

    The optional payload is taken from the result of the handler and is
    expected to be a child element or an iterable of child elements.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      When left at C{None}, no request matches and every
                      request is answered with C{feature-not-implemented}.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The handler's result (or failure) is turned into a response stanza
        that is passed to C{self.send}. The request is marked as handled by
        setting its C{handled} attribute.

        @param iq: The incoming iq request.
        """
        def toResult(result, iq):
            # Build the result response, attaching the handler's payload.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(failure):
            # Map NotImplementedError onto the matching stanza error, which
            # is then rendered by fromStanzaError.
            failure.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(failure, iq):
            failure.trap(error.StanzaError)
            return failure.value.toResponse(iq)

        def fromOtherError(failure, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(failure)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # iqHandlers defaults to None on the class; treat that as "no
        # handlers" so the request gets a proper error response instead of
        # an AttributeError being raised out of the observer.
        handlers = self.iqHandlers or {}

        handler = None
        for queryString, method in handlers.items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True
Note: See TracBrowser for help on using the repository browser.