source: ralphm-patches/c2s_stanza_handlers.patch @ 57:0d8b6cf41728

Last change on this file since 57:0d8b6cf41728 was 57:0d8b6cf41728, checked in by Ralph Meijer <ralphm@…>, 9 years ago

Wokkel 0.7.0 release, clean up various patches.

File size: 21.8 KB
  • new file doc/examples/client_service.tac

    # HG changeset patch
    # Parent 24c6e79ab1c449f41fd5d4c2cb843dc16efe2c59
    Add c2s protocol handlers for iq, message and presence stanzas.
    
    TODO:
     * Add tests.
     * Add docstrings.
     * Save last unavailable presence for future probes.
    
    diff -r 24c6e79ab1c4 doc/examples/client_service.tac
    - +  
     1from twisted.application import service, strports
     2from twisted.internet import defer
     3
     4from wokkel import client, im
     5from wokkel.component import InternalComponent, Router
     6from wokkel.generic import FallbackHandler
     7from wokkel.ping import PingHandler
     8from wokkel.im import RosterItem
     9
     10from twisted.words.protocols.jabber.jid import internJID as JID
     11
     12import socket
     13domain = socket.gethostname()
     14
     15RALPHM = JID('ralphm@'+domain)
     16INTOSI = JID('intosi@'+domain)
     17TERMIE = JID('termie@'+domain)
     18
     19roster = {
     20    'ralphm': {
     21        INTOSI: RosterItem(INTOSI,
     22                           subscriptionTo=True,
     23                           subscriptionFrom=True,
     24                           name='Intosi',
     25                           groups=set(['Friends'])),
     26        TERMIE: RosterItem(TERMIE,
     27                           subscriptionTo=True,
     28                           subscriptionFrom=True,
     29                           name='termie'),
     30        },
     31    'termie': {
     32        RALPHM: RosterItem(RALPHM,
     33                           subscriptionTo=True,
     34                           subscriptionFrom=True,
     35                           name='ralphm'),
     36        }
     37    }
     38
     39accounts = set(roster.keys())
     40
     41
     42class StaticRoster(im.RosterServerProtocol):
     43
     44    def __init__(self, roster):
     45        im.RosterServerProtocol.__init__(self)
     46        self.roster = roster
     47
     48    def getRoster(self, request):
     49        user = request.sender.user
     50        return defer.succeed(self.roster[user].values())
     51
     52
     53
     54application = service.Application("Jabber server")
     55
     56router = Router()
     57component = InternalComponent(router, domain)
     58component.setServiceParent(application)
     59
     60sessionManager = client.SessionManager(domain, accounts)
     61sessionManager.setHandlerParent(component)
     62
     63im.AccountIQHandler(sessionManager).setHandlerParent(component)
     64im.AccountMessageHandler(sessionManager).setHandlerParent(component)
     65im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     66FallbackHandler().setHandlerParent(component)
     67StaticRoster(roster).setHandlerParent(component)
     68PingHandler().setHandlerParent(component)
     69
     70c2sFactory = client.XMPPC2SServerFactory(sessionManager)
     71c2sFactory.logTraffic = True
     72c2sService = strports.service('5224', c2sFactory)
     73c2sService.setServiceParent(application)
     74
     75sessionManager.connectionManager = c2sFactory
  • wokkel/im.py

    diff -r 24c6e79ab1c4 wokkel/im.py
    a b  
    1010U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM).
    1111"""
    1212
     13import copy
     14
    1315from twisted.internet import defer
     16from twisted.python import log
    1417from twisted.words.protocols.jabber import error
    1518from twisted.words.protocols.jabber import jid
    1619from twisted.words.xish import domish
     
    179182
    180183
    181184
    182     def _onPresence(self, element):
    183         """
    184         Called when a presence stanza has been received.
    185         """
     185    def parsePresence(self, element):
    186186        stanza = Stanza.fromElement(element)
    187187
    188188        presenceType = stanza.stanzaType or 'available'
     
    192192        except KeyError:
    193193            return
    194194
    195         presence = parser.fromElement(element)
     195        return parser.fromElement(element)
     196
     197
     198    def _onPresence(self, element):
     199        """
     200        Called when a presence stanza has been received.
     201        """
     202        presence = self.parsePresence(element)
     203        presenceType = presence.stanzaType or 'available'
    196204
    197205        try:
    198206            handler = getattr(self, '%sReceived' % presenceType)
    199207        except AttributeError:
    200208            return
    201209        else:
    202             handler(presence)
     210            element.handled = handler(presence)
    203211
    204212
    205213
     
    582590
    583591
    584592
     593class AccountIQHandler(XMPPHandler):
     594
     595    def __init__(self, sessionManager):
     596        XMPPHandler.__init__(self)
     597        self.sessionManager = sessionManager
     598
     599
     600    def connectionMade(self):
     601        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     602
     603
     604    def onIQ(self, iq):
     605        """
     606        Handler for iq stanzas to user accounts' connected resources.
     607
     608        If the recipient is a bare JID or there is no associated user, this
     609        handler ignores the stanza, so that other handlers have a chance
     610        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     611        C{'service-unavailable'} stanza error if no other handlers handle
     612        the iq.
     613        """
     614
     615        if iq.handled:
     616            return
     617
     618        try:
     619            recipient = jid.internJID(iq['to'])
     620        except KeyError:
     621            return
     622
     623        if not recipient.user:
     624            # This is not for an account, ignore it
     625            return
     626        elif recipient.user not in self.sessionManager.accounts:
     627            # This is not a user, ignore it
     628            return
     629        elif not recipient.resource:
     630            # Bare JID at local domain, ignore it
     631            return
     632
     633        userSessions = self.sessionManager.sessions.get(recipient.user,
     634                                                        {})
     635        if recipient.resource in userSessions:
     636            self.sessionManager.deliverStanza(iq, recipient)
     637        else:
     638            # Full JID without connected resource, return error
     639            exc = error.StanzaError('service-unavailable')
     640            if iq['type'] in ('result', 'error'):
     641                log.err(exc, 'Could not deliver IQ response')
     642            else:
     643                self.send(exc.toResponse(iq))
     644
     645        iq.handled = True
     646
     647
     648
     649class AccountMessageHandler(XMPPHandler):
     650
     651    def __init__(self, sessionManager):
     652        XMPPHandler.__init__(self)
     653        self.sessionManager = sessionManager
     654
     655
     656    def connectionMade(self):
     657        self.xmlstream.addObserver('/message', self.onMessage, 1)
     658
     659
     660    def onMessage(self, message):
     661        """
     662        Handler for message stanzas to user accounts.
     663        """
     664
     665        if message.handled:
     666            return
     667
     668        try:
     669            recipient = jid.internJID(message['to'])
     670        except KeyError:
     671            return
     672
     673        stanzaType = message.getAttribute('type', 'normal')
     674
     675        try:
     676            if not recipient.user:
     677                # This is not for an account, ignore it
     678                return
     679            elif recipient.user not in self.sessionManager.accounts:
     680                # This is not a user, ignore it
     681                return
     682            elif recipient.resource:
     683                userSessions = self.sessionManager.sessions.get(recipient.user,
     684                                                                {})
     685                if recipient.resource in userSessions:
     686                    self.sessionManager.deliverStanza(message, recipient)
     687                else:
     688                    if stanzaType in ('normal', 'chat', 'headline'):
     689                        self.onMessageBareJID(message, recipient.userhostJID())
     690                    elif stanzaType == 'error':
     691                        log.msg("Dropping message to unconnected resource %r" %
     692                                recipient.full())
     693                    elif stanzaType == 'groupchat':
     694                        raise error.StanzaError('service-unavailable')
     695            else:
     696                self.onMessageBareJID(message, recipient)
     697        except error.StanzaError, exc:
     698            if stanzaType == 'error':
     699                log.err(exc, "Undeliverable error")
     700            else:
     701                self.send(exc.toResponse(message))
     702
     703        message.handled = True
     704
     705
     706    def onMessageBareJID(self, message, bareJID):
     707        stanzaType = message.getAttribute('type', 'normal')
     708
     709        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
     710
     711        recipients = set()
     712
     713        if stanzaType == 'headline':
     714            for session in userSessions:
     715                if session.presence.priority >= 0:
     716                    recipients.add(session.entity)
     717        elif stanzaType in ('chat', 'normal'):
     718            priorities = {}
     719            for session in userSessions.itervalues():
     720                if not session.presence or not session.presence.available:
     721                    continue
     722                priority = session.presence.priority
     723                if priority >= 0:
     724                    priorities.setdefault(priority, set()).add(session.entity)
     725                maxPriority = max(priorities.keys())
     726                recipients.update(priorities[maxPriority])
     727        elif stanzaType == 'groupchat':
     728            raise error.StanzaError('service-unavailable')
     729
     730        if recipients:
     731            for recipient in recipients:
     732                self.sessionManager.deliverStanza(message, recipient)
     733        elif stanzaType in ('chat', 'normal'):
     734            raise error.StanzaError('service-unavailable')
     735        else:
     736            # silently discard
     737            log.msg("Discarding message to %r" % message['to'])
     738
     739
     740
     741
     742def clonePresence(presence):
     743    """
     744    Make a deep copy of a presence stanza.
     745
     746    The returned presence stanza is an orphaned deep copy of the given
     747    original.
     748
     749    @note: Since the reference to the original parent, if any, is gone,
     750    inherited attributes like C{xml:lang} are not preserved.
     751    """
     752    element = presence.element
     753
     754    parent = element.parent
     755    element.parent = None
     756    newElement = copy.deepcopy(element)
     757    element.parent = parent
     758    return newElement
     759
     760
     761
     762class PresenceServerHandler(PresenceProtocol):
     763
     764    def __init__(self, sessionManager, domain, roster):
     765        PresenceProtocol.__init__(self)
     766        self.sessionManager = sessionManager
     767        self.domain = domain
     768        self.roster = roster
     769        self.presences = {} # user -> resource -> presence
     770        self.offlinePresences = {} # user -> presence
     771        self.remotePresences = {} # user -> remote entity -> presence
     772
     773        self.sessionManager.clientStream.addObserver('/presence',
     774                                                     self._onPresenceOutbound)
     775
     776
     777    def _onPresenceOutbound(self, element):
     778        log.msg("Got outbound presence: %r" % element.toXml())
     779        presence = self.parsePresence(element)
     780
     781        presenceType = presence.stanzaType or 'available'
     782        method = '%sReceivedOutbound' % presenceType
     783        print method
     784
     785        try:
     786            handler = getattr(self, method)
     787        except AttributeError:
     788            return
     789        else:
     790            element.handled = handler(presence)
     791
     792
     793    def _broadcastToOtherResources(self, presence):
     794        """
     795        Broadcast presence to other available resources.
     796        """
     797        fromJID = presence.sender
     798        for otherResource in self.presences[fromJID.user]:
     799            if otherResource == fromJID.resource:
     800                continue
     801
     802            resourceJID = jid.JID(tuple=(fromJID.user,
     803                                         fromJID.host,
     804                                         otherResource))
     805            outPresence = clonePresence(presence)
     806            outPresence['to'] = resourceJID.full()
     807            self.sessionManager.deliverStanza(outPresence, resourceJID)
     808
     809
     810    def _broadcastToContacts(self, presence):
     811        """
     812        Broadcast presence to subscribed entities.
     813        """
     814        fromJID = presence.sender
     815        roster = self.roster[fromJID.user]
     816
     817        for item in roster.itervalues():
     818            if not item.subscriptionFrom:
     819                continue
     820
     821            outPresence = clonePresence(presence)
     822            outPresence['to'] = item.entity.full()
     823
     824            if item.entity.host == self.domain:
     825                # local contact
     826                if item.entity.user in self.presences:
     827                    # broadcast to contact's available resources
     828                    for itemResource in self.presences[item.entity.user]:
     829                        resourceJID = jid.JID(tuple=(item.entity.user,
     830                                                     item.entity.host,
     831                                                     itemResource))
     832                        self.sessionManager.deliverStanza(outPresence,
     833                                                          resourceJID)
     834            else:
     835                # remote contact
     836                self.send(outPresence)
     837
     838
     839    def _on_availableBroadcast(self, presence):
     840        fromJID = presence.sender
     841        user, resource = fromJID.user, fromJID.resource
     842        roster = self.roster[user]
     843
     844        if user not in self.presences:
     845            # initial presence
     846            self.presences[user] = {}
     847            self.remotePresences[user] = {}
     848
     849            # send out probes
     850            for item in roster.itervalues():
     851                if item.subscriptionTo and item.entity.host != self.domain:
     852                    self.probe(item.entity, fromJID)
     853        else:
     854            if resource not in self.presences[user]:
     855                # initial presence with another available resource
     856
     857                # send last known presences from remote contacts
     858                remotePresences = self.remotePresences[user]
     859                for entity, remotePresence in remotePresences.iteritems():
     860                    self.sessionManager.deliverStanza(remotePresence.element,
     861                                                      fromJID)
     862
     863            # send presence to other resources
     864            self._broadcastToOtherResources(presence)
     865
     866        # Send last known local presences
     867        if user not in self.presences or resource not in self.presences[user]:
     868            for item in roster.itervalues():
     869                if item.subscriptionTo and \
     870                   item.entity.host == self.domain and \
     871                   item.entity.user in self.presences:
     872                    for contactPresence in \
     873                            self.presences[item.entity.user].itervalues():
     874                        outPresence = clonePresence(contactPresence)
     875                        outPresence['to'] = fromJID.userhost()
     876                        self.sessionManager.deliverStanza(outPresence, fromJID)
     877
     878        # broadcast presence
     879        self._broadcastToContacts(presence)
     880
     881        # save presence
     882        self.presences[user][resource] = presence
     883        self.sessionManager.sessions[user][resource].presence = presence
     884
     885        return True
     886
     887
     888    def _on_availableDirected(self, presence):
     889        self.send(presence.element)
     890        return True
     891
     892
     893    def availableReceivedOutbound(self, presence):
     894        if presence.recipient:
     895            return self._on_availableDirected(presence)
     896        else:
     897            return self._on_availableBroadcast(presence)
     898
     899
     900    def availableReceived(self, presence):
     901        fromJID = presence.sender
     902        toJID = presence.recipient
     903
     904        if toJID.user not in self.roster:
     905            return False
     906
     907        if toJID.user in self.presences:
     908            for resource in self.presences[toJID.user]:
     909                resourceJID = jid.JID(tuple=(toJID.user,
     910                                             toJID.host,
     911                                             resource))
     912                self.sessionManager.deliverStanza(presence.element, resourceJID)
     913            self.remotePresences[toJID.user][fromJID] = presence
     914        else:
     915            # no such user or no available resource, ignore this stanza
     916            pass
     917
     918        return True
     919
     920
     921    def _on_unavailableBroadcast(self, presence):
     922        fromJID = presence.sender
     923        user, resource = fromJID.user, fromJID.resource
     924
     925        # broadcast presence
     926        self._broadcastToContacts(presence)
     927
     928        if user in self.presences:
     929            # send presence to other resources
     930            self._broadcastToOtherResources(presence)
     931
     932            # update stored presences
     933            if resource in self.presences[user]:
     934                del self.presences[user][resource]
     935
     936            if not self.presences[user]:
     937                # last resource to become unavailable
     938                del self.presences[user]
     939
     940                # TODO: save last unavailable presence
     941
     942        return True
     943
     944
     945    def _on_unavailableDirected(self, presence):
     946        self.send(presence.element)
     947        return True
     948
     949
     950    def unavailableReceivedOutbound(self, presence):
     951        if presence.recipient:
     952            return self._on_unavailableDirected(presence)
     953        else:
     954            return self._on_unavailableBroadcast(presence)
     955
     956#    def unavailableReceived(self, presence):
     957
     958
     959    def subscribedReceivedOutbound(self, presence):
     960        log.msg("%r subscribed %s to its presence" % (presence.sender,
     961                                                      presence.recipient))
     962        self.send(presence.element)
     963        return True
     964
     965
     966    def subscribedReceived(self, presence):
     967        log.msg("%r subscribed %s to its presence" % (presence.sender,
     968                                                      presence.recipient))
     969
     970
     971    def unsubscribedReceivedOutbound(self, presence):
     972        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     973                                                          presence.recipient))
     974        self.send(presence.element)
     975        return True
     976
     977
     978    def unsubscribedReceived(self, presence):
     979        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     980                                                          presence.recipient))
     981
     982
     983    def subscribeReceivedOutbound(self, presence):
     984        log.msg("%r requests subscription to %s" % (presence.sender,
     985                                                    presence.recipient))
     986        self.send(presence.element)
     987        return True
     988
     989
     990    def subscribeReceived(self, presence):
     991        log.msg("%r requests subscription to %s" % (presence.sender,
     992                                                    presence.recipient))
     993
     994
     995    def unsubscribeReceivedOutbound(self, presence):
     996        log.msg("%r requests unsubscription from %s" % (presence.sender,
     997                                                        presence.recipient))
     998        self.send(presence.element)
     999        return True
     1000
     1001
     1002    def unsubscribeReceived(self, presence):
     1003        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1004                                                        presence.recipient))
     1005
     1006
     1007    def probeReceived(self, presence):
     1008        fromJID = presence.sender
     1009        toJID = presence.recipient
     1010
     1011        if toJID.user not in self.roster or \
     1012           fromJID.userhost() not in self.roster[toJID.user] or \
     1013           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1014            # send unsubscribed
     1015            pass
     1016        elif toJID.user not in self.presences:
     1017            # send last unavailable or nothing
     1018            pass
     1019        else:
     1020            for resourcePresence in self.presences[toJID.user].itervalues():
     1021                outPresence = clonePresence(resourcePresence)
     1022                outPresence['to'] = fromJID.userhost()
     1023                self.send(outPresence)
     1024
     1025
     1026
    5851027class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
    5861028    """
    5871029    XMPP subprotocol handler for the roster, server side.
  • wokkel/test/test_im.py

    diff -r 24c6e79ab1c4 wokkel/test/test_im.py
    a b  
    1313from twisted.words.xish import domish, utility
    1414
    1515from wokkel import im
    16 from wokkel.generic import ErrorStanza, parseXml
     16from wokkel.generic import ErrorStanza, Stanza, parseXml
    1717from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub
    1818
    1919NS_XML = 'http://www.w3.org/XML/1998/namespace'
     
    846846
    847847
    848848
     849class AccountIQHandlerTest(unittest.TestCase):
     850    """
     851    Tests for L{im.AccountIQHandler}.
     852    """
     853
     854    def setUp(self):
     855        self.stub = XmlStreamStub()
     856        self.protocol = im.AccountIQHandler(None)
     857        self.protocol.makeConnection(self.stub.xmlstream)
     858        self.protocol.connectionInitialized()
     859
     860
     861    def test_onIQNotUser(self):
     862        """
     863        IQs to JIDs without local part are ignored.
     864        """
     865        xml = """
     866          <iq to='example.org'>
     867            <query xmlns='jabber:iq:version'/>
     868          </iq>
     869        """
     870
     871        iq = parseXml(xml)
     872        self.stub.send(iq)
     873
     874        self.assertFalse(getattr(iq, 'handled'))
     875
     876
     877
     878class AccountMessageHandlerTest(unittest.TestCase):
     879    """
     880    Tests for L{im.AccountMessageHandler}.
     881    """
     882
     883    def setUp(self):
     884        self.stub = XmlStreamStub()
     885        self.protocol = im.AccountMessageHandler(None)
     886        self.protocol.makeConnection(self.stub.xmlstream)
     887        self.protocol.connectionInitialized()
     888
     889
     890    def test_onMessageNotUser(self):
     891        """
     892        Messages to JIDs without local part are ignored.
     893        """
     894        xml = """
     895          <message to='example.org'>
     896            <body>Hello</body>
     897          </message>
     898        """
     899
     900        message = parseXml(xml)
     901        self.stub.send(message)
     902
     903        self.assertFalse(getattr(message, 'handled'))
     904
     905
     906
     907class ClonePresenceTest(unittest.TestCase):
     908    """
     909    Tests for L{im.clonePresence}.
     910    """
     911
     912    def test_rootElement(self):
     913        """
     914        The copied presence stanza is not identical, but renders identically.
     915        """
     916        originalElement = domish.Element((None, 'presence'))
     917        stanza = Stanza.fromElement(originalElement)
     918        copyElement = im.clonePresence(stanza)
     919
     920        self.assertNotIdentical(copyElement, originalElement)
     921        self.assertEquals(copyElement.toXml(), originalElement.toXml())
     922
     923
     924
    849925class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin):
    850926    """
    851927    Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser for help on using the repository browser.