1 | # Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
---|
2 | # Copyright (c) 2008 Ralph Meijer |
---|
3 | # See LICENSE for details. |
---|
4 | |
---|
5 | """ |
---|
6 | Tests for L{wokkel.compat}. |
---|
7 | """ |
---|
8 | |
---|
9 | from zope.interface.verify import verifyObject |
---|
10 | from twisted.internet import defer, protocol |
---|
11 | from twisted.internet.interfaces import IProtocolFactory |
---|
12 | from twisted.trial import unittest |
---|
13 | from twisted.words.xish import domish, utility |
---|
14 | from twisted.words.protocols.jabber import xmlstream |
---|
15 | from wokkel.compat import toResponse, BootstrapMixin, XmlStreamServerFactory |
---|
16 | |
---|
17 | class DummyProtocol(protocol.Protocol, utility.EventDispatcher): |
---|
18 | """ |
---|
19 | I am a protocol with an event dispatcher without further processing. |
---|
20 | |
---|
21 | This protocol is only used for testing BootstrapMixin to make |
---|
22 | sure the bootstrap observers are added to the protocol instance. |
---|
23 | """ |
---|
24 | |
---|
25 | def __init__(self, *args, **kwargs): |
---|
26 | self.args = args |
---|
27 | self.kwargs = kwargs |
---|
28 | self.observers = [] |
---|
29 | |
---|
30 | utility.EventDispatcher.__init__(self) |
---|
31 | |
---|
32 | |
---|
33 | |
---|
34 | class BootstrapMixinTest(unittest.TestCase): |
---|
35 | """ |
---|
36 | Tests for L{BootstrapMixin}. |
---|
37 | |
---|
38 | @ivar factory: Instance of the factory or mixin under test. |
---|
39 | """ |
---|
40 | |
---|
41 | def setUp(self): |
---|
42 | self.factory = BootstrapMixin() |
---|
43 | |
---|
44 | |
---|
45 | def test_installBootstraps(self): |
---|
46 | """ |
---|
47 | Dispatching an event should fire registered bootstrap observers. |
---|
48 | """ |
---|
49 | called = [] |
---|
50 | |
---|
51 | def cb(data): |
---|
52 | called.append(data) |
---|
53 | |
---|
54 | dispatcher = DummyProtocol() |
---|
55 | self.factory.addBootstrap('//event/myevent', cb) |
---|
56 | self.factory.installBootstraps(dispatcher) |
---|
57 | |
---|
58 | dispatcher.dispatch(None, '//event/myevent') |
---|
59 | self.assertEquals(1, len(called)) |
---|
60 | |
---|
61 | |
---|
62 | def test_addAndRemoveBootstrap(self): |
---|
63 | """ |
---|
64 | Test addition and removal of a bootstrap event handler. |
---|
65 | """ |
---|
66 | |
---|
67 | called = [] |
---|
68 | |
---|
69 | def cb(data): |
---|
70 | called.append(data) |
---|
71 | |
---|
72 | self.factory.addBootstrap('//event/myevent', cb) |
---|
73 | self.factory.removeBootstrap('//event/myevent', cb) |
---|
74 | |
---|
75 | dispatcher = DummyProtocol() |
---|
76 | self.factory.installBootstraps(dispatcher) |
---|
77 | |
---|
78 | dispatcher.dispatch(None, '//event/myevent') |
---|
79 | self.assertFalse(called) |
---|
80 | |
---|
81 | |
---|
82 | |
---|
83 | class ToResponseTest(unittest.TestCase): |
---|
84 | |
---|
85 | def test_toResponse(self): |
---|
86 | """ |
---|
87 | Test that a response stanza is generated with addressing swapped. |
---|
88 | """ |
---|
89 | stanza = domish.Element(('jabber:client', 'iq')) |
---|
90 | stanza['type'] = 'get' |
---|
91 | stanza['to'] = 'user1@example.com' |
---|
92 | stanza['from'] = 'user2@example.com/resource' |
---|
93 | stanza['id'] = 'stanza1' |
---|
94 | response = toResponse(stanza, 'result') |
---|
95 | self.assertNotIdentical(stanza, response) |
---|
96 | self.assertEqual(response['from'], 'user1@example.com') |
---|
97 | self.assertEqual(response['to'], 'user2@example.com/resource') |
---|
98 | self.assertEqual(response['type'], 'result') |
---|
99 | self.assertEqual(response['id'], 'stanza1') |
---|
100 | |
---|
101 | def test_toResponseNoFrom(self): |
---|
102 | """ |
---|
103 | Test that a response is generated from a stanza without a from address. |
---|
104 | """ |
---|
105 | stanza = domish.Element(('jabber:client', 'iq')) |
---|
106 | stanza['type'] = 'get' |
---|
107 | stanza['to'] = 'user1@example.com' |
---|
108 | response = toResponse(stanza) |
---|
109 | self.assertEqual(response['from'], 'user1@example.com') |
---|
110 | self.failIf(response.hasAttribute('to')) |
---|
111 | |
---|
112 | def test_toResponseNoTo(self): |
---|
113 | """ |
---|
114 | Test that a response is generated from a stanza without a to address. |
---|
115 | """ |
---|
116 | stanza = domish.Element(('jabber:client', 'iq')) |
---|
117 | stanza['type'] = 'get' |
---|
118 | stanza['from'] = 'user2@example.com/resource' |
---|
119 | response = toResponse(stanza) |
---|
120 | self.failIf(response.hasAttribute('from')) |
---|
121 | self.assertEqual(response['to'], 'user2@example.com/resource') |
---|
122 | |
---|
123 | def test_toResponseNoAddressing(self): |
---|
124 | """ |
---|
125 | Test that a response is generated from a stanza without any addressing. |
---|
126 | """ |
---|
127 | stanza = domish.Element(('jabber:client', 'message')) |
---|
128 | stanza['type'] = 'chat' |
---|
129 | response = toResponse(stanza) |
---|
130 | self.failIf(response.hasAttribute('to')) |
---|
131 | self.failIf(response.hasAttribute('from')) |
---|
132 | |
---|
133 | def test_noID(self): |
---|
134 | """ |
---|
135 | Test that a proper response is generated without id attribute. |
---|
136 | """ |
---|
137 | stanza = domish.Element(('jabber:client', 'message')) |
---|
138 | response = toResponse(stanza) |
---|
139 | self.failIf(response.hasAttribute('id')) |
---|
140 | |
---|
141 | |
---|
142 | def test_noType(self): |
---|
143 | """ |
---|
144 | Test that a proper response is generated without type attribute. |
---|
145 | """ |
---|
146 | stanza = domish.Element(('jabber:client', 'message')) |
---|
147 | response = toResponse(stanza) |
---|
148 | self.failIf(response.hasAttribute('type')) |
---|
149 | |
---|
150 | |
---|
151 | |
---|
152 | class XmlStreamServerFactoryTest(BootstrapMixinTest): |
---|
153 | """ |
---|
154 | Tests for L{XmlStreamServerFactory}. |
---|
155 | """ |
---|
156 | |
---|
157 | def setUp(self): |
---|
158 | """ |
---|
159 | Set up a server factory with a authenticator factory function. |
---|
160 | """ |
---|
161 | class TestAuthenticator(object): |
---|
162 | def __init__(self): |
---|
163 | self.xmlstreams = [] |
---|
164 | |
---|
165 | def associateWithStream(self, xs): |
---|
166 | self.xmlstreams.append(xs) |
---|
167 | |
---|
168 | def authenticatorFactory(): |
---|
169 | return TestAuthenticator() |
---|
170 | |
---|
171 | self.factory = XmlStreamServerFactory(authenticatorFactory) |
---|
172 | |
---|
173 | |
---|
174 | def test_interface(self): |
---|
175 | """ |
---|
176 | L{XmlStreamServerFactory} is a L{Factory}. |
---|
177 | """ |
---|
178 | verifyObject(IProtocolFactory, self.factory) |
---|
179 | |
---|
180 | |
---|
181 | def test_buildProtocolAuthenticatorInstantiation(self): |
---|
182 | """ |
---|
183 | The authenticator factory should be used to instantiate the |
---|
184 | authenticator and pass it to the protocol. |
---|
185 | |
---|
186 | The default protocol, L{XmlStream} stores the authenticator it is |
---|
187 | passed, and calls its C{associateWithStream} method. so we use that to |
---|
188 | check whether our authenticator factory is used and the protocol |
---|
189 | instance gets an authenticator. |
---|
190 | """ |
---|
191 | xs = self.factory.buildProtocol(None) |
---|
192 | self.assertEquals([xs], xs.authenticator.xmlstreams) |
---|
193 | |
---|
194 | |
---|
195 | def test_buildProtocolXmlStream(self): |
---|
196 | """ |
---|
197 | The protocol factory creates Jabber XML Stream protocols by default. |
---|
198 | """ |
---|
199 | xs = self.factory.buildProtocol(None) |
---|
200 | self.assertIsInstance(xs, xmlstream.XmlStream) |
---|
201 | |
---|
202 | |
---|
203 | def test_buildProtocolTwice(self): |
---|
204 | """ |
---|
205 | Subsequent calls to buildProtocol should result in different instances |
---|
206 | of the protocol, as well as their authenticators. |
---|
207 | """ |
---|
208 | xs1 = self.factory.buildProtocol(None) |
---|
209 | xs2 = self.factory.buildProtocol(None) |
---|
210 | self.assertNotIdentical(xs1, xs2) |
---|
211 | self.assertNotIdentical(xs1.authenticator, xs2.authenticator) |
---|
212 | |
---|
213 | |
---|
214 | def test_buildProtocolInstallsBootstraps(self): |
---|
215 | """ |
---|
216 | The protocol factory installs bootstrap event handlers on the protocol. |
---|
217 | """ |
---|
218 | called = [] |
---|
219 | |
---|
220 | def cb(data): |
---|
221 | called.append(data) |
---|
222 | |
---|
223 | self.factory.addBootstrap('//event/myevent', cb) |
---|
224 | |
---|
225 | xs = self.factory.buildProtocol(None) |
---|
226 | xs.dispatch(None, '//event/myevent') |
---|
227 | |
---|
228 | self.assertEquals(1, len(called)) |
---|
229 | |
---|
230 | |
---|
231 | def test_buildProtocolStoresFactory(self): |
---|
232 | """ |
---|
233 | The protocol factory is saved in the protocol. |
---|
234 | """ |
---|
235 | xs = self.factory.buildProtocol(None) |
---|
236 | self.assertIdentical(self.factory, xs.factory) |
---|