source: wokkel/pubsub.py @ 1:677f7d3cca75

Last change on this file since 1:677f7d3cca75 was 1:677f7d3cca75, checked in by Ralph Meijer <ralphm@…>, 14 years ago

Initial, heavily worked around code drop from the idavoll and mimir projects.

File size: 18.5 KB
Line 
1# -*- test-case-name: wokkel.test.test_pubsub -*-
2#
3# Copyright (c) 2003-2007 Ralph Meijer
4# See LICENSE for details.
5
6"""
7XMPP publish-subscribe protocol.
8
9This protocol is specified in
10U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}.
11"""
12
13from zope.interface import implements
14
15from twisted.internet import defer
16from twisted.words.protocols.jabber import jid, error
17from twisted.words.xish import domish
18
19from wokkel import disco, data_form
20from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
21from wokkel.iwokkel import IPubSubService
22
# XPath queries selecting iq stanzas by their request type.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates restricting an element to one of the pubsub namespaces.
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the <pubsub/> payload of iq get and set requests.
PUBSUB_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB_OWNER
PUBSUB_GET = '%s%s' % (IQ_GET, PUBSUB_ELEMENT)
PUBSUB_SET = '%s%s' % (IQ_SET, PUBSUB_ELEMENT)
PUBSUB_OWNER_GET = '%s%s' % (IQ_GET, PUBSUB_OWNER_ELEMENT)
PUBSUB_OWNER_SET = '%s%s' % (IQ_SET, PUBSUB_OWNER_ELEMENT)

# XPath queries for the individual commands in the plain pubsub namespace.
PUBSUB_PUBLISH = '%s/publish%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_CREATE = '%s/create%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIBE = '%s/subscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_UNSUBSCRIBE = '%s/unsubscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_GET = '%s/options%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_SET = '%s/options%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIPTIONS = '%s/subscriptions%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS = '%s/affiliations%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_ITEMS = '%s/items%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_RETRACT = '%s/retract%s' % (PUBSUB_SET, IN_NS_PUBSUB)

