source: ralphm-patches/xmpp_client_service.patch @ 37:03cd0cb8548c

Last change on this file since 37:03cd0cb8548c was 37:03cd0cb8548c, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Create factory for accepting c2s connections, split out session manager.

File size: 8.2 KB
RevLine 
[37]1diff -r 1644083ca235 doc/examples/client_service.tac
2--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
3+++ b/doc/examples/client_service.tac   Sat Feb 13 19:01:07 2010 +0100
4@@ -0,0 +1,64 @@
5+from twisted.application import service, strports
6+from twisted.internet import defer
7+
8+from wokkel import client
9+from wokkel.generic import FallbackHandler
10+from wokkel.subprotocols import XMPPHandler
11+from wokkel.xmppim import RosterItem, RosterServerProtocol
12+
13+from twisted.words.protocols.jabber.jid import internJID as JID
14+
15+import socket
16+domain = socket.gethostname()
17+
18+class StaticRoster(RosterServerProtocol):
19+
20+    def __init__(self):
21+        RosterServerProtocol.__init__(self)
22+        self.roster = {'ralphm': [
23+                           RosterItem(JID('intosi@'+domain),
24+                                      subscriptionTo=True,
25+                                      subscriptionFrom=True,
26+                                      name='Intosi',
27+                                      groups=set(['Friends'])),
28+                           RosterItem(JID('termie@'+domain),
29+                                      subscriptionTo=True,
30+                                      subscriptionFrom=True,
31+                                      name='termie'),
32+                           ],
33+                       'test': [],
34+                       }
35+
36+    def getRoster(self, entity):
37+        return defer.succeed(self.roster[entity.user])
38+
39+
40+class Hello(XMPPHandler):
41+
42+    def q(self):
43+        from wokkel.xmppim import AvailabilityPresence
44+        p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat')
45+        self.parent.send(p.toElement())
46+
47+
48+    def connectionInitialized(self):
49+        from twisted.internet import reactor
50+        reactor.callLater(5, self.q)
51+
52+
53+
54+application = service.Application("Jabber server")
55+
56+sessionManager = client.SessionManager()
57+FallbackHandler().setHandlerParent(sessionManager)
58+StaticRoster().setHandlerParent(sessionManager)
59+Hello().setHandlerParent(sessionManager)
60+
61+clientService = client.ClientService(sessionManager, domain)
62+clientService.logTraffic = True
63+
64+
65+c2sFactory = client.XMPPC2SServerFactory(clientService)
66+c2sFactory.logTraffic = True
67+c2sService = strports.service('5224', c2sFactory)
68+c2sService.setServiceParent(application)
69diff -r 1644083ca235 wokkel/client.py
70--- a/wokkel/client.py  Sat Feb 13 18:57:27 2010 +0100
71+++ b/wokkel/client.py  Sat Feb 13 19:01:07 2010 +0100
[35]72@@ -17,11 +17,12 @@
73 from twisted.names.srvconnect import SRVConnector
[34]74 from twisted.python import log
75 from twisted.words.protocols.jabber import client, error, sasl, xmlstream
[35]76-from twisted.words.protocols.jabber.jid import internJID as JID
77+from twisted.words.protocols.jabber.jid import JID, internJID
[37]78 from twisted.words.xish import domish
[34]79 
80 from wokkel import generic
81-from wokkel.subprotocols import StreamManager
82+from wokkel.compat import XmlStreamServerFactory
[35]83+from wokkel.subprotocols import StreamManager, XMPPHandlerCollection
[34]84 
85 NS_CLIENT = 'jabber:client'
86 
[37]87@@ -311,6 +312,7 @@
88 
89         # TODO: check for resource conflicts
90 
91+        print self.username, self.domain, self.resource
92         newJID = JID(tuple=(self.username, self.domain, self.resource))
93 
94         reply = domish.Element((None, 'iq'))
95@@ -350,3 +352,166 @@
[34]96             reply['id'] = iq['id']
97         reply.addElement((client.NS_XMPP_SESSION, 'session'))
98         self.xmlstream.send(reply)
99+
100+
101+
[37]102+class XMPPC2SServerFactory(XmlStreamServerFactory):
[34]103+
[37]104+    def __init__(self, service):
105+        self.service = service
[34]106+
[37]107+        def authenticatorFactory():
108+            return XMPPClientListenAuthenticator(service.domain)
109+
110+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
111+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
112+                          self.onConnectionMade)
113+        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
114+                          self.onAuthenticated)
115+
116+        self.serial = 0
117+
118+
119+    def onConnectionMade(self, xs):
[34]120+        """
[37]121+        Called when a client-to-server connection was made.
[34]122+
[37]123+        This enables traffic debugging on incoming streams.
[34]124+        """
[37]125+        xs.serial = self.serial
126+        self.serial += 1
[34]127+
[37]128+        log.msg("Client connection %d made" % xs.serial)
[34]129+
[37]130+        def logDataIn(buf):
131+            log.msg("RECV (%d): %r" % (xs.serial, buf))
[34]132+
[37]133+        def logDataOut(buf):
134+            log.msg("SEND (%d): %r" % (xs.serial, buf))
135+
136+        if self.logTraffic:
137+            xs.rawDataInFn = logDataIn
138+            xs.rawDataOutFn = logDataOut
139+
140+        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
141+
142+
143+    def onAuthenticated(self, xs):
144+        log.msg("Client connection %d authenticated" % xs.serial)
145+
146+        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
147+                                                   0, xs)
148+        xs.addObserver('/*', self.onElement, 0, xs)
149+
150+        self.service.connectionInitialized(xs)
151+
152+
153+    def onConnectionLost(self, xs, reason):
154+        log.msg("Client connection %d disconnected" % xs.serial)
155+
156+        self.service.connectionLost(xs, reason)
157+
158+
159+    def onError(self, reason):
160+        log.err(reason, "Stream Error")
161+
162+
163+    def onElement(self, xs, element):
164+        """
165+        Called when an element was received from one of the connected streams.
166+
167+        """
168+        if element.handled:
169+            return
170+        else:
171+            self.service.dispatch(xs, element)
172+
173+
174+
175+class SessionManager(XMPPHandlerCollection):
176+
177+    def __init__(self):
178+        XMPPHandlerCollection.__init__(self)
179+        self.xmlstream = None
180+
181+
182+    def makeConnection(self, xs):
183+        self.xmlstream = xs
184+
185+        for handler in self:
186+            handler.makeConnection(xs)
187+            handler.connectionInitialized()
188+
189+
190+    def send(self, obj):
191+        if self.xmlstream:
192+            self.xmlstream.send(obj)
193+
194+
195+class ClientService(object):
[34]196+    """
197+    Service for accepting XMPP client connections.
198+
199+    Incoming client connections are first authenticated using the
200+    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
201+    keyed with the JID that was bound to that connection.
202+
203+    Where L{StreamManager} manages one connection at a time, this
204+    service demultiplexes incoming connections. For the subprotocol handlers
205+    (objects providing {ijabber.IXMPPHandler}), their stream is always
206+    connected and initialized, but they only have to deal with one stream. This
207+    makes it easier to create adapters.
208+
[35]209+    As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML
[34]210+    stream that is passed to the XMPP subprotocol handlers. The received
211+    stanzas from the incoming client connections are passed on to the handlers
212+    using this fake XML, while having their C{from} attribute set to the JID of
213+    the client stream. Stanzas sent by the handlers are forwarded to the
214+    matching client stream, selected by the stanzas C{to} attribute.
215+    """
216+
217+    logTraffic = False
218+
[37]219+    def __init__(self, sessionManager, domain):
220+        self.sessionManager = sessionManager
[34]221+        self.domain = domain
222+
223+        self.streams = {}
224+
[37]225+        pipe = generic.XmlPipe()
226+        self.xmlstream = pipe.source
227+        self.sessionManager.makeConnection(pipe.sink)
228+        self.xmlstream.addObserver('/*', self.send)
[34]229+
230+
231+    def connectionInitialized(self, xs):
[37]232+        self.streams[xs.otherEntity] = xs
[34]233+
234+
[37]235+    def connectionLost(self, xs, reason):
236+        if xs.otherEntity in self.streams:
237+            del self.streams[xs.otherEntity]
[34]238+
239+
[37]240+    def send(self, stanza):
241+        """
242+        Send stanza to the proper XML Stream.
243+
244+        This uses addressing embedded in the element to find the correct stream
245+        to forward the element to.
246+        """
247+        destination = internJID(stanza["to"])
248+
249+        if destination not in self.streams:
250+            log.msg("Euh")
251+            raise Exception("Destination unreachable")
252+
253+        self.streams[destination].send(stanza)
254+
255+
256+    def dispatch(self, xs, stanza):
[34]257+        """
258+        Called when an element was received from one of the connected streams.
259+        """
[37]260+        stanza["from"] = xs.otherEntity.full()
261+        self.xmlstream.send(stanza)
Note: See TracBrowser for help on using the repository browser.