Ignore:
Timestamp:
Apr 16, 2008, 4:08:15 PM (14 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@48
Message:

Add maxItems argument to PubSubClient?.items(), lot of docs and tests.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/test/test_pubsub.py

    r17 r18  
    5151        self.protocol.connectionInitialized()
    5252
    53     def test_unsubscribe(self):
    54         """
    55         Test sending unsubscription request.
    56         """
    57         d = self.protocol.unsubscribe(JID('pubsub.example.org'), 'test',
    58                                       JID('user@example.org'))
    59 
    60         iq = self.stub.output[-1]
    61         self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
    62         self.assertEquals('set', iq.getAttribute('type'))
    63         self.assertEquals('pubsub', iq.pubsub.name)
    64         self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
    65         children = list(domish.generateElementsQNamed(iq.pubsub.children,
    66                                                       'unsubscribe', NS_PUBSUB))
    67         self.assertEquals(1, len(children))
    68         child = children[0]
    69         self.assertEquals('test', child['node'])
    70         self.assertEquals('user@example.org', child['jid'])
    71 
    72         self.stub.send(toResponse(iq, 'result'))
    73         return d
    7453
    7554    def test_event_items(self):
     
    10079        return d
    10180
     81
    10282    def test_event_delete(self):
    10383        """
     
    120100        return d
    121101
     102
    122103    def test_event_purge(self):
    123104        """
     
    141122
    142123
     124    def test_createNode(self):
     125        """
     126        Test sending create request.
     127        """
     128
     129        def cb(nodeIdentifier):
     130            self.assertEquals('test', nodeIdentifier)
     131
     132        d = self.protocol.createNode(JID('pubsub.example.org'), 'test')
     133        d.addCallback(cb)
     134
     135        iq = self.stub.output[-1]
     136        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     137        self.assertEquals('set', iq.getAttribute('type'))
     138        self.assertEquals('pubsub', iq.pubsub.name)
     139        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     140        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     141                                                      'create', NS_PUBSUB))
     142        self.assertEquals(1, len(children))
     143        child = children[0]
     144        self.assertEquals('test', child['node'])
     145
     146        response = toResponse(iq, 'result')
     147        self.stub.send(response)
     148        return d
     149
     150
     151    def test_createNodeInstant(self):
     152        """
     153        Test sending create request resulting in an instant node.
     154        """
     155
     156        def cb(nodeIdentifier):
     157            self.assertEquals('test', nodeIdentifier)
     158
     159        d = self.protocol.createNode(JID('pubsub.example.org'))
     160        d.addCallback(cb)
     161
     162        iq = self.stub.output[-1]
     163        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     164                                                      'create', NS_PUBSUB))
     165        child = children[0]
     166        self.assertFalse(child.hasAttribute('node'))
     167
     168        response = toResponse(iq, 'result')
     169        command = response.addElement((NS_PUBSUB, 'pubsub'))
     170        create = command.addElement('create')
     171        create['node'] = 'test'
     172        self.stub.send(response)
     173        return d
     174
     175
     176    def test_createNodeRenamed(self):
     177        """
     178        Test sending create request resulting in renamed node.
     179        """
     180
     181        def cb(nodeIdentifier):
     182            self.assertEquals('test2', nodeIdentifier)
     183
     184        d = self.protocol.createNode(JID('pubsub.example.org'), 'test')
     185        d.addCallback(cb)
     186
     187        iq = self.stub.output[-1]
     188        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     189                                                      'create', NS_PUBSUB))
     190        child = children[0]
     191        self.assertEquals('test', child['node'])
     192
     193        response = toResponse(iq, 'result')
     194        command = response.addElement((NS_PUBSUB, 'pubsub'))
     195        create = command.addElement('create')
     196        create['node'] = 'test2'
     197        self.stub.send(response)
     198        return d
     199
     200
     201    def test_deleteNode(self):
     202        """
     203        Test sending delete request.
     204        """
     205
     206        d = self.protocol.deleteNode(JID('pubsub.example.org'), 'test')
     207
     208        iq = self.stub.output[-1]
     209        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     210        self.assertEquals('set', iq.getAttribute('type'))
     211        self.assertEquals('pubsub', iq.pubsub.name)
     212        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     213        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     214                                                      'delete', NS_PUBSUB))
     215        self.assertEquals(1, len(children))
     216        child = children[0]
     217        self.assertEquals('test', child['node'])
     218
     219        response = toResponse(iq, 'result')
     220        self.stub.send(response)
     221        return d
     222
     223
     224    def test_publish(self):
     225        """
     226        Test sending publish request.
     227        """
     228
     229        item = pubsub.Item()
     230        d = self.protocol.publish(JID('pubsub.example.org'), 'test', [item])
     231
     232        iq = self.stub.output[-1]
     233        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     234        self.assertEquals('set', iq.getAttribute('type'))
     235        self.assertEquals('pubsub', iq.pubsub.name)
     236        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     237        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     238                                                      'publish', NS_PUBSUB))
     239        self.assertEquals(1, len(children))
     240        child = children[0]
     241        self.assertEquals('test', child['node'])
     242        items = list(domish.generateElementsQNamed(child.children,
     243                                                   'item', NS_PUBSUB))
     244        self.assertEquals(1, len(items))
     245        self.assertIdentical(item, items[0])
     246
     247        response = toResponse(iq, 'result')
     248        self.stub.send(response)
     249        return d
     250
     251
     252    def test_publishNoItems(self):
     253        """
     254        Test sending publish request without items.
     255        """
     256
     257        d = self.protocol.publish(JID('pubsub.example.org'), 'test')
     258
     259        iq = self.stub.output[-1]
     260        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     261        self.assertEquals('set', iq.getAttribute('type'))
     262        self.assertEquals('pubsub', iq.pubsub.name)
     263        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     264        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     265                                                      'publish', NS_PUBSUB))
     266        self.assertEquals(1, len(children))
     267        child = children[0]
     268        self.assertEquals('test', child['node'])
     269
     270        response = toResponse(iq, 'result')
     271        self.stub.send(response)
     272        return d
     273
     274
     275    def test_subscribe(self):
     276        """
     277        Test sending subscription request.
     278        """
     279        d = self.protocol.subscribe(JID('pubsub.example.org'), 'test',
     280                                      JID('user@example.org'))
     281
     282        iq = self.stub.output[-1]
     283        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     284        self.assertEquals('set', iq.getAttribute('type'))
     285        self.assertEquals('pubsub', iq.pubsub.name)
     286        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     287        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     288                                                      'subscribe', NS_PUBSUB))
     289        self.assertEquals(1, len(children))
     290        child = children[0]
     291        self.assertEquals('test', child['node'])
     292        self.assertEquals('user@example.org', child['jid'])
     293
     294        response = toResponse(iq, 'result')
     295        pubsub = response.addElement((NS_PUBSUB, 'pubsub'))
     296        subscription = pubsub.addElement('subscription')
     297        subscription['node'] = 'test'
     298        subscription['jid'] = 'user@example.org'
     299        subscription['subscription'] = 'subscribed'
     300        self.stub.send(response)
     301        return d
     302
     303
     304    def test_subscribePending(self):
     305        """
     306        Test sending subscription request that results in a pending
     307        subscription.
     308        """
     309        d = self.protocol.subscribe(JID('pubsub.example.org'), 'test',
     310                                      JID('user@example.org'))
     311
     312        iq = self.stub.output[-1]
     313        response = toResponse(iq, 'result')
     314        command = response.addElement((NS_PUBSUB, 'pubsub'))
     315        subscription = command.addElement('subscription')
     316        subscription['node'] = 'test'
     317        subscription['jid'] = 'user@example.org'
     318        subscription['subscription'] = 'pending'
     319        self.stub.send(response)
     320        self.assertFailure(d, pubsub.SubscriptionPending)
     321        return d
     322
     323
     324    def test_subscribePending(self):
     325        """
     326        Test sending subscription request that results in an unconfigured
     327        subscription.
     328        """
     329        d = self.protocol.subscribe(JID('pubsub.example.org'), 'test',
     330                                      JID('user@example.org'))
     331
     332        iq = self.stub.output[-1]
     333        response = toResponse(iq, 'result')
     334        command = response.addElement((NS_PUBSUB, 'pubsub'))
     335        subscription = command.addElement('subscription')
     336        subscription['node'] = 'test'
     337        subscription['jid'] = 'user@example.org'
     338        subscription['subscription'] = 'unconfigured'
     339        self.stub.send(response)
     340        self.assertFailure(d, pubsub.SubscriptionUnconfigured)
     341        return d
     342
     343
     344    def test_unsubscribe(self):
     345        """
     346        Test sending unsubscription request.
     347        """
     348        d = self.protocol.unsubscribe(JID('pubsub.example.org'), 'test',
     349                                      JID('user@example.org'))
     350
     351        iq = self.stub.output[-1]
     352        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     353        self.assertEquals('set', iq.getAttribute('type'))
     354        self.assertEquals('pubsub', iq.pubsub.name)
     355        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     356        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     357                                                      'unsubscribe', NS_PUBSUB))
     358        self.assertEquals(1, len(children))
     359        child = children[0]
     360        self.assertEquals('test', child['node'])
     361        self.assertEquals('user@example.org', child['jid'])
     362
     363        self.stub.send(toResponse(iq, 'result'))
     364        return d
     365
     366
    143367    def test_items(self):
    144368        """
     
    169393
    170394        return d
     395
     396
     397    def test_itemsMaxItems(self):
     398        """
     399        Test sending items request, with limit on the number of items.
     400        """
     401        def cb(items):
     402            self.assertEquals(2, len(items))
     403            self.assertEquals([item1, item2], items)
     404
     405        d = self.protocol.items(JID('pubsub.example.org'), 'test', maxItems=2)
     406        d.addCallback(cb)
     407
     408        iq = self.stub.output[-1]
     409        self.assertEquals('pubsub.example.org', iq.getAttribute('to'))
     410        self.assertEquals('get', iq.getAttribute('type'))
     411        self.assertEquals('pubsub', iq.pubsub.name)
     412        self.assertEquals(NS_PUBSUB, iq.pubsub.uri)
     413        children = list(domish.generateElementsQNamed(iq.pubsub.children,
     414                                                      'items', NS_PUBSUB))
     415        self.assertEquals(1, len(children))
     416        child = children[0]
     417        self.assertEquals('test', child['node'])
     418        self.assertEquals('2', child['max_items'])
     419
     420        response = toResponse(iq, 'result')
     421        items = response.addElement((NS_PUBSUB, 'pubsub')).addElement('items')
     422        items['node'] = 'test'
     423        item1 = items.addElement('item')
     424        item1['id'] = 'item1'
     425        item2 = items.addElement('item')
     426        item2['id'] = 'item2'
     427
     428        self.stub.send(response)
     429
     430        return d
     431
     432
    171433
    172434class PubSubServiceTest(unittest.TestCase):
Note: See TracChangeset for help on using the changeset viewer.