source:
ralphm-patches/request-tracking.patch
@
53:fd2b3f70b221
Last change on this file since 53:fd2b3f70b221 was 53:fd2b3f70b221, checked in by Ralph Meijer <ralphm@…>, 9 years ago | |
---|---|
File size: 15.7 KB |
-
wokkel/subprotocols.py
# HG changeset patch # Parent 6f36a7f92973d3843a42b4c80791715e694b66a3 Add iq request (set/get) tracking for StreamManager. This adds a new `request` method to StreamManager to serialize an iq get or iq set request object (using toElement), send it over the stream (or queue it until the next time a stream is initialized) and track the response stanza by returning a Deferred. Requests may have a timeout, so that the deferred errbacks after the timeout (counting from the moment of calling `request`) has expired. This implementation is similar to the response tracking done in `twisted.words.protocols.jabber.xmlstream.IQ`, but slighly different: * Requests can be queued until the connection is initialized, even before a XmlStream object is available. I.e. it is tied to the StreamManager, not the stream. * Response timeout starts counting from the moment of calling `request`, not from the moment request is sent over the wire. diff -r 6f36a7f92973 wokkel/subprotocols.py
a b 156 156 @type _initialized: C{bool} 157 157 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 158 158 @type _packetQueue: L{list} 159 @ivar timeout: Default IQ request timeout in seconds. 160 @type timeout: C{int} 161 @ivar _reactor: A provider of L{IReactorTime} to track timeouts. 159 162 """ 163 timeout = None 164 _reactor = None 160 165 161 166 logTraffic = False 162 167 163 def __init__(self, factory): 168 def __init__(self, factory, reactor=None): 169 """ 170 Construct a stream manager. 171 172 @param factory: The stream factory to connect with. 173 @param reactor: A provider of L{IReactorTime} to track timeouts. 174 If not provided, the global reactor will be used. 175 """ 164 176 XMPPHandlerCollection.__init__(self) 165 177 self.xmlstream = None 166 178 self._packetQueue = [] … … 173 185 factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected) 174 186 self.factory = factory 175 187 188 if reactor is None: 189 from twisted.internet import reactor 190 self._reactor = reactor 191 192 # Set up IQ response tracking 193 self._iqDeferreds = {} 194 176 195 177 196 def addHandler(self, handler): 178 197 """ … … 222 241 Send out cached stanzas and call each handler's 223 242 C{connectionInitialized} method. 224 243 """ 244 245 xs.addObserver('/iq[@type="result"]', self._onIQResponse) 246 xs.addObserver('/iq[@type="error"]', self._onIQResponse) 247 225 248 # Flush all pending packets 226 249 for p in self._packetQueue: 227 250 xs.send(p) … … 268 291 for e in list(self): 269 292 e.connectionLost(reason) 270 293 294 # This errbacks all deferreds of iq's for which no response has 295 # been received with a L{ConnectionLost} failure. Otherwise, the 296 # deferreds will never be fired. 297 iqDeferreds = self._iqDeferreds 298 self._iqDeferreds = {} 299 for d in iqDeferreds.itervalues(): 300 d.errback(reason) 301 302 303 def _onIQResponse(self, iq): 304 """ 305 Handle iq response by firing associated deferred. 306 """ 307 try: 308 d = self._iqDeferreds[iq["id"]] 309 except KeyError: 310 return 311 312 del self._iqDeferreds[iq["id"]] 313 iq.handled = True 314 if iq['type'] == 'error': 315 d.errback(error.exceptionFromStanza(iq)) 316 else: 317 d.callback(iq) 318 271 319 272 320 def send(self, obj): 273 321 """ … … 285 333 self._packetQueue.append(obj) 286 334 287 335 336 def request(self, request): 337 """ 338 Send an IQ request and track the response. 339 340 A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It 341 will have its C{toElement} called to render to a L{domish.Element} 342 which is then sent out over the current stream. If there is no such 343 stream (yet), it is queued and sent whenever a connection is 344 established and initialized, just like L{send}. 345 346 If the request doesn't have an identifier, it will be assigned a fresh 347 one, so the response can be tracked. 348 349 The deferred that is returned will fire with the L{domish.Element} 350 representation of the response if it is a result iq. If the response 351 is an error iq, a corresponding L{error.StanzaError} will be errbacked. 352 353 If the connection is closed before a response was received, the deferred 354 will be errbacked with the reason failure. 355 356 A request may also have a timeout, either by setting a default timeout 357 in L{StreamManager.timeout} or on the C{timeout} attribute of the 358 request. 359 360 @param request: The IQ request. 361 @type request: L{generic.Request} 362 """ 363 if (request.stanzaKind != 'iq' or 364 request.stanzaType not in ('get', 'set')): 365 return defer.fail(ValueError("Not a request")) 366 367 element = request.toElement() 368 369 # Make sure we have a trackable id on the stanza 370 if not request.stanzaID: 371 element.addUniqueId() 372 request.stanzaID = element['id'] 373 374 # Set up iq response tracking 375 d = defer.Deferred() 376 self._iqDeferreds[element['id']] = d 377 378 timeout = getattr(request, 'timeout', self.timeout) 379 380 if timeout is not None: 381 def onTimeout(): 382 del self._iqDeferreds[element['id']] 383 d.errback(xmlstream.TimeoutError("IQ timed out")) 384 385 call = self._reactor.callLater(timeout, onTimeout) 386 387 def cancelTimeout(result): 388 if call.active(): 389 call.cancel() 390 391 return result 392 393 d.addBoth(cancelTimeout) 394 self.send(element) 395 return d 396 397 288 398 289 399 class IQHandlerMixin(object): 290 400 """ -
wokkel/test/test_subprotocols.py
diff -r 6f36a7f92973 wokkel/test/test_subprotocols.py
a b 9 9 10 10 from twisted.trial import unittest 11 11 from twisted.test import proto_helpers 12 from twisted.internet import defer 12 from twisted.internet import defer, task 13 from twisted.internet.error import ConnectionDone 13 14 from twisted.python import failure 14 15 from twisted.words.xish import domish 15 16 from twisted.words.protocols.jabber import error, xmlstream 16 17 17 from wokkel import iwokkel, subprotocols18 from wokkel import generic, iwokkel, subprotocols 18 19 19 20 class DummyFactory(object): 20 21 """ … … 164 165 165 166 166 167 168 class IQGetStanza(generic.Stanza): 169 stanzaKind = 'iq' 170 stanzaType = 'get' 171 stanzaID = 'test' 172 173 174 167 175 class StreamManagerTest(unittest.TestCase): 168 176 """ 169 177 Tests for L{subprotocols.StreamManager}. 170 178 """ 171 179 172 180 def setUp(self): 173 factory = DummyFactory() 174 self.streamManager = subprotocols.StreamManager(factory) 181 factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) 182 self.clock = task.Clock() 183 self.streamManager = subprotocols.StreamManager(factory, self.clock) 184 self.xmlstream = factory.buildProtocol(None) 185 self.transport = proto_helpers.StringTransport() 186 self.xmlstream.transport = self.transport 187 188 self.request = IQGetStanza() 189 190 def _streamStarted(self): 191 """ 192 Bring the test stream to the initialized state. 193 """ 194 self.xmlstream.connectionMade() 195 self.xmlstream.dataReceived( 196 "<stream:stream xmlns='jabber:client' " 197 "xmlns:stream='http://etherx.jabber.org/streams' " 198 "from='example.com' id='12345'>") 199 self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd") 200 175 201 176 202 def test_basic(self): 177 203 """ 178 204 Test correct initialization and setup of factory observers. 179 205 """ 180 sm = self.streamManager 206 factory = DummyFactory() 207 sm = subprotocols.StreamManager(factory) 181 208 self.assertIdentical(None, sm.xmlstream) 182 209 self.assertEquals([], sm.handlers) 183 210 self.assertEquals(sm._connected, … … 407 434 self.assertEquals(0, handler.nestedHandler.doneLost) 408 435 409 436 437 410 438 def test_removeHandler(self): 411 439 """ 412 440 Test removal of protocol handler. … … 418 446 self.assertNotIn(handler, sm) 419 447 self.assertIdentical(None, handler.parent) 420 448 449 421 450 def test_sendInitialized(self): 422 451 """ 423 452 Test send when the stream has been initialized. … … 509 538 self.assertEquals("<presence/>", sm._packetQueue[0]) 510 539 511 540 541 def test_requestSendInitialized(self): 542 """ 543 A request is sent out over the wire when the stream is initialized. 544 """ 545 self._streamStarted() 546 547 self.streamManager.request(self.request) 548 expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID 549 self.assertEquals(expected, self.transport.value()) 550 551 552 def test_requestSendInitializedFreshID(self): 553 """ 554 A request without an ID gets a fresh one upon send. 555 """ 556 self._streamStarted() 557 558 self.request.stanzaID = None 559 self.streamManager.request(self.request) 560 self.assertNotIdentical(None, self.request.stanzaID) 561 expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID 562 self.assertEquals(expected, self.transport.value()) 563 564 565 def test_requestSendNotConnected(self): 566 """ 567 A request is queued until a stream is initialized. 568 """ 569 handler = DummyXMPPHandler() 570 self.streamManager.addHandler(handler) 571 572 self.streamManager.request(self.request) 573 expected = u"<iq type='get' id='test'/>" 574 575 xs = self.xmlstream 576 self.assertEquals("", xs.transport.value()) 577 578 xs.connectionMade() 579 self.assertEquals("", xs.transport.value()) 580 581 xs.dataReceived("<stream:stream xmlns='jabber:client' " 582 "xmlns:stream='http://etherx.jabber.org/streams' " 583 "from='example.com' id='12345'>") 584 xs.dispatch(xs, "//event/stream/authd") 585 586 self.assertEquals(expected, xs.transport.value()) 587 self.assertFalse(self.streamManager._packetQueue) 588 589 590 def test_requestResultResponse(self): 591 """ 592 A result response gets the request deferred fired with the response. 593 """ 594 def cb(result): 595 self.assertEquals(result['type'], 'result') 596 597 self._streamStarted() 598 d = self.streamManager.request(self.request) 599 d.addCallback(cb) 600 601 xs = self.xmlstream 602 xs.dataReceived("<iq type='result' id='test'/>") 603 return d 604 605 606 def test_requestErrorResponse(self): 607 """ 608 An error response gets the request deferred fired with a failure. 609 """ 610 self._streamStarted() 611 d = self.streamManager.request(self.request) 612 self.assertFailure(d, error.StanzaError) 613 614 xs = self.xmlstream 615 xs.dataReceived("<iq type='error' id='test'/>") 616 return d 617 618 619 def test_requestNonTrackedResponse(self): 620 """ 621 Test that untracked iq responses don't trigger any action. 622 623 Untracked means that the id of the incoming response iq is not 624 in the stream's C{iqDeferreds} dictionary. 625 """ 626 # Set up a fallback handler that checks the stanza's handled attribute. 627 # If that is set to True, the iq tracker claims to have handled the 628 # response. 629 dispatched = [] 630 def cb(iq): 631 dispatched.append(iq) 632 633 self._streamStarted() 634 self.xmlstream.addObserver("/iq", cb, -1) 635 636 # Receive an untracked iq response 637 self.xmlstream.dataReceived("<iq type='result' id='other'/>") 638 self.assertEquals(1, len(dispatched)) 639 self.assertFalse(getattr(dispatched[-1], 'handled', False)) 640 641 642 def test_requestCleanup(self): 643 """ 644 Test if the deferred associated with an iq request is removed 645 from the list kept in the L{XmlStream} object after it has 646 been fired. 647 """ 648 self._streamStarted() 649 d = self.streamManager.request(self.request) 650 xs = self.xmlstream 651 xs.dataReceived("<iq type='result' id='test'/>") 652 self.assertNotIn('test', self.streamManager._iqDeferreds) 653 return d 654 655 656 def test_requestDisconnectCleanup(self): 657 """ 658 Test if deferreds for iq's that haven't yet received a response 659 have their errback called on stream disconnect. 660 """ 661 d = self.streamManager.request(self.request) 662 xs = self.xmlstream 663 xs.connectionLost(failure.Failure(ConnectionDone())) 664 self.assertFailure(d, ConnectionDone) 665 return d 666 667 668 def test_requestNoModifyingDict(self): 669 """ 670 Test to make sure the errbacks cannot cause the iteration of the 671 iqDeferreds to blow up in our face. 672 """ 673 674 def eb(failure): 675 d = xmlstream.IQ(self.xmlstream).send() 676 d.addErrback(eb) 677 678 d = self.streamManager.request(self.request) 679 d.addErrback(eb) 680 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) 681 return d 682 683 684 def test_requestTimingOut(self): 685 """ 686 Test that an iq request with a defined timeout times out. 687 """ 688 self.request.timeout = 60 689 d = self.streamManager.request(self.request) 690 self.assertFailure(d, xmlstream.TimeoutError) 691 692 self.clock.pump([1, 60]) 693 self.assertFalse(self.clock.calls) 694 self.assertFalse(self.streamManager._iqDeferreds) 695 return d 696 697 698 def test_requestNotTimingOut(self): 699 """ 700 Test that an iq request with a defined timeout does not time out 701 when a response was received before the timeout period elapsed. 702 """ 703 self._streamStarted() 704 self.request.timeout = 60 705 d = self.streamManager.request(self.request) 706 self.clock.callLater(1, self.xmlstream.dataReceived, 707 "<iq type='result' id='test'/>") 708 self.clock.pump([1, 1]) 709 self.assertFalse(self.clock.calls) 710 return d 711 712 713 def test_requestDisconnectTimeoutCancellation(self): 714 """ 715 Test if timeouts for iq's that haven't yet received a response 716 are cancelled on stream disconnect. 717 """ 718 719 self.request.timeout = 60 720 d = self.streamManager.request(self.request) 721 722 self.xmlstream.connectionLost(failure.Failure(ConnectionDone())) 723 self.assertFailure(d, ConnectionDone) 724 self.assertFalse(self.clock.calls) 725 return d 726 727 728 def test_requestNotIQ(self): 729 """ 730 The request stanza must be an iq. 731 """ 732 stanza = generic.Stanza() 733 stanza.stanzaKind = 'message' 734 735 d = self.streamManager.request(stanza) 736 self.assertFailure(d, ValueError) 737 738 739 def test_requestNotResult(self): 740 """ 741 The request stanza cannot be of type 'result'. 742 """ 743 stanza = generic.Stanza() 744 stanza.stanzaKind = 'iq' 745 stanza.stanzaType = 'result' 746 747 d = self.streamManager.request(stanza) 748 self.assertFailure(d, ValueError) 749 750 751 def test_requestNotError(self): 752 """ 753 The request stanza cannot be of type 'error'. 754 """ 755 stanza = generic.Stanza() 756 stanza.stanzaKind = 'iq' 757 stanza.stanzaType = 'error' 758 759 d = self.streamManager.request(stanza) 760 self.assertFailure(d, ValueError) 761 762 512 763 513 764 class DummyIQHandler(subprotocols.IQHandlerMixin): 514 765 iqHandlers = {'/iq[@type="get"]': 'onGet'}
Note: See TracBrowser
for help on using the repository browser.