Ignore:
Timestamp:
Aug 3, 2011, 9:49:21 AM (9 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Message:

Add iq request (set/get) tracking for StreamManager?.

This adds a new request method to StreamManager? to serialize an iq get or iq
set request object (using toElement), send it over the stream (or queue it
until the next time a stream is initialized) and track the response stanza by
returning a Deferred.

Requests may have a timeout, so that the deferred errbacks after the timeout
(counting from the moment of calling request) has expired.

This implementation is similar to the response tracking done in
twisted.words.protocols.jabber.xmlstream.IQ, but slighly different:

  • Requests can be queued until the connection is initialized, even before a XmlStream? object is available. I.e. it is tied to the StreamManager?, not the stream.
  • Response timeout starts counting from the moment of calling request, not from the moment request is sent over the wire.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/test/test_subprotocols.py

    r99 r100  
    1010from twisted.trial import unittest
    1111from twisted.test import proto_helpers
    12 from twisted.internet import defer
     12from twisted.internet import defer, task
     13from twisted.internet.error import ConnectionDone
    1314from twisted.python import failure
    1415from twisted.words.xish import domish
    1516from twisted.words.protocols.jabber import error, xmlstream
    1617
    17 from wokkel import iwokkel, subprotocols
     18from wokkel import generic, iwokkel, subprotocols
    1819
    1920class DummyFactory(object):
     
    165166
    166167
     168class IQGetStanza(generic.Stanza):
     169    stanzaKind = 'iq'
     170    stanzaType = 'get'
     171    stanzaID = 'test'
     172
     173
     174
    167175class StreamManagerTest(unittest.TestCase):
    168176    """
     
    171179
    172180    def setUp(self):
     181        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     182        self.clock = task.Clock()
     183        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     184        self.xmlstream = factory.buildProtocol(None)
     185        self.transport = proto_helpers.StringTransport()
     186        self.xmlstream.transport = self.transport
     187
     188        self.request = IQGetStanza()
     189
     190    def _streamStarted(self):
     191        """
     192        Bring the test stream to the initialized state.
     193        """
     194        self.xmlstream.connectionMade()
     195        self.xmlstream.dataReceived(
     196                "<stream:stream xmlns='jabber:client' "
     197                    "xmlns:stream='http://etherx.jabber.org/streams' "
     198                    "from='example.com' id='12345'>")
     199        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
     200
     201
     202    def test_basic(self):
     203        """
     204        Test correct initialization and setup of factory observers.
     205        """
    173206        factory = DummyFactory()
    174         self.streamManager = subprotocols.StreamManager(factory)
    175 
    176     def test_basic(self):
    177         """
    178         Test correct initialization and setup of factory observers.
    179         """
    180         sm = self.streamManager
     207        sm = subprotocols.StreamManager(factory)
    181208        self.assertIdentical(None, sm.xmlstream)
    182209        self.assertEquals([], sm.handlers)
     
    408435
    409436
     437
    410438    def test_removeHandler(self):
    411439        """
     
    418446        self.assertNotIn(handler, sm)
    419447        self.assertIdentical(None, handler.parent)
     448
    420449
    421450    def test_sendInitialized(self):
     
    508537        self.assertEquals("", xs.transport.value())
    509538        self.assertEquals("<presence/>", sm._packetQueue[0])
     539
     540
     541    def test_requestSendInitialized(self):
     542        """
     543        A request is sent out over the wire when the stream is initialized.
     544        """
     545        self._streamStarted()
     546
     547        self.streamManager.request(self.request)
     548        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     549        self.assertEquals(expected, self.transport.value())
     550
     551
     552    def test_requestSendInitializedFreshID(self):
     553        """
     554        A request without an ID gets a fresh one upon send.
     555        """
     556        self._streamStarted()
     557
     558        self.request.stanzaID = None
     559        self.streamManager.request(self.request)
     560        self.assertNotIdentical(None, self.request.stanzaID)
     561        expected = u"<iq type='get' id='%s'/>" % self.request.stanzaID
     562        self.assertEquals(expected, self.transport.value())
     563
     564
     565    def test_requestSendNotConnected(self):
     566        """
     567        A request is queued until a stream is initialized.
     568        """
     569        handler = DummyXMPPHandler()
     570        self.streamManager.addHandler(handler)
     571
     572        self.streamManager.request(self.request)
     573        expected = u"<iq type='get' id='test'/>"
     574
     575        xs = self.xmlstream
     576        self.assertEquals("", xs.transport.value())
     577
     578        xs.connectionMade()
     579        self.assertEquals("", xs.transport.value())
     580
     581        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     582                        "xmlns:stream='http://etherx.jabber.org/streams' "
     583                        "from='example.com' id='12345'>")
     584        xs.dispatch(xs, "//event/stream/authd")
     585
     586        self.assertEquals(expected, xs.transport.value())
     587        self.assertFalse(self.streamManager._packetQueue)
     588
     589
     590    def test_requestResultResponse(self):
     591        """
     592        A result response gets the request deferred fired with the response.
     593        """
     594        def cb(result):
     595            self.assertEquals(result['type'], 'result')
     596
     597        self._streamStarted()
     598        d = self.streamManager.request(self.request)
     599        d.addCallback(cb)
     600
     601        xs = self.xmlstream
     602        xs.dataReceived("<iq type='result' id='test'/>")
     603        return d
     604
     605
     606    def test_requestErrorResponse(self):
     607        """
     608        An error response gets the request deferred fired with a failure.
     609        """
     610        self._streamStarted()
     611        d = self.streamManager.request(self.request)
     612        self.assertFailure(d, error.StanzaError)
     613
     614        xs = self.xmlstream
     615        xs.dataReceived("<iq type='error' id='test'/>")
     616        return d
     617
     618
     619    def test_requestNonTrackedResponse(self):
     620        """
     621        Test that untracked iq responses don't trigger any action.
     622
     623        Untracked means that the id of the incoming response iq is not
     624        in the stream's C{iqDeferreds} dictionary.
     625        """
     626        # Set up a fallback handler that checks the stanza's handled attribute.
     627        # If that is set to True, the iq tracker claims to have handled the
     628        # response.
     629        dispatched = []
     630        def cb(iq):
     631            dispatched.append(iq)
     632
     633        self._streamStarted()
     634        self.xmlstream.addObserver("/iq", cb, -1)
     635
     636        # Receive an untracked iq response
     637        self.xmlstream.dataReceived("<iq type='result' id='other'/>")
     638        self.assertEquals(1, len(dispatched))
     639        self.assertFalse(getattr(dispatched[-1], 'handled', False))
     640
     641
     642    def test_requestCleanup(self):
     643        """
     644        Test if the deferred associated with an iq request is removed
     645        from the list kept in the L{XmlStream} object after it has
     646        been fired.
     647        """
     648        self._streamStarted()
     649        d = self.streamManager.request(self.request)
     650        xs = self.xmlstream
     651        xs.dataReceived("<iq type='result' id='test'/>")
     652        self.assertNotIn('test', self.streamManager._iqDeferreds)
     653        return d
     654
     655
     656    def test_requestDisconnectCleanup(self):
     657        """
     658        Test if deferreds for iq's that haven't yet received a response
     659        have their errback called on stream disconnect.
     660        """
     661        d = self.streamManager.request(self.request)
     662        xs = self.xmlstream
     663        xs.connectionLost(failure.Failure(ConnectionDone()))
     664        self.assertFailure(d, ConnectionDone)
     665        return d
     666
     667
     668    def test_requestNoModifyingDict(self):
     669        """
     670        Test to make sure the errbacks cannot cause the iteration of the
     671        iqDeferreds to blow up in our face.
     672        """
     673
     674        def eb(failure):
     675            d = xmlstream.IQ(self.xmlstream).send()
     676            d.addErrback(eb)
     677
     678        d = self.streamManager.request(self.request)
     679        d.addErrback(eb)
     680        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     681        return d
     682
     683
     684    def test_requestTimingOut(self):
     685        """
     686        Test that an iq request with a defined timeout times out.
     687        """
     688        self.request.timeout = 60
     689        d = self.streamManager.request(self.request)
     690        self.assertFailure(d, xmlstream.TimeoutError)
     691
     692        self.clock.pump([1, 60])
     693        self.assertFalse(self.clock.calls)
     694        self.assertFalse(self.streamManager._iqDeferreds)
     695        return d
     696
     697
     698    def test_requestNotTimingOut(self):
     699        """
     700        Test that an iq request with a defined timeout does not time out
     701        when a response was received before the timeout period elapsed.
     702        """
     703        self._streamStarted()
     704        self.request.timeout = 60
     705        d = self.streamManager.request(self.request)
     706        self.clock.callLater(1, self.xmlstream.dataReceived,
     707                             "<iq type='result' id='test'/>")
     708        self.clock.pump([1, 1])
     709        self.assertFalse(self.clock.calls)
     710        return d
     711
     712
     713    def test_requestDisconnectTimeoutCancellation(self):
     714        """
     715        Test if timeouts for iq's that haven't yet received a response
     716        are cancelled on stream disconnect.
     717        """
     718
     719        self.request.timeout = 60
     720        d = self.streamManager.request(self.request)
     721
     722        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     723        self.assertFailure(d, ConnectionDone)
     724        self.assertFalse(self.clock.calls)
     725        return d
     726
     727
     728    def test_requestNotIQ(self):
     729        """
     730        The request stanza must be an iq.
     731        """
     732        stanza = generic.Stanza()
     733        stanza.stanzaKind = 'message'
     734
     735        d = self.streamManager.request(stanza)
     736        self.assertFailure(d, ValueError)
     737
     738
     739    def test_requestNotResult(self):
     740        """
     741        The request stanza cannot be of type 'result'.
     742        """
     743        stanza = generic.Stanza()
     744        stanza.stanzaKind = 'iq'
     745        stanza.stanzaType = 'result'
     746
     747        d = self.streamManager.request(stanza)
     748        self.assertFailure(d, ValueError)
     749
     750
     751    def test_requestNotError(self):
     752        """
     753        The request stanza cannot be of type 'error'.
     754        """
     755        stanza = generic.Stanza()
     756        stanza.stanzaKind = 'iq'
     757        stanza.stanzaType = 'error'
     758
     759        d = self.streamManager.request(stanza)
     760        self.assertFailure(d, ValueError)
    510761
    511762
Note: See TracChangeset for help on using the changeset viewer.