Ignore:
Timestamp:
Apr 22, 2009, 5:04:03 PM (13 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Convert:
svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@171
Message:

Provide PubSubResource?, modeled after Twisted Web resources.

Author: ralphm.
Fixes #47.

This should make it rather easy to make publish-subscribe enabled services.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/pubsub.py

    r58 r59  
    1414
    1515from twisted.internet import defer
     16from twisted.python import log
    1617from twisted.words.protocols.jabber import jid, error, xmlstream
    1718from twisted.words.xish import domish
     
    1920from wokkel import disco, data_form, generic, shim
    2021from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
    21 from wokkel.iwokkel import IPubSubClient, IPubSubService
     22from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
    2223
    2324# Iq get and set XPath queries
     
    8586class Unsupported(PubSubError):
    8687    def __init__(self, feature, text=None):
     88        self.feature = feature
    8789        PubSubError.__init__(self, 'feature-not-implemented',
    8890                                   'unsupported',
     
    9092                                   text)
    9193
     94    def __str__(self):
     95        message = PubSubError.__str__(self)
     96        message += ', feature %r' % self.feature
     97        return message
    9298
    9399
     
    236242        'purge': ['node'],
    237243        'delete': ['node'],
    238         'affiliationsGet': [],
     244        'affiliationsGet': ['nodeOrEmpty'],
    239245        'affiliationsSet': [],
    240         'subscriptionsGet': [],
     246        'subscriptionsGet': ['nodeOrEmpty'],
    241247        'subscriptionsSet': [],
    242248    }
     
    807813            }
    808814
    809 
    810     def __init__(self):
     815    _legacyHandlers = {
     816        'publish': ('publish', ['sender', 'recipient',
     817                                'nodeIdentifier', 'items']),
     818        'subscribe': ('subscribe', ['sender', 'recipient',
     819                                    'nodeIdentifier', 'subscriber']),
     820        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
     821                                        'nodeIdentifier', 'subscriber']),
     822        'subscriptions': ('subscriptions', ['sender', 'recipient']),
     823        'affiliations': ('affiliations', ['sender', 'recipient']),
     824        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
     825        'getConfigurationOptions': ('getConfigurationOptions', []),
     826        'default': ('getDefaultConfiguration',
     827                    ['sender', 'recipient', 'nodeType']),
     828        'configureGet': ('getConfiguration', ['sender', 'recipient',
     829                                              'nodeIdentifier']),
     830        'configureSet': ('setConfiguration', ['sender', 'recipient',
     831                                              'nodeIdentifier', 'options']),
     832        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
     833                            'maxItems', 'itemIdentifiers']),
     834        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
     835                                'itemIdentifiers']),
     836        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
     837        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
     838    }
     839
     840    hideNodes = False
     841
     842    def __init__(self, resource=None):
     843        self.resource = resource
    811844        self.discoIdentity = {'category': 'pubsub',
    812845                              'type': 'generic',
     
    821854
    822855    def getDiscoInfo(self, requestor, target, nodeIdentifier):
    823         info = []
    824 
    825         if not nodeIdentifier:
    826             category, idType, name = self.discoIdentity
    827             info.append(disco.DiscoIdentity(category, idType, name))
    828 
    829             info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
    830             info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
    831                          for feature in self.pubSubFeatures])
    832 
    833         def toInfo(nodeInfo):
     856        def toInfo(nodeInfo, info):
    834857            if not nodeInfo:
    835                 return
     858                return info
    836859
    837860            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
     
    853876                info.append(form)
    854877
    855         d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
    856         d.addCallback(toInfo)
    857         d.addBoth(lambda result: info)
     878            return info
     879
     880        info = []
     881
     882        request = PubSubRequest('discoInfo')
     883
     884        if self.resource is not None:
     885            resource = self.resource.locateResource(request)
     886            identity = resource.discoIdentity
     887            features = resource.features
     888            getInfo = resource.getInfo
     889        else:
     890            category, idType, name = self.discoIdentity
     891            identity = disco.DiscoIdentity(category, idType, name)
     892            features = self.pubSubFeatures
     893            getInfo = self.getNodeInfo
     894
     895        if not nodeIdentifier:
     896            info.append(identity)
     897            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
     898            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
     899                         for feature in features])
     900
     901        d = getInfo(requestor, target, nodeIdentifier or '')
     902        d.addCallback(toInfo, info)
     903        d.addErrback(log.err)
    858904        return d
    859905
    860906
    861907    def getDiscoItems(self, requestor, target, nodeIdentifier):
    862         if nodeIdentifier or self.hideNodes:
    863             return defer.succeed([])
    864 
    865         d = self.getNodes(requestor, target)
     908        if self.hideNodes:
     909            d = defer.succeed([])
     910        elif self.resource is not None:
     911            request = PubSubRequest('discoInfo')
     912            resource = self.resource.locateResource(request)
     913            d = resource.getNodes(requestor, target, nodeIdentifier)
     914        elif nodeIdentifier:
     915            d = self.getNodes(requestor, target)
     916        else:
     917            d = defer.succeed([])
     918           
     919
     920
    866921        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
    867922                                     for node in nodes])
     
    871926    def _onPubSubRequest(self, iq):
    872927        request = PubSubRequest.fromElement(iq)
    873         handler = getattr(self, '_on_%s' % request.verb)
    874         return handler(request)
    875 
    876 
    877     def _on_publish(self, request):
    878         return self.publish(request.sender, request.recipient,
    879                             request.nodeIdentifier, request.items)
    880 
    881 
    882     def _on_subscribe(self, request):
    883 
    884         def toResponse(result):
    885             response = domish.Element((NS_PUBSUB, "pubsub"))
    886             subscription = response.addElement("subscription")
    887             if result.nodeIdentifier:
    888                 subscription["node"] = result.nodeIdentifier
    889             subscription["jid"] = result.subscriber.full()
    890             subscription["subscription"] = result.state
     928
     929        if self.resource is not None:
     930            resource = self.resource.locateResource(request)
     931        else:
     932            resource = self
     933
     934        # Preprocess the request, knowing the handling resource
     935        try:
     936            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
     937        except AttributeError:
     938            pass
     939        else:
     940            request = preProcessor(resource, request)
     941            if request is None:
     942                return defer.succeed(None)
     943
     944        # Process the request itself,
     945        if resource is not self:
     946            try:
     947                handler = getattr(resource, request.verb)
     948            except AttributeError:
     949                # fix lookup feature
     950                text = "Request verb: %s" % request.verb
     951                return defer.fail(Unsupported('', text))
     952
     953            d = handler(request)
     954        else:
     955            handlerName, argNames = self._legacyHandlers[request.verb]
     956            handler = getattr(self, handlerName)
     957            args = [getattr(request, arg) for arg in argNames]
     958            d = handler(*args)
     959
     960        # If needed, translate the result into a response
     961        try:
     962            cb = getattr(self, '_toResponse_%s' % request.verb)
     963        except AttributeError:
     964            pass
     965        else:
     966            d.addCallback(cb, resource, request)
     967
     968        return d
     969
     970
     971    def _toResponse_subscribe(self, result, resource, request):
     972        response = domish.Element((NS_PUBSUB, "pubsub"))
     973        subscription = response.addElement("subscription")
     974        if result.nodeIdentifier:
     975            subscription["node"] = result.nodeIdentifier
     976        subscription["jid"] = result.subscriber.full()
     977        subscription["subscription"] = result.state
     978        return response
     979
     980
     981    def _toResponse_subscriptions(self, result, resource, request):
     982        response = domish.Element((NS_PUBSUB, 'pubsub'))
     983        subscriptions = response.addElement('subscriptions')
     984        for subscription in result:
     985            item = subscriptions.addElement('subscription')
     986            item['node'] = subscription.nodeIdentifier
     987            item['jid'] = subscription.subscriber.full()
     988            item['subscription'] = subscription.state
     989        return response
     990
     991
     992    def _toResponse_affiliations(self, result, resource, request):
     993        response = domish.Element((NS_PUBSUB, 'pubsub'))
     994        affiliations = response.addElement('affiliations')
     995
     996        for nodeIdentifier, affiliation in result:
     997            item = affiliations.addElement('affiliation')
     998            item['node'] = nodeIdentifier
     999            item['affiliation'] = affiliation
     1000
     1001        return response
     1002
     1003
     1004    def _toResponse_create(self, result, resource, request):
     1005        if not request.nodeIdentifier or request.nodeIdentifier != result:
     1006            response = domish.Element((NS_PUBSUB, 'pubsub'))
     1007            create = response.addElement('create')
     1008            create['node'] = result
    8911009            return response
    892 
    893         d = self.subscribe(request.sender, request.recipient,
    894                            request.nodeIdentifier, request.subscriber)
    895         d.addCallback(toResponse)
    896         return d
    897 
    898 
    899     def _on_unsubscribe(self, request):
    900         return self.unsubscribe(request.sender, request.recipient,
    901                                 request.nodeIdentifier, request.subscriber)
    902 
    903 
    904     def _on_optionsGet(self, request):
    905         raise Unsupported('subscription-options')
    906 
    907 
    908     def _on_optionsSet(self, request):
    909         raise Unsupported('subscription-options')
    910 
    911 
    912     def _on_subscriptions(self, request):
    913 
    914         def toResponse(result):
    915             response = domish.Element((NS_PUBSUB, 'pubsub'))
    916             subscriptions = response.addElement('subscriptions')
    917             for subscription in result:
    918                 item = subscriptions.addElement('subscription')
    919                 item['node'] = subscription.nodeIdentifier
    920                 item['jid'] = subscription.subscriber.full()
    921                 item['subscription'] = subscription.state
    922             return response
    923 
    924         d = self.subscriptions(request.sender, request.recipient)
    925         d.addCallback(toResponse)
    926         return d
    927 
    928 
    929     def _on_affiliations(self, request):
    930 
    931         def toResponse(result):
    932             response = domish.Element((NS_PUBSUB, 'pubsub'))
    933             affiliations = response.addElement('affiliations')
    934 
    935             for nodeIdentifier, affiliation in result:
    936                 item = affiliations.addElement('affiliation')
    937                 item['node'] = nodeIdentifier
    938                 item['affiliation'] = affiliation
    939 
    940             return response
    941 
    942         d = self.affiliations(request.sender, request.recipient)
    943         d.addCallback(toResponse)
    944         return d
    945 
    946 
    947     def _on_create(self, request):
    948 
    949         def toResponse(result):
    950             if not request.nodeIdentifier or request.nodeIdentifier != result:
    951                 response = domish.Element((NS_PUBSUB, 'pubsub'))
    952                 create = response.addElement('create')
    953                 create['node'] = result
    954                 return response
    955             else:
    956                 return None
    957 
    958         d = self.create(request.sender, request.recipient,
    959                         request.nodeIdentifier)
    960         d.addCallback(toResponse)
    961         return d
     1010        else:
     1011            return None
    9621012
    9631013
     
    9781028
    9791029
    980     def _formFromConfiguration(self, values):
    981         options = self.getConfigurationOptions()
     1030    def _formFromConfiguration(self, resource, values):
     1031        options = resource.getConfigurationOptions()
    9821032        fields = self._makeFields(options, values)
    9831033        form = data_form.Form(formType="form",
     
    9881038
    9891039
    990     def _checkConfiguration(self, values):
    991         options = self.getConfigurationOptions()
     1040    def _checkConfiguration(self, resource, values):
     1041        options = resource.getConfigurationOptions()
    9921042        processedValues = {}
    9931043
     
    10131063
    10141064
    1015     def _on_default(self, request):
    1016 
    1017         def toResponse(options):
    1018             response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
    1019             default = response.addElement("default")
    1020             default.addChild(self._formFromConfiguration(options).toElement())
    1021             return response
    1022 
     1065    def _preProcess_default(self, resource, request):
    10231066        if request.nodeType not in ('leaf', 'collection'):
    1024             return defer.fail(error.StanzaError('not-acceptable'))
    1025 
    1026         d = self.getDefaultConfiguration(request.sender, request.recipient,
    1027                                          request.nodeType)
    1028         d.addCallback(toResponse)
    1029         return d
    1030 
    1031 
    1032     def _on_configureGet(self, request):
    1033         def toResponse(options):
    1034             response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
    1035             configure = response.addElement("configure")
    1036             form = self._formFromConfiguration(options)
    1037             configure.addChild(form.toElement())
    1038 
    1039             if request.nodeIdentifier:
    1040                 configure["node"] = request.nodeIdentifier
    1041 
    1042             return response
    1043 
    1044         d = self.getConfiguration(request.sender, request.recipient,
    1045                                   request.nodeIdentifier)
    1046         d.addCallback(toResponse)
    1047         return d
    1048 
    1049 
    1050     def _on_configureSet(self, request):
     1067            raise error.StanzaError('not-acceptable')
     1068        else:
     1069            return request
     1070
     1071
     1072    def _toResponse_default(self, options, resource, request):
     1073        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
     1074        default = response.addElement("default")
     1075        form = self._formFromConfiguration(resource, options)
     1076        default.addChild(form.toElement())
     1077        return response
     1078
     1079
     1080    def _toResponse_configureGet(self, options, resource, request):
     1081        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
     1082        configure = response.addElement("configure")
     1083        form = self._formFromConfiguration(resource, options)
     1084        configure.addChild(form.toElement())
     1085
     1086        if request.nodeIdentifier:
     1087            configure["node"] = request.nodeIdentifier
     1088
     1089        return response
     1090
     1091
     1092    def _preProcess_configureSet(self, resource, request):
    10511093        if request.options:
    1052             request.options = self._checkConfiguration(request.options)
    1053             return self.setConfiguration(request.sender, request.recipient,
    1054                                          request.nodeIdentifier,
    1055                                          request.options)
     1094            request.options = self._checkConfiguration(resource,
     1095                                                       request.options)
     1096            return request
    10561097        else:
    10571098            return None
    10581099
    10591100
    1060 
    1061     def _on_items(self, request):
    1062 
    1063         def toResponse(result):
    1064             response = domish.Element((NS_PUBSUB, 'pubsub'))
    1065             items = response.addElement('items')
    1066             items["node"] = request.nodeIdentifier
    1067 
    1068             for item in result:
    1069                 items.addChild(item)
    1070 
    1071             return response
    1072 
    1073         d = self.items(request.sender, request.recipient,
    1074                        request.nodeIdentifier, request.maxItems,
    1075                        request.itemIdentifiers)
    1076         d.addCallback(toResponse)
    1077         return d
    1078 
    1079 
    1080     def _on_retract(self, request):
    1081         return self.retract(request.sender, request.recipient,
    1082                             request.nodeIdentifier, request.itemIdentifiers)
    1083 
    1084 
    1085     def _on_purge(self, request):
    1086         return self.purge(request.sender, request.recipient,
    1087                           request.nodeIdentifier)
    1088 
    1089 
    1090     def _on_delete(self, request):
    1091         return self.delete(request.sender, request.recipient,
    1092                            request.nodeIdentifier)
    1093 
    1094 
    1095     def _on_affiliationsGet(self, iq):
    1096         raise Unsupported('modify-affiliations')
    1097 
    1098 
    1099     def _on_affiliationsSet(self, iq):
    1100         raise Unsupported('modify-affiliations')
    1101 
    1102 
    1103     def _on_subscriptionsGet(self, iq):
    1104         raise Unsupported('manage-subscriptions')
    1105 
    1106 
    1107     def _on_subscriptionsSet(self, iq):
    1108         raise Unsupported('manage-subscriptions')
    1109 
    1110     # public methods
     1101    def _toResponse_items(self, result, resource, request):
     1102        response = domish.Element((NS_PUBSUB, 'pubsub'))
     1103        items = response.addElement('items')
     1104        items["node"] = request.nodeIdentifier
     1105
     1106        for item in result:
     1107            items.addChild(item)
     1108
     1109        return response
     1110
    11111111
    11121112    def _createNotification(self, eventType, service, nodeIdentifier,
     
    11311131
    11321132        return message
     1133
     1134    # public methods
    11331135
    11341136    def notifyPublish(self, service, nodeIdentifier, notifications):
     
    12161218    def delete(self, requestor, service, nodeIdentifier):
    12171219        raise Unsupported('delete-nodes')
     1220
     1221
     1222
     1223class PubSubResource(object):
     1224
     1225    implements(IPubSubResource)
     1226
     1227    features = []
     1228    discoIdentity = disco.DiscoIdentity('pubsub',
     1229                                        'service',
     1230                                        'Publish-Subscribe Service')
     1231
     1232
     1233    def locateResource(self, request):
     1234        return self
     1235
     1236
     1237    def getInfo(self, requestor, service, nodeIdentifier):
     1238        return defer.succeed(None)
     1239
     1240
     1241    def getNodes(self, requestor, service, nodeIdentifier):
     1242        return defer.succeed([])
     1243
     1244
     1245    def getConfigurationOptions(self):
     1246        return {}
     1247
     1248
     1249    def publish(self, request):
     1250        return defer.fail(Unsupported('publish'))
     1251
     1252
     1253    def subscribe(self, request):
     1254        return defer.fail(Unsupported('subscribe'))
     1255
     1256
     1257    def unsubscribe(self, request):
     1258        return defer.fail(Unsupported('subscribe'))
     1259
     1260
     1261    def subscriptions(self, request):
     1262        return defer.fail(Unsupported('retrieve-subscriptions'))
     1263
     1264
     1265    def affiliations(self, request):
     1266        return defer.fail(Unsupported('retrieve-affiliations'))
     1267
     1268
     1269    def create(self, request):
     1270        return defer.fail(Unsupported('create-nodes'))
     1271
     1272
     1273    def default(self, request):
     1274        return defer.fail(Unsupported('retrieve-default'))
     1275
     1276
     1277    def configureGet(self, request):
     1278        return defer.fail(Unsupported('config-node'))
     1279
     1280
     1281    def configureSet(self, request):
     1282        return defer.fail(Unsupported('config-node'))
     1283
     1284
     1285    def items(self, request):
     1286        return defer.fail(Unsupported('retrieve-items'))
     1287
     1288
     1289    def retract(self, request):
     1290        return defer.fail(Unsupported('retract-items'))
     1291
     1292
     1293    def purge(self, request):
     1294        return defer.fail(Unsupported('purge-nodes'))
     1295
     1296
     1297    def delete(self, request):
     1298        return defer.fail(Unsupported('delete-nodes'))
     1299
     1300
     1301    def affiliationsGet(self, request):
     1302        return defer.fail(Unsupported('modify-affiliations'))
     1303
     1304
     1305    def affiliationsSet(self, request):
     1306        return defer.fail(Unsupported('modify-affiliations'))
     1307
     1308
     1309    def subscriptionsGet(self, request):
     1310        return defer.fail(Unsupported('manage-subscriptions'))
     1311
     1312
     1313    def subscriptionsSet(self, request):
     1314        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracChangeset for help on using the changeset viewer.