source: wokkel/pubsub.py @ 25:fd00a744a458

Last change on this file since 25:fd00a744a458 was 25:fd00a744a458, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Refactor Data Forms.

Author: ralphm.
Fixes #13.

This refactoring provides an abstract representation of Forms, Fields and
Options and each of those can be parsed from or unparsed to XML. This change
also simplifies testing in test_pubsub, by allowing the 'received' requests
to be represented as an XML snippet.

File size: 28.8 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath queries matching iq stanzas of type 'get' and 'set'
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Publish-subscribe namespaces; the sub-namespaces are all derived from
# the base namespace by appending a fragment.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"

# XPath predicates that restrict an element to the publish-subscribe
# namespace or to the publish-subscribe owner namespace.
IN_NS_PUBSUB = '[@xmlns="' + NS_PUBSUB + '"]'
IN_NS_PUBSUB_OWNER = '[@xmlns="' + NS_PUBSUB_OWNER + '"]'

# XPath queries for the pubsub element itself, and for iq get/set requests
# that carry a pubsub element in either of the two namespaces.
PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT

# XPath queries for the individual publish-subscribe commands. Each query
# selects the command element (the child of the pubsub element); these are
# the keys of L{PubSubService.iqHandlers}.
PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
# The owner-namespace variants of affiliations and subscriptions
PUBSUB_AFFILIATIONS_GET = PUBSUB_OWNER_GET + '/affiliations' + \
                          IN_NS_PUBSUB_OWNER
PUBSUB_AFFILIATIONS_SET = PUBSUB_OWNER_SET + '/affiliations' + \
                          IN_NS_PUBSUB_OWNER
PUBSUB_SUBSCRIPTIONS_GET = PUBSUB_OWNER_GET + '/subscriptions' + \
                          IN_NS_PUBSUB_OWNER
PUBSUB_SUBSCRIPTIONS_SET = PUBSUB_OWNER_SET + '/subscriptions' + \
                          IN_NS_PUBSUB_OWNER
PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
class BadRequest(error.StanzaError):
    """
    Stanza error signalling a malformed request.

    Instances always carry the C{'bad-request'} condition.
    """

    def __init__(self):
        condition = 'bad-request'
        error.StanzaError.__init__(self, condition)
78
79
80
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised when the service reports the subscription state C{'pending'}.
    """
85
86
87
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised when the service reports the subscription state
    C{'unconfigured'}.
    """
93
94
95
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific condition.

    The specific condition is attached as the application condition
    element, in the publish-subscribe errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The generic stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: Optional descriptive text passed on to
                     L{error.StanzaError}.
        """
        specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            specific['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=specific)
107
108
109
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    Uses the C{'feature-not-implemented'} condition together with the
    publish-subscribe specific C{'unsupported'} condition.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported feature.
        @param text: Optional descriptive text.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
116
117
118
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error for the C{'subscription-options-unavailable'}
    feature.
    """

    def __init__(self):
        feature = 'subscription-options-unavailable'
        Unsupported.__init__(self, feature)
122
123
124
class Item(domish.Element):
    """
    An item element in the publish-subscribe namespace.

    Instances are regular L{domish.Element}s named C{item}. A payload may be
    supplied at construction time through the C{payload} argument, or added
    afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier, stored in the C{id} attribute.
        @type id: L{unicode}
        @param payload: optional item payload, either a domish element or
                        serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
152
153
154
class _PubSubRequest(xmlstream.IQ):
    """
    An iq request carrying a publish-subscribe command.

    @ivar verb: Request verb
    @type verb: C{str}
    @ivar namespace: Request namespace.
    @type namespace: C{str}
    @ivar method: Type attribute of the IQ request. Either C{'set'} or C{'get'}
    @type method: C{str}
    @ivar command: Command element of the request. This is the direct child of
                   the C{pubsub} element in the C{namespace} with the name
                   C{verb}.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        """
        Build the iq with a C{pubsub} child holding the command element.

        @param xs: The XML stream the request is sent over.
        @param verb: Name of the command element.
        @param namespace: Namespace of the C{pubsub} element.
        @param method: Type of the iq, C{'set'} or C{'get'}.
        """
        xmlstream.IQ.__init__(self, xs, method)
        pubsub = self.addElement((namespace, 'pubsub'))
        self.command = pubsub.addElement(verb)


    def send(self, to):
        """
        Send out request.

        Extends L{xmlstream.IQ.send} by requiring the C{to} parameter to be
        a L{JID} instance.

        @param to: Entity to send the request to.
        @type to: L{JID}
        """
        return xmlstream.IQ.send(self, to.full())
