Ignore:
Timestamp:
Feb 20, 2008, 3:44:44 PM (14 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@41
Message:

Add support for receiving notifications for item retraction, node deletion and purging.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/test/test_pubsub.py

    r10 r13  
    77
    88from twisted.trial import unittest
     9from twisted.python import failure
     10from twisted.internet import defer, reactor
    911from twisted.words.xish import domish
    1012from twisted.words.protocols.jabber import error
     
    2123NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
    2224NS_PUBSUB_ERRORS = 'http://jabber.org/protocol/pubsub#errors'
     25NS_PUBSUB_EVENT = 'http://jabber.org/protocol/pubsub#event'
     26
     27def calledAsync(fn):
     28    """
     29    Function wrapper that fires a deferred upon calling the given function.
     30    """
     31    d = defer.Deferred()
     32
     33    def func(*args, **kwargs):
     34        try:
     35            result = fn(*args, **kwargs)
     36        except:
     37            d.errback()
     38        else:
     39            d.callback(result)
     40
     41    return d, func
     42
    2343
    2444class PubSubClientTest(unittest.TestCase):
     45    timeout = 2
    2546
    2647    def setUp(self):
     
    2849        self.protocol = pubsub.PubSubClient()
    2950        self.protocol.xmlstream = self.stub.xmlstream
     51        self.protocol.connectionInitialized()
    3052
    3153    def test_unsubscribe(self):
     
    5173        return d
    5274
     75    def test_event_items(self):
     76        """
     77        Test receiving an items event resulting in a call to itemsReceived.
     78        """
     79        message = domish.Element((None, 'message'))
     80        message['from'] = 'pubsub.example.org'
     81        message['to'] = 'user@example.org/home'
     82        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
     83        items = event.addElement('items')
     84        items['node'] = 'test'
     85        item1 = items.addElement('item')
     86        item1['id'] = 'item1'
     87        item2 = items.addElement('retract')
     88        item2['id'] = 'item2'
     89        item3 = items.addElement('item')
     90        item3['id'] = 'item3'
     91
     92        def itemsReceived(recipient, service, nodeIdentifier, items):
     93            self.assertEquals(JID('user@example.org/home'), recipient)
     94            self.assertEquals(JID('pubsub.example.org'), service)
     95            self.assertEquals('test', nodeIdentifier)
     96            self.assertEquals([item1, item2, item3], items)
     97
     98        d, self.protocol.itemsReceived = calledAsync(itemsReceived)
     99        self.stub.send(message)
     100        return d
     101
     102    def test_event_delete(self):
     103        """
     104        Test receiving a delete event resulting in a call to deleteReceived.
     105        """
     106        message = domish.Element((None, 'message'))
     107        message['from'] = 'pubsub.example.org'
     108        message['to'] = 'user@example.org/home'
     109        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
     110        items = event.addElement('delete')
     111        items['node'] = 'test'
     112
     113        def deleteReceived(recipient, service, nodeIdentifier):
     114            self.assertEquals(JID('user@example.org/home'), recipient)
     115            self.assertEquals(JID('pubsub.example.org'), service)
     116            self.assertEquals('test', nodeIdentifier)
     117
     118        d, self.protocol.deleteReceived = calledAsync(deleteReceived)
     119        self.stub.send(message)
     120        return d
     121
     122    def test_event_purge(self):
     123        """
     124        Test receiving a purge event resulting in a call to purgeReceived.
     125        """
     126        message = domish.Element((None, 'message'))
     127        message['from'] = 'pubsub.example.org'
     128        message['to'] = 'user@example.org/home'
     129        event = message.addElement((NS_PUBSUB_EVENT, 'event'))
     130        items = event.addElement('purge')
     131        items['node'] = 'test'
     132
     133        def purgeReceived(recipient, service, nodeIdentifier):
     134            self.assertEquals(JID('user@example.org/home'), recipient)
     135            self.assertEquals(JID('pubsub.example.org'), service)
     136            self.assertEquals('test', nodeIdentifier)
     137
     138        d, self.protocol.purgeReceived = calledAsync(purgeReceived)
     139        self.stub.send(message)
     140        return d
    53141
    54142class PubSubServiceTest(unittest.TestCase):
Note: See TracChangeset for help on using the changeset viewer.