source: ralphm-patches/server-stream-manager.patch @ 72:727b4d29c48e

Last change on this file since 72:727b4d29c48e was 72:727b4d29c48e, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Major reworking of avatars, session manager and stanza handlers.

File size: 19.3 KB
  • wokkel/client.py

    # HG changeset patch
    # Parent 3f3fe954b1975c2d9115e0fa8177ae7b28a708a8
    Generalize StreamManager and add ServerStreamManager.
    
    This generalizes `StreamManager` to `BaseStreamManager` to take the
    common functionality and reuse it for `ServerStreamManager`. Where
    `StreamManager` is used for initiating connections, with a factory as
    a parameter, `ServerStreamManager` works for receiving connections. It
    must be created in a protocol factory when a connection is established,
    and then its `makeConnection` should be called to hook it up to the
    `XmlStream` instance.
    
    diff --git a/wokkel/client.py b/wokkel/client.py
    a b  
    109109        self._connection.disconnect()
    110110
    111111
    112     def _authd(self, xs):
     112    def connectionInitialized(self, xs):
    113113        """
    114114        Called when the stream has been initialized.
    115115
     
    118118        by its constituent initializers.
    119119        """
    120120        self.jid = self.factory.authenticator.jid
    121         StreamManager._authd(self, xs)
     121        StreamManager.connectionInitialized(self, xs)
    122122
    123123
    124124    def initializationFailed(self, reason):
  • wokkel/subprotocols.py

    diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
    a b  
    118118
    119119
    120120
    121 class StreamManager(XMPPHandlerCollection):
     121class BaseStreamManager(XMPPHandlerCollection):
    122122    """
    123123    Business logic representing a managed XMPP connection.
    124124
     
    129129
    130130    @ivar xmlstream: currently managed XML stream
    131131    @type xmlstream: L{XmlStream}
     132
    132133    @ivar logTraffic: if true, log all traffic.
    133134    @type logTraffic: C{bool}
     135
    134136    @ivar _initialized: Whether the stream represented by L{xmlstream} has
    135137                        been initialized. This is used when caching outgoing
    136138                        stanzas.
    137139    @type _initialized: C{bool}
    138     @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    139     @type _packetQueue: L{list}
     140
    140141    @ivar timeout: Default IQ request timeout in seconds.
    141142    @type timeout: C{int}
     143
    142144    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
    143145    """
     146
    144147    timeout = None
    145148    _reactor = None
    146149
    147150    logTraffic = False
    148151
    149     def __init__(self, factory, reactor=None):
     152    def __init__(self, reactor=None):
    150153        """
    151154        Construct a stream manager.
    152155
    153         @param factory: The stream factory to connect with.
    154156        @param reactor: A provider of L{IReactorTime} to track timeouts.
    155157            If not provided, the global reactor will be used.
    156158        """
    157159        XMPPHandlerCollection.__init__(self)
     160
    158161        self.xmlstream = None
    159162        self._packetQueue = []
    160163        self._initialized = False
    161164
    162         factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self._connected)
    163         factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
    164         factory.addBootstrap(xmlstream.INIT_FAILED_EVENT,
    165                              self.initializationFailed)
    166         factory.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)
    167         self.factory = factory
    168 
    169165        if reactor is None:
    170166            from twisted.internet import reactor
    171167        self._reactor = reactor
     
    191187            handler.connectionInitialized()
    192188
    193189
    194     def _connected(self, xs):
     190    def makeConnection(self, xs):
    195191        """
    196192        Called when the transport connection has been established.
    197193
     
    209205            xs.rawDataInFn = logDataIn
    210206            xs.rawDataOutFn = logDataOut
    211207
     208        xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
     209                       self.connectionInitialized)
     210        xs.addObserver(xmlstream.STREAM_END_EVENT,
     211                       self.connectionLost)
    212212        self.xmlstream = xs
    213213
    214214        for e in list(self):
    215215            e.makeConnection(xs)
    216216
    217217
    218     def _authd(self, xs):
     218    def connectionInitialized(self, xs):
    219219        """
    220220        Called when the stream has been initialized.
    221221
     
    238238            e.connectionInitialized()
    239239
    240240
    241     def initializationFailed(self, reason):
    242         """
    243         Called when stream initialization has failed.
    244241
    245         Stream initialization has halted, with the reason indicated by
    246         C{reason}. It may be retried by calling the authenticator's
    247         C{initializeStream}. See the respective authenticators for details.
    248 
    249         @param reason: A failure instance indicating why stream initialization
    250                        failed.
    251         @type reason: L{failure.Failure}
    252         """
    253 
    254 
    255     def _disconnected(self, reason):
     242    def connectionLost(self, reason):
    256243        """
    257244        Called when the stream has been closed.
    258245
     
    379366
    380367
    381368
     369class StreamManager(BaseStreamManager):
     370    """
     371    Business logic representing a managed XMPP client connection.
     372
     373    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
     374    @type _packetQueue: L{list}
     375    """
     376
     377    def __init__(self, factory, reactor=None):
     378        """
     379        Construct a stream manager.
     380
     381        @param factory: The stream factory to connect with.
     382        @param reactor: A provider of L{IReactorTime} to track timeouts.
     383            If not provided, the global reactor will be used.
     384        """
     385        BaseStreamManager.__init__(self, reactor=reactor)
     386
     387        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     388                             self.makeConnection)
     389        self.factory = factory
     390
     391
     392    def makeConnection(self, xs):
     393        xs.addObserver(xmlstream.INIT_FAILED_EVENT,
     394                       self.initializationFailed)
     395        BaseStreamManager.makeConnection(self, xs)
     396
     397
     398    def initializationFailed(self, reason):
     399        """
     400        Called when stream initialization has failed.
     401
     402        Stream initialization has halted, with the reason indicated by
     403        C{reason}. It may be retried by calling the authenticator's
     404        C{initializeStream}. See the respective authenticators for details.
     405
     406        @param reason: A failure instance indicating why stream initialization
     407                       failed.
     408        @type reason: L{failure.Failure}
     409        """
     410
     411
     412
     413class ServerStreamManager(BaseStreamManager):
     414    """
     415    Business logic representing a managed server-side XMPP connection.
     416    """
     417
     418    def __init__(self, reactor=None):
     419        BaseStreamManager.__init__(self, reactor=reactor)
     420
     421
     422
    382423class IQHandlerMixin(object):
    383424    """
    384425    XMPP subprotocol mixin for handle incoming IQ stanzas.
  • wokkel/test/helpers.py

    diff --git a/wokkel/test/helpers.py b/wokkel/test/helpers.py
    a b  
    108108        factory = DummyFactory()
    109109        StreamManager.__init__(self, factory, reactor)
    110110        self.stub = XmlStreamStub()
    111         self._connected(self.stub.xmlstream)
    112         self._authd(self.stub.xmlstream)
     111        self.makeConnection(self.stub.xmlstream)
     112        self.connectionInitialized(self.stub.xmlstream)
  • wokkel/test/test_client.py

    diff --git a/wokkel/test/test_client.py b/wokkel/test/test_client.py
    a b  
    3838        """
    3939        xs = self.client.factory.buildProtocol(None)
    4040        self.client.factory.authenticator.jid = JID('user@example.org/test')
    41         xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
     41        self.client.connectionInitialized(xs)
    4242        self.assertEquals(JID('user@example.org/test'), self.client.jid)
    4343
    4444
  • wokkel/test/test_subprotocols.py

    diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
    a b  
    189189
    190190
    191191
    192 class StreamManagerTest(unittest.TestCase):
    193     """
    194     Tests for L{subprotocols.StreamManager}.
    195     """
    196 
    197     def setUp(self):
    198         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    199         self.clock = task.Clock()
    200         self.streamManager = subprotocols.StreamManager(factory, self.clock)
    201         self.xmlstream = factory.buildProtocol(None)
    202         self.transport = proto_helpers.StringTransport()
    203         self.xmlstream.transport = self.transport
    204 
    205         self.request = IQGetStanza()
     192class BaseStreamManagerTestsMixin(object):
    206193
    207194    def _streamStarted(self):
    208195        """
     
    216203        self.xmlstream.dispatch(self.xmlstream, "//event/stream/authd")
    217204
    218205
    219     def test_basic(self):
     206    def test_interface(self):
    220207        """
    221         Test correct initialization and setup of factory observers.
     208        ServerStreamManager implements IXMPPHandlerCollection.
    222209        """
    223         factory = DummyFactory()
    224         sm = subprotocols.StreamManager(factory)
    225         self.assertIdentical(None, sm.xmlstream)
    226         self.assertEquals([], sm.handlers)
    227         self.assertEquals(sm._connected,
    228                           sm.factory.callbacks['//event/stream/connected'])
    229         self.assertEquals(sm._authd,
    230                           sm.factory.callbacks['//event/stream/authd'])
    231         self.assertEquals(sm._disconnected,
    232                           sm.factory.callbacks['//event/stream/end'])
    233         self.assertEquals(sm.initializationFailed,
    234                           sm.factory.callbacks['//event/xmpp/initfailed'])
     210        verifyObject(ijabber.IXMPPHandlerCollection, self.streamManager)
    235211
    236212
    237     def test_connected(self):
     213    def test_makeConnection(self):
    238214        """
    239215        Test that protocol handlers have their connectionMade method called
    240216        when the XML stream is connected.
     
    242218        sm = self.streamManager
    243219        handler = DummyXMPPHandler()
    244220        handler.setHandlerParent(sm)
    245         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    246         sm._connected(xs)
     221        xs = self.xmlstream
     222        sm.makeConnection(xs)
    247223        self.assertEquals(1, handler.doneMade)
    248224        self.assertEquals(0, handler.doneInitialized)
    249225        self.assertEquals(0, handler.doneLost)
    250226
    251227
    252     def test_connectedLogTrafficFalse(self):
     228    def test_makeConnectionLogTrafficFalse(self):
    253229        """
    254230        Test raw data functions unset when logTraffic is set to False.
    255231        """
    256232        sm = self.streamManager
    257233        handler = DummyXMPPHandler()
    258234        handler.setHandlerParent(sm)
    259         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    260         sm._connected(xs)
     235        xs = self.xmlstream
     236        sm.makeConnection(xs)
    261237        self.assertIdentical(None, xs.rawDataInFn)
    262238        self.assertIdentical(None, xs.rawDataOutFn)
    263239
    264240
    265     def test_connectedLogTrafficTrue(self):
     241    def test_makeConnectionLogTrafficTrue(self):
    266242        """
    267243        Test raw data functions set when logTraffic is set to True.
    268244        """
     
    270246        sm.logTraffic = True
    271247        handler = DummyXMPPHandler()
    272248        handler.setHandlerParent(sm)
    273         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    274         sm._connected(xs)
     249        xs = self.xmlstream
     250        sm.makeConnection(xs)
    275251        self.assertNotIdentical(None, xs.rawDataInFn)
    276252        self.assertNotIdentical(None, xs.rawDataOutFn)
    277253
    278254
    279     def test_authd(self):
     255    def test_connectionInitialized(self):
    280256        """
    281257        Test that protocol handlers have their connectionInitialized method
    282258        called when the XML stream is initialized.
     
    284260        sm = self.streamManager
    285261        handler = DummyXMPPHandler()
    286262        handler.setHandlerParent(sm)
    287         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    288         sm._authd(xs)
     263        xs = self.xmlstream
     264        sm.connectionInitialized(xs)
    289265        self.assertEquals(0, handler.doneMade)
    290266        self.assertEquals(1, handler.doneInitialized)
    291267        self.assertEquals(0, handler.doneLost)
    292268
    293269
    294     def test_disconnected(self):
     270    def test_connectionLost(self):
    295271        """
    296272        Protocol handlers have connectionLost called on stream disconnect.
    297273        """
    298274        sm = self.streamManager
    299275        handler = DummyXMPPHandler()
    300276        handler.setHandlerParent(sm)
    301         sm._disconnected(None)
     277        sm.connectionLost(None)
    302278        self.assertEquals(0, handler.doneMade)
    303279        self.assertEquals(0, handler.doneInitialized)
    304280        self.assertEquals(1, handler.doneLost)
    305281
    306282
    307     def test_disconnectedReason(self):
     283    def test_connectionLostReason(self):
    308284        """
    309285        A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers
    310286        L{connectionLost} methods, passing a L{failure.Failure} reason.
     
    313289        handler = FailureReasonXMPPHandler()
    314290        handler.setHandlerParent(sm)
    315291        xmlstream.XmlStream(xmlstream.Authenticator())
    316         sm._disconnected(failure.Failure(Exception("no reason")))
     292        sm.connectionLost(failure.Failure(Exception("no reason")))
    317293        self.assertEquals(True, handler.gotFailureReason)
    318294
    319295
     
    335311        Adding a handler when connected doesn't call connectionInitialized.
    336312        """
    337313        sm = self.streamManager
    338         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    339         sm._connected(xs)
     314        xs = self.xmlstream
     315        sm.makeConnection(xs)
    340316        handler = DummyXMPPHandler()
    341317        handler.setHandlerParent(sm)
    342318
     
    358334                self.nestedHandler.setHandlerParent(self.parent)
    359335
    360336        sm = self.streamManager
    361         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     337        xs = self.xmlstream
    362338        handler = NestingHandler()
    363339        handler.setHandlerParent(sm)
    364         sm._connected(xs)
     340        sm.makeConnection(xs)
    365341
    366342        self.assertEquals(1, handler.doneMade)
    367343        self.assertEquals(0, handler.doneInitialized)
     
    383359        called.
    384360        """
    385361        sm = self.streamManager
    386         xs = xmlstream.XmlStream(xmlstream.Authenticator())
    387         sm._connected(xs)
    388         sm._authd(xs)
     362        xs = self.xmlstream
     363        sm.makeConnection(xs)
     364        sm.connectionInitialized(xs)
    389365        handler = DummyXMPPHandler()
    390366        handler.setHandlerParent(sm)
    391367
     
    407383                self.nestedHandler.setHandlerParent(self.parent)
    408384
    409385        sm = self.streamManager
    410         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     386        xs = self.xmlstream
    411387        handler = NestingHandler()
    412388        handler.setHandlerParent(sm)
    413         sm._connected(xs)
    414         sm._authd(xs)
     389        sm.makeConnection(xs)
     390        sm.connectionInitialized(xs)
    415391
    416392        self.assertEquals(1, handler.doneMade)
    417393        self.assertEquals(1, handler.doneInitialized)
     
    435411                self.nestedHandler.setHandlerParent(self.parent)
    436412
    437413        sm = self.streamManager
    438         xs = xmlstream.XmlStream(xmlstream.Authenticator())
     414        xs = self.xmlstream
    439415        handler = NestingHandler()
    440416        handler.setHandlerParent(sm)
    441         sm._connected(xs)
    442         sm._authd(xs)
    443         sm._disconnected(xs)
     417        sm.makeConnection(xs)
     418        sm.connectionInitialized(xs)
     419        sm.connectionLost(xs)
    444420
    445421        self.assertEquals(1, handler.doneMade)
    446422        self.assertEquals(1, handler.doneInitialized)
     
    470446
    471447        The data should be sent directly over the XML stream.
    472448        """
    473         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    474         sm = subprotocols.StreamManager(factory)
    475         xs = factory.buildProtocol(None)
    476         xs.transport = proto_helpers.StringTransport()
     449        xs = self.xmlstream
    477450        xs.connectionMade()
    478451        xs.dataReceived("<stream:stream xmlns='jabber:client' "
    479452                        "xmlns:stream='http://etherx.jabber.org/streams' "
    480453                        "from='example.com' id='12345'>")
    481454        xs.dispatch(xs, "//event/stream/authd")
    482         sm.send("<presence/>")
     455        self.streamManager.send("<presence/>")
    483456        self.assertEquals("<presence/>", xs.transport.value())
    484457
    485458
     
    490463        The data should be cached until an XML stream has been established and
    491464        initialized.
    492465        """
    493         factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    494         sm = subprotocols.StreamManager(factory)
     466        sm = self.streamManager
    495467        handler = DummyXMPPHandler()
    496468        sm.addHandler(handler)
    497469
    498         xs = factory.buildProtocol(None)
     470        xs = self.xmlstream
    499471        xs.transport = proto_helpers.StringTransport()
    500472        sm.send("<presence/>")
    501473        self.assertEquals("", xs.transport.value())
     
    522494        """
    523495        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
    524496        sm = subprotocols.StreamManager(factory)
    525         xs = factory.buildProtocol(None)
     497        xs = self.xmlstream
    526498        xs.transport = proto_helpers.StringTransport()
    527499        xs.connectionMade()
    528500        xs.dataReceived("<stream:stream xmlns='jabber:client' "
     
    545517        handler = DummyXMPPHandler()
    546518        sm.addHandler(handler)
    547519
    548         xs = factory.buildProtocol(None)
     520        xs = self.xmlstream
    549521        xs.connectionMade()
    550522        xs.transport = proto_helpers.StringTransport()
    551523        xs.connectionLost(None)
     
    677649        """
    678650        d = self.streamManager.request(self.request)
    679651        xs = self.xmlstream
     652        xs.connectionMade()
    680653        xs.connectionLost(failure.Failure(ConnectionDone()))
    681654        self.assertFailure(d, ConnectionDone)
    682655        return d
     
    692665            d = xmlstream.IQ(self.xmlstream).send()
    693666            d.addErrback(eb)
    694667
     668        self.xmlstream.connectionMade()
    695669        d = self.streamManager.request(self.request)
    696670        d.addErrback(eb)
    697671        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
     
    736710        self.request.timeout = 60
    737711        d = self.streamManager.request(self.request)
    738712
     713        self.xmlstream.connectionMade()
    739714        self.xmlstream.connectionLost(failure.Failure(ConnectionDone()))
    740715        self.assertFailure(d, ConnectionDone)
    741716        self.assertFalse(self.clock.calls)
     
    778753
    779754
    780755
     756class StreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     757    """
     758    Tests for L{subprotocols.StreamManager}.
     759    """
     760
     761    def setUp(self):
     762        factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
     763        self.clock = task.Clock()
     764        self.streamManager = subprotocols.StreamManager(factory, self.clock)
     765        self.xmlstream = factory.buildProtocol(None)
     766        self.transport = proto_helpers.StringTransport()
     767        self.xmlstream.transport = self.transport
     768
     769        self.request = IQGetStanza()
     770
     771
     772    def test_basic(self):
     773        """
     774        Test correct initialization and setup of factory observers.
     775        """
     776        factory = DummyFactory()
     777        sm = subprotocols.StreamManager(factory)
     778        self.assertIdentical(None, sm.xmlstream)
     779        self.assertEquals([], sm.handlers)
     780        self.assertEquals(sm.makeConnection,
     781                          sm.factory.callbacks['//event/stream/connected'])
     782
     783
     784
     785class ServerStreamManagerTest(unittest.TestCase, BaseStreamManagerTestsMixin):
     786    """
     787    Tests for L{subprotocols.StreamManager}.
     788    """
     789
     790    def setUp(self):
     791        self.clock = task.Clock()
     792        self.streamManager = subprotocols.ServerStreamManager(
     793            reactor=self.clock)
     794
     795        self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
     796        self.transport = proto_helpers.StringTransport()
     797        self.xmlstream.transport = self.transport
     798
     799        self.xmlstream.addObserver(xmlstream.STREAM_CONNECTED_EVENT,
     800                                   self.streamManager.makeConnection)
     801
     802        self.request = IQGetStanza()
     803
     804
     805
    781806class DummyIQHandler(subprotocols.IQHandlerMixin):
    782807    iqHandlers = {'/iq[@type="get"]': 'onGet'}
    783808
Note: See TracBrowser for help on using the repository browser.