189
190
191
192class PubSubClient(XMPPHandler):
193    """
194    Publish subscribe client protocol.
195    """
196
197    implements(IPubSubClient)
198
199    def connectionInitialized(self):
200        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
201                                   NS_PUBSUB_EVENT, self._onEvent)
202
203
204    def _onEvent(self, message):
205        try:
206            service = jid.JID(message["from"])
207            recipient = jid.JID(message["to"])
208        except KeyError:
209            return
210
211        actionElement = None
212        for element in message.event.elements():
213            if element.uri == NS_PUBSUB_EVENT:
214                actionElement = element
215
216        if not actionElement:
217            return
218
219        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
220
221        if eventHandler:
222            eventHandler(service, recipient, actionElement)
223            message.handled = True
224
225    def _onEvent_items(self, service, recipient, action):
226        nodeIdentifier = action["node"]
227
228        items = [element for element in action.elements()
229                         if element.name in ('item', 'retract')]
230
231        self.itemsReceived(recipient, service, nodeIdentifier, items)
232
233    def _onEvent_delete(self, service, recipient, action):
234        nodeIdentifier = action["node"]
235        self.deleteReceived(recipient, service, nodeIdentifier)
236
237    def _onEvent_purge(self, service, recipient, action):
238        nodeIdentifier = action["node"]
239        self.purgeReceived(recipient, service, nodeIdentifier)
240
    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an items event has been received. Does nothing by
        default.

        @param recipient: The entity the event message was addressed to.
        @type recipient: L{jid.JID}
        @param service: The entity the event message came from.
        @type service: L{jid.JID}
        @param nodeIdentifier: Value of the C{node} attribute of the event.
        @param items: The C{item} and C{retract} child elements of the event.
        @type items: C{list}
        """
        pass

    def deleteReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node delete event has been received. Does nothing by
        default.

        @param recipient: The entity the event message was addressed to.
        @type recipient: L{jid.JID}
        @param service: The entity the event message came from.
        @type service: L{jid.JID}
        @param nodeIdentifier: Value of the C{node} attribute of the event.
        """
        pass

    def purgeReceived(self, recipient, service, nodeIdentifier):
        """
        Called when a node purge event has been received. Does nothing by
        default.

        @param recipient: The entity the event message was addressed to.
        @type recipient: L{jid.JID}
        @param service: The entity the event message came from.
        @type service: L{jid.JID}
        @param nodeIdentifier: Value of the C{node} attribute of the event.
        """
        pass
