# HG changeset patch # Parent 9e8497278e0e4f8a145f321a4e4d22e3bb499b38 Add c2s protocol handlers for iq, message and presence stanzas. TODO: * Add tests. * Add docstrings. * Save last unavailable presence for future probes. diff -r 9e8497278e0e doc/examples/client_service.tac --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/examples/client_service.tac Thu Oct 06 18:55:13 2011 +0200 @@ -0,0 +1,75 @@ +from twisted.application import service, strports +from twisted.internet import defer + +from wokkel import client, im +from wokkel.component import InternalComponent, Router +from wokkel.generic import FallbackHandler +from wokkel.ping import PingHandler +from wokkel.im import RosterItem + +from twisted.words.protocols.jabber.jid import internJID as JID + +import socket +domain = socket.gethostname() + +RALPHM = JID('ralphm@'+domain) +INTOSI = JID('intosi@'+domain) +TERMIE = JID('termie@'+domain) + +roster = { + 'ralphm': { + INTOSI: RosterItem(INTOSI, + subscriptionTo=True, + subscriptionFrom=True, + name='Intosi', + groups=set(['Friends'])), + TERMIE: RosterItem(TERMIE, + subscriptionTo=True, + subscriptionFrom=True, + name='termie'), + }, + 'termie': { + RALPHM: RosterItem(RALPHM, + subscriptionTo=True, + subscriptionFrom=True, + name='ralphm'), + } + } + +accounts = set(roster.keys()) + + +class StaticRoster(im.RosterServerProtocol): + + def __init__(self, roster): + im.RosterServerProtocol.__init__(self) + self.roster = roster + + def getRoster(self, request): + user = request.sender.user + return defer.succeed(self.roster[user].values()) + + + +application = service.Application("Jabber server") + +router = Router() +component = InternalComponent(router, domain) +component.setServiceParent(application) + +sessionManager = client.SessionManager(domain, accounts) +sessionManager.setHandlerParent(component) + +im.AccountIQHandler(sessionManager).setHandlerParent(component) +im.AccountMessageHandler(sessionManager).setHandlerParent(component) +im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) +FallbackHandler().setHandlerParent(component) +StaticRoster(roster).setHandlerParent(component) +PingHandler().setHandlerParent(component) + +c2sFactory = client.XMPPC2SServerFactory(sessionManager) +c2sFactory.logTraffic = True +c2sService = strports.service('5224', c2sFactory) +c2sService.setServiceParent(application) + +sessionManager.connectionManager = c2sFactory diff -r 9e8497278e0e wokkel/im.py --- a/wokkel/im.py Thu Oct 06 18:47:41 2011 +0200 +++ b/wokkel/im.py Thu Oct 06 18:55:13 2011 +0200 @@ -10,7 +10,10 @@ U{RFC 6121} (XMPP IM). """ +import copy + from twisted.internet import defer +from twisted.python import log from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error from twisted.words.xish import domish @@ -168,7 +171,10 @@ self.xmlstream.addObserver("/presence", self._onPresence) - def _onPresence(self, element): + def parsePresence(self, element): + """ + Parse presence. + """ stanza = Stanza.fromElement(element) presenceType = stanza.stanzaType or 'available' @@ -178,14 +184,19 @@ except KeyError: return - presence = parser.fromElement(element) + return parser.fromElement(element) + + + def _onPresence(self, element): + presence = self.parsePresence(element) + presenceType = presence.stanzaType or 'available' try: handler = getattr(self, '%sReceived' % presenceType) except AttributeError: return else: - handler(presence) + element.handled = handler(presence) def errorReceived(self, presence): @@ -553,6 +564,440 @@ +class AccountIQHandler(XMPPHandler): + + def __init__(self, sessionManager): + XMPPHandler.__init__(self) + self.sessionManager = sessionManager + + + def connectionMade(self): + self.xmlstream.addObserver('/iq', self.onIQ, 1) + + + def onIQ(self, iq): + """ + Handler for iq stanzas to user accounts' connected resources. + + If the recipient is a bare JID or there is no associated user, this + handler ignores the stanza, so that other handlers have a chance + to pick it up. If used, L{generic.FallbackHandler} will respond with a + C{'service-unavailable'} stanza error if no other handlers handle + the iq. + """ + + if iq.handled: + return + + try: + recipient = jid.internJID(iq['to']) + except KeyError: + return + + if not recipient.user: + # This is not for an account, ignore it + return + elif recipient.user not in self.sessionManager.accounts: + # This is not a user, ignore it + return + elif not recipient.resource: + # Bare JID at local domain, ignore it + return + + userSessions = self.sessionManager.sessions.get(recipient.user, + {}) + if recipient.resource in userSessions: + self.sessionManager.deliverStanza(iq, recipient) + else: + # Full JID without connected resource, return error + exc = error.StanzaError('service-unavailable') + if iq['type'] in ('result', 'error'): + log.err(exc, 'Could not deliver IQ response') + else: + self.send(exc.toResponse(iq)) + + iq.handled = True + + + +class AccountMessageHandler(XMPPHandler): + + def __init__(self, sessionManager): + XMPPHandler.__init__(self) + self.sessionManager = sessionManager + + + def connectionMade(self): + self.xmlstream.addObserver('/message', self.onMessage, 1) + + + def onMessage(self, message): + """ + Handler for message stanzas to user accounts. + """ + + if message.handled: + return + + try: + recipient = jid.internJID(message['to']) + except KeyError: + return + + stanzaType = message.getAttribute('type', 'normal') + + try: + if not recipient.user: + # This is not for an account, ignore it + return + elif recipient.user not in self.sessionManager.accounts: + # This is not a user, ignore it + return + elif recipient.resource: + userSessions = self.sessionManager.sessions.get(recipient.user, + {}) + if recipient.resource in userSessions: + self.sessionManager.deliverStanza(message, recipient) + else: + if stanzaType in ('normal', 'chat', 'headline'): + self.onMessageBareJID(message, recipient.userhostJID()) + elif stanzaType == 'error': + log.msg("Dropping message to unconnected resource %r" % + recipient.full()) + elif stanzaType == 'groupchat': + raise error.StanzaError('service-unavailable') + else: + self.onMessageBareJID(message, recipient) + except error.StanzaError, exc: + if stanzaType == 'error': + log.err(exc, "Undeliverable error") + else: + self.send(exc.toResponse(message)) + + message.handled = True + + + def onMessageBareJID(self, message, bareJID): + stanzaType = message.getAttribute('type', 'normal') + + userSessions = self.sessionManager.sessions.get(bareJID.user, {}) + + recipients = set() + + if stanzaType == 'headline': + for session in userSessions: + if session.presence.priority >= 0: + recipients.add(session.entity) + elif stanzaType in ('chat', 'normal'): + priorities = {} + for session in userSessions.itervalues(): + if not session.presence or not session.presence.available: + continue + priority = session.presence.priority + if priority >= 0: + priorities.setdefault(priority, set()).add(session.entity) + maxPriority = max(priorities.keys()) + recipients.update(priorities[maxPriority]) + elif stanzaType == 'groupchat': + raise error.StanzaError('service-unavailable') + + if recipients: + for recipient in recipients: + self.sessionManager.deliverStanza(message, recipient) + elif stanzaType in ('chat', 'normal'): + raise error.StanzaError('service-unavailable') + else: + # silently discard + log.msg("Discarding message to %r" % message['to']) + + + + +def clonePresence(presence): + """ + Make a deep copy of a presence stanza. + + The returned presence stanza is an orphaned deep copy of the given + original. + + @note: Since the reference to the original parent, if any, is gone, + inherited attributes like C{xml:lang} are not preserved. + """ + element = presence.element + + parent = element.parent + element.parent = None + newElement = copy.deepcopy(element) + element.parent = parent + return newElement + + + +class PresenceServerHandler(PresenceProtocol): + + def __init__(self, sessionManager, domain, roster): + PresenceProtocol.__init__(self) + self.sessionManager = sessionManager + self.domain = domain + self.roster = roster + self.presences = {} # user -> resource -> presence + self.offlinePresences = {} # user -> presence + self.remotePresences = {} # user -> remote entity -> presence + + self.sessionManager.clientStream.addObserver('/presence', + self._onPresenceOutbound) + + + def _onPresenceOutbound(self, element): + log.msg("Got outbound presence: %r" % element.toXml()) + presence = self.parsePresence(element) + + presenceType = presence.stanzaType or 'available' + method = '%sReceivedOutbound' % presenceType + print method + + try: + handler = getattr(self, method) + except AttributeError: + return + else: + element.handled = handler(presence) + + + def _broadcastToOtherResources(self, presence): + """ + Broadcast presence to other available resources. + """ + fromJID = presence.sender + for otherResource in self.presences[fromJID.user]: + if otherResource == fromJID.resource: + continue + + resourceJID = jid.JID(tuple=(fromJID.user, + fromJID.host, + otherResource)) + outPresence = clonePresence(presence) + outPresence['to'] = resourceJID.full() + self.sessionManager.deliverStanza(outPresence, resourceJID) + + + def _broadcastToContacts(self, presence): + """ + Broadcast presence to subscribed entities. + """ + fromJID = presence.sender + roster = self.roster[fromJID.user] + + for item in roster.itervalues(): + if not item.subscriptionFrom: + continue + + outPresence = clonePresence(presence) + outPresence['to'] = item.entity.full() + + if item.entity.host == self.domain: + # local contact + if item.entity.user in self.presences: + # broadcast to contact's available resources + for itemResource in self.presences[item.entity.user]: + resourceJID = jid.JID(tuple=(item.entity.user, + item.entity.host, + itemResource)) + self.sessionManager.deliverStanza(outPresence, + resourceJID) + else: + # remote contact + self.send(outPresence) + + + def _on_availableBroadcast(self, presence): + fromJID = presence.sender + user, resource = fromJID.user, fromJID.resource + roster = self.roster[user] + + if user not in self.presences: + # initial presence + self.presences[user] = {} + self.remotePresences[user] = {} + + # send out probes + for item in roster.itervalues(): + if item.subscriptionTo and item.entity.host != self.domain: + self.probe(item.entity, fromJID) + else: + if resource not in self.presences[user]: + # initial presence with another available resource + + # send last known presences from remote contacts + remotePresences = self.remotePresences[user] + for entity, remotePresence in remotePresences.iteritems(): + self.sessionManager.deliverStanza(remotePresence.element, + fromJID) + + # send presence to other resources + self._broadcastToOtherResources(presence) + + # Send last known local presences + if user not in self.presences or resource not in self.presences[user]: + for item in roster.itervalues(): + if item.subscriptionTo and \ + item.entity.host == self.domain and \ + item.entity.user in self.presences: + for contactPresence in \ + self.presences[item.entity.user].itervalues(): + outPresence = clonePresence(contactPresence) + outPresence['to'] = fromJID.userhost() + self.sessionManager.deliverStanza(outPresence, fromJID) + + # broadcast presence + self._broadcastToContacts(presence) + + # save presence + self.presences[user][resource] = presence + self.sessionManager.sessions[user][resource].presence = presence + + return True + + + def _on_availableDirected(self, presence): + self.send(presence.element) + return True + + + def availableReceivedOutbound(self, presence): + if presence.recipient: + return self._on_availableDirected(presence) + else: + return self._on_availableBroadcast(presence) + + + def availableReceived(self, presence): + fromJID = presence.sender + toJID = presence.recipient + + if toJID.user not in self.roster: + return False + + if toJID.user in self.presences: + for resource in self.presences[toJID.user]: + resourceJID = jid.JID(tuple=(toJID.user, + toJID.host, + resource)) + self.sessionManager.deliverStanza(presence.element, resourceJID) + self.remotePresences[toJID.user][fromJID] = presence + else: + # no such user or no available resource, ignore this stanza + pass + + return True + + + def _on_unavailableBroadcast(self, presence): + fromJID = presence.sender + user, resource = fromJID.user, fromJID.resource + + # broadcast presence + self._broadcastToContacts(presence) + + if user in self.presences: + # send presence to other resources + self._broadcastToOtherResources(presence) + + # update stored presences + if resource in self.presences[user]: + del self.presences[user][resource] + + if not self.presences[user]: + # last resource to become unavailable + del self.presences[user] + + # TODO: save last unavailable presence + + return True + + + def _on_unavailableDirected(self, presence): + self.send(presence.element) + return True + + + def unavailableReceivedOutbound(self, presence): + if presence.recipient: + return self._on_unavailableDirected(presence) + else: + return self._on_unavailableBroadcast(presence) + +# def unavailableReceived(self, presence): + + + def subscribedReceivedOutbound(self, presence): + log.msg("%r subscribed %s to its presence" % (presence.sender, + presence.recipient)) + self.send(presence.element) + return True + + + def subscribedReceived(self, presence): + log.msg("%r subscribed %s to its presence" % (presence.sender, + presence.recipient)) + + + def unsubscribedReceivedOutbound(self, presence): + log.msg("%r unsubscribed %s from its presence" % (presence.sender, + presence.recipient)) + self.send(presence.element) + return True + + + def unsubscribedReceived(self, presence): + log.msg("%r unsubscribed %s from its presence" % (presence.sender, + presence.recipient)) + + + def subscribeReceivedOutbound(self, presence): + log.msg("%r requests subscription to %s" % (presence.sender, + presence.recipient)) + self.send(presence.element) + return True + + + def subscribeReceived(self, presence): + log.msg("%r requests subscription to %s" % (presence.sender, + presence.recipient)) + + + def unsubscribeReceivedOutbound(self, presence): + log.msg("%r requests unsubscription from %s" % (presence.sender, + presence.recipient)) + self.send(presence.element) + return True + + + def unsubscribeReceived(self, presence): + log.msg("%r requests unsubscription from %s" % (presence.sender, + presence.recipient)) + + + def probeReceived(self, presence): + fromJID = presence.sender + toJID = presence.recipient + + if toJID.user not in self.roster or \ + fromJID.userhost() not in self.roster[toJID.user] or \ + not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: + # send unsubscribed + pass + elif toJID.user not in self.presences: + # send last unavailable or nothing + pass + else: + for resourcePresence in self.presences[toJID.user].itervalues(): + outPresence = clonePresence(resourcePresence) + outPresence['to'] = fromJID.userhost() + self.send(outPresence) + + + class RosterServerProtocol(XMPPHandler, IQHandlerMixin): """ XMPP subprotocol handler for the roster, server side. diff -r 9e8497278e0e wokkel/test/test_im.py --- a/wokkel/test/test_im.py Thu Oct 06 18:47:41 2011 +0200 +++ b/wokkel/test/test_im.py Thu Oct 06 18:55:13 2011 +0200 @@ -13,7 +13,7 @@ from twisted.words.xish import domish, utility from wokkel import im -from wokkel.generic import ErrorStanza, parseXml +from wokkel.generic import ErrorStanza, Stanza, parseXml from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub NS_XML = 'http://www.w3.org/XML/1998/namespace' @@ -846,6 +846,82 @@ +class AccountIQHandlerTest(unittest.TestCase): + """ + Tests for L{im.AccountIQHandler}. + """ + + def setUp(self): + self.stub = XmlStreamStub() + self.protocol = im.AccountIQHandler(None) + self.protocol.makeConnection(self.stub.xmlstream) + self.protocol.connectionInitialized() + + + def test_onIQNotUser(self): + """ + IQs to JIDs without local part are ignored. + """ + xml = """ + + + + """ + + iq = parseXml(xml) + self.stub.send(iq) + + self.assertFalse(getattr(iq, 'handled')) + + + +class AccountMessageHandlerTest(unittest.TestCase): + """ + Tests for L{im.AccountMessageHandler}. + """ + + def setUp(self): + self.stub = XmlStreamStub() + self.protocol = im.AccountMessageHandler(None) + self.protocol.makeConnection(self.stub.xmlstream) + self.protocol.connectionInitialized() + + + def test_onMessageNotUser(self): + """ + Messages to JIDs without local part are ignored. + """ + xml = """ + + Hello + + """ + + message = parseXml(xml) + self.stub.send(message) + + self.assertFalse(getattr(message, 'handled')) + + + +class ClonePresenceTest(unittest.TestCase): + """ + Tests for L{im.clonePresence}. + """ + + def test_rootElement(self): + """ + The copied presence stanza is not identical, but renders identically. + """ + originalElement = domish.Element((None, 'presence')) + stanza = Stanza.fromElement(originalElement) + copyElement = im.clonePresence(stanza) + + self.assertNotIdentical(copyElement, originalElement) + self.assertEquals(copyElement.toXml(), originalElement.toXml()) + + + class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin): """ Tests for L{im.RosterServerProtocol}.