source: ralphm-patches/c2s_server_factory.patch @ 68:14b59e18ecb6

Last change on this file since 68:14b59e18ecb6 was 68:14b59e18ecb6, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Checkpoint

File size: 6.2 KB
RevLine 
[54]1# HG changeset patch
[68]2# Parent f164cc75b8337933cc6dd9c7f43424f3dbd74dee
[54]3Add factory for accepting client connections.
4
5The new XMPPC2SServerFactory is a server factory for accepting client
6connections. It uses `XMPPClientListenAuthenticator` to perform the
7steps for authentication and binding of a resource, and keeps a list
8of all streams.
9
10Upon loss of the connection, the service is called with `unbindResource`.
11Received stanzas cause the service's `onElement` to be called.
12
13The factory has a `deliverStanza` method to deliver stanzas to a particular
14recipient. This is used for stanzas that have different recipient addressing
15than the actual recipient (for presence and messages from a different (or no)
16resource).
17
18TODO:
19
20 * Add docstrings.
21 * Add tests.
22
[66]23diff --git a/wokkel/client.py b/wokkel/client.py
24--- a/wokkel/client.py
25+++ b/wokkel/client.py
[68]26@@ -21,7 +21,9 @@
[57]27 from twisted.words.xish import domish
[54]28 
29 from wokkel import generic
[57]30+from wokkel.compat import XmlStreamServerFactory
[66]31 from wokkel.iwokkel import IUserSession
[68]32+from wokkel.subprotocols import ServerStreamManager
[54]33 from wokkel.subprotocols import StreamManager
34 
[68]35 NS_CLIENT = 'jabber:client'
36@@ -442,3 +444,41 @@
[66]37             self.portal = self.portals[self.xmlstream.thisEntity]
38         except KeyError:
39             raise error.StreamError('host-unknown')
[54]40+
41+
42+
43+class XMPPC2SServerFactory(XmlStreamServerFactory):
44+
[68]45+    def __init__(self, portals):
[54]46+        def authenticatorFactory():
[68]47+            return XMPPClientListenAuthenticator(portals)
[54]48+
49+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
50+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
51+                          self.onConnectionMade)
52+
53+
54+    def onConnectionMade(self, xs):
55+        """
[68]56+        Called when a connection is made.
[54]57+
[68]58+        This creates a stream manager, calls L{setupHandlers} to attach
59+        subprotocol handlers and then signals the stream manager that
60+        the connection was made.
[54]61+        """
[68]62+        sm = ServerStreamManager()
63+        sm.logTraffic = self.logTraffic
[54]64+
[68]65+        for handler in self.setupHandlers():
66+            handler.setHandlerParent(sm)
[54]67+
[68]68+        sm.makeConnection(xs)
[54]69+
70+
[68]71+    def setupHandlers(self):
72+        """
73+        Set up XMPP subprotocol handlers.
74+        """
75+        return [
76+            generic.StanzaForwarder()
77+        ]
78diff --git a/wokkel/generic.py b/wokkel/generic.py
79--- a/wokkel/generic.py
80+++ b/wokkel/generic.py
81@@ -615,3 +615,55 @@
82 
83         self._initializers = self.getInitializers()
84         self._initializeStream()
85+
86+
87+
88+class StanzaForwarder(XMPPHandler):
89+    """
90+    XMPP protocol for passing incoming stanzas to the stream avatar.
91+
92+    This handler adds an observer for all XML Stanzas to forward to the C{send}
93+    method on the cred avatar set on the XML Stream, unless it has been handled
94+    by other observers.
95+
96+    Stream errors are logged.
97+    """
98+
99+    def connectionMade(self):
100+        """
101+        Called when a connection is made.
102+        """
103+        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
104+
105+
106+    def connectionInitialized(self):
107+        """
108+        Called when the stream has been initialized.
109+        """
110+        self.xmlstream.addObserver(
111+            '/*[@xmlns="%s"]' % self.xmlstream.namespace,
112+            self.onStanza, priority=-1)
113+
114+
115+    def onStanza(self, element):
116+        """
117+        Called when a stanza element was received.
118+
119+        Unless a stanza has already been handled, or the name of the element is
120+        not one of C{'iq'}, C{'message'}, C{'presence'}, the stanza is passed
121+        on to the avatar's C{send} method.
122+        """
123+        if element.handled:
124+            return
125+
126+        if element.name not in ('iq', 'message', 'presence'):
127+            return
128+
129+        self.xmlstream.avatar.send(element)
130+
131+
132+    def onError(reason):
133+        """
134+        Log a stream error.
135+        """
136+        log.err(reason, "Stream error")
137diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
138--- a/wokkel/subprotocols.py
139+++ b/wokkel/subprotocols.py
140@@ -142,10 +142,13 @@
141     @type timeout: C{int}
142 
143     @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
144+
145+    @cvar __streamCount: Global stream count for distinguishing streams.
146     """
147 
148     timeout = None
149     _reactor = None
150+    __streamCount = 0
151 
152     logTraffic = False
153 
154@@ -195,20 +198,24 @@
155         and call each handler's C{makeConnection} method with the L{XmlStream}
156         instance.
157         """
158-        def logDataIn(buf):
159-            log.msg("RECV: %r" % buf)
160+        xs.serial = self.__streamCount
161+        BaseStreamManager.__streamCount += 1
162 
163-        def logDataOut(buf):
164-            log.msg("SEND: %r" % buf)
165 
166         if self.logTraffic:
167-            xs.rawDataInFn = logDataIn
168-            xs.rawDataOutFn = logDataOut
169+            def logData(direction, data):
170+                log.msg(format="%(direction)s (%(streamID)s): %(data)r",
171+                        direction=direction, streamID=xs.serial, data=data)
172+
173+            log.msg(format="Connection %(streamID) made", streamID=xs.serial)
174+            xs.rawDataInFn = lambda data: logData("RECV", data)
175+            xs.rawDataOutFn = lambda data: logData("SEND", data)
176 
177         xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
178                        self.connectionInitialized)
179         xs.addObserver(xmlstream.STREAM_END_EVENT,
180                        self.connectionLost)
181+
182         self.xmlstream = xs
183 
184         for e in list(self):
185@@ -222,6 +229,9 @@
186         Send out cached stanzas and call each handler's
187         C{connectionInitialized} method.
188         """
[54]189+        if self.logTraffic:
[68]190+            log.msg(format="Connection %(streamID) initialized",
191+                    streamID=xs.serial)
192 
193         xs.addObserver('/iq[@type="result"]', self._onIQResponse)
194         xs.addObserver('/iq[@type="error"]', self._onIQResponse)
195@@ -247,6 +257,10 @@
196         L{XmlStream} anymore and notifies each handler that the connection
197         was lost by calling its C{connectionLost} method.
198         """
199+        if self.logTraffic:
200+            log.msg(format="Connection %(streamID) lost",
201+                    streamID=self.xmlstream.serial)
[54]202+
[68]203         self.xmlstream = None
204         self._initialized = False
205 
Note: See TracBrowser for help on using the repository browser.