source: wokkel/pubsub.py @ 82:f965c6c5da15

Last change on this file since 82:f965c6c5da15 was 82:f965c6c5da15, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Add support for configuration options in node creation requests

File size: 41.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and of type set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Publish-subscribe namespaces, all derived from the base namespace
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching get and set iq stanzas that carry a pubsub element
# in either the base or the owner namespace
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/'
    'pubsub[@xmlns="%s" or @xmlns="%s"]' % (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    Raised when the requested subscription is pending acceptance.

    L{PubSubClient.subscribe} raises this from its response callback when
    the service reports the subscription state C{'pending'}.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    Raised when the requested subscription needs to be configured before
    becoming active.

    L{PubSubClient.subscribe} raises this from its response callback when
    the service reports the subscription state C{'unconfigured'}.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The publish-subscribe condition is attached as the application specific
    condition of the stanza error, as an element in the publish-subscribe
    errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        publish-subscribe condition element.
        @param text: Optional text, passed on to L{error.StanzaError}.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        # The feature attribute is only rendered when a feature was given.
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition,
                                   appCondition=pubsubElement,
                                   text=text)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the condition C{'bad-request'}.

    An optional publish-subscribe specific condition is attached as the
    application specific condition.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the publish-subscribe
                                specific condition element.
        @param text: Optional text, passed on to L{error.StanzaError}.
        """
        # Without a publish-subscribe condition, no application condition
        # is passed along.
        pubsubElement = None
        if pubsubCondition:
            pubsubElement = domish.Element((NS_PUBSUB_ERRORS,
                                            pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request',
                                   appCondition=pubsubElement,
                                   text=text)
84
85
86
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    This is a C{'feature-not-implemented'} stanza error with the
    publish-subscribe condition C{'unsupported'}.

    @ivar feature: The feature that is not supported.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: The feature that is not supported.
        @param text: Optional text, passed on to L{PubSubError}.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)


    def __str__(self):
        """
        Return the L{PubSubError} description followed by the feature.
        """
        featureSuffix = ', feature %r' % self.feature
        return PubSubError.__str__(self) + featureSuffix
99
100
class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary when no options
                   were given.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        if not options:
            # A missing or empty value is replaced by a fresh dictionary.
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
119
120
121
class Item(domish.Element):
    """
    Publish subscribe item.

    This is a L{domish.Element} named C{'item'} in the publish-subscribe
    namespace.

    A payload can be passed using the C{payload} keyword argument to
    C{__init__}, or added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Serialized XML is added as raw XML; anything else is added as a
        # child.
        if isinstance(payload, basestring):
            addPayload = self.addRawXml
        else:
            addPayload = self.addChild
        addPayload(payload)
