source: ralphm-patches/request-tracking.patch @ 47:f6d222b68f1c

Last change on this file since 47:f6d222b68f1c was 47:f6d222b68f1c, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Many new patches, upstreamed affiliations and worked on pubsub item.
Upstreamed pubsub_manage_affiliations

File size: 14.9 KB
  • wokkel/subprotocols.py

    # HG changeset patch
    # Parent 60dcf9e2441bb510f1da17e6bc4401ef7b6db0e9
    Add iq request (set/get) tracking for StreamManager.
    
    This adds a new `request` method to StreamManager to serialize an iq get or iq
    set request object (using toElement), send it over the stream (or queue it
    until the next time a stream is initialized) and track the response stanza by
    returning a Deferred.
    
    Requests may have a timeout, so that the deferred errbacks after the timeout
    (counting from the moment of calling `request`) has expired.
    
    This implementation is similar to the response tracking done in
    `twisted.words.protocols.jabber.xmlstream.IQ`, but slighly different:
    
      * Requests can be queued until the connection is initialized, even before
        a XmlStream object is available. I.e. it is tied to the StreamManager, not
        the stream.
      * Response timeout starts counting from the moment of calling `request`, not
        from the moment request is sent over the wire.
    
    diff -r 60dcf9e2441b wokkel/subprotocols.py
    a b  
    156156    @type _initialized: C{bool}
    157157    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    158158    @type _packetQueue: L{list}
     159    @ivar timeout: Default IQ request timeout in seconds.
     160    @type timeout: C{int}
     161    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    159162    """
     163    timeout = None
     164    _reactor = None
    160165
    161166    logTraffic = False
    162167
    163     def __init__(self, factory):
     168    def __init__(self, factory, reactor=None):
     169        """
     170        Construct a stream manager.
     171
     172        @param factory: The stream factory to connect with.
     173        @param reactor: A provider of L{IReactorTime} to track timeouts.
     174            If not provided, the global reactor will be used.
     175        """
    164176        XMPPHandlerCollection.__init__(self)
    165177        self.xmlstream = None
    166178        self._packetQueue = []
     
    173185        factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
    174186        self.factory = factory
    175187
     188        if reactor is None:
     189            from twisted.internet import reactor
     190        self._reactor = reactor
     191
     192        # Set up IQ response tracking
     193        self._iqDeferreds = {}
     194
    176195
    177196    def addHandler(self, handler):
    178197        """
     
    222241        Send out cached stanzas and call each handler's
    223242        C{connectionInitialized} method.
    224243        """
     244
     245        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
     246        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
     247
    225248        # Flush all pending packets
    226249        for p in self._packetQueue:
    227250            xs.send(p)
     
    268291        for e in list(self):
    269292            e.connectionLost(reason)
    270293
     294        # This errbacks all deferreds of iq's for which no response has
     295        # been received with a L{ConnectionLost} failure. Otherwise, the
     296        # deferreds will never be fired.
     297        iqDeferreds = self._iqDeferreds
     298        self._iqDeferreds = {}
     299        for d in iqDeferreds.itervalues():
     300            d.errback(reason)
     301
     302
     303    def _onIQResponse(self, iq):
     304        """
     305        Handle iq response by firing associated deferred.
     306        """
     307        if getattr(iq, 'handled', False):
     308            return
     309
     310        try:
     311            d = self._iqDeferreds[iq["id"]]
     312        except KeyError:
     313            return
     314
     315        del self._iqDeferreds[iq["id"]]
     316        iq.handled = True
     317        if iq['type'] == 'error':
     318            d.errback(error.exceptionFromStanza(iq))
     319        else:
     320            d.callback(iq)
     321
    271322
    272323    def send(self, obj):
    273324        """
     
    285336            self._packetQueue.append(obj)
    286337
    287338
     339    def request(self, request):
     340        """
     341        Send an IQ request and track the response.
     342
     343        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
     344        will have its C{toElement} called to render to a L{domish.Element}
     345        which is then sent out over the current stream. If there is no such
     346        stream (yet), it is queued and sent whenever a connection is
     347        established and initialized, just like L{send}.
     348
     349        If the request doesn't have an identifier, it will be assigned a fresh
     350        one, so the response can be tracked.
     351
     352        The deferred that is returned will fire with the L{domish.Element}
     353        representation of the response if it is a result iq. If the response
     354        is an error iq, a corresponding L{error.StanzaError} will be errbacked.
     355
     356        If the connection is closed before a response was received, the deferred
     357        will be errbacked with the reason failure.
     358
     359        A request may also have a timeout, either by setting a default timeout
     360        in L{StreamManager.timeout} or on the C{timeout} attribute of the
     361        request.
     362
     363        @param request: The IQ request.
     364        @type request: L{generic.Request}
     365        """
     366        if (request.stanzaKind != 'iq' or
     367            request.stanzaType not in ('get', 'set')):
     368            return defer.fail(Exception("Not a request"))
     369
     370        element = request.toElement()
     371
     372        # Make sure we have a trackable id on the stanza
     373        if not request.stanzaID:
     374            element.addUniqueId()
     375            request.stanzaID = element['id']
     376
     377        # Set up iq response tracking
     378        d = defer.Deferred()
     379        self._iqDeferreds[element['id']] = d
     380
     381        timeout = request.timeout or self.timeout
     382
     383        if timeout is not None:
     384            def onTimeout():
     385                del self._iqDeferreds[element['id']]
     386                d.errback(xmlstream.TimeoutError("IQ timed out"))
     387
     388            call = self._reactor.callLater(timeout, onTimeout)
     389
     390            def cancelTimeout(result):
     391                if call.active():
     392                    call.cancel()
     393
     394                return result
     395
     396            d.addBoth(cancelTimeout)
     397        self.send(element)
     398        return d
     399
     400
    288401
    289402class IQHandlerMixin(object):
    290403    """
  • wokkel/test/test_subprotocols.py

    diff -r 60dcf9e2441b wokkel/test/test_subprotocols.py
    a b  
    99
    1010from twisted.trial import unittest
    1111from twisted.test import proto_helpers
    12 from twisted.internet import defer
     12from twisted.internet import defer, task
     13from twisted.internet.error import ConnectionDone
    1314from twisted.python import failure
    1415from twisted.words.xish import domish
    1516from twisted.words.protocols.jabber import error, xmlstream
    1617
    17 from wokkel import iwokkel, subprotocols
     18from wokkel import generic, iwokkel, subprotocols
    1819
    1920class DummyFactory(object):
    2021    """
     
    164165
    165166
    166167
     168class IQGetStanza(generic.Stanza):
     169    timeout = None
     170
     171    stanzaKind = 'iq'
     172    stanzaType = 'get'
     173    stanzaID = 'test'
     174
     175
     176
    167177class StreamManagerTest(unittest.TestCase):
    168178    """
    169179    Tests for L{subprotocols.StreamManager}.
    170180    """
    171181
    172182    def setUp(self):
    173         factory = DummyFactory()
    174         self.streamManager = subprotocols.StreamManager(factory)
     183        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     184        self.clock = task.Clock()
     185        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     186        self.xmlstream = factory.buildProtocol(None)
     187        self.transport = proto_helpers.StringTransport()
     188        self.xmlstream.transport = self.transport
     189
     190        self.request = IQGetStanza()
     191
     192    def _streamStarted(self):
     193        """
     194        Bring the test stream to the initialized state.
     195        """
     196        self.xmlstream.connectionMade()
     197        self.xmlstream.dataReceived(
     198                "<stream:stream xmlns='jabber:client' "
     199                    "xmlns:stream='http://etherx.jabber.org/streams' "
     200                    "from='example.com' id='12345'>")
     201        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
     202
    175203
    176204    def test_basic(self):
    177205        """
    178206        Test correct initialization and setup of factory observers.
    179207        """
    180         sm = self.streamManager
     208        factory = DummyFactory()
     209        sm = subprotocols.StreamManager(factory)
    181210        self.assertIdentical(None, sm.xmlstream)
    182211        self.assertEquals([], sm.handlers)
    183212        self.assertEquals(sm._connected,
     
    407436        self.assertEquals(0, handler.nestedHandler.doneLost)
    408437
    409438
     439
    410440    def test_removeHandler(self):
    411441        """
    412442        Test removal of protocol handler.
     
    418448        self.assertNotIn(handler, sm)
    419449        self.assertIdentical(None, handler.parent)
    420450
     451
    421452    def test_sendInitialized(self):
    422453        """
    423454        Test send when the stream has been initialized.
     
    509540        self.assertEquals("<presence/>", sm._packetQueue[0])
    510541
    511542
     543    def test_requestSendInitialized(self):
     544        """
     545        A request is sent out over the wire when the stream is initialized.
     546        """
     547        self._streamStarted()
     548
     549        self.streamManager.request(self.request)
     550        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     551        self.assertEquals(expected, self.transport.value())
     552
     553
     554    def test_requestSendInitializedFreshID(self):
     555        """
     556        A request without an ID gets a fresh one upon send.
     557        """
     558        self._streamStarted()
     559
     560        self.request.stanzaID = None
     561        self.streamManager.request(self.request)
     562        self.assertNotIdentical(None, self.request.stanzaID)
     563        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     564        self.assertEquals(expected, self.transport.value())
     565
     566
     567    def test_requestSendNotConnected(self):
     568        """
     569        A request is queued until a stream is initialized.
     570        """
     571        handler = DummyXMPPHandler()
     572        self.streamManager.addHandler(handler)
     573
     574        self.streamManager.request(self.request)
     575        expected = u"<iq type='get' id='test'/>"
     576
     577        xs = self.xmlstream
     578        self.assertEquals("", xs.transport.value())
     579
     580        xs.connectionMade()
     581        self.assertEquals("", xs.transport.value())
     582
     583        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     584                        "xmlns:stream='http://etherx.jabber.org/streams' "
     585                        "from='example.com' id='12345'>")
     586        xs.dispatch(xs, "//event/stream/authd")
     587
     588        self.assertEquals(expected, xs.transport.value())
     589        self.assertFalse(self.streamManager._packetQueue)
     590
     591
     592    def test_requestResultResponse(self):
     593        """
     594        A result response gets the request deferred fired with the response.
     595        """
     596        def cb(result):
     597            self.assertEquals(result['type'], 'result')
     598
     599        self._streamStarted()
     600        d = self.streamManager.request(self.request)
     601        d.addCallback(cb)
     602
     603        xs = self.xmlstream
     604        xs.dataReceived("<iq type='result' id='test'/>")
     605        return d
     606
     607
     608    def test_requestErrorResponse(self):
     609        """
     610        An error response gets the request deferred fired with a failure.
     611        """
     612        self._streamStarted()
     613        d = self.streamManager.request(self.request)
     614        self.assertFailure(d, error.StanzaError)
     615
     616        xs = self.xmlstream
     617        xs.dataReceived("<iq type='error' id='test'/>")
     618        return d
     619
     620
     621    def test_requestNonTrackedResponse(self):
     622        """
     623        Test that untracked iq responses don't trigger any action.
     624
     625        Untracked means that the id of the incoming response iq is not
     626        in the stream's C{iqDeferreds} dictionary.
     627        """
     628        # Set up a fallback handler that checks the stanza's handled attribute.
     629        # If that is set to True, the iq tracker claims to have handled the
     630        # response.
     631        dispatched = []
     632        def cb(iq):
     633            dispatched.append(iq)
     634
     635        self._streamStarted()
     636        self.xmlstream.addObserver("/iq", cb, -1)
     637
     638        # Receive an untracked iq response
     639        self.xmlstream.dataReceived("<iq type='result' id='other'/>")
     640        self.assertEquals(1, len(dispatched))
     641        self.assertFalse(getattr(dispatched[-1], 'handled', False))
     642
     643
     644    def test_requestCleanup(self):
     645        """
     646        Test if the deferred associated with an iq request is removed
     647        from the list kept in the L{XmlStream} object after it has
     648        been fired.
     649        """
     650        self._streamStarted()
     651        d = self.streamManager.request(self.request)
     652        xs = self.xmlstream
     653        xs.dataReceived("<iq type='result' id='test'/>")
     654        self.assertNotIn('test', self.streamManager._iqDeferreds)
     655        return d
     656
     657
     658    def test_requestDisconnectCleanup(self):
     659        """
     660        Test if deferreds for iq's that haven't yet received a response
     661        have their errback called on stream disconnect.
     662        """
     663        d = self.streamManager.request(self.request)
     664        xs = self.xmlstream
     665        xs.connectionLost(failure.Failure(ConnectionDone()))
     666        self.assertFailure(d, ConnectionDone)
     667        return d
     668
     669
     670    def test_requestNoModifyingDict(self):
     671        """
     672        Test to make sure the errbacks cannot cause the iteration of the
     673        iqDeferreds to blow up in our face.
     674        """
     675
     676        def eb(failure):
     677            d = xmlstream.IQ(self.xmlstream).send()
     678            d.addErrback(eb)
     679
     680        d = self.streamManager.request(self.request)
     681        d.addErrback(eb)
     682        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     683        return d
     684
     685
     686    def test_requestTimingOut(self):
     687        """
     688        Test that an iq request with a defined timeout times out.
     689        """
     690        self.request.timeout = 60
     691        d = self.streamManager.request(self.request)
     692        self.assertFailure(d, xmlstream.TimeoutError)
     693
     694        self.clock.pump([1, 60])
     695        self.assertFalse(self.clock.calls)
     696        self.assertFalse(self.streamManager._iqDeferreds)
     697        return d
     698
     699
     700    def test_requestNotTimingOut(self):
     701        """
     702        Test that an iq request with a defined timeout does not time out
     703        when a response was received before the timeout period elapsed.
     704        """
     705        self._streamStarted()
     706        self.request.timeout = 60
     707        d = self.streamManager.request(self.request)
     708        self.clock.callLater(1, self.xmlstream.dataReceived,
     709                             "<iq type='result' id='test'/>")
     710        self.clock.pump([1, 1])
     711        self.assertFalse(self.clock.calls)
     712        return d
     713
     714
     715    def test_requestDisconnectTimeoutCancellation(self):
     716        """
     717        Test if timeouts for iq's that haven't yet received a response
     718        are cancelled on stream disconnect.
     719        """
     720
     721        self.request.timeout = 60
     722        d = self.streamManager.request(self.request)
     723
     724        xs = self.xmlstream
     725        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     726        self.assertFailure(d, ConnectionDone)
     727        self.assertFalse(self.clock.calls)
     728        return d
     729
     730
    512731
    513732class DummyIQHandler(subprotocols.IQHandlerMixin):
    514733    iqHandlers = {'/iq[@type="get"]': 'onGet'}
Note: See TracBrowser for help on using the repository browser.