source:
ralphm-patches/server-stream-manager.patch
@
72:727b4d29c48e
Last change on this file since 72:727b4d29c48e was 72:727b4d29c48e, checked in by Ralph Meijer <ralphm@…>, 10 years ago | |
---|---|
File size: 19.3 KB |
-
wokkel/client.py
# HG changeset patch # Parent 3f3fe954b1975c2d9115e0fa8177ae7b28a708a8 Generalize StreamManager and add ServerStreamManager. This generalizes `StreamManager` to `BaseStreamManager` to take the common functionality and reuse it for `ServerStreamManager`. Where `StreamManager` is used for initiating connections, with a factory as a parameter, `ServerStreamManager` works for receiving connections. It must be created in a protocol factory when a connection is established, and then its `makeConnection` should be called to hook it up to the `XmlStream` instance. diff --git a/wokkel/client.py b/wokkel/client.py
a b 109 109 self._connection.disconnect() 110 110 111 111 112 def _authd(self, xs):112 def connectionInitialized(self, xs): 113 113 """ 114 114 Called when the stream has been initialized. 115 115 … … 118 118 by its constituent initializers. 119 119 """ 120 120 self.jid = self.factory.authenticator.jid 121 StreamManager. _authd(self, xs)121 StreamManager.connectionInitialized(self, xs) 122 122 123 123 124 124 def initializationFailed(self, reason): -
wokkel/subprotocols.py
diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
a b 118 118 119 119 120 120 121 class StreamManager(XMPPHandlerCollection):121 class BaseStreamManager(XMPPHandlerCollection): 122 122 """ 123 123 Business logic representing a managed XMPP connection. 124 124 … … 129 129 130 130 @ivar xmlstream: currently managed XML stream 131 131 @type xmlstream: L{XmlStream} 132 132 133 @ivar logTraffic: if true, log all traffic. 133 134 @type logTraffic: C{bool} 135 134 136 @ivar _initialized: Whether the stream represented by L{xmlstream} has 135 137 been initialized. This is used when caching outgoing 136 138 stanzas. 137 139 @type _initialized: C{bool} 138 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 139 @type _packetQueue: L{list} 140 140 141 @ivar timeout: Default IQ request timeout in seconds. 141 142 @type timeout: C{int} 143 142 144 @ivar _reactor: A provider of L{IReactorTime} to track timeouts. 143 145 """ 146 144 147 timeout = None 145 148 _reactor = None 146 149 147 150 logTraffic = False 148 151 149 def __init__(self, factory,reactor=None):152 def __init__(self, reactor=None): 150 153 """ 151 154 Construct a stream manager. 152 155 153 @param factory: The stream factory to connect with.154 156 @param reactor: A provider of L{IReactorTime} to track timeouts. 155 157 If not provided, the global reactor will be used. 156 158 """ 157 159 XMPPHandlerCollection.__init__(self) 160 158 161 self.xmlstream = None 159 162 self._packetQueue = [] 160 163 self._initialized = False 161 164 162 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)163 factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)164 factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,165 self.initializationFailed)166 factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)167 self.factory = factory168 169 165 if reactor is None: 170 166 from twisted.internet import reactor 171 167 self._reactor = reactor … … 191 187 handler.connectionInitialized() 192 188 193 189 194 def _connected(self, xs):190 def makeConnection(self, xs): 195 191 """ 196 192 Called when the transport connection has been established. 197 193 … … 209 205 xs.rawDataInFn = logDataIn 210 206 xs.rawDataOutFn = logDataOut 211 207 208 xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, 209 self.connectionInitialized) 210 xs.addObserver(xmlstream.STREAM_END_EVENT, 211 self.connectionLost) 212 212 self.xmlstream = xs 213 213 214 214 for e in list(self): 215 215 e.makeConnection(xs) 216 216 217 217 218 def _authd(self, xs):218 def connectionInitialized(self, xs): 219 219 """ 220 220 Called when the stream has been initialized. 221 221 … … 238 238 e.connectionInitialized() 239 239 240 240 241 def initializationFailed(self, reason):242 """243 Called when stream initialization has failed.244 241 245 Stream initialization has halted, with the reason indicated by 246 C{reason}. It may be retried by calling the authenticator's 247 C{initializeStream}. See the respective authenticators for details. 248 249 @param reason: A failure instance indicating why stream initialization 250 failed. 251 @type reason: L{failure.Failure} 252 """ 253 254 255 def _disconnected(self, reason): 242 def connectionLost(self, reason): 256 243 """ 257 244 Called when the stream has been closed. 258 245 … … 379 366 380 367 381 368 369 class StreamManager(BaseStreamManager): 370 """ 371 Business logic representing a managed XMPP client connection. 372 373 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 374 @type _packetQueue: L{list} 375 """ 376 377 def __init__(self, factory, reactor=None): 378 """ 379 Construct a stream manager. 380 381 @param factory: The stream factory to connect with. 382 @param reactor: A provider of L{IReactorTime} to track timeouts. 383 If not provided, the global reactor will be used. 384 """ 385 BaseStreamManager.__init__(self, reactor=reactor) 386 387 factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 388 self.makeConnection) 389 self.factory = factory 390 391 392 def makeConnection(self, xs): 393 xs.addObserver(xmlstream.INIT_FAILED_EVENT, 394 self.initializationFailed) 395 BaseStreamManager.makeConnection(self, xs) 396 397 398 def initializationFailed(self, reason): 399 """ 400 Called when stream initialization has failed. 401 402 Stream initialization has halted, with the reason indicated by 403 C{reason}. It may be retried by calling the authenticator's 404 C{initializeStream}. See the respective authenticators for details. 405 406 @param reason: A failure instance indicating why stream initialization 407 failed. 408 @type reason: L{failure.Failure} 409 """ 410 411 412 413 class ServerStreamManager(BaseStreamManager): 414 """ 415 Business logic representing a managed server-side XMPP connection. 416 """ 417 418 def __init__(self, reactor=None): 419 BaseStreamManager.__init__(self, reactor=reactor) 420 421 422 382 423 class IQHandlerMixin(object): 383 424 """ 384 425 XMPP subprotocol mixin for handle incoming IQ stanzas. -
wokkel/test/helpers.py
diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
a b 108 108 factory = DummyFactory() 109 109 StreamManager.__init__(self, factory, reactor) 110 110 self.stub = XmlStreamStub() 111 self. _connected(self.stub.xmlstream)112 self. _authd(self.stub.xmlstream)111 self.makeConnection(self.stub.xmlstream) 112 self.connectionInitialized(self.stub.xmlstream) -
wokkel/test/test_client.py
diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
a b 38 38 """ 39 39 xs = self.client.factory.buildProtocol(None) 40 40 self.client.factory.authenticator.jid = JID('user@example.org/test') 41 xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)41 self.client.connectionInitialized(xs) 42 42 self.assertEquals(JID('user@example.org/test'), self.client.jid) 43 43 44 44 -
wokkel/test/test_subprotocols.py
diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
a b 189 189 190 190 191 191 192 class StreamManagerTest(unittest.TestCase): 193 """ 194 Tests for L{subprotocols.StreamManager}. 195 """ 196 197 def setUp(self): 198 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 199 self.clock = task.Clock() 200 self.streamManager = subprotocols.StreamManager(factory, self.clock) 201 self.xmlstream = factory.buildProtocol(None) 202 self.transport = proto_helpers.StringTransport() 203 self.xmlstream.transport = self.transport 204 205 self.request = IQGetStanza() 192 class BaseStreamManagerTestsMixin(object): 206 193 207 194 def _streamStarted(self): 208 195 """ … … 216 203 self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd") 217 204 218 205 219 def test_ basic(self):206 def test_interface(self): 220 207 """ 221 Test correct initialization and setup of factory observers.208 ServerStreamManager implements IXMPPHandlerCollection. 222 209 """ 223 factory = DummyFactory() 224 sm = subprotocols.StreamManager(factory) 225 self.assertIdentical(None, sm.xmlstream) 226 self.assertEquals([], sm.handlers) 227 self.assertEquals(sm._connected, 228 sm.factory.callbacks['//event/stream/connected']) 229 self.assertEquals(sm._authd, 230 sm.factory.callbacks['//event/stream/authd']) 231 self.assertEquals(sm._disconnected, 232 sm.factory.callbacks['//event/stream/end']) 233 self.assertEquals(sm.initializationFailed, 234 sm.factory.callbacks['//event/xmpp/initfailed']) 210 verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager) 235 211 236 212 237 def test_ connected(self):213 def test_makeConnection(self): 238 214 """ 239 215 Test that protocol handlers have their connectionMade method called 240 216 when the XML stream is connected. … … 242 218 sm = self.streamManager 243 219 handler = DummyXMPPHandler() 244 220 handler.setHandlerParent(sm) 245 xs = xmlstream.XmlStream(xmlstream.Authenticator())246 sm. _connected(xs)221 xs = self.xmlstream 222 sm.makeConnection(xs) 247 223 self.assertEquals(1, handler.doneMade) 248 224 self.assertEquals(0, handler.doneInitialized) 249 225 self.assertEquals(0, handler.doneLost) 250 226 251 227 252 def test_ connectedLogTrafficFalse(self):228 def test_makeConnectionLogTrafficFalse(self): 253 229 """ 254 230 Test raw data functions unset when logTraffic is set to False. 255 231 """ 256 232 sm = self.streamManager 257 233 handler = DummyXMPPHandler() 258 234 handler.setHandlerParent(sm) 259 xs = xmlstream.XmlStream(xmlstream.Authenticator())260 sm. _connected(xs)235 xs = self.xmlstream 236 sm.makeConnection(xs) 261 237 self.assertIdentical(None, xs.rawDataInFn) 262 238 self.assertIdentical(None, xs.rawDataOutFn) 263 239 264 240 265 def test_ connectedLogTrafficTrue(self):241 def test_makeConnectionLogTrafficTrue(self): 266 242 """ 267 243 Test raw data functions set when logTraffic is set to True. 268 244 """ … … 270 246 sm.logTraffic = True 271 247 handler = DummyXMPPHandler() 272 248 handler.setHandlerParent(sm) 273 xs = xmlstream.XmlStream(xmlstream.Authenticator())274 sm. _connected(xs)249 xs = self.xmlstream 250 sm.makeConnection(xs) 275 251 self.assertNotIdentical(None, xs.rawDataInFn) 276 252 self.assertNotIdentical(None, xs.rawDataOutFn) 277 253 278 254 279 def test_ authd(self):255 def test_connectionInitialized(self): 280 256 """ 281 257 Test that protocol handlers have their connectionInitialized method 282 258 called when the XML stream is initialized. … … 284 260 sm = self.streamManager 285 261 handler = DummyXMPPHandler() 286 262 handler.setHandlerParent(sm) 287 xs = xmlstream.XmlStream(xmlstream.Authenticator())288 sm. _authd(xs)263 xs = self.xmlstream 264 sm.connectionInitialized(xs) 289 265 self.assertEquals(0, handler.doneMade) 290 266 self.assertEquals(1, handler.doneInitialized) 291 267 self.assertEquals(0, handler.doneLost) 292 268 293 269 294 def test_ disconnected(self):270 def test_connectionLost(self): 295 271 """ 296 272 Protocol handlers have connectionLost called on stream disconnect. 297 273 """ 298 274 sm = self.streamManager 299 275 handler = DummyXMPPHandler() 300 276 handler.setHandlerParent(sm) 301 sm. _disconnected(None)277 sm.connectionLost(None) 302 278 self.assertEquals(0, handler.doneMade) 303 279 self.assertEquals(0, handler.doneInitialized) 304 280 self.assertEquals(1, handler.doneLost) 305 281 306 282 307 def test_ disconnectedReason(self):283 def test_connectionLostReason(self): 308 284 """ 309 285 A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers 310 286 L{connectionLost} methods, passing a L{failure.Failure} reason. … … 313 289 handler = FailureReasonXMPPHandler() 314 290 handler.setHandlerParent(sm) 315 291 xmlstream.XmlStream(xmlstream.Authenticator()) 316 sm. _disconnected(failure.Failure(Exception("no reason")))292 sm.connectionLost(failure.Failure(Exception("no reason"))) 317 293 self.assertEquals(True, handler.gotFailureReason) 318 294 319 295 … … 335 311 Adding a handler when connected doesn't call connectionInitialized. 336 312 """ 337 313 sm = self.streamManager 338 xs = xmlstream.XmlStream(xmlstream.Authenticator())339 sm. _connected(xs)314 xs = self.xmlstream 315 sm.makeConnection(xs) 340 316 handler = DummyXMPPHandler() 341 317 handler.setHandlerParent(sm) 342 318 … … 358 334 self.nestedHandler.setHandlerParent(self.parent) 359 335 360 336 sm = self.streamManager 361 xs = xmlstream.XmlStream(xmlstream.Authenticator())337 xs = self.xmlstream 362 338 handler = NestingHandler() 363 339 handler.setHandlerParent(sm) 364 sm. _connected(xs)340 sm.makeConnection(xs) 365 341 366 342 self.assertEquals(1, handler.doneMade) 367 343 self.assertEquals(0, handler.doneInitialized) … … 383 359 called. 384 360 """ 385 361 sm = self.streamManager 386 xs = xmlstream.XmlStream(xmlstream.Authenticator())387 sm. _connected(xs)388 sm. _authd(xs)362 xs = self.xmlstream 363 sm.makeConnection(xs) 364 sm.connectionInitialized(xs) 389 365 handler = DummyXMPPHandler() 390 366 handler.setHandlerParent(sm) 391 367 … … 407 383 self.nestedHandler.setHandlerParent(self.parent) 408 384 409 385 sm = self.streamManager 410 xs = xmlstream.XmlStream(xmlstream.Authenticator())386 xs = self.xmlstream 411 387 handler = NestingHandler() 412 388 handler.setHandlerParent(sm) 413 sm. _connected(xs)414 sm. _authd(xs)389 sm.makeConnection(xs) 390 sm.connectionInitialized(xs) 415 391 416 392 self.assertEquals(1, handler.doneMade) 417 393 self.assertEquals(1, handler.doneInitialized) … … 435 411 self.nestedHandler.setHandlerParent(self.parent) 436 412 437 413 sm = self.streamManager 438 xs = xmlstream.XmlStream(xmlstream.Authenticator())414 xs = self.xmlstream 439 415 handler = NestingHandler() 440 416 handler.setHandlerParent(sm) 441 sm. _connected(xs)442 sm. _authd(xs)443 sm. _disconnected(xs)417 sm.makeConnection(xs) 418 sm.connectionInitialized(xs) 419 sm.connectionLost(xs) 444 420 445 421 self.assertEquals(1, handler.doneMade) 446 422 self.assertEquals(1, handler.doneInitialized) … … 470 446 471 447 The data should be sent directly over the XML stream. 472 448 """ 473 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 474 sm = subprotocols.StreamManager(factory) 475 xs = factory.buildProtocol(None) 476 xs.transport = proto_helpers.StringTransport() 449 xs = self.xmlstream 477 450 xs.connectionMade() 478 451 xs.dataReceived("<stream:stream xmlns='jabber:client' " 479 452 "xmlns:stream='http://etherx.jabber.org/streams' " 480 453 "from='example.com' id='12345'>") 481 454 xs.dispatch(xs, "//event/stream/authd") 482 s m.send("<presence/>")455 self.streamManager.send("<presence/>") 483 456 self.assertEquals("<presence/>", xs.transport.value()) 484 457 485 458 … … 490 463 The data should be cached until an XML stream has been established and 491 464 initialized. 492 465 """ 493 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 494 sm = subprotocols.StreamManager(factory) 466 sm = self.streamManager 495 467 handler = DummyXMPPHandler() 496 468 sm.addHandler(handler) 497 469 498 xs = factory.buildProtocol(None)470 xs = self.xmlstream 499 471 xs.transport = proto_helpers.StringTransport() 500 472 sm.send("<presence/>") 501 473 self.assertEquals("", xs.transport.value()) … … 522 494 """ 523 495 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 524 496 sm = subprotocols.StreamManager(factory) 525 xs = factory.buildProtocol(None)497 xs = self.xmlstream 526 498 xs.transport = proto_helpers.StringTransport() 527 499 xs.connectionMade() 528 500 xs.dataReceived("<stream:stream xmlns='jabber:client' " … … 545 517 handler = DummyXMPPHandler() 546 518 sm.addHandler(handler) 547 519 548 xs = factory.buildProtocol(None)520 xs = self.xmlstream 549 521 xs.connectionMade() 550 522 xs.transport = proto_helpers.StringTransport() 551 523 xs.connectionLost(None) … … 677 649 """ 678 650 d = self.streamManager.request(self.request) 679 651 xs = self.xmlstream 652 xs.connectionMade() 680 653 xs.connectionLost(failure.Failure(ConnectionDone())) 681 654 self.assertFailure(d, ConnectionDone) 682 655 return d … … 692 665 d = xmlstream.IQ(self.xmlstream).send() 693 666 d.addErrback(eb) 694 667 668 self.xmlstream.connectionMade() 695 669 d = self.streamManager.request(self.request) 696 670 d.addErrback(eb) 697 671 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) … … 736 710 self.request.timeout = 60 737 711 d = self.streamManager.request(self.request) 738 712 713 self.xmlstream.connectionMade() 739 714 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) 740 715 self.assertFailure(d, ConnectionDone) 741 716 self.assertFalse(self.clock.calls) … … 778 753 779 754 780 755 756 class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 757 """ 758 Tests for L{subprotocols.StreamManager}. 759 """ 760 761 def setUp(self): 762 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 763 self.clock = task.Clock() 764 self.streamManager = subprotocols.StreamManager(factory, self.clock) 765 self.xmlstream = factory.buildProtocol(None) 766 self.transport = proto_helpers.StringTransport() 767 self.xmlstream.transport = self.transport 768 769 self.request = IQGetStanza() 770 771 772 def test_basic(self): 773 """ 774 Test correct initialization and setup of factory observers. 775 """ 776 factory = DummyFactory() 777 sm = subprotocols.StreamManager(factory) 778 self.assertIdentical(None, sm.xmlstream) 779 self.assertEquals([], sm.handlers) 780 self.assertEquals(sm.makeConnection, 781 sm.factory.callbacks['//event/stream/connected']) 782 783 784 785 class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin): 786 """ 787 Tests for L{subprotocols.StreamManager}. 788 """ 789 790 def setUp(self): 791 self.clock = task.Clock() 792 self.streamManager = subprotocols.ServerStreamManager( 793 reactor=self.clock) 794 795 self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator()) 796 self.transport = proto_helpers.StringTransport() 797 self.xmlstream.transport = self.transport 798 799 self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT, 800 self.streamManager.makeConnection) 801 802 self.request = IQGetStanza() 803 804 805 781 806 class DummyIQHandler(subprotocols.IQHandlerMixin): 782 807 iqHandlers = {'/iq[@type="get"]': 'onGet'} 783 808
Note: See TracBrowser
for help on using the repository browser.