source: ralphm-patches/xmpp_client_service.patch @ 34:e46c5701df9e

Last change on this file since 34:e46c5701df9e was 34:e46c5701df9e, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Add a bunch of new patches.

File size: 4.9 KB
RevLine 
[34]1diff -r 67a42d8a1c73 wokkel/client.py
2--- a/wokkel/client.py  Fri Feb 12 09:31:19 2010 +0100
3+++ b/wokkel/client.py  Fri Feb 12 09:37:28 2010 +0100
4@@ -18,10 +18,11 @@
5 from twisted.python import log
6 from twisted.words.protocols.jabber import client, error, sasl, xmlstream
7 from twisted.words.protocols.jabber.jid import internJID as JID
8-from twisted.words.xish import domish
9+from twisted.words.xish import domish, utility
10 
11 from wokkel import generic
12-from wokkel.subprotocols import StreamManager
13+from wokkel.compat import XmlStreamServerFactory
14+from wokkel.subprotocols import StreamManager, XMPPHandlerContainer
15 
16 NS_CLIENT = 'jabber:client'
17 
18@@ -350,3 +351,130 @@
19             reply['id'] = iq['id']
20         reply.addElement((client.NS_XMPP_SESSION, 'session'))
21         self.xmlstream.send(reply)
22+
23+
24+
25+class DemuxedXmlStream(utility.EventDispatcher):
26+    """
27+    Fake XML stream for demultiplexing incoming streams.
28+
29+    Incoming traffic should have its C{from} attribute set to the JID of the
30+    sender and then L{dispatch<utility.EventDispatcher.dispatch>}ed. Outgoing
31+    traffic needs to have the C{to} attribute set. It is then passed on to
32+    the stream manager's C{send} method.
33+    """
34+
35+    def send(self, element):
36+        """
37+        Send element out over the wire.
38+
39+        This calls the stream manager to forward the element based on
40+        the embedded addressing information.
41+        """
42+        self.manager.send(element)
43+
44+
45+
46+class ClientService(XMPPHandlerContainer, service.Service):
47+    """
48+    Service for accepting XMPP client connections.
49+
50+    Incoming client connections are first authenticated using the
51+    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
52+    keyed with the JID that was bound to that connection.
53+
54+    Where L{StreamManager} manages one connection at a time, this
55+    service demultiplexes incoming connections. For the subprotocol handlers
56+    (objects providing {ijabber.IXMPPHandler}), their stream is always
57+    connected and initialized, but they only have to deal with one stream. This
58+    makes it easier to create adapters.
59+
60+    As an L{xmlstream.XMPPHandlerContainer}, this service creates a fake XML
61+    stream that is passed to the XMPP subprotocol handlers. The received
62+    stanzas from the incoming client connections are passed on to the handlers
63+    using this fake XML, while having their C{from} attribute set to the JID of
64+    the client stream. Stanzas sent by the handlers are forwarded to the
65+    matching client stream, selected by the stanzas C{to} attribute.
66+    """
67+
68+    logTraffic = False
69+
70+    def __init__(self, domain, port=5222):
71+        self.domain = domain
72+        self.port = port
73+
74+        self.factory = XmlStreamServerFactory(XMPPClientListenAuthenticator,
75+                                              self.domain)
76+        self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
77+                                  self.makeConnection)
78+        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
79+                                  self.connectionInitialized)
80+
81+        self.streams = {}
82+
83+        XMPPHandlerContainer.__init__(self)
84+
85+
86+    def startService(self):
87+
88+        self.xmlstream = DemuxedXmlStream()
89+        self.xmlstream.manager = self
90+        self._initialized = True
91+
92+        for handler in self:
93+            handler.makeConnection(self.xmlstream)
94+            handler.connectionInitialized()
95+
96+        service.Service.startService(self)
97+        reactor.listenTCP(self.port, self.factory)
98+
99+
100+    def makeConnection(self, xs):
101+        def logDataIn(buf):
102+            log.msg("RECV: %r" % buf)
103+
104+        def logDataOut(buf):
105+            log.msg("SEND: %r" % buf)
106+
107+        if self.logTraffic:
108+            xs.rawDataInFn = logDataIn
109+            xs.rawDataOutFn = logDataOut
110+
111+
112+    def connectionInitialized(self, xs):
113+        self.streams[xs.otherEntity.full()] = xs
114+        xs.addObserver(xmlstream.STREAM_END_EVENT,
115+                       lambda failure: self.connectionDisconnected(xs))
116+        xs.addObserver('/*', lambda element: self.onElement(element, xs))
117+
118+
119+    def connectionDisconnected(self, xs):
120+        del self.streams[xs.otherEntity.full()]
121+
122+
123+    def onElement(self, element, xs):
124+        """
125+        Called when an element was received from one of the connected streams.
126+
127+        """
128+        if element.handled:
129+            return
130+
131+        element["from"] = xs.otherEntity.full()
132+        self.xmlstream.dispatch(element)
133+
134+
135+    def send(self, element):
136+        """
137+        Send element to the proper XML Stream.
138+
139+        This uses addressing embedded in the element to find the correct
140+        stream to forward the element to.
141+        """
142+
143+        destination = JID(element["to"]).full()
144+
145+        if destination not in self.streams:
146+            raise Exception("Destination unreachable")
147+
148+        self.streams[destination].send(element)
Note: See TracBrowser for help on using the repository browser.