[1] | 1 | # -*- test-case-name: wokkel.test.test_pubsub -*- |
---|
| 2 | # |
---|
| 3 | # Copyright (c) 2003-2007 Ralph Meijer |
---|
| 4 | # See LICENSE for details. |
---|
| 5 | |
---|
| 6 | """ |
---|
| 7 | XMPP publish-subscribe protocol. |
---|
| 8 | |
---|
| 9 | This protocol is specified in |
---|
| 10 | U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}. |
---|
| 11 | """ |
---|
| 12 | |
---|
| 13 | from zope.interface import implements |
---|
| 14 | |
---|
| 15 | from twisted.internet import defer |
---|
[2] | 16 | from twisted.words.protocols.jabber import jid, error, xmlstream |
---|
[1] | 17 | from twisted.words.xish import domish |
---|
| 18 | |
---|
| 19 | from wokkel import disco, data_form |
---|
| 20 | from wokkel.subprotocols import IQHandlerMixin, XMPPHandler |
---|
[2] | 21 | from wokkel.iwokkel import IPubSubClient, IPubSubService |
---|
[1] | 22 | |
---|
# XPath queries matching iq stanzas of type get and set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB

# XPath predicates selecting elements in the (owner) pubsub namespace
IN_NS_PUBSUB = '[@xmlns="%s"]' % NS_PUBSUB
IN_NS_PUBSUB_OWNER = '[@xmlns="%s"]' % NS_PUBSUB_OWNER

# XPath queries for the pubsub element, bare and as iq payload
PUBSUB_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB
PUBSUB_OWNER_ELEMENT = '/pubsub%s' % IN_NS_PUBSUB_OWNER
PUBSUB_GET = '%s%s' % (IQ_GET, PUBSUB_ELEMENT)
PUBSUB_SET = '%s%s' % (IQ_SET, PUBSUB_ELEMENT)
PUBSUB_OWNER_GET = '%s%s' % (IQ_GET, PUBSUB_OWNER_ELEMENT)
PUBSUB_OWNER_SET = '%s%s' % (IQ_SET, PUBSUB_OWNER_ELEMENT)

