source: wokkel/pubsub.py @ 93:8f0856ddc616

Last change on this file since 93:8f0856ddc616 was 93:8f0856ddc616, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Add support for managing affiliations of Publish-Subscribe nodes, server-side.

File size: 48.9 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2011 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and set, respectively
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching any get or set iq that carries a pubsub child element
# in either the regular or the owner namespace
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/pubsub[@xmlns="%s" or @xmlns="%s"]' %
    (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    L{PubSubClient.subscribe} raises this from its response callback when
    the returned subscription state is C{'pending'}.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    L{PubSubClient.subscribe} raises this from its response callback when
    the returned subscription state is C{'unconfigured'}.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific condition.

    The publish-subscribe condition is attached as the application specific
    condition of the stanza error, as an element in the
    C{NS_PUBSUB_ERRORS} namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The generic stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
            condition element.
        @param feature: Optional value for the C{feature} attribute of the
            publish-subscribe condition element.
        @param text: Optional descriptive text.
        """
        specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            specific['feature'] = feature
        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=specific)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{'bad-request'} condition.

    Optionally carries a publish-subscribe specific condition in the
    C{NS_PUBSUB_ERRORS} namespace.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the publish-subscribe
            specific condition element.
        @param text: Optional descriptive text.
        """
        specific = (domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
                    if pubsubCondition else None)
        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=specific)
84
85
86
class Unsupported(PubSubError):
    """
    Error signalling that a publish-subscribe feature is not supported.

    This is a C{'feature-not-implemented'} stanza error with the
    publish-subscribe condition C{'unsupported'}.

    @ivar feature: The name of the unsupported feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional descriptive text.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)


    def __str__(self):
        """
        Return the base error description followed by the feature name.
        """
        return PubSubError.__str__(self) + ', feature %r' % self.feature
99
100
class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
        node is denoted by C{None}.
    @type nodeIdentifier: C{unicode}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{jid.JID}

    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @type state: C{unicode}

    @ivar options: Optional mapping of subscription options. Defaults to an
                   empty dictionary.
    @type options: C{dict}

    @ivar subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None,
                       subscriptionIdentifier=None):
        # A falsy options value is replaced by a fresh dictionary, so
        # instances never share a mutable default.
        self.options = options or {}
        self.subscriptionIdentifier = subscriptionIdentifier
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier


    @staticmethod
    def fromElement(element):
        """
        Create a subscription from its DOM representation.

        The C{node}, C{jid}, C{subscription} and C{subid} attributes are
        read from the element. Options are not parsed.

        @param element: The subscription element.
        @type element: L{domish.Element}
        @rtype: L{Subscription}
        """
        nodeIdentifier = element.getAttribute('node')
        subscriber = jid.JID(element.getAttribute('jid'))
        state = element.getAttribute('subscription')
        subscriptionIdentifier = element.getAttribute('subid')
        return Subscription(nodeIdentifier, subscriber, state,
                            subscriptionIdentifier=subscriptionIdentifier)


    def toElement(self):
        """
        Return the DOM representation of this subscription.

        The C{node} and C{subid} attributes are only set when the
        corresponding instance variables have a non-empty value.

        @rtype: L{domish.Element}
        """
        result = domish.Element((None, 'subscription'))
        if self.nodeIdentifier:
            result['node'] = self.nodeIdentifier
        result['jid'] = unicode(self.subscriber)
        result['subscription'] = self.state
        if self.subscriptionIdentifier:
            result['subid'] = self.subscriptionIdentifier
        return result
155
156
157
class Item(domish.Element):
    """
    Publish subscribe item.

    This behaves like an object providing L{domish.IElement}.

    The payload of the item can be passed using the C{payload} keyword
    argument to C{__init__}, or added afterwards using C{addChild} or
    C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
