source: wokkel/pubsub.py @ 83:81daaca3d75f

Last change on this file since 83:81daaca3d75f was 83:81daaca3d75f, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Add support for subscription options in subscribe requests

Fixes #63.

File size: 44.7 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and set, respectively
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol; all but the first are
# derived from the base namespace by appending a fragment.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching get and set iq stanzas that carry a pubsub element in
# either the regular or the owner namespace.
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/'
    'pubsub[@xmlns="%s" or @xmlns="%s"]' % (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised when a subscribe response reports the subscription state
    C{'pending'}.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised when a subscribe response reports the subscription state
    C{'unconfigured'}.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is attached as the application condition of the
    stanza error, as an element in the pubsub errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition, passed on to
                          L{error.StanzaError}.
        @param pubsubCondition: Name of the application condition element
                                created in the pubsub errors namespace.
        @param feature: Optional value for the C{'feature'} attribute of the
                        application condition element.
        @param text: Optional error text, passed on to L{error.StanzaError}.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{'bad-request'} condition.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of an application condition
                                element to create in the pubsub errors
                                namespace. If not given, no application
                                condition is attached.
        @param text: Optional error text, passed on to L{error.StanzaError}.
        """
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
84
85
86
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    This is a C{'feature-not-implemented'} stanza error with the pubsub
    condition C{'unsupported'}.

    @ivar feature: The name of the unsupported feature.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional error text.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                                   feature, text)

    def __str__(self):
        """
        Return the base error message with the feature name appended.
        """
        return PubSubError.__str__(self) + ', feature %r' % self.feature
99
100
class Subscription(object):
    """
    Record of a single subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary if none (or
                   a false value) were passed.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        if not options:
            # Each instance gets its own fresh dictionary.
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
119
120
121
class Item(domish.Element):
    """
    Publish subscribe item.

    This is a L{domish.Element} named C{item} in the publish-subscribe
    namespace, so it behaves like an object providing L{domish.IElement}.

    The payload can be passed to C{__init__} using the C{payload} keyword
    argument, or be added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child.
        if isinstance(payload, basestring):
            addPayload = self.addRawXml
        else:
            addPayload = self.addChild
        addPayload(payload)
149
150
151
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering are table driven: L{_requestVerbMap} maps the iq
    type and the child element of the C{pubsub} element to a verb, and
    L{_parameters} lists, per verb, the names of the parameter handlers. For
    a handler name C{x}, C{_parse_x} is used by L{parseElement} and
    C{_render_x} by L{send}.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted. Parsed requests carry a C{list}.
    @type itemIdentifiers: C{list}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.items()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': [],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: The type of publish-subscribe request, one of the values
                     of L{_requestVerbMap}. May be left C{None} when the
                     request is going to be filled in by L{parseElement}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with condition C{'nodeid-required'} if the
                           verbElement has no C{node} attribute.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no (or an empty) node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute results in the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        No C{node} attribute is rendered for an empty node identifier.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        The default of C{getAttribute} is used when the C{node} attribute is
        missing.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only child elements named C{item} in the pubsub namespace are
        collected; other children are ignored.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with condition C{'jid-required'} if the
                           verbElement has no C{jid} attribute.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is taken from the C{pubsub#node_type} field of a
        submitted node configuration form, and is C{'leaf'} otherwise.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing, or is
                           not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.

        The C{configure} element is looked up among the siblings of the
        verbElement. If it is present without a form, an empty form of type
        C{'submit'} is used.

        @raise BadRequest: if the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.

        The C{configure} element is added as a sibling of the verbElement.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item element has no C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: if the value cannot be converted to an integer.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                # NOTE(review): only convertibility to int is checked here;
                # zero and negative values are accepted despite the wording
                # of the error text.
                self.maxItems = int(value)
            except ValueError:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: if the subscription options form is missing, or
                           is not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")



    def _render_options(self, verbElement):
        """
        Render the options form into a subscription options request.
        """
        verbElement.addChild(self.options.toElement())


    def _parse_optionsWithSubscribe(self, verbElement):
        """
        Parse the optional subscription options that go with a subscribe
        request.

        The C{options} element is looked up among the siblings of the
        verbElement. If it is present without a form, an empty form of type
        C{'submit'} is used.

        @raise BadRequest: if the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.name == 'options' and element.uri == NS_PUBSUB:
                form = data_form.findForm(element,
                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
                if form:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
                self.options = form


    def _render_optionsWithSubscribe(self, verbElement):
        """
        Render the optional subscription options that go with a subscribe
        request.

        The C{options} element is added as a sibling of the verbElement.
        """
        if self.options:
            optionsElement = verbElement.parent.addElement('options')
            self._render_options(optionsElement)


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        All children of the C{pubsub} element that map to a known verb for
        the iq type of this request are collected. Exactly one such child
        is expected, except for the combination of C{subscribe} and
        C{options}, which is treated as a subscribe request with options.

        @raise NotImplementedError: if no known verb was found, or if the
                                    combination of verbs is not supported.
        """
        generic.Stanza.parseElement(self, element)

        verbs = []
        verbElements = []
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                # Not a verb element, e.g. the configure element that may
                # follow a create element.
                continue

            verbs.append(verb)
            verbElements.append(child)

        if not verbs:
            raise NotImplementedError()

        if len(verbs) > 1:
            if 'optionsSet' in verbs and 'subscribe' in verbs:
                self.verb = 'subscribe'
                verbElement = verbElements[verbs.index('subscribe')]
            else:
                raise NotImplementedError()
        else:
            self.verb = verbs[0]
            # Use the element that matched the verb, not whatever child
            # happened to come last in the pubsub element.
            verbElement = verbElements[0]

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)



    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: if the verb of this request is unknown.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        # Addressing is only rendered when set on this request.
        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
547
548
549
class PubSubEvent(object):
    """
    Base class for publish subscribe events.

    The constructor arguments are stored as attributes of the same name.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Where the event is about
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers

        # Addressing of the notification
        self.sender = sender
        self.recipient = recipient
569
570
571
class ItemsEvent(PubSubEvent):
    """
    Publish-subscribe event for new, updated and retracted items.

    Besides the attributes of L{PubSubEvent}, this carries the received
    items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender=sender,
                                   recipient=recipient,
                                   nodeIdentifier=nodeIdentifier,
                                   headers=headers)
583
584
585
class DeleteEvent(PubSubEvent):
    """
    Publish-subscribe event for the deletion of a node.

    @ivar redirectURI: Optional redirect URI that came with the
                       notification. C{None} unless set.
    """

    # Default for events that carry no redirect
    redirectURI = None
592
593
594
class PurgeEvent(PubSubEvent):
    """
    Publish-subscribe event for the purging of a node.

    This adds nothing to L{PubSubEvent}; the type itself conveys the kind of
    event.
    """
599
600
601
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden. The other public methods send requests to
    a publish-subscribe service and return a deferred that fires upon
    reception of the response.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Set up the observer for incoming event notifications.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        The handler is the method named C{_onEvent_<action>}, where the
        action is the name of the child of the C{event} element. Messages
        without addressing, without action element, or with an action there
        is no handler for, are left alone. Otherwise the message is marked
        as handled.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The last child in the event namespace determines the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Handle an items notification by calling L{itemsReceived}.

        Both C{item} and C{retract} children are passed on as items.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Handle a node deletion notification by calling L{deleteReceived}.

        If the notification has a C{redirect} child, its C{uri} attribute is
        set on the event as C{redirectURI}.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Handle a node purge notification by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called when an items notification has been received.

        Does nothing by default.

        @type event: L{ItemsEvent}
        """
        pass


    def deleteReceived(self, event):
        """
        Called when a node deletion notification has been received.

        Does nothing by default.

        @type event: L{DeleteEvent}
        """
        pass


    def purgeReceived(self, event):
        """
        Called when a node purge notification has been received.

        Does nothing by default.

        @type event: L{PurgeEvent}
        """
        pass


    def createNode(self, service, nodeIdentifier=None, options=None,
                         sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param options: Optional node configuration options.
        @type options: C{dict}
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @return: Deferred that fires with the identifier of the new node.
                 This is the identifier reported in the response, or the
                 suggested C{nodeIdentifier} if the response does not name
                 one.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        if options:
            form = data_form.Form(formType='submit',
                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
            form.makeFields(options)
            request.options = form

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, KeyError, TypeError):
                # The response does not name the node, so the suggested node
                # identifier was accepted. Missing child elements are
                # reported by domish as None, so depending on what is
                # missing (pubsub, create, or the node attribute) the lookup
                # above fails with a different exception.
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber,
                        options=None, sender=None):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @param options: Subscription options.
        @type options: C{dict}.
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @return: Deferred that fires with C{None} when subscribed. It fails
                 with L{SubscriptionPending} or L{SubscriptionUnconfigured}
                 if the response reports the corresponding subscription
                 state.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        if options:
            form = data_form.Form(formType='submit',
                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
            form.makeFields(options)
            request.options = form

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}
        @return: Deferred that fires with the list of item elements in the
                 response.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            # Going through int makes sure the value is numeric.
            request.maxItems = str(int(maxItems))
        request.sender = sender

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def getOptions(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Get subscription options.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity subscribed to the node.
        @type subscriber: L{JID}

        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}

        @return: Deferred that fires with the subscription options form from
                 the response, after its C{typeCheck} method has been
                 called.
        @rtype: L{data_form.Form}
        """
        request = PubSubRequest('optionsGet')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        def cb(iq):
            form = data_form.findForm(iq.pubsub.options,
                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
            form.typeCheck()
            return form

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def setOptions(self, service, nodeIdentifier, subscriber,
                         options, sender=None):
        """
        Set subscription options.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity subscribed to the node.
        @type subscriber: L{JID}

        @param options: Subscription options.
        @type options: C{dict}.

        @param sender: Optional sender address to use for the request.
        @type sender: L{JID}

        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('optionsSet')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        form = data_form.Form(formType='submit',
                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
        form.makeFields(options)
        request.options = form

        d = request.send(self.xmlstream)
        return d
900
901
902
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar resource: Object whose C{locateResource} method is asked for the
                    resource that handles each request. When C{None}, the
                    methods of this class listed in L{_legacyHandlers} are
                    called instead.
    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    @ivar hideNodes: When true, L{getDiscoItems} reports no items at all.
    """

    implements(IPubSubService)

    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    # Maps a request verb to the name of the method on this class that
    # handles it when no resource is set, along with the names of the
    # request attributes that are passed as positional arguments.
    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
    }

    hideNodes = False

    def __init__(self, resource=None):
        """
        @param resource: Optional object used to locate the resource that
                         handles requests. See the C{resource} attribute.
        """
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        """
        Start observing the XML stream for publish-subscribe requests.
        """
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery information for the service or a node.

        When C{nodeIdentifier} is empty, the identity of the service, the
        disco items feature and one feature per supported publish-subscribe
        feature are reported. Node information, if any, adds an identity
        for the node type and, when meta data is present, a meta data form.

        Failures while retrieving node information are logged; the
        information gathered so far is still returned.

        @param requestor: Passed on to the node information provider.
        @param target: Passed on to the node information provider.
        @param nodeIdentifier: The node being queried, or a false value for
                               the service itself.

        @return: Deferred that fires with a list of L{disco.DiscoIdentity},
                 L{disco.DiscoFeature} and L{data_form.Form} objects.
        @rtype: L{defer.Deferred}
        """
        def toInfo(nodeInfo, info):
            if not nodeInfo:
                return info

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                        data_form.Field(
                            var='pubsub#node_type',
                            value=nodeType,
                            label='The type of node (collection or leaf)'
                        )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return info

        info = []

        if self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            # discoIdentity is a dictionary; unpacking it directly would
            # yield its keys (in arbitrary order) instead of its values.
            identity = disco.DiscoIdentity(self.discoIdentity['category'],
                                           self.discoIdentity['type'],
                                           self.discoIdentity['name'])
            # pubSubFeatures is documented as possibly being None.
            features = self.pubSubFeatures or []
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        # The legacy getNodeInfo may return a plain value rather than a
        # deferred, so normalize the result.
        d = defer.maybeDeferred(getInfo, requestor, target,
                                nodeIdentifier or '')
        d.addCallback(toInfo, info)
        d.addErrback(log.err)
        # log.err returns None; always hand back the collected info.
        d.addCallback(lambda _: info)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery items: the nodes to be listed.

        No items are reported when L{hideNodes} is set. With a resource
        set, the located resource's C{getNodes} is asked. Otherwise the
        legacy L{getNodes} is used.

        @param requestor: Passed on to the node provider.
        @param target: Passed on to the node provider and used as the
                       entity of every reported item.
        @param nodeIdentifier: The node being queried, or a false value for
                               the service itself.

        @return: Deferred that fires with a list of L{disco.DiscoItem}.
        @rtype: L{defer.Deferred}
        """
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            d = resource.getNodes(requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            # NOTE(review): the legacy getNodes takes no node identifier,
            # yet is only consulted when one is given -- this condition
            # looks inverted; confirm the intended behaviour.
            # The legacy getNodes may return a plain list, so normalize.
            d = defer.maybeDeferred(self.getNodes, requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        """
        Parse an incoming request and dispatch it to its handler.

        If a C{_preProcess_<verb>} method exists, it is called first and may
        replace the request or return C{None} to stop processing. The
        request is then passed to the method named after the verb on the
        located resource or, without a resource, to the method of this
        class listed in L{_legacyHandlers}. If a C{_toResponse_<verb>}
        method exists, it is added as callback to render the result.

        @param iq: The request stanza.

        @return: Deferred that fires with the response payload or C{None}.
                 It fails with L{Unsupported} when there is no handler for
                 the verb.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                return defer.succeed(None)

        # Process the request itself,
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                # fix lookup feature
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                # Not every verb has a legacy handler; report this the same
                # way as a handler missing from a resource.
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            if 'options' in argNames:
                # Legacy handlers receive the form values, not the form.
                args[argNames.index('options')] = request.options.getValues()
            # Legacy handlers may raise or return plain values.
            d = defer.maybeDeferred(handler, *args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        """
        Render the subscription in C{result} as a subscribe response.

        The C{node} attribute is left out when the subscription has no
        node identifier.
        """
        response = domish.Element((NS_PUBSUB, "pubsub"))
        subscription = response.addElement("subscription")
        if result.nodeIdentifier:
            subscription["node"] = result.nodeIdentifier
        subscription["jid"] = result.subscriber.full()
        subscription["subscription"] = result.state
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        """
        Render the subscriptions in C{result} as a subscriptions response.

        As in L{_toResponse_subscribe}, the C{node} attribute is left out
        for subscriptions without a node identifier.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            item = subscriptions.addElement('subscription')
            if subscription.nodeIdentifier:
                item['node'] = subscription.nodeIdentifier
            item['jid'] = subscription.subscriber.full()
            item['subscription'] = subscription.state
        return response


    def _toResponse_affiliations(self, result, resource, request):
        """
        Render the C{(nodeIdentifier, affiliation)} pairs in C{result} as
        an affiliations response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        """
        Render the response to a node creation request.

        A payload naming the created node is only returned if the request
        had no node identifier or C{result} differs from it. Otherwise
        C{None} is returned.
        """
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        """
        Build a node configuration form of type C{form} from C{values},
        using the field definitions of C{resource}.
        """
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        """
        Type check C{form} against the field definitions of C{resource},
        with C{filterUnknown} enabled.
        """
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_create(self, resource, request):
        """
        Check the configuration options of a create request, if any.
        """
        if request.options:
            self._checkConfiguration(resource, request.options)
        return request


    def _preProcess_default(self, resource, request):
        """
        Only accept default configuration requests for the node types
        C{'leaf'} and C{'collection'}.

        @raise error.StanzaError: With condition C{not-acceptable} for any
                                  other node type.
        """
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        """
        Render default configuration C{options} as a form in a response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        """
        Render node configuration C{options} as a form in a response.

        The C{node} attribute is set when the request has a node
        identifier.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        """
        Check the submitted configuration form.

        @return: C{None} if the form is of type C{cancel}, which stops
                 further processing. Otherwise the request, after checking
                 its options.
        """
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        """
        Render the item elements in C{result} as an items response for the
        requested node.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        """
        Create a notification message stanza.

        The message goes from C{service} to C{subscriber} and holds an
        event with a single child element named C{eventType} for the node.
        For every subscription on a node other than C{nodeIdentifier}, a
        C{Collection} header with that subscription's node identifier is
        added.

        @return: The message element.
        """
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out an items notification for each entry in C{notifications}.

        @param service: The entity the notifications are sent from.
        @param nodeIdentifier: The identifier of the node published to.
        @param notifications: Iterable of C{(subscriber, subscriptions,
                              items)} tuples. The items become the children
                              of the C{items} element of the notification.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            message.event.items.children = items
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        """
        Send out a node deletion notification to each subscriber.

        @param service: The entity the notifications are sent from.
        @param nodeIdentifier: The identifier of the deleted node.
        @param subscribers: Iterable of the entities to notify.
        @param redirectURI: If given, added as the C{uri} attribute of a
                            C{redirect} element in the notification.
        """
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Legacy hook for node information. By default there is none.
        """
        return None


    def getNodes(self, requestor, service):
        """
        Legacy hook for the list of nodes. By default it is empty.
        """
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Legacy publish handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Legacy subscribe handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Legacy unsubscribe handler. Raises L{Unsupported} by default.
        """
        # NOTE(review): presumably intentional, as XEP-0060 appears to
        # cover unsubscribing under the 'subscribe' feature -- confirm.
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        """
        Legacy subscriptions handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        """
        Legacy affiliations handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        """
        Legacy node creation handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        """
        Legacy hook for node configuration field definitions. By default
        there are none.
        """
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        """
        Legacy default configuration handler. Raises L{Unsupported} by
        default.
        """
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Legacy handler for retrieving a node's configuration. Raises
        L{Unsupported} by default.
        """
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Legacy handler for changing a node's configuration. Raises
        L{Unsupported} by default.
        """
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Legacy item retrieval handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Legacy item retraction handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        """
        Legacy node purge handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        """
        Legacy node deletion handler. Raises L{Unsupported} by default.
        """
        raise Unsupported('delete-nodes')
1309
1310
1311
class PubSubResource(object):
    """
    Base L{IPubSubResource} implementation without any supported feature.

    The resource locates itself, reports no node information and no nodes,
    and has no configuration options. Every request handler returns a
    deferred that has already failed with L{Unsupported}, naming the
    publish-subscribe feature involved.

    @cvar features: The supported publish-subscribe features; empty here.
    @cvar discoIdentity: The service discovery identity of this resource.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    def locateResource(self, request):
        """
        Return this very resource, regardless of the request.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with C{None}: no node information.
        """
        noInfo = None
        return defer.succeed(noInfo)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with an empty list of nodes.
        """
        noNodes = []
        return defer.succeed(noNodes)


    def getConfigurationOptions(self):
        """
        Return the configuration field definitions: an empty dictionary.
        """
        return {}


    def publish(self, request):
        """
        Fail with L{Unsupported} for the C{publish} feature.
        """
        reason = Unsupported('publish')
        return defer.fail(reason)


    def subscribe(self, request):
        """
        Fail with L{Unsupported} for the C{subscribe} feature.
        """
        reason = Unsupported('subscribe')
        return defer.fail(reason)


    def unsubscribe(self, request):
        """
        Fail with L{Unsupported} for the C{subscribe} feature.
        """
        reason = Unsupported('subscribe')
        return defer.fail(reason)


    def subscriptions(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-subscriptions} feature.
        """
        reason = Unsupported('retrieve-subscriptions')
        return defer.fail(reason)


    def affiliations(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-affiliations} feature.
        """
        reason = Unsupported('retrieve-affiliations')
        return defer.fail(reason)


    def create(self, request):
        """
        Fail with L{Unsupported} for the C{create-nodes} feature.
        """
        reason = Unsupported('create-nodes')
        return defer.fail(reason)


    def default(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-default} feature.
        """
        reason = Unsupported('retrieve-default')
        return defer.fail(reason)


    def configureGet(self, request):
        """
        Fail with L{Unsupported} for the C{config-node} feature.
        """
        reason = Unsupported('config-node')
        return defer.fail(reason)


    def configureSet(self, request):
        """
        Fail with L{Unsupported} for the C{config-node} feature.
        """
        reason = Unsupported('config-node')
        return defer.fail(reason)


    def items(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-items} feature.
        """
        reason = Unsupported('retrieve-items')
        return defer.fail(reason)


    def retract(self, request):
        """
        Fail with L{Unsupported} for the C{retract-items} feature.
        """
        reason = Unsupported('retract-items')
        return defer.fail(reason)


    def purge(self, request):
        """
        Fail with L{Unsupported} for the C{purge-nodes} feature.
        """
        reason = Unsupported('purge-nodes')
        return defer.fail(reason)


    def delete(self, request):
        """
        Fail with L{Unsupported} for the C{delete-nodes} feature.
        """
        reason = Unsupported('delete-nodes')
        return defer.fail(reason)


    def affiliationsGet(self, request):
        """
        Fail with L{Unsupported} for the C{modify-affiliations} feature.
        """
        reason = Unsupported('modify-affiliations')
        return defer.fail(reason)


    def affiliationsSet(self, request):
        """
        Fail with L{Unsupported} for the C{modify-affiliations} feature.
        """
        reason = Unsupported('modify-affiliations')
        return defer.fail(reason)


    def subscriptionsGet(self, request):
        """
        Fail with L{Unsupported} for the C{manage-subscriptions} feature.
        """
        reason = Unsupported('manage-subscriptions')
        return defer.fail(reason)


    def subscriptionsSet(self, request):
        """
        Fail with L{Unsupported} for the C{manage-subscriptions} feature.
        """
        reason = Unsupported('manage-subscriptions')
        return defer.fail(reason)
Note: See TracBrowser for help on using the repository browser.