source: wokkel/subprotocols.py @ 1:677f7d3cca75

Last change on this file since 1:677f7d3cca75 was 1:677f7d3cca75, checked in by Ralph Meijer <ralphm@…>, 15 years ago

Initial, heavily worked around code drop from the idavoll and mimir projects.

File size: 8.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_subprotocols -*-
2#
3# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP subprotocol support.
8"""
9
10from zope.interface import implements
11
12from twisted.internet import defer
13from twisted.python import log
14from twisted.words.protocols.jabber import error, xmlstream
15from twisted.words.xish import xpath
16from twisted.words.xish.domish import IElement
17
18from wokkel.iwokkel import IXMPPHandler
19
class XMPPHandler(object):
    """
    Base class for XMPP subprotocol handlers.

    A handler is attached to a manager (for example a L{StreamManager}) with
    the manager's C{addHandler}, which stores the manager in C{manager}. The
    manager then drives the handler through L{makeConnection},
    L{connectionInitialized} and L{connectionLost}. Subclasses override the
    C{connection*} hooks to set up and tear down their own state.

    @ivar xmlstream: the XML stream handed to L{makeConnection}, or C{None}
                     when there is no connection.
    @ivar manager: the manager this handler has been added to, or C{None}.
    """

    implements(IXMPPHandler)

    # Class-level defaults: both attributes are otherwise only created by
    # makeConnection and by the manager's addHandler. Having them here lets
    # code test for None before the handler has been added or connected,
    # matching the value connectionLost resets xmlstream to.
    xmlstream = None
    manager = None

    def makeConnection(self, xs):
        """
        Remember the XML stream and announce the new connection.

        @param xs: the XML stream that has just been connected.
        """
        self.xmlstream = xs
        self.connectionMade()

    def connectionMade(self):
        """
        Hook called from L{makeConnection} once C{xmlstream} has been set.

        Does nothing by default.
        """
        pass

    def connectionInitialized(self):
        """
        Hook called by the manager when stream initialization has completed.

        Does nothing by default.
        """
        pass

    def connectionLost(self, reason):
        """
        Forget the XML stream because the connection is gone.

        @param reason: why the connection was lost, as passed in by the
                       manager. It is not used here.
        """
        self.xmlstream = None

    def send(self, obj):
        """
        Send data over the managed XML stream.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @note: The handler must have been added to a manager first; this
               method only delegates to C{self.manager.send}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.manager.send(obj)
52
class XMPPHandlerContainer(object):
    """
    Container for XMPP subprotocol handlers.

    Iterating over the container yields its handlers in the order in which
    they were added.

    @ivar handlers: list of protocol handlers.
    @type handlers: L{list}
    @ivar xmlstream: the current XML stream, or C{None}. This class only
                     initializes it; it is up to subclasses to maintain it.
    @ivar _initialized: whether the current XML stream has been initialized.
                        This class only initializes it to C{False}.
    @type _initialized: L{bool}
    """

    def __init__(self):
        self.handlers = []
        self.xmlstream = None
        self._initialized = False

    def __iter__(self):
        """
        Act as a container for handlers.

        @return: an iterator over the live list of handlers.
        """
        return iter(self.handlers)

    def addHandler(self, handler):
        """
        Add protocol handler.

        Protocol handlers are expected to provide L{IXMPPHandler}. The
        handler's C{manager} attribute is set to this container.

        When an XML stream has already been established and initialized, the
        handler's C{makeConnection} and C{connectionInitialized} will be
        called to get it up to speed.

        @param handler: the protocol handler to add.
        """

        self.handlers.append(handler)
        handler.manager = self

        # get protocol handler up to speed when a connection has already
        # been established
        if self.xmlstream and self._initialized:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()

    def removeHandler(self, handler):
        """
        Remove protocol handler.

        The handler's C{manager} attribute is reset to C{None}.

        @param handler: the protocol handler to remove.
        @raise ValueError: if C{handler} is not in this container. In that
                           case the handler is left untouched.
        """

        # Remove first: when the handler is not ours, list.remove raises and
        # we must not have detached it from the manager that does own it.
        self.handlers.remove(handler)
        handler.manager = None
