Ignore:
Timestamp:
Aug 3, 2011, 9:49:21 AM (9 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Message:

Add iq request (set/get) tracking for StreamManager?.

This adds a new request method to StreamManager? to serialize an iq get or iq
set request object (using toElement), send it over the stream (or queue it
until the next time a stream is initialized) and track the response stanza by
returning a Deferred.

Requests may have a timeout, so that the deferred errbacks after the timeout
(counting from the moment of calling request) has expired.

This implementation is similar to the response tracking done in
twisted.words.protocols.jabber.xmlstream.IQ, but slighly different:

  • Requests can be queued until the connection is initialized, even before a XmlStream? object is available. I.e. it is tied to the StreamManager?, not the stream.
  • Response timeout starts counting from the moment of calling request, not from the moment request is sent over the wire.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • wokkel/subprotocols.py

    r99 r100  
    157157    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    158158    @type _packetQueue: L{list}
    159     """
     159    @ivar timeout: Default IQ request timeout in seconds.
     160    @type timeout: C{int}
     161    @ivar _reactor: A provider of L{IReactorTime} to track timeouts.
     162    """
     163    timeout = None
     164    _reactor = None
    160165
    161166    logTraffic = False
    162167
    163     def __init__(self, factory):
     168    def __init__(self, factory, reactor=None):
     169        """
     170        Construct a stream manager.
     171
     172        @param factory: The stream factory to connect with.
     173        @param reactor: A provider of L{IReactorTime} to track timeouts.
     174            If not provided, the global reactor will be used.
     175        """
    164176        XMPPHandlerCollection.__init__(self)
    165177        self.xmlstream = None
     
    174186        self.factory = factory
    175187
     188        if reactor is None:
     189            from twisted.internet import reactor
     190        self._reactor = reactor
     191
     192        # Set up IQ response tracking
     193        self._iqDeferreds = {}
     194
    176195
    177196    def addHandler(self, handler):
     
    223242        C{connectionInitialized} method.
    224243        """
     244
     245        xs.addObserver('/iq[@type="result"]', self._onIQResponse)
     246        xs.addObserver('/iq[@type="error"]', self._onIQResponse)
     247
    225248        # Flush all pending packets
    226249        for p in self._packetQueue:
     
    269292            e.connectionLost(reason)
    270293
     294        # This errbacks all deferreds of iq's for which no response has
     295        # been received with a L{ConnectionLost} failure. Otherwise, the
     296        # deferreds will never be fired.
     297        iqDeferreds = self._iqDeferreds
     298        self._iqDeferreds = {}
     299        for d in iqDeferreds.itervalues():
     300            d.errback(reason)
     301
     302
     303    def _onIQResponse(self, iq):
     304        """
     305        Handle iq response by firing associated deferred.
     306        """
     307        try:
     308            d = self._iqDeferreds[iq["id"]]
     309        except KeyError:
     310            return
     311
     312        del self._iqDeferreds[iq["id"]]
     313        iq.handled = True
     314        if iq['type'] == 'error':
     315            d.errback(error.exceptionFromStanza(iq))
     316        else:
     317            d.callback(iq)
     318
    271319
    272320    def send(self, obj):
     
    284332        else:
    285333            self._packetQueue.append(obj)
     334
     335
     336    def request(self, request):
     337        """
     338        Send an IQ request and track the response.
     339
     340        A request is an IQ L{generic.Stanza} of type C{'get'} or C{'set'}. It
     341        will have its C{toElement} called to render to a L{domish.Element}
     342        which is then sent out over the current stream. If there is no such
     343        stream (yet), it is queued and sent whenever a connection is
     344        established and initialized, just like L{send}.
     345
     346        If the request doesn't have an identifier, it will be assigned a fresh
     347        one, so the response can be tracked.
     348
     349        The deferred that is returned will fire with the L{domish.Element}
     350        representation of the response if it is a result iq. If the response
     351        is an error iq, a corresponding L{error.StanzaError} will be errbacked.
     352
     353        If the connection is closed before a response was received, the deferred
     354        will be errbacked with the reason failure.
     355
     356        A request may also have a timeout, either by setting a default timeout
     357        in L{StreamManager.timeout} or on the C{timeout} attribute of the
     358        request.
     359
     360        @param request: The IQ request.
     361        @type request: L{generic.Request}
     362        """
     363        if (request.stanzaKind != 'iq' or
     364            request.stanzaType not in ('get', 'set')):
     365            return defer.fail(ValueError("Not a request"))
     366
     367        element = request.toElement()
     368
     369        # Make sure we have a trackable id on the stanza
     370        if not request.stanzaID:
     371            element.addUniqueId()
     372            request.stanzaID = element['id']
     373
     374        # Set up iq response tracking
     375        d = defer.Deferred()
     376        self._iqDeferreds[element['id']] = d
     377
     378        timeout = getattr(request, 'timeout', self.timeout)
     379
     380        if timeout is not None:
     381            def onTimeout():
     382                del self._iqDeferreds[element['id']]
     383                d.errback(xmlstream.TimeoutError("IQ timed out"))
     384
     385            call = self._reactor.callLater(timeout, onTimeout)
     386
     387            def cancelTimeout(result):
     388                if call.active():
     389                    call.cancel()
     390
     391                return result
     392
     393            d.addBoth(cancelTimeout)
     394        self.send(element)
     395        return d
    286396
    287397
Note: See TracChangeset for help on using the changeset viewer.