source: wokkel/pubsub.py @ 29:dd4e908b9d12

Last change on this file since 29:dd4e908b9d12 was 29:dd4e908b9d12, checked in by Ralph Meijer <ralphm@…>, 13 years ago

Implement type checking in data forms and incoming pubsub node config.

Author: ralphm.
Fixes #15.

File size: 31.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2008 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error, xmlstream
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form, shim
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubClient, IPubSubService
22
# XPath queries matching iq requests of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting a step to a publish-subscribe namespace.
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub element, by namespace and iq type
PUBSUB_ELEMENT = '/pubsub' + IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub' + IN_NS_PUBSUB_OWNER
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT

# XPath queries for the individual publish-subscribe commands
PUBSUB_PUBLISH = PUBSUB_SET + '/publish' + IN_NS_PUBSUB
PUBSUB_CREATE = PUBSUB_SET + '/create' + IN_NS_PUBSUB
PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' + IN_NS_PUBSUB
PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' + IN_NS_PUBSUB
PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' + IN_NS_PUBSUB
PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' + IN_NS_PUBSUB
PUBSUB_DEFAULT = PUBSUB_OWNER_GET + '/default' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' + IN_NS_PUBSUB_OWNER
PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' + IN_NS_PUBSUB
PUBSUB_AFFILIATIONS_GET = (PUBSUB_OWNER_GET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = (PUBSUB_OWNER_SET + '/affiliations' +
                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = (PUBSUB_OWNER_GET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = (PUBSUB_OWNER_SET + '/subscriptions' +
                            IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = PUBSUB_GET + '/items' + IN_NS_PUBSUB
PUBSUB_RETRACT = PUBSUB_SET + '/retract' + IN_NS_PUBSUB
PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' + IN_NS_PUBSUB_OWNER
PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' + IN_NS_PUBSUB_OWNER
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the condition C{'bad-request'}.

    Raised by the request handlers in this module when a required attribute
    is missing from, or malformed in, an incoming request.
    """
    def __init__(self):
        error.StanzaError.__init__(self, condition='bad-request')
78
79
80
class SubscriptionPending(Exception):
    """
    Raised when the requested subscription is pending acceptance.

    L{PubSubClient.subscribe} raises this from its result callback when the
    service reports the subscription state C{'pending'}.
    """
85
86
87
class SubscriptionUnconfigured(Exception):
    """
    Raised when the requested subscription needs to be configured before
    becoming active.

    L{PubSubClient.subscribe} raises this from its result callback when the
    service reports the subscription state C{'unconfigured'}.
    """
93
94
95
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific condition.

    The specific condition is attached as an application condition element
    in the publish-subscribe errors namespace.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: Generic stanza error condition.
        @param pubsubCondition: Name of the publish-subscribe specific
                                condition element.
        @param feature: Optional value for the C{feature} attribute of the
                        specific condition element.
        @param text: Optional human readable description of the error.
        """
        specific = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            specific['feature'] = feature
        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=specific)
107
108
109
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not implemented.

    Uses the stanza condition C{'feature-not-implemented'} together with the
    publish-subscribe condition C{'unsupported'}.
    """
    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported publish-subscribe feature.
        @param text: Optional human readable description of the error.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
116
117
118
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error for the feature
    C{'subscription-options-unavailable'}.
    """
    def __init__(self):
        Unsupported.__init__(self, feature='subscription-options-unavailable')
122
123
124
class Item(domish.Element):
    """
    A publish-subscribe item element.

    Instances are L{domish.Element}s named C{item} in the publish-subscribe
    namespace. A payload can be passed to the initializer using the
    C{payload} keyword argument, or added later with C{addChild} or
    C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier, stored in the C{id} attribute.
        @type id: L{unicode}
        @param payload: optional item payload. A string is added as raw
                        (serialized) XML, anything else is added as a child.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
152
153
154
class _PubSubRequest(xmlstream.IQ):
    """
    An iq request holding a publish-subscribe command.

    @ivar command: Command element of the request. This is the direct child
                   of the C{pubsub} element, named after the verb passed to
                   the initializer.
    """

    def __init__(self, xs, verb, namespace=NS_PUBSUB, method='set'):
        """
        @param xs: XML stream the request is to be sent over.
        @param verb: Request verb, used as the name of the command element.
        @type verb: C{str}
        @param namespace: Namespace of the C{pubsub} element.
        @type namespace: C{str}
        @param method: Type attribute of the IQ request. Either C{'set'} or
                       C{'get'}.
        @type method: C{str}
        """
        xmlstream.IQ.__init__(self, xs, method)
        pubsub = self.addElement((namespace, 'pubsub'))
        self.command = pubsub.addElement(verb)


    def send(self, to):
        """
        Send out the request.

        Unlike L{xmlstream.IQ.send}, C{to} must be a L{JID} instance; its
        full string form is passed on as the destination.

        @param to: Entity to send the request to.
        @type to: L{JID}
        """
        return xmlstream.IQ.send(self, to.full())
189
190
191
class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Each constructor argument is stored under the attribute of the same
        # name.
        self.sender = sender
        self.recipient = recipient
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers
211
212
213
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event signalling new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
225
226
227
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    Adds nothing to L{PubSubEvent}; the distinct type identifies the kind of
    event.
    """
232
233
234
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.

    Adds nothing to L{PubSubEvent}; the distinct type identifies the kind of
    event.
    """
239
240
241
242class PubSubClient(XMPPHandler):
243    """
244    Publish subscribe client protocol.
245    """
246
247    implements(IPubSubClient)
248
249    def connectionInitialized(self):
250        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
251                                   NS_PUBSUB_EVENT, self._onEvent)
252
253
254    def _onEvent(self, message):
255        try:
256            sender = jid.JID(message["from"])
257            recipient = jid.JID(message["to"])
258        except KeyError:
259            return
260
261        actionElement = None
262        for element in message.event.elements():
263            if element.uri == NS_PUBSUB_EVENT:
264                actionElement = element
265
266        if not actionElement:
267            return
268
269        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
270
271        if eventHandler:
272            headers = shim.extractHeaders(message)
273            eventHandler(sender, recipient, actionElement, headers)
274            message.handled = True
275
276    def _onEvent_items(self, sender, recipient, action, headers):
277        nodeIdentifier = action["node"]
278
279        items = [element for element in action.elements()
280                         if element.name in ('item', 'retract')]
281
282        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
283        self.itemsReceived(event)
284
285    def _onEvent_delete(self, sender, recipient, action, headers):
286        nodeIdentifier = action["node"]
287        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
288        self.deleteReceived(event)
289
290    def _onEvent_purge(self, sender, recipient, action, headers):
291        nodeIdentifier = action["node"]
292        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
293        self.purgeReceived(event)
294
    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} when an items notification is received.

        Does nothing by default.
        """
        pass
297
    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} when a delete notification is received.

        Does nothing by default.
        """
        pass
300
    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} when a purge notification is received.

        Does nothing by default.
        """
        pass
303
304    def createNode(self, service, nodeIdentifier=None):
305        """
306        Create a publish subscribe node.
307
308        @param service: The publish subscribe service to create the node at.
309        @type service: L{JID}
310        @param nodeIdentifier: Optional suggestion for the id of the node.
311        @type nodeIdentifier: C{unicode}
312        """
313
314
315        request = _PubSubRequest(self.xmlstream, 'create')
316        if nodeIdentifier:
317            request.command['node'] = nodeIdentifier
318
319        def cb(iq):
320            try:
321                new_node = iq.pubsub.create["node"]
322            except AttributeError:
323                # the suggested node identifier was accepted
324                new_node = nodeIdentifier
325            return new_node
326
327        return request.send(service).addCallback(cb)
328
329
330    def deleteNode(self, service, nodeIdentifier):
331        """
332        Delete a publish subscribe node.
333
334        @param service: The publish subscribe service to delete the node from.
335        @type service: L{JID}
336        @param nodeIdentifier: The identifier of the node.
337        @type nodeIdentifier: C{unicode}
338        """
339        request = _PubSubRequest(self.xmlstream, 'delete')
340        request.command['node'] = nodeIdentifier
341        return request.send(service)
342
343
344    def subscribe(self, service, nodeIdentifier, subscriber):
345        """
346        Subscribe to a publish subscribe node.
347
348        @param service: The publish subscribe service that keeps the node.
349        @type service: L{JID}
350        @param nodeIdentifier: The identifier of the node.
351        @type nodeIdentifier: C{unicode}
352        @param subscriber: The entity to subscribe to the node. This entity
353                           will get notifications of new published items.
354        @type subscriber: L{JID}
355        """
356        request = _PubSubRequest(self.xmlstream, 'subscribe')
357        request.command['node'] = nodeIdentifier
358        request.command['jid'] = subscriber.full()
359
360        def cb(iq):
361            subscription = iq.pubsub.subscription["subscription"]
362
363            if subscription == 'pending':
364                raise SubscriptionPending
365            elif subscription == 'unconfigured':
366                raise SubscriptionUnconfigured
367            else:
368                # we assume subscription == 'subscribed'
369                # any other value would be invalid, but that should have
370                # yielded a stanza error.
371                return None
372
373        return request.send(service).addCallback(cb)
374
375
376    def unsubscribe(self, service, nodeIdentifier, subscriber):
377        """
378        Unsubscribe from a publish subscribe node.
379
380        @param service: The publish subscribe service that keeps the node.
381        @type service: L{JID}
382        @param nodeIdentifier: The identifier of the node.
383        @type nodeIdentifier: C{unicode}
384        @param subscriber: The entity to unsubscribe from the node.
385        @type subscriber: L{JID}
386        """
387        request = _PubSubRequest(self.xmlstream, 'unsubscribe')
388        request.command['node'] = nodeIdentifier
389        request.command['jid'] = subscriber.full()
390        return request.send(service)
391
392
393    def publish(self, service, nodeIdentifier, items=None):
394        """
395        Publish to a publish subscribe node.
396
397        @param service: The publish subscribe service that keeps the node.
398        @type service: L{JID}
399        @param nodeIdentifier: The identifier of the node.
400        @type nodeIdentifier: C{unicode}
401        @param items: Optional list of L{Item}s to publish.
402        @type items: C{list}
403        """
404        request = _PubSubRequest(self.xmlstream, 'publish')
405        request.command['node'] = nodeIdentifier
406        if items:
407            for item in items:
408                request.command.addChild(item)
409
410        return request.send(service)
411
412
413    def items(self, service, nodeIdentifier, maxItems=None):
414        """
415        Retrieve previously published items from a publish subscribe node.
416
417        @param service: The publish subscribe service that keeps the node.
418        @type service: L{JID}
419        @param nodeIdentifier: The identifier of the node.
420        @type nodeIdentifier: C{unicode}
421        @param maxItems: Optional limit on the number of retrieved items.
422        @type maxItems: C{int}
423        """
424        request = _PubSubRequest(self.xmlstream, 'items', method='get')
425        request.command['node'] = nodeIdentifier
426        if maxItems:
427            request.command["max_items"] = str(int(maxItems))
428
429        def cb(iq):
430            items = []
431            for element in iq.pubsub.items.elements():
432                if element.uri == NS_PUBSUB and element.name == 'item':
433                    items.append(element)
434            return items
435
436        return request.send(service).addCallback(cb)
437
438
439
440class PubSubService(XMPPHandler, IQHandlerMixin):
441    """
442    Protocol implementation for a XMPP Publish Subscribe Service.
443
444    The word Service here is used as taken from the Publish Subscribe
445    specification. It is the party responsible for keeping nodes and their
446    subscriptions, and sending out notifications.
447
448    Methods from the L{IPubSubService} interface that are called as
449    a result of an XMPP request may raise exceptions. Alternatively the
450    deferred returned by these methods may have their errback called. These are
451    handled as follows:
452
453     - If the exception is an instance of L{error.StanzaError}, an error
454       response iq is returned.
455     - Any other exception is reported using L{log.msg}. An error response
456       with the condition C{internal-server-error} is returned.
457
458    The default implementation of said methods raises an L{Unsupported}
459    exception and are meant to be overridden.
460
461    @ivar discoIdentity: Service discovery identity as a dictionary with
462                         keys C{'category'}, C{'type'} and C{'name'}.
463    @ivar pubSubFeatures: List of supported publish-subscribe features for
464                          service discovery, as C{str}.
465    @type pubSubFeatures: C{list} or C{None}
466    """
467
468    implements(IPubSubService)
469
    # Maps the XPath query of each publish-subscribe iq request to the name
    # of the method on this class that handles it. The first group holds the
    # requests in the pubsub namespace, the second group those in the owner
    # namespace.
    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }
493
494
495    def __init__(self):
496        self.discoIdentity = {'category': 'pubsub',
497                              'type': 'generic',
498                              'name': 'Generic Publish-Subscribe Service'}
499
500        self.pubSubFeatures = []
501
502
503    def connectionMade(self):
504        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
505        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
506        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
507        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
508
509
510    def getDiscoInfo(self, requestor, target, nodeIdentifier):
511        info = []
512
513        if not nodeIdentifier:
514            info.append(disco.DiscoIdentity(**self.discoIdentity))
515
516            info.append(disco.DiscoFeature(disco.NS_ITEMS))
517            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
518                         for feature in self.pubSubFeatures])
519
520        def toInfo(nodeInfo):
521            if not nodeInfo:
522                return
523
524            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
525            info.append(disco.DiscoIdentity('pubsub', nodeType))
526            if metaData:
527                form = data_form.Form(formType="result",
528                                      formNamespace=NS_PUBSUB_META_DATA)
529                form.addField(
530                        data_form.Field(
531                            var='pubsub#node_type',
532                            value=nodeType,
533                            label='The type of node (collection or leaf)'
534                        )
535                )
536
537                for metaDatum in metaData:
538                    form.addField(data_form.Field.fromDict(metaDatum))
539
540                info.append(form.toElement())
541
542        d = self.getNodeInfo(requestor, target, nodeIdentifier or '')
543        d.addCallback(toInfo)
544        d.addBoth(lambda result: info)
545        return d
546
547
548    def getDiscoItems(self, requestor, target, nodeIdentifier):
549        if nodeIdentifier or self.hideNodes:
550            return defer.succeed([])
551
552        d = self.getNodes(requestor, target)
553        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
554                                     for node in nodes])
555        return d
556
557
558    def _findForm(self, element, formNamespace):
559        if not element:
560            return None
561
562        form = None
563        for child in element.elements():
564            try:
565                form = data_form.Form.fromElement(child)
566            except data_form.Error:
567                continue
568
569            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
570                continue
571
572        return form
573
574
575    def _onPublish(self, iq):
576        requestor = jid.internJID(iq["from"]).userhostJID()
577        service = jid.internJID(iq["to"])
578
579        try:
580            nodeIdentifier = iq.pubsub.publish["node"]
581        except KeyError:
582            raise BadRequest
583
584        items = []
585        for element in iq.pubsub.publish.elements():
586            if element.uri == NS_PUBSUB and element.name == 'item':
587                items.append(element)
588
589        return self.publish(requestor, service, nodeIdentifier, items)
590
591
592    def _onSubscribe(self, iq):
593        requestor = jid.internJID(iq["from"]).userhostJID()
594        service = jid.internJID(iq["to"])
595
596        try:
597            nodeIdentifier = iq.pubsub.subscribe["node"]
598            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
599        except KeyError:
600            raise BadRequest
601
602        def toResponse(subscription):
603            nodeIdentifier, state = subscription
604            response = domish.Element((NS_PUBSUB, "pubsub"))
605            subscription = response.addElement("subscription")
606            subscription["node"] = nodeIdentifier
607            subscription["jid"] = subscriber.full()
608            subscription["subscription"] = state
609            return response
610
611        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
612        d.addCallback(toResponse)
613        return d
614
615
616    def _onUnsubscribe(self, iq):
617        requestor = jid.internJID(iq["from"]).userhostJID()
618        service = jid.internJID(iq["to"])
619
620        try:
621            nodeIdentifier = iq.pubsub.unsubscribe["node"]
622            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
623        except KeyError:
624            raise BadRequest
625
626        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)
627
628
    def _onOptionsGet(self, iq):
        """
        Reject requests to retrieve subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')
631
632
    def _onOptionsSet(self, iq):
        """
        Reject requests to set subscription options as unsupported.
        """
        raise Unsupported('subscription-options-unavailable')
635
636
637    def _onSubscriptions(self, iq):
638        requestor = jid.internJID(iq["from"]).userhostJID()
639        service = jid.internJID(iq["to"])
640
641        def toResponse(result):
642            response = domish.Element((NS_PUBSUB, 'pubsub'))
643            subscriptions = response.addElement('subscriptions')
644            for node, subscriber, state in result:
645                item = subscriptions.addElement('subscription')
646                item['node'] = node
647                item['jid'] = subscriber.full()
648                item['subscription'] = state
649            return response
650
651        d = self.subscriptions(requestor, service)
652        d.addCallback(toResponse)
653        return d
654
655
656    def _onAffiliations(self, iq):
657        requestor = jid.internJID(iq["from"]).userhostJID()
658        service = jid.internJID(iq["to"])
659
660        def toResponse(result):
661            response = domish.Element((NS_PUBSUB, 'pubsub'))
662            affiliations = response.addElement('affiliations')
663
664            for nodeIdentifier, affiliation in result:
665                item = affiliations.addElement('affiliation')
666                item['node'] = nodeIdentifier
667                item['affiliation'] = affiliation
668
669            return response
670
671        d = self.affiliations(requestor, service)
672        d.addCallback(toResponse)
673        return d
674
675
676    def _onCreate(self, iq):
677        requestor = jid.internJID(iq["from"]).userhostJID()
678        service = jid.internJID(iq["to"])
679        nodeIdentifier = iq.pubsub.create.getAttribute("node")
680
681        def toResponse(result):
682            if not nodeIdentifier or nodeIdentifier != result:
683                response = domish.Element((NS_PUBSUB, 'pubsub'))
684                create = response.addElement('create')
685                create['node'] = result
686                return response
687            else:
688                return None
689
690        d = self.create(requestor, service, nodeIdentifier)
691        d.addCallback(toResponse)
692        return d
693
694
695    def _makeFields(self, options, values):
696        fields = []
697        for name, value in values.iteritems():
698            if name not in options:
699                continue
700
701            option = {'var': name}
702            option.update(options[name])
703            if isinstance(value, list):
704                option['values'] = value
705            else:
706                option['value'] = value
707            fields.append(data_form.Field.fromDict(option))
708        return fields
709
710    def _formFromConfiguration(self, values):
711        options = self.getConfigurationOptions()
712        fields = self._makeFields(options, values)
713        form = data_form.Form(formType="form",
714                              formNamespace=NS_PUBSUB_NODE_CONFIG,
715                              fields=fields)
716
717        return form
718
719    def _checkConfiguration(self, values):
720        options = self.getConfigurationOptions()
721        processedValues = {}
722
723        for key, value in values.iteritems():
724            if key not in options:
725                continue
726
727            option = {'var': key}
728            option.update(options[key])
729            field = data_form.Field.fromDict(option)
730            if isinstance(value, list):
731                field.values = value
732            else:
733                field.value = value
734            field.typeCheck()
735
736            if isinstance(value, list):
737                processedValues[key] = field.values
738            else:
739                processedValues[key] = field.value
740
741        return processedValues
742
743
744    def _onDefault(self, iq):
745        requestor = jid.internJID(iq["from"]).userhostJID()
746        service = jid.internJID(iq["to"])
747
748        def toResponse(options):
749            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
750            default = response.addElement("default")
751            default.addChild(self._formFromConfiguration(options).toElement())
752            return response
753
754        d = self.getDefaultConfiguration(requestor, service)
755        d.addCallback(toResponse)
756        return d
757
758
759    def _onConfigureGet(self, iq):
760        requestor = jid.internJID(iq["from"]).userhostJID()
761        service = jid.internJID(iq["to"])
762        nodeIdentifier = iq.pubsub.configure.getAttribute("node")
763
764        def toResponse(options):
765            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
766            configure = response.addElement("configure")
767            configure.addChild(self._formFromConfiguration(options).toElement())
768
769            if nodeIdentifier:
770                configure["node"] = nodeIdentifier
771
772            return response
773
774        d = self.getConfiguration(requestor, service, nodeIdentifier)
775        d.addCallback(toResponse)
776        return d
777
778
779    def _onConfigureSet(self, iq):
780        requestor = jid.internJID(iq["from"]).userhostJID()
781        service = jid.internJID(iq["to"])
782        nodeIdentifier = iq.pubsub.configure["node"]
783
784        # Search configuration form with correct FORM_TYPE and process it
785
786        form = self._findForm(iq.pubsub.configure, NS_PUBSUB_NODE_CONFIG)
787
788        if form:
789            if form.formType == 'submit':
790                options = self._checkConfiguration(form.getValues())
791
792                return self.setConfiguration(requestor, service,
793                                             nodeIdentifier, options)
794            elif form.formType == 'cancel':
795                return None
796
797        raise BadRequest()
798
799
800    def _onItems(self, iq):
801        requestor = jid.internJID(iq["from"]).userhostJID()
802        service = jid.internJID(iq["to"])
803
804        try:
805            nodeIdentifier = iq.pubsub.items["node"]
806        except KeyError:
807            raise BadRequest
808
809        maxItems = iq.pubsub.items.getAttribute('max_items')
810
811        if maxItems:
812            try:
813                maxItems = int(maxItems)
814            except ValueError:
815                raise BadRequest
816
817        itemIdentifiers = []
818        for child in iq.pubsub.items.elements():
819            if child.name == 'item' and child.uri == NS_PUBSUB:
820                try:
821                    itemIdentifiers.append(child["id"])
822                except KeyError:
823                    raise BadRequest
824
825        def toResponse(result):
826            response = domish.Element((NS_PUBSUB, 'pubsub'))
827            items = response.addElement('items')
828            items["node"] = nodeIdentifier
829
830            for item in result:
831                items.addChild(item)
832
833            return response
834
835        d = self.items(requestor, service, nodeIdentifier, maxItems,
836                       itemIdentifiers)
837        d.addCallback(toResponse)
838        return d
839
840
841    def _onRetract(self, iq):
842        requestor = jid.internJID(iq["from"]).userhostJID()
843        service = jid.internJID(iq["to"])
844
845        try:
846            nodeIdentifier = iq.pubsub.retract["node"]
847        except KeyError:
848            raise BadRequest
849
850        itemIdentifiers = []
851        for child in iq.pubsub.retract.elements():
852            if child.uri == NS_PUBSUB_OWNER and child.name == 'item':
853                try:
854                    itemIdentifiers.append(child["id"])
855                except KeyError:
856                    raise BadRequest
857
858        return self.retract(requestor, service, nodeIdentifier,
859                            itemIdentifiers)
860
861
862    def _onPurge(self, iq):
863        requestor = jid.internJID(iq["from"]).userhostJID()
864        service = jid.internJID(iq["to"])
865
866        try:
867            nodeIdentifier = iq.pubsub.purge["node"]
868        except KeyError:
869            raise BadRequest
870
871        return self.purge(requestor, service, nodeIdentifier)
872
873
874    def _onDelete(self, iq):
875        requestor = jid.internJID(iq["from"]).userhostJID()
876        service = jid.internJID(iq["to"])
877
878        try:
879            nodeIdentifier = iq.pubsub.delete["node"]
880        except KeyError:
881            raise BadRequest
882
883        return self.delete(requestor, service, nodeIdentifier)
884
885
    def _onAffiliationsGet(self, iq):
        """
        Reject owner requests to retrieve affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')
