Changeset 39:d6a0f8cbabf3 in ralphm-patches


Ignore:
Timestamp:
Feb 21, 2010, 2:29:49 PM (11 years ago)
Author:
Ralph Meijer <ralphm@…>
Branch:
default
Message:

Add message, iq and presence handling, remove ClientService? and redo
relationship between c2s factory and session manager.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • xmpp_client_service.patch

    r38 r39  
    1 diff -r 1644083ca235 doc/examples/client_service.tac
     1diff -r 62f841ed2a99 doc/examples/client_service.tac
    22--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
    3 +++ b/doc/examples/client_service.tac   Sat Feb 13 19:01:07 2010 +0100
    4 @@ -0,0 +1,64 @@
     3+++ b/doc/examples/client_service.tac   Sun Feb 21 14:28:09 2010 +0100
     4@@ -0,0 +1,74 @@
    55+from twisted.application import service, strports
    66+from twisted.internet import defer
    77+
    8 +from wokkel import client
     8+from wokkel import client, xmppim
     9+from wokkel.component import InternalComponent, Router
    910+from wokkel.generic import FallbackHandler
    10 +from wokkel.subprotocols import XMPPHandler
    11 +from wokkel.xmppim import RosterItem, RosterServerProtocol
     11+from wokkel.ping import PingHandler
     12+from wokkel.xmppim import RosterItem
    1213+
    1314+from twisted.words.protocols.jabber.jid import internJID as JID
     
    1617+domain = socket.gethostname()
    1718+
    18 +class StaticRoster(RosterServerProtocol):
    19 +
    20 +    def __init__(self):
    21 +        RosterServerProtocol.__init__(self)
    22 +        self.roster = {'ralphm': [
    23 +                           RosterItem(JID('intosi@'+domain),
    24 +                                      subscriptionTo=True,
    25 +                                      subscriptionFrom=True,
    26 +                                      name='Intosi',
    27 +                                      groups=set(['Friends'])),
    28 +                           RosterItem(JID('termie@'+domain),
    29 +                                      subscriptionTo=True,
    30 +                                      subscriptionFrom=True,
    31 +                                      name='termie'),
    32 +                           ],
    33 +                       'test': [],
    34 +                       }
     19+RALPHM = JID('ralphm@'+domain)
     20+INTOSI = JID('intosi@'+domain)
     21+TERMIE = JID('termie@'+domain)
     22+
     23+roster = {
     24+    'ralphm': {
     25+        INTOSI: RosterItem(INTOSI,
     26+                           subscriptionTo=True,
     27+                           subscriptionFrom=True,
     28+                           name='Intosi',
     29+                           groups=set(['Friends'])),
     30+        TERMIE: RosterItem(TERMIE,
     31+                           subscriptionTo=True,
     32+                           subscriptionFrom=True,
     33+                           name='termie'),
     34+        },
     35+    'termie': {
     36+        RALPHM: RosterItem(RALPHM,
     37+                           subscriptionTo=True,
     38+                           subscriptionFrom=True,
     39+                           name='ralphm'),
     40+        }
     41+    }
     42+
     43+accounts = set(roster.keys())
     44+
     45+
     46+class StaticRoster(xmppim.RosterServerProtocol):
     47+
     48+    def __init__(self, roster):
     49+        xmppim.RosterServerProtocol.__init__(self)
     50+        self.roster = roster
    3551+
    3652+    def getRoster(self, entity):
    37 +        return defer.succeed(self.roster[entity.user])
    38 +
    39 +
    40 +class Hello(XMPPHandler):
    41 +
    42 +    def q(self):
    43 +        from wokkel.xmppim import AvailabilityPresence
    44 +        p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat')
    45 +        self.parent.send(p.toElement())
    46 +
    47 +
    48 +    def connectionInitialized(self):
    49 +        from twisted.internet import reactor
    50 +        reactor.callLater(5, self.q)
     53+        return defer.succeed(self.roster[entity.user].values())
    5154+
    5255+
     
    5457+application = service.Application("Jabber server")
    5558+
    56 +sessionManager = client.SessionManager()
    57 +FallbackHandler().setHandlerParent(sessionManager)
    58 +StaticRoster().setHandlerParent(sessionManager)
    59 +Hello().setHandlerParent(sessionManager)
    60 +
    61 +clientService = client.ClientService(sessionManager, domain)
    62 +clientService.logTraffic = True
    63 +
    64 +
    65 +c2sFactory = client.XMPPC2SServerFactory(clientService)
     59+router = Router()
     60+component = InternalComponent(router, domain)
     61+component.setServiceParent(application)
     62+
     63+sessionManager = client.SessionManager(domain, accounts)
     64+sessionManager.setHandlerParent(component)
     65+
     66+xmppim.AccountIQHandler(sessionManager).setHandlerParent(component)
     67+xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component)
     68+xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     69+FallbackHandler().setHandlerParent(component)
     70+StaticRoster(roster).setHandlerParent(component)
     71+PingHandler().setHandlerParent(component)
     72+
     73+c2sFactory = client.XMPPC2SServerFactory(sessionManager)
    6674+c2sFactory.logTraffic = True
    6775+c2sService = strports.service('5224', c2sFactory)
    6876+c2sService.setServiceParent(application)
    69 diff -r 1644083ca235 wokkel/client.py
    70 --- a/wokkel/client.py  Sat Feb 13 18:57:27 2010 +0100
    71 +++ b/wokkel/client.py  Sat Feb 13 19:01:07 2010 +0100
    72 @@ -17,11 +17,12 @@
     77+
     78+sessionManager.connectionManager = c2sFactory
     79diff -r 62f841ed2a99 wokkel/client.py
     80--- a/wokkel/client.py  Sat Feb 13 18:57:26 2010 +0100
     81+++ b/wokkel/client.py  Sun Feb 21 14:28:09 2010 +0100
     82@@ -10,13 +10,27 @@
     83 that should probably eventually move there.
     84 """
     85 
     86+import base64
     87+
     88 from twisted.application import service
     89-from twisted.internet import reactor
     90+from twisted.internet import defer, reactor
    7391 from twisted.names.srvconnect import SRVConnector
    74  from twisted.python import log
    75  from twisted.words.protocols.jabber import client, error, sasl, xmlstream
    76 -from twisted.words.protocols.jabber.jid import internJID as JID
     92-from twisted.words.protocols.jabber import client, sasl, xmlstream
     93+from twisted.python import log, randbytes
     94+from twisted.words.protocols.jabber import client, error, sasl, xmlstream
    7795+from twisted.words.protocols.jabber.jid import JID, internJID
    78  from twisted.words.xish import domish
     96+from twisted.words.xish import domish
    7997 
    8098 from wokkel import generic
    8199-from wokkel.subprotocols import StreamManager
    82100+from wokkel.compat import XmlStreamServerFactory
    83 +from wokkel.subprotocols import StreamManager, XMPPHandlerCollection
    84  
    85  NS_CLIENT = 'jabber:client'
    86  
    87 @@ -311,6 +312,7 @@
    88  
    89          # TODO: check for resource conflicts
    90  
    91 +        print self.username, self.domain, self.resource
    92          newJID = JID(tuple=(self.username, self.domain, self.resource))
    93  
    94          reply = domish.Element((None, 'iq'))
    95 @@ -350,3 +352,166 @@
    96              reply['id'] = iq['id']
    97          reply.addElement((client.NS_XMPP_SESSION, 'session'))
    98          self.xmlstream.send(reply)
    99 +
    100 +
    101 +
    102 +class XMPPC2SServerFactory(XmlStreamServerFactory):
     101+from wokkel.subprotocols import StreamManager, XMPPHandler
     102+
     103+NS_CLIENT = 'jabber:client'
     104+
     105+XPATH_ALL = "/*"
     106+XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL
     107+XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND
     108+XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \
     109+                client.NS_XMPP_SESSION
     110 
     111 class CheckAuthInitializer(object):
     112     """
     113@@ -51,7 +65,7 @@
     114     autentication.
     115     """
     116 
     117-    namespace = 'jabber:client'
     118+    namespace = NS_CLIENT
     119 
     120     def __init__(self, jid, password):
     121         xmlstream.ConnectAuthenticator.__init__(self, jid.host)
     122@@ -186,3 +200,338 @@
     123     c = XMPPClientConnector(reactor, domain, factory)
     124     c.connect()
     125     return factory.deferred
     126+
     127+
     128+
     129+class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator):
     130+    namespace = NS_CLIENT
    103131+
    104132+    def __init__(self, service):
    105133+        self.service = service
     134+        self.failureGrace = 3
     135+        self.state = 'auth'
     136+
     137+
     138+    def associateWithStream(self, xs):
     139+        xmlstream.ListenAuthenticator.associateWithStream(self, xs)
     140+        self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1)
     141+
     142+
     143+    def onElementFallback(self, element):
     144+        if element.handled:
     145+            return
     146+
     147+        exc = error.StreamError('not-authorized')
     148+        self.xmlstream.sendStreamError(exc)
     149+
     150+
     151+    def streamStarted(self, rootElement):
     152+        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)
     153+
     154+        # check namespace
     155+        #if self.xmlstream.namespace != self.namespace:
     156+        #    self.xmlstream.namespace = self.namespace
     157+        #    exc = error.StreamError('invalid-namespace')
     158+        #    self.xmlstream.sendStreamError(exc)
     159+        #    return
     160+
     161+        # TODO: check domain (self.service.domain)
     162+
     163+        self.xmlstream.sendHeader()
     164+
     165+        try:
     166+            stateHandlerName = 'streamStarted_' + self.state
     167+            stateHandler = getattr(self, stateHandlerName)
     168+        except AttributeError:
     169+            log.msg('streamStarted handler for', self.state, 'not found')
     170+        else:
     171+            stateHandler()
     172+
     173+
     174+    def toState(self, state):
     175+        self.state = state
     176+        if state == 'initialized':
     177+            self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback)
     178+            self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1)
     179+            self.xmlstream.dispatch(self.xmlstream,
     180+                                    xmlstream.STREAM_AUTHD_EVENT)
     181+
     182+
     183+    def streamStarted_auth(self):
     184+        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     185+        features.addElement((sasl.NS_XMPP_SASL, 'mechanisms'))
     186+        features.mechanisms.addElement('mechanism', content='PLAIN')
     187+        self.xmlstream.send(features)
     188+        self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     189+
     190+
     191+    def onAuth(self, auth):
     192+        auth.handled = True
     193+
     194+        if auth.getAttribute('mechanism') != 'PLAIN':
     195+            failure = domish.Element((sasl.NS_XMPP_SASL, 'failure'))
     196+            failure.addElement('invalid-mechanism')
     197+            self.xmlstream.send(failure)
     198+
     199+            # Close stream on too many failing authentication attempts
     200+            self.failureGrace -= 1
     201+            if self.failureGrace == 0:
     202+                self.xmlstream.sendFooter()
     203+            else:
     204+                self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth)
     205+
     206+            return
     207+
     208+        initialResponse = base64.b64decode(unicode(auth))
     209+        authzid, authcid, passwd = initialResponse.split('\x00')
     210+
     211+        # TODO: check passwd
     212+
     213+        # authenticated
     214+
     215+        self.username = authcid
     216+
     217+        success = domish.Element((sasl.NS_XMPP_SASL, 'success'))
     218+        self.xmlstream.send(success)
     219+        self.xmlstream.reset()
     220+
     221+        self.toState('bind')
     222+
     223+
     224+    def streamStarted_bind(self):
     225+        features = domish.Element((xmlstream.NS_STREAMS, 'features'))
     226+        features.addElement((client.NS_XMPP_BIND, 'bind'))
     227+        features.addElement((client.NS_XMPP_SESSION, 'session'))
     228+        self.xmlstream.send(features)
     229+        self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind)
     230+
     231+
     232+    def onBind(self, iq):
     233+        def cb(boundJID):
     234+            self.xmlstream.otherEntity = boundJID
     235+            self.toState('initialized')
     236+
     237+            response = xmlstream.toResponse(iq, 'result')
     238+            response.addElement((client.NS_XMPP_BIND, 'bind'))
     239+            response.bind.addElement((client.NS_XMPP_BIND, 'jid'),
     240+                                  content=boundJID.full())
     241+
     242+            return response
     243+
     244+        def eb(failure):
     245+            if not isinstance(failure, error.StanzaError):
     246+                log.msg(failure)
     247+                exc = error.StanzaError('internal-server-error')
     248+            else:
     249+                exc = failure.value
     250+
     251+            return exc.toResponse(iq)
     252+
     253+        iq.handled = True
     254+        resource = unicode(iq.bind) or None
     255+        d = self.service.bindResource(self.username,
     256+                                      self.service.domain,
     257+                                      resource)
     258+        d.addCallback(cb)
     259+        d.addErrback(eb)
     260+        d.addCallback(self.xmlstream.send)
     261+
     262+
     263+    def onSession(self, iq):
     264+        iq.handled = True
     265+
     266+        reply = domish.Element((None, 'iq'))
     267+        reply['type'] = 'result'
     268+        if iq.getAttribute('id'):
     269+            reply['id'] = iq['id']
     270+        reply.addElement((client.NS_XMPP_SESSION, 'session'))
     271+        self.xmlstream.send(reply)
     272+
     273+
     274+
     275+class RecipientUnavailable(Exception):
     276+    """
     277+    The addressed entity is not, or no longer, available.
     278+    """
     279+
     280+
     281+
     282+class XMPPC2SServerFactory(XmlStreamServerFactory):
     283+
     284+    def __init__(self, service):
     285+        self.service = service
    106286+
    107287+        def authenticatorFactory():
    108 +            return XMPPClientListenAuthenticator(service.domain)
     288+            return XMPPClientListenAuthenticator(service)
    109289+
    110290+        XmlStreamServerFactory.__init__(self, authenticatorFactory)
     
    115295+
    116296+        self.serial = 0
     297+        self.streams = {}
    117298+
    118299+
     
    148329+        xs.addObserver('/*', self.onElement, 0, xs)
    149330+
    150 +        self.service.connectionInitialized(xs)
     331+        # Record this stream as bound to the authenticated JID
     332+        self.streams[xs.otherEntity] = xs
    151333+
    152334+
     
    154336+        log.msg("Client connection %d disconnected" % xs.serial)
    155337+
    156 +        self.service.connectionLost(xs, reason)
     338+        entity = xs.otherEntity
     339+        self.service.unbindResource(entity.user,
     340+                                    entity.host,
     341+                                    entity.resource,
     342+                                    reason)
     343+
     344+        # If the lost connections had been bound, remove the reference
     345+        if xs.otherEntity in self.streams:
     346+            del self.streams[xs.otherEntity]
    157347+
    158348+
     
    161351+
    162352+
    163 +    def onElement(self, xs, element):
     353+    def onElement(self, xs, stanza):
    164354+        """
    165355+        Called when an element was received from one of the connected streams.
    166356+
    167357+        """
    168 +        if element.handled:
     358+        if stanza.handled:
    169359+            return
    170360+        else:
    171 +            self.service.dispatch(xs, element)
    172 +
    173 +
    174 +
    175 +class SessionManager(XMPPHandlerCollection):
    176 +
    177 +    def __init__(self):
    178 +        XMPPHandlerCollection.__init__(self)
    179 +        self.xmlstream = None
    180 +
    181 +
    182 +    def makeConnection(self, xs):
    183 +        self.xmlstream = xs
    184 +
    185 +        for handler in self:
    186 +            handler.makeConnection(xs)
    187 +            handler.connectionInitialized()
    188 +
    189 +
    190 +    def send(self, obj):
    191 +        if self.xmlstream:
    192 +            self.xmlstream.send(obj)
    193 +
    194 +
    195 +class ClientService(object):
     361+            self.service.onElement(stanza, xs.otherEntity)
     362+
     363+
     364+    def deliverStanza(self, element, recipient):
     365+        if recipient in self.streams:
     366+            self.streams[recipient].send(element)
     367+        else:
     368+            raise RecipientUnavailable(u"There is no connection for %s" %
     369+                                       recipient.full())
     370+
     371+
     372+class Session(object):
     373+    def __init__(self, entity):
     374+        self.entity = entity
     375+        self.connected = False
     376+        self.interested = False
     377+        self.presence = None
     378+
     379+
     380+
     381+class SessionManager(XMPPHandler):
     382+
    196383+    """
    197 +    Service for accepting XMPP client connections.
    198 +
    199 +    Incoming client connections are first authenticated using the
    200 +    L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is
    201 +    keyed with the JID that was bound to that connection.
    202 +
    203 +    Where L{StreamManager} manages one connection at a time, this
    204 +    service demultiplexes incoming connections. For the subprotocol handlers
    205 +    (objects providing {ijabber.IXMPPHandler}), their stream is always
    206 +    connected and initialized, but they only have to deal with one stream. This
    207 +    makes it easier to create adapters.
    208 +
    209 +    As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML
    210 +    stream that is passed to the XMPP subprotocol handlers. The received
    211 +    stanzas from the incoming client connections are passed on to the handlers
    212 +    using this fake XML, while having their C{from} attribute set to the JID of
    213 +    the client stream. Stanzas sent by the handlers are forwarded to the
    214 +    matching client stream, selected by the stanzas C{to} attribute.
     384+    Session Manager.
     385+
     386+    @ivar xmlstream: XML Stream to inject incoming stanzas from client
     387+        connections into. Stanzas where the C{'to'} attribute is not set
     388+        or is directed at the local domain are injected as if received on
     389+        the XML Stream (using C{dispatch}), other stanzas are injected as if
     390+        they were sent from the XML Stream (using C{send}).
    215391+    """
    216392+
    217 +    logTraffic = False
    218 +
    219 +    def __init__(self, sessionManager, domain):
     393+    def __init__(self, domain, accounts):
     394+        XMPPHandler.__init__(self)
     395+        self.domain = domain
     396+        self.accounts = accounts
     397+
     398+        self.connectionManager = None
     399+        self.sessions = {}
     400+
     401+
     402+    def bindResource(self, localpart, domain, resource):
     403+        if domain != self.domain:
     404+            raise Exception("I don't host this domain!")
     405+
     406+        try:
     407+            userSessions = self.sessions[localpart]
     408+        except KeyError:
     409+            userSessions = self.sessions[localpart] = {}
     410+
     411+        if resource is None:
     412+            resource = randbytes.secureRandom(8).encode('hex')
     413+        elif resource in self.userSessions:
     414+            resource = resource + ' ' + randbytes.secureRandom(8).encode('hex')
     415+
     416+        entity = JID(tuple=(localpart, domain, resource))
     417+        session = Session(entity)
     418+        session.connected = True
     419+        userSessions[resource] = session
     420+
     421+        return defer.succeed(entity)
     422+
     423+
     424+    def unbindResource(self, localpart, domain, resource, reason=None):
     425+        try:
     426+            session = self.sessions[localpart][resource]
     427+        except KeyError:
     428+            pass
     429+        else:
     430+            session.connected = False
     431+            del self.sessions[localpart][resource]
     432+            if not self.sessions[localpart]:
     433+                del self.sessions[localpart]
     434+
     435+        return defer.succeed(None)
     436+
     437+
     438+    def onElement(self, element, sender):
     439+        # Make sure each stanza has a sender address
     440+        if (element.name == 'presence' and
     441+            element.getAttribute('type') in ('subscribe', 'subscribed',
     442+                                             'unsubscribe', 'unsubscribed')):
     443+            element['from'] = sender.userhost()
     444+        else:
     445+            element['from'] = sender.full()
     446+
     447+        if (not element.hasAttribute('to') or
     448+                internJID(element['to']).host == self.domain):
     449+            # This stanza is for local delivery
     450+            self.xmlstream.dispatch(element)
     451+        else:
     452+            # This stanza is for remote delivery
     453+            self.xmlstream.send(element)
     454+
     455+
     456+    def deliverStanza(self, element, recipient):
     457+        if self.connectionManager:
     458+            self.connectionManager.deliverStanza(element, recipient)
     459+        else:
     460+            raise Exception("No connection manager set")
     461diff -r 62f841ed2a99 wokkel/component.py
     462--- a/wokkel/component.py       Sat Feb 13 18:57:26 2010 +0100
     463+++ b/wokkel/component.py       Sun Feb 21 14:28:09 2010 +0100
     464@@ -313,12 +313,24 @@
     465         """
     466         destination = JID(stanza['to'])
     467 
     468-        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))
     469 
     470         if destination.host in self.routes:
     471+            msg = "Routing to %s (default route): %r"
     472+            log.msg("Routing to %s: %r" % (destination.full(),
     473+                                           stanza.toXml()))
     474             self.routes[destination.host].send(stanza)
     475+        elif None in self.routers:
     476+            log.msg("Routing to %s (default route): %r" % (destination.full(),
     477+                                                           stanza.toXml()))
     478+            self.routes[None].send(stanza)
     479         else:
     480-            self.routes[None].send(stanza)
     481+            log.msg("No route to %s: %r" % (destination.full(),
     482+                                            stanza.toXml()))
     483+            if stanza.getAttribute('type') not in ('result', 'error'):
     484+                # No route, send back error
     485+                exc = error.StanzaError('remote-server-timeout')
     486+                response = exc.toResponse(stanza)
     487+                self.route(response)
     488 
     489 
     490 
     491diff -r 62f841ed2a99 wokkel/xmppim.py
     492--- a/wokkel/xmppim.py  Sat Feb 13 18:57:26 2010 +0100
     493+++ b/wokkel/xmppim.py  Sun Feb 21 14:28:09 2010 +0100
     494@@ -12,8 +12,11 @@
     495 All of it should eventually move to Twisted.
     496 """
     497 
     498+import copy
     499+
     500+from twisted.python import log
     501 from twisted.words.protocols.jabber import error, xmlstream
     502-from twisted.words.protocols.jabber.jid import JID
     503+from twisted.words.protocols.jabber.jid import JID, internJID
     504 from twisted.words.xish import domish
     505 
     506 from wokkel.compat import IQ
     507@@ -85,7 +88,7 @@
     508             handler(presence)
     509 
     510     def _onPresenceAvailable(self, presence):
     511-        entity = JID(presence["from"])
     512+        entity = internJID(presence["from"])
     513 
     514         show = unicode(presence.show or '')
     515         if show not in ['away', 'xa', 'chat', 'dnd']:
     516@@ -101,23 +104,23 @@
     517         self.availableReceived(entity, show, statuses, priority)
     518 
     519     def _onPresenceUnavailable(self, presence):
     520-        entity = JID(presence["from"])
     521+        entity = internJID(presence["from"])
     522 
     523         statuses = self._getStatuses(presence)
     524 
     525         self.unavailableReceived(entity, statuses)
     526 
     527     def _onPresenceSubscribed(self, presence):
     528-        self.subscribedReceived(JID(presence["from"]))
     529+        self.subscribedReceived(internJID(presence["from"]))
     530 
     531     def _onPresenceUnsubscribed(self, presence):
     532-        self.unsubscribedReceived(JID(presence["from"]))
     533+        self.unsubscribedReceived(internJID(presence["from"]))
     534 
     535     def _onPresenceSubscribe(self, presence):
     536-        self.subscribeReceived(JID(presence["from"]))
     537+        self.subscribeReceived(internJID(presence["from"]))
     538 
     539     def _onPresenceUnsubscribe(self, presence):
     540-        self.unsubscribeReceived(JID(presence["from"]))
     541+        self.unsubscribeReceived(internJID(presence["from"]))
     542 
     543 
     544     def availableReceived(self, entity, show=None, statuses=None, priority=0):
     545@@ -125,7 +128,7 @@
     546         Available presence was received.
     547 
     548         @param entity: entity from which the presence was received.
     549-        @type entity: {JID}
     550+        @type entity: L{JID}
     551         @param show: detailed presence information. One of C{'away'}, C{'xa'},
     552                      C{'chat'}, C{'dnd'} or C{None}.
     553         @type show: C{str} or C{NoneType}
     554@@ -143,7 +146,7 @@
     555         Unavailable presence was received.
     556 
     557         @param entity: entity from which the presence was received.
     558-        @type entity: {JID}
     559+        @type entity: L{JID}
     560         @param statuses: dictionary of natural language descriptions of the
     561                          availability status, keyed by the language
     562                          descriptor. A status without a language
     563@@ -156,7 +159,7 @@
     564         Subscription approval confirmation was received.
     565 
     566         @param entity: entity from which the confirmation was received.
     567-        @type entity: {JID}
     568+        @type entity: L{JID}
     569         """
     570 
     571     def unsubscribedReceived(self, entity):
     572@@ -164,7 +167,7 @@
     573         Unsubscription confirmation was received.
     574 
     575         @param entity: entity from which the confirmation was received.
     576-        @type entity: {JID}
     577+        @type entity: L{JID}
     578         """
     579 
     580     def subscribeReceived(self, entity):
     581@@ -172,7 +175,7 @@
     582         Subscription request was received.
     583 
     584         @param entity: entity from which the request was received.
     585-        @type entity: {JID}
     586+        @type entity: L{JID}
     587         """
     588 
     589     def unsubscribeReceived(self, entity):
     590@@ -180,7 +183,7 @@
     591         Unsubscription request was received.
     592 
     593         @param entity: entity from which the request was received.
     594-        @type entity: {JID}
     595+        @type entity: L{JID}
     596         """
     597 
     598     def available(self, entity=None, show=None, statuses=None, priority=0):
     599@@ -188,7 +191,7 @@
     600         Send available presence.
     601 
     602         @param entity: optional entity to which the presence should be sent.
     603-        @type entity: {JID}
     604+        @type entity: L{JID}
     605         @param show: optional detailed presence information. One of C{'away'},
     606                      C{'xa'}, C{'chat'}, C{'dnd'}.
     607         @type show: C{str}
     608@@ -207,7 +210,7 @@
     609         Send unavailable presence.
     610 
     611         @param entity: optional entity to which the presence should be sent.
     612-        @type entity: {JID}
     613+        @type entity: L{JID}
     614         @param statuses: dictionary of natural language descriptions of the
     615                          availability status, keyed by the language
     616                          descriptor. A status without a language
     617@@ -221,7 +224,7 @@
     618         Send subscription request
     619 
     620         @param entity: entity to subscribe to.
     621-        @type entity: {JID}
     622+        @type entity: L{JID}
     623         """
     624         self.send(Presence(to=entity, type='subscribe'))
     625 
     626@@ -230,7 +233,7 @@
     627         Send unsubscription request
     628 
     629         @param entity: entity to unsubscribe from.
     630-        @type entity: {JID}
     631+        @type entity: L{JID}
     632         """
     633         self.send(Presence(to=entity, type='unsubscribe'))
     634 
     635@@ -239,7 +242,7 @@
     636         Send subscription confirmation.
     637 
     638         @param entity: entity that subscribed.
     639-        @type entity: {JID}
     640+        @type entity: L{JID}
     641         """
     642         self.send(Presence(to=entity, type='subscribed'))
     643 
     644@@ -248,7 +251,7 @@
     645         Send unsubscription confirmation.
     646 
     647         @param entity: entity that unsubscribed.
     648-        @type entity: {JID}
     649+        @type entity: L{JID}
     650         """
     651         self.send(Presence(to=entity, type='unsubscribed'))
     652 
     653@@ -478,7 +481,7 @@
     654 
     655         @param recipient: Optional Recipient to which the presence should be
     656             sent.
     657-        @type recipient: {JID}
     658+        @type recipient: L{JID}
     659 
     660         @param show: Optional detailed presence information. One of C{'away'},
     661             C{'xa'}, C{'chat'}, C{'dnd'}.
     662@@ -503,7 +506,7 @@
     663         Send unavailable presence.
     664 
     665         @param recipient: Optional entity to which the presence should be sent.
     666-        @type recipient: {JID}
     667+        @type recipient: L{JID}
     668 
     669         @param statuses: dictionary of natural language descriptions of the
     670             availability status, keyed by the language descriptor. A status
     671@@ -520,7 +523,7 @@
     672         Send subscription request
     673 
     674         @param recipient: Entity to subscribe to.
     675-        @type recipient: {JID}
     676+        @type recipient: L{JID}
     677         """
     678         presence = SubscriptionPresence(recipient=recipient, sender=sender)
     679         presence.stanzaType = 'subscribe'
     680@@ -532,7 +535,7 @@
     681         Send unsubscription request
     682 
     683         @param recipient: Entity to unsubscribe from.
     684-        @type recipient: {JID}
     685+        @type recipient: L{JID}
     686         """
     687         presence = SubscriptionPresence(recipient=recipient, sender=sender)
     688         presence.stanzaType = 'unsubscribe'
     689@@ -544,7 +547,7 @@
     690         Send subscription confirmation.
     691 
     692         @param recipient: Entity that subscribed.
     693-        @type recipient: {JID}
     694+        @type recipient: L{JID}
     695         """
     696         presence = SubscriptionPresence(recipient=recipient, sender=sender)
     697         presence.stanzaType = 'subscribed'
     698@@ -556,7 +559,7 @@
     699         Send unsubscription confirmation.
     700 
     701         @param recipient: Entity that unsubscribed.
     702-        @type recipient: {JID}
     703+        @type recipient: L{JID}
     704         """
     705         presence = SubscriptionPresence(recipient=recipient, sender=sender)
     706         presence.stanzaType = 'unsubscribed'
     707@@ -568,7 +571,7 @@
     708         Send presence probe.
     709 
     710         @param recipient: Entity to be probed.
     711-        @type recipient: {JID}
     712+        @type recipient: L{JID}
     713         """
     714         presence = ProbePresence(recipient=recipient, sender=sender)
     715         self.send(presence.toElement())
     716@@ -652,7 +655,7 @@
     717 
     718 
     719     def _parseRosterItem(self, element):
     720-        jid = JID(element['jid'])
     721+        jid = internJID(element['jid'])
     722         item = RosterItem(jid)
     723         item.name = element.getAttribute('name')
     724         subscription = element.getAttribute('subscription')
     725@@ -715,7 +718,7 @@
     726         itemElement = iq.query.item
     727 
     728         if unicode(itemElement['subscription']) == 'remove':
     729-            self.onRosterRemove(JID(itemElement['jid']))
     730+            self.onRosterRemove(internJID(itemElement['jid']))
     731         else:
     732             item = self._parseRosterItem(iq.query.item)
     733             self.onRosterSet(item)
     734@@ -763,7 +766,7 @@
     735     def _onRosterGet(self, iq):
     736         iq.handled = True
     737 
     738-        d = self.getRoster(JID(iq["from"]))
     739+        d = self.getRoster(internJID(iq["from"]))
     740         d.addCallback(self._toRosterReply, iq)
     741         d.addErrback(lambda _: error.ErrorStanza('internal-error').toResponse(iq))
     742         d.addBoth(self.send)
     743@@ -808,3 +811,380 @@
     744         """
     745         Called when a message stanza was received.
     746         """
     747+
     748+
     749+
     750+class AccountIQHandler(XMPPHandler):
     751+
     752+    def __init__(self, sessionManager):
     753+        XMPPHandler.__init__(self)
     754+        self.sessionManager = sessionManager
     755+
     756+
     757+    def connectionMade(self):
     758+        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     759+
     760+
     761+    def onIQ(self, iq):
     762+        """
     763+        Handler for iq stanzas to user accounts' connected resources.
     764+
     765+        If the recipient is a bare JID or there is no associated user, this
     766+        handler ignores the stanza, so that other handlers have a chance
     767+        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     768+        C{'service-unavailable'} stanza error if no other handlers handle
     769+        the iq.
     770+        """
     771+
     772+        if iq.handled:
     773+            return
     774+
     775+        try:
     776+            recipient = internJID(iq['to'])
     777+        except KeyError:
     778+            return
     779+
     780+        if not recipient.user:
     781+            # This is not for an account, ignore it
     782+            return
     783+        elif recipient.user not in self.sessionManager.accounts:
     784+            # This is not a user, ignore it
     785+            return
     786+        elif not recipient.resource:
     787+            # Bare JID at local domain, ignore it
     788+            return
     789+        elif recipient.user in self.sessionManager.sessions:
     790+            # Full JID with connected resource, deliver the stanza
     791+            self.sessionManager.deliverStanza(iq, recipient)
     792+        else:
     793+            # Full JID without connected resource, return error
     794+            exc = error.StanzaError('service-unavailable')
     795+            if iq['type'] in ('result', 'error'):
     796+                log.err(exc, 'Could not deliver IQ response')
     797+            else:
     798+                self.send(exc.toResponse(iq))
     799+
     800+        iq.handled = True
     801+
     802+
     803+
     804+class AccountMessageHandler(XMPPHandler):
     805+
     806+    def __init__(self, sessionManager):
     807+        XMPPHandler.__init__(self)
     808+        self.sessionManager = sessionManager
     809+
     810+
     811+    def connectionMade(self):
     812+        self.xmlstream.addObserver('/message', self.onMessage, 1)
     813+
     814+
     815+    def onMessage(self, message):
     816+        """
     817+        Handler for message stanzas to user accounts.
     818+        """
     819+
     820+        if message.handled:
     821+            return
     822+
     823+        try:
     824+            recipient = internJID(message['to'])
     825+        except KeyError:
     826+            return
     827+
     828+        stanzaType = message.getAttribute('type', 'normal')
     829+
     830+        try:
     831+            if not recipient.user:
     832+                # This is not for an account, ignore it
     833+                return
     834+            elif recipient.user not in self.sessionManager.accounts:
     835+                # This is not a user, ignore it
     836+                return
     837+            elif recipient.resource:
     838+                userSessions = self.sessionManager.sessions.get(recipient.user,
     839+                                                                {})
     840+                if recipient.resource in userSessions:
     841+                    self.sessionManager.deliverStanza(message, recipient)
     842+                else:
     843+                    if stanzaType in ('normal', 'chat', 'headline'):
     844+                        self.onMessageBareJID(message, recipient.userhostJID())
     845+                    elif stanzaType == 'error':
     846+                        log.msg("Dropping message to unconnected resource %r" %
     847+                                recipient.full())
     848+                    elif stanzaType == 'groupchat':
     849+                        raise error.StanzaError('service-unavailable')
     850+            else:
     851+                self.onMessageBareJID(message, recipient)
     852+        except error.StanzaError, exc:
     853+            if stanzaType == 'error':
     854+                log.err(exc, "Undeliverable error")
     855+            else:
     856+                self.send(exc.toResponse(message))
     857+
     858+        message.handled = True
     859+
     860+
     861+    def onMessageBareJID(self, message, bareJID):
     862+        stanzaType = message.getAttribute('type', 'normal')
     863+
     864+        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     865+        print userSessions
     866+
     867+        recipients = set()
     868+
     869+        if stanzaType == 'headline':
     870+            for session in userSessions:
     871+                if session.presence.priority >= 0:
     872+                    recipients.add(session.entity)
     873+        elif stanzaType in ('chat', 'normal'):
     874+            priorities = {}
     875+            for session in userSessions.itervalues():
     876+                if not session.presence or not session.presence.available:
     877+                    continue
     878+                priority = session.presence.priority
     879+                if priority >= 0:
     880+                    priorities.setdefault(priority, set()).add(session.entity)
     881+                maxPriority = max(priorities.keys())
     882+                recipients.update(priorities[maxPriority])
     883+        elif stanzaType == 'groupchat':
     884+            raise error.StanzaError('service-unavailable')
     885+
     886+        if recipients:
     887+            for recipient in recipients:
     888+                self.sessionManager.deliverStanza(message, recipient)
     889+        elif stanzaType in ('chat', 'normal'):
     890+            raise error.StanzaError('service-unavailable')
     891+        else:
     892+            # silently discard
     893+            log.msg("Discarding message to %r" % message['to'])
     894+
     895+
     896+
     897+
     898+def clonePresence(presence):
     899+    """
     900+    Make a deep copy of a presence stanza.
     901+
     902+    The returned presence stanza is an orphaned deep copy of the given
     903+    original.
     904+
     905+    @note: Since the reference to the original parent, if any, is gone,
     906+    inherited attributes like C{xml:lang} are not preserved.
     907+    """
     908+    element = presence.element
     909+
     910+    parent = element.parent
     911+    element.parent = None
     912+    newElement = copy.deepcopy(element)
     913+    element.parent = parent
     914+    return newElement
     915+
     916+
     917+
     918+class PresenceServerHandler(PresenceProtocol):
     919+
     920+    def __init__(self, sessionManager, domain, roster):
     921+        PresenceProtocol.__init__(self)
    220922+        self.sessionManager = sessionManager
    221923+        self.domain = domain
    222 +
    223 +        self.streams = {}
    224 +
    225 +        pipe = generic.XmlPipe()
    226 +        self.xmlstream = pipe.source
    227 +        self.sessionManager.makeConnection(pipe.sink)
    228 +        self.xmlstream.addObserver('/*', self.send)
    229 +
    230 +
    231 +    def connectionInitialized(self, xs):
    232 +        self.streams[xs.otherEntity] = xs
    233 +
    234 +
    235 +    def connectionLost(self, xs, reason):
    236 +        if xs.otherEntity in self.streams:
    237 +            del self.streams[xs.otherEntity]
    238 +
    239 +
    240 +    def send(self, stanza):
     924+        self.roster = roster
     925+        self.presences = {} # user -> resource -> presence
     926+        self.offlinePresences = {} # user -> presence
     927+        self.remotePresences = {} # user -> remote entity -> presence
     928+
     929+
     930+    def _broadcastToOtherResources(self, presence):
    241931+        """
    242 +        Send stanza to the proper XML Stream.
    243 +
    244 +        This uses addressing embedded in the element to find the correct stream
    245 +        to forward the element to.
     932+        Broadcast presence to other available resources.
    246933+        """
    247 +        destination = internJID(stanza["to"])
    248 +
    249 +        if destination not in self.streams:
    250 +            log.msg("Euh")
    251 +            raise Exception("Destination unreachable")
    252 +
    253 +        self.streams[destination].send(stanza)
    254 +
    255 +
    256 +    def dispatch(self, xs, stanza):
     934+        fromJID = presence.sender
     935+        for otherResource in self.presences[fromJID.user]:
     936+            if otherResource == fromJID.resource:
     937+                continue
     938+
     939+            resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource))
     940+            outPresence = clonePresence(presence)
     941+            outPresence['to'] = resourceJID.full()
     942+            self.sessionManager.deliverStanza(outPresence, resourceJID)
     943+
     944+
     945+    def _broadcastToContacts(self, presence):
    257946+        """
    258 +        Called when an element was received from one of the connected streams.
     947+        Broadcast presence to subscribed entities.
    259948+        """
    260 +        stanza["from"] = xs.otherEntity.full()
    261 +        self.xmlstream.send(stanza)
     949+        fromJID = presence.sender
     950+        roster = self.roster[fromJID.user]
     951+
     952+        for item in roster.itervalues():
     953+            if not item.subscriptionFrom:
     954+                continue
     955+
     956+            outPresence = clonePresence(presence)
     957+            outPresence['to'] = item.jid.full()
     958+
     959+            if item.jid.host == self.domain:
     960+                # local contact
     961+                if item.jid.user in self.presences:
     962+                    # broadcast to contact's available resources
     963+                    for itemResource in self.presences[item.jid.user]:
     964+                        resourceJID = JID(tuple=(item.jid.user,
     965+                                                 item.jid.host,
     966+                                                 itemResource))
     967+                        self.sessionManager.deliverStanza(outPresence,
     968+                                                          resourceJID)
     969+            else:
     970+                # remote contact
     971+                self.send(outPresence)
     972+
     973+
     974+    def _on_availableBroadcast(self, presence):
     975+        fromJID = presence.sender
     976+        user, resource = fromJID.user, fromJID.resource
     977+        roster = self.roster[user]
     978+
     979+        if user not in self.presences:
     980+            # initial presence
     981+            self.presences[user] = {}
     982+            self.remotePresences[user] = {}
     983+
     984+            # send out probes
     985+            for item in roster.itervalues():
     986+                if item.subscriptionTo and item.jid.host != self.domain:
     987+                    self.probe(item.jid, fromJID)
     988+        else:
     989+            if resource not in self.presences[user]:
     990+                # initial presence with another available resource
     991+
     992+                # send last known presences from remote contacts
     993+                remotePresences = self.remotePresences[user]
     994+                for entity, remotePresence in remotePresences.iteritems():
     995+                    self.sessionManager.deliverStanza(remotePresence.element,
     996+                                                      fromJID)
     997+
     998+            # send presence to other resources
     999+            self._broadcastToOtherResources(presence)
     1000+
     1001+        # Send last known local presences
     1002+        if user not in self.presences or resource not in self.presences[user]:
     1003+            for item in roster.itervalues():
     1004+                if item.subscriptionTo and \
     1005+                   item.jid.host == self.domain and \
     1006+                   item.jid.user in self.presences:
     1007+                    for contactPresence in \
     1008+                            self.presences[item.jid.user].itervalues():
     1009+                        outPresence = clonePresence(contactPresence)
     1010+                        outPresence['to'] = fromJID.userhost()
     1011+                        self.sessionManager.deliverStanza(outPresence, fromJID)
     1012+
     1013+        # broadcast presence
     1014+        self._broadcastToContacts(presence)
     1015+
     1016+        # save presence
     1017+        self.presences[user][resource] = presence
     1018+        self.sessionManager.sessions[user][resource].presence = presence
     1019+
     1020+
     1021+    #def _on_availableDirected(self, presence):
     1022+    #    pass
     1023+
     1024+
     1025+    def _on_availableInbound(self, presence):
     1026+        fromJID = presence.sender
     1027+        toJID = presence.recipient
     1028+        if (toJID.user in self.roster and
     1029+            toJID.user in self.presences):
     1030+            for resource in self.presences[toJID.user]:
     1031+                resourceJID = JID(tuple=(toJID.user, toJID.host, resource))
     1032+                self.sessionManager.deliverStanza(presence.element, resourceJID)
     1033+            self.remotePresences[toJID.user][fromJID] = presence
     1034+        else:
     1035+            # no such user or no available resource, ignore this stanza
     1036+            pass
     1037+
     1038+
     1039+    def _on_unavailableBroadcast(self, presence):
     1040+        fromJID = presence.sender
     1041+        user, resource = fromJID.user, fromJID.resource
     1042+
     1043+        # broadcast presence
     1044+        self._broadcastToContacts(presence)
     1045+
     1046+        if user in self.presences:
     1047+            # send presence to other resources
     1048+            self._broadcastToOtherResources(presence)
     1049+
     1050+            # update stored presences
     1051+            if resource in self.presences[user]:
     1052+                del self.presences[user][resource]
     1053+
     1054+            if not self.presences[user]:
     1055+                # last resource to become unavailable
     1056+                del self.presences[user]
     1057+
     1058+                # TODO: save last unavailable presence
     1059+
     1060+
     1061+#    def _on_unavailableDirected(self, presence):
     1062+#        pass
     1063+
     1064+
     1065+#    def _on_unavailableInbound(self, presence):
     1066+#        pass
     1067+
     1068+
     1069+    def getDirection(self, presence):
     1070+        if not presence.recipient:
     1071+            if presence.sender.host == self.domain:
     1072+                # broadcast presence from local domain
     1073+                return 'Broadcast'
     1074+            else:
     1075+                raise Exception("Unexpected missing to address")
     1076+        else:
     1077+            if presence.sender.host == self.domain:
     1078+                # directed presence from local domain
     1079+                return 'Directed'
     1080+            elif presence.recipient.host == self.domain:
     1081+                # incoming remote presence
     1082+                return 'Inbound'
     1083+            else:
     1084+                raise Exception("Badly routed presence")
     1085+
     1086+    def availableReceived(self, presence):
     1087+        direction = self.getDirection(presence)
     1088+        handler = getattr(self, "_on_available%s" % direction)
     1089+        if handler:
     1090+            handler(presence)
     1091+            presence.handled = True
     1092+        else:
     1093+            print "Unhandled: %r" % presence.element.toXml()
     1094+
     1095+
     1096+    def unavailableReceived(self, presence):
     1097+        direction = self.getDirection(presence)
     1098+        handler = getattr(self, "_on_unavailable%s" % direction)
     1099+        if handler:
     1100+            handler(presence)
     1101+            presence.handled = True
     1102+        else:
     1103+            print "Unhandled: %r" % presence.element.toXml()
     1104+
     1105+
     1106+
     1107+    def probeReceived(self, presence):
     1108+        fromJID = presence.sender
     1109+        toJID = presence.recipient
     1110+
     1111+        if toJID.user not in self.roster or \
     1112+           fromJID.userhost() not in self.roster[toJID.user] or \
     1113+           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1114+            # send unsubscribed
     1115+            pass
     1116+        elif toJID.user not in self.presences:
     1117+            # send last unavailable or nothing
     1118+            pass
     1119+        else:
     1120+            for resourcePresence in self.presences[toJID.user].itervalues():
     1121+                outPresence = clonePresence(resourcePresence)
     1122+                outPresence['to'] = fromJID.userhost()
     1123+                self.send(outPresence)
Note: See TracChangeset for help on using the changeset viewer.