source: wokkel/pubsub.py @ 63:b8139f79276f

Last change on this file since 63:b8139f79276f was 63:b8139f79276f, checked in by Ralph Meijer <ralphm@…>, 12 years ago

Make IQ timeouts work for InternalComponents.

This introduces wokkel.compat.IQ to work with older Twisted versions, too.

Author: ralphm.
Fixes #53.

File size: 41.2 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2009 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.python import log
17from twisted.words.protocols.jabber import jid, error
18from twisted.words.xish import domish
19
20from wokkel import disco, data_form, generic, shim
21from wokkel.compat import IQ
22from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
23from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
24
# XPath queries matching iq stanzas of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching get and set iq stanzas that hold a pubsub element in
# either the regular or the owner namespace
PUBSUB_REQUEST = ('/iq[@type="get" or @type="set"]/'
                  'pubsub[@xmlns="%s" or @xmlns="%s"]'
                  % (NS_PUBSUB, NS_PUBSUB_OWNER))
42
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    Raised when a subscribe response reports the subscription state
    C{'pending'}.
    """
47
48
49
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Raised when a subscribe response reports the subscription state
    C{'unconfigured'}.
    """
55
56
57
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The publish-subscribe condition is passed to L{error.StanzaError} as
    its application specific condition element.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the condition element in the
                                publish-subscribe errors namespace.
        @param feature: Optional value for the C{'feature'} attribute of
                        the publish-subscribe condition element.
        @param text: Optional descriptive text.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
69
70
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the condition C{'bad-request'}.
    """
    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of a condition element in the
                                publish-subscribe errors namespace to attach
                                as application specific condition.
        @param text: Optional descriptive text.
        """
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
84
85
86
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    This is a C{'feature-not-implemented'} stanza error with the
    publish-subscribe condition C{'unsupported'}.

    @ivar feature: The name of the unsupported feature.
    """
    def __init__(self, feature, text=None):
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)

    def __str__(self):
        # Extend the generic error description with the feature name.
        return PubSubError.__str__(self) + ', feature %r' % self.feature
99
100
class Subscription(object):
    """
    The subscription of an entity to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.
                          The root node is denoted by C{None}.
    @ivar subscriber: The subscribing entity.
    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary if none (or
                   an empty value) were passed.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        if not options:
            # Never share a default: each instance gets its own dictionary.
            options = {}

        self.nodeIdentifier = nodeIdentifier
        self.subscriber = subscriber
        self.state = state
        self.options = options