888
889
    def _onAffiliationsSet(self, iq):
        """
        Reject owner requests to modify affiliations as unsupported.
        """
        raise Unsupported('modify-affiliations')
892
893
    def _onSubscriptionsGet(self, iq):
        """
        Reject owner requests to retrieve subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')
896
897
    def _onSubscriptionsSet(self, iq):
        """
        Reject owner requests to modify subscriptions as unsupported.
        """
        raise Unsupported('manage-subscriptions')
900
901    # public methods
902
903    def notifyPublish(self, service, nodeIdentifier, notifications):
904        for recipient, items in notifications:
905            message = domish.Element((None, "message"))
906            message["from"] = service.full()
907            message["to"] = recipient.full()
908            event = message.addElement((NS_PUBSUB_EVENT, "event"))
909            element = event.addElement("items")
910            element["node"] = nodeIdentifier
911            element.children = items
912            self.send(message)
913
914
915    def notifyDelete(self, service, nodeIdentifier, recipients):
916        for recipient in recipients:
917            message = domish.Element((None, "message"))
918            message["from"] = service.full()
919            message["to"] = recipient.full()
920            event = message.addElement((NS_PUBSUB_EVENT, "event"))
921            element = event.addElement("delete")
922            element["node"] = nodeIdentifier
923            self.send(message)
924
925
    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Return information on a node, for use in L{getDiscoInfo}.

        L{getDiscoInfo} reads the keys C{'type'} and C{'meta-data'} from a
        non-empty result. This default implementation returns C{None}.
        """
        return None
