source: wokkel/pubsub.py @ 81:1334124db2fd

Last change on this file since 81:1334124db2fd was 81:1334124db2fd, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Make PubSubRequest.options be a form and use new form type checking

File size: 39.5 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and set, respectively
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Publish-subscribe namespaces, all derived from the base pubsub namespace
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath matching iq get/set stanzas that carry a pubsub child element in
# either the regular or the owner namespace
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/'
    'pubsub[@xmlns="%s" or @xmlns="%s"]' % (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    Signals that a requested subscription exists but still awaits acceptance.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    Signals that a requested subscription must be configured before it
    becomes active.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is attached as an application condition element
    in the publish-subscribe errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The generic stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: Optional descriptive text for the error.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        # Only annotate the condition when a feature was actually named
        if feature:
            pubsubElement['feature'] = feature
        error.StanzaError.__init__(
            self, condition, text=text, appCondition=pubsubElement)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{bad-request} condition.

    Optionally carries a publish-subscribe specific condition element.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of a publish-subscribe specific
                                condition element to attach.
        @param text: Optional descriptive text for the error.
        """
        # No application condition unless a specific condition was named
        specific = None
        if pubsubCondition:
            specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        error.StanzaError.__init__(
            self, 'bad-request', text=text, appCondition=specific)
84
85
86
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not implemented.

    @ivar feature: The name of the unsupported feature.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional descriptive text for the error.
        """
        self.feature = feature
        PubSubError.__init__(
            self, 'feature-not-implemented', 'unsupported', feature, text)

    def __str__(self):
        """
        Return the generic error description followed by the feature name.
        """
        return PubSubError.__str__(self) + ', feature %r' % self.feature
99
100
class Subscription(object):
    """
    Record describing one subscription to a node.

    @ivar nodeIdentifier: Identifier of the subscribed-to node. C{None}
                          denotes the root node.
    @ivar subscriber: The entity that is subscribed.
    @ivar state: State of the subscription: C{'subscribed'}, C{'pending'}
                 or C{'unconfigured'}.
    @ivar options: Subscription options. An empty C{dict} when none (or
                   only an empty value) were passed in.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        # A falsy options argument is replaced by a fresh, empty dictionary
        self.options = options if options else {}
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
119
120
121
class Item(domish.Element):
    """
    Publish subscribe item.

    An instance is itself an element named C{item} in the publish-subscribe
    namespace, so it behaves like an object providing L{domish.IElement}.

    The payload is either passed through the C{payload} keyword argument of
    C{__init__}, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, given as a domish element or
                        as a string of serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added
        # as a child node.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
149
150
151
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering are table driven: L{_requestVerbMap} maps the iq
    type, namespace and name of the verb element to a request verb, and
    L{_parameters} lists, per verb, the names of the C{_parse_*} and
    C{_render_*} handlers that process the verb element.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{list}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': [],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: Optional request verb, one of the values of
                     L{_requestVerbMap}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with the C{nodeid-required} condition if the
                           C{node} attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no (or an empty) node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        An empty or unset node identifier is not rendered at all.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute yields C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.

        An empty or unset node identifier is not rendered at all.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only child elements named C{item} in the publish-subscribe namespace
        are collected, other children are ignored.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with the C{jid-required} condition if the C{jid}
                           attribute is missing.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.

        The subscriber must have been set to a JID before rendering.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is taken from the C{pubsub#node_type} field of
        a submitted node configuration form. Without such a form or field,
        the node type defaults to C{'leaf'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing or is
                           not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")



    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item element lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.

        Each identifier becomes an C{item} child element with the identifier
        as its C{id} attribute.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: if the attribute value is not an integer.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                self.maxItems = int(value)
            except ValueError:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.

        An unset (or zero) maximum is not rendered at all.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: if the subscribe options form is missing or is
                           not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        The first child of the C{pubsub} element that, combined with the
        stanza type, is known in L{_requestVerbMap} determines the verb. The
        parameter handlers for that verb are then run on that child.

        @param element: The iq stanza holding the request.
        @type element: L{domish.Element}

        @raise NotImplementedError: if none of the children of the pubsub
                                    element represents a known request.
        @raise BadRequest: if one of the parameter handlers rejects the
                           request.
        """
        generic.Stanza.parseElement(self, element)

        # Track the matched verb element explicitly, instead of relying on
        # loop variables that are unbound when nothing (or nothing known)
        # was found.
        verbElement = None
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue
            else:
                self.verb = verb
                verbElement = child
                break

        if verbElement is None:
            raise NotImplementedError()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.

        @raise NotImplementedError: if the verb of this request is unknown.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        # NOTE(review): this class only defines _parse_ handlers for the
        # 'default', 'configure' and 'options' parameters. Unless matching
        # _render_ handlers are provided elsewhere, sending a request with
        # a verb that uses them fails here with an AttributeError.
        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
484
485
486
class PubSubEvent(object):
    """
    Base class for publish subscribe events.

    @param sender: The entity that sent the notification.
    @type sender: L{jid.JID}
    @param recipient: The entity the notification was addressed to.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event is about.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # All arguments are stored as given, under the same names
        self.sender, self.recipient = sender, recipient
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
506
507
508
class ItemsEvent(PubSubEvent):
    """
    Publish-subscribe event for new, updated and retracted items.

    @param items: The received items, as a list of domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        # The common event attributes are set up by the base class
        PubSubEvent.__init__(self,
                             sender=sender,
                             recipient=recipient,
                             nodeIdentifier=nodeIdentifier,
                             headers=headers)
        self.items = items
520
521
522
class DeleteEvent(PubSubEvent):
    """
    Publish-subscribe event for the deletion of a node.

    @ivar redirectURI: Optional URI taken from the redirect element of the
                       notification, C{None} if there was none.
    """

    # Class-level default, overridden per instance when a redirect is given
    redirectURI = None
529
530
531
class PurgeEvent(PubSubEvent):
    """
    Publish-subscribe event for the purging of all items of a node.
    """
536
537
538
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which are meant to be
    overridden. The other public methods send requests to a publish
    subscribe service and return a deferred for the response.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing incoming publish-subscribe event notifications.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        Notifications without C{from} or C{to} address, without an action
        element, or with an action for which there is no C{_onEvent_*}
        handler are ignored. Otherwise the message is marked as handled.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # If there are multiple candidates, the last child in the event
        # namespace is used as the action element.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Handle an items notification by calling L{itemsReceived}.

        Both C{item} and C{retract} children are passed along as items.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Handle a node deletion notification by calling L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Handle a node purge notification by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called when an items notification has been received.

        Does nothing by default.

        @type event: L{ItemsEvent}
        """
        pass


    def deleteReceived(self, event):
        """
        Called when a node deletion notification has been received.

        Does nothing by default.

        @type event: L{DeleteEvent}
        """
        pass


    def purgeReceived(self, event):
        """
        Called when a node purge notification has been received.

        Does nothing by default.

        @type event: L{PurgeEvent}
        """
        pass


    def createNode(self, service, nodeIdentifier=None, sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @return: A deferred that fires with the identifier of the new node.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response does not name the node: there is no pubsub
                # element (AttributeError), no create element (TypeError)
                # or no node attribute (KeyError). In all these cases, the
                # suggested node identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @return: A deferred that fires with C{None} when subscribed. It
                 fails with L{SubscriptionPending} or
                 L{SubscriptionUnconfigured} if the service reports the
                 subscription to be in the respective state.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @param sender: Optional sender address for the request.
        @type sender: L{JID}
        @return: A deferred that fires with the list of retrieved item
                 elements.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            # Keep the integer, as documented for PubSubRequest.maxItems.
            # It is converted to text when the request is rendered.
            request.maxItems = int(maxItems)
        request.sender = sender

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d
755
756
757
758class PubSubService(XMPPHandler, IQHandlerMixin):
759    """
760    Protocol implementation for a XMPP Publish Subscribe Service.
761
762    The word Service here is used as taken from the Publish Subscribe
763    specification. It is the party responsible for keeping nodes and their
764    subscriptions, and sending out notifications.
765
766    Methods from the L{IPubSubService} interface that are called as
767    a result of an XMPP request may raise exceptions. Alternatively the
768    deferred returned by these methods may have their errback called. These are
769    handled as follows:
770
771     - If the exception is an instance of L{error.StanzaError}, an error
772       response iq is returned.
773     - Any other exception is reported using L{log.msg}. An error response
774       with the condition C{internal-server-error} is returned.
775
776    The default implementation of said methods raises an L{Unsupported}
777    exception and are meant to be overridden.
778
779    @ivar discoIdentity: Service discovery identity as a dictionary with
780                         keys C{'category'}, C{'type'} and C{'name'}.
781    @ivar pubSubFeatures: List of supported publish-subscribe features for
782                          service discovery, as C{str}.
783    @type pubSubFeatures: C{list} or C{None}
784    """
785
786    implements(IPubSubService)
787
788    iqHandlers = {
789            '/*': '_onPubSubRequest',
790            }
791
792    _legacyHandlers = {
793        'publish': ('publish', ['sender', 'recipient',
794                                'nodeIdentifier', 'items']),
795        'subscribe': ('subscribe', ['sender', 'recipient',
796                                    'nodeIdentifier', 'subscriber']),
797        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
798                                        'nodeIdentifier', 'subscriber']),
799        'subscriptions': ('subscriptions', ['sender', 'recipient']),
800        'affiliations': ('affiliations', ['sender', 'recipient']),
801        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
802        'getConfigurationOptions': ('getConfigurationOptions', []),
803        'default': ('getDefaultConfiguration',
804                    ['sender', 'recipient', 'nodeType']),
805        'configureGet': ('getConfiguration', ['sender', 'recipient',
806                                              'nodeIdentifier']),
807        'configureSet': ('setConfiguration', ['sender', 'recipient',
808                                              'nodeIdentifier', 'options']),
809        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
810                            'maxItems', 'itemIdentifiers']),
811        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
812                                'itemIdentifiers']),
813        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
814        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
815    }
816
817    hideNodes = False
818
819    def __init__(self, resource=None):
820        self.resource = resource
821        self.discoIdentity = {'category': 'pubsub',
822                              'type': 'generic',
823                              'name': 'Generic Publish-Subscribe Service'}
824
825        self.pubSubFeatures = []
826
827
828    def connectionMade(self):
829        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
830
831
832    def getDiscoInfo(self, requestor, target, nodeIdentifier):
833        def toInfo(nodeInfo, info):
834            if not nodeInfo:
835                return info
836
837            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
838            info.append(disco.DiscoIdentity('pubsub', nodeType))
839            if metaData:
840                form = data_form.Form(formType="result",
841                                      formNamespace=NS_PUBSUB_META_DATA)
842                form.addField(
843                        data_form.Field(
844                            var='pubsub#node_type',
845                            value=nodeType,
846                            label='The type of node (collection or leaf)'
847                        )
848                )
849
850                for metaDatum in metaData:
851                    form.addField(data_form.Field.fromDict(metaDatum))
852
853                info.append(form)
854
855            return info
856
857        info = []
858
859        request = PubSubRequest('discoInfo')
860
861        if self.resource is not None:
862            resource = self.resource.locateResource(request)
863            identity = resource.discoIdentity
864            features = resource.features
865            getInfo = resource.getInfo
866        else:
867            category, idType, name = self.discoIdentity
868            identity = disco.DiscoIdentity(category, idType, name)
869            features = self.pubSubFeatures
870            getInfo = self.getNodeInfo
871
872        if not nodeIdentifier:
873            info.append(identity)
874            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
875            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
876                         for feature in features])
877
878        d = getInfo(requestor, target, nodeIdentifier or '')
879        d.addCallback(toInfo, info)
880        d.addErrback(log.err)
881        return d
882
883
884    def getDiscoItems(self, requestor, target, nodeIdentifier):
885        if self.hideNodes:
886            d = defer.succeed([])
887        elif self.resource is not None:
888            request = PubSubRequest('discoInfo')
889            resource = self.resource.locateResource(request)
890            d = resource.getNodes(requestor, target, nodeIdentifier)
891        elif nodeIdentifier:
892            d = self.getNodes(requestor, target)
893        else:
894            d = defer.succeed([])
895
896        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
897                                     for node in nodes])
898        return d
899
900
901    def _onPubSubRequest(self, iq):
902        request = PubSubRequest.fromElement(iq)
903
904        if self.resource is not None:
905            resource = self.resource.locateResource(request)
906        else:
907            resource = self
908
909        # Preprocess the request, knowing the handling resource
910        try:
911            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
912        except AttributeError:
913            pass
914        else:
915            request = preProcessor(resource, request)
916            if request is None:
917                return defer.succeed(None)
918
919        # Process the request itself,
920        if resource is not self:
921            try:
922                handler = getattr(resource, request.verb)
923            except AttributeError:
924                # fix lookup feature
925                text = "Request verb: %s" % request.verb
926                return defer.fail(Unsupported('', text))
927
928            d = handler(request)
929        else:
930            handlerName, argNames = self._legacyHandlers[request.verb]
931            handler = getattr(self, handlerName)
932            args = [getattr(request, arg) for arg in argNames]
933            if 'options' in argNames:
934                args[argNames.index('options')] = request.options.getValues()
935            d = handler(*args)
936
937        # If needed, translate the result into a response
938        try:
939            cb = getattr(self, '_toResponse_%s' % request.verb)
940        except AttributeError:
941            pass
942        else:
943            d.addCallback(cb, resource, request)
944
945        return d
946
947
948    def _toResponse_subscribe(self, result, resource, request):
949        response = domish.Element((NS_PUBSUB, "pubsub"))
950        subscription = response.addElement("subscription")
951        if result.nodeIdentifier:
952            subscription["node"] = result.nodeIdentifier
953        subscription["jid"] = result.subscriber.full()
954        subscription["subscription"] = result.state
955        return response
956
957
958    def _toResponse_subscriptions(self, result, resource, request):
959        response = domish.Element((NS_PUBSUB, 'pubsub'))
960        subscriptions = response.addElement('subscriptions')
961        for subscription in result:
962            item = subscriptions.addElement('subscription')
963            item['node'] = subscription.nodeIdentifier
964            item['jid'] = subscription.subscriber.full()
965            item['subscription'] = subscription.state
966        return response
967
968
969    def _toResponse_affiliations(self, result, resource, request):
970        response = domish.Element((NS_PUBSUB, 'pubsub'))
971        affiliations = response.addElement('affiliations')
972
973        for nodeIdentifier, affiliation in result:
974            item = affiliations.addElement('affiliation')
975            item['node'] = nodeIdentifier
976            item['affiliation'] = affiliation
977
978        return response
979
980
981    def _toResponse_create(self, result, resource, request):
982        if not request.nodeIdentifier or request.nodeIdentifier != result:
983            response = domish.Element((NS_PUBSUB, 'pubsub'))
984            create = response.addElement('create')
985            create['node'] = result
986            return response
987        else:
988            return None
989
990
991    def _formFromConfiguration(self, resource, values):
992        fieldDefs = resource.getConfigurationOptions()
993        form = data_form.Form(formType="form",
994                              formNamespace=NS_PUBSUB_NODE_CONFIG)
995        form.makeFields(values, fieldDefs)
996        return form
997
998
999    def _checkConfiguration(self, resource, form):
1000        fieldDefs = resource.getConfigurationOptions()
1001        form.typeCheck(fieldDefs, filterUnknown=True)
1002
1003
1004    def _preProcess_default(self, resource, request):
1005        if request.nodeType not in ('leaf', 'collection'):
1006            raise error.StanzaError('not-acceptable')
1007        else:
1008            return request
1009
1010
1011    def _toResponse_default(self, options, resource, request):
1012        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1013        default = response.addElement("default")
1014        form = self._formFromConfiguration(resource, options)
1015        default.addChild(form.toElement())
1016        return response
1017
1018
1019    def _toResponse_configureGet(self, options, resource, request):
1020        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1021        configure = response.addElement("configure")
1022        form = self._formFromConfiguration(resource, options)
1023        configure.addChild(form.toElement())
1024
1025        if request.nodeIdentifier:
1026            configure["node"] = request.nodeIdentifier
1027
1028        return response
1029
1030
1031    def _preProcess_configureSet(self, resource, request):
1032        if request.options.formType == 'cancel':
1033            return None
1034        else:
1035            self._checkConfiguration(resource, request.options)
1036            return request
1037
1038
1039    def _toResponse_items(self, result, resource, request):
1040        response = domish.Element((NS_PUBSUB, 'pubsub'))
1041        items = response.addElement('items')
1042        items["node"] = request.nodeIdentifier
1043
1044        for item in result:
1045            items.addChild(item)
1046
1047        return response
1048
1049
1050    def _createNotification(self, eventType, service, nodeIdentifier,
1051                                  subscriber, subscriptions=None):
1052        headers = []
1053
1054        if subscriptions:
1055            for subscription in subscriptions:
1056                if nodeIdentifier != subscription.nodeIdentifier:
1057                    headers.append(('Collection', subscription.nodeIdentifier))
1058
1059        message = domish.Element((None, "message"))
1060        message["from"] = service.full()
1061        message["to"] = subscriber.full()
1062        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1063
1064        element = event.addElement(eventType)
1065        element["node"] = nodeIdentifier
1066
1067        if headers:
1068            message.addChild(shim.Headers(headers))
1069
1070        return message
1071
1072    # public methods
1073
1074    def notifyPublish(self, service, nodeIdentifier, notifications):
1075        for subscriber, subscriptions, items in notifications:
1076            message = self._createNotification('items', service,
1077                                               nodeIdentifier, subscriber,
1078                                               subscriptions)
1079            message.event.items.children = items
1080            self.send(message)
1081
1082
1083    def notifyDelete(self, service, nodeIdentifier, subscribers,
1084                           redirectURI=None):
1085        for subscriber in subscribers:
1086            message = self._createNotification('delete', service,
1087                                               nodeIdentifier,
1088                                               subscriber)
1089            if redirectURI:
1090                redirect = message.event.delete.addElement('redirect')
1091                redirect['uri'] = redirectURI
1092            self.send(message)
1093
1094
1095    def getNodeInfo(self, requestor, service, nodeIdentifier):
1096        return None
1097
1098
1099    def getNodes(self, requestor, service):
1100        return []
1101
1102
1103    def publish(self, requestor, service, nodeIdentifier, items):
1104        raise Unsupported('publish')
1105
1106
1107    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1108        raise Unsupported('subscribe')
1109
1110
1111    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1112        raise Unsupported('subscribe')
1113
1114
1115    def subscriptions(self, requestor, service):
1116        raise Unsupported('retrieve-subscriptions')
1117
1118
1119    def affiliations(self, requestor, service):
1120        raise Unsupported('retrieve-affiliations')
1121
1122
1123    def create(self, requestor, service, nodeIdentifier):
1124        raise Unsupported('create-nodes')
1125
1126
1127    def getConfigurationOptions(self):
1128        return {}
1129
1130
1131    def getDefaultConfiguration(self, requestor, service, nodeType):
1132        raise Unsupported('retrieve-default')
1133
1134
1135    def getConfiguration(self, requestor, service, nodeIdentifier):
1136        raise Unsupported('config-node')
1137
1138
1139    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1140        raise Unsupported('config-node')
1141
1142
1143    def items(self, requestor, service, nodeIdentifier, maxItems,
1144                    itemIdentifiers):
1145        raise Unsupported('retrieve-items')
1146
1147
1148    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1149        raise Unsupported('retract-items')
1150
1151
1152    def purge(self, requestor, service, nodeIdentifier):
1153        raise Unsupported('purge-nodes')
1154
1155
1156    def delete(self, requestor, service, nodeIdentifier):
1157        raise Unsupported('delete-nodes')
1158
1159
1160
class PubSubResource(object):
    """
    Publish-subscribe resource that supports none of the optional features.

    Every request handler returns a deferred that fails with L{Unsupported}
    for the matching feature; the information providers return deferreds
    that fire with empty results.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    @staticmethod
    def _unsupported(feature):
        """
        Create a deferred that has failed with L{Unsupported}.

        @param feature: Name of the unsupported feature.
        """
        return defer.fail(Unsupported(feature))


    def locateResource(self, request):
        """
        Locate the resource that handles the request: this resource.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that fires with C{None}: no node information.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that fires with an empty list: no nodes.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Return an empty dictionary: no configuration options.
        """
        return {}


    def publish(self, request):
        """
        Fail with L{Unsupported} for the C{'publish'} feature.
        """
        return self._unsupported('publish')


    def subscribe(self, request):
        """
        Fail with L{Unsupported} for the C{'subscribe'} feature.
        """
        return self._unsupported('subscribe')


    def unsubscribe(self, request):
        """
        Fail with L{Unsupported} for the C{'subscribe'} feature.
        """
        return self._unsupported('subscribe')


    def subscriptions(self, request):
        """
        Fail with L{Unsupported} for C{'retrieve-subscriptions'}.
        """
        return self._unsupported('retrieve-subscriptions')


    def affiliations(self, request):
        """
        Fail with L{Unsupported} for C{'retrieve-affiliations'}.
        """
        return self._unsupported('retrieve-affiliations')


    def create(self, request):
        """
        Fail with L{Unsupported} for the C{'create-nodes'} feature.
        """
        return self._unsupported('create-nodes')


    def default(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-default'} feature.
        """
        return self._unsupported('retrieve-default')


    def configureGet(self, request):
        """
        Fail with L{Unsupported} for the C{'config-node'} feature.
        """
        return self._unsupported('config-node')


    def configureSet(self, request):
        """
        Fail with L{Unsupported} for the C{'config-node'} feature.
        """
        return self._unsupported('config-node')


    def items(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-items'} feature.
        """
        return self._unsupported('retrieve-items')


    def retract(self, request):
        """
        Fail with L{Unsupported} for the C{'retract-items'} feature.
        """
        return self._unsupported('retract-items')


    def purge(self, request):
        """
        Fail with L{Unsupported} for the C{'purge-nodes'} feature.
        """
        return self._unsupported('purge-nodes')


    def delete(self, request):
        """
        Fail with L{Unsupported} for the C{'delete-nodes'} feature.
        """
        return self._unsupported('delete-nodes')


    def affiliationsGet(self, request):
        """
        Fail with L{Unsupported} for C{'modify-affiliations'}.
        """
        return self._unsupported('modify-affiliations')


    def affiliationsSet(self, request):
        """
        Fail with L{Unsupported} for C{'modify-affiliations'}.
        """
        return self._unsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        """
        Fail with L{Unsupported} for C{'manage-subscriptions'}.
        """
        return self._unsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        """
        Fail with L{Unsupported} for C{'manage-subscriptions'}.
        """
        return self._unsupported('manage-subscriptions')
Note: See TracBrowser for help on using the repository browser.