[1] | 1 | # -*- test-case-name: wokkel.test.test_pubsub -*- |
---|
| 2 | # |
---|
[79] | 3 | # Copyright (c) 2003-2010 Ralph Meijer |
---|
[1] | 4 | # See LICENSE for details. |
---|
| 5 | |
---|
| 6 | """ |
---|
| 7 | XMPP publish-subscribe protocol. |
---|
| 8 | |
---|
| 9 | This protocol is specified in |
---|
| 10 | U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>}. |
---|
| 11 | """ |
---|
| 12 | |
---|
| 13 | from zope.interface import implements |
---|
| 14 | |
---|
[19] | 15 | from twisted.internet import defer |
---|
[59] | 16 | from twisted.python import log |
---|
[63] | 17 | from twisted.words.protocols.jabber import jid, error |
---|
[1] | 18 | from twisted.words.xish import domish |
---|
| 19 | |
---|
[57] | 20 | from wokkel import disco, data_form, generic, shim |
---|
[63] | 21 | from wokkel.compat import IQ |
---|
[1] | 22 | from wokkel.subprotocols import IQHandlerMixin, XMPPHandler |
---|
[59] | 23 | from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource |
---|
[1] | 24 | |
---|
# XPath queries matching iq stanzas of type get and of type set
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Namespaces used by the publish-subscribe protocol
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query matching get and set iq stanzas that carry a pubsub child in
# either the regular or the owner publish-subscribe namespace
PUBSUB_REQUEST = ('/iq[@type="get" or @type="set"]/'
                  'pubsub[@xmlns="%s" or @xmlns="%s"]'
                  % (NS_PUBSUB, NS_PUBSUB_OWNER))
---|
[1] | 42 | |
---|
class SubscriptionPending(Exception):
    """
    The requested subscription still awaits acceptance.

    Signals that a subscription request did not result in an active
    subscription because its state is C{'pending'}.
    """
---|
| 47 | |
---|
| 48 | |
---|
[18] | 49 | |
---|
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    Signals that a subscription request did not result in an active
    subscription because its state is C{'unconfigured'}.
    """
---|
| 55 | |
---|
| 56 | |
---|
[18] | 57 | |
---|
class PubSubError(error.StanzaError):
    """
    Stanza error that carries a publish-subscribe specific condition.

    The application specific condition is an element in the
    publish-subscribe errors namespace, named after C{pubsubCondition}.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The stanza error condition, passed on to
            L{error.StanzaError}.
        @param pubsubCondition: Name of the application condition element.
        @param feature: Optional value for the C{'feature'} attribute of the
            application condition element. Only set when true.
        @param text: Optional text, passed on to L{error.StanzaError}.
        """
        pubsubElement = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            pubsubElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=pubsubElement)
---|
| 69 | |
---|
[2] | 70 | |
---|
[18] | 71 | |
---|
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{'bad-request'} condition.

    Optionally carries a publish-subscribe specific application condition.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of an application condition
            element in the publish-subscribe errors namespace. When not
            given, no application condition is attached.
        @param text: Optional text, passed on to L{error.StanzaError}.
        """
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS,
                                           pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
---|
[30] | 84 | |
---|
| 85 | |
---|
| 86 | |
---|
class Unsupported(PubSubError):
    """
    Publish-subscribe error for a feature that is not supported.

    This is a C{'feature-not-implemented'} stanza error with the
    publish-subscribe condition C{'unsupported'}.

    @ivar feature: The name of the unsupported feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional text, passed on to L{PubSubError}.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)


    def __str__(self):
        """
        Return the base error description followed by the feature name.
        """
        return '%s, feature %r' % (PubSubError.__str__(self), self.feature)
---|
[2] | 99 | |
---|
[18] | 100 | |
---|
class Subscription(object):
    """
    The subscription of an entity to a node.

    @ivar nodeIdentifier: Identifier of the node that is subscribed to. The
        root node is denoted by C{None}.
    @ivar subscriber: The entity that is subscribed.
    @ivar state: State of the subscription. One of C{'subscribed'},
        C{'pending'}, C{'unconfigured'}.
    @ivar options: Subscription options. An empty dictionary if no (or
        empty) options were passed.
    @type options: C{dict}.
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None):
        # Any false value for options is replaced by a fresh dictionary.
        if not options:
            options = {}

        self.options = options
        self.state = state
        self.subscriber = subscriber
        self.nodeIdentifier = nodeIdentifier
