source: wokkel/pubsub.py @ 90:1ffd4dd005e4

Last change on this file since 90:1ffd4dd005e4 was 90:1ffd4dd005e4, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Ignore error messages in PubSubClient?.

Author: ralphm
Reviewer: arjan
Fixes: #69

File size: 47.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
25# Iq get and set XPath queries
26IQ_GET = '/iq[@type="get"]'
27IQ_SET = '/iq[@type="set"]'
28
29# Publish-subscribe namespaces
30NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
31NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
32NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
33NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
34NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
35NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
36NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
37
38# XPath to match pubsub requests
39PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
40                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
41                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
42
43class SubscriptionPending(Exception):
44    """
45    Raised when the requested subscription is pending acceptance.
46    """
47
48
49
50class SubscriptionUnconfigured(Exception):
51    """
52    Raised when the requested subscription needs to be configured before
53    becoming active.
54    """
55
56
57
58class PubSubError(error.StanzaError):
59    """
60    Exception with publish-subscribe specific condition.
61    """
62    def __init__(self, condition, pubsubCondition, feature=None, text=None):
63        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
64        if feature:
65            appCondition['feature'] = feature
66        error.StanzaError.__init__(self, condition,
67                                         text=text,
68                                         appCondition=appCondition)
69
70
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self, pubsubCondition=None, text=None):
77        if pubsubCondition:
78            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
79        else:
80            appCondition = None
81        error.StanzaError.__init__(self, 'bad-request',
82                                         text=text,
83                                         appCondition=appCondition)
84
85
86
87class Unsupported(PubSubError):
88    def __init__(self, feature, text=None):
89        self.feature = feature
90        PubSubError.__init__(self, 'feature-not-implemented',
91                                   'unsupported',
92                                   feature,
93                                   text)
94
95    def __str__(self):
96        message = PubSubError.__str__(self)
97        message += ', feature %r' % self.feature
98        return message
99
100
101class Subscription(object):
102    """
103    A subscription to a node.
104
105    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
106        node is denoted by C{None}.
107    @type nodeIdentifier: C{unicode}
108
109    @ivar subscriber: The subscribing entity.
110    @type subscriber: L{jid.JID}
111
112    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
113                 C{'unconfigured'}.
114    @type state: C{unicode}
115
116    @ivar options: Optional list of subscription options.
117    @type options: C{dict}
118
119    @ivar subscriptionIdentifier: Optional subscription identifier.
120    @type subscriptionIdentifier: C{unicode}
121    """
122
123    def __init__(self, nodeIdentifier, subscriber, state, options=None,
124                       subscriptionIdentifier=None):
125        self.nodeIdentifier = nodeIdentifier
126        self.subscriber = subscriber
127        self.state = state
128        self.options = options or {}
129        self.subscriptionIdentifier = subscriptionIdentifier
130
131
132    @staticmethod
133    def fromElement(element):
134        return Subscription(
135                element.getAttribute('node'),
136                jid.JID(element.getAttribute('jid')),
137                element.getAttribute('subscription'),
138                subscriptionIdentifier=element.getAttribute('subid'))
139
140
141    def toElement(self):
142        """
143        Return the DOM representation of this subscription.
144
145        @rtype: L{domish.Element}
146        """
147        element = domish.Element((None, 'subscription'))
148        if self.nodeIdentifier:
149            element['node'] = self.nodeIdentifier
150        element['jid'] = unicode(self.subscriber)
151        element['subscription'] = self.state
152        if self.subscriptionIdentifier:
153            element['subid'] = self.subscriptionIdentifier
154        return element
155
156
157
158class Item(domish.Element):
159    """
160    Publish subscribe item.
161
162    This behaves like an object providing L{domish.IElement}.
163
164    Item payload can be added using C{addChild} or C{addRawXml}, or using the
165    C{payload} keyword argument to C{__init__}.
166    """
167
168    def __init__(self, id=None, payload=None):
169        """
170        @param id: optional item identifier
171        @type id: L{unicode}
172        @param payload: optional item payload. Either as a domish element, or
173                        as serialized XML.
174        @type payload: object providing L{domish.IElement} or L{unicode}.
175        """
176
177        domish.Element.__init__(self, (None, 'item'))
178        if id is not None:
179            self['id'] = id
180        if payload is not None:
181            if isinstance(payload, basestring):
182                self.addRawXml(payload)
183            else:
184                self.addChild(payload)
185
186
187
188class PubSubRequest(generic.Stanza):
189    """
190    A publish-subscribe request.
191
192    The set of instance variables used depends on the type of request. If
193    a variable is not applicable or not passed in the request, its value is
194    C{None}.
195
196    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
197    @type verb: C{str}.
198
199    @ivar affiliations: Affiliations to be modified.
200    @type affiliations: C{set}
201    @ivar items: The items to be published, as L{domish.Element}s.
202    @type items: C{list}
203    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
204                           retracted.
205    @type itemIdentifiers: C{set}
206    @ivar maxItems: Maximum number of items to retrieve.
207    @type maxItems: C{int}.
208    @ivar nodeIdentifier: Identifier of the node the request is about.
209    @type nodeIdentifier: C{unicode}
210    @ivar nodeType: The type of node that should be created, or for which the
211                    configuration is retrieved. C{'leaf'} or C{'collection'}.
212    @type nodeType: C{str}
213    @ivar options: Configurations options for nodes, subscriptions and publish
214                   requests.
215    @type options: L{data_form.Form}
216    @ivar subscriber: The subscribing entity.
217    @type subscriber: L{JID}
218    @ivar subscriptionIdentifier: Identifier for a specific subscription.
219    @type subscriptionIdentifier: C{unicode}
220    @ivar subscriptions: Subscriptions to be modified, as a set of
221                         L{Subscription}.
222    @type subscriptions: C{set}
223    """
224
225    verb = None
226
227    affiliations = None
228    items = None
229    itemIdentifiers = None
230    maxItems = None
231    nodeIdentifier = None
232    nodeType = None
233    options = None
234    subscriber = None
235    subscriptionIdentifier = None
236    subscriptions = None
237
238    # Map request iq type and subelement name to request verb
239    _requestVerbMap = {
240        ('set', NS_PUBSUB, 'publish'): 'publish',
241        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
242        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
243        ('get', NS_PUBSUB, 'options'): 'optionsGet',
244        ('set', NS_PUBSUB, 'options'): 'optionsSet',
245        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
246        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
247        ('set', NS_PUBSUB, 'create'): 'create',
248        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
249        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
250        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
251        ('get', NS_PUBSUB, 'items'): 'items',
252        ('set', NS_PUBSUB, 'retract'): 'retract',
253        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
254        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
255        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
256        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
257        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
258        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
259    }
260
261    # Map request verb to request iq type and subelement name
262    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
263
264    # Map request verb to parameter handler names
265    _parameters = {
266        'publish': ['node', 'items'],
267        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
268        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
269        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
270        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
271        'subscriptions': [],
272        'affiliations': [],
273        'create': ['nodeOrNone', 'configureOrNone'],
274        'default': ['default'],
275        'configureGet': ['nodeOrEmpty'],
276        'configureSet': ['nodeOrEmpty', 'configure'],
277        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
278        'retract': ['node', 'itemIdentifiers'],
279        'purge': ['node'],
280        'delete': ['node'],
281        'affiliationsGet': ['nodeOrEmpty'],
282        'affiliationsSet': [],
283        'subscriptionsGet': ['nodeOrEmpty'],
284        'subscriptionsSet': [],
285    }
286
287    def __init__(self, verb=None):
288        self.verb = verb
289
290
291    def _parse_node(self, verbElement):
292        """
293        Parse the required node identifier out of the verbElement.
294        """
295        try:
296            self.nodeIdentifier = verbElement["node"]
297        except KeyError:
298            raise BadRequest('nodeid-required')
299
300
301    def _render_node(self, verbElement):
302        """
303        Render the required node identifier on the verbElement.
304        """
305        if not self.nodeIdentifier:
306            raise Exception("Node identifier is required")
307
308        verbElement['node'] = self.nodeIdentifier
309
310
311    def _parse_nodeOrEmpty(self, verbElement):
312        """
313        Parse the node identifier out of the verbElement. May be empty.
314        """
315        self.nodeIdentifier = verbElement.getAttribute("node", '')
316
317
318    def _render_nodeOrEmpty(self, verbElement):
319        """
320        Render the node identifier on the verbElement. May be empty.
321        """
322        if self.nodeIdentifier:
323            verbElement['node'] = self.nodeIdentifier
324
325
326    def _parse_nodeOrNone(self, verbElement):
327        """
328        Parse the optional node identifier out of the verbElement.
329        """
330        self.nodeIdentifier = verbElement.getAttribute("node")
331
332
333    def _render_nodeOrNone(self, verbElement):
334        """
335        Render the optional node identifier on the verbElement.
336        """
337        if self.nodeIdentifier:
338            verbElement['node'] = self.nodeIdentifier
339
340
341    def _parse_items(self, verbElement):
342        """
343        Parse items out of the verbElement for publish requests.
344        """
345        self.items = []
346        for element in verbElement.elements():
347            if element.uri == NS_PUBSUB and element.name == 'item':
348                self.items.append(element)
349
350
351    def _render_items(self, verbElement):
352        """
353        Render items into the verbElement for publish requests.
354        """
355        if self.items:
356            for item in self.items:
357                verbElement.addChild(item)
358
359
360    def _parse_jid(self, verbElement):
361        """
362        Parse subscriber out of the verbElement for un-/subscribe requests.
363        """
364        try:
365            self.subscriber = jid.internJID(verbElement["jid"])
366        except KeyError:
367            raise BadRequest('jid-required')
368
369
370    def _render_jid(self, verbElement):
371        """
372        Render subscriber into the verbElement for un-/subscribe requests.
373        """
374        verbElement['jid'] = self.subscriber.full()
375
376
377    def _parse_default(self, verbElement):
378        """
379        Parse node type out of a request for the default node configuration.
380        """
381        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form and form.formType == 'submit':
383            values = form.getValues()
384            self.nodeType = values.get('pubsub#node_type', 'leaf')
385        else:
386            self.nodeType = 'leaf'
387
388
389    def _parse_configure(self, verbElement):
390        """
391        Parse options out of a request for setting the node configuration.
392        """
393        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
394        if form:
395            if form.formType in ('submit', 'cancel'):
396                self.options = form
397            else:
398                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
399        else:
400            raise BadRequest(text="Missing configuration form")
401
402
403    def _parse_configureOrNone(self, verbElement):
404        """
405        Parse optional node configuration form in create request.
406        """
407        for element in verbElement.parent.elements():
408            if element.uri == NS_PUBSUB and element.name == 'configure':
409                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
410                if form:
411                    if form.formType != 'submit':
412                        raise BadRequest(text=u"Unexpected form type '%s'" %
413                                              form.formType)
414                else:
415                    form = data_form.Form('submit',
416                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
417                self.options = form
418
419
420    def _render_configureOrNone(self, verbElement):
421        """
422        Render optional node configuration form in create request.
423        """
424        if self.options is not None:
425            configure = verbElement.parent.addElement('configure')
426            configure.addChild(self.options.toElement())
427
428
429    def _parse_itemIdentifiers(self, verbElement):
430        """
431        Parse item identifiers out of items and retract requests.
432        """
433        self.itemIdentifiers = []
434        for element in verbElement.elements():
435            if element.uri == NS_PUBSUB and element.name == 'item':
436                try:
437                    self.itemIdentifiers.append(element["id"])
438                except KeyError:
439                    raise BadRequest()
440
441
442    def _render_itemIdentifiers(self, verbElement):
443        """
444        Render item identifiers into items and retract requests.
445        """
446        if self.itemIdentifiers:
447            for itemIdentifier in self.itemIdentifiers:
448                item = verbElement.addElement('item')
449                item['id'] = itemIdentifier
450
451
452    def _parse_maxItems(self, verbElement):
453        """
454        Parse maximum items out of an items request.
455        """
456        value = verbElement.getAttribute('max_items')
457
458        if value:
459            try:
460                self.maxItems = int(value)
461            except ValueError:
462                raise BadRequest(text="Field max_items requires a positive " +
463                                      "integer value")
464
465
466    def _render_maxItems(self, verbElement):
467        """
468        Render maximum items into an items request.
469        """
470        if self.maxItems:
471            verbElement['max_items'] = unicode(self.maxItems)
472
473
474    def _parse_subidOrNone(self, verbElement):
475        """
476        Parse subscription identifier out of a request.
477        """
478        self.subscriptionIdentifier = verbElement.getAttribute("subid")
479
480
481    def _render_subidOrNone(self, verbElement):
482        """
483        Render subscription identifier into a request.
484        """
485        if self.subscriptionIdentifier:
486            verbElement['subid'] = self.subscriptionIdentifier
487
488
489    def _parse_options(self, verbElement):
490        """
491        Parse options form out of a subscription options request.
492        """
493        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
494        if form:
495            if form.formType in ('submit', 'cancel'):
496                self.options = form
497            else:
498                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
499        else:
500            raise BadRequest(text="Missing options form")
501
502
503
504    def _render_options(self, verbElement):
505        verbElement.addChild(self.options.toElement())
506
507
508    def _parse_optionsWithSubscribe(self, verbElement):
509        for element in verbElement.parent.elements():
510            if element.name == 'options' and element.uri == NS_PUBSUB:
511                form = data_form.findForm(element,
512                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
513                if form:
514                    if form.formType != 'submit':
515                        raise BadRequest(text=u"Unexpected form type '%s'" %
516                                              form.formType)
517                else:
518                    form = data_form.Form('submit',
519                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
520                self.options = form
521
522
523    def _render_optionsWithSubscribe(self, verbElement):
524        if self.options:
525            optionsElement = verbElement.parent.addElement('options')
526            self._render_options(optionsElement)
527
528
529    def parseElement(self, element):
530        """
531        Parse the publish-subscribe verb and parameters out of a request.
532        """
533        generic.Stanza.parseElement(self, element)
534
535        verbs = []
536        children = []
537        for child in element.pubsub.elements():
538            key = (self.stanzaType, child.uri, child.name)
539            try:
540                verb = self._requestVerbMap[key]
541            except KeyError:
542                continue
543
544            verbs.append(verb)
545            children.append(child)
546
547        if not verbs:
548            raise NotImplementedError()
549
550        if len(verbs) > 1:
551            if 'optionsSet' in verbs and 'subscribe' in verbs:
552                self.verb = 'subscribe'
553                child = children[verbs.index('subscribe')]
554            else:
555                raise NotImplementedError()
556        else:
557            self.verb = verbs[0]
558
559        for parameter in self._parameters[self.verb]:
560            getattr(self, '_parse_%s' % parameter)(child)
561
562
563
564    def send(self, xs):
565        """
566        Send this request to its recipient.
567
568        This renders all of the relevant parameters for this specific
569        requests into an L{IQ}, and invoke its C{send} method.
570        This returns a deferred that fires upon reception of a response. See
571        L{IQ} for details.
572
573        @param xs: The XML stream to send the request on.
574        @type xs: L{xmlstream.XmlStream}
575        @rtype: L{defer.Deferred}.
576        """
577
578        try:
579            (self.stanzaType,
580             childURI,
581             childName) = self._verbRequestMap[self.verb]
582        except KeyError:
583            raise NotImplementedError()
584
585        iq = IQ(xs, self.stanzaType)
586        iq.addElement((childURI, 'pubsub'))
587        verbElement = iq.pubsub.addElement(childName)
588
589        if self.sender:
590            iq['from'] = self.sender.full()
591        if self.recipient:
592            iq['to'] = self.recipient.full()
593
594        for parameter in self._parameters[self.verb]:
595            getattr(self, '_render_%s' % parameter)(verbElement)
596
597        return iq.send()
598
599
600
601class PubSubEvent(object):
602    """
603    A publish subscribe event.
604
605    @param sender: The entity from which the notification was received.
606    @type sender: L{jid.JID}
607    @param recipient: The entity to which the notification was sent.
608    @type recipient: L{wokkel.pubsub.ItemsEvent}
609    @param nodeIdentifier: Identifier of the node the event pertains to.
610    @type nodeIdentifier: C{unicode}
611    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
612    @type headers: L{dict}
613    """
614
615    def __init__(self, sender, recipient, nodeIdentifier, headers):
616        self.sender = sender
617        self.recipient = recipient
618        self.nodeIdentifier = nodeIdentifier
619        self.headers = headers
620
621
622
623class ItemsEvent(PubSubEvent):
624    """
625    A publish-subscribe event that signifies new, updated and retracted items.
626
627    @param items: List of received items as domish elements.
628    @type items: C{list} of L{domish.Element}
629    """
630
631    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
632        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
633        self.items = items
634
635
636
637class DeleteEvent(PubSubEvent):
638    """
639    A publish-subscribe event that signifies the deletion of a node.
640    """
641
642    redirectURI = None
643
644
645
646class PurgeEvent(PubSubEvent):
647    """
648    A publish-subscribe event that signifies the purging of a node.
649    """
650
651
652
653class PubSubClient(XMPPHandler):
654    """
655    Publish subscribe client protocol.
656    """
657
658    implements(IPubSubClient)
659
660    def connectionInitialized(self):
661        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
662                                   NS_PUBSUB_EVENT, self._onEvent)
663
664
665    def _onEvent(self, message):
666        if message.getAttribute('type') == 'error':
667            return
668
669        try:
670            sender = jid.JID(message["from"])
671            recipient = jid.JID(message["to"])
672        except KeyError:
673            return
674
675        actionElement = None
676        for element in message.event.elements():
677            if element.uri == NS_PUBSUB_EVENT:
678                actionElement = element
679
680        if not actionElement:
681            return
682
683        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
684
685        if eventHandler:
686            headers = shim.extractHeaders(message)
687            eventHandler(sender, recipient, actionElement, headers)
688            message.handled = True
689
690
691    def _onEvent_items(self, sender, recipient, action, headers):
692        nodeIdentifier = action["node"]
693
694        items = [element for element in action.elements()
695                         if element.name in ('item', 'retract')]
696
697        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
698        self.itemsReceived(event)
699
700
701    def _onEvent_delete(self, sender, recipient, action, headers):
702        nodeIdentifier = action["node"]
703        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
704        if action.redirect:
705            event.redirectURI = action.redirect.getAttribute('uri')
706        self.deleteReceived(event)
707
708
709    def _onEvent_purge(self, sender, recipient, action, headers):
710        nodeIdentifier = action["node"]
711        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
712        self.purgeReceived(event)
713
714
715    def itemsReceived(self, event):
716        pass
717
718
719    def deleteReceived(self, event):
720        pass
721
722
723    def purgeReceived(self, event):
724        pass
725
726
727    def createNode(self, service, nodeIdentifier=None, options=None,
728                         sender=None):
729        """
730        Create a publish subscribe node.
731
732        @param service: The publish subscribe service to create the node at.
733        @type service: L{JID}
734        @param nodeIdentifier: Optional suggestion for the id of the node.
735        @type nodeIdentifier: C{unicode}
736        @param options: Optional node configuration options.
737        @type options: C{dict}
738        """
739        request = PubSubRequest('create')
740        request.recipient = service
741        request.nodeIdentifier = nodeIdentifier
742        request.sender = sender
743
744        if options:
745            form = data_form.Form(formType='submit',
746                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
747            form.makeFields(options)
748            request.options = form
749
750        def cb(iq):
751            try:
752                new_node = iq.pubsub.create["node"]
753            except AttributeError:
754                # the suggested node identifier was accepted
755                new_node = nodeIdentifier
756            return new_node
757
758        d = request.send(self.xmlstream)
759        d.addCallback(cb)
760        return d
761
762
763    def deleteNode(self, service, nodeIdentifier, sender=None):
764        """
765        Delete a publish subscribe node.
766
767        @param service: The publish subscribe service to delete the node from.
768        @type service: L{JID}
769        @param nodeIdentifier: The identifier of the node.
770        @type nodeIdentifier: C{unicode}
771        """
772        request = PubSubRequest('delete')
773        request.recipient = service
774        request.nodeIdentifier = nodeIdentifier
775        request.sender = sender
776        return request.send(self.xmlstream)
777
778
779    def subscribe(self, service, nodeIdentifier, subscriber,
780                        options=None, sender=None):
781        """
782        Subscribe to a publish subscribe node.
783
784        @param service: The publish subscribe service that keeps the node.
785        @type service: L{JID}
786
787        @param nodeIdentifier: The identifier of the node.
788        @type nodeIdentifier: C{unicode}
789
790        @param subscriber: The entity to subscribe to the node. This entity
791            will get notifications of new published items.
792        @type subscriber: L{JID}
793
794        @param options: Subscription options.
795        @type options: C{dict}
796
797        @return: Deferred that fires with L{Subscription} or errbacks with
798            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
799        @rtype: L{defer.Deferred}
800        """
801        request = PubSubRequest('subscribe')
802        request.recipient = service
803        request.nodeIdentifier = nodeIdentifier
804        request.subscriber = subscriber
805        request.sender = sender
806
807        if options:
808            form = data_form.Form(formType='submit',
809                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
810            form.makeFields(options)
811            request.options = form
812
813        def cb(iq):
814            subscription = Subscription.fromElement(iq.pubsub.subscription)
815
816            if subscription.state == 'pending':
817                raise SubscriptionPending()
818            elif subscription.state == 'unconfigured':
819                raise SubscriptionUnconfigured()
820            else:
821                # we assume subscription == 'subscribed'
822                # any other value would be invalid, but that should have
823                # yielded a stanza error.
824                return subscription
825
826        d = request.send(self.xmlstream)
827        d.addCallback(cb)
828        return d
829
830
831    def unsubscribe(self, service, nodeIdentifier, subscriber,
832                          subscriptionIdentifier=None, sender=None):
833        """
834        Unsubscribe from a publish subscribe node.
835
836        @param service: The publish subscribe service that keeps the node.
837        @type service: L{JID}
838
839        @param nodeIdentifier: The identifier of the node.
840        @type nodeIdentifier: C{unicode}
841
842        @param subscriber: The entity to unsubscribe from the node.
843        @type subscriber: L{JID}
844
845        @param subscriptionIdentifier: Optional subscription identifier.
846        @type subscriptionIdentifier: C{unicode}
847        """
848        request = PubSubRequest('unsubscribe')
849        request.recipient = service
850        request.nodeIdentifier = nodeIdentifier
851        request.subscriber = subscriber
852        request.subscriptionIdentifier = subscriptionIdentifier
853        request.sender = sender
854        return request.send(self.xmlstream)
855
856
857    def publish(self, service, nodeIdentifier, items=None, sender=None):
858        """
859        Publish to a publish subscribe node.
860
861        @param service: The publish subscribe service that keeps the node.
862        @type service: L{JID}
863        @param nodeIdentifier: The identifier of the node.
864        @type nodeIdentifier: C{unicode}
865        @param items: Optional list of L{Item}s to publish.
866        @type items: C{list}
867        """
868        request = PubSubRequest('publish')
869        request.recipient = service
870        request.nodeIdentifier = nodeIdentifier
871        request.items = items
872        request.sender = sender
873        return request.send(self.xmlstream)
874
875
876    def items(self, service, nodeIdentifier, maxItems=None,
877              subscriptionIdentifier=None, sender=None):
878        """
879        Retrieve previously published items from a publish subscribe node.
880
881        @param service: The publish subscribe service that keeps the node.
882        @type service: L{JID}
883
884        @param nodeIdentifier: The identifier of the node.
885        @type nodeIdentifier: C{unicode}
886
887        @param maxItems: Optional limit on the number of retrieved items.
888        @type maxItems: C{int}
889
890        @param subscriptionIdentifier: Optional subscription identifier. In
891            case the node has been subscribed to multiple times, this narrows
892            the results to the specific subscription.
893        @type subscriptionIdentifier: C{unicode}
894        """
895        request = PubSubRequest('items')
896        request.recipient = service
897        request.nodeIdentifier = nodeIdentifier
898        if maxItems:
899            request.maxItems = str(int(maxItems))
900        request.subscriptionIdentifier = subscriptionIdentifier
901        request.sender = sender
902
903        def cb(iq):
904            items = []
905            for element in iq.pubsub.items.elements():
906                if element.uri == NS_PUBSUB and element.name == 'item':
907                    items.append(element)
908            return items
909
910        d = request.send(self.xmlstream)
911        d.addCallback(cb)
912        return d
913
914
915    def getOptions(self, service, nodeIdentifier, subscriber,
916                         subscriptionIdentifier=None, sender=None):
917        """
918        Get subscription options.
919
920        @param service: The publish subscribe service that keeps the node.
921        @type service: L{JID}
922
923        @param nodeIdentifier: The identifier of the node.
924        @type nodeIdentifier: C{unicode}
925
926        @param subscriber: The entity subscribed to the node.
927        @type subscriber: L{JID}
928
929        @param subscriptionIdentifier: Optional subscription identifier.
930        @type subscriptionIdentifier: C{unicode}
931
932        @rtype: L{data_form.Form}
933        """
934        request = PubSubRequest('optionsGet')
935        request.recipient = service
936        request.nodeIdentifier = nodeIdentifier
937        request.subscriber = subscriber
938        request.subscriptionIdentifier = subscriptionIdentifier
939        request.sender = sender
940
941        def cb(iq):
942            form = data_form.findForm(iq.pubsub.options,
943                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
944            form.typeCheck()
945            return form
946
947        d = request.send(self.xmlstream)
948        d.addCallback(cb)
949        return d
950
951
952    def setOptions(self, service, nodeIdentifier, subscriber,
953                         options, subscriptionIdentifier=None, sender=None):
954        """
955        Set subscription options.
956
957        @param service: The publish subscribe service that keeps the node.
958        @type service: L{JID}
959
960        @param nodeIdentifier: The identifier of the node.
961        @type nodeIdentifier: C{unicode}
962
963        @param subscriber: The entity subscribed to the node.
964        @type subscriber: L{JID}
965
966        @param options: Subscription options.
967        @type options: C{dict}.
968
969        @param subscriptionIdentifier: Optional subscription identifier.
970        @type subscriptionIdentifier: C{unicode}
971        """
972        request = PubSubRequest('optionsSet')
973        request.recipient = service
974        request.nodeIdentifier = nodeIdentifier
975        request.subscriber = subscriber
976        request.subscriptionIdentifier = subscriptionIdentifier
977        request.sender = sender
978
979        form = data_form.Form(formType='submit',
980                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
981        form.makeFields(options)
982        request.options = form
983
984        d = request.send(self.xmlstream)
985        return d
986
987
988
989class PubSubService(XMPPHandler, IQHandlerMixin):
990    """
991    Protocol implementation for a XMPP Publish Subscribe Service.
992
993    The word Service here is used as taken from the Publish Subscribe
994    specification. It is the party responsible for keeping nodes and their
995    subscriptions, and sending out notifications.
996
997    Methods from the L{IPubSubService} interface that are called as
998    a result of an XMPP request may raise exceptions. Alternatively the
999    deferred returned by these methods may have their errback called. These are
1000    handled as follows:
1001
1002     - If the exception is an instance of L{error.StanzaError}, an error
1003       response iq is returned.
1004     - Any other exception is reported using L{log.msg}. An error response
1005       with the condition C{internal-server-error} is returned.
1006
1007    The default implementation of said methods raises an L{Unsupported}
1008    exception and are meant to be overridden.
1009
1010    @ivar discoIdentity: Service discovery identity as a dictionary with
1011                         keys C{'category'}, C{'type'} and C{'name'}.
1012    @ivar pubSubFeatures: List of supported publish-subscribe features for
1013                          service discovery, as C{str}.
1014    @type pubSubFeatures: C{list} or C{None}
1015    """
1016
1017    implements(IPubSubService, disco.IDisco)
1018
1019    iqHandlers = {
1020            '/*': '_onPubSubRequest',
1021            }
1022
1023    _legacyHandlers = {
1024        'publish': ('publish', ['sender', 'recipient',
1025                                'nodeIdentifier', 'items']),
1026        'subscribe': ('subscribe', ['sender', 'recipient',
1027                                    'nodeIdentifier', 'subscriber']),
1028        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
1029                                        'nodeIdentifier', 'subscriber']),
1030        'subscriptions': ('subscriptions', ['sender', 'recipient']),
1031        'affiliations': ('affiliations', ['sender', 'recipient']),
1032        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
1033        'getConfigurationOptions': ('getConfigurationOptions', []),
1034        'default': ('getDefaultConfiguration',
1035                    ['sender', 'recipient', 'nodeType']),
1036        'configureGet': ('getConfiguration', ['sender', 'recipient',
1037                                              'nodeIdentifier']),
1038        'configureSet': ('setConfiguration', ['sender', 'recipient',
1039                                              'nodeIdentifier', 'options']),
1040        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
1041                            'maxItems', 'itemIdentifiers']),
1042        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
1043                                'itemIdentifiers']),
1044        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
1045        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
1046    }
1047
1048    hideNodes = False
1049
1050    def __init__(self, resource=None):
1051        self.resource = resource
1052        self.discoIdentity = {'category': 'pubsub',
1053                              'type': 'service',
1054                              'name': 'Generic Publish-Subscribe Service'}
1055
1056        self.pubSubFeatures = []
1057
1058
1059    def connectionMade(self):
1060        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
1061
1062
1063    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1064        def toInfo(nodeInfo):
1065            if not nodeInfo:
1066                return
1067
1068            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
1069            info.append(disco.DiscoIdentity('pubsub', nodeType))
1070            if metaData:
1071                form = data_form.Form(formType="result",
1072                                      formNamespace=NS_PUBSUB_META_DATA)
1073                form.addField(
1074                        data_form.Field(
1075                            var='pubsub#node_type',
1076                            value=nodeType,
1077                            label='The type of node (collection or leaf)'
1078                        )
1079                )
1080
1081                for metaDatum in metaData:
1082                    form.addField(data_form.Field.fromDict(metaDatum))
1083
1084                info.append(form)
1085
1086            return
1087
1088        info = []
1089
1090        request = PubSubRequest('discoInfo')
1091
1092        if self.resource is not None:
1093            resource = self.resource.locateResource(request)
1094            identity = resource.discoIdentity
1095            features = resource.features
1096            getInfo = resource.getInfo
1097        else:
1098            category = self.discoIdentity['category']
1099            idType = self.discoIdentity['type']
1100            name = self.discoIdentity['name']
1101            identity = disco.DiscoIdentity(category, idType, name)
1102            features = self.pubSubFeatures
1103            getInfo = self.getNodeInfo
1104
1105        if not nodeIdentifier:
1106            info.append(identity)
1107            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
1108            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
1109                         for feature in features])
1110
1111        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
1112        d.addCallback(toInfo)
1113        d.addErrback(log.err)
1114        d.addCallback(lambda _: info)
1115        return d
1116
1117
1118    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
1119        if self.hideNodes:
1120            d = defer.succeed([])
1121        elif self.resource is not None:
1122            request = PubSubRequest('discoInfo')
1123            resource = self.resource.locateResource(request)
1124            d = resource.getNodes(requestor, target, nodeIdentifier)
1125        elif nodeIdentifier:
1126            d = self.getNodes(requestor, target)
1127        else:
1128            d = defer.succeed([])
1129
1130        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
1131                                     for node in nodes])
1132        return d
1133
1134
1135    def _onPubSubRequest(self, iq):
1136        request = PubSubRequest.fromElement(iq)
1137
1138        if self.resource is not None:
1139            resource = self.resource.locateResource(request)
1140        else:
1141            resource = self
1142
1143        # Preprocess the request, knowing the handling resource
1144        try:
1145            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
1146        except AttributeError:
1147            pass
1148        else:
1149            request = preProcessor(resource, request)
1150            if request is None:
1151                return defer.succeed(None)
1152
1153        # Process the request itself,
1154        if resource is not self:
1155            try:
1156                handler = getattr(resource, request.verb)
1157            except AttributeError:
1158                # fix lookup feature
1159                text = "Request verb: %s" % request.verb
1160                return defer.fail(Unsupported('', text))
1161
1162            d = handler(request)
1163        else:
1164            handlerName, argNames = self._legacyHandlers[request.verb]
1165            handler = getattr(self, handlerName)
1166            args = [getattr(request, arg) for arg in argNames]
1167            if 'options' in argNames:
1168                args[argNames.index('options')] = request.options.getValues()
1169            d = handler(*args)
1170
1171        # If needed, translate the result into a response
1172        try:
1173            cb = getattr(self, '_toResponse_%s' % request.verb)
1174        except AttributeError:
1175            pass
1176        else:
1177            d.addCallback(cb, resource, request)
1178
1179        return d
1180
1181
1182    def _toResponse_subscribe(self, result, resource, request):
1183        response = domish.Element((NS_PUBSUB, "pubsub"))
1184        subscription = response.addChild(result.toElement())
1185        return response
1186
1187
1188    def _toResponse_subscriptions(self, result, resource, request):
1189        response = domish.Element((NS_PUBSUB, 'pubsub'))
1190        subscriptions = response.addElement('subscriptions')
1191        for subscription in result:
1192            subscriptions.addChild(subscription.toElement())
1193        return response
1194
1195
1196    def _toResponse_affiliations(self, result, resource, request):
1197        response = domish.Element((NS_PUBSUB, 'pubsub'))
1198        affiliations = response.addElement('affiliations')
1199
1200        for nodeIdentifier, affiliation in result:
1201            item = affiliations.addElement('affiliation')
1202            item['node'] = nodeIdentifier
1203            item['affiliation'] = affiliation
1204
1205        return response
1206
1207
1208    def _toResponse_create(self, result, resource, request):
1209        if not request.nodeIdentifier or request.nodeIdentifier != result:
1210            response = domish.Element((NS_PUBSUB, 'pubsub'))
1211            create = response.addElement('create')
1212            create['node'] = result
1213            return response
1214        else:
1215            return None
1216
1217
1218    def _formFromConfiguration(self, resource, values):
1219        fieldDefs = resource.getConfigurationOptions()
1220        form = data_form.Form(formType="form",
1221                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1222        form.makeFields(values, fieldDefs)
1223        return form
1224
1225
1226    def _checkConfiguration(self, resource, form):
1227        fieldDefs = resource.getConfigurationOptions()
1228        form.typeCheck(fieldDefs, filterUnknown=True)
1229
1230
1231    def _preProcess_create(self, resource, request):
1232        if request.options:
1233            self._checkConfiguration(resource, request.options)
1234        return request
1235
1236
1237    def _preProcess_default(self, resource, request):
1238        if request.nodeType not in ('leaf', 'collection'):
1239            raise error.StanzaError('not-acceptable')
1240        else:
1241            return request
1242
1243
1244    def _toResponse_default(self, options, resource, request):
1245        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1246        default = response.addElement("default")
1247        form = self._formFromConfiguration(resource, options)
1248        default.addChild(form.toElement())
1249        return response
1250
1251
1252    def _toResponse_configureGet(self, options, resource, request):
1253        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1254        configure = response.addElement("configure")
1255        form = self._formFromConfiguration(resource, options)
1256        configure.addChild(form.toElement())
1257
1258        if request.nodeIdentifier:
1259            configure["node"] = request.nodeIdentifier
1260
1261        return response
1262
1263
1264    def _preProcess_configureSet(self, resource, request):
1265        if request.options.formType == 'cancel':
1266            return None
1267        else:
1268            self._checkConfiguration(resource, request.options)
1269            return request
1270
1271
1272    def _toResponse_items(self, result, resource, request):
1273        response = domish.Element((NS_PUBSUB, 'pubsub'))
1274        items = response.addElement('items')
1275        items["node"] = request.nodeIdentifier
1276
1277        for item in result:
1278            items.addChild(item)
1279
1280        return response
1281
1282
1283    def _createNotification(self, eventType, service, nodeIdentifier,
1284                                  subscriber, subscriptions=None):
1285        headers = []
1286
1287        if subscriptions:
1288            for subscription in subscriptions:
1289                if nodeIdentifier != subscription.nodeIdentifier:
1290                    headers.append(('Collection', subscription.nodeIdentifier))
1291
1292        message = domish.Element((None, "message"))
1293        message["from"] = service.full()
1294        message["to"] = subscriber.full()
1295        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1296
1297        element = event.addElement(eventType)
1298        element["node"] = nodeIdentifier
1299
1300        if headers:
1301            message.addChild(shim.Headers(headers))
1302
1303        return message
1304
1305    # public methods
1306
1307    def notifyPublish(self, service, nodeIdentifier, notifications):
1308        for subscriber, subscriptions, items in notifications:
1309            message = self._createNotification('items', service,
1310                                               nodeIdentifier, subscriber,
1311                                               subscriptions)
1312            message.event.items.children = items
1313            self.send(message)
1314
1315
1316    def notifyDelete(self, service, nodeIdentifier, subscribers,
1317                           redirectURI=None):
1318        for subscriber in subscribers:
1319            message = self._createNotification('delete', service,
1320                                               nodeIdentifier,
1321                                               subscriber)
1322            if redirectURI:
1323                redirect = message.event.delete.addElement('redirect')
1324                redirect['uri'] = redirectURI
1325            self.send(message)
1326
1327
1328    def getNodeInfo(self, requestor, service, nodeIdentifier):
1329        return None
1330
1331
1332    def getNodes(self, requestor, service):
1333        return []
1334
1335
1336    def publish(self, requestor, service, nodeIdentifier, items):
1337        raise Unsupported('publish')
1338
1339
1340    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1341        raise Unsupported('subscribe')
1342
1343
1344    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1345        raise Unsupported('subscribe')
1346
1347
1348    def subscriptions(self, requestor, service):
1349        raise Unsupported('retrieve-subscriptions')
1350
1351
1352    def affiliations(self, requestor, service):
1353        raise Unsupported('retrieve-affiliations')
1354
1355
1356    def create(self, requestor, service, nodeIdentifier):
1357        raise Unsupported('create-nodes')
1358
1359
1360    def getConfigurationOptions(self):
1361        return {}
1362
1363
1364    def getDefaultConfiguration(self, requestor, service, nodeType):
1365        raise Unsupported('retrieve-default')
1366
1367
1368    def getConfiguration(self, requestor, service, nodeIdentifier):
1369        raise Unsupported('config-node')
1370
1371
1372    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1373        raise Unsupported('config-node')
1374
1375
1376    def items(self, requestor, service, nodeIdentifier, maxItems,
1377                    itemIdentifiers):
1378        raise Unsupported('retrieve-items')
1379
1380
1381    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1382        raise Unsupported('retract-items')
1383
1384
1385    def purge(self, requestor, service, nodeIdentifier):
1386        raise Unsupported('purge-nodes')
1387
1388
1389    def delete(self, requestor, service, nodeIdentifier):
1390        raise Unsupported('delete-nodes')
1391
1392
1393
1394class PubSubResource(object):
1395
1396    implements(IPubSubResource)
1397
1398    features = []
1399    discoIdentity = disco.DiscoIdentity('pubsub',
1400                                        'service',
1401                                        'Publish-Subscribe Service')
1402
1403
1404    def locateResource(self, request):
1405        return self
1406
1407
1408    def getInfo(self, requestor, service, nodeIdentifier):
1409        return defer.succeed(None)
1410
1411
1412    def getNodes(self, requestor, service, nodeIdentifier):
1413        return defer.succeed([])
1414
1415
1416    def getConfigurationOptions(self):
1417        return {}
1418
1419
1420    def publish(self, request):
1421        return defer.fail(Unsupported('publish'))
1422
1423
1424    def subscribe(self, request):
1425        return defer.fail(Unsupported('subscribe'))
1426
1427
1428    def unsubscribe(self, request):
1429        return defer.fail(Unsupported('subscribe'))
1430
1431
1432    def subscriptions(self, request):
1433        return defer.fail(Unsupported('retrieve-subscriptions'))
1434
1435
1436    def affiliations(self, request):
1437        return defer.fail(Unsupported('retrieve-affiliations'))
1438
1439
1440    def create(self, request):
1441        return defer.fail(Unsupported('create-nodes'))
1442
1443
1444    def default(self, request):
1445        return defer.fail(Unsupported('retrieve-default'))
1446
1447
1448    def configureGet(self, request):
1449        return defer.fail(Unsupported('config-node'))
1450
1451
1452    def configureSet(self, request):
1453        return defer.fail(Unsupported('config-node'))
1454
1455
1456    def items(self, request):
1457        return defer.fail(Unsupported('retrieve-items'))
1458
1459
1460    def retract(self, request):
1461        return defer.fail(Unsupported('retract-items'))
1462
1463
1464    def purge(self, request):
1465        return defer.fail(Unsupported('purge-nodes'))
1466
1467
1468    def delete(self, request):
1469        return defer.fail(Unsupported('delete-nodes'))
1470
1471
1472    def affiliationsGet(self, request):
1473        return defer.fail(Unsupported('modify-affiliations'))
1474
1475
1476    def affiliationsSet(self, request):
1477        return defer.fail(Unsupported('modify-affiliations'))
1478
1479
1480    def subscriptionsGet(self, request):
1481        return defer.fail(Unsupported('manage-subscriptions'))
1482
1483
1484    def subscriptionsSet(self, request):
1485        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.