source: wokkel/pubsub.py @ 50:a37c65be8203

Last change on this file since 50:a37c65be8203 was 49:50a84c44cbf1, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Add Service Discovery client protocol and overhaul data classes.

Author: ralphm.
Reviewer: tofu.
Fixes #28.

File size: 33.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath queries selecting iq stanzas by request type
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting an element to a publish-subscribe namespace
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub element inside get and set requests
PUBSUB_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB_OWNER
PUBSUB_GET = '%s%s' % (IQ_GET, PUBSUB_ELEMENT)
PUBSUB_SET = '%s%s' % (IQ_SET, PUBSUB_ELEMENT)
PUBSUB_OWNER_GET = '%s%s' % (IQ_GET, PUBSUB_OWNER_ELEMENT)
PUBSUB_OWNER_SET = '%s%s' % (IQ_SET, PUBSUB_OWNER_ELEMENT)

# XPath queries for the individual publish-subscribe commands
PUBSUB_PUBLISH = '%s/publish%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_CREATE = '%s/create%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIBE = '%s/subscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_UNSUBSCRIBE = '%s/unsubscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_GET = '%s/options%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_SET = '%s/options%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_DEFAULT = '%s/default%s' % (PUBSUB_OWNER_GET, IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_GET = '%s/configure%s' % (PUBSUB_OWNER_GET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_SET = '%s/configure%s' % (PUBSUB_OWNER_SET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS = '%s/subscriptions%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS = '%s/affiliations%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS_GET = '%s/affiliations%s' % (PUBSUB_OWNER_GET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = '%s/affiliations%s' % (PUBSUB_OWNER_SET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = '%s/subscriptions%s' % (PUBSUB_OWNER_GET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = '%s/subscriptions%s' % (PUBSUB_OWNER_SET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = '%s/items%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_RETRACT = '%s/retract%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_PURGE = '%s/purge%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
PUBSUB_DELETE = '%s/delete%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
71
class SubscriptionPending(Exception):
    """
    Signals that a requested subscription still awaits approval.
    """
76
77
78
class SubscriptionUnconfigured(Exception):
    """
    Signals that a requested subscription must be configured before it
    becomes active.
    """
84
85
86
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is attached as an application condition element
    in the publish-subscribe errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The generic stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: Optional text passed on to L{error.StanzaError}.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
98
99
100
class BadRequest(PubSubError):
    """
    Bad request stanza error.

    The error carries a publish-subscribe specific condition only when
    C{pubsubCondition} is given.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of the publish-subscribe
                                specific condition element.
        @param text: Optional text passed on to L{error.StanzaError}.
        """
        if pubsubCondition:
            # Pass text by keyword: the third positional parameter of
            # PubSubError.__init__ is feature, not text.
            PubSubError.__init__(self, 'bad-request', pubsubCondition,
                                 text=text)
        else:
            # Without a specific condition there is no application
            # condition element to build, so initialize a plain stanza error
            # instead of creating an element without a name.
            error.StanzaError.__init__(self, 'bad-request', text=text)
107
108
109
class Unsupported(PubSubError):
    """
    Error for a publish-subscribe feature that is not supported.

    This is a C{feature-not-implemented} stanza error with the
    publish-subscribe specific condition C{unsupported}.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported feature.
        @param text: Optional text passed on to L{PubSubError}.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
116
117
118
class Subscription(object):
    """
    Record of a subscription to a node.

    @ivar nodeIdentifier: Identifier of the subscribed-to node. C{None}
                          denotes the root node.
    @ivar subscriber: The entity holding the subscription.
    @ivar state: State of the subscription: C{'subscribed'}, C{'pending'} or
                 C{'unconfigured'}.
    @ivar options: Subscription options. Empty when none were given.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        if not options:
            # Covers both a missing and an empty options argument.
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
137
138
139
class Item(domish.Element):
    """
    A publish-subscribe item element.

    Being a L{domish.Element} subclass, an item can be used wherever an
    object providing L{domish.IElement} is expected.

    The payload is either passed as the C{payload} keyword argument to
    C{__init__}, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional identifier of the item
        @type id: L{unicode}
        @param payload: optional payload of the item, given as a domish
                        element or as a string of serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else an element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
167
168
169
class _PubSubRequest(xmlstream.IQ):
    """
    An iq request carrying a publish-subscribe command.

    @ivar command: The command element of the request: the child named
                   after the verb inside the request's C{pubsub} element.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        """
        @param xs: The XML stream passed on to L{xmlstream.IQ}.
        @param verb: Name of the command element.
        @type verb: C{str}
        @param namespace: Namespace of the C{pubsub} element.
        @type namespace: C{str}
        @param method: Type of the iq request, C{'set'} or C{'get'}.
        @type method: C{str}
        """
        xmlstream.IQ.__init__(self, xs, method)
        pubsubElement = self.addElement((namespace, 'pubsub'))
        self.command = pubsubElement.addElement(verb)


    def send(self, to):
        """
        Send out the request to an entity given as a L{JID}.

        The full string form of C{to} is handed to L{xmlstream.IQ.send}.

        @param to: Entity to send the request to.
        @type to: L{JID}
        @return: Whatever L{xmlstream.IQ.send} returns.
        """
        return xmlstream.IQ.send(self, to.full())
204
205
206
class PubSubEvent(object):
    """
    Base class of publish-subscribe event notifications.

    @ivar sender: The entity the notification was received from.
    @type sender: L{jid.JID}
    @ivar recipient: The entity the notification was addressed to.
    @type recipient: L{jid.JID}
    @ivar nodeIdentifier: Identifier of the node the event is about.
    @type nodeIdentifier: C{unicode}
    @ivar headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        self.headers = headers
        self.nodeIdentifier = nodeIdentifier
        self.recipient = recipient
        self.sender = sender
226
227
228
class ItemsEvent(PubSubEvent):
    """
    Event notification about new, updated and retracted items.

    @ivar items: The received items, as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        PubSubEvent.__init__(self, sender=sender, recipient=recipient,
                             nodeIdentifier=nodeIdentifier, headers=headers)
        self.items = items
240
241
242
class DeleteEvent(PubSubEvent):
    """
    Event notification about the deletion of a node.

    @ivar redirectURI: Redirection URI for the deleted node. C{None} unless
                       set on the instance.
    """

    redirectURI = None
249
250
251
class PurgeEvent(PubSubEvent):
    """
    Event notification about the purging of a node.
    """
256
257
258
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are passed to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}. These do nothing here and are
    meant to be overridden.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the matching C{_onEvent_*} method.

        Messages without a C{from} or C{to} attribute, and events without
        a child in the event namespace, are ignored. The message is marked
        as handled only if a handler exists for the event's action.

        @param message: The received message stanza.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The last child in the event namespace determines the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Turn an C{items} action into an L{ItemsEvent} for L{itemsReceived}.

        Both C{item} and C{retract} children are included in the event.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Turn a C{delete} action into a L{DeleteEvent} for L{deleteReceived}.

        If the action has a C{redirect} child, its C{uri} attribute is
        stored as the event's C{redirectURI}.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Turn a C{purge} action into a L{PurgeEvent} for L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent}. Does nothing; meant to be overridden.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent}. Does nothing; meant to be overridden.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent}. Does nothing; meant to be overridden.
        """
        pass


    def createNode(self, service, nodeIdentifier=None):
        """
        Create a publish subscribe node.

        The result of the returned deferred is the node identifier from the
        response, or C{nodeIdentifier} when the response does not carry one.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        """
        request = _PubSubRequest(self.xmlstream, 'create')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response carries no node identifier, so the suggested
                # one was accepted. Besides a missing pubsub element
                # (AttributeError), this covers a pubsub element without
                # create child (TypeError) and a create element without
                # node attribute (KeyError).
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)


    def deleteNode(self, service, nodeIdentifier):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        """
        request = _PubSubRequest(self.xmlstream, 'delete', NS_PUBSUB_OWNER)
        request.command['node'] = nodeIdentifier
        return request.send(service)


    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Subscribe to a publish subscribe node.

        The returned deferred fails with L{SubscriptionPending} or
        L{SubscriptionUnconfigured} when the response reports the
        subscription state C{'pending'} or C{'unconfigured'}. Otherwise its
        result is C{None}.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        """
        request = _PubSubRequest(self.xmlstream, 'subscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)


    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        """
        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()
        return request.send(service)


    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        """
        request = _PubSubRequest(self.xmlstream, 'publish')
        request.command['node'] = nodeIdentifier
        if items:
            for item in items:
                request.command.addChild(item)

        return request.send(service)


    def items(self, service, nodeIdentifier, maxItems=None):
        """
        Retrieve previously published items from a publish subscribe node.

        The result of the returned deferred is the list of C{item} elements
        in the publish-subscribe namespace found in the response.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        """
        request = _PubSubRequest(self.xmlstream, 'items', method='get')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier
        if maxItems:
            request.command["max_items"] = str(int(maxItems))

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        return request.send(service).addCallback(cb)
466
467
468
469class PubSubService(XMPPHandler, IQHandlerMixin):
470    """
471    Protocol implementation for a XMPP Publish Subscribe Service.
472
473    The word Service here is used as taken from the Publish Subscribe
474    specification. It is the party responsible for keeping nodes and their
475    subscriptions, and sending out notifications.
476
477    Methods from the L{IPubSubService} interface that are called as
478    a result of an XMPP request may raise exceptions. Alternatively the
479    deferred returned by these methods may have their errback called. These are
480    handled as follows:
481
482     - If the exception is an instance of L{error.StanzaError}, an error
483       response iq is returned.
484     - Any other exception is reported using L{log.msg}. An error response
485       with the condition C{internal-server-error} is returned.
486
487    The default implementation of said methods raises an L{Unsupported}
488    exception and are meant to be overridden.
489
490    @ivar discoIdentity: Service discovery identity as a dictionary with
491                         keys C{'category'}, C{'type'} and C{'name'}.
492    @ivar pubSubFeatures: List of supported publish-subscribe features for
493                          service discovery, as C{str}.
494    @type pubSubFeatures: C{list} or C{None}
495    """
496
497    implements(IPubSubService)
498
499    iqHandlers = {
500            PUBSUB_PUBLISH: '_onPublish',
501            PUBSUB_CREATE: '_onCreate',
502            PUBSUB_SUBSCRIBE: '_onSubscribe',
503            PUBSUB_OPTIONS_GET: '_onOptionsGet',
504            PUBSUB_OPTIONS_SET: '_onOptionsSet',
505            PUBSUB_AFFILIATIONS: '_onAffiliations',
506            PUBSUB_ITEMS: '_onItems',
507            PUBSUB_RETRACT: '_onRetract',
508            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
509            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
510
511            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
512            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
513            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
514            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
515            PUBSUB_DEFAULT: '_onDefault',
516            PUBSUB_PURGE: '_onPurge',
517            PUBSUB_DELETE: '_onDelete',
518            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
519            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
520
521            }
522
523
524    def __init__(self):
525        self.discoIdentity = {'category': 'pubsub',
526                              'type': 'generic',
527                              'name': 'Generic Publish-Subscribe Service'}
528
529        self.pubSubFeatures = []
530
531
532    def connectionMade(self):
533        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
534        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
535        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
536        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
537
538
539    def getDiscoInfo(self, requestor, target, nodeIdentifier):
540        info = []
541
542        if not nodeIdentifier:
543            category, idType, name = self.discoIdentity
544            info.append(disco.DiscoIdentity(category, idType, name))
545
546            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
547            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
548                         for feature in self.pubSubFeatures])
549
550        def toInfo(nodeInfo):
551            if not nodeInfo:
552                return
553
554            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
555            info.append(disco.DiscoIdentity('pubsub', nodeType))
556            if metaData:
557                form = data_form.Form(formType="result",
558                                      formNamespace=NS_PUBSUB_META_DATA)
559                form.addField(
560                        data_form.Field(
561                            var='pubsub#node_type',
562                            value=nodeType,
563                            label='The type of node (collection or leaf)'
564                        )
565                )
566
567                for metaDatum in metaData:
568                    form.addField(data_form.Field.fromDict(metaDatum))
569
570                info.append(form.toElement())
571
572        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
573        d.addCallback(toInfo)
574        d.addBoth(lambda result: info)
575        return d
576
577
578    def getDiscoItems(self, requestor, target, nodeIdentifier):
579        if nodeIdentifier or self.hideNodes:
580            return defer.succeed([])
581
582        d = self.getNodes(requestor, target)
583        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
584                                     for node in nodes])
585        return d
586
587
588    def _findForm(self, element, formNamespace):
589        if not element:
590            return None
591
592        form = None
593        for child in element.elements():
594            try:
595                form = data_form.Form.fromElement(child)
596            except data_form.Error:
597                continue
598
599            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
600                continue
601
602        return form
603
604
605    def _getParameter_node(self, commandElement):
606        try:
607            return commandElement["node"]
608        except KeyError:
609            raise BadRequest('nodeid-required')
610
611
612    def _getParameter_nodeOrEmpty(self, commandElement):
613        return commandElement.getAttribute("node", '')
614
615
616    def _getParameter_jid(self, commandElement):
617        try:
618            return jid.internJID(commandElement["jid"])
619        except KeyError:
620            raise BadRequest('jid-required')
621
622
623    def _getParameter_max_items(self, commandElement):
624        value = commandElement.getAttribute('max_items')
625
626        if value:
627            try:
628                return int(value)
629            except ValueError:
630                raise BadRequest(text="Field max_items requires a positive " +
631                                      "integer value")
632        else:
633            return None
634
635
636    def _getParameters(self, iq, *names):
637        requestor = jid.internJID(iq["from"]).userhostJID()
638        service = jid.internJID(iq["to"])
639
640        params = [requestor, service]
641
642        if names:
643            command = names[0]
644            commandElement = getattr(iq.pubsub, command)
645            if not commandElement:
646                raise Exception("Could not find command element %r" % command)
647
648        for name in names[1:]:
649            try:
650                getter = getattr(self, '_getParameter_' + name)
651            except KeyError:
652                raise Exception("No parameter getter for this name")
653
654            params.append(getter(commandElement))
655
656        return params
657
658
659    def _onPublish(self, iq):
660        requestor, service, nodeIdentifier = self._getParameters(
661                iq, 'publish', 'node')
662
663        items = []
664        for element in iq.pubsub.publish.elements():
665            if element.uri == NS_PUBSUB and element.name == 'item':
666                items.append(element)
667
668        return self.publish(requestor, service, nodeIdentifier, items)
669
670
671    def _onSubscribe(self, iq):
672        requestor, service, nodeIdentifier, subscriber = self._getParameters(
673                iq, 'subscribe', 'nodeOrEmpty', 'jid')
674
675        def toResponse(result):
676            response = domish.Element((NS_PUBSUB, "pubsub"))
677            subscription = response.addElement("subscription")
678            if result.nodeIdentifier:
679                subscription["node"] = result.nodeIdentifier
680            subscription["jid"] = result.subscriber.full()
681            subscription["subscription"] = result.state
682            return response
683
684        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
685        d.addCallback(toResponse)
686        return d
687
688
689    def _onUnsubscribe(self, iq):
690        requestor, service, nodeIdentifier, subscriber = self._getParameters(
691                iq, 'unsubscribe', 'nodeOrEmpty', 'jid')
692
693        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
694
695
696    def _onOptionsGet(self, iq):
697        raise Unsupported('subscription-options')
698
699
700    def _onOptionsSet(self, iq):
701        raise Unsupported('subscription-options')
702
703
704    def _onSubscriptions(self, iq):
705        requestor, service = self._getParameters(iq)
706
707        def toResponse(result):
708            response = domish.Element((NS_PUBSUB, 'pubsub'))
709            subscriptions = response.addElement('subscriptions')
710            for subscription in result:
711                item = subscriptions.addElement('subscription')
712                item['node'] = subscription.nodeIdentifier
713                item['jid'] = subscription.subscriber.full()
714                item['subscription'] = subscription.state
715            return response
716
717        d = self.subscriptions(requestor, service)
718        d.addCallback(toResponse)
719        return d
720
721
722    def _onAffiliations(self, iq):
723        requestor, service = self._getParameters(iq)
724
725        def toResponse(result):
726            response = domish.Element((NS_PUBSUB, 'pubsub'))
727            affiliations = response.addElement('affiliations')
728
729            for nodeIdentifier, affiliation in result:
730                item = affiliations.addElement('affiliation')
731                item['node'] = nodeIdentifier
732                item['affiliation'] = affiliation
733
734            return response
735
736        d = self.affiliations(requestor, service)
737        d.addCallback(toResponse)
738        return d
739
740
741    def _onCreate(self, iq):
742        requestor, service = self._getParameters(iq)
743        nodeIdentifier = iq.pubsub.create.getAttribute("node")
744
745        def toResponse(result):
746            if not nodeIdentifier or nodeIdentifier != result:
747                response = domish.Element((NS_PUBSUB, 'pubsub'))
748                create = response.addElement('create')
749                create['node'] = result
750                return response
751            else:
752                return None
753
754        d = self.create(requestor, service, nodeIdentifier)
755        d.addCallback(toResponse)
756        return d
757
758
759    def _makeFields(self, options, values):
760        fields = []
761        for name, value in values.iteritems():
762            if name not in options:
763                continue
764
765            option = {'var': name}
766            option.update(options[name])
767            if isinstance(value, list):
768                option['values'] = value
769            else:
770                option['value'] = value
771            fields.append(data_form.Field.fromDict(option))
772        return fields
773
774    def _formFromConfiguration(self, values):
775        options = self.getConfigurationOptions()
776        fields = self._makeFields(options, values)
777        form = data_form.Form(formType="form",
778                              formNamespace=NS_PUBSUB_NODE_CONFIG,
779                              fields=fields)
780
781        return form
782
783    def _checkConfiguration(self, values):
784        options = self.getConfigurationOptions()
785        processedValues = {}
786
787        for key, value in values.iteritems():
788            if key not in options:
789                continue
790
791            option = {'var': key}
792            option.update(options[key])
793            field = data_form.Field.fromDict(option)
794            if isinstance(value, list):
795                field.values = value
796            else:
797                field.value = value
798            field.typeCheck()
799
800            if isinstance(value, list):
801                processedValues[key] = field.values
802            else:
803                processedValues[key] = field.value
804
805        return processedValues
806
807
808    def _onDefault(self, iq):
809        requestor, service = self._getParameters(iq)
810
811        def toResponse(options):
812            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
813            default = response.addElement("default")
814            default.addChild(self._formFromConfiguration(options).toElement())
815            return response
816
817        form = self._findForm(iq.pubsub.config, NS_PUBSUB_NODE_CONFIG)
818        values = form and form.formType == 'result' and form.getValues() or {}
819        nodeType = values.get('pubsub#node_type', 'leaf')
820
821        if nodeType not in ('leaf', 'collections'):
822            return defer.fail(error.StanzaError('not-acceptable'))
823
824        d = self.getDefaultConfiguration(requestor, service, nodeType)
825        d.addCallback(toResponse)
826        return d
827
828
829    def _onConfigureGet(self, iq):
830        requestor, service, nodeIdentifier = self._getParameters(
831                iq, 'configure', 'nodeOrEmpty')
832
833        def toResponse(options):
834            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
835            configure = response.addElement("configure")
836            configure.addChild(self._formFromConfiguration(options).toElement())
837
838            if nodeIdentifier:
839                configure["node"] = nodeIdentifier
840
841            return response
842
843        d = self.getConfiguration(requestor, service, nodeIdentifier)
844        d.addCallback(toResponse)
845        return d
846
847
848    def _onConfigureSet(self, iq):
849        requestor, service, nodeIdentifier = self._getParameters(
850                iq, 'configure', 'nodeOrEmpty')
851
852        # Search configuration form with correct FORM_TYPE and process it
853
854        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
855
856        if form:
857            if form.formType == 'submit':
858                options = self._checkConfiguration(form.getValues())
859
860                return self.setConfiguration(requestor, service,
861                                             nodeIdentifier, options)
862            elif form.formType == 'cancel':
863                return None
864
865        raise BadRequest()
866
867
868    def _onItems(self, iq):
869        requestor, service, nodeIdentifier, maxItems = self._getParameters(
870                iq, 'items', 'nodeOrEmpty', 'max_items')
871
872        itemIdentifiers = []
873        for child in iq.pubsub.items.elements():
874            if child.name == 'item' and child.uri == NS_PUBSUB:
875                try:
876                    itemIdentifiers.append(child["id"])
877                except KeyError:
878                    raise BadRequest()
879
880        def toResponse(result):
881            response = domish.Element((NS_PUBSUB, 'pubsub'))
882            items = response.addElement('items')
883            if nodeIdentifier:
884                items["node"] = nodeIdentifier
885
886            for item in result:
887                items.addChild(item)
888
889            return response
890
891        d = self.items(requestor, service, nodeIdentifier, maxItems,
892                       itemIdentifiers)
893        d.addCallback(toResponse)
894        return d
895
896
897    def _onRetract(self, iq):
898        requestor, service, nodeIdentifier = self._getParameters(
899                iq, 'retract', 'node')
900
901        itemIdentifiers = []
902        for child in iq.pubsub.retract.elements():
903            if child.uri == NS_PUBSUB and child.name == 'item':
904                try:
905                    itemIdentifiers.append(child["id"])
906                except KeyError:
907                    raise BadRequest()
908
909        return self.retract(requestor, service, nodeIdentifier,
910                            itemIdentifiers)
911
912
913    def _onPurge(self, iq):
914        requestor, service, nodeIdentifier = self._getParameters(
915                iq, 'purge', 'node')
916        return self.purge(requestor, service, nodeIdentifier)
917
918
919    def _onDelete(self, iq):
920        requestor, service, nodeIdentifier = self._getParameters(
921                iq, 'delete', 'node')
922        return self.delete(requestor, service, nodeIdentifier)
923
924
925    def _onAffiliationsGet(self, iq):
926        raise Unsupported('modify-affiliations')
927
928
929    def _onAffiliationsSet(self, iq):
930        raise Unsupported('modify-affiliations')
931
932
933    def _onSubscriptionsGet(self, iq):
934        raise Unsupported('manage-subscriptions')
935
936
937    def _onSubscriptionsSet(self, iq):
938        raise Unsupported('manage-subscriptions')
939
940    # public methods
941
942    def _createNotification(self, eventType, service, nodeIdentifier,
943                                  subscriber, subscriptions=None):
944        headers = []
945
946        if subscriptions:
947            for subscription in subscriptions:
948                if nodeIdentifier != subscription.nodeIdentifier:
949                    headers.append(('Collection', subscription.nodeIdentifier))
950
951        message = domish.Element((None, "message"))
952        message["from"] = service.full()
953        message["to"] = subscriber.full()
954        event = message.addElement((NS_PUBSUB_EVENT, "event"))
955
956        element = event.addElement(eventType)
957        element["node"] = nodeIdentifier
958
959        if headers:
960            message.addChild(shim.Headers(headers))
961
962        return message
963
964    def notifyPublish(self, service, nodeIdentifier, notifications):
965        for subscriber, subscriptions, items in notifications:
966            message = self._createNotification('items', service,
967                                               nodeIdentifier, subscriber,
968                                               subscriptions)
969            message.event.items.children = items
970            self.send(message)
971
972
973    def notifyDelete(self, service, nodeIdentifier, subscribers,
974                           redirectURI=None):
975        for subscriber in subscribers:
976            message = self._createNotification('delete', service,
977                                               nodeIdentifier,
978                                               subscriber)
979            if redirectURI:
980                redirect = message.event.delete.addElement('redirect')
981                redirect['uri'] = redirectURI
982            self.send(message)
983
984
985    def getNodeInfo(self, requestor, service, nodeIdentifier):
986        return None
987
988
989    def getNodes(self, requestor, service):
990        return []
991
992
993    def publish(self, requestor, service, nodeIdentifier, items):
994        raise Unsupported('publish')
995
996
997    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
998        raise Unsupported('subscribe')
999
1000
1001    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1002        raise Unsupported('subscribe')
1003
1004
1005    def subscriptions(self, requestor, service):
1006        raise Unsupported('retrieve-subscriptions')
1007
1008
1009    def affiliations(self, requestor, service):
1010        raise Unsupported('retrieve-affiliations')
1011
1012
1013    def create(self, requestor, service, nodeIdentifier):
1014        raise Unsupported('create-nodes')
1015
1016
1017    def getConfigurationOptions(self):
1018        return {}
1019
1020
1021    def getDefaultConfiguration(self, requestor, service, nodeType):
1022        raise Unsupported('retrieve-default')
1023
1024
1025    def getConfiguration(self, requestor, service, nodeIdentifier):
1026        raise Unsupported('config-node')
1027
1028
1029    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1030        raise Unsupported('config-node')
1031
1032
1033    def items(self, requestor, service, nodeIdentifier, maxItems,
1034                    itemIdentifiers):
1035        raise Unsupported('retrieve-items')
1036
1037
1038    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1039        raise Unsupported('retract-items')
1040
1041
1042    def purge(self, requestor, service, nodeIdentifier):
1043        raise Unsupported('purge-nodes')
1044
1045
1046    def delete(self, requestor, service, nodeIdentifier):
1047        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.