---|
[1] | 119 | |
---|
[2] | 120 | |
---|
[18] | 121 | |
---|
class Item(domish.Element):
    """
    An item of a publish-subscribe node.

    Instances are L{domish.Element}s named C{item} in the publish-subscribe
    namespace, and behave like objects providing L{domish.IElement}.

    The payload can be passed using the C{payload} keyword argument of
    C{__init__}, or added later through C{addChild} or C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: L{unicode}
        @param payload: optional item payload, either a domish element or
            serialized XML.
        @type payload: object providing L{domish.IElement} or L{unicode}.
        """
        domish.Element.__init__(self, (NS_PUBSUB, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if not isinstance(payload, basestring):
            self.addChild(payload)
        else:
            self.addRawXml(payload)
---|
| 149 | |
---|
[10] | 150 | |
---|
[18] | 151 | |
---|
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    @ivar verb: The type of publish-subscribe request. See L{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}
    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}
    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{list}
    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.
    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}
    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}
    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}
    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID}
    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}
    @ivar subscriptions: Subscriptions to be modified, as a set of
                         L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.items()))

    # Map request verb to parameter handler names. For each name, parsing
    # uses the method C{_parse_<name>} and rendering uses C{_render_<name>}.
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid'],
        'unsubscribe': ['nodeOrEmpty', 'jid'],
        'optionsGet': ['nodeOrEmpty', 'jid'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': [],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: Optional type of publish-subscribe request, one of the
            values of L{_requestVerbMap}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: with condition C{nodeid-required} if the C{node}
            attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: if no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.

        A missing C{node} attribute yields the empty string.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.

        The C{node} attribute is left out if the identifier is empty.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.

        A missing C{node} attribute yields C{None}.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.

        Only child elements named C{item} in the publish-subscribe namespace
        are collected.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: with condition C{jid-required} if the C{jid}
            attribute is missing.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type is C{'leaf'}, unless a submitted node configuration
        form provides a value for C{pubsub#node_type}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: if the node configuration form is missing or is
            not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: if an item element lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: if the value is not a positive integer.
        """
        value = verbElement.getAttribute('max_items')

        if not value:
            return

        try:
            maxItems = int(value)
        except ValueError:
            maxItems = None

        # Zero and negative values parse as integers, but are not valid
        # item counts.
        if maxItems is None or maxItems < 1:
            raise BadRequest(text="Field max_items requires a positive " +
                                  "integer value")

        self.maxItems = maxItems


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: if the subscribe options form is missing or is
            not of type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        The first child of the pubsub element that matches an entry in
        L{_requestVerbMap} determines the verb. The parameters for that
        verb are then parsed from that same child.

        @raise NotImplementedError: if the request has no pubsub child, or
            none of its children maps to a known verb.
        """
        generic.Stanza.parseElement(self, element)

        pubsubElement = element.pubsub
        if pubsubElement is None:
            raise NotImplementedError()

        # Track the matching element explicitly, so that a verb preset
        # through the constructor cannot cause the parameters to be parsed
        # from an unrelated (or non-existent) child.
        verbElement = None
        for child in pubsubElement.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue

            self.verb = verb
            verbElement = child
            break

        if verbElement is None:
            raise NotImplementedError()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.

        @raise NotImplementedError: if C{verb} is not a known request verb.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        # NOTE: this class defines no renderers for the 'default',
        # 'configure' and 'options' parameters, so sending requests with the
        # verbs that use them fails on the attribute lookup below.
        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
---|
[2] | 484 | |
---|
[10] | 485 | |
---|
| 486 | |
---|
class PubSubEvent(object):
    """
    A publish subscribe event.

    @ivar sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @ivar recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @ivar nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @ivar headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: L{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        """
        Store the passed values on the attributes of the same name.
        """
        self.sender = sender
        self.recipient = recipient
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers
---|
| 506 | |
---|
| 507 | |
---|
| 508 | |
---|
class ItemsEvent(PubSubEvent):
    """
    Event for new, updated and retracted items of a node.

    @ivar items: The received items, as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        """
        Store C{items} and pass the other arguments to L{PubSubEvent}.
        """
        self.items = items
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier,
                             headers)
---|
| 520 | |
---|
| 521 | |
---|
| 522 | |
---|
class DeleteEvent(PubSubEvent):
    """
    Event for the deletion of a node.

    @ivar redirectURI: Optional redirect URI that came with the deletion
        notification. C{None} unless set.
    """

    # Class level default, for events without a redirect.
    redirectURI = None
---|
| 529 | |
---|
[27] | 530 | |
---|
| 531 | |
---|
class PurgeEvent(PubSubEvent):
    """
    Event for the purging of a node.

    Adds nothing to L{PubSubEvent}, other than the event type.
    """
---|
| 536 | |
---|
| 537 | |
---|
| 538 | |
---|
class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.

    Incoming event notifications are dispatched to L{itemsReceived},
    L{deleteReceived} and L{purgeReceived}, which do nothing by default and
    are meant to be overridden. The other public methods send requests to a
    publish-subscribe service and return a deferred.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        """
        Start observing incoming publish-subscribe event messages.
        """
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        """
        Dispatch an event notification to the handler for its action.

        Messages lacking a sender or recipient address, or without an action
        element in the event namespace, are ignored. If several children of
        the event element are in the event namespace, the last one is used.
        The message is marked as handled when a handler named
        C{_onEvent_<action>} exists.
        """
        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        # Compare by identity: this is about the presence of the element,
        # not about its truth value.
        if actionElement is None:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        """
        Handle an items notification by calling L{itemsReceived}.

        The event holds the C{item} and C{retract} children of the action
        element.
        """
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                 if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        """
        Handle a node deletion notification by calling L{deleteReceived}.

        If the action element has a C{redirect} child, its C{uri} attribute
        is passed along as the C{redirectURI} of the event.
        """
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        """
        Handle a node purge notification by calling L{purgeReceived}.
        """
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        """
        Called with an L{ItemsEvent} for each items notification.

        Does nothing by default.
        """
        pass


    def deleteReceived(self, event):
        """
        Called with a L{DeleteEvent} for each node deletion notification.

        Does nothing by default.
        """
        pass


    def purgeReceived(self, event):
        """
        Called with a L{PurgeEvent} for each node purge notification.

        Does nothing by default.
        """
        pass


    def createNode(self, service, nodeIdentifier=None, sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @return: A deferred that fires with the identifier of the new node.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except (AttributeError, TypeError, KeyError):
                # The response does not name the node: there is no pubsub
                # element (AttributeError), no create element (TypeError) or
                # no node attribute (KeyError). The suggested node identifier
                # was accepted.
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to subscribe to the node. This entity
                           will get notifications of new published items.
        @type subscriber: L{JID}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @return: A deferred that fires with C{None} when subscribed. It
            fails with L{SubscriptionPending} or L{SubscriptionUnconfigured}
            if the service reports the respective subscription state.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        def cb(iq):
            subscription = iq.pubsub.subscription["subscription"]

            if subscription == 'pending':
                raise SubscriptionPending
            elif subscription == 'unconfigured':
                raise SubscriptionUnconfigured
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return None

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}
        @param sender: Optional entity to send the request from. If set, it
            is rendered as the C{from} attribute of the request.
        @type sender: L{JID}
        @return: A deferred that fires with the list of C{item} elements in
            the response.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            request.maxItems = str(int(maxItems))
        request.sender = sender

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d
---|
[10] | 755 | |
---|
[18] | 756 | |
---|
| 757 | |
---|
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as
    a result of an XMPP request may raise exceptions. Alternatively the
    deferred returned by these methods may have their errback called. These are
    handled as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    @ivar resource: Optional object whose C{locateResource} method is asked
                    for the resource that handles each request. When
                    C{None}, the legacy methods on this class are used.
    @ivar hideNodes: When true, L{getDiscoItems} reports no items.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService)

    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    # Maps a request verb to the name of the legacy method on this class and
    # the names of the request attributes passed to it, in order.
    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
    }

    hideNodes = False

    def __init__(self, resource=None):
        """
        @param resource: Optional resource locator; see the C{resource}
            instance variable.
        """
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'generic',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        """
        Register L{handleRequest} as observer for publish-subscribe requests.
        """
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier):
        """
        Collect service discovery information for the service or a node.

        When no node identifier is given, the service identity, the
        disco#items feature and the supported publish-subscribe features are
        reported. Node information, if any, is appended to that.

        @return: Deferred that fires with the list of disco info objects.
            Failures retrieving node information are logged.
        """
        def toInfo(nodeInfo, info):
            if not nodeInfo:
                return info

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                        data_form.Field(
                            var='pubsub#node_type',
                            value=nodeType,
                            label='The type of node (collection or leaf)'
                        )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return info

        info = []

        request = PubSubRequest('discoInfo')

        if self.resource is not None:
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            discoIdentity = self.discoIdentity
            if isinstance(discoIdentity, dict):
                # Unpacking a dict yields its keys, not its values, so the
                # documented dictionary form must be read by key.
                category = discoIdentity['category']
                idType = discoIdentity['type']
                name = discoIdentity['name']
            else:
                # Keep accepting a (category, type, name) sequence.
                category, idType, name = discoIdentity
            identity = disco.DiscoIdentity(category, idType, name)
            features = self.pubSubFeatures
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        # The default getNodeInfo returns a plain value rather than a
        # deferred; maybeDeferred accepts both.
        d = defer.maybeDeferred(getInfo, requestor, target,
                                nodeIdentifier or '')
        d.addCallback(toInfo, info)
        d.addErrback(log.err)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier):
        """
        Collect service discovery items (nodes) for the service or a node.

        @return: Deferred that fires with a list of L{disco.DiscoItem}s,
            one for each node reported.
        """
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            d = resource.getNodes(requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            # NOTE(review): the legacy getNodes takes no node identifier, yet
            # it is only consulted when one is given. This condition looks
            # inverted -- confirm the intended behaviour before changing it.
            # The default getNodes returns a plain list, hence maybeDeferred.
            d = defer.maybeDeferred(self.getNodes, requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        """
        Parse a publish-subscribe request and dispatch it to its handler.

        The request is optionally preprocessed by a C{_preProcess_<verb>}
        method, handled by the located resource or by the legacy method on
        this class, and its result optionally translated by a
        C{_toResponse_<verb>} method.

        @return: Deferred that fires with the response payload or C{None}.
        """
        request = PubSubRequest.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                # The preprocessor decided no further handling is needed.
                return defer.succeed(None)

        # Process the request itself,
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                # fix lookup feature
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                # Verbs without a legacy handler are unsupported, exactly as
                # for a resource lacking the method, instead of surfacing as
                # a bare KeyError.
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            if 'options' in argNames:
                # Legacy handlers receive the plain values, not the form.
                args[argNames.index('options')] = request.options.getValues()
            # Accept handlers that raise or return synchronously.
            d = defer.maybeDeferred(handler, *args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        """
        Render the resulting subscription as a pubsub response element.
        """
        response = domish.Element((NS_PUBSUB, "pubsub"))
        subscription = response.addElement("subscription")
        if result.nodeIdentifier:
            subscription["node"] = result.nodeIdentifier
        subscription["jid"] = result.subscriber.full()
        subscription["subscription"] = result.state
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        """
        Render a list of subscriptions as a pubsub response element.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            item = subscriptions.addElement('subscription')
            item['node'] = subscription.nodeIdentifier
            item['jid'] = subscription.subscriber.full()
            item['subscription'] = subscription.state
        return response


    def _toResponse_affiliations(self, result, resource, request):
        """
        Render (node identifier, affiliation) pairs as a pubsub response.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        """
        Report the created node identifier when it was not the requested one.

        @return: A pubsub element naming the node if the request carried no
            node identifier or a different one, otherwise C{None}.
        """
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        """
        Build a node configuration form from values and the resource's
        configuration option definitions.
        """
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        """
        Type check the form against the resource's configuration option
        definitions, filtering unknown fields.
        """
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_default(self, resource, request):
        """
        Reject default configuration requests for unknown node types.

        @raise error.StanzaError: With condition C{not-acceptable} if the
            node type is neither C{'leaf'} nor C{'collection'}.
        """
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        """
        Render default configuration options as a pubsub owner response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        """
        Render a node's configuration as a pubsub owner response.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        """
        Validate a submitted configuration form.

        @return: The request, or C{None} if the form was cancelled, in which
            case the request is not processed further.
        """
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        """
        Render retrieved items as a pubsub response element.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        """
        Create a notification message with an event of the given type.

        For every subscription on a node other than C{nodeIdentifier}, a
        C{Collection} header naming that node is added to the message.

        @return: The message element, addressed from C{service} to
            C{subscriber}.
        """
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message

    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send an items notification to each subscriber.

        @param notifications: Iterable of (subscriber, subscriptions, items)
            tuples.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            message.event.items.children = items
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        """
        Send a node deletion notification to each subscriber.

        @param redirectURI: Optional URI that is included in a C{redirect}
            element of the notification.
        """
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Legacy hook for node information; reports none by default.
        """
        return None


    def getNodes(self, requestor, service):
        """
        Legacy hook for the list of nodes; reports none by default.
        """
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        # Reported under the 'subscribe' feature, like subscribe itself.
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        """
        Legacy hook for configuration option definitions; none by default.
        """
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
---|
[59] | 1158 | |
---|
| 1159 | |
---|
| 1160 | |
---|
class PubSubResource(object):
    """
    Base publish-subscribe resource that supports no request at all.

    Every request handler returns a deferred that has already failed with
    L{Unsupported} for the corresponding feature. The discovery hooks report
    no node information, no nodes and no configuration options.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    @staticmethod
    def _unsupported(feature):
        """
        Return a deferred failed with L{Unsupported} for C{feature}.
        """
        return defer.fail(Unsupported(feature))


    def locateResource(self, request):
        """
        Return the resource handling C{request}: this resource itself.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred firing with C{None}: no node information.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred firing with an empty list of nodes.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Return the configuration option definitions: none.
        """
        return {}


    def publish(self, request):
        return self._unsupported('publish')


    def subscribe(self, request):
        return self._unsupported('subscribe')


    def unsubscribe(self, request):
        # Reported under the 'subscribe' feature, like subscribe itself.
        return self._unsupported('subscribe')


    def subscriptions(self, request):
        return self._unsupported('retrieve-subscriptions')


    def affiliations(self, request):
        return self._unsupported('retrieve-affiliations')


    def create(self, request):
        return self._unsupported('create-nodes')


    def default(self, request):
        return self._unsupported('retrieve-default')


    def configureGet(self, request):
        return self._unsupported('config-node')


    def configureSet(self, request):
        return self._unsupported('config-node')


    def items(self, request):
        return self._unsupported('retrieve-items')


    def retract(self, request):
        return self._unsupported('retract-items')


    def purge(self, request):
        return self._unsupported('purge-nodes')


    def delete(self, request):
        return self._unsupported('delete-nodes')


    def affiliationsGet(self, request):
        return self._unsupported('modify-affiliations')


    def affiliationsSet(self, request):
        return self._unsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        return self._unsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        return self._unsupported('manage-subscriptions')
---|