source: wokkel/pubsub.py @ 87:c9d1bf0b783d

Last change on this file since 87:c9d1bf0b783d was 87:c9d1bf0b783d, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Correctly report disco identity for PubSubService? without PubSubResource?.

Author: ralphm.
Fixes #64.

File size: 47.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
25# Iq get and set XPath queries
26IQ_GET = '/iq[@type="get"]'
27IQ_SET = '/iq[@type="set"]'
28
29# Publish-subscribe namespaces
30NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
31NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
32NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
33NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
34NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
35NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
36NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
37
38# XPath to match pubsub requests
39PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
40                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
41                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
42
43class SubscriptionPending(Exception):
44    """
45    Raised when the requested subscription is pending acceptance.
46    """
47
48
49
50class SubscriptionUnconfigured(Exception):
51    """
52    Raised when the requested subscription needs to be configured before
53    becoming active.
54    """
55
56
57
58class PubSubError(error.StanzaError):
59    """
60    Exception with publish-subscribe specific condition.
61    """
62    def __init__(self, condition, pubsubCondition, feature=None, text=None):
63        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
64        if feature:
65            appCondition['feature'] = feature
66        error.StanzaError.__init__(self, condition,
67                                         text=text,
68                                         appCondition=appCondition)
69
70
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self, pubsubCondition=None, text=None):
77        if pubsubCondition:
78            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
79        else:
80            appCondition = None
81        error.StanzaError.__init__(self, 'bad-request',
82                                         text=text,
83                                         appCondition=appCondition)
84
85
86
87class Unsupported(PubSubError):
88    def __init__(self, feature, text=None):
89        self.feature = feature
90        PubSubError.__init__(self, 'feature-not-implemented',
91                                   'unsupported',
92                                   feature,
93                                   text)
94
95    def __str__(self):
96        message = PubSubError.__str__(self)
97        message += ', feature %r' % self.feature
98        return message
99
100
101class Subscription(object):
102    """
103    A subscription to a node.
104
105    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
106        node is denoted by C{None}.
107    @type nodeIdentifier: C{unicode}
108
109    @ivar subscriber: The subscribing entity.
110    @type subscriber: L{jid.JID}
111
112    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
113                 C{'unconfigured'}.
114    @type state: C{unicode}
115
116    @ivar options: Optional list of subscription options.
117    @type options: C{dict}
118
119    @ivar subscriptionIdentifier: Optional subscription identifier.
120    @type subscriptionIdentifier: C{unicode}
121    """
122
123    def __init__(self, nodeIdentifier, subscriber, state, options=None,
124                       subscriptionIdentifier=None):
125        self.nodeIdentifier = nodeIdentifier
126        self.subscriber = subscriber
127        self.state = state
128        self.options = options or {}
129        self.subscriptionIdentifier = subscriptionIdentifier
130
131
132    @staticmethod
133    def fromElement(element):
134        return Subscription(
135                element.getAttribute('node'),
136                jid.JID(element.getAttribute('jid')),
137                element.getAttribute('subscription'),
138                subscriptionIdentifier=element.getAttribute('subid'))
139
140
141    def toElement(self):
142        """
143        Return the DOM representation of this subscription.
144
145        @rtype: L{domish.Element}
146        """
147        element = domish.Element((None, 'subscription'))
148        if self.nodeIdentifier:
149            element['node'] = self.nodeIdentifier
150        element['jid'] = unicode(self.subscriber)
151        element['subscription'] = self.state
152        if self.subscriptionIdentifier:
153            element['subid'] = self.subscriptionIdentifier
154        return element
155
156
157
158class Item(domish.Element):
159    """
160    Publish subscribe item.
161
162    This behaves like an object providing L{domish.IElement}.
163
164    Item payload can be added using C{addChild} or C{addRawXml}, or using the
165    C{payload} keyword argument to C{__init__}.
166    """
167
168    def __init__(self, id=None, payload=None):
169        """
170        @param id: optional item identifier
171        @type id: L{unicode}
172        @param payload: optional item payload. Either as a domish element, or
173                        as serialized XML.
174        @type payload: object providing L{domish.IElement} or L{unicode}.
175        """
176
177        domish.Element.__init__(self, (None, 'item'))
178        if id is not None:
179            self['id'] = id
180        if payload is not None:
181            if isinstance(payload, basestring):
182                self.addRawXml(payload)
183            else:
184                self.addChild(payload)
185
186
187
188class PubSubRequest(generic.Stanza):
189    """
190    A publish-subscribe request.
191
192    The set of instance variables used depends on the type of request. If
193    a variable is not applicable or not passed in the request, its value is
194    C{None}.
195
196    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
197    @type verb: C{str}.
198
199    @ivar affiliations: Affiliations to be modified.
200    @type affiliations: C{set}
201    @ivar items: The items to be published, as L{domish.Element}s.
202    @type items: C{list}
203    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
204                           retracted.
205    @type itemIdentifiers: C{set}
206    @ivar maxItems: Maximum number of items to retrieve.
207    @type maxItems: C{int}.
208    @ivar nodeIdentifier: Identifier of the node the request is about.
209    @type nodeIdentifier: C{unicode}
210    @ivar nodeType: The type of node that should be created, or for which the
211                    configuration is retrieved. C{'leaf'} or C{'collection'}.
212    @type nodeType: C{str}
213    @ivar options: Configurations options for nodes, subscriptions and publish
214                   requests.
215    @type options: L{data_form.Form}
216    @ivar subscriber: The subscribing entity.
217    @type subscriber: L{JID}
218    @ivar subscriptionIdentifier: Identifier for a specific subscription.
219    @type subscriptionIdentifier: C{unicode}
220    @ivar subscriptions: Subscriptions to be modified, as a set of
221                         L{Subscription}.
222    @type subscriptions: C{set}
223    """
224
225    verb = None
226
227    affiliations = None
228    items = None
229    itemIdentifiers = None
230    maxItems = None
231    nodeIdentifier = None
232    nodeType = None
233    options = None
234    subscriber = None
235    subscriptionIdentifier = None
236    subscriptions = None
237
238    # Map request iq type and subelement name to request verb
239    _requestVerbMap = {
240        ('set', NS_PUBSUB, 'publish'): 'publish',
241        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
242        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
243        ('get', NS_PUBSUB, 'options'): 'optionsGet',
244        ('set', NS_PUBSUB, 'options'): 'optionsSet',
245        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
246        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
247        ('set', NS_PUBSUB, 'create'): 'create',
248        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
249        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
250        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
251        ('get', NS_PUBSUB, 'items'): 'items',
252        ('set', NS_PUBSUB, 'retract'): 'retract',
253        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
254        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
255        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
256        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
257        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
258        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
259    }
260
261    # Map request verb to request iq type and subelement name
262    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
263
264    # Map request verb to parameter handler names
265    _parameters = {
266        'publish': ['node', 'items'],
267        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
268        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
269        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
270        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
271        'subscriptions': [],
272        'affiliations': [],
273        'create': ['nodeOrNone', 'configureOrNone'],
274        'default': ['default'],
275        'configureGet': ['nodeOrEmpty'],
276        'configureSet': ['nodeOrEmpty', 'configure'],
277        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
278        'retract': ['node', 'itemIdentifiers'],
279        'purge': ['node'],
280        'delete': ['node'],
281        'affiliationsGet': ['nodeOrEmpty'],
282        'affiliationsSet': [],
283        'subscriptionsGet': ['nodeOrEmpty'],
284        'subscriptionsSet': [],
285    }
286
287    def __init__(self, verb=None):
288        self.verb = verb
289
290
291    def _parse_node(self, verbElement):
292        """
293        Parse the required node identifier out of the verbElement.
294        """
295        try:
296            self.nodeIdentifier = verbElement["node"]
297        except KeyError:
298            raise BadRequest('nodeid-required')
299
300
301    def _render_node(self, verbElement):
302        """
303        Render the required node identifier on the verbElement.
304        """
305        if not self.nodeIdentifier:
306            raise Exception("Node identifier is required")
307
308        verbElement['node'] = self.nodeIdentifier
309
310
311    def _parse_nodeOrEmpty(self, verbElement):
312        """
313        Parse the node identifier out of the verbElement. May be empty.
314        """
315        self.nodeIdentifier = verbElement.getAttribute("node", '')
316
317
318    def _render_nodeOrEmpty(self, verbElement):
319        """
320        Render the node identifier on the verbElement. May be empty.
321        """
322        if self.nodeIdentifier:
323            verbElement['node'] = self.nodeIdentifier
324
325
326    def _parse_nodeOrNone(self, verbElement):
327        """
328        Parse the optional node identifier out of the verbElement.
329        """
330        self.nodeIdentifier = verbElement.getAttribute("node")
331
332
333    def _render_nodeOrNone(self, verbElement):
334        """
335        Render the optional node identifier on the verbElement.
336        """
337        if self.nodeIdentifier:
338            verbElement['node'] = self.nodeIdentifier
339
340
341    def _parse_items(self, verbElement):
342        """
343        Parse items out of the verbElement for publish requests.
344        """
345        self.items = []
346        for element in verbElement.elements():
347            if element.uri == NS_PUBSUB and element.name == 'item':
348                self.items.append(element)
349
350
351    def _render_items(self, verbElement):
352        """
353        Render items into the verbElement for publish requests.
354        """
355        if self.items:
356            for item in self.items:
357                verbElement.addChild(item)
358
359
360    def _parse_jid(self, verbElement):
361        """
362        Parse subscriber out of the verbElement for un-/subscribe requests.
363        """
364        try:
365            self.subscriber = jid.internJID(verbElement["jid"])
366        except KeyError:
367            raise BadRequest('jid-required')
368
369
370    def _render_jid(self, verbElement):
371        """
372        Render subscriber into the verbElement for un-/subscribe requests.
373        """
374        verbElement['jid'] = self.subscriber.full()
375
376
377    def _parse_default(self, verbElement):
378        """
379        Parse node type out of a request for the default node configuration.
380        """
381        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form and form.formType == 'submit':
383            values = form.getValues()
384            self.nodeType = values.get('pubsub#node_type', 'leaf')
385        else:
386            self.nodeType = 'leaf'
387
388
389    def _parse_configure(self, verbElement):
390        """
391        Parse options out of a request for setting the node configuration.
392        """
393        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
394        if form:
395            if form.formType in ('submit', 'cancel'):
396                self.options = form
397            else:
398                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
399        else:
400            raise BadRequest(text="Missing configuration form")
401
402
403    def _parse_configureOrNone(self, verbElement):
404        """
405        Parse optional node configuration form in create request.
406        """
407        for element in verbElement.parent.elements():
408            if element.uri == NS_PUBSUB and element.name == 'configure':
409                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
410                if form:
411                    if form.formType != 'submit':
412                        raise BadRequest(text=u"Unexpected form type '%s'" %
413                                              form.formType)
414                else:
415                    form = data_form.Form('submit',
416                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
417                self.options = form
418
419
420    def _render_configureOrNone(self, verbElement):
421        """
422        Render optional node configuration form in create request.
423        """
424        if self.options is not None:
425            configure = verbElement.parent.addElement('configure')
426            configure.addChild(self.options.toElement())
427
428
429    def _parse_itemIdentifiers(self, verbElement):
430        """
431        Parse item identifiers out of items and retract requests.
432        """
433        self.itemIdentifiers = []
434        for element in verbElement.elements():
435            if element.uri == NS_PUBSUB and element.name == 'item':
436                try:
437                    self.itemIdentifiers.append(element["id"])
438                except KeyError:
439                    raise BadRequest()
440
441
442    def _render_itemIdentifiers(self, verbElement):
443        """
444        Render item identifiers into items and retract requests.
445        """
446        if self.itemIdentifiers:
447            for itemIdentifier in self.itemIdentifiers:
448                item = verbElement.addElement('item')
449                item['id'] = itemIdentifier
450
451
452    def _parse_maxItems(self, verbElement):
453        """
454        Parse maximum items out of an items request.
455        """
456        value = verbElement.getAttribute('max_items')
457
458        if value:
459            try:
460                self.maxItems = int(value)
461            except ValueError:
462                raise BadRequest(text="Field max_items requires a positive " +
463                                      "integer value")
464
465
466    def _render_maxItems(self, verbElement):
467        """
468        Render maximum items into an items request.
469        """
470        if self.maxItems:
471            verbElement['max_items'] = unicode(self.maxItems)
472
473
474    def _parse_subidOrNone(self, verbElement):
475        """
476        Parse subscription identifier out of a request.
477        """
478        self.subscriptionIdentifier = verbElement.getAttribute("subid")
479
480
481    def _render_subidOrNone(self, verbElement):
482        """
483        Render subscription identifier into a request.
484        """
485        if self.subscriptionIdentifier:
486            verbElement['subid'] = self.subscriptionIdentifier
487
488
489    def _parse_options(self, verbElement):
490        """
491        Parse options form out of a subscription options request.
492        """
493        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
494        if form:
495            if form.formType in ('submit', 'cancel'):
496                self.options = form
497            else:
498                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
499        else:
500            raise BadRequest(text="Missing options form")
501
502
503
504    def _render_options(self, verbElement):
505        verbElement.addChild(self.options.toElement())
506
507
508    def _parse_optionsWithSubscribe(self, verbElement):
509        for element in verbElement.parent.elements():
510            if element.name == 'options' and element.uri == NS_PUBSUB:
511                form = data_form.findForm(element,
512                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
513                if form:
514                    if form.formType != 'submit':
515                        raise BadRequest(text=u"Unexpected form type '%s'" %
516                                              form.formType)
517                else:
518                    form = data_form.Form('submit',
519                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
520                self.options = form
521
522
523    def _render_optionsWithSubscribe(self, verbElement):
524        if self.options:
525            optionsElement = verbElement.parent.addElement('options')
526            self._render_options(optionsElement)
527
528
529    def parseElement(self, element):
530        """
531        Parse the publish-subscribe verb and parameters out of a request.
532        """
533        generic.Stanza.parseElement(self, element)
534
535        verbs = []
536        children = []
537        for child in element.pubsub.elements():
538            key = (self.stanzaType, child.uri, child.name)
539            try:
540                verb = self._requestVerbMap[key]
541            except KeyError:
542                continue
543
544            verbs.append(verb)
545            children.append(child)
546
547        if not verbs:
548            raise NotImplementedError()
549
550        if len(verbs) > 1:
551            if 'optionsSet' in verbs and 'subscribe' in verbs:
552                self.verb = 'subscribe'
553                child = children[verbs.index('subscribe')]
554            else:
555                raise NotImplementedError()
556        else:
557            self.verb = verbs[0]
558
559        for parameter in self._parameters[self.verb]:
560            getattr(self, '_parse_%s' % parameter)(child)
561
562
563
564    def send(self, xs):
565        """
566        Send this request to its recipient.
567
568        This renders all of the relevant parameters for this specific
569        requests into an L{IQ}, and invoke its C{send} method.
570        This returns a deferred that fires upon reception of a response. See
571        L{IQ} for details.
572
573        @param xs: The XML stream to send the request on.
574        @type xs: L{xmlstream.XmlStream}
575        @rtype: L{defer.Deferred}.
576        """
577
578        try:
579            (self.stanzaType,
580             childURI,
581             childName) = self._verbRequestMap[self.verb]
582        except KeyError:
583            raise NotImplementedError()
584
585        iq = IQ(xs, self.stanzaType)
586        iq.addElement((childURI, 'pubsub'))
587        verbElement = iq.pubsub.addElement(childName)
588
589        if self.sender:
590            iq['from'] = self.sender.full()
591        if self.recipient:
592            iq['to'] = self.recipient.full()
593
594        for parameter in self._parameters[self.verb]:
595            getattr(self, '_render_%s' % parameter)(verbElement)
596
597        return iq.send()
598
599
600
601class PubSubEvent(object):
602    """
603    A publish subscribe event.
604
605    @param sender: The entity from which the notification was received.
606    @type sender: L{jid.JID}
607    @param recipient: The entity to which the notification was sent.
608    @type recipient: L{wokkel.pubsub.ItemsEvent}
609    @param nodeIdentifier: Identifier of the node the event pertains to.
610    @type nodeIdentifier: C{unicode}
611    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
612    @type headers: L{dict}
613    """
614
615    def __init__(self, sender, recipient, nodeIdentifier, headers):
616        self.sender = sender
617        self.recipient = recipient
618        self.nodeIdentifier = nodeIdentifier
619        self.headers = headers
620
621
622
623class ItemsEvent(PubSubEvent):
624    """
625    A publish-subscribe event that signifies new, updated and retracted items.
626
627    @param items: List of received items as domish elements.
628    @type items: C{list} of L{domish.Element}
629    """
630
631    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
632        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
633        self.items = items
634
635
636
637class DeleteEvent(PubSubEvent):
638    """
639    A publish-subscribe event that signifies the deletion of a node.
640    """
641
642    redirectURI = None
643
644
645
646class PurgeEvent(PubSubEvent):
647    """
648    A publish-subscribe event that signifies the purging of a node.
649    """
650
651
652
653class PubSubClient(XMPPHandler):
654    """
655    Publish subscribe client protocol.
656    """
657
658    implements(IPubSubClient)
659
660    def connectionInitialized(self):
661        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
662                                   NS_PUBSUB_EVENT, self._onEvent)
663
664
665    def _onEvent(self, message):
666        try:
667            sender = jid.JID(message["from"])
668            recipient = jid.JID(message["to"])
669        except KeyError:
670            return
671
672        actionElement = None
673        for element in message.event.elements():
674            if element.uri == NS_PUBSUB_EVENT:
675                actionElement = element
676
677        if not actionElement:
678            return
679
680        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
681
682        if eventHandler:
683            headers = shim.extractHeaders(message)
684            eventHandler(sender, recipient, actionElement, headers)
685            message.handled = True
686
687
688    def _onEvent_items(self, sender, recipient, action, headers):
689        nodeIdentifier = action["node"]
690
691        items = [element for element in action.elements()
692                         if element.name in ('item', 'retract')]
693
694        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
695        self.itemsReceived(event)
696
697
698    def _onEvent_delete(self, sender, recipient, action, headers):
699        nodeIdentifier = action["node"]
700        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
701        if action.redirect:
702            event.redirectURI = action.redirect.getAttribute('uri')
703        self.deleteReceived(event)
704
705
706    def _onEvent_purge(self, sender, recipient, action, headers):
707        nodeIdentifier = action["node"]
708        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
709        self.purgeReceived(event)
710
711
712    def itemsReceived(self, event):
713        pass
714
715
716    def deleteReceived(self, event):
717        pass
718
719
720    def purgeReceived(self, event):
721        pass
722
723
724    def createNode(self, service, nodeIdentifier=None, options=None,
725                         sender=None):
726        """
727        Create a publish subscribe node.
728
729        @param service: The publish subscribe service to create the node at.
730        @type service: L{JID}
731        @param nodeIdentifier: Optional suggestion for the id of the node.
732        @type nodeIdentifier: C{unicode}
733        @param options: Optional node configuration options.
734        @type options: C{dict}
735        """
736        request = PubSubRequest('create')
737        request.recipient = service
738        request.nodeIdentifier = nodeIdentifier
739        request.sender = sender
740
741        if options:
742            form = data_form.Form(formType='submit',
743                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
744            form.makeFields(options)
745            request.options = form
746
747        def cb(iq):
748            try:
749                new_node = iq.pubsub.create["node"]
750            except AttributeError:
751                # the suggested node identifier was accepted
752                new_node = nodeIdentifier
753            return new_node
754
755        d = request.send(self.xmlstream)
756        d.addCallback(cb)
757        return d
758
759
760    def deleteNode(self, service, nodeIdentifier, sender=None):
761        """
762        Delete a publish subscribe node.
763
764        @param service: The publish subscribe service to delete the node from.
765        @type service: L{JID}
766        @param nodeIdentifier: The identifier of the node.
767        @type nodeIdentifier: C{unicode}
768        """
769        request = PubSubRequest('delete')
770        request.recipient = service
771        request.nodeIdentifier = nodeIdentifier
772        request.sender = sender
773        return request.send(self.xmlstream)
774
775
776    def subscribe(self, service, nodeIdentifier, subscriber,
777                        options=None, sender=None):
778        """
779        Subscribe to a publish subscribe node.
780
781        @param service: The publish subscribe service that keeps the node.
782        @type service: L{JID}
783
784        @param nodeIdentifier: The identifier of the node.
785        @type nodeIdentifier: C{unicode}
786
787        @param subscriber: The entity to subscribe to the node. This entity
788            will get notifications of new published items.
789        @type subscriber: L{JID}
790
791        @param options: Subscription options.
792        @type options: C{dict}
793
794        @return: Deferred that fires with L{Subscription} or errbacks with
795            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
796        @rtype: L{defer.Deferred}
797        """
798        request = PubSubRequest('subscribe')
799        request.recipient = service
800        request.nodeIdentifier = nodeIdentifier
801        request.subscriber = subscriber
802        request.sender = sender
803
804        if options:
805            form = data_form.Form(formType='submit',
806                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
807            form.makeFields(options)
808            request.options = form
809
810        def cb(iq):
811            subscription = Subscription.fromElement(iq.pubsub.subscription)
812
813            if subscription.state == 'pending':
814                raise SubscriptionPending()
815            elif subscription.state == 'unconfigured':
816                raise SubscriptionUnconfigured()
817            else:
818                # we assume subscription == 'subscribed'
819                # any other value would be invalid, but that should have
820                # yielded a stanza error.
821                return subscription
822
823        d = request.send(self.xmlstream)
824        d.addCallback(cb)
825        return d
826
827
828    def unsubscribe(self, service, nodeIdentifier, subscriber,
829                          subscriptionIdentifier=None, sender=None):
830        """
831        Unsubscribe from a publish subscribe node.
832
833        @param service: The publish subscribe service that keeps the node.
834        @type service: L{JID}
835
836        @param nodeIdentifier: The identifier of the node.
837        @type nodeIdentifier: C{unicode}
838
839        @param subscriber: The entity to unsubscribe from the node.
840        @type subscriber: L{JID}
841
842        @param subscriptionIdentifier: Optional subscription identifier.
843        @type subscriptionIdentifier: C{unicode}
844        """
845        request = PubSubRequest('unsubscribe')
846        request.recipient = service
847        request.nodeIdentifier = nodeIdentifier
848        request.subscriber = subscriber
849        request.subscriptionIdentifier = subscriptionIdentifier
850        request.sender = sender
851        return request.send(self.xmlstream)
852
853
854    def publish(self, service, nodeIdentifier, items=None, sender=None):
855        """
856        Publish to a publish subscribe node.
857
858        @param service: The publish subscribe service that keeps the node.
859        @type service: L{JID}
860        @param nodeIdentifier: The identifier of the node.
861        @type nodeIdentifier: C{unicode}
862        @param items: Optional list of L{Item}s to publish.
863        @type items: C{list}
864        """
865        request = PubSubRequest('publish')
866        request.recipient = service
867        request.nodeIdentifier = nodeIdentifier
868        request.items = items
869        request.sender = sender
870        return request.send(self.xmlstream)
871
872
873    def items(self, service, nodeIdentifier, maxItems=None,
874              subscriptionIdentifier=None, sender=None):
875        """
876        Retrieve previously published items from a publish subscribe node.
877
878        @param service: The publish subscribe service that keeps the node.
879        @type service: L{JID}
880
881        @param nodeIdentifier: The identifier of the node.
882        @type nodeIdentifier: C{unicode}
883
884        @param maxItems: Optional limit on the number of retrieved items.
885        @type maxItems: C{int}
886
887        @param subscriptionIdentifier: Optional subscription identifier. In
888            case the node has been subscribed to multiple times, this narrows
889            the results to the specific subscription.
890        @type subscriptionIdentifier: C{unicode}
891        """
892        request = PubSubRequest('items')
893        request.recipient = service
894        request.nodeIdentifier = nodeIdentifier
895        if maxItems:
896            request.maxItems = str(int(maxItems))
897        request.subscriptionIdentifier = subscriptionIdentifier
898        request.sender = sender
899
900        def cb(iq):
901            items = []
902            for element in iq.pubsub.items.elements():
903                if element.uri == NS_PUBSUB and element.name == 'item':
904                    items.append(element)
905            return items
906
907        d = request.send(self.xmlstream)
908        d.addCallback(cb)
909        return d
910
911
912    def getOptions(self, service, nodeIdentifier, subscriber,
913                         subscriptionIdentifier=None, sender=None):
914        """
915        Get subscription options.
916
917        @param service: The publish subscribe service that keeps the node.
918        @type service: L{JID}
919
920        @param nodeIdentifier: The identifier of the node.
921        @type nodeIdentifier: C{unicode}
922
923        @param subscriber: The entity subscribed to the node.
924        @type subscriber: L{JID}
925
926        @param subscriptionIdentifier: Optional subscription identifier.
927        @type subscriptionIdentifier: C{unicode}
928
929        @rtype: L{data_form.Form}
930        """
931        request = PubSubRequest('optionsGet')
932        request.recipient = service
933        request.nodeIdentifier = nodeIdentifier
934        request.subscriber = subscriber
935        request.subscriptionIdentifier = subscriptionIdentifier
936        request.sender = sender
937
938        def cb(iq):
939            form = data_form.findForm(iq.pubsub.options,
940                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
941            form.typeCheck()
942            return form
943
944        d = request.send(self.xmlstream)
945        d.addCallback(cb)
946        return d
947
948
949    def setOptions(self, service, nodeIdentifier, subscriber,
950                         options, subscriptionIdentifier=None, sender=None):
951        """
952        Set subscription options.
953
954        @param service: The publish subscribe service that keeps the node.
955        @type service: L{JID}
956
957        @param nodeIdentifier: The identifier of the node.
958        @type nodeIdentifier: C{unicode}
959
960        @param subscriber: The entity subscribed to the node.
961        @type subscriber: L{JID}
962
963        @param options: Subscription options.
964        @type options: C{dict}.
965
966        @param subscriptionIdentifier: Optional subscription identifier.
967        @type subscriptionIdentifier: C{unicode}
968        """
969        request = PubSubRequest('optionsSet')
970        request.recipient = service
971        request.nodeIdentifier = nodeIdentifier
972        request.subscriber = subscriber
973        request.subscriptionIdentifier = subscriptionIdentifier
974        request.sender = sender
975
976        form = data_form.Form(formType='submit',
977                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
978        form.makeFields(options)
979        request.options = form
980
981        d = request.send(self.xmlstream)
982        return d
983
984
985
986class PubSubService(XMPPHandler, IQHandlerMixin):
987    """
988    Protocol implementation for a XMPP Publish Subscribe Service.
989
990    The word Service here is used as taken from the Publish Subscribe
991    specification. It is the party responsible for keeping nodes and their
992    subscriptions, and sending out notifications.
993
994    Methods from the L{IPubSubService} interface that are called as
995    a result of an XMPP request may raise exceptions. Alternatively the
996    deferred returned by these methods may have their errback called. These are
997    handled as follows:
998
999     - If the exception is an instance of L{error.StanzaError}, an error
1000       response iq is returned.
1001     - Any other exception is reported using L{log.msg}. An error response
1002       with the condition C{internal-server-error} is returned.
1003
1004    The default implementation of said methods raises an L{Unsupported}
1005    exception and are meant to be overridden.
1006
1007    @ivar discoIdentity: Service discovery identity as a dictionary with
1008                         keys C{'category'}, C{'type'} and C{'name'}.
1009    @ivar pubSubFeatures: List of supported publish-subscribe features for
1010                          service discovery, as C{str}.
1011    @type pubSubFeatures: C{list} or C{None}
1012    """
1013
1014    implements(IPubSubService)
1015
1016    iqHandlers = {
1017            '/*': '_onPubSubRequest',
1018            }
1019
1020    _legacyHandlers = {
1021        'publish': ('publish', ['sender', 'recipient',
1022                                'nodeIdentifier', 'items']),
1023        'subscribe': ('subscribe', ['sender', 'recipient',
1024                                    'nodeIdentifier', 'subscriber']),
1025        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
1026                                        'nodeIdentifier', 'subscriber']),
1027        'subscriptions': ('subscriptions', ['sender', 'recipient']),
1028        'affiliations': ('affiliations', ['sender', 'recipient']),
1029        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
1030        'getConfigurationOptions': ('getConfigurationOptions', []),
1031        'default': ('getDefaultConfiguration',
1032                    ['sender', 'recipient', 'nodeType']),
1033        'configureGet': ('getConfiguration', ['sender', 'recipient',
1034                                              'nodeIdentifier']),
1035        'configureSet': ('setConfiguration', ['sender', 'recipient',
1036                                              'nodeIdentifier', 'options']),
1037        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
1038                            'maxItems', 'itemIdentifiers']),
1039        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
1040                                'itemIdentifiers']),
1041        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
1042        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
1043    }
1044
1045    hideNodes = False
1046
1047    def __init__(self, resource=None):
1048        self.resource = resource
1049        self.discoIdentity = {'category': 'pubsub',
1050                              'type': 'service',
1051                              'name': 'Generic Publish-Subscribe Service'}
1052
1053        self.pubSubFeatures = []
1054
1055
1056    def connectionMade(self):
1057        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
1058
1059
1060    def getDiscoInfo(self, requestor, target, nodeIdentifier):
1061        def toInfo(nodeInfo, info):
1062            if not nodeInfo:
1063                return info
1064
1065            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
1066            info.append(disco.DiscoIdentity('pubsub', nodeType))
1067            if metaData:
1068                form = data_form.Form(formType="result",
1069                                      formNamespace=NS_PUBSUB_META_DATA)
1070                form.addField(
1071                        data_form.Field(
1072                            var='pubsub#node_type',
1073                            value=nodeType,
1074                            label='The type of node (collection or leaf)'
1075                        )
1076                )
1077
1078                for metaDatum in metaData:
1079                    form.addField(data_form.Field.fromDict(metaDatum))
1080
1081                info.append(form)
1082
1083            return info
1084
1085        info = []
1086
1087        request = PubSubRequest('discoInfo')
1088
1089        if self.resource is not None:
1090            resource = self.resource.locateResource(request)
1091            identity = resource.discoIdentity
1092            features = resource.features
1093            getInfo = resource.getInfo
1094        else:
1095            category = self.discoIdentity['category']
1096            idType = self.discoIdentity['type']
1097            name = self.discoIdentity['name']
1098            identity = disco.DiscoIdentity(category, idType, name)
1099            features = self.pubSubFeatures
1100            getInfo = self.getNodeInfo
1101
1102        if not nodeIdentifier:
1103            info.append(identity)
1104            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
1105            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
1106                         for feature in features])
1107
1108        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
1109        d.addCallback(toInfo, info)
1110        d.addErrback(log.err)
1111        return d
1112
1113
1114    def getDiscoItems(self, requestor, target, nodeIdentifier):
1115        if self.hideNodes:
1116            d = defer.succeed([])
1117        elif self.resource is not None:
1118            request = PubSubRequest('discoInfo')
1119            resource = self.resource.locateResource(request)
1120            d = resource.getNodes(requestor, target, nodeIdentifier)
1121        elif nodeIdentifier:
1122            d = self.getNodes(requestor, target)
1123        else:
1124            d = defer.succeed([])
1125
1126        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
1127                                     for node in nodes])
1128        return d
1129
1130
1131    def _onPubSubRequest(self, iq):
1132        request = PubSubRequest.fromElement(iq)
1133
1134        if self.resource is not None:
1135            resource = self.resource.locateResource(request)
1136        else:
1137            resource = self
1138
1139        # Preprocess the request, knowing the handling resource
1140        try:
1141            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
1142        except AttributeError:
1143            pass
1144        else:
1145            request = preProcessor(resource, request)
1146            if request is None:
1147                return defer.succeed(None)
1148
1149        # Process the request itself,
1150        if resource is not self:
1151            try:
1152                handler = getattr(resource, request.verb)
1153            except AttributeError:
1154                # fix lookup feature
1155                text = "Request verb: %s" % request.verb
1156                return defer.fail(Unsupported('', text))
1157
1158            d = handler(request)
1159        else:
1160            handlerName, argNames = self._legacyHandlers[request.verb]
1161            handler = getattr(self, handlerName)
1162            args = [getattr(request, arg) for arg in argNames]
1163            if 'options' in argNames:
1164                args[argNames.index('options')] = request.options.getValues()
1165            d = handler(*args)
1166
1167        # If needed, translate the result into a response
1168        try:
1169            cb = getattr(self, '_toResponse_%s' % request.verb)
1170        except AttributeError:
1171            pass
1172        else:
1173            d.addCallback(cb, resource, request)
1174
1175        return d
1176
1177
1178    def _toResponse_subscribe(self, result, resource, request):
1179        response = domish.Element((NS_PUBSUB, "pubsub"))
1180        subscription = response.addChild(result.toElement())
1181        return response
1182
1183
1184    def _toResponse_subscriptions(self, result, resource, request):
1185        response = domish.Element((NS_PUBSUB, 'pubsub'))
1186        subscriptions = response.addElement('subscriptions')
1187        for subscription in result:
1188            subscriptions.addChild(subscription.toElement())
1189        return response
1190
1191
1192    def _toResponse_affiliations(self, result, resource, request):
1193        response = domish.Element((NS_PUBSUB, 'pubsub'))
1194        affiliations = response.addElement('affiliations')
1195
1196        for nodeIdentifier, affiliation in result:
1197            item = affiliations.addElement('affiliation')
1198            item['node'] = nodeIdentifier
1199            item['affiliation'] = affiliation
1200
1201        return response
1202
1203
1204    def _toResponse_create(self, result, resource, request):
1205        if not request.nodeIdentifier or request.nodeIdentifier != result:
1206            response = domish.Element((NS_PUBSUB, 'pubsub'))
1207            create = response.addElement('create')
1208            create['node'] = result
1209            return response
1210        else:
1211            return None
1212
1213
1214    def _formFromConfiguration(self, resource, values):
1215        fieldDefs = resource.getConfigurationOptions()
1216        form = data_form.Form(formType="form",
1217                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1218        form.makeFields(values, fieldDefs)
1219        return form
1220
1221
1222    def _checkConfiguration(self, resource, form):
1223        fieldDefs = resource.getConfigurationOptions()
1224        form.typeCheck(fieldDefs, filterUnknown=True)
1225
1226
1227    def _preProcess_create(self, resource, request):
1228        if request.options:
1229            self._checkConfiguration(resource, request.options)
1230        return request
1231
1232
1233    def _preProcess_default(self, resource, request):
1234        if request.nodeType not in ('leaf', 'collection'):
1235            raise error.StanzaError('not-acceptable')
1236        else:
1237            return request
1238
1239
1240    def _toResponse_default(self, options, resource, request):
1241        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1242        default = response.addElement("default")
1243        form = self._formFromConfiguration(resource, options)
1244        default.addChild(form.toElement())
1245        return response
1246
1247
1248    def _toResponse_configureGet(self, options, resource, request):
1249        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1250        configure = response.addElement("configure")
1251        form = self._formFromConfiguration(resource, options)
1252        configure.addChild(form.toElement())
1253
1254        if request.nodeIdentifier:
1255            configure["node"] = request.nodeIdentifier
1256
1257        return response
1258
1259
1260    def _preProcess_configureSet(self, resource, request):
1261        if request.options.formType == 'cancel':
1262            return None
1263        else:
1264            self._checkConfiguration(resource, request.options)
1265            return request
1266
1267
1268    def _toResponse_items(self, result, resource, request):
1269        response = domish.Element((NS_PUBSUB, 'pubsub'))
1270        items = response.addElement('items')
1271        items["node"] = request.nodeIdentifier
1272
1273        for item in result:
1274            items.addChild(item)
1275
1276        return response
1277
1278
1279    def _createNotification(self, eventType, service, nodeIdentifier,
1280                                  subscriber, subscriptions=None):
1281        headers = []
1282
1283        if subscriptions:
1284            for subscription in subscriptions:
1285                if nodeIdentifier != subscription.nodeIdentifier:
1286                    headers.append(('Collection', subscription.nodeIdentifier))
1287
1288        message = domish.Element((None, "message"))
1289        message["from"] = service.full()
1290        message["to"] = subscriber.full()
1291        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1292
1293        element = event.addElement(eventType)
1294        element["node"] = nodeIdentifier
1295
1296        if headers:
1297            message.addChild(shim.Headers(headers))
1298
1299        return message
1300
1301    # public methods
1302
1303    def notifyPublish(self, service, nodeIdentifier, notifications):
1304        for subscriber, subscriptions, items in notifications:
1305            message = self._createNotification('items', service,
1306                                               nodeIdentifier, subscriber,
1307                                               subscriptions)
1308            message.event.items.children = items
1309            self.send(message)
1310
1311
1312    def notifyDelete(self, service, nodeIdentifier, subscribers,
1313                           redirectURI=None):
1314        for subscriber in subscribers:
1315            message = self._createNotification('delete', service,
1316                                               nodeIdentifier,
1317                                               subscriber)
1318            if redirectURI:
1319                redirect = message.event.delete.addElement('redirect')
1320                redirect['uri'] = redirectURI
1321            self.send(message)
1322
1323
1324    def getNodeInfo(self, requestor, service, nodeIdentifier):
1325        return None
1326
1327
1328    def getNodes(self, requestor, service):
1329        return []
1330
1331
1332    def publish(self, requestor, service, nodeIdentifier, items):
1333        raise Unsupported('publish')
1334
1335
1336    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1337        raise Unsupported('subscribe')
1338
1339
1340    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1341        raise Unsupported('subscribe')
1342
1343
1344    def subscriptions(self, requestor, service):
1345        raise Unsupported('retrieve-subscriptions')
1346
1347
1348    def affiliations(self, requestor, service):
1349        raise Unsupported('retrieve-affiliations')
1350
1351
1352    def create(self, requestor, service, nodeIdentifier):
1353        raise Unsupported('create-nodes')
1354
1355
1356    def getConfigurationOptions(self):
1357        return {}
1358
1359
1360    def getDefaultConfiguration(self, requestor, service, nodeType):
1361        raise Unsupported('retrieve-default')
1362
1363
1364    def getConfiguration(self, requestor, service, nodeIdentifier):
1365        raise Unsupported('config-node')
1366
1367
1368    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1369        raise Unsupported('config-node')
1370
1371
1372    def items(self, requestor, service, nodeIdentifier, maxItems,
1373                    itemIdentifiers):
1374        raise Unsupported('retrieve-items')
1375
1376
1377    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1378        raise Unsupported('retract-items')
1379
1380
1381    def purge(self, requestor, service, nodeIdentifier):
1382        raise Unsupported('purge-nodes')
1383
1384
1385    def delete(self, requestor, service, nodeIdentifier):
1386        raise Unsupported('delete-nodes')
1387
1388
1389
1390class PubSubResource(object):
1391
1392    implements(IPubSubResource)
1393
1394    features = []
1395    discoIdentity = disco.DiscoIdentity('pubsub',
1396                                        'service',
1397                                        'Publish-Subscribe Service')
1398
1399
1400    def locateResource(self, request):
1401        return self
1402
1403
1404    def getInfo(self, requestor, service, nodeIdentifier):
1405        return defer.succeed(None)
1406
1407
1408    def getNodes(self, requestor, service, nodeIdentifier):
1409        return defer.succeed([])
1410
1411
1412    def getConfigurationOptions(self):
1413        return {}
1414
1415
1416    def publish(self, request):
1417        return defer.fail(Unsupported('publish'))
1418
1419
1420    def subscribe(self, request):
1421        return defer.fail(Unsupported('subscribe'))
1422
1423
1424    def unsubscribe(self, request):
1425        return defer.fail(Unsupported('subscribe'))
1426
1427
1428    def subscriptions(self, request):
1429        return defer.fail(Unsupported('retrieve-subscriptions'))
1430
1431
1432    def affiliations(self, request):
1433        return defer.fail(Unsupported('retrieve-affiliations'))
1434
1435
1436    def create(self, request):
1437        return defer.fail(Unsupported('create-nodes'))
1438
1439
1440    def default(self, request):
1441        return defer.fail(Unsupported('retrieve-default'))
1442
1443
1444    def configureGet(self, request):
1445        return defer.fail(Unsupported('config-node'))
1446
1447
1448    def configureSet(self, request):
1449        return defer.fail(Unsupported('config-node'))
1450
1451
1452    def items(self, request):
1453        return defer.fail(Unsupported('retrieve-items'))
1454
1455
1456    def retract(self, request):
1457        return defer.fail(Unsupported('retract-items'))
1458
1459
1460    def purge(self, request):
1461        return defer.fail(Unsupported('purge-nodes'))
1462
1463
1464    def delete(self, request):
1465        return defer.fail(Unsupported('delete-nodes'))
1466
1467
1468    def affiliationsGet(self, request):
1469        return defer.fail(Unsupported('modify-affiliations'))
1470
1471
1472    def affiliationsSet(self, request):
1473        return defer.fail(Unsupported('modify-affiliations'))
1474
1475
1476    def subscriptionsGet(self, request):
1477        return defer.fail(Unsupported('manage-subscriptions'))
1478
1479
1480    def subscriptionsSet(self, request):
1481        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.