source:
ralphm-patches/server-stream-manager.patch
@
79:0752a1cca356
Last change on this file since 79:0752a1cca356 was 79:0752a1cca356, checked in by Ralph Meijer <ralphm@…>, 6 years ago | |
---|---|
File size: 19.7 KB |
-
wokkel/client.py
# HG changeset patch # Parent 3f3fe954b1975c2d9115e0fa8177ae7b28a708a8 # Parent e198cebf36146a768eda902efa5ee741e947fda8 Generalize StreamManager and add ServerStreamManager. This generalizes `StreamManager` to `BaseStreamManager` to take the common functionality and reuse it for `ServerStreamManager`. Where `StreamManager` is used for initiating connections, with a factory as a parameter, `ServerStreamManager` works for receiving connections. It must be created in a protocol factory when a connection is established, and then its `makeConnection` should be called to hook it up to the `XmlStream` instance. diff --git a/wokkel/client.py b/wokkel/client.py
a b 109 109 self._connection.disconnect() 110 110 111 111 112 def _authd(self, xs):112 def connectionInitialized(self, xs): 113 113 """ 114 114 Called when the stream has been initialized. 115 115 … … 118 118 by its constituent initializers. 119 119 """ 120 120 self.jid = self.factory.authenticator.jid 121 StreamManager. _authd(self, xs)121 StreamManager.connectionInitialized(self, xs) 122 122 123 123 124 124 def initializationFailed(self, reason): -
wokkel/subprotocols.py
diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
a b 122 122 123 123 124 124 125 class StreamManager(XMPPHandlerCollection):125 class BaseStreamManager(XMPPHandlerCollection): 126 126 """ 127 127 Business logic representing a managed XMPP connection. 128 128 … … 133 133 134 134 @ivar xmlstream: currently managed XML stream 135 135 @type xmlstream: L{XmlStream} 136 136 137 @ivar logTraffic: if true, log all traffic. 137 138 @type logTraffic: C{bool} 139 138 140 @ivar _initialized: Whether the stream represented by L{xmlstream} has 139 141 been initialized. This is used when caching outgoing 140 142 stanzas. 141 143 @type _initialized: C{bool} 142 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 143 @type _packetQueue: L{list} 144 144 145 @ivar timeout: Default IQ request timeout in seconds. 145 146 @type timeout: C{int} 147 146 148 @ivar _reactor: A provider of L{IReactorTime} to track timeouts. 147 149 """ 150 148 151 timeout = None 149 152 _reactor = None 150 153 151 154 logTraffic = False 152 155 153 def __init__(self, factory,reactor=None):156 def __init__(self, reactor=None): 154 157 """ 155 158 Construct a stream manager. 156 159 157 @param factory: The stream factory to connect with.158 160 @param reactor: A provider of L{IReactorTime} to track timeouts. 159 161 If not provided, the global reactor will be used. 160 162 """ 161 163 XMPPHandlerCollection.__init__(self) 164 162 165 self.xmlstream = None 163 166 self._packetQueue = [] 164 167 self._initialized = False 165 168 166 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)167 factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)168 factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,169 self.initializationFailed)170 factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)171 self.factory = factory172 173 169 if reactor is None: 174 170 from twisted.internet import reactor 175 171 self._reactor = reactor … … 195 191 handler.connectionInitialized() 196 192 197 193 198 def _connected(self, xs):194 def makeConnection(self, xs): 199 195 """ 200 196 Called when the transport connection has been established. 201 197 … … 213 209 xs.rawDataInFn = logDataIn 214 210 xs.rawDataOutFn = logDataOut 215 211 212 xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, 213 self.connectionInitialized) 214 xs.addObserver(xmlstream.STREAM_END_EVENT, 215 self.connectionLost) 216 216 self.xmlstream = xs 217 217 218 218 for e in list(self): 219 219 e.makeConnection(xs) 220 220 221 221 222 def _authd(self, xs):222 def connectionInitialized(self, xs): 223 223 """ 224 224 Called when the stream has been initialized. 225 225 … … 242 242 e.connectionInitialized() 243 243 244 244 245 def initializationFailed(self, reason):246 """247 Called when stream initialization has failed.248 245 249 Stream initialization has halted, with the reason indicated by 250 C{reason}. It may be retried by calling the authenticator's 251 C{initializeStream}. See the respective authenticators for details. 252 253 @param reason: A failure instance indicating why stream initialization 254 failed. 255 @type reason: L{failure.Failure} 256 """ 257 258 259 def _disconnected(self, reason): 246 def connectionLost(self, reason): 260 247 """ 261 248 Called when the stream has been closed. 262 249 … … 468 455 469 456 470 457 458 class StreamManager(BaseStreamManager): 459 """ 460 Business logic representing a managed XMPP client connection. 461 462 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 463 @type _packetQueue: L{list} 464 """ 465 466 def __init__(self, factory, reactor=None): 467 """ 468 Construct a stream manager. 469 470 @param factory: The stream factory to connect with. 471 @param reactor: A provider of L{IReactorTime} to track timeouts. 472 If not provided, the global reactor will be used. 473 """ 474 BaseStreamManager.__init__(self, reactor=reactor) 475 476 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 477 self.makeConnection) 478 self.factory = factory 479 480 481 def makeConnection(self, xs): 482 xs.addObserver(xmlstream.INIT_FAILED_EVENT, 483 self.initializationFailed) 484 BaseStreamManager.makeConnection(self, xs) 485 486 487 def initializationFailed(self, reason): 488 """ 489 Called when stream initialization has failed. 490 491 Stream initialization has halted, with the reason indicated by 492 C{reason}. It may be retried by calling the authenticator's 493 C{initializeStream}. See the respective authenticators for details. 494 495 @param reason: A failure instance indicating why stream initialization 496 failed. 497 @type reason: L{failure.Failure} 498 """ 499 500 501 502 class ServerStreamManager(BaseStreamManager): 503 """ 504 Business logic representing a managed server-side XMPP connection. 505 """ 506 507 def __init__(self, reactor=None): 508 BaseStreamManager.__init__(self, reactor=reactor) 509 510 511 471 512 class IQHandlerMixin(object): 472 513 """ 473 514 XMPP subprotocol mixin for handle incoming IQ stanzas. -
wokkel/test/helpers.py
diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
a b 108 108 factory = DummyFactory() 109 109 StreamManager.__init__(self, factory, reactor) 110 110 self.stub = XmlStreamStub() 111 self. _connected(self.stub.xmlstream)112 self. _authd(self.stub.xmlstream)111 self.makeConnection(self.stub.xmlstream) 112 self.connectionInitialized(self.stub.xmlstream) -
wokkel/test/test_client.py
diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
a b 38 38 """ 39 39 xs = self.client.factory.buildProtocol(None) 40 40 self.client.factory.authenticator.jid = JID('user@example.org/test') 41 xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)41 self.client.connectionInitialized(xs) 42 42 self.assertEquals(JID('user@example.org/test'), self.client.jid) 43 43 44 44 -
wokkel/test/test_subprotocols.py
diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
a b 190 190 191 191 192 192 193 class StreamManagerTest(unittest.TestCase): 194 """ 195 Tests for L{subprotocols.StreamManager}. 196 """ 197 198 def setUp(self): 199 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 200 self.clock = task.Clock() 201 self.streamManager = subprotocols.StreamManager(factory, self.clock) 202 self.xmlstream = factory.buildProtocol(None) 203 self.transport = proto_helpers.StringTransport() 204 self.xmlstream.transport = self.transport 205 206 self.request = IQGetStanza(recipient=JID('other@example.org'), 207 sender=JID('user@example.org')) 208 193 class BaseStreamManagerTestsMixin(object): 209 194 210 195 def _streamStarted(self): 211 196 """ … … 219 204 self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd") 220 205 221 206 222 def test_ basic(self):207 def test_interface(self): 223 208 """ 224 Test correct initialization and setup of factory observers.209 ServerStreamManager implements IXMPPHandlerCollection. 225 210 """ 226 factory = DummyFactory() 227 sm = subprotocols.StreamManager(factory) 228 self.assertIdentical(None, sm.xmlstream) 229 self.assertEquals([], sm.handlers) 230 self.assertEquals(sm._connected, 231 sm.factory.callbacks['//event/stream/connected']) 232 self.assertEquals(sm._authd, 233 sm.factory.callbacks['//event/stream/authd']) 234 self.assertEquals(sm._disconnected, 235 sm.factory.callbacks['//event/stream/end']) 236 self.assertEquals(sm.initializationFailed, 237 sm.factory.callbacks['//event/xmpp/initfailed']) 211 verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager) 238 212 239 213 240 def test_ connected(self):214 def test_makeConnection(self): 241 215 """ 242 216 Test that protocol handlers have their connectionMade method called 243 217 when the XML stream is connected. … … 245 219 sm = self.streamManager 246 220 handler = DummyXMPPHandler() 247 221 handler.setHandlerParent(sm) 248 xs = xmlstream.XmlStream(xmlstream.Authenticator())249 sm. _connected(xs)222 xs = self.xmlstream 223 sm.makeConnection(xs) 250 224 self.assertEquals(1, handler.doneMade) 251 225 self.assertEquals(0, handler.doneInitialized) 252 226 self.assertEquals(0, handler.doneLost) 253 227 254 228 255 def test_ connectedLogTrafficFalse(self):229 def test_makeConnectionLogTrafficFalse(self): 256 230 """ 257 231 Test raw data functions unset when logTraffic is set to False. 258 232 """ 259 233 sm = self.streamManager 260 234 handler = DummyXMPPHandler() 261 235 handler.setHandlerParent(sm) 262 xs = xmlstream.XmlStream(xmlstream.Authenticator())263 sm. _connected(xs)236 xs = self.xmlstream 237 sm.makeConnection(xs) 264 238 self.assertIdentical(None, xs.rawDataInFn) 265 239 self.assertIdentical(None, xs.rawDataOutFn) 266 240 267 241 268 def test_ connectedLogTrafficTrue(self):242 def test_makeConnectionLogTrafficTrue(self): 269 243 """ 270 244 Test raw data functions set when logTraffic is set to True. 271 245 """ … … 273 247 sm.logTraffic = True 274 248 handler = DummyXMPPHandler() 275 249 handler.setHandlerParent(sm) 276 xs = xmlstream.XmlStream(xmlstream.Authenticator())277 sm. _connected(xs)250 xs = self.xmlstream 251 sm.makeConnection(xs) 278 252 self.assertNotIdentical(None, xs.rawDataInFn) 279 253 self.assertNotIdentical(None, xs.rawDataOutFn) 280 254 281 255 282 def test_ authd(self):256 def test_connectionInitialized(self): 283 257 """ 284 258 Test that protocol handlers have their connectionInitialized method 285 259 called when the XML stream is initialized. … … 287 261 sm = self.streamManager 288 262 handler = DummyXMPPHandler() 289 263 handler.setHandlerParent(sm) 290 xs = xmlstream.XmlStream(xmlstream.Authenticator())291 sm. _authd(xs)264 xs = self.xmlstream 265 sm.connectionInitialized(xs) 292 266 self.assertEquals(0, handler.doneMade) 293 267 self.assertEquals(1, handler.doneInitialized) 294 268 self.assertEquals(0, handler.doneLost) 295 269 296 270 297 def test_ disconnected(self):271 def test_connectionLost(self): 298 272 """ 299 273 Protocol handlers have connectionLost called on stream disconnect. 300 274 """ 301 275 sm = self.streamManager 302 276 handler = DummyXMPPHandler() 303 277 handler.setHandlerParent(sm) 304 sm. _disconnected(None)278 sm.connectionLost(None) 305 279 self.assertEquals(0, handler.doneMade) 306 280 self.assertEquals(0, handler.doneInitialized) 307 281 self.assertEquals(1, handler.doneLost) 308 282 309 283 310 def test_ disconnectedReason(self):284 def test_connectionLostReason(self): 311 285 """ 312 286 A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers 313 287 L{connectionLost} methods, passing a L{failure.Failure} reason. … … 316 290 handler = FailureReasonXMPPHandler() 317 291 handler.setHandlerParent(sm) 318 292 xmlstream.XmlStream(xmlstream.Authenticator()) 319 sm. _disconnected(failure.Failure(Exception("no reason")))293 sm.connectionLost(failure.Failure(Exception("no reason"))) 320 294 self.assertEquals(True, handler.gotFailureReason) 321 295 322 296 … … 338 312 Adding a handler when connected doesn't call connectionInitialized. 339 313 """ 340 314 sm = self.streamManager 341 xs = xmlstream.XmlStream(xmlstream.Authenticator())342 sm. _connected(xs)315 xs = self.xmlstream 316 sm.makeConnection(xs) 343 317 handler = DummyXMPPHandler() 344 318 handler.setHandlerParent(sm) 345 319 … … 361 335 self.nestedHandler.setHandlerParent(self.parent) 362 336 363 337 sm = self.streamManager 364 xs = xmlstream.XmlStream(xmlstream.Authenticator())338 xs = self.xmlstream 365 339 handler = NestingHandler() 366 340 handler.setHandlerParent(sm) 367 sm. _connected(xs)341 sm.makeConnection(xs) 368 342 369 343 self.assertEquals(1, handler.doneMade) 370 344 self.assertEquals(0, handler.doneInitialized) … … 386 360 called. 387 361 """ 388 362 sm = self.streamManager 389 xs = xmlstream.XmlStream(xmlstream.Authenticator())390 sm. _connected(xs)391 sm. _authd(xs)363 xs = self.xmlstream 364 sm.makeConnection(xs) 365 sm.connectionInitialized(xs) 392 366 handler = DummyXMPPHandler() 393 367 handler.setHandlerParent(sm) 394 368 … … 410 384 self.nestedHandler.setHandlerParent(self.parent) 411 385 412 386 sm = self.streamManager 413 xs = xmlstream.XmlStream(xmlstream.Authenticator())387 xs = self.xmlstream 414 388 handler = NestingHandler() 415 389 handler.setHandlerParent(sm) 416 sm. _connected(xs)417 sm. _authd(xs)390 sm.makeConnection(xs) 391 sm.connectionInitialized(xs) 418 392 419 393 self.assertEquals(1, handler.doneMade) 420 394 self.assertEquals(1, handler.doneInitialized) … … 438 412 self.nestedHandler.setHandlerParent(self.parent) 439 413 440 414 sm = self.streamManager 441 xs = xmlstream.XmlStream(xmlstream.Authenticator())415 xs = self.xmlstream 442 416 handler = NestingHandler() 443 417 handler.setHandlerParent(sm) 444 sm. _connected(xs)445 sm. _authd(xs)446 sm. _disconnected(xs)418 sm.makeConnection(xs) 419 sm.connectionInitialized(xs) 420 sm.connectionLost(xs) 447 421 448 422 self.assertEquals(1, handler.doneMade) 449 423 self.assertEquals(1, handler.doneInitialized) … … 473 447 474 448 The data should be sent directly over the XML stream. 475 449 """ 476 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 477 sm = subprotocols.StreamManager(factory) 478 xs = factory.buildProtocol(None) 479 xs.transport = proto_helpers.StringTransport() 450 xs = self.xmlstream 480 451 xs.connectionMade() 481 452 xs.dataReceived("<stream:stream xmlns='jabber:client' " 482 453 "xmlns:stream='http://etherx.jabber.org/streams' " 483 454 "from='example.com' id='12345'>") 484 455 xs.dispatch(xs, "//event/stream/authd") 485 s m.send("<presence/>")456 self.streamManager.send("<presence/>") 486 457 self.assertEquals("<presence/>", xs.transport.value()) 487 458 488 459 … … 493 464 The data should be cached until an XML stream has been established and 494 465 initialized. 495 466 """ 496 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 497 sm = subprotocols.StreamManager(factory) 467 sm = self.streamManager 498 468 handler = DummyXMPPHandler() 499 469 sm.addHandler(handler) 500 470 501 xs = factory.buildProtocol(None)471 xs = self.xmlstream 502 472 xs.transport = proto_helpers.StringTransport() 503 473 sm.send("<presence/>") 504 474 self.assertEquals("", xs.transport.value()) … … 525 495 """ 526 496 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 527 497 sm = subprotocols.StreamManager(factory) 528 xs = factory.buildProtocol(None)498 xs = self.xmlstream 529 499 xs.transport = proto_helpers.StringTransport() 530 500 xs.connectionMade() 531 501 xs.dataReceived("<stream:stream xmlns='jabber:client' " … … 548 518 handler = DummyXMPPHandler() 549 519 sm.addHandler(handler) 550 520 551 xs = factory.buildProtocol(None)521 xs = self.xmlstream 552 522 xs.connectionMade() 553 523 xs.transport = proto_helpers.StringTransport() 554 524 xs.connectionLost(None) … … 720 690 """ 721 691 d = self.streamManager.request(self.request) 722 692 xs = self.xmlstream 693 xs.connectionMade() 723 694 xs.connectionLost(failure.Failure(ConnectionDone())) 724 695 self.assertFailure(d, ConnectionDone) 725 696 return d … … 735 706 d = xmlstream.IQ(self.xmlstream).send() 736 707 d.addErrback(eb) 737 708 709 self.xmlstream.connectionMade() 738 710 d = self.streamManager.request(self.request) 739 711 d.addErrback(eb) 740 712 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) … … 780 752 self.request.timeout = 60 781 753 d = self.streamManager.request(self.request) 782 754 755 self.xmlstream.connectionMade() 783 756 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) 784 757 self.assertFailure(d, ConnectionDone) 785 758 self.assertFalse(self.clock.calls) … … 822 795 823 796 824 797 798 class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 799 """ 800 Tests for L{subprotocols.StreamManager}. 801 """ 802 803 def setUp(self): 804 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 805 self.clock = task.Clock() 806 self.streamManager = subprotocols.StreamManager(factory, self.clock) 807 self.xmlstream = factory.buildProtocol(None) 808 self.transport = proto_helpers.StringTransport() 809 self.xmlstream.transport = self.transport 810 811 self.request = IQGetStanza(recipient=JID('other@example.org'), 812 sender=JID('user@example.org')) 813 814 815 def test_basic(self): 816 """ 817 Test correct initialization and setup of factory observers. 818 """ 819 factory = DummyFactory() 820 sm = subprotocols.StreamManager(factory) 821 self.assertIdentical(None, sm.xmlstream) 822 self.assertEquals([], sm.handlers) 823 self.assertEquals(sm.makeConnection, 824 sm.factory.callbacks['//event/stream/connected']) 825 826 827 828 class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 829 """ 830 Tests for L{subprotocols.StreamManager}. 831 """ 832 833 def setUp(self): 834 self.clock = task.Clock() 835 self.streamManager = subprotocols.ServerStreamManager( 836 reactor=self.clock) 837 838 self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator()) 839 self.transport = proto_helpers.StringTransport() 840 self.xmlstream.transport = self.transport 841 842 self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT, 843 self.streamManager.makeConnection) 844 845 self.request = IQGetStanza(recipient=JID('other@example.org'), 846 sender=JID('user@example.org')) 847 848 849 825 850 class DummyIQHandler(subprotocols.IQHandlerMixin): 826 851 iqHandlers = {'/iq[@type="get"]': 'onGet'} 827 852
Note: See TracBrowser
for help on using the repository browser.