119
120
121
class Item(domish.Element):
    """
    An item of a publish-subscribe node.

    This is a L{domish.Element} named C{'item'} in the publish-subscribe
    namespace.

    The payload can be passed using the C{payload} keyword argument to
    C{__init__}, or added afterwards using C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
149
150
151
152class PubSubRequest(generic.Stanza):
153    """
154    A publish-subscribe request.
155
156    The set of instance variables used depends on the type of request. If
157    a variable is not applicable or not passed in the request, its value is
158    C{None}.
159
160    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
161    @type verb: C{str}.
162
163    @ivar affiliations: Affiliations to be modified.
164    @type affiliations: C{set}
165    @ivar items: The items to be published, as L{domish.Element}s.
166    @type items: C{list}
167    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
168                           retracted.
169    @type itemIdentifiers: C{set}
170    @ivar maxItems: Maximum number of items to retrieve.
171    @type maxItems: C{int}.
172    @ivar nodeIdentifier: Identifier of the node the request is about.
173    @type nodeIdentifier: C{unicode}
174    @ivar nodeType: The type of node that should be created, or for which the
175                    configuration is retrieved. C{'leaf'} or C{'collection'}.
176    @type nodeType: C{str}
177    @ivar options: Configurations options for nodes, subscriptions and publish
178                   requests.
179    @type options: L{data_form.Form}
180    @ivar subscriber: The subscribing entity.
181    @type subscriber: L{JID}
182    @ivar subscriptionIdentifier: Identifier for a specific subscription.
183    @type subscriptionIdentifier: C{unicode}
184    @ivar subscriptions: Subscriptions to be modified, as a set of
185                         L{Subscription}.
186    @type subscriptions: C{set}
187    """
188
189    verb = None
190
191    affiliations = None
192    items = None
193    itemIdentifiers = None
194    maxItems = None
195    nodeIdentifier = None
196    nodeType = None
197    options = None
198    subscriber = None
199    subscriptionIdentifier = None
200    subscriptions = None
201
202    # Map request iq type and subelement name to request verb
203    _requestVerbMap = {
204        ('set', NS_PUBSUB, 'publish'): 'publish',
205        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
206        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
207        ('get', NS_PUBSUB, 'options'): 'optionsGet',
208        ('set', NS_PUBSUB, 'options'): 'optionsSet',
209        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
210        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
211        ('set', NS_PUBSUB, 'create'): 'create',
212        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
213        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
214        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
215        ('get', NS_PUBSUB, 'items'): 'items',
216        ('set', NS_PUBSUB, 'retract'): 'retract',
217        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
218        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
219        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
220        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
221        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
222        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
223    }
224
225    # Map request verb to request iq type and subelement name
226    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
227
228    # Map request verb to parameter handler names
229    _parameters = {
230        'publish': ['node', 'items'],
231        'subscribe': ['nodeOrEmpty', 'jid'],
232        'unsubscribe': ['nodeOrEmpty', 'jid'],
233        'optionsGet': ['nodeOrEmpty', 'jid'],
234        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
235        'subscriptions': [],
236        'affiliations': [],
237        'create': ['nodeOrNone'],
238        'default': ['default'],
239        'configureGet': ['nodeOrEmpty'],
240        'configureSet': ['nodeOrEmpty', 'configure'],
241        'items': ['node', 'maxItems', 'itemIdentifiers'],
242        'retract': ['node', 'itemIdentifiers'],
243        'purge': ['node'],
244        'delete': ['node'],
245        'affiliationsGet': ['nodeOrEmpty'],
246        'affiliationsSet': [],
247        'subscriptionsGet': ['nodeOrEmpty'],
248        'subscriptionsSet': [],
249    }
250
    def __init__(self, verb=None):
        """
        @param verb: The type of publish-subscribe request, one of the
                     values of L{_requestVerbMap}. May be left out when
                     the request is parsed from an element.
        @type verb: C{str}
        """
        self.verb = verb
253
254
255    @staticmethod
256    def _findForm(element, formNamespace):
257        """
258        Find a Data Form.
259
260        Look for an element that represents a Data Form with the specified
261        form namespace as a child element of the given element.
262        """
263        if not element:
264            return None
265
266        form = None
267        for child in element.elements():
268            try:
269                form = data_form.Form.fromElement(child)
270            except data_form.Error:
271                continue
272
273            if form.formNamespace != NS_PUBSUB_NODE_CONFIG:
274                continue
275
276        return form
277
278
279    def _parse_node(self, verbElement):
280        """
281        Parse the required node identifier out of the verbElement.
282        """
283        try:
284            self.nodeIdentifier = verbElement["node"]
285        except KeyError:
286            raise BadRequest('nodeid-required')
287
288
289    def _render_node(self, verbElement):
290        """
291        Render the required node identifier on the verbElement.
292        """
293        if not self.nodeIdentifier:
294            raise Exception("Node identifier is required")
295
296        verbElement['node'] = self.nodeIdentifier
297
298
299    def _parse_nodeOrEmpty(self, verbElement):
300        """
301        Parse the node identifier out of the verbElement. May be empty.
302        """
303        self.nodeIdentifier = verbElement.getAttribute("node", '')
304
305
306    def _render_nodeOrEmpty(self, verbElement):
307        """
308        Render the node identifier on the verbElement. May be empty.
309        """
310        if self.nodeIdentifier:
311            verbElement['node'] = self.nodeIdentifier
312
313
314    def _parse_nodeOrNone(self, verbElement):
315        """
316        Parse the optional node identifier out of the verbElement.
317        """
318        self.nodeIdentifier = verbElement.getAttribute("node")
319
320
321    def _render_nodeOrNone(self, verbElement):
322        """
323        Render the optional node identifier on the verbElement.
324        """
325        if self.nodeIdentifier:
326            verbElement['node'] = self.nodeIdentifier
327
328
329    def _parse_items(self, verbElement):
330        """
331        Parse items out of the verbElement for publish requests.
332        """
333        self.items = []
334        for element in verbElement.elements():
335            if element.uri == NS_PUBSUB and element.name == 'item':
336                self.items.append(element)
337
338
339    def _render_items(self, verbElement):
340        """
341        Render items into the verbElement for publish requests.
342        """
343        if self.items:
344            for item in self.items:
345                verbElement.addChild(item)
346
347
348    def _parse_jid(self, verbElement):
349        """
350        Parse subscriber out of the verbElement for un-/subscribe requests.
351        """
352        try:
353            self.subscriber = jid.internJID(verbElement["jid"])
354        except KeyError:
355            raise BadRequest('jid-required')
356
357
358    def _render_jid(self, verbElement):
359        """
360        Render subscriber into the verbElement for un-/subscribe requests.
361        """
362        verbElement['jid'] = self.subscriber.full()
363
364
365    def _parse_default(self, verbElement):
366        """
367        Parse node type out of a request for the default node configuration.
368        """
369        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
370        if form and form.formType == 'submit':
371            values = form.getValues()
372            self.nodeType = values.get('pubsub#node_type', 'leaf')
373        else:
374            self.nodeType = 'leaf'
375
376
377    def _parse_configure(self, verbElement):
378        """
379        Parse options out of a request for setting the node configuration.
380        """
381        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
382        if form:
383            if form.formType == 'submit':
384                self.options = form.getValues()
385            elif form.formType == 'cancel':
386                self.options = {}
387            else:
388                raise BadRequest(text="Unexpected form type %r" % form.formType)
389        else:
390            raise BadRequest(text="Missing configuration form")
391
392
393
394    def _parse_itemIdentifiers(self, verbElement):
395        """
396        Parse item identifiers out of items and retract requests.
397        """
398        self.itemIdentifiers = []
399        for element in verbElement.elements():
400            if element.uri == NS_PUBSUB and element.name == 'item':
401                try:
402                    self.itemIdentifiers.append(element["id"])
403                except KeyError:
404                    raise BadRequest()
405
406
407    def _render_itemIdentifiers(self, verbElement):
408        """
409        Render item identifiers into items and retract requests.
410        """
411        if self.itemIdentifiers:
412            for itemIdentifier in self.itemIdentifiers:
413                item = verbElement.addElement('item')
414                item['id'] = itemIdentifier
415
416
417    def _parse_maxItems(self, verbElement):
418        """
419        Parse maximum items out of an items request.
420        """
421        value = verbElement.getAttribute('max_items')
422
423        if value:
424            try:
425                self.maxItems = int(value)
426            except ValueError:
427                raise BadRequest(text="Field max_items requires a positive " +
428                                      "integer value")
429
430
431    def _render_maxItems(self, verbElement):
432        """
433        Parse maximum items into an items request.
434        """
435        if self.maxItems:
436            verbElement['max_items'] = unicode(self.maxItems)
437
438
439    def _parse_options(self, verbElement):
440        form = PubSubRequest._findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
441        if form:
442            if form.formType == 'submit':
443                self.options = form.getValues()
444            elif form.formType == 'cancel':
445                self.options = {}
446            else:
447                raise BadRequest(text="Unexpected form type %r" % form.formType)
448        else:
449            raise BadRequest(text="Missing options form")
450
451    def parseElement(self, element):
452        """
453        Parse the publish-subscribe verb and parameters out of a request.
454        """
455        generic.Stanza.parseElement(self, element)
456
457        for child in element.pubsub.elements():
458            key = (self.stanzaType, child.uri, child.name)
459            try:
460                verb = self._requestVerbMap[key]
461            except KeyError:
462                continue
463            else:
464                self.verb = verb
465                break
466
467        if not self.verb:
468            raise NotImplementedError()
469
470        for parameter in self._parameters[verb]:
471            getattr(self, '_parse_%s' % parameter)(child)
472
473
474    def send(self, xs):
475        """
476        Send this request to its recipient.
477
478        This renders all of the relevant parameters for this specific
479        requests into an L{IQ}, and invoke its C{send} method.
480        This returns a deferred that fires upon reception of a response. See
481        L{IQ} for details.
482
483        @param xs: The XML stream to send the request on.
484        @type xs: L{xmlstream.XmlStream}
485        @rtype: L{defer.Deferred}.
486        """
487
488        try:
489            (self.stanzaType,
490             childURI,
491             childName) = self._verbRequestMap[self.verb]
492        except KeyError:
493            raise NotImplementedError()
494
495        iq = IQ(xs, self.stanzaType)
496        iq.addElement((childURI, 'pubsub'))
497        verbElement = iq.pubsub.addElement(childName)
498
499        if self.sender:
500            iq['from'] = self.sender.full()
501        if self.recipient:
502            iq['to'] = self.recipient.full()
503
504        for parameter in self._parameters[self.verb]:
505            getattr(self, '_render_%s' % parameter)(verbElement)
506
507        return iq.send()
508
509
510
class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification
        self.sender, self.recipient = sender, recipient
        # What the notification is about
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
530
531
532
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event about new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
544
545
546
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event about the deletion of a node.

    @ivar redirectURI: Optional URI taken from the redirect element of the
                       notification. C{None} unless set.
    """

    redirectURI = None