# XPath queries for the individual publish-subscribe commands
PUBSUB_PUBLISH = '%s/publish%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_CREATE = '%s/create%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_SUBSCRIBE = '%s/subscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_UNSUBSCRIBE = '%s/unsubscribe%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_GET = '%s/options%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_OPTIONS_SET = '%s/options%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_DEFAULT = '%s/default%s' % (PUBSUB_OWNER_GET, IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_GET = '%s/configure%s' % (PUBSUB_OWNER_GET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_CONFIGURE_SET = '%s/configure%s' % (PUBSUB_OWNER_SET,
                                           IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS = '%s/subscriptions%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS = '%s/affiliations%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_AFFILIATIONS_GET = '%s/affiliations%s' % (PUBSUB_OWNER_GET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_AFFILIATIONS_SET = '%s/affiliations%s' % (PUBSUB_OWNER_SET,
                                                 IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_GET = '%s/subscriptions%s' % (PUBSUB_OWNER_GET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_SUBSCRIPTIONS_SET = '%s/subscriptions%s' % (PUBSUB_OWNER_SET,
                                                   IN_NS_PUBSUB_OWNER)
PUBSUB_ITEMS = '%s/items%s' % (PUBSUB_GET, IN_NS_PUBSUB)
PUBSUB_RETRACT = '%s/retract%s' % (PUBSUB_SET, IN_NS_PUBSUB)
PUBSUB_PURGE = '%s/purge%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
PUBSUB_DELETE = '%s/delete%s' % (PUBSUB_OWNER_SET, IN_NS_PUBSUB_OWNER)
---|
| 71 | |
---|
class BadRequest(error.StanzaError):
    """
    Stanza error carrying the C{bad-request} condition.

    Raised by request handlers when a required attribute is missing or
    malformed.
    """

    def __init__(self):
        # The condition is fixed; callers never pass arguments.
        error.StanzaError.__init__(self, condition='bad-request')
---|
| 78 | |
---|
[2] | 79 | |
---|
| 80 | |
---|
class SubscriptionPending(Exception):
    """
    The requested subscription has not been activated yet: it is pending
    acceptance.
    """
---|
| 85 | |
---|
| 86 | |
---|
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription cannot become active until it has been
    configured.
    """
---|
| 92 | |
---|
| 93 | |
---|
class PubSubError(error.StanzaError):
    """
    Stanza error with a publish-subscribe specific application condition.

    The application condition is an element in the pubsub errors namespace
    named after C{pubsubCondition}, optionally carrying a C{feature}
    attribute.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: stanza error condition.
        @param pubsubCondition: name of the pubsub specific condition element.
        @param feature: optional value for the C{feature} attribute of the
                        application condition.
        @param text: optional descriptive text of the error.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))

        # Only a non-empty feature is recorded on the condition element.
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self,
                                   condition,
                                   text=text,
                                   appCondition=pubsubElement)
---|
| 105 | |
---|
[2] | 106 | |
---|
class Unsupported(PubSubError):
    """
    Error for a publish-subscribe feature that is not supported.

    Combines the C{feature-not-implemented} stanza condition with the
    C{unsupported} pubsub condition for the given feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: name of the unsupported feature.
        @param text: optional descriptive text of the error.
        """
        PubSubError.__init__(self,
                             condition='feature-not-implemented',
                             pubsubCondition='unsupported',
                             feature=feature,
                             text=text)
---|
| 113 | |
---|
[2] | 114 | |
---|
class OptionsUnavailable(Unsupported):
    """
    L{Unsupported} error for the C{subscription-options-unavailable} feature.
    """

    def __init__(self):
        Unsupported.__init__(self,
                             feature='subscription-options-unavailable')
---|
| 118 | |
---|
[2] | 119 | |
---|
class Item(domish.Element):
    """
    A publish-subscribe item element.

    Instances are C{item} elements without a namespace of their own, and can
    be used wherever an object providing L{domish.IElement} is expected.

    The payload is either passed to C{__init__} through the C{payload}
    keyword argument, or added afterwards with C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, given as a domish element or
                        as already serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML and are included verbatim.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
            return

        self.addChild(payload)
---|
| 147 | |
---|
class PubSubRequest(xmlstream.IQ):
    """
    Common base for publish-subscribe requests sent by a user.

    Subclasses must define C{verb}.

    @cvar namespace: namespace of the C{pubsub} element of the request
    @cvar verb: name of the command element of the request
    @cvar method: type attribute of the IQ request. Either C{'set'} or C{'get'}
    @ivar command: the command element of the request: the child of the
                   C{pubsub} element that is named after C{verb}.
    """

    namespace = NS_PUBSUB
    method = 'set'

    def __init__(self, xs):
        """
        @param xs: XML stream the request is to be sent over.
        """
        xmlstream.IQ.__init__(self, xs, self.method)

        # Build <pubsub xmlns=namespace><verb/></pubsub> as the iq payload.
        pubsub = self.addElement((self.namespace, 'pubsub'))
        self.command = pubsub.addElement(self.verb)

    def send(self, to):
        """
        Send the request to C{to}, after converting it to unicode.

        @param to: entity to send the request to.
        @return: the result of L{xmlstream.IQ.send}.
        """
        return xmlstream.IQ.send(self, unicode(to))
---|
| 172 | |
---|
class CreateNode(PubSubRequest):
    """
    Request for creating a node.
    """

    verb = 'create'

    def __init__(self, xs, node=None):
        """
        @param xs: XML stream the request is to be sent over.
        @param node: optional suggestion for the node identifier. When empty,
                     no C{node} attribute is set on the command element.
        """
        PubSubRequest.__init__(self, xs)

        if not node:
            return

        self.command["node"] = node
---|
| 180 | |
---|
class DeleteNode(PubSubRequest):
    """
    Request for deleting a node.

    Node deletion is an owner use case: the C{delete} command lives in the
    pubsub owner namespace, which is also what the C{PUBSUB_DELETE} query of
    this module matches on.
    """

    verb = 'delete'
    namespace = NS_PUBSUB_OWNER

    def __init__(self, xs, node):
        """
        @param xs: XML stream the request is to be sent over.
        @param node: identifier of the node to delete.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node
---|
| 186 | |
---|
class Subscribe(PubSubRequest):
    """
    Request for subscribing an entity to a node.
    """

    verb = 'subscribe'

    def __init__(self, xs, node, subscriber):
        """
        @param xs: XML stream the request is to be sent over.
        @param node: identifier of the node to subscribe to.
        @param subscriber: entity to be subscribed; its C{full()} value is
                           used for the C{jid} attribute.
        """
        PubSubRequest.__init__(self, xs)

        command = self.command
        command["node"] = node
        command["jid"] = subscriber.full()
---|
| 194 | |
---|
class Publish(PubSubRequest):
    """
    Request for publishing items to a node.
    """

    verb = 'publish'

    def __init__(self, xs, node):
        """
        @param xs: XML stream the request is to be sent over.
        @param node: identifier of the node to publish to.
        """
        PubSubRequest.__init__(self, xs)
        self.command["node"] = node

    def addItem(self, id=None, payload=None):
        """
        Add an C{item} element to the publish command.

        @param id: optional item identifier.
        @param payload: optional payload, added as child of the item.
        @return: the newly added item element.
        """
        item = self.command.addElement("item")

        # Both arguments are optional: never add None as a child node.
        if payload is not None:
            item.addChild(payload)

        if id is not None:
            item["id"] = id

        return item
---|
| 210 | |
---|
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Sends node creation, deletion, subscription and publish requests to a
    publish-subscribe service and dispatches incoming item notifications to
    L{itemsReceived}.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing incoming pubsub event messages that carry items.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]/items' %
                                   NS_PUBSUB_EVENT, self._onItems)

    def _onItems(self, message):
        """
        Handle an item notification and pass it on to L{itemsReceived}.

        Notifications lacking a sender, recipient or node identifier are
        silently dropped.
        """
        try:
            service = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
            nodeIdentifier = message.event.items["node"]
        except KeyError:
            return

        items = [element for element in message.event.items.elements()
                 if element.name == 'item']

        self.itemsReceived(recipient, service, nodeIdentifier, items)

    def itemsReceived(self, recipient, service, nodeIdentifier, items):
        """
        Called when an item notification has been received.

        Does nothing by default; meant to be overridden.

        @param recipient: JID the notification was addressed to.
        @param service: JID of the entity that sent the notification.
        @param nodeIdentifier: identifier of the node the items belong to.
        @param items: list of the C{item} elements in the notification.
        """
        pass

    def createNode(self, service, nodeIdentifier=None):
        """
        Request creation of a node.

        @param service: entity to send the request to.
        @param nodeIdentifier: optional suggestion for the node identifier.
        @return: the result of sending the request, with a callback added
                 that yields the identifier of the new node.
        """
        request = CreateNode(self.xmlstream, nodeIdentifier)

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except AttributeError:
                # the suggested node identifier was accepted
                new_node = nodeIdentifier
            return new_node

        return request.send(service).addCallback(cb)

    def deleteNode(self, service, nodeIdentifier):
        """
        Request deletion of a node.

        @param service: entity to send the request to.
        @param nodeIdentifier: identifier of the node to delete.
        @return: the result of sending the request.
        """
        return DeleteNode(self.xmlstream, nodeIdentifier).send(service)

    def subscribe(self, service, nodeIdentifier, subscriber):
        """
        Request subscription of an entity to a node.

        @param service: entity to send the request to.
        @param nodeIdentifier: identifier of the node to subscribe to.
        @param subscriber: entity to be subscribed.
        @return: the result of sending the request, with a callback added
                 that yields C{None} for an active subscription and raises
                 L{SubscriptionPending} or L{SubscriptionUnconfigured} for
                 the C{pending} and C{unconfigured} states respectively.
        """
        request = Subscribe(self.xmlstream, nodeIdentifier, subscriber)

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        return request.send(service).addCallback(cb)

    def publish(self, service, nodeIdentifier, items=None):
        """
        Publish items to a node.

        @param service: entity to send the request to.
        @param nodeIdentifier: identifier of the node to publish to.
        @param items: optional iterable of item elements to publish. When
                      omitted, a publish request without items is sent.
        @return: the result of sending the request.
        """
        request = Publish(self.xmlstream, nodeIdentifier)

        # None replaces the former shared mutable default of [].
        for item in items or ():
            request.command.addChild(item)

        return request.send(service)
---|
| 278 | |
---|
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

     * If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     * Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementations of said methods raise an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}.
    @ivar hideNodes: When true, nodes are not listed in service discovery
                     item requests.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    # Maps XPath queries on incoming iq requests to handler method names.
    iqHandlers = {
            PUBSUB_PUBLISH: '_onPublish',
            PUBSUB_CREATE: '_onCreate',
            PUBSUB_SUBSCRIBE: '_onSubscribe',
            PUBSUB_OPTIONS_GET: '_onOptionsGet',
            PUBSUB_OPTIONS_SET: '_onOptionsSet',
            PUBSUB_AFFILIATIONS: '_onAffiliations',
            PUBSUB_ITEMS: '_onItems',
            PUBSUB_RETRACT: '_onRetract',
            PUBSUB_SUBSCRIPTIONS: '_onSubscriptions',
            PUBSUB_UNSUBSCRIBE: '_onUnsubscribe',

            PUBSUB_AFFILIATIONS_GET: '_onAffiliationsGet',
            PUBSUB_AFFILIATIONS_SET: '_onAffiliationsSet',
            PUBSUB_CONFIGURE_GET: '_onConfigureGet',
            PUBSUB_CONFIGURE_SET: '_onConfigureSet',
            PUBSUB_DEFAULT: '_onDefault',
            PUBSUB_PURGE: '_onPurge',
            PUBSUB_DELETE: '_onDelete',
            PUBSUB_SUBSCRIPTIONS_GET: '_onSubscriptionsGet',
            PUBSUB_SUBSCRIPTIONS_SET: '_onSubscriptionsSet',

            }

    # Read by getDiscoItems; defined here so that it always exists.
    hideNodes = False

    def __init__(self):
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []

    def connectionMade(self):
        """
        Start observing all pubsub and pubsub owner iq requests.
        """
        self.xmlstream.addObserver(PUBSUB_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_SET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_GET, self.handleRequest)
        self.xmlstream.addObserver(PUBSUB_OWNER_SET, self.handleRequest)

    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery information.

        Without a node identifier, the identity and features of the service
        itself are reported. Otherwise the information is derived from the
        result of L{getNodeInfo} for that node.

        @return: deferred that fires with a list of disco info elements.
        """
        if not nodeIdentifier:
            info = [disco.DiscoIdentity(**self.discoIdentity),
                    disco.DiscoFeature(disco.NS_ITEMS)]
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in self.pubSubFeatures])
            return defer.succeed(info)

        def toInfo(nodeInfo):
            if not nodeInfo:
                return []

            (nodeType, metaData) = nodeInfo
            info = [disco.DiscoIdentity('pubsub', nodeType)]

            if metaData:
                form = data_form.Form(type="result",
                                      form_type=NS_PUBSUB_META_DATA)
                form.add_field("text-single",
                               "pubsub#node_type",
                               "The type of node (collection or leaf)",
                               nodeType)

                for metaDatum in metaData:
                    form.add_field(**metaDatum)

                info.append(form)

            return info

        # getNodeInfo may return a plain value (as the default implementation
        # does) or a deferred; maybeDeferred copes with both.
        d = defer.maybeDeferred(self.getNodeInfo, requestor, target,
                                nodeIdentifier)
        d.addCallback(toInfo)
        return d

    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Gather service discovery items.

        The nodes returned by L{getNodes} are listed as items of the service,
        unless a node identifier was passed or C{hideNodes} is set.

        @return: deferred that fires with a list of disco items.
        """
        if nodeIdentifier or self.hideNodes:
            return defer.succeed([])

        # getNodes may return a plain list or a deferred.
        d = defer.maybeDeferred(self.getNodes, requestor, target)
        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d

    def _onPublish(self, iq):
        """
        Handle a publish request by calling L{publish}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest()

        items = [element for element in iq.pubsub.publish.elements()
                 if element.uri == NS_PUBSUB and element.name == 'item']

        return self.publish(requestor, service, nodeIdentifier, items)

    def _onSubscribe(self, iq):
        """
        Handle a subscribe request by calling L{subscribe}.

        The result of L{subscribe} is expected to be a deferred firing with
        a tuple of node identifier and subscription state.

        @raise BadRequest: if the node identifier or subscriber is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.subscribe["node"]
            subscriber = jid.internJID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest()

        def toResponse(subscription):
            nodeIdentifier, state = subscription
            response = domish.Element((NS_PUBSUB, "pubsub"))
            subscription = response.addElement("subscription")
            subscription["node"] = nodeIdentifier
            subscription["jid"] = subscriber.full()
            subscription["subscription"] = state
            return response

        d = self.subscribe(requestor, service, nodeIdentifier, subscriber)
        d.addCallback(toResponse)
        return d

    def _onUnsubscribe(self, iq):
        """
        Handle an unsubscribe request by calling L{unsubscribe}.

        @raise BadRequest: if the node identifier or subscriber is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.unsubscribe["node"]
            subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest()

        return self.unsubscribe(requestor, service, nodeIdentifier, subscriber)

    def _onOptionsGet(self, iq):
        raise Unsupported('subscription-options-unavailable')

    def _onOptionsSet(self, iq):
        raise Unsupported('subscription-options-unavailable')

    def _onSubscriptions(self, iq):
        """
        Handle a request for the subscriptions of the requestor.

        The result of L{subscriptions} is expected to be a deferred firing
        with an iterable of (node, subscriber, state) tuples.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            subscriptions = response.addElement('subscriptions')
            for node, subscriber, state in result:
                item = subscriptions.addElement('subscription')
                item['node'] = node
                item['jid'] = subscriber.full()
                item['subscription'] = state
            return response

        d = self.subscriptions(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onAffiliations(self, iq):
        """
        Handle a request for the affiliations of the requestor.

        The result of L{affiliations} is expected to be a deferred firing
        with an iterable of (node identifier, affiliation) tuples.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            affiliations = response.addElement('affiliations')

            for nodeIdentifier, affiliation in result:
                item = affiliations.addElement('affiliation')
                item['node'] = nodeIdentifier
                item['affiliation'] = affiliation

            return response

        d = self.affiliations(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onCreate(self, iq):
        """
        Handle a node creation request by calling L{create}.

        The response only carries the node identifier when it differs from
        the one requested, or when none was requested.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.create.getAttribute("node")

        def toResponse(result):
            if not nodeIdentifier or nodeIdentifier != result:
                response = domish.Element((NS_PUBSUB, 'pubsub'))
                create = response.addElement('create')
                create['node'] = result
                return response
            else:
                return None

        d = self.create(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _formFromConfiguration(self, options):
        """
        Build a node configuration form from a list of options.

        @param options: iterable of dictionaries, each holding the keyword
                        arguments for one call to C{add_field} of the form.
        """
        form = data_form.Form(type="form", form_type=NS_PUBSUB_NODE_CONFIG)

        for option in options:
            form.add_field(**option)

        return form

    def _onDefault(self, iq):
        """
        Handle a request for the default node configuration.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            default = response.addElement("default")
            default.addChild(self._formFromConfiguration(options))
            return response

        d = self.getDefaultConfiguration(requestor, service)
        d.addCallback(toResponse)
        return d

    def _onConfigureGet(self, iq):
        """
        Handle a request for the configuration of a node.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])
        nodeIdentifier = iq.pubsub.configure.getAttribute("node")

        def toResponse(options):
            response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
            configure = response.addElement("configure")
            configure.addChild(self._formFromConfiguration(options))

            if nodeIdentifier:
                configure["node"] = nodeIdentifier

            return response

        d = self.getConfiguration(requestor, service, nodeIdentifier)
        d.addCallback(toResponse)
        return d

    def _onConfigureSet(self, iq):
        """
        Handle a request for changing the configuration of a node.

        The first submitted data form with the node configuration form type
        is passed to L{setConfiguration}. A cancelled form ends processing
        without changing anything.

        @raise BadRequest: if the node identifier is missing, a form field
                           is malformed, or no suitable form was found.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.configure["node"]
        except KeyError:
            raise BadRequest()

        def getFormOptions(form):
            options = {}

            for element in form.elements():
                if element.name == 'field' and \
                   element.uri == data_form.NS_X_DATA:
                    try:
                        options[element["var"]] = str(element.value)
                    except (KeyError, AttributeError):
                        raise BadRequest()

            return options

        # Search configuration form with correct FORM_TYPE and process it

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            formType = element.getAttribute("type")
            if formType == "cancel":
                return None
            elif formType != "submit":
                continue

            options = getFormOptions(element)

            # A form without FORM_TYPE is skipped like any other form that
            # is not a node configuration form.
            if options.get("FORM_TYPE") == NS_PUBSUB_NODE_CONFIG:
                del options["FORM_TYPE"]
                return self.setConfiguration(requestor, service,
                                             nodeIdentifier, options)

        raise BadRequest()

    def _onItems(self, iq):
        """
        Handle a request for retrieving items from a node.

        The result of L{items} is expected to be a deferred firing with an
        iterable of serialized items, which are included verbatim.

        @raise BadRequest: if the node identifier is missing, C{max_items}
                           is not an integer, or an item lacks an identifier.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest()

        maxItems = iq.pubsub.items.getAttribute('max_items')

        if maxItems:
            try:
                maxItems = int(maxItems)
            except ValueError:
                raise BadRequest()

        itemIdentifiers = []
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest()

        def toResponse(result):
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            items = response.addElement('items')
            items["node"] = nodeIdentifier

            for item in result:
                items.addRawXml(item)

            return response

        d = self.items(requestor, service, nodeIdentifier, maxItems,
                       itemIdentifiers)
        d.addCallback(toResponse)
        return d

    def _onRetract(self, iq):
        """
        Handle a request for retracting items from a node.

        @raise BadRequest: if the node identifier is missing or an item
                           lacks an identifier.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest()

        itemIdentifiers = []
        for child in iq.pubsub.retract.elements():
            # Retract requests, and so their items, are in the plain pubsub
            # namespace (see PUBSUB_RETRACT), not the owner namespace.
            if child.uri == NS_PUBSUB and child.name == 'item':
                try:
                    itemIdentifiers.append(child["id"])
                except KeyError:
                    raise BadRequest()

        return self.retract(requestor, service, nodeIdentifier,
                            itemIdentifiers)

    def _onPurge(self, iq):
        """
        Handle a request for purging a node by calling L{purge}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest()

        return self.purge(requestor, service, nodeIdentifier)

    def _onDelete(self, iq):
        """
        Handle a request for deleting a node by calling L{delete}.

        @raise BadRequest: if the node identifier is missing.
        """
        requestor = jid.internJID(iq["from"]).userhostJID()
        service = jid.internJID(iq["to"])

        try:
            nodeIdentifier = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest()

        return self.delete(requestor, service, nodeIdentifier)

    def _onAffiliationsGet(self, iq):
        raise Unsupported('modify-affiliations')

    def _onAffiliationsSet(self, iq):
        raise Unsupported('modify-affiliations')

    def _onSubscriptionsGet(self, iq):
        raise Unsupported('manage-subscriptions')

    def _onSubscriptionsSet(self, iq):
        raise Unsupported('manage-subscriptions')

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out item notifications for a node.

        @param service: entity the notifications are sent from.
        @param nodeIdentifier: identifier of the node the items belong to.
        @param notifications: iterable of (recipient, items) tuples, one
                              message being sent for each of them.
        """
        for recipient, items in notifications:
            message = domish.Element((None, "message"))
            message["from"] = service.full()
            message["to"] = recipient.full()
            event = message.addElement((NS_PUBSUB_EVENT, "event"))
            element = event.addElement("items")
            element["node"] = nodeIdentifier
            # The items are used as the children of the items element as is.
            element.children = items
            self.send(message)

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Return information on a node for service discovery.

        The result, or the result of a returned deferred, is either empty
        or a tuple of node type and meta data, the latter being an iterable
        of dictionaries with keyword arguments for form fields.
        """
        return None

    def getNodes(self, requestor, service):
        """
        Return the nodes to list in service discovery, possibly as the
        result of a deferred.
        """
        return []

    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')

    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')

    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        # NOTE(review): reports the 'subscribe' feature, presumably because
        # there is no separate feature for unsubscribing -- confirm.
        raise Unsupported('subscribe')

    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')

    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')

    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')

    def getDefaultConfiguration(self, requestor, service):
        raise Unsupported('retrieve-default')

    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')

    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')

    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')

    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')

    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')

    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
---|