# -*- test-case-name: wokkel.test.test_subprotocols -*-
#
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
XMPP subprotocol support.
"""

from zope.interface import implements

from twisted.internet import defer
from twisted.python import log
from twisted.words.protocols.jabber import error, xmlstream
from twisted.words.protocols.jabber.xmlstream import toResponse
from twisted.words.xish import xpath
from twisted.words.xish.domish import IElement

from wokkel.iwokkel import IXMPPHandler, IXMPPHandlerCollection


class XMPPHandler(object):
    """
    XMPP protocol handler.

    Classes derived from this class implement (part of) one or more XMPP
    extension protocols, and are referred to as a subprotocol implementation.

    @ivar parent: The handler collection this handler has been added to, or
        C{None} when it has no parent.
    @ivar xmlstream: The XML stream handed to L{makeConnection}, or C{None}
        when there is no connection.
    """

    implements(IXMPPHandler)

    def __init__(self):
        self.parent = None
        self.xmlstream = None


    def setHandlerParent(self, parent):
        """
        Remember C{parent} and register this handler with it.

        @param parent: The collection to add this handler to. Its
            C{addHandler} method is called with this handler.
        """
        self.parent = parent
        self.parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        @param parent: Unused; the handler is removed from the parent that
            was recorded by L{setHandlerParent}.
        """
        self.parent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and call L{connectionMade}.

        @param xs: The XML stream that has just been connected.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Called after a connection has been established.

        Can be overridden to perform work before stream initialization.
        """


    def connectionInitialized(self):
        """
        The XML stream has been initialized.

        Can be overridden to perform work after stream initialization, e.g. to
        set up observers and start exchanging XML stanzas.
        """


    def connectionLost(self, reason):
        """
        The XML stream has been closed.

        This method can be extended to inspect the C{reason} argument and
        act on it.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data over the managed XML stream.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)



class XMPPHandlerCollection(object):
    """
    Collection of XMPP subprotocol handlers.

    This allows for grouping of subprotocol handlers, but is not an
    L{XMPPHandler} itself, so this is not recursive.

    @ivar handlers: List of protocol handlers.
    @type handlers: L{list} of objects providing L{IXMPPHandler}
    """

    implements(IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Act as a container for handlers.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Add protocol handler.

        Protocol handlers are expected to provide L{IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Remove protocol handler.
        """
        self.handlers.remove(handler)



class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: L{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: L{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Hook this manager into the stream life-cycle events of C{factory}.

        @param factory: Object whose C{addBootstrap} method is used to
            register the connected, authenticated, initialization-failed and
            stream-end observers. It is kept in C{self.factory}.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
                             self.initializationFailed)
        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        When an XML stream has already been established and initialized, the
        handler's C{makeConnection} and C{connectionInitialized} will be
        called to get it up to speed.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # get protocol handler up to speed when a connection has already
        # been established
        if self.xmlstream and self._initialized:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: a handler may add or remove handlers from
        # within its callback, which would otherwise mutate the list that is
        # being iterated.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. A snapshot is used because, from this point
        # on, addHandler itself brings new handlers up to speed; iterating
        # the live list would notify a handler added during a callback a
        # second time, and a removal would cause a handler to be skipped.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, _):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method. The handlers are
        always passed C{None} as the reason; the event argument is ignored.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers, using a snapshot so that handlers may remove
        # themselves (or others) while being notified.
        for e in list(self):
            e.connectionLost(None)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)



class IQHandlerMixin(object):
    """
    XMPP subprotocol mixin for handling incoming IQ stanzas.

    This matches up the iq with XPath queries to call methods on itself,
    wrapping the call so that exceptions result in proper error responses,
    or, when successful, will reply with a response with optional payload.

    Derivatives of this class must provide an L{XmlStream} instance in its
    C{xmlstream} attribute.

    The optional payload is taken from the result of the handler and is
    expected to be a child or a list of children.

    If an exception is raised, or the deferred has its errback called,
    the exception is checked for being a L{error.StanzaError}. If so,
    an error response is sent. Any other exception will cause an error
    response of C{internal-server-error} to be sent.

    A typical way to use this mixin, is to set up L{xpath} observers on the
    C{xmlstream} to call handleRequest, for example in an overridden
    L{XMPPHandler.connectionMade}. It is likely a good idea to only listen for
    incoming iq get and/or iq set requests, and not for any iq, to prevent
    hijacking incoming responses to outgoing iq requests. An example:

        >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']"
        >>> class MyHandler(XMPPHandler, IQHandlerMixin):
        ...    iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet',
        ...                  "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'}
        ...    def connectionMade(self):
        ...        self.xmlstream.addObserver(
        ...          "/iq[@type='get' or @type='set']" + QUERY_ROSTER,
        ...          self.handleRequest)
        ...    def onRosterGet(self, iq):
        ...        pass
        ...    def onRosterSet(self, iq):
        ...        pass

    @cvar iqHandlers: Mapping from XPath queries (as a string) to the method
                      name that will handle requests that match the query.
    @type iqHandlers: L{dict}
    """

    iqHandlers = None

    def handleRequest(self, iq):
        """
        Find a handler and wrap the call for sending a response stanza.

        The handler is looked up by matching C{iq} against the XPath queries
        in L{iqHandlers}. When no query matches (or L{iqHandlers} has not
        been set), a C{feature-not-implemented} error response is sent. The
        response, result or error, is passed to C{self.send} and C{iq} is
        marked as handled.

        @param iq: The incoming iq request stanza.
        """
        def toResult(result, iq):
            # Build the result response, attaching the optional payload:
            # either a single element or an iterable of elements.
            response = toResponse(iq, 'result')

            if result:
                if IElement.providedBy(result):
                    response.addChild(result)
                else:
                    for element in result:
                        response.addChild(element)

            return response

        def checkNotImplemented(failure):
            # Translate NotImplementedError into the matching stanza error,
            # to be picked up by fromStanzaError.
            failure.trap(NotImplementedError)
            raise error.StanzaError('feature-not-implemented')

        def fromStanzaError(failure, iq):
            failure.trap(error.StanzaError)
            return failure.value.toResponse(iq)

        def fromOtherError(failure, iq):
            log.msg("Unhandled error in iq handler:", isError=True)
            log.err(failure)
            return error.StanzaError('internal-server-error').toResponse(iq)

        handler = None
        # iqHandlers defaults to None; treat that as "no handlers" so the
        # requester gets a proper error response instead of this observer
        # failing with an AttributeError.
        for queryString, method in (self.iqHandlers or {}).items():
            if xpath.internQuery(queryString).matches(iq):
                handler = getattr(self, method)

        if handler:
            d = defer.maybeDeferred(handler, iq)
        else:
            d = defer.fail(NotImplementedError())

        d.addCallback(toResult, iq)
        d.addErrback(checkNotImplemented)
        d.addErrback(fromStanzaError, iq)
        d.addErrback(fromOtherError, iq)

        d.addCallback(self.send)

        iq.handled = True