source: wokkel/pubsub.py @ 30:68535ae85c8d

Last change on this file since 30:68535ae85c8d was 30:68535ae85c8d, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Add support for pubsub collections.

File size: 32.9 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath selectors matching iq stanzas of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting a match to one of the pubsub namespaces
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub element, alone and combined with the iq type
PUBSUB_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB_OWNER
PUBSUB_GET = '%s%s' % (IQ_GET, PUBSUB_ELEMENT)
PUBSUB_SET = '%s%s' % (IQ_SET, PUBSUB_ELEMENT)
PUBSUB_OWNER_GET = '%s%s' % (IQ_GET, PUBSUB_OWNER_ELEMENT)
PUBSUB_OWNER_SET = '%s%s' % (IQ_SET, PUBSUB_OWNER_ELEMENT)

# XPath queries for the individual publish-subscribe commands
PUBSUB_PUBLISH = '%s/publish%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_CREATE = '%s/create%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIBE = '%s/subscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_UNSUBSCRIBE = '%s/unsubscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_GET = '%s/options%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_SET = '%s/options%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_DEFAULT = '%s/default%s' % (PUBSUB_OWNER_GET, IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_GET = '%s/configure%s' % (PUBSUB_OWNER_GET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_SET = '%s/configure%s' % (PUBSUB_OWNER_SET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS = '%s/subscriptions%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS = '%s/affiliations%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS_GET = '%s/affiliations%s' % (PUBSUB_OWNER_GET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = '%s/affiliations%s' % (PUBSUB_OWNER_SET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = '%s/subscriptions%s' % (PUBSUB_OWNER_GET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = '%s/subscriptions%s' % (PUBSUB_OWNER_SET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = '%s/items%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_RETRACT = '%s/retract%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_PURGE = '%s/purge%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
PUBSUB_DELETE = '%s/delete%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
71
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Signals that the subscription exists but is still in the C{'pending'}
    state.
    """
76
77
78
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Signals that the subscription is in the C{'unconfigured'} state.
    """
84
85
86
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The pubsub condition is attached to the stanza error as its application
    specific condition, as an element in the pubsub errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the element in the pubsub errors
                                namespace that refines C{condition}.
        @param feature: Optional value for the C{feature} attribute of the
                        pubsub condition element.
        @param text: Optional descriptive text of the error.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
98
99
100
class BadRequest(PubSubError):
    """
    Bad request stanza error, optionally with a pubsub specific condition.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the pubsub specific error
                                condition, e.g. C{'nodeid-required'}.
        @param text: Optional descriptive text of the error.
        """
        if pubsubCondition:
            # Pass text by keyword: the third positional parameter of
            # PubSubError.__init__ is feature, not text.
            PubSubError.__init__(self, 'bad-request', pubsubCondition,
                                 text=text)
        else:
            # Without a pubsub condition there is no application specific
            # condition to attach; building one would yield an element
            # without a name.
            error.StanzaError.__init__(self, 'bad-request', text=text)
107
108
109
class Unsupported(PubSubError):
    """
    Feature not implemented stanza error for an unsupported pubsub feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported publish-subscribe feature.
        @param text: Optional descriptive text of the error.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
116
117
118
class Subscription(object):
    """
    The subscription of an entity to a node.

    @ivar nodeIdentifier: Identifier of the node that is subscribed to.
                          C{None} denotes the root node.
    @ivar subscriber: The entity that is subscribed.
    @ivar state: State of the subscription: C{'subscribed'}, C{'pending'} or
                 C{'unconfigured'}.
    @ivar options: Subscription options. Empty if none were given.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        if not options:
            # Any false value (including None) is replaced by a fresh dict.
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
137
138
139
class Item(domish.Element):
    """
    An item of a publish-subscribe node.

    Instances are C{item} elements in the publish-subscribe namespace and so
    behave like objects providing L{domish.IElement}.

    The payload is either passed to C{__init__} through the C{payload}
    keyword argument, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, given as a domish element or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else an element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
167
168
169
class _PubSubRequest(xmlstream.IQ):
    """
    An iq request to a publish-subscribe service.

    @ivar verb: Request verb
    @type verb: C{str}
    @ivar namespace: Request namespace.
    @type namespace: C{str}
    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
    @type method: C{str}
    @ivar command: Command element of the request: the child named C{verb} of
                   the C{pubsub} element in C{namespace}.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        xmlstream.IQ.__init__(self, xs, method)

        # The freshly created iq has no other pubsub child, so the element
        # returned here is the one later reachable as self.pubsub.
        pubsubElement = self.addElement((namespace, 'pubsub'))
        self.command = pubsubElement.addElement(verb)


    def send(self, to):
        """
        Send out request.

        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
        a L{JID} instance, which is converted to its full string form.

        @param to: Entity to send the request to.
        @type to: L{JID}
        @return: The result of L{xmlstream.IQ.send}.
        """
        return xmlstream.IQ.send(self, to.full())
204
205
206
class PubSubEvent(object):
    """
    Base class for publish-subscribe event notifications.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification
        self.sender, self.recipient = sender, recipient
        # What the notification is about
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
226
227
228
class ItemsEvent(PubSubEvent):
    """
    Event notification about new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
240
241
242
class DeleteEvent(PubSubEvent):
    """
    Event notification about the deletion of a node.
    """
247
248
249
class PurgeEvent(PubSubEvent):
    """
    Event notification about the purging of a node.
    """
254
255
256
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden. The other public methods send a request to
    a publish-subscribe service and return the deferred of that request.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        Messages lacking a C{from} or C{to} attribute, or lacking an action
        element in the event namespace, are ignored. The message is marked
        as handled only if a C{_onEvent_<action>} method exists.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The last child in the event namespace determines the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Pass the C{item} and C{retract} children on as an L{ItemsEvent}.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Pass a node deletion notification on as a L{DeleteEvent}.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Pass a node purge notification on as a L{PurgeEvent}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} for each items notification.

        Does nothing by default.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} for each node deletion notification.

        Does nothing by default.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} for each node purge notification.

        Does nothing by default.
        """
        pass


    def createNode(self, service, nodeIdentifier=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @return: Deferred that fires with the identifier of the new node.
        """
        request = _PubSubRequest(self.xmlstream, 'create')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier

        def cb(iq):
            try:
                return iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # No node identifier in the response, so the suggested
                # identifier was accepted. AttributeError: no pubsub
                # element; TypeError: no create element; KeyError: create
                # element without node attribute.
                return nodeIdentifier

        return request.send(service).addCallback(cb)


    def deleteNode(self, service, nodeIdentifier):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        """
        request = _PubSubRequest(self.xmlstream, 'delete')
        request.command['node'] = nodeIdentifier
        return request.send(service)


    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Subscribe to a publish subscribe node.

        The returned deferred fires with C{None} when subscribed. It fails
        with L{SubscriptionPending} or L{SubscriptionUnconfigured} if the
        service reports the subscription to be in the respective state.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        """
        request = _PubSubRequest(self.xmlstream, 'subscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)


    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        """
        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()
        return request.send(service)


    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        """
        request = _PubSubRequest(self.xmlstream, 'publish')
        request.command['node'] = nodeIdentifier
        if items:
            for item in items:
                request.command.addChild(item)

        return request.send(service)


    def items(self, service, nodeIdentifier, maxItems=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @return: Deferred that fires with the list of C{item} elements in
                 the response.
        """
        request = _PubSubRequest(self.xmlstream, 'items', method='get')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        if maxItems:
            request.command["max_items"] = str(int(maxItems))

        def cb(iq):
            return [element for element in iq.pubsub.items.elements()
                            if element.uri == NS_PUBSUB and
                               element.name == 'item']

        return request.send(service).addCallback(cb)
462
463
464
465class PubSubService(XMPPHandler, IQHandlerMixin):
466    """
467    Protocol implementation for a XMPP Publish Subscribe Service.
468
469    The word Service here is used as taken from the Publish Subscribe
470    specification. It is the party responsible for keeping nodes and their
471    subscriptions, and sending out notifications.
472
473    Methods from the L{IPubSubService} interface that are called as
474    a result of an XMPP request may raise exceptions. Alternatively the
475    deferred returned by these methods may have their errback called. These are
476    handled as follows:
477
478     - If the exception is an instance of L{error.StanzaError}, an error
479       response iq is returned.
480     - Any other exception is reported using L{log.msg}. An error response
481       with the condition C{internal-server-error} is returned.
482
483    The default implementation of said methods raises an L{Unsupported}
484    exception and are meant to be overridden.
485
486    @ivar discoIdentity: Service discovery identity as a dictionary with
487                         keys C{'category'}, C{'type'} and C{'name'}.
488    @ivar pubSubFeatures: List of supported publish-subscribe features for
489                          service discovery, as C{str}.
490    @type pubSubFeatures: C{list} or C{None}
491    """
492
493    implements(IPubSubService)
494
495    iqHandlers = {
496            PUBSUB_PUBLISH: '_onPublish',
497            PUBSUB_CREATE: '_onCreate',
498            PUBSUB_SUBSCRIBE: '_onSubscribe',
499            PUBSUB_OPTIONS_GET: '_onOptionsGet',
500            PUBSUB_OPTIONS_SET: '_onOptionsSet',
501            PUBSUB_AFFILIATIONS: '_onAffiliations',
502            PUBSUB_ITEMS: '_onItems',
503            PUBSUB_RETRACT: '_onRetract',
504            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
505            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
506
507            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
508            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
509            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
510            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
511            PUBSUB_DEFAULT: '_onDefault',
512            PUBSUB_PURGE: '_onPurge',
513            PUBSUB_DELETE: '_onDelete',
514            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
515            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
516
517            }
518
519
520    def __init__(self):
521        self.discoIdentity = {'category': 'pubsub',
522                              'type': 'generic',
523                              'name': 'Generic Publish-Subscribe Service'}
524
525        self.pubSubFeatures = []
526
527
528    def connectionMade(self):
529        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
530        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
531        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
532        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
533
534
535    def getDiscoInfo(self, requestor, target, nodeIdentifier):
536        info = []
537
538        if not nodeIdentifier:
539            info.append(disco.DiscoIdentity(**self.discoIdentity))
540
541            info.append(disco.DiscoFeature(disco.NS_ITEMS))
542            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
543                         for feature in self.pubSubFeatures])
544
545        def toInfo(nodeInfo):
546            if not nodeInfo:
547                return
548
549            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
550            info.append(disco.DiscoIdentity('pubsub', nodeType))
551            if metaData:
552                form = data_form.Form(formType="result",
553                                      formNamespace=NS_PUBSUB_META_DATA)
554                form.addField(
555                        data_form.Field(
556                            var='pubsub#node_type',
557                            value=nodeType,
558                            label='The type of node (collection or leaf)'
559                        )
560                )
561
562                for metaDatum in metaData:
563                    form.addField(data_form.Field.fromDict(metaDatum))
564
565                info.append(form.toElement())
566
567        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
568        d.addCallback(toInfo)
569        d.addBoth(lambda result: info)
570        return d
571
572
573    def getDiscoItems(self, requestor, target, nodeIdentifier):
574        if nodeIdentifier or self.hideNodes:
575            return defer.succeed([])
576
577        d = self.getNodes(requestor, target)
578        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
579                                     for node in nodes])
580        return d
581
582
583    def _findForm(self, element, formNamespace):
584        if not element:
585            return None
586
587        form = None
588        for child in element.elements():
589            try:
590                form = data_form.Form.fromElement(child)
591            except data_form.Error:
592                continue
593
594            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
595                continue
596
597        return form
598
599
600    def _getParameter_node(self, commandElement):
601        try:
602            return commandElement["node"]
603        except KeyError:
604            raise BadRequest('nodeid-required')
605
606
607    def _getParameter_nodeOrEmpty(self, commandElement):
608        return commandElement.getAttribute("node", '')
609
610
611    def _getParameter_jid(self, commandElement):
612        try:
613            return jid.internJID(commandElement["jid"])
614        except KeyError:
615            raise BadRequest('jid-required')
616
617
618    def _getParameter_max_items(self, commandElement):
619        value = commandElement.getAttribute('max_items')
620
621        if value:
622            try:
623                return int(value)
624            except ValueError:
625                raise BadRequest(text="Field max_items requires a positive " +
626                                      "integer value")
627        else:
628            return None
629
630
631    def _getParameters(self, iq, *names):
632        requestor = jid.internJID(iq["from"]).userhostJID()
633        service = jid.internJID(iq["to"])
634
635        params = [requestor, service]
636
637        if names:
638            command = names[0]
639            commandElement = getattr(iq.pubsub, command)
640            if not commandElement:
641                raise Exception("Could not find command element %r" % command)
642
643        for name in names[1:]:
644            try:
645                getter = getattr(self, '_getParameter_' + name)
646            except KeyError:
647                raise Exception("No parameter getter for this name")
648
649            params.append(getter(commandElement))
650
651        return params
652
653
654    def _onPublish(self, iq):
655        requestor, service, nodeIdentifier = self._getParameters(
656                iq, 'publish', 'node')
657
658        items = []
659        for element in iq.pubsub.publish.elements():
660            if element.uri == NS_PUBSUB and element.name == 'item':
661                items.append(element)
662
663        return self.publish(requestor, service, nodeIdentifier, items)
664
665
666    def _onSubscribe(self, iq):
667        requestor, service, nodeIdentifier, subscriber = self._getParameters(
668                iq, 'subscribe', 'nodeOrEmpty', 'jid')
669
670        def toResponse(result):
671            response = domish.Element((NS_PUBSUB, "pubsub"))
672            subscription = response.addElement("subscription")
673            if result.nodeIdentifier:
674                subscription["node"] = result.nodeIdentifier
675            subscription["jid"] = result.subscriber.full()
676            subscription["subscription"] = result.state
677            return response
678
679        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
680        d.addCallback(toResponse)
681        return d
682
683
684    def _onUnsubscribe(self, iq):
685        requestor, service, nodeIdentifier, subscriber = self._getParameters(
686                iq, 'unsubscribe', 'nodeOrEmpty', 'jid')
687
688        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
689
690
691    def _onOptionsGet(self, iq):
692        raise Unsupported('subscription-options')
693
694
695    def _onOptionsSet(self, iq):
696        raise Unsupported('subscription-options')
697
698
699    def _onSubscriptions(self, iq):
700        requestor, service = self._getParameters(iq)
701
702        def toResponse(result):
703            response = domish.Element((NS_PUBSUB, 'pubsub'))
704            subscriptions = response.addElement('subscriptions')
705            for node, subscriber, state in result:
706                item = subscriptions.addElement('subscription')
707                item['node'] = node
708                item['jid'] = subscriber.full()
709                item['subscription'] = state
710            return response
711
712        d = self.subscriptions(requestor, service)
713        d.addCallback(toResponse)
714        return d
715
716
717    def _onAffiliations(self, iq):
718        requestor, service = self._getParameters(iq)
719
720        def toResponse(result):
721            response = domish.Element((NS_PUBSUB, 'pubsub'))
722            affiliations = response.addElement('affiliations')
723
724            for nodeIdentifier, affiliation in result:
725                item = affiliations.addElement('affiliation')
726                item['node'] = nodeIdentifier
727                item['affiliation'] = affiliation
728
729            return response
730
731        d = self.affiliations(requestor, service)
732        d.addCallback(toResponse)
733        return d
734
735
736    def _onCreate(self, iq):
737        requestor, service = self._getParameters(iq)
738        nodeIdentifier = iq.pubsub.create.getAttribute("node")
739
740        def toResponse(result):
741            if not nodeIdentifier or nodeIdentifier != result:
742                response = domish.Element((NS_PUBSUB, 'pubsub'))
743                create = response.addElement('create')
744                create['node'] = result
745                return response
746            else:
747                return None
748
749        d = self.create(requestor, service, nodeIdentifier)
750        d.addCallback(toResponse)
751        return d
752
753
754    def _makeFields(self, options, values):
755        fields = []
756        for name, value in values.iteritems():
757            if name not in options:
758                continue
759
760            option = {'var': name}
761            option.update(options[name])
762            if isinstance(value, list):
763                option['values'] = value
764            else:
765                option['value'] = value
766            fields.append(data_form.Field.fromDict(option))
767        return fields
768
769    def _formFromConfiguration(self, values):
770        options = self.getConfigurationOptions()
771        fields = self._makeFields(options, values)
772        form = data_form.Form(formType="form",
773                              formNamespace=NS_PUBSUB_NODE_CONFIG,
774                              fields=fields)
775
776        return form
777
778    def _checkConfiguration(self, values):
779        options = self.getConfigurationOptions()
780        processedValues = {}
781
782        for key, value in values.iteritems():
783            if key not in options:
784                continue
785
786            option = {'var': key}
787            option.update(options[key])
788            field = data_form.Field.fromDict(option)
789            if isinstance(value, list):
790                field.values = value
791            else:
792                field.value = value
793            field.typeCheck()
794
795            if isinstance(value, list):
796                processedValues[key] = field.values
797            else:
798                processedValues[key] = field.value
799
800        return processedValues
801
802
803    def _onDefault(self, iq):
804        requestor, service = self._getParameters(iq)
805
806        def toResponse(options):
807            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
808            default = response.addElement("default")
809            default.addChild(self._formFromConfiguration(options).toElement())
810            return response
811
812        form = self._findForm(iq.pubsub.config, NS_PUBSUB_NODE_CONFIG)
813        values = form and form.formType == 'result' and form.getValues() or {}
814        nodeType = values.get('pubsub#node_type', 'leaf')
815
816        if nodeType not in ('leaf', 'collections'):
817            return defer.fail(error.StanzaError('not-acceptable'))
818
819        d = self.getDefaultConfiguration(requestor, service, nodeType)
820        d.addCallback(toResponse)
821        return d
822
823
824    def _onConfigureGet(self, iq):
825        requestor, service, nodeIdentifier = self._getParameters(
826                iq, 'configure', 'nodeOrEmpty')
827
828        def toResponse(options):
829            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
830            configure = response.addElement("configure")
831            configure.addChild(self._formFromConfiguration(options).toElement())
832
833            if nodeIdentifier:
834                configure["node"] = nodeIdentifier
835
836            return response
837
838        d = self.getConfiguration(requestor, service, nodeIdentifier)
839        d.addCallback(toResponse)
840        return d
841
842
843    def _onConfigureSet(self, iq):
844        requestor, service, nodeIdentifier = self._getParameters(
845                iq, 'configure', 'nodeOrEmpty')
846
847        # Search configuration form with correct FORM_TYPE and process it
848
849        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
850
851        if form:
852            if form.formType == 'submit':
853                options = self._checkConfiguration(form.getValues())
854
855                return self.setConfiguration(requestor, service,
856                                             nodeIdentifier, options)
857            elif form.formType == 'cancel':
858                return None
859
860        raise BadRequest()
861
862
863    def _onItems(self, iq):
864        requestor, service, nodeIdentifier, maxItems = self._getParameters(
865                iq, 'items', 'nodeOrEmpty', 'max_items')
866
867        itemIdentifiers = []
868        for child in iq.pubsub.items.elements():
869            if child.name == 'item' and child.uri == NS_PUBSUB:
870                try:
871                    itemIdentifiers.append(child["id"])
872                except KeyError:
873                    raise BadRequest()
874
875        def toResponse(result):
876            response = domish.Element((NS_PUBSUB, 'pubsub'))
877            items = response.addElement('items')
878            if nodeIdentifier:
879                items["node"] = nodeIdentifier
880
881            for item in result:
882                items.addChild(item)
883
884            return response
885
886        d = self.items(requestor, service, nodeIdentifier, maxItems,
887                       itemIdentifiers)
888        d.addCallback(toResponse)
889        return d
890
891
892    def _onRetract(self, iq):
893        requestor, service, nodeIdentifier = self._getParameters(
894                iq, 'retract', 'node')
895
896        itemIdentifiers = []
897        for child in iq.pubsub.retract.elements():
898            if child.uri == NS_PUBSUB and child.name == 'item':
899                try:
900                    itemIdentifiers.append(child["id"])
901                except KeyError:
902                    raise BadRequest()
903
904        return self.retract(requestor, service, nodeIdentifier,
905                            itemIdentifiers)
906
907
908    def _onPurge(self, iq):
909        requestor, service, nodeIdentifier = self._getParameters(
910                iq, 'purge', 'node')
911        return self.purge(requestor, service, nodeIdentifier)
912
913
914    def _onDelete(self, iq):
915        requestor, service, nodeIdentifier = self._getParameters(
916                iq, 'delete', 'node')
917        return self.delete(requestor, service, nodeIdentifier)
918
919
920    def _onAffiliationsGet(self, iq):
921        raise Unsupported('modify-affiliations')
922
923
924    def _onAffiliationsSet(self, iq):
925        raise Unsupported('modify-affiliations')
926
927
928    def _onSubscriptionsGet(self, iq):
929        raise Unsupported('manage-subscriptions')
930
931
932    def _onSubscriptionsSet(self, iq):
933        raise Unsupported('manage-subscriptions')
934
935    # public methods
936
937    def _createNotification(self, eventType, service, nodeIdentifier,
938                                  subscriber, subscriptions=None):
939        headers = []
940
941        if subscriptions:
942            for subscription in subscriptions:
943                if nodeIdentifier != subscription.nodeIdentifier:
944                    headers.append(('Collection', subscription.nodeIdentifier))
945
946        message = domish.Element((None, "message"))
947        message["from"] = service.full()
948        message["to"] = subscriber.full()
949        event = message.addElement((NS_PUBSUB_EVENT, "event"))
950
951        element = event.addElement(eventType)
952        element["node"] = nodeIdentifier
953
954        if headers:
955            message.addChild(shim.Headers(headers))
956
957        return message
958
959    def notifyPublish(self, service, nodeIdentifier, notifications):
960        for subscriber, subscriptions, items in notifications:
961            message = self._createNotification('items', service,
962                                               nodeIdentifier, subscriber,
963                                               subscriptions)
964            message.event.items.children = items
965            self.send(message)
966
967
968    def notifyDelete(self, service, nodeIdentifier, subscriptions):
969        for subscription in subscriptions:
970            message = self._createNotification('delete', service,
971                                               nodeIdentifier,
972                                               subscription.subscriber)
973            self.send(message)
974
975
976    def getNodeInfo(self, requestor, service, nodeIdentifier):
977        return None
978
979
980    def getNodes(self, requestor, service):
981        return []
982
983
984    def publish(self, requestor, service, nodeIdentifier, items):
985        raise Unsupported('publish')
986
987
988    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
989        raise Unsupported('subscribe')
990
991
992    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
993        raise Unsupported('subscribe')
994
995
996    def subscriptions(self, requestor, service):
997        raise Unsupported('retrieve-subscriptions')
998
999
1000    def affiliations(self, requestor, service):
1001        raise Unsupported('retrieve-affiliations')
1002
1003
1004    def create(self, requestor, service, nodeIdentifier):
1005        raise Unsupported('create-nodes')
1006
1007
1008    def getConfigurationOptions(self):
1009        return {}
1010
1011
1012    def getDefaultConfiguration(self, requestor, service):
1013        raise Unsupported('retrieve-default')
1014
1015
1016    def getConfiguration(self, requestor, service, nodeIdentifier):
1017        raise Unsupported('config-node')
1018
1019
1020    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1021        raise Unsupported('config-node')
1022
1023
1024    def items(self, requestor, service, nodeIdentifier, maxItems,
1025                    itemIdentifiers):
1026        raise Unsupported('retrieve-items')
1027
1028
1029    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1030        raise Unsupported('retract-items')
1031
1032
1033    def purge(self, requestor, service, nodeIdentifier):
1034        raise Unsupported('purge-nodes')
1035
1036
1037    def delete(self, requestor, service, nodeIdentifier):
1038        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.