source: ralphm-patches/xmpp_client_service.patch @ 38:c399ade66e8a

Last change on this file since 38:c399ade66e8a was 38:c399ade66e8a, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Merged xmpp c2s stuff into one patch, added routing magic.

File size: 8.2 KB
  • new file doc/examples/client_service.tac

    diff -r 1644083ca235 doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client
     5from wokkel.generic import FallbackHandler
     6from wokkel.subprotocols import XMPPHandler
     7from wokkel.xmppim import RosterItem, RosterServerProtocol
     8
     9from twisted.words.protocols.jabber.jid import internJID as JID
     10
     11import socket
     12domain = socket.gethostname()
     13
     14class StaticRoster(RosterServerProtocol):
     15
     16    def __init__(self):
     17        RosterServerProtocol.__init__(self)
     18        self.roster = {'ralphm': [
     19                           RosterItem(JID('intosi@'+domain),
     20                                      subscriptionTo=True,
     21                                      subscriptionFrom=True,
     22                                      name='Intosi',
     23                                      groups=set(['Friends'])),
     24                           RosterItem(JID('termie@'+domain),
     25                                      subscriptionTo=True,
     26                                      subscriptionFrom=True,
     27                                      name='termie'),
     28                           ],
     29                       'test': [],
     30                       }
     31
     32    def getRoster(self, entity):
     33        return defer.succeed(self.roster[entity.user])
     34
     35
     36class Hello(XMPPHandler):
     37
     38    def q(self):
     39        from wokkel.xmppim import AvailabilityPresence
     40        p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat')
     41        self.parent.send(p.toElement())
     42
     43
     44    def connectionInitialized(self):
     45        from twisted.internet import reactor
     46        reactor.callLater(5, self.q)
     47
     48
     49
     50application = service.Application("Jabber server")
     51
     52sessionManager = client.SessionManager()
     53FallbackHandler().setHandlerParent(sessionManager)
     54StaticRoster().setHandlerParent(sessionManager)
     55Hello().setHandlerParent(sessionManager)
     56
     57clientService = client.ClientService(sessionManager, domain)
     58clientService.logTraffic = True
     59
     60
     61c2sFactory = client.XMPPC2SServerFactory(clientService)
     62c2sFactory.logTraffic = True
     63c2sService = strports.service('5224', c2sFactory)
     64c2sService.setServiceParent(application)
  • wokkel/client.py

    diff -r 1644083ca235 wokkel/client.py
    a b  
    1717from twisted.names.srvconnect import SRVConnector
    1818from twisted.python import log
    1919from twisted.words.protocols.jabber import client, error, sasl, xmlstream
    20 from twisted.words.protocols.jabber.jid import internJID as JID
     20from twisted.words.protocols.jabber.jid import JID, internJID
    2121from twisted.words.xish import domish
    2222
    2323from wokkel import generic
    24 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandlerCollection
    2526
    2627NS_CLIENT = 'jabber:client'
    2728
     
    311312
    312313        # TODO: check for resource conflicts
    313314
     315        print self.username, self.domain, self.resource
    314316        newJID = JID(tuple=(self.username, self.domain, self.resource))
    315317
    316318        reply = domish.Element((None, 'iq'))
     
    350352            reply['id'] = iq['id']
    351353        reply.addElement((client.NS_XMPP_SESSION, 'session'))
    352354        self.xmlstream.send(reply)
     355
     356
     357
     358class XMPPC2SServerFactory(XmlStreamServerFactory):
     359
     360    def __init__(self, service):
     361        self.service = service
     362
     363        def authenticatorFactory():
     364            return XMPPClientListenAuthenticator(service.domain)
     365
     366        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     367        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     368                          self.onConnectionMade)
     369        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     370                          self.onAuthenticated)
     371
     372        self.serial = 0
     373
     374
     375    def onConnectionMade(self, xs):
     376        """
     377        Called when a client-to-server connection was made.
     378
     379        This enables traffic debugging on incoming streams.
     380        """
     381        xs.serial = self.serial
     382        self.serial += 1
     383
     384        log.msg("Client connection %d made" % xs.serial)
     385
     386        def logDataIn(buf):
     387            log.msg("RECV (%d): %r" % (xs.serial, buf))
     388
     389        def logDataOut(buf):
     390            log.msg("SEND (%d): %r" % (xs.serial, buf))
     391
     392        if self.logTraffic:
     393            xs.rawDataInFn = logDataIn
     394            xs.rawDataOutFn = logDataOut
     395
     396        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     397
     398
     399    def onAuthenticated(self, xs):
     400        log.msg("Client connection %d authenticated" % xs.serial)
     401
     402        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     403                                                   0, xs)
     404        xs.addObserver('/*', self.onElement, 0, xs)
     405
     406        self.service.connectionInitialized(xs)
     407
     408
     409    def onConnectionLost(self, xs, reason):
     410        log.msg("Client connection %d disconnected" % xs.serial)
     411
     412        self.service.connectionLost(xs, reason)
     413
     414
     415    def onError(self, reason):
     416        log.err(reason, "Stream Error")
     417
     418
     419    def onElement(self, xs, element):
     420        """
     421        Called when an element was received from one of the connected streams.
     422
     423        """
     424        if element.handled:
     425            return
     426        else:
     427            self.service.dispatch(xs, element)
     428
     429
     430
     431class SessionManager(XMPPHandlerCollection):
     432
     433    def __init__(self):
     434        XMPPHandlerCollection.__init__(self)
     435        self.xmlstream = None
     436
     437
     438    def makeConnection(self, xs):
     439        self.xmlstream = xs
     440
     441        for handler in self:
     442            handler.makeConnection(xs)
     443            handler.connectionInitialized()
     444
     445
     446    def send(self, obj):
     447        if self.xmlstream:
     448            self.xmlstream.send(obj)
     449
     450
     451class ClientService(object):
     452    """
     453    Service for accepting XMPP client connections.
     454
     455    Incoming client connections are first authenticated using the
     456    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
     457    keyed with the JID that was bound to that connection.
     458
     459    Where L{StreamManager} manages one connection at a time, this
     460    service demultiplexes incoming connections. For the subprotocol handlers
     461    (objects providing {ijabber.IXMPPHandler}), their stream is always
     462    connected and initialized, but they only have to deal with one stream. This
     463    makes it easier to create adapters.
     464
     465    As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML
     466    stream that is passed to the XMPP subprotocol handlers. The received
     467    stanzas from the incoming client connections are passed on to the handlers
     468    using this fake XML, while having their C{from} attribute set to the JID of
     469    the client stream. Stanzas sent by the handlers are forwarded to the
     470    matching client stream, selected by the stanzas C{to} attribute.
     471    """
     472
     473    logTraffic = False
     474
     475    def __init__(self, sessionManager, domain):
     476        self.sessionManager = sessionManager
     477        self.domain = domain
     478
     479        self.streams = {}
     480
     481        pipe = generic.XmlPipe()
     482        self.xmlstream = pipe.source
     483        self.sessionManager.makeConnection(pipe.sink)
     484        self.xmlstream.addObserver('/*', self.send)
     485
     486
     487    def connectionInitialized(self, xs):
     488        self.streams[xs.otherEntity] = xs
     489
     490
     491    def connectionLost(self, xs, reason):
     492        if xs.otherEntity in self.streams:
     493            del self.streams[xs.otherEntity]
     494
     495
     496    def send(self, stanza):
     497        """
     498        Send stanza to the proper XML Stream.
     499
     500        This uses addressing embedded in the element to find the correct stream
     501        to forward the element to.
     502        """
     503        destination = internJID(stanza["to"])
     504
     505        if destination not in self.streams:
     506            log.msg("Euh")
     507            raise Exception("Destination unreachable")
     508
     509        self.streams[destination].send(stanza)
     510
     511
     512    def dispatch(self, xs, stanza):
     513        """
     514        Called when an element was received from one of the connected streams.
     515        """
     516        stanza["from"] = xs.otherEntity.full()
     517        self.xmlstream.send(stanza)
Note: See TracBrowser for help on using the repository browser.