185
186
187
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified, as a dictionary of entity
                        (L{JID}) to affiliation (C{unicode}).
    @type affiliations: C{dict}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{list}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.items()))

    # Map request verb to parameter handler names. For every name listed
    # here, parseElement calls C{_parse_<name>} and send calls
    # C{_render_<name>}.
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': ['nodeOrEmpty', 'affiliations'],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: The type of publish-subscribe request, one of the keys
            of L{_parameters}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: If the C{node} attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: If no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute yields C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: If the C{jid} attribute is missing or does not
            hold a valid JID.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')
        except jid.InvalidFormat:
            # A malformed JID is the requester's mistake, so report it as a
            # bad request instead of letting the parse error escape.
            raise BadRequest(text="Invalid jid attribute")


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        Without a submitted node configuration form, or without a
        C{pubsub#node_type} value in it, the node type is C{'leaf'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _render_default(self, verbElement):
        """
        Render node type into a request for the default node configuration.

        This is the counterpart of L{_parse_default}: when a node type has
        been set, it is sent as the C{pubsub#node_type} field of a submitted
        node configuration form. Otherwise nothing is rendered.
        """
        if self.nodeType:
            form = data_form.Form('submit',
                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
            form.makeFields({'pubsub#node_type': self.nodeType})
            verbElement.addChild(form.toElement())


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: If the configuration form is missing or is not of
            type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _render_configure(self, verbElement):
        """
        Render the configuration form into a set node configuration request.

        @raise Exception: If no configuration form has been set in
            C{options}.
        """
        if self.options is None:
            raise Exception("Configuration form is required")

        verbElement.addChild(self.options.toElement())


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.

        The C{configure} element is a sibling of the verb element. If it is
        present without a form, an empty submit form is used.

        @raise BadRequest: If the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: If an item element lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: If the value is not an integer, or is negative.
        """
        value = verbElement.getAttribute('max_items')

        if not value:
            return

        message = "Field max_items requires a positive integer value"
        try:
            maxItems = int(value)
        except ValueError:
            raise BadRequest(text=message)

        # Negative limits are meaningless. Zero is still accepted, as it was
        # before this check was added.
        if maxItems < 0:
            raise BadRequest(text=message)

        self.maxItems = maxItems


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_subidOrNone(self, verbElement):
        """
        Parse subscription identifier out of a request.

        A missing C{subid} attribute yields C{None}.
        """
        self.subscriptionIdentifier = verbElement.getAttribute("subid")


    def _render_subidOrNone(self, verbElement):
        """
        Render subscription identifier into a request.
        """
        if self.subscriptionIdentifier:
            verbElement['subid'] = self.subscriptionIdentifier


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: If the options form is missing or is not of type
            C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")


    def _render_options(self, verbElement):
        """
        Render the options form into a subscription options request.
        """
        verbElement.addChild(self.options.toElement())


    def _parse_optionsWithSubscribe(self, verbElement):
        """
        Parse optional subscription options sent along a subscribe request.

        The C{options} element is a sibling of the verb element. If it is
        present without a form, an empty submit form is used.

        @raise BadRequest: If the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.name == 'options' and element.uri == NS_PUBSUB:
                form = data_form.findForm(element,
                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
                self.options = form


    def _render_optionsWithSubscribe(self, verbElement):
        """
        Render optional subscription options along a subscribe request.

        The form is put in an C{options} element next to the verb element.
        """
        if self.options is not None:
            optionsElement = verbElement.parent.addElement('options')
            self._render_options(optionsElement)


    def _parse_affiliations(self, verbElement):
        """
        Parse affiliations out of a request for modifying affiliations.

        The result is a dictionary that maps the bare JID of each entity to
        the requested affiliation.

        @raise BadRequest: If an affiliation element lacks the C{jid} or
            C{affiliation} attribute, holds an invalid JID, or if an entity
            is listed more than once.
        """
        self.affiliations = {}
        for element in verbElement.elements():
            if (element.uri == NS_PUBSUB_OWNER and
                element.name == 'affiliation'):
                try:
                    entity = jid.internJID(element['jid']).userhostJID()
                except KeyError:
                    raise BadRequest(text='Missing jid attribute')
                except jid.InvalidFormat:
                    raise BadRequest(text='Invalid jid attribute')

                if entity in self.affiliations:
                    raise BadRequest(text='Multiple affiliations for an entity')

                try:
                    affiliation = element['affiliation']
                except KeyError:
                    raise BadRequest(text='Missing affiliation attribute')

                self.affiliations[entity] = affiliation


    def _render_affiliations(self, verbElement):
        """
        Render affiliations into a request for modifying affiliations.

        This is the counterpart of L{_parse_affiliations}: every entry of
        the C{affiliations} dictionary becomes an C{affiliation} element in
        the owner namespace.
        """
        for entity, affiliation in (self.affiliations or {}).items():
            element = verbElement.addElement((NS_PUBSUB_OWNER, 'affiliation'))
            element['jid'] = entity.userhost()
            element['affiliation'] = affiliation


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        @param element: The iq stanza holding the request.
        @type element: L{domish.Element}

        @raise NotImplementedError: If the request holds no known verb, or
            a combination of verbs other than subscribe with options.
        @raise BadRequest: If a parameter is missing or malformed.
        """
        generic.Stanza.parseElement(self, element)

        verbs = []
        verbElements = []
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue

            verbs.append(verb)
            verbElements.append(child)

        if not verbs:
            raise NotImplementedError()

        if len(verbs) > 1:
            # The only supported combination is a subscribe request that
            # carries subscription options in the same stanza.
            if 'optionsSet' in verbs and 'subscribe' in verbs:
                self.verb = 'subscribe'
                verbElement = verbElements[verbs.index('subscribe')]
            else:
                raise NotImplementedError()
        else:
            self.verb = verbs[0]
            verbElement = verbElements[0]

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.

        @raise NotImplementedError: If C{verb} is not a known request verb.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
624
625
626
class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification
        self.sender, self.recipient = sender, recipient
        # What the notification is about
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
646
647
648
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event for new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
660
661
662
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: Optional URI taken from the C{redirect} element of
        the notification, C{None} if there was no such element.
    """

    # Default for notifications without redirect information
    redirectURI = None
669
670
671
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event signalling that a node has been purged.

    This adds nothing to L{PubSubEvent}; the type itself tells the events
    apart.
    """
676
677
678
679class PubSubClient(XMPPHandler):
680    """
681    Publish subscribe client protocol.
682    """
683
684    implements(IPubSubClient)
685
686    def connectionInitialized(self):
687        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
688                                   NS_PUBSUB_EVENT, self._onEvent)
689
690
691    def _onEvent(self, message):
692        if message.getAttribute('type') == 'error':
693            return
694
695        try:
696            sender = jid.JID(message["from"])
697            recipient = jid.JID(message["to"])
698        except KeyError:
699            return
700
701        actionElement = None
702        for element in message.event.elements():
703            if element.uri == NS_PUBSUB_EVENT:
704                actionElement = element
705
706        if not actionElement:
707            return
708
709        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
710
711        if eventHandler:
712            headers = shim.extractHeaders(message)
713            eventHandler(sender, recipient, actionElement, headers)
714            message.handled = True
715
716
717    def _onEvent_items(self, sender, recipient, action, headers):
718        nodeIdentifier = action["node"]
719
720        items = [element for element in action.elements()
721                         if element.name in ('item', 'retract')]
722
723        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
724        self.itemsReceived(event)
725
726
727    def _onEvent_delete(self, sender, recipient, action, headers):
728        nodeIdentifier = action["node"]
729        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
730        if action.redirect:
731            event.redirectURI = action.redirect.getAttribute('uri')
732        self.deleteReceived(event)
733
734
735    def _onEvent_purge(self, sender, recipient, action, headers):
736        nodeIdentifier = action["node"]
737        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
738        self.purgeReceived(event)
739
740
741    def itemsReceived(self, event):
742        pass
743
744
745    def deleteReceived(self, event):
746        pass
747
748
749    def purgeReceived(self, event):
750        pass
751
752
753    def createNode(self, service, nodeIdentifier=None, options=None,
754                         sender=None):
755        """
756        Create a publish subscribe node.
757
758        @param service: The publish subscribe service to create the node at.
759        @type service: L{JID}
760        @param nodeIdentifier: Optional suggestion for the id of the node.
761        @type nodeIdentifier: C{unicode}
762        @param options: Optional node configuration options.
763        @type options: C{dict}
764        """
765        request = PubSubRequest('create')
766        request.recipient = service
767        request.nodeIdentifier = nodeIdentifier
768        request.sender = sender
769
770        if options:
771            form = data_form.Form(formType='submit',
772                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
773            form.makeFields(options)
774            request.options = form
775
776        def cb(iq):
777            try:
778                new_node = iq.pubsub.create["node"]
779            except AttributeError:
780                # the suggested node identifier was accepted
781                new_node = nodeIdentifier
782            return new_node
783
784        d = request.send(self.xmlstream)
785        d.addCallback(cb)
786        return d
787
788
789    def deleteNode(self, service, nodeIdentifier, sender=None):
790        """
791        Delete a publish subscribe node.
792
793        @param service: The publish subscribe service to delete the node from.
794        @type service: L{JID}
795        @param nodeIdentifier: The identifier of the node.
796        @type nodeIdentifier: C{unicode}
797        """
798        request = PubSubRequest('delete')
799        request.recipient = service
800        request.nodeIdentifier = nodeIdentifier
801        request.sender = sender
802        return request.send(self.xmlstream)
803
804
805    def subscribe(self, service, nodeIdentifier, subscriber,
806                        options=None, sender=None):
807        """
808        Subscribe to a publish subscribe node.
809
810        @param service: The publish subscribe service that keeps the node.
811        @type service: L{JID}
812
813        @param nodeIdentifier: The identifier of the node.
814        @type nodeIdentifier: C{unicode}
815
816        @param subscriber: The entity to subscribe to the node. This entity
817            will get notifications of new published items.
818        @type subscriber: L{JID}
819
820        @param options: Subscription options.
821        @type options: C{dict}
822
823        @return: Deferred that fires with L{Subscription} or errbacks with
824            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
825        @rtype: L{defer.Deferred}
826        """
827        request = PubSubRequest('subscribe')
828        request.recipient = service
829        request.nodeIdentifier = nodeIdentifier
830        request.subscriber = subscriber
831        request.sender = sender
832
833        if options:
834            form = data_form.Form(formType='submit',
835                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
836            form.makeFields(options)
837            request.options = form
838
839        def cb(iq):
840            subscription = Subscription.fromElement(iq.pubsub.subscription)
841
842            if subscription.state == 'pending':
843                raise SubscriptionPending()
844            elif subscription.state == 'unconfigured':
845                raise SubscriptionUnconfigured()
846            else:
847                # we assume subscription == 'subscribed'
848                # any other value would be invalid, but that should have
849                # yielded a stanza error.
850                return subscription
851
852        d = request.send(self.xmlstream)
853        d.addCallback(cb)
854        return d
855
856
857    def unsubscribe(self, service, nodeIdentifier, subscriber,
858                          subscriptionIdentifier=None, sender=None):
859        """
860        Unsubscribe from a publish subscribe node.
861
862        @param service: The publish subscribe service that keeps the node.
863        @type service: L{JID}
864
865        @param nodeIdentifier: The identifier of the node.
866        @type nodeIdentifier: C{unicode}
867
868        @param subscriber: The entity to unsubscribe from the node.
869        @type subscriber: L{JID}
870
871        @param subscriptionIdentifier: Optional subscription identifier.
872        @type subscriptionIdentifier: C{unicode}
873        """
874        request = PubSubRequest('unsubscribe')
875        request.recipient = service
876        request.nodeIdentifier = nodeIdentifier
877        request.subscriber = subscriber
878        request.subscriptionIdentifier = subscriptionIdentifier
879        request.sender = sender
880        return request.send(self.xmlstream)
881
882
883    def publish(self, service, nodeIdentifier, items=None, sender=None):
884        """
885        Publish to a publish subscribe node.
886
887        @param service: The publish subscribe service that keeps the node.
888        @type service: L{JID}
889        @param nodeIdentifier: The identifier of the node.
890        @type nodeIdentifier: C{unicode}
891        @param items: Optional list of L{Item}s to publish.
892        @type items: C{list}
893        """
894        request = PubSubRequest('publish')
895        request.recipient = service
896        request.nodeIdentifier = nodeIdentifier
897        request.items = items
898        request.sender = sender
899        return request.send(self.xmlstream)
900
901
902    def items(self, service, nodeIdentifier, maxItems=None,
903              subscriptionIdentifier=None, sender=None):
904        """
905        Retrieve previously published items from a publish subscribe node.
906
907        @param service: The publish subscribe service that keeps the node.
908        @type service: L{JID}
909
910        @param nodeIdentifier: The identifier of the node.
911        @type nodeIdentifier: C{unicode}
912
913        @param maxItems: Optional limit on the number of retrieved items.
914        @type maxItems: C{int}
915
916        @param subscriptionIdentifier: Optional subscription identifier. In
917            case the node has been subscribed to multiple times, this narrows
918            the results to the specific subscription.
919        @type subscriptionIdentifier: C{unicode}
920        """
921        request = PubSubRequest('items')
922        request.recipient = service
923        request.nodeIdentifier = nodeIdentifier
924        if maxItems:
925            request.maxItems = str(int(maxItems))
926        request.subscriptionIdentifier = subscriptionIdentifier
927        request.sender = sender
928
929        def cb(iq):
930            items = []
931            for element in iq.pubsub.items.elements():
932                if element.uri == NS_PUBSUB and element.name == 'item':
933                    items.append(element)
934            return items
935
936        d = request.send(self.xmlstream)
937        d.addCallback(cb)
938        return d
939
940
941    def getOptions(self, service, nodeIdentifier, subscriber,
942                         subscriptionIdentifier=None, sender=None):
943        """
944        Get subscription options.
945
946        @param service: The publish subscribe service that keeps the node.
947        @type service: L{JID}
948
949        @param nodeIdentifier: The identifier of the node.
950        @type nodeIdentifier: C{unicode}
951
952        @param subscriber: The entity subscribed to the node.
953        @type subscriber: L{JID}
954
955        @param subscriptionIdentifier: Optional subscription identifier.
956        @type subscriptionIdentifier: C{unicode}
957
958        @rtype: L{data_form.Form}
959        """
960        request = PubSubRequest('optionsGet')
961        request.recipient = service
962        request.nodeIdentifier = nodeIdentifier
963        request.subscriber = subscriber
964        request.subscriptionIdentifier = subscriptionIdentifier
965        request.sender = sender
966
967        def cb(iq):
968            form = data_form.findForm(iq.pubsub.options,
969                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
970            form.typeCheck()
971            return form
972
973        d = request.send(self.xmlstream)
974        d.addCallback(cb)
975        return d
976
977
978    def setOptions(self, service, nodeIdentifier, subscriber,
979                         options, subscriptionIdentifier=None, sender=None):
980        """
981        Set subscription options.
982
983        @param service: The publish subscribe service that keeps the node.
984        @type service: L{JID}
985
986        @param nodeIdentifier: The identifier of the node.
987        @type nodeIdentifier: C{unicode}
988
989        @param subscriber: The entity subscribed to the node.
990        @type subscriber: L{JID}
991
992        @param options: Subscription options.
993        @type options: C{dict}.
994
995        @param subscriptionIdentifier: Optional subscription identifier.
996        @type subscriptionIdentifier: C{unicode}
997        """
998        request = PubSubRequest('optionsSet')
999        request.recipient = service
1000        request.nodeIdentifier = nodeIdentifier
1001        request.subscriber = subscriber
1002        request.subscriptionIdentifier = subscriptionIdentifier
1003        request.sender = sender
1004
1005        form = data_form.Form(formType='submit',
1006                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
1007        form.makeFields(options)
1008        request.options = form
1009
1010        d = request.send(self.xmlstream)
1011        return d
1012
1013
1014
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    @ivar resource: Optional object whose C{locateResource} method yields
                    the resource that handles a request. When C{None}, the
                    legacy handler methods on this instance are used.
    @ivar hideNodes: When true, L{getDiscoItems} reports no items.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService, disco.IDisco)

    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    # Legacy dispatch table, used when no resource has been set. Maps a
    # request verb to the name of the method on this instance that handles
    # it, and the names of the request attributes that are passed to that
    # method as positional arguments, in order.
    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
    }

    hideNodes = False

    def __init__(self, resource=None):
        """
        @param resource: Optional object used to locate the resource that
            handles each request. If C{None}, requests are dispatched to the
            legacy handler methods of this instance.
        """
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'service',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        """
        Start observing incoming publish-subscribe iq requests.
        """
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """
        Gather service discovery information for the service or a node.

        Without a node identifier, the service identity, the disco items
        feature and all publish-subscribe features are reported. In all
        cases the node information provider is consulted; if it yields node
        information, a node identity and (if present) a meta-data form are
        added.

        Failures of the node information provider are logged with
        L{log.err}; the information gathered so far is still returned.

        @return: A deferred that fires with a list of disco info objects.
        @rtype: L{defer.Deferred}
        """
        def toInfo(nodeInfo):
            # No node information means nothing to add.
            if not nodeInfo:
                return

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                        data_form.Field(
                            var='pubsub#node_type',
                            value=nodeType,
                            label='The type of node (collection or leaf)'
                        )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return

        info = []

        request = PubSubRequest('discoInfo')

        if self.resource is not None:
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            category = self.discoIdentity['category']
            idType = self.discoIdentity['type']
            name = self.discoIdentity['name']
            identity = disco.DiscoIdentity(category, idType, name)
            features = self.pubSubFeatures
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        # The provider may return a plain value or a deferred.
        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
        d.addCallback(toInfo)
        d.addErrback(log.err)
        d.addCallback(lambda _: info)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """
        Gather service discovery items (nodes) for the service or a node.

        @return: A deferred that fires with a list of L{disco.DiscoItem}s,
            one for each node identifier that was found.
        @rtype: L{defer.Deferred}
        """
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            # The provider may return a plain list or a deferred.
            d = defer.maybeDeferred(resource.getNodes,
                                    requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            # NOTE(review): the legacy path only lists nodes when a node
            # identifier was given, and nothing for the root. This looks
            # possibly inverted; left unchanged -- confirm intended
            # behaviour.
            #
            # The default getNodes returns a plain list, so it has to be
            # wrapped before callbacks can be added.
            d = defer.maybeDeferred(self.getNodes, requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        """
        Parse an incoming publish-subscribe request and dispatch it.

        The request is optionally preprocessed by a C{_preProcess_<verb>}
        method, then handled by either the located resource or a legacy
        handler on this instance, and finally its result is optionally
        translated into a response by a C{_toResponse_<verb>} method.

        A request verb for which no handler exists results in a failure
        with L{Unsupported}, both for resources and for legacy handlers.

        @return: A deferred that fires with the response payload, if any.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                # The preprocessor decided there is nothing left to do.
                return defer.succeed(None)

        # Process the request itself,
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                # fix lookup feature
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                # No legacy handler for this verb: report it the same way
                # as a resource that lacks the handler, rather than letting
                # the KeyError surface as an internal server error.
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            if 'options' in argNames:
                # Legacy handlers receive the form values, not the form.
                args[argNames.index('options')] = request.options.getValues()
            # Tolerate legacy handlers that return a plain value.
            d = defer.maybeDeferred(handler, *args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        """
        Wrap the element form of the resulting subscription in a pubsub
        element.
        """
        response = domish.Element((NS_PUBSUB, "pubsub"))
        response.addChild(result.toElement())
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        """
        Render an iterable of subscriptions as a C{subscriptions} response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            subscriptions.addChild(subscription.toElement())
        return response


    def _toResponse_affiliations(self, result, resource, request):
        """
        Render an iterable of C{(nodeIdentifier, affiliation)} pairs as an
        C{affiliations} response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        """
        Report the identifier of the created node, but only if it was not
        requested or differs from the requested one.
        """
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        """
        Build a node configuration form from values, using the field
        definitions provided by the resource.
        """
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        """
        Type check a configuration form against the field definitions
        provided by the resource, filtering out unknown fields.
        """
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_create(self, resource, request):
        """
        Check the configuration options of a create request, if any.
        """
        if request.options:
            self._checkConfiguration(resource, request.options)
        return request


    def _preProcess_default(self, resource, request):
        """
        Only accept default configuration requests for known node types.

        @raise error.StanzaError: With condition C{not-acceptable} if the
            node type is neither C{'leaf'} nor C{'collection'}.
        """
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        """
        Render default configuration options as a C{default} response in
        the owner namespace.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        """
        Render node configuration options as a C{configure} response in
        the owner namespace.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        """
        Drop cancelled configuration requests; check the form of others.

        @return: C{None} if the form was cancelled, so that no further
            processing takes place, otherwise the request.
        """
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        """
        Render an iterable of item elements as an C{items} response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        """
        Create the message stanza for an event notification.

        For every given subscription that is on a different node than the
        one the event is for, a C{Collection} header is added.

        @return: The message element, holding an event element with a
            child named after C{eventType}.
        """
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message


    def _toResponse_affiliationsGet(self, result, resource, request):
        """
        Render a mapping of entity to affiliation as an C{affiliations}
        response in the owner namespace.
        """
        response = domish.Element((NS_PUBSUB_OWNER, 'pubsub'))
        affiliations = response.addElement('affiliations')

        if request.nodeIdentifier:
            affiliations['node'] = request.nodeIdentifier

        # items() rather than iteritems(): same iteration, but portable.
        for entity, affiliation in result.items():
            item = affiliations.addElement('affiliation')
            item['jid'] = entity.full()
            item['affiliation'] = affiliation

        return response


    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out item notifications.

        @param notifications: Iterable of tuples of subscriber,
            subscriptions and the items to notify that subscriber of.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            message.event.items.children = items
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        """
        Send out node deletion notifications to all given subscribers.

        @param redirectURI: Optional URI that is included in a C{redirect}
            element of each notification.
        """
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    # Legacy handlers, used when no resource has been set. Apart from the
    # information providers that return empty results, the defaults report
    # the corresponding feature as unsupported. Meant to be overridden.

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Provide node information for service discovery; none by default.
        """
        return None


    def getNodes(self, requestor, service):
        """
        Provide node identifiers for service discovery; none by default.
        """
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        # NOTE(review): reports the 'subscribe' feature; XEP-0060 does not
        # appear to define a separate unsubscribe feature -- confirm.
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        """
        Provide node configuration field definitions; none by default.
        """
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
1433
1434
1435
class PubSubResource(object):
    """
    Base publish-subscribe resource.

    Locating a resource yields this object itself, the service discovery
    providers yield empty results, and every request handler returns a
    failed deferred reporting the corresponding feature as L{Unsupported}.

    @cvar features: Publish-subscribe features reported by this resource.
    @type features: C{list}
    @cvar discoIdentity: Service discovery identity of this resource.
    @type discoIdentity: L{disco.DiscoIdentity}
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    def _unsupported(self, feature):
        """
        Create a failed deferred that reports C{feature} as unsupported.
        """
        return defer.fail(Unsupported(feature))


    def locateResource(self, request):
        """
        Find the resource that handles the request: this one.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Provide node information: a deferred firing with C{None}.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Provide node identifiers: a deferred firing with an empty list.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Provide node configuration field definitions: an empty dictionary.
        """
        return {}


    def publish(self, request):
        return self._unsupported('publish')


    def subscribe(self, request):
        return self._unsupported('subscribe')


    def unsubscribe(self, request):
        return self._unsupported('subscribe')


    def subscriptions(self, request):
        return self._unsupported('retrieve-subscriptions')


    def affiliations(self, request):
        return self._unsupported('retrieve-affiliations')


    def create(self, request):
        return self._unsupported('create-nodes')


    def default(self, request):
        return self._unsupported('retrieve-default')


    def configureGet(self, request):
        return self._unsupported('config-node')


    def configureSet(self, request):
        return self._unsupported('config-node')


    def items(self, request):
        return self._unsupported('retrieve-items')


    def retract(self, request):
        return self._unsupported('retract-items')


    def purge(self, request):
        return self._unsupported('purge-nodes')


    def delete(self, request):
        return self._unsupported('delete-nodes')


    def affiliationsGet(self, request):
        return self._unsupported('modify-affiliations')


    def affiliationsSet(self, request):
        return self._unsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        return self._unsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        return self._unsupported('manage-subscriptions')
Note: See TracBrowser for help on using the repository browser.