source: ralphm-patches/xmpp_client_service.patch @ 40:75ee881b216f

Last change on this file since 40:75ee881b216f was 40:75ee881b216f, checked in by Ralph Meijer <ralphm@…>, 10 years ago

Fix up outbound presence handling, prepare for presence subs.

File size: 39.7 KB
  • new file doc/examples/client_service.tac

    diff -r 62f841ed2a99 doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, xmppim
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.xmppim import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(xmppim.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        xmppim.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, entity):
     49        return defer.succeed(self.roster[entity.user].values())
     50
     51
     52
     53application = service.Application("Jabber server")
     54
     55router = Router()
     56component = InternalComponent(router, domain)
     57component.setServiceParent(application)
     58
     59sessionManager = client.SessionManager(domain, accounts)
     60sessionManager.setHandlerParent(component)
     61
     62xmppim.AccountIQHandler(sessionManager).setHandlerParent(component)
     63xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component)
     64xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     65FallbackHandler().setHandlerParent(component)
     66StaticRoster(roster).setHandlerParent(component)
     67PingHandler().setHandlerParent(component)
     68
     69c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     70c2sFactory.logTraffic = True
     71c2sService = strports.service('5224', c2sFactory)
     72c2sService.setServiceParent(application)
     73
     74sessionManager.connectionManager = c2sFactory
  • wokkel/client.py

    diff -r 62f841ed2a99 wokkel/client.py
    a b  
    1010that should probably eventually move there.
    1111"""
    1212
     13import base64
     14
    1315from twisted.application import service
    14 from twisted.internet import reactor
     16from twisted.internet import defer, reactor
    1517from twisted.names.srvconnect import SRVConnector
    16 from twisted.words.protocols.jabber import client, sasl, xmlstream
     18from twisted.python import log, randbytes
     19from twisted.words.protocols.jabber import client, error, sasl, xmlstream
     20from twisted.words.protocols.jabber.jid import JID, internJID
     21from twisted.words.xish import domish, utility
    1722
    1823from wokkel import generic
    19 from wokkel.subprotocols import StreamManager
     24from wokkel.compat import XmlStreamServerFactory
     25from wokkel.subprotocols import StreamManager, XMPPHandler
     26
     27NS_CLIENT = 'jabber:client'
     28
     29XPATH_ALL = "/*"
     30XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL
     31XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND
     32XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \
     33                client.NS_XMPP_SESSION
    2034
    2135class CheckAuthInitializer(object):
    2236    """
     
    5165    autentication.
    5266    """
    5367
    54     namespace = 'jabber:client'
     68    namespace = NS_CLIENT
    5569
    5670    def __init__(self, jid, password):
    5771        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
     
    186200    c = XMPPClientConnector(reactor, domain, factory)
    187201    c.connect()
    188202    return factory.deferred
     203
     204
     205
     206class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator):
     207    namespace = NS_CLIENT
     208
     209    def __init__(self, service):
     210        self.service = service
     211        self.failureGrace = 3
     212        self.state = 'auth'
     213
     214
     215    def associateWithStream(self, xs):
     216        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     217        self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1)
     218
     219
     220    def onElementFallback(self, element):
     221        if element.handled:
     222            return
     223
     224        exc = error.StreamError('not-authorized')
     225        self.xmlstream.sendStreamError(exc)
     226
     227
     228    def streamStarted(self, rootElement):
     229        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     230
     231        # check namespace
     232        #if self.xmlstream.namespace != self.namespace:
     233        #    self.xmlstream.namespace = self.namespace
     234        #    exc = error.StreamError('invalid-namespace')
     235        #    self.xmlstream.sendStreamError(exc)
     236        #    return
     237
     238        # TODO: check domain (self.service.domain)
     239
     240        self.xmlstream.sendHeader()
     241
     242        try:
     243            stateHandlerName = 'streamStarted_' + self.state
     244            stateHandler = getattr(self, stateHandlerName)
     245        except AttributeError:
     246            log.msg('streamStarted handler for', self.state, 'not found')
     247        else:
     248            stateHandler()
     249
     250
     251    def toState(self, state):
     252        self.state = state
     253        if state == 'initialized':
     254            self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback)
     255            self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1)
     256            self.xmlstream.dispatch(self.xmlstream,
     257                                    xmlstream.STREAM_AUTHD_EVENT)
     258
     259
     260    def streamStarted_auth(self):
     261        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     262        features.addElement((sasl.NS_XMPP_SASL, 'mechanisms'))
     263        features.mechanisms.addElement('mechanism', content='PLAIN')
     264        self.xmlstream.send(features)
     265        self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     266
     267
     268    def onAuth(self, auth):
     269        auth.handled = True
     270
     271        if auth.getAttribute('mechanism') != 'PLAIN':
     272            failure = domish.Element((sasl.NS_XMPP_SASL, 'failure'))
     273            failure.addElement('invalid-mechanism')
     274            self.xmlstream.send(failure)
     275
     276            # Close stream on too many failing authentication attempts
     277            self.failureGrace -= 1
     278            if self.failureGrace == 0:
     279                self.xmlstream.sendFooter()
     280            else:
     281                self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     282
     283            return
     284
     285        initialResponse = base64.b64decode(unicode(auth))
     286        authzid, authcid, passwd = initialResponse.split('\x00')
     287
     288        # TODO: check passwd
     289
     290        # authenticated
     291
     292        self.username = authcid
     293
     294        success = domish.Element((sasl.NS_XMPP_SASL, 'success'))
     295        self.xmlstream.send(success)
     296        self.xmlstream.reset()
     297
     298        self.toState('bind')
     299
     300
     301    def streamStarted_bind(self):
     302        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     303        features.addElement((client.NS_XMPP_BIND, 'bind'))
     304        features.addElement((client.NS_XMPP_SESSION, 'session'))
     305        self.xmlstream.send(features)
     306        self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind)
     307
     308
     309    def onBind(self, iq):
     310        def cb(boundJID):
     311            self.xmlstream.otherEntity = boundJID
     312            self.toState('initialized')
     313
     314            response = xmlstream.toResponse(iq, 'result')
     315            response.addElement((client.NS_XMPP_BIND, 'bind'))
     316            response.bind.addElement((client.NS_XMPP_BIND, 'jid'),
     317                                  content=boundJID.full())
     318
     319            return response
     320
     321        def eb(failure):
     322            if not isinstance(failure, error.StanzaError):
     323                log.msg(failure)
     324                exc = error.StanzaError('internal-server-error')
     325            else:
     326                exc = failure.value
     327
     328            return exc.toResponse(iq)
     329
     330        iq.handled = True
     331        resource = unicode(iq.bind) or None
     332        d = self.service.bindResource(self.username,
     333                                      self.service.domain,
     334                                      resource)
     335        d.addCallback(cb)
     336        d.addErrback(eb)
     337        d.addCallback(self.xmlstream.send)
     338
     339
     340    def onSession(self, iq):
     341        iq.handled = True
     342
     343        reply = domish.Element((None, 'iq'))
     344        reply['type'] = 'result'
     345        if iq.getAttribute('id'):
     346            reply['id'] = iq['id']
     347        reply.addElement((client.NS_XMPP_SESSION, 'session'))
     348        self.xmlstream.send(reply)
     349
     350
     351
     352class RecipientUnavailable(Exception):
     353    """
     354    The addressed entity is not, or no longer, available.
     355    """
     356
     357
     358
     359class XMPPC2SServerFactory(XmlStreamServerFactory):
     360
     361    def __init__(self, service):
     362        self.service = service
     363
     364        def authenticatorFactory():
     365            return XMPPClientListenAuthenticator(service)
     366
     367        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     368        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
     369                          self.onConnectionMade)
     370        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
     371                          self.onAuthenticated)
     372
     373        self.serial = 0
     374        self.streams = {}
     375
     376
     377    def onConnectionMade(self, xs):
     378        """
     379        Called when a client-to-server connection was made.
     380
     381        This enables traffic debugging on incoming streams.
     382        """
     383        xs.serial = self.serial
     384        self.serial += 1
     385
     386        log.msg("Client connection %d made" % xs.serial)
     387
     388        def logDataIn(buf):
     389            log.msg("RECV (%d): %r" % (xs.serial, buf))
     390
     391        def logDataOut(buf):
     392            log.msg("SEND (%d): %r" % (xs.serial, buf))
     393
     394        if self.logTraffic:
     395            xs.rawDataInFn = logDataIn
     396            xs.rawDataOutFn = logDataOut
     397
     398        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)
     399
     400
     401    def onAuthenticated(self, xs):
     402        log.msg("Client connection %d authenticated" % xs.serial)
     403
     404        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
     405                                                   0, xs)
     406        xs.addObserver('/*', self.onElement, 0, xs)
     407
     408        # Record this stream as bound to the authenticated JID
     409        self.streams[xs.otherEntity] = xs
     410
     411
     412    def onConnectionLost(self, xs, reason):
     413        log.msg("Client connection %d disconnected" % xs.serial)
     414
     415        entity = xs.otherEntity
     416        self.service.unbindResource(entity.user,
     417                                    entity.host,
     418                                    entity.resource,
     419                                    reason)
     420
     421        # If the lost connections had been bound, remove the reference
     422        if xs.otherEntity in self.streams:
     423            del self.streams[xs.otherEntity]
     424
     425
     426    def onError(self, reason):
     427        log.err(reason, "Stream Error")
     428
     429
     430    def onElement(self, xs, stanza):
     431        """
     432        Called when an element was received from one of the connected streams.
     433
     434        """
     435        if stanza.handled:
     436            return
     437        else:
     438            self.service.onElement(stanza, xs.otherEntity)
     439
     440
     441    def deliverStanza(self, element, recipient):
     442        if recipient in self.streams:
     443            self.streams[recipient].send(element)
     444        else:
     445            raise RecipientUnavailable(u"There is no connection for %s" %
     446                                       recipient.full())
     447
     448
     449class Session(object):
     450    def __init__(self, entity):
     451        self.entity = entity
     452        self.connected = False
     453        self.interested = False
     454        self.presence = None
     455
     456
     457
     458class SessionManager(XMPPHandler):
     459
     460    """
     461    Session Manager.
     462
     463    @ivar xmlstream: XML Stream to inject incoming stanzas from client
     464        connections into. Stanzas where the C{'to'} attribute is not set
     465        or is directed at the local domain are injected as if received on
     466        the XML Stream (using C{dispatch}), other stanzas are injected as if
     467        they were sent from the XML Stream (using C{send}).
     468    """
     469
     470    def __init__(self, domain, accounts):
     471        XMPPHandler.__init__(self)
     472        self.domain = domain
     473        self.accounts = accounts
     474
     475        self.connectionManager = None
     476        self.sessions = {}
     477        self.clientStream = utility.EventDispatcher()
     478        self.clientStream.addObserver('/*', self.routeOrDeliver, -1)
     479
     480
     481    def bindResource(self, localpart, domain, resource):
     482        if domain != self.domain:
     483            raise Exception("I don't host this domain!")
     484
     485        try:
     486            userSessions = self.sessions[localpart]
     487        except KeyError:
     488            userSessions = self.sessions[localpart] = {}
     489
     490        if resource is None:
     491            resource = randbytes.secureRandom(8).encode('hex')
     492        elif resource in self.userSessions:
     493            resource = resource + ' ' + randbytes.secureRandom(8).encode('hex')
     494
     495        entity = JID(tuple=(localpart, domain, resource))
     496        session = Session(entity)
     497        session.connected = True
     498        userSessions[resource] = session
     499
     500        return defer.succeed(entity)
     501
     502
     503    def unbindResource(self, localpart, domain, resource, reason=None):
     504        try:
     505            session = self.sessions[localpart][resource]
     506        except KeyError:
     507            pass
     508        else:
     509            session.connected = False
     510            del self.sessions[localpart][resource]
     511            if not self.sessions[localpart]:
     512                del self.sessions[localpart]
     513
     514        return defer.succeed(None)
     515
     516
     517    def onElement(self, element, sender):
     518        # Make sure each stanza has a sender address
     519        if (element.name == 'presence' and
     520            element.getAttribute('type') in ('subscribe', 'subscribed',
     521                                             'unsubscribe', 'unsubscribed')):
     522            element['from'] = sender.userhost()
     523        else:
     524            element['from'] = sender.full()
     525
     526        self.clientStream.dispatch(element)
     527
     528
     529    def routeOrDeliver(self, element):
     530        if element.handled:
     531            return
     532
     533        if (not element.hasAttribute('to') or
     534            internJID(element['to']).host == self.domain):
     535            # This stanza is for local delivery
     536            log.msg("Delivering locally: %r" % element.toXml())
     537            self.xmlstream.dispatch(element)
     538        else:
     539            # This stanza is for remote routing
     540            log.msg("Routing remotely: %r" % element.toXml())
     541            self.xmlstream.send(element)
     542
     543
     544    def deliverStanza(self, element, recipient):
     545        if self.connectionManager:
     546            self.connectionManager.deliverStanza(element, recipient)
     547        else:
     548            raise Exception("No connection manager set")
  • wokkel/component.py

    diff -r 62f841ed2a99 wokkel/component.py
    a b  
    313313        """
    314314        destination = JID(stanza['to'])
    315315
    316         log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
    317316
    318317        if destination.host in self.routes:
     318            msg = "Routing to %s (default route): %r"
     319            log.msg("Routing to %s: %r" % (destination.full(),
     320                                           stanza.toXml()))
    319321            self.routes[destination.host].send(stanza)
     322        elif None in self.routers:
     323            log.msg("Routing to %s (default route): %r" % (destination.full(),
     324                                                           stanza.toXml()))
     325            self.routes[None].send(stanza)
    320326        else:
    321             self.routes[None].send(stanza)
     327            log.msg("No route to %s: %r" % (destination.full(),
     328                                            stanza.toXml()))
     329            if stanza.getAttribute('type') not in ('result', 'error'):
     330                # No route, send back error
     331                exc = error.StanzaError('remote-server-timeout')
     332                response = exc.toResponse(stanza)
     333                self.route(response)
    322334
    323335
    324336
  • wokkel/xmppim.py

    diff -r 62f841ed2a99 wokkel/xmppim.py
    a b  
    1212All of it should eventually move to Twisted.
    1313"""
    1414
     15import copy
     16
     17from twisted.python import log
    1518from twisted.words.protocols.jabber import error, xmlstream
    16 from twisted.words.protocols.jabber.jid import JID
     19from twisted.words.protocols.jabber.jid import JID, internJID
    1720from twisted.words.xish import domish
    1821
    1922from wokkel.compat import IQ
     
    8588            handler(presence)
    8689
    8790    def _onPresenceAvailable(self, presence):
    88         entity = JID(presence["from"])
     91        entity = internJID(presence["from"])
    8992
    9093        show = unicode(presence.show or '')
    9194        if show not in ['away', 'xa', 'chat', 'dnd']:
     
    101104        self.availableReceived(entity, show, statuses, priority)
    102105
    103106    def _onPresenceUnavailable(self, presence):
    104         entity = JID(presence["from"])
     107        entity = internJID(presence["from"])
    105108
    106109        statuses = self._getStatuses(presence)
    107110
    108111        self.unavailableReceived(entity, statuses)
    109112
    110113    def _onPresenceSubscribed(self, presence):
    111         self.subscribedReceived(JID(presence["from"]))
     114        self.subscribedReceived(internJID(presence["from"]))
    112115
    113116    def _onPresenceUnsubscribed(self, presence):
    114         self.unsubscribedReceived(JID(presence["from"]))
     117        self.unsubscribedReceived(internJID(presence["from"]))
    115118
    116119    def _onPresenceSubscribe(self, presence):
    117         self.subscribeReceived(JID(presence["from"]))
     120        self.subscribeReceived(internJID(presence["from"]))
    118121
    119122    def _onPresenceUnsubscribe(self, presence):
    120         self.unsubscribeReceived(JID(presence["from"]))
     123        self.unsubscribeReceived(internJID(presence["from"]))
    121124
    122125
    123126    def availableReceived(self, entity, show=None, statuses=None, priority=0):
     
    125128        Available presence was received.
    126129
    127130        @param entity: entity from which the presence was received.
    128         @type entity: {JID}
     131        @type entity: L{JID}
    129132        @param show: detailed presence information. One of C{'away'}, C{'xa'},
    130133                     C{'chat'}, C{'dnd'} or C{None}.
    131134        @type show: C{str} or C{NoneType}
     
    143146        Unavailable presence was received.
    144147
    145148        @param entity: entity from which the presence was received.
    146         @type entity: {JID}
     149        @type entity: L{JID}
    147150        @param statuses: dictionary of natural language descriptions of the
    148151                         availability status, keyed by the language
    149152                         descriptor. A status without a language
     
    156159        Subscription approval confirmation was received.
    157160
    158161        @param entity: entity from which the confirmation was received.
    159         @type entity: {JID}
     162        @type entity: L{JID}
    160163        """
    161164
    162165    def unsubscribedReceived(self, entity):
     
    164167        Unsubscription confirmation was received.
    165168
    166169        @param entity: entity from which the confirmation was received.
    167         @type entity: {JID}
     170        @type entity: L{JID}
    168171        """
    169172
    170173    def subscribeReceived(self, entity):
     
    172175        Subscription request was received.
    173176
    174177        @param entity: entity from which the request was received.
    175         @type entity: {JID}
     178        @type entity: L{JID}
    176179        """
    177180
    178181    def unsubscribeReceived(self, entity):
     
    180183        Unsubscription request was received.
    181184
    182185        @param entity: entity from which the request was received.
    183         @type entity: {JID}
     186        @type entity: L{JID}
    184187        """
    185188
    186189    def available(self, entity=None, show=None, statuses=None, priority=0):
     
    188191        Send available presence.
    189192
    190193        @param entity: optional entity to which the presence should be sent.
    191         @type entity: {JID}
     194        @type entity: L{JID}
    192195        @param show: optional detailed presence information. One of C{'away'},
    193196                     C{'xa'}, C{'chat'}, C{'dnd'}.
    194197        @type show: C{str}
     
    207210        Send unavailable presence.
    208211
    209212        @param entity: optional entity to which the presence should be sent.
    210         @type entity: {JID}
     213        @type entity: L{JID}
    211214        @param statuses: dictionary of natural language descriptions of the
    212215                         availability status, keyed by the language
    213216                         descriptor. A status without a language
     
    221224        Send subscription request
    222225
    223226        @param entity: entity to subscribe to.
    224         @type entity: {JID}
     227        @type entity: L{JID}
    225228        """
    226229        self.send(Presence(to=entity, type='subscribe'))
    227230
     
    230233        Send unsubscription request
    231234
    232235        @param entity: entity to unsubscribe from.
    233         @type entity: {JID}
     236        @type entity: L{JID}
    234237        """
    235238        self.send(Presence(to=entity, type='unsubscribe'))
    236239
     
    239242        Send subscription confirmation.
    240243
    241244        @param entity: entity that subscribed.
    242         @type entity: {JID}
     245        @type entity: L{JID}
    243246        """
    244247        self.send(Presence(to=entity, type='subscribed'))
    245248
     
    248251        Send unsubscription confirmation.
    249252
    250253        @param entity: entity that unsubscribed.
    251         @type entity: {JID}
     254        @type entity: L{JID}
    252255        """
    253256        self.send(Presence(to=entity, type='unsubscribed'))
    254257
     
    395398        self.xmlstream.addObserver("/presence", self._onPresence)
    396399
    397400
    398     def _onPresence(self, element):
     401    def parsePresence(self, element):
     402        """
     403        Parse presence.
     404        """
    399405        stanza = Stanza.fromElement(element)
    400406
    401407        presenceType = stanza.stanzaType or 'available'
     
    405411        except KeyError:
    406412            return
    407413
    408         presence = parser.fromElement(element)
     414        return parser.fromElement(element)
     415
     416
     417    def _onPresence(self, element):
     418        presence = self.parsePresence(element)
     419        presenceType = presence.stanzaType or 'available'
    409420
    410421        try:
    411422            handler = getattr(self, '%sReceived' % presenceType)
     
    478489
    479490        @param recipient: Optional Recipient to which the presence should be
    480491            sent.
    481         @type recipient: {JID}
     492        @type recipient: L{JID}
    482493
    483494        @param show: Optional detailed presence information. One of C{'away'},
    484495            C{'xa'}, C{'chat'}, C{'dnd'}.
     
    503514        Send unavailable presence.
    504515
    505516        @param recipient: Optional entity to which the presence should be sent.
    506         @type recipient: {JID}
     517        @type recipient: L{JID}
    507518
    508519        @param statuses: dictionary of natural language descriptions of the
    509520            availability status, keyed by the language descriptor. A status
     
    520531        Send subscription request
    521532
    522533        @param recipient: Entity to subscribe to.
    523         @type recipient: {JID}
     534        @type recipient: L{JID}
    524535        """
    525536        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    526537        presence.stanzaType = 'subscribe'
     
    532543        Send unsubscription request
    533544
    534545        @param recipient: Entity to unsubscribe from.
    535         @type recipient: {JID}
     546        @type recipient: L{JID}
    536547        """
    537548        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    538549        presence.stanzaType = 'unsubscribe'
     
    544555        Send subscription confirmation.
    545556
    546557        @param recipient: Entity that subscribed.
    547         @type recipient: {JID}
     558        @type recipient: L{JID}
    548559        """
    549560        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    550561        presence.stanzaType = 'subscribed'
     
    556567        Send unsubscription confirmation.
    557568
    558569        @param recipient: Entity that unsubscribed.
    559         @type recipient: {JID}
     570        @type recipient: L{JID}
    560571        """
    561572        presence = SubscriptionPresence(recipient=recipient, sender=sender)
    562573        presence.stanzaType = 'unsubscribed'
     
    568579        Send presence probe.
    569580
    570581        @param recipient: Entity to be probed.
    571         @type recipient: {JID}
     582        @type recipient: L{JID}
    572583        """
    573584        presence = ProbePresence(recipient=recipient, sender=sender)
    574585        self.send(presence.toElement())
     
    652663
    653664
    654665    def _parseRosterItem(self, element):
    655         jid = JID(element['jid'])
     666        jid = internJID(element['jid'])
    656667        item = RosterItem(jid)
    657668        item.name = element.getAttribute('name')
    658669        subscription = element.getAttribute('subscription')
     
    715726        itemElement = iq.query.item
    716727
    717728        if unicode(itemElement['subscription']) == 'remove':
    718             self.onRosterRemove(JID(itemElement['jid']))
     729            self.onRosterRemove(internJID(itemElement['jid']))
    719730        else:
    720731            item = self._parseRosterItem(iq.query.item)
    721732            self.onRosterSet(item)
     
    763774    def _onRosterGet(self, iq):
    764775        iq.handled = True
    765776
    766         d = self.getRoster(JID(iq["from"]))
     777        d = self.getRoster(internJID(iq["from"]))
    767778        d.addCallback(self._toRosterReply, iq)
    768779        d.addErrback(lambda _: error.ErrorStanza('internal-error').toResponse(iq))
    769780        d.addBoth(self.send)
     
    808819        """
    809820        Called when a message stanza was received.
    810821        """
     822
     823
     824
     825class AccountIQHandler(XMPPHandler):
     826
     827    def __init__(self, sessionManager):
     828        XMPPHandler.__init__(self)
     829        self.sessionManager = sessionManager
     830
     831
     832    def connectionMade(self):
     833        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     834
     835
     836    def onIQ(self, iq):
     837        """
     838        Handler for iq stanzas to user accounts' connected resources.
     839
     840        If the recipient is a bare JID or there is no associated user, this
     841        handler ignores the stanza, so that other handlers have a chance
     842        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     843        C{'service-unavailable'} stanza error if no other handlers handle
     844        the iq.
     845        """
     846
     847        if iq.handled:
     848            return
     849
     850        try:
     851            recipient = internJID(iq['to'])
     852        except KeyError:
     853            return
     854
     855        if not recipient.user:
     856            # This is not for an account, ignore it
     857            return
     858        elif recipient.user not in self.sessionManager.accounts:
     859            # This is not a user, ignore it
     860            return
     861        elif not recipient.resource:
     862            # Bare JID at local domain, ignore it
     863            return
     864        elif recipient.user in self.sessionManager.sessions:
     865            # Full JID with connected resource, deliver the stanza
     866            self.sessionManager.deliverStanza(iq, recipient)
     867        else:
     868            # Full JID without connected resource, return error
     869            exc = error.StanzaError('service-unavailable')
     870            if iq['type'] in ('result', 'error'):
     871                log.err(exc, 'Could not deliver IQ response')
     872            else:
     873                self.send(exc.toResponse(iq))
     874
     875        iq.handled = True
     876
     877
     878
     879class AccountMessageHandler(XMPPHandler):
     880
     881    def __init__(self, sessionManager):
     882        XMPPHandler.__init__(self)
     883        self.sessionManager = sessionManager
     884
     885
     886    def connectionMade(self):
     887        self.xmlstream.addObserver('/message', self.onMessage, 1)
     888
     889
     890    def onMessage(self, message):
     891        """
     892        Handler for message stanzas to user accounts.
     893        """
     894
     895        if message.handled:
     896            return
     897
     898        try:
     899            recipient = internJID(message['to'])
     900        except KeyError:
     901            return
     902
     903        stanzaType = message.getAttribute('type', 'normal')
     904
     905        try:
     906            if not recipient.user:
     907                # This is not for an account, ignore it
     908                return
     909            elif recipient.user not in self.sessionManager.accounts:
     910                # This is not a user, ignore it
     911                return
     912            elif recipient.resource:
     913                userSessions = self.sessionManager.sessions.get(recipient.user,
     914                                                                {})
     915                if recipient.resource in userSessions:
     916                    self.sessionManager.deliverStanza(message, recipient)
     917                else:
     918                    if stanzaType in ('normal', 'chat', 'headline'):
     919                        self.onMessageBareJID(message, recipient.userhostJID())
     920                    elif stanzaType == 'error':
     921                        log.msg("Dropping message to unconnected resource %r" %
     922                                recipient.full())
     923                    elif stanzaType == 'groupchat':
     924                        raise error.StanzaError('service-unavailable')
     925            else:
     926                self.onMessageBareJID(message, recipient)
     927        except error.StanzaError, exc:
     928            if stanzaType == 'error':
     929                log.err(exc, "Undeliverable error")
     930            else:
     931                self.send(exc.toResponse(message))
     932
     933        message.handled = True
     934
     935
     936    def onMessageBareJID(self, message, bareJID):
     937        stanzaType = message.getAttribute('type', 'normal')
     938
     939        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     940
     941        recipients = set()
     942
     943        if stanzaType == 'headline':
     944            for session in userSessions:
     945                if session.presence.priority >= 0:
     946                    recipients.add(session.entity)
     947        elif stanzaType in ('chat', 'normal'):
     948            priorities = {}
     949            for session in userSessions.itervalues():
     950                if not session.presence or not session.presence.available:
     951                    continue
     952                priority = session.presence.priority
     953                if priority >= 0:
     954                    priorities.setdefault(priority, set()).add(session.entity)
     955                maxPriority = max(priorities.keys())
     956                recipients.update(priorities[maxPriority])
     957        elif stanzaType == 'groupchat':
     958            raise error.StanzaError('service-unavailable')
     959
     960        if recipients:
     961            for recipient in recipients:
     962                self.sessionManager.deliverStanza(message, recipient)
     963        elif stanzaType in ('chat', 'normal'):
     964            raise error.StanzaError('service-unavailable')
     965        else:
     966            # silently discard
     967            log.msg("Discarding message to %r" % message['to'])
     968
     969
     970
     971
     972def clonePresence(presence):
     973    """
     974    Make a deep copy of a presence stanza.
     975
     976    The returned presence stanza is an orphaned deep copy of the given
     977    original.
     978
     979    @note: Since the reference to the original parent, if any, is gone,
     980    inherited attributes like C{xml:lang} are not preserved.
     981    """
     982    element = presence.element
     983
     984    parent = element.parent
     985    element.parent = None
     986    newElement = copy.deepcopy(element)
     987    element.parent = parent
     988    return newElement
     989
     990
     991
     992class PresenceServerHandler(PresenceProtocol):
     993
     994    def __init__(self, sessionManager, domain, roster):
     995        PresenceProtocol.__init__(self)
     996        self.sessionManager = sessionManager
     997        self.domain = domain
     998        self.roster = roster
     999        self.presences = {} # user -> resource -> presence
     1000        self.offlinePresences = {} # user -> presence
     1001        self.remotePresences = {} # user -> remote entity -> presence
     1002
     1003        self.sessionManager.clientStream.addObserver('/presence',
     1004                                                     self._onPresenceOutbound)
     1005
     1006
     1007    def _broadcastToOtherResources(self, presence):
     1008        """
     1009        Broadcast presence to other available resources.
     1010        """
     1011        fromJID = presence.sender
     1012        for otherResource in self.presences[fromJID.user]:
     1013            if otherResource == fromJID.resource:
     1014                continue
     1015
     1016            resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource))
     1017            outPresence = clonePresence(presence)
     1018            outPresence['to'] = resourceJID.full()
     1019            self.sessionManager.deliverStanza(outPresence, resourceJID)
     1020
     1021
     1022    def _broadcastToContacts(self, presence):
     1023        """
     1024        Broadcast presence to subscribed entities.
     1025        """
     1026        fromJID = presence.sender
     1027        roster = self.roster[fromJID.user]
     1028
     1029        for item in roster.itervalues():
     1030            if not item.subscriptionFrom:
     1031                continue
     1032
     1033            outPresence = clonePresence(presence)
     1034            outPresence['to'] = item.jid.full()
     1035
     1036            if item.jid.host == self.domain:
     1037                # local contact
     1038                if item.jid.user in self.presences:
     1039                    # broadcast to contact's available resources
     1040                    for itemResource in self.presences[item.jid.user]:
     1041                        resourceJID = JID(tuple=(item.jid.user,
     1042                                                 item.jid.host,
     1043                                                 itemResource))
     1044                        self.sessionManager.deliverStanza(outPresence,
     1045                                                          resourceJID)
     1046            else:
     1047                # remote contact
     1048                self.send(outPresence)
     1049
     1050
     1051    def _on_availableBroadcast(self, presence):
     1052        fromJID = presence.sender
     1053        user, resource = fromJID.user, fromJID.resource
     1054        roster = self.roster[user]
     1055
     1056        if user not in self.presences:
     1057            # initial presence
     1058            self.presences[user] = {}
     1059            self.remotePresences[user] = {}
     1060
     1061            # send out probes
     1062            for item in roster.itervalues():
     1063                if item.subscriptionTo and item.jid.host != self.domain:
     1064                    self.probe(item.jid, fromJID)
     1065        else:
     1066            if resource not in self.presences[user]:
     1067                # initial presence with another available resource
     1068
     1069                # send last known presences from remote contacts
     1070                remotePresences = self.remotePresences[user]
     1071                for entity, remotePresence in remotePresences.iteritems():
     1072                    self.sessionManager.deliverStanza(remotePresence.element,
     1073                                                      fromJID)
     1074
     1075            # send presence to other resources
     1076            self._broadcastToOtherResources(presence)
     1077
     1078        # Send last known local presences
     1079        if user not in self.presences or resource not in self.presences[user]:
     1080            for item in roster.itervalues():
     1081                if item.subscriptionTo and \
     1082                   item.jid.host == self.domain and \
     1083                   item.jid.user in self.presences:
     1084                    for contactPresence in \
     1085                            self.presences[item.jid.user].itervalues():
     1086                        outPresence = clonePresence(contactPresence)
     1087                        outPresence['to'] = fromJID.userhost()
     1088                        self.sessionManager.deliverStanza(outPresence, fromJID)
     1089
     1090        # broadcast presence
     1091        self._broadcastToContacts(presence)
     1092
     1093        # save presence
     1094        self.presences[user][resource] = presence
     1095        self.sessionManager.sessions[user][resource].presence = presence
     1096
     1097
     1098    #def _on_availableDirected(self, presence):
     1099    #    pass
     1100
     1101
     1102    def _on_availableInbound(self, presence):
     1103        fromJID = presence.sender
     1104        toJID = presence.recipient
     1105        if (toJID.user in self.roster and
     1106            toJID.user in self.presences):
     1107            for resource in self.presences[toJID.user]:
     1108                resourceJID = JID(tuple=(toJID.user, toJID.host, resource))
     1109                self.sessionManager.deliverStanza(presence.element, resourceJID)
     1110            self.remotePresences[toJID.user][fromJID] = presence
     1111        else:
     1112            # no such user or no available resource, ignore this stanza
     1113            pass
     1114
     1115
     1116    def _on_unavailableBroadcast(self, presence):
     1117        fromJID = presence.sender
     1118        user, resource = fromJID.user, fromJID.resource
     1119
     1120        # broadcast presence
     1121        self._broadcastToContacts(presence)
     1122
     1123        if user in self.presences:
     1124            # send presence to other resources
     1125            self._broadcastToOtherResources(presence)
     1126
     1127            # update stored presences
     1128            if resource in self.presences[user]:
     1129                del self.presences[user][resource]
     1130
     1131            if not self.presences[user]:
     1132                # last resource to become unavailable
     1133                del self.presences[user]
     1134
     1135                # TODO: save last unavailable presence
     1136
     1137
     1138#    def _on_unavailableDirected(self, presence):
     1139#        pass
     1140
     1141
     1142#    def _on_unavailableInbound(self, presence):
     1143#        pass
     1144
     1145
     1146    def _onPresenceOutbound(self, element):
     1147        log.msg("Got outbound presence: %r" % element.toXml())
     1148        presence = self.parsePresence(element)
     1149
     1150        presenceType = presence.stanzaType or 'available'
     1151        method = '%sReceivedOutbound' % presenceType
     1152        print method
     1153
     1154        try:
     1155            handler = getattr(self, method)
     1156        except AttributeError:
     1157            return
     1158        else:
     1159            element.handled = True
     1160            handler(presence)
     1161
     1162
     1163    def availableReceived(self, presence):
     1164        self._on_availableInbound(presence)
     1165
     1166
     1167    def availableReceivedOutbound(self, presence):
     1168        if presence.recipient:
     1169            pass # self._on_availableDirected(presence)
     1170        else:
     1171            self._on_availableBroadcast(presence)
     1172
     1173
     1174#    def unavailableReceived(self, presence):
     1175#        self._on_unavailableInbound(presence)
     1176
     1177
     1178    def unavailableReceivedOutbound(self, presence):
     1179        if presence.recipient:
     1180            pass # self._on_unavailableDirected(presence)
     1181        else:
     1182            self._on_unavailableBroadcast(presence)
     1183
     1184
     1185    def subscribedReceivedOutbound(self, presence):
     1186        log.msg("%r subscribed %s to its presence" % (presence.sender,
     1187                                                      presence.recipient))
     1188        #self.send(presence.element)
     1189
     1190
     1191    def unsubscribedReceivedOutbound(self, presence):
     1192        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     1193                                                          presence.recipient))
     1194        #self.send(presence.element)
     1195
     1196
     1197    def subscribeReceived(self, presence):
     1198        log.msg("%r requests subscription to %s" % (presence.sender,
     1199                                                    presence.recipient))
     1200
     1201
     1202    def subscribeReceivedOutbound(self, presence):
     1203        log.msg("%r requests subscription to %s" % (presence.sender,
     1204                                                    presence.recipient))
     1205        self.send(presence.element)
     1206
     1207
     1208    def unsubscribeReceived(self, presence):
     1209        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1210                                                        presence.recipient))
     1211
     1212    def unsubscribeReceivedOutbound(self, presence):
     1213        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1214                                                        presence.recipient))
     1215        self.send(presence.element)
     1216
     1217
     1218    def probeReceived(self, presence):
     1219        fromJID = presence.sender
     1220        toJID = presence.recipient
     1221
     1222        if toJID.user not in self.roster or \
     1223           fromJID.userhost() not in self.roster[toJID.user] or \
     1224           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1225            # send unsubscribed
     1226            pass
     1227        elif toJID.user not in self.presences:
     1228            # send last unavailable or nothing
     1229            pass
     1230        else:
     1231            for resourcePresence in self.presences[toJID.user].itervalues():
     1232                outPresence = clonePresence(resourcePresence)
     1233                outPresence['to'] = fromJID.userhost()
     1234                self.send(outPresence)
Note: See TracBrowser for help on using the repository browser.