928
929
    def getNodes(self, requestor, service):
        """
        Return the nodes of this service, for use in L{getDiscoItems}.

        This default implementation returns an empty list.
        """
        return []
932
933
    def publish(self, requestor, service, nodeIdentifier, items):
        """
        Called by L{_onPublish} with the items of a publish request.

        This default implementation raises L{Unsupported}.
        """
        raise Unsupported('publish')
936
937
    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called by L{_onSubscribe} for a subscription request.

        L{_onSubscribe} expects a deferred that fires with a tuple of node
        identifier and subscription state. This default implementation raises
        L{Unsupported}.
        """
        raise Unsupported('subscribe')
940
941
    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """
        Called by L{_onUnsubscribe} for an unsubscription request.

        This default implementation raises L{Unsupported}, naming the
        C{'subscribe'} feature.
        """
        raise Unsupported('subscribe')
944
945
    def subscriptions(self, requestor, service):
        """
        Called by L{_onSubscriptions} to retrieve subscriptions.

        L{_onSubscriptions} expects a deferred that fires with an iterable of
        tuples of node identifier, subscriber and state. This default
        implementation raises L{Unsupported}.
        """
        raise Unsupported('retrieve-subscriptions')
948
949
    def affiliations(self, requestor, service):
        """
        Called by L{_onAffiliations} to retrieve affiliations.

        L{_onAffiliations} expects a deferred that fires with an iterable of
        tuples of node identifier and affiliation. This default
        implementation raises L{Unsupported}.
        """
        raise Unsupported('retrieve-affiliations')
952
953
    def create(self, requestor, service, nodeIdentifier):
        """
        Called by L{_onCreate} for a node creation request.

        L{_onCreate} expects a deferred that fires with the identifier of the
        new node. C{nodeIdentifier} is C{None} when the request did not
        suggest one. This default implementation raises L{Unsupported}.
        """
        raise Unsupported('create-nodes')
956
957
    def getConfigurationOptions(self):
        """
        Return the known node configuration options.

        The result is used as a dictionary keyed by option name, of which the
        values are dictionaries that are passed, along with the option name
        as C{'var'}, to L{data_form.Field.fromDict}. This default
        implementation returns an empty dictionary.
        """
        return {}
960
961
    def getDefaultConfiguration(self, requestor, service):
        """
        Called by L{_onDefault} to retrieve the default node configuration.

        L{_onDefault} expects a deferred that fires with a dictionary of
        option values. This default implementation raises L{Unsupported}.
        """
        raise Unsupported('retrieve-default')
964
965
    def getConfiguration(self, requestor, service, nodeIdentifier):
        """
        Called by L{_onConfigureGet} to retrieve the configuration of a node.

        L{_onConfigureGet} expects a deferred that fires with a dictionary of
        option values. This default implementation raises L{Unsupported}.
        """
        raise Unsupported('config-node')
968
969
    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        """
        Called by L{_onConfigureSet} with the type checked option values of
        a submitted configuration form.

        This default implementation raises L{Unsupported}.
        """
        raise Unsupported('config-node')
972
973
    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        """
        Called by L{_onItems} to retrieve items from a node.

        L{_onItems} passes C{maxItems} as an C{int} when the request has a
        non-empty C{max_items} attribute, and expects a deferred that fires
        with an iterable of item elements. This default implementation raises
        L{Unsupported}.
        """
        raise Unsupported('retrieve-items')
977
978
    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """
        Called by L{_onRetract} with the identifiers of the items to retract.

        This default implementation raises L{Unsupported}.
        """
        raise Unsupported('retract-items')
981
982
    def purge(self, requestor, service, nodeIdentifier):
        """
        Called by L{_onPurge} for a request to purge a node.

        This default implementation raises L{Unsupported}.
        """
        raise Unsupported('purge-nodes')
985
986
    def delete(self, requestor, service, nodeIdentifier):
        """
        Called by L{_onDelete} for a request to delete a node.

        This default implementation raises L{Unsupported}.
        """
        raise Unsupported('delete-nodes')
Note: See TracBrowser for help on using the repository browser.