source: wokkel/pubsub.py @ 22:9edffdf4aee4

Last change on this file since 22:9edffdf4aee4 was 22:9edffdf4aee4, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Clarify use of IQHandlerMixin, fix some doc strings.

File size: 28.5 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath selectors matching IQ stanzas by request type.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates that pin an element to one of the pubsub namespaces.
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub wrapper element, bare and per IQ type.
PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT

# XPath queries for commands in the regular pubsub namespace.
PUBSUB_PUBLISH = ''.join((PUBSUB_SET, '/publish', IN_NS_PUBSUB))
PUBSUB_CREATE = ''.join((PUBSUB_SET, '/create', IN_NS_PUBSUB))
PUBSUB_SUBSCRIBE = ''.join((PUBSUB_SET, '/subscribe', IN_NS_PUBSUB))
PUBSUB_UNSUBSCRIBE = ''.join((PUBSUB_SET, '/unsubscribe', IN_NS_PUBSUB))
PUBSUB_OPTIONS_GET = ''.join((PUBSUB_GET, '/options', IN_NS_PUBSUB))
PUBSUB_OPTIONS_SET = ''.join((PUBSUB_SET, '/options', IN_NS_PUBSUB))
PUBSUB_SUBSCRIPTIONS = ''.join((PUBSUB_GET, '/subscriptions', IN_NS_PUBSUB))
PUBSUB_AFFILIATIONS = ''.join((PUBSUB_GET, '/affiliations', IN_NS_PUBSUB))
PUBSUB_ITEMS = ''.join((PUBSUB_GET, '/items', IN_NS_PUBSUB))
PUBSUB_RETRACT = ''.join((PUBSUB_SET, '/retract', IN_NS_PUBSUB))

# XPath queries for commands in the pubsub owner namespace.
PUBSUB_DEFAULT = ''.join((PUBSUB_OWNER_GET, '/default', IN_NS_PUBSUB_OWNER))
PUBSUB_CONFIGURE_GET = ''.join((PUBSUB_OWNER_GET, '/configure',
                                IN_NS_PUBSUB_OWNER))
PUBSUB_CONFIGURE_SET = ''.join((PUBSUB_OWNER_SET, '/configure',
                                IN_NS_PUBSUB_OWNER))
PUBSUB_AFFILIATIONS_GET = ''.join((PUBSUB_OWNER_GET, '/affiliations',
                                   IN_NS_PUBSUB_OWNER))
PUBSUB_AFFILIATIONS_SET = ''.join((PUBSUB_OWNER_SET, '/affiliations',
                                   IN_NS_PUBSUB_OWNER))
PUBSUB_SUBSCRIPTIONS_GET = ''.join((PUBSUB_OWNER_GET, '/subscriptions',
                                    IN_NS_PUBSUB_OWNER))
PUBSUB_SUBSCRIPTIONS_SET = ''.join((PUBSUB_OWNER_SET, '/subscriptions',
                                    IN_NS_PUBSUB_OWNER))
PUBSUB_PURGE = ''.join((PUBSUB_OWNER_SET, '/purge', IN_NS_PUBSUB_OWNER))
PUBSUB_DELETE = ''.join((PUBSUB_OWNER_SET, '/delete', IN_NS_PUBSUB_OWNER))
71
class BadRequest(error.StanzaError):
    """
    Bad request stanza error.

    Raised by request handlers when a required attribute or element is
    missing from, or malformed in, an incoming request.
    """
    def __init__(self, text=None):
        """
        @param text: Optional human readable description of what was wrong
                     with the request. Defaults to no description, which
                     matches the behaviour of calling this without arguments.
        @type text: C{unicode}
        """
        error.StanzaError.__init__(self, 'bad-request', text=text)
78
79
80
class SubscriptionPending(Exception):
    """
    The subscription request was received, but the subscription still awaits
    acceptance and is therefore not active yet.
    """
85
86
87
class SubscriptionUnconfigured(Exception):
    """
    The subscription request was received, but the subscription has to be
    configured first before it becomes active.
    """
93
94
95
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The specific condition is passed to L{error.StanzaError} as application
    condition: an element in the publish-subscribe errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: Stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element. Only set when true.
        @param text: Optional descriptive text for the error.
        """
        detail = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            detail['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=detail)
107
108
109
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not implemented.

    This is a C{feature-not-implemented} stanza error with the
    publish-subscribe specific condition C{unsupported}.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported feature.
        @param text: Optional descriptive text for the error.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
116
117
118
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error for the feature
    C{subscription-options-unavailable}.
    """
    def __init__(self):
        Unsupported.__init__(self, feature='subscription-options-unavailable')
