source: ralphm-patches/server-stream-manager.patch @ 79:0752a1cca356

Last change on this file since 79:0752a1cca356 was 79:0752a1cca356, checked in by Ralph Meijer <ralphm@…>, 4 years ago

Split out stanza module and response tracking patches, fix other patches.

File size: 19.7 KB
  • wokkel/client.py

    # HG changeset patch
    # Parent 3f3fe954b1975c2d9115e0fa8177ae7b28a708a8
    # Parent  e198cebf36146a768eda902efa5ee741e947fda8
    Generalize StreamManager and add ServerStreamManager.
    
    This generalizes `StreamManager` to `BaseStreamManager` to take the
    common functionality and reuse it for `ServerStreamManager`. Where
    `StreamManager` is used for initiating connections, with a factory as
    a parameter, `ServerStreamManager` works for receiving connections. It
    must be created in a protocol factory when a connection is established,
    and then its `makeConnection` should be called to hook it up to the
    `XmlStream` instance.
    
    diff --git a/wokkel/client.py b/wokkel/client.py
    a b  
    109109        self._connection.disconnect()
    110110
    111111
    112     def _authd(self, xs):
     112    def connectionInitialized(self, xs):
    113113        """
    114114        Called when the stream has been initialized.
    115115
     
    118118        by its constituent initializers.
    119119        """
    120120        self.jid = self.factory.authenticator.jid
    121         StreamManager._authd(self, xs)
     121        StreamManager.connectionInitialized(self, xs)
    122122
    123123
    124124    def initializationFailed(self, reason):
  • wokkel/subprotocols.py

    diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
    a b  
    122122
    123123
    124124
    125 class StreamManager(XMPPHandlerCollection):
     125class BaseStreamManager(XMPPHandlerCollection):
    126126    """
    127127    Business logic representing a managed XMPP connection.
    128128
     
    133133
    134134    @ivar xmlstream: currently managed XML stream
    135135    @type xmlstream: L{XmlStream}
     136
    136137    @ivar logTraffic: if true, log all traffic.
    137138    @type logTraffic: C{bool}
     139
    138140    @ivar _initialized: Whether the stream represented by L{xmlstream} has
    139141                        been initialized. This is used when caching outgoing
    140142                        stanzas.
    141143    @type _initialized: C{bool}
    142     @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    143     @type _packetQueue: L{list}
     144
    144145    @ivar timeout: Default IQ request timeout in seconds.
    145146    @type timeout: C{int}
     147
    146148    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    147149    """
     150
    148151    timeout = None
    149152    _reactor = None
    150153
    151154    logTraffic = False
    152155
    153     def __init__(self, factory, reactor=None):
     156    def __init__(self, reactor=None):
    154157        """
    155158        Construct a stream manager.
    156159
    157         @param factory: The stream factory to connect with.
    158160        @param reactor: A provider of L{IReactorTime} to track timeouts.
    159161            If not provided, the global reactor will be used.
    160162        """
    161163        XMPPHandlerCollection.__init__(self)
     164
    162165        self.xmlstream = None
    163166        self._packetQueue = []
    164167        self._initialized = False
    165168
    166         factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
    167         factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
    168         factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
    169                              self.initializationFailed)
    170         factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
    171         self.factory = factory
    172 
    173169        if reactor is None:
    174170            from twisted.internet import reactor
    175171        self._reactor = reactor
     
    195191            handler.connectionInitialized()
    196192
    197193
    198     def _connected(self, xs):
     194    def makeConnection(self, xs):
    199195        """
    200196        Called when the transport connection has been established.
    201197
     
    213209            xs.rawDataInFn = logDataIn
    214210            xs.rawDataOutFn = logDataOut
    215211
     212        xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
     213                       self.connectionInitialized)
     214        xs.addObserver(xmlstream.STREAM_END_EVENT,
     215                       self.connectionLost)
    216216        self.xmlstream = xs
    217217
    218218        for e in list(self):
    219219            e.makeConnection(xs)
    220220
    221221
    222     def _authd(self, xs):
     222    def connectionInitialized(self, xs):
    223223        """
    224224        Called when the stream has been initialized.
    225225
     
    242242            e.connectionInitialized()
    243243
    244244
    245     def initializationFailed(self, reason):
    246         """
    247         Called when stream initialization has failed.
    248245
    249         Stream initialization has halted, with the reason indicated by
    250         C{reason}. It may be retried by calling the authenticator's
    251         C{initializeStream}. See the respective authenticators for details.
    252 
    253         @param reason: A failure instance indicating why stream initialization
    254                        failed.
    255         @type reason: L{failure.Failure}
    256         """
    257 
    258 
    259     def _disconnected(self, reason):
     246    def connectionLost(self, reason):
    260247        """
    261248        Called when the stream has been closed.
    262249
     
    468455
    469456
    470457
     458class StreamManager(BaseStreamManager):
     459    """
     460    Business logic representing a managed XMPP client connection.
     461
     462    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
     463    @type _packetQueue: L{list}
     464    """
     465
     466    def __init__(self, factory, reactor=None):
     467        """
     468        Construct a stream manager.
     469
     470        @param factory: The stream factory to connect with.
     471        @param reactor: A provider of L{IReactorTime} to track timeouts.
     472            If not provided, the global reactor will be used.
     473        """
     474        BaseStreamManager.__init__(self, reactor=reactor)
     475
     476        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     477                             self.makeConnection)
     478        self.factory = factory
     479
     480
     481    def makeConnection(self, xs):
     482        xs.addObserver(xmlstream.INIT_FAILED_EVENT,
     483                       self.initializationFailed)
     484        BaseStreamManager.makeConnection(self, xs)
     485
     486
     487    def initializationFailed(self, reason):
     488        """
     489        Called when stream initialization has failed.
     490
     491        Stream initialization has halted, with the reason indicated by
     492        C{reason}. It may be retried by calling the authenticator's
     493        C{initializeStream}. See the respective authenticators for details.
     494
     495        @param reason: A failure instance indicating why stream initialization
     496                       failed.
     497        @type reason: L{failure.Failure}
     498        """
     499
     500
     501
     502class ServerStreamManager(BaseStreamManager):
     503    """
     504    Business logic representing a managed server-side XMPP connection.
     505    """
     506
     507    def __init__(self, reactor=None):
     508        BaseStreamManager.__init__(self, reactor=reactor)
     509
     510
     511
    471512class IQHandlerMixin(object):
    472513    """
    473514    XMPP subprotocol mixin for handle incoming IQ stanzas.
  • wokkel/test/helpers.py

    diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
    a b  
    108108        factory = DummyFactory()
    109109        StreamManager.__init__(self, factory, reactor)
    110110        self.stub = XmlStreamStub()
    111         self._connected(self.stub.xmlstream)
    112         self._authd(self.stub.xmlstream)
     111        self.makeConnection(self.stub.xmlstream)
     112        self.connectionInitialized(self.stub.xmlstream)
  • wokkel/test/test_client.py

    diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
    a b  
    3838        """
    3939        xs = self.client.factory.buildProtocol(None)
    4040        self.client.factory.authenticator.jid = JID('user@example.org/test')
    41         xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
     41        self.client.connectionInitialized(xs)
    4242        self.assertEquals(JID('user@example.org/test'), self.client.jid)
    4343
    4444
  • wokkel/test/test_subprotocols.py

    diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
    a b  
    190190
    191191
    192192
    193 class StreamManagerTest(unittest.TestCase):
    194     """
    195     Tests for L{subprotocols.StreamManager}.
    196     """
    197 
    198     def setUp(self):
    199         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    200         self.clock = task.Clock()
    201         self.streamManager = subprotocols.StreamManager(factory, self.clock)
    202         self.xmlstream = factory.buildProtocol(None)
    203         self.transport = proto_helpers.StringTransport()
    204         self.xmlstream.transport = self.transport
    205 
    206         self.request = IQGetStanza(recipient=JID('other@example.org'),
    207                                    sender=JID('user@example.org'))
    208 
     193class BaseStreamManagerTestsMixin(object):
    209194
    210195    def _streamStarted(self):
    211196        """
     
    219204        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
    220205
    221206
    222     def test_basic(self):
     207    def test_interface(self):
    223208        """
    224         Test correct initialization and setup of factory observers.
     209        ServerStreamManager implements IXMPPHandlerCollection.
    225210        """
    226         factory = DummyFactory()
    227         sm = subprotocols.StreamManager(factory)
    228         self.assertIdentical(None, sm.xmlstream)
    229         self.assertEquals([], sm.handlers)
    230         self.assertEquals(sm._connected,
    231                           sm.factory.callbacks['//event/stream/connected'])
    232         self.assertEquals(sm._authd,
    233                           sm.factory.callbacks['//event/stream/authd'])
    234         self.assertEquals(sm._disconnected,
    235                           sm.factory.callbacks['//event/stream/end'])
    236         self.assertEquals(sm.initializationFailed,
    237                           sm.factory.callbacks['//event/xmpp/initfailed'])
     211        verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager)
    238212
    239213
    240     def test_connected(self):
     214    def test_makeConnection(self):
    241215        """
    242216        Test that protocol handlers have their connectionMade method called
    243217        when the XML stream is connected.
     
    245219        sm = self.streamManager
    246220        handler = DummyXMPPHandler()
    247221        handler.setHandlerParent(sm)
    248         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    249         sm._connected(xs)
     222        xs = self.xmlstream
     223        sm.makeConnection(xs)
    250224        self.assertEquals(1, handler.doneMade)
    251225        self.assertEquals(0, handler.doneInitialized)
    252226        self.assertEquals(0, handler.doneLost)
    253227
    254228
    255     def test_connectedLogTrafficFalse(self):
     229    def test_makeConnectionLogTrafficFalse(self):
    256230        """
    257231        Test raw data functions unset when logTraffic is set to False.
    258232        """
    259233        sm = self.streamManager
    260234        handler = DummyXMPPHandler()
    261235        handler.setHandlerParent(sm)
    262         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    263         sm._connected(xs)
     236        xs = self.xmlstream
     237        sm.makeConnection(xs)
    264238        self.assertIdentical(None, xs.rawDataInFn)
    265239        self.assertIdentical(None, xs.rawDataOutFn)
    266240
    267241
    268     def test_connectedLogTrafficTrue(self):
     242    def test_makeConnectionLogTrafficTrue(self):
    269243        """
    270244        Test raw data functions set when logTraffic is set to True.
    271245        """
     
    273247        sm.logTraffic = True
    274248        handler = DummyXMPPHandler()
    275249        handler.setHandlerParent(sm)
    276         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    277         sm._connected(xs)
     250        xs = self.xmlstream
     251        sm.makeConnection(xs)
    278252        self.assertNotIdentical(None, xs.rawDataInFn)
    279253        self.assertNotIdentical(None, xs.rawDataOutFn)
    280254
    281255
    282     def test_authd(self):
     256    def test_connectionInitialized(self):
    283257        """
    284258        Test that protocol handlers have their connectionInitialized method
    285259        called when the XML stream is initialized.
     
    287261        sm = self.streamManager
    288262        handler = DummyXMPPHandler()
    289263        handler.setHandlerParent(sm)
    290         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    291         sm._authd(xs)
     264        xs = self.xmlstream
     265        sm.connectionInitialized(xs)
    292266        self.assertEquals(0, handler.doneMade)
    293267        self.assertEquals(1, handler.doneInitialized)
    294268        self.assertEquals(0, handler.doneLost)
    295269
    296270
    297     def test_disconnected(self):
     271    def test_connectionLost(self):
    298272        """
    299273        Protocol handlers have connectionLost called on stream disconnect.
    300274        """
    301275        sm = self.streamManager
    302276        handler = DummyXMPPHandler()
    303277        handler.setHandlerParent(sm)
    304         sm._disconnected(None)
     278        sm.connectionLost(None)
    305279        self.assertEquals(0, handler.doneMade)
    306280        self.assertEquals(0, handler.doneInitialized)
    307281        self.assertEquals(1, handler.doneLost)
    308282
    309283
    310     def test_disconnectedReason(self):
     284    def test_connectionLostReason(self):
    311285        """
    312286        A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers
    313287        L{connectionLost} methods, passing a L{failure.Failure} reason.
     
    316290        handler = FailureReasonXMPPHandler()
    317291        handler.setHandlerParent(sm)
    318292        xmlstream.XmlStream(xmlstream.Authenticator())
    319         sm._disconnected(failure.Failure(Exception("no reason")))
     293        sm.connectionLost(failure.Failure(Exception("no reason")))
    320294        self.assertEquals(True, handler.gotFailureReason)
    321295
    322296
     
    338312        Adding a handler when connected doesn't call connectionInitialized.
    339313        """
    340314        sm = self.streamManager
    341         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    342         sm._connected(xs)
     315        xs = self.xmlstream
     316        sm.makeConnection(xs)
    343317        handler = DummyXMPPHandler()
    344318        handler.setHandlerParent(sm)
    345319
     
    361335                self.nestedHandler.setHandlerParent(self.parent)
    362336
    363337        sm = self.streamManager
    364         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     338        xs = self.xmlstream
    365339        handler = NestingHandler()
    366340        handler.setHandlerParent(sm)
    367         sm._connected(xs)
     341        sm.makeConnection(xs)
    368342
    369343        self.assertEquals(1, handler.doneMade)
    370344        self.assertEquals(0, handler.doneInitialized)
     
    386360        called.
    387361        """
    388362        sm = self.streamManager
    389         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    390         sm._connected(xs)
    391         sm._authd(xs)
     363        xs = self.xmlstream
     364        sm.makeConnection(xs)
     365        sm.connectionInitialized(xs)
    392366        handler = DummyXMPPHandler()
    393367        handler.setHandlerParent(sm)
    394368
     
    410384                self.nestedHandler.setHandlerParent(self.parent)
    411385
    412386        sm = self.streamManager
    413         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     387        xs = self.xmlstream
    414388        handler = NestingHandler()
    415389        handler.setHandlerParent(sm)
    416         sm._connected(xs)
    417         sm._authd(xs)
     390        sm.makeConnection(xs)
     391        sm.connectionInitialized(xs)
    418392
    419393        self.assertEquals(1, handler.doneMade)
    420394        self.assertEquals(1, handler.doneInitialized)
     
    438412                self.nestedHandler.setHandlerParent(self.parent)
    439413
    440414        sm = self.streamManager
    441         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     415        xs = self.xmlstream
    442416        handler = NestingHandler()
    443417        handler.setHandlerParent(sm)
    444         sm._connected(xs)
    445         sm._authd(xs)
    446         sm._disconnected(xs)
     418        sm.makeConnection(xs)
     419        sm.connectionInitialized(xs)
     420        sm.connectionLost(xs)
    447421
    448422        self.assertEquals(1, handler.doneMade)
    449423        self.assertEquals(1, handler.doneInitialized)
     
    473447
    474448        The data should be sent directly over the XML stream.
    475449        """
    476         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    477         sm = subprotocols.StreamManager(factory)
    478         xs = factory.buildProtocol(None)
    479         xs.transport = proto_helpers.StringTransport()
     450        xs = self.xmlstream
    480451        xs.connectionMade()
    481452        xs.dataReceived("<stream:stream xmlns='jabber:client' "
    482453                        "xmlns:stream='http://etherx.jabber.org/streams' "
    483454                        "from='example.com' id='12345'>")
    484455        xs.dispatch(xs, "//event/stream/authd")
    485         sm.send("<presence/>")
     456        self.streamManager.send("<presence/>")
    486457        self.assertEquals("<presence/>", xs.transport.value())
    487458
    488459
     
    493464        The data should be cached until an XML stream has been established and
    494465        initialized.
    495466        """
    496         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    497         sm = subprotocols.StreamManager(factory)
     467        sm = self.streamManager
    498468        handler = DummyXMPPHandler()
    499469        sm.addHandler(handler)
    500470
    501         xs = factory.buildProtocol(None)
     471        xs = self.xmlstream
    502472        xs.transport = proto_helpers.StringTransport()
    503473        sm.send("<presence/>")
    504474        self.assertEquals("", xs.transport.value())
     
    525495        """
    526496        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    527497        sm = subprotocols.StreamManager(factory)
    528         xs = factory.buildProtocol(None)
     498        xs = self.xmlstream
    529499        xs.transport = proto_helpers.StringTransport()
    530500        xs.connectionMade()
    531501        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     
    548518        handler = DummyXMPPHandler()
    549519        sm.addHandler(handler)
    550520
    551         xs = factory.buildProtocol(None)
     521        xs = self.xmlstream
    552522        xs.connectionMade()
    553523        xs.transport = proto_helpers.StringTransport()
    554524        xs.connectionLost(None)
     
    720690        """
    721691        d = self.streamManager.request(self.request)
    722692        xs = self.xmlstream
     693        xs.connectionMade()
    723694        xs.connectionLost(failure.Failure(ConnectionDone()))
    724695        self.assertFailure(d, ConnectionDone)
    725696        return d
     
    735706            d = xmlstream.IQ(self.xmlstream).send()
    736707            d.addErrback(eb)
    737708
     709        self.xmlstream.connectionMade()
    738710        d = self.streamManager.request(self.request)
    739711        d.addErrback(eb)
    740712        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     
    780752        self.request.timeout = 60
    781753        d = self.streamManager.request(self.request)
    782754
     755        self.xmlstream.connectionMade()
    783756        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
    784757        self.assertFailure(d, ConnectionDone)
    785758        self.assertFalse(self.clock.calls)
     
    822795
    823796
    824797
     798class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     799    """
     800    Tests for L{subprotocols.StreamManager}.
     801    """
     802
     803    def setUp(self):
     804        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     805        self.clock = task.Clock()
     806        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     807        self.xmlstream = factory.buildProtocol(None)
     808        self.transport = proto_helpers.StringTransport()
     809        self.xmlstream.transport = self.transport
     810
     811        self.request = IQGetStanza(recipient=JID('other@example.org'),
     812                                   sender=JID('user@example.org'))
     813
     814
     815    def test_basic(self):
     816        """
     817        Test correct initialization and setup of factory observers.
     818        """
     819        factory = DummyFactory()
     820        sm = subprotocols.StreamManager(factory)
     821        self.assertIdentical(None, sm.xmlstream)
     822        self.assertEquals([], sm.handlers)
     823        self.assertEquals(sm.makeConnection,
     824                          sm.factory.callbacks['//event/stream/connected'])
     825
     826
     827
     828class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     829    """
     830    Tests for L{subprotocols.StreamManager}.
     831    """
     832
     833    def setUp(self):
     834        self.clock = task.Clock()
     835        self.streamManager = subprotocols.ServerStreamManager(
     836            reactor=self.clock)
     837
     838        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
     839        self.transport = proto_helpers.StringTransport()
     840        self.xmlstream.transport = self.transport
     841
     842        self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT,
     843                                   self.streamManager.makeConnection)
     844
     845        self.request = IQGetStanza(recipient=JID('other@example.org'),
     846                                   sender=JID('user@example.org'))
     847
     848
     849
    825850class DummyIQHandler(subprotocols.IQHandlerMixin):
    826851    iqHandlers = {'/iq[@type="get"]': 'onGet'}
    827852
Note: See TracBrowser for help on using the repository browser.