1 | # Copyright (c) Ralph Meijer. |
---|
2 | # See LICENSE for details. |
---|
3 | |
---|
4 | """ |
---|
5 | Unit test helpers. |
---|
6 | """ |
---|
7 | |
---|
8 | from twisted.internet import defer, task |
---|
9 | from twisted.words.xish import xpath |
---|
10 | from twisted.words.xish.utility import EventDispatcher |
---|
11 | |
---|
12 | from wokkel.generic import parseXml |
---|
13 | from wokkel.subprotocols import StreamManager |
---|
14 | |
---|
15 | class XmlStreamStub(object): |
---|
16 | """ |
---|
17 | Stub for testing objects that communicate through XML Streams. |
---|
18 | |
---|
19 | Instances of this stub hold an object in L{xmlstream} that acts like an |
---|
20 | L{XmlStream<twisted.words.xish.xmlstream.XmlStream} after connection stream |
---|
21 | initialization. Stanzas can be sent through it by calling its C{send} |
---|
22 | method with an object implementing |
---|
23 | L{IElement<twisted.words.xish.domish.IElement>} as its first argument. |
---|
24 | These appear in sequence in the L{output} instance variable of the stub. |
---|
25 | |
---|
26 | For the reverse direction, stanzas passed to L{send} of the stub, will be |
---|
27 | dispatched in the stubbed XmlStream as if it was received over the wire, so |
---|
28 | that registered observers will be called. |
---|
29 | |
---|
30 | Example: |
---|
31 | |
---|
32 | >>> stub = XmlStreamStub() |
---|
33 | >>> stub.xmlstream.send(domish.Element((None, 'presence'))) |
---|
34 | >>> stub.output[-1].toXml() |
---|
35 | u'<presence/>' |
---|
36 | >>> def cb(stanza): |
---|
37 | ... print "Got: %r" stanza.toXml() |
---|
38 | >>> stub.xmlstream.addObserver('/presence') |
---|
39 | >>> stub.send(domish.Element((None, 'presence'))) |
---|
40 | Got: u'<presence/>' |
---|
41 | |
---|
42 | @ivar xmlstream: Stubbed XML Stream. |
---|
43 | @type xmlstream: L{EventDispatcher} |
---|
44 | @ivar output: List of stanzas sent to the XML Stream. |
---|
45 | @type output: L{list} |
---|
46 | """ |
---|
47 | |
---|
48 | def __init__(self): |
---|
49 | self.output = [] |
---|
50 | self.xmlstream = EventDispatcher() |
---|
51 | self.xmlstream.send = self.output.append |
---|
52 | |
---|
53 | def send(self, obj): |
---|
54 | """ |
---|
55 | Pass an element to the XML Stream as if received. |
---|
56 | |
---|
57 | @param obj: Element to be dispatched to C{self.xmlstream}. |
---|
58 | @type obj: object implementing |
---|
59 | L{IElement<twisted.words.xish.domish.IElement>}. |
---|
60 | """ |
---|
61 | self.xmlstream.dispatch(obj) |
---|
62 | |
---|
63 | |
---|
64 | class TestableRequestHandlerMixin(object): |
---|
65 | """ |
---|
66 | Mixin for testing XMPPHandlers that process iq requests. |
---|
67 | |
---|
68 | Handlers that use L{wokkel.subprotocols.IQHandlerMixin} define a |
---|
69 | C{iqHandlers} attribute that lists the handlers to be called for iq |
---|
70 | requests. This mixin provides L{handleRequest} to mimic the handler |
---|
71 | processing for easier testing. |
---|
72 | """ |
---|
73 | |
---|
74 | def handleRequest(self, xml): |
---|
75 | """ |
---|
76 | Find a handler and call it directly. |
---|
77 | |
---|
78 | @param xml: XML stanza that may yield a handler being called. |
---|
79 | @type xml: C{str}. |
---|
80 | @return: Deferred that fires with the result of a handler for this |
---|
81 | stanza. If no handler was found, the deferred has its errback |
---|
82 | called with a C{NotImplementedError} exception. |
---|
83 | """ |
---|
84 | handler = None |
---|
85 | iq = parseXml(xml) |
---|
86 | for queryString, method in self.service.iqHandlers.iteritems(): |
---|
87 | if xpath.internQuery(queryString).matches(iq): |
---|
88 | handler = getattr(self.service, method) |
---|
89 | |
---|
90 | if handler: |
---|
91 | d = defer.maybeDeferred(handler, iq) |
---|
92 | else: |
---|
93 | d = defer.fail(NotImplementedError()) |
---|
94 | |
---|
95 | return d |
---|
96 | |
---|
97 | |
---|
98 | class TestableStreamManager(StreamManager): |
---|
99 | """ |
---|
100 | Stream manager for testing subprotocol handlers. |
---|
101 | """ |
---|
102 | |
---|
103 | def __init__(self, reactor=None): |
---|
104 | class DummyFactory(object): |
---|
105 | def addBootstrap(self, event, fn): |
---|
106 | pass |
---|
107 | |
---|
108 | factory = DummyFactory() |
---|
109 | StreamManager.__init__(self, factory, reactor) |
---|
110 | self.stub = XmlStreamStub() |
---|
111 | self._connected(self.stub.xmlstream) |
---|
112 | self._authd(self.stub.xmlstream) |
---|