122
123
124
class Item(domish.Element):
    """
    A single item of a publish subscribe node.

    Instances are C{item} elements in the publish-subscribe namespace and
    behave like an object providing L{domish.IElement}.

    The payload is either handed to C{__init__} through the C{payload}
    keyword argument, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier, stored in the C{id} attribute.
        @type id: L{unicode}
        @param payload: optional item payload. Strings are added as raw,
                        already serialized XML; anything else is added as
                        a child element.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
152
153
154
class _PubSubRequest(xmlstream.IQ):
    """
    An IQ request that wraps one publish subscribe command.

    @ivar verb: Request verb
    @type verb: C{str}
    @ivar namespace: Request namespace.
    @type namespace: C{str}
    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
    @type method: C{str}
    @ivar command: Command element of the request. This is the direct child of
                   the C{pubsub} element in the C{namespace} with the name
                   C{verb}.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        """
        Build the IQ with a C{pubsub} child that holds the command element.

        @param xs: XML stream the request is bound to.
        @param verb: Name of the command element.
        @param namespace: Namespace of the C{pubsub} element.
        @param method: Type of the IQ, C{'set'} or C{'get'}.
        """
        xmlstream.IQ.__init__(self, xs, method)

        wrapper = self.addElement((namespace, 'pubsub'))
        self.command = wrapper.addElement(verb)

    def send(self, to):
        """
        Send out request.

        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
        a L{JID} instance, of which the full string form is used as
        destination.

        @param to: Entity to send the request to.
        @type to: L{JID}
        """
        return xmlstream.IQ.send(self, to.full())
