source: wokkel/pubsub.py @ 80:3ee3e922549d

Last change on this file since 80:3ee3e922549d was 80:3ee3e922549d, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Move findForm to wokkel.data_form

File size: 40.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2010 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and of type set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol, all derived from the
# base namespace
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching pubsub requests: iq stanzas of type get or set that
# have a pubsub child in either the regular or the owner namespace
PUBSUB_REQUEST = ('/iq[@type="get" or @type="set"]/'
                  'pubsub[@xmlns="%s" or @xmlns="%s"]'
                  % (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised to signal that a subscription request resulted in the
    C{'pending'} subscription state.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised to signal that a subscription request resulted in the
    C{'unconfigured'} subscription state.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The application specific condition is an element named after
    C{pubsubCondition} in the publish-subscribe errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        condition element.
        @param text: Optional error text.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{bad-request} condition.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of a publish-subscribe specific
                                condition element to include.
        @param text: Optional error text.
        """
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
84
85
86
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    @ivar feature: The name of the unsupported feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional error text.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)


    def __str__(self):
        """
        Return the error description, followed by the unsupported feature.
        """
        return '%s, feature %r' % (PubSubError.__str__(self), self.feature)
99
100
class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty C{dict} when no options
                   were passed.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        # A fresh dictionary is created for every instance that is not given
        # (non-empty) options, so instances never share one.
        if not options:
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
119
120
121
class Item(domish.Element):
    """
    Publish subscribe item.

    This is a L{domish.Element} named C{item} in the publish-subscribe
    namespace.

    The payload of the item can be passed to C{__init__} using the C{payload}
    keyword argument, or added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
149
150
151
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering of request parameters is dispatched by name: for
    each parameter listed for a verb in L{_parameters}, the method
    C{_parse_<parameter>} is used by L{parseElement} and the method
    C{_render_<parameter>} is used by L{send}.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted. Parsed requests hold a C{list}.
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests. Parsed requests hold the values of the submitted
                   form, or an empty C{dict} if the form was cancelled.
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': [],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: Optional type of publish-subscribe request.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        if not verbElement.hasAttribute("node"):
            raise BadRequest('nodeid-required')

        self.nodeIdentifier = verbElement["node"]


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no (non-empty) node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        Nothing is rendered for an empty or unset node identifier.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute yields C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.

        Nothing is rendered for an empty or unset node identifier.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only child elements named C{item} in the publish-subscribe namespace
        are collected.
        """
        self.items = [element for element in verbElement.elements()
                      if element.uri == NS_PUBSUB and element.name == 'item']


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        for item in self.items or []:
            verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: if the C{jid} attribute is missing.
        """
        if not verbElement.hasAttribute("jid"):
            raise BadRequest('jid-required')

        self.subscriber = jid.internJID(verbElement["jid"])


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is C{'leaf'}, unless a submitted node configuration
        form provides a value for C{pubsub#node_type}.
        """
        nodeType = 'leaf'

        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            nodeType = form.getValues().get('pubsub#node_type', nodeType)

        self.nodeType = nodeType


    def _parseOptionsForm(self, verbElement, formNamespace, missingText):
        """
        Set C{options} from the form with C{formNamespace} in the verbElement.

        A submitted form yields its values, a cancelled form yields an empty
        dictionary.

        @param formNamespace: The namespace of the form to look for.
        @param missingText: Error text used when there is no such form.
        @raise BadRequest: if the form is missing or has an unexpected type.
        """
        form = data_form.findForm(verbElement, formNamespace)
        if not form:
            raise BadRequest(text=missingText)

        if form.formType == 'submit':
            self.options = form.getValues()
        elif form.formType == 'cancel':
            self.options = {}
        else:
            raise BadRequest(text="Unexpected form type %r" % form.formType)


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing or is
                           not of type C{'submit'} or C{'cancel'}.
        """
        self._parseOptionsForm(verbElement, NS_PUBSUB_NODE_CONFIG,
                               "Missing configuration form")


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item element has no C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri != NS_PUBSUB or element.name != 'item':
                continue

            if not element.hasAttribute("id"):
                raise BadRequest()

            self.itemIdentifiers.append(element["id"])


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.

        Each identifier results in an C{item} child element with the
        identifier as its C{id} attribute.
        """
        for itemIdentifier in self.itemIdentifiers or []:
            item = verbElement.addElement('item')
            item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: if the value cannot be converted to an integer.
        """
        value = verbElement.getAttribute('max_items')
        if not value:
            return

        try:
            self.maxItems = int(value)
        except ValueError:
            raise BadRequest(text="Field max_items requires a positive " +
                                  "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.

        Nothing is rendered when C{maxItems} is unset or zero.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options out of a request for setting subscription options.

        @raise BadRequest: if the subscribe options form is missing or is
                           not of type C{'submit'} or C{'cancel'}.
        """
        self._parseOptionsForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS,
                               "Missing options form")


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        The first child of the pubsub element that, together with the stanza
        type, maps to a known verb determines the verb. The parameters for
        that verb are then parsed out of that child.

        @raise NotImplementedError: if no child of the pubsub element maps
                                    to a known verb.
        """
        generic.Stanza.parseElement(self, element)

        # Remember the matched element explicitly instead of relying on the
        # loop variables still being bound (to the right values) afterwards.
        verbElement = None
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            if key in self._requestVerbMap:
                self.verb = self._requestVerbMap[key]
                verbElement = child
                break

        if verbElement is None:
            raise NotImplementedError()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        Every parameter of the verb needs a matching C{_render_<parameter>}
        method; if there is none, the attribute lookup fails.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: if C{verb} is not a known verb.
        """
        try:
            requestKey = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        self.stanzaType, childURI, childName = requestKey

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
484
485
486
class PubSubEvent(object):
    """
    A publish subscribe event.

    The constructor arguments are stored as instance variables of the same
    name.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification
        self.sender, self.recipient = sender, recipient
        # What the notification is about
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers
506
507
508
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies new, updated and retracted items.

    Besides the instance variables set by L{PubSubEvent}, the received items
    are available as C{items}.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        # Note that, unlike in the base class, C{items} comes before
        # C{headers} in the argument list.
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)

        self.items = items
520
521
522
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: Optional URI the notification redirects to.
                       C{None} unless set on the instance.
    """

    # Class level default for events that carry no redirect
    redirectURI = None
529
530
531
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.

    This adds nothing to L{PubSubEvent}, other than being a distinct type.
    """
536
537
538
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are passed to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden. The other public methods send requests to
    a publish-subscribe service and return a deferred.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing incoming publish-subscribe event notifications.
        """
        query = '/message/event[@xmlns="%s"]' % NS_PUBSUB_EVENT
        self.xmlstream.addObserver(query, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        The handler is the method named C{_onEvent_<action>}, with the name
        of the action element. Messages without sender or recipient, without
        an action element in the event namespace, or with an action for which
        there is no handler, are left alone. Otherwise, the message is marked
        as handled.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # There is no early exit here: the last child in the event namespace
        # is the one that gets used.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
        if not eventHandler:
            return

        headers = shim.extractHeaders(message)
        eventHandler(sender, recipient, actionElement, headers)
        message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Pass an items notification to L{itemsReceived} as an L{ItemsEvent}.

        The items of the event are the children of the action element named
        C{item} or C{retract}.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        self.itemsReceived(ItemsEvent(sender, recipient, nodeIdentifier,
                                      items, headers))


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Pass a delete notification to L{deleteReceived} as a L{DeleteEvent}.

        If the action element has a C{redirect} child, its C{uri} attribute
        is made available as the C{redirectURI} of the event.
        """
        event = DeleteEvent(sender, recipient, action["node"], headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Pass a purge notification to L{purgeReceived} as a L{PurgeEvent}.
        """
        self.purgeReceived(PurgeEvent(sender, recipient, action["node"],
                                      headers))


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} for each items notification.

        Does nothing by default.
        """


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} for each node deletion notification.

        Does nothing by default.
        """


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} for each node purge notification.

        Does nothing by default.
        """


    def createNode(self, service, nodeIdentifier=None, sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender address for the request.
        @return: A deferred that fires with the identifier of the new node.
                 This is the identifier in the response, or the suggested
                 identifier if the response does not hold one.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        def cb(iq):
            # A missing pubsub element, a missing create element and a
            # missing node attribute surface as AttributeError, TypeError
            # and KeyError, respectively. All of them mean that the response
            # holds no node identifier.
            try:
                return iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # the suggested node identifier was accepted
                return nodeIdentifier

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional sender address for the request.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Subscribe to a publish subscribe node.

        The returned deferred fires with C{None} on success. It fails with
        L{SubscriptionPending} or L{SubscriptionUnconfigured} if the response
        reports the subscription state C{'pending'} or C{'unconfigured'}.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @param sender: Optional sender address for the request.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        def cb(iq):
            state = iq.pubsub.subscription["subscription"]

            if state == 'pending':
                raise SubscriptionPending
            if state == 'unconfigured':
                raise SubscriptionUnconfigured

            # we assume state == 'subscribed'
            # any other value would be invalid, but that should have
            # yielded a stanza error.
            return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @param sender: Optional sender address for the request.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @param sender: Optional sender address for the request.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @param sender: Optional sender address for the request.
        @return: A deferred that fires with the list of C{item} elements in
                 the response.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            # Kept as an integer, as documented for PubSubRequest.maxItems.
            # It is converted to text when the request is rendered.
            request.maxItems = int(maxItems)
        request.sender = sender

        def cb(iq):
            return [element for element in iq.pubsub.items.elements()
                    if element.uri == NS_PUBSUB and element.name == 'item']

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d
755
756
757
758class PubSubService(XMPPHandler, IQHandlerMixin):
759    """
760    Protocol implementation for a XMPP Publish Subscribe Service.
761
762    The word Service here is used as taken from the Publish Subscribe
763    specification. It is the party responsible for keeping nodes and their
764    subscriptions, and sending out notifications.
765
766    Methods from the L{IPubSubService} interface that are called as
767    a result of an XMPP request may raise exceptions. Alternatively the
768    deferred returned by these methods may have their errback called. These are
769    handled as follows:
770
771     - If the exception is an instance of L{error.StanzaError}, an error
772       response iq is returned.
773     - Any other exception is reported using L{log.msg}. An error response
774       with the condition C{internal-server-error} is returned.
775
776    The default implementation of said methods raises an L{Unsupported}
777    exception and are meant to be overridden.
778
779    @ivar discoIdentity: Service discovery identity as a dictionary with
780                         keys C{'category'}, C{'type'} and C{'name'}.
781    @ivar pubSubFeatures: List of supported publish-subscribe features for
782                          service discovery, as C{str}.
783    @type pubSubFeatures: C{list} or C{None}
784    """
785
786    implements(IPubSubService)
787
788    iqHandlers = {
789            '/*': '_onPubSubRequest',
790            }
791
792    _legacyHandlers = {
793        'publish': ('publish', ['sender', 'recipient',
794                                'nodeIdentifier', 'items']),
795        'subscribe': ('subscribe', ['sender', 'recipient',
796                                    'nodeIdentifier', 'subscriber']),
797        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
798                                        'nodeIdentifier', 'subscriber']),
799        'subscriptions': ('subscriptions', ['sender', 'recipient']),
800        'affiliations': ('affiliations', ['sender', 'recipient']),
801        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
802        'getConfigurationOptions': ('getConfigurationOptions', []),
803        'default': ('getDefaultConfiguration',
804                    ['sender', 'recipient', 'nodeType']),
805        'configureGet': ('getConfiguration', ['sender', 'recipient',
806                                              'nodeIdentifier']),
807        'configureSet': ('setConfiguration', ['sender', 'recipient',
808                                              'nodeIdentifier', 'options']),
809        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
810                            'maxItems', 'itemIdentifiers']),
811        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
812                                'itemIdentifiers']),
813        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
814        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
815    }
816
817    hideNodes = False
818
819    def __init__(self, resource=None):
820        self.resource = resource
821        self.discoIdentity = {'category': 'pubsub',
822                              'type': 'generic',
823                              'name': 'Generic Publish-Subscribe Service'}
824
825        self.pubSubFeatures = []
826
827
828    def connectionMade(self):
829        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
830
831
832    def getDiscoInfo(self, requestor, target, nodeIdentifier):
833        def toInfo(nodeInfo, info):
834            if not nodeInfo:
835                return info
836
837            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
838            info.append(disco.DiscoIdentity('pubsub', nodeType))
839            if metaData:
840                form = data_form.Form(formType="result",
841                                      formNamespace=NS_PUBSUB_META_DATA)
842                form.addField(
843                        data_form.Field(
844                            var='pubsub#node_type',
845                            value=nodeType,
846                            label='The type of node (collection or leaf)'
847                        )
848                )
849
850                for metaDatum in metaData:
851                    form.addField(data_form.Field.fromDict(metaDatum))
852
853                info.append(form)
854
855            return info
856
857        info = []
858
859        request = PubSubRequest('discoInfo')
860
861        if self.resource is not None:
862            resource = self.resource.locateResource(request)
863            identity = resource.discoIdentity
864            features = resource.features
865            getInfo = resource.getInfo
866        else:
867            category, idType, name = self.discoIdentity
868            identity = disco.DiscoIdentity(category, idType, name)
869            features = self.pubSubFeatures
870            getInfo = self.getNodeInfo
871
872        if not nodeIdentifier:
873            info.append(identity)
874            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
875            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
876                         for feature in features])
877
878        d = getInfo(requestor, target, nodeIdentifier or '')
879        d.addCallback(toInfo, info)
880        d.addErrback(log.err)
881        return d
882
883
884    def getDiscoItems(self, requestor, target, nodeIdentifier):
885        if self.hideNodes:
886            d = defer.succeed([])
887        elif self.resource is not None:
888            request = PubSubRequest('discoInfo')
889            resource = self.resource.locateResource(request)
890            d = resource.getNodes(requestor, target, nodeIdentifier)
891        elif nodeIdentifier:
892            d = self.getNodes(requestor, target)
893        else:
894            d = defer.succeed([])
895           
896
897
898        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
899                                     for node in nodes])
900        return d
901
902
903    def _onPubSubRequest(self, iq):
904        request = PubSubRequest.fromElement(iq)
905
906        if self.resource is not None:
907            resource = self.resource.locateResource(request)
908        else:
909            resource = self
910
911        # Preprocess the request, knowing the handling resource
912        try:
913            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
914        except AttributeError:
915            pass
916        else:
917            request = preProcessor(resource, request)
918            if request is None:
919                return defer.succeed(None)
920
921        # Process the request itself,
922        if resource is not self:
923            try:
924                handler = getattr(resource, request.verb)
925            except AttributeError:
926                # fix lookup feature
927                text = "Request verb: %s" % request.verb
928                return defer.fail(Unsupported('', text))
929
930            d = handler(request)
931        else:
932            handlerName, argNames = self._legacyHandlers[request.verb]
933            handler = getattr(self, handlerName)
934            args = [getattr(request, arg) for arg in argNames]
935            d = handler(*args)
936
937        # If needed, translate the result into a response
938        try:
939            cb = getattr(self, '_toResponse_%s' % request.verb)
940        except AttributeError:
941            pass
942        else:
943            d.addCallback(cb, resource, request)
944
945        return d
946
947
948    def _toResponse_subscribe(self, result, resource, request):
949        response = domish.Element((NS_PUBSUB, "pubsub"))
950        subscription = response.addElement("subscription")
951        if result.nodeIdentifier:
952            subscription["node"] = result.nodeIdentifier
953        subscription["jid"] = result.subscriber.full()
954        subscription["subscription"] = result.state
955        return response
956
957
958    def _toResponse_subscriptions(self, result, resource, request):
959        response = domish.Element((NS_PUBSUB, 'pubsub'))
960        subscriptions = response.addElement('subscriptions')
961        for subscription in result:
962            item = subscriptions.addElement('subscription')
963            item['node'] = subscription.nodeIdentifier
964            item['jid'] = subscription.subscriber.full()
965            item['subscription'] = subscription.state
966        return response
967
968
969    def _toResponse_affiliations(self, result, resource, request):
970        response = domish.Element((NS_PUBSUB, 'pubsub'))
971        affiliations = response.addElement('affiliations')
972
973        for nodeIdentifier, affiliation in result:
974            item = affiliations.addElement('affiliation')
975            item['node'] = nodeIdentifier
976            item['affiliation'] = affiliation
977
978        return response
979
980
981    def _toResponse_create(self, result, resource, request):
982        if not request.nodeIdentifier or request.nodeIdentifier != result:
983            response = domish.Element((NS_PUBSUB, 'pubsub'))
984            create = response.addElement('create')
985            create['node'] = result
986            return response
987        else:
988            return None
989
990
991    def _formFromConfiguration(self, resource, values):
992        fieldDefs = resource.getConfigurationOptions()
993        form = data_form.Form(formType="form",
994                              formNamespace=NS_PUBSUB_NODE_CONFIG)
995        form.makeFields(values, fieldDefs)
996        return form
997
998
999    def _checkConfiguration(self, resource, values):
1000        options = resource.getConfigurationOptions()
1001        processedValues = {}
1002
1003        for key, value in values.iteritems():
1004            if key not in options:
1005                continue
1006
1007            option = {'var': key}
1008            option.update(options[key])
1009            field = data_form.Field.fromDict(option)
1010            if isinstance(value, list):
1011                field.values = value
1012            else:
1013                field.value = value
1014            field.typeCheck()
1015
1016            if isinstance(value, list):
1017                processedValues[key] = field.values
1018            else:
1019                processedValues[key] = field.value
1020
1021        return processedValues
1022
1023
1024    def _preProcess_default(self, resource, request):
1025        if request.nodeType not in ('leaf', 'collection'):
1026            raise error.StanzaError('not-acceptable')
1027        else:
1028            return request
1029
1030
1031    def _toResponse_default(self, options, resource, request):
1032        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1033        default = response.addElement("default")
1034        form = self._formFromConfiguration(resource, options)
1035        default.addChild(form.toElement())
1036        return response
1037
1038
1039    def _toResponse_configureGet(self, options, resource, request):
1040        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1041        configure = response.addElement("configure")
1042        form = self._formFromConfiguration(resource, options)
1043        configure.addChild(form.toElement())
1044
1045        if request.nodeIdentifier:
1046            configure["node"] = request.nodeIdentifier
1047
1048        return response
1049
1050
1051    def _preProcess_configureSet(self, resource, request):
1052        if request.options:
1053            request.options = self._checkConfiguration(resource,
1054                                                       request.options)
1055            return request
1056        else:
1057            return None
1058
1059
1060    def _toResponse_items(self, result, resource, request):
1061        response = domish.Element((NS_PUBSUB, 'pubsub'))
1062        items = response.addElement('items')
1063        items["node"] = request.nodeIdentifier
1064
1065        for item in result:
1066            items.addChild(item)
1067
1068        return response
1069
1070
1071    def _createNotification(self, eventType, service, nodeIdentifier,
1072                                  subscriber, subscriptions=None):
1073        headers = []
1074
1075        if subscriptions:
1076            for subscription in subscriptions:
1077                if nodeIdentifier != subscription.nodeIdentifier:
1078                    headers.append(('Collection', subscription.nodeIdentifier))
1079
1080        message = domish.Element((None, "message"))
1081        message["from"] = service.full()
1082        message["to"] = subscriber.full()
1083        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1084
1085        element = event.addElement(eventType)
1086        element["node"] = nodeIdentifier
1087
1088        if headers:
1089            message.addChild(shim.Headers(headers))
1090
1091        return message
1092
1093    # public methods
1094
1095    def notifyPublish(self, service, nodeIdentifier, notifications):
1096        for subscriber, subscriptions, items in notifications:
1097            message = self._createNotification('items', service,
1098                                               nodeIdentifier, subscriber,
1099                                               subscriptions)
1100            message.event.items.children = items
1101            self.send(message)
1102
1103
1104    def notifyDelete(self, service, nodeIdentifier, subscribers,
1105                           redirectURI=None):
1106        for subscriber in subscribers:
1107            message = self._createNotification('delete', service,
1108                                               nodeIdentifier,
1109                                               subscriber)
1110            if redirectURI:
1111                redirect = message.event.delete.addElement('redirect')
1112                redirect['uri'] = redirectURI
1113            self.send(message)
1114
1115
1116    def getNodeInfo(self, requestor, service, nodeIdentifier):
1117        return None
1118
1119
1120    def getNodes(self, requestor, service):
1121        return []
1122
1123
1124    def publish(self, requestor, service, nodeIdentifier, items):
1125        raise Unsupported('publish')
1126
1127
1128    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1129        raise Unsupported('subscribe')
1130
1131
1132    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1133        raise Unsupported('subscribe')
1134
1135
1136    def subscriptions(self, requestor, service):
1137        raise Unsupported('retrieve-subscriptions')
1138
1139
1140    def affiliations(self, requestor, service):
1141        raise Unsupported('retrieve-affiliations')
1142
1143
1144    def create(self, requestor, service, nodeIdentifier):
1145        raise Unsupported('create-nodes')
1146
1147
1148    def getConfigurationOptions(self):
1149        return {}
1150
1151
1152    def getDefaultConfiguration(self, requestor, service, nodeType):
1153        raise Unsupported('retrieve-default')
1154
1155
1156    def getConfiguration(self, requestor, service, nodeIdentifier):
1157        raise Unsupported('config-node')
1158
1159
1160    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1161        raise Unsupported('config-node')
1162
1163
1164    def items(self, requestor, service, nodeIdentifier, maxItems,
1165                    itemIdentifiers):
1166        raise Unsupported('retrieve-items')
1167
1168
1169    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1170        raise Unsupported('retract-items')
1171
1172
1173    def purge(self, requestor, service, nodeIdentifier):
1174        raise Unsupported('purge-nodes')
1175
1176
1177    def delete(self, requestor, service, nodeIdentifier):
1178        raise Unsupported('delete-nodes')
1179
1180
1181
class PubSubResource(object):
    """
    Base publish-subscribe resource that supports nothing.

    Resource location yields the resource itself, discovery reports no
    node information and no nodes, and every request handler returns a
    deferred that fails with L{Unsupported} for the feature involved.
    """

    implements(IPubSubResource)

    # Names of the supported publish-subscribe features; none here.
    features = []

    # Identity reported for the service in discovery responses.
    discoIdentity = disco.DiscoIdentity(
        'pubsub', 'service', 'Publish-Subscribe Service')


    def locateResource(self, request):
        """
        Locate the resource that handles C{request}: always this one.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Provide node information: a deferred that fires with C{None}.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Provide node identifiers: a deferred that fires with an empty list.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Provide the configuration option definitions: an empty dictionary.
        """
        return {}


    def publish(self, request):
        """
        Handle a publish request; fails with Unsupported('publish').
        """
        return defer.fail(Unsupported('publish'))


    def subscribe(self, request):
        """
        Handle a subscribe request; fails with Unsupported('subscribe').
        """
        return defer.fail(Unsupported('subscribe'))


    def unsubscribe(self, request):
        """
        Handle an unsubscribe request; fails with Unsupported('subscribe').
        """
        return defer.fail(Unsupported('subscribe'))


    def subscriptions(self, request):
        """
        Handle a subscriptions request; fails with
        Unsupported('retrieve-subscriptions').
        """
        return defer.fail(Unsupported('retrieve-subscriptions'))


    def affiliations(self, request):
        """
        Handle an affiliations request; fails with
        Unsupported('retrieve-affiliations').
        """
        return defer.fail(Unsupported('retrieve-affiliations'))


    def create(self, request):
        """
        Handle a create request; fails with Unsupported('create-nodes').
        """
        return defer.fail(Unsupported('create-nodes'))


    def default(self, request):
        """
        Handle a default configuration request; fails with
        Unsupported('retrieve-default').
        """
        return defer.fail(Unsupported('retrieve-default'))


    def configureGet(self, request):
        """
        Handle a configuration retrieval request; fails with
        Unsupported('config-node').
        """
        return defer.fail(Unsupported('config-node'))


    def configureSet(self, request):
        """
        Handle a configuration change request; fails with
        Unsupported('config-node').
        """
        return defer.fail(Unsupported('config-node'))


    def items(self, request):
        """
        Handle an items request; fails with Unsupported('retrieve-items').
        """
        return defer.fail(Unsupported('retrieve-items'))


    def retract(self, request):
        """
        Handle a retract request; fails with Unsupported('retract-items').
        """
        return defer.fail(Unsupported('retract-items'))


    def purge(self, request):
        """
        Handle a purge request; fails with Unsupported('purge-nodes').
        """
        return defer.fail(Unsupported('purge-nodes'))


    def delete(self, request):
        """
        Handle a delete request; fails with Unsupported('delete-nodes').
        """
        return defer.fail(Unsupported('delete-nodes'))


    def affiliationsGet(self, request):
        """
        Handle an affiliationsGet request; fails with
        Unsupported('modify-affiliations').
        """
        return defer.fail(Unsupported('modify-affiliations'))


    def affiliationsSet(self, request):
        """
        Handle an affiliationsSet request; fails with
        Unsupported('modify-affiliations').
        """
        return defer.fail(Unsupported('modify-affiliations'))


    def subscriptionsGet(self, request):
        """
        Handle a subscriptionsGet request; fails with
        Unsupported('manage-subscriptions').
        """
        return defer.fail(Unsupported('manage-subscriptions'))


    def subscriptionsSet(self, request):
        """
        Handle a subscriptionsSet request; fails with
        Unsupported('manage-subscriptions').
        """
        return defer.fail(Unsupported('manage-subscriptions'))
Note: See TracBrowser for help on using the repository browser.