249
250    def createNode(self, service, nodeIdentifier=None):
251        """
252        Create a publish subscribe node.
253
254        @param service: The publish subscribe service to create the node at.
255        @type service: L{JID}
256        @param nodeIdentifier: Optional suggestion for the id of the node.
257        @type nodeIdentifier: C{unicode}
258        """
259
260
261        request = _PubSubRequest(self.xmlstream, 'create')
262        if nodeIdentifier:
263            request.command['node'] = nodeIdentifier
264
265        def cb(iq):
266            try:
267                new_node = iq.pubsub.create["node"]
268            except AttributeError:
269                # the suggested node identifier was accepted
270                new_node = nodeIdentifier
271            return new_node
272
273        return request.send(service).addCallback(cb)
274
275
276    def deleteNode(self, service, nodeIdentifier):
277        """
278        Delete a publish subscribe node.
279
280        @param service: The publish subscribe service to delete the node from.
281        @type service: L{JID}
282        @param nodeIdentifier: The identifier of the node.
283        @type nodeIdentifier: C{unicode}
284        """
285        request = _PubSubRequest(self.xmlstream, 'delete')
286        request.command['node'] = nodeIdentifier
287        return request.send(service)
288
289
290    def subscribe(self, service, nodeIdentifier, subscriber):
291        """
292        Subscribe to a publish subscribe node.
293
294        @param service: The publish subscribe service that keeps the node.
295        @type service: L{JID}
296        @param nodeIdentifier: The identifier of the node.
297        @type nodeIdentifier: C{unicode}
298        @param subscriber: The entity to subscribe to the node. This entity
299                           will get notifications of new published items.
300        @type subscriber: L{JID}
301        """
302        request = _PubSubRequest(self.xmlstream, 'subscribe')
303        request.command['node'] = nodeIdentifier
304        request.command['jid'] = subscriber.full()
305
306        def cb(iq):
307            subscription = iq.pubsub.subscription["subscription"]
308
309            if subscription == 'pending':
310                raise SubscriptionPending
311            elif subscription == 'unconfigured':
312                raise SubscriptionUnconfigured
313            else:
314                # we assume subscription == 'subscribed'
315                # any other value would be invalid, but that should have
316                # yielded a stanza error.
317                return None
318
319        return request.send(service).addCallback(cb)
320
321
322    def unsubscribe(self, service, nodeIdentifier, subscriber):
323        """
324        Unsubscribe from a publish subscribe node.
325
326        @param service: The publish subscribe service that keeps the node.
327        @type service: L{JID}
328        @param nodeIdentifier: The identifier of the node.
329        @type nodeIdentifier: C{unicode}
330        @param subscriber: The entity to unsubscribe from the node.
331        @type subscriber: L{JID}
332        """
333        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
334        request.command['node'] = nodeIdentifier
335        request.command['jid'] = subscriber.full()
336        return request.send(service)
337
338
339    def publish(self, service, nodeIdentifier, items=None):
340        """
341        Publish to a publish subscribe node.
342
343        @param service: The publish subscribe service that keeps the node.
344        @type service: L{JID}
345        @param nodeIdentifier: The identifier of the node.
346        @type nodeIdentifier: C{unicode}
347        @param items: Optional list of L{Item}s to publish.
348        @type items: C{list}
349        """
350        request = _PubSubRequest(self.xmlstream, 'publish')
351        request.command['node'] = nodeIdentifier
352        if items:
353            for item in items:
354                request.command.addChild(item)
355
356        return request.send(service)
357
358
359    def items(self, service, nodeIdentifier, maxItems=None):
360        """
361        Retrieve previously published items from a publish subscribe node.
362
363        @param service: The publish subscribe service that keeps the node.
364        @type service: L{JID}
365        @param nodeIdentifier: The identifier of the node.
366        @type nodeIdentifier: C{unicode}
367        @param maxItems: Optional limit on the number of retrieved items.
368        @type maxItems: C{int}
369        """
370        request = _PubSubRequest(self.xmlstream, 'items', method='get')
371        request.command['node'] = nodeIdentifier
372        if maxItems:
373            request.command["max_items"] = str(int(maxItems))
374
375        def cb(iq):
376            items = []
377            for element in iq.pubsub.items.elements():
378                if element.uri == NS_PUBSUB and element.name == 'item':
379                    items.append(element)
380            return items
381
382        return request.send(service).addCallback(cb)
383
384
385
386class PubSubService(XMPPHandler, IQHandlerMixin):
387    """
388    Protocol implementation for a XMPP Publish Subscribe Service.
389
390    The word Service here is used as taken from the Publish Subscribe
391    specification. It is the party responsible for keeping nodes and their
392    subscriptions, and sending out notifications.
393
394    Methods from the L{IPubSubService} interface that are called as
395    a result of an XMPP request may raise exceptions. Alternatively the
396    deferred returned by these methods may have their errback called. These are
397    handled as follows:
398
399     - If the exception is an instance of L{error.StanzaError}, an error
400       response iq is returned.
401     - Any other exception is reported using L{log.msg}. An error response
402       with the condition C{internal-server-error} is returned.
403
404    The default implementation of said methods raises an L{Unsupported}
405    exception and are meant to be overridden.
406
407    @ivar discoIdentity: Service discovery identity as a dictionary with
408                         keys C{'category'}, C{'type'} and C{'name'}.
409    @ivar pubSubFeatures: List of supported publish-subscribe features for
410                          service discovery, as C{str}.
411    @type pubSubFeatures: C{list} or C{None}
412    """
413
414    implements(IPubSubService)
415
416    iqHandlers = {
417            PUBSUB_PUBLISH: '_onPublish',
418            PUBSUB_CREATE: '_onCreate',
419            PUBSUB_SUBSCRIBE: '_onSubscribe',
420            PUBSUB_OPTIONS_GET: '_onOptionsGet',
421            PUBSUB_OPTIONS_SET: '_onOptionsSet',
422            PUBSUB_AFFILIATIONS: '_onAffiliations',
423            PUBSUB_ITEMS: '_onItems',
424            PUBSUB_RETRACT: '_onRetract',
425            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
426            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',
427
428            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
429            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
430            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
431            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
432            PUBSUB_DEFAULT: '_onDefault',
433            PUBSUB_PURGE: '_onPurge',
434            PUBSUB_DELETE: '_onDelete',
435            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
436            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',
437
438            }
439
440
441    def __init__(self):
442        self.discoIdentity = {'category': 'pubsub',
443                              'type': 'generic',
444                              'name': 'Generic Publish-Subscribe Service'}
445
446        self.pubSubFeatures = []
447
448
449    def connectionMade(self):
450        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
451        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
452        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
453        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
454
455
456    def getDiscoInfo(self, requestor, target, nodeIdentifier):
457        info = []
458
459        if not nodeIdentifier:
460            info.append(disco.DiscoIdentity(**self.discoIdentity))
461
462            info.append(disco.DiscoFeature(disco.NS_ITEMS))
463            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
464                         for feature in self.pubSubFeatures])
465
466        def toInfo(nodeInfo):
467            if not nodeInfo:
468                return
469
470            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
471            info.append(disco.DiscoIdentity('pubsub', nodeType))
472            if metaData:
473                form = data_form.Form(formType="result",
474                                      formNamespace=NS_PUBSUB_META_DATA)
475                form.fields.append(
476                        data_form.Field(
477                            var='pubsub#node_type',
478                            value=nodeType,
479                            label='The type of node (collection or leaf)'
480                        )
481                )
482
483                for metaDatum in metaData:
484                    form.fields.append(data_form.Field.fromDict(metaDatum))
485
486                info.append(form.toElement())
487
488        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
489        d.addCallback(toInfo)
490        d.addBoth(lambda result: info)
491        return d
492
493
494    def getDiscoItems(self, requestor, target, nodeIdentifier):
495        if nodeIdentifier or self.hideNodes:
496            return defer.succeed([])
497
498        d = self.getNodes(requestor, target)
499        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
500                                     for node in nodes])
501        return d
502
503
504    def _findForm(self, element, formNamespace):
505        if not element:
506            return None
507
508        form = None
509        for child in element.elements():
510            try:
511                form = data_form.Form.fromElement(child)
512            except data_form.Error:
513                continue
514
515            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
516                continue
517
518        return form
519
520
521    def _onPublish(self, iq):
522        requestor = jid.internJID(iq["from"]).userhostJID()
523        service = jid.internJID(iq["to"])
524
525        try:
526            nodeIdentifier = iq.pubsub.publish["node"]
527        except KeyError:
528            raise BadRequest
529
530        items = []
531        for element in iq.pubsub.publish.elements():
532            if element.uri == NS_PUBSUB and element.name == 'item':
533                items.append(element)
534
535        return self.publish(requestor, service, nodeIdentifier, items)
536
537
538    def _onSubscribe(self, iq):
539        requestor = jid.internJID(iq["from"]).userhostJID()
540        service = jid.internJID(iq["to"])
541
542        try:
543            nodeIdentifier = iq.pubsub.subscribe["node"]
544            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
545        except KeyError:
546            raise BadRequest
547
548        def toResponse(subscription):
549            nodeIdentifier, state = subscription
550            response = domish.Element((NS_PUBSUB, "pubsub"))
551            subscription = response.addElement("subscription")
552            subscription["node"] = nodeIdentifier
553            subscription["jid"] = subscriber.full()
554            subscription["subscription"] = state
555            return response
556
557        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
558        d.addCallback(toResponse)
559        return d
560
561
562    def _onUnsubscribe(self, iq):
563        requestor = jid.internJID(iq["from"]).userhostJID()
564        service = jid.internJID(iq["to"])
565
566        try:
567            nodeIdentifier = iq.pubsub.unsubscribe["node"]
568            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
569        except KeyError:
570            raise BadRequest
571
572        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
573
574
    def _onOptionsGet(self, iq):
        """
        Handle a request to retrieve subscription options.

        Subscription options are not supported.

        @raise Unsupported: always, for the feature
                            C{'subscription-options-unavailable'}.
        """
        raise Unsupported('subscription-options-unavailable')


    def _onOptionsSet(self, iq):
        """
        Handle a request to set subscription options.

        Subscription options are not supported.

        @raise Unsupported: always, for the feature
                            C{'subscription-options-unavailable'}.
        """
        raise Unsupported('subscription-options-unavailable')
