source: ralphm-patches/xmpp_client_service.patch @ 39:d6a0f8cbabf3

Last change on this file since 39:d6a0f8cbabf3 was 39:d6a0f8cbabf3, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Add message, iq and presence handling, remove ClientService? and redo
relationship between c2s factory and session manager.

File size: 37.4 KB
  • new file doc/examples/client_service.tac

    diff -r 62f841ed2a99 doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, xmppim
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.xmppim import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(xmppim.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        xmppim.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, entity):
     49        return defer.succeed(self.roster[entity.user].values())
     50
     51
     52
     53application = service.Application("Jabber server")
     54
     55router = Router()
     56component = InternalComponent(router, domain)
     57component.setServiceParent(application)
     58
     59sessionManager = client.SessionManager(domain, accounts)
     60sessionManager.setHandlerParent(component)
     61
     62xmppim.AccountIQHandler(sessionManager).setHandlerParent(component)
     63xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component)
     64xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     65FallbackHandler().setHandlerParent(component)
     66StaticRoster(roster).setHandlerParent(component)
     67PingHandler().setHandlerParent(component)
     68
     69c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     70c2sFactory.logTraffic = True
     71c2sService = strports.service('5224', c2sFactory)
     72c2sService.setServiceParent(application)
     73
     74sessionManager.connectionManager = c2sFactory
  • wokkel/client.py

    diff -r 62f841ed2a99 wokkel/client.py
    a b  
    1010that should probably eventually move there.
    1111"""
    1212
     13import base64
     14
    1315from twisted.application import service
    14 from twisted.internet import reactor
     16from twisted.internet import defer, reactor
    1517from twisted.names.srvconnect import SRVConnector
    16 from twisted.words.protocols.jabber import client, sasl, xmlstream
     18from twisted.python import log, randbytes
     19from twisted.words.protocols.jabber import client, error, sasl, xmlstream
     20from twisted.words.protocols.jabber.jid import JID, internJID
     21from twisted.words.xish import domish
    1722
    1823from wokkel import generic
    19 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandler
     26
     27NS_CLIENT = 'jabber:client'
     28
     29XPATH_ALL = "/*"
     30XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL
     31XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND
     32XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \
     33                client.NS_XMPP_SESSION
    2034
    2135class CheckAuthInitializer(object):
    2236    """
     
    5165    autentication.
    5266    """
    5367
    54     namespace = 'jabber:client'
     68    namespace = NS_CLIENT
    5569
    5670    def __init__(self, jid, password):
    5771        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
     
    186200    c = XMPPClientConnector(reactor, domain, factory)
    187201    c.connect()
    188202    return factory.deferred
     203
     204
     205
     206class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator):
     207    namespace = NS_CLIENT
     208
     209    def __init__(self, service):
     210        self.service = service
     211        self.failureGrace = 3
     212        self.state = 'auth'
     213
     214
     215    def associateWithStream(self, xs):
     216        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     217        self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1)
     218
     219
     220    def onElementFallback(self, element):
     221        if element.handled:
     222            return
     223
     224        exc = error.StreamError('not-authorized')
     225        self.xmlstream.sendStreamError(exc)
     226
     227
     228    def streamStarted(self, rootElement):
     229        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     230
     231        # check namespace
     232        #if self.xmlstream.namespace != self.namespace:
     233        #    self.xmlstream.namespace = self.namespace
     234        #    exc = error.StreamError('invalid-namespace')
     235        #    self.xmlstream.sendStreamError(exc)
     236        #    return
     237
     238        # TODO: check domain (self.service.domain)
     239
     240        self.xmlstream.sendHeader()
     241
     242        try:
     243            stateHandlerName = 'streamStarted_' + self.state
     244            stateHandler = getattr(self, stateHandlerName)
     245        except AttributeError:
     246            log.msg('streamStarted handler for', self.state, 'not found')
     247        else:
     248            stateHandler()
     249
     250
     251    def toState(self, state):
     252        self.state = state
     253        if state == 'initialized':
     254            self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback)
     255            self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1)
     256            self.xmlstream.dispatch(self.xmlstream,
     257                                    xmlstream.STREAM_AUTHD_EVENT)
     258
     259
     260    def streamStarted_auth(self):
     261        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     262        features.addElement((sasl.NS_XMPP_SASL, 'mechanisms'))
     263        features.mechanisms.addElement('mechanism', content='PLAIN')
     264        self.xmlstream.send(features)
     265        self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     266
     267
     268    def onAuth(self, auth):
     269        auth.handled = True
     270
     271        if auth.getAttribute('mechanism') != 'PLAIN':
     272            failure = domish.Element((sasl.NS_XMPP_SASL, 'failure'))
     273            failure.addElement('invalid-mechanism')
     274            self.xmlstream.send(failure)
     275
     276            # Close stream on too many failing authentication attempts
     277            self.failureGrace -= 1
     278            if self.failureGrace == 0:
     279                self.xmlstream.sendFooter()
     280            else:
     281                self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     282
     283            return
     284
     285        initialResponse = base64.b64decode(unicode(auth))
     286        authzid, authcid, passwd = initialResponse.split('\x00')
     287
     288        # TODO: check passwd
     289
     290        # authenticated
     291
     292        self.username = authcid
     293
     294        success = domish.Element((sasl.NS_XMPP_SASL, 'success'))
     295        self.xmlstream.send(success)
     296        self.xmlstream.reset()
     297
     298        self.toState('bind')
     299
     300
     301    def streamStarted_bind(self):
     302        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     303        features.addElement((client.NS_XMPP_BIND, 'bind'))
     304        features.addElement((client.NS_XMPP_SESSION, 'session'))
     305        self.xmlstream.send(features)
     306        self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind)
     307
     308
     309    def onBind(self, iq):
     310        def cb(boundJID):
     311            self.xmlstream.otherEntity = boundJID
     312            self.toState('initialized')
     313
     314            response = xmlstream.toResponse(iq, 'result')
     315            response.addElement((client.NS_XMPP_BIND, 'bind'))
     316            response.bind.addElement((client.NS_XMPP_BIND, 'jid'),
     317                                  content=boundJID.full())
     318
     319            return response
     320
     321        def eb(failure):
     322            if not isinstance(failure, error.StanzaError):
     323                log.msg(failure)
     324                exc = error.StanzaError('internal-server-error')
     325            else:
     326                exc = failure.value
     327
     328            return exc.toResponse(iq)
     329
     330        iq.handled = True
     331        resource = unicode(iq.bind) or None
     332        d = self.service.bindResource(self.username,
     333                                      self.service.domain,
     334                                      resource)
     335        d.addCallback(cb)
     336        d.addErrback(eb)
     337        d.addCallback(self.xmlstream.send)
     338
     339
     340    def onSession(self, iq):
     341        iq.handled = True
     342
     343        reply = domish.Element((None, 'iq'))
     344        reply['type'] = 'result'
     345        if iq.getAttribute('id'):
     346            reply['id'] = iq['id']
     347        reply.addElement((client.NS_XMPP_SESSION, 'session'))
     348        self.xmlstream.send(reply)
     349
     350
     351
     352class RecipientUnavailable(Exception):
     353    """
     354    The addressed entity is not, or no longer, available.
     355    """
     356
     357
     358
     359class XMPPC2SServerFactory(XmlStreamServerFactory):
     360
     361    def __init__(self, service):
     362        self.service = service
     363
     364        def authenticatorFactory():
     365            return XMPPClientListenAuthenticator(service)
     366
     367        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     368        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     369                          self.onConnectionMade)
     370        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     371                          self.onAuthenticated)
     372
     373        self.serial = 0
     374        self.streams = {}
     375
     376
     377    def onConnectionMade(self, xs):
     378        """
     379        Called when a client-to-server connection was made.
     380
     381        This enables traffic debugging on incoming streams.
     382        """
     383        xs.serial = self.serial
     384        self.serial += 1
     385
     386        log.msg("Client connection %d made" % xs.serial)
     387
     388        def logDataIn(buf):
     389            log.msg("RECV (%d): %r" % (xs.serial, buf))
     390
     391        def logDataOut(buf):
     392            log.msg("SEND (%d): %r" % (xs.serial, buf))
     393
     394        if self.logTraffic:
     395            xs.rawDataInFn = logDataIn
     396            xs.rawDataOutFn = logDataOut
     397
     398        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     399
     400
     401    def onAuthenticated(self, xs):
     402        log.msg("Client connection %d authenticated" % xs.serial)
     403
     404        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     405                                                   0, xs)
     406        xs.addObserver('/*', self.onElement, 0, xs)
     407
     408        # Record this stream as bound to the authenticated JID
     409        self.streams[xs.otherEntity] = xs
     410
     411
     412    def onConnectionLost(self, xs, reason):
     413        log.msg("Client connection %d disconnected" % xs.serial)
     414
     415        entity = xs.otherEntity
     416        self.service.unbindResource(entity.user,
     417                                    entity.host,
     418                                    entity.resource,
     419                                    reason)
     420
     421        # If the lost connections had been bound, remove the reference
     422        if xs.otherEntity in self.streams:
     423            del self.streams[xs.otherEntity]
     424
     425
     426    def onError(self, reason):
     427        log.err(reason, "Stream Error")
     428
     429
     430    def onElement(self, xs, stanza):
     431        """
     432        Called when an element was received from one of the connected streams.
     433
     434        """
     435        if stanza.handled:
     436            return
     437        else:
     438            self.service.onElement(stanza, xs.otherEntity)
     439
     440
     441    def deliverStanza(self, element, recipient):
     442        if recipient in self.streams:
     443            self.streams[recipient].send(element)
     444        else:
     445            raise RecipientUnavailable(u"There is no connection for %s" %
     446                                       recipient.full())
     447
     448
     449class Session(object):
     450    def __init__(self, entity):
     451        self.entity = entity
     452        self.connected = False
     453        self.interested = False
     454        self.presence = None
     455
     456
     457
     458class SessionManager(XMPPHandler):
     459
     460    """
     461    Session Manager.
     462
     463    @ivar xmlstream: XML Stream to inject incoming stanzas from client
     464        connections into. Stanzas where the C{'to'} attribute is not set
     465        or is directed at the local domain are injected as if received on
     466        the XML Stream (using C{dispatch}), other stanzas are injected as if
     467        they were sent from the XML Stream (using C{send}).
     468    """
     469
     470    def __init__(self, domain, accounts):
     471        XMPPHandler.__init__(self)
     472        self.domain = domain
     473        self.accounts = accounts
     474
     475        self.connectionManager = None
     476        self.sessions = {}
     477
     478
     479    def bindResource(self, localpart, domain, resource):
     480        if domain != self.domain:
     481            raise Exception("I don't host this domain!")
     482
     483        try:
     484            userSessions = self.sessions[localpart]
     485        except KeyError:
     486            userSessions = self.sessions[localpart] = {}
     487
     488        if resource is None:
     489            resource = randbytes.secureRandom(8).encode('hex')
     490        elif resource in self.userSessions:
     491            resource = resource + ' ' + randbytes.secureRandom(8).encode('hex')
     492
     493        entity = JID(tuple=(localpart, domain, resource))
     494        session = Session(entity)
     495        session.connected = True
     496        userSessions[resource] = session
     497
     498        return defer.succeed(entity)
     499
     500
     501    def unbindResource(self, localpart, domain, resource, reason=None):
     502        try:
     503            session = self.sessions[localpart][resource]
     504        except KeyError:
     505            pass
     506        else:
     507            session.connected = False
     508            del self.sessions[localpart][resource]
     509            if not self.sessions[localpart]:
     510                del self.sessions[localpart]
     511
     512        return defer.succeed(None)
     513
     514
     515    def onElement(self, element, sender):
     516        # Make sure each stanza has a sender address
     517        if (element.name == 'presence' and
     518            element.getAttribute('type') in ('subscribe', 'subscribed',
     519                                             'unsubscribe', 'unsubscribed')):
     520            element['from'] = sender.userhost()
     521        else:
     522            element['from'] = sender.full()
     523
     524        if (not element.hasAttribute('to') or
     525                internJID(element['to']).host == self.domain):
     526            # This stanza is for local delivery
     527            self.xmlstream.dispatch(element)
     528        else:
     529            # This stanza is for remote delivery
     530            self.xmlstream.send(element)
     531
     532
     533    def deliverStanza(self, element, recipient):
     534        if self.connectionManager:
     535            self.connectionManager.deliverStanza(element, recipient)
     536        else:
     537            raise Exception("No connection manager set")
  • wokkel/component.py

    diff -r 62f841ed2a99 wokkel/component.py
    a b  
    313313        """
    314314        destination = JID(stanza['to'])
    315315
    316         log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
    317316
    318317        if destination.host in self.routes:
     318            msg = "Routing to %s (default route): %r"
     319            log.msg("Routing to %s: %r" % (destination.full(),
     320                                           stanza.toXml()))
    319321            self.routes[destination.host].send(stanza)
     322        elif None in self.routers:
     323            log.msg("Routing to %s (default route): %r" % (destination.full(),
     324                                                           stanza.toXml()))
     325            self.routes[None].send(stanza)
    320326        else:
    321             self.routes[None].send(stanza)
     327            log.msg("No route to %s: %r" % (destination.full(),
     328                                            stanza.toXml()))
     329            if stanza.getAttribute('type') not in ('result', 'error'):
     330                # No route, send back error
     331                exc = error.StanzaError('remote-server-timeout')
     332                response = exc.toResponse(stanza)
     333                self.route(response)
    322334
    323335
    324336
  • wokkel/xmppim.py

    diff -r 62f841ed2a99 wokkel/xmppim.py
    a b  
    1212All of it should eventually move to Twisted.
    1313"""
    1414
     15import copy
     16
     17from twisted.python import log
    1518from twisted.words.protocols.jabber import error, xmlstream
    16 from twisted.words.protocols.jabber.jid import JID
     19from twisted.words.protocols.jabber.jid import JID, internJID
    1720from twisted.words.xish import domish
    1821
    1922from wokkel.compat import IQ
     
    8588            handler(presence)
    8689
    8790    def _onPresenceAvailable(self, presence):
    88         entity = JID(presence["from"])
     91        entity = internJID(presence["from"])
    8992
    9093        show = unicode(presence.show or '')
    9194        if show not in ['away', 'xa', 'chat', 'dnd']:
     
    101104        self.availableReceived(entity, show, statuses, priority)
    102105
    103106    def _onPresenceUnavailable(self, presence):
    104         entity = JID(presence["from"])
     107        entity = internJID(presence["from"])
    105108
    106109        statuses = self._getStatuses(presence)
    107110
    108111        self.unavailableReceived(entity, statuses)
    109112
    110113    def _onPresenceSubscribed(self, presence):
    111         self.subscribedReceived(JID(presence["from"]))
     114        self.subscribedReceived(internJID(presence["from"]))
    112115
    113116    def _onPresenceUnsubscribed(self, presence):
    114         self.unsubscribedReceived(JID(presence["from"]))
     117        self.unsubscribedReceived(internJID(presence["from"]))
    115118
    116119    def _onPresenceSubscribe(self, presence):
    117         self.subscribeReceived(JID(presence["from"]))
     120        self.subscribeReceived(internJID(presence["from"]))
    118121
    119122    def _onPresenceUnsubscribe(self, presence):
    120         self.unsubscribeReceived(JID(presence["from"]))
     123        self.unsubscribeReceived(internJID(presence["from"]))
    121124
    122125
    123126    def availableReceived(self, entity, show=None, statuses=None, priority=0):
     
    125128        Available presence was received.
    126129
    127130        @param entity: entity from which the presence was received.
    128         @type entity: {JID}
     131        @type entity: L{JID}
    129132        @param show: detailed presence information. One of C{'away'}, C{'xa'},
    130133                     C{'chat'}, C{'dnd'} or C{None}.
    131134        @type show: C{str} or C{NoneType}
     
    143146        Unavailable presence was received.
    144147
    145148        @param entity: entity from which the presence was received.
    146         @type entity: {JID}
     149        @type entity: L{JID}
    147150        @param statuses: dictionary of natural language descriptions of the
    148151                         availability status, keyed by the language
    149152                         descriptor. A status without a language
     
    156159        Subscription approval confirmation was received.
    157160
    158161        @param entity: entity from which the confirmation was received.
    159         @type entity: {JID}
     162        @type entity: L{JID}
    160163        """
    161164
    162165    def unsubscribedReceived(self, entity):
     
    164167        Unsubscription confirmation was received.
    165168
    166169        @param entity: entity from which the confirmation was received.
    167         @type entity: {JID}
     170        @type entity: L{JID}
    168171        """
    169172
    170173    def subscribeReceived(self, entity):
     
    172175        Subscription request was received.
    173176
    174177        @param entity: entity from which the request was received.
    175         @type entity: {JID}
     178        @type entity: L{JID}
    176179        """
    177180
    178181    def unsubscribeReceived(self, entity):
     
    180183        Unsubscription request was received.
    181184
    182185        @param entity: entity from which the request was received.
    183         @type entity: {JID}
     186        @type entity: L{JID}
    184187        """
    185188
    186189    def available(self, entity=None, show=None, statuses=None, priority=0):
     
    188191        Send available presence.
    189192
    190193        @param entity: optional entity to which the presence should be sent.
    191         @type entity: {JID}
     194        @type entity: L{JID}
    192195        @param show: optional detailed presence information. One of C{'away'},
    193196                     C{'xa'}, C{'chat'}, C{'dnd'}.
    194197        @type show: C{str}
     
    207210        Send unavailable presence.
    208211
    209212        @param entity: optional entity to which the presence should be sent.
    210         @type entity: {JID}
     213        @type entity: L{JID}
    211214        @param statuses: dictionary of natural language descriptions of the
    212215                         availability status, keyed by the language
    213216                         descriptor. A status without a language
     
    221224        Send subscription request
    222225
    223226        @param entity: entity to subscribe to.
    224         @type entity: {JID}
     227        @type entity: L{JID}
    225228        """
    226229        self.send(Presence(to=entity, type='subscribe'))
    227230
     
    230233        Send unsubscription request
    231234
    232235        @param entity: entity to unsubscribe from.
    233         @type entity: {JID}
     236        @type entity: L{JID}
    234237        """
    235238        self.send(Presence(to=entity, type='unsubscribe'))
    236239
     
    239242        Send subscription confirmation.
    240243
    241244        @param entity: entity that subscribed.
    242         @type entity: {JID}
     245        @type entity: L{JID}
    243246        """
    244247        self.send(Presence(to=entity, type='subscribed'))
    245248
     
    248251        Send unsubscription confirmation.
    249252
    250253        @param entity: entity that unsubscribed.
    251         @type entity: {JID}
     254        @type entity: L{JID}
    252255        """
    253256        self.send(Presence(to=entity, type='unsubscribed'))
    254257
     
    478481
    479482        @param recipient: Optional Recipient to which the presence should be
    480483            sent.
    481         @type recipient: {JID}
     484        @type recipient: L{JID}
    482485
    483486        @param show: Optional detailed presence information. One of C{'away'},
    484487            C{'xa'}, C{'chat'}, C{'dnd'}.
     
    503506        Send unavailable presence.
    504507
    505508        @param recipient: Optional entity to which the presence should be sent.
    506         @type recipient: {JID}
     509        @type recipient: L{JID}
    507510
    508511        @param statuses: dictionary of natural language descriptions of the
    509512            availability status, keyed by the language descriptor. A status
     
    520523        Send subscription request
    521524
    522525        @param recipient: Entity to subscribe to.
    523         @type recipient: {JID}
     526        @type recipient: L{JID}
    524527        """
    525528        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    526529        presence.stanzaType = 'subscribe'
     
    532535        Send unsubscription request
    533536
    534537        @param recipient: Entity to unsubscribe from.
    535         @type recipient: {JID}
     538        @type recipient: L{JID}
    536539        """
    537540        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    538541        presence.stanzaType = 'unsubscribe'
     
    544547        Send subscription confirmation.
    545548
    546549        @param recipient: Entity that subscribed.
    547         @type recipient: {JID}
     550        @type recipient: L{JID}
    548551        """
    549552        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    550553        presence.stanzaType = 'subscribed'
     
    556559        Send unsubscription confirmation.
    557560
    558561        @param recipient: Entity that unsubscribed.
    559         @type recipient: {JID}
     562        @type recipient: L{JID}
    560563        """
    561564        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    562565        presence.stanzaType = 'unsubscribed'
     
    568571        Send presence probe.
    569572
    570573        @param recipient: Entity to be probed.
    571         @type recipient: {JID}
     574        @type recipient: L{JID}
    572575        """
    573576        presence = ProbePresence(recipient=recipient, sender=sender)
    574577        self.send(presence.toElement())
     
    652655
    653656
    654657    def _parseRosterItem(self, element):
    655         jid = JID(element['jid'])
     658        jid = internJID(element['jid'])
    656659        item = RosterItem(jid)
    657660        item.name = element.getAttribute('name')
    658661        subscription = element.getAttribute('subscription')
     
    715718        itemElement = iq.query.item
    716719
    717720        if unicode(itemElement['subscription']) == 'remove':
    718             self.onRosterRemove(JID(itemElement['jid']))
     721            self.onRosterRemove(internJID(itemElement['jid']))
    719722        else:
    720723            item = self._parseRosterItem(iq.query.item)
    721724            self.onRosterSet(item)
     
    763766    def _onRosterGet(self, iq):
    764767        iq.handled = True
    765768
    766         d = self.getRoster(JID(iq["from"]))
     769        d = self.getRoster(internJID(iq["from"]))
    767770        d.addCallback(self._toRosterReply, iq)
    768771        d.addErrback(lambda _: error.ErrorStanza('internal-error').toResponse(iq))
    769772        d.addBoth(self.send)
     
    808811        """
    809812        Called when a message stanza was received.
    810813        """
     814
     815
     816
     817class AccountIQHandler(XMPPHandler):
     818
     819    def __init__(self, sessionManager):
     820        XMPPHandler.__init__(self)
     821        self.sessionManager = sessionManager
     822
     823
     824    def connectionMade(self):
     825        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     826
     827
     828    def onIQ(self, iq):
     829        """
     830        Handler for iq stanzas to user accounts' connected resources.
     831
     832        If the recipient is a bare JID or there is no associated user, this
     833        handler ignores the stanza, so that other handlers have a chance
     834        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     835        C{'service-unavailable'} stanza error if no other handlers handle
     836        the iq.
     837        """
     838
     839        if iq.handled:
     840            return
     841
     842        try:
     843            recipient = internJID(iq['to'])
     844        except KeyError:
     845            return
     846
     847        if not recipient.user:
     848            # This is not for an account, ignore it
     849            return
     850        elif recipient.user not in self.sessionManager.accounts:
     851            # This is not a user, ignore it
     852            return
     853        elif not recipient.resource:
     854            # Bare JID at local domain, ignore it
     855            return
     856        elif recipient.user in self.sessionManager.sessions:
     857            # Full JID with connected resource, deliver the stanza
     858            self.sessionManager.deliverStanza(iq, recipient)
     859        else:
     860            # Full JID without connected resource, return error
     861            exc = error.StanzaError('service-unavailable')
     862            if iq['type'] in ('result', 'error'):
     863                log.err(exc, 'Could not deliver IQ response')
     864            else:
     865                self.send(exc.toResponse(iq))
     866
     867        iq.handled = True
     868
     869
     870
     871class AccountMessageHandler(XMPPHandler):
     872
     873    def __init__(self, sessionManager):
     874        XMPPHandler.__init__(self)
     875        self.sessionManager = sessionManager
     876
     877
     878    def connectionMade(self):
     879        self.xmlstream.addObserver('/message', self.onMessage, 1)
     880
     881
     882    def onMessage(self, message):
     883        """
     884        Handler for message stanzas to user accounts.
     885        """
     886
     887        if message.handled:
     888            return
     889
     890        try:
     891            recipient = internJID(message['to'])
     892        except KeyError:
     893            return
     894
     895        stanzaType = message.getAttribute('type', 'normal')
     896
     897        try:
     898            if not recipient.user:
     899                # This is not for an account, ignore it
     900                return
     901            elif recipient.user not in self.sessionManager.accounts:
     902                # This is not a user, ignore it
     903                return
     904            elif recipient.resource:
     905                userSessions = self.sessionManager.sessions.get(recipient.user,
     906                                                                {})
     907                if recipient.resource in userSessions:
     908                    self.sessionManager.deliverStanza(message, recipient)
     909                else:
     910                    if stanzaType in ('normal', 'chat', 'headline'):
     911                        self.onMessageBareJID(message, recipient.userhostJID())
     912                    elif stanzaType == 'error':
     913                        log.msg("Dropping message to unconnected resource %r" %
     914                                recipient.full())
     915                    elif stanzaType == 'groupchat':
     916                        raise error.StanzaError('service-unavailable')
     917            else:
     918                self.onMessageBareJID(message, recipient)
     919        except error.StanzaError, exc:
     920            if stanzaType == 'error':
     921                log.err(exc, "Undeliverable error")
     922            else:
     923                self.send(exc.toResponse(message))
     924
     925        message.handled = True
     926
     927
     928    def onMessageBareJID(self, message, bareJID):
     929        stanzaType = message.getAttribute('type', 'normal')
     930
     931        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     932        print userSessions
     933
     934        recipients = set()
     935
     936        if stanzaType == 'headline':
     937            for session in userSessions:
     938                if session.presence.priority >= 0:
     939                    recipients.add(session.entity)
     940        elif stanzaType in ('chat', 'normal'):
     941            priorities = {}
     942            for session in userSessions.itervalues():
     943                if not session.presence or not session.presence.available:
     944                    continue
     945                priority = session.presence.priority
     946                if priority >= 0:
     947                    priorities.setdefault(priority, set()).add(session.entity)
     948                maxPriority = max(priorities.keys())
     949                recipients.update(priorities[maxPriority])
     950        elif stanzaType == 'groupchat':
     951            raise error.StanzaError('service-unavailable')
     952
     953        if recipients:
     954            for recipient in recipients:
     955                self.sessionManager.deliverStanza(message, recipient)
     956        elif stanzaType in ('chat', 'normal'):
     957            raise error.StanzaError('service-unavailable')
     958        else:
     959            # silently discard
     960            log.msg("Discarding message to %r" % message['to'])
     961
     962
     963
     964
     965def clonePresence(presence):
     966    """
     967    Make a deep copy of a presence stanza.
     968
     969    The returned presence stanza is an orphaned deep copy of the given
     970    original.
     971
     972    @note: Since the reference to the original parent, if any, is gone,
     973    inherited attributes like C{xml:lang} are not preserved.
     974    """
     975    element = presence.element
     976
     977    parent = element.parent
     978    element.parent = None
     979    newElement = copy.deepcopy(element)
     980    element.parent = parent
     981    return newElement
     982
     983
     984
     985class PresenceServerHandler(PresenceProtocol):
     986
     987    def __init__(self, sessionManager, domain, roster):
     988        PresenceProtocol.__init__(self)
     989        self.sessionManager = sessionManager
     990        self.domain = domain
     991        self.roster = roster
     992        self.presences = {} # user -> resource -> presence
     993        self.offlinePresences = {} # user -> presence
     994        self.remotePresences = {} # user -> remote entity -> presence
     995
     996
     997    def _broadcastToOtherResources(self, presence):
     998        """
     999        Broadcast presence to other available resources.
     1000        """
     1001        fromJID = presence.sender
     1002        for otherResource in self.presences[fromJID.user]:
     1003            if otherResource == fromJID.resource:
     1004                continue
     1005
     1006            resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource))
     1007            outPresence = clonePresence(presence)
     1008            outPresence['to'] = resourceJID.full()
     1009            self.sessionManager.deliverStanza(outPresence, resourceJID)
     1010
     1011
     1012    def _broadcastToContacts(self, presence):
     1013        """
     1014        Broadcast presence to subscribed entities.
     1015        """
     1016        fromJID = presence.sender
     1017        roster = self.roster[fromJID.user]
     1018
     1019        for item in roster.itervalues():
     1020            if not item.subscriptionFrom:
     1021                continue
     1022
     1023            outPresence = clonePresence(presence)
     1024            outPresence['to'] = item.jid.full()
     1025
     1026            if item.jid.host == self.domain:
     1027                # local contact
     1028                if item.jid.user in self.presences:
     1029                    # broadcast to contact's available resources
     1030                    for itemResource in self.presences[item.jid.user]:
     1031                        resourceJID = JID(tuple=(item.jid.user,
     1032                                                 item.jid.host,
     1033                                                 itemResource))
     1034                        self.sessionManager.deliverStanza(outPresence,
     1035                                                          resourceJID)
     1036            else:
     1037                # remote contact
     1038                self.send(outPresence)
     1039
     1040
     1041    def _on_availableBroadcast(self, presence):
     1042        fromJID = presence.sender
     1043        user, resource = fromJID.user, fromJID.resource
     1044        roster = self.roster[user]
     1045
     1046        if user not in self.presences:
     1047            # initial presence
     1048            self.presences[user] = {}
     1049            self.remotePresences[user] = {}
     1050
     1051            # send out probes
     1052            for item in roster.itervalues():
     1053                if item.subscriptionTo and item.jid.host != self.domain:
     1054                    self.probe(item.jid, fromJID)
     1055        else:
     1056            if resource not in self.presences[user]:
     1057                # initial presence with another available resource
     1058
     1059                # send last known presences from remote contacts
     1060                remotePresences = self.remotePresences[user]
     1061                for entity, remotePresence in remotePresences.iteritems():
     1062                    self.sessionManager.deliverStanza(remotePresence.element,
     1063                                                      fromJID)
     1064
     1065            # send presence to other resources
     1066            self._broadcastToOtherResources(presence)
     1067
     1068        # Send last known local presences
     1069        if user not in self.presences or resource not in self.presences[user]:
     1070            for item in roster.itervalues():
     1071                if item.subscriptionTo and \
     1072                   item.jid.host == self.domain and \
     1073                   item.jid.user in self.presences:
     1074                    for contactPresence in \
     1075                            self.presences[item.jid.user].itervalues():
     1076                        outPresence = clonePresence(contactPresence)
     1077                        outPresence['to'] = fromJID.userhost()
     1078                        self.sessionManager.deliverStanza(outPresence, fromJID)
     1079
     1080        # broadcast presence
     1081        self._broadcastToContacts(presence)
     1082
     1083        # save presence
     1084        self.presences[user][resource] = presence
     1085        self.sessionManager.sessions[user][resource].presence = presence
     1086
     1087
     1088    #def _on_availableDirected(self, presence):
     1089    #    pass
     1090
     1091
     1092    def _on_availableInbound(self, presence):
     1093        fromJID = presence.sender
     1094        toJID = presence.recipient
     1095        if (toJID.user in self.roster and
     1096            toJID.user in self.presences):
     1097            for resource in self.presences[toJID.user]:
     1098                resourceJID = JID(tuple=(toJID.user, toJID.host, resource))
     1099                self.sessionManager.deliverStanza(presence.element, resourceJID)
     1100            self.remotePresences[toJID.user][fromJID] = presence
     1101        else:
     1102            # no such user or no available resource, ignore this stanza
     1103            pass
     1104
     1105
     1106    def _on_unavailableBroadcast(self, presence):
     1107        fromJID = presence.sender
     1108        user, resource = fromJID.user, fromJID.resource
     1109
     1110        # broadcast presence
     1111        self._broadcastToContacts(presence)
     1112
     1113        if user in self.presences:
     1114            # send presence to other resources
     1115            self._broadcastToOtherResources(presence)
     1116
     1117            # update stored presences
     1118            if resource in self.presences[user]:
     1119                del self.presences[user][resource]
     1120
     1121            if not self.presences[user]:
     1122                # last resource to become unavailable
     1123                del self.presences[user]
     1124
     1125                # TODO: save last unavailable presence
     1126
     1127
     1128#    def _on_unavailableDirected(self, presence):
     1129#        pass
     1130
     1131
     1132#    def _on_unavailableInbound(self, presence):
     1133#        pass
     1134
     1135
     1136    def getDirection(self, presence):
     1137        if not presence.recipient:
     1138            if presence.sender.host == self.domain:
     1139                # broadcast presence from local domain
     1140                return 'Broadcast'
     1141            else:
     1142                raise Exception("Unexpected missing to address")
     1143        else:
     1144            if presence.sender.host == self.domain:
     1145                # directed presence from local domain
     1146                return 'Directed'
     1147            elif presence.recipient.host == self.domain:
     1148                # incoming remote presence
     1149                return 'Inbound'
     1150            else:
     1151                raise Exception("Badly routed presence")
     1152
     1153    def availableReceived(self, presence):
     1154        direction = self.getDirection(presence)
     1155        handler = getattr(self, "_on_available%s" % direction)
     1156        if handler:
     1157            handler(presence)
     1158            presence.handled = True
     1159        else:
     1160            print "Unhandled: %r" % presence.element.toXml()
     1161
     1162
     1163    def unavailableReceived(self, presence):
     1164        direction = self.getDirection(presence)
     1165        handler = getattr(self, "_on_unavailable%s" % direction)
     1166        if handler:
     1167            handler(presence)
     1168            presence.handled = True
     1169        else:
     1170            print "Unhandled: %r" % presence.element.toXml()
     1171
     1172
     1173
     1174    def probeReceived(self, presence):
     1175        fromJID = presence.sender
     1176        toJID = presence.recipient
     1177
     1178        if toJID.user not in self.roster or \
     1179           fromJID.userhost() not in self.roster[toJID.user] or \
     1180           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1181            # send unsubscribed
     1182            pass
     1183        elif toJID.user not in self.presences:
     1184            # send last unavailable or nothing
     1185            pass
     1186        else:
     1187            for resourcePresence in self.presences[toJID.user].itervalues():
     1188                outPresence = clonePresence(resourcePresence)
     1189                outPresence['to'] = fromJID.userhost()
     1190                self.send(outPresence)
Note: See TracBrowser for help on using the repository browser.