source: ralphm-patches/xmpp_client_service.patch @ 35:2866eaae775f

Last change on this file since 35:2866eaae775f was 35:2866eaae775f, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Minor cleanups to deal with changed APIs

File size: 5.0 KB
RevLine 
[34]1diff -r 67a42d8a1c73 wokkel/client.py
2--- a/wokkel/client.py  Fri Feb 12 09:31:19 2010 +0100
[35]3+++ b/wokkel/client.py  Fri Feb 12 11:46:41 2010 +0100
4@@ -17,11 +17,12 @@
5 from twisted.names.srvconnect import SRVConnector
[34]6 from twisted.python import log
7 from twisted.words.protocols.jabber import client, error, sasl, xmlstream
[35]8-from twisted.words.protocols.jabber.jid import internJID as JID
[34]9-from twisted.words.xish import domish
[35]10+from twisted.words.protocols.jabber.jid import JID, internJID
[34]11+from twisted.words.xish import domish, utility
12 
13 from wokkel import generic
14-from wokkel.subprotocols import StreamManager
15+from wokkel.compat import XmlStreamServerFactory
[35]16+from wokkel.subprotocols import StreamManager, XMPPHandlerCollection
[34]17 
18 NS_CLIENT = 'jabber:client'
19 
[35]20@@ -350,3 +351,132 @@
[34]21             reply['id'] = iq['id']
22         reply.addElement((client.NS_XMPP_SESSION, 'session'))
23         self.xmlstream.send(reply)
24+
25+
26+
27+class DemuxedXmlStream(utility.EventDispatcher):
28+    """
29+    Fake XML stream for demultiplexing incoming streams.
30+
31+    Incoming traffic should have its C{from} attribute set to the JID of the
32+    sender and then L{dispatch<utility.EventDispatcher.dispatch>}ed. Outgoing
33+    traffic needs to have the C{to} attribute set. It is then passed on to
34+    the stream manager's C{send} method.
35+    """
36+
37+    def send(self, element):
38+        """
39+        Send element out over the wire.
40+
41+        This calls the stream manager to forward the element based on
42+        the embedded addressing information.
43+        """
44+        self.manager.send(element)
45+
46+
47+
[35]48+class ClientService(XMPPHandlerCollection, service.Service):
[34]49+    """
50+    Service for accepting XMPP client connections.
51+
52+    Incoming client connections are first authenticated using the
53+    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
54+    keyed with the JID that was bound to that connection.
55+
56+    Where L{StreamManager} manages one connection at a time, this
57+    service demultiplexes incoming connections. For the subprotocol handlers
58+    (objects providing {ijabber.IXMPPHandler}), their stream is always
59+    connected and initialized, but they only have to deal with one stream. This
60+    makes it easier to create adapters.
61+
[35]62+    As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML
[34]63+    stream that is passed to the XMPP subprotocol handlers. The received
64+    stanzas from the incoming client connections are passed on to the handlers
65+    using this fake XML, while having their C{from} attribute set to the JID of
66+    the client stream. Stanzas sent by the handlers are forwarded to the
67+    matching client stream, selected by the stanzas C{to} attribute.
68+    """
69+
70+    logTraffic = False
71+
72+    def __init__(self, domain, port=5222):
73+        self.domain = domain
74+        self.port = port
75+
[35]76+        def authenticatorFactory():
77+            return XMPPClientListenAuthenticator(self.domain)
78+
79+        self.factory = XmlStreamServerFactory(authenticatorFactory)
[34]80+        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
81+                                  self.makeConnection)
82+        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
83+                                  self.connectionInitialized)
84+
85+        self.streams = {}
86+
[35]87+        XMPPHandlerCollection.__init__(self)
[34]88+
89+
90+    def startService(self):
91+
92+        self.xmlstream = DemuxedXmlStream()
93+        self.xmlstream.manager = self
94+        self._initialized = True
95+
96+        for handler in self:
97+            handler.makeConnection(self.xmlstream)
98+            handler.connectionInitialized()
99+
100+        service.Service.startService(self)
101+        reactor.listenTCP(self.port, self.factory)
102+
103+
104+    def makeConnection(self, xs):
105+        def logDataIn(buf):
106+            log.msg("RECV: %r" % buf)
107+
108+        def logDataOut(buf):
109+            log.msg("SEND: %r" % buf)
110+
111+        if self.logTraffic:
112+            xs.rawDataInFn = logDataIn
113+            xs.rawDataOutFn = logDataOut
114+
115+
116+    def connectionInitialized(self, xs):
117+        self.streams[xs.otherEntity.full()] = xs
118+        xs.addObserver(xmlstream.STREAM_END_EVENT,
119+                       lambda failure: self.connectionDisconnected(xs))
120+        xs.addObserver('/*', lambda element: self.onElement(element, xs))
121+
122+
123+    def connectionDisconnected(self, xs):
124+        del self.streams[xs.otherEntity.full()]
125+
126+
127+    def onElement(self, element, xs):
128+        """
129+        Called when an element was received from one of the connected streams.
130+
131+        """
132+        if element.handled:
133+            return
134+
135+        element["from"] = xs.otherEntity.full()
136+        self.xmlstream.dispatch(element)
137+
138+
139+    def send(self, element):
140+        """
141+        Send element to the proper XML Stream.
142+
143+        This uses addressing embedded in the element to find the correct
144+        stream to forward the element to.
145+        """
146+
[35]147+        destination = internJID(element["to"]).full()
[34]148+
149+        if destination not in self.streams:
150+            raise Exception("Destination unreachable")
151+
152+        self.streams[destination].send(element)
Note: See TracBrowser for help on using the repository browser.