source: wokkel/pubsub.py @ 92:f69b08281bec

Last change on this file since 92:f69b08281bec was 92:f69b08281bec, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Properly pass verb element to parse methods, not just the last one.

Author: ralphm.
Fixes: #72.

This also cleans up a few pyflakes warnings, including a masked test.

File size: 47.4 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2011 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
25# Iq get and set XPath queries
26IQ_GET = '/iq[@type="get"]'
27IQ_SET = '/iq[@type="set"]'
28
29# Publish-subscribe namespaces
30NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
31NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
32NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
33NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
34NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
35NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
36NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
37
38# XPath to match pubsub requests
39PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
40                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
41                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
42
43class SubscriptionPending(Exception):
44    """
45    Raised when the requested subscription is pending acceptance.
46    """
47
48
49
50class SubscriptionUnconfigured(Exception):
51    """
52    Raised when the requested subscription needs to be configured before
53    becoming active.
54    """
55
56
57
58class PubSubError(error.StanzaError):
59    """
60    Exception with publish-subscribe specific condition.
61    """
62    def __init__(self, condition, pubsubCondition, feature=None, text=None):
63        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
64        if feature:
65            appCondition['feature'] = feature
66        error.StanzaError.__init__(self, condition,
67                                         text=text,
68                                         appCondition=appCondition)
69
70
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self, pubsubCondition=None, text=None):
77        if pubsubCondition:
78            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
79        else:
80            appCondition = None
81        error.StanzaError.__init__(self, 'bad-request',
82                                         text=text,
83                                         appCondition=appCondition)
84
85
86
87class Unsupported(PubSubError):
88    def __init__(self, feature, text=None):
89        self.feature = feature
90        PubSubError.__init__(self, 'feature-not-implemented',
91                                   'unsupported',
92                                   feature,
93                                   text)
94
95    def __str__(self):
96        message = PubSubError.__str__(self)
97        message += ', feature %r' % self.feature
98        return message
99
100
101class Subscription(object):
102    """
103    A subscription to a node.
104
105    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
106        node is denoted by C{None}.
107    @type nodeIdentifier: C{unicode}
108
109    @ivar subscriber: The subscribing entity.
110    @type subscriber: L{jid.JID}
111
112    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
113                 C{'unconfigured'}.
114    @type state: C{unicode}
115
116    @ivar options: Optional list of subscription options.
117    @type options: C{dict}
118
119    @ivar subscriptionIdentifier: Optional subscription identifier.
120    @type subscriptionIdentifier: C{unicode}
121    """
122
123    def __init__(self, nodeIdentifier, subscriber, state, options=None,
124                       subscriptionIdentifier=None):
125        self.nodeIdentifier = nodeIdentifier
126        self.subscriber = subscriber
127        self.state = state
128        self.options = options or {}
129        self.subscriptionIdentifier = subscriptionIdentifier
130
131
132    @staticmethod
133    def fromElement(element):
134        return Subscription(
135                element.getAttribute('node'),
136                jid.JID(element.getAttribute('jid')),
137                element.getAttribute('subscription'),
138                subscriptionIdentifier=element.getAttribute('subid'))
139
140
141    def toElement(self):
142        """
143        Return the DOM representation of this subscription.
144
145        @rtype: L{domish.Element}
146        """
147        element = domish.Element((None, 'subscription'))
148        if self.nodeIdentifier:
149            element['node'] = self.nodeIdentifier
150        element['jid'] = unicode(self.subscriber)
151        element['subscription'] = self.state
152        if self.subscriptionIdentifier:
153            element['subid'] = self.subscriptionIdentifier
154        return element
155
156
157
158class Item(domish.Element):
159    """
160    Publish subscribe item.
161
162    This behaves like an object providing L{domish.IElement}.
163
164    Item payload can be added using C{addChild} or C{addRawXml}, or using the
165    C{payload} keyword argument to C{__init__}.
166    """
167
168    def __init__(self, id=None, payload=None):
169        """
170        @param id: optional item identifier
171        @type id: L{unicode}
172        @param payload: optional item payload. Either as a domish element, or
173                        as serialized XML.
174        @type payload: object providing L{domish.IElement} or L{unicode}.
175        """
176
177        domish.Element.__init__(self, (None, 'item'))
178        if id is not None:
179            self['id'] = id
180        if payload is not None:
181            if isinstance(payload, basestring):
182                self.addRawXml(payload)
183            else:
184                self.addChild(payload)
185
186
187
188class PubSubRequest(generic.Stanza):
189    """
190    A publish-subscribe request.
191
192    The set of instance variables used depends on the type of request. If
193    a variable is not applicable or not passed in the request, its value is
194    C{None}.
195
196    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
197    @type verb: C{str}.
198
199    @ivar affiliations: Affiliations to be modified.
200    @type affiliations: C{set}
201    @ivar items: The items to be published, as L{domish.Element}s.
202    @type items: C{list}
203    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
204                           retracted.
205    @type itemIdentifiers: C{set}
206    @ivar maxItems: Maximum number of items to retrieve.
207    @type maxItems: C{int}.
208    @ivar nodeIdentifier: Identifier of the node the request is about.
209    @type nodeIdentifier: C{unicode}
210    @ivar nodeType: The type of node that should be created, or for which the
211                    configuration is retrieved. C{'leaf'} or C{'collection'}.
212    @type nodeType: C{str}
213    @ivar options: Configurations options for nodes, subscriptions and publish
214                   requests.
215    @type options: L{data_form.Form}
216    @ivar subscriber: The subscribing entity.
217    @type subscriber: L{JID}
218    @ivar subscriptionIdentifier: Identifier for a specific subscription.
219    @type subscriptionIdentifier: C{unicode}
220    @ivar subscriptions: Subscriptions to be modified, as a set of
221                         L{Subscription}.
222    @type subscriptions: C{set}
223    """
224
225    verb = None
226
227    affiliations = None
228    items = None
229    itemIdentifiers = None
230    maxItems = None
231    nodeIdentifier = None
232    nodeType = None
233    options = None
234    subscriber = None
235    subscriptionIdentifier = None
236    subscriptions = None
237
238    # Map request iq type and subelement name to request verb
239    _requestVerbMap = {
240        ('set', NS_PUBSUB, 'publish'): 'publish',
241        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
242        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
243        ('get', NS_PUBSUB, 'options'): 'optionsGet',
244        ('set', NS_PUBSUB, 'options'): 'optionsSet',
245        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
246        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
247        ('set', NS_PUBSUB, 'create'): 'create',
248        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
249        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
250        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
251        ('get', NS_PUBSUB, 'items'): 'items',
252        ('set', NS_PUBSUB, 'retract'): 'retract',
253        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
254        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
255        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
256        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
257        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
258        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
259    }
260
261    # Map request verb to request iq type and subelement name
262    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
263
264    # Map request verb to parameter handler names
265    _parameters = {
266        'publish': ['node', 'items'],
267        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
268        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
269        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
270        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
271        'subscriptions': [],
272        'affiliations': [],
273        'create': ['nodeOrNone', 'configureOrNone'],
274        'default': ['default'],
275        'configureGet': ['nodeOrEmpty'],
276        'configureSet': ['nodeOrEmpty', 'configure'],
277        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
278        'retract': ['node', 'itemIdentifiers'],
279        'purge': ['node'],
280        'delete': ['node'],
281        'affiliationsGet': ['nodeOrEmpty'],
282        'affiliationsSet': [],
283        'subscriptionsGet': ['nodeOrEmpty'],
284        'subscriptionsSet': [],
285    }
286
287    def __init__(self, verb=None):
288        self.verb = verb
289
290
291    def _parse_node(self, verbElement):
292        """
293        Parse the required node identifier out of the verbElement.
294        """
295        try:
296            self.nodeIdentifier = verbElement["node"]
297        except KeyError:
298            raise BadRequest('nodeid-required')
299
300
301    def _render_node(self, verbElement):
302        """
303        Render the required node identifier on the verbElement.
304        """
305        if not self.nodeIdentifier:
306            raise Exception("Node identifier is required")
307
308        verbElement['node'] = self.nodeIdentifier
309
310
311    def _parse_nodeOrEmpty(self, verbElement):
312        """
313        Parse the node identifier out of the verbElement. May be empty.
314        """
315        self.nodeIdentifier = verbElement.getAttribute("node", '')
316
317
318    def _render_nodeOrEmpty(self, verbElement):
319        """
320        Render the node identifier on the verbElement. May be empty.
321        """
322        if self.nodeIdentifier:
323            verbElement['node'] = self.nodeIdentifier
324
325
326    def _parse_nodeOrNone(self, verbElement):
327        """
328        Parse the optional node identifier out of the verbElement.
329        """
330        self.nodeIdentifier = verbElement.getAttribute("node")
331
332
333    def _render_nodeOrNone(self, verbElement):
334        """
335        Render the optional node identifier on the verbElement.
336        """
337        if self.nodeIdentifier:
338            verbElement['node'] = self.nodeIdentifier
339
340
341    def _parse_items(self, verbElement):
342        """
343        Parse items out of the verbElement for publish requests.
344        """
345        self.items = []
346        for element in verbElement.elements():
347            if element.uri == NS_PUBSUB and element.name == 'item':
348                self.items.append(element)
349
350
351    def _render_items(self, verbElement):
352        """
353        Render items into the verbElement for publish requests.
354        """
355        if self.items:
356            for item in self.items:
357                verbElement.addChild(item)
358
359
360    def _parse_jid(self, verbElement):
361        """
362        Parse subscriber out of the verbElement for un-/subscribe requests.
363        """
364        try:
365            self.subscriber = jid.internJID(verbElement["jid"])
366        except KeyError:
367            raise BadRequest('jid-required')
368
369
370    def _render_jid(self, verbElement):
371        """
372        Render subscriber into the verbElement for un-/subscribe requests.
373        """
374        verbElement['jid'] = self.subscriber.full()
375
376
377    def _parse_default(self, verbElement):
378        """
379        Parse node type out of a request for the default node configuration.
380        """
381        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form and form.formType == 'submit':
383            values = form.getValues()
384            self.nodeType = values.get('pubsub#node_type', 'leaf')
385        else:
386            self.nodeType = 'leaf'
387
388
389    def _parse_configure(self, verbElement):
390        """
391        Parse options out of a request for setting the node configuration.
392        """
393        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
394        if form:
395            if form.formType in ('submit', 'cancel'):
396                self.options = form
397            else:
398                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
399        else:
400            raise BadRequest(text="Missing configuration form")
401
402
403    def _parse_configureOrNone(self, verbElement):
404        """
405        Parse optional node configuration form in create request.
406        """
407        for element in verbElement.parent.elements():
408            if element.uri == NS_PUBSUB and element.name == 'configure':
409                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
410                if form:
411                    if form.formType != 'submit':
412                        raise BadRequest(text=u"Unexpected form type '%s'" %
413                                              form.formType)
414                else:
415                    form = data_form.Form('submit',
416                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
417                self.options = form
418
419
420    def _render_configureOrNone(self, verbElement):
421        """
422        Render optional node configuration form in create request.
423        """
424        if self.options is not None:
425            configure = verbElement.parent.addElement('configure')
426            configure.addChild(self.options.toElement())
427
428
429    def _parse_itemIdentifiers(self, verbElement):
430        """
431        Parse item identifiers out of items and retract requests.
432        """
433        self.itemIdentifiers = []
434        for element in verbElement.elements():
435            if element.uri == NS_PUBSUB and element.name == 'item':
436                try:
437                    self.itemIdentifiers.append(element["id"])
438                except KeyError:
439                    raise BadRequest()
440
441
442    def _render_itemIdentifiers(self, verbElement):
443        """
444        Render item identifiers into items and retract requests.
445        """
446        if self.itemIdentifiers:
447            for itemIdentifier in self.itemIdentifiers:
448                item = verbElement.addElement('item')
449                item['id'] = itemIdentifier
450
451
452    def _parse_maxItems(self, verbElement):
453        """
454        Parse maximum items out of an items request.
455        """
456        value = verbElement.getAttribute('max_items')
457
458        if value:
459            try:
460                self.maxItems = int(value)
461            except ValueError:
462                raise BadRequest(text="Field max_items requires a positive " +
463                                      "integer value")
464
465
466    def _render_maxItems(self, verbElement):
467        """
468        Render maximum items into an items request.
469        """
470        if self.maxItems:
471            verbElement['max_items'] = unicode(self.maxItems)
472
473
474    def _parse_subidOrNone(self, verbElement):
475        """
476        Parse subscription identifier out of a request.
477        """
478        self.subscriptionIdentifier = verbElement.getAttribute("subid")
479
480
481    def _render_subidOrNone(self, verbElement):
482        """
483        Render subscription identifier into a request.
484        """
485        if self.subscriptionIdentifier:
486            verbElement['subid'] = self.subscriptionIdentifier
487
488
489    def _parse_options(self, verbElement):
490        """
491        Parse options form out of a subscription options request.
492        """
493        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
494        if form:
495            if form.formType in ('submit', 'cancel'):
496                self.options = form
497            else:
498                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
499        else:
500            raise BadRequest(text="Missing options form")
501
502
503
504    def _render_options(self, verbElement):
505        verbElement.addChild(self.options.toElement())
506
507
508    def _parse_optionsWithSubscribe(self, verbElement):
509        for element in verbElement.parent.elements():
510            if element.name == 'options' and element.uri == NS_PUBSUB:
511                form = data_form.findForm(element,
512                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
513                if form:
514                    if form.formType != 'submit':
515                        raise BadRequest(text=u"Unexpected form type '%s'" %
516                                              form.formType)
517                else:
518                    form = data_form.Form('submit',
519                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
520                self.options = form
521
522
523    def _render_optionsWithSubscribe(self, verbElement):
524        if self.options:
525            optionsElement = verbElement.parent.addElement('options')
526            self._render_options(optionsElement)
527
528
529    def parseElement(self, element):
530        """
531        Parse the publish-subscribe verb and parameters out of a request.
532        """
533        generic.Stanza.parseElement(self, element)
534
535        verbs = []
536        verbElements = []
537        for child in element.pubsub.elements():
538            key = (self.stanzaType, child.uri, child.name)
539            try:
540                verb = self._requestVerbMap[key]
541            except KeyError:
542                continue
543
544            verbs.append(verb)
545            verbElements.append(child)
546
547        if not verbs:
548            raise NotImplementedError()
549
550        if len(verbs) > 1:
551            if 'optionsSet' in verbs and 'subscribe' in verbs:
552                self.verb = 'subscribe'
553                verbElement = verbElements[verbs.index('subscribe')]
554            else:
555                raise NotImplementedError()
556        else:
557            self.verb = verbs[0]
558            verbElement = verbElements[0]
559
560        for parameter in self._parameters[self.verb]:
561            getattr(self, '_parse_%s' % parameter)(verbElement)
562
563
564
565    def send(self, xs):
566        """
567        Send this request to its recipient.
568
569        This renders all of the relevant parameters for this specific
570        requests into an L{IQ}, and invoke its C{send} method.
571        This returns a deferred that fires upon reception of a response. See
572        L{IQ} for details.
573
574        @param xs: The XML stream to send the request on.
575        @type xs: L{xmlstream.XmlStream}
576        @rtype: L{defer.Deferred}.
577        """
578
579        try:
580            (self.stanzaType,
581             childURI,
582             childName) = self._verbRequestMap[self.verb]
583        except KeyError:
584            raise NotImplementedError()
585
586        iq = IQ(xs, self.stanzaType)
587        iq.addElement((childURI, 'pubsub'))
588        verbElement = iq.pubsub.addElement(childName)
589
590        if self.sender:
591            iq['from'] = self.sender.full()
592        if self.recipient:
593            iq['to'] = self.recipient.full()
594
595        for parameter in self._parameters[self.verb]:
596            getattr(self, '_render_%s' % parameter)(verbElement)
597
598        return iq.send()
599
600
601
602class PubSubEvent(object):
603    """
604    A publish subscribe event.
605
606    @param sender: The entity from which the notification was received.
607    @type sender: L{jid.JID}
608    @param recipient: The entity to which the notification was sent.
609    @type recipient: L{wokkel.pubsub.ItemsEvent}
610    @param nodeIdentifier: Identifier of the node the event pertains to.
611    @type nodeIdentifier: C{unicode}
612    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
613    @type headers: L{dict}
614    """
615
616    def __init__(self, sender, recipient, nodeIdentifier, headers):
617        self.sender = sender
618        self.recipient = recipient
619        self.nodeIdentifier = nodeIdentifier
620        self.headers = headers
621
622
623
624class ItemsEvent(PubSubEvent):
625    """
626    A publish-subscribe event that signifies new, updated and retracted items.
627
628    @param items: List of received items as domish elements.
629    @type items: C{list} of L{domish.Element}
630    """
631
632    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
633        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
634        self.items = items
635
636
637
638class DeleteEvent(PubSubEvent):
639    """
640    A publish-subscribe event that signifies the deletion of a node.
641    """
642
643    redirectURI = None
644
645
646
647class PurgeEvent(PubSubEvent):
648    """
649    A publish-subscribe event that signifies the purging of a node.
650    """
651
652
653
654class PubSubClient(XMPPHandler):
655    """
656    Publish subscribe client protocol.
657    """
658
659    implements(IPubSubClient)
660
661    def connectionInitialized(self):
662        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
663                                   NS_PUBSUB_EVENT, self._onEvent)
664
665
666    def _onEvent(self, message):
667        if message.getAttribute('type') == 'error':
668            return
669
670        try:
671            sender = jid.JID(message["from"])
672            recipient = jid.JID(message["to"])
673        except KeyError:
674            return
675
676        actionElement = None
677        for element in message.event.elements():
678            if element.uri == NS_PUBSUB_EVENT:
679                actionElement = element
680
681        if not actionElement:
682            return
683
684        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
685
686        if eventHandler:
687            headers = shim.extractHeaders(message)
688            eventHandler(sender, recipient, actionElement, headers)
689            message.handled = True
690
691
692    def _onEvent_items(self, sender, recipient, action, headers):
693        nodeIdentifier = action["node"]
694
695        items = [element for element in action.elements()
696                         if element.name in ('item', 'retract')]
697
698        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
699        self.itemsReceived(event)
700
701
702    def _onEvent_delete(self, sender, recipient, action, headers):
703        nodeIdentifier = action["node"]
704        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
705        if action.redirect:
706            event.redirectURI = action.redirect.getAttribute('uri')
707        self.deleteReceived(event)
708
709
710    def _onEvent_purge(self, sender, recipient, action, headers):
711        nodeIdentifier = action["node"]
712        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
713        self.purgeReceived(event)
714
715
716    def itemsReceived(self, event):
717        pass
718
719
720    def deleteReceived(self, event):
721        pass
722
723
724    def purgeReceived(self, event):
725        pass
726
727
728    def createNode(self, service, nodeIdentifier=None, options=None,
729                         sender=None):
730        """
731        Create a publish subscribe node.
732
733        @param service: The publish subscribe service to create the node at.
734        @type service: L{JID}
735        @param nodeIdentifier: Optional suggestion for the id of the node.
736        @type nodeIdentifier: C{unicode}
737        @param options: Optional node configuration options.
738        @type options: C{dict}
739        """
740        request = PubSubRequest('create')
741        request.recipient = service
742        request.nodeIdentifier = nodeIdentifier
743        request.sender = sender
744
745        if options:
746            form = data_form.Form(formType='submit',
747                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
748            form.makeFields(options)
749            request.options = form
750
751        def cb(iq):
752            try:
753                new_node = iq.pubsub.create["node"]
754            except AttributeError:
755                # the suggested node identifier was accepted
756                new_node = nodeIdentifier
757            return new_node
758
759        d = request.send(self.xmlstream)
760        d.addCallback(cb)
761        return d
762
763
764    def deleteNode(self, service, nodeIdentifier, sender=None):
765        """
766        Delete a publish subscribe node.
767
768        @param service: The publish subscribe service to delete the node from.
769        @type service: L{JID}
770        @param nodeIdentifier: The identifier of the node.
771        @type nodeIdentifier: C{unicode}
772        """
773        request = PubSubRequest('delete')
774        request.recipient = service
775        request.nodeIdentifier = nodeIdentifier
776        request.sender = sender
777        return request.send(self.xmlstream)
778
779
780    def subscribe(self, service, nodeIdentifier, subscriber,
781                        options=None, sender=None):
782        """
783        Subscribe to a publish subscribe node.
784
785        @param service: The publish subscribe service that keeps the node.
786        @type service: L{JID}
787
788        @param nodeIdentifier: The identifier of the node.
789        @type nodeIdentifier: C{unicode}
790
791        @param subscriber: The entity to subscribe to the node. This entity
792            will get notifications of new published items.
793        @type subscriber: L{JID}
794
795        @param options: Subscription options.
796        @type options: C{dict}
797
798        @return: Deferred that fires with L{Subscription} or errbacks with
799            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
800        @rtype: L{defer.Deferred}
801        """
802        request = PubSubRequest('subscribe')
803        request.recipient = service
804        request.nodeIdentifier = nodeIdentifier
805        request.subscriber = subscriber
806        request.sender = sender
807
808        if options:
809            form = data_form.Form(formType='submit',
810                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
811            form.makeFields(options)
812            request.options = form
813
814        def cb(iq):
815            subscription = Subscription.fromElement(iq.pubsub.subscription)
816
817            if subscription.state == 'pending':
818                raise SubscriptionPending()
819            elif subscription.state == 'unconfigured':
820                raise SubscriptionUnconfigured()
821            else:
822                # we assume subscription == 'subscribed'
823                # any other value would be invalid, but that should have
824                # yielded a stanza error.
825                return subscription
826
827        d = request.send(self.xmlstream)
828        d.addCallback(cb)
829        return d
830
831
832    def unsubscribe(self, service, nodeIdentifier, subscriber,
833                          subscriptionIdentifier=None, sender=None):
834        """
835        Unsubscribe from a publish subscribe node.
836
837        @param service: The publish subscribe service that keeps the node.
838        @type service: L{JID}
839
840        @param nodeIdentifier: The identifier of the node.
841        @type nodeIdentifier: C{unicode}
842
843        @param subscriber: The entity to unsubscribe from the node.
844        @type subscriber: L{JID}
845
846        @param subscriptionIdentifier: Optional subscription identifier.
847        @type subscriptionIdentifier: C{unicode}
848        """
849        request = PubSubRequest('unsubscribe')
850        request.recipient = service
851        request.nodeIdentifier = nodeIdentifier
852        request.subscriber = subscriber
853        request.subscriptionIdentifier = subscriptionIdentifier
854        request.sender = sender
855        return request.send(self.xmlstream)
856
857
858    def publish(self, service, nodeIdentifier, items=None, sender=None):
859        """
860        Publish to a publish subscribe node.
861
862        @param service: The publish subscribe service that keeps the node.
863        @type service: L{JID}
864        @param nodeIdentifier: The identifier of the node.
865        @type nodeIdentifier: C{unicode}
866        @param items: Optional list of L{Item}s to publish.
867        @type items: C{list}
868        """
869        request = PubSubRequest('publish')
870        request.recipient = service
871        request.nodeIdentifier = nodeIdentifier
872        request.items = items
873        request.sender = sender
874        return request.send(self.xmlstream)
875
876
877    def items(self, service, nodeIdentifier, maxItems=None,
878              subscriptionIdentifier=None, sender=None):
879        """
880        Retrieve previously published items from a publish subscribe node.
881
882        @param service: The publish subscribe service that keeps the node.
883        @type service: L{JID}
884
885        @param nodeIdentifier: The identifier of the node.
886        @type nodeIdentifier: C{unicode}
887
888        @param maxItems: Optional limit on the number of retrieved items.
889        @type maxItems: C{int}
890
891        @param subscriptionIdentifier: Optional subscription identifier. In
892            case the node has been subscribed to multiple times, this narrows
893            the results to the specific subscription.
894        @type subscriptionIdentifier: C{unicode}
895        """
896        request = PubSubRequest('items')
897        request.recipient = service
898        request.nodeIdentifier = nodeIdentifier
899        if maxItems:
900            request.maxItems = str(int(maxItems))
901        request.subscriptionIdentifier = subscriptionIdentifier
902        request.sender = sender
903
904        def cb(iq):
905            items = []
906            for element in iq.pubsub.items.elements():
907                if element.uri == NS_PUBSUB and element.name == 'item':
908                    items.append(element)
909            return items
910
911        d = request.send(self.xmlstream)
912        d.addCallback(cb)
913        return d
914
915
916    def getOptions(self, service, nodeIdentifier, subscriber,
917                         subscriptionIdentifier=None, sender=None):
918        """
919        Get subscription options.
920
921        @param service: The publish subscribe service that keeps the node.
922        @type service: L{JID}
923
924        @param nodeIdentifier: The identifier of the node.
925        @type nodeIdentifier: C{unicode}
926
927        @param subscriber: The entity subscribed to the node.
928        @type subscriber: L{JID}
929
930        @param subscriptionIdentifier: Optional subscription identifier.
931        @type subscriptionIdentifier: C{unicode}
932
933        @rtype: L{data_form.Form}
934        """
935        request = PubSubRequest('optionsGet')
936        request.recipient = service
937        request.nodeIdentifier = nodeIdentifier
938        request.subscriber = subscriber
939        request.subscriptionIdentifier = subscriptionIdentifier
940        request.sender = sender
941
942        def cb(iq):
943            form = data_form.findForm(iq.pubsub.options,
944                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
945            form.typeCheck()
946            return form
947
948        d = request.send(self.xmlstream)
949        d.addCallback(cb)
950        return d
951
952
953    def setOptions(self, service, nodeIdentifier, subscriber,
954                         options, subscriptionIdentifier=None, sender=None):
955        """
956        Set subscription options.
957
958        @param service: The publish subscribe service that keeps the node.
959        @type service: L{JID}
960
961        @param nodeIdentifier: The identifier of the node.
962        @type nodeIdentifier: C{unicode}
963
964        @param subscriber: The entity subscribed to the node.
965        @type subscriber: L{JID}
966
967        @param options: Subscription options.
968        @type options: C{dict}.
969
970        @param subscriptionIdentifier: Optional subscription identifier.
971        @type subscriptionIdentifier: C{unicode}
972        """
973        request = PubSubRequest('optionsSet')
974        request.recipient = service
975        request.nodeIdentifier = nodeIdentifier
976        request.subscriber = subscriber
977        request.subscriptionIdentifier = subscriptionIdentifier
978        request.sender = sender
979
980        form = data_form.Form(formType='submit',
981                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
982        form.makeFields(options)
983        request.options = form
984
985        d = request.send(self.xmlstream)
986        return d
987
988
989
990class PubSubService(XMPPHandler, IQHandlerMixin):
991    """
992    Protocol implementation for a XMPP Publish Subscribe Service.
993
994    The word Service here is used as taken from the Publish Subscribe
995    specification. It is the party responsible for keeping nodes and their
996    subscriptions, and sending out notifications.
997
998    Methods from the L{IPubSubService} interface that are called as
999    a result of an XMPP request may raise exceptions. Alternatively the
1000    deferred returned by these methods may have their errback called. These are
1001    handled as follows:
1002
1003     - If the exception is an instance of L{error.StanzaError}, an error
1004       response iq is returned.
1005     - Any other exception is reported using L{log.msg}. An error response
1006       with the condition C{internal-server-error} is returned.
1007
1008    The default implementation of said methods raises an L{Unsupported}
1009    exception and are meant to be overridden.
1010
1011    @ivar discoIdentity: Service discovery identity as a dictionary with
1012                         keys C{'category'}, C{'type'} and C{'name'}.
1013    @ivar pubSubFeatures: List of supported publish-subscribe features for
1014                          service discovery, as C{str}.
1015    @type pubSubFeatures: C{list} or C{None}
1016    """
1017
1018    implements(IPubSubService, disco.IDisco)
1019
1020    iqHandlers = {
1021            '/*': '_onPubSubRequest',
1022            }
1023
1024    _legacyHandlers = {
1025        'publish': ('publish', ['sender', 'recipient',
1026                                'nodeIdentifier', 'items']),
1027        'subscribe': ('subscribe', ['sender', 'recipient',
1028                                    'nodeIdentifier', 'subscriber']),
1029        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
1030                                        'nodeIdentifier', 'subscriber']),
1031        'subscriptions': ('subscriptions', ['sender', 'recipient']),
1032        'affiliations': ('affiliations', ['sender', 'recipient']),
1033        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
1034        'getConfigurationOptions': ('getConfigurationOptions', []),
1035        'default': ('getDefaultConfiguration',
1036                    ['sender', 'recipient', 'nodeType']),
1037        'configureGet': ('getConfiguration', ['sender', 'recipient',
1038                                              'nodeIdentifier']),
1039        'configureSet': ('setConfiguration', ['sender', 'recipient',
1040                                              'nodeIdentifier', 'options']),
1041        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
1042                            'maxItems', 'itemIdentifiers']),
1043        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
1044                                'itemIdentifiers']),
1045        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
1046        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
1047    }
1048
1049    hideNodes = False
1050
1051    def __init__(self, resource=None):
1052        self.resource = resource
1053        self.discoIdentity = {'category': 'pubsub',
1054                              'type': 'service',
1055                              'name': 'Generic Publish-Subscribe Service'}
1056
1057        self.pubSubFeatures = []
1058
1059
1060    def connectionMade(self):
1061        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
1062
1063
1064    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
1065        def toInfo(nodeInfo):
1066            if not nodeInfo:
1067                return
1068
1069            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
1070            info.append(disco.DiscoIdentity('pubsub', nodeType))
1071            if metaData:
1072                form = data_form.Form(formType="result",
1073                                      formNamespace=NS_PUBSUB_META_DATA)
1074                form.addField(
1075                        data_form.Field(
1076                            var='pubsub#node_type',
1077                            value=nodeType,
1078                            label='The type of node (collection or leaf)'
1079                        )
1080                )
1081
1082                for metaDatum in metaData:
1083                    form.addField(data_form.Field.fromDict(metaDatum))
1084
1085                info.append(form)
1086
1087            return
1088
1089        info = []
1090
1091        request = PubSubRequest('discoInfo')
1092
1093        if self.resource is not None:
1094            resource = self.resource.locateResource(request)
1095            identity = resource.discoIdentity
1096            features = resource.features
1097            getInfo = resource.getInfo
1098        else:
1099            category = self.discoIdentity['category']
1100            idType = self.discoIdentity['type']
1101            name = self.discoIdentity['name']
1102            identity = disco.DiscoIdentity(category, idType, name)
1103            features = self.pubSubFeatures
1104            getInfo = self.getNodeInfo
1105
1106        if not nodeIdentifier:
1107            info.append(identity)
1108            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
1109            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
1110                         for feature in features])
1111
1112        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
1113        d.addCallback(toInfo)
1114        d.addErrback(log.err)
1115        d.addCallback(lambda _: info)
1116        return d
1117
1118
1119    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
1120        if self.hideNodes:
1121            d = defer.succeed([])
1122        elif self.resource is not None:
1123            request = PubSubRequest('discoInfo')
1124            resource = self.resource.locateResource(request)
1125            d = resource.getNodes(requestor, target, nodeIdentifier)
1126        elif nodeIdentifier:
1127            d = self.getNodes(requestor, target)
1128        else:
1129            d = defer.succeed([])
1130
1131        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
1132                                     for node in nodes])
1133        return d
1134
1135
1136    def _onPubSubRequest(self, iq):
1137        request = PubSubRequest.fromElement(iq)
1138
1139        if self.resource is not None:
1140            resource = self.resource.locateResource(request)
1141        else:
1142            resource = self
1143
1144        # Preprocess the request, knowing the handling resource
1145        try:
1146            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
1147        except AttributeError:
1148            pass
1149        else:
1150            request = preProcessor(resource, request)
1151            if request is None:
1152                return defer.succeed(None)
1153
1154        # Process the request itself,
1155        if resource is not self:
1156            try:
1157                handler = getattr(resource, request.verb)
1158            except AttributeError:
1159                # fix lookup feature
1160                text = "Request verb: %s" % request.verb
1161                return defer.fail(Unsupported('', text))
1162
1163            d = handler(request)
1164        else:
1165            handlerName, argNames = self._legacyHandlers[request.verb]
1166            handler = getattr(self, handlerName)
1167            args = [getattr(request, arg) for arg in argNames]
1168            if 'options' in argNames:
1169                args[argNames.index('options')] = request.options.getValues()
1170            d = handler(*args)
1171
1172        # If needed, translate the result into a response
1173        try:
1174            cb = getattr(self, '_toResponse_%s' % request.verb)
1175        except AttributeError:
1176            pass
1177        else:
1178            d.addCallback(cb, resource, request)
1179
1180        return d
1181
1182
1183    def _toResponse_subscribe(self, result, resource, request):
1184        response = domish.Element((NS_PUBSUB, "pubsub"))
1185        response.addChild(result.toElement())
1186        return response
1187
1188
1189    def _toResponse_subscriptions(self, result, resource, request):
1190        response = domish.Element((NS_PUBSUB, 'pubsub'))
1191        subscriptions = response.addElement('subscriptions')
1192        for subscription in result:
1193            subscriptions.addChild(subscription.toElement())
1194        return response
1195
1196
1197    def _toResponse_affiliations(self, result, resource, request):
1198        response = domish.Element((NS_PUBSUB, 'pubsub'))
1199        affiliations = response.addElement('affiliations')
1200
1201        for nodeIdentifier, affiliation in result:
1202            item = affiliations.addElement('affiliation')
1203            item['node'] = nodeIdentifier
1204            item['affiliation'] = affiliation
1205
1206        return response
1207
1208
1209    def _toResponse_create(self, result, resource, request):
1210        if not request.nodeIdentifier or request.nodeIdentifier != result:
1211            response = domish.Element((NS_PUBSUB, 'pubsub'))
1212            create = response.addElement('create')
1213            create['node'] = result
1214            return response
1215        else:
1216            return None
1217
1218
1219    def _formFromConfiguration(self, resource, values):
1220        fieldDefs = resource.getConfigurationOptions()
1221        form = data_form.Form(formType="form",
1222                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1223        form.makeFields(values, fieldDefs)
1224        return form
1225
1226
1227    def _checkConfiguration(self, resource, form):
1228        fieldDefs = resource.getConfigurationOptions()
1229        form.typeCheck(fieldDefs, filterUnknown=True)
1230
1231
1232    def _preProcess_create(self, resource, request):
1233        if request.options:
1234            self._checkConfiguration(resource, request.options)
1235        return request
1236
1237
1238    def _preProcess_default(self, resource, request):
1239        if request.nodeType not in ('leaf', 'collection'):
1240            raise error.StanzaError('not-acceptable')
1241        else:
1242            return request
1243
1244
1245    def _toResponse_default(self, options, resource, request):
1246        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1247        default = response.addElement("default")
1248        form = self._formFromConfiguration(resource, options)
1249        default.addChild(form.toElement())
1250        return response
1251
1252
1253    def _toResponse_configureGet(self, options, resource, request):
1254        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1255        configure = response.addElement("configure")
1256        form = self._formFromConfiguration(resource, options)
1257        configure.addChild(form.toElement())
1258
1259        if request.nodeIdentifier:
1260            configure["node"] = request.nodeIdentifier
1261
1262        return response
1263
1264
1265    def _preProcess_configureSet(self, resource, request):
1266        if request.options.formType == 'cancel':
1267            return None
1268        else:
1269            self._checkConfiguration(resource, request.options)
1270            return request
1271
1272
1273    def _toResponse_items(self, result, resource, request):
1274        response = domish.Element((NS_PUBSUB, 'pubsub'))
1275        items = response.addElement('items')
1276        items["node"] = request.nodeIdentifier
1277
1278        for item in result:
1279            items.addChild(item)
1280
1281        return response
1282
1283
1284    def _createNotification(self, eventType, service, nodeIdentifier,
1285                                  subscriber, subscriptions=None):
1286        headers = []
1287
1288        if subscriptions:
1289            for subscription in subscriptions:
1290                if nodeIdentifier != subscription.nodeIdentifier:
1291                    headers.append(('Collection', subscription.nodeIdentifier))
1292
1293        message = domish.Element((None, "message"))
1294        message["from"] = service.full()
1295        message["to"] = subscriber.full()
1296        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1297
1298        element = event.addElement(eventType)
1299        element["node"] = nodeIdentifier
1300
1301        if headers:
1302            message.addChild(shim.Headers(headers))
1303
1304        return message
1305
1306    # public methods
1307
1308    def notifyPublish(self, service, nodeIdentifier, notifications):
1309        for subscriber, subscriptions, items in notifications:
1310            message = self._createNotification('items', service,
1311                                               nodeIdentifier, subscriber,
1312                                               subscriptions)
1313            message.event.items.children = items
1314            self.send(message)
1315
1316
1317    def notifyDelete(self, service, nodeIdentifier, subscribers,
1318                           redirectURI=None):
1319        for subscriber in subscribers:
1320            message = self._createNotification('delete', service,
1321                                               nodeIdentifier,
1322                                               subscriber)
1323            if redirectURI:
1324                redirect = message.event.delete.addElement('redirect')
1325                redirect['uri'] = redirectURI
1326            self.send(message)
1327
1328
1329    def getNodeInfo(self, requestor, service, nodeIdentifier):
1330        return None
1331
1332
1333    def getNodes(self, requestor, service):
1334        return []
1335
1336
1337    def publish(self, requestor, service, nodeIdentifier, items):
1338        raise Unsupported('publish')
1339
1340
1341    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1342        raise Unsupported('subscribe')
1343
1344
1345    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1346        raise Unsupported('subscribe')
1347
1348
1349    def subscriptions(self, requestor, service):
1350        raise Unsupported('retrieve-subscriptions')
1351
1352
1353    def affiliations(self, requestor, service):
1354        raise Unsupported('retrieve-affiliations')
1355
1356
1357    def create(self, requestor, service, nodeIdentifier):
1358        raise Unsupported('create-nodes')
1359
1360
1361    def getConfigurationOptions(self):
1362        return {}
1363
1364
1365    def getDefaultConfiguration(self, requestor, service, nodeType):
1366        raise Unsupported('retrieve-default')
1367
1368
1369    def getConfiguration(self, requestor, service, nodeIdentifier):
1370        raise Unsupported('config-node')
1371
1372
1373    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1374        raise Unsupported('config-node')
1375
1376
1377    def items(self, requestor, service, nodeIdentifier, maxItems,
1378                    itemIdentifiers):
1379        raise Unsupported('retrieve-items')
1380
1381
1382    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1383        raise Unsupported('retract-items')
1384
1385
1386    def purge(self, requestor, service, nodeIdentifier):
1387        raise Unsupported('purge-nodes')
1388
1389
1390    def delete(self, requestor, service, nodeIdentifier):
1391        raise Unsupported('delete-nodes')
1392
1393
1394
1395class PubSubResource(object):
1396
1397    implements(IPubSubResource)
1398
1399    features = []
1400    discoIdentity = disco.DiscoIdentity('pubsub',
1401                                        'service',
1402                                        'Publish-Subscribe Service')
1403
1404
1405    def locateResource(self, request):
1406        return self
1407
1408
1409    def getInfo(self, requestor, service, nodeIdentifier):
1410        return defer.succeed(None)
1411
1412
1413    def getNodes(self, requestor, service, nodeIdentifier):
1414        return defer.succeed([])
1415
1416
1417    def getConfigurationOptions(self):
1418        return {}
1419
1420
1421    def publish(self, request):
1422        return defer.fail(Unsupported('publish'))
1423
1424
1425    def subscribe(self, request):
1426        return defer.fail(Unsupported('subscribe'))
1427
1428
1429    def unsubscribe(self, request):
1430        return defer.fail(Unsupported('subscribe'))
1431
1432
1433    def subscriptions(self, request):
1434        return defer.fail(Unsupported('retrieve-subscriptions'))
1435
1436
1437    def affiliations(self, request):
1438        return defer.fail(Unsupported('retrieve-affiliations'))
1439
1440
1441    def create(self, request):
1442        return defer.fail(Unsupported('create-nodes'))
1443
1444
1445    def default(self, request):
1446        return defer.fail(Unsupported('retrieve-default'))
1447
1448
1449    def configureGet(self, request):
1450        return defer.fail(Unsupported('config-node'))
1451
1452
1453    def configureSet(self, request):
1454        return defer.fail(Unsupported('config-node'))
1455
1456
1457    def items(self, request):
1458        return defer.fail(Unsupported('retrieve-items'))
1459
1460
1461    def retract(self, request):
1462        return defer.fail(Unsupported('retract-items'))
1463
1464
1465    def purge(self, request):
1466        return defer.fail(Unsupported('purge-nodes'))
1467
1468
1469    def delete(self, request):
1470        return defer.fail(Unsupported('delete-nodes'))
1471
1472
1473    def affiliationsGet(self, request):
1474        return defer.fail(Unsupported('modify-affiliations'))
1475
1476
1477    def affiliationsSet(self, request):
1478        return defer.fail(Unsupported('modify-affiliations'))
1479
1480
1481    def subscriptionsGet(self, request):
1482        return defer.fail(Unsupported('manage-subscriptions'))
1483
1484
1485    def subscriptionsSet(self, request):
1486        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.