581
582
583    def _onSubscriptions(self, iq):
584        requestor = jid.internJID(iq["from"]).userhostJID()
585        service = jid.internJID(iq["to"])
586
587        def toResponse(result):
588            response = domish.Element((NS_PUBSUB, 'pubsub'))
589            subscriptions = response.addElement('subscriptions')
590            for node, subscriber, state in result:
591                item = subscriptions.addElement('subscription')
592                item['node'] = node
593                item['jid'] = subscriber.full()
594                item['subscription'] = state
595            return response
596
597        d = self.subscriptions(requestor, service)
598        d.addCallback(toResponse)
599        return d
600
601
602    def _onAffiliations(self, iq):
603        requestor = jid.internJID(iq["from"]).userhostJID()
604        service = jid.internJID(iq["to"])
605
606        def toResponse(result):
607            response = domish.Element((NS_PUBSUB, 'pubsub'))
608            affiliations = response.addElement('affiliations')
609
610            for nodeIdentifier, affiliation in result:
611                item = affiliations.addElement('affiliation')
612                item['node'] = nodeIdentifier
613                item['affiliation'] = affiliation
614
615            return response
616
617        d = self.affiliations(requestor, service)
618        d.addCallback(toResponse)
619        return d
620
621
622    def _onCreate(self, iq):
623        requestor = jid.internJID(iq["from"]).userhostJID()
624        service = jid.internJID(iq["to"])
625        nodeIdentifier = iq.pubsub.create.getAttribute("node")
626
627        def toResponse(result):
628            if not nodeIdentifier or nodeIdentifier != result:
629                response = domish.Element((NS_PUBSUB, 'pubsub'))
630                create = response.addElement('create')
631                create['node'] = result
632                return response
633            else:
634                return None
635
636        d = self.create(requestor, service, nodeIdentifier)
637        d.addCallback(toResponse)
638        return d
639
640
641    def _formFromConfiguration(self, values):
642        options = self.getConfigurationOptions()
643        form = data_form.Form(formType="form",
644                              formNamespace=NS_PUBSUB_NODE_CONFIG)
645
646        for name, value in values.iteritems():
647            if name in options:
648                option = {'var': name}
649                option.update(options[name])
650                if isinstance(value, list):
651                    option['values'] = value
652                else:
653                    option['value'] = value
654                form.fields.append(data_form.Field.fromDict(option))
655
656        return form
657
658
659    def _onDefault(self, iq):
660        requestor = jid.internJID(iq["from"]).userhostJID()
661        service = jid.internJID(iq["to"])
662
663        def toResponse(options):
664            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
665            default = response.addElement("default")
666            default.addChild(self._formFromConfiguration(options).toElement())
667            return response
668
669        d = self.getDefaultConfiguration(requestor, service)
670        d.addCallback(toResponse)
671        return d
672
673
674    def _onConfigureGet(self, iq):
675        requestor = jid.internJID(iq["from"]).userhostJID()
676        service = jid.internJID(iq["to"])
677        nodeIdentifier = iq.pubsub.configure.getAttribute("node")
678
679        def toResponse(options):
680            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
681            configure = response.addElement("configure")
682            configure.addChild(self._formFromConfiguration(options).toElement())
683
684            if nodeIdentifier:
685                configure["node"] = nodeIdentifier
686
687            return response
688
689        d = self.getConfiguration(requestor, service, nodeIdentifier)
690        d.addCallback(toResponse)
691        return d
692
693
694    def _onConfigureSet(self, iq):
695        requestor = jid.internJID(iq["from"]).userhostJID()
696        service = jid.internJID(iq["to"])
697        nodeIdentifier = iq.pubsub.configure["node"]
698
699        # Search configuration form with correct FORM_TYPE and process it
700
701        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
702
703        if form:
704            if form.formType == 'submit':
705                options = form.getValues()
706
707                return self.setConfiguration(requestor, service,
708                                             nodeIdentifier, options)
709            elif form.formType == 'cancel':
710                return None
711
712        raise BadRequest()
713
714
715    def _onItems(self, iq):
716        requestor = jid.internJID(iq["from"]).userhostJID()
717        service = jid.internJID(iq["to"])
718
719        try:
720            nodeIdentifier = iq.pubsub.items["node"]
721        except KeyError:
722            raise BadRequest
723
724        maxItems = iq.pubsub.items.getAttribute('max_items')
725
726        if maxItems:
727            try:
728                maxItems = int(maxItems)
729            except ValueError:
730                raise BadRequest
731
732        itemIdentifiers = []
733        for child in iq.pubsub.items.elements():
734            if child.name == 'item' and child.uri == NS_PUBSUB:
735                try:
736                    itemIdentifiers.append(child["id"])
737                except KeyError:
738                    raise BadRequest
739
740        def toResponse(result):
741            response = domish.Element((NS_PUBSUB, 'pubsub'))
742            items = response.addElement('items')
743            items["node"] = nodeIdentifier
744
745            for item in result:
746                items.addChild(item)
747
748            return response
749
750        d = self.items(requestor, service, nodeIdentifier, maxItems,
751                       itemIdentifiers)
752        d.addCallback(toResponse)
753        return d
754
755
756    def _onRetract(self, iq):
757        requestor = jid.internJID(iq["from"]).userhostJID()
758        service = jid.internJID(iq["to"])
759
760        try:
761            nodeIdentifier = iq.pubsub.retract["node"]
762        except KeyError:
763            raise BadRequest
764
765        itemIdentifiers = []
766        for child in iq.pubsub.retract.elements():
767            if child.uri == NS_PUBSUB_OWNER and child.name == 'item':
768                try:
769                    itemIdentifiers.append(child["id"])
770                except KeyError:
771                    raise BadRequest
772
773        return self.retract(requestor, service, nodeIdentifier,
774                            itemIdentifiers)
775
776
777    def _onPurge(self, iq):
778        requestor = jid.internJID(iq["from"]).userhostJID()
779        service = jid.internJID(iq["to"])
780
781        try:
782            nodeIdentifier = iq.pubsub.purge["node"]
783        except KeyError:
784            raise BadRequest
785
786        return self.purge(requestor, service, nodeIdentifier)
787
788
789    def _onDelete(self, iq):
790        requestor = jid.internJID(iq["from"]).userhostJID()
791        service = jid.internJID(iq["to"])
792
793        try:
794            nodeIdentifier = iq.pubsub.delete["node"]
795        except KeyError:
796            raise BadRequest
797
798        return self.delete(requestor, service, nodeIdentifier)
799
800
    def _onAffiliationsGet(self, iq):
        """
        Handle an owner request to retrieve the affiliations of a node.

        @raise Unsupported: always, for the feature
                            C{'modify-affiliations'}.
        """
        raise Unsupported('modify-affiliations')


    def _onAffiliationsSet(self, iq):
        """
        Handle an owner request to change the affiliations of a node.

        @raise Unsupported: always, for the feature
                            C{'modify-affiliations'}.
        """
        raise Unsupported('modify-affiliations')


    def _onSubscriptionsGet(self, iq):
        """
        Handle an owner request to retrieve the subscriptions of a node.

        @raise Unsupported: always, for the feature
                            C{'manage-subscriptions'}.
        """
        raise Unsupported('manage-subscriptions')


    def _onSubscriptionsSet(self, iq):
        """
        Handle an owner request to change the subscriptions of a node.

        @raise Unsupported: always, for the feature
                            C{'manage-subscriptions'}.
        """
        raise Unsupported('manage-subscriptions')
