source: wokkel/pubsub.py @ 57:bcc9bc7a0929

Last change on this file since 57:bcc9bc7a0929 was 57:bcc9bc7a0929, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Add a PubSubRequest class to parse and render publish-subscribe requests.

Author: ralphm.
Fixes #45.

File size: 37.8 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, generic, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath queries matching iq stanzas of type 'get' and of type 'set'
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces of the publish-subscribe protocol; all derived ones are the
# base namespace followed by a fragment identifier.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching get and set iq stanzas that hold a pubsub element in
# either the plain or the owner publish-subscribe namespace.
PUBSUB_REQUEST = ('/iq[@type="get" or @type="set"]/'
                  'pubsub[@xmlns="%s" or @xmlns="%s"]'
                  % (NS_PUBSUB, NS_PUBSUB_OWNER))
40
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised when a subscription request resulted in the C{'pending'}
    subscription state.
    """
45
46
47
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised when a subscription request resulted in the C{'unconfigured'}
    subscription state.
    """
53
54
55
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is an element in the publish-subscribe errors
    namespace, passed to L{error.StanzaError} as its application condition.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: Optional text passed on to L{error.StanzaError}.
        """
        specificCondition = domish.Element((NS_PUBSUB_ERRORS,
                                            pubsubCondition))
        if feature:
            specificCondition['feature'] = feature

        error.StanzaError.__init__(self, condition,
                                   appCondition=specificCondition,
                                   text=text)
67
68
69
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{bad-request} condition.

    Optionally carries a publish-subscribe specific condition element.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the publish-subscribe
                                specific condition element.
        @param text: Optional text passed on to L{error.StanzaError}.
        """
        # Only build the specific condition element when one was named.
        specificCondition = None
        if pubsubCondition:
            specificCondition = domish.Element((NS_PUBSUB_ERRORS,
                                                pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request',
                                   appCondition=specificCondition,
                                   text=text)
82
83
84
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    This is a C{feature-not-implemented} stanza error with the
    publish-subscribe specific condition C{unsupported}.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional text passed on to L{PubSubError}.
        """
        PubSubError.__init__(self,
                             'feature-not-implemented',
                             'unsupported',
                             feature=feature,
                             text=text)
91
92
93
class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary when no
                   (or empty) options were passed.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        # A missing or empty options value is replaced by a fresh dictionary,
        # so instances never share one.
        if options:
            self.options = options
        else:
            self.options = {}

        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
112
113
114
class Item(domish.Element):
    """
    Publish subscribe item.

    An C{item} element in the publish-subscribe namespace, that behaves like
    an object providing L{domish.IElement}.

    The payload can be passed using the C{payload} keyword argument to
    C{__init__}, or added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML and are added as raw XML,
        # anything else is added as a child.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
142
143
144
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering are table driven: L{_requestVerbMap} maps the iq
    type and the child element of the C{pubsub} element to a verb, and
    L{_parameters} lists, per verb, the names of the parameter handlers.
    A parameter named C{x} is parsed by C{_parse_x} and rendered by
    C{_render_x}.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted. Parsed requests hold a C{list}.
    @type itemIdentifiers: C{list}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests. For parsed requests these are the values
                   returned by the C{getValues} method of the submitted
                   L{data_form.Form}, or an empty C{dict} for a cancelled
                   form.
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': [],
        'affiliationsSet': [],
        'subscriptionsGet': [],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: Optional request verb, one of the keys of
                     L{_parameters}.
        @type verb: C{str}
        """
        self.verb = verb


    @staticmethod
    def _findForm(element, formNamespace):
        """
        Find a Data Form.

        Look for an element that represents a Data Form with the specified
        form namespace as a child element of the given element.

        @param element: The element to search the children of. May be
                        C{None}.
        @param formNamespace: The form namespace the Data Form must have.
        @return: The first matching form, or C{None} if there is none.
        @rtype: L{data_form.Form}
        """
        if not element:
            return None

        for child in element.elements():
            try:
                form = data_form.Form.fromElement(child)
            except data_form.Error:
                # Not something that can be parsed as a Data Form, skip it.
                continue

            # Only a form in the requested namespace qualifies; forms in
            # other namespaces are passed over.
            if form.formNamespace == formNamespace:
                return form

        return None


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with condition C{nodeid-required}, if there is no
                           C{node} attribute.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        An empty or unset node identifier is not rendered at all.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        No default is passed to C{getAttribute} for a missing C{node}
        attribute.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only C{item} child elements in the publish-subscribe namespace are
        collected.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with condition C{jid-required}, if there is no
                           C{jid} attribute.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.

        The subscriber must have been set.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is taken from the C{pubsub#node_type} field of a
        submitted node configuration form. Without such a form or field, the
        node type is C{'leaf'}.
        """
        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing, or has
                           a type other than C{'submit'} or C{'cancel'}.
        """
        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form:
            if form.formType == 'submit':
                self.options = form.getValues()
            elif form.formType == 'cancel':
                self.options = {}
            else:
                raise BadRequest(text="Unexpected form type %r" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")



    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an C{item} element has no C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.

        Each identifier is rendered as an C{item} child element with an
        C{id} attribute.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: if the value cannot be converted to an integer.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                self.maxItems = int(value)
            except ValueError:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options out of a request for setting subscription options.

        @raise BadRequest: if the subscribe options form is missing, or has
                           a type other than C{'submit'} or C{'cancel'}.
        """
        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form:
            if form.formType == 'submit':
                self.options = form.getValues()
            elif form.formType == 'cancel':
                self.options = {}
            else:
                raise BadRequest(text="Unexpected form type %r" % form.formType)
        else:
            raise BadRequest(text="Missing options form")

    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        The first child of the C{pubsub} element that is known in
        L{_requestVerbMap} for the iq type of the request determines the
        verb. The parameter handlers for that verb are then applied to that
        child element.

        @param element: The iq stanza holding the request.
        @raise NotImplementedError: if none of the children of the C{pubsub}
                                    element maps to a known verb.
        """
        generic.Stanza.parseElement(self, element)

        # Keep track of the matching child explicitly, so that the parameter
        # handlers are never applied to an unrelated (or undefined) element.
        verbElement = None
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue
            else:
                self.verb = verb
                verbElement = child
                break

        if verbElement is None:
            raise NotImplementedError()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{xmlstream.IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{xmlstream.IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: if the verb of this request is unknown.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = xmlstream.IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
501
502
503
class PubSubEvent(object):
    """
    A publish subscribe event.

    This holds what all publish-subscribe event notifications have in
    common.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # The arguments are stored as passed, under the same names.
        self.sender, self.recipient = sender, recipient
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
523
524
525
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies new, updated and retracted items.

    Besides the items, this takes the same parameters as L{PubSubEvent}.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        super(ItemsEvent, self).__init__(sender, recipient, nodeIdentifier,
                                         headers)
        self.items = items
537
538
539
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: Optional redirection URI for the deleted node.
                       C{None} unless set on the instance.
    """

    # Class level default, for events that were not given a redirect URI.
    redirectURI = None
546
547
548
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.

    This adds nothing to L{PubSubEvent}, other than its type.
    """
553
554
555
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which are meant to be
    overridden. The other public methods send requests to a
    publish-subscribe service, built using L{PubSubRequest}.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that hold a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to its C{_onEvent_<name>} handler.

        Messages without sender or recipient address, without a child
        element in the event namespace, or with an action there is no
        handler for, are ignored. The message is only marked as handled when
        a handler was called.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # When there are multiple children in the event namespace, the last
        # one is used.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Pass an L{ItemsEvent} for an items notification to L{itemsReceived}.

        Both C{item} and C{retract} child elements are passed as items.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Pass a L{DeleteEvent} for a delete notification to L{deleteReceived}.

        If there is a C{redirect} child element, its C{uri} attribute is
        set as the redirect URI of the event.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Pass a L{PurgeEvent} for a purge notification to L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} for each items notification.

        Does nothing by default.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} for each delete notification.

        Does nothing by default.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} for each purge notification.

        Does nothing by default.
        """
        pass


    def createNode(self, service, nodeIdentifier=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @return: A deferred that fires with the identifier of the new node.
                 This is the identifier in the response, if any, or the
                 suggested identifier otherwise.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response does not name a node, so the suggested node
                # identifier was accepted. Depending on what is missing,
                # this shows up differently: no pubsub element at all
                # (AttributeError), a pubsub element without create element
                # (TypeError), or a create element without node attribute
                # (KeyError).
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @return: A deferred that fires with C{None} when subscribed. It
                 fails with L{SubscriptionPending} or
                 L{SubscriptionUnconfigured} if the subscription has the
                 corresponding state.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @return: The deferred returned by L{PubSubRequest.send}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @return: A deferred that fires with the list of C{item} elements in
                 the publish-subscribe namespace that are in the response.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            request.maxItems = str(int(maxItems))

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d
766
767
768
769class PubSubService(XMPPHandler, IQHandlerMixin):
770    """
771    Protocol implementation for a XMPP Publish Subscribe Service.
772
773    The word Service here is used as taken from the Publish Subscribe
774    specification. It is the party responsible for keeping nodes and their
775    subscriptions, and sending out notifications.
776
777    Methods from the L{IPubSubService} interface that are called as
778    a result of an XMPP request may raise exceptions. Alternatively the
779    deferred returned by these methods may have their errback called. These are
780    handled as follows:
781
782     - If the exception is an instance of L{error.StanzaError}, an error
783       response iq is returned.
784     - Any other exception is reported using L{log.msg}. An error response
785       with the condition C{internal-server-error} is returned.
786
787    The default implementation of said methods raises an L{Unsupported}
788    exception and are meant to be overridden.
789
790    @ivar discoIdentity: Service discovery identity as a dictionary with
791                         keys C{'category'}, C{'type'} and C{'name'}.
792    @ivar pubSubFeatures: List of supported publish-subscribe features for
793                          service discovery, as C{str}.
794    @type pubSubFeatures: C{list} or C{None}
795    """
796
797    implements(IPubSubService)
798
799    iqHandlers = {
800            '/*': '_onPubSubRequest',
801            }
802
803
804    def __init__(self):
805        self.discoIdentity = {'category': 'pubsub',
806                              'type': 'generic',
807                              'name': 'Generic Publish-Subscribe Service'}
808
809        self.pubSubFeatures = []
810
811
812    def connectionMade(self):
813        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
814
815
816    def getDiscoInfo(self, requestor, target, nodeIdentifier):
817        info = []
818
819        if not nodeIdentifier:
820            category, idType, name = self.discoIdentity
821            info.append(disco.DiscoIdentity(category, idType, name))
822
823            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
824            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
825                         for feature in self.pubSubFeatures])
826
827        def toInfo(nodeInfo):
828            if not nodeInfo:
829                return
830
831            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
832            info.append(disco.DiscoIdentity('pubsub', nodeType))
833            if metaData:
834                form = data_form.Form(formType="result",
835                                      formNamespace=NS_PUBSUB_META_DATA)
836                form.addField(
837                        data_form.Field(
838                            var='pubsub#node_type',
839                            value=nodeType,
840                            label='The type of node (collection or leaf)'
841                        )
842                )
843
844                for metaDatum in metaData:
845                    form.addField(data_form.Field.fromDict(metaDatum))
846
847                info.append(form)
848
849        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
850        d.addCallback(toInfo)
851        d.addBoth(lambda result: info)
852        return d
853
854
855    def getDiscoItems(self, requestor, target, nodeIdentifier):
856        if nodeIdentifier or self.hideNodes:
857            return defer.succeed([])
858
859        d = self.getNodes(requestor, target)
860        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
861                                     for node in nodes])
862        return d
863
864
865    def _onPubSubRequest(self, iq):
866        request = PubSubRequest.fromElement(iq)
867        handler = getattr(self, '_on_%s' % request.verb)
868        return handler(request)
869
870
871    def _on_publish(self, request):
872        return self.publish(request.sender, request.recipient,
873                            request.nodeIdentifier, request.items)
874
875
876    def _on_subscribe(self, request):
877
878        def toResponse(result):
879            response = domish.Element((NS_PUBSUB, "pubsub"))
880            subscription = response.addElement("subscription")
881            if result.nodeIdentifier:
882                subscription["node"] = result.nodeIdentifier
883            subscription["jid"] = result.subscriber.full()
884            subscription["subscription"] = result.state
885            return response
886
887        d = self.subscribe(request.sender, request.recipient,
888                           request.nodeIdentifier, request.subscriber)
889        d.addCallback(toResponse)
890        return d
891
892
893    def _on_unsubscribe(self, request):
894        return self.unsubscribe(request.sender, request.recipient,
895                                request.nodeIdentifier, request.subscriber)
896
897
898    def _on_optionsGet(self, request):
899        raise Unsupported('subscription-options')
900
901
902    def _on_optionsSet(self, request):
903        raise Unsupported('subscription-options')
904
905
906    def _on_subscriptions(self, request):
907
908        def toResponse(result):
909            response = domish.Element((NS_PUBSUB, 'pubsub'))
910            subscriptions = response.addElement('subscriptions')
911            for subscription in result:
912                item = subscriptions.addElement('subscription')
913                item['node'] = subscription.nodeIdentifier
914                item['jid'] = subscription.subscriber.full()
915                item['subscription'] = subscription.state
916            return response
917
918        d = self.subscriptions(request.sender, request.recipient)
919        d.addCallback(toResponse)
920        return d
921
922
923    def _on_affiliations(self, request):
924
925        def toResponse(result):
926            response = domish.Element((NS_PUBSUB, 'pubsub'))
927            affiliations = response.addElement('affiliations')
928
929            for nodeIdentifier, affiliation in result:
930                item = affiliations.addElement('affiliation')
931                item['node'] = nodeIdentifier
932                item['affiliation'] = affiliation
933
934            return response
935
936        d = self.affiliations(request.sender, request.recipient)
937        d.addCallback(toResponse)
938        return d
939
940
941    def _on_create(self, request):
942
943        def toResponse(result):
944            if not request.nodeIdentifier or request.nodeIdentifier != result:
945                response = domish.Element((NS_PUBSUB, 'pubsub'))
946                create = response.addElement('create')
947                create['node'] = result
948                return response
949            else:
950                return None
951
952        d = self.create(request.sender, request.recipient,
953                        request.nodeIdentifier)
954        d.addCallback(toResponse)
955        return d
956
957
958    def _makeFields(self, options, values):
959        fields = []
960        for name, value in values.iteritems():
961            if name not in options:
962                continue
963
964            option = {'var': name}
965            option.update(options[name])
966            if isinstance(value, list):
967                option['values'] = value
968            else:
969                option['value'] = value
970            fields.append(data_form.Field.fromDict(option))
971        return fields
972
973
974    def _formFromConfiguration(self, values):
975        options = self.getConfigurationOptions()
976        fields = self._makeFields(options, values)
977        form = data_form.Form(formType="form",
978                              formNamespace=NS_PUBSUB_NODE_CONFIG,
979                              fields=fields)
980
981        return form
982
983
984    def _checkConfiguration(self, values):
985        options = self.getConfigurationOptions()
986        processedValues = {}
987
988        for key, value in values.iteritems():
989            if key not in options:
990                continue
991
992            option = {'var': key}
993            option.update(options[key])
994            field = data_form.Field.fromDict(option)
995            if isinstance(value, list):
996                field.values = value
997            else:
998                field.value = value
999            field.typeCheck()
1000
1001            if isinstance(value, list):
1002                processedValues[key] = field.values
1003            else:
1004                processedValues[key] = field.value
1005
1006        return processedValues
1007
1008
1009    def _on_default(self, request):
1010
1011        def toResponse(options):
1012            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1013            default = response.addElement("default")
1014            default.addChild(self._formFromConfiguration(options).toElement())
1015            return response
1016
1017        if request.nodeType not in ('leaf', 'collection'):
1018            return defer.fail(error.StanzaError('not-acceptable'))
1019
1020        d = self.getDefaultConfiguration(request.sender, request.recipient,
1021                                         request.nodeType)
1022        d.addCallback(toResponse)
1023        return d
1024
1025
1026    def _on_configureGet(self, request):
1027        def toResponse(options):
1028            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1029            configure = response.addElement("configure")
1030            form = self._formFromConfiguration(options)
1031            configure.addChild(form.toElement())
1032
1033            if request.nodeIdentifier:
1034                configure["node"] = request.nodeIdentifier
1035
1036            return response
1037
1038        d = self.getConfiguration(request.sender, request.recipient,
1039                                  request.nodeIdentifier)
1040        d.addCallback(toResponse)
1041        return d
1042
1043
1044    def _on_configureSet(self, request):
1045        if request.options:
1046            request.options = self._checkConfiguration(request.options)
1047            return self.setConfiguration(request.sender, request.recipient,
1048                                         request.nodeIdentifier,
1049                                         request.options)
1050        else:
1051            return None
1052
1053
1054
1055    def _on_items(self, request):
1056
1057        def toResponse(result):
1058            response = domish.Element((NS_PUBSUB, 'pubsub'))
1059            items = response.addElement('items')
1060            items["node"] = request.nodeIdentifier
1061
1062            for item in result:
1063                items.addChild(item)
1064
1065            return response
1066
1067        d = self.items(request.sender, request.recipient,
1068                       request.nodeIdentifier, request.maxItems,
1069                       request.itemIdentifiers)
1070        d.addCallback(toResponse)
1071        return d
1072
1073
1074    def _on_retract(self, request):
1075        return self.retract(request.sender, request.recipient,
1076                            request.nodeIdentifier, request.itemIdentifiers)
1077
1078
1079    def _on_purge(self, request):
1080        return self.purge(request.sender, request.recipient,
1081                          request.nodeIdentifier)
1082
1083
1084    def _on_delete(self, request):
1085        return self.delete(request.sender, request.recipient,
1086                           request.nodeIdentifier)
1087
1088
1089    def _on_affiliationsGet(self, iq):
1090        raise Unsupported('modify-affiliations')
1091
1092
1093    def _on_affiliationsSet(self, iq):
1094        raise Unsupported('modify-affiliations')
1095
1096
1097    def _on_subscriptionsGet(self, iq):
1098        raise Unsupported('manage-subscriptions')
1099
1100
1101    def _on_subscriptionsSet(self, iq):
1102        raise Unsupported('manage-subscriptions')
1103
1104    # public methods
1105
1106    def _createNotification(self, eventType, service, nodeIdentifier,
1107                                  subscriber, subscriptions=None):
1108        headers = []
1109
1110        if subscriptions:
1111            for subscription in subscriptions:
1112                if nodeIdentifier != subscription.nodeIdentifier:
1113                    headers.append(('Collection', subscription.nodeIdentifier))
1114
1115        message = domish.Element((None, "message"))
1116        message["from"] = service.full()
1117        message["to"] = subscriber.full()
1118        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1119
1120        element = event.addElement(eventType)
1121        element["node"] = nodeIdentifier
1122
1123        if headers:
1124            message.addChild(shim.Headers(headers))
1125
1126        return message
1127
1128    def notifyPublish(self, service, nodeIdentifier, notifications):
1129        for subscriber, subscriptions, items in notifications:
1130            message = self._createNotification('items', service,
1131                                               nodeIdentifier, subscriber,
1132                                               subscriptions)
1133            message.event.items.children = items
1134            self.send(message)
1135
1136
1137    def notifyDelete(self, service, nodeIdentifier, subscribers,
1138                           redirectURI=None):
1139        for subscriber in subscribers:
1140            message = self._createNotification('delete', service,
1141                                               nodeIdentifier,
1142                                               subscriber)
1143            if redirectURI:
1144                redirect = message.event.delete.addElement('redirect')
1145                redirect['uri'] = redirectURI
1146            self.send(message)
1147
1148
1149    def getNodeInfo(self, requestor, service, nodeIdentifier):
1150        return None
1151
1152
1153    def getNodes(self, requestor, service):
1154        return []
1155
1156
1157    def publish(self, requestor, service, nodeIdentifier, items):
1158        raise Unsupported('publish')
1159
1160
1161    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1162        raise Unsupported('subscribe')
1163
1164
1165    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1166        raise Unsupported('subscribe')
1167
1168
1169    def subscriptions(self, requestor, service):
1170        raise Unsupported('retrieve-subscriptions')
1171
1172
1173    def affiliations(self, requestor, service):
1174        raise Unsupported('retrieve-affiliations')
1175
1176
1177    def create(self, requestor, service, nodeIdentifier):
1178        raise Unsupported('create-nodes')
1179
1180
1181    def getConfigurationOptions(self):
1182        return {}
1183
1184
1185    def getDefaultConfiguration(self, requestor, service, nodeType):
1186        raise Unsupported('retrieve-default')
1187
1188
1189    def getConfiguration(self, requestor, service, nodeIdentifier):
1190        raise Unsupported('config-node')
1191
1192
1193    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1194        raise Unsupported('config-node')
1195
1196
1197    def items(self, requestor, service, nodeIdentifier, maxItems,
1198                    itemIdentifiers):
1199        raise Unsupported('retrieve-items')
1200
1201
1202    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1203        raise Unsupported('retract-items')
1204
1205
1206    def purge(self, requestor, service, nodeIdentifier):
1207        raise Unsupported('purge-nodes')
1208
1209
1210    def delete(self, requestor, service, nodeIdentifier):
1211        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.