source: ralphm-patches/c2s_stanza_handlers.patch @ 69:cc2fc0173c4d

Last change on this file since 69:cc2fc0173c4d was 69:cc2fc0173c4d, checked in by Ralph Meijer <ralphm@…>, 7 years ago

Use stanzas, use methods on SessionManager? to retrieve sessions.

File size: 23.8 KB
  • new file doc/examples/client_service.tac

    # HG changeset patch
    # Parent eb898b91399636715feb9530c3d3e0090628926a
    Add c2s protocol handlers for iq, message and presence stanzas.
    
    TODO:
     * Add tests.
     * Add docstrings.
     * Save last unavailable presence for future probes.
    
    diff --git a/doc/examples/client_service.tac b/doc/examples/client_service.tac
    new file mode 100644
    - +  
     1from twisted.application import service, strports
     2from twisted.cred.portal import Portal
     3from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
     4from twisted.internet import defer
     5
     6from wokkel import client, xmppim
     7from wokkel.component import InternalComponent, Router
     8from wokkel.generic import FallbackHandler
     9from wokkel.ping import PingHandler
     10from wokkel.xmppim import RosterItem
     11
     12from twisted.words.protocols.jabber.jid import internJID as JID
     13
     14import socket
     15domain = socket.gethostname()
     16
     17ALICE = JID('alice@'+domain)
     18BOB = JID('bob@'+domain)
     19CHARLIE = JID('charlie@'+domain)
     20
     21roster = {
     22    'alice': {
     23        BOB: RosterItem(BOB,
     24                           subscriptionTo=True,
     25                           subscriptionFrom=True,
     26                           name='Bob'),
     27        CHARLIE: RosterItem(CHARLIE,
     28                           subscriptionTo=True,
     29                           subscriptionFrom=True,
     30                           name='Charlie',
     31                           groups=set(['Friends'])),
     32        },
     33    'bob': {
     34        ALICE: RosterItem(ALICE,
     35                           subscriptionTo=True,
     36                           subscriptionFrom=True,
     37                           name='Alice'),
     38        }
     39    }
     40
     41accounts = set(roster.keys())
     42
     43
     44class StaticRoster(xmppim.RosterServerProtocol):
     45
     46    def __init__(self, roster):
     47        xmppim.RosterServerProtocol.__init__(self)
     48        self.roster = roster
     49
     50    def getRoster(self, request):
     51        user = request.sender.user
     52        return defer.succeed(self.roster[user].values())
     53
     54
     55
     56application = service.Application("Jabber server")
     57
     58router = Router()
     59component = InternalComponent(router, domain)
     60component.setServiceParent(application)
     61
     62sessionManager = client.SessionManager(domain, accounts)
     63sessionManager.setHandlerParent(component)
     64
     65checker = InMemoryUsernamePasswordDatabaseDontUse(alice='secret',
     66                                                  bob='secret')
     67portal = Portal(sessionManager, (checker,))
     68portals = {JID(domain): portal}
     69
     70xmppim.AccountIQHandler(sessionManager).setHandlerParent(component)
     71xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component)
     72xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
     73FallbackHandler().setHandlerParent(component)
     74StaticRoster(roster).setHandlerParent(component)
     75PingHandler().setHandlerParent(component)
     76
     77c2sFactory = client.XMPPC2SServerFactory(portals)
     78c2sFactory.logTraffic = True
     79c2sService = strports.service('5224', c2sFactory)
     80c2sService.setServiceParent(application)
     81
     82sessionManager.connectionManager = c2sFactory
  • wokkel/client.py

    diff --git a/wokkel/client.py b/wokkel/client.py
    a b  
    589589        return defer.succeed(entity)
    590590
    591591
     592    def lookupSessions(self, entity):
     593        """
     594        Return all sessions for a user.
     595
     596        @param entity: Entity to retrieve sessions for. This the resource part
     597            will be ignored.
     598        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
     599
     600        @return: Mapping of sessions keyed by resource.
     601        @rtype: C{dict}
     602        """
     603        localpart = entity.user
     604
     605        try:
     606            return self.sessions[localpart]
     607        except:
     608            return {}
     609
     610
    592611    def lookupSession(self, entity):
    593         localpart = entity.user
    594         resource = entity.resource
     612        """
     613        Return the session for a particular resource of an entity.
    595614
    596         userSessions = self.sessions[localpart]
    597         session = userSessions[resource]
    598         return session
     615        @param entity: Entity to retrieve sessions for.
     616        @type entity: L{JID<twisted.words.protocols.jabber.jid.JID>}
     617
     618        @return: C{UserSession}.
     619        """
     620
     621        userSessions = self.lookupSessions(entity)
     622        return userSessions[entity.resource]
     623
    599624
    600625
    601626    def unbindResource(self, session, reason=None):
  • wokkel/test/test_xmppim.py

    diff --git a/wokkel/test/test_xmppim.py b/wokkel/test/test_xmppim.py
    a b  
    1313from twisted.words.xish import domish, utility
    1414
    1515from wokkel import xmppim
    16 from wokkel.generic import ErrorStanza, parseXml
     16from wokkel.generic import ErrorStanza, Stanza, parseXml
    1717from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub
    1818
    1919NS_XML = 'http://www.w3.org/XML/1998/namespace'
     
    13331333
    13341334
    13351335
     1336class AccountIQHandlerTest(unittest.TestCase):
     1337    """
     1338    Tests for L{xmppim.AccountIQHandler}.
     1339    """
     1340
     1341    def setUp(self):
     1342        self.stub = XmlStreamStub()
     1343        self.protocol = xmppim.AccountIQHandler(None)
     1344        self.protocol.makeConnection(self.stub.xmlstream)
     1345        self.protocol.connectionInitialized()
     1346
     1347
     1348    def test_onIQNotUser(self):
     1349        """
     1350        IQs to JIDs without local part are ignored.
     1351        """
     1352        xml = """
     1353          <iq to='example.org'>
     1354            <query xmlns='jabber:iq:version'/>
     1355          </iq>
     1356        """
     1357
     1358        iq = parseXml(xml)
     1359        self.stub.send(iq)
     1360
     1361        self.assertFalse(getattr(iq, 'handled'))
     1362
     1363
     1364
     1365class AccountMessageHandlerTest(unittest.TestCase):
     1366    """
     1367    Tests for L{xmppim.AccountMessageHandler}.
     1368    """
     1369
     1370    def setUp(self):
     1371        self.stub = XmlStreamStub()
     1372        self.protocol = xmppim.AccountMessageHandler(None)
     1373        self.protocol.makeConnection(self.stub.xmlstream)
     1374        self.protocol.connectionInitialized()
     1375
     1376
     1377    def test_onMessageNotUser(self):
     1378        """
     1379        Messages to JIDs without local part are ignored.
     1380        """
     1381        xml = """
     1382          <message to='example.org'>
     1383            <body>Hello</body>
     1384          </message>
     1385        """
     1386
     1387        message = parseXml(xml)
     1388        self.stub.send(message)
     1389
     1390        self.assertFalse(getattr(message, 'handled'))
     1391
     1392
     1393
     1394class ClonePresenceTest(unittest.TestCase):
     1395    """
     1396    Tests for L{xmppim.clonePresence}.
     1397    """
     1398
     1399    def test_rootElement(self):
     1400        """
     1401        The copied presence stanza is not identical, but renders identically.
     1402        """
     1403        originalElement = domish.Element((None, 'presence'))
     1404        stanza = Stanza.fromElement(originalElement)
     1405        copyElement = xmppim.clonePresence(stanza)
     1406
     1407        self.assertNotIdentical(copyElement, originalElement)
     1408        self.assertEquals(copyElement.toXml(), originalElement.toXml())
     1409
     1410
     1411
    13361412class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin):
    13371413    """
    13381414    Tests for L{xmppim.RosterServerProtocol}.
  • wokkel/xmppim.py

    diff --git a/wokkel/xmppim.py b/wokkel/xmppim.py
    a b  
    1212
    1313import warnings
    1414
     15import copy
     16
    1517from twisted.internet import defer
     18from twisted.python import log
    1619from twisted.words.protocols.jabber import error
    1720from twisted.words.protocols.jabber.jid import JID
    1821from twisted.words.xish import domish
     
    408411
    409412
    410413
    411     def _onPresence(self, element):
    412         """
    413         Called when a presence stanza has been received.
    414         """
     414    def parsePresence(self, element):
    415415        stanza = Stanza.fromElement(element)
    416416
    417417        presenceType = stanza.stanzaType or 'available'
     
    421421        except KeyError:
    422422            return
    423423
    424         presence = parser.fromElement(element)
     424        return parser.fromElement(element)
     425
     426
     427    def _onPresence(self, element):
     428        """
     429        Called when a presence stanza has been received.
     430        """
     431        presence = self.parsePresence(element)
     432        presenceType = presence.stanzaType or 'available'
    425433
    426434        try:
    427435            handler = getattr(self, '%sReceived' % presenceType)
    428436        except AttributeError:
    429437            return
    430438        else:
    431             handler(presence)
     439            element.handled = handler(presence)
    432440
    433441
    434442
     
    10351043        return element
    10361044
    10371045
     1046    @classmethod
     1047    def fromElement(cls, element):
     1048        stanza = super(Message, cls).fromElement(element)
     1049        stanza.stanzaType = stanza.stanzaType or 'normal'
     1050        return stanza
     1051
     1052
    10381053
    10391054class MessageProtocol(XMPPHandler):
    10401055    """
     
    10671082
    10681083
    10691084
     1085class AccountIQHandler(XMPPHandler):
     1086
     1087    def __init__(self, sessionManager):
     1088        XMPPHandler.__init__(self)
     1089        self.sessionManager = sessionManager
     1090
     1091
     1092    def connectionMade(self):
     1093        self.xmlstream.addObserver('/iq', self.onIQ, 1)
     1094
     1095
     1096    def onIQ(self, iq):
     1097        """
     1098        Handler for iq stanzas to user accounts' connected resources.
     1099
     1100        If the recipient is a bare JID or there is no associated user, this
     1101        handler ignores the stanza, so that other handlers have a chance
     1102        to pick it up. If used, L{generic.FallbackHandler} will respond with a
     1103        C{'service-unavailable'} stanza error if no other handlers handle
     1104        the iq.
     1105        """
     1106
     1107        if iq.handled:
     1108            return
     1109
     1110        stanza = Stanza.fromElement(iq)
     1111        recipient = stanza.recipient
     1112
     1113        if not recipient:
     1114            # This stanza doesn't have a recipient, ignore it.
     1115            return
     1116        elif not recipient.user:
     1117            # This is not for an account, ignore it
     1118            return
     1119        elif recipient.user not in self.sessionManager.accounts:
     1120            # This is not a user, ignore it
     1121            return
     1122        elif not recipient.resource:
     1123            # Bare JID at local domain, ignore it
     1124            return
     1125
     1126        userSessions = self.sessionManager.lookupSessions(recipient)
     1127        if recipient.resource in userSessions:
     1128            self.sessionManager.deliverStanza(iq, recipient)
     1129        else:
     1130            # Full JID without connected resource, return error
     1131            exc = error.StanzaError('service-unavailable')
     1132            if stanza.stanzaType in ('result', 'error'):
     1133                log.err(exc, 'Could not deliver IQ response')
     1134            else:
     1135                self.send(exc.toResponse(iq))
     1136
     1137        iq.handled = True
     1138
     1139
     1140
     1141class AccountMessageHandler(XMPPHandler):
     1142
     1143    def __init__(self, sessionManager):
     1144        XMPPHandler.__init__(self)
     1145        self.sessionManager = sessionManager
     1146
     1147
     1148    def connectionMade(self):
     1149        self.xmlstream.addObserver('/message', self.onMessage, 1)
     1150
     1151
     1152    def onMessage(self, element):
     1153        """
     1154        Handler for message stanzas to user accounts.
     1155        """
     1156
     1157        if element.handled:
     1158            return
     1159
     1160        message = Message.fromElement(element)
     1161        recipient = message.recipient
     1162        stanzaType = message.stanzaType or 'normal'
     1163
     1164        try:
     1165            if not recipient:
     1166                # This stanza doesn't have a recipient, ignore it.
     1167                return
     1168            if not recipient.user:
     1169                # This is not for an account, ignore it
     1170                return
     1171            elif recipient.user not in self.sessionManager.accounts:
     1172                # This is not a user, ignore it
     1173                return
     1174            elif recipient.resource:
     1175                userSessions = self.sessionManager.lookupSessions(recipient)
     1176                if recipient.resource in userSessions:
     1177                    self.sessionManager.deliverStanza(element, recipient)
     1178                else:
     1179                    if stanzaType in ('normal', 'chat', 'headline'):
     1180                        self.onMessageBareJID(message)
     1181                    elif stanzaType == 'error':
     1182                        log.msg("Dropping message to unconnected resource %r" %
     1183                                recipient.full())
     1184                    elif stanzaType == 'groupchat':
     1185                        raise error.StanzaError('service-unavailable')
     1186            else:
     1187                self.onMessageBareJID(message)
     1188        except error.StanzaError, exc:
     1189            if stanzaType == 'error':
     1190                log.err(exc, "Undeliverable error")
     1191            else:
     1192                self.send(exc.toResponse(element))
     1193
     1194        element.handled = True
     1195
     1196
     1197    def onMessageBareJID(self, message):
     1198        userSessions = self.sessionManager.lookupSessions(message.recipient)
     1199
     1200        recipients = set()
     1201
     1202        if message.stanzaType == 'headline':
     1203            for session in userSessions.itervalues():
     1204                if session.presence.priority >= 0:
     1205                    recipients.add(session.entity)
     1206        elif message.stanzaType in ('chat', 'normal'):
     1207            priorities = {}
     1208            for session in userSessions.itervalues():
     1209                if not session.presence or not session.presence.available:
     1210                    continue
     1211                priority = session.presence.priority
     1212                if priority >= 0:
     1213                    priorities.setdefault(priority, set()).add(session.entity)
     1214            if priorities:
     1215                maxPriority = max(priorities.keys())
     1216                recipients.update(priorities[maxPriority])
     1217        elif message.stanzaType == 'groupchat':
     1218            raise error.StanzaError('service-unavailable')
     1219
     1220        if recipients:
     1221            for recipient in recipients:
     1222                self.sessionManager.deliverStanza(message.element, recipient)
     1223        elif message.stanzaType in ('chat', 'normal'):
     1224            raise error.StanzaError('service-unavailable')
     1225        else:
     1226            # silently discard
     1227            log.msg("Discarding message to %r" % message.recipient)
     1228
     1229
     1230
     1231
     1232def clonePresence(presence):
     1233    """
     1234    Make a deep copy of a presence stanza.
     1235
     1236    The returned presence stanza is an orphaned deep copy of the given
     1237    original.
     1238
     1239    @note: Since the reference to the original parent, if any, is gone,
     1240    inherited attributes like C{xml:lang} are not preserved.
     1241    """
     1242    element = presence.element
     1243
     1244    parent = element.parent
     1245    element.parent = None
     1246    newElement = copy.deepcopy(element)
     1247    element.parent = parent
     1248    return newElement
     1249
     1250
     1251
     1252class PresenceServerHandler(PresenceProtocol):
     1253
     1254    def __init__(self, sessionManager, domain, roster):
     1255        PresenceProtocol.__init__(self)
     1256        self.sessionManager = sessionManager
     1257        self.domain = domain
     1258        self.roster = roster
     1259        self.presences = {} # user -> resource -> presence
     1260        self.offlinePresences = {} # user -> presence
     1261        self.remotePresences = {} # user -> remote entity -> presence
     1262
     1263        self.sessionManager.clientStream.addObserver('/presence',
     1264                                                     self._onPresenceOutbound)
     1265
     1266
     1267    def _onPresenceOutbound(self, element):
     1268        log.msg("Got outbound presence: %r" % element.toXml())
     1269        presence = self.parsePresence(element)
     1270
     1271        presenceType = presence.stanzaType or 'available'
     1272        method = '%sReceivedOutbound' % presenceType
     1273        print method
     1274
     1275        try:
     1276            handler = getattr(self, method)
     1277        except AttributeError:
     1278            return
     1279        else:
     1280            element.handled = handler(presence)
     1281
     1282
     1283    def _broadcastToOtherResources(self, presence):
     1284        """
     1285        Broadcast presence to other available resources.
     1286        """
     1287        fromJID = presence.sender
     1288        for otherResource in self.presences[fromJID.user]:
     1289            if otherResource == fromJID.resource:
     1290                continue
     1291
     1292            resourceJID = JID(tuple=(fromJID.user,
     1293                                     fromJID.host,
     1294                                     otherResource))
     1295            outPresence = clonePresence(presence)
     1296            outPresence['to'] = resourceJID.full()
     1297            self.sessionManager.deliverStanza(outPresence, resourceJID)
     1298
     1299
     1300    def _broadcastToContacts(self, presence):
     1301        """
     1302        Broadcast presence to subscribed entities.
     1303        """
     1304        fromJID = presence.sender
     1305        roster = self.roster[fromJID.user]
     1306
     1307        for item in roster.itervalues():
     1308            if not item.subscriptionFrom:
     1309                continue
     1310
     1311            outPresence = clonePresence(presence)
     1312            outPresence['to'] = item.entity.full()
     1313
     1314            if item.entity.host == self.domain:
     1315                # local contact
     1316                if item.entity.user in self.presences:
     1317                    # broadcast to contact's available resources
     1318                    for itemResource in self.presences[item.entity.user]:
     1319                        resourceJID = JID(tuple=(item.entity.user,
     1320                                                 item.entity.host,
     1321                                                 itemResource))
     1322                        self.sessionManager.deliverStanza(outPresence,
     1323                                                          resourceJID)
     1324            else:
     1325                # remote contact
     1326                self.send(outPresence)
     1327
     1328
     1329    def _on_availableBroadcast(self, presence):
     1330        fromJID = presence.sender
     1331        user, resource = fromJID.user, fromJID.resource
     1332        roster = self.roster[user]
     1333
     1334        if user not in self.presences:
     1335            # initial presence
     1336            self.presences[user] = {}
     1337            self.remotePresences[user] = {}
     1338
     1339            # send out probes
     1340            for item in roster.itervalues():
     1341                if item.subscriptionTo and item.entity.host != self.domain:
     1342                    self.probe(item.entity, fromJID)
     1343        else:
     1344            if resource not in self.presences[user]:
     1345                # initial presence with another available resource
     1346
     1347                # send last known presences from remote contacts
     1348                remotePresences = self.remotePresences[user]
     1349                for entity, remotePresence in remotePresences.iteritems():
     1350                    self.sessionManager.deliverStanza(remotePresence.element,
     1351                                                      fromJID)
     1352
     1353            # send presence to other resources
     1354            self._broadcastToOtherResources(presence)
     1355
     1356        # Send last known local presences
     1357        if user not in self.presences or resource not in self.presences[user]:
     1358            for item in roster.itervalues():
     1359                if item.subscriptionTo and \
     1360                   item.entity.host == self.domain and \
     1361                   item.entity.user in self.presences:
     1362                    for contactPresence in \
     1363                            self.presences[item.entity.user].itervalues():
     1364                        outPresence = clonePresence(contactPresence)
     1365                        outPresence['to'] = fromJID.userhost()
     1366                        self.sessionManager.deliverStanza(outPresence, fromJID)
     1367
     1368        # broadcast presence
     1369        self._broadcastToContacts(presence)
     1370
     1371        # save presence
     1372        self.presences[user][resource] = presence
     1373        session = self.sessionManager.lookupSession(fromJID)
     1374        session.presence = presence
     1375
     1376        return True
     1377
     1378
     1379    def _on_availableDirected(self, presence):
     1380        self.send(presence.element)
     1381        return True
     1382
     1383
     1384    def availableReceivedOutbound(self, presence):
     1385        if presence.recipient:
     1386            return self._on_availableDirected(presence)
     1387        else:
     1388            return self._on_availableBroadcast(presence)
     1389
     1390
     1391    def availableReceived(self, presence):
     1392        fromJID = presence.sender
     1393        toJID = presence.recipient
     1394
     1395        if not toJID.user:
     1396            # This is not for an account, ignore it.
     1397            return False
     1398        elif toJID.user not in self.roster:
     1399            # This is not for a known account, ignore it.
     1400            return False
     1401        elif toJID.user not in self.presences:
     1402            # No available resource, drop it.
     1403            return True
     1404        else:
     1405            for resource in self.presences[toJID.user]:
     1406                resourceJID = JID(tuple=(toJID.user,
     1407                                         toJID.host,
     1408                                         resource))
     1409                self.sessionManager.deliverStanza(presence.element, resourceJID)
     1410            self.remotePresences[toJID.user][fromJID] = presence
     1411            return True
     1412
     1413
     1414    def _on_unavailableBroadcast(self, presence):
     1415        fromJID = presence.sender
     1416        user, resource = fromJID.user, fromJID.resource
     1417
     1418        # broadcast presence
     1419        self._broadcastToContacts(presence)
     1420
     1421        if user in self.presences:
     1422            # send presence to other resources
     1423            self._broadcastToOtherResources(presence)
     1424
     1425            # update stored presences
     1426            if resource in self.presences[user]:
     1427                del self.presences[user][resource]
     1428
     1429            if not self.presences[user]:
     1430                # last resource to become unavailable
     1431                del self.presences[user]
     1432
     1433                # TODO: save last unavailable presence
     1434
     1435        return True
     1436
     1437
     1438    def _on_unavailableDirected(self, presence):
     1439        self.send(presence.element)
     1440        return True
     1441
     1442
     1443    def unavailableReceivedOutbound(self, presence):
     1444        if presence.recipient:
     1445            return self._on_unavailableDirected(presence)
     1446        else:
     1447            return self._on_unavailableBroadcast(presence)
     1448
     1449#    def unavailableReceived(self, presence):
     1450
     1451
     1452    def subscribedReceivedOutbound(self, presence):
     1453        log.msg("%r subscribed %s to its presence" % (presence.sender,
     1454                                                      presence.recipient))
     1455        self.send(presence.element)
     1456        return True
     1457
     1458
     1459    def subscribedReceived(self, presence):
     1460        log.msg("%r subscribed %s to its presence" % (presence.sender,
     1461                                                      presence.recipient))
     1462
     1463
     1464    def unsubscribedReceivedOutbound(self, presence):
     1465        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     1466                                                          presence.recipient))
     1467        self.send(presence.element)
     1468        return True
     1469
     1470
     1471    def unsubscribedReceived(self, presence):
     1472        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
     1473                                                          presence.recipient))
     1474
     1475
     1476    def subscribeReceivedOutbound(self, presence):
     1477        log.msg("%r requests subscription to %s" % (presence.sender,
     1478                                                    presence.recipient))
     1479        self.send(presence.element)
     1480        return True
     1481
     1482
     1483    def subscribeReceived(self, presence):
     1484        log.msg("%r requests subscription to %s" % (presence.sender,
     1485                                                    presence.recipient))
     1486
     1487
     1488    def unsubscribeReceivedOutbound(self, presence):
     1489        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1490                                                        presence.recipient))
     1491        self.send(presence.element)
     1492        return True
     1493
     1494
     1495    def unsubscribeReceived(self, presence):
     1496        log.msg("%r requests unsubscription from %s" % (presence.sender,
     1497                                                        presence.recipient))
     1498
     1499
     1500    def probeReceived(self, presence):
     1501        fromJID = presence.sender
     1502        toJID = presence.recipient
     1503
     1504        if toJID.user not in self.roster or \
     1505           fromJID.userhost() not in self.roster[toJID.user] or \
     1506           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
     1507            # send unsubscribed
     1508            pass
     1509        elif toJID.user not in self.presences:
     1510            # send last unavailable or nothing
     1511            pass
     1512        else:
     1513            for resourcePresence in self.presences[toJID.user].itervalues():
     1514                outPresence = clonePresence(resourcePresence)
     1515                outPresence['to'] = fromJID.userhost()
     1516                self.send(outPresence)
     1517
     1518
     1519
    10701520class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
    10711521    """
    10721522    XMPP subprotocol handler for the roster, server side.
Note: See TracBrowser for help on using the repository browser.