source:
ralphm-patches/c2s_stanza_handlers.patch
@
54:03ec57713c90
Last change on this file since 54:03ec57713c90 was 54:03ec57713c90, checked in by Ralph Meijer <ralphm@…>, 11 years ago | |
---|---|
File size: 21.8 KB |
-
new file doc/examples/client_service.tac
# HG changeset patch # Parent 9e8497278e0e4f8a145f321a4e4d22e3bb499b38 Add c2s protocol handlers for iq, message and presence stanzas. TODO: * Add tests. * Add docstrings. * Save last unavailable presence for future probes. diff -r 9e8497278e0e doc/examples/client_service.tac
- + 1 from twisted.application import service, strports 2 from twisted.internet import defer 3 4 from wokkel import client, im 5 from wokkel.component import InternalComponent, Router 6 from wokkel.generic import FallbackHandler 7 from wokkel.ping import PingHandler 8 from wokkel.im import RosterItem 9 10 from twisted.words.protocols.jabber.jid import internJID as JID 11 12 import socket 13 domain = socket.gethostname() 14 15 RALPHM = JID('ralphm@'+domain) 16 INTOSI = JID('intosi@'+domain) 17 TERMIE = JID('termie@'+domain) 18 19 roster = { 20 'ralphm': { 21 INTOSI: RosterItem(INTOSI, 22 subscriptionTo=True, 23 subscriptionFrom=True, 24 name='Intosi', 25 groups=set(['Friends'])), 26 TERMIE: RosterItem(TERMIE, 27 subscriptionTo=True, 28 subscriptionFrom=True, 29 name='termie'), 30 }, 31 'termie': { 32 RALPHM: RosterItem(RALPHM, 33 subscriptionTo=True, 34 subscriptionFrom=True, 35 name='ralphm'), 36 } 37 } 38 39 accounts = set(roster.keys()) 40 41 42 class StaticRoster(im.RosterServerProtocol): 43 44 def __init__(self, roster): 45 im.RosterServerProtocol.__init__(self) 46 self.roster = roster 47 48 def getRoster(self, request): 49 user = request.sender.user 50 return defer.succeed(self.roster[user].values()) 51 52 53 54 application = service.Application("Jabber server") 55 56 router = Router() 57 component = InternalComponent(router, domain) 58 component.setServiceParent(application) 59 60 sessionManager = client.SessionManager(domain, accounts) 61 sessionManager.setHandlerParent(component) 62 63 im.AccountIQHandler(sessionManager).setHandlerParent(component) 64 im.AccountMessageHandler(sessionManager).setHandlerParent(component) 65 im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 66 FallbackHandler().setHandlerParent(component) 67 StaticRoster(roster).setHandlerParent(component) 68 PingHandler().setHandlerParent(component) 69 70 c2sFactory = client.XMPPC2SServerFactory(sessionManager) 71 c2sFactory.logTraffic = True 72 c2sService = strports.service('5224', c2sFactory) 73 c2sService.setServiceParent(application) 74 75 sessionManager.connectionManager = c2sFactory -
wokkel/im.py
diff -r 9e8497278e0e wokkel/im.py
a b 10 10 U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM). 11 11 """ 12 12 13 import copy 14 13 15 from twisted.internet import defer 16 from twisted.python import log 14 17 from twisted.words.protocols.jabber import jid 15 18 from twisted.words.protocols.jabber import error 16 19 from twisted.words.xish import domish … … 168 171 self.xmlstream.addObserver("/presence", self._onPresence) 169 172 170 173 171 def _onPresence(self, element): 174 def parsePresence(self, element): 175 """ 176 Parse presence. 177 """ 172 178 stanza = Stanza.fromElement(element) 173 179 174 180 presenceType = stanza.stanzaType or 'available' … … 178 184 except KeyError: 179 185 return 180 186 181 presence = parser.fromElement(element) 187 return parser.fromElement(element) 188 189 190 def _onPresence(self, element): 191 presence = self.parsePresence(element) 192 presenceType = presence.stanzaType or 'available' 182 193 183 194 try: 184 195 handler = getattr(self, '%sReceived' % presenceType) 185 196 except AttributeError: 186 197 return 187 198 else: 188 handler(presence)199 element.handled = handler(presence) 189 200 190 201 191 202 def errorReceived(self, presence): … … 553 564 554 565 555 566 567 class AccountIQHandler(XMPPHandler): 568 569 def __init__(self, sessionManager): 570 XMPPHandler.__init__(self) 571 self.sessionManager = sessionManager 572 573 574 def connectionMade(self): 575 self.xmlstream.addObserver('/iq', self.onIQ, 1) 576 577 578 def onIQ(self, iq): 579 """ 580 Handler for iq stanzas to user accounts' connected resources. 581 582 If the recipient is a bare JID or there is no associated user, this 583 handler ignores the stanza, so that other handlers have a chance 584 to pick it up. If used, L{generic.FallbackHandler} will respond with a 585 C{'service-unavailable'} stanza error if no other handlers handle 586 the iq. 587 """ 588 589 if iq.handled: 590 return 591 592 try: 593 recipient = jid.internJID(iq['to']) 594 except KeyError: 595 return 596 597 if not recipient.user: 598 # This is not for an account, ignore it 599 return 600 elif recipient.user not in self.sessionManager.accounts: 601 # This is not a user, ignore it 602 return 603 elif not recipient.resource: 604 # Bare JID at local domain, ignore it 605 return 606 607 userSessions = self.sessionManager.sessions.get(recipient.user, 608 {}) 609 if recipient.resource in userSessions: 610 self.sessionManager.deliverStanza(iq, recipient) 611 else: 612 # Full JID without connected resource, return error 613 exc = error.StanzaError('service-unavailable') 614 if iq['type'] in ('result', 'error'): 615 log.err(exc, 'Could not deliver IQ response') 616 else: 617 self.send(exc.toResponse(iq)) 618 619 iq.handled = True 620 621 622 623 class AccountMessageHandler(XMPPHandler): 624 625 def __init__(self, sessionManager): 626 XMPPHandler.__init__(self) 627 self.sessionManager = sessionManager 628 629 630 def connectionMade(self): 631 self.xmlstream.addObserver('/message', self.onMessage, 1) 632 633 634 def onMessage(self, message): 635 """ 636 Handler for message stanzas to user accounts. 637 """ 638 639 if message.handled: 640 return 641 642 try: 643 recipient = jid.internJID(message['to']) 644 except KeyError: 645 return 646 647 stanzaType = message.getAttribute('type', 'normal') 648 649 try: 650 if not recipient.user: 651 # This is not for an account, ignore it 652 return 653 elif recipient.user not in self.sessionManager.accounts: 654 # This is not a user, ignore it 655 return 656 elif recipient.resource: 657 userSessions = self.sessionManager.sessions.get(recipient.user, 658 {}) 659 if recipient.resource in userSessions: 660 self.sessionManager.deliverStanza(message, recipient) 661 else: 662 if stanzaType in ('normal', 'chat', 'headline'): 663 self.onMessageBareJID(message, recipient.userhostJID()) 664 elif stanzaType == 'error': 665 log.msg("Dropping message to unconnected resource %r" % 666 recipient.full()) 667 elif stanzaType == 'groupchat': 668 raise error.StanzaError('service-unavailable') 669 else: 670 self.onMessageBareJID(message, recipient) 671 except error.StanzaError, exc: 672 if stanzaType == 'error': 673 log.err(exc, "Undeliverable error") 674 else: 675 self.send(exc.toResponse(message)) 676 677 message.handled = True 678 679 680 def onMessageBareJID(self, message, bareJID): 681 stanzaType = message.getAttribute('type', 'normal') 682 683 userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 684 685 recipients = set() 686 687 if stanzaType == 'headline': 688 for session in userSessions: 689 if session.presence.priority >= 0: 690 recipients.add(session.entity) 691 elif stanzaType in ('chat', 'normal'): 692 priorities = {} 693 for session in userSessions.itervalues(): 694 if not session.presence or not session.presence.available: 695 continue 696 priority = session.presence.priority 697 if priority >= 0: 698 priorities.setdefault(priority, set()).add(session.entity) 699 maxPriority = max(priorities.keys()) 700 recipients.update(priorities[maxPriority]) 701 elif stanzaType == 'groupchat': 702 raise error.StanzaError('service-unavailable') 703 704 if recipients: 705 for recipient in recipients: 706 self.sessionManager.deliverStanza(message, recipient) 707 elif stanzaType in ('chat', 'normal'): 708 raise error.StanzaError('service-unavailable') 709 else: 710 # silently discard 711 log.msg("Discarding message to %r" % message['to']) 712 713 714 715 716 def clonePresence(presence): 717 """ 718 Make a deep copy of a presence stanza. 719 720 The returned presence stanza is an orphaned deep copy of the given 721 original. 722 723 @note: Since the reference to the original parent, if any, is gone, 724 inherited attributes like C{xml:lang} are not preserved. 725 """ 726 element = presence.element 727 728 parent = element.parent 729 element.parent = None 730 newElement = copy.deepcopy(element) 731 element.parent = parent 732 return newElement 733 734 735 736 class PresenceServerHandler(PresenceProtocol): 737 738 def __init__(self, sessionManager, domain, roster): 739 PresenceProtocol.__init__(self) 740 self.sessionManager = sessionManager 741 self.domain = domain 742 self.roster = roster 743 self.presences = {} # user -> resource -> presence 744 self.offlinePresences = {} # user -> presence 745 self.remotePresences = {} # user -> remote entity -> presence 746 747 self.sessionManager.clientStream.addObserver('/presence', 748 self._onPresenceOutbound) 749 750 751 def _onPresenceOutbound(self, element): 752 log.msg("Got outbound presence: %r" % element.toXml()) 753 presence = self.parsePresence(element) 754 755 presenceType = presence.stanzaType or 'available' 756 method = '%sReceivedOutbound' % presenceType 757 print method 758 759 try: 760 handler = getattr(self, method) 761 except AttributeError: 762 return 763 else: 764 element.handled = handler(presence) 765 766 767 def _broadcastToOtherResources(self, presence): 768 """ 769 Broadcast presence to other available resources. 770 """ 771 fromJID = presence.sender 772 for otherResource in self.presences[fromJID.user]: 773 if otherResource == fromJID.resource: 774 continue 775 776 resourceJID = jid.JID(tuple=(fromJID.user, 777 fromJID.host, 778 otherResource)) 779 outPresence = clonePresence(presence) 780 outPresence['to'] = resourceJID.full() 781 self.sessionManager.deliverStanza(outPresence, resourceJID) 782 783 784 def _broadcastToContacts(self, presence): 785 """ 786 Broadcast presence to subscribed entities. 787 """ 788 fromJID = presence.sender 789 roster = self.roster[fromJID.user] 790 791 for item in roster.itervalues(): 792 if not item.subscriptionFrom: 793 continue 794 795 outPresence = clonePresence(presence) 796 outPresence['to'] = item.entity.full() 797 798 if item.entity.host == self.domain: 799 # local contact 800 if item.entity.user in self.presences: 801 # broadcast to contact's available resources 802 for itemResource in self.presences[item.entity.user]: 803 resourceJID = jid.JID(tuple=(item.entity.user, 804 item.entity.host, 805 itemResource)) 806 self.sessionManager.deliverStanza(outPresence, 807 resourceJID) 808 else: 809 # remote contact 810 self.send(outPresence) 811 812 813 def _on_availableBroadcast(self, presence): 814 fromJID = presence.sender 815 user, resource = fromJID.user, fromJID.resource 816 roster = self.roster[user] 817 818 if user not in self.presences: 819 # initial presence 820 self.presences[user] = {} 821 self.remotePresences[user] = {} 822 823 # send out probes 824 for item in roster.itervalues(): 825 if item.subscriptionTo and item.entity.host != self.domain: 826 self.probe(item.entity, fromJID) 827 else: 828 if resource not in self.presences[user]: 829 # initial presence with another available resource 830 831 # send last known presences from remote contacts 832 remotePresences = self.remotePresences[user] 833 for entity, remotePresence in remotePresences.iteritems(): 834 self.sessionManager.deliverStanza(remotePresence.element, 835 fromJID) 836 837 # send presence to other resources 838 self._broadcastToOtherResources(presence) 839 840 # Send last known local presences 841 if user not in self.presences or resource not in self.presences[user]: 842 for item in roster.itervalues(): 843 if item.subscriptionTo and \ 844 item.entity.host == self.domain and \ 845 item.entity.user in self.presences: 846 for contactPresence in \ 847 self.presences[item.entity.user].itervalues(): 848 outPresence = clonePresence(contactPresence) 849 outPresence['to'] = fromJID.userhost() 850 self.sessionManager.deliverStanza(outPresence, fromJID) 851 852 # broadcast presence 853 self._broadcastToContacts(presence) 854 855 # save presence 856 self.presences[user][resource] = presence 857 self.sessionManager.sessions[user][resource].presence = presence 858 859 return True 860 861 862 def _on_availableDirected(self, presence): 863 self.send(presence.element) 864 return True 865 866 867 def availableReceivedOutbound(self, presence): 868 if presence.recipient: 869 return self._on_availableDirected(presence) 870 else: 871 return self._on_availableBroadcast(presence) 872 873 874 def availableReceived(self, presence): 875 fromJID = presence.sender 876 toJID = presence.recipient 877 878 if toJID.user not in self.roster: 879 return False 880 881 if toJID.user in self.presences: 882 for resource in self.presences[toJID.user]: 883 resourceJID = jid.JID(tuple=(toJID.user, 884 toJID.host, 885 resource)) 886 self.sessionManager.deliverStanza(presence.element, resourceJID) 887 self.remotePresences[toJID.user][fromJID] = presence 888 else: 889 # no such user or no available resource, ignore this stanza 890 pass 891 892 return True 893 894 895 def _on_unavailableBroadcast(self, presence): 896 fromJID = presence.sender 897 user, resource = fromJID.user, fromJID.resource 898 899 # broadcast presence 900 self._broadcastToContacts(presence) 901 902 if user in self.presences: 903 # send presence to other resources 904 self._broadcastToOtherResources(presence) 905 906 # update stored presences 907 if resource in self.presences[user]: 908 del self.presences[user][resource] 909 910 if not self.presences[user]: 911 # last resource to become unavailable 912 del self.presences[user] 913 914 # TODO: save last unavailable presence 915 916 return True 917 918 919 def _on_unavailableDirected(self, presence): 920 self.send(presence.element) 921 return True 922 923 924 def unavailableReceivedOutbound(self, presence): 925 if presence.recipient: 926 return self._on_unavailableDirected(presence) 927 else: 928 return self._on_unavailableBroadcast(presence) 929 930 # def unavailableReceived(self, presence): 931 932 933 def subscribedReceivedOutbound(self, presence): 934 log.msg("%r subscribed %s to its presence" % (presence.sender, 935 presence.recipient)) 936 self.send(presence.element) 937 return True 938 939 940 def subscribedReceived(self, presence): 941 log.msg("%r subscribed %s to its presence" % (presence.sender, 942 presence.recipient)) 943 944 945 def unsubscribedReceivedOutbound(self, presence): 946 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 947 presence.recipient)) 948 self.send(presence.element) 949 return True 950 951 952 def unsubscribedReceived(self, presence): 953 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 954 presence.recipient)) 955 956 957 def subscribeReceivedOutbound(self, presence): 958 log.msg("%r requests subscription to %s" % (presence.sender, 959 presence.recipient)) 960 self.send(presence.element) 961 return True 962 963 964 def subscribeReceived(self, presence): 965 log.msg("%r requests subscription to %s" % (presence.sender, 966 presence.recipient)) 967 968 969 def unsubscribeReceivedOutbound(self, presence): 970 log.msg("%r requests unsubscription from %s" % (presence.sender, 971 presence.recipient)) 972 self.send(presence.element) 973 return True 974 975 976 def unsubscribeReceived(self, presence): 977 log.msg("%r requests unsubscription from %s" % (presence.sender, 978 presence.recipient)) 979 980 981 def probeReceived(self, presence): 982 fromJID = presence.sender 983 toJID = presence.recipient 984 985 if toJID.user not in self.roster or \ 986 fromJID.userhost() not in self.roster[toJID.user] or \ 987 not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 988 # send unsubscribed 989 pass 990 elif toJID.user not in self.presences: 991 # send last unavailable or nothing 992 pass 993 else: 994 for resourcePresence in self.presences[toJID.user].itervalues(): 995 outPresence = clonePresence(resourcePresence) 996 outPresence['to'] = fromJID.userhost() 997 self.send(outPresence) 998 999 1000 556 1001 class RosterServerProtocol(XMPPHandler, IQHandlerMixin): 557 1002 """ 558 1003 XMPP subprotocol handler for the roster, server side. -
wokkel/test/test_im.py
diff -r 9e8497278e0e wokkel/test/test_im.py
a b 13 13 from twisted.words.xish import domish, utility 14 14 15 15 from wokkel import im 16 from wokkel.generic import ErrorStanza, parseXml16 from wokkel.generic import ErrorStanza, Stanza, parseXml 17 17 from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub 18 18 19 19 NS_XML = 'http://www.w3.org/XML/1998/namespace' … … 846 846 847 847 848 848 849 class AccountIQHandlerTest(unittest.TestCase): 850 """ 851 Tests for L{im.AccountIQHandler}. 852 """ 853 854 def setUp(self): 855 self.stub = XmlStreamStub() 856 self.protocol = im.AccountIQHandler(None) 857 self.protocol.makeConnection(self.stub.xmlstream) 858 self.protocol.connectionInitialized() 859 860 861 def test_onIQNotUser(self): 862 """ 863 IQs to JIDs without local part are ignored. 864 """ 865 xml = """ 866 <iq to='example.org'> 867 <query xmlns='jabber:iq:version'/> 868 </iq> 869 """ 870 871 iq = parseXml(xml) 872 self.stub.send(iq) 873 874 self.assertFalse(getattr(iq, 'handled')) 875 876 877 878 class AccountMessageHandlerTest(unittest.TestCase): 879 """ 880 Tests for L{im.AccountMessageHandler}. 881 """ 882 883 def setUp(self): 884 self.stub = XmlStreamStub() 885 self.protocol = im.AccountMessageHandler(None) 886 self.protocol.makeConnection(self.stub.xmlstream) 887 self.protocol.connectionInitialized() 888 889 890 def test_onMessageNotUser(self): 891 """ 892 Messages to JIDs without local part are ignored. 893 """ 894 xml = """ 895 <message to='example.org'> 896 <body>Hello</body> 897 </message> 898 """ 899 900 message = parseXml(xml) 901 self.stub.send(message) 902 903 self.assertFalse(getattr(message, 'handled')) 904 905 906 907 class ClonePresenceTest(unittest.TestCase): 908 """ 909 Tests for L{im.clonePresence}. 910 """ 911 912 def test_rootElement(self): 913 """ 914 The copied presence stanza is not identical, but renders identically. 915 """ 916 originalElement = domish.Element((None, 'presence')) 917 stanza = Stanza.fromElement(originalElement) 918 copyElement = im.clonePresence(stanza) 919 920 self.assertNotIdentical(copyElement, originalElement) 921 self.assertEquals(copyElement.toXml(), originalElement.toXml()) 922 923 924 849 925 class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin): 850 926 """ 851 927 Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser
for help on using the repository browser.