553
554
555
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event about the purging of a node.

    This adds nothing to L{PubSubEvent}.
    """
560
561
562
563class PubSubClient(XMPPHandler):
564    """
565    Publish subscribe client protocol.
566    """
567
568    implements(IPubSubClient)
569
570    def connectionInitialized(self):
571        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
572                                   NS_PUBSUB_EVENT, self._onEvent)
573
574
575    def _onEvent(self, message):
576        try:
577            sender = jid.JID(message["from"])
578            recipient = jid.JID(message["to"])
579        except KeyError:
580            return
581
582        actionElement = None
583        for element in message.event.elements():
584            if element.uri == NS_PUBSUB_EVENT:
585                actionElement = element
586
587        if not actionElement:
588            return
589
590        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
591
592        if eventHandler:
593            headers = shim.extractHeaders(message)
594            eventHandler(sender, recipient, actionElement, headers)
595            message.handled = True
596
597
598    def _onEvent_items(self, sender, recipient, action, headers):
599        nodeIdentifier = action["node"]
600
601        items = [element for element in action.elements()
602                         if element.name in ('item', 'retract')]
603
604        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
605        self.itemsReceived(event)
606
607
608    def _onEvent_delete(self, sender, recipient, action, headers):
609        nodeIdentifier = action["node"]
610        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
611        if action.redirect:
612            event.redirectURI = action.redirect.getAttribute('uri')
613        self.deleteReceived(event)
614
615
616    def _onEvent_purge(self, sender, recipient, action, headers):
617        nodeIdentifier = action["node"]
618        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
619        self.purgeReceived(event)
620
621
622    def itemsReceived(self, event):
623        pass
624
625
626    def deleteReceived(self, event):
627        pass
628
629
630    def purgeReceived(self, event):
631        pass
632
633
634    def createNode(self, service, nodeIdentifier=None, sender=None):
635        """
636        Create a publish subscribe node.
637
638        @param service: The publish subscribe service to create the node at.
639        @type service: L{JID}
640        @param nodeIdentifier: Optional suggestion for the id of the node.
641        @type nodeIdentifier: C{unicode}
642        """
643        request = PubSubRequest('create')
644        request.recipient = service
645        request.nodeIdentifier = nodeIdentifier
646        request.sender = sender
647
648        def cb(iq):
649            try:
650                new_node = iq.pubsub.create["node"]
651            except AttributeError:
652                # the suggested node identifier was accepted
653                new_node = nodeIdentifier
654            return new_node
655
656        d = request.send(self.xmlstream)
657        d.addCallback(cb)
658        return d
659
660
661    def deleteNode(self, service, nodeIdentifier, sender=None):
662        """
663        Delete a publish subscribe node.
664
665        @param service: The publish subscribe service to delete the node from.
666        @type service: L{JID}
667        @param nodeIdentifier: The identifier of the node.
668        @type nodeIdentifier: C{unicode}
669        """
670        request = PubSubRequest('delete')
671        request.recipient = service
672        request.nodeIdentifier = nodeIdentifier
673        request.sender = sender
674        return request.send(self.xmlstream)
675
676
677    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
678        """
679        Subscribe to a publish subscribe node.
680
681        @param service: The publish subscribe service that keeps the node.
682        @type service: L{JID}
683        @param nodeIdentifier: The identifier of the node.
684        @type nodeIdentifier: C{unicode}
685        @param subscriber: The entity to subscribe to the node. This entity
686                           will get notifications of new published items.
687        @type subscriber: L{JID}
688        """
689        request = PubSubRequest('subscribe')
690        request.recipient = service
691        request.nodeIdentifier = nodeIdentifier
692        request.subscriber = subscriber
693        request.sender = sender
694
695        def cb(iq):
696            subscription = iq.pubsub.subscription["subscription"]
697
698            if subscription == 'pending':
699                raise SubscriptionPending
700            elif subscription == 'unconfigured':
701                raise SubscriptionUnconfigured
702            else:
703                # we assume subscription == 'subscribed'
704                # any other value would be invalid, but that should have
705                # yielded a stanza error.
706                return None
707
708        d = request.send(self.xmlstream)
709        d.addCallback(cb)
710        return d
711
712
713    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
714        """
715        Unsubscribe from a publish subscribe node.
716
717        @param service: The publish subscribe service that keeps the node.
718        @type service: L{JID}
719        @param nodeIdentifier: The identifier of the node.
720        @type nodeIdentifier: C{unicode}
721        @param subscriber: The entity to unsubscribe from the node.
722        @type subscriber: L{JID}
723        """
724        request = PubSubRequest('unsubscribe')
725        request.recipient = service
726        request.nodeIdentifier = nodeIdentifier
727        request.subscriber = subscriber
728        request.sender = sender
729        return request.send(self.xmlstream)
730
731
732    def publish(self, service, nodeIdentifier, items=None, sender=None):
733        """
734        Publish to a publish subscribe node.
735
736        @param service: The publish subscribe service that keeps the node.
737        @type service: L{JID}
738        @param nodeIdentifier: The identifier of the node.
739        @type nodeIdentifier: C{unicode}
740        @param items: Optional list of L{Item}s to publish.
741        @type items: C{list}
742        """
743        request = PubSubRequest('publish')
744        request.recipient = service
745        request.nodeIdentifier = nodeIdentifier
746        request.items = items
747        request.sender = sender
748        return request.send(self.xmlstream)
749
750
751    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
752        """
753        Retrieve previously published items from a publish subscribe node.
754
755        @param service: The publish subscribe service that keeps the node.
756        @type service: L{JID}
757        @param nodeIdentifier: The identifier of the node.
758        @type nodeIdentifier: C{unicode}
759        @param maxItems: Optional limit on the number of retrieved items.
760        @type maxItems: C{int}
761        """
762        request = PubSubRequest('items')
763        request.recipient = service
764        request.nodeIdentifier = nodeIdentifier
765        if maxItems:
766            request.maxItems = str(int(maxItems))
767        request.sender = sender
768
769        def cb(iq):
770            items = []
771            for element in iq.pubsub.items.elements():
772                if element.uri == NS_PUBSUB and element.name == 'item':
773                    items.append(element)
774            return items
775
776        d = request.send(self.xmlstream)
777        d.addCallback(cb)
778        return d
779
780
781
782class PubSubService(XMPPHandler, IQHandlerMixin):
783    """
784    Protocol implementation for a XMPP Publish Subscribe Service.
785
786    The word Service here is used as taken from the Publish Subscribe
787    specification. It is the party responsible for keeping nodes and their
788    subscriptions, and sending out notifications.
789
790    Methods from the L{IPubSubService} interface that are called as
791    a result of an XMPP request may raise exceptions. Alternatively the
792    deferred returned by these methods may have their errback called. These are
793    handled as follows:
794
795     - If the exception is an instance of L{error.StanzaError}, an error
796       response iq is returned.
797     - Any other exception is reported using L{log.msg}. An error response
798       with the condition C{internal-server-error} is returned.
799
800    The default implementation of said methods raises an L{Unsupported}
801    exception and are meant to be overridden.
802
803    @ivar discoIdentity: Service discovery identity as a dictionary with
804                         keys C{'category'}, C{'type'} and C{'name'}.
805    @ivar pubSubFeatures: List of supported publish-subscribe features for
806                          service discovery, as C{str}.
807    @type pubSubFeatures: C{list} or C{None}
808    """
809
810    implements(IPubSubService)
811
812    iqHandlers = {
813            '/*': '_onPubSubRequest',
814            }
815
816    _legacyHandlers = {
817        'publish': ('publish', ['sender', 'recipient',
818                                'nodeIdentifier', 'items']),
819        'subscribe': ('subscribe', ['sender', 'recipient',
820                                    'nodeIdentifier', 'subscriber']),
821        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
822                                        'nodeIdentifier', 'subscriber']),
823        'subscriptions': ('subscriptions', ['sender', 'recipient']),
824        'affiliations': ('affiliations', ['sender', 'recipient']),
825        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
826        'getConfigurationOptions': ('getConfigurationOptions', []),
827        'default': ('getDefaultConfiguration',
828                    ['sender', 'recipient', 'nodeType']),
829        'configureGet': ('getConfiguration', ['sender', 'recipient',
830                                              'nodeIdentifier']),
831        'configureSet': ('setConfiguration', ['sender', 'recipient',
832                                              'nodeIdentifier', 'options']),
833        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
834                            'maxItems', 'itemIdentifiers']),
835        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
836                                'itemIdentifiers']),
837        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
838        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
839    }
840
841    hideNodes = False
842
843    def __init__(self, resource=None):
844        self.resource = resource
845        self.discoIdentity = {'category': 'pubsub',
846                              'type': 'generic',
847                              'name': 'Generic Publish-Subscribe Service'}
848
849        self.pubSubFeatures = []
850
851
852    def connectionMade(self):
853        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
854
855
856    def getDiscoInfo(self, requestor, target, nodeIdentifier):
857        def toInfo(nodeInfo, info):
858            if not nodeInfo:
859                return info
860
861            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
862            info.append(disco.DiscoIdentity('pubsub', nodeType))
863            if metaData:
864                form = data_form.Form(formType="result",
865                                      formNamespace=NS_PUBSUB_META_DATA)
866                form.addField(
867                        data_form.Field(
868                            var='pubsub#node_type',
869                            value=nodeType,
870                            label='The type of node (collection or leaf)'
871                        )
872                )
873
874                for metaDatum in metaData:
875                    form.addField(data_form.Field.fromDict(metaDatum))
876
877                info.append(form)
878
879            return info
880
881        info = []
882
883        request = PubSubRequest('discoInfo')
884
885        if self.resource is not None:
886            resource = self.resource.locateResource(request)
887            identity = resource.discoIdentity
888            features = resource.features
889            getInfo = resource.getInfo
890        else:
891            category, idType, name = self.discoIdentity
892            identity = disco.DiscoIdentity(category, idType, name)
893            features = self.pubSubFeatures
894            getInfo = self.getNodeInfo
895
896        if not nodeIdentifier:
897            info.append(identity)
898            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
899            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
900                         for feature in features])
901
902        d = getInfo(requestor, target, nodeIdentifier or '')
903        d.addCallback(toInfo, info)
904        d.addErrback(log.err)
905        return d
906
907
908    def getDiscoItems(self, requestor, target, nodeIdentifier):
909        if self.hideNodes:
910            d = defer.succeed([])
911        elif self.resource is not None:
912            request = PubSubRequest('discoInfo')
913            resource = self.resource.locateResource(request)
914            d = resource.getNodes(requestor, target, nodeIdentifier)
915        elif nodeIdentifier:
916            d = self.getNodes(requestor, target)
917        else:
918            d = defer.succeed([])
919           
920
921
922        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
923                                     for node in nodes])
924        return d
925
926
927    def _onPubSubRequest(self, iq):
928        request = PubSubRequest.fromElement(iq)
929
930        if self.resource is not None:
931            resource = self.resource.locateResource(request)
932        else:
933            resource = self
934
935        # Preprocess the request, knowing the handling resource
936        try:
937            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
938        except AttributeError:
939            pass
940        else:
941            request = preProcessor(resource, request)
942            if request is None:
943                return defer.succeed(None)
944
945        # Process the request itself,
946        if resource is not self:
947            try:
948                handler = getattr(resource, request.verb)
949            except AttributeError:
950                # fix lookup feature
951                text = "Request verb: %s" % request.verb
952                return defer.fail(Unsupported('', text))
953
954            d = handler(request)
955        else:
956            handlerName, argNames = self._legacyHandlers[request.verb]
957            handler = getattr(self, handlerName)
958            args = [getattr(request, arg) for arg in argNames]
959            d = handler(*args)
960
961        # If needed, translate the result into a response
962        try:
963            cb = getattr(self, '_toResponse_%s' % request.verb)
964        except AttributeError:
965            pass
966        else:
967            d.addCallback(cb, resource, request)
968
969        return d
970
971
972    def _toResponse_subscribe(self, result, resource, request):
973        response = domish.Element((NS_PUBSUB, "pubsub"))
974        subscription = response.addElement("subscription")
975        if result.nodeIdentifier:
976            subscription["node"] = result.nodeIdentifier
977        subscription["jid"] = result.subscriber.full()
978        subscription["subscription"] = result.state
979        return response
980
981
982    def _toResponse_subscriptions(self, result, resource, request):
983        response = domish.Element((NS_PUBSUB, 'pubsub'))
984        subscriptions = response.addElement('subscriptions')
985        for subscription in result:
986            item = subscriptions.addElement('subscription')
987            item['node'] = subscription.nodeIdentifier
988            item['jid'] = subscription.subscriber.full()
989            item['subscription'] = subscription.state
990        return response
991
992
993    def _toResponse_affiliations(self, result, resource, request):
994        response = domish.Element((NS_PUBSUB, 'pubsub'))
995        affiliations = response.addElement('affiliations')
996
997        for nodeIdentifier, affiliation in result:
998            item = affiliations.addElement('affiliation')
999            item['node'] = nodeIdentifier
1000            item['affiliation'] = affiliation
1001
1002        return response
1003
1004
1005    def _toResponse_create(self, result, resource, request):
1006        if not request.nodeIdentifier or request.nodeIdentifier != result:
1007            response = domish.Element((NS_PUBSUB, 'pubsub'))
1008            create = response.addElement('create')
1009            create['node'] = result
1010            return response
1011        else:
1012            return None
1013
1014
1015    def _makeFields(self, options, values):
1016        fields = []
1017        for name, value in values.iteritems():
1018            if name not in options:
1019                continue
1020
1021            option = {'var': name}
1022            option.update(options[name])
1023            if isinstance(value, list):
1024                option['values'] = value
1025            else:
1026                option['value'] = value
1027            fields.append(data_form.Field.fromDict(option))
1028        return fields
1029
1030
1031    def _formFromConfiguration(self, resource, values):
1032        options = resource.getConfigurationOptions()
1033        fields = self._makeFields(options, values)
1034        form = data_form.Form(formType="form",
1035                              formNamespace=NS_PUBSUB_NODE_CONFIG,
1036                              fields=fields)
1037
1038        return form
1039
1040
1041    def _checkConfiguration(self, resource, values):
1042        options = resource.getConfigurationOptions()
1043        processedValues = {}
1044
1045        for key, value in values.iteritems():
1046            if key not in options:
1047                continue
1048
1049            option = {'var': key}
1050            option.update(options[key])
1051            field = data_form.Field.fromDict(option)
1052            if isinstance(value, list):
1053                field.values = value
1054            else:
1055                field.value = value
1056            field.typeCheck()
1057
1058            if isinstance(value, list):
1059                processedValues[key] = field.values
1060            else:
1061                processedValues[key] = field.value
1062
1063        return processedValues
1064
1065
1066    def _preProcess_default(self, resource, request):
1067        if request.nodeType not in ('leaf', 'collection'):
1068            raise error.StanzaError('not-acceptable')
1069        else:
1070            return request
1071
1072
1073    def _toResponse_default(self, options, resource, request):
1074        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1075        default = response.addElement("default")
1076        form = self._formFromConfiguration(resource, options)
1077        default.addChild(form.toElement())
1078        return response
1079
1080
1081    def _toResponse_configureGet(self, options, resource, request):
1082        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
1083        configure = response.addElement("configure")
1084        form = self._formFromConfiguration(resource, options)
1085        configure.addChild(form.toElement())
1086
1087        if request.nodeIdentifier:
1088            configure["node"] = request.nodeIdentifier
1089
1090        return response
1091
1092
1093    def _preProcess_configureSet(self, resource, request):
1094        if request.options:
1095            request.options = self._checkConfiguration(resource,
1096                                                       request.options)
1097            return request
1098        else:
1099            return None
1100
1101
1102    def _toResponse_items(self, result, resource, request):
1103        response = domish.Element((NS_PUBSUB, 'pubsub'))
1104        items = response.addElement('items')
1105        items["node"] = request.nodeIdentifier
1106
1107        for item in result:
1108            items.addChild(item)
1109
1110        return response
1111
1112
1113    def _createNotification(self, eventType, service, nodeIdentifier,
1114                                  subscriber, subscriptions=None):
1115        headers = []
1116
1117        if subscriptions:
1118            for subscription in subscriptions:
1119                if nodeIdentifier != subscription.nodeIdentifier:
1120                    headers.append(('Collection', subscription.nodeIdentifier))
1121
1122        message = domish.Element((None, "message"))
1123        message["from"] = service.full()
1124        message["to"] = subscriber.full()
1125        event = message.addElement((NS_PUBSUB_EVENT, "event"))
1126
1127        element = event.addElement(eventType)
1128        element["node"] = nodeIdentifier
1129
1130        if headers:
1131            message.addChild(shim.Headers(headers))
1132
1133        return message
1134
1135    # public methods
1136
1137    def notifyPublish(self, service, nodeIdentifier, notifications):
1138        for subscriber, subscriptions, items in notifications:
1139            message = self._createNotification('items', service,
1140                                               nodeIdentifier, subscriber,
1141                                               subscriptions)
1142            message.event.items.children = items
1143            self.send(message)
1144
1145
1146    def notifyDelete(self, service, nodeIdentifier, subscribers,
1147                           redirectURI=None):
1148        for subscriber in subscribers:
1149            message = self._createNotification('delete', service,
1150                                               nodeIdentifier,
1151                                               subscriber)
1152            if redirectURI:
1153                redirect = message.event.delete.addElement('redirect')
1154                redirect['uri'] = redirectURI
1155            self.send(message)
1156
1157
1158    def getNodeInfo(self, requestor, service, nodeIdentifier):
1159        return None
1160
1161
1162    def getNodes(self, requestor, service):
1163        return []
1164
1165
1166    def publish(self, requestor, service, nodeIdentifier, items):
1167        raise Unsupported('publish')
1168
1169
1170    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
1171        raise Unsupported('subscribe')
1172
1173
1174    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
1175        raise Unsupported('subscribe')
1176
1177
1178    def subscriptions(self, requestor, service):
1179        raise Unsupported('retrieve-subscriptions')
1180
1181
1182    def affiliations(self, requestor, service):
1183        raise Unsupported('retrieve-affiliations')
1184
1185
1186    def create(self, requestor, service, nodeIdentifier):
1187        raise Unsupported('create-nodes')
1188
1189
1190    def getConfigurationOptions(self):
1191        return {}
1192
1193
1194    def getDefaultConfiguration(self, requestor, service, nodeType):
1195        raise Unsupported('retrieve-default')
1196
1197
1198    def getConfiguration(self, requestor, service, nodeIdentifier):
1199        raise Unsupported('config-node')
1200
1201
1202    def setConfiguration(self, requestor, service, nodeIdentifier, options):
1203        raise Unsupported('config-node')
1204
1205
1206    def items(self, requestor, service, nodeIdentifier, maxItems,
1207                    itemIdentifiers):
1208        raise Unsupported('retrieve-items')
1209
1210
1211    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
1212        raise Unsupported('retract-items')
1213
1214
1215    def purge(self, requestor, service, nodeIdentifier):
1216        raise Unsupported('purge-nodes')
1217
1218
1219    def delete(self, requestor, service, nodeIdentifier):
1220        raise Unsupported('delete-nodes')
1221
1222
1223
class PubSubResource(object):
    """
    Base implementation of a publish-subscribe resource.

    Every request handler returns a deferred that fails with
    L{Unsupported}, and the discovery methods report nothing. Subclasses
    override the handlers for the features they provide.

    @cvar features: Names of the supported pubsub features.
    @cvar discoIdentity: Service discovery identity of this resource.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub', 'service',
                                        'Publish-Subscribe Service')


    def locateResource(self, request):
        """
        Locate the resource that handles the given request.

        @return: This resource itself.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Get node information; none is available here.

        @return: Deferred that fires with C{None}.
        """
        nodeInfo = None
        return defer.succeed(nodeInfo)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Get the node identifiers; there are none here.

        @return: Deferred that fires with an empty list.
        """
        nodes = []
        return defer.succeed(nodes)


    def getConfigurationOptions(self):
        """
        Get the known node configuration options; there are none here.

        @return: An empty dictionary.
        """
        options = {}
        return options


    def publish(self, request):
        """
        Fail with L{Unsupported} for the C{'publish'} feature.
        """
        reason = Unsupported('publish')
        return defer.fail(reason)


    def subscribe(self, request):
        """
        Fail with L{Unsupported} for the C{'subscribe'} feature.
        """
        reason = Unsupported('subscribe')
        return defer.fail(reason)


    def unsubscribe(self, request):
        """
        Fail with L{Unsupported} for the C{'subscribe'} feature.
        """
        reason = Unsupported('subscribe')
        return defer.fail(reason)


    def subscriptions(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-subscriptions'}
        feature.
        """
        reason = Unsupported('retrieve-subscriptions')
        return defer.fail(reason)


    def affiliations(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-affiliations'}
        feature.
        """
        reason = Unsupported('retrieve-affiliations')
        return defer.fail(reason)


    def create(self, request):
        """
        Fail with L{Unsupported} for the C{'create-nodes'} feature.
        """
        reason = Unsupported('create-nodes')
        return defer.fail(reason)


    def default(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-default'} feature.
        """
        reason = Unsupported('retrieve-default')
        return defer.fail(reason)


    def configureGet(self, request):
        """
        Fail with L{Unsupported} for the C{'config-node'} feature.
        """
        reason = Unsupported('config-node')
        return defer.fail(reason)


    def configureSet(self, request):
        """
        Fail with L{Unsupported} for the C{'config-node'} feature.
        """
        reason = Unsupported('config-node')
        return defer.fail(reason)


    def items(self, request):
        """
        Fail with L{Unsupported} for the C{'retrieve-items'} feature.
        """
        reason = Unsupported('retrieve-items')
        return defer.fail(reason)


    def retract(self, request):
        """
        Fail with L{Unsupported} for the C{'retract-items'} feature.
        """
        reason = Unsupported('retract-items')
        return defer.fail(reason)


    def purge(self, request):
        """
        Fail with L{Unsupported} for the C{'purge-nodes'} feature.
        """
        reason = Unsupported('purge-nodes')
        return defer.fail(reason)


    def delete(self, request):
        """
        Fail with L{Unsupported} for the C{'delete-nodes'} feature.
        """
        reason = Unsupported('delete-nodes')
        return defer.fail(reason)


    def affiliationsGet(self, request):
        """
        Fail with L{Unsupported} for the C{'modify-affiliations'} feature.
        """
        reason = Unsupported('modify-affiliations')
        return defer.fail(reason)


    def affiliationsSet(self, request):
        """
        Fail with L{Unsupported} for the C{'modify-affiliations'} feature.
        """
        reason = Unsupported('modify-affiliations')
        return defer.fail(reason)


    def subscriptionsGet(self, request):
        """
        Fail with L{Unsupported} for the C{'manage-subscriptions'} feature.
        """
        reason = Unsupported('manage-subscriptions')
        return defer.fail(reason)


    def subscriptionsSet(self, request):
        """
        Fail with L{Unsupported} for the C{'manage-subscriptions'} feature.
        """
        reason = Unsupported('manage-subscriptions')
        return defer.fail(reason)
Note: See TracBrowser for help on using the repository browser.