source:
ralphm-patches/async-observer.patch
Last change on this file was 79:0752a1cca356, checked in by Ralph Meijer <ralphm@…>, 6 years ago | |
---|---|
File size: 14.6 KB |
-
wokkel/subprotocols.py
# HG changeset patch # Parent 56607e5ddb53e3fafbecb41118bd220dea9310c7 diff --git a/wokkel/subprotocols.py b/wokkel/subprotocols.py
a b 10 10 __all__ = ['XMPPHandler', 'XMPPHandlerCollection', 'StreamManager', 11 11 'IQHandlerMixin'] 12 12 13 from functools import wraps 14 13 15 from zope.interface import implements 14 16 15 17 from twisted.internet import defer … … 379 381 380 382 381 383 384 def asyncObserver(observer): 385 """ 386 Decorator for asynchronous stanza observers. 387 388 This decorator makes it easier to do proper error handling for stanza 389 observers and supports deferred results: 390 391 >>> class MyHandler(XMPPHandler): 392 ... def connectionInitialized(self): 393 ... self.xmlstream.addObserver('/message', self.onMessage) 394 ... @asyncObserver 395 ... def onMessage(self, element): 396 ... return False 397 ... @asyncObserver 398 ... def onMessage(self, element): 399 ... return True 400 ... @asyncObserver 401 ... def onMessage(self, element): 402 ... raise NotImplementedError 403 ... @asyncObserver 404 ... def onMessage(self, element): 405 ... return defer.fail(StanzaError('bad-request')) 406 407 408 If the stanza had its C{handled} attribute set to C{True}, it will be 409 ignored and the observer will not be called. 410 411 The return value of the wrapped observer is used to set the C{handled} 412 attribute, so that handlers may choose to ignore processing the same 413 stanza. 414 415 If an exception is raised, or the deferred has its errback called, the 416 exception is checked for being a L{error.StanzaError}. If so, an error 417 response is sent. A L{NotImplementedError} will cause an error response 418 with the condition C{service-unavailable}. Any other exception will cause a 419 error response of C{internal-server-error} to be sent. 420 421 The return value of the deferred is not used. 422 """ 423 @wraps(observer) 424 def observe(self, element): 425 def checkStanzaType(failure): 426 if element.getAttribute('type') in ('result', 'error'): 427 log.err(failure, 'Cannot return error in response to a ' 428 'response stanza.') 429 else: 430 return failure 431 432 def trapNotImplemented(failure): 433 failure.trap(NotImplementedError) 434 raise error.StanzaError('service-unavailable') 435 436 def trapOtherError(failure): 437 if failure.check(error.StanzaError): 438 return failure 439 else: 440 log.err(failure, "Unhandled error in observer") 441 raise error.StanzaError('internal-server-error') 442 443 def trapStanzaError(failure): 444 failure.trap(error.StanzaError) 445 self.send(failure.value.toResponse(element)) 446 447 if element.handled: 448 return 449 450 try: 451 result = observer(self, element) 452 except Exception: 453 result = defer.fail() 454 455 if isinstance(result, defer.Deferred): 456 result.addErrback(checkStanzaType) 457 result.addErrback(trapNotImplemented) 458 result.addErrback(trapOtherError) 459 result.addErrback(trapStanzaError) 460 result.addErrback(log.err) 461 462 element.handled = result 463 return observe 464 465 466 382 467 class IQHandlerMixin(object): 383 468 """ 384 469 XMPP subprotocol mixin for handle incoming IQ stanzas. … … 401 486 402 487 A typical way to use this mixin, is to set up L{xpath} observers on the 403 488 C{xmlstream} to call handleRequest, for example in an overridden 404 L{XMPPHandler.connection Made}. It is likely a good idea to only listen for405 incoming iq get and/org iq set requests, and not for any iq, to prevent406 hijacking incoming responses to outgoing iq requests. An example:489 L{XMPPHandler.connectionInitialized}. It is likely a good idea to only 490 listen for incoming iq get and/org iq set requests, and not for any iq, to 491 prevent hijacking incoming responses to outgoing iq requests. An example: 407 492 408 493 >>> QUERY_ROSTER = "/query[@xmlns='jabber:iq:roster']" 409 494 >>> class MyHandler(XMPPHandler, IQHandlerMixin): 410 495 ... iqHandlers = {"/iq[@type='get']" + QUERY_ROSTER: 'onRosterGet', 411 496 ... "/iq[@type='set']" + QUERY_ROSTER: 'onRosterSet'} 412 ... def connection Made(self):497 ... def connectionInitialized(self): 413 498 ... self.xmlstream.addObserver( 414 499 ... "/iq[@type='get' or @type='set']" + QUERY_ROSTER, 415 500 ... self.handleRequest) … … 425 510 426 511 iqHandlers = None 427 512 513 @asyncObserver 428 514 def handleRequest(self, iq): 429 515 """ 430 516 Find a handler and wrap the call for sending a response stanza. … … 435 521 if result: 436 522 if IElement.providedBy(result): 437 523 response.addChild(result) 438 else:439 for element in result:440 response.addChild(element)441 524 442 525 return response 443 526 444 def checkNotImplemented(failure):527 def trapNotImplemented(failure): 445 528 failure.trap(NotImplementedError) 446 529 raise error.StanzaError('feature-not-implemented') 447 530 448 def fromStanzaError(failure, iq):449 failure.trap(error.StanzaError)450 return failure.value.toResponse(iq)451 452 def fromOtherError(failure, iq):453 log.msg("Unhandled error in iq handler:", isError=True)454 log.err(failure)455 return error.StanzaError('internal-server-error').toResponse(iq)456 457 531 handler = None 458 532 for queryString, method in self.iqHandlers.iteritems(): 459 533 if xpath.internQuery(queryString).matches(iq): … … 461 535 462 536 if handler: 463 537 d = defer.maybeDeferred(handler, iq) 538 d.addCallback(toResult, iq) 539 d.addCallback(self.send) 464 540 else: 465 541 d = defer.fail(NotImplementedError()) 466 542 467 d.addCallback(toResult, iq) 468 d.addErrback(checkNotImplemented) 469 d.addErrback(fromStanzaError, iq) 470 d.addErrback(fromOtherError, iq) 471 472 d.addCallback(self.send) 473 474 iq.handled = True 543 d.addErrback(trapNotImplemented) 544 return d -
wokkel/test/test_subprotocols.py
diff --git a/wokkel/test/test_subprotocols.py b/wokkel/test/test_subprotocols.py
a b 790 790 self.xmlstream.send(obj) 791 791 792 792 793 794 class AsyncObserverTest(unittest.TestCase): 795 """ 796 Tests for L{wokkel.subprotocols.asyncObserver}. 797 """ 798 799 def setUp(self): 800 self.output = [] 801 802 803 def send(self, element): 804 self.output.append(element) 805 806 807 def test_handled(self): 808 """ 809 If the element is marked as handled, it is ignored. 810 """ 811 called = [] 812 813 @subprotocols.asyncObserver 814 def observer(self, element): 815 called.append(element) 816 817 element = domish.Element((None, u'message')) 818 element.handled = True 819 820 observer(self, element) 821 self.assertFalse(called) 822 self.assertTrue(element.handled) 823 self.assertFalse(self.output) 824 825 826 def test_syncFalse(self): 827 """ 828 If the observer returns False, the element is not marked as handled. 829 """ 830 @subprotocols.asyncObserver 831 def observer(self, element): 832 return False 833 834 element = domish.Element((None, u'message')) 835 836 observer(self, element) 837 self.assertFalse(element.handled) 838 self.assertFalse(self.output) 839 840 841 def test_syncTrue(self): 842 """ 843 If the observer returns True, the element is marked as handled. 844 """ 845 @subprotocols.asyncObserver 846 def observer(self, element): 847 return True 848 849 element = domish.Element((None, u'message')) 850 851 observer(self, element) 852 self.assertTrue(element.handled) 853 self.assertFalse(self.output) 854 855 856 def test_syncNotImplemented(self): 857 """ 858 NotImplementedError causes a service-unavailable response. 859 """ 860 @subprotocols.asyncObserver 861 def observer(self, element): 862 raise NotImplementedError() 863 864 element = domish.Element((None, u'message')) 865 866 observer(self, element) 867 self.assertTrue(element.handled) 868 self.assertEquals(1, len(self.output)) 869 exc = error.exceptionFromStanza(self.output[-1]) 870 self.assertEquals(u'service-unavailable', exc.condition) 871 872 873 def test_syncStanzaError(self): 874 """ 875 A StanzaError is sent back as is. 876 """ 877 @subprotocols.asyncObserver 878 def observer(self, element): 879 raise error.StanzaError(u'forbidden') 880 881 element = domish.Element((None, u'message')) 882 883 observer(self, element) 884 self.assertTrue(element.handled) 885 self.assertEquals(1, len(self.output)) 886 exc = error.exceptionFromStanza(self.output[-1]) 887 self.assertEquals(u'forbidden', exc.condition) 888 889 890 def test_syncOtherError(self): 891 """ 892 Other exceptions are logged and cause an internal-service response. 893 """ 894 class Error(Exception): 895 pass 896 897 @subprotocols.asyncObserver 898 def observer(self, element): 899 raise Error(u"oops") 900 901 element = domish.Element((None, u'message')) 902 903 observer(self, element) 904 self.assertTrue(element.handled) 905 self.assertEquals(1, len(self.output)) 906 exc = error.exceptionFromStanza(self.output[-1]) 907 self.assertEquals(u'internal-server-error', exc.condition) 908 self.assertEquals(1, len(self.flushLoggedErrors())) 909 910 911 def test_asyncError(self): 912 """ 913 Other exceptions are logged and cause an internal-service response. 914 """ 915 class Error(Exception): 916 pass 917 918 @subprotocols.asyncObserver 919 def observer(self, element): 920 return defer.fail(Error("oops")) 921 922 element = domish.Element((None, u'message')) 923 924 observer(self, element) 925 self.assertTrue(element.handled) 926 self.assertEquals(1, len(self.output)) 927 exc = error.exceptionFromStanza(self.output[-1]) 928 self.assertEquals(u'internal-server-error', exc.condition) 929 self.assertEquals(1, len(self.flushLoggedErrors())) 930 931 932 def test_errorMessage(self): 933 """ 934 If the element is an error message, observer exceptions are just logged. 935 """ 936 class Error(Exception): 937 pass 938 939 @subprotocols.asyncObserver 940 def observer(self, element): 941 raise Error("oops") 942 943 element = domish.Element((None, u'message')) 944 element[u'type'] = u'error' 945 946 observer(self, element) 947 self.assertTrue(element.handled) 948 self.assertEquals(0, len(self.output)) 949 self.assertEquals(1, len(self.flushLoggedErrors())) 950 951 952 793 953 class IQHandlerTest(unittest.TestCase): 954 """ 955 Tests for L{subprotocols.IQHandler}. 956 """ 794 957 795 958 def test_match(self): 796 959 """ 797 T est that the matching handler gets called.960 The matching handler gets called. 798 961 """ 799 800 962 class Handler(DummyIQHandler): 801 963 called = False 802 964 … … 810 972 handler.handleRequest(iq) 811 973 self.assertTrue(handler.called) 812 974 975 813 976 def test_noMatch(self): 814 977 """ 815 Test that the matching handler getscalled.978 If the element does not match the handler is not called. 816 979 """ 817 818 980 class Handler(DummyIQHandler): 819 981 called = False 820 982 … … 828 990 handler.handleRequest(iq) 829 991 self.assertFalse(handler.called) 830 992 993 831 994 def test_success(self): 832 995 """ 833 Test response when the request is handled successfully.996 If None is returned, an empty result iq is returned. 834 997 """ 835 836 998 class Handler(DummyIQHandler): 837 999 def onGet(self, iq): 838 1000 return None … … 847 1009 self.assertEquals('iq', response.name) 848 1010 self.assertEquals('result', response['type']) 849 1011 1012 850 1013 def test_successPayload(self): 851 1014 """ 852 Test response when the request is handled successfully with payload.1015 If an Element is returned it is added as the payload of the result iq. 853 1016 """ 854 855 1017 class Handler(DummyIQHandler): 856 1018 payload = domish.Element(('testns', 'foo')) 857 1019 … … 870 1032 payload = response.elements().next() 871 1033 self.assertEqual(handler.payload, payload) 872 1034 1035 873 1036 def test_successDeferred(self): 874 1037 """ 875 Test response when where the handler was a deferred.1038 A deferred result is used when fired. 876 1039 """ 877 878 1040 class Handler(DummyIQHandler): 879 1041 def onGet(self, iq): 880 1042 return defer.succeed(None) … … 889 1051 self.assertEquals('iq', response.name) 890 1052 self.assertEquals('result', response['type']) 891 1053 1054 892 1055 def test_failure(self): 893 1056 """ 894 Test response when the request is handled unsuccessfully.1057 A raised StanzaError causes an error response. 895 1058 """ 896 897 1059 class Handler(DummyIQHandler): 898 1060 def onGet(self, iq): 899 1061 raise error.StanzaError('forbidden') … … 910 1072 e = error.exceptionFromStanza(response) 911 1073 self.assertEquals('forbidden', e.condition) 912 1074 1075 913 1076 def test_failureUnknown(self): 914 1077 """ 915 Test response when the request handler raises a non-stanza-error.1078 Any Exception cause an internal-server-error response. 916 1079 """ 917 918 1080 class TestError(Exception): 919 1081 pass 920 1082 … … 935 1097 self.assertEquals('internal-server-error', e.condition) 936 1098 self.assertEquals(1, len(self.flushLoggedErrors(TestError))) 937 1099 1100 938 1101 def test_notImplemented(self): 939 1102 """ 940 Test response when the request is recognised but not implemented.1103 A NotImplementedError causes a feature-not-implemented response. 941 1104 """ 942 943 1105 class Handler(DummyIQHandler): 944 1106 def onGet(self, iq): 945 1107 raise NotImplementedError() … … 956 1118 e = error.exceptionFromStanza(response) 957 1119 self.assertEquals('feature-not-implemented', e.condition) 958 1120 1121 959 1122 def test_noHandler(self): 960 1123 """ 961 Test when the request is not recognised.1124 A missing handler causes a feature-not-implemented response. 962 1125 """ 963 964 1126 iq = domish.Element((None, 'iq')) 965 1127 iq['type'] = 'set' 966 1128 iq['id'] = 'r1'
Note: See TracBrowser
for help on using the repository browser.