source: wokkel/pubsub.py @ 17:d42fd6c92576

Last change on this file since 17:d42fd6c92576 was 17:d42fd6c92576, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add client support for getting items from a node.

File size: 27.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer, reactor
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath selectors matching iq stanzas of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting an element to a publish-subscribe namespace
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub element, standalone and inside get/set iqs
PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT

# XPath queries for the individual publish-subscribe commands
PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS_GET = (PUBSUB_OWNER_GET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = (PUBSUB_OWNER_SET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = (PUBSUB_OWNER_GET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = (PUBSUB_OWNER_SET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
class BadRequest(error.StanzaError):
    """
    Stanza error carrying the C{bad-request} condition.

    Takes no arguments; the condition is fixed.
    """

    def __init__(self):
        error.StanzaError.__init__(self, condition='bad-request')
78
79
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised when the subscription state reported for a subscribe request is
    C{'pending'}.
    """
84
85
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised when the subscription state reported for a subscribe request is
    C{'unconfigured'}.
    """
91
92
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is attached as an application condition element
    in the publish-subscribe errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: generic stanza error condition.
        @param pubsubCondition: name of the publish-subscribe specific
                                condition element.
        @param feature: optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: optional descriptive text passed on to
                     L{error.StanzaError}.
        """
        specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        # Only a truthy feature ends up as an attribute.
        if feature:
            specific['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=specific)
104
105
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not implemented.

    Combines the C{feature-not-implemented} stanza condition with the
    C{unsupported} publish-subscribe condition.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: name of the unsupported feature.
        @param text: optional descriptive text.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
112
113
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error with the feature fixed to
    C{subscription-options-unavailable}.
    """

    def __init__(self):
        Unsupported.__init__(self, feature='subscription-options-unavailable')
117
118
class Item(domish.Element):
    """
    A publish-subscribe item element.

    Instances are L{domish.Element}s named C{item} without a namespace of
    their own. A payload can be supplied through the C{payload} argument of
    C{__init__}, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier, stored in the C{id} attribute.
        @type id: L{unicode}
        @param payload: optional item payload. A string is added as raw
                        (already serialized) XML, anything else is added as
                        a child.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
146
147
class PubSubRequest(xmlstream.IQ):
    """
    Common base for publish-subscribe requests sent by a client.

    Subclasses must define C{verb}; it is not set on this base class.

    @cvar namespace: namespace of the C{pubsub} element of the request.
    @cvar verb: name of the command element of the request.
    @cvar method: type attribute of the IQ request. Either C{'set'} or
                  C{'get'}.
    @ivar command: the command element: the child named C{verb} of the
                   C{pubsub} element.
    """

    namespace = NS_PUBSUB
    method = 'set'

    def __init__(self, xs):
        """
        @param xs: XML stream the request is bound to; passed on to
                   L{xmlstream.IQ}.
        """
        xmlstream.IQ.__init__(self, xs, self.method)
        pubsub = self.addElement((self.namespace, 'pubsub'))
        self.command = pubsub.addElement(self.verb)

    def send(self, to):
        """
        Send the request to an entity.

        Unlike L{xmlstream.IQ.send}, the destination has to be a L{JID}
        instance; its full string form is used as the address.

        @param to: Entity to send the request to.
        @type to: L{JID}
        @return: whatever L{xmlstream.IQ.send} returns.
        """
        return xmlstream.IQ.send(self, to.full())
181
182
class CreateNode(PubSubRequest):
    """
    Request for creating a node.
    """

    verb = 'create'

    def __init__(self, xs, node=None):
        """
        @param xs: XML stream the request is bound to.
        @param node: optional node identifier. A falsy value leaves the
                     C{node} attribute off the request.
        """
        PubSubRequest.__init__(self, xs)

        if not node:
            return

        self.command['node'] = node
190
191
class DeleteNode(PubSubRequest):
    """
    Request for deleting a node.
    """

    verb = 'delete'

    def __init__(self, xs, node):
        """
        @param xs: XML stream the request is bound to.
        @param node: identifier of the node, set as the C{node} attribute.
        """
        PubSubRequest.__init__(self, xs)
        self.command['node'] = node
197
198
class Subscribe(PubSubRequest):
    """
    Request for subscribing an entity to a node.
    """

    verb = 'subscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream the request is bound to.
        @param node: identifier of the node, set as the C{node} attribute.
        @param subscriber: entity to subscribe; the result of its C{full()}
                           method is set as the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)
        command = self.command
        command['node'] = node
        command['jid'] = subscriber.full()
206
207
class Unsubscribe(PubSubRequest):
    """
    Request for unsubscribing an entity from a node.
    """

    verb = 'unsubscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream the request is bound to.
        @param node: identifier of the node, set as the C{node} attribute.
        @param subscriber: entity to unsubscribe; the result of its
                           C{full()} method is set as the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)
        command = self.command
        command['node'] = node
        command['jid'] = subscriber.full()
215
216
class Publish(PubSubRequest):
    """
    Request for publishing items to a node.
    """

    verb = 'publish'

    def __init__(self, xs, node):
        """
        @param xs: XML stream the request is bound to.
        @param node: identifier of the node, set as the C{node} attribute.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node

    def addItem(self, id=None, payload=None):
        """
        Add an item to the publish request.

        @param id: optional item identifier, set as the C{id} attribute.
        @param payload: optional item payload, added as a child of the new
                        item element.
        @return: the newly added C{item} element.
        """
        item = self.command.addElement("item")

        # Adding None as a child would leave a non-serializable entry in
        # the item, so an absent payload yields an empty item instead.
        if payload is not None:
            item.addChild(payload)

        if id is not None:
            item["id"] = id

        return item
232
class Items(PubSubRequest):
    """
    Request for retrieving the items of a node.

    Sent as an IQ of type C{'get'}.
    """

    verb = 'items'
    method = 'get'

    def __init__(self, xs, node):
        """
        @param xs: XML stream the request is bound to.
        @param node: identifier of the node, set as the C{node} attribute.
        """
        PubSubRequest.__init__(self, xs)
        self.command['node'] = node
240
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}. These do nothing by default and
    are meant to be overridden.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)

    def _onEvent(self, message):
        """
        Dispatch an event notification to the matching C{_onEvent_*} method.

        Messages lacking a C{from} or C{to} attribute, or without an action
        element in the event namespace, are ignored. The message is only
        marked as handled when a handler for the action exists.

        @param message: the received message stanza.
        """
        try:
            service = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # When several children are in the event namespace, the last one
        # determines the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            eventHandler(service, recipient, actionElement)
            message.handled = True

    def _onEvent_items(self, service, recipient, action):
        """
        Handle an C{items} event by calling L{itemsReceived} with the
        C{item} and C{retract} children of the action element.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        self.itemsReceived(recipient, service, nodeIdentifier, items)

    def _onEvent_delete(self, service, recipient, action):
        """
        Handle a C{delete} event by calling L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        self.deleteReceived(recipient, service, nodeIdentifier)

    def _onEvent_purge(self, service, recipient, action):
        """
        Handle a C{purge} event by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        self.purgeReceived(recipient, service, nodeIdentifier)

    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an items notification was received. Does nothing by
        default.

        @param recipient: addressee of the notification.
        @param service: sender of the notification.
        @param nodeIdentifier: value of the C{node} attribute of the event.
        @param items: list of C{item} and C{retract} elements.
        """
        pass

    def deleteReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node deletion notification was received. Does nothing
        by default.
        """
        pass

    def purgeReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node purge notification was received. Does nothing by
        default.
        """
        pass

    def createNode(self, service, nodeIdentifier=None):
        """
        Request the creation of a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: optional suggested node identifier.
        @return: the result of sending the request, with a callback added
                 that yields the identifier reported in the response, or
                 C{nodeIdentifier} when the response does not carry one.
        """
        request = CreateNode(self.xmlstream, nodeIdentifier)

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response lacks a pubsub element (AttributeError), a
                # create element (TypeError) or a node attribute (KeyError):
                # the suggested node identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)

    def deleteNode(self, service, nodeIdentifier):
        """
        Request the deletion of a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to delete.
        @return: the result of sending the request.
        """
        return DeleteNode(self.xmlstream, nodeIdentifier).send(service)

    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Request a subscription to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to subscribe to.
        @param subscriber: entity to subscribe.
        @type subscriber: L{JID}
        @return: the result of sending the request, with a callback added
                 that yields C{None}, or raises L{SubscriptionPending} or
                 L{SubscriptionUnconfigured} for the corresponding
                 subscription states.
        """
        request = Subscribe(self.xmlstream, nodeIdentifier, subscriber)

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)

    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Request the removal of a subscription to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node.
        @param subscriber: entity to unsubscribe.
        @type subscriber: L{JID}
        @return: the result of sending the request.
        """
        request = Unsubscribe(self.xmlstream, nodeIdentifier, subscriber)
        return request.send(service)

    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish items to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to publish to.
        @param items: optional iterable of items, each added as a child of
                      the publish command. C{None} publishes without items.
        @return: the result of sending the request.
        """
        request = Publish(self.xmlstream, nodeIdentifier)
        for item in items or ():
            request.command.addChild(item)

        return request.send(service)

    def items(self, service, nodeIdentifier):
        """
        Retrieve the items of a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node.
        @return: the result of sending the request, with a callback added
                 that yields the list of C{item} elements in the
                 publish-subscribe namespace found in the response.
        """
        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        request = Items(self.xmlstream, nodeIdentifier)
        return request.send(service).addCallback(cb)
353
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

    * If the exception is an instance of L{error.StanzaError}, an error
      response iq is returned.
    * Any other exception is reported using L{log.msg}. An error response
      with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}.
    @ivar hideNodes: When true, L{getDiscoItems} does not list any nodes.
                     Defaults to C{False}.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    # Maps the XPath query of each supported request to the name of the
    # method handling it.
    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    # Read by getDiscoItems; defined here so that the lookup cannot fail
    # on instances that never set it.
    hideNodes = False

    def __init__(self):
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []

    def connectionMade(self):
        """
        Register L{handleRequest} for all publish-subscribe get and set
        requests, in both the regular and the owner namespace.
        """
        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)

    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery information.

        Without a node identifier, the result holds the service identity,
        the disco items feature and one feature per entry of
        C{pubSubFeatures}. With a node identifier, the result is derived
        from L{getNodeInfo}: a node identity and, when meta data is present,
        a meta data form. No node information yields an empty list.

        @return: deferred that fires with a list of disco info objects.
        """
        info = []

        if not nodeIdentifier:
            info.append(disco.DiscoIdentity(**self.discoIdentity))

            info.append(disco.DiscoFeature(disco.NS_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in self.pubSubFeatures])

            return defer.succeed(info)

        def toInfo(nodeInfo):
            if not nodeInfo:
                return []

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(type="result",
                                      form_type=NS_PUBSUB_META_DATA)
                form.add_field("text-single",
                               "pubsub#node_type",
                               "The type of node (collection or leaf)",
                               nodeType)

                for metaDatum in metaData:
                    form.add_field(**metaDatum)

                info.append(form)
            return info

        # getNodeInfo may return a plain value (as the default
        # implementation does) or a deferred; accept both.
        d = defer.maybeDeferred(self.getNodeInfo,
                                requestor, target, nodeIdentifier)
        d.addCallback(toInfo)
        return d

    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery items.

        Nodes are only listed for requests without a node identifier, and
        only when C{hideNodes} is false.

        @return: deferred that fires with a list of L{disco.DiscoItem}s, one
                 for each node reported by L{getNodes}.
        """
        if nodeIdentifier or self.hideNodes:
            return defer.succeed([])

        # getNodes may return a plain list (as the default implementation
        # does) or a deferred; accept both.
        d = defer.maybeDeferred(self.getNodes, requestor, target)
        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d

    def _onPublish(self, iq):
        """
        Handle a publish request by passing the node identifier and the
        C{item} children to L{publish}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest

        items = []
        for element in iq.pubsub.publish.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                items.append(element)

        return self.publish(requestor, service, nodeIdentifier, items)

    def _onSubscribe(self, iq):
        """
        Handle a subscribe request using L{subscribe} and build the
        response from the resulting (node identifier, state) pair.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.subscribe["node"]
            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest

        def toResponse(subscription):
            nodeIdentifier, state = subscription
            response = domish.Element((NS_PUBSUB, "pubsub"))
            subscription = response.addElement("subscription")
            subscription["node"] = nodeIdentifier
            subscription["jid"] = subscriber.full()
            subscription["subscription"] = state
            return response

        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
        d.addCallback(toResponse)
        return d

    def _onUnsubscribe(self, iq):
        """
        Handle an unsubscribe request using L{unsubscribe}.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.unsubscribe["node"]
            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest

        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)

    def _onOptionsGet(self, iq):
        """
        Reject requests for subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onOptionsSet(self, iq):
        """
        Reject changes to subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onSubscriptions(self, iq):
        """
        Handle a subscriptions request using L{subscriptions} and build the
        response from the resulting (node, subscriber, state) tuples.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            subscriptions = response.addElement('subscriptions')
            for node, subscriber, state in result:
                item = subscriptions.addElement('subscription')
                item['node'] = node
                item['jid'] = subscriber.full()
                item['subscription'] = state
            return response

        d = self.subscriptions(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onAffiliations(self, iq):
        """
        Handle an affiliations request using L{affiliations} and build the
        response from the resulting (node identifier, affiliation) pairs.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            affiliations = response.addElement('affiliations')

            for nodeIdentifier, affiliation in result:
                item = affiliations.addElement('affiliation')
                item['node'] = nodeIdentifier
                item['affiliation'] = affiliation

            return response

        d = self.affiliations(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onCreate(self, iq):
        """
        Handle a node creation request using L{create}.

        The response only carries a payload when the resulting identifier
        differs from the requested one, or when none was requested.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.create.getAttribute("node")

        def toResponse(result):
            if not nodeIdentifier or nodeIdentifier != result:
                response = domish.Element((NS_PUBSUB, 'pubsub'))
                create = response.addElement('create')
                create['node'] = result
                return response
            else:
                return None

        d = self.create(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _formFromConfiguration(self, options):
        """
        Build a node configuration form.

        @param options: iterable of dictionaries, each passed as keyword
                        arguments to the form's C{add_field}.
        @return: the resulting form.
        """
        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)

        for option in options:
            form.add_field(**option)

        return form

    def _onDefault(self, iq):
        """
        Handle a request for the default node configuration using
        L{getDefaultConfiguration}.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            default = response.addElement("default")
            default.addChild(self._formFromConfiguration(options))
            return response

        d = self.getDefaultConfiguration(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onConfigureGet(self, iq):
        """
        Handle a request for a node's configuration using
        L{getConfiguration}. The C{node} attribute is optional.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            configure = response.addElement("configure")
            configure.addChild(self._formFromConfiguration(options))

            if nodeIdentifier:
                configure["node"] = nodeIdentifier

            return response

        d = self.getConfiguration(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _onConfigureSet(self, iq):
        """
        Handle a request to change a node's configuration.

        The first submitted data form with the node configuration form type
        is passed to L{setConfiguration}, as a dictionary mapping field
        names to string values, without the C{FORM_TYPE} field. A form of
        type C{cancel} ends processing without any change.

        @raise BadRequest: if no suitable form is found, or a field lacks
                           the C{var} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure["node"]

        def getFormOptions(form):
            options = {}

            for element in form.elements():
                if element.name == 'field' and \
                   element.uri == data_form.NS_X_DATA:
                    try:
                        options[element["var"]] = str(element.value)
                    except (KeyError, AttributeError):
                        raise BadRequest

            return options

        # Search configuration form with correct FORM_TYPE and process it

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            formType = element.getAttribute("type")
            if formType == "cancel":
                return None
            elif formType != "submit":
                continue

            options = getFormOptions(element)

            # A form without FORM_TYPE is skipped like any other form that
            # is not a node configuration form.
            if options.get("FORM_TYPE") == NS_PUBSUB_NODE_CONFIG:
                del options["FORM_TYPE"]
                return self.setConfiguration(requestor, service,
                                             nodeIdentifier, options)

        raise BadRequest

    def _onItems(self, iq):
        """
        Handle an items retrieval request using L{items}.

        The optional C{max_items} attribute is converted to an integer and
        the identifiers of any C{item} children are collected. The results
        are added to the response as raw XML.

        @raise BadRequest: if the C{node} attribute is missing, C{max_items}
                           is not an integer, or an item lacks its C{id}.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest

        maxItems = iq.pubsub.items.getAttribute('max_items')

        if maxItems:
            try:
                maxItems = int(maxItems)
            except ValueError:
                raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            items = response.addElement('items')
            items["node"] = nodeIdentifier

            for item in result:
                items.addRawXml(item)

            return response

        d = self.items(requestor, service, nodeIdentifier, maxItems,
                       itemIdentifiers)
        d.addCallback(toResponse)
        return d

    def _onRetract(self, iq):
        """
        Handle an item retraction request by passing the identifiers of the
        C{item} children to L{retract}.

        @raise BadRequest: if the C{node} attribute is missing, or an item
                           lacks its C{id}.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.retract.elements():
            # Retract requests are matched in the regular publish-subscribe
            # namespace, not the owner namespace, and so are their items.
            if child.uri == NS_PUBSUB and child.name == 'item':
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        return self.retract(requestor, service, nodeIdentifier,
                            itemIdentifiers)

    def _onPurge(self, iq):
        """
        Handle a node purge request using L{purge}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest

        return self.purge(requestor, service, nodeIdentifier)

    def _onDelete(self, iq):
        """
        Handle a node deletion request using L{delete}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest

        return self.delete(requestor, service, nodeIdentifier)

    def _onAffiliationsGet(self, iq):
        """
        Reject owner requests for affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')

    def _onAffiliationsSet(self, iq):
        """
        Reject owner changes to affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')

    def _onSubscriptionsGet(self, iq):
        """
        Reject owner requests for subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')

    def _onSubscriptionsSet(self, iq):
        """
        Reject owner changes to subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out an items notification to each recipient.

        @param service: entity the notifications are sent from; its
                        C{full()} value becomes the C{from} address.
        @param nodeIdentifier: identifier of the node published to.
        @param notifications: iterable of (recipient, items) pairs. The
                              items become the children of the C{items}
                              element of the event.
        """
        for recipient, items in notifications:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("items")
            element["node"] = nodeIdentifier
            element.children = items
            self.send(message)

    def notifyDelete(self, service, nodeIdentifier, recipients):
        """
        Send out a node deletion notification to each recipient.

        @param service: entity the notifications are sent from; its
                        C{full()} value becomes the C{from} address.
        @param nodeIdentifier: identifier of the deleted node.
        @param recipients: iterable of entities to notify.
        """
        for recipient in recipients:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("delete")
            element["node"] = nodeIdentifier
            self.send(message)

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Return information on a node for service discovery.

        L{getDiscoInfo} expects a mapping with the keys C{'type'} and
        C{'meta-data'}, or a false value when there is nothing to report,
        either directly or through a deferred. The default returns C{None}.
        """
        return None

    def getNodes(self, requestor, service):
        """
        Return the node identifiers to list in service discovery, either
        directly or through a deferred. The default returns an empty list.
        """
        return []

    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Called on a publish request. Raises L{Unsupported} by default.
        """
        raise Unsupported('publish')

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called on a subscribe request. Raises L{Unsupported} by default.

        L{_onSubscribe} expects a deferred that fires with a
        (node identifier, state) pair.
        """
        raise Unsupported('subscribe')

    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called on an unsubscribe request. Raises L{Unsupported} by default,
        reported under the same feature name as L{subscribe}.
        """
        raise Unsupported('subscribe')

    def subscriptions(self, requestor, service):
        """
        Called on a subscriptions request. Raises L{Unsupported} by default.

        L{_onSubscriptions} expects a deferred that fires with an iterable
        of (node, subscriber, state) tuples.
        """
        raise Unsupported('retrieve-subscriptions')

    def affiliations(self, requestor, service):
        """
        Called on an affiliations request. Raises L{Unsupported} by default.

        L{_onAffiliations} expects a deferred that fires with an iterable of
        (node identifier, affiliation) pairs.
        """
        raise Unsupported('retrieve-affiliations')

    def create(self, requestor, service, nodeIdentifier):
        """
        Called on a node creation request. Raises L{Unsupported} by default.

        L{_onCreate} expects a deferred that fires with the identifier of
        the created node.
        """
        raise Unsupported('create-nodes')

    def getDefaultConfiguration(self, requestor, service):
        """
        Called on a request for the default node configuration. Raises
        L{Unsupported} by default.
        """
        raise Unsupported('retrieve-default')

    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Called on a request for a node's configuration. Raises
        L{Unsupported} by default.
        """
        raise Unsupported('config-node')

    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Called on a request to change a node's configuration. Raises
        L{Unsupported} by default.

        @param options: dictionary mapping field names to string values.
        """
        raise Unsupported('config-node')

    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Called on an items retrieval request. Raises L{Unsupported} by
        default.

        L{_onItems} expects a deferred that fires with an iterable of
        serialized items, which are added to the response as raw XML.
        """
        raise Unsupported('retrieve-items')

    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Called on an item retraction request. Raises L{Unsupported} by
        default.
        """
        raise Unsupported('retract-items')

    def purge(self, requestor, service, nodeIdentifier):
        """
        Called on a node purge request. Raises L{Unsupported} by default.
        """
        raise Unsupported('purge-nodes')

    def delete(self, requestor, service, nodeIdentifier):
        """
        Called on a node deletion request. Raises L{Unsupported} by default.
        """
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.