# XPath queries for the individual commands in the owner namespace.
PUBSUB_DEFAULT = '%s/default%s' % (PUBSUB_OWNER_GET, IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_GET = '%s/configure%s' % (PUBSUB_OWNER_GET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_SET = '%s/configure%s' % (PUBSUB_OWNER_SET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_GET = '%s/affiliations%s' % (PUBSUB_OWNER_GET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = '%s/affiliations%s' % (PUBSUB_OWNER_SET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = '%s/subscriptions%s' % (PUBSUB_OWNER_GET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = '%s/subscriptions%s' % (PUBSUB_OWNER_SET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_PURGE = '%s/purge%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
PUBSUB_DELETE = '%s/delete%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
71
class BadRequest(error.StanzaError):
    """
    Stanza error with the condition C{bad-request}.

    Raised by the request handlers in this module when a required
    attribute is missing or a value cannot be parsed.
    """

    def __init__(self):
        condition = 'bad-request'
        error.StanzaError.__init__(self, condition)
75
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific condition.

    The application condition is an element in the pubsub errors
    namespace, named after C{pubsubCondition}.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition.
        @param pubsubCondition: Name of the application-specific condition
                                element.
        @param feature: Optional value for the C{feature} attribute of the
                        application condition; only set when true.
        @param text: Optional text passed on to L{error.StanzaError}.
        """
        detail = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            detail['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=detail)
84
class Unsupported(PubSubError):
    """
    Error for a publish-subscribe feature that is not implemented.

    Uses the stanza condition C{feature-not-implemented} with the pubsub
    condition C{unsupported}.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: Name of the unsupported feature.
        @param text: Optional text passed on to L{PubSubError}.
        """
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature=feature, text=text)
91
class OptionsUnavailable(Unsupported):
    """
    Error raised when subscription options are not supported.
    """

    def __init__(self):
        # XEP-0060 names this feature 'subscription-options'; the former
        # value 'subscription-options-unavailable' is not a defined feature.
        Unsupported.__init__(self, 'subscription-options')
95
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for an XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

    * If the exception is an instance of L{error.StanzaError}, an error
      response iq is returned.
    * Any other exception is reported using L{log.msg}. An error response
      with the condition C{internal-server-error} is returned.

    The default implementations of said methods raise an L{Unsupported}
    exception and are meant to be overridden.

    @cvar iqHandlers: Mapping of XPath queries to the names of the methods
                      that handle matching iq requests.
    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}.
    @ivar hideNodes: When true, L{getDiscoItems} does not list any nodes.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    # Class-level default so that getDiscoItems can read the attribute even
    # when neither __init__ nor a subclass assigns it.
    hideNodes = False
149
150    def __init__(self):
151        self.discoIdentity = {'category': 'pubsub',
152                              'type': 'generic',
153                              'name': 'Generic Publish-Subscribe Service'}
154
155        self.pubSubFeatures = []
156
157    def connectionMade(self):
158        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
159        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
160        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
161        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)
162
163    def getDiscoInfo(self, target, requestor, nodeIdentifier):
164        info = []
165
166        if not nodeIdentifier:
167            info.append(disco.DiscoIdentity(**self.discoIdentity))
168
169            info.append(disco.DiscoFeature(disco.NS_ITEMS))
170            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
171                         for feature in self.pubSubFeatures])
172
173            return defer.succeed(info)
174        else:
175            def toInfo(nodeInfo):
176                if not nodeInfo:
177                    return []
178
179                (nodeType, metaData) = nodeInfo
180                info.append(disco.Identity('pubsub', nodeType))
181                if metaData:
182                    form = data_form.Form(type="result",
183                                          form_type=NS_PUBSUB_META_DATA) 
184                    form.add_field("text-single",
185                                   "pubsub#node_type",
186                                   "The type of node (collection or leaf)",
187                                   nodeType)
188
189                    for metaDatum in metaData:
190                        form.add_field(**metaDatum)
191
192                    info.append(form)
193                return info
194
195            d = self.getNodeInfo(requestor, nodeIdentifier)
196            d.addCallback(toInfo)
197            return d
198
199    def getDiscoItems(self, target, requestor, nodeIdentifier):
200        if nodeIdentifier or self.hideNodes:
201            return defer.succeed([])
202
203        d = self.getNodes(requestor)
204        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
205                                     for node in nodes])
206        return d
207
208    def _onPublish(self, iq):
209        requestor = jid.internJID(iq["from"]).userhostJID()
210
211        try:
212            nodeIdentifier = iq.pubsub.publish["node"]
213        except KeyError:
214            raise BadRequest
215
216        items = []
217        for child in iq.pubsub.publish.children:
218            if child.__class__ == domish.Element and child.name == 'item':
219                items.append(child)
220
221        return self.publish(requestor, nodeIdentifier, items)
222
223    def _onSubscribe(self, iq):
224        requestor = jid.internJID(iq["from"]).userhostJID()
225
226        try:
227            nodeIdentifier = iq.pubsub.subscribe["node"]
228            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
229        except KeyError:
230            raise BadRequest
231
232        def toResponse(subscription):
233            nodeIdentifier, state = subscription
234            response = domish.Element((NS_PUBSUB, "pubsub"))
235            subscription = response.addElement("subscription")
236            subscription["node"] = nodeIdentifier
237            subscription["jid"] = subscriber.full()
238            subscription["subscription"] = state
239            return response
240
241        d = self.subscribe(requestor, nodeIdentifier, subscriber)
242        d.addCallback(toResponse)
243        return d
244
245    def _onUnsubscribe(self, iq):
246        requestor = jid.internJID(iq["from"]).userhostJID()
247
248        try:
249            nodeIdentifier = iq.pubsub.unsubscribe["node"]
250            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
251        except KeyError:
252            raise BadRequest
253
254        return self.unsubscribe(requestor, nodeIdentifier, subscriber)
255
256    def _onOptionsGet(self, iq):
257        raise Unsupported('subscription-options-unavailable')
258
259    def _onOptionsSet(self, iq):
260        raise Unsupported('subscription-options-unavailable')
261
262    def _onSubscriptions(self, iq):
263        requestor = jid.internJID(iq["from"]).userhostJID()
264
265        def toResponse(result):
266            response = domish.Element((NS_PUBSUB, 'pubsub'))
267            subscriptions = response.addElement('subscriptions')
268            for node, subscriber, state in result:
269                item = subscriptions.addElement('subscription')
270                item['node'] = node
271                item['jid'] = subscriber.full()
272                item['subscription'] = state
273            return response
274
275        d = self.subscriptions(requestor)
276        d.addCallback(toResponse)
277        return d
278
279    def _onAffiliations(self, iq):
280        requestor = jid.internJID(iq["from"]).userhostJID()
281
282        def toResponse(result):
283            response = domish.Element((NS_PUBSUB, 'pubsub'))
284            affiliations = response.addElement('affiliations')
285
286            for nodeIdentifier, affiliation in result:
287                item = affiliations.addElement('affiliation')
288                item['node'] = nodeIdentifier
289                item['affiliation'] = affiliation
290
291            return response
292
293        d = self.affiliations(requestor)
294        d.addCallback(toResponse)
295        return d
296
297    def _onCreate(self, iq):
298        requestor = jid.internJID(iq["from"]).userhostJID()
299        nodeIdentifier = iq.pubsub.create.getAttribute("node")
300
301        def toResponse(result):
302            if not nodeIdentifier or nodeIdentifier != result:
303                response = domish.Element((NS_PUBSUB, 'pubsub'))
304                create = response.addElement('create')
305                create['node'] = result
306                return response
307            else:
308                return None
309
310        d = self.create(requestor, nodeIdentifier)
311        d.addCallback(toResponse)
312        return d
313
314    def _formFromConfiguration(self, options):
315        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)
316
317        for option in options:
318            form.add_field(**option)
319
320        return form
321
322    def _onDefault(self, iq):
323        requestor = jid.internJID(iq["from"]).userhostJID()
324
325        def toResponse(options):
326            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
327            default = response.addElement("default")
328            default.addChild(self._formFromConfiguration(options))
329            return response
330
331        d = self.getDefaultConfiguration(requestor)
332        d.addCallback(toResponse)
333        return d
334
335    def _onConfigureGet(self, iq):
336        requestor = jid.internJID(iq["from"]).userhostJID()
337        nodeIdentifier = iq.pubsub.configure.getAttribute("node")
338
339        def toResponse(options):
340            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
341            configure = response.addElement("configure")
342            configure.addChild(self._formFromConfiguration(options))
343
344            if nodeIdentifier:
345                configure["node"] = nodeIdentifier
346
347            return response
348
349        d = self.getConfiguration(requestor, nodeIdentifier)
350        d.addCallback(toResponse)
351        return d
352
353    def _onConfigureSet(self, iq):
354        requestor = jid.internJID(iq["from"]).userhostJID()
355        nodeIdentifier = iq.pubsub.configure["node"]
356
357        def getFormOptions(self, form):
358            options = {}
359
360            for element in form.elements():
361                if element.name == 'field' and \
362                   element.uri == data_form.NS_X_DATA:
363                    try:
364                        options[element["var"]] = str(element.value)
365                    except (KeyError, AttributeError):
366                        raise BadRequest
367
368            return options
369
370        # Search configuration form with correct FORM_TYPE and process it
371
372        for element in iq.pubsub.configure.elements():
373            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
374                continue
375
376            type = element.getAttribute("type")
377            if type == "cancel":
378                return None
379            elif type != "submit":
380                continue
381
382            options = getFormOptions(element)
383
384            if options["FORM_TYPE"] == NS_PUBSUB + "#node_config":
385                del options["FORM_TYPE"]
386                return self.setConfiguration(requestor, nodeIdentifier,
387                                             options)
388
389        raise BadRequest
390
391    def _onItems(self, iq):
392        requestor = jid.internJID(iq["from"]).userhostJID()
393
394        try:
395            nodeIdentifier = iq.pubsub.items["node"]
396        except KeyError:
397            raise BadRequest
398
399        maxItems = iq.pubsub.items.getAttribute('max_items')
400
401        if maxItems:
402            try:
403                maxItems = int(maxItems)
404            except ValueError:
405                raise BadRequest
406
407        itemIdentifiers = []
408        for child in iq.pubsub.items.elements():
409            if child.name == 'item' and child.uri == NS_PUBSUB:
410                try:
411                    itemIdentifiers.append(child["id"])
412                except KeyError:
413                    raise BadRequest
414
415        def toResponse(result):
416            response = domish.Element((NS_PUBSUB, 'pubsub'))
417            items = response.addElement('items')
418            items["node"] = nodeIdentifier
419
420            for item in result:
421                items.addRawXml(item)
422
423            return response
424
425        d = self.items(requestor, nodeIdentifier, maxItems, itemIdentifiers)
426        d.addCallback(toResponse)
427        return d
428
429    def _onRetract(self, iq):
430        requestor = jid.internJID(iq["from"]).userhostJID()
431
432        try:
433            nodeIdentifier = iq.pubsub.retract["node"]
434        except KeyError:
435            raise BadRequest
436
437        itemIdentifiers = []
438        for child in iq.pubsub.retract.elements():
439            if child.uri == NS_PUBSUB_OWNER and child.name == 'item':
440                try:
441                    itemIdentifiers.append(child["id"])
442                except KeyError:
443                    raise BadRequest
444
445        return self.retract(requestor, nodeIdentifier, itemIdentifiers)
446
447    def _onPurge(self, iq):
448        requestor = jid.internJID(iq["from"]).userhostJID()
449
450        try:
451            nodeIdentifier = iq.pubsub.purge["node"]
452        except KeyError:
453            raise BadRequest
454
455        return self.purge(requestor, nodeIdentifier)
456
457    def _onDelete(self, iq):
458        requestor = jid.internJID(iq["from"]).userhostJID()
459
460        try:
461            nodeIdentifier = iq.pubsub.delete["node"]
462        except KeyError:
463            raise BadRequest
464
465        return self.delete(requestor, nodeIdentifier)
466
467    def _onAffiliationsGet(self, iq):
468        raise Unsupported('modify-affiliations')
469
470    def _onAffiliationsSet(self, iq):
471        raise Unsupported('modify-affiliations')
472
473    def _onSubscriptionsGet(self, iq):
474        raise Unsupported('manage-subscriptions')
475
476    def _onSubscriptionsSet(self, iq):
477        raise Unsupported('manage-subscriptions')
478
479    # public methods
480
481    def notifyPublish(self, entity, nodeIdentifier, notifications):
482
483        print notifications
484        for recipient, items in notifications:
485            message = domish.Element((None, "message"))
486            message["from"] = entity.full()
487            message["to"] = recipient.full()
488            event = message.addElement((NS_PUBSUB_EVENT, "event"))
489            element = event.addElement("items")
490            element["node"] = nodeIdentifier
491            element.children = items
492            self.send(message)
493
494    def getNodeInfo(self, requestor, nodeIdentifier):
495        return None
496
497    def getNodes(self, requestor):
498        return []
499
500    def publish(self, requestor, nodeIdentifier, items):
501        raise Unsupported('publish')
502
503    def subscribe(self, requestor, nodeIdentifier, subscriber):
504        raise Unsupported('subscribe')
505
506    def unsubscribe(self, requestor, nodeIdentifier, subscriber):
507        raise Unsupported('subscribe')
508
509    def subscriptions(self, requestor):
510        raise Unsupported('retrieve-subscriptions')
511
512    def affiliations(self, requestor):
513        raise Unsupported('retrieve-affiliations')
514
515    def create(self, requestor, nodeIdentifier):
516        raise Unsupported('create-nodes')
517
518    def getDefaultConfiguration(self, requestor):
519        raise Unsupported('retrieve-default')
520
521    def getConfiguration(self, requestor, nodeIdentifier):
522        raise Unsupported('config-node')
523
524    def setConfiguration(self, requestor, nodeIdentifier, options):
525        raise Unsupported('config-node')
526
527    def items(self, requestor, nodeIdentifier, maxItems, itemIdentifiers):
528        raise Unsupported('retrieve-items')
529
530    def retract(self, requestor, nodeIdentifier, itemIdentifiers):
531        raise Unsupported('retract-items')
532
533    def purge(self, requestor, nodeIdentifier):
534        raise Unsupported('purge-nodes')
535
536    def delete(self, requestor, nodeIdentifier):
537        raise Unsupported('delete-nodes')
538
Note: See TracBrowser for help on using the repository browser.