source: wokkel/pubsub.py @ 15:14a00be2ae9d

Last change on this file since 15:14a00be2ae9d was 15:14a00be2ae9d, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add notifyDelete to send out node deletion notifications.

File size: 26.6 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer, reactor
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
23# Iq get and set XPath queries
24IQ_GET = '/iq[@type="get"]'
25IQ_SET = '/iq[@type="set"]'
26
27# Publish-subscribe namespaces
28NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
29NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
30NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
31NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
32NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
33NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
34
35# In publish-subscribe namespace XPath query selector.
36IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]'
37IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]'
38
39# Publish-subscribe XPath queries
40PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
41PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
42PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
43PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
44PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
45PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT
46
47# Publish-subscribe command XPath queries
48PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
49PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
50PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
51PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
52PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
53PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
54PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
55PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
56PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
57PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
58PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
59PUBSUB_AFFILIATIONS_GET = PUBSUB_OWNER_GET + '/affiliations' + \
60                          IN_NS_PUBSUB_OWNER
61PUBSUB_AFFILIATIONS_SET = PUBSUB_OWNER_SET + '/affiliations' + \
62                          IN_NS_PUBSUB_OWNER
63PUBSUB_SUBSCRIPTIONS_GET = PUBSUB_OWNER_GET + '/subscriptions' + \
64                          IN_NS_PUBSUB_OWNER
65PUBSUB_SUBSCRIPTIONS_SET = PUBSUB_OWNER_SET + '/subscriptions' + \
66                          IN_NS_PUBSUB_OWNER
67PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
68PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
69PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
70PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{bad-request} condition.
    """

    def __init__(self):
        condition = 'bad-request'
        error.StanzaError.__init__(self, condition)
78
79
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet and is pending.
    """
84
85
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.
    """
91
92
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific condition.

    The application specific condition is an element in the
    publish-subscribe errors namespace, named after C{pubsubCondition}.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: stanza error condition passed on to
                          L{error.StanzaError}.
        @param pubsubCondition: name of the application condition element.
        @param feature: optional value for the C{feature} attribute of the
                        application condition element.
        @param text: optional text passed on to L{error.StanzaError}.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
104
105
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not implemented.

    Uses the stanza error condition C{feature-not-implemented} together
    with the publish-subscribe condition C{unsupported}.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: name of the unsupported feature.
        @param text: optional text passed on to L{PubSubError}.
        """
        PubSubError.__init__(self,
                             condition='feature-not-implemented',
                             pubsubCondition='unsupported',
                             feature=feature,
                             text=text)
112
113
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error with a fixed feature name for subscription options.
    """

    def __init__(self):
        feature = 'subscription-options-unavailable'
        Unsupported.__init__(self, feature)
117
118
class Item(domish.Element):
    """
    An item as used in publish-subscribe requests and notifications.

    Instances are L{domish.Element}s named C{item}. The payload is passed
    using the C{payload} argument of C{__init__}, or added afterwards using
    C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, given as a domish element or
                        as a string of serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child node.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
146
147
class PubSubRequest(xmlstream.IQ):
    """
    Common base for publish-subscribe requests sent by a client.

    Subclasses set C{verb} and may override C{namespace} and C{method}.

    @cvar namespace: namespace of the C{pubsub} element of the request
    @cvar verb: name of the command element of the request
    @cvar method: type attribute of the IQ request. Either C{'set'} or C{'get'}
    @ivar command: command element of the request. This is the child of the
                   C{pubsub} element that is named after C{verb}.
    """

    namespace = NS_PUBSUB
    method = 'set'

    def __init__(self, xs):
        """
        @param xs: XML stream passed on to L{xmlstream.IQ}.
        """
        xmlstream.IQ.__init__(self, xs, self.method)

        pubsub = self.addElement((self.namespace, 'pubsub'))
        self.command = pubsub.addElement(self.verb)

    def send(self, to):
        """
        Send out the request to the given entity.

        Unlike L{xmlstream.IQ.send}, the destination must be given as
        a L{JID} instance; its full string form is passed on.

        @param to: Entity to send the request to.
        @type to: L{JID}
        @return: The result of L{xmlstream.IQ.send}.
        """
        return xmlstream.IQ.send(self, to.full())
