source: ralphm-patches/xmpp_client_service.patch @ 52:a6ed3b9703cb

Last change on this file since 52:a6ed3b9703cb was 52:a6ed3b9703cb, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Bring client service patch in line with API changes in switch to wokkel.im.

File size: 35.2 KB
  • new file doc/examples/client_service.tac

    diff -r 5c11baa0ef4c doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, im
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.im import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(im.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        im.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, request):
     49        user = request.sender.user
     50        return defer.succeed(self.roster[user].values())
     51
     52
     53
     54application = service.Application("Jabber server")
     55
     56router = Router()
     57component = InternalComponent(router, domain)
     58component.setServiceParent(application)
     59
     60sessionManager = client.SessionManager(domain, accounts)
     61sessionManager.setHandlerParent(component)
     62
     63im.AccountIQHandler(sessionManager).setHandlerParent(component)
     64im.AccountMessageHandler(sessionManager).setHandlerParent(component)
     65im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     66FallbackHandler().setHandlerParent(component)
     67StaticRoster(roster).setHandlerParent(component)
     68PingHandler().setHandlerParent(component)
     69
     70c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     71c2sFactory.logTraffic = True
     72c2sService = strports.service('5224', c2sFactory)
     73c2sService.setServiceParent(application)
     74
     75sessionManager.connectionManager = c2sFactory
  • wokkel/client.py

    diff -r 5c11baa0ef4c wokkel/client.py
    a b  
    1010that should probably eventually move there.
    1111"""
    1212
     13import base64
     14
    1315from twisted.application import service
    14 from twisted.internet import reactor
     16from twisted.internet import defer, reactor
    1517from twisted.names.srvconnect import SRVConnector
    16 from twisted.words.protocols.jabber import client, sasl, xmlstream
     18from twisted.python import log, randbytes
     19from twisted.words.protocols.jabber import client, error, sasl, xmlstream
     20from twisted.words.protocols.jabber.jid import JID, internJID
     21from twisted.words.xish import domish, utility
    1722
    1823from wokkel import generic
    19 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandler
     26
     27NS_CLIENT = 'jabber:client'
     28
     29XPATH_ALL = "/*"
     30XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL
     31XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND
     32XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \
     33                client.NS_XMPP_SESSION
    2034
    2135class CheckAuthInitializer(object):
    2236    """
     
    5165    autentication.
    5266    """
    5367
    54     namespace = 'jabber:client'
     68    namespace = NS_CLIENT
    5569
    5670    def __init__(self, jid, password):
    5771        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
     
    186200    c = XMPPClientConnector(reactor, domain, factory)
    187201    c.connect()
    188202    return factory.deferred
     203
     204
     205
     206class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator):
     207    namespace = NS_CLIENT
     208
     209    def __init__(self, service):
     210        self.service = service
     211        self.failureGrace = 3
     212        self.state = 'auth'
     213
     214
     215    def associateWithStream(self, xs):
     216        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     217        self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1)
     218
     219
     220    def onElementFallback(self, element):
     221        if element.handled:
     222            return
     223
     224        exc = error.StreamError('not-authorized')
     225        self.xmlstream.sendStreamError(exc)
     226
     227
     228    def streamStarted(self, rootElement):
     229        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     230
     231        # check namespace
     232        #if self.xmlstream.namespace != self.namespace:
     233        #    self.xmlstream.namespace = self.namespace
     234        #    exc = error.StreamError('invalid-namespace')
     235        #    self.xmlstream.sendStreamError(exc)
     236        #    return
     237
     238        # TODO: check domain (self.service.domain)
     239
     240        self.xmlstream.sendHeader()
     241
     242        try:
     243            stateHandlerName = 'streamStarted_' + self.state
     244            stateHandler = getattr(self, stateHandlerName)
     245        except AttributeError:
     246            log.msg('streamStarted handler for', self.state, 'not found')
     247        else:
     248            stateHandler()
     249
     250
     251    def toState(self, state):
     252        self.state = state
     253        if state == 'initialized':
     254            self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback)
     255            self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1)
     256            self.xmlstream.dispatch(self.xmlstream,
     257                                    xmlstream.STREAM_AUTHD_EVENT)
     258
     259
     260    def streamStarted_auth(self):
     261        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     262        features.addElement((sasl.NS_XMPP_SASL, 'mechanisms'))
     263        features.mechanisms.addElement('mechanism', content='PLAIN')
     264        self.xmlstream.send(features)
     265        self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     266
     267
     268    def onAuth(self, auth):
     269        auth.handled = True
     270
     271        if auth.getAttribute('mechanism') != 'PLAIN':
     272            failure = domish.Element((sasl.NS_XMPP_SASL, 'failure'))
     273            failure.addElement('invalid-mechanism')
     274            self.xmlstream.send(failure)
     275
     276            # Close stream on too many failing authentication attempts
     277            self.failureGrace -= 1
     278            if self.failureGrace == 0:
     279                self.xmlstream.sendFooter()
     280            else:
     281                self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     282
     283            return
     284
     285        initialResponse = base64.b64decode(unicode(auth))
     286        authzid, authcid, passwd = initialResponse.split('\x00')
     287
     288        # TODO: check passwd
     289
     290        # authenticated
     291
     292        self.username = authcid
     293
     294        success = domish.Element((sasl.NS_XMPP_SASL, 'success'))
     295        self.xmlstream.send(success)
     296        self.xmlstream.reset()
     297
     298        self.toState('bind')
     299
     300
     301    def streamStarted_bind(self):
     302        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     303        features.addElement((client.NS_XMPP_BIND, 'bind'))
     304        features.addElement((client.NS_XMPP_SESSION, 'session'))
     305        self.xmlstream.send(features)
     306        self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind)
     307
     308
     309    def onBind(self, iq):
     310        def cb(boundJID):
     311            self.xmlstream.otherEntity = boundJID
     312            self.toState('initialized')
     313
     314            response = xmlstream.toResponse(iq, 'result')
     315            response.addElement((client.NS_XMPP_BIND, 'bind'))
     316            response.bind.addElement((client.NS_XMPP_BIND, 'jid'),
     317                                  content=boundJID.full())
     318
     319            return response
     320
     321        def eb(failure):
     322            if not isinstance(failure, error.StanzaError):
     323                log.msg(failure)
     324                exc = error.StanzaError('internal-server-error')
     325            else:
     326                exc = failure.value
     327
     328            return exc.toResponse(iq)
     329
     330        iq.handled = True
     331        resource = unicode(iq.bind) or None
     332        d = self.service.bindResource(self.username,
     333                                      self.service.domain,
     334                                      resource)
     335        d.addCallback(cb)
     336        d.addErrback(eb)
     337        d.addCallback(self.xmlstream.send)
     338
     339
     340    def onSession(self, iq):
     341        iq.handled = True
     342
     343        reply = domish.Element((None, 'iq'))
     344        reply['type'] = 'result'
     345        if iq.getAttribute('id'):
     346            reply['id'] = iq['id']
     347        reply.addElement((client.NS_XMPP_SESSION, 'session'))
     348        self.xmlstream.send(reply)
     349
     350
     351
     352class RecipientUnavailable(Exception):
     353    """
     354    The addressed entity is not, or no longer, available.
     355    """
     356
     357
     358
     359class XMPPC2SServerFactory(XmlStreamServerFactory):
     360
     361    def __init__(self, service):
     362        self.service = service
     363
     364        def authenticatorFactory():
     365            return XMPPClientListenAuthenticator(service)
     366
     367        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     368        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     369                          self.onConnectionMade)
     370        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     371                          self.onAuthenticated)
     372
     373        self.serial = 0
     374        self.streams = {}
     375
     376
     377    def onConnectionMade(self, xs):
     378        """
     379        Called when a client-to-server connection was made.
     380
     381        This enables traffic debugging on incoming streams.
     382        """
     383        xs.serial = self.serial
     384        self.serial += 1
     385
     386        log.msg("Client connection %d made" % xs.serial)
     387
     388        def logDataIn(buf):
     389            log.msg("RECV (%d): %r" % (xs.serial, buf))
     390
     391        def logDataOut(buf):
     392            log.msg("SEND (%d): %r" % (xs.serial, buf))
     393
     394        if self.logTraffic:
     395            xs.rawDataInFn = logDataIn
     396            xs.rawDataOutFn = logDataOut
     397
     398        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     399
     400
     401    def onAuthenticated(self, xs):
     402        log.msg("Client connection %d authenticated" % xs.serial)
     403
     404        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     405                                                   0, xs)
     406        xs.addObserver('/*', self.onElement, 0, xs)
     407
     408        # Record this stream as bound to the authenticated JID
     409        self.streams[xs.otherEntity] = xs
     410
     411
     412    def onConnectionLost(self, xs, reason):
     413        log.msg("Client connection %d disconnected" % xs.serial)
     414
     415        entity = xs.otherEntity
     416        self.service.unbindResource(entity.user,
     417                                    entity.host,
     418                                    entity.resource,
     419                                    reason)
     420
     421        # If the lost connections had been bound, remove the reference
     422        if xs.otherEntity in self.streams:
     423            del self.streams[xs.otherEntity]
     424
     425
     426    def onError(self, reason):
     427        log.err(reason, "Stream Error")
     428
     429
     430    def onElement(self, xs, stanza):
     431        """
     432        Called when an element was received from one of the connected streams.
     433
     434        """
     435        if stanza.handled:
     436            return
     437        else:
     438            self.service.onElement(stanza, xs.otherEntity)
     439
     440
     441    def deliverStanza(self, element, recipient):
     442        if recipient in self.streams:
     443            self.streams[recipient].send(element)
     444        else:
     445            raise RecipientUnavailable(u"There is no connection for %s" %
     446                                       recipient.full())
     447
     448
     449class Session(object):
     450    def __init__(self, entity):
     451        self.entity = entity
     452        self.connected = False
     453        self.interested = False
     454        self.presence = None
     455
     456
     457
     458class SessionManager(XMPPHandler):
     459
     460    """
     461    Session Manager.
     462
     463    @ivar xmlstream: XML Stream to inject incoming stanzas from client
     464        connections into. Stanzas where the C{'to'} attribute is not set
     465        or is directed at the local domain are injected as if received on
     466        the XML Stream (using C{dispatch}), other stanzas are injected as if
     467        they were sent from the XML Stream (using C{send}).
     468    """
     469
     470    def __init__(self, domain, accounts):
     471        XMPPHandler.__init__(self)
     472        self.domain = domain
     473        self.accounts = accounts
     474
     475        self.connectionManager = None
     476        self.sessions = {}
     477        self.clientStream = utility.EventDispatcher()
     478        self.clientStream.addObserver('/*', self.routeOrDeliver, -1)
     479
     480
     481    def bindResource(self, localpart, domain, resource):
     482        if domain != self.domain:
     483            raise Exception("I don't host this domain!")
     484
     485        try:
     486            userSessions = self.sessions[localpart]
     487        except KeyError:
     488            userSessions = self.sessions[localpart] = {}
     489
     490        if resource is None:
     491            resource = randbytes.secureRandom(8).encode('hex')
     492        elif resource in self.userSessions:
     493            resource = resource + ' ' + randbytes.secureRandom(8).encode('hex')
     494
     495        entity = JID(tuple=(localpart, domain, resource))
     496        session = Session(entity)
     497        session.connected = True
     498        userSessions[resource] = session
     499
     500        return defer.succeed(entity)
     501
     502
     503    def unbindResource(self, localpart, domain, resource, reason=None):
     504        try:
     505            session = self.sessions[localpart][resource]
     506        except KeyError:
     507            pass
     508        else:
     509            session.connected = False
     510            del self.sessions[localpart][resource]
     511            if not self.sessions[localpart]:
     512                del self.sessions[localpart]
     513
     514        return defer.succeed(None)
     515
     516
     517    def onElement(self, element, sender):
     518        # Make sure each stanza has a sender address
     519        if (element.name == 'presence' and
     520            element.getAttribute('type') in ('subscribe', 'subscribed',
     521                                             'unsubscribe', 'unsubscribed')):
     522            element['from'] = sender.userhost()
     523        else:
     524            element['from'] = sender.full()
     525
     526        self.clientStream.dispatch(element)
     527
     528
     529    def routeOrDeliver(self, element):
     530        if element.handled:
     531            return
     532
     533        if (not element.hasAttribute('to') or
     534            internJID(element['to']).host == self.domain):
     535            # This stanza is for local delivery
     536            log.msg("Delivering locally: %r" % element.toXml())
     537            self.xmlstream.dispatch(element)
     538        else:
     539            # This stanza is for remote routing
     540            log.msg("Routing remotely: %r" % element.toXml())
     541            self.xmlstream.send(element)
     542
     543
     544    def deliverStanza(self, element, recipient):
     545        if self.connectionManager:
     546            self.connectionManager.deliverStanza(element, recipient)
     547        else:
     548            raise Exception("No connection manager set")
  • wokkel/component.py

    diff -r 5c11baa0ef4c wokkel/component.py
    a b  
    313313        """
    314314        destination = JID(stanza['to'])
    315315
    316         log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
    317316
    318317        if destination.host in self.routes:
     318            msg = "Routing to %s (default route): %r"
     319            log.msg("Routing to %s: %r" % (destination.full(),
     320                                           stanza.toXml()))
    319321            self.routes[destination.host].send(stanza)
     322        elif None in self.routes:
     323            log.msg("Routing to %s (default route): %r" % (destination.full(),
     324                                                           stanza.toXml()))
     325            self.routes[None].send(stanza)
    320326        else:
    321             self.routes[None].send(stanza)
     327            log.msg("No route to %s: %r" % (destination.full(),
     328                                            stanza.toXml()))
     329            if stanza.getAttribute('type') not in ('result', 'error'):
     330                # No route, send back error
     331                exc = error.StanzaError('remote-server-timeout', type='wait')
     332                exc.code = '504'
     333                response = exc.toResponse(stanza)
     334                self.route(response)
    322335
    323336
    324337
  • wokkel/im.py

    diff -r 5c11baa0ef4c wokkel/im.py
    a b  
    1010U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM).
    1111"""
    1212
     13import copy
     14
    1315from twisted.internet import defer
     16from twisted.python import log
    1417from twisted.words.protocols.jabber import jid
    1518from twisted.words.protocols.jabber import error
    1619from twisted.words.xish import domish
     
    168171        self.xmlstream.addObserver("/presence", self._onPresence)
    169172
    170173
    171     def _onPresence(self, element):
     174    def parsePresence(self, element):
     175        """
     176        Parse presence.
     177        """
    172178        stanza = Stanza.fromElement(element)
    173179
    174180        presenceType = stanza.stanzaType or 'available'
     
    178184        except KeyError:
    179185            return
    180186
    181         presence = parser.fromElement(element)
     187        return parser.fromElement(element)
     188
     189
     190    def _onPresence(self, element):
     191        presence = self.parsePresence(element)
     192        presenceType = presence.stanzaType or 'available'
    182193
    183194        try:
    184195            handler = getattr(self, '%sReceived' % presenceType)
    185196        except AttributeError:
    186197            return
    187198        else:
    188             handler(presence)
     199            element.handled = handler(presence)
    189200
    190201
    191202    def errorReceived(self, presence):
     
    553564
    554565
    555566
     567class AccountIQHandler(XMPPHandler):
     568
     569    def __init__(self, sessionManager):
     570        XMPPHandler.__init__(self)
     571        self.sessionManager = sessionManager
     572
     573
     574    def connectionMade(self):
     575        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     576
     577
     578    def onIQ(self, iq):
     579        """
     580        Handler for iq stanzas to user accounts' connected resources.
     581
     582        If the recipient is a bare JID or there is no associated user, this
     583        handler ignores the stanza, so that other handlers have a chance
     584        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     585        C{'service-unavailable'} stanza error if no other handlers handle
     586        the iq.
     587        """
     588
     589        if iq.handled:
     590            return
     591
     592        try:
     593            recipient = jid.internJID(iq['to'])
     594        except KeyError:
     595            return
     596
     597        if not recipient.user:
     598            # This is not for an account, ignore it
     599            return
     600        elif recipient.user not in self.sessionManager.accounts:
     601            # This is not a user, ignore it
     602            return
     603        elif not recipient.resource:
     604            # Bare JID at local domain, ignore it
     605            return
     606
     607        userSessions = self.sessionManager.sessions.get(recipient.user,
     608                                                        {})
     609        if recipient.resource in userSessions:
     610            self.sessionManager.deliverStanza(iq, recipient)
     611        else:
     612            # Full JID without connected resource, return error
     613            exc = error.StanzaError('service-unavailable')
     614            if iq['type'] in ('result', 'error'):
     615                log.err(exc, 'Could not deliver IQ response')
     616            else:
     617                self.send(exc.toResponse(iq))
     618
     619        iq.handled = True
     620
     621
     622
     623class AccountMessageHandler(XMPPHandler):
     624
     625    def __init__(self, sessionManager):
     626        XMPPHandler.__init__(self)
     627        self.sessionManager = sessionManager
     628
     629
     630    def connectionMade(self):
     631        self.xmlstream.addObserver('/message', self.onMessage, 1)
     632
     633
     634    def onMessage(self, message):
     635        """
     636        Handler for message stanzas to user accounts.
     637        """
     638
     639        if message.handled:
     640            return
     641
     642        try:
     643            recipient = jid.internJID(message['to'])
     644        except KeyError:
     645            return
     646
     647        stanzaType = message.getAttribute('type', 'normal')
     648
     649        try:
     650            if not recipient.user:
     651                # This is not for an account, ignore it
     652                return
     653            elif recipient.user not in self.sessionManager.accounts:
     654                # This is not a user, ignore it
     655                return
     656            elif recipient.resource:
     657                userSessions = self.sessionManager.sessions.get(recipient.user,
     658                                                                {})
     659                if recipient.resource in userSessions:
     660                    self.sessionManager.deliverStanza(message, recipient)
     661                else:
     662                    if stanzaType in ('normal', 'chat', 'headline'):
     663                        self.onMessageBareJID(message, recipient.userhostJID())
     664                    elif stanzaType == 'error':
     665                        log.msg("Dropping message to unconnected resource %r" %
     666                                recipient.full())
     667                    elif stanzaType == 'groupchat':
     668                        raise error.StanzaError('service-unavailable')
     669            else:
     670                self.onMessageBareJID(message, recipient)
     671        except error.StanzaError, exc:
     672            if stanzaType == 'error':
     673                log.err(exc, "Undeliverable error")
     674            else:
     675                self.send(exc.toResponse(message))
     676
     677        message.handled = True
     678
     679
     680    def onMessageBareJID(self, message, bareJID):
     681        stanzaType = message.getAttribute('type', 'normal')
     682
     683        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     684
     685        recipients = set()
     686
     687        if stanzaType == 'headline':
     688            for session in userSessions:
     689                if session.presence.priority >= 0:
     690                    recipients.add(session.entity)
     691        elif stanzaType in ('chat', 'normal'):
     692            priorities = {}
     693            for session in userSessions.itervalues():
     694                if not session.presence or not session.presence.available:
     695                    continue
     696                priority = session.presence.priority
     697                if priority >= 0:
     698                    priorities.setdefault(priority, set()).add(session.entity)
     699                maxPriority = max(priorities.keys())
     700                recipients.update(priorities[maxPriority])
     701        elif stanzaType == 'groupchat':
     702            raise error.StanzaError('service-unavailable')
     703
     704        if recipients:
     705            for recipient in recipients:
     706                self.sessionManager.deliverStanza(message, recipient)
     707        elif stanzaType in ('chat', 'normal'):
     708            raise error.StanzaError('service-unavailable')
     709        else:
     710            # silently discard
     711            log.msg("Discarding message to %r" % message['to'])
     712
     713
     714
     715
     716def clonePresence(presence):
     717    """
     718    Make a deep copy of a presence stanza.
     719
     720    The returned presence stanza is an orphaned deep copy of the given
     721    original.
     722
     723    @note: Since the reference to the original parent, if any, is gone,
     724    inherited attributes like C{xml:lang} are not preserved.
     725    """
     726    element = presence.element
     727
     728    parent = element.parent
     729    element.parent = None
     730    newElement = copy.deepcopy(element)
     731    element.parent = parent
     732    return newElement
     733
     734
     735
     736class PresenceServerHandler(PresenceProtocol):
     737
     738    def __init__(self, sessionManager, domain, roster):
     739        PresenceProtocol.__init__(self)
     740        self.sessionManager = sessionManager
     741        self.domain = domain
     742        self.roster = roster
     743        self.presences = {} # user -> resource -> presence
     744        self.offlinePresences = {} # user -> presence
     745        self.remotePresences = {} # user -> remote entity -> presence
     746
     747        self.sessionManager.clientStream.addObserver('/presence',
     748                                                     self._onPresenceOutbound)
     749
     750
     751    def _onPresenceOutbound(self, element):
     752        log.msg("Got outbound presence: %r" % element.toXml())
     753        presence = self.parsePresence(element)
     754
     755        presenceType = presence.stanzaType or 'available'
     756        method = '%sReceivedOutbound' % presenceType
     757        print method
     758
     759        try:
     760            handler = getattr(self, method)
     761        except AttributeError:
     762            return
     763        else:
     764            element.handled = handler(presence)
     765
     766
     767    def _broadcastToOtherResources(self, presence):
     768        """
     769        Broadcast presence to other available resources.
     770        """
     771        fromJID = presence.sender
     772        for otherResource in self.presences[fromJID.user]:
     773            if otherResource == fromJID.resource:
     774                continue
     775
     776            resourceJID = jid.JID(tuple=(fromJID.user,
     777                                         fromJID.host,
     778                                         otherResource))
     779            outPresence = clonePresence(presence)
     780            outPresence['to'] = resourceJID.full()
     781            self.sessionManager.deliverStanza(outPresence, resourceJID)
     782
     783
     784    def _broadcastToContacts(self, presence):
     785        """
     786        Broadcast presence to subscribed entities.
     787        """
     788        fromJID = presence.sender
     789        roster = self.roster[fromJID.user]
     790
     791        for item in roster.itervalues():
     792            if not item.subscriptionFrom:
     793                continue
     794
     795            outPresence = clonePresence(presence)
     796            outPresence['to'] = item.entity.full()
     797
     798            if item.entity.host == self.domain:
     799                # local contact
     800                if item.entity.user in self.presences:
     801                    # broadcast to contact's available resources
     802                    for itemResource in self.presences[item.entity.user]:
     803                        resourceJID = jid.JID(tuple=(item.entity.user,
     804                                                     item.entity.host,
     805                                                     itemResource))
     806                        self.sessionManager.deliverStanza(outPresence,
     807                                                          resourceJID)
     808            else:
     809                # remote contact
     810                self.send(outPresence)
     811
     812
     813    def _on_availableBroadcast(self, presence):
     814        fromJID = presence.sender
     815        user, resource = fromJID.user, fromJID.resource
     816        roster = self.roster[user]
     817
     818        if user not in self.presences:
     819            # initial presence
     820            self.presences[user] = {}
     821            self.remotePresences[user] = {}
     822
     823            # send out probes
     824            for item in roster.itervalues():
     825                if item.subscriptionTo and item.entity.host != self.domain:
     826                    self.probe(item.entity, fromJID)
     827        else:
     828            if resource not in self.presences[user]:
     829                # initial presence with another available resource
     830
     831                # send last known presences from remote contacts
     832                remotePresences = self.remotePresences[user]
     833                for entity, remotePresence in remotePresences.iteritems():
     834                    self.sessionManager.deliverStanza(remotePresence.element,
     835                                                      fromJID)
     836
     837            # send presence to other resources
     838            self._broadcastToOtherResources(presence)
     839
     840        # Send last known local presences
     841        if user not in self.presences or resource not in self.presences[user]:
     842            for item in roster.itervalues():
     843                if item.subscriptionTo and \
     844                   item.entity.host == self.domain and \
     845                   item.entity.user in self.presences:
     846                    for contactPresence in \
     847                            self.presences[item.entity.user].itervalues():
     848                        outPresence = clonePresence(contactPresence)
     849                        outPresence['to'] = fromJID.userhost()
     850                        self.sessionManager.deliverStanza(outPresence, fromJID)
     851
     852        # broadcast presence
     853        self._broadcastToContacts(presence)
     854
     855        # save presence
     856        self.presences[user][resource] = presence
     857        self.sessionManager.sessions[user][resource].presence = presence
     858
     859        return True
     860
     861
     862    def _on_availableDirected(self, presence):
     863        self.send(presence.element)
     864        return True
     865
     866
     867    def availableReceivedOutbound(self, presence):
     868        if presence.recipient:
     869            return self._on_availableDirected(presence)
     870        else:
     871            return self._on_availableBroadcast(presence)
     872
     873
     874    def availableReceived(self, presence):
     875        fromJID = presence.sender
     876        toJID = presence.recipient
     877
     878        if toJID.user not in self.roster:
     879            return False
     880
     881        if toJID.user in self.presences:
     882            for resource in self.presences[toJID.user]:
     883                resourceJID = jid.JID(tuple=(toJID.user,
     884                                             toJID.host,
     885                                             resource))
     886                self.sessionManager.deliverStanza(presence.element, resourceJID)
     887            self.remotePresences[toJID.user][fromJID] = presence
     888        else:
     889            # no such user or no available resource, ignore this stanza
     890            pass
     891
     892        return True
     893
     894
     895    def _on_unavailableBroadcast(self, presence):
     896        fromJID = presence.sender
     897        user, resource = fromJID.user, fromJID.resource
     898
     899        # broadcast presence
     900        self._broadcastToContacts(presence)
     901
     902        if user in self.presences:
     903            # send presence to other resources
     904            self._broadcastToOtherResources(presence)
     905
     906            # update stored presences
     907            if resource in self.presences[user]:
     908                del self.presences[user][resource]
     909
     910            if not self.presences[user]:
     911                # last resource to become unavailable
     912                del self.presences[user]
     913
     914                # TODO: save last unavailable presence
     915
     916        return True
     917
     918
     919    def _on_unavailableDirected(self, presence):
     920        self.send(presence.element)
     921        return True
     922
     923
     924    def unavailableReceivedOutbound(self, presence):
     925        if presence.recipient:
     926            return self._on_unavailableDirected(presence)
     927        else:
     928            return self._on_unavailableBroadcast(presence)
     929
     930#    def unavailableReceived(self, presence):
     931
     932
     933    def subscribedReceivedOutbound(self, presence):
     934        log.msg("%r subscribed %s to its presence" % (presence.sender,
     935                                                      presence.recipient))
     936        self.send(presence.element)
     937        return True
     938
     939
     940    def subscribedReceived(self, presence):
     941        log.msg("%r subscribed %s to its presence" % (presence.sender,
     942                                                      presence.recipient))
     943
     944
     945    def unsubscribedReceivedOutbound(self, presence):
     946        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     947                                                          presence.recipient))
     948        self.send(presence.element)
     949        return True
     950
     951
     952    def unsubscribedReceived(self, presence):
     953        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     954                                                          presence.recipient))
     955
     956
     957    def subscribeReceivedOutbound(self, presence):
     958        log.msg("%r requests subscription to %s" % (presence.sender,
     959                                                    presence.recipient))
     960        self.send(presence.element)
     961        return True
     962
     963
     964    def subscribeReceived(self, presence):
     965        log.msg("%r requests subscription to %s" % (presence.sender,
     966                                                    presence.recipient))
     967
     968
     969    def unsubscribeReceivedOutbound(self, presence):
     970        log.msg("%r requests unsubscription from %s" % (presence.sender,
     971                                                        presence.recipient))
     972        self.send(presence.element)
     973        return True
     974
     975
     976    def unsubscribeReceived(self, presence):
     977        log.msg("%r requests unsubscription from %s" % (presence.sender,
     978                                                        presence.recipient))
     979
     980
     981    def probeReceived(self, presence):
     982        fromJID = presence.sender
     983        toJID = presence.recipient
     984
     985        if toJID.user not in self.roster or \
     986           fromJID.userhost() not in self.roster[toJID.user] or \
     987           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     988            # send unsubscribed
     989            pass
     990        elif toJID.user not in self.presences:
     991            # send last unavailable or nothing
     992            pass
     993        else:
     994            for resourcePresence in self.presences[toJID.user].itervalues():
     995                outPresence = clonePresence(resourcePresence)
     996                outPresence['to'] = fromJID.userhost()
     997                self.send(outPresence)
     998
     999
     1000
    5561001class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
    5571002    """
    5581003    XMPP subprotocol handler for the roster, server side.
  • wokkel/test/test_im.py

    diff -r 5c11baa0ef4c wokkel/test/test_im.py
    a b  
    1313from twisted.words.xish import domish, utility
    1414
    1515from wokkel import im
    16 from wokkel.generic import ErrorStanza, parseXml
     16from wokkel.generic import ErrorStanza, Stanza, parseXml
    1717from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub
    1818
    1919NS_XML = 'http://www.w3.org/XML/1998/namespace'
     
    846846
    847847
    848848
     849class AccountIQHandlerTest(unittest.TestCase):
     850    """
     851    Tests for L{im.AccountIQHandler}.
     852    """
     853
     854    def setUp(self):
     855        self.stub = XmlStreamStub()
     856        self.protocol = im.AccountIQHandler(None)
     857        self.protocol.makeConnection(self.stub.xmlstream)
     858        self.protocol.connectionInitialized()
     859
     860
     861    def test_onIQNotUser(self):
     862        """
     863        IQs to JIDs without local part are ignored.
     864        """
     865        xml = """
     866          <iq to='example.org'>
     867            <query xmlns='jabber:iq:version'/>
     868          </iq>
     869        """
     870
     871        iq = parseXml(xml)
     872        self.stub.send(iq)
     873
     874        self.assertFalse(getattr(iq, 'handled'))
     875
     876
     877
     878class AccountMessageHandlerTest(unittest.TestCase):
     879    """
     880    Tests for L{im.AccountMessageHandler}.
     881    """
     882
     883    def setUp(self):
     884        self.stub = XmlStreamStub()
     885        self.protocol = im.AccountMessageHandler(None)
     886        self.protocol.makeConnection(self.stub.xmlstream)
     887        self.protocol.connectionInitialized()
     888
     889
     890    def test_onMessageNotUser(self):
     891        """
     892        Messages to JIDs without local part are ignored.
     893        """
     894        xml = """
     895          <message to='example.org'>
     896            <body>Hello</body>
     897          </message>
     898        """
     899
     900        message = parseXml(xml)
     901        self.stub.send(message)
     902
     903        self.assertFalse(getattr(message, 'handled'))
     904
     905
     906
     907class ClonePresenceTest(unittest.TestCase):
     908    """
     909    Tests for L{im.clonePresence}.
     910    """
     911
     912    def test_rootElement(self):
     913        """
     914        The copied presence stanza is not identical, but renders identically.
     915        """
     916        originalElement = domish.Element((None, 'presence'))
     917        stanza = Stanza.fromElement(originalElement)
     918        copyElement = im.clonePresence(stanza)
     919
     920        self.assertNotIdentical(copyElement, originalElement)
     921        self.assertEquals(copyElement.toXml(), originalElement.toXml())
     922
     923
     924
    849925class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin):
    850926    """
    851927    Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser for help on using the repository browser.