Changeset 37:03cd0cb8548c in ralphm-patches


Ignore:
Timestamp:
Feb 14, 2010, 11:09:07 AM (11 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Message:

Create factory for accepting c2s connections, split out session manager.

Files:
1 deleted
3 edited

Legend:

Unmodified
Added
Removed
  • roster_server.patch

    r36 r37  
    11diff -r 7b9f484b0b44 wokkel/xmppim.py
    22--- a/wokkel/xmppim.py  Fri Feb 12 19:49:36 2010 +0100
    3 +++ b/wokkel/xmppim.py  Fri Feb 12 19:53:42 2010 +0100
     3+++ b/wokkel/xmppim.py  Sat Feb 13 18:57:26 2010 +0100
    44@@ -12,6 +12,7 @@
    55 All of it should eventually move to Twisted.
     
    7272+
    7373+    def connectionInitialized(self):
    74 +        self.xmlstream.addObserver(XPATH_ROSTER_GET, self._onRosterGet, -1)
    75 +        self.xmlstream.addObserver(XPATH_ROSTER_SET, self._onRosterSet, -1)
     74+        self.xmlstream.addObserver(XPATH_ROSTER_GET, self._onRosterGet)
     75+        self.xmlstream.addObserver(XPATH_ROSTER_SET, self._onRosterSet)
    7676+
    7777+
  • series

    r36 r37  
    33xmpp_client_listener.patch
    44xmpp_client_service.patch
    5 client_service_example.patch
    65component_server.patch
    76pubsub_disco_fixes.patch
  • xmpp_client_service.patch

    r35 r37  
    1 diff -r 67a42d8a1c73 wokkel/client.py
    2 --- a/wokkel/client.py  Fri Feb 12 09:31:19 2010 +0100
    3 +++ b/wokkel/client.py  Fri Feb 12 11:46:41 2010 +0100
     1diff -r 1644083ca235 doc/examples/client_service.tac
     2--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
     3+++ b/doc/examples/client_service.tac   Sat Feb 13 19:01:07 2010 +0100
     4@@ -0,0 +1,64 @@
     5+from twisted.application import service, strports
     6+from twisted.internet import defer
     7+
     8+from wokkel import client
     9+from wokkel.generic import FallbackHandler
     10+from wokkel.subprotocols import XMPPHandler
     11+from wokkel.xmppim import RosterItem, RosterServerProtocol
     12+
     13+from twisted.words.protocols.jabber.jid import internJID as JID
     14+
     15+import socket
     16+domain = socket.gethostname()
     17+
     18+class StaticRoster(RosterServerProtocol):
     19+
     20+    def __init__(self):
     21+        RosterServerProtocol.__init__(self)
     22+        self.roster = {'ralphm': [
     23+                           RosterItem(JID('intosi@'+domain),
     24+                                      subscriptionTo=True,
     25+                                      subscriptionFrom=True,
     26+                                      name='Intosi',
     27+                                      groups=set(['Friends'])),
     28+                           RosterItem(JID('termie@'+domain),
     29+                                      subscriptionTo=True,
     30+                                      subscriptionFrom=True,
     31+                                      name='termie'),
     32+                           ],
     33+                       'test': [],
     34+                       }
     35+
     36+    def getRoster(self, entity):
     37+        return defer.succeed(self.roster[entity.user])
     38+
     39+
     40+class Hello(XMPPHandler):
     41+
     42+    def q(self):
     43+        from wokkel.xmppim import AvailabilityPresence
     44+        p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat')
     45+        self.parent.send(p.toElement())
     46+
     47+
     48+    def connectionInitialized(self):
     49+        from twisted.internet import reactor
     50+        reactor.callLater(5, self.q)
     51+
     52+
     53+
     54+application = service.Application("Jabber server")
     55+
     56+sessionManager = client.SessionManager()
     57+FallbackHandler().setHandlerParent(sessionManager)
     58+StaticRoster().setHandlerParent(sessionManager)
     59+Hello().setHandlerParent(sessionManager)
     60+
     61+clientService = client.ClientService(sessionManager, domain)
     62+clientService.logTraffic = True
     63+
     64+
     65+c2sFactory = client.XMPPC2SServerFactory(clientService)
     66+c2sFactory.logTraffic = True
     67+c2sService = strports.service('5224', c2sFactory)
     68+c2sService.setServiceParent(application)
     69diff -r 1644083ca235 wokkel/client.py
     70--- a/wokkel/client.py  Sat Feb 13 18:57:27 2010 +0100
     71+++ b/wokkel/client.py  Sat Feb 13 19:01:07 2010 +0100
    472@@ -17,11 +17,12 @@
    573 from twisted.names.srvconnect import SRVConnector
     
    775 from twisted.words.protocols.jabber import client, error, sasl, xmlstream
    876-from twisted.words.protocols.jabber.jid import internJID as JID
    9 -from twisted.words.xish import domish
    1077+from twisted.words.protocols.jabber.jid import JID, internJID
    11 +from twisted.words.xish import domish, utility
     78 from twisted.words.xish import domish
    1279 
    1380 from wokkel import generic
     
    1885 NS_CLIENT = 'jabber:client'
    1986 
    20 @@ -350,3 +351,132 @@
     87@@ -311,6 +312,7 @@
     88 
     89         # TODO: check for resource conflicts
     90 
     91+        print self.username, self.domain, self.resource
     92         newJID = JID(tuple=(self.username, self.domain, self.resource))
     93 
     94         reply = domish.Element((None, 'iq'))
     95@@ -350,3 +352,166 @@
    2196             reply['id'] = iq['id']
    2297         reply.addElement((client.NS_XMPP_SESSION, 'session'))
     
    25100+
    26101+
    27 +class DemuxedXmlStream(utility.EventDispatcher):
    28 +    """
    29 +    Fake XML stream for demultiplexing incoming streams.
    30 +
    31 +    Incoming traffic should have its C{from} attribute set to the JID of the
    32 +    sender and then L{dispatch<utility.EventDispatcher.dispatch>}ed. Outgoing
    33 +    traffic needs to have the C{to} attribute set. It is then passed on to
    34 +    the stream manager's C{send} method.
    35 +    """
    36 +
    37 +    def send(self, element):
    38 +        """
    39 +        Send element out over the wire.
    40 +
    41 +        This calls the stream manager to forward the element based on
    42 +        the embedded addressing information.
    43 +        """
    44 +        self.manager.send(element)
    45 +
    46 +
    47 +
    48 +class ClientService(XMPPHandlerCollection, service.Service):
     102+class XMPPC2SServerFactory(XmlStreamServerFactory):
     103+
     104+    def __init__(self, service):
     105+        self.service = service
     106+
     107+        def authenticatorFactory():
     108+            return XMPPClientListenAuthenticator(service.domain)
     109+
     110+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     111+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     112+                          self.onConnectionMade)
     113+        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     114+                          self.onAuthenticated)
     115+
     116+        self.serial = 0
     117+
     118+
     119+    def onConnectionMade(self, xs):
     120+        """
     121+        Called when a client-to-server connection was made.
     122+
     123+        This enables traffic debugging on incoming streams.
     124+        """
     125+        xs.serial = self.serial
     126+        self.serial += 1
     127+
     128+        log.msg("Client connection %d made" % xs.serial)
     129+
     130+        def logDataIn(buf):
     131+            log.msg("RECV (%d): %r" % (xs.serial, buf))
     132+
     133+        def logDataOut(buf):
     134+            log.msg("SEND (%d): %r" % (xs.serial, buf))
     135+
     136+        if self.logTraffic:
     137+            xs.rawDataInFn = logDataIn
     138+            xs.rawDataOutFn = logDataOut
     139+
     140+        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     141+
     142+
     143+    def onAuthenticated(self, xs):
     144+        log.msg("Client connection %d authenticated" % xs.serial)
     145+
     146+        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     147+                                                   0, xs)
     148+        xs.addObserver('/*', self.onElement, 0, xs)
     149+
     150+        self.service.connectionInitialized(xs)
     151+
     152+
     153+    def onConnectionLost(self, xs, reason):
     154+        log.msg("Client connection %d disconnected" % xs.serial)
     155+
     156+        self.service.connectionLost(xs, reason)
     157+
     158+
     159+    def onError(self, reason):
     160+        log.err(reason, "Stream Error")
     161+
     162+
     163+    def onElement(self, xs, element):
     164+        """
     165+        Called when an element was received from one of the connected streams.
     166+
     167+        """
     168+        if element.handled:
     169+            return
     170+        else:
     171+            self.service.dispatch(xs, element)
     172+
     173+
     174+
     175+class SessionManager(XMPPHandlerCollection):
     176+
     177+    def __init__(self):
     178+        XMPPHandlerCollection.__init__(self)
     179+        self.xmlstream = None
     180+
     181+
     182+    def makeConnection(self, xs):
     183+        self.xmlstream = xs
     184+
     185+        for handler in self:
     186+            handler.makeConnection(xs)
     187+            handler.connectionInitialized()
     188+
     189+
     190+    def send(self, obj):
     191+        if self.xmlstream:
     192+            self.xmlstream.send(obj)
     193+
     194+
     195+class ClientService(object):
    49196+    """
    50197+    Service for accepting XMPP client connections.
     
    70217+    logTraffic = False
    71218+
    72 +    def __init__(self, domain, port=5222):
     219+    def __init__(self, sessionManager, domain):
     220+        self.sessionManager = sessionManager
    73221+        self.domain = domain
    74 +        self.port = port
    75 +
    76 +        def authenticatorFactory():
    77 +            return XMPPClientListenAuthenticator(self.domain)
    78 +
    79 +        self.factory = XmlStreamServerFactory(authenticatorFactory)
    80 +        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
    81 +                                  self.makeConnection)
    82 +        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
    83 +                                  self.connectionInitialized)
    84222+
    85223+        self.streams = {}
    86224+
    87 +        XMPPHandlerCollection.__init__(self)
    88 +
    89 +
    90 +    def startService(self):
    91 +
    92 +        self.xmlstream = DemuxedXmlStream()
    93 +        self.xmlstream.manager = self
    94 +        self._initialized = True
    95 +
    96 +        for handler in self:
    97 +            handler.makeConnection(self.xmlstream)
    98 +            handler.connectionInitialized()
    99 +
    100 +        service.Service.startService(self)
    101 +        reactor.listenTCP(self.port, self.factory)
    102 +
    103 +
    104 +    def makeConnection(self, xs):
    105 +        def logDataIn(buf):
    106 +            log.msg("RECV: %r" % buf)
    107 +
    108 +        def logDataOut(buf):
    109 +            log.msg("SEND: %r" % buf)
    110 +
    111 +        if self.logTraffic:
    112 +            xs.rawDataInFn = logDataIn
    113 +            xs.rawDataOutFn = logDataOut
     225+        pipe = generic.XmlPipe()
     226+        self.xmlstream = pipe.source
     227+        self.sessionManager.makeConnection(pipe.sink)
     228+        self.xmlstream.addObserver('/*', self.send)
    114229+
    115230+
    116231+    def connectionInitialized(self, xs):
    117 +        self.streams[xs.otherEntity.full()] = xs
    118 +        xs.addObserver(xmlstream.STREAM_END_EVENT,
    119 +                       lambda failure: self.connectionDisconnected(xs))
    120 +        xs.addObserver('/*', lambda element: self.onElement(element, xs))
    121 +
    122 +
    123 +    def connectionDisconnected(self, xs):
    124 +        del self.streams[xs.otherEntity.full()]
    125 +
    126 +
    127 +    def onElement(self, element, xs):
     232+        self.streams[xs.otherEntity] = xs
     233+
     234+
     235+    def connectionLost(self, xs, reason):
     236+        if xs.otherEntity in self.streams:
     237+            del self.streams[xs.otherEntity]
     238+
     239+
     240+    def send(self, stanza):
     241+        """
     242+        Send stanza to the proper XML Stream.
     243+
     244+        This uses addressing embedded in the element to find the correct stream
     245+        to forward the element to.
     246+        """
     247+        destination = internJID(stanza["to"])
     248+
     249+        if destination not in self.streams:
     250+            log.msg("Euh")
     251+            raise Exception("Destination unreachable")
     252+
     253+        self.streams[destination].send(stanza)
     254+
     255+
     256+    def dispatch(self, xs, stanza):
    128257+        """
    129258+        Called when an element was received from one of the connected streams.
    130 +
    131 +        """
    132 +        if element.handled:
    133 +            return
    134 +
    135 +        element["from"] = xs.otherEntity.full()
    136 +        self.xmlstream.dispatch(element)
    137 +
    138 +
    139 +    def send(self, element):
    140 +        """
    141 +        Send element to the proper XML Stream.
    142 +
    143 +        This uses addressing embedded in the element to find the correct
    144 +        stream to forward the element to.
    145 +        """
    146 +
    147 +        destination = internJID(element["to"]).full()
    148 +
    149 +        if destination not in self.streams:
    150 +            raise Exception("Destination unreachable")
    151 +
    152 +        self.streams[destination].send(element)
     259+        """
     260+        stanza["from"] = xs.otherEntity.full()
     261+        self.xmlstream.send(stanza)
Note: See TracChangeset for help on using the changeset viewer.