181
182
class CreateNode(PubSubRequest):
    """
    Request for creating a node.
    """

    verb = 'create'

    def __init__(self, xs, node=None):
        """
        @param xs: XML stream passed on to L{PubSubRequest}.
        @param node: optional node identifier. When empty or C{None}, no
                     C{node} attribute is set on the command element.
        """
        PubSubRequest.__init__(self, xs)

        if not node:
            return

        self.command["node"] = node
190
191
class DeleteNode(PubSubRequest):
    """
    Request for deleting a node.

    Node deletion is a node owner operation, so the C{pubsub} element of
    this request is in the owner namespace. This matches the
    C{PUBSUB_DELETE} query that services use to recognize the request.
    """

    verb = 'delete'
    namespace = NS_PUBSUB_OWNER

    def __init__(self, xs, node):
        """
        @param xs: XML stream passed on to L{PubSubRequest}.
        @param node: identifier of the node to delete.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node
197
198
class Subscribe(PubSubRequest):
    """
    Request for subscribing an entity to a node.
    """

    verb = 'subscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream passed on to L{PubSubRequest}.
        @param node: identifier of the node to subscribe to.
        @param subscriber: entity to be subscribed. Its C{full()} string
                           form is used for the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)

        command = self.command
        command["node"] = node
        command["jid"] = subscriber.full()
206
207
class Unsubscribe(PubSubRequest):
    """
    Request for unsubscribing an entity from a node.
    """

    verb = 'unsubscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream passed on to L{PubSubRequest}.
        @param node: identifier of the node to unsubscribe from.
        @param subscriber: entity to be unsubscribed. Its C{full()} string
                           form is used for the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)

        command = self.command
        command["node"] = node
        command["jid"] = subscriber.full()
215
216
class Publish(PubSubRequest):
    """
    Request for publishing items to a node.
    """

    verb = 'publish'

    def __init__(self, xs, node):
        """
        @param xs: XML stream passed on to L{PubSubRequest}.
        @param node: identifier of the node to publish to.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node

    def addItem(self, id=None, payload=None):
        """
        Add an item to the request.

        @param id: optional item identifier, set as the C{id} attribute.
        @param payload: optional item payload, added as a child of the
                        item element.
        @return: The newly added C{item} element.
        """
        item = self.command.addElement("item")

        # The payload is optional: adding None as a child would leave
        # a non-serializable entry in the item's children.
        if payload is not None:
            item.addChild(payload)

        if id is not None:
            item["id"] = id

        return item
