source: wokkel/pubsub.py @ 225:58dd11c3ddd5

Last change on this file since 225:58dd11c3ddd5 was 225:58dd11c3ddd5, checked in by souliane <souliane@…>, 6 years ago

implement item retract + notification

  • Property exe set to *
File size: 51.7 KB
RevLine 
[1]1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
[96]3# Copyright (c) Ralph Meijer.
[1]4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
[165]10U{XEP-0060<http://xmpp.org/extensions/xep-0060.html>}.
[1]11"""
12
13from zope.interface import implements
14
[19]15from twisted.internet import defer
[59]16from twisted.python import log
[63]17from twisted.words.protocols.jabber import jid, error
[1]18from twisted.words.xish import domish
19
[57]20from wokkel import disco, data_form, generic, shim
[63]21from wokkel.compat import IQ
[1]22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
[59]23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
[1]24
# XPath queries selecting iq stanzas of type get and of type set.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Publish-subscribe namespaces; every one is derived from the base namespace.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath matching iq get/set stanzas that carry a pubsub child element in
# either the base or the owner namespace.
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/'
    'pubsub[@xmlns="%s" or @xmlns="%s"]' % (NS_PUBSUB, NS_PUBSUB_OWNER))

# Attribute values interpreted as boolean true and false respectively.
BOOL_TRUE = ('1', 'true')
BOOL_FALSE = ('0', 'false')
45
class SubscriptionPending(Exception):
    """
    Signals that a requested subscription still awaits acceptance.
    """
50
51
[18]52
class SubscriptionUnconfigured(Exception):
    """
    Signals that a requested subscription must be configured before it
    becomes active.
    """
58
59
[18]60
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The pubsub condition is attached as the application-specific condition
    element, in the pubsub errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the pubsub-specific condition element.
        @param feature: Optional value for the C{feature} attribute of the
            pubsub condition element.
        @param text: Optional human readable error description.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature
        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
72
[2]73
[18]74
class BadRequest(error.StanzaError):
    """
    A C{bad-request} stanza error, optionally with a pubsub condition.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of a pubsub-specific condition
            element to attach to the error.
        @param text: Optional human readable error description.
        """
        # Only build the application condition when one was asked for.
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
[30]87
88
89
class Unsupported(PubSubError):
    """
    A C{feature-not-implemented} error with the pubsub C{unsupported}
    condition, naming the feature that is not supported.

    @ivar feature: The name of the unsupported feature.
    """
    def __init__(self, feature, text=None):
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)

    def __str__(self):
        # Extend the generic error description with the feature name.
        return PubSubError.__str__(self) + ', feature %r' % self.feature
[2]102
[18]103
class Subscription(object):
    """
    Describes the subscription of an entity to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
        node is denoted by C{None}.
    @type nodeIdentifier: C{unicode}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{jid.JID}

    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @type state: C{unicode}

    @ivar options: Subscription options; an empty C{dict} if none were given.
    @type options: C{dict}

    @ivar subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None,
                       subscriptionIdentifier=None):
        self.nodeIdentifier = nodeIdentifier
        self.subscriber = subscriber
        self.state = state
        self.subscriptionIdentifier = subscriptionIdentifier
        # A falsy options value is replaced by a fresh, empty dictionary.
        self.options = options or {}


    @staticmethod
    def fromElement(element):
        """
        Build a subscription from its DOM representation.

        Reads the C{node}, C{jid}, C{subscription} and C{subid} attributes;
        options are not extracted from the element.

        @param element: The subscription element.
        @rtype: L{Subscription}
        """
        attribute = element.getAttribute
        return Subscription(attribute('node'),
                            jid.JID(attribute('jid')),
                            attribute('subscription'),
                            subscriptionIdentifier=attribute('subid'))


    def toElement(self, defaultUri=None):
        """
        Render this subscription as a C{subscription} element.

        The C{node} and C{subid} attributes are only set when the
        corresponding instance variables hold a non-empty value.

        @param defaultUri: Namespace URI for the created element.
        @rtype: L{domish.Element}
        """
        rendered = domish.Element((defaultUri, 'subscription'))
        if self.nodeIdentifier:
            rendered['node'] = self.nodeIdentifier
        rendered['jid'] = unicode(self.subscriber)
        rendered['subscription'] = self.state
        if self.subscriptionIdentifier:
            rendered['subid'] = self.subscriptionIdentifier
        return rendered