149
150
151
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering are driven by L{_parameters}: for each request verb
    it lists the names of the parameter handlers, which are looked up as
    C{_parse_<name>} and C{_render_<name>} methods.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{set}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configuration options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    #
    # NOTE(review): this class defines no _render_default, _render_configure
    # or _render_options method, so unless they are provided elsewhere,
    # sending a 'default', 'configureSet' or 'optionsSet' request fails with
    # an AttributeError -- confirm whether rendering those is intended.
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': [],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: Optional type of publish-subscribe request. See
                     L{_requestVerbMap}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with condition C{'nodeid-required'} if the
                           C{node} attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute results in the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        An empty node identifier is not rendered.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute results in C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only child elements named C{'item'} in the publish-subscribe
        namespace are collected.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with condition C{'jid-required'} if the C{jid}
                           attribute is missing.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is C{'leaf'} unless a submitted node configuration
        form provides a value for C{pubsub#node_type}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the configuration form is missing or is not of
                           type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.

        The C{configure} element is a sibling of the verbElement. If it is
        present without a form, an empty form of type C{'submit'} is used.

        @raise BadRequest: if a form is present that is not of type
                           C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form:
                    if form.formType == 'submit':
                        self.options = form
                    else:
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                    self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.

        The C{configure} element is added next to the verbElement, as a
        child of its parent.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item element lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        @raise BadRequest: if the C{max_items} attribute is not an integer.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                self.maxItems = int(value)
            except ValueError:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: if the options form is missing or is not of type
                           C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        The first child of the C{pubsub} element that maps to a known verb
        for the stanza type determines the verb. The parameter handlers for
        that verb are then called with this child element.

        @param element: The iq stanza holding the request.
        @type element: L{domish.Element}
        @raise NotImplementedError: if none of the children of the C{pubsub}
                                    element maps to a known verb.
        """
        generic.Stanza.parseElement(self, element)

        # Track the match explicitly instead of relying on the loop
        # variables after the loop: those are unbound when there are no
        # children, and a verb preset through the constructor must not mask
        # the absence of a recognised verb in this element.
        verb = None
        verbElement = None
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue
            else:
                verbElement = child
                break

        if verb is None:
            raise NotImplementedError()

        self.verb = verb

        for parameter in self._parameters[verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        request into an L{IQ}, and invokes its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: if the verb of this request is unknown.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
511
512
513
class PubSubEvent(object):
    """
    A publish subscribe event.

    All arguments are stored as instance variables of the same name.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        self.sender, self.recipient = sender, recipient
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
533
534
535
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies new, updated and retracted items.

    The remaining arguments are those of L{PubSubEvent}.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self,
                             sender=sender,
                             recipient=recipient,
                             nodeIdentifier=nodeIdentifier,
                             headers=headers)
547
548
549
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: The value of the C{uri} attribute of the C{redirect}
                       element of the notification, as set by
                       L{PubSubClient}. C{None} if there was no such
                       element.
    """

    # Class-level default for notifications without a redirect element.
    redirectURI = None
556
557
558
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.

    This adds nothing to L{PubSubEvent}; the class itself identifies the
    kind of event.
    """
