source: wokkel/pubsub.py @ 88:58b00aeed1a6

Last change on this file since 88:58b00aeed1a6 was 88:58b00aeed1a6, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Service Discovery fixes for PubSubService?.

  • PubSubService? now properly implements IDisco.
  • Invalid returned values or exceptions raised by getInfo calls on PubSubResource? instances are now logged and ignored.
File size: 47.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
25# Iq get and set XPath queries
26IQ_GET = '/iq[@type="get"]'
27IQ_SET = '/iq[@type="set"]'
28
29# Publish-subscribe namespaces
30NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
31NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
32NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
33NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
34NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
35NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
36NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
37
38# XPath to match pubsub requests
39PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
40                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
41                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
42
43class SubscriptionPending(Exception):
44    """
45    Raised when the requested subscription is pending acceptance.
46    """
47
48
49
50class SubscriptionUnconfigured(Exception):
51    """
52    Raised when the requested subscription needs to be configured before
53    becoming active.
54    """
55
56
57
58class PubSubError(error.StanzaError):
59    """
60    Exception with publish-subscribe specific condition.
61    """
62    def __init__(self, condition, pubsubCondition, feature=None, text=None):
63        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
64        if feature:
65            appCondition['feature'] = feature
66        error.StanzaError.__init__(self, condition,
67                                         text=text,
68                                         appCondition=appCondition)
69
70
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self, pubsubCondition=None, text=None):
77        if pubsubCondition:
78            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
79        else:
80            appCondition = None
81        error.StanzaError.__init__(self, 'bad-request',
82                                         text=text,
83                                         appCondition=appCondition)
84
85
86
87class Unsupported(PubSubError):
88    def __init__(self, feature, text=None):
89        self.feature = feature
90        PubSubError.__init__(self, 'feature-not-implemented',
91                                   'unsupported',
92                                   feature,
93                                   text)
94
95    def __str__(self):
96        message = PubSubError.__str__(self)
97        message += ', feature %r' % self.feature
98        return message
99
100
101class Subscription(object):
102    """
103    A subscription to a node.
104
105    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
106        node is denoted by C{None}.
107    @type nodeIdentifier: C{unicode}
108
109    @ivar subscriber: The subscribing entity.
110    @type subscriber: L{jid.JID}
111
112    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
113                 C{'unconfigured'}.
114    @type state: C{unicode}
115
116    @ivar options: Optional list of subscription options.
117    @type options: C{dict}
118
119    @ivar subscriptionIdentifier: Optional subscription identifier.
120    @type subscriptionIdentifier: C{unicode}
121    """
122
123    def __init__(self, nodeIdentifier, subscriber, state, options=None,
124                       subscriptionIdentifier=None):
125        self.nodeIdentifier = nodeIdentifier
126        self.subscriber = subscriber
127        self.state = state
128        self.options = options or {}
129        self.subscriptionIdentifier = subscriptionIdentifier
130
131
132    @staticmethod
133    def fromElement(element):
134        return Subscription(
135                element.getAttribute('node'),
136                jid.JID(element.getAttribute('jid')),
137                element.getAttribute('subscription'),
138                subscriptionIdentifier=element.getAttribute('subid'))
139
140
141    def toElement(self):
142        """
143        Return the DOM representation of this subscription.
144
145        @rtype: L{domish.Element}
146        """
147        element = domish.Element((None, 'subscription'))
148        if self.nodeIdentifier:
149            element['node'] = self.nodeIdentifier
150        element['jid'] = unicode(self.subscriber)
151        element['subscription'] = self.state
152        if self.subscriptionIdentifier:
153            element['subid'] = self.subscriptionIdentifier
154        return element
155
156
157
158class Item(domish.Element):
159    """
160    Publish subscribe item.
161
162    This behaves like an object providing L{domish.IElement}.
163
164    Item payload can be added using C{addChild} or C{addRawXml}, or using the
165    C{payload} keyword argument to C{__init__}.
166    """
167
168    def __init__(self, id=None, payload=None):
169        """
170        @param id: optional item identifier
171        @type id: L{unicode}
172        @param payload: optional item payload. Either as a domish element, or
173                        as serialized XML.
174        @type payload: object providing L{domish.IElement} or L{unicode}.
175        """
176
177        domish.Element.__init__(self, (None, 'item'))
178        if id is not None:
179            self['id'] = id
180        if payload is not None:
181            if isinstance(payload, basestring):
182                self.addRawXml(payload)
183            else:
184                self.addChild(payload)
185
186
187
188class PubSubRequest(generic.Stanza):
189    """
190    A publish-subscribe request.
191
192    The set of instance variables used depends on the type of request. If
193    a variable is not applicable or not passed in the request, its value is
194    C{None}.
195
196    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
197    @type verb: C{str}.
198
199    @ivar affiliations: Affiliations to be modified.
200    @type affiliations: C{set}
201    @ivar items: The items to be published, as L{domish.Element}s.
202    @type items: C{list}
203    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
204                           retracted.
205    @type itemIdentifiers: C{set}
206    @ivar maxItems: Maximum number of items to retrieve.
207    @type maxItems: C{int}.
208    @ivar nodeIdentifier: Identifier of the node the request is about.
209    @type nodeIdentifier: C{unicode}
210    @ivar nodeType: The type of node that should be created, or for which the
211                    configuration is retrieved. C{'leaf'} or C{'collection'}.
212    @type nodeType: C{str}
213    @ivar options: Configurations options for nodes, subscriptions and publish
214                   requests.
215    @type options: L{data_form.Form}
216    @ivar subscriber: The subscribing entity.
217    @type subscriber: L{JID}
218    @ivar subscriptionIdentifier: Identifier for a specific subscription.
219    @type subscriptionIdentifier: C{unicode}
220    @ivar subscriptions: Subscriptions to be modified, as a set of
221                         L{Subscription}.
222    @type subscriptions: C{set}
223    """
224
225    verb = None
226
227    affiliations = None
228    items = None
229    itemIdentifiers = None
230    maxItems = None
231    nodeIdentifier = None
232    nodeType = None
233    options = None
234    subscriber = None
235    subscriptionIdentifier = None
236    subscriptions = None
237
238    # Map request iq type and subelement name to request verb
239    _requestVerbMap = {
240        ('set', NS_PUBSUB, 'publish'): 'publish',
241        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
242        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
243        ('get', NS_PUBSUB, 'options'): 'optionsGet',
244        ('set', NS_PUBSUB, 'options'): 'optionsSet',
245        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
246        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
247        ('set', NS_PUBSUB, 'create'): 'create',
248        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
249        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
250        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
251        ('get', NS_PUBSUB, 'items'): 'items',
252        ('set', NS_PUBSUB, 'retract'): 'retract',
253        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
254        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
255        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
256        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
257        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
258        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
259    }
260
261    # Map request verb to request iq type and subelement name
262    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
263
264    # Map request verb to parameter handler names
265    _parameters = {
266        'publish': ['node', 'items'],
267        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
268        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
269        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
270        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
271        'subscriptions': [],
272        'affiliations': [],
273        'create': ['nodeOrNone', 'configureOrNone'],
274        'default': ['default'],
275        'configureGet': ['nodeOrEmpty'],
276        'configureSet': ['nodeOrEmpty', 'configure'],
277        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
278        'retract': ['node', 'itemIdentifiers'],
279        'purge': ['node'],
280        'delete': ['node'],
281        'affiliationsGet': ['nodeOrEmpty'],
282        'affiliationsSet': [],
283        'subscriptionsGet': ['nodeOrEmpty'],
284        'subscriptionsSet': [],
285    }
286
287    def __init__(self, verb=None):
288        self.verb = verb
289
290
291    def _parse_node(self, verbElement):
292        """
293        Parse the required node identifier out of the verbElement.
294        """
295        try:
296            self.nodeIdentifier = verbElement["node"]
297        except KeyError:
298            raise BadRequest('nodeid-required')
299
300
301    def _render_node(self, verbElement):
302        """
303        Render the required node identifier on the verbElement.
304        """
305        if not self.nodeIdentifier:
306            raise Exception("Node identifier is required")
307
308        verbElement['node'] = self.nodeIdentifier
309
310
311    def _parse_nodeOrEmpty(self, verbElement):
312        """
313        Parse the node identifier out of the verbElement. May be empty.
314        """
315        self.nodeIdentifier = verbElement.getAttribute("node", '')
316
317
318    def _render_nodeOrEmpty(self, verbElement):
319        """
320        Render the node identifier on the verbElement. May be empty.
321        """
322        if self.nodeIdentifier:
323            verbElement['node'] = self.nodeIdentifier
324
325
326    def _parse_nodeOrNone(self, verbElement):
327        """
328        Parse the optional node identifier out of the verbElement.
329        """
330        self.nodeIdentifier = verbElement.getAttribute("node")
331
332
333    def _render_nodeOrNone(self, verbElement):
334        """
335        Render the optional node identifier on the verbElement.
336        """
337        if self.nodeIdentifier:
338            verbElement['node'] = self.nodeIdentifier
339
340
341    def _parse_items(self, verbElement):
342        """
343        Parse items out of the verbElement for publish requests.
344        """
345        self.items = []
346        for element in verbElement.elements():
347            if element.uri == NS_PUBSUB and element.name == 'item':
348                self.items.append(element)
349
350
351    def _render_items(self, verbElement):
352        """
353        Render items into the verbElement for publish requests.
354        """
355        if self.items:
356            for item in self.items:
357                verbElement.addChild(item)
358
359
360    def _parse_jid(self, verbElement):
361        """
362        Parse subscriber out of the verbElement for un-/subscribe requests.
363        """
364        try:
365            self.subscriber = jid.internJID(verbElement["jid"])
366        except KeyError:
367            raise BadRequest('jid-required')
368
369
370    def _render_jid(self, verbElement):
371        """
372        Render subscriber into the verbElement for un-/subscribe requests.
373        """
374        verbElement['jid'] = self.subscriber.full()
375
376
377    def _parse_default(self, verbElement):
378        """
379        Parse node type out of a request for the default node configuration.
380        """
381        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form and form.formType == 'submit':
383            values = form.getValues()
384            self.nodeType = values.get('pubsub#node_type', 'leaf')
385        else:
386            self.nodeType = 'leaf'
387
388
389    def _parse_configure(self, verbElement):
390        """
391        Parse options out of a request for setting the node configuration.
392        """
393        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
394        if form:
395            if form.formType in ('submit', 'cancel'):
396                self.options = form
397            else:
398                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
399        else:
400            raise BadRequest(text="Missing configuration form")
401
402
403    def _parse_configureOrNone(self, verbElement):
404        """
405        Parse optional node configuration form in create request.
406        """
407        for element in verbElement.parent.elements():
408            if element.uri == NS_PUBSUB and element.name == 'configure':
409                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
410                if form:
411                    if form.formType != 'submit':
412                        raise BadRequest(text=u"Unexpected form type '%s'" %
413                                              form.formType)
414                else:
415                    form = data_form.Form('submit',
416                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
417                self.options = form
418
419
420    def _render_configureOrNone(self, verbElement):
421        """
422        Render optional node configuration form in create request.
423        """
424        if self.options is not None:
425            configure = verbElement.parent.addElement('configure')
426            configure.addChild(self.options.toElement())
427
428
429    def _parse_itemIdentifiers(self, verbElement):
430        """
431        Parse item identifiers out of items and retract requests.
432        """
433        self.itemIdentifiers = []
434        for element in verbElement.elements():
435            if element.uri == NS_PUBSUB and element.name == 'item':
436                try:
437                    self.itemIdentifiers.append(element["id"])
438                except KeyError:
439                    raise BadRequest()
440
441
442    def _render_itemIdentifiers(self, verbElement):
443        """
444        Render item identifiers into items and retract requests.
445        """
446        if self.itemIdentifiers:
447            for itemIdentifier in self.itemIdentifiers:
448                item = verbElement.addElement('item')
449                item['id'] = itemIdentifier
450
451
452    def _parse_maxItems(self, verbElement):
453        """
454        Parse maximum items out of an items request.
455        """
456        value = verbElement.getAttribute('max_items')
457
458        if value:
459            try:
460                self.maxItems = int(value)
461            except ValueError:
462                raise BadRequest(text="Field max_items requires a positive " +
463                                      "integer value")
464
465
466    def _render_maxItems(self, verbElement):
467        """
468        Render maximum items into an items request.
469        """
470        if self.maxItems:
471            verbElement['max_items'] = unicode(self.maxItems)
472
473
474    def _parse_subidOrNone(self, verbElement):
475        """
476        Parse subscription identifier out of a request.
477        """
478        self.subscriptionIdentifier = verbElement.getAttribute("subid")
479
480
481    def _render_subidOrNone(self, verbElement):
482        """
483        Render subscription identifier into a request.
484        """
485        if self.subscriptionIdentifier:
486            verbElement['subid'] = self.subscriptionIdentifier
487
488
489    def _parse_options(self, verbElement):
490        """
491        Parse options form out of a subscription options request.
492        """
493        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
494        if form:
495            if form.formType in ('submit', 'cancel'):
496                self.options = form
497            else:
498                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
499        else:
500            raise BadRequest(text="Missing options form")
501
502
503
504    def _render_options(self, verbElement):
505        verbElement.addChild(self.options.toElement())
506
507
508    def _parse_optionsWithSubscribe(self, verbElement):
509        for element in verbElement.parent.elements():
510            if element.name == 'options' and element.uri == NS_PUBSUB:
511                form = data_form.findForm(element,
512                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
513                if form:
514                    if form.formType != 'submit':
515                        raise BadRequest(text=u"Unexpected form type '%s'" %
516                                              form.formType)
517                else:
518                    form = data_form.Form('submit',
519                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
520                self.options = form
521
522
523    def _render_optionsWithSubscribe(self, verbElement):
524        if self.options:
525            optionsElement = verbElement.parent.addElement('options')
526            self._render_options(optionsElement)
527
528
529    def parseElement(self, element):
530        """
531        Parse the publish-subscribe verb and parameters out of a request.
532        """
533        generic.Stanza.parseElement(self, element)
534
535        verbs = []
536        children = []
537        for child in element.pubsub.elements():
538            key = (self.stanzaType, child.uri, child.name)
539            try:
540                verb = self._requestVerbMap[key]
541            except KeyError:
542                continue
543
544            verbs.append(verb)
545            children.append(child)
546
547        if not verbs:
548            raise NotImplementedError()
549
550        if len(verbs) > 1:
551            if 'optionsSet' in verbs and 'subscribe' in verbs:
552                self.verb = 'subscribe'
553                child = children[verbs.index('subscribe')]
554            else:
555                raise NotImplementedError()
556        else:
557            self.verb = verbs[0]
558
559        for parameter in self._parameters[self.verb]:
560            getattr(self, '_parse_%s' % parameter)(child)
561
562
563
564    def send(self, xs):
565        """
566        Send this request to its recipient.
567
568        This renders all of the relevant parameters for this specific
569        requests into an L{IQ}, and invoke its C{send} method.
570        This returns a deferred that fires upon reception of a response. See
571        L{IQ} for details.
572
573        @param xs: The XML stream to send the request on.
574        @type xs: L{xmlstream.XmlStream}
575        @rtype: L{defer.Deferred}.
576        """
577
578        try:
579            (self.stanzaType,
580             childURI,
581             childName) = self._verbRequestMap[self.verb]
582        except KeyError:
583            raise NotImplementedError()
584
585        iq = IQ(xs, self.stanzaType)
586        iq.addElement((childURI, 'pubsub'))
587        verbElement = iq.pubsub.addElement(childName)
588
589        if self.sender:
590            iq['from'] = self.sender.full()
591        if self.recipient:
592            iq['to'] = self.recipient.full()
593
594        for parameter in self._parameters[self.verb]:
595            getattr(self, '_render_%s' % parameter)(verbElement)
596
597        return iq.send()
598
599
600
601class PubSubEvent(object):
602    """
603    A publish subscribe event.
604
605    @param sender: The entity from which the notification was received.
606    @type sender: L{jid.JID}
607    @param recipient: The entity to which the notification was sent.
608    @type recipient: L{wokkel.pubsub.ItemsEvent}
609    @param nodeIdentifier: Identifier of the node the event pertains to.
610    @type nodeIdentifier: C{unicode}
611    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
612    @type headers: L{dict}
613    """
614
615    def __init__(self, sender, recipient, nodeIdentifier, headers):
616        self.sender = sender
617        self.recipient = recipient
618        self.nodeIdentifier = nodeIdentifier
619        self.headers = headers
620
621
622
623class ItemsEvent(PubSubEvent):
624    """
625    A publish-subscribe event that signifies new, updated and retracted items.
626
627    @param items: List of received items as domish elements.
628    @type items: C{list} of L{domish.Element}
629    """
630
631    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
632        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
633        self.items = items
634
635
636
637class DeleteEvent(PubSubEvent):
638    """
639    A publish-subscribe event that signifies the deletion of a node.
640    """
641
642    redirectURI = None
643
644
645
646class PurgeEvent(PubSubEvent):
647    """
648    A publish-subscribe event that signifies the purging of a node.
649    """
650
651
652
653class PubSubClient(XMPPHandler):
654    """
655    Publish subscribe client protocol.
656    """
657
658    implements(IPubSubClient)
659
660    def connectionInitialized(self):
661        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
662                                   NS_PUBSUB_EVENT, self._onEvent)
663
664
665    def _onEvent(self, message):
666        try:
667            sender = jid.JID(message["from"])
668            recipient = jid.JID(message["to"])
669        except KeyError:
670            return
671
672        actionElement = None
673        for element in message.event.elements():
674            if element.uri == NS_PUBSUB_EVENT:
675                actionElement = element
676
677        if not actionElement:
678            return
679
680        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
681
682        if eventHandler:
683            headers = shim.extractHeaders(message)
684            eventHandler(sender, recipient, actionElement, headers)
685            message.handled = True
686
687
688    def _onEvent_items(self, sender, recipient, action, headers):
689        nodeIdentifier = action["node"]
690
691        items = [element for element in action.elements()
692                         if element.name in ('item', 'retract')]
693
694        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
695        self.itemsReceived(event)
696
697
698    def _onEvent_delete(self, sender, recipient, action, headers):
699        nodeIdentifier = action["node"]
700        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
701        if action.redirect:
702            event.redirectURI = action.redirect.getAttribute('uri')
703        self.deleteReceived(event)
704
705
706    def _onEvent_purge(self, sender, recipient, action, headers):
707        nodeIdentifier = action["node"]
708        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
709        self.purgeReceived(event)
710
711
712    def itemsReceived(self, event):
713        pass
714
715
716    def deleteReceived(self, event):
717        pass
718
719
720    def purgeReceived(self, event):
721        pass
722
723
724    def createNode(self, service, nodeIdentifier=None, options=None,
725                         sender=None):
726        """
727        Create a publish subscribe node.
728
729        @param service: The publish subscribe service to create the node at.
730        @type service: L{JID}
731        @param nodeIdentifier: Optional suggestion for the id of the node.
732        @type nodeIdentifier: C{unicode}
733        @param options: Optional node configuration options.
734        @type options: C{dict}
735        """
736        request = PubSubRequest('create')
737        request.recipient = service
738        request.nodeIdentifier = nodeIdentifier
739        request.sender = sender
740
741        if options:
742            form = data_form.Form(formType='submit',
743                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
744            form.makeFields(options)
745            request.options = form
746
747        def cb(iq):
748            try:
749                new_node = iq.pubsub.create["node"]
750            except AttributeError:
751                # the suggested node identifier was accepted
752                new_node = nodeIdentifier
753            return new_node
754
755        d = request.send(self.xmlstream)
756        d.addCallback(cb)
757        return d
758
759
760    def deleteNode(self, service, nodeIdentifier, sender=None):
761        """
762        Delete a publish subscribe node.
763
764        @param service: The publish subscribe service to delete the node from.
765        @type service: L{JID}
766        @param nodeIdentifier: The identifier of the node.
767        @type nodeIdentifier: C{unicode}
768        """
769        request = PubSubRequest('delete')
770        request.recipient = service
771        request.nodeIdentifier = nodeIdentifier
772        request.sender = sender
773        return request.send(self.xmlstream)
774
775
776    def subscribe(self, service, nodeIdentifier, subscriber,
777                        options=None, sender=None):
778        """
779        Subscribe to a publish subscribe node.
780
781        @param service: The publish subscribe service that keeps the node.
782        @type service: L{JID}
783
784        @param nodeIdentifier: The identifier of the node.
785        @type nodeIdentifier: C{unicode}
786
787        @param subscriber: The entity to subscribe to the node. This entity
788            will get notifications of new published items.
789        @type subscriber: L{JID}
790
791        @param options: Subscription options.
792        @type options: C{dict}
793
794        @return: Deferred that fires with L{Subscription} or errbacks with
795            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
796        @rtype: L{defer.Deferred}
797        """
798        request = PubSubRequest('subscribe')
799        request.recipient = service
800        request.nodeIdentifier = nodeIdentifier
801        request.subscriber = subscriber
802        request.sender = sender
803
804        if options:
805            form = data_form.Form(formType='submit',
806                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
807            form.makeFields(options)
808            request.options = form
809
810        def cb(iq):
811            subscription = Subscription.fromElement(iq.pubsub.subscription)
812
813            if subscription.state == 'pending':
814                raise SubscriptionPending()
815            elif subscription.state == 'unconfigured':
816                raise SubscriptionUnconfigured()
817            else:
818                # we assume subscription == 'subscribed'
819                # any other value would be invalid, but that should have
820                # yielded a stanza error.
821                return subscription
822
823        d = request.send(self.xmlstream)
824        d.addCallback(cb)
825        return d
826
827
828    def unsubscribe(self, service, nodeIdentifier, subscriber,
829                          subscriptionIdentifier=None, sender=None):
830        """
831        Unsubscribe from a publish subscribe node.
832
833        @param service: The publish subscribe service that keeps the node.
834        @type service: L{JID}
835
836        @param nodeIdentifier: The identifier of the node.
837        @type nodeIdentifier: C{unicode}
838
839        @param subscriber: The entity to unsubscribe from the node.
840        @type subscriber: L{JID}
841
842        @param subscriptionIdentifier: Optional subscription identifier.
843        @type subscriptionIdentifier: C{unicode}
844        """
845        request = PubSubRequest('unsubscribe')
846        request.recipient = service
847        request.nodeIdentifier = nodeIdentifier
848        request.subscriber = subscriber
849        request.subscriptionIdentifier = subscriptionIdentifier
850        request.sender = sender
851        return request.send(self.xmlstream)
852
853
854    def publish(self, service, nodeIdentifier, items=None, sender=None):
855        """
856        Publish to a publish subscribe node.
857
858        @param service: The publish subscribe service that keeps the node.
859        @type service: L{JID}
860        @param nodeIdentifier: The identifier of the node.
861        @type nodeIdentifier: C{unicode}
862        @param items: Optional list of L{Item}s to publish.
863        @type items: C{list}
864        """
865        request = PubSubRequest('publish')
866        request.recipient = service
867        request.nodeIdentifier = nodeIdentifier
868        request.items = items
869        request.sender = sender
870        return request.send(self.xmlstream)
871
872
873    def items(self, service, nodeIdentifier, maxItems=None,
874              subscriptionIdentifier=None, sender=None):
875        """
876        Retrieve previously published items from a publish subscribe node.
877
878        @param service: The publish subscribe service that keeps the node.
879        @type service: L{JID}
880
881        @param nodeIdentifier: The identifier of the node.
882        @type nodeIdentifier: C{unicode}
883
884        @param maxItems: Optional limit on the number of retrieved items.
885        @type maxItems: C{int}
886
887        @param subscriptionIdentifier: Optional subscription identifier. In
888            case the node has been subscribed to multiple times, this narrows
889            the results to the specific subscription.
890        @type subscriptionIdentifier: C{unicode}
891        """
892        request = PubSubRequest('items')
893        request.recipient = service
894        request.nodeIdentifier = nodeIdentifier
895        if maxItems:
896            request.maxItems = str(int(maxItems))
897        request.subscriptionIdentifier = subscriptionIdentifier
898        request.sender = sender
899
900        def cb(iq):
901            items = []
902            for element in iq.pubsub.items.elements():
903                if element.uri == NS_PUBSUB and element.name == 'item':
904                    items.append(element)
905            return items
906
907        d = request.send(self.xmlstream)
908        d.addCallback(cb)
909        return d
910
911
912    def getOptions(self, service, nodeIdentifier, subscriber,
913                         subscriptionIdentifier=None, sender=None):
914        """
915        Get subscription options.
916
917        @param service: The publish subscribe service that keeps the node.
918        @type service: L{JID}
919
920        @param nodeIdentifier: The identifier of the node.
921        @type nodeIdentifier: C{unicode}
922
923        @param subscriber: The entity subscribed to the node.
924        @type subscriber: L{JID}
925
926        @param subscriptionIdentifier: Optional subscription identifier.
927        @type subscriptionIdentifier: C{unicode}
928
929        @rtype: L{data_form.Form}
930        """
931        request = PubSubRequest('optionsGet')
932        request.recipient = service
933        request.nodeIdentifier = nodeIdentifier
934        request.subscriber = subscriber
935        request.subscriptionIdentifier = subscriptionIdentifier
936        request.sender = sender
937
938        def cb(iq):
939            form = data_form.findForm(iq.pubsub.options,
940                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
941            form.typeCheck()
942            return form
943
944        d = request.send(self.xmlstream)
945        d.addCallback(cb)
946        return d
947
948
949    def setOptions(self, service, nodeIdentifier, subscriber,
950                         options, subscriptionIdentifier=None, sender=None):
951        """
952        Set subscription options.
953
954        @param service: The publish subscribe service that keeps the node.
955        @type service: L{JID}
956
957        @param nodeIdentifier: The identifier of the node.
958        @type nodeIdentifier: C{unicode}
959
960        @param subscriber: The entity subscribed to the node.
961        @type subscriber: L{JID}
962
963        @param options: Subscription options.
964        @type options: C{dict}.
965
966        @param subscriptionIdentifier: Optional subscription identifier.
967        @type subscriptionIdentifier: C{unicode}
968        """
969        request = PubSubRequest('optionsSet')
970        request.recipient = service
971        request.nodeIdentifier = nodeIdentifier
972        request.subscriber = subscriber
973        request.subscriptionIdentifier = subscriptionIdentifier
974        request.sender = sender
975
976        form = data_form.Form(formType='submit',
977                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
978        form.makeFields(options)
979        request.options = form
980
981        d = request.send(self.xmlstream)
982        return d
983
984
985
986class PubSubService(XMPPHandler, IQHandlerMixin):
987    """
988    Protocol implementation for a XMPP Publish Subscribe Service.
989
990    The word Service here is used as taken from the Publish Subscribe
991    specification. It is the party responsible for keeping nodes and their
992    subscriptions, and sending out notifications.
993
994    Methods from the L{IPubSubService} interface that are called as
995    a result of an XMPP request may raise exceptions. Alternatively the
996    deferred returned by these methods may have their errback called. These are
997    handled as follows:
998
999     - If the exception is an instance of L{error.StanzaError}, an error
1000       response iq is returned.
1001     - Any other exception is reported using L{log.msg}. An error response
1002       with the condition C{internal-server-error} is returned.
1003
1004    The default implementation of said methods raises an L{Unsupported}
1005    exception and are meant to be overridden.
1006
1007    @ivar discoIdentity: Service discovery identity as a dictionary with
1008                         keys C{'category'}, C{'type'} and C{'name'}.
1009    @ivar pubSubFeatures: List of supported publish-subscribe features for
1010                          service discovery, as C{str}.
1011    @type pubSubFeatures: C{list} or C{None}
1012    """
1013
1014    implements(IPubSubService, disco.IDisco)
1015
1016    iqHandlers = {
1017            '/*': '_onPubSubRequest',
1018            }
1019
1020    _legacyHandlers = {
1021        'publish': ('publish', ['sender', 'recipient',
1022                                'nodeIdentifier', 'items']),
1023        'subscribe': ('subscribe', ['sender', 'recipient',
1024                                    'nodeIdentifier', 'subscriber']),
1025        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
1026                                        'nodeIdentifier', 'subscriber']),
1027        'subscriptions': ('subscriptions', ['sender', 'recipient']),
1028        'affiliations': ('affiliations', ['sender', 'recipient']),
1029        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
1030        'getConfigurationOptions': ('getConfigurationOptions', []),
1031        'default': ('getDefaultConfiguration',
1032                    ['sender', 'recipient', 'nodeType']),
1033        'configureGet': ('getConfiguration', ['sender', 'recipient',
1034                                              'nodeIdentifier']),
1035        'configureSet': ('setConfiguration', ['sender', 'recipient',
1036                                              'nodeIdentifier', 'options']),
1037        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
1038                            'maxItems', 'itemIdentifiers']),
1039        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
1040                                'itemIdentifiers']),
1041        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
1042        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
1043    }
1044
1045    hideNodes = False
1046
1047    def __init__(self, resource=None):
1048        self.resource = resource
1049        self.discoIdentity = {'category': 'pubsub',
1050                              'type': 'service',
1051                              'name': 'Generic Publish-Subscribe Service'}
1052
1053        self.pubSubFeatures = []
1054
1055
1056    def connectionMade(self):
1057        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
1058
1059
1060    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1061        def toInfo(nodeInfo):
1062            if not nodeInfo:
1063                return
1064
1065            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
1066            info.append(disco.DiscoIdentity('pubsub', nodeType))
1067            if metaData:
1068                form = data_form.Form(formType="result",
1069                                      formNamespace=NS_PUBSUB_META_DATA)
1070                form.addField(
1071                        data_form.Field(
1072                            var='pubsub#node_type',
1073                            value=nodeType,
1074                            label='The type of node (collection or leaf)'
1075                        )
1076                )
1077
1078                for metaDatum in metaData:
1079                    form.addField(data_form.Field.fromDict(metaDatum))
1080
1081                info.append(form)
1082
1083            return
1084
1085        info = []
1086
1087        request = PubSubRequest('discoInfo')
1088
1089        if self.resource is not None:
1090            resource = self.resource.locateResource(request)
1091            identity = resource.discoIdentity
1092            features = resource.features
1093            getInfo = resource.getInfo
1094        else:
1095            category = self.discoIdentity['category']
1096            idType = self.discoIdentity['type']
1097            name = self.discoIdentity['name']
1098            identity = disco.DiscoIdentity(category, idType, name)
1099            features = self.pubSubFeatures
1100            getInfo = self.getNodeInfo
1101
1102        if not nodeIdentifier:
1103            info.append(identity)
1104            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
1105            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
1106                         for feature in features])
1107
1108        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
1109        d.addCallback(toInfo)
1110        d.addErrback(log.err)
1111        d.addCallback(lambda _: info)
1112        return d
1113
1114
1115    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
1116        if self.hideNodes:
1117            d = defer.succeed([])
1118        elif self.resource is not None:
1119            request = PubSubRequest('discoInfo')
1120            resource = self.resource.locateResource(request)
1121            d = resource.getNodes(requestor, target, nodeIdentifier)
1122        elif nodeIdentifier:
1123            d = self.getNodes(requestor, target)
1124        else:
1125            d = defer.succeed([])
1126
1127        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
1128                                     for node in nodes])
1129        return d
1130
1131
1132    def _onPubSubRequest(self, iq):
1133        request = PubSubRequest.fromElement(iq)
1134
1135        if self.resource is not None:
1136            resource = self.resource.locateResource(request)
1137        else:
1138            resource = self
1139
1140        # Preprocess the request, knowing the handling resource
1141        try:
1142            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
1143        except AttributeError:
1144            pass
1145        else:
1146            request = preProcessor(resource, request)
1147            if request is None:
1148                return defer.succeed(None)
1149
1150        # Process the request itself,
1151        if resource is not self:
1152            try:
1153                handler = getattr(resource, request.verb)
1154            except AttributeError:
1155                # fix lookup feature
1156                text = "Request verb: %s" % request.verb
1157                return defer.fail(Unsupported('', text))
1158
1159            d = handler(request)
1160        else:
1161            handlerName, argNames = self._legacyHandlers[request.verb]
1162            handler = getattr(self, handlerName)
1163            args = [getattr(request, arg) for arg in argNames]
1164            if 'options' in argNames:
1165                args[argNames.index('options')] = request.options.getValues()
1166            d = handler(*args)
1167
1168        # If needed, translate the result into a response
1169        try:
1170            cb = getattr(self, '_toResponse_%s' % request.verb)
1171        except AttributeError:
1172            pass
1173        else:
1174            d.addCallback(cb, resource, request)
1175
1176        return d
1177
1178
1179    def _toResponse_subscribe(self, result, resource, request):
1180        response = domish.Element((NS_PUBSUB, "pubsub"))
1181        subscription = response.addChild(result.toElement())
1182        return response
1183
1184
1185    def _toResponse_subscriptions(self, result, resource, request):
1186        response = domish.Element((NS_PUBSUB, 'pubsub'))
1187        subscriptions = response.addElement('subscriptions')
1188        for subscription in result:
1189            subscriptions.addChild(subscription.toElement())
1190        return response
1191
1192
1193    def _toResponse_affiliations(self, result, resource, request):
1194        response = domish.Element((NS_PUBSUB, 'pubsub'))
1195        affiliations = response.addElement('affiliations')
1196
1197        for nodeIdentifier, affiliation in result:
1198            item = affiliations.addElement('affiliation')
1199            item['node'] = nodeIdentifier
1200            item['affiliation'] = affiliation
1201
1202        return response
1203
1204
1205    def _toResponse_create(self, result, resource, request):
1206        if not request.nodeIdentifier or request.nodeIdentifier != result:
1207            response = domish.Element((NS_PUBSUB, 'pubsub'))
1208            create = response.addElement('create')
1209            create['node'] = result
1210            return response
1211        else:
1212            return None
1213
1214
1215    def _formFromConfiguration(self, resource, values):
1216        fieldDefs = resource.getConfigurationOptions()
1217        form = data_form.Form(formType="form",
1218                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1219        form.makeFields(values, fieldDefs)
1220        return form
1221
1222
1223    def _checkConfiguration(self, resource, form):
1224        fieldDefs = resource.getConfigurationOptions()
1225        form.typeCheck(fieldDefs, filterUnknown=True)
1226
1227
1228    def _preProcess_create(self, resource, request):
1229        if request.options:
1230            self._checkConfiguration(resource, request.options)
1231        return request
1232
1233
1234    def _preProcess_default(self, resource, request):
1235        if request.nodeType not in ('leaf', 'collection'):
1236            raise error.StanzaError('not-acceptable')
1237        else:
1238            return request
1239
1240
1241    def _toResponse_default(self, options, resource, request):
1242        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1243        default = response.addElement("default")
1244        form = self._formFromConfiguration(resource, options)
1245        default.addChild(form.toElement())
1246        return response
1247
1248
1249    def _toResponse_configureGet(self, options, resource, request):
1250        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1251        configure = response.addElement("configure")
1252        form = self._formFromConfiguration(resource, options)
1253        configure.addChild(form.toElement())
1254
1255        if request.nodeIdentifier:
1256            configure["node"] = request.nodeIdentifier
1257
1258        return response
1259
1260
1261    def _preProcess_configureSet(self, resource, request):
1262        if request.options.formType == 'cancel':
1263            return None
1264        else:
1265            self._checkConfiguration(resource, request.options)
1266            return request
1267
1268
1269    def _toResponse_items(self, result, resource, request):
1270        response = domish.Element((NS_PUBSUB, 'pubsub'))
1271        items = response.addElement('items')
1272        items["node"] = request.nodeIdentifier
1273
1274        for item in result:
1275            items.addChild(item)
1276
1277        return response
1278
1279
1280    def _createNotification(self, eventType, service, nodeIdentifier,
1281                                  subscriber, subscriptions=None):
1282        headers = []
1283
1284        if subscriptions:
1285            for subscription in subscriptions:
1286                if nodeIdentifier != subscription.nodeIdentifier:
1287                    headers.append(('Collection', subscription.nodeIdentifier))
1288
1289        message = domish.Element((None, "message"))
1290        message["from"] = service.full()
1291        message["to"] = subscriber.full()
1292        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1293
1294        element = event.addElement(eventType)
1295        element["node"] = nodeIdentifier
1296
1297        if headers:
1298            message.addChild(shim.Headers(headers))
1299
1300        return message
1301
1302    # public methods
1303
1304    def notifyPublish(self, service, nodeIdentifier, notifications):
1305        for subscriber, subscriptions, items in notifications:
1306            message = self._createNotification('items', service,
1307                                               nodeIdentifier, subscriber,
1308                                               subscriptions)
1309            message.event.items.children = items
1310            self.send(message)
1311
1312
1313    def notifyDelete(self, service, nodeIdentifier, subscribers,
1314                           redirectURI=None):
1315        for subscriber in subscribers:
1316            message = self._createNotification('delete', service,
1317                                               nodeIdentifier,
1318                                               subscriber)
1319            if redirectURI:
1320                redirect = message.event.delete.addElement('redirect')
1321                redirect['uri'] = redirectURI
1322            self.send(message)
1323
1324
1325    def getNodeInfo(self, requestor, service, nodeIdentifier):
1326        return None
1327
1328
1329    def getNodes(self, requestor, service):
1330        return []
1331
1332
1333    def publish(self, requestor, service, nodeIdentifier, items):
1334        raise Unsupported('publish')
1335
1336
1337    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1338        raise Unsupported('subscribe')
1339
1340
1341    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1342        raise Unsupported('subscribe')
1343
1344
1345    def subscriptions(self, requestor, service):
1346        raise Unsupported('retrieve-subscriptions')
1347
1348
1349    def affiliations(self, requestor, service):
1350        raise Unsupported('retrieve-affiliations')
1351
1352
1353    def create(self, requestor, service, nodeIdentifier):
1354        raise Unsupported('create-nodes')
1355
1356
1357    def getConfigurationOptions(self):
1358        return {}
1359
1360
1361    def getDefaultConfiguration(self, requestor, service, nodeType):
1362        raise Unsupported('retrieve-default')
1363
1364
1365    def getConfiguration(self, requestor, service, nodeIdentifier):
1366        raise Unsupported('config-node')
1367
1368
1369    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1370        raise Unsupported('config-node')
1371
1372
1373    def items(self, requestor, service, nodeIdentifier, maxItems,
1374                    itemIdentifiers):
1375        raise Unsupported('retrieve-items')
1376
1377
1378    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1379        raise Unsupported('retract-items')
1380
1381
1382    def purge(self, requestor, service, nodeIdentifier):
1383        raise Unsupported('purge-nodes')
1384
1385
1386    def delete(self, requestor, service, nodeIdentifier):
1387        raise Unsupported('delete-nodes')
1388
1389
1390
1391class PubSubResource(object):
1392
1393    implements(IPubSubResource)
1394
1395    features = []
1396    discoIdentity = disco.DiscoIdentity('pubsub',
1397                                        'service',
1398                                        'Publish-Subscribe Service')
1399
1400
1401    def locateResource(self, request):
1402        return self
1403
1404
1405    def getInfo(self, requestor, service, nodeIdentifier):
1406        return defer.succeed(None)
1407
1408
1409    def getNodes(self, requestor, service, nodeIdentifier):
1410        return defer.succeed([])
1411
1412
1413    def getConfigurationOptions(self):
1414        return {}
1415
1416
1417    def publish(self, request):
1418        return defer.fail(Unsupported('publish'))
1419
1420
1421    def subscribe(self, request):
1422        return defer.fail(Unsupported('subscribe'))
1423
1424
1425    def unsubscribe(self, request):
1426        return defer.fail(Unsupported('subscribe'))
1427
1428
1429    def subscriptions(self, request):
1430        return defer.fail(Unsupported('retrieve-subscriptions'))
1431
1432
1433    def affiliations(self, request):
1434        return defer.fail(Unsupported('retrieve-affiliations'))
1435
1436
1437    def create(self, request):
1438        return defer.fail(Unsupported('create-nodes'))
1439
1440
1441    def default(self, request):
1442        return defer.fail(Unsupported('retrieve-default'))
1443
1444
1445    def configureGet(self, request):
1446        return defer.fail(Unsupported('config-node'))
1447
1448
1449    def configureSet(self, request):
1450        return defer.fail(Unsupported('config-node'))
1451
1452
1453    def items(self, request):
1454        return defer.fail(Unsupported('retrieve-items'))
1455
1456
1457    def retract(self, request):
1458        return defer.fail(Unsupported('retract-items'))
1459
1460
1461    def purge(self, request):
1462        return defer.fail(Unsupported('purge-nodes'))
1463
1464
1465    def delete(self, request):
1466        return defer.fail(Unsupported('delete-nodes'))
1467
1468
1469    def affiliationsGet(self, request):
1470        return defer.fail(Unsupported('modify-affiliations'))
1471
1472
1473    def affiliationsSet(self, request):
1474        return defer.fail(Unsupported('modify-affiliations'))
1475
1476
1477    def subscriptionsGet(self, request):
1478        return defer.fail(Unsupported('manage-subscriptions'))
1479
1480
1481    def subscriptionsSet(self, request):
1482        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.