source: wokkel/test/helpers.py @ 147:aa2cfee614e7

wokkel-muc-client-support-24
Last change on this file since 147:aa2cfee614e7 was 147:aa2cfee614e7, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Use generic.Request for room destruction request, add TestableStreamManager?.

  • Property exe set to *
File size: 3.6 KB
Line 
1# Copyright (c) Ralph Meijer.
2# See LICENSE for details.
3
4"""
5Unit test helpers.
6"""
7
8from twisted.internet import defer, task
9from twisted.words.xish import xpath
10from twisted.words.xish.utility import EventDispatcher
11
12from wokkel.generic import parseXml
13from wokkel.subprotocols import StreamManager
14
15class XmlStreamStub(object):
16    """
17    Stub for testing objects that communicate through XML Streams.
18
19    Instances of this stub hold an object in L{xmlstream} that acts like an
20    L{XmlStream<twisted.words.xish.xmlstream.XmlStream} after connection stream
21    initialization. Stanzas can be sent through it by calling its C{send}
22    method with an object implementing
23    L{IElement<twisted.words.xish.domish.IElement>} as its first argument.
24    These appear in sequence in the L{output} instance variable of the stub.
25
26    For the reverse direction, stanzas passed to L{send} of the stub, will be
27    dispatched in the stubbed XmlStream as if it was received over the wire, so
28    that registered observers will be called.
29
30    Example:
31
32        >>> stub = XmlStreamStub()
33        >>> stub.xmlstream.send(domish.Element((None, 'presence')))
34        >>> stub.output[-1].toXml()
35        u'<presence/>'
36        >>> def cb(stanza):
37        ...     print "Got: %r" stanza.toXml()
38        >>> stub.xmlstream.addObserver('/presence')
39        >>> stub.send(domish.Element((None, 'presence')))
40        Got: u'<presence/>'
41
42    @ivar xmlstream: Stubbed XML Stream.
43    @type xmlstream: L{EventDispatcher}
44    @ivar output: List of stanzas sent to the XML Stream.
45    @type output: L{list}
46    """
47
48    def __init__(self):
49        self.output = []
50        self.xmlstream = EventDispatcher()
51        self.xmlstream.send = self.output.append
52
53    def send(self, obj):
54        """
55        Pass an element to the XML Stream as if received.
56
57        @param obj: Element to be dispatched to C{self.xmlstream}.
58        @type obj: object implementing
59                   L{IElement<twisted.words.xish.domish.IElement>}.
60        """
61        self.xmlstream.dispatch(obj)
62
63
64class TestableRequestHandlerMixin(object):
65    """
66    Mixin for testing XMPPHandlers that process iq requests.
67
68    Handlers that use L{wokkel.subprotocols.IQHandlerMixin} define a
69    C{iqHandlers} attribute that lists the handlers to be called for iq
70    requests. This mixin provides L{handleRequest} to mimic the handler
71    processing for easier testing.
72    """
73
74    def handleRequest(self, xml):
75        """
76        Find a handler and call it directly.
77
78        @param xml: XML stanza that may yield a handler being called.
79        @type xml: C{str}.
80        @return: Deferred that fires with the result of a handler for this
81                 stanza. If no handler was found, the deferred has its errback
82                 called with a C{NotImplementedError} exception.
83        """
84        handler = None
85        iq = parseXml(xml)
86        for queryString, method in self.service.iqHandlers.iteritems():
87            if xpath.internQuery(queryString).matches(iq):
88                handler = getattr(self.service, method)
89
90        if handler:
91            d = defer.maybeDeferred(handler, iq)
92        else:
93            d = defer.fail(NotImplementedError())
94
95        return d
96
97
98class TestableStreamManager(StreamManager):
99    """
100    Stream manager for testing subprotocol handlers.
101    """
102
103    def __init__(self, reactor=None):
104        class DummyFactory(object):
105            def addBootstrap(self, event, fn):
106                pass
107
108        factory = DummyFactory()
109        StreamManager.__init__(self, factory, reactor)
110        self.stub = XmlStreamStub()
111        self._connected(self.stub.xmlstream)
112        self._authd(self.stub.xmlstream)
Note: See TracBrowser for help on using the repository browser.