source: wokkel/pubsub.py @ 59:e984452207e0

Last change on this file since 59:e984452207e0 was 59:e984452207e0, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Provide PubSubResource?, modeled after Twisted Web resources.

Author: ralphm.
Fixes #47.

This should make it rather easy to make publish-subscribe enabled services.

File size: 41.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error, xmlstream
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
22from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
23
24# Iq get and set XPath queries
25IQ_GET = '/iq[@type="get"]'
26IQ_SET = '/iq[@type="set"]'
27
28# Publish-subscribe namespaces
29NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
30NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
31NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
32NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
33NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
34NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
35NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
36
37# XPath to match pubsub requests
38PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
39                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
40                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
41
42class SubscriptionPending(Exception):
43    """
44    Raised when the requested subscription is pending acceptance.
45    """
46
47
48
49class SubscriptionUnconfigured(Exception):
50    """
51    Raised when the requested subscription needs to be configured before
52    becoming active.
53    """
54
55
56
57class PubSubError(error.StanzaError):
58    """
59    Exception with publish-subscribe specific condition.
60    """
61    def __init__(self, condition, pubsubCondition, feature=None, text=None):
62        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
63        if feature:
64            appCondition['feature'] = feature
65        error.StanzaError.__init__(self, condition,
66                                         text=text,
67                                         appCondition=appCondition)
68
69
70
71class BadRequest(error.StanzaError):
72    """
73    Bad request stanza error.
74    """
75    def __init__(self, pubsubCondition=None, text=None):
76        if pubsubCondition:
77            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
78        else:
79            appCondition = None
80        error.StanzaError.__init__(self, 'bad-request',
81                                         text=text,
82                                         appCondition=appCondition)
83
84
85
86class Unsupported(PubSubError):
87    def __init__(self, feature, text=None):
88        self.feature = feature
89        PubSubError.__init__(self, 'feature-not-implemented',
90                                   'unsupported',
91                                   feature,
92                                   text)
93
94    def __str__(self):
95        message = PubSubError.__str__(self)
96        message += ', feature %r' % self.feature
97        return message
98
99
100class Subscription(object):
101    """
102    A subscription to a node.
103
104    @ivar nodeIdentifier: The identifier of the node subscribed to.
105                          The root node is denoted by C{None}.
106    @ivar subscriber: The subscribing entity.
107    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
108                 C{'unconfigured'}.
109    @ivar options: Optional list of subscription options.
110    @type options: C{dict}.
111    """
112
113    def __init__(self, nodeIdentifier, subscriber, state, options=None):
114        self.nodeIdentifier = nodeIdentifier
115        self.subscriber = subscriber
116        self.state = state
117        self.options = options or {}
118
119
120
121class Item(domish.Element):
122    """
123    Publish subscribe item.
124
125    This behaves like an object providing L{domish.IElement}.
126
127    Item payload can be added using C{addChild} or C{addRawXml}, or using the
128    C{payload} keyword argument to C{__init__}.
129    """
130
131    def __init__(self, id=None, payload=None):
132        """
133        @param id: optional item identifier
134        @type id: L{unicode}
135        @param payload: optional item payload. Either as a domish element, or
136                        as serialized XML.
137        @type payload: object providing L{domish.IElement} or L{unicode}.
138        """
139
140        domish.Element.__init__(self, (NS_PUBSUB, 'item'))
141        if id is not None:
142            self['id'] = id
143        if payload is not None:
144            if isinstance(payload, basestring):
145                self.addRawXml(payload)
146            else:
147                self.addChild(payload)
148
149
150
151class PubSubRequest(generic.Stanza):
152    """
153    A publish-subscribe request.
154
155    The set of instance variables used depends on the type of request. If
156    a variable is not applicable or not passed in the request, its value is
157    C{None}.
158
159    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
160    @type verb: C{str}.
161
162    @ivar affiliations: Affiliations to be modified.
163    @type affiliations: C{set}
164    @ivar items: The items to be published, as L{domish.Element}s.
165    @type items: C{list}
166    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
167                           retracted.
168    @type itemIdentifiers: C{set}
169    @ivar maxItems: Maximum number of items to retrieve.
170    @type maxItems: C{int}.
171    @ivar nodeIdentifier: Identifier of the node the request is about.
172    @type nodeIdentifier: C{unicode}
173    @ivar nodeType: The type of node that should be created, or for which the
174                    configuration is retrieved. C{'leaf'} or C{'collection'}.
175    @type nodeType: C{str}
176    @ivar options: Configurations options for nodes, subscriptions and publish
177                   requests.
178    @type options: L{data_form.Form}
179    @ivar subscriber: The subscribing entity.
180    @type subscriber: L{JID}
181    @ivar subscriptionIdentifier: Identifier for a specific subscription.
182    @type subscriptionIdentifier: C{unicode}
183    @ivar subscriptions: Subscriptions to be modified, as a set of
184                         L{Subscription}.
185    @type subscriptions: C{set}
186    """
187
188    verb = None
189
190    affiliations = None
191    items = None
192    itemIdentifiers = None
193    maxItems = None
194    nodeIdentifier = None
195    nodeType = None
196    options = None
197    subscriber = None
198    subscriptionIdentifier = None
199    subscriptions = None
200
201    # Map request iq type and subelement name to request verb
202    _requestVerbMap = {
203        ('set', NS_PUBSUB, 'publish'): 'publish',
204        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
205        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
206        ('get', NS_PUBSUB, 'options'): 'optionsGet',
207        ('set', NS_PUBSUB, 'options'): 'optionsSet',
208        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
209        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
210        ('set', NS_PUBSUB, 'create'): 'create',
211        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
212        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
213        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
214        ('get', NS_PUBSUB, 'items'): 'items',
215        ('set', NS_PUBSUB, 'retract'): 'retract',
216        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
217        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
218        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
219        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
220        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
221        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
222    }
223
224    # Map request verb to request iq type and subelement name
225    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
226
227    # Map request verb to parameter handler names
228    _parameters = {
229        'publish': ['node', 'items'],
230        'subscribe': ['nodeOrEmpty', 'jid'],
231        'unsubscribe': ['nodeOrEmpty', 'jid'],
232        'optionsGet': ['nodeOrEmpty', 'jid'],
233        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
234        'subscriptions': [],
235        'affiliations': [],
236        'create': ['nodeOrNone'],
237        'default': ['default'],
238        'configureGet': ['nodeOrEmpty'],
239        'configureSet': ['nodeOrEmpty', 'configure'],
240        'items': ['node', 'maxItems', 'itemIdentifiers'],
241        'retract': ['node', 'itemIdentifiers'],
242        'purge': ['node'],
243        'delete': ['node'],
244        'affiliationsGet': ['nodeOrEmpty'],
245        'affiliationsSet': [],
246        'subscriptionsGet': ['nodeOrEmpty'],
247        'subscriptionsSet': [],
248    }
249
250    def __init__(self, verb=None):
251        self.verb = verb
252
253
254    @staticmethod
255    def _findForm(element, formNamespace):
256        """
257        Find a Data Form.
258
259        Look for an element that represents a Data Form with the specified
260        form namespace as a child element of the given element.
261        """
262        if not element:
263            return None
264
265        form = None
266        for child in element.elements():
267            try:
268                form = data_form.Form.fromElement(child)
269            except data_form.Error:
270                continue
271
272            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
273                continue
274
275        return form
276
277
278    def _parse_node(self, verbElement):
279        """
280        Parse the required node identifier out of the verbElement.
281        """
282        try:
283            self.nodeIdentifier = verbElement["node"]
284        except KeyError:
285            raise BadRequest('nodeid-required')
286
287
288    def _render_node(self, verbElement):
289        """
290        Render the required node identifier on the verbElement.
291        """
292        if not self.nodeIdentifier:
293            raise Exception("Node identifier is required")
294
295        verbElement['node'] = self.nodeIdentifier
296
297
298    def _parse_nodeOrEmpty(self, verbElement):
299        """
300        Parse the node identifier out of the verbElement. May be empty.
301        """
302        self.nodeIdentifier = verbElement.getAttribute("node", '')
303
304
305    def _render_nodeOrEmpty(self, verbElement):
306        """
307        Render the node identifier on the verbElement. May be empty.
308        """
309        if self.nodeIdentifier:
310            verbElement['node'] = self.nodeIdentifier
311
312
313    def _parse_nodeOrNone(self, verbElement):
314        """
315        Parse the optional node identifier out of the verbElement.
316        """
317        self.nodeIdentifier = verbElement.getAttribute("node")
318
319
320    def _render_nodeOrNone(self, verbElement):
321        """
322        Render the optional node identifier on the verbElement.
323        """
324        if self.nodeIdentifier:
325            verbElement['node'] = self.nodeIdentifier
326
327
328    def _parse_items(self, verbElement):
329        """
330        Parse items out of the verbElement for publish requests.
331        """
332        self.items = []
333        for element in verbElement.elements():
334            if element.uri == NS_PUBSUB and element.name == 'item':
335                self.items.append(element)
336
337
338    def _render_items(self, verbElement):
339        """
340        Render items into the verbElement for publish requests.
341        """
342        if self.items:
343            for item in self.items:
344                verbElement.addChild(item)
345
346
347    def _parse_jid(self, verbElement):
348        """
349        Parse subscriber out of the verbElement for un-/subscribe requests.
350        """
351        try:
352            self.subscriber = jid.internJID(verbElement["jid"])
353        except KeyError:
354            raise BadRequest('jid-required')
355
356
357    def _render_jid(self, verbElement):
358        """
359        Render subscriber into the verbElement for un-/subscribe requests.
360        """
361        verbElement['jid'] = self.subscriber.full()
362
363
364    def _parse_default(self, verbElement):
365        """
366        Parse node type out of a request for the default node configuration.
367        """
368        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
369        if form and form.formType == 'submit':
370            values = form.getValues()
371            self.nodeType = values.get('pubsub#node_type', 'leaf')
372        else:
373            self.nodeType = 'leaf'
374
375
376    def _parse_configure(self, verbElement):
377        """
378        Parse options out of a request for setting the node configuration.
379        """
380        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
381        if form:
382            if form.formType == 'submit':
383                self.options = form.getValues()
384            elif form.formType == 'cancel':
385                self.options = {}
386            else:
387                raise BadRequest(text="Unexpected form type %r" % form.formType)
388        else:
389            raise BadRequest(text="Missing configuration form")
390
391
392
393    def _parse_itemIdentifiers(self, verbElement):
394        """
395        Parse item identifiers out of items and retract requests.
396        """
397        self.itemIdentifiers = []
398        for element in verbElement.elements():
399            if element.uri == NS_PUBSUB and element.name == 'item':
400                try:
401                    self.itemIdentifiers.append(element["id"])
402                except KeyError:
403                    raise BadRequest()
404
405
406    def _render_itemIdentifiers(self, verbElement):
407        """
408        Render item identifiers into items and retract requests.
409        """
410        if self.itemIdentifiers:
411            for itemIdentifier in self.itemIdentifiers:
412                item = verbElement.addElement('item')
413                item['id'] = itemIdentifier
414
415
416    def _parse_maxItems(self, verbElement):
417        """
418        Parse maximum items out of an items request.
419        """
420        value = verbElement.getAttribute('max_items')
421
422        if value:
423            try:
424                self.maxItems = int(value)
425            except ValueError:
426                raise BadRequest(text="Field max_items requires a positive " +
427                                      "integer value")
428
429
430    def _render_maxItems(self, verbElement):
431        """
432        Parse maximum items into an items request.
433        """
434        if self.maxItems:
435            verbElement['max_items'] = unicode(self.maxItems)
436
437
438    def _parse_options(self, verbElement):
439        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
440        if form:
441            if form.formType == 'submit':
442                self.options = form.getValues()
443            elif form.formType == 'cancel':
444                self.options = {}
445            else:
446                raise BadRequest(text="Unexpected form type %r" % form.formType)
447        else:
448            raise BadRequest(text="Missing options form")
449
450    def parseElement(self, element):
451        """
452        Parse the publish-subscribe verb and parameters out of a request.
453        """
454        generic.Stanza.parseElement(self, element)
455
456        for child in element.pubsub.elements():
457            key = (self.stanzaType, child.uri, child.name)
458            try:
459                verb = self._requestVerbMap[key]
460            except KeyError:
461                continue
462            else:
463                self.verb = verb
464                break
465
466        if not self.verb:
467            raise NotImplementedError()
468
469        for parameter in self._parameters[verb]:
470            getattr(self, '_parse_%s' % parameter)(child)
471
472
473    def send(self, xs):
474        """
475        Send this request to its recipient.
476
477        This renders all of the relevant parameters for this specific
478        requests into an L{xmlstream.IQ}, and invoke its C{send} method.
479        This returns a deferred that fires upon reception of a response. See
480        L{xmlstream.IQ} for details.
481
482        @param xs: The XML stream to send the request on.
483        @type xs: L{xmlstream.XmlStream}
484        @rtype: L{defer.Deferred}.
485        """
486
487        try:
488            (self.stanzaType,
489             childURI,
490             childName) = self._verbRequestMap[self.verb]
491        except KeyError:
492            raise NotImplementedError()
493
494        iq = xmlstream.IQ(xs, self.stanzaType)
495        iq.addElement((childURI, 'pubsub'))
496        verbElement = iq.pubsub.addElement(childName)
497
498        if self.sender:
499            iq['from'] = self.sender.full()
500        if self.recipient:
501            iq['to'] = self.recipient.full()
502
503        for parameter in self._parameters[self.verb]:
504            getattr(self, '_render_%s' % parameter)(verbElement)
505
506        return iq.send()
507
508
509
510class PubSubEvent(object):
511    """
512    A publish subscribe event.
513
514    @param sender: The entity from which the notification was received.
515    @type sender: L{jid.JID}
516    @param recipient: The entity to which the notification was sent.
517    @type recipient: L{wokkel.pubsub.ItemsEvent}
518    @param nodeIdentifier: Identifier of the node the event pertains to.
519    @type nodeIdentifier: C{unicode}
520    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
521    @type headers: L{dict}
522    """
523
524    def __init__(self, sender, recipient, nodeIdentifier, headers):
525        self.sender = sender
526        self.recipient = recipient
527        self.nodeIdentifier = nodeIdentifier
528        self.headers = headers
529
530
531
532class ItemsEvent(PubSubEvent):
533    """
534    A publish-subscribe event that signifies new, updated and retracted items.
535
536    @param items: List of received items as domish elements.
537    @type items: C{list} of L{domish.Element}
538    """
539
540    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
541        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
542        self.items = items
543
544
545
546class DeleteEvent(PubSubEvent):
547    """
548    A publish-subscribe event that signifies the deletion of a node.
549    """
550
551    redirectURI = None
552
553
554
555class PurgeEvent(PubSubEvent):
556    """
557    A publish-subscribe event that signifies the purging of a node.
558    """
559
560
561
562class PubSubClient(XMPPHandler):
563    """
564    Publish subscribe client protocol.
565    """
566
567    implements(IPubSubClient)
568
569    def connectionInitialized(self):
570        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
571                                   NS_PUBSUB_EVENT, self._onEvent)
572
573
574    def _onEvent(self, message):
575        try:
576            sender = jid.JID(message["from"])
577            recipient = jid.JID(message["to"])
578        except KeyError:
579            return
580
581        actionElement = None
582        for element in message.event.elements():
583            if element.uri == NS_PUBSUB_EVENT:
584                actionElement = element
585
586        if not actionElement:
587            return
588
589        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
590
591        if eventHandler:
592            headers = shim.extractHeaders(message)
593            eventHandler(sender, recipient, actionElement, headers)
594            message.handled = True
595
596
597    def _onEvent_items(self, sender, recipient, action, headers):
598        nodeIdentifier = action["node"]
599
600        items = [element for element in action.elements()
601                         if element.name in ('item', 'retract')]
602
603        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
604        self.itemsReceived(event)
605
606
607    def _onEvent_delete(self, sender, recipient, action, headers):
608        nodeIdentifier = action["node"]
609        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
610        if action.redirect:
611            event.redirectURI = action.redirect.getAttribute('uri')
612        self.deleteReceived(event)
613
614
615    def _onEvent_purge(self, sender, recipient, action, headers):
616        nodeIdentifier = action["node"]
617        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
618        self.purgeReceived(event)
619
620
621    def itemsReceived(self, event):
622        pass
623
624
625    def deleteReceived(self, event):
626        pass
627
628
629    def purgeReceived(self, event):
630        pass
631
632
633    def createNode(self, service, nodeIdentifier=None, sender=None):
634        """
635        Create a publish subscribe node.
636
637        @param service: The publish subscribe service to create the node at.
638        @type service: L{JID}
639        @param nodeIdentifier: Optional suggestion for the id of the node.
640        @type nodeIdentifier: C{unicode}
641        """
642        request = PubSubRequest('create')
643        request.recipient = service
644        request.nodeIdentifier = nodeIdentifier
645        request.sender = sender
646
647        def cb(iq):
648            try:
649                new_node = iq.pubsub.create["node"]
650            except AttributeError:
651                # the suggested node identifier was accepted
652                new_node = nodeIdentifier
653            return new_node
654
655        d = request.send(self.xmlstream)
656        d.addCallback(cb)
657        return d
658
659
660    def deleteNode(self, service, nodeIdentifier, sender=None):
661        """
662        Delete a publish subscribe node.
663
664        @param service: The publish subscribe service to delete the node from.
665        @type service: L{JID}
666        @param nodeIdentifier: The identifier of the node.
667        @type nodeIdentifier: C{unicode}
668        """
669        request = PubSubRequest('delete')
670        request.recipient = service
671        request.nodeIdentifier = nodeIdentifier
672        request.sender = sender
673        return request.send(self.xmlstream)
674
675
676    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
677        """
678        Subscribe to a publish subscribe node.
679
680        @param service: The publish subscribe service that keeps the node.
681        @type service: L{JID}
682        @param nodeIdentifier: The identifier of the node.
683        @type nodeIdentifier: C{unicode}
684        @param subscriber: The entity to subscribe to the node. This entity
685                           will get notifications of new published items.
686        @type subscriber: L{JID}
687        """
688        request = PubSubRequest('subscribe')
689        request.recipient = service
690        request.nodeIdentifier = nodeIdentifier
691        request.subscriber = subscriber
692        request.sender = sender
693
694        def cb(iq):
695            subscription = iq.pubsub.subscription["subscription"]
696
697            if subscription == 'pending':
698                raise SubscriptionPending
699            elif subscription == 'unconfigured':
700                raise SubscriptionUnconfigured
701            else:
702                # we assume subscription == 'subscribed'
703                # any other value would be invalid, but that should have
704                # yielded a stanza error.
705                return None
706
707        d = request.send(self.xmlstream)
708        d.addCallback(cb)
709        return d
710
711
712    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
713        """
714        Unsubscribe from a publish subscribe node.
715
716        @param service: The publish subscribe service that keeps the node.
717        @type service: L{JID}
718        @param nodeIdentifier: The identifier of the node.
719        @type nodeIdentifier: C{unicode}
720        @param subscriber: The entity to unsubscribe from the node.
721        @type subscriber: L{JID}
722        """
723        request = PubSubRequest('unsubscribe')
724        request.recipient = service
725        request.nodeIdentifier = nodeIdentifier
726        request.subscriber = subscriber
727        request.sender = sender
728        return request.send(self.xmlstream)
729
730
731    def publish(self, service, nodeIdentifier, items=None, sender=None):
732        """
733        Publish to a publish subscribe node.
734
735        @param service: The publish subscribe service that keeps the node.
736        @type service: L{JID}
737        @param nodeIdentifier: The identifier of the node.
738        @type nodeIdentifier: C{unicode}
739        @param items: Optional list of L{Item}s to publish.
740        @type items: C{list}
741        """
742        request = PubSubRequest('publish')
743        request.recipient = service
744        request.nodeIdentifier = nodeIdentifier
745        request.items = items
746        request.sender = sender
747        return request.send(self.xmlstream)
748
749
750    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
751        """
752        Retrieve previously published items from a publish subscribe node.
753
754        @param service: The publish subscribe service that keeps the node.
755        @type service: L{JID}
756        @param nodeIdentifier: The identifier of the node.
757        @type nodeIdentifier: C{unicode}
758        @param maxItems: Optional limit on the number of retrieved items.
759        @type maxItems: C{int}
760        """
761        request = PubSubRequest('items')
762        request.recipient = service
763        request.nodeIdentifier = nodeIdentifier
764        if maxItems:
765            request.maxItems = str(int(maxItems))
766        request.sender = sender
767
768        def cb(iq):
769            items = []
770            for element in iq.pubsub.items.elements():
771                if element.uri == NS_PUBSUB and element.name == 'item':
772                    items.append(element)
773            return items
774
775        d = request.send(self.xmlstream)
776        d.addCallback(cb)
777        return d
778
779
780
781class PubSubService(XMPPHandler, IQHandlerMixin):
782    """
783    Protocol implementation for a XMPP Publish Subscribe Service.
784
785    The word Service here is used as taken from the Publish Subscribe
786    specification. It is the party responsible for keeping nodes and their
787    subscriptions, and sending out notifications.
788
789    Methods from the L{IPubSubService} interface that are called as
790    a result of an XMPP request may raise exceptions. Alternatively the
791    deferred returned by these methods may have their errback called. These are
792    handled as follows:
793
794     - If the exception is an instance of L{error.StanzaError}, an error
795       response iq is returned.
796     - Any other exception is reported using L{log.msg}. An error response
797       with the condition C{internal-server-error} is returned.
798
799    The default implementation of said methods raises an L{Unsupported}
800    exception and are meant to be overridden.
801
802    @ivar discoIdentity: Service discovery identity as a dictionary with
803                         keys C{'category'}, C{'type'} and C{'name'}.
804    @ivar pubSubFeatures: List of supported publish-subscribe features for
805                          service discovery, as C{str}.
806    @type pubSubFeatures: C{list} or C{None}
807    """
808
809    implements(IPubSubService)
810
811    iqHandlers = {
812            '/*': '_onPubSubRequest',
813            }
814
815    _legacyHandlers = {
816        'publish': ('publish', ['sender', 'recipient',
817                                'nodeIdentifier', 'items']),
818        'subscribe': ('subscribe', ['sender', 'recipient',
819                                    'nodeIdentifier', 'subscriber']),
820        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
821                                        'nodeIdentifier', 'subscriber']),
822        'subscriptions': ('subscriptions', ['sender', 'recipient']),
823        'affiliations': ('affiliations', ['sender', 'recipient']),
824        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
825        'getConfigurationOptions': ('getConfigurationOptions', []),
826        'default': ('getDefaultConfiguration',
827                    ['sender', 'recipient', 'nodeType']),
828        'configureGet': ('getConfiguration', ['sender', 'recipient',
829                                              'nodeIdentifier']),
830        'configureSet': ('setConfiguration', ['sender', 'recipient',
831                                              'nodeIdentifier', 'options']),
832        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
833                            'maxItems', 'itemIdentifiers']),
834        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
835                                'itemIdentifiers']),
836        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
837        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
838    }
839
840    hideNodes = False
841
842    def __init__(self, resource=None):
843        self.resource = resource
844        self.discoIdentity = {'category': 'pubsub',
845                              'type': 'generic',
846                              'name': 'Generic Publish-Subscribe Service'}
847
848        self.pubSubFeatures = []
849
850
851    def connectionMade(self):
852        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
853
854
855    def getDiscoInfo(self, requestor, target, nodeIdentifier):
856        def toInfo(nodeInfo, info):
857            if not nodeInfo:
858                return info
859
860            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
861            info.append(disco.DiscoIdentity('pubsub', nodeType))
862            if metaData:
863                form = data_form.Form(formType="result",
864                                      formNamespace=NS_PUBSUB_META_DATA)
865                form.addField(
866                        data_form.Field(
867                            var='pubsub#node_type',
868                            value=nodeType,
869                            label='The type of node (collection or leaf)'
870                        )
871                )
872
873                for metaDatum in metaData:
874                    form.addField(data_form.Field.fromDict(metaDatum))
875
876                info.append(form)
877
878            return info
879
880        info = []
881
882        request = PubSubRequest('discoInfo')
883
884        if self.resource is not None:
885            resource = self.resource.locateResource(request)
886            identity = resource.discoIdentity
887            features = resource.features
888            getInfo = resource.getInfo
889        else:
890            category, idType, name = self.discoIdentity
891            identity = disco.DiscoIdentity(category, idType, name)
892            features = self.pubSubFeatures
893            getInfo = self.getNodeInfo
894
895        if not nodeIdentifier:
896            info.append(identity)
897            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
898            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
899                         for feature in features])
900
901        d = getInfo(requestor, target, nodeIdentifier or '')
902        d.addCallback(toInfo, info)
903        d.addErrback(log.err)
904        return d
905
906
907    def getDiscoItems(self, requestor, target, nodeIdentifier):
908        if self.hideNodes:
909            d = defer.succeed([])
910        elif self.resource is not None:
911            request = PubSubRequest('discoInfo')
912            resource = self.resource.locateResource(request)
913            d = resource.getNodes(requestor, target, nodeIdentifier)
914        elif nodeIdentifier:
915            d = self.getNodes(requestor, target)
916        else:
917            d = defer.succeed([])
918           
919
920
921        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
922                                     for node in nodes])
923        return d
924
925
926    def _onPubSubRequest(self, iq):
927        request = PubSubRequest.fromElement(iq)
928
929        if self.resource is not None:
930            resource = self.resource.locateResource(request)
931        else:
932            resource = self
933
934        # Preprocess the request, knowing the handling resource
935        try:
936            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
937        except AttributeError:
938            pass
939        else:
940            request = preProcessor(resource, request)
941            if request is None:
942                return defer.succeed(None)
943
944        # Process the request itself,
945        if resource is not self:
946            try:
947                handler = getattr(resource, request.verb)
948            except AttributeError:
949                # fix lookup feature
950                text = "Request verb: %s" % request.verb
951                return defer.fail(Unsupported('', text))
952
953            d = handler(request)
954        else:
955            handlerName, argNames = self._legacyHandlers[request.verb]
956            handler = getattr(self, handlerName)
957            args = [getattr(request, arg) for arg in argNames]
958            d = handler(*args)
959
960        # If needed, translate the result into a response
961        try:
962            cb = getattr(self, '_toResponse_%s' % request.verb)
963        except AttributeError:
964            pass
965        else:
966            d.addCallback(cb, resource, request)
967
968        return d
969
970
971    def _toResponse_subscribe(self, result, resource, request):
972        response = domish.Element((NS_PUBSUB, "pubsub"))
973        subscription = response.addElement("subscription")
974        if result.nodeIdentifier:
975            subscription["node"] = result.nodeIdentifier
976        subscription["jid"] = result.subscriber.full()
977        subscription["subscription"] = result.state
978        return response
979
980
981    def _toResponse_subscriptions(self, result, resource, request):
982        response = domish.Element((NS_PUBSUB, 'pubsub'))
983        subscriptions = response.addElement('subscriptions')
984        for subscription in result:
985            item = subscriptions.addElement('subscription')
986            item['node'] = subscription.nodeIdentifier
987            item['jid'] = subscription.subscriber.full()
988            item['subscription'] = subscription.state
989        return response
990
991
992    def _toResponse_affiliations(self, result, resource, request):
993        response = domish.Element((NS_PUBSUB, 'pubsub'))
994        affiliations = response.addElement('affiliations')
995
996        for nodeIdentifier, affiliation in result:
997            item = affiliations.addElement('affiliation')
998            item['node'] = nodeIdentifier
999            item['affiliation'] = affiliation
1000
1001        return response
1002
1003
1004    def _toResponse_create(self, result, resource, request):
1005        if not request.nodeIdentifier or request.nodeIdentifier != result:
1006            response = domish.Element((NS_PUBSUB, 'pubsub'))
1007            create = response.addElement('create')
1008            create['node'] = result
1009            return response
1010        else:
1011            return None
1012
1013
1014    def _makeFields(self, options, values):
1015        fields = []
1016        for name, value in values.iteritems():
1017            if name not in options:
1018                continue
1019
1020            option = {'var': name}
1021            option.update(options[name])
1022            if isinstance(value, list):
1023                option['values'] = value
1024            else:
1025                option['value'] = value
1026            fields.append(data_form.Field.fromDict(option))
1027        return fields
1028
1029
1030    def _formFromConfiguration(self, resource, values):
1031        options = resource.getConfigurationOptions()
1032        fields = self._makeFields(options, values)
1033        form = data_form.Form(formType="form",
1034                              formNamespace=NS_PUBSUB_NODE_CONFIG,
1035                              fields=fields)
1036
1037        return form
1038
1039
1040    def _checkConfiguration(self, resource, values):
1041        options = resource.getConfigurationOptions()
1042        processedValues = {}
1043
1044        for key, value in values.iteritems():
1045            if key not in options:
1046                continue
1047
1048            option = {'var': key}
1049            option.update(options[key])
1050            field = data_form.Field.fromDict(option)
1051            if isinstance(value, list):
1052                field.values = value
1053            else:
1054                field.value = value
1055            field.typeCheck()
1056
1057            if isinstance(value, list):
1058                processedValues[key] = field.values
1059            else:
1060                processedValues[key] = field.value
1061
1062        return processedValues
1063
1064
1065    def _preProcess_default(self, resource, request):
1066        if request.nodeType not in ('leaf', 'collection'):
1067            raise error.StanzaError('not-acceptable')
1068        else:
1069            return request
1070
1071
1072    def _toResponse_default(self, options, resource, request):
1073        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1074        default = response.addElement("default")
1075        form = self._formFromConfiguration(resource, options)
1076        default.addChild(form.toElement())
1077        return response
1078
1079
1080    def _toResponse_configureGet(self, options, resource, request):
1081        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1082        configure = response.addElement("configure")
1083        form = self._formFromConfiguration(resource, options)
1084        configure.addChild(form.toElement())
1085
1086        if request.nodeIdentifier:
1087            configure["node"] = request.nodeIdentifier
1088
1089        return response
1090
1091
1092    def _preProcess_configureSet(self, resource, request):
1093        if request.options:
1094            request.options = self._checkConfiguration(resource,
1095                                                       request.options)
1096            return request
1097        else:
1098            return None
1099
1100
1101    def _toResponse_items(self, result, resource, request):
1102        response = domish.Element((NS_PUBSUB, 'pubsub'))
1103        items = response.addElement('items')
1104        items["node"] = request.nodeIdentifier
1105
1106        for item in result:
1107            items.addChild(item)
1108
1109        return response
1110
1111
1112    def _createNotification(self, eventType, service, nodeIdentifier,
1113                                  subscriber, subscriptions=None):
1114        headers = []
1115
1116        if subscriptions:
1117            for subscription in subscriptions:
1118                if nodeIdentifier != subscription.nodeIdentifier:
1119                    headers.append(('Collection', subscription.nodeIdentifier))
1120
1121        message = domish.Element((None, "message"))
1122        message["from"] = service.full()
1123        message["to"] = subscriber.full()
1124        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1125
1126        element = event.addElement(eventType)
1127        element["node"] = nodeIdentifier
1128
1129        if headers:
1130            message.addChild(shim.Headers(headers))
1131
1132        return message
1133
1134    # public methods
1135
1136    def notifyPublish(self, service, nodeIdentifier, notifications):
1137        for subscriber, subscriptions, items in notifications:
1138            message = self._createNotification('items', service,
1139                                               nodeIdentifier, subscriber,
1140                                               subscriptions)
1141            message.event.items.children = items
1142            self.send(message)
1143
1144
1145    def notifyDelete(self, service, nodeIdentifier, subscribers,
1146                           redirectURI=None):
1147        for subscriber in subscribers:
1148            message = self._createNotification('delete', service,
1149                                               nodeIdentifier,
1150                                               subscriber)
1151            if redirectURI:
1152                redirect = message.event.delete.addElement('redirect')
1153                redirect['uri'] = redirectURI
1154            self.send(message)
1155
1156
1157    def getNodeInfo(self, requestor, service, nodeIdentifier):
1158        return None
1159
1160
1161    def getNodes(self, requestor, service):
1162        return []
1163
1164
1165    def publish(self, requestor, service, nodeIdentifier, items):
1166        raise Unsupported('publish')
1167
1168
1169    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1170        raise Unsupported('subscribe')
1171
1172
1173    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1174        raise Unsupported('subscribe')
1175
1176
1177    def subscriptions(self, requestor, service):
1178        raise Unsupported('retrieve-subscriptions')
1179
1180
1181    def affiliations(self, requestor, service):
1182        raise Unsupported('retrieve-affiliations')
1183
1184
1185    def create(self, requestor, service, nodeIdentifier):
1186        raise Unsupported('create-nodes')
1187
1188
1189    def getConfigurationOptions(self):
1190        return {}
1191
1192
1193    def getDefaultConfiguration(self, requestor, service, nodeType):
1194        raise Unsupported('retrieve-default')
1195
1196
1197    def getConfiguration(self, requestor, service, nodeIdentifier):
1198        raise Unsupported('config-node')
1199
1200
1201    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1202        raise Unsupported('config-node')
1203
1204
1205    def items(self, requestor, service, nodeIdentifier, maxItems,
1206                    itemIdentifiers):
1207        raise Unsupported('retrieve-items')
1208
1209
1210    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1211        raise Unsupported('retract-items')
1212
1213
1214    def purge(self, requestor, service, nodeIdentifier):
1215        raise Unsupported('purge-nodes')
1216
1217
1218    def delete(self, requestor, service, nodeIdentifier):
1219        raise Unsupported('delete-nodes')
1220
1221
1222
1223class PubSubResource(object):
1224
1225    implements(IPubSubResource)
1226
1227    features = []
1228    discoIdentity = disco.DiscoIdentity('pubsub',
1229                                        'service',
1230                                        'Publish-Subscribe Service')
1231
1232
1233    def locateResource(self, request):
1234        return self
1235
1236
1237    def getInfo(self, requestor, service, nodeIdentifier):
1238        return defer.succeed(None)
1239
1240
1241    def getNodes(self, requestor, service, nodeIdentifier):
1242        return defer.succeed([])
1243
1244
1245    def getConfigurationOptions(self):
1246        return {}
1247
1248
1249    def publish(self, request):
1250        return defer.fail(Unsupported('publish'))
1251
1252
1253    def subscribe(self, request):
1254        return defer.fail(Unsupported('subscribe'))
1255
1256
1257    def unsubscribe(self, request):
1258        return defer.fail(Unsupported('subscribe'))
1259
1260
1261    def subscriptions(self, request):
1262        return defer.fail(Unsupported('retrieve-subscriptions'))
1263
1264
1265    def affiliations(self, request):
1266        return defer.fail(Unsupported('retrieve-affiliations'))
1267
1268
1269    def create(self, request):
1270        return defer.fail(Unsupported('create-nodes'))
1271
1272
1273    def default(self, request):
1274        return defer.fail(Unsupported('retrieve-default'))
1275
1276
1277    def configureGet(self, request):
1278        return defer.fail(Unsupported('config-node'))
1279
1280
1281    def configureSet(self, request):
1282        return defer.fail(Unsupported('config-node'))
1283
1284
1285    def items(self, request):
1286        return defer.fail(Unsupported('retrieve-items'))
1287
1288
1289    def retract(self, request):
1290        return defer.fail(Unsupported('retract-items'))
1291
1292
1293    def purge(self, request):
1294        return defer.fail(Unsupported('purge-nodes'))
1295
1296
1297    def delete(self, request):
1298        return defer.fail(Unsupported('delete-nodes'))
1299
1300
1301    def affiliationsGet(self, request):
1302        return defer.fail(Unsupported('modify-affiliations'))
1303
1304
1305    def affiliationsSet(self, request):
1306        return defer.fail(Unsupported('modify-affiliations'))
1307
1308
1309    def subscriptionsGet(self, request):
1310        return defer.fail(Unsupported('manage-subscriptions'))
1311
1312
1313    def subscriptionsSet(self, request):
1314        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.