source: wokkel/pubsub.py @ 86:f4aa0d507bc8

Last change on this file since 86:f4aa0d507bc8 was 86:f4aa0d507bc8, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Don't set an explicit namespace on wokkel.pubsub.Item.

Before this change, wokkel.pubsub.Item could not be used for constructing
notifications, as those are in another namespace. This makes sure that items
will always have the correct namespace, inherited from their ancestors.

File size: 47.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
25# Iq get and set XPath queries
26IQ_GET = '/iq[@type="get"]'
27IQ_SET = '/iq[@type="set"]'
28
29# Publish-subscribe namespaces
30NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
31NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
32NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
33NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
34NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
35NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
36NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
37
38# XPath to match pubsub requests
39PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
40                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
41                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
42
43class SubscriptionPending(Exception):
44    """
45    Raised when the requested subscription is pending acceptance.
46    """
47
48
49
50class SubscriptionUnconfigured(Exception):
51    """
52    Raised when the requested subscription needs to be configured before
53    becoming active.
54    """
55
56
57
58class PubSubError(error.StanzaError):
59    """
60    Exception with publish-subscribe specific condition.
61    """
62    def __init__(self, condition, pubsubCondition, feature=None, text=None):
63        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
64        if feature:
65            appCondition['feature'] = feature
66        error.StanzaError.__init__(self, condition,
67                                         text=text,
68                                         appCondition=appCondition)
69
70
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self, pubsubCondition=None, text=None):
77        if pubsubCondition:
78            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
79        else:
80            appCondition = None
81        error.StanzaError.__init__(self, 'bad-request',
82                                         text=text,
83                                         appCondition=appCondition)
84
85
86
87class Unsupported(PubSubError):
88    def __init__(self, feature, text=None):
89        self.feature = feature
90        PubSubError.__init__(self, 'feature-not-implemented',
91                                   'unsupported',
92                                   feature,
93                                   text)
94
95    def __str__(self):
96        message = PubSubError.__str__(self)
97        message += ', feature %r' % self.feature
98        return message
99
100
101class Subscription(object):
102    """
103    A subscription to a node.
104
105    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
106        node is denoted by C{None}.
107    @type nodeIdentifier: C{unicode}
108
109    @ivar subscriber: The subscribing entity.
110    @type subscriber: L{jid.JID}
111
112    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
113                 C{'unconfigured'}.
114    @type state: C{unicode}
115
116    @ivar options: Optional list of subscription options.
117    @type options: C{dict}
118
119    @ivar subscriptionIdentifier: Optional subscription identifier.
120    @type subscriptionIdentifier: C{unicode}
121    """
122
123    def __init__(self, nodeIdentifier, subscriber, state, options=None,
124                       subscriptionIdentifier=None):
125        self.nodeIdentifier = nodeIdentifier
126        self.subscriber = subscriber
127        self.state = state
128        self.options = options or {}
129        self.subscriptionIdentifier = subscriptionIdentifier
130
131
132    @staticmethod
133    def fromElement(element):
134        return Subscription(
135                element.getAttribute('node'),
136                jid.JID(element.getAttribute('jid')),
137                element.getAttribute('subscription'),
138                subscriptionIdentifier=element.getAttribute('subid'))
139
140
141    def toElement(self):
142        """
143        Return the DOM representation of this subscription.
144
145        @rtype: L{domish.Element}
146        """
147        element = domish.Element((None, 'subscription'))
148        if self.nodeIdentifier:
149            element['node'] = self.nodeIdentifier
150        element['jid'] = unicode(self.subscriber)
151        element['subscription'] = self.state
152        if self.subscriptionIdentifier:
153            element['subid'] = self.subscriptionIdentifier
154        return element
155
156
157
158class Item(domish.Element):
159    """
160    Publish subscribe item.
161
162    This behaves like an object providing L{domish.IElement}.
163
164    Item payload can be added using C{addChild} or C{addRawXml}, or using the
165    C{payload} keyword argument to C{__init__}.
166    """
167
168    def __init__(self, id=None, payload=None):
169        """
170        @param id: optional item identifier
171        @type id: L{unicode}
172        @param payload: optional item payload. Either as a domish element, or
173                        as serialized XML.
174        @type payload: object providing L{domish.IElement} or L{unicode}.
175        """
176
177        domish.Element.__init__(self, (None, 'item'))
178        if id is not None:
179            self['id'] = id
180        if payload is not None:
181            if isinstance(payload, basestring):
182                self.addRawXml(payload)
183            else:
184                self.addChild(payload)
185
186
187
188class PubSubRequest(generic.Stanza):
189    """
190    A publish-subscribe request.
191
192    The set of instance variables used depends on the type of request. If
193    a variable is not applicable or not passed in the request, its value is
194    C{None}.
195
196    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
197    @type verb: C{str}.
198
199    @ivar affiliations: Affiliations to be modified.
200    @type affiliations: C{set}
201    @ivar items: The items to be published, as L{domish.Element}s.
202    @type items: C{list}
203    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
204                           retracted.
205    @type itemIdentifiers: C{set}
206    @ivar maxItems: Maximum number of items to retrieve.
207    @type maxItems: C{int}.
208    @ivar nodeIdentifier: Identifier of the node the request is about.
209    @type nodeIdentifier: C{unicode}
210    @ivar nodeType: The type of node that should be created, or for which the
211                    configuration is retrieved. C{'leaf'} or C{'collection'}.
212    @type nodeType: C{str}
213    @ivar options: Configurations options for nodes, subscriptions and publish
214                   requests.
215    @type options: L{data_form.Form}
216    @ivar subscriber: The subscribing entity.
217    @type subscriber: L{JID}
218    @ivar subscriptionIdentifier: Identifier for a specific subscription.
219    @type subscriptionIdentifier: C{unicode}
220    @ivar subscriptions: Subscriptions to be modified, as a set of
221                         L{Subscription}.
222    @type subscriptions: C{set}
223    """
224
225    verb = None
226
227    affiliations = None
228    items = None
229    itemIdentifiers = None
230    maxItems = None
231    nodeIdentifier = None
232    nodeType = None
233    options = None
234    subscriber = None
235    subscriptionIdentifier = None
236    subscriptions = None
237
238    # Map request iq type and subelement name to request verb
239    _requestVerbMap = {
240        ('set', NS_PUBSUB, 'publish'): 'publish',
241        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
242        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
243        ('get', NS_PUBSUB, 'options'): 'optionsGet',
244        ('set', NS_PUBSUB, 'options'): 'optionsSet',
245        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
246        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
247        ('set', NS_PUBSUB, 'create'): 'create',
248        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
249        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
250        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
251        ('get', NS_PUBSUB, 'items'): 'items',
252        ('set', NS_PUBSUB, 'retract'): 'retract',
253        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
254        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
255        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
256        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
257        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
258        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
259    }
260
261    # Map request verb to request iq type and subelement name
262    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
263
264    # Map request verb to parameter handler names
265    _parameters = {
266        'publish': ['node', 'items'],
267        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
268        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
269        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
270        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
271        'subscriptions': [],
272        'affiliations': [],
273        'create': ['nodeOrNone', 'configureOrNone'],
274        'default': ['default'],
275        'configureGet': ['nodeOrEmpty'],
276        'configureSet': ['nodeOrEmpty', 'configure'],
277        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
278        'retract': ['node', 'itemIdentifiers'],
279        'purge': ['node'],
280        'delete': ['node'],
281        'affiliationsGet': ['nodeOrEmpty'],
282        'affiliationsSet': [],
283        'subscriptionsGet': ['nodeOrEmpty'],
284        'subscriptionsSet': [],
285    }
286
287    def __init__(self, verb=None):
288        self.verb = verb
289
290
291    def _parse_node(self, verbElement):
292        """
293        Parse the required node identifier out of the verbElement.
294        """
295        try:
296            self.nodeIdentifier = verbElement["node"]
297        except KeyError:
298            raise BadRequest('nodeid-required')
299
300
301    def _render_node(self, verbElement):
302        """
303        Render the required node identifier on the verbElement.
304        """
305        if not self.nodeIdentifier:
306            raise Exception("Node identifier is required")
307
308        verbElement['node'] = self.nodeIdentifier
309
310
311    def _parse_nodeOrEmpty(self, verbElement):
312        """
313        Parse the node identifier out of the verbElement. May be empty.
314        """
315        self.nodeIdentifier = verbElement.getAttribute("node", '')
316
317
318    def _render_nodeOrEmpty(self, verbElement):
319        """
320        Render the node identifier on the verbElement. May be empty.
321        """
322        if self.nodeIdentifier:
323            verbElement['node'] = self.nodeIdentifier
324
325
326    def _parse_nodeOrNone(self, verbElement):
327        """
328        Parse the optional node identifier out of the verbElement.
329        """
330        self.nodeIdentifier = verbElement.getAttribute("node")
331
332
333    def _render_nodeOrNone(self, verbElement):
334        """
335        Render the optional node identifier on the verbElement.
336        """
337        if self.nodeIdentifier:
338            verbElement['node'] = self.nodeIdentifier
339
340
341    def _parse_items(self, verbElement):
342        """
343        Parse items out of the verbElement for publish requests.
344        """
345        self.items = []
346        for element in verbElement.elements():
347            if element.uri == NS_PUBSUB and element.name == 'item':
348                self.items.append(element)
349
350
351    def _render_items(self, verbElement):
352        """
353        Render items into the verbElement for publish requests.
354        """
355        if self.items:
356            for item in self.items:
357                verbElement.addChild(item)
358
359
360    def _parse_jid(self, verbElement):
361        """
362        Parse subscriber out of the verbElement for un-/subscribe requests.
363        """
364        try:
365            self.subscriber = jid.internJID(verbElement["jid"])
366        except KeyError:
367            raise BadRequest('jid-required')
368
369
370    def _render_jid(self, verbElement):
371        """
372        Render subscriber into the verbElement for un-/subscribe requests.
373        """
374        verbElement['jid'] = self.subscriber.full()
375
376
377    def _parse_default(self, verbElement):
378        """
379        Parse node type out of a request for the default node configuration.
380        """
381        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form and form.formType == 'submit':
383            values = form.getValues()
384            self.nodeType = values.get('pubsub#node_type', 'leaf')
385        else:
386            self.nodeType = 'leaf'
387
388
389    def _parse_configure(self, verbElement):
390        """
391        Parse options out of a request for setting the node configuration.
392        """
393        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
394        if form:
395            if form.formType in ('submit', 'cancel'):
396                self.options = form
397            else:
398                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
399        else:
400            raise BadRequest(text="Missing configuration form")
401
402
403    def _parse_configureOrNone(self, verbElement):
404        """
405        Parse optional node configuration form in create request.
406        """
407        for element in verbElement.parent.elements():
408            if element.uri == NS_PUBSUB and element.name == 'configure':
409                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
410                if form:
411                    if form.formType != 'submit':
412                        raise BadRequest(text=u"Unexpected form type '%s'" %
413                                              form.formType)
414                else:
415                    form = data_form.Form('submit',
416                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
417                self.options = form
418
419
420    def _render_configureOrNone(self, verbElement):
421        """
422        Render optional node configuration form in create request.
423        """
424        if self.options is not None:
425            configure = verbElement.parent.addElement('configure')
426            configure.addChild(self.options.toElement())
427
428
429    def _parse_itemIdentifiers(self, verbElement):
430        """
431        Parse item identifiers out of items and retract requests.
432        """
433        self.itemIdentifiers = []
434        for element in verbElement.elements():
435            if element.uri == NS_PUBSUB and element.name == 'item':
436                try:
437                    self.itemIdentifiers.append(element["id"])
438                except KeyError:
439                    raise BadRequest()
440
441
442    def _render_itemIdentifiers(self, verbElement):
443        """
444        Render item identifiers into items and retract requests.
445        """
446        if self.itemIdentifiers:
447            for itemIdentifier in self.itemIdentifiers:
448                item = verbElement.addElement('item')
449                item['id'] = itemIdentifier
450
451
452    def _parse_maxItems(self, verbElement):
453        """
454        Parse maximum items out of an items request.
455        """
456        value = verbElement.getAttribute('max_items')
457
458        if value:
459            try:
460                self.maxItems = int(value)
461            except ValueError:
462                raise BadRequest(text="Field max_items requires a positive " +
463                                      "integer value")
464
465
466    def _render_maxItems(self, verbElement):
467        """
468        Render maximum items into an items request.
469        """
470        if self.maxItems:
471            verbElement['max_items'] = unicode(self.maxItems)
472
473
474    def _parse_subidOrNone(self, verbElement):
475        """
476        Parse subscription identifier out of a request.
477        """
478        self.subscriptionIdentifier = verbElement.getAttribute("subid")
479
480
481    def _render_subidOrNone(self, verbElement):
482        """
483        Render subscription identifier into a request.
484        """
485        if self.subscriptionIdentifier:
486            verbElement['subid'] = self.subscriptionIdentifier
487
488
489    def _parse_options(self, verbElement):
490        """
491        Parse options form out of a subscription options request.
492        """
493        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
494        if form:
495            if form.formType in ('submit', 'cancel'):
496                self.options = form
497            else:
498                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
499        else:
500            raise BadRequest(text="Missing options form")
501
502
503
504    def _render_options(self, verbElement):
505        verbElement.addChild(self.options.toElement())
506
507
508    def _parse_optionsWithSubscribe(self, verbElement):
509        for element in verbElement.parent.elements():
510            if element.name == 'options' and element.uri == NS_PUBSUB:
511                form = data_form.findForm(element,
512                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
513                if form:
514                    if form.formType != 'submit':
515                        raise BadRequest(text=u"Unexpected form type '%s'" %
516                                              form.formType)
517                else:
518                    form = data_form.Form('submit',
519                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
520                self.options = form
521
522
523    def _render_optionsWithSubscribe(self, verbElement):
524        if self.options:
525            optionsElement = verbElement.parent.addElement('options')
526            self._render_options(optionsElement)
527
528
529    def parseElement(self, element):
530        """
531        Parse the publish-subscribe verb and parameters out of a request.
532        """
533        generic.Stanza.parseElement(self, element)
534
535        verbs = []
536        children = []
537        for child in element.pubsub.elements():
538            key = (self.stanzaType, child.uri, child.name)
539            try:
540                verb = self._requestVerbMap[key]
541            except KeyError:
542                continue
543
544            verbs.append(verb)
545            children.append(child)
546
547        if not verbs:
548            raise NotImplementedError()
549
550        if len(verbs) > 1:
551            if 'optionsSet' in verbs and 'subscribe' in verbs:
552                self.verb = 'subscribe'
553                child = children[verbs.index('subscribe')]
554            else:
555                raise NotImplementedError()
556        else:
557            self.verb = verbs[0]
558
559        for parameter in self._parameters[self.verb]:
560            getattr(self, '_parse_%s' % parameter)(child)
561
562
563
564    def send(self, xs):
565        """
566        Send this request to its recipient.
567
568        This renders all of the relevant parameters for this specific
569        requests into an L{IQ}, and invoke its C{send} method.
570        This returns a deferred that fires upon reception of a response. See
571        L{IQ} for details.
572
573        @param xs: The XML stream to send the request on.
574        @type xs: L{xmlstream.XmlStream}
575        @rtype: L{defer.Deferred}.
576        """
577
578        try:
579            (self.stanzaType,
580             childURI,
581             childName) = self._verbRequestMap[self.verb]
582        except KeyError:
583            raise NotImplementedError()
584
585        iq = IQ(xs, self.stanzaType)
586        iq.addElement((childURI, 'pubsub'))
587        verbElement = iq.pubsub.addElement(childName)
588
589        if self.sender:
590            iq['from'] = self.sender.full()
591        if self.recipient:
592            iq['to'] = self.recipient.full()
593
594        for parameter in self._parameters[self.verb]:
595            getattr(self, '_render_%s' % parameter)(verbElement)
596
597        return iq.send()
598
599
600
601class PubSubEvent(object):
602    """
603    A publish subscribe event.
604
605    @param sender: The entity from which the notification was received.
606    @type sender: L{jid.JID}
607    @param recipient: The entity to which the notification was sent.
608    @type recipient: L{wokkel.pubsub.ItemsEvent}
609    @param nodeIdentifier: Identifier of the node the event pertains to.
610    @type nodeIdentifier: C{unicode}
611    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
612    @type headers: L{dict}
613    """
614
615    def __init__(self, sender, recipient, nodeIdentifier, headers):
616        self.sender = sender
617        self.recipient = recipient
618        self.nodeIdentifier = nodeIdentifier
619        self.headers = headers
620
621
622
623class ItemsEvent(PubSubEvent):
624    """
625    A publish-subscribe event that signifies new, updated and retracted items.
626
627    @param items: List of received items as domish elements.
628    @type items: C{list} of L{domish.Element}
629    """
630
631    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
632        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
633        self.items = items
634
635
636
637class DeleteEvent(PubSubEvent):
638    """
639    A publish-subscribe event that signifies the deletion of a node.
640    """
641
642    redirectURI = None
643
644
645
646class PurgeEvent(PubSubEvent):
647    """
648    A publish-subscribe event that signifies the purging of a node.
649    """
650
651
652
653class PubSubClient(XMPPHandler):
654    """
655    Publish subscribe client protocol.
656    """
657
658    implements(IPubSubClient)
659
660    def connectionInitialized(self):
661        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
662                                   NS_PUBSUB_EVENT, self._onEvent)
663
664
665    def _onEvent(self, message):
666        try:
667            sender = jid.JID(message["from"])
668            recipient = jid.JID(message["to"])
669        except KeyError:
670            return
671
672        actionElement = None
673        for element in message.event.elements():
674            if element.uri == NS_PUBSUB_EVENT:
675                actionElement = element
676
677        if not actionElement:
678            return
679
680        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
681
682        if eventHandler:
683            headers = shim.extractHeaders(message)
684            eventHandler(sender, recipient, actionElement, headers)
685            message.handled = True
686
687
688    def _onEvent_items(self, sender, recipient, action, headers):
689        nodeIdentifier = action["node"]
690
691        items = [element for element in action.elements()
692                         if element.name in ('item', 'retract')]
693
694        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
695        self.itemsReceived(event)
696
697
698    def _onEvent_delete(self, sender, recipient, action, headers):
699        nodeIdentifier = action["node"]
700        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
701        if action.redirect:
702            event.redirectURI = action.redirect.getAttribute('uri')
703        self.deleteReceived(event)
704
705
706    def _onEvent_purge(self, sender, recipient, action, headers):
707        nodeIdentifier = action["node"]
708        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
709        self.purgeReceived(event)
710
711
712    def itemsReceived(self, event):
713        pass
714
715
716    def deleteReceived(self, event):
717        pass
718
719
720    def purgeReceived(self, event):
721        pass
722
723
724    def createNode(self, service, nodeIdentifier=None, options=None,
725                         sender=None):
726        """
727        Create a publish subscribe node.
728
729        @param service: The publish subscribe service to create the node at.
730        @type service: L{JID}
731        @param nodeIdentifier: Optional suggestion for the id of the node.
732        @type nodeIdentifier: C{unicode}
733        @param options: Optional node configuration options.
734        @type options: C{dict}
735        """
736        request = PubSubRequest('create')
737        request.recipient = service
738        request.nodeIdentifier = nodeIdentifier
739        request.sender = sender
740
741        if options:
742            form = data_form.Form(formType='submit',
743                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
744            form.makeFields(options)
745            request.options = form
746
747        def cb(iq):
748            try:
749                new_node = iq.pubsub.create["node"]
750            except AttributeError:
751                # the suggested node identifier was accepted
752                new_node = nodeIdentifier
753            return new_node
754
755        d = request.send(self.xmlstream)
756        d.addCallback(cb)
757        return d
758
759
760    def deleteNode(self, service, nodeIdentifier, sender=None):
761        """
762        Delete a publish subscribe node.
763
764        @param service: The publish subscribe service to delete the node from.
765        @type service: L{JID}
766        @param nodeIdentifier: The identifier of the node.
767        @type nodeIdentifier: C{unicode}
768        """
769        request = PubSubRequest('delete')
770        request.recipient = service
771        request.nodeIdentifier = nodeIdentifier
772        request.sender = sender
773        return request.send(self.xmlstream)
774
775
776    def subscribe(self, service, nodeIdentifier, subscriber,
777                        options=None, sender=None):
778        """
779        Subscribe to a publish subscribe node.
780
781        @param service: The publish subscribe service that keeps the node.
782        @type service: L{JID}
783
784        @param nodeIdentifier: The identifier of the node.
785        @type nodeIdentifier: C{unicode}
786
787        @param subscriber: The entity to subscribe to the node. This entity
788            will get notifications of new published items.
789        @type subscriber: L{JID}
790
791        @param options: Subscription options.
792        @type options: C{dict}
793
794        @return: Deferred that fires with L{Subscription} or errbacks with
795            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
796        @rtype: L{defer.Deferred}
797        """
798        request = PubSubRequest('subscribe')
799        request.recipient = service
800        request.nodeIdentifier = nodeIdentifier
801        request.subscriber = subscriber
802        request.sender = sender
803
804        if options:
805            form = data_form.Form(formType='submit',
806                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
807            form.makeFields(options)
808            request.options = form
809
810        def cb(iq):
811            subscription = Subscription.fromElement(iq.pubsub.subscription)
812
813            if subscription.state == 'pending':
814                raise SubscriptionPending()
815            elif subscription.state == 'unconfigured':
816                raise SubscriptionUnconfigured()
817            else:
818                # we assume subscription == 'subscribed'
819                # any other value would be invalid, but that should have
820                # yielded a stanza error.
821                return subscription
822
823        d = request.send(self.xmlstream)
824        d.addCallback(cb)
825        return d
826
827
828    def unsubscribe(self, service, nodeIdentifier, subscriber,
829                          subscriptionIdentifier=None, sender=None):
830        """
831        Unsubscribe from a publish subscribe node.
832
833        @param service: The publish subscribe service that keeps the node.
834        @type service: L{JID}
835
836        @param nodeIdentifier: The identifier of the node.
837        @type nodeIdentifier: C{unicode}
838
839        @param subscriber: The entity to unsubscribe from the node.
840        @type subscriber: L{JID}
841
842        @param subscriptionIdentifier: Optional subscription identifier.
843        @type subscriptionIdentifier: C{unicode}
844        """
845        request = PubSubRequest('unsubscribe')
846        request.recipient = service
847        request.nodeIdentifier = nodeIdentifier
848        request.subscriber = subscriber
849        request.subscriptionIdentifier = subscriptionIdentifier
850        request.sender = sender
851        return request.send(self.xmlstream)
852
853
854    def publish(self, service, nodeIdentifier, items=None, sender=None):
855        """
856        Publish to a publish subscribe node.
857
858        @param service: The publish subscribe service that keeps the node.
859        @type service: L{JID}
860        @param nodeIdentifier: The identifier of the node.
861        @type nodeIdentifier: C{unicode}
862        @param items: Optional list of L{Item}s to publish.
863        @type items: C{list}
864        """
865        request = PubSubRequest('publish')
866        request.recipient = service
867        request.nodeIdentifier = nodeIdentifier
868        request.items = items
869        request.sender = sender
870        return request.send(self.xmlstream)
871
872
873    def items(self, service, nodeIdentifier, maxItems=None,
874              subscriptionIdentifier=None, sender=None):
875        """
876        Retrieve previously published items from a publish subscribe node.
877
878        @param service: The publish subscribe service that keeps the node.
879        @type service: L{JID}
880
881        @param nodeIdentifier: The identifier of the node.
882        @type nodeIdentifier: C{unicode}
883
884        @param maxItems: Optional limit on the number of retrieved items.
885        @type maxItems: C{int}
886
887        @param subscriptionIdentifier: Optional subscription identifier. In
888            case the node has been subscribed to multiple times, this narrows
889            the results to the specific subscription.
890        @type subscriptionIdentifier: C{unicode}
891        """
892        request = PubSubRequest('items')
893        request.recipient = service
894        request.nodeIdentifier = nodeIdentifier
895        if maxItems:
896            request.maxItems = str(int(maxItems))
897        request.subscriptionIdentifier = subscriptionIdentifier
898        request.sender = sender
899
900        def cb(iq):
901            items = []
902            for element in iq.pubsub.items.elements():
903                if element.uri == NS_PUBSUB and element.name == 'item':
904                    items.append(element)
905            return items
906
907        d = request.send(self.xmlstream)
908        d.addCallback(cb)
909        return d
910
911
912    def getOptions(self, service, nodeIdentifier, subscriber,
913                         subscriptionIdentifier=None, sender=None):
914        """
915        Get subscription options.
916
917        @param service: The publish subscribe service that keeps the node.
918        @type service: L{JID}
919
920        @param nodeIdentifier: The identifier of the node.
921        @type nodeIdentifier: C{unicode}
922
923        @param subscriber: The entity subscribed to the node.
924        @type subscriber: L{JID}
925
926        @param subscriptionIdentifier: Optional subscription identifier.
927        @type subscriptionIdentifier: C{unicode}
928
929        @rtype: L{data_form.Form}
930        """
931        request = PubSubRequest('optionsGet')
932        request.recipient = service
933        request.nodeIdentifier = nodeIdentifier
934        request.subscriber = subscriber
935        request.subscriptionIdentifier = subscriptionIdentifier
936        request.sender = sender
937
938        def cb(iq):
939            form = data_form.findForm(iq.pubsub.options,
940                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
941            form.typeCheck()
942            return form
943
944        d = request.send(self.xmlstream)
945        d.addCallback(cb)
946        return d
947
948
949    def setOptions(self, service, nodeIdentifier, subscriber,
950                         options, subscriptionIdentifier=None, sender=None):
951        """
952        Set subscription options.
953
954        @param service: The publish subscribe service that keeps the node.
955        @type service: L{JID}
956
957        @param nodeIdentifier: The identifier of the node.
958        @type nodeIdentifier: C{unicode}
959
960        @param subscriber: The entity subscribed to the node.
961        @type subscriber: L{JID}
962
963        @param options: Subscription options.
964        @type options: C{dict}.
965
966        @param subscriptionIdentifier: Optional subscription identifier.
967        @type subscriptionIdentifier: C{unicode}
968        """
969        request = PubSubRequest('optionsSet')
970        request.recipient = service
971        request.nodeIdentifier = nodeIdentifier
972        request.subscriber = subscriber
973        request.subscriptionIdentifier = subscriptionIdentifier
974        request.sender = sender
975
976        form = data_form.Form(formType='submit',
977                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
978        form.makeFields(options)
979        request.options = form
980
981        d = request.send(self.xmlstream)
982        return d
983
984
985
986class PubSubService(XMPPHandler, IQHandlerMixin):
987    """
988    Protocol implementation for a XMPP Publish Subscribe Service.
989
990    The word Service here is used as taken from the Publish Subscribe
991    specification. It is the party responsible for keeping nodes and their
992    subscriptions, and sending out notifications.
993
994    Methods from the L{IPubSubService} interface that are called as
995    a result of an XMPP request may raise exceptions. Alternatively the
996    deferred returned by these methods may have their errback called. These are
997    handled as follows:
998
999     - If the exception is an instance of L{error.StanzaError}, an error
1000       response iq is returned.
1001     - Any other exception is reported using L{log.msg}. An error response
1002       with the condition C{internal-server-error} is returned.
1003
1004    The default implementation of said methods raises an L{Unsupported}
1005    exception and are meant to be overridden.
1006
1007    @ivar discoIdentity: Service discovery identity as a dictionary with
1008                         keys C{'category'}, C{'type'} and C{'name'}.
1009    @ivar pubSubFeatures: List of supported publish-subscribe features for
1010                          service discovery, as C{str}.
1011    @type pubSubFeatures: C{list} or C{None}
1012    """
1013
1014    implements(IPubSubService)
1015
1016    iqHandlers = {
1017            '/*': '_onPubSubRequest',
1018            }
1019
1020    _legacyHandlers = {
1021        'publish': ('publish', ['sender', 'recipient',
1022                                'nodeIdentifier', 'items']),
1023        'subscribe': ('subscribe', ['sender', 'recipient',
1024                                    'nodeIdentifier', 'subscriber']),
1025        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
1026                                        'nodeIdentifier', 'subscriber']),
1027        'subscriptions': ('subscriptions', ['sender', 'recipient']),
1028        'affiliations': ('affiliations', ['sender', 'recipient']),
1029        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
1030        'getConfigurationOptions': ('getConfigurationOptions', []),
1031        'default': ('getDefaultConfiguration',
1032                    ['sender', 'recipient', 'nodeType']),
1033        'configureGet': ('getConfiguration', ['sender', 'recipient',
1034                                              'nodeIdentifier']),
1035        'configureSet': ('setConfiguration', ['sender', 'recipient',
1036                                              'nodeIdentifier', 'options']),
1037        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
1038                            'maxItems', 'itemIdentifiers']),
1039        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
1040                                'itemIdentifiers']),
1041        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
1042        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
1043    }
1044
1045    hideNodes = False
1046
1047    def __init__(self, resource=None):
1048        self.resource = resource
1049        self.discoIdentity = {'category': 'pubsub',
1050                              'type': 'generic',
1051                              'name': 'Generic Publish-Subscribe Service'}
1052
1053        self.pubSubFeatures = []
1054
1055
1056    def connectionMade(self):
1057        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
1058
1059
1060    def getDiscoInfo(self, requestor, target, nodeIdentifier):
1061        def toInfo(nodeInfo, info):
1062            if not nodeInfo:
1063                return info
1064
1065            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
1066            info.append(disco.DiscoIdentity('pubsub', nodeType))
1067            if metaData:
1068                form = data_form.Form(formType="result",
1069                                      formNamespace=NS_PUBSUB_META_DATA)
1070                form.addField(
1071                        data_form.Field(
1072                            var='pubsub#node_type',
1073                            value=nodeType,
1074                            label='The type of node (collection or leaf)'
1075                        )
1076                )
1077
1078                for metaDatum in metaData:
1079                    form.addField(data_form.Field.fromDict(metaDatum))
1080
1081                info.append(form)
1082
1083            return info
1084
1085        info = []
1086
1087        request = PubSubRequest('discoInfo')
1088
1089        if self.resource is not None:
1090            resource = self.resource.locateResource(request)
1091            identity = resource.discoIdentity
1092            features = resource.features
1093            getInfo = resource.getInfo
1094        else:
1095            category, idType, name = self.discoIdentity
1096            identity = disco.DiscoIdentity(category, idType, name)
1097            features = self.pubSubFeatures
1098            getInfo = self.getNodeInfo
1099
1100        if not nodeIdentifier:
1101            info.append(identity)
1102            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
1103            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
1104                         for feature in features])
1105
1106        d = getInfo(requestor, target, nodeIdentifier or '')
1107        d.addCallback(toInfo, info)
1108        d.addErrback(log.err)
1109        return d
1110
1111
1112    def getDiscoItems(self, requestor, target, nodeIdentifier):
1113        if self.hideNodes:
1114            d = defer.succeed([])
1115        elif self.resource is not None:
1116            request = PubSubRequest('discoInfo')
1117            resource = self.resource.locateResource(request)
1118            d = resource.getNodes(requestor, target, nodeIdentifier)
1119        elif nodeIdentifier:
1120            d = self.getNodes(requestor, target)
1121        else:
1122            d = defer.succeed([])
1123
1124        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
1125                                     for node in nodes])
1126        return d
1127
1128
1129    def _onPubSubRequest(self, iq):
1130        request = PubSubRequest.fromElement(iq)
1131
1132        if self.resource is not None:
1133            resource = self.resource.locateResource(request)
1134        else:
1135            resource = self
1136
1137        # Preprocess the request, knowing the handling resource
1138        try:
1139            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
1140        except AttributeError:
1141            pass
1142        else:
1143            request = preProcessor(resource, request)
1144            if request is None:
1145                return defer.succeed(None)
1146
1147        # Process the request itself,
1148        if resource is not self:
1149            try:
1150                handler = getattr(resource, request.verb)
1151            except AttributeError:
1152                # fix lookup feature
1153                text = "Request verb: %s" % request.verb
1154                return defer.fail(Unsupported('', text))
1155
1156            d = handler(request)
1157        else:
1158            handlerName, argNames = self._legacyHandlers[request.verb]
1159            handler = getattr(self, handlerName)
1160            args = [getattr(request, arg) for arg in argNames]
1161            if 'options' in argNames:
1162                args[argNames.index('options')] = request.options.getValues()
1163            d = handler(*args)
1164
1165        # If needed, translate the result into a response
1166        try:
1167            cb = getattr(self, '_toResponse_%s' % request.verb)
1168        except AttributeError:
1169            pass
1170        else:
1171            d.addCallback(cb, resource, request)
1172
1173        return d
1174
1175
1176    def _toResponse_subscribe(self, result, resource, request):
1177        response = domish.Element((NS_PUBSUB, "pubsub"))
1178        subscription = response.addChild(result.toElement())
1179        return response
1180
1181
1182    def _toResponse_subscriptions(self, result, resource, request):
1183        response = domish.Element((NS_PUBSUB, 'pubsub'))
1184        subscriptions = response.addElement('subscriptions')
1185        for subscription in result:
1186            subscriptions.addChild(subscription.toElement())
1187        return response
1188
1189
1190    def _toResponse_affiliations(self, result, resource, request):
1191        response = domish.Element((NS_PUBSUB, 'pubsub'))
1192        affiliations = response.addElement('affiliations')
1193
1194        for nodeIdentifier, affiliation in result:
1195            item = affiliations.addElement('affiliation')
1196            item['node'] = nodeIdentifier
1197            item['affiliation'] = affiliation
1198
1199        return response
1200
1201
1202    def _toResponse_create(self, result, resource, request):
1203        if not request.nodeIdentifier or request.nodeIdentifier != result:
1204            response = domish.Element((NS_PUBSUB, 'pubsub'))
1205            create = response.addElement('create')
1206            create['node'] = result
1207            return response
1208        else:
1209            return None
1210
1211
1212    def _formFromConfiguration(self, resource, values):
1213        fieldDefs = resource.getConfigurationOptions()
1214        form = data_form.Form(formType="form",
1215                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1216        form.makeFields(values, fieldDefs)
1217        return form
1218
1219
1220    def _checkConfiguration(self, resource, form):
1221        fieldDefs = resource.getConfigurationOptions()
1222        form.typeCheck(fieldDefs, filterUnknown=True)
1223
1224
1225    def _preProcess_create(self, resource, request):
1226        if request.options:
1227            self._checkConfiguration(resource, request.options)
1228        return request
1229
1230
1231    def _preProcess_default(self, resource, request):
1232        if request.nodeType not in ('leaf', 'collection'):
1233            raise error.StanzaError('not-acceptable')
1234        else:
1235            return request
1236
1237
1238    def _toResponse_default(self, options, resource, request):
1239        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1240        default = response.addElement("default")
1241        form = self._formFromConfiguration(resource, options)
1242        default.addChild(form.toElement())
1243        return response
1244
1245
1246    def _toResponse_configureGet(self, options, resource, request):
1247        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1248        configure = response.addElement("configure")
1249        form = self._formFromConfiguration(resource, options)
1250        configure.addChild(form.toElement())
1251
1252        if request.nodeIdentifier:
1253            configure["node"] = request.nodeIdentifier
1254
1255        return response
1256
1257
1258    def _preProcess_configureSet(self, resource, request):
1259        if request.options.formType == 'cancel':
1260            return None
1261        else:
1262            self._checkConfiguration(resource, request.options)
1263            return request
1264
1265
1266    def _toResponse_items(self, result, resource, request):
1267        response = domish.Element((NS_PUBSUB, 'pubsub'))
1268        items = response.addElement('items')
1269        items["node"] = request.nodeIdentifier
1270
1271        for item in result:
1272            items.addChild(item)
1273
1274        return response
1275
1276
1277    def _createNotification(self, eventType, service, nodeIdentifier,
1278                                  subscriber, subscriptions=None):
1279        headers = []
1280
1281        if subscriptions:
1282            for subscription in subscriptions:
1283                if nodeIdentifier != subscription.nodeIdentifier:
1284                    headers.append(('Collection', subscription.nodeIdentifier))
1285
1286        message = domish.Element((None, "message"))
1287        message["from"] = service.full()
1288        message["to"] = subscriber.full()
1289        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1290
1291        element = event.addElement(eventType)
1292        element["node"] = nodeIdentifier
1293
1294        if headers:
1295            message.addChild(shim.Headers(headers))
1296
1297        return message
1298
1299    # public methods
1300
1301    def notifyPublish(self, service, nodeIdentifier, notifications):
1302        for subscriber, subscriptions, items in notifications:
1303            message = self._createNotification('items', service,
1304                                               nodeIdentifier, subscriber,
1305                                               subscriptions)
1306            message.event.items.children = items
1307            self.send(message)
1308
1309
1310    def notifyDelete(self, service, nodeIdentifier, subscribers,
1311                           redirectURI=None):
1312        for subscriber in subscribers:
1313            message = self._createNotification('delete', service,
1314                                               nodeIdentifier,
1315                                               subscriber)
1316            if redirectURI:
1317                redirect = message.event.delete.addElement('redirect')
1318                redirect['uri'] = redirectURI
1319            self.send(message)
1320
1321
1322    def getNodeInfo(self, requestor, service, nodeIdentifier):
1323        return None
1324
1325
1326    def getNodes(self, requestor, service):
1327        return []
1328
1329
1330    def publish(self, requestor, service, nodeIdentifier, items):
1331        raise Unsupported('publish')
1332
1333
1334    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1335        raise Unsupported('subscribe')
1336
1337
1338    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1339        raise Unsupported('subscribe')
1340
1341
1342    def subscriptions(self, requestor, service):
1343        raise Unsupported('retrieve-subscriptions')
1344
1345
1346    def affiliations(self, requestor, service):
1347        raise Unsupported('retrieve-affiliations')
1348
1349
1350    def create(self, requestor, service, nodeIdentifier):
1351        raise Unsupported('create-nodes')
1352
1353
1354    def getConfigurationOptions(self):
1355        return {}
1356
1357
1358    def getDefaultConfiguration(self, requestor, service, nodeType):
1359        raise Unsupported('retrieve-default')
1360
1361
1362    def getConfiguration(self, requestor, service, nodeIdentifier):
1363        raise Unsupported('config-node')
1364
1365
1366    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1367        raise Unsupported('config-node')
1368
1369
1370    def items(self, requestor, service, nodeIdentifier, maxItems,
1371                    itemIdentifiers):
1372        raise Unsupported('retrieve-items')
1373
1374
1375    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1376        raise Unsupported('retract-items')
1377
1378
1379    def purge(self, requestor, service, nodeIdentifier):
1380        raise Unsupported('purge-nodes')
1381
1382
1383    def delete(self, requestor, service, nodeIdentifier):
1384        raise Unsupported('delete-nodes')
1385
1386
1387
1388class PubSubResource(object):
1389
1390    implements(IPubSubResource)
1391
1392    features = []
1393    discoIdentity = disco.DiscoIdentity('pubsub',
1394                                        'service',
1395                                        'Publish-Subscribe Service')
1396
1397
1398    def locateResource(self, request):
1399        return self
1400
1401
1402    def getInfo(self, requestor, service, nodeIdentifier):
1403        return defer.succeed(None)
1404
1405
1406    def getNodes(self, requestor, service, nodeIdentifier):
1407        return defer.succeed([])
1408
1409
1410    def getConfigurationOptions(self):
1411        return {}
1412
1413
1414    def publish(self, request):
1415        return defer.fail(Unsupported('publish'))
1416
1417
1418    def subscribe(self, request):
1419        return defer.fail(Unsupported('subscribe'))
1420
1421
1422    def unsubscribe(self, request):
1423        return defer.fail(Unsupported('subscribe'))
1424
1425
1426    def subscriptions(self, request):
1427        return defer.fail(Unsupported('retrieve-subscriptions'))
1428
1429
1430    def affiliations(self, request):
1431        return defer.fail(Unsupported('retrieve-affiliations'))
1432
1433
1434    def create(self, request):
1435        return defer.fail(Unsupported('create-nodes'))
1436
1437
1438    def default(self, request):
1439        return defer.fail(Unsupported('retrieve-default'))
1440
1441
1442    def configureGet(self, request):
1443        return defer.fail(Unsupported('config-node'))
1444
1445
1446    def configureSet(self, request):
1447        return defer.fail(Unsupported('config-node'))
1448
1449
1450    def items(self, request):
1451        return defer.fail(Unsupported('retrieve-items'))
1452
1453
1454    def retract(self, request):
1455        return defer.fail(Unsupported('retract-items'))
1456
1457
1458    def purge(self, request):
1459        return defer.fail(Unsupported('purge-nodes'))
1460
1461
1462    def delete(self, request):
1463        return defer.fail(Unsupported('delete-nodes'))
1464
1465
1466    def affiliationsGet(self, request):
1467        return defer.fail(Unsupported('modify-affiliations'))
1468
1469
1470    def affiliationsSet(self, request):
1471        return defer.fail(Unsupported('modify-affiliations'))
1472
1473
1474    def subscriptionsGet(self, request):
1475        return defer.fail(Unsupported('manage-subscriptions'))
1476
1477
1478    def subscriptionsSet(self, request):
1479        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.