source:
ralphm-patches/server-stream-manager.patch
@
74:355afce3af27
Last change on this file since 74:355afce3af27 was 74:355afce3af27, checked in by Ralph Meijer <ralphm@…>, 8 years ago | |
---|---|
File size: 21.2 KB |
-
wokkel/client.py
# HG changeset patch # Parent 71082c22f73200a0b8b4b5c6e5d2773f9f68dd4d Generalize StreamManager and add ServerStreamManager. This generalizes `StreamManager` to `BaseStreamManager` to take the common functionality and reuse it for `ServerStreamManager`. Where `StreamManager` is used for initiating connections, with a factory as a parameter, `ServerStreamManager` works for receiving connections. It must be created in a protocol factory when a connection is established, and then its `makeConnection` should be called to hook it up to the `XmlStream` instance. diff --git a/wokkel/client.py b/wokkel/client.py
a b 109 109 self._connection.disconnect() 110 110 111 111 112 def _authd(self, xs):112 def connectionInitialized(self, xs): 113 113 """ 114 114 Called when the stream has been initialized. 115 115 … … 118 118 by its constituent initializers. 119 119 """ 120 120 self.jid = self.factory.authenticator.jid 121 StreamManager. _authd(self, xs)121 StreamManager.connectionInitialized(self, xs) 122 122 123 123 124 124 def initializationFailed(self, reason): -
wokkel/component.py
diff --git a/wokkel/component.py b/wokkel/component.py
a b 36 36 StreamManager.__init__(self, factory) 37 37 38 38 39 def _authd(self, xs):39 def connectionInitialized(self, xs): 40 40 """ 41 41 Called when stream initialization has completed. 42 42 … … 53 53 old_send(obj) 54 54 55 55 xs.send = send 56 StreamManager. _authd(self, xs)56 StreamManager.connectionInitialized(self, xs) 57 57 58 58 59 59 def initializationFailed(self, reason): -
wokkel/subprotocols.py
diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
a b 120 120 121 121 122 122 123 class StreamManager(XMPPHandlerCollection):123 class BaseStreamManager(XMPPHandlerCollection): 124 124 """ 125 125 Business logic representing a managed XMPP connection. 126 126 … … 131 131 132 132 @ivar xmlstream: currently managed XML stream 133 133 @type xmlstream: L{XmlStream} 134 134 135 @ivar logTraffic: if true, log all traffic. 135 136 @type logTraffic: C{bool} 137 136 138 @ivar _initialized: Whether the stream represented by L{xmlstream} has 137 139 been initialized. This is used when caching outgoing 138 140 stanzas. 139 141 @type _initialized: C{bool} 140 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 141 @type _packetQueue: L{list} 142 142 143 @ivar timeout: Default IQ request timeout in seconds. 143 144 @type timeout: C{int} 145 144 146 @ivar _reactor: A provider of L{IReactorTime} to track timeouts. 145 147 """ 148 146 149 timeout = None 147 150 _reactor = None 148 151 149 152 logTraffic = False 150 153 151 def __init__(self, factory,reactor=None):154 def __init__(self, reactor=None): 152 155 """ 153 156 Construct a stream manager. 154 157 155 @param factory: The stream factory to connect with.156 158 @param reactor: A provider of L{IReactorTime} to track timeouts. 157 159 If not provided, the global reactor will be used. 158 160 """ 159 161 XMPPHandlerCollection.__init__(self) 162 160 163 self.xmlstream = None 161 164 self._packetQueue = [] 162 165 self._initialized = False 163 166 164 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)165 factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)166 factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,167 self.initializationFailed)168 factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)169 self.factory = factory170 171 167 if reactor is None: 172 168 from twisted.internet import reactor 173 169 self._reactor = reactor … … 193 189 handler.connectionInitialized() 194 190 195 191 196 def _connected(self, xs):192 def makeConnection(self, xs): 197 193 """ 198 194 Called when the transport connection has been established. 199 195 … … 211 207 xs.rawDataInFn = logDataIn 212 208 xs.rawDataOutFn = logDataOut 213 209 210 xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, 211 self.connectionInitialized) 212 xs.addObserver(xmlstream.STREAM_END_EVENT, 213 self.connectionLost) 214 214 self.xmlstream = xs 215 215 216 216 for e in list(self): 217 217 e.makeConnection(xs) 218 218 219 219 220 def _authd(self, xs):220 def connectionInitialized(self, xs): 221 221 """ 222 222 Called when the stream has been initialized. 223 223 … … 240 240 e.connectionInitialized() 241 241 242 242 243 def initializationFailed(self, reason):244 """245 Called when stream initialization has failed.246 243 247 Stream initialization has halted, with the reason indicated by 248 C{reason}. It may be retried by calling the authenticator's 249 C{initializeStream}. See the respective authenticators for details. 250 251 @param reason: A failure instance indicating why stream initialization 252 failed. 253 @type reason: L{failure.Failure} 254 """ 255 256 257 def _disconnected(self, reason): 244 def connectionLost(self, reason): 258 245 """ 259 246 Called when the stream has been closed. 260 247 … … 464 451 465 452 466 453 454 class StreamManager(BaseStreamManager): 455 """ 456 Business logic representing a managed XMPP client connection. 457 458 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 459 @type _packetQueue: L{list} 460 """ 461 462 def __init__(self, factory, reactor=None): 463 """ 464 Construct a stream manager. 465 466 @param factory: The stream factory to connect with. 467 @param reactor: A provider of L{IReactorTime} to track timeouts. 468 If not provided, the global reactor will be used. 469 """ 470 BaseStreamManager.__init__(self, reactor=reactor) 471 472 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 473 self.makeConnection) 474 self.factory = factory 475 476 477 def makeConnection(self, xs): 478 xs.addObserver(xmlstream.INIT_FAILED_EVENT, 479 self.initializationFailed) 480 BaseStreamManager.makeConnection(self, xs) 481 482 483 def initializationFailed(self, reason): 484 """ 485 Called when stream initialization has failed. 486 487 Stream initialization has halted, with the reason indicated by 488 C{reason}. It may be retried by calling the authenticator's 489 C{initializeStream}. See the respective authenticators for details. 490 491 @param reason: A failure instance indicating why stream initialization 492 failed. 493 @type reason: L{failure.Failure} 494 """ 495 496 497 498 class ServerStreamManager(BaseStreamManager): 499 """ 500 Business logic representing a managed server-side XMPP connection. 501 """ 502 503 def __init__(self, reactor=None): 504 BaseStreamManager.__init__(self, reactor=reactor) 505 506 507 467 508 class IQHandlerMixin(object): 468 509 """ 469 510 XMPP subprotocol mixin for handle incoming IQ stanzas. -
wokkel/test/helpers.py
diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
a b 108 108 factory = DummyFactory() 109 109 StreamManager.__init__(self, factory, reactor) 110 110 self.stub = XmlStreamStub() 111 self. _connected(self.stub.xmlstream)112 self. _authd(self.stub.xmlstream)111 self.makeConnection(self.stub.xmlstream) 112 self.connectionInitialized(self.stub.xmlstream) -
wokkel/test/test_client.py
diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
a b 38 38 """ 39 39 xs = self.client.factory.buildProtocol(None) 40 40 self.client.factory.authenticator.jid = JID('user@example.org/test') 41 xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)41 self.client.connectionInitialized(xs) 42 42 self.assertEquals(JID('user@example.org/test'), self.client.jid) 43 43 44 44 -
wokkel/test/test_component.py
diff --git a/wokkel/test/test_component.py b/wokkel/test/test_component.py
a b 49 49 def __init__(self, *args, **kwargs): 50 50 component.Component.__init__(self, *args, **kwargs) 51 51 self.factory.clock = Clock() 52 self.output = [] 52 53 53 54 54 55 def _getConnection(self): 55 56 c = FakeConnector(self.factory, None, None) 56 57 c.connect() 58 xs = self.factory.buildProtocol(None) 59 xs.send = self.output.append 60 xs.connectionMade() 61 self.makeConnection(xs) 62 xs.thisEntity = xs.otherEntity 63 xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT) 57 64 return c 58 65 59 66 … … 104 111 self.assertEqual(1, connector.connects) 105 112 106 113 114 def test_stampFrom(self): 115 """ 116 Outgoing elements with missing sender address get component JID. 117 """ 118 comp = TestableComponent('example.org', 5347, 119 'test.example.org', 'secret') 120 comp.startService() 121 122 element = domish.Element((component.NS_COMPONENT_ACCEPT, "message")) 123 element["to"] = "test@example.org" 124 comp.xmlstream.send(element) 125 126 self.assertEqual('test.example.org', element.getAttribute("from")) 127 128 107 129 108 130 class InternalComponentTest(unittest.TestCase): 109 131 """ -
wokkel/test/test_subprotocols.py
diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
a b 189 189 190 190 191 191 192 class StreamManagerTest(unittest.TestCase): 193 """ 194 Tests for L{subprotocols.StreamManager}. 195 """ 196 197 def setUp(self): 198 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 199 self.clock = task.Clock() 200 self.streamManager = subprotocols.StreamManager(factory, self.clock) 201 self.xmlstream = factory.buildProtocol(None) 202 self.transport = proto_helpers.StringTransport() 203 self.xmlstream.transport = self.transport 204 205 self.request = IQGetStanza() 192 class BaseStreamManagerTestsMixin(object): 206 193 207 194 def _streamStarted(self): 208 195 """ … … 216 203 self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd") 217 204 218 205 219 def test_ basic(self):206 def test_interface(self): 220 207 """ 221 Test correct initialization and setup of factory observers.208 ServerStreamManager implements IXMPPHandlerCollection. 222 209 """ 223 factory = DummyFactory() 224 sm = subprotocols.StreamManager(factory) 225 self.assertIdentical(None, sm.xmlstream) 226 self.assertEquals([], sm.handlers) 227 self.assertEquals(sm._connected, 228 sm.factory.callbacks['//event/stream/connected']) 229 self.assertEquals(sm._authd, 230 sm.factory.callbacks['//event/stream/authd']) 231 self.assertEquals(sm._disconnected, 232 sm.factory.callbacks['//event/stream/end']) 233 self.assertEquals(sm.initializationFailed, 234 sm.factory.callbacks['//event/xmpp/initfailed']) 210 verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager) 235 211 236 212 237 def test_ connected(self):213 def test_makeConnection(self): 238 214 """ 239 215 Test that protocol handlers have their connectionMade method called 240 216 when the XML stream is connected. … … 242 218 sm = self.streamManager 243 219 handler = DummyXMPPHandler() 244 220 handler.setHandlerParent(sm) 245 xs = xmlstream.XmlStream(xmlstream.Authenticator())246 sm. _connected(xs)221 xs = self.xmlstream 222 sm.makeConnection(xs) 247 223 self.assertEquals(1, handler.doneMade) 248 224 self.assertEquals(0, handler.doneInitialized) 249 225 self.assertEquals(0, handler.doneLost) 250 226 251 227 252 def test_ connectedLogTrafficFalse(self):228 def test_makeConnectionLogTrafficFalse(self): 253 229 """ 254 230 Test raw data functions unset when logTraffic is set to False. 255 231 """ 256 232 sm = self.streamManager 257 233 handler = DummyXMPPHandler() 258 234 handler.setHandlerParent(sm) 259 xs = xmlstream.XmlStream(xmlstream.Authenticator())260 sm. _connected(xs)235 xs = self.xmlstream 236 sm.makeConnection(xs) 261 237 self.assertIdentical(None, xs.rawDataInFn) 262 238 self.assertIdentical(None, xs.rawDataOutFn) 263 239 264 240 265 def test_ connectedLogTrafficTrue(self):241 def test_makeConnectionLogTrafficTrue(self): 266 242 """ 267 243 Test raw data functions set when logTraffic is set to True. 268 244 """ … … 270 246 sm.logTraffic = True 271 247 handler = DummyXMPPHandler() 272 248 handler.setHandlerParent(sm) 273 xs = xmlstream.XmlStream(xmlstream.Authenticator())274 sm. _connected(xs)249 xs = self.xmlstream 250 sm.makeConnection(xs) 275 251 self.assertNotIdentical(None, xs.rawDataInFn) 276 252 self.assertNotIdentical(None, xs.rawDataOutFn) 277 253 278 254 279 def test_ authd(self):255 def test_connectionInitialized(self): 280 256 """ 281 257 Test that protocol handlers have their connectionInitialized method 282 258 called when the XML stream is initialized. … … 284 260 sm = self.streamManager 285 261 handler = DummyXMPPHandler() 286 262 handler.setHandlerParent(sm) 287 xs = xmlstream.XmlStream(xmlstream.Authenticator())288 sm. _authd(xs)263 xs = self.xmlstream 264 sm.connectionInitialized(xs) 289 265 self.assertEquals(0, handler.doneMade) 290 266 self.assertEquals(1, handler.doneInitialized) 291 267 self.assertEquals(0, handler.doneLost) 292 268 293 269 294 def test_ disconnected(self):270 def test_connectionLost(self): 295 271 """ 296 272 Protocol handlers have connectionLost called on stream disconnect. 297 273 """ 298 274 sm = self.streamManager 299 275 handler = DummyXMPPHandler() 300 276 handler.setHandlerParent(sm) 301 sm. _disconnected(None)277 sm.connectionLost(None) 302 278 self.assertEquals(0, handler.doneMade) 303 279 self.assertEquals(0, handler.doneInitialized) 304 280 self.assertEquals(1, handler.doneLost) 305 281 306 282 307 def test_ disconnectedReason(self):283 def test_connectionLostReason(self): 308 284 """ 309 285 A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers 310 286 L{connectionLost} methods, passing a L{failure.Failure} reason. … … 313 289 handler = FailureReasonXMPPHandler() 314 290 handler.setHandlerParent(sm) 315 291 xmlstream.XmlStream(xmlstream.Authenticator()) 316 sm. _disconnected(failure.Failure(Exception("no reason")))292 sm.connectionLost(failure.Failure(Exception("no reason"))) 317 293 self.assertEquals(True, handler.gotFailureReason) 318 294 319 295 … … 335 311 Adding a handler when connected doesn't call connectionInitialized. 336 312 """ 337 313 sm = self.streamManager 338 xs = xmlstream.XmlStream(xmlstream.Authenticator())339 sm. _connected(xs)314 xs = self.xmlstream 315 sm.makeConnection(xs) 340 316 handler = DummyXMPPHandler() 341 317 handler.setHandlerParent(sm) 342 318 … … 358 334 self.nestedHandler.setHandlerParent(self.parent) 359 335 360 336 sm = self.streamManager 361 xs = xmlstream.XmlStream(xmlstream.Authenticator())337 xs = self.xmlstream 362 338 handler = NestingHandler() 363 339 handler.setHandlerParent(sm) 364 sm. _connected(xs)340 sm.makeConnection(xs) 365 341 366 342 self.assertEquals(1, handler.doneMade) 367 343 self.assertEquals(0, handler.doneInitialized) … … 383 359 called. 384 360 """ 385 361 sm = self.streamManager 386 xs = xmlstream.XmlStream(xmlstream.Authenticator())387 sm. _connected(xs)388 sm. _authd(xs)362 xs = self.xmlstream 363 sm.makeConnection(xs) 364 sm.connectionInitialized(xs) 389 365 handler = DummyXMPPHandler() 390 366 handler.setHandlerParent(sm) 391 367 … … 407 383 self.nestedHandler.setHandlerParent(self.parent) 408 384 409 385 sm = self.streamManager 410 xs = xmlstream.XmlStream(xmlstream.Authenticator())386 xs = self.xmlstream 411 387 handler = NestingHandler() 412 388 handler.setHandlerParent(sm) 413 sm. _connected(xs)414 sm. _authd(xs)389 sm.makeConnection(xs) 390 sm.connectionInitialized(xs) 415 391 416 392 self.assertEquals(1, handler.doneMade) 417 393 self.assertEquals(1, handler.doneInitialized) … … 435 411 self.nestedHandler.setHandlerParent(self.parent) 436 412 437 413 sm = self.streamManager 438 xs = xmlstream.XmlStream(xmlstream.Authenticator())414 xs = self.xmlstream 439 415 handler = NestingHandler() 440 416 handler.setHandlerParent(sm) 441 sm. _connected(xs)442 sm. _authd(xs)443 sm. _disconnected(xs)417 sm.makeConnection(xs) 418 sm.connectionInitialized(xs) 419 sm.connectionLost(xs) 444 420 445 421 self.assertEquals(1, handler.doneMade) 446 422 self.assertEquals(1, handler.doneInitialized) … … 470 446 471 447 The data should be sent directly over the XML stream. 472 448 """ 473 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 474 sm = subprotocols.StreamManager(factory) 475 xs = factory.buildProtocol(None) 476 xs.transport = proto_helpers.StringTransport() 449 xs = self.xmlstream 477 450 xs.connectionMade() 478 451 xs.dataReceived("<stream:stream xmlns='jabber:client' " 479 452 "xmlns:stream='http://etherx.jabber.org/streams' " 480 453 "from='example.com' id='12345'>") 481 454 xs.dispatch(xs, "//event/stream/authd") 482 s m.send("<presence/>")455 self.streamManager.send("<presence/>") 483 456 self.assertEquals("<presence/>", xs.transport.value()) 484 457 485 458 … … 490 463 The data should be cached until an XML stream has been established and 491 464 initialized. 492 465 """ 493 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 494 sm = subprotocols.StreamManager(factory) 466 sm = self.streamManager 495 467 handler = DummyXMPPHandler() 496 468 sm.addHandler(handler) 497 469 498 xs = factory.buildProtocol(None)470 xs = self.xmlstream 499 471 xs.transport = proto_helpers.StringTransport() 500 472 sm.send("<presence/>") 501 473 self.assertEquals("", xs.transport.value()) … … 522 494 """ 523 495 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 524 496 sm = subprotocols.StreamManager(factory) 525 xs = factory.buildProtocol(None)497 xs = self.xmlstream 526 498 xs.transport = proto_helpers.StringTransport() 527 499 xs.connectionMade() 528 500 xs.dataReceived("<stream:stream xmlns='jabber:client' " … … 545 517 handler = DummyXMPPHandler() 546 518 sm.addHandler(handler) 547 519 548 xs = factory.buildProtocol(None)520 xs = self.xmlstream 549 521 xs.connectionMade() 550 522 xs.transport = proto_helpers.StringTransport() 551 523 xs.connectionLost(None) … … 677 649 """ 678 650 d = self.streamManager.request(self.request) 679 651 xs = self.xmlstream 652 xs.connectionMade() 680 653 xs.connectionLost(failure.Failure(ConnectionDone())) 681 654 self.assertFailure(d, ConnectionDone) 682 655 return d … … 692 665 d = xmlstream.IQ(self.xmlstream).send() 693 666 d.addErrback(eb) 694 667 668 self.xmlstream.connectionMade() 695 669 d = self.streamManager.request(self.request) 696 670 d.addErrback(eb) 697 671 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) … … 736 710 self.request.timeout = 60 737 711 d = self.streamManager.request(self.request) 738 712 713 self.xmlstream.connectionMade() 739 714 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) 740 715 self.assertFailure(d, ConnectionDone) 741 716 self.assertFalse(self.clock.calls) … … 778 753 779 754 780 755 756 class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 757 """ 758 Tests for L{subprotocols.StreamManager}. 759 """ 760 761 def setUp(self): 762 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 763 self.clock = task.Clock() 764 self.streamManager = subprotocols.StreamManager(factory, self.clock) 765 self.xmlstream = factory.buildProtocol(None) 766 self.transport = proto_helpers.StringTransport() 767 self.xmlstream.transport = self.transport 768 769 self.request = IQGetStanza() 770 771 772 def test_basic(self): 773 """ 774 Test correct initialization and setup of factory observers. 775 """ 776 factory = DummyFactory() 777 sm = subprotocols.StreamManager(factory) 778 self.assertIdentical(None, sm.xmlstream) 779 self.assertEquals([], sm.handlers) 780 self.assertEquals(sm.makeConnection, 781 sm.factory.callbacks['//event/stream/connected']) 782 783 784 785 class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 786 """ 787 Tests for L{subprotocols.StreamManager}. 788 """ 789 790 def setUp(self): 791 self.clock = task.Clock() 792 self.streamManager = subprotocols.ServerStreamManager( 793 reactor=self.clock) 794 795 self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator()) 796 self.transport = proto_helpers.StringTransport() 797 self.xmlstream.transport = self.transport 798 799 self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT, 800 self.streamManager.makeConnection) 801 802 self.request = IQGetStanza() 803 804 805 781 806 class DummyIQHandler(subprotocols.IQHandlerMixin): 782 807 iqHandlers = {'/iq[@type="get"]': 'onGet'} 783 808
Note: See TracBrowser
for help on using the repository browser.