source: wokkel/pubsub.py @ 27:d62d7ea14995

Last change on this file since 27:d62d7ea14995 was 27:d62d7ea14995, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Implement SHIM support.

Author: ralphm.
Fixes #14.

This changes the signature of PubSubClient's itemsReceived and friends, to
have an object to represent the event, for easier addition of new information
from the event's message stanza, like these SHIM headers.

File size: 30.3 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
23# Iq get and set XPath queries
24IQ_GET = '/iq[@type="get"]'
25IQ_SET = '/iq[@type="set"]'
26
27# Publish-subscribe namespaces
28NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
29NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
30NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
31NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
32NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
33NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
34
35# In publish-subscribe namespace XPath query selector.
36IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]'
37IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]'
38
39# Publish-subscribe XPath queries
40PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
41PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
42PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
43PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
44PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
45PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT
46
47# Publish-subscribe command XPath queries
48PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
49PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
50PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
51PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
52PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
53PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
54PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
55PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
56PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
57PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
58PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
59PUBSUB_AFFILIATIONS_GET = PUBSUB_OWNER_GET + '/affiliations' + \
60                          IN_NS_PUBSUB_OWNER
61PUBSUB_AFFILIATIONS_SET = PUBSUB_OWNER_SET + '/affiliations' + \
62                          IN_NS_PUBSUB_OWNER
63PUBSUB_SUBSCRIPTIONS_GET = PUBSUB_OWNER_GET + '/subscriptions' + \
64                          IN_NS_PUBSUB_OWNER
65PUBSUB_SUBSCRIPTIONS_SET = PUBSUB_OWNER_SET + '/subscriptions' + \
66                          IN_NS_PUBSUB_OWNER
67PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
68PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
69PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
70PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
72class BadRequest(error.StanzaError):
73    """
74    Bad request stanza error.
75    """
76    def __init__(self):
77        error.StanzaError.__init__(self, 'bad-request')
78
79
80
81class SubscriptionPending(Exception):
82    """
83    Raised when the requested subscription is pending acceptance.
84    """
85
86
87
88class SubscriptionUnconfigured(Exception):
89    """
90    Raised when the requested subscription needs to be configured before
91    becoming active.
92    """
93
94
95
96class PubSubError(error.StanzaError):
97    """
98    Exception with publish-subscribe specific condition.
99    """
100    def __init__(self, condition, pubsubCondition, feature=None, text=None):
101        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
102        if feature:
103            appCondition['feature'] = feature
104        error.StanzaError.__init__(self, condition,
105                                         text=text,
106                                         appCondition=appCondition)
107
108
109
110class Unsupported(PubSubError):
111    def __init__(self, feature, text=None):
112        PubSubError.__init__(self, 'feature-not-implemented',
113                                   'unsupported',
114                                   feature,
115                                   text)
116
117
118
119class OptionsUnavailable(Unsupported):
120    def __init__(self):
121        Unsupported.__init__(self, 'subscription-options-unavailable')
122
123
124
125class Item(domish.Element):
126    """
127    Publish subscribe item.
128
129    This behaves like an object providing L{domish.IElement}.
130
131    Item payload can be added using C{addChild} or C{addRawXml}, or using the
132    C{payload} keyword argument to C{__init__}.
133    """
134
135    def __init__(self, id=None, payload=None):
136        """
137        @param id: optional item identifier
138        @type id: L{unicode}
139        @param payload: optional item payload. Either as a domish element, or
140                        as serialized XML.
141        @type payload: object providing L{domish.IElement} or L{unicode}.
142        """
143
144        domish.Element.__init__(self, (NS_PUBSUB, 'item'))
145        if id is not None:
146            self['id'] = id
147        if payload is not None:
148            if isinstance(payload, basestring):
149                self.addRawXml(payload)
150            else:
151                self.addChild(payload)
152
153
154
155class _PubSubRequest(xmlstream.IQ):
156    """
157    Publish subscribe request.
158
159    @ivar verb: Request verb
160    @type verb: C{str}
161    @ivar namespace: Request namespace.
162    @type namespace: C{str}
163    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
164    @type method: C{str}
165    @ivar command: Command element of the request. This is the direct child of
166                   the C{pubsub} element in the C{namespace} with the name
167                   C{verb}.
168    """
169
170    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
171        xmlstream.IQ.__init__(self, xs, method)
172        self.addElement((namespace, 'pubsub'))
173
174        self.command = self.pubsub.addElement(verb)
175
176
177    def send(self, to):
178        """
179        Send out request.
180
181        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
182        a L{JID} instance.
183
184        @param to: Entity to send the request to.
185        @type to: L{JID}
186        """
187        destination = to.full()
188        return xmlstream.IQ.send(self, destination)
189
190
191
192class PubSubEvent(object):
193    """
194    A publish subscribe event.
195
196    @param sender: The entity from which the notification was received.
197    @type sender: L{jid.JID}
198    @param recipient: The entity to which the notification was sent.
199    @type recipient: L{wokkel.pubsub.ItemsEvent}
200    @param nodeIdentifier: Identifier of the node the event pertains to.
201    @type nodeIdentifier: C{unicode}
202    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
203    @type headers: L{dict}
204    """
205
206    def __init__(self, sender, recipient, nodeIdentifier, headers):
207        self.sender = sender
208        self.recipient = recipient
209        self.nodeIdentifier = nodeIdentifier
210        self.headers = headers
211
212
213
214class ItemsEvent(PubSubEvent):
215    """
216    A publish-subscribe event that signifies new, updated and retracted items.
217
218    @param items: List of received items as domish elements.
219    @type items: C{list} of L{domish.Element}
220    """
221
222    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
223        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
224        self.items = items
225
226
227
228class DeleteEvent(PubSubEvent):
229    """
230    A publish-subscribe event that signifies the deletion of a node.
231    """
232
233
234
235class PurgeEvent(PubSubEvent):
236    """
237    A publish-subscribe event that signifies the purging of a node.
238    """
239
240
241
242class PubSubClient(XMPPHandler):
243    """
244    Publish subscribe client protocol.
245    """
246
247    implements(IPubSubClient)
248
249    def connectionInitialized(self):
250        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
251                                   NS_PUBSUB_EVENT, self._onEvent)
252
253
254    def _onEvent(self, message):
255        try:
256            sender = jid.JID(message["from"])
257            recipient = jid.JID(message["to"])
258        except KeyError:
259            return
260
261        actionElement = None
262        for element in message.event.elements():
263            if element.uri == NS_PUBSUB_EVENT:
264                actionElement = element
265
266        if not actionElement:
267            return
268
269        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
270
271        if eventHandler:
272            headers = shim.extractHeaders(message)
273            eventHandler(sender, recipient, actionElement, headers)
274            message.handled = True
275
276    def _onEvent_items(self, sender, recipient, action, headers):
277        nodeIdentifier = action["node"]
278
279        items = [element for element in action.elements()
280                         if element.name in ('item', 'retract')]
281
282        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
283        self.itemsReceived(event)
284
285    def _onEvent_delete(self, sender, recipient, action, headers):
286        nodeIdentifier = action["node"]
287        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
288        self.deleteReceived(event)
289
290    def _onEvent_purge(self, sender, recipient, action, headers):
291        nodeIdentifier = action["node"]
292        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
293        self.purgeReceived(event)
294
295    def itemsReceived(self, event):
296        pass
297
298    def deleteReceived(self, event):
299        pass
300
301    def purgeReceived(self, event):
302        pass
303
304    def createNode(self, service, nodeIdentifier=None):
305        """
306        Create a publish subscribe node.
307
308        @param service: The publish subscribe service to create the node at.
309        @type service: L{JID}
310        @param nodeIdentifier: Optional suggestion for the id of the node.
311        @type nodeIdentifier: C{unicode}
312        """
313
314
315        request = _PubSubRequest(self.xmlstream, 'create')
316        if nodeIdentifier:
317            request.command['node'] = nodeIdentifier
318
319        def cb(iq):
320            try:
321                new_node = iq.pubsub.create["node"]
322            except AttributeError:
323                # the suggested node identifier was accepted
324                new_node = nodeIdentifier
325            return new_node
326
327        return request.send(service).addCallback(cb)
328
329
330    def deleteNode(self, service, nodeIdentifier):
331        """
332        Delete a publish subscribe node.
333
334        @param service: The publish subscribe service to delete the node from.
335        @type service: L{JID}
336        @param nodeIdentifier: The identifier of the node.
337        @type nodeIdentifier: C{unicode}
338        """
339        request = _PubSubRequest(self.xmlstream, 'delete')
340        request.command['node'] = nodeIdentifier
341        return request.send(service)
342
343
344    def subscribe(self, service, nodeIdentifier, subscriber):
345        """
346        Subscribe to a publish subscribe node.
347
348        @param service: The publish subscribe service that keeps the node.
349        @type service: L{JID}
350        @param nodeIdentifier: The identifier of the node.
351        @type nodeIdentifier: C{unicode}
352        @param subscriber: The entity to subscribe to the node. This entity
353                           will get notifications of new published items.
354        @type subscriber: L{JID}
355        """
356        request = _PubSubRequest(self.xmlstream, 'subscribe')
357        request.command['node'] = nodeIdentifier
358        request.command['jid'] = subscriber.full()
359
360        def cb(iq):
361            subscription = iq.pubsub.subscription["subscription"]
362
363            if subscription == 'pending':
364                raise SubscriptionPending
365            elif subscription == 'unconfigured':
366                raise SubscriptionUnconfigured
367            else:
368                # we assume subscription == 'subscribed'
369                # any other value would be invalid, but that should have
370                # yielded a stanza error.
371                return None
372
373        return request.send(service).addCallback(cb)
374
375
376    def unsubscribe(self, service, nodeIdentifier, subscriber):
377        """
378        Unsubscribe from a publish subscribe node.
379
380        @param service: The publish subscribe service that keeps the node.
381        @type service: L{JID}
382        @param nodeIdentifier: The identifier of the node.
383        @type nodeIdentifier: C{unicode}
384        @param subscriber: The entity to unsubscribe from the node.
385        @type subscriber: L{JID}
386        """
387        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
388        request.command['node'] = nodeIdentifier
389        request.command['jid'] = subscriber.full()
390        return request.send(service)
391
392
393    def publish(self, service, nodeIdentifier, items=None):
394        """
395        Publish to a publish subscribe node.
396
397        @param service: The publish subscribe service that keeps the node.
398        @type service: L{JID}
399        @param nodeIdentifier: The identifier of the node.
400        @type nodeIdentifier: C{unicode}
401        @param items: Optional list of L{Item}s to publish.
402        @type items: C{list}
403        """
404        request = _PubSubRequest(self.xmlstream, 'publish')
405        request.command['node'] = nodeIdentifier
406        if items:
407            for item in items:
408                request.command.addChild(item)
409
410        return request.send(service)
411
412
413    def items(self, service, nodeIdentifier, maxItems=None):
414        """
415        Retrieve previously published items from a publish subscribe node.
416
417        @param service: The publish subscribe service that keeps the node.
418        @type service: L{JID}
419        @param nodeIdentifier: The identifier of the node.
420        @type nodeIdentifier: C{unicode}
421        @param maxItems: Optional limit on the number of retrieved items.
422        @type maxItems: C{int}
423        """
424        request = _PubSubRequest(self.xmlstream, 'items', method='get')
425        request.command['node'] = nodeIdentifier
426        if maxItems:
427            request.command["max_items"] = str(int(maxItems))
428
429        def cb(iq):
430            items = []
431            for element in iq.pubsub.items.elements():
432                if element.uri == NS_PUBSUB and element.name == 'item':
433                    items.append(element)
434            return items
435
436        return request.send(service).addCallback(cb)
437
438
439
440class PubSubService(XMPPHandler, IQHandlerMixin):
441    """
442    Protocol implementation for a XMPP Publish Subscribe Service.
443
444    The word Service here is used as taken from the Publish Subscribe
445    specification. It is the party responsible for keeping nodes and their
446    subscriptions, and sending out notifications.
447
448    Methods from the L{IPubSubService} interface that are called as
449    a result of an XMPP request may raise exceptions. Alternatively the
450    deferred returned by these methods may have their errback called. These are
451    handled as follows:
452
453     - If the exception is an instance of L{error.StanzaError}, an error
454       response iq is returned.
455     - Any other exception is reported using L{log.msg}. An error response
456       with the condition C{internal-server-error} is returned.
457
458    The default implementation of said methods raises an L{Unsupported}
459    exception and are meant to be overridden.
460
461    @ivar discoIdentity: Service discovery identity as a dictionary with
462                         keys C{'category'}, C{'type'} and C{'name'}.
463    @ivar pubSubFeatures: List of supported publish-subscribe features for
464                          service discovery, as C{str}.
465    @type pubSubFeatures: C{list} or C{None}
466    """
467
468    implements(IPubSubService)
469
470    iqHandlers = {
471            PUBSUB_PUBLISH: '_onPublish',
472            PUBSUB_CREATE: '_onCreate',
473            PUBSUB_SUBSCRIBE: '_onSubscribe',
474            PUBSUB_OPTIONS_GET: '_onOptionsGet',
475            PUBSUB_OPTIONS_SET: '_onOptionsSet',
476            PUBSUB_AFFILIATIONS: '_onAffiliations',
477            PUBSUB_ITEMS: '_onItems',
478            PUBSUB_RETRACT: '_onRetract',
479            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
480            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
481
482            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
483            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
484            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
485            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
486            PUBSUB_DEFAULT: '_onDefault',
487            PUBSUB_PURGE: '_onPurge',
488            PUBSUB_DELETE: '_onDelete',
489            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
490            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
491
492            }
493
494
495    def __init__(self):
496        self.discoIdentity = {'category': 'pubsub',
497                              'type': 'generic',
498                              'name': 'Generic Publish-Subscribe Service'}
499
500        self.pubSubFeatures = []
501
502
503    def connectionMade(self):
504        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
505        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
506        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
507        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
508
509
510    def getDiscoInfo(self, requestor, target, nodeIdentifier):
511        info = []
512
513        if not nodeIdentifier:
514            info.append(disco.DiscoIdentity(**self.discoIdentity))
515
516            info.append(disco.DiscoFeature(disco.NS_ITEMS))
517            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
518                         for feature in self.pubSubFeatures])
519
520        def toInfo(nodeInfo):
521            if not nodeInfo:
522                return
523
524            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
525            info.append(disco.DiscoIdentity('pubsub', nodeType))
526            if metaData:
527                form = data_form.Form(formType="result",
528                                      formNamespace=NS_PUBSUB_META_DATA)
529                form.fields.append(
530                        data_form.Field(
531                            var='pubsub#node_type',
532                            value=nodeType,
533                            label='The type of node (collection or leaf)'
534                        )
535                )
536
537                for metaDatum in metaData:
538                    form.fields.append(data_form.Field.fromDict(metaDatum))
539
540                info.append(form.toElement())
541
542        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
543        d.addCallback(toInfo)
544        d.addBoth(lambda result: info)
545        return d
546
547
548    def getDiscoItems(self, requestor, target, nodeIdentifier):
549        if nodeIdentifier or self.hideNodes:
550            return defer.succeed([])
551
552        d = self.getNodes(requestor, target)
553        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
554                                     for node in nodes])
555        return d
556
557
558    def _findForm(self, element, formNamespace):
559        if not element:
560            return None
561
562        form = None
563        for child in element.elements():
564            try:
565                form = data_form.Form.fromElement(child)
566            except data_form.Error:
567                continue
568
569            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
570                continue
571
572        return form
573
574
575    def _onPublish(self, iq):
576        requestor = jid.internJID(iq["from"]).userhostJID()
577        service = jid.internJID(iq["to"])
578
579        try:
580            nodeIdentifier = iq.pubsub.publish["node"]
581        except KeyError:
582            raise BadRequest
583
584        items = []
585        for element in iq.pubsub.publish.elements():
586            if element.uri == NS_PUBSUB and element.name == 'item':
587                items.append(element)
588
589        return self.publish(requestor, service, nodeIdentifier, items)
590
591
592    def _onSubscribe(self, iq):
593        requestor = jid.internJID(iq["from"]).userhostJID()
594        service = jid.internJID(iq["to"])
595
596        try:
597            nodeIdentifier = iq.pubsub.subscribe["node"]
598            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
599        except KeyError:
600            raise BadRequest
601
602        def toResponse(subscription):
603            nodeIdentifier, state = subscription
604            response = domish.Element((NS_PUBSUB, "pubsub"))
605            subscription = response.addElement("subscription")
606            subscription["node"] = nodeIdentifier
607            subscription["jid"] = subscriber.full()
608            subscription["subscription"] = state
609            return response
610
611        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
612        d.addCallback(toResponse)
613        return d
614
615
616    def _onUnsubscribe(self, iq):
617        requestor = jid.internJID(iq["from"]).userhostJID()
618        service = jid.internJID(iq["to"])
619
620        try:
621            nodeIdentifier = iq.pubsub.unsubscribe["node"]
622            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
623        except KeyError:
624            raise BadRequest
625
626        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
627
628
629    def _onOptionsGet(self, iq):
630        raise Unsupported('subscription-options-unavailable')
631
632
633    def _onOptionsSet(self, iq):
634        raise Unsupported('subscription-options-unavailable')
635
636
637    def _onSubscriptions(self, iq):
638        requestor = jid.internJID(iq["from"]).userhostJID()
639        service = jid.internJID(iq["to"])
640
641        def toResponse(result):
642            response = domish.Element((NS_PUBSUB, 'pubsub'))
643            subscriptions = response.addElement('subscriptions')
644            for node, subscriber, state in result:
645                item = subscriptions.addElement('subscription')
646                item['node'] = node
647                item['jid'] = subscriber.full()
648                item['subscription'] = state
649            return response
650
651        d = self.subscriptions(requestor, service)
652        d.addCallback(toResponse)
653        return d
654
655
656    def _onAffiliations(self, iq):
657        requestor = jid.internJID(iq["from"]).userhostJID()
658        service = jid.internJID(iq["to"])
659
660        def toResponse(result):
661            response = domish.Element((NS_PUBSUB, 'pubsub'))
662            affiliations = response.addElement('affiliations')
663
664            for nodeIdentifier, affiliation in result:
665                item = affiliations.addElement('affiliation')
666                item['node'] = nodeIdentifier
667                item['affiliation'] = affiliation
668
669            return response
670
671        d = self.affiliations(requestor, service)
672        d.addCallback(toResponse)
673        return d
674
675
676    def _onCreate(self, iq):
677        requestor = jid.internJID(iq["from"]).userhostJID()
678        service = jid.internJID(iq["to"])
679        nodeIdentifier = iq.pubsub.create.getAttribute("node")
680
681        def toResponse(result):
682            if not nodeIdentifier or nodeIdentifier != result:
683                response = domish.Element((NS_PUBSUB, 'pubsub'))
684                create = response.addElement('create')
685                create['node'] = result
686                return response
687            else:
688                return None
689
690        d = self.create(requestor, service, nodeIdentifier)
691        d.addCallback(toResponse)
692        return d
693
694
695    def _formFromConfiguration(self, values):
696        options = self.getConfigurationOptions()
697        form = data_form.Form(formType="form",
698                              formNamespace=NS_PUBSUB_NODE_CONFIG)
699
700        for name, value in values.iteritems():
701            if name in options:
702                option = {'var': name}
703                option.update(options[name])
704                if isinstance(value, list):
705                    option['values'] = value
706                else:
707                    option['value'] = value
708                form.fields.append(data_form.Field.fromDict(option))
709
710        return form
711
712
713    def _onDefault(self, iq):
714        requestor = jid.internJID(iq["from"]).userhostJID()
715        service = jid.internJID(iq["to"])
716
717        def toResponse(options):
718            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
719            default = response.addElement("default")
720            default.addChild(self._formFromConfiguration(options).toElement())
721            return response
722
723        d = self.getDefaultConfiguration(requestor, service)
724        d.addCallback(toResponse)
725        return d
726
727
728    def _onConfigureGet(self, iq):
729        requestor = jid.internJID(iq["from"]).userhostJID()
730        service = jid.internJID(iq["to"])
731        nodeIdentifier = iq.pubsub.configure.getAttribute("node")
732
733        def toResponse(options):
734            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
735            configure = response.addElement("configure")
736            configure.addChild(self._formFromConfiguration(options).toElement())
737
738            if nodeIdentifier:
739                configure["node"] = nodeIdentifier
740
741            return response
742
743        d = self.getConfiguration(requestor, service, nodeIdentifier)
744        d.addCallback(toResponse)
745        return d
746
747
748    def _onConfigureSet(self, iq):
749        requestor = jid.internJID(iq["from"]).userhostJID()
750        service = jid.internJID(iq["to"])
751        nodeIdentifier = iq.pubsub.configure["node"]
752
753        # Search configuration form with correct FORM_TYPE and process it
754
755        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
756
757        if form:
758            if form.formType == 'submit':
759                options = form.getValues()
760
761                return self.setConfiguration(requestor, service,
762                                             nodeIdentifier, options)
763            elif form.formType == 'cancel':
764                return None
765
766        raise BadRequest()
767
768
769    def _onItems(self, iq):
770        requestor = jid.internJID(iq["from"]).userhostJID()
771        service = jid.internJID(iq["to"])
772
773        try:
774            nodeIdentifier = iq.pubsub.items["node"]
775        except KeyError:
776            raise BadRequest
777
778        maxItems = iq.pubsub.items.getAttribute('max_items')
779
780        if maxItems:
781            try:
782                maxItems = int(maxItems)
783            except ValueError:
784                raise BadRequest
785
786        itemIdentifiers = []
787        for child in iq.pubsub.items.elements():
788            if child.name == 'item' and child.uri == NS_PUBSUB:
789                try:
790                    itemIdentifiers.append(child["id"])
791                except KeyError:
792                    raise BadRequest
793
794        def toResponse(result):
795            response = domish.Element((NS_PUBSUB, 'pubsub'))
796            items = response.addElement('items')
797            items["node"] = nodeIdentifier
798
799            for item in result:
800                items.addChild(item)
801
802            return response
803
804        d = self.items(requestor, service, nodeIdentifier, maxItems,
805                       itemIdentifiers)
806        d.addCallback(toResponse)
807        return d
808
809
810    def _onRetract(self, iq):
811        requestor = jid.internJID(iq["from"]).userhostJID()
812        service = jid.internJID(iq["to"])
813
814        try:
815            nodeIdentifier = iq.pubsub.retract["node"]
816        except KeyError:
817            raise BadRequest
818
819        itemIdentifiers = []
820        for child in iq.pubsub.retract.elements():
821            if child.uri == NS_PUBSUB_OWNER and child.name == 'item':
822                try:
823                    itemIdentifiers.append(child["id"])
824                except KeyError:
825                    raise BadRequest
826
827        return self.retract(requestor, service, nodeIdentifier,
828                            itemIdentifiers)
829
830
831    def _onPurge(self, iq):
832        requestor = jid.internJID(iq["from"]).userhostJID()
833        service = jid.internJID(iq["to"])
834
835        try:
836            nodeIdentifier = iq.pubsub.purge["node"]
837        except KeyError:
838            raise BadRequest
839
840        return self.purge(requestor, service, nodeIdentifier)
841
842
843    def _onDelete(self, iq):
844        requestor = jid.internJID(iq["from"]).userhostJID()
845        service = jid.internJID(iq["to"])
846
847        try:
848            nodeIdentifier = iq.pubsub.delete["node"]
849        except KeyError:
850            raise BadRequest
851
852        return self.delete(requestor, service, nodeIdentifier)
853
854
855    def _onAffiliationsGet(self, iq):
856        raise Unsupported('modify-affiliations')
857
858
859    def _onAffiliationsSet(self, iq):
860        raise Unsupported('modify-affiliations')
861
862
863    def _onSubscriptionsGet(self, iq):
864        raise Unsupported('manage-subscriptions')
865
866
867    def _onSubscriptionsSet(self, iq):
868        raise Unsupported('manage-subscriptions')
869
870    # public methods
871
872    def notifyPublish(self, service, nodeIdentifier, notifications):
873        for recipient, items in notifications:
874            message = domish.Element((None, "message"))
875            message["from"] = service.full()
876            message["to"] = recipient.full()
877            event = message.addElement((NS_PUBSUB_EVENT, "event"))
878            element = event.addElement("items")
879            element["node"] = nodeIdentifier
880            element.children = items
881            self.send(message)
882
883
884    def notifyDelete(self, service, nodeIdentifier, recipients):
885        for recipient in recipients:
886            message = domish.Element((None, "message"))
887            message["from"] = service.full()
888            message["to"] = recipient.full()
889            event = message.addElement((NS_PUBSUB_EVENT, "event"))
890            element = event.addElement("delete")
891            element["node"] = nodeIdentifier
892            self.send(message)
893
894
895    def getNodeInfo(self, requestor, service, nodeIdentifier):
896        return None
897
898
899    def getNodes(self, requestor, service):
900        return []
901
902
903    def publish(self, requestor, service, nodeIdentifier, items):
904        raise Unsupported('publish')
905
906
907    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
908        raise Unsupported('subscribe')
909
910
911    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
912        raise Unsupported('subscribe')
913
914
915    def subscriptions(self, requestor, service):
916        raise Unsupported('retrieve-subscriptions')
917
918
919    def affiliations(self, requestor, service):
920        raise Unsupported('retrieve-affiliations')
921
922
923    def create(self, requestor, service, nodeIdentifier):
924        raise Unsupported('create-nodes')
925
926
927    def getConfigurationOptions(self):
928        return {}
929
930
931    def getDefaultConfiguration(self, requestor, service):
932        raise Unsupported('retrieve-default')
933
934
935    def getConfiguration(self, requestor, service, nodeIdentifier):
936        raise Unsupported('config-node')
937
938
939    def setConfiguration(self, requestor, service, nodeIdentifier, options):
940        raise Unsupported('config-node')
941
942
943    def items(self, requestor, service, nodeIdentifier, maxItems,
944                    itemIdentifiers):
945        raise Unsupported('retrieve-items')
946
947
948    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
949        raise Unsupported('retract-items')
950
951
952    def purge(self, requestor, service, nodeIdentifier):
953        raise Unsupported('purge-nodes')
954
955
956    def delete(self, requestor, service, nodeIdentifier):
957        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.