source: ralphm-patches/xmpp_client_service.patch @ 51:6edeb69e910c

Last change on this file since 51:6edeb69e910c was 51:6edeb69e910c, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Redo RosterServerProtocol? with IQHandlerMixin, add tests.

File size: 32.2 KB
  • new file doc/examples/client_service.tac

    diff -r d7fa09914b70 doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, xmppim
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.xmppim import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(xmppim.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        xmppim.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, entity):
     49        return defer.succeed(self.roster[entity.user].values())
     50
     51
     52
     53application = service.Application("Jabber server")
     54
     55router = Router()
     56component = InternalComponent(router, domain)
     57component.setServiceParent(application)
     58
     59sessionManager = client.SessionManager(domain, accounts)
     60sessionManager.setHandlerParent(component)
     61
     62xmppim.AccountIQHandler(sessionManager).setHandlerParent(component)
     63xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component)
     64xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     65FallbackHandler().setHandlerParent(component)
     66StaticRoster(roster).setHandlerParent(component)
     67PingHandler().setHandlerParent(component)
     68
     69c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     70c2sFactory.logTraffic = True
     71c2sService = strports.service('5224', c2sFactory)
     72c2sService.setServiceParent(application)
     73
     74sessionManager.connectionManager = c2sFactory
  • wokkel/client.py

    diff -r d7fa09914b70 wokkel/client.py
    a b  
    1010that should probably eventually move there.
    1111"""
    1212
     13import base64
     14
    1315from twisted.application import service
    14 from twisted.internet import reactor
     16from twisted.internet import defer, reactor
    1517from twisted.names.srvconnect import SRVConnector
    16 from twisted.words.protocols.jabber import client, sasl, xmlstream
     18from twisted.python import log, randbytes
     19from twisted.words.protocols.jabber import client, error, sasl, xmlstream
     20from twisted.words.protocols.jabber.jid import JID, internJID
     21from twisted.words.xish import domish, utility
    1722
    1823from wokkel import generic
    19 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandler
     26
     27NS_CLIENT = 'jabber:client'
     28
     29XPATH_ALL = "/*"
     30XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL
     31XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND
     32XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \
     33                client.NS_XMPP_SESSION
    2034
    2135class CheckAuthInitializer(object):
    2236    """
     
    5165    autentication.
    5266    """
    5367
    54     namespace = 'jabber:client'
     68    namespace = NS_CLIENT
    5569
    5670    def __init__(self, jid, password):
    5771        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
     
    186200    c = XMPPClientConnector(reactor, domain, factory)
    187201    c.connect()
    188202    return factory.deferred
     203
     204
     205
     206class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator):
     207    namespace = NS_CLIENT
     208
     209    def __init__(self, service):
     210        self.service = service
     211        self.failureGrace = 3
     212        self.state = 'auth'
     213
     214
     215    def associateWithStream(self, xs):
     216        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     217        self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1)
     218
     219
     220    def onElementFallback(self, element):
     221        if element.handled:
     222            return
     223
     224        exc = error.StreamError('not-authorized')
     225        self.xmlstream.sendStreamError(exc)
     226
     227
     228    def streamStarted(self, rootElement):
     229        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     230
     231        # check namespace
     232        #if self.xmlstream.namespace != self.namespace:
     233        #    self.xmlstream.namespace = self.namespace
     234        #    exc = error.StreamError('invalid-namespace')
     235        #    self.xmlstream.sendStreamError(exc)
     236        #    return
     237
     238        # TODO: check domain (self.service.domain)
     239
     240        self.xmlstream.sendHeader()
     241
     242        try:
     243            stateHandlerName = 'streamStarted_' + self.state
     244            stateHandler = getattr(self, stateHandlerName)
     245        except AttributeError:
     246            log.msg('streamStarted handler for', self.state, 'not found')
     247        else:
     248            stateHandler()
     249
     250
     251    def toState(self, state):
     252        self.state = state
     253        if state == 'initialized':
     254            self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback)
     255            self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1)
     256            self.xmlstream.dispatch(self.xmlstream,
     257                                    xmlstream.STREAM_AUTHD_EVENT)
     258
     259
     260    def streamStarted_auth(self):
     261        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     262        features.addElement((sasl.NS_XMPP_SASL, 'mechanisms'))
     263        features.mechanisms.addElement('mechanism', content='PLAIN')
     264        self.xmlstream.send(features)
     265        self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     266
     267
     268    def onAuth(self, auth):
     269        auth.handled = True
     270
     271        if auth.getAttribute('mechanism') != 'PLAIN':
     272            failure = domish.Element((sasl.NS_XMPP_SASL, 'failure'))
     273            failure.addElement('invalid-mechanism')
     274            self.xmlstream.send(failure)
     275
     276            # Close stream on too many failing authentication attempts
     277            self.failureGrace -= 1
     278            if self.failureGrace == 0:
     279                self.xmlstream.sendFooter()
     280            else:
     281                self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     282
     283            return
     284
     285        initialResponse = base64.b64decode(unicode(auth))
     286        authzid, authcid, passwd = initialResponse.split('\x00')
     287
     288        # TODO: check passwd
     289
     290        # authenticated
     291
     292        self.username = authcid
     293
     294        success = domish.Element((sasl.NS_XMPP_SASL, 'success'))
     295        self.xmlstream.send(success)
     296        self.xmlstream.reset()
     297
     298        self.toState('bind')
     299
     300
     301    def streamStarted_bind(self):
     302        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     303        features.addElement((client.NS_XMPP_BIND, 'bind'))
     304        features.addElement((client.NS_XMPP_SESSION, 'session'))
     305        self.xmlstream.send(features)
     306        self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind)
     307
     308
     309    def onBind(self, iq):
     310        def cb(boundJID):
     311            self.xmlstream.otherEntity = boundJID
     312            self.toState('initialized')
     313
     314            response = xmlstream.toResponse(iq, 'result')
     315            response.addElement((client.NS_XMPP_BIND, 'bind'))
     316            response.bind.addElement((client.NS_XMPP_BIND, 'jid'),
     317                                  content=boundJID.full())
     318
     319            return response
     320
     321        def eb(failure):
     322            if not isinstance(failure, error.StanzaError):
     323                log.msg(failure)
     324                exc = error.StanzaError('internal-server-error')
     325            else:
     326                exc = failure.value
     327
     328            return exc.toResponse(iq)
     329
     330        iq.handled = True
     331        resource = unicode(iq.bind) or None
     332        d = self.service.bindResource(self.username,
     333                                      self.service.domain,
     334                                      resource)
     335        d.addCallback(cb)
     336        d.addErrback(eb)
     337        d.addCallback(self.xmlstream.send)
     338
     339
     340    def onSession(self, iq):
     341        iq.handled = True
     342
     343        reply = domish.Element((None, 'iq'))
     344        reply['type'] = 'result'
     345        if iq.getAttribute('id'):
     346            reply['id'] = iq['id']
     347        reply.addElement((client.NS_XMPP_SESSION, 'session'))
     348        self.xmlstream.send(reply)
     349
     350
     351
     352class RecipientUnavailable(Exception):
     353    """
     354    The addressed entity is not, or no longer, available.
     355    """
     356
     357
     358
     359class XMPPC2SServerFactory(XmlStreamServerFactory):
     360
     361    def __init__(self, service):
     362        self.service = service
     363
     364        def authenticatorFactory():
     365            return XMPPClientListenAuthenticator(service)
     366
     367        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     368        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     369                          self.onConnectionMade)
     370        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     371                          self.onAuthenticated)
     372
     373        self.serial = 0
     374        self.streams = {}
     375
     376
     377    def onConnectionMade(self, xs):
     378        """
     379        Called when a client-to-server connection was made.
     380
     381        This enables traffic debugging on incoming streams.
     382        """
     383        xs.serial = self.serial
     384        self.serial += 1
     385
     386        log.msg("Client connection %d made" % xs.serial)
     387
     388        def logDataIn(buf):
     389            log.msg("RECV (%d): %r" % (xs.serial, buf))
     390
     391        def logDataOut(buf):
     392            log.msg("SEND (%d): %r" % (xs.serial, buf))
     393
     394        if self.logTraffic:
     395            xs.rawDataInFn = logDataIn
     396            xs.rawDataOutFn = logDataOut
     397
     398        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     399
     400
     401    def onAuthenticated(self, xs):
     402        log.msg("Client connection %d authenticated" % xs.serial)
     403
     404        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     405                                                   0, xs)
     406        xs.addObserver('/*', self.onElement, 0, xs)
     407
     408        # Record this stream as bound to the authenticated JID
     409        self.streams[xs.otherEntity] = xs
     410
     411
     412    def onConnectionLost(self, xs, reason):
     413        log.msg("Client connection %d disconnected" % xs.serial)
     414
     415        entity = xs.otherEntity
     416        self.service.unbindResource(entity.user,
     417                                    entity.host,
     418                                    entity.resource,
     419                                    reason)
     420
     421        # If the lost connections had been bound, remove the reference
     422        if xs.otherEntity in self.streams:
     423            del self.streams[xs.otherEntity]
     424
     425
     426    def onError(self, reason):
     427        log.err(reason, "Stream Error")
     428
     429
     430    def onElement(self, xs, stanza):
     431        """
     432        Called when an element was received from one of the connected streams.
     433
     434        """
     435        if stanza.handled:
     436            return
     437        else:
     438            self.service.onElement(stanza, xs.otherEntity)
     439
     440
     441    def deliverStanza(self, element, recipient):
     442        if recipient in self.streams:
     443            self.streams[recipient].send(element)
     444        else:
     445            raise RecipientUnavailable(u"There is no connection for %s" %
     446                                       recipient.full())
     447
     448
     449class Session(object):
     450    def __init__(self, entity):
     451        self.entity = entity
     452        self.connected = False
     453        self.interested = False
     454        self.presence = None
     455
     456
     457
     458class SessionManager(XMPPHandler):
     459
     460    """
     461    Session Manager.
     462
     463    @ivar xmlstream: XML Stream to inject incoming stanzas from client
     464        connections into. Stanzas where the C{'to'} attribute is not set
     465        or is directed at the local domain are injected as if received on
     466        the XML Stream (using C{dispatch}), other stanzas are injected as if
     467        they were sent from the XML Stream (using C{send}).
     468    """
     469
     470    def __init__(self, domain, accounts):
     471        XMPPHandler.__init__(self)
     472        self.domain = domain
     473        self.accounts = accounts
     474
     475        self.connectionManager = None
     476        self.sessions = {}
     477        self.clientStream = utility.EventDispatcher()
     478        self.clientStream.addObserver('/*', self.routeOrDeliver, -1)
     479
     480
     481    def bindResource(self, localpart, domain, resource):
     482        if domain != self.domain:
     483            raise Exception("I don't host this domain!")
     484
     485        try:
     486            userSessions = self.sessions[localpart]
     487        except KeyError:
     488            userSessions = self.sessions[localpart] = {}
     489
     490        if resource is None:
     491            resource = randbytes.secureRandom(8).encode('hex')
     492        elif resource in self.userSessions:
     493            resource = resource + ' ' + randbytes.secureRandom(8).encode('hex')
     494
     495        entity = JID(tuple=(localpart, domain, resource))
     496        session = Session(entity)
     497        session.connected = True
     498        userSessions[resource] = session
     499
     500        return defer.succeed(entity)
     501
     502
     503    def unbindResource(self, localpart, domain, resource, reason=None):
     504        try:
     505            session = self.sessions[localpart][resource]
     506        except KeyError:
     507            pass
     508        else:
     509            session.connected = False
     510            del self.sessions[localpart][resource]
     511            if not self.sessions[localpart]:
     512                del self.sessions[localpart]
     513
     514        return defer.succeed(None)
     515
     516
     517    def onElement(self, element, sender):
     518        # Make sure each stanza has a sender address
     519        if (element.name == 'presence' and
     520            element.getAttribute('type') in ('subscribe', 'subscribed',
     521                                             'unsubscribe', 'unsubscribed')):
     522            element['from'] = sender.userhost()
     523        else:
     524            element['from'] = sender.full()
     525
     526        self.clientStream.dispatch(element)
     527
     528
     529    def routeOrDeliver(self, element):
     530        if element.handled:
     531            return
     532
     533        if (not element.hasAttribute('to') or
     534            internJID(element['to']).host == self.domain):
     535            # This stanza is for local delivery
     536            log.msg("Delivering locally: %r" % element.toXml())
     537            self.xmlstream.dispatch(element)
     538        else:
     539            # This stanza is for remote routing
     540            log.msg("Routing remotely: %r" % element.toXml())
     541            self.xmlstream.send(element)
     542
     543
     544    def deliverStanza(self, element, recipient):
     545        if self.connectionManager:
     546            self.connectionManager.deliverStanza(element, recipient)
     547        else:
     548            raise Exception("No connection manager set")
  • wokkel/component.py

    diff -r d7fa09914b70 wokkel/component.py
    a b  
    313313        """
    314314        destination = JID(stanza['to'])
    315315
    316         log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
    317316
    318317        if destination.host in self.routes:
     318            msg = "Routing to %s (default route): %r"
     319            log.msg("Routing to %s: %r" % (destination.full(),
     320                                           stanza.toXml()))
    319321            self.routes[destination.host].send(stanza)
     322        elif None in self.routes:
     323            log.msg("Routing to %s (default route): %r" % (destination.full(),
     324                                                           stanza.toXml()))
     325            self.routes[None].send(stanza)
    320326        else:
    321             self.routes[None].send(stanza)
     327            log.msg("No route to %s: %r" % (destination.full(),
     328                                            stanza.toXml()))
     329            if stanza.getAttribute('type') not in ('result', 'error'):
     330                # No route, send back error
     331                exc = error.StanzaError('remote-server-timeout', type='wait')
     332                exc.code = '504'
     333                response = exc.toResponse(stanza)
     334                self.route(response)
    322335
    323336
    324337
  • wokkel/im.py

    diff -r d7fa09914b70 wokkel/im.py
    a b  
    170170        self.xmlstream.addObserver("/presence", self._onPresence)
    171171
    172172
    173     def _onPresence(self, element):
     173    def parsePresence(self, element):
     174        """
     175        Parse presence.
     176        """
    174177        stanza = Stanza.fromElement(element)
    175178
    176179        presenceType = stanza.stanzaType or 'available'
     
    180183        except KeyError:
    181184            return
    182185
    183         presence = parser.fromElement(element)
     186        return parser.fromElement(element)
     187
     188
     189    def _onPresence(self, element):
     190        presence = self.parsePresence(element)
     191        presenceType = presence.stanzaType or 'available'
    184192
    185193        try:
    186194            handler = getattr(self, '%sReceived' % presenceType)
    187195        except AttributeError:
    188196            return
    189197        else:
    190             handler(presence)
     198            element.handled = handler(presence)
    191199
    192200
    193201    def errorReceived(self, presence):
     
    583591
    584592
    585593
     594class AccountIQHandler(XMPPHandler):
     595
     596    def __init__(self, sessionManager):
     597        XMPPHandler.__init__(self)
     598        self.sessionManager = sessionManager
     599
     600
     601    def connectionMade(self):
     602        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     603
     604
     605    def onIQ(self, iq):
     606        """
     607        Handler for iq stanzas to user accounts' connected resources.
     608
     609        If the recipient is a bare JID or there is no associated user, this
     610        handler ignores the stanza, so that other handlers have a chance
     611        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     612        C{'service-unavailable'} stanza error if no other handlers handle
     613        the iq.
     614        """
     615
     616        if iq.handled:
     617            return
     618
     619        try:
     620            recipient = internJID(iq['to'])
     621        except KeyError:
     622            return
     623
     624        if not recipient.user:
     625            # This is not for an account, ignore it
     626            return
     627        elif recipient.user not in self.sessionManager.accounts:
     628            # This is not a user, ignore it
     629            return
     630        elif not recipient.resource:
     631            # Bare JID at local domain, ignore it
     632            return
     633
     634        userSessions = self.sessionManager.sessions.get(recipient.user,
     635                                                        {})
     636        if recipient.resource in userSessions:
     637            self.sessionManager.deliverStanza(iq, recipient)
     638        else:
     639            # Full JID without connected resource, return error
     640            exc = error.StanzaError('service-unavailable')
     641            if iq['type'] in ('result', 'error'):
     642                log.err(exc, 'Could not deliver IQ response')
     643            else:
     644                self.send(exc.toResponse(iq))
     645
     646        iq.handled = True
     647
     648
     649
     650class AccountMessageHandler(XMPPHandler):
     651
     652    def __init__(self, sessionManager):
     653        XMPPHandler.__init__(self)
     654        self.sessionManager = sessionManager
     655
     656
     657    def connectionMade(self):
     658        self.xmlstream.addObserver('/message', self.onMessage, 1)
     659
     660
     661    def onMessage(self, message):
     662        """
     663        Handler for message stanzas to user accounts.
     664        """
     665
     666        if message.handled:
     667            return
     668
     669        try:
     670            recipient = internJID(message['to'])
     671        except KeyError:
     672            return
     673
     674        stanzaType = message.getAttribute('type', 'normal')
     675
     676        try:
     677            if not recipient.user:
     678                # This is not for an account, ignore it
     679                return
     680            elif recipient.user not in self.sessionManager.accounts:
     681                # This is not a user, ignore it
     682                return
     683            elif recipient.resource:
     684                userSessions = self.sessionManager.sessions.get(recipient.user,
     685                                                                {})
     686                if recipient.resource in userSessions:
     687                    self.sessionManager.deliverStanza(message, recipient)
     688                else:
     689                    if stanzaType in ('normal', 'chat', 'headline'):
     690                        self.onMessageBareJID(message, recipient.userhostJID())
     691                    elif stanzaType == 'error':
     692                        log.msg("Dropping message to unconnected resource %r" %
     693                                recipient.full())
     694                    elif stanzaType == 'groupchat':
     695                        raise error.StanzaError('service-unavailable')
     696            else:
     697                self.onMessageBareJID(message, recipient)
     698        except error.StanzaError, exc:
     699            if stanzaType == 'error':
     700                log.err(exc, "Undeliverable error")
     701            else:
     702                self.send(exc.toResponse(message))
     703
     704        message.handled = True
     705
     706
     707    def onMessageBareJID(self, message, bareJID):
     708        stanzaType = message.getAttribute('type', 'normal')
     709
     710        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     711
     712        recipients = set()
     713
     714        if stanzaType == 'headline':
     715            for session in userSessions:
     716                if session.presence.priority >= 0:
     717                    recipients.add(session.entity)
     718        elif stanzaType in ('chat', 'normal'):
     719            priorities = {}
     720            for session in userSessions.itervalues():
     721                if not session.presence or not session.presence.available:
     722                    continue
     723                priority = session.presence.priority
     724                if priority >= 0:
     725                    priorities.setdefault(priority, set()).add(session.entity)
     726                maxPriority = max(priorities.keys())
     727                recipients.update(priorities[maxPriority])
     728        elif stanzaType == 'groupchat':
     729            raise error.StanzaError('service-unavailable')
     730
     731        if recipients:
     732            for recipient in recipients:
     733                self.sessionManager.deliverStanza(message, recipient)
     734        elif stanzaType in ('chat', 'normal'):
     735            raise error.StanzaError('service-unavailable')
     736        else:
     737            # silently discard
     738            log.msg("Discarding message to %r" % message['to'])
     739
     740
     741
     742
     743def clonePresence(presence):
     744    """
     745    Make a deep copy of a presence stanza.
     746
     747    The returned presence stanza is an orphaned deep copy of the given
     748    original.
     749
     750    @note: Since the reference to the original parent, if any, is gone,
     751    inherited attributes like C{xml:lang} are not preserved.
     752    """
     753    element = presence.element
     754
     755    parent = element.parent
     756    element.parent = None
     757    newElement = copy.deepcopy(element)
     758    element.parent = parent
     759    return newElement
     760
     761
     762
     763class PresenceServerHandler(PresenceProtocol):
     764
     765    def __init__(self, sessionManager, domain, roster):
     766        PresenceProtocol.__init__(self)
     767        self.sessionManager = sessionManager
     768        self.domain = domain
     769        self.roster = roster
     770        self.presences = {} # user -> resource -> presence
     771        self.offlinePresences = {} # user -> presence
     772        self.remotePresences = {} # user -> remote entity -> presence
     773
     774        self.sessionManager.clientStream.addObserver('/presence',
     775                                                     self._onPresenceOutbound)
     776
     777
     778    def _onPresenceOutbound(self, element):
     779        log.msg("Got outbound presence: %r" % element.toXml())
     780        presence = self.parsePresence(element)
     781
     782        presenceType = presence.stanzaType or 'available'
     783        method = '%sReceivedOutbound' % presenceType
     784        print method
     785
     786        try:
     787            handler = getattr(self, method)
     788        except AttributeError:
     789            return
     790        else:
     791            element.handled = handler(presence)
     792
     793
     794    def _broadcastToOtherResources(self, presence):
     795        """
     796        Broadcast presence to other available resources.
     797        """
     798        fromJID = presence.sender
     799        for otherResource in self.presences[fromJID.user]:
     800            if otherResource == fromJID.resource:
     801                continue
     802
     803            resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource))
     804            outPresence = clonePresence(presence)
     805            outPresence['to'] = resourceJID.full()
     806            self.sessionManager.deliverStanza(outPresence, resourceJID)
     807
     808
     809    def _broadcastToContacts(self, presence):
     810        """
     811        Broadcast presence to subscribed entities.
     812        """
     813        fromJID = presence.sender
     814        roster = self.roster[fromJID.user]
     815
     816        for item in roster.itervalues():
     817            if not item.subscriptionFrom:
     818                continue
     819
     820            outPresence = clonePresence(presence)
     821            outPresence['to'] = item.jid.full()
     822
     823            if item.jid.host == self.domain:
     824                # local contact
     825                if item.jid.user in self.presences:
     826                    # broadcast to contact's available resources
     827                    for itemResource in self.presences[item.jid.user]:
     828                        resourceJID = JID(tuple=(item.jid.user,
     829                                                 item.jid.host,
     830                                                 itemResource))
     831                        self.sessionManager.deliverStanza(outPresence,
     832                                                          resourceJID)
     833            else:
     834                # remote contact
     835                self.send(outPresence)
     836
     837
     838    def _on_availableBroadcast(self, presence):
     839        fromJID = presence.sender
     840        user, resource = fromJID.user, fromJID.resource
     841        roster = self.roster[user]
     842
     843        if user not in self.presences:
     844            # initial presence
     845            self.presences[user] = {}
     846            self.remotePresences[user] = {}
     847
     848            # send out probes
     849            for item in roster.itervalues():
     850                if item.subscriptionTo and item.jid.host != self.domain:
     851                    self.probe(item.jid, fromJID)
     852        else:
     853            if resource not in self.presences[user]:
     854                # initial presence with another available resource
     855
     856                # send last known presences from remote contacts
     857                remotePresences = self.remotePresences[user]
     858                for entity, remotePresence in remotePresences.iteritems():
     859                    self.sessionManager.deliverStanza(remotePresence.element,
     860                                                      fromJID)
     861
     862            # send presence to other resources
     863            self._broadcastToOtherResources(presence)
     864
     865        # Send last known local presences
     866        if user not in self.presences or resource not in self.presences[user]:
     867            for item in roster.itervalues():
     868                if item.subscriptionTo and \
     869                   item.jid.host == self.domain and \
     870                   item.jid.user in self.presences:
     871                    for contactPresence in \
     872                            self.presences[item.jid.user].itervalues():
     873                        outPresence = clonePresence(contactPresence)
     874                        outPresence['to'] = fromJID.userhost()
     875                        self.sessionManager.deliverStanza(outPresence, fromJID)
     876
     877        # broadcast presence
     878        self._broadcastToContacts(presence)
     879
     880        # save presence
     881        self.presences[user][resource] = presence
     882        self.sessionManager.sessions[user][resource].presence = presence
     883
     884        return True
     885
     886
     887    def _on_availableDirected(self, presence):
     888        self.send(presence.element)
     889        return True
     890
     891
     892    def availableReceivedOutbound(self, presence):
     893        if presence.recipient:
     894            return self._on_availableDirected(presence)
     895        else:
     896            return self._on_availableBroadcast(presence)
     897
     898
     899    def availableReceived(self, presence):
     900        fromJID = presence.sender
     901        toJID = presence.recipient
     902
     903        if toJID.user not in self.roster:
     904            return False
     905
     906        if toJID.user in self.presences:
     907            for resource in self.presences[toJID.user]:
     908                resourceJID = JID(tuple=(toJID.user, toJID.host, resource))
     909                self.sessionManager.deliverStanza(presence.element, resourceJID)
     910            self.remotePresences[toJID.user][fromJID] = presence
     911        else:
     912            # no such user or no available resource, ignore this stanza
     913            pass
     914
     915        return True
     916
     917
     918    def _on_unavailableBroadcast(self, presence):
     919        fromJID = presence.sender
     920        user, resource = fromJID.user, fromJID.resource
     921
     922        # broadcast presence
     923        self._broadcastToContacts(presence)
     924
     925        if user in self.presences:
     926            # send presence to other resources
     927            self._broadcastToOtherResources(presence)
     928
     929            # update stored presences
     930            if resource in self.presences[user]:
     931                del self.presences[user][resource]
     932
     933            if not self.presences[user]:
     934                # last resource to become unavailable
     935                del self.presences[user]
     936
     937                # TODO: save last unavailable presence
     938
     939        return True
     940
     941
     942    def _on_unavailableDirected(self, presence):
     943        self.send(presence.element)
     944        return True
     945
     946
     947    def unavailableReceivedOutbound(self, presence):
     948        if presence.recipient:
     949            return self._on_unavailableDirected(presence)
     950        else:
     951            return self._on_unavailableBroadcast(presence)
     952
     953#    def unavailableReceived(self, presence):
     954
     955
     956    def subscribedReceivedOutbound(self, presence):
     957        log.msg("%r subscribed %s to its presence" % (presence.sender,
     958                                                      presence.recipient))
     959        self.send(presence.element)
     960        return True
     961
     962
     963    def subscribedReceived(self, presence):
     964        log.msg("%r subscribed %s to its presence" % (presence.sender,
     965                                                      presence.recipient))
     966
     967
     968    def unsubscribedReceivedOutbound(self, presence):
     969        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     970                                                          presence.recipient))
     971        self.send(presence.element)
     972        return True
     973
     974
     975    def unsubscribedReceived(self, presence):
     976        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     977                                                          presence.recipient))
     978
     979
     980    def subscribeReceivedOutbound(self, presence):
     981        log.msg("%r requests subscription to %s" % (presence.sender,
     982                                                    presence.recipient))
     983        self.send(presence.element)
     984        return True
     985
     986
     987    def subscribeReceived(self, presence):
     988        log.msg("%r requests subscription to %s" % (presence.sender,
     989                                                    presence.recipient))
     990
     991
     992    def unsubscribeReceivedOutbound(self, presence):
     993        log.msg("%r requests unsubscription from %s" % (presence.sender,
     994                                                        presence.recipient))
     995        self.send(presence.element)
     996        return True
     997
     998
     999    def unsubscribeReceived(self, presence):
     1000        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1001                                                        presence.recipient))
     1002
     1003
     1004    def probeReceived(self, presence):
     1005        fromJID = presence.sender
     1006        toJID = presence.recipient
     1007
     1008        if toJID.user not in self.roster or \
     1009           fromJID.userhost() not in self.roster[toJID.user] or \
     1010           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1011            # send unsubscribed
     1012            pass
     1013        elif toJID.user not in self.presences:
     1014            # send last unavailable or nothing
     1015            pass
     1016        else:
     1017            for resourcePresence in self.presences[toJID.user].itervalues():
     1018                outPresence = clonePresence(resourcePresence)
     1019                outPresence['to'] = fromJID.userhost()
     1020                self.send(outPresence)
     1021
     1022
     1023
    5861024class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
    5871025    """
    5881026    XMPP subprotocol handler for the roster, server side.
Note: See TracBrowser for help on using the repository browser.