source: wokkel/pubsub.py @ 58:a8c2aadabebd

Last change on this file since 58:a8c2aadabebd was 58:a8c2aadabebd, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Allow publish-subscribe client requests to come from a specific JID.

Author: ralphm.
Fixes #46.

File size: 38.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, generic, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
23# Iq get and set XPath queries
24IQ_GET = '/iq[@type="get"]'
25IQ_SET = '/iq[@type="set"]'
26
27# Publish-subscribe namespaces
28NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
29NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
30NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
31NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
32NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
33NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
34NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
35
36# XPath to match pubsub requests
37PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
38                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
39                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
40
41class SubscriptionPending(Exception):
42    """
43    Raised when the requested subscription is pending acceptance.
44    """
45
46
47
48class SubscriptionUnconfigured(Exception):
49    """
50    Raised when the requested subscription needs to be configured before
51    becoming active.
52    """
53
54
55
56class PubSubError(error.StanzaError):
57    """
58    Exception with publish-subscribe specific condition.
59    """
60    def __init__(self, condition, pubsubCondition, feature=None, text=None):
61        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
62        if feature:
63            appCondition['feature'] = feature
64        error.StanzaError.__init__(self, condition,
65                                         text=text,
66                                         appCondition=appCondition)
67
68
69
70class BadRequest(error.StanzaError):
71    """
72    Bad request stanza error.
73    """
74    def __init__(self, pubsubCondition=None, text=None):
75        if pubsubCondition:
76            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
77        else:
78            appCondition = None
79        error.StanzaError.__init__(self, 'bad-request',
80                                         text=text,
81                                         appCondition=appCondition)
82
83
84
85class Unsupported(PubSubError):
86    def __init__(self, feature, text=None):
87        PubSubError.__init__(self, 'feature-not-implemented',
88                                   'unsupported',
89                                   feature,
90                                   text)
91
92
93
94class Subscription(object):
95    """
96    A subscription to a node.
97
98    @ivar nodeIdentifier: The identifier of the node subscribed to.
99                          The root node is denoted by C{None}.
100    @ivar subscriber: The subscribing entity.
101    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
102                 C{'unconfigured'}.
103    @ivar options: Optional list of subscription options.
104    @type options: C{dict}.
105    """
106
107    def __init__(self, nodeIdentifier, subscriber, state, options=None):
108        self.nodeIdentifier = nodeIdentifier
109        self.subscriber = subscriber
110        self.state = state
111        self.options = options or {}
112
113
114
115class Item(domish.Element):
116    """
117    Publish subscribe item.
118
119    This behaves like an object providing L{domish.IElement}.
120
121    Item payload can be added using C{addChild} or C{addRawXml}, or using the
122    C{payload} keyword argument to C{__init__}.
123    """
124
125    def __init__(self, id=None, payload=None):
126        """
127        @param id: optional item identifier
128        @type id: L{unicode}
129        @param payload: optional item payload. Either as a domish element, or
130                        as serialized XML.
131        @type payload: object providing L{domish.IElement} or L{unicode}.
132        """
133
134        domish.Element.__init__(self, (NS_PUBSUB, 'item'))
135        if id is not None:
136            self['id'] = id
137        if payload is not None:
138            if isinstance(payload, basestring):
139                self.addRawXml(payload)
140            else:
141                self.addChild(payload)
142
143
144
145class PubSubRequest(generic.Stanza):
146    """
147    A publish-subscribe request.
148
149    The set of instance variables used depends on the type of request. If
150    a variable is not applicable or not passed in the request, its value is
151    C{None}.
152
153    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
154    @type verb: C{str}.
155
156    @ivar affiliations: Affiliations to be modified.
157    @type affiliations: C{set}
158    @ivar items: The items to be published, as L{domish.Element}s.
159    @type items: C{list}
160    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
161                           retracted.
162    @type itemIdentifiers: C{set}
163    @ivar maxItems: Maximum number of items to retrieve.
164    @type maxItems: C{int}.
165    @ivar nodeIdentifier: Identifier of the node the request is about.
166    @type nodeIdentifier: C{unicode}
167    @ivar nodeType: The type of node that should be created, or for which the
168                    configuration is retrieved. C{'leaf'} or C{'collection'}.
169    @type nodeType: C{str}
170    @ivar options: Configurations options for nodes, subscriptions and publish
171                   requests.
172    @type options: L{data_form.Form}
173    @ivar subscriber: The subscribing entity.
174    @type subscriber: L{JID}
175    @ivar subscriptionIdentifier: Identifier for a specific subscription.
176    @type subscriptionIdentifier: C{unicode}
177    @ivar subscriptions: Subscriptions to be modified, as a set of
178                         L{Subscription}.
179    @type subscriptions: C{set}
180    """
181
182    verb = None
183
184    affiliations = None
185    items = None
186    itemIdentifiers = None
187    maxItems = None
188    nodeIdentifier = None
189    nodeType = None
190    options = None
191    subscriber = None
192    subscriptionIdentifier = None
193    subscriptions = None
194
195    # Map request iq type and subelement name to request verb
196    _requestVerbMap = {
197        ('set', NS_PUBSUB, 'publish'): 'publish',
198        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
199        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
200        ('get', NS_PUBSUB, 'options'): 'optionsGet',
201        ('set', NS_PUBSUB, 'options'): 'optionsSet',
202        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
203        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
204        ('set', NS_PUBSUB, 'create'): 'create',
205        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
206        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
207        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
208        ('get', NS_PUBSUB, 'items'): 'items',
209        ('set', NS_PUBSUB, 'retract'): 'retract',
210        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
211        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
212        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
213        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
214        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
215        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
216    }
217
218    # Map request verb to request iq type and subelement name
219    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
220
221    # Map request verb to parameter handler names
222    _parameters = {
223        'publish': ['node', 'items'],
224        'subscribe': ['nodeOrEmpty', 'jid'],
225        'unsubscribe': ['nodeOrEmpty', 'jid'],
226        'optionsGet': ['nodeOrEmpty', 'jid'],
227        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
228        'subscriptions': [],
229        'affiliations': [],
230        'create': ['nodeOrNone'],
231        'default': ['default'],
232        'configureGet': ['nodeOrEmpty'],
233        'configureSet': ['nodeOrEmpty', 'configure'],
234        'items': ['node', 'maxItems', 'itemIdentifiers'],
235        'retract': ['node', 'itemIdentifiers'],
236        'purge': ['node'],
237        'delete': ['node'],
238        'affiliationsGet': [],
239        'affiliationsSet': [],
240        'subscriptionsGet': [],
241        'subscriptionsSet': [],
242    }
243
244    def __init__(self, verb=None):
245        self.verb = verb
246
247
248    @staticmethod
249    def _findForm(element, formNamespace):
250        """
251        Find a Data Form.
252
253        Look for an element that represents a Data Form with the specified
254        form namespace as a child element of the given element.
255        """
256        if not element:
257            return None
258
259        form = None
260        for child in element.elements():
261            try:
262                form = data_form.Form.fromElement(child)
263            except data_form.Error:
264                continue
265
266            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
267                continue
268
269        return form
270
271
272    def _parse_node(self, verbElement):
273        """
274        Parse the required node identifier out of the verbElement.
275        """
276        try:
277            self.nodeIdentifier = verbElement["node"]
278        except KeyError:
279            raise BadRequest('nodeid-required')
280
281
282    def _render_node(self, verbElement):
283        """
284        Render the required node identifier on the verbElement.
285        """
286        if not self.nodeIdentifier:
287            raise Exception("Node identifier is required")
288
289        verbElement['node'] = self.nodeIdentifier
290
291
292    def _parse_nodeOrEmpty(self, verbElement):
293        """
294        Parse the node identifier out of the verbElement. May be empty.
295        """
296        self.nodeIdentifier = verbElement.getAttribute("node", '')
297
298
299    def _render_nodeOrEmpty(self, verbElement):
300        """
301        Render the node identifier on the verbElement. May be empty.
302        """
303        if self.nodeIdentifier:
304            verbElement['node'] = self.nodeIdentifier
305
306
307    def _parse_nodeOrNone(self, verbElement):
308        """
309        Parse the optional node identifier out of the verbElement.
310        """
311        self.nodeIdentifier = verbElement.getAttribute("node")
312
313
314    def _render_nodeOrNone(self, verbElement):
315        """
316        Render the optional node identifier on the verbElement.
317        """
318        if self.nodeIdentifier:
319            verbElement['node'] = self.nodeIdentifier
320
321
322    def _parse_items(self, verbElement):
323        """
324        Parse items out of the verbElement for publish requests.
325        """
326        self.items = []
327        for element in verbElement.elements():
328            if element.uri == NS_PUBSUB and element.name == 'item':
329                self.items.append(element)
330
331
332    def _render_items(self, verbElement):
333        """
334        Render items into the verbElement for publish requests.
335        """
336        if self.items:
337            for item in self.items:
338                verbElement.addChild(item)
339
340
341    def _parse_jid(self, verbElement):
342        """
343        Parse subscriber out of the verbElement for un-/subscribe requests.
344        """
345        try:
346            self.subscriber = jid.internJID(verbElement["jid"])
347        except KeyError:
348            raise BadRequest('jid-required')
349
350
351    def _render_jid(self, verbElement):
352        """
353        Render subscriber into the verbElement for un-/subscribe requests.
354        """
355        verbElement['jid'] = self.subscriber.full()
356
357
358    def _parse_default(self, verbElement):
359        """
360        Parse node type out of a request for the default node configuration.
361        """
362        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
363        if form and form.formType == 'submit':
364            values = form.getValues()
365            self.nodeType = values.get('pubsub#node_type', 'leaf')
366        else:
367            self.nodeType = 'leaf'
368
369
370    def _parse_configure(self, verbElement):
371        """
372        Parse options out of a request for setting the node configuration.
373        """
374        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
375        if form:
376            if form.formType == 'submit':
377                self.options = form.getValues()
378            elif form.formType == 'cancel':
379                self.options = {}
380            else:
381                raise BadRequest(text="Unexpected form type %r" % form.formType)
382        else:
383            raise BadRequest(text="Missing configuration form")
384
385
386
387    def _parse_itemIdentifiers(self, verbElement):
388        """
389        Parse item identifiers out of items and retract requests.
390        """
391        self.itemIdentifiers = []
392        for element in verbElement.elements():
393            if element.uri == NS_PUBSUB and element.name == 'item':
394                try:
395                    self.itemIdentifiers.append(element["id"])
396                except KeyError:
397                    raise BadRequest()
398
399
400    def _render_itemIdentifiers(self, verbElement):
401        """
402        Render item identifiers into items and retract requests.
403        """
404        if self.itemIdentifiers:
405            for itemIdentifier in self.itemIdentifiers:
406                item = verbElement.addElement('item')
407                item['id'] = itemIdentifier
408
409
410    def _parse_maxItems(self, verbElement):
411        """
412        Parse maximum items out of an items request.
413        """
414        value = verbElement.getAttribute('max_items')
415
416        if value:
417            try:
418                self.maxItems = int(value)
419            except ValueError:
420                raise BadRequest(text="Field max_items requires a positive " +
421                                      "integer value")
422
423
424    def _render_maxItems(self, verbElement):
425        """
426        Parse maximum items into an items request.
427        """
428        if self.maxItems:
429            verbElement['max_items'] = unicode(self.maxItems)
430
431
432    def _parse_options(self, verbElement):
433        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
434        if form:
435            if form.formType == 'submit':
436                self.options = form.getValues()
437            elif form.formType == 'cancel':
438                self.options = {}
439            else:
440                raise BadRequest(text="Unexpected form type %r" % form.formType)
441        else:
442            raise BadRequest(text="Missing options form")
443
444    def parseElement(self, element):
445        """
446        Parse the publish-subscribe verb and parameters out of a request.
447        """
448        generic.Stanza.parseElement(self, element)
449
450        for child in element.pubsub.elements():
451            key = (self.stanzaType, child.uri, child.name)
452            try:
453                verb = self._requestVerbMap[key]
454            except KeyError:
455                continue
456            else:
457                self.verb = verb
458                break
459
460        if not self.verb:
461            raise NotImplementedError()
462
463        for parameter in self._parameters[verb]:
464            getattr(self, '_parse_%s' % parameter)(child)
465
466
467    def send(self, xs):
468        """
469        Send this request to its recipient.
470
471        This renders all of the relevant parameters for this specific
472        requests into an L{xmlstream.IQ}, and invoke its C{send} method.
473        This returns a deferred that fires upon reception of a response. See
474        L{xmlstream.IQ} for details.
475
476        @param xs: The XML stream to send the request on.
477        @type xs: L{xmlstream.XmlStream}
478        @rtype: L{defer.Deferred}.
479        """
480
481        try:
482            (self.stanzaType,
483             childURI,
484             childName) = self._verbRequestMap[self.verb]
485        except KeyError:
486            raise NotImplementedError()
487
488        iq = xmlstream.IQ(xs, self.stanzaType)
489        iq.addElement((childURI, 'pubsub'))
490        verbElement = iq.pubsub.addElement(childName)
491
492        if self.sender:
493            iq['from'] = self.sender.full()
494        if self.recipient:
495            iq['to'] = self.recipient.full()
496
497        for parameter in self._parameters[self.verb]:
498            getattr(self, '_render_%s' % parameter)(verbElement)
499
500        return iq.send()
501
502
503
504class PubSubEvent(object):
505    """
506    A publish subscribe event.
507
508    @param sender: The entity from which the notification was received.
509    @type sender: L{jid.JID}
510    @param recipient: The entity to which the notification was sent.
511    @type recipient: L{wokkel.pubsub.ItemsEvent}
512    @param nodeIdentifier: Identifier of the node the event pertains to.
513    @type nodeIdentifier: C{unicode}
514    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
515    @type headers: L{dict}
516    """
517
518    def __init__(self, sender, recipient, nodeIdentifier, headers):
519        self.sender = sender
520        self.recipient = recipient
521        self.nodeIdentifier = nodeIdentifier
522        self.headers = headers
523
524
525
526class ItemsEvent(PubSubEvent):
527    """
528    A publish-subscribe event that signifies new, updated and retracted items.
529
530    @param items: List of received items as domish elements.
531    @type items: C{list} of L{domish.Element}
532    """
533
534    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
535        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
536        self.items = items
537
538
539
540class DeleteEvent(PubSubEvent):
541    """
542    A publish-subscribe event that signifies the deletion of a node.
543    """
544
545    redirectURI = None
546
547
548
549class PurgeEvent(PubSubEvent):
550    """
551    A publish-subscribe event that signifies the purging of a node.
552    """
553
554
555
556class PubSubClient(XMPPHandler):
557    """
558    Publish subscribe client protocol.
559    """
560
561    implements(IPubSubClient)
562
563    def connectionInitialized(self):
564        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
565                                   NS_PUBSUB_EVENT, self._onEvent)
566
567
568    def _onEvent(self, message):
569        try:
570            sender = jid.JID(message["from"])
571            recipient = jid.JID(message["to"])
572        except KeyError:
573            return
574
575        actionElement = None
576        for element in message.event.elements():
577            if element.uri == NS_PUBSUB_EVENT:
578                actionElement = element
579
580        if not actionElement:
581            return
582
583        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
584
585        if eventHandler:
586            headers = shim.extractHeaders(message)
587            eventHandler(sender, recipient, actionElement, headers)
588            message.handled = True
589
590
591    def _onEvent_items(self, sender, recipient, action, headers):
592        nodeIdentifier = action["node"]
593
594        items = [element for element in action.elements()
595                         if element.name in ('item', 'retract')]
596
597        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
598        self.itemsReceived(event)
599
600
601    def _onEvent_delete(self, sender, recipient, action, headers):
602        nodeIdentifier = action["node"]
603        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
604        if action.redirect:
605            event.redirectURI = action.redirect.getAttribute('uri')
606        self.deleteReceived(event)
607
608
609    def _onEvent_purge(self, sender, recipient, action, headers):
610        nodeIdentifier = action["node"]
611        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
612        self.purgeReceived(event)
613
614
615    def itemsReceived(self, event):
616        pass
617
618
619    def deleteReceived(self, event):
620        pass
621
622
623    def purgeReceived(self, event):
624        pass
625
626
627    def createNode(self, service, nodeIdentifier=None, sender=None):
628        """
629        Create a publish subscribe node.
630
631        @param service: The publish subscribe service to create the node at.
632        @type service: L{JID}
633        @param nodeIdentifier: Optional suggestion for the id of the node.
634        @type nodeIdentifier: C{unicode}
635        """
636        request = PubSubRequest('create')
637        request.recipient = service
638        request.nodeIdentifier = nodeIdentifier
639        request.sender = sender
640
641        def cb(iq):
642            try:
643                new_node = iq.pubsub.create["node"]
644            except AttributeError:
645                # the suggested node identifier was accepted
646                new_node = nodeIdentifier
647            return new_node
648
649        d = request.send(self.xmlstream)
650        d.addCallback(cb)
651        return d
652
653
654    def deleteNode(self, service, nodeIdentifier, sender=None):
655        """
656        Delete a publish subscribe node.
657
658        @param service: The publish subscribe service to delete the node from.
659        @type service: L{JID}
660        @param nodeIdentifier: The identifier of the node.
661        @type nodeIdentifier: C{unicode}
662        """
663        request = PubSubRequest('delete')
664        request.recipient = service
665        request.nodeIdentifier = nodeIdentifier
666        request.sender = sender
667        return request.send(self.xmlstream)
668
669
670    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
671        """
672        Subscribe to a publish subscribe node.
673
674        @param service: The publish subscribe service that keeps the node.
675        @type service: L{JID}
676        @param nodeIdentifier: The identifier of the node.
677        @type nodeIdentifier: C{unicode}
678        @param subscriber: The entity to subscribe to the node. This entity
679                           will get notifications of new published items.
680        @type subscriber: L{JID}
681        """
682        request = PubSubRequest('subscribe')
683        request.recipient = service
684        request.nodeIdentifier = nodeIdentifier
685        request.subscriber = subscriber
686        request.sender = sender
687
688        def cb(iq):
689            subscription = iq.pubsub.subscription["subscription"]
690
691            if subscription == 'pending':
692                raise SubscriptionPending
693            elif subscription == 'unconfigured':
694                raise SubscriptionUnconfigured
695            else:
696                # we assume subscription == 'subscribed'
697                # any other value would be invalid, but that should have
698                # yielded a stanza error.
699                return None
700
701        d = request.send(self.xmlstream)
702        d.addCallback(cb)
703        return d
704
705
706    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
707        """
708        Unsubscribe from a publish subscribe node.
709
710        @param service: The publish subscribe service that keeps the node.
711        @type service: L{JID}
712        @param nodeIdentifier: The identifier of the node.
713        @type nodeIdentifier: C{unicode}
714        @param subscriber: The entity to unsubscribe from the node.
715        @type subscriber: L{JID}
716        """
717        request = PubSubRequest('unsubscribe')
718        request.recipient = service
719        request.nodeIdentifier = nodeIdentifier
720        request.subscriber = subscriber
721        request.sender = sender
722        return request.send(self.xmlstream)
723
724
725    def publish(self, service, nodeIdentifier, items=None, sender=None):
726        """
727        Publish to a publish subscribe node.
728
729        @param service: The publish subscribe service that keeps the node.
730        @type service: L{JID}
731        @param nodeIdentifier: The identifier of the node.
732        @type nodeIdentifier: C{unicode}
733        @param items: Optional list of L{Item}s to publish.
734        @type items: C{list}
735        """
736        request = PubSubRequest('publish')
737        request.recipient = service
738        request.nodeIdentifier = nodeIdentifier
739        request.items = items
740        request.sender = sender
741        return request.send(self.xmlstream)
742
743
744    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
745        """
746        Retrieve previously published items from a publish subscribe node.
747
748        @param service: The publish subscribe service that keeps the node.
749        @type service: L{JID}
750        @param nodeIdentifier: The identifier of the node.
751        @type nodeIdentifier: C{unicode}
752        @param maxItems: Optional limit on the number of retrieved items.
753        @type maxItems: C{int}
754        """
755        request = PubSubRequest('items')
756        request.recipient = service
757        request.nodeIdentifier = nodeIdentifier
758        if maxItems:
759            request.maxItems = str(int(maxItems))
760        request.sender = sender
761
762        def cb(iq):
763            items = []
764            for element in iq.pubsub.items.elements():
765                if element.uri == NS_PUBSUB and element.name == 'item':
766                    items.append(element)
767            return items
768
769        d = request.send(self.xmlstream)
770        d.addCallback(cb)
771        return d
772
773
774
775class PubSubService(XMPPHandler, IQHandlerMixin):
776    """
777    Protocol implementation for a XMPP Publish Subscribe Service.
778
779    The word Service here is used as taken from the Publish Subscribe
780    specification. It is the party responsible for keeping nodes and their
781    subscriptions, and sending out notifications.
782
783    Methods from the L{IPubSubService} interface that are called as
784    a result of an XMPP request may raise exceptions. Alternatively the
785    deferred returned by these methods may have their errback called. These are
786    handled as follows:
787
788     - If the exception is an instance of L{error.StanzaError}, an error
789       response iq is returned.
790     - Any other exception is reported using L{log.msg}. An error response
791       with the condition C{internal-server-error} is returned.
792
793    The default implementation of said methods raises an L{Unsupported}
794    exception and are meant to be overridden.
795
796    @ivar discoIdentity: Service discovery identity as a dictionary with
797                         keys C{'category'}, C{'type'} and C{'name'}.
798    @ivar pubSubFeatures: List of supported publish-subscribe features for
799                          service discovery, as C{str}.
800    @type pubSubFeatures: C{list} or C{None}
801    """
802
803    implements(IPubSubService)
804
805    iqHandlers = {
806            '/*': '_onPubSubRequest',
807            }
808
809
810    def __init__(self):
811        self.discoIdentity = {'category': 'pubsub',
812                              'type': 'generic',
813                              'name': 'Generic Publish-Subscribe Service'}
814
815        self.pubSubFeatures = []
816
817
818    def connectionMade(self):
819        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
820
821
822    def getDiscoInfo(self, requestor, target, nodeIdentifier):
823        info = []
824
825        if not nodeIdentifier:
826            category, idType, name = self.discoIdentity
827            info.append(disco.DiscoIdentity(category, idType, name))
828
829            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
830            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
831                         for feature in self.pubSubFeatures])
832
833        def toInfo(nodeInfo):
834            if not nodeInfo:
835                return
836
837            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
838            info.append(disco.DiscoIdentity('pubsub', nodeType))
839            if metaData:
840                form = data_form.Form(formType="result",
841                                      formNamespace=NS_PUBSUB_META_DATA)
842                form.addField(
843                        data_form.Field(
844                            var='pubsub#node_type',
845                            value=nodeType,
846                            label='The type of node (collection or leaf)'
847                        )
848                )
849
850                for metaDatum in metaData:
851                    form.addField(data_form.Field.fromDict(metaDatum))
852
853                info.append(form)
854
855        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
856        d.addCallback(toInfo)
857        d.addBoth(lambda result: info)
858        return d
859
860
861    def getDiscoItems(self, requestor, target, nodeIdentifier):
862        if nodeIdentifier or self.hideNodes:
863            return defer.succeed([])
864
865        d = self.getNodes(requestor, target)
866        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
867                                     for node in nodes])
868        return d
869
870
871    def _onPubSubRequest(self, iq):
872        request = PubSubRequest.fromElement(iq)
873        handler = getattr(self, '_on_%s' % request.verb)
874        return handler(request)
875
876
877    def _on_publish(self, request):
878        return self.publish(request.sender, request.recipient,
879                            request.nodeIdentifier, request.items)
880
881
882    def _on_subscribe(self, request):
883
884        def toResponse(result):
885            response = domish.Element((NS_PUBSUB, "pubsub"))
886            subscription = response.addElement("subscription")
887            if result.nodeIdentifier:
888                subscription["node"] = result.nodeIdentifier
889            subscription["jid"] = result.subscriber.full()
890            subscription["subscription"] = result.state
891            return response
892
893        d = self.subscribe(request.sender, request.recipient,
894                           request.nodeIdentifier, request.subscriber)
895        d.addCallback(toResponse)
896        return d
897
898
899    def _on_unsubscribe(self, request):
900        return self.unsubscribe(request.sender, request.recipient,
901                                request.nodeIdentifier, request.subscriber)
902
903
904    def _on_optionsGet(self, request):
905        raise Unsupported('subscription-options')
906
907
908    def _on_optionsSet(self, request):
909        raise Unsupported('subscription-options')
910
911
912    def _on_subscriptions(self, request):
913
914        def toResponse(result):
915            response = domish.Element((NS_PUBSUB, 'pubsub'))
916            subscriptions = response.addElement('subscriptions')
917            for subscription in result:
918                item = subscriptions.addElement('subscription')
919                item['node'] = subscription.nodeIdentifier
920                item['jid'] = subscription.subscriber.full()
921                item['subscription'] = subscription.state
922            return response
923
924        d = self.subscriptions(request.sender, request.recipient)
925        d.addCallback(toResponse)
926        return d
927
928
929    def _on_affiliations(self, request):
930
931        def toResponse(result):
932            response = domish.Element((NS_PUBSUB, 'pubsub'))
933            affiliations = response.addElement('affiliations')
934
935            for nodeIdentifier, affiliation in result:
936                item = affiliations.addElement('affiliation')
937                item['node'] = nodeIdentifier
938                item['affiliation'] = affiliation
939
940            return response
941
942        d = self.affiliations(request.sender, request.recipient)
943        d.addCallback(toResponse)
944        return d
945
946
947    def _on_create(self, request):
948
949        def toResponse(result):
950            if not request.nodeIdentifier or request.nodeIdentifier != result:
951                response = domish.Element((NS_PUBSUB, 'pubsub'))
952                create = response.addElement('create')
953                create['node'] = result
954                return response
955            else:
956                return None
957
958        d = self.create(request.sender, request.recipient,
959                        request.nodeIdentifier)
960        d.addCallback(toResponse)
961        return d
962
963
964    def _makeFields(self, options, values):
965        fields = []
966        for name, value in values.iteritems():
967            if name not in options:
968                continue
969
970            option = {'var': name}
971            option.update(options[name])
972            if isinstance(value, list):
973                option['values'] = value
974            else:
975                option['value'] = value
976            fields.append(data_form.Field.fromDict(option))
977        return fields
978
979
980    def _formFromConfiguration(self, values):
981        options = self.getConfigurationOptions()
982        fields = self._makeFields(options, values)
983        form = data_form.Form(formType="form",
984                              formNamespace=NS_PUBSUB_NODE_CONFIG,
985                              fields=fields)
986
987        return form
988
989
990    def _checkConfiguration(self, values):
991        options = self.getConfigurationOptions()
992        processedValues = {}
993
994        for key, value in values.iteritems():
995            if key not in options:
996                continue
997
998            option = {'var': key}
999            option.update(options[key])
1000            field = data_form.Field.fromDict(option)
1001            if isinstance(value, list):
1002                field.values = value
1003            else:
1004                field.value = value
1005            field.typeCheck()
1006
1007            if isinstance(value, list):
1008                processedValues[key] = field.values
1009            else:
1010                processedValues[key] = field.value
1011
1012        return processedValues
1013
1014
1015    def _on_default(self, request):
1016
1017        def toResponse(options):
1018            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1019            default = response.addElement("default")
1020            default.addChild(self._formFromConfiguration(options).toElement())
1021            return response
1022
1023        if request.nodeType not in ('leaf', 'collection'):
1024            return defer.fail(error.StanzaError('not-acceptable'))
1025
1026        d = self.getDefaultConfiguration(request.sender, request.recipient,
1027                                         request.nodeType)
1028        d.addCallback(toResponse)
1029        return d
1030
1031
1032    def _on_configureGet(self, request):
1033        def toResponse(options):
1034            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1035            configure = response.addElement("configure")
1036            form = self._formFromConfiguration(options)
1037            configure.addChild(form.toElement())
1038
1039            if request.nodeIdentifier:
1040                configure["node"] = request.nodeIdentifier
1041
1042            return response
1043
1044        d = self.getConfiguration(request.sender, request.recipient,
1045                                  request.nodeIdentifier)
1046        d.addCallback(toResponse)
1047        return d
1048
1049
1050    def _on_configureSet(self, request):
1051        if request.options:
1052            request.options = self._checkConfiguration(request.options)
1053            return self.setConfiguration(request.sender, request.recipient,
1054                                         request.nodeIdentifier,
1055                                         request.options)
1056        else:
1057            return None
1058
1059
1060
1061    def _on_items(self, request):
1062
1063        def toResponse(result):
1064            response = domish.Element((NS_PUBSUB, 'pubsub'))
1065            items = response.addElement('items')
1066            items["node"] = request.nodeIdentifier
1067
1068            for item in result:
1069                items.addChild(item)
1070
1071            return response
1072
1073        d = self.items(request.sender, request.recipient,
1074                       request.nodeIdentifier, request.maxItems,
1075                       request.itemIdentifiers)
1076        d.addCallback(toResponse)
1077        return d
1078
1079
1080    def _on_retract(self, request):
1081        return self.retract(request.sender, request.recipient,
1082                            request.nodeIdentifier, request.itemIdentifiers)
1083
1084
1085    def _on_purge(self, request):
1086        return self.purge(request.sender, request.recipient,
1087                          request.nodeIdentifier)
1088
1089
1090    def _on_delete(self, request):
1091        return self.delete(request.sender, request.recipient,
1092                           request.nodeIdentifier)
1093
1094
1095    def _on_affiliationsGet(self, iq):
1096        raise Unsupported('modify-affiliations')
1097
1098
1099    def _on_affiliationsSet(self, iq):
1100        raise Unsupported('modify-affiliations')
1101
1102
1103    def _on_subscriptionsGet(self, iq):
1104        raise Unsupported('manage-subscriptions')
1105
1106
1107    def _on_subscriptionsSet(self, iq):
1108        raise Unsupported('manage-subscriptions')
1109
1110    # public methods
1111
1112    def _createNotification(self, eventType, service, nodeIdentifier,
1113                                  subscriber, subscriptions=None):
1114        headers = []
1115
1116        if subscriptions:
1117            for subscription in subscriptions:
1118                if nodeIdentifier != subscription.nodeIdentifier:
1119                    headers.append(('Collection', subscription.nodeIdentifier))
1120
1121        message = domish.Element((None, "message"))
1122        message["from"] = service.full()
1123        message["to"] = subscriber.full()
1124        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1125
1126        element = event.addElement(eventType)
1127        element["node"] = nodeIdentifier
1128
1129        if headers:
1130            message.addChild(shim.Headers(headers))
1131
1132        return message
1133
1134    def notifyPublish(self, service, nodeIdentifier, notifications):
1135        for subscriber, subscriptions, items in notifications:
1136            message = self._createNotification('items', service,
1137                                               nodeIdentifier, subscriber,
1138                                               subscriptions)
1139            message.event.items.children = items
1140            self.send(message)
1141
1142
1143    def notifyDelete(self, service, nodeIdentifier, subscribers,
1144                           redirectURI=None):
1145        for subscriber in subscribers:
1146            message = self._createNotification('delete', service,
1147                                               nodeIdentifier,
1148                                               subscriber)
1149            if redirectURI:
1150                redirect = message.event.delete.addElement('redirect')
1151                redirect['uri'] = redirectURI
1152            self.send(message)
1153
1154
1155    def getNodeInfo(self, requestor, service, nodeIdentifier):
1156        return None
1157
1158
1159    def getNodes(self, requestor, service):
1160        return []
1161
1162
1163    def publish(self, requestor, service, nodeIdentifier, items):
1164        raise Unsupported('publish')
1165
1166
1167    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1168        raise Unsupported('subscribe')
1169
1170
1171    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1172        raise Unsupported('subscribe')
1173
1174
1175    def subscriptions(self, requestor, service):
1176        raise Unsupported('retrieve-subscriptions')
1177
1178
1179    def affiliations(self, requestor, service):
1180        raise Unsupported('retrieve-affiliations')
1181
1182
1183    def create(self, requestor, service, nodeIdentifier):
1184        raise Unsupported('create-nodes')
1185
1186
1187    def getConfigurationOptions(self):
1188        return {}
1189
1190
1191    def getDefaultConfiguration(self, requestor, service, nodeType):
1192        raise Unsupported('retrieve-default')
1193
1194
1195    def getConfiguration(self, requestor, service, nodeIdentifier):
1196        raise Unsupported('config-node')
1197
1198
1199    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1200        raise Unsupported('config-node')
1201
1202
1203    def items(self, requestor, service, nodeIdentifier, maxItems,
1204                    itemIdentifiers):
1205        raise Unsupported('retrieve-items')
1206
1207
1208    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1209        raise Unsupported('retract-items')
1210
1211
1212    def purge(self, requestor, service, nodeIdentifier):
1213        raise Unsupported('purge-nodes')
1214
1215
1216    def delete(self, requestor, service, nodeIdentifier):
1217        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.