[1]158
[2]159
[18]160
class Item(domish.Element):
    """
    A publish-subscribe item element.

    Instances are C{item} elements, so they behave like an object providing
    L{domish.IElement}.

    The payload is passed through the C{payload} keyword argument of
    C{__init__}, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: C{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or C{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else an element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
188
[10]189
[18]190
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified, as a dictionary of entity
        (L{JID<twisted.words.protocols.jabber.jid.JID>}) to affiliation
        (C{unicode}).
    @type affiliations: C{dict}

    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}

    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{list}

    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.

    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}

    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}

    @ivar notify: Value of the C{notify} attribute of a retract request.
                  C{None} if the attribute was not given.
    @type notify: C{bool}

    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}

    @ivar subscriptions: Subscriptions to be modified, as a set of
        L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    notify = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.items()))

    # Map request verb to parameter handler names. For each name listed,
    # parseElement calls C{_parse_<name>} and send calls C{_render_<name>}.
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
        'retract': ['node', 'notify', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': ['nodeOrEmpty', 'affiliations'],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: The type of request, one of the values of
            C{_requestVerbMap}. May be left C{None} when the request is to be
            filled in by L{parseElement}.
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with condition C{nodeid-required} if the C{node}
            attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute yields C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only C{item} children in the pubsub namespace are collected.
        """
        self.items = [element for element in verbElement.elements()
                      if element.uri == NS_PUBSUB and element.name == 'item']


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.

        Each item is put in the pubsub namespace before being added.
        """
        if self.items:
            for item in self.items:
                item.uri = NS_PUBSUB
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with condition C{jid-required} if the C{jid}
            attribute is missing, or without pubsub condition if its value is
            not a valid JID.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')
        except jid.InvalidFormat:
            # A malformed JID is a client error, not an internal failure.
            raise BadRequest(text='Invalid jid attribute')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is taken from the C{pubsub#node_type} field of a
        submitted node configuration form, and is C{'leaf'} otherwise.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing or is
            not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is None:
            raise BadRequest(text="Missing configuration form")

        if form.formType not in ('submit', 'cancel'):
            raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)

        self.options = form


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.

        The C{configure} element is a sibling of the verbElement. If it is
        present without a form, an empty submit form is used; if it is
        absent, the options are left untouched.

        @raise BadRequest: if the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.

        The C{configure} element is added as a sibling of the verbElement.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        @raise BadRequest: if the C{max_items} attribute is given but is not
            a positive integer.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                maxItems = int(value)
            except ValueError:
                maxItems = None

            # Zero and negative numbers parse fine as integers, but are not
            # valid item counts.
            if maxItems is None or maxItems < 1:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")

            self.maxItems = maxItems


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_subidOrNone(self, verbElement):
        """
        Parse subscription identifier out of a request.

        A missing C{subid} attribute yields C{None}.
        """
        self.subscriptionIdentifier = verbElement.getAttribute("subid")


    def _render_subidOrNone(self, verbElement):
        """
        Render subscription identifier into a request.
        """
        if self.subscriptionIdentifier:
            verbElement['subid'] = self.subscriptionIdentifier


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: if the subscribe options form is missing or is not
            of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form is None:
            raise BadRequest(text="Missing options form")

        if form.formType not in ('submit', 'cancel'):
            raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)

        self.options = form


    def _render_options(self, verbElement):
        """
        Render the options form into the verbElement.
        """
        verbElement.addChild(self.options.toElement())


    def _parse_optionsWithSubscribe(self, verbElement):
        """
        Parse the optional subscription options sent along with a subscribe
        request.

        The C{options} element is a sibling of the verbElement. If it is
        present without a form, an empty submit form is used; if it is
        absent, the options are left untouched.

        @raise BadRequest: if the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.name == 'options' and element.uri == NS_PUBSUB:
                form = data_form.findForm(element,
                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
                self.options = form


    def _render_optionsWithSubscribe(self, verbElement):
        """
        Render the optional subscription options of a subscribe request.

        The C{options} element is added as a sibling of the verbElement.
        """
        if self.options is not None:
            optionsElement = verbElement.parent.addElement('options')
            self._render_options(optionsElement)


    def _parse_affiliations(self, verbElement):
        """
        Parse the affiliations to be modified out of the verbElement.

        The result maps the bare JID of each entity to its affiliation.

        @raise BadRequest: if an affiliation element lacks the C{jid} or
            C{affiliation} attribute, carries an invalid JID, or if an entity
            is listed more than once.
        """
        self.affiliations = {}
        for element in verbElement.elements():
            if (element.uri == NS_PUBSUB_OWNER and
                element.name == 'affiliation'):
                try:
                    entity = jid.internJID(element['jid']).userhostJID()
                except KeyError:
                    raise BadRequest(text='Missing jid attribute')
                except jid.InvalidFormat:
                    # A malformed JID is a client error, not an internal
                    # failure.
                    raise BadRequest(text='Invalid jid attribute')

                if entity in self.affiliations:
                    raise BadRequest(text='Multiple affiliations for an entity')

                try:
                    affiliation = element['affiliation']
                except KeyError:
                    raise BadRequest(text='Missing affiliation attribute')

                self.affiliations[entity] = affiliation


    def _parse_notify(self, verbElement):
        """
        Parse the optional C{notify} attribute out of the verbElement.

        The instance variable is left untouched if the attribute is missing
        or empty.

        @raise BadRequest: if the value is in neither C{BOOL_TRUE} nor
            C{BOOL_FALSE}.
        """
        value = verbElement.getAttribute('notify')

        if value:
            if value in BOOL_TRUE:
                self.notify = True
            elif value in BOOL_FALSE:
                self.notify = False
            else:
                raise BadRequest(text="Field notify must be a boolean value")


    def _render_notify(self, verbElement):
        """
        Render the C{notify} attribute on the verbElement, if it was set.
        """
        if self.notify is not None:
            verbElement['notify'] = "true" if self.notify else "false"


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        @param element: The iq stanza holding the C{pubsub} element.
        @raise NotImplementedError: if none of the children of the C{pubsub}
            element is a known verb, or if several are present in a
            combination other than subscribe with options.
        """
        generic.Stanza.parseElement(self, element)

        verbs = []
        verbElements = []
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue

            verbs.append(verb)
            verbElements.append(child)

        if not verbs:
            raise NotImplementedError()

        if len(verbs) > 1:
            # The only supported combination is a subscribe request that
            # carries options; it is handled as a subscribe request.
            if 'optionsSet' in verbs and 'subscribe' in verbs:
                self.verb = 'subscribe'
                verbElement = verbElements[verbs.index('subscribe')]
            else:
                raise NotImplementedError()
        else:
            self.verb = verbs[0]
            verbElement = verbElements[0]

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: if C{verb} is not a known request type.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
[2]657
[10]658
659
class PubSubEvent(object):
    """
    Base class for publish-subscribe event notifications.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: C{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification.
        self.sender = sender
        self.recipient = recipient
        # What the notification is about.
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers
679
680
681
class ItemsEvent(PubSubEvent):
    """
    Event notification about new, updated and retracted items of a node.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
693
694
695
class DeleteEvent(PubSubEvent):
    """
    Event notification about the deletion of a node.

    @ivar redirectURI: Optional redirect URI that came with the
        notification; C{None} unless set.
    """

    redirectURI = None
702
[27]703
704
class PurgeEvent(PubSubEvent):
    """
    Event notification about the purging of a node.
    """
709
710
711
[2]712class PubSubClient(XMPPHandler):
713    """
714    Publish subscribe client protocol.
715    """
716
717    implements(IPubSubClient)
718
719    def connectionInitialized(self):
[13]720        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
721                                   NS_PUBSUB_EVENT, self._onEvent)
[2]722
[25]723
[13]724    def _onEvent(self, message):
[90]725        if message.getAttribute('type') == 'error':
726            return
727
[2]728        try:
[27]729            sender = jid.JID(message["from"])
[6]730            recipient = jid.JID(message["to"])
[2]731        except KeyError:
732            return
733
[15]734        actionElement = None
[13]735        for element in message.event.elements():
736            if element.uri == NS_PUBSUB_EVENT:
737                actionElement = element
738
739        if not actionElement:
740            return
741
742        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
743
744        if eventHandler:
[27]745            headers = shim.extractHeaders(message)
746            eventHandler(sender, recipient, actionElement, headers)
[13]747            message.handled = True
748
[30]749
[27]750    def _onEvent_items(self, sender, recipient, action, headers):
[13]751        nodeIdentifier = action["node"]
752
753        items = [element for element in action.elements()
754                         if element.name in ('item', 'retract')]
[2]755
[27]756        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
757        self.itemsReceived(event)
[2]758
[30]759
[27]760    def _onEvent_delete(self, sender, recipient, action, headers):
[13]761        nodeIdentifier = action["node"]
[27]762        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
[43]763        if action.redirect:
764            event.redirectURI = action.redirect.getAttribute('uri')
[27]765        self.deleteReceived(event)
[13]766
[30]767
[27]768    def _onEvent_purge(self, sender, recipient, action, headers):
[13]769        nodeIdentifier = action["node"]
[27]770        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
771        self.purgeReceived(event)
[13]772
[30]773
[27]774    def itemsReceived(self, event):
[2]775        pass
776
[30]777
[27]778    def deleteReceived(self, event):
[13]779        pass
780
[30]781
[27]782    def purgeReceived(self, event):
[13]783        pass
784
[30]785
[82]786    def createNode(self, service, nodeIdentifier=None, options=None,
787                         sender=None):
[18]788        """
789        Create a publish subscribe node.
790
791        @param service: The publish subscribe service to create the node at.
[166]792        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
[18]793        @param nodeIdentifier: Optional suggestion for the id of the node.
794        @type nodeIdentifier: C{unicode}
[82]795        @param options: Optional node configuration options.
796        @type options: C{dict}
[18]797        """
[57]798        request = PubSubRequest('create')
799        request.recipient = service
800        request.nodeIdentifier = nodeIdentifier
[58]801        request.sender = sender
[2]802
[82]803        if options:
804            form = data_form.Form(formType='submit',
805                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
806            form.makeFields(options)
807            request.options = form
808
[2]809        def cb(iq):
810            try:
811                new_node = iq.pubsub.create["node"]
812            except AttributeError:
813                # the suggested node identifier was accepted
[6]814                new_node = nodeIdentifier
[2]815            return new_node
816
[57]817        d = request.send(self.xmlstream)
818        d.addCallback(cb)
819        return d
[2]820
[25]821
[58]822    def deleteNode(self, service, nodeIdentifier, sender=None):
[18]823        """
824        Delete a publish subscribe node.
825
826        @param service: The publish subscribe service to delete the node from.
[166]827        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
[18]828        @param nodeIdentifier: The identifier of the node.
829        @type nodeIdentifier: C{unicode}
830        """
[57]831        request = PubSubRequest('delete')
832        request.recipient = service
833        request.nodeIdentifier = nodeIdentifier
[58]834        request.sender = sender
[57]835        return request.send(self.xmlstream)
[2]836
[25]837
def subscribe(self, service, nodeIdentifier, subscriber,
                    options=None, sender=None):
    """
    Request a subscription to a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity to subscribe to the node. This entity
        will get notifications of new published items.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param options: Subscription options. When given and not empty, they
        are turned into a submit form in the subscribe options namespace
        and attached to the request.
    @type options: C{dict}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Deferred that fires with L{Subscription} or errbacks with
        L{SubscriptionPending} or L{SubscriptionUnconfigured}.
    @rtype: L{defer.Deferred}
    """
    def toSubscription(response):
        result = Subscription.fromElement(response.pubsub.subscription)

        if result.state == 'pending':
            raise SubscriptionPending()
        if result.state == 'unconfigured':
            raise SubscriptionUnconfigured()

        # Any remaining state is taken to be 'subscribed'; other values
        # would be invalid and should have produced a stanza error.
        return result

    request = PubSubRequest('subscribe')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriber = subscriber

    if options:
        optionsForm = data_form.Form(
                formType='submit',
                formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
        optionsForm.makeFields(options)
        request.options = optionsForm

    pending = request.send(self.xmlstream)
    pending.addCallback(toSubscription)
    return pending
[2]888
[25]889
def unsubscribe(self, service, nodeIdentifier, subscriber,
                      subscriptionIdentifier=None, sender=None):
    """
    Ask a service to remove a subscription to a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity to unsubscribe from the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Whatever sending the request over the XML stream returns.
    """
    request = PubSubRequest('unsubscribe')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriptionIdentifier = subscriptionIdentifier
    request.subscriber = subscriber

    outcome = request.send(self.xmlstream)
    return outcome
[10]914
[25]915
def publish(self, service, nodeIdentifier, items=None, sender=None):
    """
    Send a publish request for a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param items: Optional list of L{Item}s to publish.
    @type items: C{list}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Whatever sending the request over the XML stream returns.
    """
    request = PubSubRequest('publish')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.items = items

    outcome = request.send(self.xmlstream)
    return outcome
[2]933
[25]934
def items(self, service, nodeIdentifier, maxItems=None,
          subscriptionIdentifier=None, sender=None):
    """
    Fetch previously published items from a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param maxItems: Optional limit on the number of retrieved items. A
        false value (C{None}, C{0}) leaves the limit off the request.
    @type maxItems: C{int}

    @param subscriptionIdentifier: Optional subscription identifier. In
        case the node has been subscribed to multiple times, this narrows
        the results to the specific subscription.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Deferred that fires with the list of C{item} elements in the
        pubsub namespace found in the response.
    @rtype: L{defer.Deferred}
    """
    def extractItems(response):
        # Keep only <item/> children that live in the pubsub namespace.
        return [child for child in response.pubsub.items.elements()
                if child.uri == NS_PUBSUB and child.name == 'item']

    request = PubSubRequest('items')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriptionIdentifier = subscriptionIdentifier
    if maxItems:
        # Normalise to an integer before rendering it as text.
        request.maxItems = str(int(maxItems))

    pending = request.send(self.xmlstream)
    pending.addCallback(extractItems)
    return pending
[10]972
def retractItems(self, service, nodeIdentifier, itemIdentifiers,
                 notify=None, sender=None):
    """
    Retract items from a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param itemIdentifiers: Identifiers of the items to be retracted.
    @type itemIdentifiers: C{set}

    @param notify: Whether a retraction notification is required. Stored
        unchanged as the request's C{notify} attribute.
        NOTE(review): presumably one of the C{BOOL_TRUE}/C{BOOL_FALSE}
        string values; confirm against L{PubSubRequest}.
    @type notify: C{unicode}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Whatever sending the request over the XML stream returns.
    """
    # The sibling request methods build PubSubRequest directly. Honour a
    # _request_class override when the instance provides one, but fall back
    # to PubSubRequest instead of failing with AttributeError when it does
    # not.
    requestClass = getattr(self, '_request_class', PubSubRequest)

    request = requestClass('retract')
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.itemIdentifiers = itemIdentifiers
    request.notify = notify
    request.sender = sender
    return request.send(self.xmlstream)
[18]993
def getOptions(self, service, nodeIdentifier, subscriber,
                     subscriptionIdentifier=None, sender=None):
    """
    Retrieve the options of a subscription.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity subscribed to the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Deferred that fires with the type-checked subscribe options
        form found in the response.
    @rtype: L{data_form.Form}
    """
    def extractForm(response):
        optionsForm = data_form.findForm(response.pubsub.options,
                                         NS_PUBSUB_SUBSCRIBE_OPTIONS)
        optionsForm.typeCheck()
        return optionsForm

    request = PubSubRequest('optionsGet')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriptionIdentifier = subscriptionIdentifier
    request.subscriber = subscriber

    pending = request.send(self.xmlstream)
    pending.addCallback(extractForm)
    return pending
1029
1030
def setOptions(self, service, nodeIdentifier, subscriber,
                     options, subscriptionIdentifier=None, sender=None):
    """
    Submit new options for a subscription.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity subscribed to the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param options: Subscription options. They are always wrapped in a
        submit form in the subscribe options namespace.
    @type options: C{dict}.

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value stored as the request's C{sender}.

    @return: Whatever sending the request over the XML stream returns.
    """
    optionsForm = data_form.Form(
            formType='submit',
            formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
    optionsForm.makeFields(options)

    request = PubSubRequest('optionsSet')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriptionIdentifier = subscriptionIdentifier
    request.subscriber = subscriber
    request.options = optionsForm

    return request.send(self.xmlstream)
1065
1066
[18]1067
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as a result
    of an XMPP request may raise exceptions. Alternatively the deferred
    returned by these methods may have their errback called. These are handled
    as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar resource: Optional object whose C{locateResource} is used to find
                    the handler of each request. When C{None}, the legacy
                    methods on this class are used instead.
    @ivar hideNodes: When true, L{getDiscoItems} reports no nodes.
    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    """

    implements(IPubSubService, disco.IDisco)

    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    # Maps a request verb to the legacy method name on this class and the
    # names of the request attributes passed to it, in order.
    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
    }

    hideNodes = False

    def __init__(self, resource=None):
        """
        @param resource: Optional resource used to locate request handlers.
        """
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'service',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        """
        Start observing pubsub requests on the XML stream.
        """
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """
        Collect service discovery information for the service or a node.

        For the service itself (no C{nodeIdentifier}) the identity, the
        disco items feature and the pubsub features are reported. Node
        information, if any, adds a node identity and optionally a meta-data
        form. Failures while retrieving node information are logged and the
        information gathered so far is still returned.

        @return: Deferred that fires with the list of collected info objects.
        @rtype: L{defer.Deferred}
        """
        def toInfo(nodeInfo):
            if not nodeInfo:
                return

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                        data_form.Field(
                            var='pubsub#node_type',
                            value=nodeType,
                            label='The type of node (collection or leaf)'
                        )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return

        info = []

        request = PubSubRequest('discoInfo')

        if self.resource is not None:
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            category = self.discoIdentity['category']
            idType = self.discoIdentity['type']
            name = self.discoIdentity['name']
            identity = disco.DiscoIdentity(category, idType, name)
            features = self.pubSubFeatures
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        # getInfo may return a plain value or a deferred.
        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
        d.addCallback(toInfo)
        d.addErrback(log.err)
        d.addCallback(lambda _: info)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """
        Collect service discovery items (nodes).

        @return: Deferred that fires with a list of L{disco.DiscoItem}s, one
            for each node reported.
        @rtype: L{defer.Deferred}
        """
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            d = resource.getNodes(requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            # NOTE(review): the legacy getNodes is only consulted when a
            # node identifier is given, although it does not receive it;
            # confirm this condition is intended.
            #
            # The default getNodes returns a plain list, so wrap the call to
            # always get a deferred to attach the callback below to.
            d = defer.maybeDeferred(self.getNodes, requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        """
        Parse an incoming pubsub iq and dispatch it to its handler.

        The request is optionally preprocessed by C{_preProcess_<verb>},
        handled by the located resource or by the legacy method on this
        class, and its result optionally converted by C{_toResponse_<verb>}.

        @return: Deferred with the (possibly translated) handler result, or
            a failed deferred with L{Unsupported} for unknown verbs.
        """
        request = PubSubRequest.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                # The preprocessor decided nothing needs to be done.
                return defer.succeed(None)

        # Process the request itself,
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            d = handler(*args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        """
        Wrap the resulting subscription in a pubsub response element.
        """
        response = domish.Element((NS_PUBSUB, "pubsub"))
        response.addChild(result.toElement(NS_PUBSUB))
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        """
        Render an iterable of subscriptions as a pubsub response element.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            subscriptions.addChild(subscription.toElement(NS_PUBSUB))
        return response


    def _toResponse_affiliations(self, result, resource, request):
        """
        Render (node identifier, affiliation) pairs as a pubsub response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        """
        Report the created node identifier if it differs from the request.

        @return: A pubsub response element when no identifier was requested
            or another one was assigned, C{None} otherwise.
        """
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        """
        Build a node configuration form from C{values}, using the field
        definitions provided by C{resource}.
        """
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        """
        Type check C{form} against the field definitions of C{resource},
        filtering out unknown fields.
        """
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_create(self, resource, request):
        """
        Check the configuration options of a create request, if present.
        """
        if request.options:
            self._checkConfiguration(resource, request.options)
        return request


    def _preProcess_default(self, resource, request):
        """
        Reject default configuration requests for unknown node types.

        @raise error.StanzaError: C{not-acceptable} when the node type is
            neither C{'leaf'} nor C{'collection'}.
        """
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        """
        Render default configuration options as an owner pubsub response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        """
        Render node configuration options as an owner pubsub response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        """
        Check a submitted configuration form.

        @return: C{None} for a cancelled form, so the request is not
            processed further; the request itself otherwise.
        """
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        """
        Render retrieved items as a pubsub response element.

        The C{uri} of each item is set to the pubsub namespace in place.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            item.uri = NS_PUBSUB
            items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        """
        Build the notification message skeleton for an event.

        The message goes from C{service} to C{subscriber} and holds an event
        element with a child named C{eventType} for C{nodeIdentifier}. For
        each subscription on a node other than C{nodeIdentifier}, a
        C{Collection} header naming that node is added.

        @return: The notification message element.
        """
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message


    def _toResponse_affiliationsGet(self, result, resource, request):
        """
        Render a mapping of entity to affiliation as an owner pubsub
        response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, 'pubsub'))
        affiliations = response.addElement('affiliations')

        if request.nodeIdentifier:
            affiliations['node'] = request.nodeIdentifier

        for entity, affiliation in result.iteritems():
            item = affiliations.addElement('affiliation')
            item['jid'] = entity.full()
            item['affiliation'] = affiliation

        return response


    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send a publish notification to each subscriber.

        @param notifications: Iterable of C{(subscriber, subscriptions,
            items)} tuples. The C{uri} of each item is set to the pubsub
            event namespace in place.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            for item in items:
                item.uri = NS_PUBSUB_EVENT
                message.event.items.addChild(item)
            self.send(message)


    def notifyRetract(self, service, nodeIdentifier, notifications):
        """
        Send a retract notification to each subscriber.

        @param notifications: Iterable of C{(subscriber, subscriptions,
            items)} tuples. Only the C{id} of each item is used.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            for item in items:
                retract = domish.Element((None, "retract"))
                retract['id'] = item['id']
                message.event.items.addChild(retract)
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        """
        Send a node deletion notification to each subscriber.

        @param redirectURI: Optional URI, added as a C{redirect} element to
            the notification when given.
        """
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    # Legacy handler defaults. Apart from the two discovery helpers and
    # getConfigurationOptions, which return empty values, each of these
    # raises Unsupported and is meant to be overridden.

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        return None


    def getNodes(self, requestor, service):
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
[59]1503
1504
1505
class PubSubResource(object):
    """
    Publish-subscribe resource on which every request is unsupported.

    Discovery yields no node information and no nodes, there are no
    configuration options, and each request handler returns a deferred
    that has failed with L{Unsupported} for the matching feature.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    @staticmethod
    def _unsupported(feature):
        """
        Return a deferred that has failed with L{Unsupported} for C{feature}.
        """
        return defer.fail(Unsupported(feature))


    def locateResource(self, request):
        """
        Return the resource handling C{request}: always this one.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with C{None}.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with an empty list.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Return an empty dictionary of configuration options.
        """
        return {}


    def publish(self, request):
        return self._unsupported('publish')


    def subscribe(self, request):
        return self._unsupported('subscribe')


    def unsubscribe(self, request):
        # Reported under the 'subscribe' feature, like subscribe itself.
        return self._unsupported('subscribe')


    def subscriptions(self, request):
        return self._unsupported('retrieve-subscriptions')


    def affiliations(self, request):
        return self._unsupported('retrieve-affiliations')


    def create(self, request):
        return self._unsupported('create-nodes')


    def default(self, request):
        return self._unsupported('retrieve-default')


    def configureGet(self, request):
        return self._unsupported('config-node')


    def configureSet(self, request):
        return self._unsupported('config-node')


    def items(self, request):
        return self._unsupported('retrieve-items')


    def retract(self, request):
        return self._unsupported('retract-items')


    def purge(self, request):
        return self._unsupported('purge-nodes')


    def delete(self, request):
        return self._unsupported('delete-nodes')


    def affiliationsGet(self, request):
        return self._unsupported('modify-affiliations')


    def affiliationsSet(self, request):
        return self._unsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        return self._unsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        return self._unsupported('manage-subscriptions')
Note: See TracBrowser for help on using the repository browser.