source: ralphm-patches/c2s_stanza_handlers.patch @ 54:03ec57713c90

Last change on this file since 54:03ec57713c90 was 54:03ec57713c90, checked in by Ralph Meijer <ralphm@…>, 11 years ago

Upstreamed Request patches, split out c2s patches in managable chunks, prepare for release of Wokkel 0.7.0.

File size: 21.8 KB
RevLine 
[54]1# HG changeset patch
2# Parent 9e8497278e0e4f8a145f321a4e4d22e3bb499b38
3Add c2s protocol handlers for iq, message and presence stanzas.
4
5TODO:
6 * Add tests.
7 * Add docstrings.
8 * Save last unavailable presence for future probes.
9
10diff -r 9e8497278e0e doc/examples/client_service.tac
11--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
12+++ b/doc/examples/client_service.tac   Thu Oct 06 18:55:13 2011 +0200
13@@ -0,0 +1,75 @@
14+from twisted.application import service, strports
15+from twisted.internet import defer
16+
17+from wokkel import client, im
18+from wokkel.component import InternalComponent, Router
19+from wokkel.generic import FallbackHandler
20+from wokkel.ping import PingHandler
21+from wokkel.im import RosterItem
22+
23+from twisted.words.protocols.jabber.jid import internJID as JID
24+
25+import socket
26+domain = socket.gethostname()
27+
28+RALPHM = JID('ralphm@'+domain)
29+INTOSI = JID('intosi@'+domain)
30+TERMIE = JID('termie@'+domain)
31+
32+roster = {
33+    'ralphm': {
34+        INTOSI: RosterItem(INTOSI,
35+                           subscriptionTo=True,
36+                           subscriptionFrom=True,
37+                           name='Intosi',
38+                           groups=set(['Friends'])),
39+        TERMIE: RosterItem(TERMIE,
40+                           subscriptionTo=True,
41+                           subscriptionFrom=True,
42+                           name='termie'),
43+        },
44+    'termie': {
45+        RALPHM: RosterItem(RALPHM,
46+                           subscriptionTo=True,
47+                           subscriptionFrom=True,
48+                           name='ralphm'),
49+        }
50+    }
51+
52+accounts = set(roster.keys())
53+
54+
55+class StaticRoster(im.RosterServerProtocol):
56+
57+    def __init__(self, roster):
58+        im.RosterServerProtocol.__init__(self)
59+        self.roster = roster
60+
61+    def getRoster(self, request):
62+        user = request.sender.user
63+        return defer.succeed(self.roster[user].values())
64+
65+
66+
67+application = service.Application("Jabber server")
68+
69+router = Router()
70+component = InternalComponent(router, domain)
71+component.setServiceParent(application)
72+
73+sessionManager = client.SessionManager(domain, accounts)
74+sessionManager.setHandlerParent(component)
75+
76+im.AccountIQHandler(sessionManager).setHandlerParent(component)
77+im.AccountMessageHandler(sessionManager).setHandlerParent(component)
78+im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component)
79+FallbackHandler().setHandlerParent(component)
80+StaticRoster(roster).setHandlerParent(component)
81+PingHandler().setHandlerParent(component)
82+
83+c2sFactory = client.XMPPC2SServerFactory(sessionManager)
84+c2sFactory.logTraffic = True
85+c2sService = strports.service('5224', c2sFactory)
86+c2sService.setServiceParent(application)
87+
88+sessionManager.connectionManager = c2sFactory
89diff -r 9e8497278e0e wokkel/im.py
90--- a/wokkel/im.py      Thu Oct 06 18:47:41 2011 +0200
91+++ b/wokkel/im.py      Thu Oct 06 18:55:13 2011 +0200
92@@ -10,7 +10,10 @@
93 U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM).
94 """
95 
96+import copy
97+
98 from twisted.internet import defer
99+from twisted.python import log
100 from twisted.words.protocols.jabber import jid
101 from twisted.words.protocols.jabber import error
102 from twisted.words.xish import domish
103@@ -168,7 +171,10 @@
104         self.xmlstream.addObserver("/presence", self._onPresence)
105 
106 
107-    def _onPresence(self, element):
108+    def parsePresence(self, element):
109+        """
110+        Parse presence.
111+        """
112         stanza = Stanza.fromElement(element)
113 
114         presenceType = stanza.stanzaType or 'available'
115@@ -178,14 +184,19 @@
116         except KeyError:
117             return
118 
119-        presence = parser.fromElement(element)
120+        return parser.fromElement(element)
121+
122+
123+    def _onPresence(self, element):
124+        presence = self.parsePresence(element)
125+        presenceType = presence.stanzaType or 'available'
126 
127         try:
128             handler = getattr(self, '%sReceived' % presenceType)
129         except AttributeError:
130             return
131         else:
132-            handler(presence)
133+            element.handled = handler(presence)
134 
135 
136     def errorReceived(self, presence):
137@@ -553,6 +564,440 @@
138 
139 
140 
141+class AccountIQHandler(XMPPHandler):
142+
143+    def __init__(self, sessionManager):
144+        XMPPHandler.__init__(self)
145+        self.sessionManager = sessionManager
146+
147+
148+    def connectionMade(self):
149+        self.xmlstream.addObserver('/iq', self.onIQ, 1)
150+
151+
152+    def onIQ(self, iq):
153+        """
154+        Handler for iq stanzas to user accounts' connected resources.
155+
156+        If the recipient is a bare JID or there is no associated user, this
157+        handler ignores the stanza, so that other handlers have a chance
158+        to pick it up. If used, L{generic.FallbackHandler} will respond with a
159+        C{'service-unavailable'} stanza error if no other handlers handle
160+        the iq.
161+        """
162+
163+        if iq.handled:
164+            return
165+
166+        try:
167+            recipient = jid.internJID(iq['to'])
168+        except KeyError:
169+            return
170+
171+        if not recipient.user:
172+            # This is not for an account, ignore it
173+            return
174+        elif recipient.user not in self.sessionManager.accounts:
175+            # This is not a user, ignore it
176+            return
177+        elif not recipient.resource:
178+            # Bare JID at local domain, ignore it
179+            return
180+
181+        userSessions = self.sessionManager.sessions.get(recipient.user,
182+                                                        {})
183+        if recipient.resource in userSessions:
184+            self.sessionManager.deliverStanza(iq, recipient)
185+        else:
186+            # Full JID without connected resource, return error
187+            exc = error.StanzaError('service-unavailable')
188+            if iq['type'] in ('result', 'error'):
189+                log.err(exc, 'Could not deliver IQ response')
190+            else:
191+                self.send(exc.toResponse(iq))
192+
193+        iq.handled = True
194+
195+
196+
197+class AccountMessageHandler(XMPPHandler):
198+
199+    def __init__(self, sessionManager):
200+        XMPPHandler.__init__(self)
201+        self.sessionManager = sessionManager
202+
203+
204+    def connectionMade(self):
205+        self.xmlstream.addObserver('/message', self.onMessage, 1)
206+
207+
208+    def onMessage(self, message):
209+        """
210+        Handler for message stanzas to user accounts.
211+        """
212+
213+        if message.handled:
214+            return
215+
216+        try:
217+            recipient = jid.internJID(message['to'])
218+        except KeyError:
219+            return
220+
221+        stanzaType = message.getAttribute('type', 'normal')
222+
223+        try:
224+            if not recipient.user:
225+                # This is not for an account, ignore it
226+                return
227+            elif recipient.user not in self.sessionManager.accounts:
228+                # This is not a user, ignore it
229+                return
230+            elif recipient.resource:
231+                userSessions = self.sessionManager.sessions.get(recipient.user,
232+                                                                {})
233+                if recipient.resource in userSessions:
234+                    self.sessionManager.deliverStanza(message, recipient)
235+                else:
236+                    if stanzaType in ('normal', 'chat', 'headline'):
237+                        self.onMessageBareJID(message, recipient.userhostJID())
238+                    elif stanzaType == 'error':
239+                        log.msg("Dropping message to unconnected resource %r" %
240+                                recipient.full())
241+                    elif stanzaType == 'groupchat':
242+                        raise error.StanzaError('service-unavailable')
243+            else:
244+                self.onMessageBareJID(message, recipient)
245+        except error.StanzaError, exc:
246+            if stanzaType == 'error':
247+                log.err(exc, "Undeliverable error")
248+            else:
249+                self.send(exc.toResponse(message))
250+
251+        message.handled = True
252+
253+
254+    def onMessageBareJID(self, message, bareJID):
255+        stanzaType = message.getAttribute('type', 'normal')
256+
257+        userSessions = self.sessionManager.sessions.get(bareJID.user, {})
258+
259+        recipients = set()
260+
261+        if stanzaType == 'headline':
262+            for session in userSessions:
263+                if session.presence.priority >= 0:
264+                    recipients.add(session.entity)
265+        elif stanzaType in ('chat', 'normal'):
266+            priorities = {}
267+            for session in userSessions.itervalues():
268+                if not session.presence or not session.presence.available:
269+                    continue
270+                priority = session.presence.priority
271+                if priority >= 0:
272+                    priorities.setdefault(priority, set()).add(session.entity)
273+                maxPriority = max(priorities.keys())
274+                recipients.update(priorities[maxPriority])
275+        elif stanzaType == 'groupchat':
276+            raise error.StanzaError('service-unavailable')
277+
278+        if recipients:
279+            for recipient in recipients:
280+                self.sessionManager.deliverStanza(message, recipient)
281+        elif stanzaType in ('chat', 'normal'):
282+            raise error.StanzaError('service-unavailable')
283+        else:
284+            # silently discard
285+            log.msg("Discarding message to %r" % message['to'])
286+
287+
288+
289+
290+def clonePresence(presence):
291+    """
292+    Make a deep copy of a presence stanza.
293+
294+    The returned presence stanza is an orphaned deep copy of the given
295+    original.
296+
297+    @note: Since the reference to the original parent, if any, is gone,
298+    inherited attributes like C{xml:lang} are not preserved.
299+    """
300+    element = presence.element
301+
302+    parent = element.parent
303+    element.parent = None
304+    newElement = copy.deepcopy(element)
305+    element.parent = parent
306+    return newElement
307+
308+
309+
310+class PresenceServerHandler(PresenceProtocol):
311+
312+    def __init__(self, sessionManager, domain, roster):
313+        PresenceProtocol.__init__(self)
314+        self.sessionManager = sessionManager
315+        self.domain = domain
316+        self.roster = roster
317+        self.presences = {} # user -> resource -> presence
318+        self.offlinePresences = {} # user -> presence
319+        self.remotePresences = {} # user -> remote entity -> presence
320+
321+        self.sessionManager.clientStream.addObserver('/presence',
322+                                                     self._onPresenceOutbound)
323+
324+
325+    def _onPresenceOutbound(self, element):
326+        log.msg("Got outbound presence: %r" % element.toXml())
327+        presence = self.parsePresence(element)
328+
329+        presenceType = presence.stanzaType or 'available'
330+        method = '%sReceivedOutbound' % presenceType
331+        print method
332+
333+        try:
334+            handler = getattr(self, method)
335+        except AttributeError:
336+            return
337+        else:
338+            element.handled = handler(presence)
339+
340+
341+    def _broadcastToOtherResources(self, presence):
342+        """
343+        Broadcast presence to other available resources.
344+        """
345+        fromJID = presence.sender
346+        for otherResource in self.presences[fromJID.user]:
347+            if otherResource == fromJID.resource:
348+                continue
349+
350+            resourceJID = jid.JID(tuple=(fromJID.user,
351+                                         fromJID.host,
352+                                         otherResource))
353+            outPresence = clonePresence(presence)
354+            outPresence['to'] = resourceJID.full()
355+            self.sessionManager.deliverStanza(outPresence, resourceJID)
356+
357+
358+    def _broadcastToContacts(self, presence):
359+        """
360+        Broadcast presence to subscribed entities.
361+        """
362+        fromJID = presence.sender
363+        roster = self.roster[fromJID.user]
364+
365+        for item in roster.itervalues():
366+            if not item.subscriptionFrom:
367+                continue
368+
369+            outPresence = clonePresence(presence)
370+            outPresence['to'] = item.entity.full()
371+
372+            if item.entity.host == self.domain:
373+                # local contact
374+                if item.entity.user in self.presences:
375+                    # broadcast to contact's available resources
376+                    for itemResource in self.presences[item.entity.user]:
377+                        resourceJID = jid.JID(tuple=(item.entity.user,
378+                                                     item.entity.host,
379+                                                     itemResource))
380+                        self.sessionManager.deliverStanza(outPresence,
381+                                                          resourceJID)
382+            else:
383+                # remote contact
384+                self.send(outPresence)
385+
386+
387+    def _on_availableBroadcast(self, presence):
388+        fromJID = presence.sender
389+        user, resource = fromJID.user, fromJID.resource
390+        roster = self.roster[user]
391+
392+        if user not in self.presences:
393+            # initial presence
394+            self.presences[user] = {}
395+            self.remotePresences[user] = {}
396+
397+            # send out probes
398+            for item in roster.itervalues():
399+                if item.subscriptionTo and item.entity.host != self.domain:
400+                    self.probe(item.entity, fromJID)
401+        else:
402+            if resource not in self.presences[user]:
403+                # initial presence with another available resource
404+
405+                # send last known presences from remote contacts
406+                remotePresences = self.remotePresences[user]
407+                for entity, remotePresence in remotePresences.iteritems():
408+                    self.sessionManager.deliverStanza(remotePresence.element,
409+                                                      fromJID)
410+
411+            # send presence to other resources
412+            self._broadcastToOtherResources(presence)
413+
414+        # Send last known local presences
415+        if user not in self.presences or resource not in self.presences[user]:
416+            for item in roster.itervalues():
417+                if item.subscriptionTo and \
418+                   item.entity.host == self.domain and \
419+                   item.entity.user in self.presences:
420+                    for contactPresence in \
421+                            self.presences[item.entity.user].itervalues():
422+                        outPresence = clonePresence(contactPresence)
423+                        outPresence['to'] = fromJID.userhost()
424+                        self.sessionManager.deliverStanza(outPresence, fromJID)
425+
426+        # broadcast presence
427+        self._broadcastToContacts(presence)
428+
429+        # save presence
430+        self.presences[user][resource] = presence
431+        self.sessionManager.sessions[user][resource].presence = presence
432+
433+        return True
434+
435+
436+    def _on_availableDirected(self, presence):
437+        self.send(presence.element)
438+        return True
439+
440+
441+    def availableReceivedOutbound(self, presence):
442+        if presence.recipient:
443+            return self._on_availableDirected(presence)
444+        else:
445+            return self._on_availableBroadcast(presence)
446+
447+
448+    def availableReceived(self, presence):
449+        fromJID = presence.sender
450+        toJID = presence.recipient
451+
452+        if toJID.user not in self.roster:
453+            return False
454+
455+        if toJID.user in self.presences:
456+            for resource in self.presences[toJID.user]:
457+                resourceJID = jid.JID(tuple=(toJID.user,
458+                                             toJID.host,
459+                                             resource))
460+                self.sessionManager.deliverStanza(presence.element, resourceJID)
461+            self.remotePresences[toJID.user][fromJID] = presence
462+        else:
463+            # no such user or no available resource, ignore this stanza
464+            pass
465+
466+        return True
467+
468+
469+    def _on_unavailableBroadcast(self, presence):
470+        fromJID = presence.sender
471+        user, resource = fromJID.user, fromJID.resource
472+
473+        # broadcast presence
474+        self._broadcastToContacts(presence)
475+
476+        if user in self.presences:
477+            # send presence to other resources
478+            self._broadcastToOtherResources(presence)
479+
480+            # update stored presences
481+            if resource in self.presences[user]:
482+                del self.presences[user][resource]
483+
484+            if not self.presences[user]:
485+                # last resource to become unavailable
486+                del self.presences[user]
487+
488+                # TODO: save last unavailable presence
489+
490+        return True
491+
492+
493+    def _on_unavailableDirected(self, presence):
494+        self.send(presence.element)
495+        return True
496+
497+
498+    def unavailableReceivedOutbound(self, presence):
499+        if presence.recipient:
500+            return self._on_unavailableDirected(presence)
501+        else:
502+            return self._on_unavailableBroadcast(presence)
503+
504+#    def unavailableReceived(self, presence):
505+
506+
507+    def subscribedReceivedOutbound(self, presence):
508+        log.msg("%r subscribed %s to its presence" % (presence.sender,
509+                                                      presence.recipient))
510+        self.send(presence.element)
511+        return True
512+
513+
514+    def subscribedReceived(self, presence):
515+        log.msg("%r subscribed %s to its presence" % (presence.sender,
516+                                                      presence.recipient))
517+
518+
519+    def unsubscribedReceivedOutbound(self, presence):
520+        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
521+                                                          presence.recipient))
522+        self.send(presence.element)
523+        return True
524+
525+
526+    def unsubscribedReceived(self, presence):
527+        log.msg("%r unsubscribed %s from its presence" % (presence.sender,
528+                                                          presence.recipient))
529+
530+
531+    def subscribeReceivedOutbound(self, presence):
532+        log.msg("%r requests subscription to %s" % (presence.sender,
533+                                                    presence.recipient))
534+        self.send(presence.element)
535+        return True
536+
537+
538+    def subscribeReceived(self, presence):
539+        log.msg("%r requests subscription to %s" % (presence.sender,
540+                                                    presence.recipient))
541+
542+
543+    def unsubscribeReceivedOutbound(self, presence):
544+        log.msg("%r requests unsubscription from %s" % (presence.sender,
545+                                                        presence.recipient))
546+        self.send(presence.element)
547+        return True
548+
549+
550+    def unsubscribeReceived(self, presence):
551+        log.msg("%r requests unsubscription from %s" % (presence.sender,
552+                                                        presence.recipient))
553+
554+
555+    def probeReceived(self, presence):
556+        fromJID = presence.sender
557+        toJID = presence.recipient
558+
559+        if toJID.user not in self.roster or \
560+           fromJID.userhost() not in self.roster[toJID.user] or \
561+           not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom:
562+            # send unsubscribed
563+            pass
564+        elif toJID.user not in self.presences:
565+            # send last unavailable or nothing
566+            pass
567+        else:
568+            for resourcePresence in self.presences[toJID.user].itervalues():
569+                outPresence = clonePresence(resourcePresence)
570+                outPresence['to'] = fromJID.userhost()
571+                self.send(outPresence)
572+
573+
574+
575 class RosterServerProtocol(XMPPHandler, IQHandlerMixin):
576     """
577     XMPP subprotocol handler for the roster, server side.
578diff -r 9e8497278e0e wokkel/test/test_im.py
579--- a/wokkel/test/test_im.py    Thu Oct 06 18:47:41 2011 +0200
580+++ b/wokkel/test/test_im.py    Thu Oct 06 18:55:13 2011 +0200
581@@ -13,7 +13,7 @@
582 from twisted.words.xish import domish, utility
583 
584 from wokkel import im
585-from wokkel.generic import ErrorStanza, parseXml
586+from wokkel.generic import ErrorStanza, Stanza, parseXml
587 from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub
588 
589 NS_XML = 'http://www.w3.org/XML/1998/namespace'
590@@ -846,6 +846,82 @@
591 
592 
593 
594+class AccountIQHandlerTest(unittest.TestCase):
595+    """
596+    Tests for L{im.AccountIQHandler}.
597+    """
598+
599+    def setUp(self):
600+        self.stub = XmlStreamStub()
601+        self.protocol = im.AccountIQHandler(None)
602+        self.protocol.makeConnection(self.stub.xmlstream)
603+        self.protocol.connectionInitialized()
604+
605+
606+    def test_onIQNotUser(self):
607+        """
608+        IQs to JIDs without local part are ignored.
609+        """
610+        xml = """
611+          <iq to='example.org'>
612+            <query xmlns='jabber:iq:version'/>
613+          </iq>
614+        """
615+
616+        iq = parseXml(xml)
617+        self.stub.send(iq)
618+
619+        self.assertFalse(getattr(iq, 'handled'))
620+
621+
622+
623+class AccountMessageHandlerTest(unittest.TestCase):
624+    """
625+    Tests for L{im.AccountMessageHandler}.
626+    """
627+
628+    def setUp(self):
629+        self.stub = XmlStreamStub()
630+        self.protocol = im.AccountMessageHandler(None)
631+        self.protocol.makeConnection(self.stub.xmlstream)
632+        self.protocol.connectionInitialized()
633+
634+
635+    def test_onMessageNotUser(self):
636+        """
637+        Messages to JIDs without local part are ignored.
638+        """
639+        xml = """
640+          <message to='example.org'>
641+            <body>Hello</body>
642+          </message>
643+        """
644+
645+        message = parseXml(xml)
646+        self.stub.send(message)
647+
648+        self.assertFalse(getattr(message, 'handled'))
649+
650+
651+
652+class ClonePresenceTest(unittest.TestCase):
653+    """
654+    Tests for L{im.clonePresence}.
655+    """
656+
657+    def test_rootElement(self):
658+        """
659+        The copied presence stanza is not identical, but renders identically.
660+        """
661+        originalElement = domish.Element((None, 'presence'))
662+        stanza = Stanza.fromElement(originalElement)
663+        copyElement = im.clonePresence(stanza)
664+
665+        self.assertNotIdentical(copyElement, originalElement)
666+        self.assertEquals(copyElement.toXml(), originalElement.toXml())
667+
668+
669+
670 class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin):
671     """
672     Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser for help on using the repository browser.