95
class StreamManager(XMPPHandlerContainer):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar handlers: list of protocol handlers
    @type handlers: L{list} of objects providing
                      L{IXMPPHandler}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar factory: the factory whose stream events are being observed.
    @ivar _initialized: whether the current stream has been initialized.
    @type _initialized: L{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager and hook it into the stream events of C{factory}.

        @param factory: object with an C{addBootstrap(event, callback)} method
                        through which the connected, authenticated,
                        initialization-failed and stream-end events are
                        observed.
        """
        # The base class sets up handlers, xmlstream and _initialized.
        XMPPHandlerContainer.__init__(self)
        self._packetQueue = []

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory

    def addHandler(self, handler):
        """
        Add protocol handler and bring it in line with the stream state.

        L{XMPPHandlerContainer.addHandler} catches a handler up only when the
        stream has already been initialized. A handler added while the stream
        is connected but not yet initialized would otherwise never see
        C{makeConnection}, although L{_authd} calls its
        C{connectionInitialized} later on. That case is covered here.

        @param handler: the protocol handler to add.
        """
        # Sample the state before delegating, so the decision is not affected
        # by anything the handler does while it is being added.
        connectedOnly = self.xmlstream and not self._initialized

        XMPPHandlerContainer.addHandler(self, handler)

        if connectedOnly:
            handler.makeConnection(self.xmlstream)

    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Optionally installs traffic logging on the stream, records the stream
        and hands it to every handler.

        @param xs: the newly connected XML stream.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: handlers may add or remove handlers from
        # within their callbacks. Handlers added meanwhile are connected by
        # addHandler itself.
        for e in list(self):
            e.makeConnection(xs)

    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Sends out everything that was queued by L{send}, marks the stream as
        initialized and notifies every handler.

        @param xs: the initialized XML stream.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. Iterate over a snapshot: a handler added from
        # within a callback has already been notified by addHandler and must
        # not be notified a second time.
        for e in list(self):
            e.connectionInitialized()

    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """

    def _disconnected(self, _):
        """
        Called when the stream has ended.

        Forgets the stream and tells every handler the connection is lost.
        The event argument is ignored; handlers are passed C{None} as reason.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers. Iterate over a snapshot so that handlers
        # removing themselves (or others) do not cause siblings to be skipped.
        for e in list(self):
            e.xmlstream = None
            e.connectionLost(None)

    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no initialized XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """

        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
197
198
class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide a C{send} method that accepts the
    response stanza, like the one provided by L{XMPPHandler}.

    The optional payload is taken from the result of the handler and is
    expected to be a child element or an iterable of child elements.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. A C{NotImplementedError}, or the absence of a
    matching handler, results in a C{feature-not-implemented} error response.
    Any other exception will cause an error response of
    C{internal-server-error} to be sent.

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
                      C{None} is treated as an empty mapping.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        Every query in C{iqHandlers} is tried against C{iq}; when more than
        one matches, the one visited last is used. The handler is called with
        C{iq} and may return a deferred. The resulting response (result or
        error) is passed to C{self.send}.

        @param iq: the incoming iq request. Its C{handled} attribute is set
                   to C{True} before returning.
        """
        def toResult(result, iq):
            # Build the result response, attaching the handler's payload.
            response = xmlstream.toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(failure):
            # Turn NotImplementedError into the matching stanza error; any
            # other failure is re-raised by trap for the next errback.
            failure.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(failure, iq):
            failure.trap(error.StanzaError)
            return failure.value.toResponse(iq)

        def fromOtherError(failure, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(failure)
            return error.StanzaError('internal-server-error').toResponse(iq)

        # A subclass that never set iqHandlers still has the None default;
        # treat that as "no handlers" so the requester gets a proper
        # feature-not-implemented reply instead of this method crashing.
        handlers = self.iqHandlers or {}

        handler = None
        for queryString, method in handlers.items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        # NOTE(review): presumably signals to other observers of this stanza
        # that a response is being taken care of -- confirm against the
        # dispatching code.
        iq.handled = True
Note: See TracBrowser for help on using the repository browser.