563
564
565
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are passed to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden. The other public methods send requests to a
    publish subscribe service and return a deferred for the response.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that hold a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        The handler is the method named C{_onEvent_<name>}, with C{<name>}
        the name of the action element. Messages without sender, recipient,
        action element or matching handler are ignored.

        @param message: The message stanza holding the event.
        @type message: L{domish.Element}
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The last child in the event namespace is taken as the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Pass an L{ItemsEvent} for an items notification to L{itemsReceived}.

        The event holds the C{item} and C{retract} children of the action.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Pass a L{DeleteEvent} for a delete notification to L{deleteReceived}.

        If the action has a C{redirect} child, its C{uri} attribute is
        stored in the C{redirectURI} attribute of the event.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Pass a L{PurgeEvent} for a purge notification to L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} for each items notification.

        Does nothing by default.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} for each delete notification.

        Does nothing by default.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} for each purge notification.

        Does nothing by default.
        """
        pass


    def createNode(self, service, nodeIdentifier=None, options=None,
                         sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param options: Optional node configuration options.
        @type options: C{dict}
        @param sender: Optional sender of the request.
        @return: A deferred that fires with the identifier of the node: the
                 one reported in the response, or else the suggested one.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        if options:
            form = data_form.Form(formType='submit',
                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
            form.makeFields(options)
            request.options = form

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response has no pubsub element (AttributeError), no
                # create element (TypeError, as a missing child is C{None})
                # or no node attribute (KeyError): the suggested node
                # identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender of the request.
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @param sender: Optional sender of the request.
        @return: A deferred that fires with C{None} when subscribed. It
                 fails with L{SubscriptionPending} or
                 L{SubscriptionUnconfigured} if the service reports the
                 respective subscription state.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @param sender: Optional sender of the request.
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @param sender: Optional sender of the request.
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @param sender: Optional sender of the request.
        @return: A deferred that fires with the list of C{item} elements in
                 the response.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            # Keep the documented C{int} type on the request; the conversion
            # to text is done when the request is rendered.
            request.maxItems = int(maxItems)
        request.sender = sender

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d
791
792
793
794class PubSubService(XMPPHandler, IQHandlerMixin):
795    """
796    Protocol implementation for a XMPP Publish Subscribe Service.
797
798    The word Service here is used as taken from the Publish Subscribe
799    specification. It is the party responsible for keeping nodes and their
800    subscriptions, and sending out notifications.
801
802    Methods from the L{IPubSubService} interface that are called as
803    a result of an XMPP request may raise exceptions. Alternatively the
804    deferred returned by these methods may have their errback called. These are
805    handled as follows:
806
807     - If the exception is an instance of L{error.StanzaError}, an error
808       response iq is returned.
809     - Any other exception is reported using L{log.msg}. An error response
810       with the condition C{internal-server-error} is returned.
811
812    The default implementation of said methods raises an L{Unsupported}
813    exception and are meant to be overridden.
814
815    @ivar discoIdentity: Service discovery identity as a dictionary with
816                         keys C{'category'}, C{'type'} and C{'name'}.
817    @ivar pubSubFeatures: List of supported publish-subscribe features for
818                          service discovery, as C{str}.
819    @type pubSubFeatures: C{list} or C{None}
820    """
821
822    implements(IPubSubService)
823
824    iqHandlers = {
825            '/*': '_onPubSubRequest',
826            }
827
828    _legacyHandlers = {
829        'publish': ('publish', ['sender', 'recipient',
830                                'nodeIdentifier', 'items']),
831        'subscribe': ('subscribe', ['sender', 'recipient',
832                                    'nodeIdentifier', 'subscriber']),
833        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
834                                        'nodeIdentifier', 'subscriber']),
835        'subscriptions': ('subscriptions', ['sender', 'recipient']),
836        'affiliations': ('affiliations', ['sender', 'recipient']),
837        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
838        'getConfigurationOptions': ('getConfigurationOptions', []),
839        'default': ('getDefaultConfiguration',
840                    ['sender', 'recipient', 'nodeType']),
841        'configureGet': ('getConfiguration', ['sender', 'recipient',
842                                              'nodeIdentifier']),
843        'configureSet': ('setConfiguration', ['sender', 'recipient',
844                                              'nodeIdentifier', 'options']),
845        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
846                            'maxItems', 'itemIdentifiers']),
847        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
848                                'itemIdentifiers']),
849        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
850        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
851    }
852
853    hideNodes = False
854
855    def __init__(self, resource=None):
856        self.resource = resource
857        self.discoIdentity = {'category': 'pubsub',
858                              'type': 'generic',
859                              'name': 'Generic Publish-Subscribe Service'}
860
861        self.pubSubFeatures = []
862
863
864    def connectionMade(self):
865        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
866
867
868    def getDiscoInfo(self, requestor, target, nodeIdentifier):
869        def toInfo(nodeInfo, info):
870            if not nodeInfo:
871                return info
872
873            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
874            info.append(disco.DiscoIdentity('pubsub', nodeType))
875            if metaData:
876                form = data_form.Form(formType="result",
877                                      formNamespace=NS_PUBSUB_META_DATA)
878                form.addField(
879                        data_form.Field(
880                            var='pubsub#node_type',
881                            value=nodeType,
882                            label='The type of node (collection or leaf)'
883                        )
884                )
885
886                for metaDatum in metaData:
887                    form.addField(data_form.Field.fromDict(metaDatum))
888
889                info.append(form)
890
891            return info
892
893        info = []
894
895        request = PubSubRequest('discoInfo')
896
897        if self.resource is not None:
898            resource = self.resource.locateResource(request)
899            identity = resource.discoIdentity
900            features = resource.features
901            getInfo = resource.getInfo
902        else:
903            category, idType, name = self.discoIdentity
904            identity = disco.DiscoIdentity(category, idType, name)
905            features = self.pubSubFeatures
906            getInfo = self.getNodeInfo
907
908        if not nodeIdentifier:
909            info.append(identity)
910            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
911            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
912                         for feature in features])
913
914        d = getInfo(requestor, target, nodeIdentifier or '')
915        d.addCallback(toInfo, info)
916        d.addErrback(log.err)
917        return d
918
919
920    def getDiscoItems(self, requestor, target, nodeIdentifier):
921        if self.hideNodes:
922            d = defer.succeed([])
923        elif self.resource is not None:
924            request = PubSubRequest('discoInfo')
925            resource = self.resource.locateResource(request)
926            d = resource.getNodes(requestor, target, nodeIdentifier)
927        elif nodeIdentifier:
928            d = self.getNodes(requestor, target)
929        else:
930            d = defer.succeed([])
931
932        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
933                                     for node in nodes])
934        return d
935
936
937    def _onPubSubRequest(self, iq):
938        request = PubSubRequest.fromElement(iq)
939
940        if self.resource is not None:
941            resource = self.resource.locateResource(request)
942        else:
943            resource = self
944
945        # Preprocess the request, knowing the handling resource
946        try:
947            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
948        except AttributeError:
949            pass
950        else:
951            request = preProcessor(resource, request)
952            if request is None:
953                return defer.succeed(None)
954
955        # Process the request itself,
956        if resource is not self:
957            try:
958                handler = getattr(resource, request.verb)
959            except AttributeError:
960                # fix lookup feature
961                text = "Request verb: %s" % request.verb
962                return defer.fail(Unsupported('', text))
963
964            d = handler(request)
965        else:
966            handlerName, argNames = self._legacyHandlers[request.verb]
967            handler = getattr(self, handlerName)
968            args = [getattr(request, arg) for arg in argNames]
969            if 'options' in argNames:
970                args[argNames.index('options')] = request.options.getValues()
971            d = handler(*args)
972
973        # If needed, translate the result into a response
974        try:
975            cb = getattr(self, '_toResponse_%s' % request.verb)
976        except AttributeError:
977            pass
978        else:
979            d.addCallback(cb, resource, request)
980
981        return d
982
983
984    def _toResponse_subscribe(self, result, resource, request):
985        response = domish.Element((NS_PUBSUB, "pubsub"))
986        subscription = response.addElement("subscription")
987        if result.nodeIdentifier:
988            subscription["node"] = result.nodeIdentifier
989        subscription["jid"] = result.subscriber.full()
990        subscription["subscription"] = result.state
991        return response
992
993
994    def _toResponse_subscriptions(self, result, resource, request):
995        response = domish.Element((NS_PUBSUB, 'pubsub'))
996        subscriptions = response.addElement('subscriptions')
997        for subscription in result:
998            item = subscriptions.addElement('subscription')
999            item['node'] = subscription.nodeIdentifier
1000            item['jid'] = subscription.subscriber.full()
1001            item['subscription'] = subscription.state
1002        return response
1003
1004
1005    def _toResponse_affiliations(self, result, resource, request):
1006        response = domish.Element((NS_PUBSUB, 'pubsub'))
1007        affiliations = response.addElement('affiliations')
1008
1009        for nodeIdentifier, affiliation in result:
1010            item = affiliations.addElement('affiliation')
1011            item['node'] = nodeIdentifier
1012            item['affiliation'] = affiliation
1013
1014        return response
1015
1016
1017    def _toResponse_create(self, result, resource, request):
1018        if not request.nodeIdentifier or request.nodeIdentifier != result:
1019            response = domish.Element((NS_PUBSUB, 'pubsub'))
1020            create = response.addElement('create')
1021            create['node'] = result
1022            return response
1023        else:
1024            return None
1025
1026
1027    def _formFromConfiguration(self, resource, values):
1028        fieldDefs = resource.getConfigurationOptions()
1029        form = data_form.Form(formType="form",
1030                              formNamespace=NS_PUBSUB_NODE_CONFIG)
1031        form.makeFields(values, fieldDefs)
1032        return form
1033
1034
1035    def _checkConfiguration(self, resource, form):
1036        fieldDefs = resource.getConfigurationOptions()
1037        form.typeCheck(fieldDefs, filterUnknown=True)
1038
1039
1040    def _preProcess_create(self, resource, request):
1041        if request.options:
1042            self._checkConfiguration(resource, request.options)
1043        return request
1044
1045
1046    def _preProcess_default(self, resource, request):
1047        if request.nodeType not in ('leaf', 'collection'):
1048            raise error.StanzaError('not-acceptable')
1049        else:
1050            return request
1051
1052
1053    def _toResponse_default(self, options, resource, request):
1054        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1055        default = response.addElement("default")
1056        form = self._formFromConfiguration(resource, options)
1057        default.addChild(form.toElement())
1058        return response
1059
1060
1061    def _toResponse_configureGet(self, options, resource, request):
1062        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1063        configure = response.addElement("configure")
1064        form = self._formFromConfiguration(resource, options)
1065        configure.addChild(form.toElement())
1066
1067        if request.nodeIdentifier:
1068            configure["node"] = request.nodeIdentifier
1069
1070        return response
1071
1072
1073    def _preProcess_configureSet(self, resource, request):
1074        if request.options.formType == 'cancel':
1075            return None
1076        else:
1077            self._checkConfiguration(resource, request.options)
1078            return request
1079
1080
1081    def _toResponse_items(self, result, resource, request):
1082        response = domish.Element((NS_PUBSUB, 'pubsub'))
1083        items = response.addElement('items')
1084        items["node"] = request.nodeIdentifier
1085
1086        for item in result:
1087            items.addChild(item)
1088
1089        return response
1090
1091
1092    def _createNotification(self, eventType, service, nodeIdentifier,
1093                                  subscriber, subscriptions=None):
1094        headers = []
1095
1096        if subscriptions:
1097            for subscription in subscriptions:
1098                if nodeIdentifier != subscription.nodeIdentifier:
1099                    headers.append(('Collection', subscription.nodeIdentifier))
1100
1101        message = domish.Element((None, "message"))
1102        message["from"] = service.full()
1103        message["to"] = subscriber.full()
1104        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1105
1106        element = event.addElement(eventType)
1107        element["node"] = nodeIdentifier
1108
1109        if headers:
1110            message.addChild(shim.Headers(headers))
1111
1112        return message
1113
1114    # public methods
1115
1116    def notifyPublish(self, service, nodeIdentifier, notifications):
1117        for subscriber, subscriptions, items in notifications:
1118            message = self._createNotification('items', service,
1119                                               nodeIdentifier, subscriber,
1120                                               subscriptions)
1121            message.event.items.children = items
1122            self.send(message)
1123
1124
1125    def notifyDelete(self, service, nodeIdentifier, subscribers,
1126                           redirectURI=None):
1127        for subscriber in subscribers:
1128            message = self._createNotification('delete', service,
1129                                               nodeIdentifier,
1130                                               subscriber)
1131            if redirectURI:
1132                redirect = message.event.delete.addElement('redirect')
1133                redirect['uri'] = redirectURI
1134            self.send(message)
1135
1136
1137    def getNodeInfo(self, requestor, service, nodeIdentifier):
1138        return None
1139
1140
1141    def getNodes(self, requestor, service):
1142        return []
1143
1144
1145    def publish(self, requestor, service, nodeIdentifier, items):
1146        raise Unsupported('publish')
1147
1148
1149    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1150        raise Unsupported('subscribe')
1151
1152
1153    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1154        raise Unsupported('subscribe')
1155
1156
1157    def subscriptions(self, requestor, service):
1158        raise Unsupported('retrieve-subscriptions')
1159
1160
1161    def affiliations(self, requestor, service):
1162        raise Unsupported('retrieve-affiliations')
1163
1164
1165    def create(self, requestor, service, nodeIdentifier):
1166        raise Unsupported('create-nodes')
1167
1168
1169    def getConfigurationOptions(self):
1170        return {}
1171
1172
1173    def getDefaultConfiguration(self, requestor, service, nodeType):
1174        raise Unsupported('retrieve-default')
1175
1176
1177    def getConfiguration(self, requestor, service, nodeIdentifier):
1178        raise Unsupported('config-node')
1179
1180
1181    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1182        raise Unsupported('config-node')
1183
1184
1185    def items(self, requestor, service, nodeIdentifier, maxItems,
1186                    itemIdentifiers):
1187        raise Unsupported('retrieve-items')
1188
1189
1190    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1191        raise Unsupported('retract-items')
1192
1193
1194    def purge(self, requestor, service, nodeIdentifier):
1195        raise Unsupported('purge-nodes')
1196
1197
1198    def delete(self, requestor, service, nodeIdentifier):
1199        raise Unsupported('delete-nodes')
1200
1201
1202
def _failUnsupported(feature):
    """
    Create a deferred that has failed with L{Unsupported} for C{feature}.
    """
    return defer.fail(Unsupported(feature))



class PubSubResource(object):
    """
    Publish-subscribe resource that supports none of the features.

    Every request handler returns a deferred that fails with L{Unsupported}
    for the corresponding feature. No node information, nodes or
    configuration options are reported.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    def locateResource(self, request):
        """
        Return the resource to handle C{request}: this resource itself.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that fires with C{None}: no node information.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that fires with an empty list: no nodes.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Return an empty dictionary: no configuration options.
        """
        return {}


    def publish(self, request):
        """
        Fail with L{Unsupported} for the C{publish} feature.
        """
        return _failUnsupported('publish')


    def subscribe(self, request):
        """
        Fail with L{Unsupported} for the C{subscribe} feature.
        """
        return _failUnsupported('subscribe')


    def unsubscribe(self, request):
        """
        Fail with L{Unsupported}, naming the C{subscribe} feature.
        """
        return _failUnsupported('subscribe')


    def subscriptions(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-subscriptions} feature.
        """
        return _failUnsupported('retrieve-subscriptions')


    def affiliations(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-affiliations} feature.
        """
        return _failUnsupported('retrieve-affiliations')


    def create(self, request):
        """
        Fail with L{Unsupported} for the C{create-nodes} feature.
        """
        return _failUnsupported('create-nodes')


    def default(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-default} feature.
        """
        return _failUnsupported('retrieve-default')


    def configureGet(self, request):
        """
        Fail with L{Unsupported} for the C{config-node} feature.
        """
        return _failUnsupported('config-node')


    def configureSet(self, request):
        """
        Fail with L{Unsupported} for the C{config-node} feature.
        """
        return _failUnsupported('config-node')


    def items(self, request):
        """
        Fail with L{Unsupported} for the C{retrieve-items} feature.
        """
        return _failUnsupported('retrieve-items')


    def retract(self, request):
        """
        Fail with L{Unsupported} for the C{retract-items} feature.
        """
        return _failUnsupported('retract-items')


    def purge(self, request):
        """
        Fail with L{Unsupported} for the C{purge-nodes} feature.
        """
        return _failUnsupported('purge-nodes')


    def delete(self, request):
        """
        Fail with L{Unsupported} for the C{delete-nodes} feature.
        """
        return _failUnsupported('delete-nodes')


    def affiliationsGet(self, request):
        """
        Fail with L{Unsupported} for the C{modify-affiliations} feature.
        """
        return _failUnsupported('modify-affiliations')


    def affiliationsSet(self, request):
        """
        Fail with L{Unsupported} for the C{modify-affiliations} feature.
        """
        return _failUnsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        """
        Fail with L{Unsupported} for the C{manage-subscriptions} feature.
        """
        return _failUnsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        """
        Fail with L{Unsupported} for the C{manage-subscriptions} feature.
        """
        return _failUnsupported('manage-subscriptions')
Note: See TracBrowser for help on using the repository browser.