188
189
190
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are passed to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}. These do nothing by default and
    are meant to be overridden. The other public methods each send one
    request to a publish subscribe service and return the deferred for
    that request.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing messages that carry a publish subscribe event.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)

    def _onEvent(self, message):
        """
        Dispatch an event notification to the matching C{_onEvent_*} method.

        Messages that lack a C{from} or C{to} attribute, or whose C{event}
        element has no child in the event namespace, are ignored. The message
        is only marked as handled when a handler for the action exists.

        @param message: The received message stanza.
        """
        try:
            service = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        # The last child in the event namespace decides the action.
        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        # Compare to None explicitly, so the outcome does not depend on the
        # truth value of an element.
        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            eventHandler(service, recipient, actionElement)
            message.handled = True

    def _onEvent_items(self, service, recipient, action):
        """
        Handle an C{items} event by calling L{itemsReceived}.

        Only C{item} and C{retract} children of the action are passed on.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        self.itemsReceived(recipient, service, nodeIdentifier, items)

    def _onEvent_delete(self, service, recipient, action):
        """
        Handle a C{delete} event by calling L{deleteReceived}.
        """
        nodeIdentifier = action["node"]
        self.deleteReceived(recipient, service, nodeIdentifier)

    def _onEvent_purge(self, service, recipient, action):
        """
        Handle a C{purge} event by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        self.purgeReceived(recipient, service, nodeIdentifier)

    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an items notification was received. Does nothing here.

        @param recipient: The entity the notification was addressed to.
        @type recipient: L{JID}
        @param service: The entity the notification came from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @param items: The C{item} and C{retract} elements of the event.
        @type items: C{list}
        """
        pass

    def deleteReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node deletion notification was received. Does nothing
        here.

        @param recipient: The entity the notification was addressed to.
        @type recipient: L{JID}
        @param service: The entity the notification came from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        """
        pass

    def purgeReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node purge notification was received. Does nothing
        here.

        @param recipient: The entity the notification was addressed to.
        @type recipient: L{JID}
        @param service: The entity the notification came from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        """
        pass

    def createNode(self, service, nodeIdentifier=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @return: Deferred that fires with the identifier of the new node.
                 This is the identifier reported in the response, or the
                 suggested one when the response does not report any.
        """
        request = _PubSubRequest(self.xmlstream, 'create')
        if nodeIdentifier:
            request.command['node'] = nodeIdentifier

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response names no node: there is no pubsub element
                # (AttributeError), no create element in it (TypeError) or
                # no node attribute on that (KeyError). The suggested node
                # identifier was accepted.
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)

    def deleteNode(self, service, nodeIdentifier):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @return: The deferred of the sent request.
        """
        request = _PubSubRequest(self.xmlstream, 'delete')
        request.command['node'] = nodeIdentifier
        return request.send(service)

    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @return: Deferred that fires with C{None} when subscribed. It fails
                 with L{SubscriptionPending} or L{SubscriptionUnconfigured}
                 when the response reports that subscription state.
        """
        request = _PubSubRequest(self.xmlstream, 'subscribe')
        request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)

    def unsubscribe(self, service, nodeIdentifier, subscriber):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @return: The deferred of the sent request.
        """
        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
        request.command['node'] = nodeIdentifier
        request.command['jid'] = subscriber.full()
        return request.send(service)

    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @return: The deferred of the sent request.
        """
        request = _PubSubRequest(self.xmlstream, 'publish')
        request.command['node'] = nodeIdentifier
        if items:
            for item in items:
                request.command.addChild(item)

        return request.send(service)

    def items(self, service, nodeIdentifier, maxItems=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @return: Deferred that fires with the list of C{item} elements in
                 the publish-subscribe namespace found in the response.
        """
        request = _PubSubRequest(self.xmlstream, 'items', method='get')
        request.command['node'] = nodeIdentifier
        if maxItems:
            request.command["max_items"] = str(int(maxItems))

        def cb(iq):
            return [element for element in iq.pubsub.items.elements()
                    if element.uri == NS_PUBSUB and element.name == 'item']

        return request.send(service).addCallback(cb)
376
377
378
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    @ivar hideNodes: When true, L{getDiscoItems} reports no nodes at all.
                     Defaults to C{False}.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    # Maps the XPath query of each supported request to the name of the
    # method that handles it.
    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    def __init__(self):
        """
        Set up the default discovery identity, an empty feature list and
        node visibility.
        """
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []

        # getDiscoItems reads this attribute, so it must always exist.
        self.hideNodes = False

    def connectionMade(self):
        """
        Observe all get and set requests in both pubsub namespaces.
        """
        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)

    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Provide service discovery information.

        Without a node identifier, the result holds the identity of the
        service and its features. With one, the result is derived from what
        L{getNodeInfo} reports on that node, and is empty if that reports
        nothing.

        @return: Deferred that fires with a list of discovery information
                 objects.
        """
        info = []

        if not nodeIdentifier:
            info.append(disco.DiscoIdentity(**self.discoIdentity))

            info.append(disco.DiscoFeature(disco.NS_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in self.pubSubFeatures])

            return defer.succeed(info)

        def toInfo(nodeInfo):
            if not nodeInfo:
                return []

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(type="result",
                                      form_type=NS_PUBSUB_META_DATA)
                form.add_field("text-single",
                               "pubsub#node_type",
                               "The type of node (collection or leaf)",
                               nodeType)

                for metaDatum in metaData:
                    form.add_field(**metaDatum)

                info.append(form)
            return info

        # getNodeInfo may return a plain value (the default implementation
        # returns None) or a deferred; accept both.
        d = defer.maybeDeferred(self.getNodeInfo, requestor, target,
                                nodeIdentifier)
        d.addCallback(toInfo)
        return d

    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Provide the nodes of this service as service discovery items.

        No items are reported for requests on a specific node, or when
        C{hideNodes} is true.

        @return: Deferred that fires with a list of L{disco.DiscoItem}s.
        """
        if nodeIdentifier or self.hideNodes:
            return defer.succeed([])

        # getNodes may return a plain list (the default implementation does)
        # or a deferred; accept both.
        d = defer.maybeDeferred(self.getNodes, requestor, target)
        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d

    def _onPublish(self, iq):
        """
        Handle a publish request by calling L{publish}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest

        items = []
        for element in iq.pubsub.publish.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                items.append(element)

        return self.publish(requestor, service, nodeIdentifier, items)

    def _onSubscribe(self, iq):
        """
        Handle a subscribe request by calling L{subscribe}.

        The result of L{subscribe} is expected to be a tuple of node
        identifier and subscription state, which is rendered into the
        response.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.subscribe["node"]
            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest

        def toResponse(result):
            node, state = result
            response = domish.Element((NS_PUBSUB, "pubsub"))
            subscription = response.addElement("subscription")
            subscription["node"] = node
            subscription["jid"] = subscriber.full()
            subscription["subscription"] = state
            return response

        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
        d.addCallback(toResponse)
        return d

    def _onUnsubscribe(self, iq):
        """
        Handle an unsubscribe request by calling L{unsubscribe}.

        @raise BadRequest: if the C{node} or C{jid} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.unsubscribe["node"]
            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest

        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)

    def _onOptionsGet(self, iq):
        """
        Reject requests for subscription options: they are not supported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onOptionsSet(self, iq):
        """
        Reject changes to subscription options: they are not supported.
        """
        raise Unsupported('subscription-options-unavailable')

    def _onSubscriptions(self, iq):
        """
        Handle a request for the subscriptions of the requestor.

        The result of L{subscriptions} is expected to be an iterable of
        (node identifier, subscriber, state) tuples.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            subscriptions = response.addElement('subscriptions')
            for node, subscriber, state in result:
                item = subscriptions.addElement('subscription')
                item['node'] = node
                item['jid'] = subscriber.full()
                item['subscription'] = state
            return response

        d = self.subscriptions(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onAffiliations(self, iq):
        """
        Handle a request for the affiliations of the requestor.

        The result of L{affiliations} is expected to be an iterable of
        (node identifier, affiliation) tuples.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            affiliations = response.addElement('affiliations')

            for nodeIdentifier, affiliation in result:
                item = affiliations.addElement('affiliation')
                item['node'] = nodeIdentifier
                item['affiliation'] = affiliation

            return response

        d = self.affiliations(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onCreate(self, iq):
        """
        Handle a node creation request by calling L{create}.

        The response only names the created node when no identifier was
        suggested, or when the created node has another identifier than the
        suggested one.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.create.getAttribute("node")

        def toResponse(result):
            if not nodeIdentifier or nodeIdentifier != result:
                response = domish.Element((NS_PUBSUB, 'pubsub'))
                create = response.addElement('create')
                create['node'] = result
                return response
            else:
                return None

        d = self.create(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _formFromConfiguration(self, options):
        """
        Build a node configuration form.

        @param options: Iterable of dictionaries. Each is passed as keyword
                        arguments to C{add_field} of the form.
        @return: The form, of type C{form}.
        """
        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)

        for option in options:
            form.add_field(**option)

        return form

    def _onDefault(self, iq):
        """
        Handle a request for the default node configuration.

        The options reported by L{getDefaultConfiguration} are rendered
        into a configuration form.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            default = response.addElement("default")
            default.addChild(self._formFromConfiguration(options))
            return response

        d = self.getDefaultConfiguration(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onConfigureGet(self, iq):
        """
        Handle a request for the configuration of a node.

        The options reported by L{getConfiguration} are rendered into
        a configuration form. The C{node} attribute is optional.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            configure = response.addElement("configure")
            configure.addChild(self._formFromConfiguration(options))

            if nodeIdentifier:
                configure["node"] = nodeIdentifier

            return response

        d = self.getConfiguration(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _onConfigureSet(self, iq):
        """
        Handle a request to change the configuration of a node.

        The first data form of type C{cancel} ends processing without
        changes. The first data form of type C{submit} that has the node
        configuration C{FORM_TYPE} is passed, without that field, to
        L{setConfiguration}.

        @raise BadRequest: if no usable form was found, or a field of
                           a submitted form has no C{var} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure["node"]

        def getFormOptions(form):
            # Collect the fields of the form as a mapping of field name to
            # the string value of the field.
            options = {}

            for element in form.elements():
                if element.name == 'field' and \
                   element.uri == data_form.NS_X_DATA:
                    try:
                        options[element["var"]] = str(element.value)
                    except (KeyError, AttributeError):
                        raise BadRequest

            return options

        # Search configuration form with correct FORM_TYPE and process it

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            formType = element.getAttribute("type")
            if formType == "cancel":
                return None
            elif formType != "submit":
                continue

            options = getFormOptions(element)

            # Forms without FORM_TYPE are skipped like forms with another
            # FORM_TYPE, so that they end in BadRequest below.
            if options.get("FORM_TYPE") == NS_PUBSUB_NODE_CONFIG:
                del options["FORM_TYPE"]
                return self.setConfiguration(requestor, service,
                                             nodeIdentifier, options)

        raise BadRequest

    def _onItems(self, iq):
        """
        Handle a request for items by calling L{items}.

        The results of L{items} are added to the response as raw XML.

        @raise BadRequest: if the C{node} attribute is missing, C{max_items}
                           is not an integer, or a requested item has no
                           C{id} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest

        maxItems = iq.pubsub.items.getAttribute('max_items')

        if maxItems:
            try:
                maxItems = int(maxItems)
            except ValueError:
                raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            items = response.addElement('items')
            items["node"] = nodeIdentifier

            for item in result:
                items.addRawXml(item)

            return response

        d = self.items(requestor, service, nodeIdentifier, maxItems,
                       itemIdentifiers)
        d.addCallback(toResponse)
        return d

    def _onRetract(self, iq):
        """
        Handle a request to retract items by calling L{retract}.

        @raise BadRequest: if the C{node} attribute is missing, or an item
                           has no C{id} attribute.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest

        itemIdentifiers = []
        for child in iq.pubsub.retract.elements():
            # Retract requests, and so their items, are in the regular
            # pubsub namespace, not in the owner namespace.
            if child.uri == NS_PUBSUB and child.name == 'item':
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest

        return self.retract(requestor, service, nodeIdentifier,
                            itemIdentifiers)

    def _onPurge(self, iq):
        """
        Handle a request to purge a node by calling L{purge}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest

        return self.purge(requestor, service, nodeIdentifier)

    def _onDelete(self, iq):
        """
        Handle a request to delete a node by calling L{delete}.

        @raise BadRequest: if the C{node} attribute is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest

        return self.delete(requestor, service, nodeIdentifier)

    def _onAffiliationsGet(self, iq):
        """
        Reject owner requests for affiliations: they are not supported.
        """
        raise Unsupported('modify-affiliations')

    def _onAffiliationsSet(self, iq):
        """
        Reject owner changes to affiliations: they are not supported.
        """
        raise Unsupported('modify-affiliations')

    def _onSubscriptionsGet(self, iq):
        """
        Reject owner requests for subscriptions: they are not supported.
        """
        raise Unsupported('manage-subscriptions')

    def _onSubscriptionsSet(self, iq):
        """
        Reject owner changes to subscriptions: they are not supported.
        """
        raise Unsupported('manage-subscriptions')

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out notifications of published items.

        @param service: The entity the notifications are sent from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @param notifications: Iterable of (recipient, items) tuples. One
                              message is sent to each recipient, with the
                              items as children of the C{items} element.
        """
        for recipient, items in notifications:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("items")
            element["node"] = nodeIdentifier
            element.children = items
            self.send(message)

    def notifyDelete(self, service, nodeIdentifier, recipients):
        """
        Send out notifications of the deletion of a node.

        @param service: The entity the notifications are sent from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @param recipients: Iterable of the entities to notify.
        """
        for recipient in recipients:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("delete")
            element["node"] = nodeIdentifier
            self.send(message)

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Report information on a node, for service discovery.

        This default implementation reports nothing. L{getDiscoInfo} reads
        the keys C{'type'} and C{'meta-data'} from a result that is not
        empty.
        """
        return None

    def getNodes(self, requestor, service):
        """
        Report the identifiers of the nodes of this service, for service
        discovery.

        This default implementation reports no nodes.
        """
        return []

    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Called for publish requests. Unsupported unless overridden.
        """
        raise Unsupported('publish')

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called for subscribe requests. Unsupported unless overridden.
        """
        raise Unsupported('subscribe')

    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called for unsubscribe requests. Unsupported unless overridden.
        """
        raise Unsupported('subscribe')

    def subscriptions(self, requestor, service):
        """
        Called for requests to retrieve subscriptions. Unsupported unless
        overridden.
        """
        raise Unsupported('retrieve-subscriptions')

    def affiliations(self, requestor, service):
        """
        Called for requests to retrieve affiliations. Unsupported unless
        overridden.
        """
        raise Unsupported('retrieve-affiliations')

    def create(self, requestor, service, nodeIdentifier):
        """
        Called for node creation requests. Unsupported unless overridden.
        """
        raise Unsupported('create-nodes')

    def getDefaultConfiguration(self, requestor, service):
        """
        Called for requests for the default node configuration. Unsupported
        unless overridden.
        """
        raise Unsupported('retrieve-default')

    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Called for requests for the configuration of a node. Unsupported
        unless overridden.
        """
        raise Unsupported('config-node')

    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Called for requests to change the configuration of a node.
        Unsupported unless overridden.
        """
        raise Unsupported('config-node')

    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Called for requests to retrieve items. Unsupported unless
        overridden.
        """
        raise Unsupported('retrieve-items')

    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Called for requests to retract items. Unsupported unless overridden.
        """
        raise Unsupported('retract-items')

    def purge(self, requestor, service, nodeIdentifier):
        """
        Called for requests to purge a node. Unsupported unless overridden.
        """
        raise Unsupported('purge-nodes')

    def delete(self, requestor, service, nodeIdentifier):
        """
        Called for requests to delete a node. Unsupported unless overridden.
        """
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.