815
816    # public methods
817
818    def notifyPublish(self, service, nodeIdentifier, notifications):
819        for recipient, items in notifications:
820            message = domish.Element((None, "message"))
821            message["from"] = service.full()
822            message["to"] = recipient.full()
823            event = message.addElement((NS_PUBSUB_EVENT, "event"))
824            element = event.addElement("items")
825            element["node"] = nodeIdentifier
826            element.children = items
827            self.send(message)
828
829
830    def notifyDelete(self, service, nodeIdentifier, recipients):
831        for recipient in recipients:
832            message = domish.Element((None, "message"))
833            message["from"] = service.full()
834            message["to"] = recipient.full()
835            event = message.addElement((NS_PUBSUB_EVENT, "event"))
836            element = event.addElement("delete")
837            element["node"] = nodeIdentifier
838            self.send(message)
839
840
    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Return information about a node, for service discovery.

        The default implementation returns C{None}, meaning no information.
        L{getDiscoInfo} reads the keys C{'type'} and C{'meta-data'} from the
        result, so overriding implementations presumably provide a
        dictionary with those keys through a deferred.
        """
        return None


    def getNodes(self, requestor, service):
        """
        Return the identifiers of the nodes of this service, for service
        discovery.

        The default implementation returns an empty list.
        """
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Called when a publish request has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'publish'}.
        """
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called when a subscribe request has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'subscribe'}.
        """
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called when an unsubscribe request has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'subscribe'}, the same feature name as used by L{subscribe}.
        """
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        """
        Called when a request for the requestor's subscriptions has been
        received.

        The default implementation raises L{Unsupported} for the feature
        C{'retrieve-subscriptions'}.
        """
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        """
        Called when a request for the requestor's affiliations has been
        received.

        The default implementation raises L{Unsupported} for the feature
        C{'retrieve-affiliations'}.
        """
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        """
        Called when a node creation request has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'create-nodes'}.
        """
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        """
        Return the supported node configuration options.

        The result is used by L{_formFromConfiguration} as a mapping from
        option name to a dictionary describing the form field. The default
        implementation returns an empty dictionary.
        """
        return {}


    def getDefaultConfiguration(self, requestor, service):
        """
        Called when a request for the default node configuration has been
        received.

        The default implementation raises L{Unsupported} for the feature
        C{'retrieve-default'}.
        """
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Called when a request for the configuration of a node has been
        received.

        The default implementation raises L{Unsupported} for the feature
        C{'config-node'}.
        """
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Called when a request to change the configuration of a node has
        been received.

        The default implementation raises L{Unsupported} for the feature
        C{'config-node'}.
        """
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Called when a request to retrieve items has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'retrieve-items'}.
        """
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Called when a request to retract items has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'retract-items'}.
        """
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        """
        Called when a request to purge a node has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'purge-nodes'}.
        """
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        """
        Called when a request to delete a node has been received.

        The default implementation raises L{Unsupported} for the feature
        C{'delete-nodes'}.
        """
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.