source: ralphm-patches/request-tracking.patch @ 53:fd2b3f70b221

Last change on this file since 53:fd2b3f70b221 was 53:fd2b3f70b221, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Small cleanups for request handling.

File size: 15.7 KB
  • wokkel/subprotocols.py

    # HG changeset patch
    # Parent 6f36a7f92973d3843a42b4c80791715e694b66a3
    Add iq request (set/get) tracking for StreamManager.
    
    This adds a new `request` method to StreamManager to serialize an iq get or iq
    set request object (using toElement), send it over the stream (or queue it
    until the next time a stream is initialized) and track the response stanza by
    returning a Deferred.
    
    Requests may have a timeout, so that the deferred errbacks after the timeout
    (counting from the moment of calling `request`) has expired.
    
    This implementation is similar to the response tracking done in
    `twisted.words.protocols.jabber.xmlstream.IQ`, but slighly different:
    
      * Requests can be queued until the connection is initialized, even before
        a XmlStream object is available. I.e. it is tied to the StreamManager, not
        the stream.
      * Response timeout starts counting from the moment of calling `request`, not
        from the moment request is sent over the wire.
    
    diff -r 6f36a7f92973 wokkel/subprotocols.py
    a b  
    156156    @type _initialized: C{bool}
    157157    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    158158    @type _packetQueue: L{list}
     159    @ivar timeout: Default IQ request timeout in seconds.
     160    @type timeout: C{int}
     161    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    159162    """
     163    timeout = None
     164    _reactor = None
    160165
    161166    logTraffic = False
    162167
    163     def __init__(self, factory):
     168    def __init__(self, factory, reactor=None):
     169        """
     170        Construct a stream manager.
     171
     172        @param factory: The stream factory to connect with.
     173        @param reactor: A provider of L{IReactorTime} to track timeouts.
     174            If not provided, the global reactor will be used.
     175        """
    164176        XMPPHandlerCollection.__init__(self)
    165177        self.xmlstream = None
    166178        self._packetQueue = []
     
    173185        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
    174186        self.factory = factory
    175187
     188        if reactor is None:
     189            from twisted.internet import reactor
     190        self._reactor = reactor
     191
     192        # Set up IQ response tracking
     193        self._iqDeferreds = {}
     194
    176195
    177196    def addHandler(self, handler):
    178197        """
     
    222241        Send out cached stanzas and call each handler's
    223242        C{connectionInitialized} method.
    224243        """
     244
     245        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
     246        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
     247
    225248        # Flush all pending packets
    226249        for p in self._packetQueue:
    227250            xs.send(p)
     
    268291        for e in list(self):
    269292            e.connectionLost(reason)
    270293
     294        # This errbacks all deferreds of iq's for which no response has
     295        # been received with a L{ConnectionLost} failure. Otherwise, the
     296        # deferreds will never be fired.
     297        iqDeferreds = self._iqDeferreds
     298        self._iqDeferreds = {}
     299        for d in iqDeferreds.itervalues():
     300            d.errback(reason)
     301
     302
     303    def _onIQResponse(self, iq):
     304        """
     305        Handle iq response by firing associated deferred.
     306        """
     307        try:
     308            d = self._iqDeferreds[iq["id"]]
     309        except KeyError:
     310            return
     311
     312        del self._iqDeferreds[iq["id"]]
     313        iq.handled = True
     314        if iq['type'] == 'error':
     315            d.errback(error.exceptionFromStanza(iq))
     316        else:
     317            d.callback(iq)
     318
    271319
    272320    def send(self, obj):
    273321        """
     
    285333            self._packetQueue.append(obj)
    286334
    287335
     336    def request(self, request):
     337        """
     338        Send an IQ request and track the response.
     339
     340        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
     341        will have its C{toElement} called to render to a L{domish.Element}
     342        which is then sent out over the current stream. If there is no such
     343        stream (yet), it is queued and sent whenever a connection is
     344        established and initialized, just like L{send}.
     345
     346        If the request doesn't have an identifier, it will be assigned a fresh
     347        one, so the response can be tracked.
     348
     349        The deferred that is returned will fire with the L{domish.Element}
     350        representation of the response if it is a result iq. If the response
     351        is an error iq, a corresponding L{error.StanzaError} will be errbacked.
     352
     353        If the connection is closed before a response was received, the deferred
     354        will be errbacked with the reason failure.
     355
     356        A request may also have a timeout, either by setting a default timeout
     357        in L{StreamManager.timeout} or on the C{timeout} attribute of the
     358        request.
     359
     360        @param request: The IQ request.
     361        @type request: L{generic.Request}
     362        """
     363        if (request.stanzaKind != 'iq' or
     364            request.stanzaType not in ('get', 'set')):
     365            return defer.fail(ValueError("Not a request"))
     366
     367        element = request.toElement()
     368
     369        # Make sure we have a trackable id on the stanza
     370        if not request.stanzaID:
     371            element.addUniqueId()
     372            request.stanzaID = element['id']
     373
     374        # Set up iq response tracking
     375        d = defer.Deferred()
     376        self._iqDeferreds[element['id']] = d
     377
     378        timeout = getattr(request, 'timeout', self.timeout)
     379
     380        if timeout is not None:
     381            def onTimeout():
     382                del self._iqDeferreds[element['id']]
     383                d.errback(xmlstream.TimeoutError("IQ timed out"))
     384
     385            call = self._reactor.callLater(timeout, onTimeout)
     386
     387            def cancelTimeout(result):
     388                if call.active():
     389                    call.cancel()
     390
     391                return result
     392
     393            d.addBoth(cancelTimeout)
     394        self.send(element)
     395        return d
     396
     397
    288398
    289399class IQHandlerMixin(object):
    290400    """
  • wokkel/test/test_subprotocols.py

    diff -r 6f36a7f92973 wokkel/test/test_subprotocols.py
    a b  
    99
    1010from twisted.trial import unittest
    1111from twisted.test import proto_helpers
    12 from twisted.internet import defer
     12from twisted.internet import defer, task
     13from twisted.internet.error import ConnectionDone
    1314from twisted.python import failure
    1415from twisted.words.xish import domish
    1516from twisted.words.protocols.jabber import error, xmlstream
    1617
    17 from wokkel import iwokkel, subprotocols
     18from wokkel import generic, iwokkel, subprotocols
    1819
    1920class DummyFactory(object):
    2021    """
     
    164165
    165166
    166167
     168class IQGetStanza(generic.Stanza):
     169    stanzaKind = 'iq'
     170    stanzaType = 'get'
     171    stanzaID = 'test'
     172
     173
     174
    167175class StreamManagerTest(unittest.TestCase):
    168176    """
    169177    Tests for L{subprotocols.StreamManager}.
    170178    """
    171179
    172180    def setUp(self):
    173         factory = DummyFactory()
    174         self.streamManager = subprotocols.StreamManager(factory)
     181        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     182        self.clock = task.Clock()
     183        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     184        self.xmlstream = factory.buildProtocol(None)
     185        self.transport = proto_helpers.StringTransport()
     186        self.xmlstream.transport = self.transport
     187
     188        self.request = IQGetStanza()
     189
     190    def _streamStarted(self):
     191        """
     192        Bring the test stream to the initialized state.
     193        """
     194        self.xmlstream.connectionMade()
     195        self.xmlstream.dataReceived(
     196                "<stream:stream xmlns='jabber:client' "
     197                    "xmlns:stream='http://etherx.jabber.org/streams' "
     198                    "from='example.com' id='12345'>")
     199        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
     200
    175201
    176202    def test_basic(self):
    177203        """
    178204        Test correct initialization and setup of factory observers.
    179205        """
    180         sm = self.streamManager
     206        factory = DummyFactory()
     207        sm = subprotocols.StreamManager(factory)
    181208        self.assertIdentical(None, sm.xmlstream)
    182209        self.assertEquals([], sm.handlers)
    183210        self.assertEquals(sm._connected,
     
    407434        self.assertEquals(0, handler.nestedHandler.doneLost)
    408435
    409436
     437
    410438    def test_removeHandler(self):
    411439        """
    412440        Test removal of protocol handler.
     
    418446        self.assertNotIn(handler, sm)
    419447        self.assertIdentical(None, handler.parent)
    420448
     449
    421450    def test_sendInitialized(self):
    422451        """
    423452        Test send when the stream has been initialized.
     
    509538        self.assertEquals("<presence/>", sm._packetQueue[0])
    510539
    511540
     541    def test_requestSendInitialized(self):
     542        """
     543        A request is sent out over the wire when the stream is initialized.
     544        """
     545        self._streamStarted()
     546
     547        self.streamManager.request(self.request)
     548        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     549        self.assertEquals(expected, self.transport.value())
     550
     551
     552    def test_requestSendInitializedFreshID(self):
     553        """
     554        A request without an ID gets a fresh one upon send.
     555        """
     556        self._streamStarted()
     557
     558        self.request.stanzaID = None
     559        self.streamManager.request(self.request)
     560        self.assertNotIdentical(None, self.request.stanzaID)
     561        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     562        self.assertEquals(expected, self.transport.value())
     563
     564
     565    def test_requestSendNotConnected(self):
     566        """
     567        A request is queued until a stream is initialized.
     568        """
     569        handler = DummyXMPPHandler()
     570        self.streamManager.addHandler(handler)
     571
     572        self.streamManager.request(self.request)
     573        expected = u"<iq type='get' id='test'/>"
     574
     575        xs = self.xmlstream
     576        self.assertEquals("", xs.transport.value())
     577
     578        xs.connectionMade()
     579        self.assertEquals("", xs.transport.value())
     580
     581        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     582                        "xmlns:stream='http://etherx.jabber.org/streams' "
     583                        "from='example.com' id='12345'>")
     584        xs.dispatch(xs, "//event/stream/authd")
     585
     586        self.assertEquals(expected, xs.transport.value())
     587        self.assertFalse(self.streamManager._packetQueue)
     588
     589
     590    def test_requestResultResponse(self):
     591        """
     592        A result response gets the request deferred fired with the response.
     593        """
     594        def cb(result):
     595            self.assertEquals(result['type'], 'result')
     596
     597        self._streamStarted()
     598        d = self.streamManager.request(self.request)
     599        d.addCallback(cb)
     600
     601        xs = self.xmlstream
     602        xs.dataReceived("<iq type='result' id='test'/>")
     603        return d
     604
     605
     606    def test_requestErrorResponse(self):
     607        """
     608        An error response gets the request deferred fired with a failure.
     609        """
     610        self._streamStarted()
     611        d = self.streamManager.request(self.request)
     612        self.assertFailure(d, error.StanzaError)
     613
     614        xs = self.xmlstream
     615        xs.dataReceived("<iq type='error' id='test'/>")
     616        return d
     617
     618
     619    def test_requestNonTrackedResponse(self):
     620        """
     621        Test that untracked iq responses don't trigger any action.
     622
     623        Untracked means that the id of the incoming response iq is not
     624        in the stream's C{iqDeferreds} dictionary.
     625        """
     626        # Set up a fallback handler that checks the stanza's handled attribute.
     627        # If that is set to True, the iq tracker claims to have handled the
     628        # response.
     629        dispatched = []
     630        def cb(iq):
     631            dispatched.append(iq)
     632
     633        self._streamStarted()
     634        self.xmlstream.addObserver("/iq", cb, -1)
     635
     636        # Receive an untracked iq response
     637        self.xmlstream.dataReceived("<iq type='result' id='other'/>")
     638        self.assertEquals(1, len(dispatched))
     639        self.assertFalse(getattr(dispatched[-1], 'handled', False))
     640
     641
     642    def test_requestCleanup(self):
     643        """
     644        Test if the deferred associated with an iq request is removed
     645        from the list kept in the L{XmlStream} object after it has
     646        been fired.
     647        """
     648        self._streamStarted()
     649        d = self.streamManager.request(self.request)
     650        xs = self.xmlstream
     651        xs.dataReceived("<iq type='result' id='test'/>")
     652        self.assertNotIn('test', self.streamManager._iqDeferreds)
     653        return d
     654
     655
     656    def test_requestDisconnectCleanup(self):
     657        """
     658        Test if deferreds for iq's that haven't yet received a response
     659        have their errback called on stream disconnect.
     660        """
     661        d = self.streamManager.request(self.request)
     662        xs = self.xmlstream
     663        xs.connectionLost(failure.Failure(ConnectionDone()))
     664        self.assertFailure(d, ConnectionDone)
     665        return d
     666
     667
     668    def test_requestNoModifyingDict(self):
     669        """
     670        Test to make sure the errbacks cannot cause the iteration of the
     671        iqDeferreds to blow up in our face.
     672        """
     673
     674        def eb(failure):
     675            d = xmlstream.IQ(self.xmlstream).send()
     676            d.addErrback(eb)
     677
     678        d = self.streamManager.request(self.request)
     679        d.addErrback(eb)
     680        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     681        return d
     682
     683
     684    def test_requestTimingOut(self):
     685        """
     686        Test that an iq request with a defined timeout times out.
     687        """
     688        self.request.timeout = 60
     689        d = self.streamManager.request(self.request)
     690        self.assertFailure(d, xmlstream.TimeoutError)
     691
     692        self.clock.pump([1, 60])
     693        self.assertFalse(self.clock.calls)
     694        self.assertFalse(self.streamManager._iqDeferreds)
     695        return d
     696
     697
     698    def test_requestNotTimingOut(self):
     699        """
     700        Test that an iq request with a defined timeout does not time out
     701        when a response was received before the timeout period elapsed.
     702        """
     703        self._streamStarted()
     704        self.request.timeout = 60
     705        d = self.streamManager.request(self.request)
     706        self.clock.callLater(1, self.xmlstream.dataReceived,
     707                             "<iq type='result' id='test'/>")
     708        self.clock.pump([1, 1])
     709        self.assertFalse(self.clock.calls)
     710        return d
     711
     712
     713    def test_requestDisconnectTimeoutCancellation(self):
     714        """
     715        Test if timeouts for iq's that haven't yet received a response
     716        are cancelled on stream disconnect.
     717        """
     718
     719        self.request.timeout = 60
     720        d = self.streamManager.request(self.request)
     721
     722        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     723        self.assertFailure(d, ConnectionDone)
     724        self.assertFalse(self.clock.calls)
     725        return d
     726
     727
     728    def test_requestNotIQ(self):
     729        """
     730        The request stanza must be an iq.
     731        """
     732        stanza = generic.Stanza()
     733        stanza.stanzaKind = 'message'
     734
     735        d = self.streamManager.request(stanza)
     736        self.assertFailure(d, ValueError)
     737
     738
     739    def test_requestNotResult(self):
     740        """
     741        The request stanza cannot be of type 'result'.
     742        """
     743        stanza = generic.Stanza()
     744        stanza.stanzaKind = 'iq'
     745        stanza.stanzaType = 'result'
     746
     747        d = self.streamManager.request(stanza)
     748        self.assertFailure(d, ValueError)
     749
     750
     751    def test_requestNotError(self):
     752        """
     753        The request stanza cannot be of type 'error'.
     754        """
     755        stanza = generic.Stanza()
     756        stanza.stanzaKind = 'iq'
     757        stanza.stanzaType = 'error'
     758
     759        d = self.streamManager.request(stanza)
     760        self.assertFailure(d, ValueError)
     761
     762
    512763
    513764class DummyIQHandler(subprotocols.IQHandlerMixin):
    514765    iqHandlers = {'/iq[@type="get"]': 'onGet'}
Note: See TracBrowser for help on using the repository browser.