Ignore:
Timestamp:
Aug 4, 2008, 3:44:57 PM (13 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@60
Message:

Add support for pubsub collections.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/pubsub.py

    r29 r30  
    7070PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
    7171
    72 class BadRequest(error.StanzaError):
    73     """
    74     Bad request stanza error.
    75     """
    76     def __init__(self):
    77         error.StanzaError.__init__(self, 'bad-request')
    78 
    79 
    80 
    8172class SubscriptionPending(Exception):
    8273    """
     
    10899
    109100
     101class BadRequest(PubSubError):
     102    """
     103    Bad request stanza error.
     104    """
     105    def __init__(self, pubsubCondition=None, text=None):
     106        PubSubError.__init__(self, 'bad-request', pubsubCondition, text)
     107
     108
     109
    110110class Unsupported(PubSubError):
    111111    def __init__(self, feature, text=None):
     
    117117
    118118
    119 class OptionsUnavailable(Unsupported):
    120     def __init__(self):
    121         Unsupported.__init__(self, 'subscription-options-unavailable')
     119class Subscription(object):
     120    """
     121    A subscription to a node.
     122
     123    @ivar nodeIdentifier: The identifier of the node subscribed to.
     124                          The root node is denoted by C{None}.
     125    @ivar subscriber: The subscribing entity.
     126    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
     127                 C{'unconfigured'}.
     128    @ivar options: Optional list of subscription options.
     129    @type options: C{dict}.
     130    """
     131
     132    def __init__(self, nodeIdentifier, subscriber, state, options=None):
     133        self.nodeIdentifier = nodeIdentifier
     134        self.subscriber = subscriber
     135        self.state = state
     136        self.options = options or {}
    122137
    123138
     
    274289            message.handled = True
    275290
     291
    276292    def _onEvent_items(self, sender, recipient, action, headers):
    277293        nodeIdentifier = action["node"]
     
    282298        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
    283299        self.itemsReceived(event)
     300
    284301
    285302    def _onEvent_delete(self, sender, recipient, action, headers):
     
    288305        self.deleteReceived(event)
    289306
     307
    290308    def _onEvent_purge(self, sender, recipient, action, headers):
    291309        nodeIdentifier = action["node"]
     
    293311        self.purgeReceived(event)
    294312
     313
    295314    def itemsReceived(self, event):
    296315        pass
    297316
     317
    298318    def deleteReceived(self, event):
    299319        pass
    300320
     321
    301322    def purgeReceived(self, event):
    302323        pass
     324
    303325
    304326    def createNode(self, service, nodeIdentifier=None):
     
    355377        """
    356378        request = _PubSubRequest(self.xmlstream, 'subscribe')
    357         request.command['node'] = nodeIdentifier
     379        if nodeIdentifier:
     380            request.command['node'] = nodeIdentifier
    358381        request.command['jid'] = subscriber.full()
    359382
     
    386409        """
    387410        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
    388         request.command['node'] = nodeIdentifier
     411        if nodeIdentifier:
     412            request.command['node'] = nodeIdentifier
    389413        request.command['jid'] = subscriber.full()
    390414        return request.send(service)
     
    423447        """
    424448        request = _PubSubRequest(self.xmlstream, 'items', method='get')
    425         request.command['node'] = nodeIdentifier
     449        if nodeIdentifier:
     450            request.command['node'] = nodeIdentifier
    426451        if maxItems:
    427452            request.command["max_items"] = str(int(maxItems))
     
    573598
    574599
    575     def _onPublish(self, iq):
     600    def _getParameter_node(self, commandElement):
     601        try:
     602            return commandElement["node"]
     603        except KeyError:
     604            raise BadRequest('nodeid-required')
     605
     606
     607    def _getParameter_nodeOrEmpty(self, commandElement):
     608        return commandElement.getAttribute("node", '')
     609
     610
     611    def _getParameter_jid(self, commandElement):
     612        try:
     613            return jid.internJID(commandElement["jid"])
     614        except KeyError:
     615            raise BadRequest('jid-required')
     616
     617
     618    def _getParameter_max_items(self, commandElement):
     619        value = commandElement.getAttribute('max_items')
     620
     621        if value:
     622            try:
     623                return int(value)
     624            except ValueError:
     625                raise BadRequest(text="Field max_items requires a positive " +
     626                                      "integer value")
     627        else:
     628            return None
     629
     630
     631    def _getParameters(self, iq, *names):
    576632        requestor = jid.internJID(iq["from"]).userhostJID()
    577633        service = jid.internJID(iq["to"])
    578634
    579         try:
    580             nodeIdentifier = iq.pubsub.publish["node"]
    581         except KeyError:
    582             raise BadRequest
     635        params = [requestor, service]
     636
     637        if names:
     638            command = names[0]
     639            commandElement = getattr(iq.pubsub, command)
     640            if not commandElement:
     641                raise Exception("Could not find command element %r" % command)
     642
     643        for name in names[1:]:
     644            try:
     645                getter = getattr(self, '_getParameter_' + name)
     646            except KeyError:
     647                raise Exception("No parameter getter for this name")
     648
     649            params.append(getter(commandElement))
     650
     651        return params
     652
     653
     654    def _onPublish(self, iq):
     655        requestor, service, nodeIdentifier = self._getParameters(
     656                iq, 'publish', 'node')
    583657
    584658        items = []
     
    591665
    592666    def _onSubscribe(self, iq):
    593         requestor = jid.internJID(iq["from"]).userhostJID()
    594         service = jid.internJID(iq["to"])
    595 
    596         try:
    597             nodeIdentifier = iq.pubsub.subscribe["node"]
    598             subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
    599         except KeyError:
    600             raise BadRequest
    601 
    602         def toResponse(subscription):
    603             nodeIdentifier, state = subscription
     667        requestor, service, nodeIdentifier, subscriber = self._getParameters(
     668                iq, 'subscribe', 'nodeOrEmpty', 'jid')
     669
     670        def toResponse(result):
    604671            response = domish.Element((NS_PUBSUB, "pubsub"))
    605672            subscription = response.addElement("subscription")
    606             subscription["node"] = nodeIdentifier
    607             subscription["jid"] = subscriber.full()
    608             subscription["subscription"] = state
     673            if result.nodeIdentifier:
     674                subscription["node"] = result.nodeIdentifier
     675            subscription["jid"] = result.subscriber.full()
     676            subscription["subscription"] = result.state
    609677            return response
    610678
     
    615683
    616684    def _onUnsubscribe(self, iq):
    617         requestor = jid.internJID(iq["from"]).userhostJID()
    618         service = jid.internJID(iq["to"])
    619 
    620         try:
    621             nodeIdentifier = iq.pubsub.unsubscribe["node"]
    622             subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
    623         except KeyError:
    624             raise BadRequest
     685        requestor, service, nodeIdentifier, subscriber = self._getParameters(
     686                iq, 'unsubscribe', 'nodeOrEmpty', 'jid')
    625687
    626688        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
     
    628690
    629691    def _onOptionsGet(self, iq):
    630         raise Unsupported('subscription-options-unavailable')
     692        raise Unsupported('subscription-options')
    631693
    632694
    633695    def _onOptionsSet(self, iq):
    634         raise Unsupported('subscription-options-unavailable')
     696        raise Unsupported('subscription-options')
    635697
    636698
    637699    def _onSubscriptions(self, iq):
    638         requestor = jid.internJID(iq["from"]).userhostJID()
    639         service = jid.internJID(iq["to"])
     700        requestor, service = self._getParameters(iq)
    640701
    641702        def toResponse(result):
     
    655716
    656717    def _onAffiliations(self, iq):
    657         requestor = jid.internJID(iq["from"]).userhostJID()
    658         service = jid.internJID(iq["to"])
     718        requestor, service = self._getParameters(iq)
    659719
    660720        def toResponse(result):
     
    675735
    676736    def _onCreate(self, iq):
    677         requestor = jid.internJID(iq["from"]).userhostJID()
    678         service = jid.internJID(iq["to"])
     737        requestor, service = self._getParameters(iq)
    679738        nodeIdentifier = iq.pubsub.create.getAttribute("node")
    680739
     
    743802
    744803    def _onDefault(self, iq):
    745         requestor = jid.internJID(iq["from"]).userhostJID()
    746         service = jid.internJID(iq["to"])
     804        requestor, service = self._getParameters(iq)
    747805
    748806        def toResponse(options):
     
    752810            return response
    753811
    754         d = self.getDefaultConfiguration(requestor, service)
     812        form = self._findForm(iq.pubsub.config, NS_PUBSUB_NODE_CONFIG)
     813        values = form and form.formType == 'result' and form.getValues() or {}
     814        nodeType = values.get('pubsub#node_type', 'leaf')
     815
     816        if nodeType not in ('leaf', 'collections'):
     817            return defer.fail(error.StanzaError('not-acceptable'))
     818
     819        d = self.getDefaultConfiguration(requestor, service, nodeType)
    755820        d.addCallback(toResponse)
    756821        return d
     
    758823
    759824    def _onConfigureGet(self, iq):
    760         requestor = jid.internJID(iq["from"]).userhostJID()
    761         service = jid.internJID(iq["to"])
    762         nodeIdentifier = iq.pubsub.configure.getAttribute("node")
     825        requestor, service, nodeIdentifier = self._getParameters(
     826                iq, 'configure', 'nodeOrEmpty')
    763827
    764828        def toResponse(options):
     
    778842
    779843    def _onConfigureSet(self, iq):
    780         requestor = jid.internJID(iq["from"]).userhostJID()
    781         service = jid.internJID(iq["to"])
    782         nodeIdentifier = iq.pubsub.configure["node"]
     844        requestor, service, nodeIdentifier = self._getParameters(
     845                iq, 'configure', 'nodeOrEmpty')
    783846
    784847        # Search configuration form with correct FORM_TYPE and process it
     
    799862
    800863    def _onItems(self, iq):
    801         requestor = jid.internJID(iq["from"]).userhostJID()
    802         service = jid.internJID(iq["to"])
    803 
    804         try:
    805             nodeIdentifier = iq.pubsub.items["node"]
    806         except KeyError:
    807             raise BadRequest
    808 
    809         maxItems = iq.pubsub.items.getAttribute('max_items')
    810 
    811         if maxItems:
    812             try:
    813                 maxItems = int(maxItems)
    814             except ValueError:
    815                 raise BadRequest
     864        requestor, service, nodeIdentifier, maxItems = self._getParameters(
     865                iq, 'items', 'nodeOrEmpty', 'max_items')
    816866
    817867        itemIdentifiers = []
     
    821871                    itemIdentifiers.append(child["id"])
    822872                except KeyError:
    823                     raise BadRequest
     873                    raise BadRequest()
    824874
    825875        def toResponse(result):
    826876            response = domish.Element((NS_PUBSUB, 'pubsub'))
    827877            items = response.addElement('items')
    828             items["node"] = nodeIdentifier
     878            if nodeIdentifier:
     879                items["node"] = nodeIdentifier
    829880
    830881            for item in result:
     
    840891
    841892    def _onRetract(self, iq):
    842         requestor = jid.internJID(iq["from"]).userhostJID()
    843         service = jid.internJID(iq["to"])
    844 
    845         try:
    846             nodeIdentifier = iq.pubsub.retract["node"]
    847         except KeyError:
    848             raise BadRequest
     893        requestor, service, nodeIdentifier = self._getParameters(
     894                iq, 'retract', 'node')
    849895
    850896        itemIdentifiers = []
    851897        for child in iq.pubsub.retract.elements():
    852             if child.uri == NS_PUBSUB_OWNER and child.name == 'item':
     898            if child.uri == NS_PUBSUB and child.name == 'item':
    853899                try:
    854900                    itemIdentifiers.append(child["id"])
    855901                except KeyError:
    856                     raise BadRequest
     902                    raise BadRequest()
    857903
    858904        return self.retract(requestor, service, nodeIdentifier,
     
    861907
    862908    def _onPurge(self, iq):
    863         requestor = jid.internJID(iq["from"]).userhostJID()
    864         service = jid.internJID(iq["to"])
    865 
    866         try:
    867             nodeIdentifier = iq.pubsub.purge["node"]
    868         except KeyError:
    869             raise BadRequest
    870 
     909        requestor, service, nodeIdentifier = self._getParameters(
     910                iq, 'purge', 'node')
    871911        return self.purge(requestor, service, nodeIdentifier)
    872912
    873913
    874914    def _onDelete(self, iq):
    875         requestor = jid.internJID(iq["from"]).userhostJID()
    876         service = jid.internJID(iq["to"])
    877 
    878         try:
    879             nodeIdentifier = iq.pubsub.delete["node"]
    880         except KeyError:
    881             raise BadRequest
    882 
     915        requestor, service, nodeIdentifier = self._getParameters(
     916                iq, 'delete', 'node')
    883917        return self.delete(requestor, service, nodeIdentifier)
    884918
     
    901935    # public methods
    902936
     937    def _createNotification(self, eventType, service, nodeIdentifier,
     938                                  subscriber, subscriptions=None):
     939        headers = []
     940
     941        if subscriptions:
     942            for subscription in subscriptions:
     943                if nodeIdentifier != subscription.nodeIdentifier:
     944                    headers.append(('Collection', subscription.nodeIdentifier))
     945
     946        message = domish.Element((None, "message"))
     947        message["from"] = service.full()
     948        message["to"] = subscriber.full()
     949        event = message.addElement((NS_PUBSUB_EVENT, "event"))
     950
     951        element = event.addElement(eventType)
     952        element["node"] = nodeIdentifier
     953
     954        if headers:
     955            message.addChild(shim.Headers(headers))
     956
     957        return message
     958
    903959    def notifyPublish(self, service, nodeIdentifier, notifications):
    904         for recipient, items in notifications:
    905             message = domish.Element((None, "message"))
    906             message["from"] = service.full()
    907             message["to"] = recipient.full()
    908             event = message.addElement((NS_PUBSUB_EVENT, "event"))
    909             element = event.addElement("items")
    910             element["node"] = nodeIdentifier
    911             element.children = items
     960        for subscriber, subscriptions, items in notifications:
     961            message = self._createNotification('items', service,
     962                                               nodeIdentifier, subscriber,
     963                                               subscriptions)
     964            message.event.items.children = items
    912965            self.send(message)
    913966
    914967
    915     def notifyDelete(self, service, nodeIdentifier, recipients):
    916         for recipient in recipients:
    917             message = domish.Element((None, "message"))
    918             message["from"] = service.full()
    919             message["to"] = recipient.full()
    920             event = message.addElement((NS_PUBSUB_EVENT, "event"))
    921             element = event.addElement("delete")
    922             element["node"] = nodeIdentifier
     968    def notifyDelete(self, service, nodeIdentifier, subscriptions):
     969        for subscription in subscriptions:
     970            message = self._createNotification('delete', service,
     971                                               nodeIdentifier,
     972                                               subscription.subscriber)
    923973            self.send(message)
    924974
Note: See TracChangeset for help on using the changeset viewer.