source: wokkel/test/test_pubsub.py @ 13:b460ae320c23

Last change on this file since 13:b460ae320c23 was 13:b460ae320c23, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add support for receiving notifications for item retraction, node deletion and purging.

File size: 6.7 KB
Line 
1# Copyright (c) 2003-2008 Ralph Meijer
2# See LICENSE for details.
3
4"""
5Tests for L{wokkel.pubsub}
6"""
7
8from twisted.trial import unittest
9from twisted.python import failure
10from twisted.internet import defer, reactor
11from twisted.words.xish import domish
12from twisted.words.protocols.jabber import error
13from twisted.words.protocols.jabber.jid import JID
14
15from wokkel import pubsub
16from wokkel.test.helpers import XmlStreamStub
17
18try:
19    from twisted.words.protocols.jabber.xmlstream import toResponse
20except ImportError:
21    from wokkel.compat import toResponse
22
23NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
24NS_PUBSUB_ERRORS = 'http://jabber.org/protocol/pubsub#errors'
25NS_PUBSUB_EVENT = 'http://jabber.org/protocol/pubsub#event'
26
27def calledAsync(fn):
28    """
29    Function wrapper that fires a deferred upon calling the given function.
30    """
31    d = defer.Deferred()
32
33    def func(*args, **kwargs):
34        try:
35            result = fn(*args, **kwargs)
36        except:
37            d.errback()
38        else:
39            d.callback(result)
40
41    return d, func
42
43
44class PubSubClientTest(unittest.TestCase):
45    timeout = 2
46
47    def setUp(self):
48        self.stub = XmlStreamStub()
49        self.protocol = pubsub.PubSubClient()
50        self.protocol.xmlstream = self.stub.xmlstream
51        self.protocol.connectionInitialized()
52
53    def test_unsubscribe(self):
54        """
55        Test sending unsubscription request.
56        """
57        d = self.protocol.unsubscribe(JID('pubsub.example.org'), 'test',
58                                      JID('user@example.org'))
59
60        iq = self.stub.output[-1]
61        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
62        self.assertEquals('set', iq.getAttribute('type'))
63        self.assertEquals('pubsub', iq.pubsub.name)
64        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
65        children = list(domish.generateElementsQNamed(iq.pubsub.children,
66                                                      'unsubscribe', NS_PUBSUB))
67        self.assertEquals(1, len(children))
68        child = children[0]
69        self.assertEquals('test', child['node'])
70        self.assertEquals('user@example.org', child['jid'])
71
72        self.stub.send(toResponse(iq, 'result'))
73        return d
74
75    def test_event_items(self):
76        """
77        Test receiving an items event resulting in a call to itemsReceived.
78        """
79        message = domish.Element((None, 'message'))
80        message['from'] = 'pubsub.example.org'
81        message['to'] = 'user@example.org/home'
82        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
83        items = event.addElement('items')
84        items['node'] = 'test'
85        item1 = items.addElement('item')
86        item1['id'] = 'item1'
87        item2 = items.addElement('retract')
88        item2['id'] = 'item2'
89        item3 = items.addElement('item')
90        item3['id'] = 'item3'
91
92        def itemsReceived(recipient, service, nodeIdentifier, items):
93            self.assertEquals(JID('user@example.org/home'), recipient)
94            self.assertEquals(JID('pubsub.example.org'), service)
95            self.assertEquals('test', nodeIdentifier)
96            self.assertEquals([item1, item2, item3], items)
97
98        d, self.protocol.itemsReceived = calledAsync(itemsReceived)
99        self.stub.send(message)
100        return d
101
102    def test_event_delete(self):
103        """
104        Test receiving a delete event resulting in a call to deleteReceived.
105        """
106        message = domish.Element((None, 'message'))
107        message['from'] = 'pubsub.example.org'
108        message['to'] = 'user@example.org/home'
109        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
110        items = event.addElement('delete')
111        items['node'] = 'test'
112
113        def deleteReceived(recipient, service, nodeIdentifier):
114            self.assertEquals(JID('user@example.org/home'), recipient)
115            self.assertEquals(JID('pubsub.example.org'), service)
116            self.assertEquals('test', nodeIdentifier)
117
118        d, self.protocol.deleteReceived = calledAsync(deleteReceived)
119        self.stub.send(message)
120        return d
121
122    def test_event_purge(self):
123        """
124        Test receiving a purge event resulting in a call to purgeReceived.
125        """
126        message = domish.Element((None, 'message'))
127        message['from'] = 'pubsub.example.org'
128        message['to'] = 'user@example.org/home'
129        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
130        items = event.addElement('purge')
131        items['node'] = 'test'
132
133        def purgeReceived(recipient, service, nodeIdentifier):
134            self.assertEquals(JID('user@example.org/home'), recipient)
135            self.assertEquals(JID('pubsub.example.org'), service)
136            self.assertEquals('test', nodeIdentifier)
137
138        d, self.protocol.purgeReceived = calledAsync(purgeReceived)
139        self.stub.send(message)
140        return d
141
142class PubSubServiceTest(unittest.TestCase):
143
144    def setUp(self):
145        self.output = []
146
147    def send(self, obj):
148        self.output.append(obj)
149
150    def test_onPublishNoNode(self):
151        handler = pubsub.PubSubService()
152        handler.parent = self
153        iq = domish.Element((None, 'iq'))
154        iq['from'] = 'user@example.org'
155        iq['to'] = 'pubsub.example.org'
156        iq['type'] = 'set'
157        iq.addElement((NS_PUBSUB, 'pubsub'))
158        iq.pubsub.addElement('publish')
159        handler.handleRequest(iq)
160
161        e = error.exceptionFromStanza(self.output[-1])
162        self.assertEquals('bad-request', e.condition)
163
164    def test_onPublish(self):
165        class Handler(pubsub.PubSubService):
166            def publish(self, *args, **kwargs):
167                self.args = args
168                self.kwargs = kwargs
169
170        handler = Handler()
171        handler.parent = self
172        iq = domish.Element((None, 'iq'))
173        iq['type'] = 'set'
174        iq['from'] = 'user@example.org'
175        iq['to'] = 'pubsub.example.org'
176        iq.addElement((NS_PUBSUB, 'pubsub'))
177        iq.pubsub.addElement('publish')
178        iq.pubsub.publish['node'] = 'test'
179        handler.handleRequest(iq)
180
181        self.assertEqual((JID('user@example.org'),
182                          JID('pubsub.example.org'), 'test', []), handler.args)
183
184    def test_onOptionsGet(self):
185        handler = pubsub.PubSubService()
186        handler.parent = self
187        iq = domish.Element((None, 'iq'))
188        iq['from'] = 'user@example.org'
189        iq['to'] = 'pubsub.example.org'
190        iq['type'] = 'get'
191        iq.addElement((NS_PUBSUB, 'pubsub'))
192        iq.pubsub.addElement('options')
193        handler.handleRequest(iq)
194
195        e = error.exceptionFromStanza(self.output[-1])
196        self.assertEquals('feature-not-implemented', e.condition)
197        self.assertEquals('unsupported', e.appCondition.name)
198        self.assertEquals(NS_PUBSUB_ERRORS, e.appCondition.uri)
Note: See TracBrowser for help on using the repository browser.