# Copyright (c) Ralph Meijer.
# See LICENSE for details.

"""
Unit test helpers.
"""
---|
7 | |
---|
8 | from __future__ import division, absolute_import |
---|
9 | |
---|
10 | from twisted.internet import defer |
---|
11 | from twisted.python.compat import iteritems |
---|
12 | from twisted.words.xish import xpath |
---|
13 | from twisted.words.xish.utility import EventDispatcher |
---|
14 | |
---|
15 | from wokkel.generic import parseXml |
---|
16 | from wokkel.subprotocols import StreamManager |
---|
17 | |
---|
class XmlStreamStub(object):
    """
    Stub for testing objects that communicate through XML Streams.

    Instances of this stub hold an object in L{xmlstream} that acts like an
    L{XmlStream<twisted.words.xish.xmlstream.XmlStream>} after connection
    stream initialization. Stanzas can be sent through it by calling its
    C{send} method with an object implementing
    L{IElement<twisted.words.xish.domish.IElement>} as its first argument.
    These appear in sequence in the L{output} instance variable of the stub.

    For the reverse direction, stanzas passed to L{send} of the stub will be
    dispatched in the stubbed XmlStream as if they were received over the
    wire, so that registered observers will be called.

    Example::

        >>> from twisted.words.xish import domish
        >>> stub = XmlStreamStub()
        >>> stub.xmlstream.send(domish.Element((None, 'presence')))
        >>> stub.output[-1].toXml()
        u'<presence/>'
        >>> def cb(stanza):
        ...     print("Got: %r" % stanza.toXml())
        >>> stub.xmlstream.addObserver('/presence', cb)
        >>> stub.send(domish.Element((None, 'presence')))
        Got: u'<presence/>'

    @ivar xmlstream: Stubbed XML Stream.
    @type xmlstream: L{EventDispatcher}
    @ivar output: List of stanzas sent to the XML Stream.
    @type output: L{list}
    """

    def __init__(self):
        """
        Create the stubbed stream and the list that records what is sent
        through it.
        """
        self.output = []
        self.xmlstream = EventDispatcher()
        # Give the dispatcher a C{send} that simply records each outgoing
        # stanza in C{self.output}, in the order it was sent.
        self.xmlstream.send = self.output.append

    def send(self, obj):
        """
        Pass an element to the XML Stream as if received.

        @param obj: Element to be dispatched to C{self.xmlstream}.
        @type obj: object implementing
            L{IElement<twisted.words.xish.domish.IElement>}.
        """
        self.xmlstream.dispatch(obj)
---|
65 | |
---|
66 | |
---|
class TestableRequestHandlerMixin(object):
    """
    Mixin for testing XMPPHandlers that process iq requests.

    Handlers that use L{wokkel.subprotocols.IQHandlerMixin} define a
    C{iqHandlers} attribute that lists the handlers to be called for iq
    requests. This mixin provides L{handleRequest} to mimic the handler
    processing for easier testing.

    The handler under test is read from the C{service} attribute of the
    object this mixin is combined with.
    """

    def handleRequest(self, xml):
        """
        Parse a stanza, look up the matching iq handler and call it directly.

        Each entry of C{self.service.iqHandlers} maps an XPath-like query
        string to the name of a method on C{self.service}.

        @param xml: XML stanza that may yield a handler being called.
        @type xml: C{str}.
        @return: Deferred that fires with the result of a handler for this
            stanza. If no handler was found, the deferred has its errback
            called with a C{NotImplementedError} exception.
        """
        stanza = parseXml(xml)
        service = self.service

        matched = None
        # Every entry is checked (no early exit), so when several queries
        # match the stanza, the one iterated last is the one that is used.
        for query, methodName in iteritems(service.iqHandlers):
            if xpath.internQuery(query).matches(stanza):
                matched = getattr(service, methodName)

        if not matched:
            return defer.fail(NotImplementedError())

        # maybeDeferred wraps a plain return value or a raised exception in
        # a Deferred, so callers always get a Deferred back.
        return defer.maybeDeferred(matched, stanza)
---|
99 | |
---|
100 | |
---|
class TestableStreamManager(StreamManager):
    """
    Stream manager for testing subprotocol handlers.

    @ivar stub: Stub whose C{xmlstream} is handed to this manager.
    @type stub: L{XmlStreamStub}
    """

    def __init__(self, reactor=None):
        """
        Set up the manager with a do-nothing factory and a stubbed stream.

        @param reactor: Passed through unchanged to
            L{StreamManager.__init__}.
        """

        class DummyFactory(object):
            """
            Factory stand-in that accepts bootstrap registrations and
            ignores them.
            """

            def addBootstrap(self, event, fn):
                pass

        StreamManager.__init__(self, DummyFactory(), reactor)

        self.stub = XmlStreamStub()
        stream = self.stub.xmlstream
        # Hand the stubbed stream to the manager's C{_connected} and
        # C{_authd} callbacks, in that order.
        # NOTE(review): presumably this leaves the manager in the same state
        # as after a real connection and authentication -- confirm against
        # StreamManager.
        self._connected(stream)
        self._authd(stream)
---|