source: wokkel/pubsub.py @ 13:b460ae320c23

Last change on this file since 13:b460ae320c23 was 13:b460ae320c23, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Add support for receiving notifications for item retraction, node deletion and purging.

File size: 26.1 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer, reactor
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath selectors matching iq stanzas of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces defined by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting an element to a publish-subscribe namespace
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries matching iq requests that carry a pubsub element
PUBSUB_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB_OWNER
PUBSUB_GET = '%s%s' % (IQ_GET, PUBSUB_ELEMENT)
PUBSUB_SET = '%s%s' % (IQ_SET, PUBSUB_ELEMENT)
PUBSUB_OWNER_GET = '%s%s' % (IQ_GET, PUBSUB_OWNER_ELEMENT)
PUBSUB_OWNER_SET = '%s%s' % (IQ_SET, PUBSUB_OWNER_ELEMENT)

# XPath queries matching the individual publish-subscribe commands
PUBSUB_PUBLISH = '%s/publish%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_CREATE = '%s/create%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIBE = '%s/subscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_UNSUBSCRIBE = '%s/unsubscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_GET = '%s/options%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_SET = '%s/options%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_DEFAULT = '%s/default%s' % (PUBSUB_OWNER_GET, IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_GET = '%s/configure%s' % (PUBSUB_OWNER_GET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_SET = '%s/configure%s' % (PUBSUB_OWNER_SET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS = '%s/subscriptions%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS = '%s/affiliations%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS_GET = '%s/affiliations%s' % (PUBSUB_OWNER_GET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = '%s/affiliations%s' % (PUBSUB_OWNER_SET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = '%s/subscriptions%s' % (PUBSUB_OWNER_GET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = '%s/subscriptions%s' % (PUBSUB_OWNER_SET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = '%s/items%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_RETRACT = '%s/retract%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_PURGE = '%s/purge%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
PUBSUB_DELETE = '%s/delete%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
71
class BadRequest(error.StanzaError):
    """
    Stanza error carrying the C{bad-request} condition.

    Takes no arguments; the condition is fixed.
    """

    def __init__(self):
        condition = 'bad-request'
        error.StanzaError.__init__(self, condition)
78
79
class SubscriptionPending(Exception):
    """
    The subscription request was received, but is still awaiting approval.
    """
84
85
class SubscriptionUnconfigured(Exception):
    """
    The subscription was created, but must be configured before it becomes
    active.
    """
91
92
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The application condition is an element named C{pubsubCondition} in the
    publish-subscribe errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: stanza error condition.
        @param pubsubCondition: name of the publish-subscribe specific
                                condition element.
        @param feature: optional value for the C{feature} attribute of the
                        application condition. Only set when truthy.
        @param text: optional descriptive text of the error.
        """
        specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            specific['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=specific)
104
105
class Unsupported(PubSubError):
    """
    Error for a publish-subscribe feature that is not supported.

    Uses the C{feature-not-implemented} stanza condition together with the
    C{unsupported} publish-subscribe condition.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: name of the unsupported feature.
        @param text: optional descriptive text of the error.
        """
        PubSubError.__init__(self,
                             'feature-not-implemented',
                             'unsupported',
                             feature,
                             text)
112
113
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error with the feature fixed to
    C{subscription-options-unavailable}.
    """

    def __init__(self):
        feature = 'subscription-options-unavailable'
        Unsupported.__init__(self, feature)
117
118
class Item(domish.Element):
    """
    A single publish-subscribe item element.

    Instances are regular L{domish.Element}s named C{item}. The payload can
    be passed to C{__init__} through the C{payload} keyword argument, or
    attached afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, either a domish element or a
                        string holding serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML and spliced in verbatim;
        # anything else is added as a regular child.
        if isinstance(payload, basestring):
            attach = self.addRawXml
        else:
            attach = self.addChild
        attach(payload)
146
147
class PubSubRequest(xmlstream.IQ):
    """
    Common base for publish-subscribe requests sent by a client.

    Subclasses must define C{verb}; it is read in C{__init__} but not
    defined here.

    @cvar namespace: namespace of the C{pubsub} element of the request
    @cvar verb: name of the command element of the request
    @cvar method: type attribute of the IQ request. Either C{'set'} or C{'get'}
    @ivar command: command element of the request. This is the child named
                   C{verb} of the C{pubsub} element in C{namespace}.
    """

    namespace = NS_PUBSUB
    method = 'set'

    def __init__(self, xs):
        """
        @param xs: XML stream passed on to L{xmlstream.IQ}.
        """
        xmlstream.IQ.__init__(self, xs, self.method)

        container = self.addElement((self.namespace, 'pubsub'))
        self.command = container.addElement(self.verb)

    def send(self, to):
        """
        Send out the request.

        Unlike L{xmlstream.IQ.send}, C{to} must be a L{JID} instance; its
        full string form is what gets passed on.

        @param to: Entity to send the request to.
        @type to: L{JID}
        """
        return xmlstream.IQ.send(self, to.full())
181
182
class CreateNode(PubSubRequest):
    """
    Request to create a node.
    """

    verb = 'create'

    def __init__(self, xs, node=None):
        """
        @param xs: XML stream to send the request over.
        @param node: optional suggested node identifier. The C{node}
                     attribute is only set when this is truthy.
        """
        PubSubRequest.__init__(self, xs)

        if not node:
            return
        self.command["node"] = node
190
191
class DeleteNode(PubSubRequest):
    """
    Request to delete a node.

    Node deletion is an owner operation: the C{delete} command lives in the
    publish-subscribe owner namespace, which is also the namespace the
    C{PUBSUB_DELETE} query of the service side matches on.
    """

    verb = 'delete'
    namespace = NS_PUBSUB_OWNER

    def __init__(self, xs, node):
        """
        @param xs: XML stream to send the request over.
        @param node: identifier of the node to delete.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node
197
198
class Subscribe(PubSubRequest):
    """
    Request to subscribe an entity to a node.
    """

    verb = 'subscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream to send the request over.
        @param node: identifier of the node to subscribe to.
        @param subscriber: entity to subscribe; its C{full()} string form is
                           used for the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)

        command = self.command
        command["node"] = node
        command["jid"] = subscriber.full()
206
207
class Unsubscribe(PubSubRequest):
    """
    Request to unsubscribe an entity from a node.
    """

    verb = 'unsubscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream to send the request over.
        @param node: identifier of the node to unsubscribe from.
        @param subscriber: entity to unsubscribe; its C{full()} string form
                           is used for the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)

        command = self.command
        command["node"] = node
        command["jid"] = subscriber.full()
215
216
class Publish(PubSubRequest):
    """
    Request to publish items to a node.
    """

    verb = 'publish'

    def __init__(self, xs, node):
        """
        @param xs: XML stream to send the request over.
        @param node: identifier of the node to publish to.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node

    def addItem(self, id=None, payload=None):
        """
        Add an item element to the publish command.

        @param id: optional item identifier.
        @param payload: optional item payload, added as a child of the item.
        @return: the newly added item element.
        """
        item = self.command.addElement("item")

        # Both arguments are optional: an item without payload must stay
        # empty instead of getting C{None} as a child.
        if payload is not None:
            item.addChild(payload)

        if id is not None:
            item["id"] = id

        return item
232
233
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish-subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)

    def _onEvent(self, message):
        """
        Dispatch an event notification to the matching C{_onEvent_*} method.

        Messages lacking a C{from} or C{to} attribute, and events without a
        child in the event namespace, are ignored. The message is marked as
        handled only when a handler exists for the action.
        """
        try:
            service = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # Must be initialised up front: an event without any child in the
        # event namespace would otherwise leave the name unbound.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            eventHandler(service, recipient, actionElement)
            message.handled = True

    def _onEvent_items(self, service, recipient, action):
        """
        Handle an C{items} event: collect the C{item} and C{retract}
        children and pass them to L{itemsReceived}.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        self.itemsReceived(recipient, service, nodeIdentifier, items)

    def _onEvent_delete(self, service, recipient, action):
        """
        Handle a C{delete} event by calling L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        self.deleteReceived(recipient, service, nodeIdentifier)

    def _onEvent_purge(self, service, recipient, action):
        """
        Handle a C{purge} event by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        self.purgeReceived(recipient, service, nodeIdentifier)

    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an items notification has been received.

        @param recipient: entity the notification was addressed to.
        @param service: entity the notification was sent from.
        @param nodeIdentifier: value of the C{node} attribute of the event.
        @param items: list of C{item} and C{retract} elements.
        """
        pass

    def deleteReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node deletion notification has been received.

        @param recipient: entity the notification was addressed to.
        @param service: entity the notification was sent from.
        @param nodeIdentifier: value of the C{node} attribute of the event.
        """
        pass

    def purgeReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node purge notification has been received.

        @param recipient: entity the notification was addressed to.
        @param service: entity the notification was sent from.
        @param nodeIdentifier: value of the C{node} attribute of the event.
        """
        pass

    def createNode(self, service, nodeIdentifier=None):
        """
        Request the creation of a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: optional suggested node identifier.
        @return: deferred that fires with the identifier of the new node:
                 the one in the response if present, otherwise
                 C{nodeIdentifier}.
        """
        request = CreateNode(self.xmlstream, nodeIdentifier)

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError):
                # No pubsub element (AttributeError) or no create element
                # (TypeError from indexing None) in the response: the
                # suggested node identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)

    def deleteNode(self, service, nodeIdentifier):
        """
        Request the deletion of a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to delete.
        @return: the result of sending the request.
        """
        return DeleteNode(self.xmlstream, nodeIdentifier).send(service)

    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Request a subscription to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to subscribe to.
        @param subscriber: entity to subscribe.
        @type subscriber: L{JID}
        @return: deferred that fires with C{None} on success. It fails with
                 L{SubscriptionPending} or L{SubscriptionUnconfigured} when
                 the response reports the corresponding subscription state.
        """
        request = Subscribe(self.xmlstream, nodeIdentifier, subscriber)

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)

    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Request the removal of a subscription to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to unsubscribe from.
        @param subscriber: entity to unsubscribe.
        @type subscriber: L{JID}
        @return: the result of sending the request.
        """
        request = Unsubscribe(self.xmlstream, nodeIdentifier, subscriber)
        return request.send(service)

    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish items to a node.

        @param service: entity to send the request to.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node to publish to.
        @param items: optional iterable of item elements, each added as a
                      child of the publish command. Defaults to no items.
        @return: the result of sending the request.
        """
        request = Publish(self.xmlstream, nodeIdentifier)

        # None instead of a shared mutable list as the default value.
        for item in (items or ()):
            request.command.addChild(item)

        return request.send(service)
334
335
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

    * If the exception is an instance of L{error.StanzaError}, an error
      response iq is returned.
    * Any other exception is reported using L{log.msg}. An error response
      with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}.
    @ivar hideNodes: When true, L{getDiscoItems} reports no nodes. Defaults
                     to C{False}.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    # Read by getDiscoItems. Defined at class level so that the attribute
    # always exists, while subclasses and instances can still override it.
    hideNodes = False

    def __init__(self):
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []

    def connectionMade(self):
        """
        Route all publish-subscribe iq requests, in both the regular and the
        owner namespace, to C{handleRequest}.
        """
        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)

    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Return service discovery information.

        Without a node identifier this is the identity and features of the
        service itself. With one, it is derived from the result of
        L{getNodeInfo}.

        @return: deferred that fires with a list of disco info objects. The
                 list is empty when L{getNodeInfo} yields a false value.
        """
        info = []

        if not nodeIdentifier:
            info.append(disco.DiscoIdentity(**self.discoIdentity))

            info.append(disco.DiscoFeature(disco.NS_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in self.pubSubFeatures])

            return defer.succeed(info)
        else:
            def toInfo(nodeInfo):
                if not nodeInfo:
                    return []

                (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
                info.append(disco.DiscoIdentity('pubsub', nodeType))
                if metaData:
                    form = data_form.Form(type="result",
                                          form_type=NS_PUBSUB_META_DATA)
                    form.add_field("text-single",
                                   "pubsub#node_type",
                                   "The type of node (collection or leaf)",
                                   nodeType)

                    for metaDatum in metaData:
                        form.add_field(**metaDatum)

                    info.append(form)
                return info

            # getNodeInfo may return a plain value (the default returns
            # None) or a deferred; maybeDeferred accepts both.
            d = defer.maybeDeferred(self.getNodeInfo,
                                    requestor, target, nodeIdentifier)
            d.addCallback(toInfo)
            return d

    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Return service discovery items: one per node reported by L{getNodes}.

        @return: deferred that fires with a list of disco items. The list is
                 empty when a node identifier is given or when C{hideNodes}
                 is true.
        """
        if nodeIdentifier or self.hideNodes:
            return defer.succeed([])

        # getNodes may return a plain list (the default does) or a deferred;
        # maybeDeferred accepts both.
        d = defer.maybeDeferred(self.getNodes, requestor, target)
        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d

    def _onPublish(self, iq):
        """
        Handle a publish request by calling L{publish} with the C{item}
        children of the command.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest

        items = []
        for element in iq.pubsub.publish.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                items.append(element)

        return self.publish(requestor, service, nodeIdentifier, items)

    def _onSubscribe(self, iq):
        """
        Handle a subscribe request by calling L{subscribe}.

        The result of L{subscribe} is expected to be a tuple of node
        identifier and subscription state, which is rendered into a
        C{subscription} response element.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.subscribe["node"]
            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest

        def toResponse(subscription):
            nodeIdentifier, state = subscription
            response = domish.Element((NS_PUBSUB, "pubsub"))
            subscription = response.addElement("subscription")
            subscription["node"] = nodeIdentifier
            subscription["jid"] = subscriber.full()
            subscription["subscription"] = state
            return response

        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
        d.addCallback(toResponse)
        return d

    def _onUnsubscribe(self, iq):
        """
        Handle an unsubscribe request by calling L{unsubscribe}.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.unsubscribe["node"]
            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest

        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)

    def _onOptionsGet(self, iq):
        """
        Reject requests for subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onOptionsSet(self, iq):
        """
        Reject changes to subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onSubscriptions(self, iq):
        """
        Handle a subscriptions request by calling L{subscriptions}.

        The result is expected to be an iterable of tuples of node
        identifier, subscriber JID and subscription state.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            subscriptions = response.addElement('subscriptions')
            for node, subscriber, state in result:
                item = subscriptions.addElement('subscription')
                item['node'] = node
                item['jid'] = subscriber.full()
                item['subscription'] = state
            return response

        d = self.subscriptions(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onAffiliations(self, iq):
        """
        Handle an affiliations request by calling L{affiliations}.

        The result is expected to be an iterable of tuples of node
        identifier and affiliation.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            affiliations = response.addElement('affiliations')

            for nodeIdentifier, affiliation in result:
                item = affiliations.addElement('affiliation')
                item['node'] = nodeIdentifier
                item['affiliation'] = affiliation

            return response

        d = self.affiliations(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onCreate(self, iq):
        """
        Handle a node creation request by calling L{create}.

        The response only carries a C{create} element when no node
        identifier was requested, or when the resulting identifier differs
        from the requested one.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.create.getAttribute("node")

        def toResponse(result):
            if not nodeIdentifier or nodeIdentifier != result:
                response = domish.Element((NS_PUBSUB, 'pubsub'))
                create = response.addElement('create')
                create['node'] = result
                return response
            else:
                return None

        d = self.create(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _formFromConfiguration(self, options):
        """
        Build a node configuration form.

        @param options: iterable of dictionaries, each passed as keyword
                        arguments to the form's C{add_field}.
        @return: the form, of type C{form}.
        """
        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)

        for option in options:
            form.add_field(**option)

        return form

    def _onDefault(self, iq):
        """
        Handle a request for the default node configuration by calling
        L{getDefaultConfiguration} and rendering the result as a form.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            default = response.addElement("default")
            default.addChild(self._formFromConfiguration(options))
            return response

        d = self.getDefaultConfiguration(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onConfigureGet(self, iq):
        """
        Handle a request for a node's configuration by calling
        L{getConfiguration} and rendering the result as a form.

        The C{node} attribute is optional; C{None} is passed on when absent.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            configure = response.addElement("configure")
            configure.addChild(self._formFromConfiguration(options))

            if nodeIdentifier:
                configure["node"] = nodeIdentifier

            return response

        d = self.getConfiguration(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _onConfigureSet(self, iq):
        """
        Handle a request to change a node's configuration.

        The first data form of type C{submit} that has the node
        configuration FORM_TYPE is passed, without its FORM_TYPE field, to
        L{setConfiguration}. A form of type C{cancel} ends processing with
        an empty result.

        @raise BadRequest: if no suitable form is found or a field lacks its
                           C{var} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        # Optional, as in _onConfigureGet: a missing attribute yields None
        # instead of escaping as a KeyError.
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def getFormOptions(form):
            # Plain nested function: it takes only the form element.
            options = {}

            for element in form.elements():
                if element.name == 'field' and \
                   element.uri == data_form.NS_X_DATA:
                    try:
                        options[element["var"]] = str(element.value)
                    except (KeyError, AttributeError):
                        raise BadRequest

            return options

        # Search configuration form with correct FORM_TYPE and process it

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            formType = element.getAttribute("type")
            if formType == "cancel":
                return None
            elif formType != "submit":
                continue

            options = getFormOptions(element)

            # A form without FORM_TYPE is skipped rather than raising
            # KeyError.
            if options.get("FORM_TYPE") == NS_PUBSUB_NODE_CONFIG:
                del options["FORM_TYPE"]
                return self.setConfiguration(requestor, service,
                                             nodeIdentifier, options)

        raise BadRequest

    def _onItems(self, iq):
        """
        Handle an items retrieval request by calling L{items}.

        Each entry of the result is added to the response as raw XML.

        @raise BadRequest: if the C{node} attribute is missing, if
                           C{max_items} is not an integer, or if an C{item}
                           child lacks its C{id} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest

        maxItems = iq.pubsub.items.getAttribute('max_items')

        if maxItems:
            try:
                maxItems = int(maxItems)
            except ValueError:
                raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            items = response.addElement('items')
            items["node"] = nodeIdentifier

            for item in result:
                items.addRawXml(item)

            return response

        d = self.items(requestor, service, nodeIdentifier, maxItems,
                       itemIdentifiers)
        d.addCallback(toResponse)
        return d

    def _onRetract(self, iq):
        """
        Handle an item retraction request by calling L{retract} with the
        identifiers of the C{item} children of the command.

        @raise BadRequest: if the C{node} attribute is missing or an C{item}
                           child lacks its C{id} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.retract.elements():
            # The retract command, and so its items, is in the regular
            # publish-subscribe namespace (see PUBSUB_RETRACT), not the
            # owner namespace.
            if child.uri == NS_PUBSUB and child.name == 'item':
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        return self.retract(requestor, service, nodeIdentifier,
                            itemIdentifiers)

    def _onPurge(self, iq):
        """
        Handle a node purge request by calling L{purge}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest

        return self.purge(requestor, service, nodeIdentifier)

    def _onDelete(self, iq):
        """
        Handle a node deletion request by calling L{delete}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest

        return self.delete(requestor, service, nodeIdentifier)

    def _onAffiliationsGet(self, iq):
        """
        Reject owner requests for node affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')

    def _onAffiliationsSet(self, iq):
        """
        Reject owner changes to node affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')

    def _onSubscriptionsGet(self, iq):
        """
        Reject owner requests for node subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')

    def _onSubscriptionsSet(self, iq):
        """
        Reject owner changes to node subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out an items notification message to each recipient.

        @param service: entity the notifications are sent from.
        @type service: L{JID}
        @param nodeIdentifier: identifier of the node published to.
        @param notifications: iterable of tuples of recipient JID and the
                              list of items to notify that recipient of.
        """
        for recipient, items in notifications:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("items")
            element["node"] = nodeIdentifier
            element.children = items
            self.send(message)

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Return information on a node for service discovery.

        L{getDiscoInfo} reads the C{'type'} and C{'meta-data'} keys of a
        true result. The default returns C{None}.
        """
        return None

    def getNodes(self, requestor, service):
        """
        Return the node identifiers to report through service discovery.

        The default returns an empty list.
        """
        return []

    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Publish items to a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('publish')

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Subscribe an entity to a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('subscribe')

    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Unsubscribe an entity from a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('subscribe')

    def subscriptions(self, requestor, service):
        """
        Retrieve the subscriptions of the requestor. Raises L{Unsupported}
        by default.
        """
        raise Unsupported('retrieve-subscriptions')

    def affiliations(self, requestor, service):
        """
        Retrieve the affiliations of the requestor. Raises L{Unsupported}
        by default.
        """
        raise Unsupported('retrieve-affiliations')

    def create(self, requestor, service, nodeIdentifier):
        """
        Create a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('create-nodes')

    def getDefaultConfiguration(self, requestor, service):
        """
        Retrieve the default node configuration. Raises L{Unsupported} by
        default.
        """
        raise Unsupported('retrieve-default')

    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Retrieve the configuration of a node. Raises L{Unsupported} by
        default.
        """
        raise Unsupported('config-node')

    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Change the configuration of a node. Raises L{Unsupported} by
        default.
        """
        raise Unsupported('config-node')

    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Retrieve items from a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('retrieve-items')

    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Retract items from a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('retract-items')

    def purge(self, requestor, service, nodeIdentifier):
        """
        Purge all items from a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('purge-nodes')

    def delete(self, requestor, service, nodeIdentifier):
        """
        Delete a node. Raises L{Unsupported} by default.
        """
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.