diff -r 67a42d8a1c73 wokkel/client.py --- a/wokkel/client.py Fri Feb 12 09:31:19 2010 +0100 +++ b/wokkel/client.py Fri Feb 12 09:37:28 2010 +0100 @@ -18,10 +18,11 @@ from twisted.python import log from twisted.words.protocols.jabber import client, error, sasl, xmlstream from twisted.words.protocols.jabber.jid import internJID as JID -from twisted.words.xish import domish +from twisted.words.xish import domish, utility from wokkel import generic -from wokkel.subprotocols import StreamManager +from wokkel.compat import XmlStreamServerFactory +from wokkel.subprotocols import StreamManager, XMPPHandlerContainer NS_CLIENT = 'jabber:client' @@ -350,3 +351,130 @@ reply['id'] = iq['id'] reply.addElement((client.NS_XMPP_SESSION, 'session')) self.xmlstream.send(reply) + + + +class DemuxedXmlStream(utility.EventDispatcher): + """ + Fake XML stream for demultiplexing incoming streams. + + Incoming traffic should have its C{from} attribute set to the JID of the + sender and then L{dispatch}ed. Outgoing + traffic needs to have the C{to} attribute set. It is then passed on to + the stream manager's C{send} method. + """ + + def send(self, element): + """ + Send element out over the wire. + + This calls the stream manager to forward the element based on + the embedded addressing information. + """ + self.manager.send(element) + + + +class ClientService(XMPPHandlerContainer, service.Service): + """ + Service for accepting XMPP client connections. + + Incoming client connections are first authenticated using the + L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is + keyed with the JID that was bound to that connection. + + Where L{StreamManager} manages one connection at a time, this + service demultiplexes incoming connections. For the subprotocol handlers + (objects providing {ijabber.IXMPPHandler}), their stream is always + connected and initialized, but they only have to deal with one stream. This + makes it easier to create adapters. + + As an L{xmlstream.XMPPHandlerContainer}, this service creates a fake XML + stream that is passed to the XMPP subprotocol handlers. The received + stanzas from the incoming client connections are passed on to the handlers + using this fake XML, while having their C{from} attribute set to the JID of + the client stream. Stanzas sent by the handlers are forwarded to the + matching client stream, selected by the stanzas C{to} attribute. + """ + + logTraffic = False + + def __init__(self, domain, port=5222): + self.domain = domain + self.port = port + + self.factory = XmlStreamServerFactory(XMPPClientListenAuthenticator, + self.domain) + self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, + self.makeConnection) + self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, + self.connectionInitialized) + + self.streams = {} + + XMPPHandlerContainer.__init__(self) + + + def startService(self): + + self.xmlstream = DemuxedXmlStream() + self.xmlstream.manager = self + self._initialized = True + + for handler in self: + handler.makeConnection(self.xmlstream) + handler.connectionInitialized() + + service.Service.startService(self) + reactor.listenTCP(self.port, self.factory) + + + def makeConnection(self, xs): + def logDataIn(buf): + log.msg("RECV: %r" % buf) + + def logDataOut(buf): + log.msg("SEND: %r" % buf) + + if self.logTraffic: + xs.rawDataInFn = logDataIn + xs.rawDataOutFn = logDataOut + + + def connectionInitialized(self, xs): + self.streams[xs.otherEntity.full()] = xs + xs.addObserver(xmlstream.STREAM_END_EVENT, + lambda failure: self.connectionDisconnected(xs)) + xs.addObserver('/*', lambda element: self.onElement(element, xs)) + + + def connectionDisconnected(self, xs): + del self.streams[xs.otherEntity.full()] + + + def onElement(self, element, xs): + """ + Called when an element was received from one of the connected streams. + + """ + if element.handled: + return + + element["from"] = xs.otherEntity.full() + self.xmlstream.dispatch(element) + + + def send(self, element): + """ + Send element to the proper XML Stream. + + This uses addressing embedded in the element to find the correct + stream to forward the element to. + """ + + destination = JID(element["to"]).full() + + if destination not in self.streams: + raise Exception("Destination unreachable") + + self.streams[destination].send(element)