source: ralphm-patches/c2s_stanza_handlers.patch @ 54:03ec57713c90

Last change on this file since 54:03ec57713c90 was 54:03ec57713c90, checked in by Ralph Meijer <ralphm@…>, 8 years ago

Upstreamed Request patches, split out c2s patches in managable chunks, prepare for release of Wokkel 0.7.0.

File size: 21.8 KB
  • new file doc/examples/client_service.tac

    # HG changeset patch
    # Parent 9e8497278e0e4f8a145f321a4e4d22e3bb499b38
    Add c2s protocol handlers for iq, message and presence stanzas.
    
    TODO:
     * Add tests.
     * Add docstrings.
     * Save last unavailable presence for future probes.
    
    diff -r 9e8497278e0e doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, im
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.im import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(im.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        im.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, request):
     49        user = request.sender.user
     50        return defer.succeed(self.roster[user].values())
     51
     52
     53
     54application = service.Application("Jabber server")
     55
     56router = Router()
     57component = InternalComponent(router, domain)
     58component.setServiceParent(application)
     59
     60sessionManager = client.SessionManager(domain, accounts)
     61sessionManager.setHandlerParent(component)
     62
     63im.AccountIQHandler(sessionManager).setHandlerParent(component)
     64im.AccountMessageHandler(sessionManager).setHandlerParent(component)
     65im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     66FallbackHandler().setHandlerParent(component)
     67StaticRoster(roster).setHandlerParent(component)
     68PingHandler().setHandlerParent(component)
     69
     70c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     71c2sFactory.logTraffic = True
     72c2sService = strports.service('5224', c2sFactory)
     73c2sService.setServiceParent(application)
     74
     75sessionManager.connectionManager = c2sFactory
  • wokkel/im.py

    diff -r 9e8497278e0e wokkel/im.py
    a b  
    1010U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM).
    1111"""
    1212
     13import copy
     14
    1315from twisted.internet import defer
     16from twisted.python import log
    1417from twisted.words.protocols.jabber import jid
    1518from twisted.words.protocols.jabber import error
    1619from twisted.words.xish import domish
     
    168171        self.xmlstream.addObserver("/presence", self._onPresence)
    169172
    170173
    171     def _onPresence(self, element):
     174    def parsePresence(self, element):
     175        """
     176        Parse presence.
     177        """
    172178        stanza = Stanza.fromElement(element)
    173179
    174180        presenceType = stanza.stanzaType or 'available'
     
    178184        except KeyError:
    179185            return
    180186
    181         presence = parser.fromElement(element)
     187        return parser.fromElement(element)
     188
     189
     190    def _onPresence(self, element):
     191        presence = self.parsePresence(element)
     192        presenceType = presence.stanzaType or 'available'
    182193
    183194        try:
    184195            handler = getattr(self, '%sReceived' % presenceType)
    185196        except AttributeError:
    186197            return
    187198        else:
    188             handler(presence)
     199            element.handled = handler(presence)
    189200
    190201
    191202    def errorReceived(self, presence):
     
    553564
    554565
    555566
     567class AccountIQHandler(XMPPHandler):
     568
     569    def __init__(self, sessionManager):
     570        XMPPHandler.__init__(self)
     571        self.sessionManager = sessionManager
     572
     573
     574    def connectionMade(self):
     575        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     576
     577
     578    def onIQ(self, iq):
     579        """
     580        Handler for iq stanzas to user accounts' connected resources.
     581
     582        If the recipient is a bare JID or there is no associated user, this
     583        handler ignores the stanza, so that other handlers have a chance
     584        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     585        C{'service-unavailable'} stanza error if no other handlers handle
     586        the iq.
     587        """
     588
     589        if iq.handled:
     590            return
     591
     592        try:
     593            recipient = jid.internJID(iq['to'])
     594        except KeyError:
     595            return
     596
     597        if not recipient.user:
     598            # This is not for an account, ignore it
     599            return
     600        elif recipient.user not in self.sessionManager.accounts:
     601            # This is not a user, ignore it
     602            return
     603        elif not recipient.resource:
     604            # Bare JID at local domain, ignore it
     605            return
     606
     607        userSessions = self.sessionManager.sessions.get(recipient.user,
     608                                                        {})
     609        if recipient.resource in userSessions:
     610            self.sessionManager.deliverStanza(iq, recipient)
     611        else:
     612            # Full JID without connected resource, return error
     613            exc = error.StanzaError('service-unavailable')
     614            if iq['type'] in ('result', 'error'):
     615                log.err(exc, 'Could not deliver IQ response')
     616            else:
     617                self.send(exc.toResponse(iq))
     618
     619        iq.handled = True
     620
     621
     622
     623class AccountMessageHandler(XMPPHandler):
     624
     625    def __init__(self, sessionManager):
     626        XMPPHandler.__init__(self)
     627        self.sessionManager = sessionManager
     628
     629
     630    def connectionMade(self):
     631        self.xmlstream.addObserver('/message', self.onMessage, 1)
     632
     633
     634    def onMessage(self, message):
     635        """
     636        Handler for message stanzas to user accounts.
     637        """
     638
     639        if message.handled:
     640            return
     641
     642        try:
     643            recipient = jid.internJID(message['to'])
     644        except KeyError:
     645            return
     646
     647        stanzaType = message.getAttribute('type', 'normal')
     648
     649        try:
     650            if not recipient.user:
     651                # This is not for an account, ignore it
     652                return
     653            elif recipient.user not in self.sessionManager.accounts:
     654                # This is not a user, ignore it
     655                return
     656            elif recipient.resource:
     657                userSessions = self.sessionManager.sessions.get(recipient.user,
     658                                                                {})
     659                if recipient.resource in userSessions:
     660                    self.sessionManager.deliverStanza(message, recipient)
     661                else:
     662                    if stanzaType in ('normal', 'chat', 'headline'):
     663                        self.onMessageBareJID(message, recipient.userhostJID())
     664                    elif stanzaType == 'error':
     665                        log.msg("Dropping message to unconnected resource %r" %
     666                                recipient.full())
     667                    elif stanzaType == 'groupchat':
     668                        raise error.StanzaError('service-unavailable')
     669            else:
     670                self.onMessageBareJID(message, recipient)
     671        except error.StanzaError, exc:
     672            if stanzaType == 'error':
     673                log.err(exc, "Undeliverable error")
     674            else:
     675                self.send(exc.toResponse(message))
     676
     677        message.handled = True
     678
     679
     680    def onMessageBareJID(self, message, bareJID):
     681        stanzaType = message.getAttribute('type', 'normal')
     682
     683        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     684
     685        recipients = set()
     686
     687        if stanzaType == 'headline':
     688            for session in userSessions:
     689                if session.presence.priority >= 0:
     690                    recipients.add(session.entity)
     691        elif stanzaType in ('chat', 'normal'):
     692            priorities = {}
     693            for session in userSessions.itervalues():
     694                if not session.presence or not session.presence.available:
     695                    continue
     696                priority = session.presence.priority
     697                if priority >= 0:
     698                    priorities.setdefault(priority, set()).add(session.entity)
     699                maxPriority = max(priorities.keys())
     700                recipients.update(priorities[maxPriority])
     701        elif stanzaType == 'groupchat':
     702            raise error.StanzaError('service-unavailable')
     703
     704        if recipients:
     705            for recipient in recipients:
     706                self.sessionManager.deliverStanza(message, recipient)
     707        elif stanzaType in ('chat', 'normal'):
     708            raise error.StanzaError('service-unavailable')
     709        else:
     710            # silently discard
     711            log.msg("Discarding message to %r" % message['to'])
     712
     713
     714
     715
     716def clonePresence(presence):
     717    """
     718    Make a deep copy of a presence stanza.
     719
     720    The returned presence stanza is an orphaned deep copy of the given
     721    original.
     722
     723    @note: Since the reference to the original parent, if any, is gone,
     724    inherited attributes like C{xml:lang} are not preserved.
     725    """
     726    element = presence.element
     727
     728    parent = element.parent
     729    element.parent = None
     730    newElement = copy.deepcopy(element)
     731    element.parent = parent
     732    return newElement
     733
     734
     735
     736class PresenceServerHandler(PresenceProtocol):
     737
     738    def __init__(self, sessionManager, domain, roster):
     739        PresenceProtocol.__init__(self)
     740        self.sessionManager = sessionManager
     741        self.domain = domain
     742        self.roster = roster
     743        self.presences = {} # user -> resource -> presence
     744        self.offlinePresences = {} # user -> presence
     745        self.remotePresences = {} # user -> remote entity -> presence
     746
     747        self.sessionManager.clientStream.addObserver('/presence',
     748                                                     self._onPresenceOutbound)
     749
     750
     751    def _onPresenceOutbound(self, element):
     752        log.msg("Got outbound presence: %r" % element.toXml())
     753        presence = self.parsePresence(element)
     754
     755        presenceType = presence.stanzaType or 'available'
     756        method = '%sReceivedOutbound' % presenceType
     757        print method
     758
     759        try:
     760            handler = getattr(self, method)
     761        except AttributeError:
     762            return
     763        else:
     764            element.handled = handler(presence)
     765
     766
     767    def _broadcastToOtherResources(self, presence):
     768        """
     769        Broadcast presence to other available resources.
     770        """
     771        fromJID = presence.sender
     772        for otherResource in self.presences[fromJID.user]:
     773            if otherResource == fromJID.resource:
     774                continue
     775
     776            resourceJID = jid.JID(tuple=(fromJID.user,
     777                                         fromJID.host,
     778                                         otherResource))
     779            outPresence = clonePresence(presence)
     780            outPresence['to'] = resourceJID.full()
     781            self.sessionManager.deliverStanza(outPresence, resourceJID)
     782
     783
     784    def _broadcastToContacts(self, presence):
     785        """
     786        Broadcast presence to subscribed entities.
     787        """
     788        fromJID = presence.sender
     789        roster = self.roster[fromJID.user]
     790
     791        for item in roster.itervalues():
     792            if not item.subscriptionFrom:
     793                continue
     794
     795            outPresence = clonePresence(presence)
     796            outPresence['to'] = item.entity.full()
     797
     798            if item.entity.host == self.domain:
     799                # local contact
     800                if item.entity.user in self.presences:
     801                    # broadcast to contact's available resources
     802                    for itemResource in self.presences[item.entity.user]:
     803                        resourceJID = jid.JID(tuple=(item.entity.user,
     804                                                     item.entity.host,
     805                                                     itemResource))
     806                        self.sessionManager.deliverStanza(outPresence,
     807                                                          resourceJID)
     808            else:
     809                # remote contact
     810                self.send(outPresence)
     811
     812
     813    def _on_availableBroadcast(self, presence):
     814        fromJID = presence.sender
     815        user, resource = fromJID.user, fromJID.resource
     816        roster = self.roster[user]
     817
     818        if user not in self.presences:
     819            # initial presence
     820            self.presences[user] = {}
     821            self.remotePresences[user] = {}
     822
     823            # send out probes
     824            for item in roster.itervalues():
     825                if item.subscriptionTo and item.entity.host != self.domain:
     826                    self.probe(item.entity, fromJID)
     827        else:
     828            if resource not in self.presences[user]:
     829                # initial presence with another available resource
     830
     831                # send last known presences from remote contacts
     832                remotePresences = self.remotePresences[user]
     833                for entity, remotePresence in remotePresences.iteritems():
     834                    self.sessionManager.deliverStanza(remotePresence.element,
     835                                                      fromJID)
     836
     837            # send presence to other resources
     838            self._broadcastToOtherResources(presence)
     839
     840        # Send last known local presences
     841        if user not in self.presences or resource not in self.presences[user]:
     842            for item in roster.itervalues():
     843                if item.subscriptionTo and \
     844                   item.entity.host == self.domain and \
     845                   item.entity.user in self.presences:
     846                    for contactPresence in \
     847                            self.presences[item.entity.user].itervalues():
     848                        outPresence = clonePresence(contactPresence)
     849                        outPresence['to'] = fromJID.userhost()
     850                        self.sessionManager.deliverStanza(outPresence, fromJID)
     851
     852        # broadcast presence
     853        self._broadcastToContacts(presence)
     854
     855        # save presence
     856        self.presences[user][resource] = presence
     857        self.sessionManager.sessions[user][resource].presence = presence
     858
     859        return True
     860
     861
     862    def _on_availableDirected(self, presence):
     863        self.send(presence.element)
     864        return True
     865
     866
     867    def availableReceivedOutbound(self, presence):
     868        if presence.recipient:
     869            return self._on_availableDirected(presence)
     870        else:
     871            return self._on_availableBroadcast(presence)
     872
     873
     874    def availableReceived(self, presence):
     875        fromJID = presence.sender
     876        toJID = presence.recipient
     877
     878        if toJID.user not in self.roster:
     879            return False
     880
     881        if toJID.user in self.presences:
     882            for resource in self.presences[toJID.user]:
     883                resourceJID = jid.JID(tuple=(toJID.user,
     884                                             toJID.host,
     885                                             resource))
     886                self.sessionManager.deliverStanza(presence.element, resourceJID)
     887            self.remotePresences[toJID.user][fromJID] = presence
     888        else:
     889            # no such user or no available resource, ignore this stanza
     890            pass
     891
     892        return True
     893
     894
     895    def _on_unavailableBroadcast(self, presence):
     896        fromJID = presence.sender
     897        user, resource = fromJID.user, fromJID.resource
     898
     899        # broadcast presence
     900        self._broadcastToContacts(presence)
     901
     902        if user in self.presences:
     903            # send presence to other resources
     904            self._broadcastToOtherResources(presence)
     905
     906            # update stored presences
     907            if resource in self.presences[user]:
     908                del self.presences[user][resource]
     909
     910            if not self.presences[user]:
     911                # last resource to become unavailable
     912                del self.presences[user]
     913
     914                # TODO: save last unavailable presence
     915
     916        return True
     917
     918
     919    def _on_unavailableDirected(self, presence):
     920        self.send(presence.element)
     921        return True
     922
     923
     924    def unavailableReceivedOutbound(self, presence):
     925        if presence.recipient:
     926            return self._on_unavailableDirected(presence)
     927        else:
     928            return self._on_unavailableBroadcast(presence)
     929
     930#    def unavailableReceived(self, presence):
     931
     932
     933    def subscribedReceivedOutbound(self, presence):
     934        log.msg("%r subscribed %s to its presence" % (presence.sender,
     935                                                      presence.recipient))
     936        self.send(presence.element)
     937        return True
     938
     939
     940    def subscribedReceived(self, presence):
     941        log.msg("%r subscribed %s to its presence" % (presence.sender,
     942                                                      presence.recipient))
     943
     944
     945    def unsubscribedReceivedOutbound(self, presence):
     946        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     947                                                          presence.recipient))
     948        self.send(presence.element)
     949        return True
     950
     951
     952    def unsubscribedReceived(self, presence):
     953        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     954                                                          presence.recipient))
     955
     956
     957    def subscribeReceivedOutbound(self, presence):
     958        log.msg("%r requests subscription to %s" % (presence.sender,
     959                                                    presence.recipient))
     960        self.send(presence.element)
     961        return True
     962
     963
     964    def subscribeReceived(self, presence):
     965        log.msg("%r requests subscription to %s" % (presence.sender,
     966                                                    presence.recipient))
     967
     968
     969    def unsubscribeReceivedOutbound(self, presence):
     970        log.msg("%r requests unsubscription from %s" % (presence.sender,
     971                                                        presence.recipient))
     972        self.send(presence.element)
     973        return True
     974
     975
     976    def unsubscribeReceived(self, presence):
     977        log.msg("%r requests unsubscription from %s" % (presence.sender,
     978                                                        presence.recipient))
     979
     980
     981    def probeReceived(self, presence):
     982        fromJID = presence.sender
     983        toJID = presence.recipient
     984
     985        if toJID.user not in self.roster or \
     986           fromJID.userhost() not in self.roster[toJID.user] or \
     987           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     988            # send unsubscribed
     989            pass
     990        elif toJID.user not in self.presences:
     991            # send last unavailable or nothing
     992            pass
     993        else:
     994            for resourcePresence in self.presences[toJID.user].itervalues():
     995                outPresence = clonePresence(resourcePresence)
     996                outPresence['to'] = fromJID.userhost()
     997                self.send(outPresence)
     998
     999
     1000
    5561001class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
    5571002    """
    5581003    XMPP subprotocol handler for the roster, server side.
  • wokkel/test/test_im.py

    diff -r 9e8497278e0e wokkel/test/test_im.py
    a b  
    1313from twisted.words.xish import domish, utility
    1414
    1515from wokkel import im
    16 from wokkel.generic import ErrorStanza, parseXml
     16from wokkel.generic import ErrorStanza, Stanza, parseXml
    1717from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub
    1818
    1919NS_XML = 'http://www.w3.org/XML/1998/namespace'
     
    846846
    847847
    848848
     849class AccountIQHandlerTest(unittest.TestCase):
     850    """
     851    Tests for L{im.AccountIQHandler}.
     852    """
     853
     854    def setUp(self):
     855        self.stub = XmlStreamStub()
     856        self.protocol = im.AccountIQHandler(None)
     857        self.protocol.makeConnection(self.stub.xmlstream)
     858        self.protocol.connectionInitialized()
     859
     860
     861    def test_onIQNotUser(self):
     862        """
     863        IQs to JIDs without local part are ignored.
     864        """
     865        xml = """
     866          <iq to='example.org'>
     867            <query xmlns='jabber:iq:version'/>
     868          </iq>
     869        """
     870
     871        iq = parseXml(xml)
     872        self.stub.send(iq)
     873
     874        self.assertFalse(getattr(iq, 'handled'))
     875
     876
     877
     878class AccountMessageHandlerTest(unittest.TestCase):
     879    """
     880    Tests for L{im.AccountMessageHandler}.
     881    """
     882
     883    def setUp(self):
     884        self.stub = XmlStreamStub()
     885        self.protocol = im.AccountMessageHandler(None)
     886        self.protocol.makeConnection(self.stub.xmlstream)
     887        self.protocol.connectionInitialized()
     888
     889
     890    def test_onMessageNotUser(self):
     891        """
     892        Messages to JIDs without local part are ignored.
     893        """
     894        xml = """
     895          <message to='example.org'>
     896            <body>Hello</body>
     897          </message>
     898        """
     899
     900        message = parseXml(xml)
     901        self.stub.send(message)
     902
     903        self.assertFalse(getattr(message, 'handled'))
     904
     905
     906
     907class ClonePresenceTest(unittest.TestCase):
     908    """
     909    Tests for L{im.clonePresence}.
     910    """
     911
     912    def test_rootElement(self):
     913        """
     914        The copied presence stanza is not identical, but renders identically.
     915        """
     916        originalElement = domish.Element((None, 'presence'))
     917        stanza = Stanza.fromElement(originalElement)
     918        copyElement = im.clonePresence(stanza)
     919
     920        self.assertNotIdentical(copyElement, originalElement)
     921        self.assertEquals(copyElement.toXml(), originalElement.toXml())
     922
     923
     924
    849925class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin):
    850926    """
    851927    Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser for help on using the repository browser.