232
233
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}. These do nothing by default and
    are meant to be overridden. The other public methods send requests to
    a publish-subscribe service and return the result of sending them.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)

    def _onEvent(self, message):
        """
        Dispatch an event notification to the matching C{_onEvent_*} method.

        Messages without C{from} or C{to} address, or without a child of the
        event element in the event namespace, are ignored. The message is
        marked as handled only if a handler for the action exists.

        @param message: the received message stanza.
        """
        try:
            service = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # When there are several candidates, the last one is used.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            eventHandler(service, recipient, actionElement)
            message.handled = True

    def _onEvent_items(self, service, recipient, action):
        """
        Pass the C{item} and C{retract} children on to L{itemsReceived}.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        self.itemsReceived(recipient, service, nodeIdentifier, items)

    def _onEvent_delete(self, service, recipient, action):
        """
        Pass a node deletion notification on to L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        self.deleteReceived(recipient, service, nodeIdentifier)

    def _onEvent_purge(self, service, recipient, action):
        """
        Pass a node purge notification on to L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        self.purgeReceived(recipient, service, nodeIdentifier)

    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an items notification has been received. Does nothing
        by default.

        @param recipient: address the notification was sent to.
        @type recipient: L{jid.JID}
        @param service: address the notification was sent from.
        @type service: L{jid.JID}
        @param nodeIdentifier: value of the C{node} attribute of the event.
        @param items: list of C{item} and C{retract} elements.
        """
        pass

    def deleteReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node deletion notification has been received. Does
        nothing by default.

        @param recipient: address the notification was sent to.
        @type recipient: L{jid.JID}
        @param service: address the notification was sent from.
        @type service: L{jid.JID}
        @param nodeIdentifier: value of the C{node} attribute of the event.
        """
        pass

    def purgeReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node purge notification has been received. Does
        nothing by default.

        @param recipient: address the notification was sent to.
        @type recipient: L{jid.JID}
        @param service: address the notification was sent from.
        @type service: L{jid.JID}
        @param nodeIdentifier: value of the C{node} attribute of the event.
        """
        pass

    def createNode(self, service, nodeIdentifier=None):
        """
        Request the creation of a node.

        @param service: entity to send the request to.
        @type service: L{jid.JID}
        @param nodeIdentifier: optional suggestion for the node identifier.
        @return: The result of sending the request, with a callback added
                 that yields the identifier of the new node.
        """
        request = CreateNode(self.xmlstream, nodeIdentifier)

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response does not name a node (no pubsub element, no
                # create element or no node attribute): the suggested node
                # identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)

    def deleteNode(self, service, nodeIdentifier):
        """
        Request the deletion of a node.

        @param service: entity to send the request to.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the node to delete.
        @return: The result of sending the request.
        """
        return DeleteNode(self.xmlstream, nodeIdentifier).send(service)

    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Request a subscription to a node.

        The callback added to the result raises L{SubscriptionPending} or
        L{SubscriptionUnconfigured} when the response reports the
        subscription state C{pending} or C{unconfigured}, and yields
        C{None} otherwise.

        @param service: entity to send the request to.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the node to subscribe to.
        @param subscriber: entity to be subscribed.
        @type subscriber: L{jid.JID}
        @return: The result of sending the request, with the callback added.
        """
        request = Subscribe(self.xmlstream, nodeIdentifier, subscriber)

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)

    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Request the removal of a subscription to a node.

        @param service: entity to send the request to.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the node to unsubscribe from.
        @param subscriber: entity to be unsubscribed.
        @type subscriber: L{jid.JID}
        @return: The result of sending the request.
        """
        request = Unsubscribe(self.xmlstream, nodeIdentifier, subscriber)
        return request.send(service)

    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish items to a node.

        @param service: entity to send the request to.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the node to publish to.
        @param items: optional iterable of item elements, each added as
                      a child of the publish command. C{None} means no
                      items.
        @return: The result of sending the request.
        """
        request = Publish(self.xmlstream, nodeIdentifier)
        for item in items or ():
            request.command.addChild(item)

        return request.send(service)
335
336
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

    * If the exception is an instance of L{error.StanzaError}, an error
      response iq is returned.
    * Any other exception is reported using L{log.msg}. An error response
      with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}.
    @ivar hideNodes: If true, L{getDiscoItems} returns no items, so nodes
                     are not listed in service discovery. Defaults to
                     C{False}.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    # Class-level default, so that getDiscoItems also works for instances
    # that never set this flag.
    hideNodes = False

    def __init__(self):
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []

    def connectionMade(self):
        """
        Start observing publish-subscribe iq requests.
        """
        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)

    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery information.

        Without a node identifier, this yields the service identity and the
        supported features. Otherwise the information is derived from the
        result of L{getNodeInfo}, which may be a plain value or a deferred.

        @return: A deferred that fires with a list of disco info objects.
        @rtype: L{defer.Deferred}
        """
        info = []

        if not nodeIdentifier:
            info.append(disco.DiscoIdentity(**self.discoIdentity))

            info.append(disco.DiscoFeature(disco.NS_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in self.pubSubFeatures])

            return defer.succeed(info)
        else:
            def toInfo(nodeInfo):
                if not nodeInfo:
                    return []

                (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
                info.append(disco.DiscoIdentity('pubsub', nodeType))
                if metaData:
                    form = data_form.Form(type="result",
                                          form_type=NS_PUBSUB_META_DATA)
                    form.add_field("text-single",
                                   "pubsub#node_type",
                                   "The type of node (collection or leaf)",
                                   nodeType)

                    for metaDatum in metaData:
                        form.add_field(**metaDatum)

                    info.append(form)
                return info

            # The default getNodeInfo returns a plain value; overriding
            # implementations may return a deferred. Accept both.
            d = defer.maybeDeferred(self.getNodeInfo,
                                    requestor, target, nodeIdentifier)
            d.addCallback(toInfo)
            return d

    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery items.

        No items are reported for requests on a node, or when C{hideNodes}
        is set. Otherwise there is one item for each node identifier in the
        result of L{getNodes}, which may be a plain value or a deferred.

        @return: A deferred that fires with a list of disco items.
        @rtype: L{defer.Deferred}
        """
        if nodeIdentifier or self.hideNodes:
            return defer.succeed([])

        # The default getNodes returns a plain list; overriding
        # implementations may return a deferred. Accept both.
        d = defer.maybeDeferred(self.getNodes, requestor, target)
        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d

    def _onPublish(self, iq):
        """
        Handle a publish request by calling L{publish}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest

        items = []
        for element in iq.pubsub.publish.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                items.append(element)

        return self.publish(requestor, service, nodeIdentifier, items)

    def _onSubscribe(self, iq):
        """
        Handle a subscribe request by calling L{subscribe}.

        The result of L{subscribe}, a tuple of node identifier and
        subscription state, is turned into the response element.

        @raise BadRequest: if the node identifier or subscriber is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.subscribe["node"]
            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest

        def toResponse(result):
            nodeIdentifier, state = result
            response = domish.Element((NS_PUBSUB, "pubsub"))
            subscription = response.addElement("subscription")
            subscription["node"] = nodeIdentifier
            subscription["jid"] = subscriber.full()
            subscription["subscription"] = state
            return response

        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
        d.addCallback(toResponse)
        return d

    def _onUnsubscribe(self, iq):
        """
        Handle an unsubscribe request by calling L{unsubscribe}.

        @raise BadRequest: if the node identifier or subscriber is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.unsubscribe["node"]
            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest

        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)

    def _onOptionsGet(self, iq):
        """
        Reject requests for retrieving subscription options.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onOptionsSet(self, iq):
        """
        Reject requests for setting subscription options.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onSubscriptions(self, iq):
        """
        Handle a subscriptions request by calling L{subscriptions}.

        The result is iterated as tuples of node identifier, subscriber and
        subscription state to build the response element.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            subscriptions = response.addElement('subscriptions')
            for node, subscriber, state in result:
                item = subscriptions.addElement('subscription')
                item['node'] = node
                item['jid'] = subscriber.full()
                item['subscription'] = state
            return response

        d = self.subscriptions(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onAffiliations(self, iq):
        """
        Handle an affiliations request by calling L{affiliations}.

        The result is iterated as tuples of node identifier and affiliation
        to build the response element.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            affiliations = response.addElement('affiliations')

            for nodeIdentifier, affiliation in result:
                item = affiliations.addElement('affiliation')
                item['node'] = nodeIdentifier
                item['affiliation'] = affiliation

            return response

        d = self.affiliations(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onCreate(self, iq):
        """
        Handle a node creation request by calling L{create}.

        The response only names the new node if no identifier was requested
        or if the resulting identifier differs from the requested one.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.create.getAttribute("node")

        def toResponse(result):
            if not nodeIdentifier or nodeIdentifier != result:
                response = domish.Element((NS_PUBSUB, 'pubsub'))
                create = response.addElement('create')
                create['node'] = result
                return response
            else:
                return None

        d = self.create(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _formFromConfiguration(self, options):
        """
        Build a node configuration form from a sequence of options.

        @param options: iterable of dictionaries, each passed as keyword
                        arguments to the form's C{add_field}.
        @return: The data form.
        """
        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)

        for option in options:
            form.add_field(**option)

        return form

    def _onDefault(self, iq):
        """
        Handle a request for the default node configuration by calling
        L{getDefaultConfiguration}.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            default = response.addElement("default")
            default.addChild(self._formFromConfiguration(options))
            return response

        d = self.getDefaultConfiguration(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onConfigureGet(self, iq):
        """
        Handle a request for a node's configuration by calling
        L{getConfiguration}.

        The node identifier is optional here and C{None} when absent.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            configure = response.addElement("configure")
            configure.addChild(self._formFromConfiguration(options))

            if nodeIdentifier:
                configure["node"] = nodeIdentifier

            return response

        d = self.getConfiguration(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _onConfigureSet(self, iq):
        """
        Handle a request for changing a node's configuration.

        The first submitted data form with the node configuration
        C{FORM_TYPE} is passed on to L{setConfiguration} as a dictionary of
        options, without the C{FORM_TYPE} entry. A form of type C{cancel}
        ends processing without calling L{setConfiguration}.

        @raise BadRequest: if the node identifier is missing, a field is
                           malformed, or no matching form was found.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.configure["node"]
        except KeyError:
            raise BadRequest

        def getFormOptions(form):
            # Collect the field values of the form, keyed by the field's
            # var attribute.
            options = {}

            for element in form.elements():
                if element.name == 'field' and \
                   element.uri == data_form.NS_X_DATA:
                    try:
                        options[element["var"]] = str(element.value)
                    except (KeyError, AttributeError):
                        raise BadRequest

            return options

        # Search configuration form with correct FORM_TYPE and process it

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            formType = element.getAttribute("type")
            if formType == "cancel":
                return None
            elif formType != "submit":
                continue

            options = getFormOptions(element)

            # Forms without FORM_TYPE, or with another one, are skipped.
            if options.get("FORM_TYPE") == NS_PUBSUB_NODE_CONFIG:
                del options["FORM_TYPE"]
                return self.setConfiguration(requestor, service,
                                             nodeIdentifier, options)

        raise BadRequest

    def _onItems(self, iq):
        """
        Handle an items retrieval request by calling L{items}.

        Each entry of the result is added to the response as raw XML.

        @raise BadRequest: if the node identifier is missing, C{max_items}
                           is not an integer, or an item has no identifier.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest

        maxItems = iq.pubsub.items.getAttribute('max_items')

        if maxItems:
            try:
                maxItems = int(maxItems)
            except ValueError:
                raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            items = response.addElement('items')
            items["node"] = nodeIdentifier

            for item in result:
                items.addRawXml(item)

            return response

        d = self.items(requestor, service, nodeIdentifier, maxItems,
                       itemIdentifiers)
        d.addCallback(toResponse)
        return d

    def _onRetract(self, iq):
        """
        Handle an item retraction request by calling L{retract}.

        @raise BadRequest: if the node identifier is missing or an item has
                           no identifier.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest

        # Retract requests and their items are in the regular pubsub
        # namespace (see PUBSUB_RETRACT), not in the owner namespace.
        itemIdentifiers = []
        for child in iq.pubsub.retract.elements():
            if child.uri == NS_PUBSUB and child.name == 'item':
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        return self.retract(requestor, service, nodeIdentifier,
                            itemIdentifiers)

    def _onPurge(self, iq):
        """
        Handle a node purge request by calling L{purge}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest

        return self.purge(requestor, service, nodeIdentifier)

    def _onDelete(self, iq):
        """
        Handle a node deletion request by calling L{delete}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest

        return self.delete(requestor, service, nodeIdentifier)

    def _onAffiliationsGet(self, iq):
        """
        Reject owner requests for retrieving affiliations.
        """
        raise Unsupported('modify-affiliations')

    def _onAffiliationsSet(self, iq):
        """
        Reject owner requests for modifying affiliations.
        """
        raise Unsupported('modify-affiliations')

    def _onSubscriptionsGet(self, iq):
        """
        Reject owner requests for retrieving subscriptions.
        """
        raise Unsupported('manage-subscriptions')

    def _onSubscriptionsSet(self, iq):
        """
        Reject owner requests for modifying subscriptions.
        """
        raise Unsupported('manage-subscriptions')

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out a publish notification message to each recipient.

        @param service: address the notifications are sent from.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the node published to.
        @param notifications: iterable of tuples of recipient (L{jid.JID})
                              and the list of items to notify it of.
        """
        for recipient, items in notifications:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("items")
            element["node"] = nodeIdentifier
            # The passed list itself becomes the child list of the element.
            element.children = items
            self.send(message)

    def notifyDelete(self, service, nodeIdentifier, recipients):
        """
        Send out a node deletion notification message to each recipient.

        @param service: address the notifications are sent from.
        @type service: L{jid.JID}
        @param nodeIdentifier: identifier of the deleted node.
        @param recipients: iterable of recipient addresses (L{jid.JID}).
        """
        for recipient in recipients:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("delete")
            element["node"] = nodeIdentifier
            self.send(message)

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Get node type and meta data for service discovery.

        Overriding implementations return, directly or through a deferred,
        a dictionary with the keys C{'type'} and C{'meta-data'}, or a false
        value for no information. The default is C{None}.
        """
        return None

    def getNodes(self, requestor, service):
        """
        Get the node identifiers to list in service discovery.

        Overriding implementations return, directly or through a deferred,
        an iterable of node identifiers. The default is an empty list.
        """
        return []

    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')

    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')

    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')

    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')

    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')

    def getDefaultConfiguration(self, requestor, service):
        raise Unsupported('retrieve-default')

    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')

    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')

    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')

    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')

    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')

    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.