source: wokkel/pubsub.py @ 46:4ee1f9c08b22

Last change on this file since 46:4ee1f9c08b22 was 43:0a525d09169d, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add support for sending and receiving node delete notifications with redirect.

File size: 33.3 KB
RevLine 
[1]1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
[10]3# Copyright (c) 2003-2008 Ralph Meijer
[1]4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
[19]15from twisted.internet import defer
[2]16from twisted.words.protocols.jabber import jid, error, xmlstream
[1]17from twisted.words.xish import domish
18
[27]19from wokkel import disco, data_form, shim
[1]20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
[2]21from wokkel.iwokkel import IPubSubClient, IPubSubService
[1]22
# XPath selectors matching iq stanzas by request type.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces of the publish-subscribe protocol and its sub-protocols.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
NS_PUBSUB_OWNER = NS_PUBSUB + '#owner'
NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + '#node_config'
NS_PUBSUB_META_DATA = NS_PUBSUB + '#meta-data'

# XPath predicates that restrict an element to one of the namespaces above.
IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]'
IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]'

# XPath queries for the pubsub wrapper element, alone and inside get/set iqs.
PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT

# XPath queries for the individual commands in the regular pubsub namespace.
PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB

# XPath queries for the individual commands in the owner namespace.
PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_AFFILIATIONS_GET = (PUBSUB_OWNER_GET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = (PUBSUB_OWNER_SET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = (PUBSUB_OWNER_GET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = (PUBSUB_OWNER_SET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
class SubscriptionPending(Exception):
    """
    Signals that a requested subscription is still awaiting acceptance.
    """
76
77
[18]78
class SubscriptionUnconfigured(Exception):
    """
    Signals that a requested subscription must be configured before it
    becomes active.
    """
84
85
[18]86
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The publish-subscribe condition is attached to the stanza error as its
    application condition, an element in the pubsub errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the application condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        application condition element.
        @param text: Optional descriptive text for the error.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature
        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
98
[2]99
[18]100
class BadRequest(PubSubError):
    """
    Bad request stanza error.

    Optionally carries a publish-subscribe specific condition and a
    descriptive text.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the publish-subscribe
                                specific application condition.
        @param text: Optional descriptive text for the error.
        """
        if pubsubCondition:
            # C{text} has to be passed by keyword: the third positional
            # parameter of PubSubError.__init__ is C{feature}.
            PubSubError.__init__(self, 'bad-request', pubsubCondition,
                                 text=text)
        else:
            # Without a pubsub condition there is no application condition
            # element to build, so this is a plain bad-request stanza error.
            error.StanzaError.__init__(self, 'bad-request', text=text)
107
108
109
class Unsupported(PubSubError):
    """
    Feature-not-implemented stanza error with the publish-subscribe
    C{unsupported} condition naming the unsupported feature.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: Name of the feature that is not supported.
        @param text: Optional descriptive text for the error.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)
116
[2]117
[18]118
class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary when none
                   were given.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        # Any false value for options (including None) yields a new dict.
        if not options:
            options = {}

        self.nodeIdentifier = nodeIdentifier
        self.subscriber = subscriber
        self.state = state
        self.options = options
[1]137
[2]138
[18]139
class Item(domish.Element):
    """
    Publish subscribe item.

    An item is a L{domish.Element} named C{item} in the publish-subscribe
    namespace.

    The payload can be provided through the C{payload} keyword argument of
    C{__init__}, or added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
167
[10]168
[18]169
class _PubSubRequest(xmlstream.IQ):
    """
    Publish subscribe request.

    @ivar verb: Request verb
    @type verb: C{str}
    @ivar namespace: Request namespace.
    @type namespace: C{str}
    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
    @type method: C{str}
    @ivar command: Command element of the request. This is the direct child of
                   the C{pubsub} element in the C{namespace} with the name
                   C{verb}.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        """
        Build an iq holding a C{pubsub} element with one command child.

        @param xs: The XML stream the request is created for.
        @param verb: Name of the command element.
        @param namespace: Namespace of the C{pubsub} element.
        @param method: Type attribute of the iq.
        """
        xmlstream.IQ.__init__(self, xs, method)
        pubsubElement = self.addElement((namespace, 'pubsub'))
        self.command = pubsubElement.addElement(verb)


    def send(self, to):
        """
        Send out request.

        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
        a L{JID} instance.

        @param to: Entity to send the request to.
        @type to: L{JID}
        """
        return xmlstream.IQ.send(self, to.full())
204
[10]205
206
class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        self.sender, self.recipient = sender, recipient
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers
226
227
228
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier,
                             headers)
        self.items = items
240
241
242
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: Optional redirect URI that accompanied the
                       notification. C{None} when no redirect was given.
    """

    redirectURI = None
249
[27]250
251
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event signalling that a node has been purged.
    """
256
257
258
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}. These do nothing by default and
    are meant to be overridden. The other public methods send
    publish-subscribe requests to a service.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing incoming publish-subscribe event messages.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event message to the C{_onEvent_*} method for its action.

        Messages without a C{from} or C{to} attribute, without an action
        element, or with an action for which no handler exists are ignored.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The action is the (last) child of the event element that is in the
        # event namespace.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Pass the published and retracted items on to L{itemsReceived}.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Pass a node deletion, with optional redirect, to L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Pass a node purge notification on to L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} when an items notification is received.

        Does nothing by default.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} when a node deletion notification is
        received.

        Does nothing by default.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} when a node purge notification is
        received.

        Does nothing by default.
        """
        pass


    def createNode(self, service, nodeIdentifier=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @return: Deferred that fires with the identifier of the created node.
        """
        request = _PubSubRequest(self.xmlstream, 'create')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response carries no node identifier, so the suggested
                # node identifier was accepted. Depending on how much of the
                # payload is missing, the lookup fails with AttributeError
                # (no pubsub element), TypeError (no create element) or
                # KeyError (no node attribute).
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)


    def deleteNode(self, service, nodeIdentifier):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @return: The result of sending the request.
        """
        request = _PubSubRequest(self.xmlstream, 'delete', NS_PUBSUB_OWNER)
        request.command['node'] = nodeIdentifier
        return request.send(service)


    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @return: Deferred that fires with C{None} when subscribed. It fails
                 with L{SubscriptionPending} or L{SubscriptionUnconfigured}
                 when the service reports the respective subscription state.
        """
        request = _PubSubRequest(self.xmlstream, 'subscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)


    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @return: The result of sending the request.
        """
        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()
        return request.send(service)


    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @return: The result of sending the request.
        """
        request = _PubSubRequest(self.xmlstream, 'publish')
        request.command['node'] = nodeIdentifier
        if items:
            for item in items:
                request.command.addChild(item)

        return request.send(service)


    def items(self, service, nodeIdentifier, maxItems=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @return: Deferred that fires with the list of C{item} elements in
                 the publish-subscribe namespace found in the response.
        """
        request = _PubSubRequest(self.xmlstream, 'items', method='get')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        if maxItems:
            request.command["max_items"] = str(int(maxItems))

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        return request.send(service).addCallback(cb)
[10]466
[18]467
468
[1]469class PubSubService(XMPPHandler, IQHandlerMixin):
470    """
471    Protocol implementation for a XMPP Publish Subscribe Service.
472
473    The word Service here is used as taken from the Publish Subscribe
474    specification. It is the party responsible for keeping nodes and their
475    subscriptions, and sending out notifications.
476
477    Methods from the L{IPubSubService} interface that are called as
478    a result of an XMPP request may raise exceptions. Alternatively the
479    deferred returned by these methods may have their errback called. These are
480    handled as follows:
481
[22]482     - If the exception is an instance of L{error.StanzaError}, an error
483       response iq is returned.
484     - Any other exception is reported using L{log.msg}. An error response
485       with the condition C{internal-server-error} is returned.
[1]486
487    The default implementation of said methods raises an L{Unsupported}
488    exception and are meant to be overridden.
489
490    @ivar discoIdentity: Service discovery identity as a dictionary with
491                         keys C{'category'}, C{'type'} and C{'name'}.
492    @ivar pubSubFeatures: List of supported publish-subscribe features for
493                          service discovery, as C{str}.
[22]494    @type pubSubFeatures: C{list} or C{None}
[1]495    """
496
497    implements(IPubSubService)
498
499    iqHandlers = {
500            PUBSUB_PUBLISH: '_onPublish',
501            PUBSUB_CREATE: '_onCreate',
502            PUBSUB_SUBSCRIBE: '_onSubscribe',
503            PUBSUB_OPTIONS_GET: '_onOptionsGet',
504            PUBSUB_OPTIONS_SET: '_onOptionsSet',
505            PUBSUB_AFFILIATIONS: '_onAffiliations',
506            PUBSUB_ITEMS: '_onItems',
507            PUBSUB_RETRACT: '_onRetract',
508            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
509            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
510
511            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
512            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
513            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
514            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
515            PUBSUB_DEFAULT: '_onDefault',
516            PUBSUB_PURGE: '_onPurge',
517            PUBSUB_DELETE: '_onDelete',
518            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
519            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
520
521            }
522
[25]523
[1]524    def __init__(self):
525        self.discoIdentity = {'category': 'pubsub',
526                              'type': 'generic',
527                              'name': 'Generic Publish-Subscribe Service'}
528
529        self.pubSubFeatures = []
530
[25]531
[1]532    def connectionMade(self):
533        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
534        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
535        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
536        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
537
[25]538
[6]539    def getDiscoInfo(self, requestor, target, nodeIdentifier):
[1]540        info = []
541
542        if not nodeIdentifier:
543            info.append(disco.DiscoIdentity(**self.discoIdentity))
544
545            info.append(disco.DiscoFeature(disco.NS_ITEMS))
546            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
547                         for feature in self.pubSubFeatures])
548
[25]549        def toInfo(nodeInfo):
550            if not nodeInfo:
551                return
[1]552
[25]553            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
554            info.append(disco.DiscoIdentity('pubsub', nodeType))
555            if metaData:
556                form = data_form.Form(formType="result",
557                                      formNamespace=NS_PUBSUB_META_DATA)
[29]558                form.addField(
[25]559                        data_form.Field(
560                            var='pubsub#node_type',
561                            value=nodeType,
562                            label='The type of node (collection or leaf)'
563                        )
564                )
[1]565
[25]566                for metaDatum in metaData:
[29]567                    form.addField(data_form.Field.fromDict(metaDatum))
[1]568
[25]569                info.append(form.toElement())
[1]570
[25]571        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
572        d.addCallback(toInfo)
573        d.addBoth(lambda result: info)
574        return d
575
[1]576
[6]577    def getDiscoItems(self, requestor, target, nodeIdentifier):
[1]578        if nodeIdentifier or self.hideNodes:
579            return defer.succeed([])
580
[7]581        d = self.getNodes(requestor, target)
[1]582        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
583                                     for node in nodes])
584        return d
585
[25]586
587    def _findForm(self, element, formNamespace):
588        if not element:
589            return None
590
591        form = None
592        for child in element.elements():
593            try:
594                form = data_form.Form.fromElement(child)
595            except data_form.Error:
596                continue
597
598            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
599                continue
600
601        return form
602
603
[30]604    def _getParameter_node(self, commandElement):
605        try:
606            return commandElement["node"]
607        except KeyError:
608            raise BadRequest('nodeid-required')
609
610
611    def _getParameter_nodeOrEmpty(self, commandElement):
612        return commandElement.getAttribute("node", '')
613
614
615    def _getParameter_jid(self, commandElement):
616        try:
617            return jid.internJID(commandElement["jid"])
618        except KeyError:
619            raise BadRequest('jid-required')
620
621
622    def _getParameter_max_items(self, commandElement):
623        value = commandElement.getAttribute('max_items')
624
625        if value:
626            try:
627                return int(value)
628            except ValueError:
629                raise BadRequest(text="Field max_items requires a positive " +
630                                      "integer value")
631        else:
632            return None
633
634
635    def _getParameters(self, iq, *names):
[1]636        requestor = jid.internJID(iq["from"]).userhostJID()
[6]637        service = jid.internJID(iq["to"])
[1]638
[30]639        params = [requestor, service]
640
641        if names:
642            command = names[0]
643            commandElement = getattr(iq.pubsub, command)
644            if not commandElement:
645                raise Exception("Could not find command element %r" % command)
646
647        for name in names[1:]:
648            try:
649                getter = getattr(self, '_getParameter_' + name)
650            except KeyError:
651                raise Exception("No parameter getter for this name")
652
653            params.append(getter(commandElement))
654
655        return params
656
657
658    def _onPublish(self, iq):
659        requestor, service, nodeIdentifier = self._getParameters(
660                iq, 'publish', 'node')
[1]661
662        items = []
[2]663        for element in iq.pubsub.publish.elements():
664            if element.uri == NS_PUBSUB and element.name == 'item':
665                items.append(element)
[1]666
[6]667        return self.publish(requestor, service, nodeIdentifier, items)
[1]668
[25]669
[1]670    def _onSubscribe(self, iq):
[30]671        requestor, service, nodeIdentifier, subscriber = self._getParameters(
672                iq, 'subscribe', 'nodeOrEmpty', 'jid')
[1]673
[30]674        def toResponse(result):
[1]675            response = domish.Element((NS_PUBSUB, "pubsub"))
676            subscription = response.addElement("subscription")
[30]677            if result.nodeIdentifier:
678                subscription["node"] = result.nodeIdentifier
679            subscription["jid"] = result.subscriber.full()
680            subscription["subscription"] = result.state
[1]681            return response
682
[6]683        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
[1]684        d.addCallback(toResponse)
685        return d
686
[25]687
[1]688    def _onUnsubscribe(self, iq):
[30]689        requestor, service, nodeIdentifier, subscriber = self._getParameters(
690                iq, 'unsubscribe', 'nodeOrEmpty', 'jid')
[1]691
[6]692        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
[1]693
[25]694
[1]695    def _onOptionsGet(self, iq):
[30]696        raise Unsupported('subscription-options')
[1]697
[25]698
[1]699    def _onOptionsSet(self, iq):
[30]700        raise Unsupported('subscription-options')
[1]701
[25]702
[1]703    def _onSubscriptions(self, iq):
[30]704        requestor, service = self._getParameters(iq)
[1]705
706        def toResponse(result):
707            response = domish.Element((NS_PUBSUB, 'pubsub'))
708            subscriptions = response.addElement('subscriptions')
[32]709            for subscription in result:
[1]710                item = subscriptions.addElement('subscription')
[32]711                item['node'] = subscription.nodeIdentifier
712                item['jid'] = subscription.subscriber.full()
713                item['subscription'] = subscription.state
[1]714            return response
715
[6]716        d = self.subscriptions(requestor, service)
[1]717        d.addCallback(toResponse)
718        return d
719
[25]720
[1]721    def _onAffiliations(self, iq):
[30]722        requestor, service = self._getParameters(iq)
[1]723
724        def toResponse(result):
725            response = domish.Element((NS_PUBSUB, 'pubsub'))
726            affiliations = response.addElement('affiliations')
727
728            for nodeIdentifier, affiliation in result:
729                item = affiliations.addElement('affiliation')
730                item['node'] = nodeIdentifier
731                item['affiliation'] = affiliation
732
733            return response
734
[6]735        d = self.affiliations(requestor, service)
[1]736        d.addCallback(toResponse)
737        return d
738
[25]739
[1]740    def _onCreate(self, iq):
[30]741        requestor, service = self._getParameters(iq)
[1]742        nodeIdentifier = iq.pubsub.create.getAttribute("node")
743
744        def toResponse(result):
745            if not nodeIdentifier or nodeIdentifier != result:
746                response = domish.Element((NS_PUBSUB, 'pubsub'))
747                create = response.addElement('create')
748                create['node'] = result
749                return response
750            else:
751                return None
752
[6]753        d = self.create(requestor, service, nodeIdentifier)
[1]754        d.addCallback(toResponse)
755        return d
756
757
[29]758    def _makeFields(self, options, values):
759        fields = []
760        for name, value in values.iteritems():
761            if name not in options:
762                continue
763
764            option = {'var': name}
765            option.update(options[name])
766            if isinstance(value, list):
767                option['values'] = value
768            else:
769                option['value'] = value
770            fields.append(data_form.Field.fromDict(option))
771        return fields
772
[25]773    def _formFromConfiguration(self, values):
774        options = self.getConfigurationOptions()
[29]775        fields = self._makeFields(options, values)
[25]776        form = data_form.Form(formType="form",
[29]777                              formNamespace=NS_PUBSUB_NODE_CONFIG,
778                              fields=fields)
[1]779
780        return form
781
[29]782    def _checkConfiguration(self, values):
783        options = self.getConfigurationOptions()
784        processedValues = {}
785
786        for key, value in values.iteritems():
787            if key not in options:
788                continue
789
790            option = {'var': key}
791            option.update(options[key])
792            field = data_form.Field.fromDict(option)
793            if isinstance(value, list):
794                field.values = value
795            else:
796                field.value = value
797            field.typeCheck()
798
799            if isinstance(value, list):
800                processedValues[key] = field.values
801            else:
802                processedValues[key] = field.value
803
804        return processedValues
805
[25]806
[1]807    def _onDefault(self, iq):
[30]808        requestor, service = self._getParameters(iq)
[1]809
810        def toResponse(options):
811            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
812            default = response.addElement("default")
[25]813            default.addChild(self._formFromConfiguration(options).toElement())
[1]814            return response
815
[30]816        form = self._findForm(iq.pubsub.config, NS_PUBSUB_NODE_CONFIG)
817        values = form and form.formType == 'result' and form.getValues() or {}
818        nodeType = values.get('pubsub#node_type', 'leaf')
819
820        if nodeType not in ('leaf', 'collections'):
821            return defer.fail(error.StanzaError('not-acceptable'))
822
823        d = self.getDefaultConfiguration(requestor, service, nodeType)
[1]824        d.addCallback(toResponse)
825        return d
826
[25]827
[1]828    def _onConfigureGet(self, iq):
[30]829        requestor, service, nodeIdentifier = self._getParameters(
830                iq, 'configure', 'nodeOrEmpty')
[1]831
832        def toResponse(options):
833            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
834            configure = response.addElement("configure")
[25]835            configure.addChild(self._formFromConfiguration(options).toElement())
[1]836
837            if nodeIdentifier:
838                configure["node"] = nodeIdentifier
839
840            return response
841
[6]842        d = self.getConfiguration(requestor, service, nodeIdentifier)
[1]843        d.addCallback(toResponse)
844        return d
845
[25]846
[1]847    def _onConfigureSet(self, iq):
[30]848        requestor, service, nodeIdentifier = self._getParameters(
849                iq, 'configure', 'nodeOrEmpty')
[1]850
851        # Search configuration form with correct FORM_TYPE and process it
852
[25]853        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
[1]854
[25]855        if form:
856            if form.formType == 'submit':
[29]857                options = self._checkConfiguration(form.getValues())
[1]858
[6]859                return self.setConfiguration(requestor, service,
860                                             nodeIdentifier, options)
[25]861            elif form.formType == 'cancel':
862                return None
[1]863
[25]864        raise BadRequest()
865
[1]866
867    def _onItems(self, iq):
[30]868        requestor, service, nodeIdentifier, maxItems = self._getParameters(
869                iq, 'items', 'nodeOrEmpty', 'max_items')
[1]870
871        itemIdentifiers = []
872        for child in iq.pubsub.items.elements():
873            if child.name == 'item' and child.uri == NS_PUBSUB:
874                try:
875                    itemIdentifiers.append(child["id"])
876                except KeyError:
[30]877                    raise BadRequest()
[1]878
879        def toResponse(result):
880            response = domish.Element((NS_PUBSUB, 'pubsub'))
881            items = response.addElement('items')
[30]882            if nodeIdentifier:
883                items["node"] = nodeIdentifier
[1]884
885            for item in result:
[24]886                items.addChild(item)
[1]887
888            return response
889
[6]890        d = self.items(requestor, service, nodeIdentifier, maxItems,
891                       itemIdentifiers)
[1]892        d.addCallback(toResponse)
893        return d
894
[25]895
[1]896    def _onRetract(self, iq):
[30]897        requestor, service, nodeIdentifier = self._getParameters(
898                iq, 'retract', 'node')
[1]899
900        itemIdentifiers = []
901        for child in iq.pubsub.retract.elements():
[30]902            if child.uri == NS_PUBSUB and child.name == 'item':
[1]903                try:
904                    itemIdentifiers.append(child["id"])
905                except KeyError:
[30]906                    raise BadRequest()
[1]907
[6]908        return self.retract(requestor, service, nodeIdentifier,
909                            itemIdentifiers)
[1]910
[25]911
[1]912    def _onPurge(self, iq):
[30]913        requestor, service, nodeIdentifier = self._getParameters(
914                iq, 'purge', 'node')
[6]915        return self.purge(requestor, service, nodeIdentifier)
[1]916
[25]917
[1]918    def _onDelete(self, iq):
[30]919        requestor, service, nodeIdentifier = self._getParameters(
920                iq, 'delete', 'node')
[6]921        return self.delete(requestor, service, nodeIdentifier)
[1]922
[25]923
[1]924    def _onAffiliationsGet(self, iq):
925        raise Unsupported('modify-affiliations')
926
[25]927
[1]928    def _onAffiliationsSet(self, iq):
929        raise Unsupported('modify-affiliations')
930
[25]931
[1]932    def _onSubscriptionsGet(self, iq):
933        raise Unsupported('manage-subscriptions')
934
[25]935
[1]936    def _onSubscriptionsSet(self, iq):
937        raise Unsupported('manage-subscriptions')
938
939    # public methods
940
[30]941    def _createNotification(self, eventType, service, nodeIdentifier,
942                                  subscriber, subscriptions=None):
943        headers = []
944
945        if subscriptions:
946            for subscription in subscriptions:
947                if nodeIdentifier != subscription.nodeIdentifier:
948                    headers.append(('Collection', subscription.nodeIdentifier))
949
950        message = domish.Element((None, "message"))
951        message["from"] = service.full()
952        message["to"] = subscriber.full()
953        event = message.addElement((NS_PUBSUB_EVENT, "event"))
954
955        element = event.addElement(eventType)
956        element["node"] = nodeIdentifier
957
958        if headers:
959            message.addChild(shim.Headers(headers))
960
961        return message
962
[6]963    def notifyPublish(self, service, nodeIdentifier, notifications):
[30]964        for subscriber, subscriptions, items in notifications:
965            message = self._createNotification('items', service,
966                                               nodeIdentifier, subscriber,
967                                               subscriptions)
968            message.event.items.children = items
[1]969            self.send(message)
970
[25]971
[43]972    def notifyDelete(self, service, nodeIdentifier, subscribers,
973                           redirectURI=None):
974        for subscriber in subscribers:
[30]975            message = self._createNotification('delete', service,
976                                               nodeIdentifier,
[43]977                                               subscriber)
978            if redirectURI:
979                redirect = message.event.delete.addElement('redirect')
980                redirect['uri'] = redirectURI
[15]981            self.send(message)
982
[25]983
[6]984    def getNodeInfo(self, requestor, service, nodeIdentifier):
[1]985        return None
986
[25]987
[6]988    def getNodes(self, requestor, service):
[1]989        return []
990
[25]991
[6]992    def publish(self, requestor, service, nodeIdentifier, items):
[1]993        raise Unsupported('publish')
994
[25]995
[6]996    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
[1]997        raise Unsupported('subscribe')
998
[25]999
[6]1000    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
[1]1001        raise Unsupported('subscribe')
1002
[25]1003
[6]1004    def subscriptions(self, requestor, service):
[1]1005        raise Unsupported('retrieve-subscriptions')
1006
[25]1007
[6]1008    def affiliations(self, requestor, service):
[1]1009        raise Unsupported('retrieve-affiliations')
1010
[25]1011
[6]1012    def create(self, requestor, service, nodeIdentifier):
[1]1013        raise Unsupported('create-nodes')
1014
[25]1015
1016    def getConfigurationOptions(self):
1017        return {}
1018
1019
[43]1020    def getDefaultConfiguration(self, requestor, service, nodeType):
[1]1021        raise Unsupported('retrieve-default')
1022
[25]1023
[6]1024    def getConfiguration(self, requestor, service, nodeIdentifier):
[1]1025        raise Unsupported('config-node')
1026
[25]1027
[6]1028    def setConfiguration(self, requestor, service, nodeIdentifier, options):
[1]1029        raise Unsupported('config-node')
1030
[25]1031
[6]1032    def items(self, requestor, service, nodeIdentifier, maxItems,
1033                    itemIdentifiers):
[1]1034        raise Unsupported('retrieve-items')
1035
[25]1036
[6]1037    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
[1]1038        raise Unsupported('retract-items')
1039
[25]1040
[6]1041    def purge(self, requestor, service, nodeIdentifier):
[1]1042        raise Unsupported('purge-nodes')
1043
[25]1044
[6]1045    def delete(self, requestor, service, nodeIdentifier):
[1]1046        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.