Changeset 13:b460ae320c23 for wokkel/test
- Timestamp:
- Feb 20, 2008, 3:44:44 PM (14 years ago)
- Branch:
- default
- Convert:
- svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@41
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
wokkel/test/test_pubsub.py
r10 r13 7 7 8 8 from twisted.trial import unittest 9 from twisted.python import failure 10 from twisted.internet import defer, reactor 9 11 from twisted.words.xish import domish 10 12 from twisted.words.protocols.jabber import error … … 21 23 NS_PUBSUB = 'http://jabber.org/protocol/pubsub' 22 24 NS_PUBSUB_ERRORS = 'http://jabber.org/protocol/pubsub#errors' 25 NS_PUBSUB_EVENT = 'http://jabber.org/protocol/pubsub#event' 26 27 def calledAsync(fn): 28 """ 29 Function wrapper that fires a deferred upon calling the given function. 30 """ 31 d = defer.Deferred() 32 33 def func(*args, **kwargs): 34 try: 35 result = fn(*args, **kwargs) 36 except: 37 d.errback() 38 else: 39 d.callback(result) 40 41 return d, func 42 23 43 24 44 class PubSubClientTest(unittest.TestCase): 45 timeout = 2 25 46 26 47 def setUp(self): … … 28 49 self.protocol = pubsub.PubSubClient() 29 50 self.protocol.xmlstream = self.stub.xmlstream 51 self.protocol.connectionInitialized() 30 52 31 53 def test_unsubscribe(self): … … 51 73 return d 52 74 75 def test_event_items(self): 76 """ 77 Test receiving an items event resulting in a call to itemsReceived. 78 """ 79 message = domish.Element((None, 'message')) 80 message['from'] = 'pubsub.example.org' 81 message['to'] = 'user@example.org/home' 82 event = message.addElement((NS_PUBSUB_EVENT, 'event')) 83 items = event.addElement('items') 84 items['node'] = 'test' 85 item1 = items.addElement('item') 86 item1['id'] = 'item1' 87 item2 = items.addElement('retract') 88 item2['id'] = 'item2' 89 item3 = items.addElement('item') 90 item3['id'] = 'item3' 91 92 def itemsReceived(recipient, service, nodeIdentifier, items): 93 self.assertEquals(JID('user@example.org/home'), recipient) 94 self.assertEquals(JID('pubsub.example.org'), service) 95 self.assertEquals('test', nodeIdentifier) 96 self.assertEquals([item1, item2, item3], items) 97 98 d, self.protocol.itemsReceived = calledAsync(itemsReceived) 99 self.stub.send(message) 100 return d 101 102 def test_event_delete(self): 103 """ 104 Test receiving a delete event resulting in a call to deleteReceived. 105 """ 106 message = domish.Element((None, 'message')) 107 message['from'] = 'pubsub.example.org' 108 message['to'] = 'user@example.org/home' 109 event = message.addElement((NS_PUBSUB_EVENT, 'event')) 110 items = event.addElement('delete') 111 items['node'] = 'test' 112 113 def deleteReceived(recipient, service, nodeIdentifier): 114 self.assertEquals(JID('user@example.org/home'), recipient) 115 self.assertEquals(JID('pubsub.example.org'), service) 116 self.assertEquals('test', nodeIdentifier) 117 118 d, self.protocol.deleteReceived = calledAsync(deleteReceived) 119 self.stub.send(message) 120 return d 121 122 def test_event_purge(self): 123 """ 124 Test receiving a purge event resulting in a call to purgeReceived. 125 """ 126 message = domish.Element((None, 'message')) 127 message['from'] = 'pubsub.example.org' 128 message['to'] = 'user@example.org/home' 129 event = message.addElement((NS_PUBSUB_EVENT, 'event')) 130 items = event.addElement('purge') 131 items['node'] = 'test' 132 133 def purgeReceived(recipient, service, nodeIdentifier): 134 self.assertEquals(JID('user@example.org/home'), recipient) 135 self.assertEquals(JID('pubsub.example.org'), service) 136 self.assertEquals('test', nodeIdentifier) 137 138 d, self.protocol.purgeReceived = calledAsync(purgeReceived) 139 self.stub.send(message) 140 return d 53 141 54 142 class PubSubServiceTest(unittest.TestCase):
Note: See TracChangeset
for help on using the changeset viewer.