source: ralphm-patches/xmpp_client_service.patch @ 35:2866eaae775f

Last change on this file since 35:2866eaae775f was 35:2866eaae775f, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Minor cleanups to deal with changed APIs

File size: 5.0 KB
  • wokkel/client.py

    diff -r 67a42d8a1c73 wokkel/client.py
    a b  
    1717from twisted.names.srvconnect import SRVConnector
    1818from twisted.python import log
    1919from twisted.words.protocols.jabber import client, error, sasl, xmlstream
    20 from twisted.words.protocols.jabber.jid import internJID as JID
    21 from twisted.words.xish import domish
     20from twisted.words.protocols.jabber.jid import JID, internJID
     21from twisted.words.xish import domish, utility
    2222
    2323from wokkel import generic
    24 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandlerCollection
    2526
    2627NS_CLIENT = 'jabber:client'
    2728
     
    350351            reply['id'] = iq['id']
    351352        reply.addElement((client.NS_XMPP_SESSION, 'session'))
    352353        self.xmlstream.send(reply)
     354
     355
     356
     357class DemuxedXmlStream(utility.EventDispatcher):
     358    """
     359    Fake XML stream for demultiplexing incoming streams.
     360
     361    Incoming traffic should have its C{from} attribute set to the JID of the
     362    sender and then L{dispatch<utility.EventDispatcher.dispatch>}ed. Outgoing
     363    traffic needs to have the C{to} attribute set. It is then passed on to
     364    the stream manager's C{send} method.
     365    """
     366
     367    def send(self, element):
     368        """
     369        Send element out over the wire.
     370
     371        This calls the stream manager to forward the element based on
     372        the embedded addressing information.
     373        """
     374        self.manager.send(element)
     375
     376
     377
     378class ClientService(XMPPHandlerCollection, service.Service):
     379    """
     380    Service for accepting XMPP client connections.
     381
     382    Incoming client connections are first authenticated using the
     383    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
     384    keyed with the JID that was bound to that connection.
     385
     386    Where L{StreamManager} manages one connection at a time, this
     387    service demultiplexes incoming connections. For the subprotocol handlers
     388    (objects providing {ijabber.IXMPPHandler}), their stream is always
     389    connected and initialized, but they only have to deal with one stream. This
     390    makes it easier to create adapters.
     391
     392    As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML
     393    stream that is passed to the XMPP subprotocol handlers. The received
     394    stanzas from the incoming client connections are passed on to the handlers
     395    using this fake XML, while having their C{from} attribute set to the JID of
     396    the client stream. Stanzas sent by the handlers are forwarded to the
     397    matching client stream, selected by the stanzas C{to} attribute.
     398    """
     399
     400    logTraffic = False
     401
     402    def __init__(self, domain, port=5222):
     403        self.domain = domain
     404        self.port = port
     405
     406        def authenticatorFactory():
     407            return XMPPClientListenAuthenticator(self.domain)
     408
     409        self.factory = XmlStreamServerFactory(authenticatorFactory)
     410        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     411                                  self.makeConnection)
     412        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     413                                  self.connectionInitialized)
     414
     415        self.streams = {}
     416
     417        XMPPHandlerCollection.__init__(self)
     418
     419
     420    def startService(self):
     421
     422        self.xmlstream = DemuxedXmlStream()
     423        self.xmlstream.manager = self
     424        self._initialized = True
     425
     426        for handler in self:
     427            handler.makeConnection(self.xmlstream)
     428            handler.connectionInitialized()
     429
     430        service.Service.startService(self)
     431        reactor.listenTCP(self.port, self.factory)
     432
     433
     434    def makeConnection(self, xs):
     435        def logDataIn(buf):
     436            log.msg("RECV: %r" % buf)
     437
     438        def logDataOut(buf):
     439            log.msg("SEND: %r" % buf)
     440
     441        if self.logTraffic:
     442            xs.rawDataInFn = logDataIn
     443            xs.rawDataOutFn = logDataOut
     444
     445
     446    def connectionInitialized(self, xs):
     447        self.streams[xs.otherEntity.full()] = xs
     448        xs.addObserver(xmlstream.STREAM_END_EVENT,
     449                       lambda failure: self.connectionDisconnected(xs))
     450        xs.addObserver('/*', lambda element: self.onElement(element, xs))
     451
     452
     453    def connectionDisconnected(self, xs):
     454        del self.streams[xs.otherEntity.full()]
     455
     456
     457    def onElement(self, element, xs):
     458        """
     459        Called when an element was received from one of the connected streams.
     460
     461        """
     462        if element.handled:
     463            return
     464
     465        element["from"] = xs.otherEntity.full()
     466        self.xmlstream.dispatch(element)
     467
     468
     469    def send(self, element):
     470        """
     471        Send element to the proper XML Stream.
     472
     473        This uses addressing embedded in the element to find the correct
     474        stream to forward the element to.
     475        """
     476
     477        destination = internJID(element["to"]).full()
     478
     479        if destination not in self.streams:
     480            raise Exception("Destination unreachable")
     481
     482        self.streams[destination].send(element)
Note: See TracBrowser for help on using the repository browser.