source:
ralphm-patches/c2s_stanza_handlers.patch
@
57:0d8b6cf41728
Last change on this file since 57:0d8b6cf41728 was 57:0d8b6cf41728, checked in by Ralph Meijer <ralphm@…>, 9 years ago | |
---|---|
File size: 21.8 KB |
-
new file doc/examples/client_service.tac
# HG changeset patch # Parent 24c6e79ab1c449f41fd5d4c2cb843dc16efe2c59 Add c2s protocol handlers for iq, message and presence stanzas. TODO: * Add tests. * Add docstrings. * Save last unavailable presence for future probes. diff -r 24c6e79ab1c4 doc/examples/client_service.tac
- + 1 from twisted.application import service, strports 2 from twisted.internet import defer 3 4 from wokkel import client, im 5 from wokkel.component import InternalComponent, Router 6 from wokkel.generic import FallbackHandler 7 from wokkel.ping import PingHandler 8 from wokkel.im import RosterItem 9 10 from twisted.words.protocols.jabber.jid import internJID as JID 11 12 import socket 13 domain = socket.gethostname() 14 15 RALPHM = JID('ralphm@'+domain) 16 INTOSI = JID('intosi@'+domain) 17 TERMIE = JID('termie@'+domain) 18 19 roster = { 20 'ralphm': { 21 INTOSI: RosterItem(INTOSI, 22 subscriptionTo=True, 23 subscriptionFrom=True, 24 name='Intosi', 25 groups=set(['Friends'])), 26 TERMIE: RosterItem(TERMIE, 27 subscriptionTo=True, 28 subscriptionFrom=True, 29 name='termie'), 30 }, 31 'termie': { 32 RALPHM: RosterItem(RALPHM, 33 subscriptionTo=True, 34 subscriptionFrom=True, 35 name='ralphm'), 36 } 37 } 38 39 accounts = set(roster.keys()) 40 41 42 class StaticRoster(im.RosterServerProtocol): 43 44 def __init__(self, roster): 45 im.RosterServerProtocol.__init__(self) 46 self.roster = roster 47 48 def getRoster(self, request): 49 user = request.sender.user 50 return defer.succeed(self.roster[user].values()) 51 52 53 54 application = service.Application("Jabber server") 55 56 router = Router() 57 component = InternalComponent(router, domain) 58 component.setServiceParent(application) 59 60 sessionManager = client.SessionManager(domain, accounts) 61 sessionManager.setHandlerParent(component) 62 63 im.AccountIQHandler(sessionManager).setHandlerParent(component) 64 im.AccountMessageHandler(sessionManager).setHandlerParent(component) 65 im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 66 FallbackHandler().setHandlerParent(component) 67 StaticRoster(roster).setHandlerParent(component) 68 PingHandler().setHandlerParent(component) 69 70 c2sFactory = client.XMPPC2SServerFactory(sessionManager) 71 c2sFactory.logTraffic = True 72 c2sService = strports.service('5224', c2sFactory) 73 c2sService.setServiceParent(application) 74 75 sessionManager.connectionManager = c2sFactory -
wokkel/im.py
diff -r 24c6e79ab1c4 wokkel/im.py
a b 10 10 U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM). 11 11 """ 12 12 13 import copy 14 13 15 from twisted.internet import defer 16 from twisted.python import log 14 17 from twisted.words.protocols.jabber import error 15 18 from twisted.words.protocols.jabber import jid 16 19 from twisted.words.xish import domish … … 179 182 180 183 181 184 182 def _onPresence(self, element): 183 """ 184 Called when a presence stanza has been received. 185 """ 185 def parsePresence(self, element): 186 186 stanza = Stanza.fromElement(element) 187 187 188 188 presenceType = stanza.stanzaType or 'available' … … 192 192 except KeyError: 193 193 return 194 194 195 presence = parser.fromElement(element) 195 return parser.fromElement(element) 196 197 198 def _onPresence(self, element): 199 """ 200 Called when a presence stanza has been received. 201 """ 202 presence = self.parsePresence(element) 203 presenceType = presence.stanzaType or 'available' 196 204 197 205 try: 198 206 handler = getattr(self, '%sReceived' % presenceType) 199 207 except AttributeError: 200 208 return 201 209 else: 202 handler(presence)210 element.handled = handler(presence) 203 211 204 212 205 213 … … 582 590 583 591 584 592 593 class AccountIQHandler(XMPPHandler): 594 595 def __init__(self, sessionManager): 596 XMPPHandler.__init__(self) 597 self.sessionManager = sessionManager 598 599 600 def connectionMade(self): 601 self.xmlstream.addObserver('/iq', self.onIQ, 1) 602 603 604 def onIQ(self, iq): 605 """ 606 Handler for iq stanzas to user accounts' connected resources. 607 608 If the recipient is a bare JID or there is no associated user, this 609 handler ignores the stanza, so that other handlers have a chance 610 to pick it up. If used, L{generic.FallbackHandler} will respond with a 611 C{'service-unavailable'} stanza error if no other handlers handle 612 the iq. 613 """ 614 615 if iq.handled: 616 return 617 618 try: 619 recipient = jid.internJID(iq['to']) 620 except KeyError: 621 return 622 623 if not recipient.user: 624 # This is not for an account, ignore it 625 return 626 elif recipient.user not in self.sessionManager.accounts: 627 # This is not a user, ignore it 628 return 629 elif not recipient.resource: 630 # Bare JID at local domain, ignore it 631 return 632 633 userSessions = self.sessionManager.sessions.get(recipient.user, 634 {}) 635 if recipient.resource in userSessions: 636 self.sessionManager.deliverStanza(iq, recipient) 637 else: 638 # Full JID without connected resource, return error 639 exc = error.StanzaError('service-unavailable') 640 if iq['type'] in ('result', 'error'): 641 log.err(exc, 'Could not deliver IQ response') 642 else: 643 self.send(exc.toResponse(iq)) 644 645 iq.handled = True 646 647 648 649 class AccountMessageHandler(XMPPHandler): 650 651 def __init__(self, sessionManager): 652 XMPPHandler.__init__(self) 653 self.sessionManager = sessionManager 654 655 656 def connectionMade(self): 657 self.xmlstream.addObserver('/message', self.onMessage, 1) 658 659 660 def onMessage(self, message): 661 """ 662 Handler for message stanzas to user accounts. 663 """ 664 665 if message.handled: 666 return 667 668 try: 669 recipient = jid.internJID(message['to']) 670 except KeyError: 671 return 672 673 stanzaType = message.getAttribute('type', 'normal') 674 675 try: 676 if not recipient.user: 677 # This is not for an account, ignore it 678 return 679 elif recipient.user not in self.sessionManager.accounts: 680 # This is not a user, ignore it 681 return 682 elif recipient.resource: 683 userSessions = self.sessionManager.sessions.get(recipient.user, 684 {}) 685 if recipient.resource in userSessions: 686 self.sessionManager.deliverStanza(message, recipient) 687 else: 688 if stanzaType in ('normal', 'chat', 'headline'): 689 self.onMessageBareJID(message, recipient.userhostJID()) 690 elif stanzaType == 'error': 691 log.msg("Dropping message to unconnected resource %r" % 692 recipient.full()) 693 elif stanzaType == 'groupchat': 694 raise error.StanzaError('service-unavailable') 695 else: 696 self.onMessageBareJID(message, recipient) 697 except error.StanzaError, exc: 698 if stanzaType == 'error': 699 log.err(exc, "Undeliverable error") 700 else: 701 self.send(exc.toResponse(message)) 702 703 message.handled = True 704 705 706 def onMessageBareJID(self, message, bareJID): 707 stanzaType = message.getAttribute('type', 'normal') 708 709 userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 710 711 recipients = set() 712 713 if stanzaType == 'headline': 714 for session in userSessions: 715 if session.presence.priority >= 0: 716 recipients.add(session.entity) 717 elif stanzaType in ('chat', 'normal'): 718 priorities = {} 719 for session in userSessions.itervalues(): 720 if not session.presence or not session.presence.available: 721 continue 722 priority = session.presence.priority 723 if priority >= 0: 724 priorities.setdefault(priority, set()).add(session.entity) 725 maxPriority = max(priorities.keys()) 726 recipients.update(priorities[maxPriority]) 727 elif stanzaType == 'groupchat': 728 raise error.StanzaError('service-unavailable') 729 730 if recipients: 731 for recipient in recipients: 732 self.sessionManager.deliverStanza(message, recipient) 733 elif stanzaType in ('chat', 'normal'): 734 raise error.StanzaError('service-unavailable') 735 else: 736 # silently discard 737 log.msg("Discarding message to %r" % message['to']) 738 739 740 741 742 def clonePresence(presence): 743 """ 744 Make a deep copy of a presence stanza. 745 746 The returned presence stanza is an orphaned deep copy of the given 747 original. 748 749 @note: Since the reference to the original parent, if any, is gone, 750 inherited attributes like C{xml:lang} are not preserved. 751 """ 752 element = presence.element 753 754 parent = element.parent 755 element.parent = None 756 newElement = copy.deepcopy(element) 757 element.parent = parent 758 return newElement 759 760 761 762 class PresenceServerHandler(PresenceProtocol): 763 764 def __init__(self, sessionManager, domain, roster): 765 PresenceProtocol.__init__(self) 766 self.sessionManager = sessionManager 767 self.domain = domain 768 self.roster = roster 769 self.presences = {} # user -> resource -> presence 770 self.offlinePresences = {} # user -> presence 771 self.remotePresences = {} # user -> remote entity -> presence 772 773 self.sessionManager.clientStream.addObserver('/presence', 774 self._onPresenceOutbound) 775 776 777 def _onPresenceOutbound(self, element): 778 log.msg("Got outbound presence: %r" % element.toXml()) 779 presence = self.parsePresence(element) 780 781 presenceType = presence.stanzaType or 'available' 782 method = '%sReceivedOutbound' % presenceType 783 print method 784 785 try: 786 handler = getattr(self, method) 787 except AttributeError: 788 return 789 else: 790 element.handled = handler(presence) 791 792 793 def _broadcastToOtherResources(self, presence): 794 """ 795 Broadcast presence to other available resources. 796 """ 797 fromJID = presence.sender 798 for otherResource in self.presences[fromJID.user]: 799 if otherResource == fromJID.resource: 800 continue 801 802 resourceJID = jid.JID(tuple=(fromJID.user, 803 fromJID.host, 804 otherResource)) 805 outPresence = clonePresence(presence) 806 outPresence['to'] = resourceJID.full() 807 self.sessionManager.deliverStanza(outPresence, resourceJID) 808 809 810 def _broadcastToContacts(self, presence): 811 """ 812 Broadcast presence to subscribed entities. 813 """ 814 fromJID = presence.sender 815 roster = self.roster[fromJID.user] 816 817 for item in roster.itervalues(): 818 if not item.subscriptionFrom: 819 continue 820 821 outPresence = clonePresence(presence) 822 outPresence['to'] = item.entity.full() 823 824 if item.entity.host == self.domain: 825 # local contact 826 if item.entity.user in self.presences: 827 # broadcast to contact's available resources 828 for itemResource in self.presences[item.entity.user]: 829 resourceJID = jid.JID(tuple=(item.entity.user, 830 item.entity.host, 831 itemResource)) 832 self.sessionManager.deliverStanza(outPresence, 833 resourceJID) 834 else: 835 # remote contact 836 self.send(outPresence) 837 838 839 def _on_availableBroadcast(self, presence): 840 fromJID = presence.sender 841 user, resource = fromJID.user, fromJID.resource 842 roster = self.roster[user] 843 844 if user not in self.presences: 845 # initial presence 846 self.presences[user] = {} 847 self.remotePresences[user] = {} 848 849 # send out probes 850 for item in roster.itervalues(): 851 if item.subscriptionTo and item.entity.host != self.domain: 852 self.probe(item.entity, fromJID) 853 else: 854 if resource not in self.presences[user]: 855 # initial presence with another available resource 856 857 # send last known presences from remote contacts 858 remotePresences = self.remotePresences[user] 859 for entity, remotePresence in remotePresences.iteritems(): 860 self.sessionManager.deliverStanza(remotePresence.element, 861 fromJID) 862 863 # send presence to other resources 864 self._broadcastToOtherResources(presence) 865 866 # Send last known local presences 867 if user not in self.presences or resource not in self.presences[user]: 868 for item in roster.itervalues(): 869 if item.subscriptionTo and \ 870 item.entity.host == self.domain and \ 871 item.entity.user in self.presences: 872 for contactPresence in \ 873 self.presences[item.entity.user].itervalues(): 874 outPresence = clonePresence(contactPresence) 875 outPresence['to'] = fromJID.userhost() 876 self.sessionManager.deliverStanza(outPresence, fromJID) 877 878 # broadcast presence 879 self._broadcastToContacts(presence) 880 881 # save presence 882 self.presences[user][resource] = presence 883 self.sessionManager.sessions[user][resource].presence = presence 884 885 return True 886 887 888 def _on_availableDirected(self, presence): 889 self.send(presence.element) 890 return True 891 892 893 def availableReceivedOutbound(self, presence): 894 if presence.recipient: 895 return self._on_availableDirected(presence) 896 else: 897 return self._on_availableBroadcast(presence) 898 899 900 def availableReceived(self, presence): 901 fromJID = presence.sender 902 toJID = presence.recipient 903 904 if toJID.user not in self.roster: 905 return False 906 907 if toJID.user in self.presences: 908 for resource in self.presences[toJID.user]: 909 resourceJID = jid.JID(tuple=(toJID.user, 910 toJID.host, 911 resource)) 912 self.sessionManager.deliverStanza(presence.element, resourceJID) 913 self.remotePresences[toJID.user][fromJID] = presence 914 else: 915 # no such user or no available resource, ignore this stanza 916 pass 917 918 return True 919 920 921 def _on_unavailableBroadcast(self, presence): 922 fromJID = presence.sender 923 user, resource = fromJID.user, fromJID.resource 924 925 # broadcast presence 926 self._broadcastToContacts(presence) 927 928 if user in self.presences: 929 # send presence to other resources 930 self._broadcastToOtherResources(presence) 931 932 # update stored presences 933 if resource in self.presences[user]: 934 del self.presences[user][resource] 935 936 if not self.presences[user]: 937 # last resource to become unavailable 938 del self.presences[user] 939 940 # TODO: save last unavailable presence 941 942 return True 943 944 945 def _on_unavailableDirected(self, presence): 946 self.send(presence.element) 947 return True 948 949 950 def unavailableReceivedOutbound(self, presence): 951 if presence.recipient: 952 return self._on_unavailableDirected(presence) 953 else: 954 return self._on_unavailableBroadcast(presence) 955 956 # def unavailableReceived(self, presence): 957 958 959 def subscribedReceivedOutbound(self, presence): 960 log.msg("%r subscribed %s to its presence" % (presence.sender, 961 presence.recipient)) 962 self.send(presence.element) 963 return True 964 965 966 def subscribedReceived(self, presence): 967 log.msg("%r subscribed %s to its presence" % (presence.sender, 968 presence.recipient)) 969 970 971 def unsubscribedReceivedOutbound(self, presence): 972 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 973 presence.recipient)) 974 self.send(presence.element) 975 return True 976 977 978 def unsubscribedReceived(self, presence): 979 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 980 presence.recipient)) 981 982 983 def subscribeReceivedOutbound(self, presence): 984 log.msg("%r requests subscription to %s" % (presence.sender, 985 presence.recipient)) 986 self.send(presence.element) 987 return True 988 989 990 def subscribeReceived(self, presence): 991 log.msg("%r requests subscription to %s" % (presence.sender, 992 presence.recipient)) 993 994 995 def unsubscribeReceivedOutbound(self, presence): 996 log.msg("%r requests unsubscription from %s" % (presence.sender, 997 presence.recipient)) 998 self.send(presence.element) 999 return True 1000 1001 1002 def unsubscribeReceived(self, presence): 1003 log.msg("%r requests unsubscription from %s" % (presence.sender, 1004 presence.recipient)) 1005 1006 1007 def probeReceived(self, presence): 1008 fromJID = presence.sender 1009 toJID = presence.recipient 1010 1011 if toJID.user not in self.roster or \ 1012 fromJID.userhost() not in self.roster[toJID.user] or \ 1013 not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 1014 # send unsubscribed 1015 pass 1016 elif toJID.user not in self.presences: 1017 # send last unavailable or nothing 1018 pass 1019 else: 1020 for resourcePresence in self.presences[toJID.user].itervalues(): 1021 outPresence = clonePresence(resourcePresence) 1022 outPresence['to'] = fromJID.userhost() 1023 self.send(outPresence) 1024 1025 1026 585 1027 class RosterServerProtocol(XMPPHandler, IQHandlerMixin): 586 1028 """ 587 1029 XMPP subprotocol handler for the roster, server side. -
wokkel/test/test_im.py
diff -r 24c6e79ab1c4 wokkel/test/test_im.py
a b 13 13 from twisted.words.xish import domish, utility 14 14 15 15 from wokkel import im 16 from wokkel.generic import ErrorStanza, parseXml16 from wokkel.generic import ErrorStanza, Stanza, parseXml 17 17 from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub 18 18 19 19 NS_XML = 'http://www.w3.org/XML/1998/namespace' … … 846 846 847 847 848 848 849 class AccountIQHandlerTest(unittest.TestCase): 850 """ 851 Tests for L{im.AccountIQHandler}. 852 """ 853 854 def setUp(self): 855 self.stub = XmlStreamStub() 856 self.protocol = im.AccountIQHandler(None) 857 self.protocol.makeConnection(self.stub.xmlstream) 858 self.protocol.connectionInitialized() 859 860 861 def test_onIQNotUser(self): 862 """ 863 IQs to JIDs without local part are ignored. 864 """ 865 xml = """ 866 <iq to='example.org'> 867 <query xmlns='jabber:iq:version'/> 868 </iq> 869 """ 870 871 iq = parseXml(xml) 872 self.stub.send(iq) 873 874 self.assertFalse(getattr(iq, 'handled')) 875 876 877 878 class AccountMessageHandlerTest(unittest.TestCase): 879 """ 880 Tests for L{im.AccountMessageHandler}. 881 """ 882 883 def setUp(self): 884 self.stub = XmlStreamStub() 885 self.protocol = im.AccountMessageHandler(None) 886 self.protocol.makeConnection(self.stub.xmlstream) 887 self.protocol.connectionInitialized() 888 889 890 def test_onMessageNotUser(self): 891 """ 892 Messages to JIDs without local part are ignored. 893 """ 894 xml = """ 895 <message to='example.org'> 896 <body>Hello</body> 897 </message> 898 """ 899 900 message = parseXml(xml) 901 self.stub.send(message) 902 903 self.assertFalse(getattr(message, 'handled')) 904 905 906 907 class ClonePresenceTest(unittest.TestCase): 908 """ 909 Tests for L{im.clonePresence}. 910 """ 911 912 def test_rootElement(self): 913 """ 914 The copied presence stanza is not identical, but renders identically. 915 """ 916 originalElement = domish.Element((None, 'presence')) 917 stanza = Stanza.fromElement(originalElement) 918 copyElement = im.clonePresence(stanza) 919 920 self.assertNotIdentical(copyElement, originalElement) 921 self.assertEquals(copyElement.toXml(), originalElement.toXml()) 922 923 924 849 925 class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin): 850 926 """ 851 927 Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser
for help on using the repository browser.