Changeset 100:e7199edcb6f5 for wokkel/subprotocols.py
- Timestamp:
- Aug 3, 2011, 9:49:21 AM (11 years ago)
- Branch:
- default
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
wokkel/subprotocols.py
r99 r100 157 157 @ivar _packetQueue: internal buffer of unsent data. See L{send} for details. 158 158 @type _packetQueue: L{list} 159 """ 159 @ivar timeout: Default IQ request timeout in seconds. 160 @type timeout: C{int} 161 @ivar _reactor: A provider of L{IReactorTime} to track timeouts. 162 """ 163 timeout = None 164 _reactor = None 160 165 161 166 logTraffic = False 162 167 163 def __init__(self, factory): 168 def __init__(self, factory, reactor=None): 169 """ 170 Construct a stream manager. 171 172 @param factory: The stream factory to connect with. 173 @param reactor: A provider of L{IReactorTime} to track timeouts. 174 If not provided, the global reactor will be used. 175 """ 164 176 XMPPHandlerCollection.__init__(self) 165 177 self.xmlstream = None … … 174 186 self.factory = factory 175 187 188 if reactor is None: 189 from twisted.internet import reactor 190 self._reactor = reactor 191 192 # Set up IQ response tracking 193 self._iqDeferreds = {} 194 176 195 177 196 def addHandler(self, handler): … … 223 242 C{connectionInitialized} method. 224 243 """ 244 245 xs.addObserver('/iq[@type="result"]', self._onIQResponse) 246 xs.addObserver('/iq[@type="error"]', self._onIQResponse) 247 225 248 # Flush all pending packets 226 249 for p in self._packetQueue: … … 269 292 e.connectionLost(reason) 270 293 294 # This errbacks all deferreds of iq's for which no response has 295 # been received with a L{ConnectionLost} failure. Otherwise, the 296 # deferreds will never be fired. 297 iqDeferreds = self._iqDeferreds 298 self._iqDeferreds = {} 299 for d in iqDeferreds.itervalues(): 300 d.errback(reason) 301 302 303 def _onIQResponse(self, iq): 304 """ 305 Handle iq response by firing associated deferred. 306 """ 307 try: 308 d = self._iqDeferreds[iq["id"]] 309 except KeyError: 310 return 311 312 del self._iqDeferreds[iq["id"]] 313 iq.handled = True 314 if iq['type'] == 'error': 315 d.errback(error.exceptionFromStanza(iq)) 316 else: 317 d.callback(iq) 318 271 319 272 320 def send(self, obj): … … 284 332 else: 285 333 self._packetQueue.append(obj) 334 335 336 def request(self, request): 337 """ 338 Send an IQ request and track the response. 339 340 A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It 341 will have its C{toElement} called to render to a L{domish.Element} 342 which is then sent out over the current stream. If there is no such 343 stream (yet), it is queued and sent whenever a connection is 344 established and initialized, just like L{send}. 345 346 If the request doesn't have an identifier, it will be assigned a fresh 347 one, so the response can be tracked. 348 349 The deferred that is returned will fire with the L{domish.Element} 350 representation of the response if it is a result iq. If the response 351 is an error iq, a corresponding L{error.StanzaError} will be errbacked. 352 353 If the connection is closed before a response was received, the deferred 354 will be errbacked with the reason failure. 355 356 A request may also have a timeout, either by setting a default timeout 357 in L{StreamManager.timeout} or on the C{timeout} attribute of the 358 request. 359 360 @param request: The IQ request. 361 @type request: L{generic.Request} 362 """ 363 if (request.stanzaKind != 'iq' or 364 request.stanzaType not in ('get', 'set')): 365 return defer.fail(ValueError("Not a request")) 366 367 element = request.toElement() 368 369 # Make sure we have a trackable id on the stanza 370 if not request.stanzaID: 371 element.addUniqueId() 372 request.stanzaID = element['id'] 373 374 # Set up iq response tracking 375 d = defer.Deferred() 376 self._iqDeferreds[element['id']] = d 377 378 timeout = getattr(request, 'timeout', self.timeout) 379 380 if timeout is not None: 381 def onTimeout(): 382 del self._iqDeferreds[element['id']] 383 d.errback(xmlstream.TimeoutError("IQ timed out")) 384 385 call = self._reactor.callLater(timeout, onTimeout) 386 387 def cancelTimeout(result): 388 if call.active(): 389 call.cancel() 390 391 return result 392 393 d.addBoth(cancelTimeout) 394 self.send(element) 395 return d 286 396 287 397
Note: See TracChangeset
for help on using the changeset viewer.