source: wokkel/pubsub.py @ 43:0a525d09169d

Last change on this file since 43:0a525d09169d was 43:0a525d09169d, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Add support for sending and receiving node delete notifications with redirect.

File size: 33.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
23# Iq get and set XPath queries
24IQ_GET = '/iq[@type="get"]'
25IQ_SET = '/iq[@type="set"]'
26
27# Publish-subscribe namespaces
28NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
29NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
30NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
31NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
32NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
33NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
34
35# In publish-subscribe namespace XPath query selector.
36IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]'
37IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]'
38
39# Publish-subscribe XPath queries
40PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
41PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
42PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
43PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
44PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
45PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT
46
47# Publish-subscribe command XPath queries
48PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
49PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
50PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
51PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
52PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
53PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
54PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
55PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
56PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
57PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
58PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
59PUBSUB_AFFILIATIONS_GET = PUBSUB_OWNER_GET + '/affiliations' + \
60                          IN_NS_PUBSUB_OWNER
61PUBSUB_AFFILIATIONS_SET = PUBSUB_OWNER_SET + '/affiliations' + \
62                          IN_NS_PUBSUB_OWNER
63PUBSUB_SUBSCRIPTIONS_GET = PUBSUB_OWNER_GET + '/subscriptions' + \
64                          IN_NS_PUBSUB_OWNER
65PUBSUB_SUBSCRIPTIONS_SET = PUBSUB_OWNER_SET + '/subscriptions' + \
66                          IN_NS_PUBSUB_OWNER
67PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
68PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
69PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
70PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
72class SubscriptionPending(Exception):
73    """
74    Raised when the requested subscription is pending acceptance.
75    """
76
77
78
79class SubscriptionUnconfigured(Exception):
80    """
81    Raised when the requested subscription needs to be configured before
82    becoming active.
83    """
84
85
86
87class PubSubError(error.StanzaError):
88    """
89    Exception with publish-subscribe specific condition.
90    """
91    def __init__(self, condition, pubsubCondition, feature=None, text=None):
92        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
93        if feature:
94            appCondition['feature'] = feature
95        error.StanzaError.__init__(self, condition,
96                                         text=text,
97                                         appCondition=appCondition)
98
99
100
101class BadRequest(PubSubError):
102    """
103    Bad request stanza error.
104    """
105    def __init__(self, pubsubCondition=None, text=None):
106        PubSubError.__init__(self, 'bad-request', pubsubCondition, text)
107
108
109
110class Unsupported(PubSubError):
111    def __init__(self, feature, text=None):
112        PubSubError.__init__(self, 'feature-not-implemented',
113                                   'unsupported',
114                                   feature,
115                                   text)
116
117
118
119class Subscription(object):
120    """
121    A subscription to a node.
122
123    @ivar nodeIdentifier: The identifier of the node subscribed to.
124                          The root node is denoted by C{None}.
125    @ivar subscriber: The subscribing entity.
126    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
127                 C{'unconfigured'}.
128    @ivar options: Optional list of subscription options.
129    @type options: C{dict}.
130    """
131
132    def __init__(self, nodeIdentifier, subscriber, state, options=None):
133        self.nodeIdentifier = nodeIdentifier
134        self.subscriber = subscriber
135        self.state = state
136        self.options = options or {}
137
138
139
140class Item(domish.Element):
141    """
142    Publish subscribe item.
143
144    This behaves like an object providing L{domish.IElement}.
145
146    Item payload can be added using C{addChild} or C{addRawXml}, or using the
147    C{payload} keyword argument to C{__init__}.
148    """
149
150    def __init__(self, id=None, payload=None):
151        """
152        @param id: optional item identifier
153        @type id: L{unicode}
154        @param payload: optional item payload. Either as a domish element, or
155                        as serialized XML.
156        @type payload: object providing L{domish.IElement} or L{unicode}.
157        """
158
159        domish.Element.__init__(self, (NS_PUBSUB, 'item'))
160        if id is not None:
161            self['id'] = id
162        if payload is not None:
163            if isinstance(payload, basestring):
164                self.addRawXml(payload)
165            else:
166                self.addChild(payload)
167
168
169
170class _PubSubRequest(xmlstream.IQ):
171    """
172    Publish subscribe request.
173
174    @ivar verb: Request verb
175    @type verb: C{str}
176    @ivar namespace: Request namespace.
177    @type namespace: C{str}
178    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
179    @type method: C{str}
180    @ivar command: Command element of the request. This is the direct child of
181                   the C{pubsub} element in the C{namespace} with the name
182                   C{verb}.
183    """
184
185    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
186        xmlstream.IQ.__init__(self, xs, method)
187        self.addElement((namespace, 'pubsub'))
188
189        self.command = self.pubsub.addElement(verb)
190
191
192    def send(self, to):
193        """
194        Send out request.
195
196        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
197        a L{JID} instance.
198
199        @param to: Entity to send the request to.
200        @type to: L{JID}
201        """
202        destination = to.full()
203        return xmlstream.IQ.send(self, destination)
204
205
206
207class PubSubEvent(object):
208    """
209    A publish subscribe event.
210
211    @param sender: The entity from which the notification was received.
212    @type sender: L{jid.JID}
213    @param recipient: The entity to which the notification was sent.
214    @type recipient: L{wokkel.pubsub.ItemsEvent}
215    @param nodeIdentifier: Identifier of the node the event pertains to.
216    @type nodeIdentifier: C{unicode}
217    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
218    @type headers: L{dict}
219    """
220
221    def __init__(self, sender, recipient, nodeIdentifier, headers):
222        self.sender = sender
223        self.recipient = recipient
224        self.nodeIdentifier = nodeIdentifier
225        self.headers = headers
226
227
228
229class ItemsEvent(PubSubEvent):
230    """
231    A publish-subscribe event that signifies new, updated and retracted items.
232
233    @param items: List of received items as domish elements.
234    @type items: C{list} of L{domish.Element}
235    """
236
237    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
238        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
239        self.items = items
240
241
242
243class DeleteEvent(PubSubEvent):
244    """
245    A publish-subscribe event that signifies the deletion of a node.
246    """
247
248    redirectURI = None
249
250
251
252class PurgeEvent(PubSubEvent):
253    """
254    A publish-subscribe event that signifies the purging of a node.
255    """
256
257
258
259class PubSubClient(XMPPHandler):
260    """
261    Publish subscribe client protocol.
262    """
263
264    implements(IPubSubClient)
265
266    def connectionInitialized(self):
267        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
268                                   NS_PUBSUB_EVENT, self._onEvent)
269
270
271    def _onEvent(self, message):
272        try:
273            sender = jid.JID(message["from"])
274            recipient = jid.JID(message["to"])
275        except KeyError:
276            return
277
278        actionElement = None
279        for element in message.event.elements():
280            if element.uri == NS_PUBSUB_EVENT:
281                actionElement = element
282
283        if not actionElement:
284            return
285
286        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
287
288        if eventHandler:
289            headers = shim.extractHeaders(message)
290            eventHandler(sender, recipient, actionElement, headers)
291            message.handled = True
292
293
294    def _onEvent_items(self, sender, recipient, action, headers):
295        nodeIdentifier = action["node"]
296
297        items = [element for element in action.elements()
298                         if element.name in ('item', 'retract')]
299
300        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
301        self.itemsReceived(event)
302
303
304    def _onEvent_delete(self, sender, recipient, action, headers):
305        nodeIdentifier = action["node"]
306        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
307        if action.redirect:
308            event.redirectURI = action.redirect.getAttribute('uri')
309        self.deleteReceived(event)
310
311
312    def _onEvent_purge(self, sender, recipient, action, headers):
313        nodeIdentifier = action["node"]
314        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
315        self.purgeReceived(event)
316
317
318    def itemsReceived(self, event):
319        pass
320
321
322    def deleteReceived(self, event):
323        pass
324
325
326    def purgeReceived(self, event):
327        pass
328
329
330    def createNode(self, service, nodeIdentifier=None):
331        """
332        Create a publish subscribe node.
333
334        @param service: The publish subscribe service to create the node at.
335        @type service: L{JID}
336        @param nodeIdentifier: Optional suggestion for the id of the node.
337        @type nodeIdentifier: C{unicode}
338        """
339
340
341        request = _PubSubRequest(self.xmlstream, 'create')
342        if nodeIdentifier:
343            request.command['node'] = nodeIdentifier
344
345        def cb(iq):
346            try:
347                new_node = iq.pubsub.create["node"]
348            except AttributeError:
349                # the suggested node identifier was accepted
350                new_node = nodeIdentifier
351            return new_node
352
353        return request.send(service).addCallback(cb)
354
355
356    def deleteNode(self, service, nodeIdentifier):
357        """
358        Delete a publish subscribe node.
359
360        @param service: The publish subscribe service to delete the node from.
361        @type service: L{JID}
362        @param nodeIdentifier: The identifier of the node.
363        @type nodeIdentifier: C{unicode}
364        """
365        request = _PubSubRequest(self.xmlstream, 'delete', NS_PUBSUB_OWNER)
366        request.command['node'] = nodeIdentifier
367        return request.send(service)
368
369
370    def subscribe(self, service, nodeIdentifier, subscriber):
371        """
372        Subscribe to a publish subscribe node.
373
374        @param service: The publish subscribe service that keeps the node.
375        @type service: L{JID}
376        @param nodeIdentifier: The identifier of the node.
377        @type nodeIdentifier: C{unicode}
378        @param subscriber: The entity to subscribe to the node. This entity
379                           will get notifications of new published items.
380        @type subscriber: L{JID}
381        """
382        request = _PubSubRequest(self.xmlstream, 'subscribe')
383        if nodeIdentifier:
384            request.command['node'] = nodeIdentifier
385        request.command['jid'] = subscriber.full()
386
387        def cb(iq):
388            subscription = iq.pubsub.subscription["subscription"]
389
390            if subscription == 'pending':
391                raise SubscriptionPending
392            elif subscription == 'unconfigured':
393                raise SubscriptionUnconfigured
394            else:
395                # we assume subscription == 'subscribed'
396                # any other value would be invalid, but that should have
397                # yielded a stanza error.
398                return None
399
400        return request.send(service).addCallback(cb)
401
402
403    def unsubscribe(self, service, nodeIdentifier, subscriber):
404        """
405        Unsubscribe from a publish subscribe node.
406
407        @param service: The publish subscribe service that keeps the node.
408        @type service: L{JID}
409        @param nodeIdentifier: The identifier of the node.
410        @type nodeIdentifier: C{unicode}
411        @param subscriber: The entity to unsubscribe from the node.
412        @type subscriber: L{JID}
413        """
414        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
415        if nodeIdentifier:
416            request.command['node'] = nodeIdentifier
417        request.command['jid'] = subscriber.full()
418        return request.send(service)
419
420
421    def publish(self, service, nodeIdentifier, items=None):
422        """
423        Publish to a publish subscribe node.
424
425        @param service: The publish subscribe service that keeps the node.
426        @type service: L{JID}
427        @param nodeIdentifier: The identifier of the node.
428        @type nodeIdentifier: C{unicode}
429        @param items: Optional list of L{Item}s to publish.
430        @type items: C{list}
431        """
432        request = _PubSubRequest(self.xmlstream, 'publish')
433        request.command['node'] = nodeIdentifier
434        if items:
435            for item in items:
436                request.command.addChild(item)
437
438        return request.send(service)
439
440
441    def items(self, service, nodeIdentifier, maxItems=None):
442        """
443        Retrieve previously published items from a publish subscribe node.
444
445        @param service: The publish subscribe service that keeps the node.
446        @type service: L{JID}
447        @param nodeIdentifier: The identifier of the node.
448        @type nodeIdentifier: C{unicode}
449        @param maxItems: Optional limit on the number of retrieved items.
450        @type maxItems: C{int}
451        """
452        request = _PubSubRequest(self.xmlstream, 'items', method='get')
453        if nodeIdentifier:
454            request.command['node'] = nodeIdentifier
455        if maxItems:
456            request.command["max_items"] = str(int(maxItems))
457
458        def cb(iq):
459            items = []
460            for element in iq.pubsub.items.elements():
461                if element.uri == NS_PUBSUB and element.name == 'item':
462                    items.append(element)
463            return items
464
465        return request.send(service).addCallback(cb)
466
467
468
469class PubSubService(XMPPHandler, IQHandlerMixin):
470    """
471    Protocol implementation for a XMPP Publish Subscribe Service.
472
473    The word Service here is used as taken from the Publish Subscribe
474    specification. It is the party responsible for keeping nodes and their
475    subscriptions, and sending out notifications.
476
477    Methods from the L{IPubSubService} interface that are called as
478    a result of an XMPP request may raise exceptions. Alternatively the
479    deferred returned by these methods may have their errback called. These are
480    handled as follows:
481
482     - If the exception is an instance of L{error.StanzaError}, an error
483       response iq is returned.
484     - Any other exception is reported using L{log.msg}. An error response
485       with the condition C{internal-server-error} is returned.
486
487    The default implementation of said methods raises an L{Unsupported}
488    exception and are meant to be overridden.
489
490    @ivar discoIdentity: Service discovery identity as a dictionary with
491                         keys C{'category'}, C{'type'} and C{'name'}.
492    @ivar pubSubFeatures: List of supported publish-subscribe features for
493                          service discovery, as C{str}.
494    @type pubSubFeatures: C{list} or C{None}
495    """
496
497    implements(IPubSubService)
498
499    iqHandlers = {
500            PUBSUB_PUBLISH: '_onPublish',
501            PUBSUB_CREATE: '_onCreate',
502            PUBSUB_SUBSCRIBE: '_onSubscribe',
503            PUBSUB_OPTIONS_GET: '_onOptionsGet',
504            PUBSUB_OPTIONS_SET: '_onOptionsSet',
505            PUBSUB_AFFILIATIONS: '_onAffiliations',
506            PUBSUB_ITEMS: '_onItems',
507            PUBSUB_RETRACT: '_onRetract',
508            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
509            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
510
511            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
512            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
513            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
514            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
515            PUBSUB_DEFAULT: '_onDefault',
516            PUBSUB_PURGE: '_onPurge',
517            PUBSUB_DELETE: '_onDelete',
518            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
519            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
520
521            }
522
523
524    def __init__(self):
525        self.discoIdentity = {'category': 'pubsub',
526                              'type': 'generic',
527                              'name': 'Generic Publish-Subscribe Service'}
528
529        self.pubSubFeatures = []
530
531
532    def connectionMade(self):
533        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
534        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
535        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
536        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
537
538
539    def getDiscoInfo(self, requestor, target, nodeIdentifier):
540        info = []
541
542        if not nodeIdentifier:
543            info.append(disco.DiscoIdentity(**self.discoIdentity))
544
545            info.append(disco.DiscoFeature(disco.NS_ITEMS))
546            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
547                         for feature in self.pubSubFeatures])
548
549        def toInfo(nodeInfo):
550            if not nodeInfo:
551                return
552
553            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
554            info.append(disco.DiscoIdentity('pubsub', nodeType))
555            if metaData:
556                form = data_form.Form(formType="result",
557                                      formNamespace=NS_PUBSUB_META_DATA)
558                form.addField(
559                        data_form.Field(
560                            var='pubsub#node_type',
561                            value=nodeType,
562                            label='The type of node (collection or leaf)'
563                        )
564                )
565
566                for metaDatum in metaData:
567                    form.addField(data_form.Field.fromDict(metaDatum))
568
569                info.append(form.toElement())
570
571        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
572        d.addCallback(toInfo)
573        d.addBoth(lambda result: info)
574        return d
575
576
577    def getDiscoItems(self, requestor, target, nodeIdentifier):
578        if nodeIdentifier or self.hideNodes:
579            return defer.succeed([])
580
581        d = self.getNodes(requestor, target)
582        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
583                                     for node in nodes])
584        return d
585
586
587    def _findForm(self, element, formNamespace):
588        if not element:
589            return None
590
591        form = None
592        for child in element.elements():
593            try:
594                form = data_form.Form.fromElement(child)
595            except data_form.Error:
596                continue
597
598            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
599                continue
600
601        return form
602
603
604    def _getParameter_node(self, commandElement):
605        try:
606            return commandElement["node"]
607        except KeyError:
608            raise BadRequest('nodeid-required')
609
610
611    def _getParameter_nodeOrEmpty(self, commandElement):
612        return commandElement.getAttribute("node", '')
613
614
615    def _getParameter_jid(self, commandElement):
616        try:
617            return jid.internJID(commandElement["jid"])
618        except KeyError:
619            raise BadRequest('jid-required')
620
621
622    def _getParameter_max_items(self, commandElement):
623        value = commandElement.getAttribute('max_items')
624
625        if value:
626            try:
627                return int(value)
628            except ValueError:
629                raise BadRequest(text="Field max_items requires a positive " +
630                                      "integer value")
631        else:
632            return None
633
634
635    def _getParameters(self, iq, *names):
636        requestor = jid.internJID(iq["from"]).userhostJID()
637        service = jid.internJID(iq["to"])
638
639        params = [requestor, service]
640
641        if names:
642            command = names[0]
643            commandElement = getattr(iq.pubsub, command)
644            if not commandElement:
645                raise Exception("Could not find command element %r" % command)
646
647        for name in names[1:]:
648            try:
649                getter = getattr(self, '_getParameter_' + name)
650            except KeyError:
651                raise Exception("No parameter getter for this name")
652
653            params.append(getter(commandElement))
654
655        return params
656
657
658    def _onPublish(self, iq):
659        requestor, service, nodeIdentifier = self._getParameters(
660                iq, 'publish', 'node')
661
662        items = []
663        for element in iq.pubsub.publish.elements():
664            if element.uri == NS_PUBSUB and element.name == 'item':
665                items.append(element)
666
667        return self.publish(requestor, service, nodeIdentifier, items)
668
669
670    def _onSubscribe(self, iq):
671        requestor, service, nodeIdentifier, subscriber = self._getParameters(
672                iq, 'subscribe', 'nodeOrEmpty', 'jid')
673
674        def toResponse(result):
675            response = domish.Element((NS_PUBSUB, "pubsub"))
676            subscription = response.addElement("subscription")
677            if result.nodeIdentifier:
678                subscription["node"] = result.nodeIdentifier
679            subscription["jid"] = result.subscriber.full()
680            subscription["subscription"] = result.state
681            return response
682
683        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
684        d.addCallback(toResponse)
685        return d
686
687
688    def _onUnsubscribe(self, iq):
689        requestor, service, nodeIdentifier, subscriber = self._getParameters(
690                iq, 'unsubscribe', 'nodeOrEmpty', 'jid')
691
692        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
693
694
695    def _onOptionsGet(self, iq):
696        raise Unsupported('subscription-options')
697
698
699    def _onOptionsSet(self, iq):
700        raise Unsupported('subscription-options')
701
702
703    def _onSubscriptions(self, iq):
704        requestor, service = self._getParameters(iq)
705
706        def toResponse(result):
707            response = domish.Element((NS_PUBSUB, 'pubsub'))
708            subscriptions = response.addElement('subscriptions')
709            for subscription in result:
710                item = subscriptions.addElement('subscription')
711                item['node'] = subscription.nodeIdentifier
712                item['jid'] = subscription.subscriber.full()
713                item['subscription'] = subscription.state
714            return response
715
716        d = self.subscriptions(requestor, service)
717        d.addCallback(toResponse)
718        return d
719
720
721    def _onAffiliations(self, iq):
722        requestor, service = self._getParameters(iq)
723
724        def toResponse(result):
725            response = domish.Element((NS_PUBSUB, 'pubsub'))
726            affiliations = response.addElement('affiliations')
727
728            for nodeIdentifier, affiliation in result:
729                item = affiliations.addElement('affiliation')
730                item['node'] = nodeIdentifier
731                item['affiliation'] = affiliation
732
733            return response
734
735        d = self.affiliations(requestor, service)
736        d.addCallback(toResponse)
737        return d
738
739
740    def _onCreate(self, iq):
741        requestor, service = self._getParameters(iq)
742        nodeIdentifier = iq.pubsub.create.getAttribute("node")
743
744        def toResponse(result):
745            if not nodeIdentifier or nodeIdentifier != result:
746                response = domish.Element((NS_PUBSUB, 'pubsub'))
747                create = response.addElement('create')
748                create['node'] = result
749                return response
750            else:
751                return None
752
753        d = self.create(requestor, service, nodeIdentifier)
754        d.addCallback(toResponse)
755        return d
756
757
758    def _makeFields(self, options, values):
759        fields = []
760        for name, value in values.iteritems():
761            if name not in options:
762                continue
763
764            option = {'var': name}
765            option.update(options[name])
766            if isinstance(value, list):
767                option['values'] = value
768            else:
769                option['value'] = value
770            fields.append(data_form.Field.fromDict(option))
771        return fields
772
773    def _formFromConfiguration(self, values):
774        options = self.getConfigurationOptions()
775        fields = self._makeFields(options, values)
776        form = data_form.Form(formType="form",
777                              formNamespace=NS_PUBSUB_NODE_CONFIG,
778                              fields=fields)
779
780        return form
781
782    def _checkConfiguration(self, values):
783        options = self.getConfigurationOptions()
784        processedValues = {}
785
786        for key, value in values.iteritems():
787            if key not in options:
788                continue
789
790            option = {'var': key}
791            option.update(options[key])
792            field = data_form.Field.fromDict(option)
793            if isinstance(value, list):
794                field.values = value
795            else:
796                field.value = value
797            field.typeCheck()
798
799            if isinstance(value, list):
800                processedValues[key] = field.values
801            else:
802                processedValues[key] = field.value
803
804        return processedValues
805
806
807    def _onDefault(self, iq):
808        requestor, service = self._getParameters(iq)
809
810        def toResponse(options):
811            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
812            default = response.addElement("default")
813            default.addChild(self._formFromConfiguration(options).toElement())
814            return response
815
816        form = self._findForm(iq.pubsub.config, NS_PUBSUB_NODE_CONFIG)
817        values = form and form.formType == 'result' and form.getValues() or {}
818        nodeType = values.get('pubsub#node_type', 'leaf')
819
820        if nodeType not in ('leaf', 'collections'):
821            return defer.fail(error.StanzaError('not-acceptable'))
822
823        d = self.getDefaultConfiguration(requestor, service, nodeType)
824        d.addCallback(toResponse)
825        return d
826
827
828    def _onConfigureGet(self, iq):
829        requestor, service, nodeIdentifier = self._getParameters(
830                iq, 'configure', 'nodeOrEmpty')
831
832        def toResponse(options):
833            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
834            configure = response.addElement("configure")
835            configure.addChild(self._formFromConfiguration(options).toElement())
836
837            if nodeIdentifier:
838                configure["node"] = nodeIdentifier
839
840            return response
841
842        d = self.getConfiguration(requestor, service, nodeIdentifier)
843        d.addCallback(toResponse)
844        return d
845
846
847    def _onConfigureSet(self, iq):
848        requestor, service, nodeIdentifier = self._getParameters(
849                iq, 'configure', 'nodeOrEmpty')
850
851        # Search configuration form with correct FORM_TYPE and process it
852
853        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
854
855        if form:
856            if form.formType == 'submit':
857                options = self._checkConfiguration(form.getValues())
858
859                return self.setConfiguration(requestor, service,
860                                             nodeIdentifier, options)
861            elif form.formType == 'cancel':
862                return None
863
864        raise BadRequest()
865
866
867    def _onItems(self, iq):
868        requestor, service, nodeIdentifier, maxItems = self._getParameters(
869                iq, 'items', 'nodeOrEmpty', 'max_items')
870
871        itemIdentifiers = []
872        for child in iq.pubsub.items.elements():
873            if child.name == 'item' and child.uri == NS_PUBSUB:
874                try:
875                    itemIdentifiers.append(child["id"])
876                except KeyError:
877                    raise BadRequest()
878
879        def toResponse(result):
880            response = domish.Element((NS_PUBSUB, 'pubsub'))
881            items = response.addElement('items')
882            if nodeIdentifier:
883                items["node"] = nodeIdentifier
884
885            for item in result:
886                items.addChild(item)
887
888            return response
889
890        d = self.items(requestor, service, nodeIdentifier, maxItems,
891                       itemIdentifiers)
892        d.addCallback(toResponse)
893        return d
894
895
896    def _onRetract(self, iq):
897        requestor, service, nodeIdentifier = self._getParameters(
898                iq, 'retract', 'node')
899
900        itemIdentifiers = []
901        for child in iq.pubsub.retract.elements():
902            if child.uri == NS_PUBSUB and child.name == 'item':
903                try:
904                    itemIdentifiers.append(child["id"])
905                except KeyError:
906                    raise BadRequest()
907
908        return self.retract(requestor, service, nodeIdentifier,
909                            itemIdentifiers)
910
911
912    def _onPurge(self, iq):
913        requestor, service, nodeIdentifier = self._getParameters(
914                iq, 'purge', 'node')
915        return self.purge(requestor, service, nodeIdentifier)
916
917
918    def _onDelete(self, iq):
919        requestor, service, nodeIdentifier = self._getParameters(
920                iq, 'delete', 'node')
921        return self.delete(requestor, service, nodeIdentifier)
922
923
924    def _onAffiliationsGet(self, iq):
925        raise Unsupported('modify-affiliations')
926
927
928    def _onAffiliationsSet(self, iq):
929        raise Unsupported('modify-affiliations')
930
931
932    def _onSubscriptionsGet(self, iq):
933        raise Unsupported('manage-subscriptions')
934
935
936    def _onSubscriptionsSet(self, iq):
937        raise Unsupported('manage-subscriptions')
938
939    # public methods
940
941    def _createNotification(self, eventType, service, nodeIdentifier,
942                                  subscriber, subscriptions=None):
943        headers = []
944
945        if subscriptions:
946            for subscription in subscriptions:
947                if nodeIdentifier != subscription.nodeIdentifier:
948                    headers.append(('Collection', subscription.nodeIdentifier))
949
950        message = domish.Element((None, "message"))
951        message["from"] = service.full()
952        message["to"] = subscriber.full()
953        event = message.addElement((NS_PUBSUB_EVENT, "event"))
954
955        element = event.addElement(eventType)
956        element["node"] = nodeIdentifier
957
958        if headers:
959            message.addChild(shim.Headers(headers))
960
961        return message
962
963    def notifyPublish(self, service, nodeIdentifier, notifications):
964        for subscriber, subscriptions, items in notifications:
965            message = self._createNotification('items', service,
966                                               nodeIdentifier, subscriber,
967                                               subscriptions)
968            message.event.items.children = items
969            self.send(message)
970
971
972    def notifyDelete(self, service, nodeIdentifier, subscribers,
973                           redirectURI=None):
974        for subscriber in subscribers:
975            message = self._createNotification('delete', service,
976                                               nodeIdentifier,
977                                               subscriber)
978            if redirectURI:
979                redirect = message.event.delete.addElement('redirect')
980                redirect['uri'] = redirectURI
981            self.send(message)
982
983
984    def getNodeInfo(self, requestor, service, nodeIdentifier):
985        return None
986
987
988    def getNodes(self, requestor, service):
989        return []
990
991
992    def publish(self, requestor, service, nodeIdentifier, items):
993        raise Unsupported('publish')
994
995
996    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
997        raise Unsupported('subscribe')
998
999
1000    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1001        raise Unsupported('subscribe')
1002
1003
1004    def subscriptions(self, requestor, service):
1005        raise Unsupported('retrieve-subscriptions')
1006
1007
1008    def affiliations(self, requestor, service):
1009        raise Unsupported('retrieve-affiliations')
1010
1011
1012    def create(self, requestor, service, nodeIdentifier):
1013        raise Unsupported('create-nodes')
1014
1015
1016    def getConfigurationOptions(self):
1017        return {}
1018
1019
1020    def getDefaultConfiguration(self, requestor, service, nodeType):
1021        raise Unsupported('retrieve-default')
1022
1023
1024    def getConfiguration(self, requestor, service, nodeIdentifier):
1025        raise Unsupported('config-node')
1026
1027
1028    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1029        raise Unsupported('config-node')
1030
1031
1032    def items(self, requestor, service, nodeIdentifier, maxItems,
1033                    itemIdentifiers):
1034        raise Unsupported('retrieve-items')
1035
1036
1037    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1038        raise Unsupported('retract-items')
1039
1040
1041    def purge(self, requestor, service, nodeIdentifier):
1042        raise Unsupported('purge-nodes')
1043
1044
1045    def delete(self, requestor, service, nodeIdentifier):
1046        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.