# -*- test-case-name: wokkel.test.test_pubsub -*- # # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. """ XMPP publish-subscribe protocol. This protocol is specified in U{XEP-0060}. """ from zope.interface import implements from twisted.internet import defer, reactor from twisted.words.protocols.jabber import jid, error, xmlstream from twisted.words.xish import domish from wokkel import disco, data_form from wokkel.subprotocols import IQHandlerMixin, XMPPHandler from wokkel.iwokkel import IPubSubClient, IPubSubService # Iq get and set XPath queries IQ_GET = '/iq[@type="get"]' IQ_SET = '/iq[@type="set"]' # Publish-subscribe namespaces NS_PUBSUB = 'http://jabber.org/protocol/pubsub' NS_PUBSUB_EVENT = NS_PUBSUB + '#event' NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors' NS_PUBSUB_OWNER = NS_PUBSUB + "#owner" NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config" NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data" # In publish-subscribe namespace XPath query selector. IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]' IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]' # Publish-subscribe XPath queries PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT # Publish-subscribe command XPath queries PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB PUBSUB_AFFILIATIONS_GET = PUBSUB_OWNER_GET + '/affiliations' + \ IN_NS_PUBSUB_OWNER PUBSUB_AFFILIATIONS_SET = PUBSUB_OWNER_SET + '/affiliations' + \ IN_NS_PUBSUB_OWNER PUBSUB_SUBSCRIPTIONS_GET = PUBSUB_OWNER_GET + '/subscriptions' + \ IN_NS_PUBSUB_OWNER PUBSUB_SUBSCRIPTIONS_SET = PUBSUB_OWNER_SET + '/subscriptions' + \ IN_NS_PUBSUB_OWNER PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER class BadRequest(error.StanzaError): """ Bad request stanza error. """ def __init__(self): error.StanzaError.__init__(self, 'bad-request') class SubscriptionPending(Exception): """ Raised when the requested subscription is pending acceptance. """ class SubscriptionUnconfigured(Exception): """ Raised when the requested subscription needs to be configured before becoming active. """ class PubSubError(error.StanzaError): """ Exception with publish-subscribe specific condition. """ def __init__(self, condition, pubsubCondition, feature=None, text=None): appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) if feature: appCondition['feature'] = feature error.StanzaError.__init__(self, condition, text=text, appCondition=appCondition) class Unsupported(PubSubError): def __init__(self, feature, text=None): PubSubError.__init__(self, 'feature-not-implemented', 'unsupported', feature, text) class OptionsUnavailable(Unsupported): def __init__(self): Unsupported.__init__(self, 'subscription-options-unavailable') class Item(domish.Element): """ Publish subscribe item. This behaves like an object providing L{domish.IElement}. Item payload can be added using C{addChild} or C{addRawXml}, or using the C{payload} keyword argument to C{__init__}. """ def __init__(self, id=None, payload=None): """ @param id: optional item identifier @type id: L{unicode} @param payload: optional item payload. Either as a domish element, or as serialized XML. @type payload: object providing L{domish.IElement} or L{unicode}. """ domish.Element.__init__(self, (None, 'item')) if id is not None: self['id'] = id if payload is not None: if isinstance(payload, basestring): self.addRawXml(payload) else: self.addChild(payload) class PubSubRequest(xmlstream.IQ): """ Base class for publish subscribe user requests. @cvar namespace: request namespace @cvar verb: request verb @cvar method: type attribute of the IQ request. Either C{'set'} or C{'get'} @ivar command: command element of the request. This is the direct child of the C{pubsub} element in the C{namespace} with the name C{verb}. """ namespace = NS_PUBSUB method = 'set' def __init__(self, xs): xmlstream.IQ.__init__(self, xs, self.method) self.addElement((self.namespace, 'pubsub')) self.command = self.pubsub.addElement(self.verb) def send(self, to): """ Send out request. Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be a L{JID} instance. @param to: Entity to send the request to. @type to: L{JID} """ destination = to.full() return xmlstream.IQ.send(self, destination) class CreateNode(PubSubRequest): verb = 'create' def __init__(self, xs, node=None): PubSubRequest.__init__(self, xs) if node: self.command["node"] = node class DeleteNode(PubSubRequest): verb = 'delete' def __init__(self, xs, node): PubSubRequest.__init__(self, xs) self.command["node"] = node class Subscribe(PubSubRequest): verb = 'subscribe' def __init__(self, xs, node, subscriber): PubSubRequest.__init__(self, xs) self.command["node"] = node self.command["jid"] = subscriber.full() class Unsubscribe(PubSubRequest): verb = 'unsubscribe' def __init__(self, xs, node, subscriber): PubSubRequest.__init__(self, xs) self.command["node"] = node self.command["jid"] = subscriber.full() class Publish(PubSubRequest): verb = 'publish' def __init__(self, xs, node): PubSubRequest.__init__(self, xs) self.command["node"] = node def addItem(self, id=None, payload=None): item = self.command.addElement("item") item.addChild(payload) if id is not None: item["id"] = id return item class Items(PubSubRequest): verb = 'items' method = 'get' def __init__(self, xs, node): PubSubRequest.__init__(self, xs) self.command["node"] = node class PubSubClient(XMPPHandler): """ Publish subscribe client protocol. """ implements(IPubSubClient) def connectionInitialized(self): self.xmlstream.addObserver('/message/event[@xmlns="%s"]' % NS_PUBSUB_EVENT, self._onEvent) def _onEvent(self, message): try: service = jid.JID(message["from"]) recipient = jid.JID(message["to"]) except KeyError: return actionElement = None for element in message.event.elements(): if element.uri == NS_PUBSUB_EVENT: actionElement = element if not actionElement: return eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None) if eventHandler: eventHandler(service, recipient, actionElement) message.handled = True def _onEvent_items(self, service, recipient, action): nodeIdentifier = action["node"] items = [element for element in action.elements() if element.name in ('item', 'retract')] self.itemsReceived(recipient, service, nodeIdentifier, items) def _onEvent_delete(self, service, recipient, action): nodeIdentifier = action["node"] self.deleteReceived(recipient, service, nodeIdentifier) def _onEvent_purge(self, service, recipient, action): nodeIdentifier = action["node"] self.purgeReceived(recipient, service, nodeIdentifier) def itemsReceived(self, recipient, service, nodeIdentifier, items): pass def deleteReceived(self, recipient, service, nodeIdentifier): pass def purgeReceived(self, recipient, service, nodeIdentifier): pass def createNode(self, service, nodeIdentifier=None): request = CreateNode(self.xmlstream, nodeIdentifier) def cb(iq): try: new_node = iq.pubsub.create["node"] except AttributeError: # the suggested node identifier was accepted new_node = nodeIdentifier return new_node return request.send(service).addCallback(cb) def deleteNode(self, service, nodeIdentifier): return DeleteNode(self.xmlstream, nodeIdentifier).send(service) def subscribe(self, service, nodeIdentifier, subscriber): request = Subscribe(self.xmlstream, nodeIdentifier, subscriber) def cb(iq): subscription = iq.pubsub.subscription["subscription"] if subscription == 'pending': raise SubscriptionPending elif subscription == 'unconfigured': raise SubscriptionUnconfigured else: # we assume subscription == 'subscribed' # any other value would be invalid, but that should have # yielded a stanza error. return None return request.send(service).addCallback(cb) def unsubscribe(self, service, nodeIdentifier, subscriber): request = Unsubscribe(self.xmlstream, nodeIdentifier, subscriber) return request.send(service) def publish(self, service, nodeIdentifier, items=[]): request = Publish(self.xmlstream, nodeIdentifier) for item in items: request.command.addChild(item) return request.send(service) def items(self, service, nodeIdentifier): def cb(iq): items = [] for element in iq.pubsub.items.elements(): if element.uri == NS_PUBSUB and element.name == 'item': items.append(element) return items request = Items(self.xmlstream, nodeIdentifier) return request.send(service).addCallback(cb) class PubSubService(XMPPHandler, IQHandlerMixin): """ Protocol implementation for a XMPP Publish Subscribe Service. The word Service here is used as taken from the Publish Subscribe specification. It is the party responsible for keeping nodes and their subscriptions, and sending out notifications. Methods from the L{IPubSubService} interface that are called as a result of an XMPP request may raise exceptions. Alternatively the deferred returned by these methods may have their errback called. These are handled as follows: * If the exception is an instance of L{error.StanzaError}, an error response iq is returned. * Any other exception is reported using L{log.msg}. An error response with the condition C{internal-server-error} is returned. The default implementation of said methods raises an L{Unsupported} exception and are meant to be overridden. @ivar discoIdentity: Service discovery identity as a dictionary with keys C{'category'}, C{'type'} and C{'name'}. @ivar pubSubFeatures: List of supported publish-subscribe features for service discovery, as C{str}. @type pubSubFeatures: C{list} or C{None}. """ implements(IPubSubService) iqHandlers = { PUBSUB_PUBLISH: '_onPublish', PUBSUB_CREATE: '_onCreate', PUBSUB_SUBSCRIBE: '_onSubscribe', PUBSUB_OPTIONS_GET: '_onOptionsGet', PUBSUB_OPTIONS_SET: '_onOptionsSet', PUBSUB_AFFILIATIONS: '_onAffiliations', PUBSUB_ITEMS: '_onItems', PUBSUB_RETRACT: '_onRetract', PUBSUB_SUBSCRIPTIONS: '_onSubscriptions', PUBSUB_UNSUBSCRIBE: '_onUnsubscribe', PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet', PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet', PUBSUB_CONFIGURE_GET: '_onConfigureGet', PUBSUB_CONFIGURE_SET: '_onConfigureSet', PUBSUB_DEFAULT: '_onDefault', PUBSUB_PURGE: '_onPurge', PUBSUB_DELETE: '_onDelete', PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet', PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet', } def __init__(self): self.discoIdentity = {'category': 'pubsub', 'type': 'generic', 'name': 'Generic Publish-Subscribe Service'} self.pubSubFeatures = [] def connectionMade(self): self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest) self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest) self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest) self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest) def getDiscoInfo(self, requestor, target, nodeIdentifier): info = [] if not nodeIdentifier: info.append(disco.DiscoIdentity(**self.discoIdentity)) info.append(disco.DiscoFeature(disco.NS_ITEMS)) info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature)) for feature in self.pubSubFeatures]) return defer.succeed(info) else: def toInfo(nodeInfo): if not nodeInfo: return [] (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data'] info.append(disco.DiscoIdentity('pubsub', nodeType)) if metaData: form = data_form.Form(type="result", form_type=NS_PUBSUB_META_DATA) form.add_field("text-single", "pubsub#node_type", "The type of node (collection or leaf)", nodeType) for metaDatum in metaData: form.add_field(**metaDatum) info.append(form) return info d = self.getNodeInfo(requestor, target, nodeIdentifier) d.addCallback(toInfo) return d def getDiscoItems(self, requestor, target, nodeIdentifier): if nodeIdentifier or self.hideNodes: return defer.succeed([]) d = self.getNodes(requestor, target) d.addCallback(lambda nodes: [disco.DiscoItem(target, node) for node in nodes]) return d def _onPublish(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.publish["node"] except KeyError: raise BadRequest items = [] for element in iq.pubsub.publish.elements(): if element.uri == NS_PUBSUB and element.name == 'item': items.append(element) return self.publish(requestor, service, nodeIdentifier, items) def _onSubscribe(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.subscribe["node"] subscriber = jid.internJID(iq.pubsub.subscribe["jid"]) except KeyError: raise BadRequest def toResponse(subscription): nodeIdentifier, state = subscription response = domish.Element((NS_PUBSUB, "pubsub")) subscription = response.addElement("subscription") subscription["node"] = nodeIdentifier subscription["jid"] = subscriber.full() subscription["subscription"] = state return response d = self.subscribe(requestor, service, nodeIdentifier, subscriber) d.addCallback(toResponse) return d def _onUnsubscribe(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.unsubscribe["node"] subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"]) except KeyError: raise BadRequest return self.unsubscribe(requestor, service, nodeIdentifier, subscriber) def _onOptionsGet(self, iq): raise Unsupported('subscription-options-unavailable') def _onOptionsSet(self, iq): raise Unsupported('subscription-options-unavailable') def _onSubscriptions(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) def toResponse(result): response = domish.Element((NS_PUBSUB, 'pubsub')) subscriptions = response.addElement('subscriptions') for node, subscriber, state in result: item = subscriptions.addElement('subscription') item['node'] = node item['jid'] = subscriber.full() item['subscription'] = state return response d = self.subscriptions(requestor, service) d.addCallback(toResponse) return d def _onAffiliations(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) def toResponse(result): response = domish.Element((NS_PUBSUB, 'pubsub')) affiliations = response.addElement('affiliations') for nodeIdentifier, affiliation in result: item = affiliations.addElement('affiliation') item['node'] = nodeIdentifier item['affiliation'] = affiliation return response d = self.affiliations(requestor, service) d.addCallback(toResponse) return d def _onCreate(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) nodeIdentifier = iq.pubsub.create.getAttribute("node") def toResponse(result): if not nodeIdentifier or nodeIdentifier != result: response = domish.Element((NS_PUBSUB, 'pubsub')) create = response.addElement('create') create['node'] = result return response else: return None d = self.create(requestor, service, nodeIdentifier) d.addCallback(toResponse) return d def _formFromConfiguration(self, options): form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG) for option in options: form.add_field(**option) return form def _onDefault(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) def toResponse(options): response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) default = response.addElement("default") default.addChild(self._formFromConfiguration(options)) return response d = self.getDefaultConfiguration(requestor, service) d.addCallback(toResponse) return d def _onConfigureGet(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) nodeIdentifier = iq.pubsub.configure.getAttribute("node") def toResponse(options): response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) configure = response.addElement("configure") configure.addChild(self._formFromConfiguration(options)) if nodeIdentifier: configure["node"] = nodeIdentifier return response d = self.getConfiguration(requestor, service, nodeIdentifier) d.addCallback(toResponse) return d def _onConfigureSet(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) nodeIdentifier = iq.pubsub.configure["node"] def getFormOptions(self, form): options = {} for element in form.elements(): if element.name == 'field' and \ element.uri == data_form.NS_X_DATA: try: options[element["var"]] = str(element.value) except (KeyError, AttributeError): raise BadRequest return options # Search configuration form with correct FORM_TYPE and process it for element in iq.pubsub.configure.elements(): if element.name != 'x' or element.uri != data_form.NS_X_DATA: continue type = element.getAttribute("type") if type == "cancel": return None elif type != "submit": continue options = getFormOptions(element) if options["FORM_TYPE"] == NS_PUBSUB + "#node_config": del options["FORM_TYPE"] return self.setConfiguration(requestor, service, nodeIdentifier, options) raise BadRequest def _onItems(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.items["node"] except KeyError: raise BadRequest maxItems = iq.pubsub.items.getAttribute('max_items') if maxItems: try: maxItems = int(maxItems) except ValueError: raise BadRequest itemIdentifiers = [] for child in iq.pubsub.items.elements(): if child.name == 'item' and child.uri == NS_PUBSUB: try: itemIdentifiers.append(child["id"]) except KeyError: raise BadRequest def toResponse(result): response = domish.Element((NS_PUBSUB, 'pubsub')) items = response.addElement('items') items["node"] = nodeIdentifier for item in result: items.addRawXml(item) return response d = self.items(requestor, service, nodeIdentifier, maxItems, itemIdentifiers) d.addCallback(toResponse) return d def _onRetract(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.retract["node"] except KeyError: raise BadRequest itemIdentifiers = [] for child in iq.pubsub.retract.elements(): if child.uri == NS_PUBSUB_OWNER and child.name == 'item': try: itemIdentifiers.append(child["id"]) except KeyError: raise BadRequest return self.retract(requestor, service, nodeIdentifier, itemIdentifiers) def _onPurge(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.purge["node"] except KeyError: raise BadRequest return self.purge(requestor, service, nodeIdentifier) def _onDelete(self, iq): requestor = jid.internJID(iq["from"]).userhostJID() service = jid.internJID(iq["to"]) try: nodeIdentifier = iq.pubsub.delete["node"] except KeyError: raise BadRequest return self.delete(requestor, service, nodeIdentifier) def _onAffiliationsGet(self, iq): raise Unsupported('modify-affiliations') def _onAffiliationsSet(self, iq): raise Unsupported('modify-affiliations') def _onSubscriptionsGet(self, iq): raise Unsupported('manage-subscriptions') def _onSubscriptionsSet(self, iq): raise Unsupported('manage-subscriptions') # public methods def notifyPublish(self, service, nodeIdentifier, notifications): for recipient, items in notifications: message = domish.Element((None, "message")) message["from"] = service.full() message["to"] = recipient.full() event = message.addElement((NS_PUBSUB_EVENT, "event")) element = event.addElement("items") element["node"] = nodeIdentifier element.children = items self.send(message) def notifyDelete(self, service, nodeIdentifier, recipients): for recipient in recipients: message = domish.Element((None, "message")) message["from"] = service.full() message["to"] = recipient.full() event = message.addElement((NS_PUBSUB_EVENT, "event")) element = event.addElement("delete") element["node"] = nodeIdentifier self.send(message) def getNodeInfo(self, requestor, service, nodeIdentifier): return None def getNodes(self, requestor, service): return [] def publish(self, requestor, service, nodeIdentifier, items): raise Unsupported('publish') def subscribe(self, requestor, service, nodeIdentifier, subscriber): raise Unsupported('subscribe') def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): raise Unsupported('subscribe') def subscriptions(self, requestor, service): raise Unsupported('retrieve-subscriptions') def affiliations(self, requestor, service): raise Unsupported('retrieve-affiliations') def create(self, requestor, service, nodeIdentifier): raise Unsupported('create-nodes') def getDefaultConfiguration(self, requestor, service): raise Unsupported('retrieve-default') def getConfiguration(self, requestor, service, nodeIdentifier): raise Unsupported('config-node') def setConfiguration(self, requestor, service, nodeIdentifier, options): raise Unsupported('config-node') def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): raise Unsupported('retrieve-items') def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): raise Unsupported('retract-items') def purge(self, requestor, service, nodeIdentifier): raise Unsupported('purge-nodes') def delete(self, requestor, service, nodeIdentifier): raise Unsupported('delete-nodes')