source: ralphm-patches/server-stream-manager.patch @ 74:355afce3af27

Last change on this file since 74:355afce3af27 was 74:355afce3af27, checked in by Ralph Meijer <ralphm@…>, 6 years ago

Refactor Component to match changes to StreamManager?.

File size: 21.2 KB
  • wokkel/client.py

    # HG changeset patch
    # Parent 71082c22f73200a0b8b4b5c6e5d2773f9f68dd4d
    Generalize StreamManager and add ServerStreamManager.
    
    This generalizes `StreamManager` to `BaseStreamManager` to take the
    common functionality and reuse it for `ServerStreamManager`. Where
    `StreamManager` is used for initiating connections, with a factory as
    a parameter, `ServerStreamManager` works for receiving connections. It
    must be created in a protocol factory when a connection is established,
    and then its `makeConnection` should be called to hook it up to the
    `XmlStream` instance.
    
    diff --git a/wokkel/client.py b/wokkel/client.py
    a b  
    109109        self._connection.disconnect()
    110110
    111111
    112     def _authd(self, xs):
     112    def connectionInitialized(self, xs):
    113113        """
    114114        Called when the stream has been initialized.
    115115
     
    118118        by its constituent initializers.
    119119        """
    120120        self.jid = self.factory.authenticator.jid
    121         StreamManager._authd(self, xs)
     121        StreamManager.connectionInitialized(self, xs)
    122122
    123123
    124124    def initializationFailed(self, reason):
  • wokkel/component.py

    diff --git a/wokkel/component.py b/wokkel/component.py
    a b  
    3636        StreamManager.__init__(self, factory)
    3737
    3838
    39     def _authd(self, xs):
     39    def connectionInitialized(self, xs):
    4040        """
    4141        Called when stream initialization has completed.
    4242
     
    5353            old_send(obj)
    5454
    5555        xs.send = send
    56         StreamManager._authd(self, xs)
     56        StreamManager.connectionInitialized(self, xs)
    5757
    5858
    5959    def initializationFailed(self, reason):
  • wokkel/subprotocols.py

    diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
    a b  
    120120
    121121
    122122
    123 class StreamManager(XMPPHandlerCollection):
     123class BaseStreamManager(XMPPHandlerCollection):
    124124    """
    125125    Business logic representing a managed XMPP connection.
    126126
     
    131131
    132132    @ivar xmlstream: currently managed XML stream
    133133    @type xmlstream: L{XmlStream}
     134
    134135    @ivar logTraffic: if true, log all traffic.
    135136    @type logTraffic: C{bool}
     137
    136138    @ivar _initialized: Whether the stream represented by L{xmlstream} has
    137139                        been initialized. This is used when caching outgoing
    138140                        stanzas.
    139141    @type _initialized: C{bool}
    140     @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    141     @type _packetQueue: L{list}
     142
    142143    @ivar timeout: Default IQ request timeout in seconds.
    143144    @type timeout: C{int}
     145
    144146    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    145147    """
     148
    146149    timeout = None
    147150    _reactor = None
    148151
    149152    logTraffic = False
    150153
    151     def __init__(self, factory, reactor=None):
     154    def __init__(self, reactor=None):
    152155        """
    153156        Construct a stream manager.
    154157
    155         @param factory: The stream factory to connect with.
    156158        @param reactor: A provider of L{IReactorTime} to track timeouts.
    157159            If not provided, the global reactor will be used.
    158160        """
    159161        XMPPHandlerCollection.__init__(self)
     162
    160163        self.xmlstream = None
    161164        self._packetQueue = []
    162165        self._initialized = False
    163166
    164         factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
    165         factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
    166         factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
    167                              self.initializationFailed)
    168         factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
    169         self.factory = factory
    170 
    171167        if reactor is None:
    172168            from twisted.internet import reactor
    173169        self._reactor = reactor
     
    193189            handler.connectionInitialized()
    194190
    195191
    196     def _connected(self, xs):
     192    def makeConnection(self, xs):
    197193        """
    198194        Called when the transport connection has been established.
    199195
     
    211207            xs.rawDataInFn = logDataIn
    212208            xs.rawDataOutFn = logDataOut
    213209
     210        xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
     211                       self.connectionInitialized)
     212        xs.addObserver(xmlstream.STREAM_END_EVENT,
     213                       self.connectionLost)
    214214        self.xmlstream = xs
    215215
    216216        for e in list(self):
    217217            e.makeConnection(xs)
    218218
    219219
    220     def _authd(self, xs):
     220    def connectionInitialized(self, xs):
    221221        """
    222222        Called when the stream has been initialized.
    223223
     
    240240            e.connectionInitialized()
    241241
    242242
    243     def initializationFailed(self, reason):
    244         """
    245         Called when stream initialization has failed.
    246243
    247         Stream initialization has halted, with the reason indicated by
    248         C{reason}. It may be retried by calling the authenticator's
    249         C{initializeStream}. See the respective authenticators for details.
    250 
    251         @param reason: A failure instance indicating why stream initialization
    252                        failed.
    253         @type reason: L{failure.Failure}
    254         """
    255 
    256 
    257     def _disconnected(self, reason):
     244    def connectionLost(self, reason):
    258245        """
    259246        Called when the stream has been closed.
    260247
     
    464451
    465452
    466453
     454class StreamManager(BaseStreamManager):
     455    """
     456    Business logic representing a managed XMPP client connection.
     457
     458    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
     459    @type _packetQueue: L{list}
     460    """
     461
     462    def __init__(self, factory, reactor=None):
     463        """
     464        Construct a stream manager.
     465
     466        @param factory: The stream factory to connect with.
     467        @param reactor: A provider of L{IReactorTime} to track timeouts.
     468            If not provided, the global reactor will be used.
     469        """
     470        BaseStreamManager.__init__(self, reactor=reactor)
     471
     472        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     473                             self.makeConnection)
     474        self.factory = factory
     475
     476
     477    def makeConnection(self, xs):
     478        xs.addObserver(xmlstream.INIT_FAILED_EVENT,
     479                       self.initializationFailed)
     480        BaseStreamManager.makeConnection(self, xs)
     481
     482
     483    def initializationFailed(self, reason):
     484        """
     485        Called when stream initialization has failed.
     486
     487        Stream initialization has halted, with the reason indicated by
     488        C{reason}. It may be retried by calling the authenticator's
     489        C{initializeStream}. See the respective authenticators for details.
     490
     491        @param reason: A failure instance indicating why stream initialization
     492                       failed.
     493        @type reason: L{failure.Failure}
     494        """
     495
     496
     497
     498class ServerStreamManager(BaseStreamManager):
     499    """
     500    Business logic representing a managed server-side XMPP connection.
     501    """
     502
     503    def __init__(self, reactor=None):
     504        BaseStreamManager.__init__(self, reactor=reactor)
     505
     506
     507
    467508class IQHandlerMixin(object):
    468509    """
    469510    XMPP subprotocol mixin for handle incoming IQ stanzas.
  • wokkel/test/helpers.py

    diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
    a b  
    108108        factory = DummyFactory()
    109109        StreamManager.__init__(self, factory, reactor)
    110110        self.stub = XmlStreamStub()
    111         self._connected(self.stub.xmlstream)
    112         self._authd(self.stub.xmlstream)
     111        self.makeConnection(self.stub.xmlstream)
     112        self.connectionInitialized(self.stub.xmlstream)
  • wokkel/test/test_client.py

    diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
    a b  
    3838        """
    3939        xs = self.client.factory.buildProtocol(None)
    4040        self.client.factory.authenticator.jid = JID('user@example.org/test')
    41         xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
     41        self.client.connectionInitialized(xs)
    4242        self.assertEquals(JID('user@example.org/test'), self.client.jid)
    4343
    4444
  • wokkel/test/test_component.py

    diff --git a/wokkel/test/test_component.py b/wokkel/test/test_component.py
    a b  
    4949    def __init__(self, *args, **kwargs):
    5050        component.Component.__init__(self, *args, **kwargs)
    5151        self.factory.clock = Clock()
     52        self.output = []
    5253
    5354
    5455    def _getConnection(self):
    5556        c = FakeConnector(self.factory, None, None)
    5657        c.connect()
     58        xs = self.factory.buildProtocol(None)
     59        xs.send = self.output.append
     60        xs.connectionMade()
     61        self.makeConnection(xs)
     62        xs.thisEntity = xs.otherEntity
     63        xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
    5764        return c
    5865
    5966
     
    104111        self.assertEqual(1, connector.connects)
    105112
    106113
     114    def test_stampFrom(self):
     115        """
     116        Outgoing elements with missing sender address get component JID.
     117        """
     118        comp = TestableComponent('example.org', 5347,
     119                                 'test.example.org', 'secret')
     120        comp.startService()
     121
     122        element = domish.Element((component.NS_COMPONENT_ACCEPT, "message"))
     123        element["to"] = "test@example.org"
     124        comp.xmlstream.send(element)
     125
     126        self.assertEqual('test.example.org', element.getAttribute("from"))
     127
     128
    107129
    108130class InternalComponentTest(unittest.TestCase):
    109131    """
  • wokkel/test/test_subprotocols.py

    diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
    a b  
    189189
    190190
    191191
    192 class StreamManagerTest(unittest.TestCase):
    193     """
    194     Tests for L{subprotocols.StreamManager}.
    195     """
    196 
    197     def setUp(self):
    198         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    199         self.clock = task.Clock()
    200         self.streamManager = subprotocols.StreamManager(factory, self.clock)
    201         self.xmlstream = factory.buildProtocol(None)
    202         self.transport = proto_helpers.StringTransport()
    203         self.xmlstream.transport = self.transport
    204 
    205         self.request = IQGetStanza()
     192class BaseStreamManagerTestsMixin(object):
    206193
    207194    def _streamStarted(self):
    208195        """
     
    216203        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
    217204
    218205
    219     def test_basic(self):
     206    def test_interface(self):
    220207        """
    221         Test correct initialization and setup of factory observers.
     208        ServerStreamManager implements IXMPPHandlerCollection.
    222209        """
    223         factory = DummyFactory()
    224         sm = subprotocols.StreamManager(factory)
    225         self.assertIdentical(None, sm.xmlstream)
    226         self.assertEquals([], sm.handlers)
    227         self.assertEquals(sm._connected,
    228                           sm.factory.callbacks['//event/stream/connected'])
    229         self.assertEquals(sm._authd,
    230                           sm.factory.callbacks['//event/stream/authd'])
    231         self.assertEquals(sm._disconnected,
    232                           sm.factory.callbacks['//event/stream/end'])
    233         self.assertEquals(sm.initializationFailed,
    234                           sm.factory.callbacks['//event/xmpp/initfailed'])
     210        verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager)
    235211
    236212
    237     def test_connected(self):
     213    def test_makeConnection(self):
    238214        """
    239215        Test that protocol handlers have their connectionMade method called
    240216        when the XML stream is connected.
     
    242218        sm = self.streamManager
    243219        handler = DummyXMPPHandler()
    244220        handler.setHandlerParent(sm)
    245         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    246         sm._connected(xs)
     221        xs = self.xmlstream
     222        sm.makeConnection(xs)
    247223        self.assertEquals(1, handler.doneMade)
    248224        self.assertEquals(0, handler.doneInitialized)
    249225        self.assertEquals(0, handler.doneLost)
    250226
    251227
    252     def test_connectedLogTrafficFalse(self):
     228    def test_makeConnectionLogTrafficFalse(self):
    253229        """
    254230        Test raw data functions unset when logTraffic is set to False.
    255231        """
    256232        sm = self.streamManager
    257233        handler = DummyXMPPHandler()
    258234        handler.setHandlerParent(sm)
    259         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    260         sm._connected(xs)
     235        xs = self.xmlstream
     236        sm.makeConnection(xs)
    261237        self.assertIdentical(None, xs.rawDataInFn)
    262238        self.assertIdentical(None, xs.rawDataOutFn)
    263239
    264240
    265     def test_connectedLogTrafficTrue(self):
     241    def test_makeConnectionLogTrafficTrue(self):
    266242        """
    267243        Test raw data functions set when logTraffic is set to True.
    268244        """
     
    270246        sm.logTraffic = True
    271247        handler = DummyXMPPHandler()
    272248        handler.setHandlerParent(sm)
    273         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    274         sm._connected(xs)
     249        xs = self.xmlstream
     250        sm.makeConnection(xs)
    275251        self.assertNotIdentical(None, xs.rawDataInFn)
    276252        self.assertNotIdentical(None, xs.rawDataOutFn)
    277253
    278254
    279     def test_authd(self):
     255    def test_connectionInitialized(self):
    280256        """
    281257        Test that protocol handlers have their connectionInitialized method
    282258        called when the XML stream is initialized.
     
    284260        sm = self.streamManager
    285261        handler = DummyXMPPHandler()
    286262        handler.setHandlerParent(sm)
    287         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    288         sm._authd(xs)
     263        xs = self.xmlstream
     264        sm.connectionInitialized(xs)
    289265        self.assertEquals(0, handler.doneMade)
    290266        self.assertEquals(1, handler.doneInitialized)
    291267        self.assertEquals(0, handler.doneLost)
    292268
    293269
    294     def test_disconnected(self):
     270    def test_connectionLost(self):
    295271        """
    296272        Protocol handlers have connectionLost called on stream disconnect.
    297273        """
    298274        sm = self.streamManager
    299275        handler = DummyXMPPHandler()
    300276        handler.setHandlerParent(sm)
    301         sm._disconnected(None)
     277        sm.connectionLost(None)
    302278        self.assertEquals(0, handler.doneMade)
    303279        self.assertEquals(0, handler.doneInitialized)
    304280        self.assertEquals(1, handler.doneLost)
    305281
    306282
    307     def test_disconnectedReason(self):
     283    def test_connectionLostReason(self):
    308284        """
    309285        A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers
    310286        L{connectionLost} methods, passing a L{failure.Failure} reason.
     
    313289        handler = FailureReasonXMPPHandler()
    314290        handler.setHandlerParent(sm)
    315291        xmlstream.XmlStream(xmlstream.Authenticator())
    316         sm._disconnected(failure.Failure(Exception("no reason")))
     292        sm.connectionLost(failure.Failure(Exception("no reason")))
    317293        self.assertEquals(True, handler.gotFailureReason)
    318294
    319295
     
    335311        Adding a handler when connected doesn't call connectionInitialized.
    336312        """
    337313        sm = self.streamManager
    338         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    339         sm._connected(xs)
     314        xs = self.xmlstream
     315        sm.makeConnection(xs)
    340316        handler = DummyXMPPHandler()
    341317        handler.setHandlerParent(sm)
    342318
     
    358334                self.nestedHandler.setHandlerParent(self.parent)
    359335
    360336        sm = self.streamManager
    361         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     337        xs = self.xmlstream
    362338        handler = NestingHandler()
    363339        handler.setHandlerParent(sm)
    364         sm._connected(xs)
     340        sm.makeConnection(xs)
    365341
    366342        self.assertEquals(1, handler.doneMade)
    367343        self.assertEquals(0, handler.doneInitialized)
     
    383359        called.
    384360        """
    385361        sm = self.streamManager
    386         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    387         sm._connected(xs)
    388         sm._authd(xs)
     362        xs = self.xmlstream
     363        sm.makeConnection(xs)
     364        sm.connectionInitialized(xs)
    389365        handler = DummyXMPPHandler()
    390366        handler.setHandlerParent(sm)
    391367
     
    407383                self.nestedHandler.setHandlerParent(self.parent)
    408384
    409385        sm = self.streamManager
    410         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     386        xs = self.xmlstream
    411387        handler = NestingHandler()
    412388        handler.setHandlerParent(sm)
    413         sm._connected(xs)
    414         sm._authd(xs)
     389        sm.makeConnection(xs)
     390        sm.connectionInitialized(xs)
    415391
    416392        self.assertEquals(1, handler.doneMade)
    417393        self.assertEquals(1, handler.doneInitialized)
     
    435411                self.nestedHandler.setHandlerParent(self.parent)
    436412
    437413        sm = self.streamManager
    438         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     414        xs = self.xmlstream
    439415        handler = NestingHandler()
    440416        handler.setHandlerParent(sm)
    441         sm._connected(xs)
    442         sm._authd(xs)
    443         sm._disconnected(xs)
     417        sm.makeConnection(xs)
     418        sm.connectionInitialized(xs)
     419        sm.connectionLost(xs)
    444420
    445421        self.assertEquals(1, handler.doneMade)
    446422        self.assertEquals(1, handler.doneInitialized)
     
    470446
    471447        The data should be sent directly over the XML stream.
    472448        """
    473         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    474         sm = subprotocols.StreamManager(factory)
    475         xs = factory.buildProtocol(None)
    476         xs.transport = proto_helpers.StringTransport()
     449        xs = self.xmlstream
    477450        xs.connectionMade()
    478451        xs.dataReceived("<stream:stream xmlns='jabber:client' "
    479452                        "xmlns:stream='http://etherx.jabber.org/streams' "
    480453                        "from='example.com' id='12345'>")
    481454        xs.dispatch(xs, "//event/stream/authd")
    482         sm.send("<presence/>")
     455        self.streamManager.send("<presence/>")
    483456        self.assertEquals("<presence/>", xs.transport.value())
    484457
    485458
     
    490463        The data should be cached until an XML stream has been established and
    491464        initialized.
    492465        """
    493         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    494         sm = subprotocols.StreamManager(factory)
     466        sm = self.streamManager
    495467        handler = DummyXMPPHandler()
    496468        sm.addHandler(handler)
    497469
    498         xs = factory.buildProtocol(None)
     470        xs = self.xmlstream
    499471        xs.transport = proto_helpers.StringTransport()
    500472        sm.send("<presence/>")
    501473        self.assertEquals("", xs.transport.value())
     
    522494        """
    523495        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    524496        sm = subprotocols.StreamManager(factory)
    525         xs = factory.buildProtocol(None)
     497        xs = self.xmlstream
    526498        xs.transport = proto_helpers.StringTransport()
    527499        xs.connectionMade()
    528500        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     
    545517        handler = DummyXMPPHandler()
    546518        sm.addHandler(handler)
    547519
    548         xs = factory.buildProtocol(None)
     520        xs = self.xmlstream
    549521        xs.connectionMade()
    550522        xs.transport = proto_helpers.StringTransport()
    551523        xs.connectionLost(None)
     
    677649        """
    678650        d = self.streamManager.request(self.request)
    679651        xs = self.xmlstream
     652        xs.connectionMade()
    680653        xs.connectionLost(failure.Failure(ConnectionDone()))
    681654        self.assertFailure(d, ConnectionDone)
    682655        return d
     
    692665            d = xmlstream.IQ(self.xmlstream).send()
    693666            d.addErrback(eb)
    694667
     668        self.xmlstream.connectionMade()
    695669        d = self.streamManager.request(self.request)
    696670        d.addErrback(eb)
    697671        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     
    736710        self.request.timeout = 60
    737711        d = self.streamManager.request(self.request)
    738712
     713        self.xmlstream.connectionMade()
    739714        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
    740715        self.assertFailure(d, ConnectionDone)
    741716        self.assertFalse(self.clock.calls)
     
    778753
    779754
    780755
     756class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     757    """
     758    Tests for L{subprotocols.StreamManager}.
     759    """
     760
     761    def setUp(self):
     762        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     763        self.clock = task.Clock()
     764        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     765        self.xmlstream = factory.buildProtocol(None)
     766        self.transport = proto_helpers.StringTransport()
     767        self.xmlstream.transport = self.transport
     768
     769        self.request = IQGetStanza()
     770
     771
     772    def test_basic(self):
     773        """
     774        Test correct initialization and setup of factory observers.
     775        """
     776        factory = DummyFactory()
     777        sm = subprotocols.StreamManager(factory)
     778        self.assertIdentical(None, sm.xmlstream)
     779        self.assertEquals([], sm.handlers)
     780        self.assertEquals(sm.makeConnection,
     781                          sm.factory.callbacks['//event/stream/connected'])
     782
     783
     784
     785class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     786    """
     787    Tests for L{subprotocols.StreamManager}.
     788    """
     789
     790    def setUp(self):
     791        self.clock = task.Clock()
     792        self.streamManager = subprotocols.ServerStreamManager(
     793            reactor=self.clock)
     794
     795        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
     796        self.transport = proto_helpers.StringTransport()
     797        self.xmlstream.transport = self.transport
     798
     799        self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT,
     800                                   self.streamManager.makeConnection)
     801
     802        self.request = IQGetStanza()
     803
     804
     805
    781806class DummyIQHandler(subprotocols.IQHandlerMixin):
    782807    iqHandlers = {'/iq[@type="get"]': 'onGet'}
    783808
Note: See TracBrowser for help on using the repository browser.