Ignore:
Timestamp:
Jul 6, 2009, 9:26:47 AM (12 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@175
Message:

Make IQ timeouts work for InternalComponents?.

This introduces wokkel.compat.IQ to work with older Twisted versions, too.

Author: ralphm.
Fixes #53.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/test/test_compat.py

    r53 r63  
    77"""
    88
     9from zope.interface import implements
    910from zope.interface.verify import verifyObject
    10 from twisted.internet import defer, protocol
    11 from twisted.internet.interfaces import IProtocolFactory
     11from twisted.internet import protocol, task
     12from twisted.internet.interfaces import IProtocolFactory, IReactorTime
    1213from twisted.trial import unittest
    13 from twisted.words.xish import domish, utility
     14from twisted.words.xish import utility
    1415from twisted.words.protocols.jabber import xmlstream
    15 from wokkel.compat import BootstrapMixin, XmlStreamServerFactory
     16from wokkel.compat import BootstrapMixin, IQ, XmlStreamServerFactory
    1617
    1718class DummyProtocol(protocol.Protocol, utility.EventDispatcher):
     
    166167        xs = self.factory.buildProtocol(None)
    167168        self.assertIdentical(self.factory, xs.factory)
     169
     170
     171
     172class FakeReactor(object):
     173
     174    implements(IReactorTime)
     175    def __init__(self):
     176        self.clock = task.Clock()
     177        self.callLater = self.clock.callLater
     178        self.getDelayedCalls = self.clock.getDelayedCalls
     179
     180
     181
     182class IQTest(unittest.TestCase):
     183    """
     184    Tests for L{IQ}.
     185    """
     186
     187    def setUp(self):
     188        self.reactor = FakeReactor()
     189        self.clock = self.reactor.clock
     190
     191
     192    def testRequestTimingOutEventDispatcher(self):
     193        """
     194        Test that an iq request with a defined timeout times out.
     195        """
     196        from twisted.words.xish import utility
     197        output = []
     198        xs = utility.EventDispatcher()
     199        xs.send = output.append
     200
     201        self.iq = IQ(xs, reactor=self.reactor)
     202        self.iq.timeout = 60
     203        d = self.iq.send()
     204        self.assertFailure(d, xmlstream.TimeoutError)
     205
     206        self.clock.pump([1, 60])
     207        self.assertFalse(self.reactor.getDelayedCalls())
     208        self.assertFalse(xs.iqDeferreds)
     209        return d
Note: See TracChangeset for help on using the changeset viewer.