source:
ralphm-patches/listening-authenticator-stream-features.patch
@
66:b713f442b222
Last change on this file since 66:b713f442b222 was 66:b713f442b222, checked in by Ralph Meijer <ralphm@…>, 10 years ago | |
---|---|
File size: 24.5 KB |
-
wokkel/generic.py
# HG changeset patch # Parent 9393ad83138bbe6ab1c5249a6a115b8e56144622 Add FeatureListeningAuthenticator. This new authenticator is for incoming streams and uses initializers for stream negotiation, similar to inializers for clients. diff --git a/wokkel/generic.py b/wokkel/generic.py
a b 10 10 from zope.interface import implements 11 11 12 12 from twisted.internet import defer, protocol 13 from twisted.python import reflect13 from twisted.python import log, reflect 14 14 from twisted.words.protocols.jabber import error, jid, xmlstream 15 15 from twisted.words.protocols.jabber.xmlstream import toResponse 16 16 from twisted.words.xish import domish, utility 17 17 from twisted.words.xish.xmlstream import BootstrapMixin 18 18 19 from wokkel.iwokkel import IDisco 19 from wokkel.iwokkel import IDisco, IReceivingInitializer 20 20 from wokkel.subprotocols import XMPPHandler 21 21 22 22 IQ_GET = '/iq[@type="get"]' … … 25 25 NS_VERSION = 'jabber:iq:version' 26 26 VERSION = IQ_GET + '/query[@xmlns="' + NS_VERSION + '"]' 27 27 28 XPATH_ALL = "/*" 29 28 30 def parseXml(string): 29 31 """ 30 32 Parse serialized XML into a DOM structure. … … 327 329 328 330 def clientConnectionFailed(self, connector, reason): 329 331 self.deferred.errback(reason) 332 333 334 335 class TestableXmlStream(xmlstream.XmlStream): 336 """ 337 XML Stream that buffers outgoing data and catches special events. 338 339 This implementation overrides relevant methods to prevent any data 340 to be sent out on a transport. Instead it buffers all outgoing stanzas, 341 sets flags instead of sending the stream header and footer and logs stream 342 errors so it can be caught by a logging observer. 343 344 @ivar output: Sequence of objects sent out using L{send}. Usually these are 345 L{domish.Element} instances. 346 @type output: C{list} 347 348 @ivar headerSent: Flag set when a stream header would have been sent. When 349 a stream restart occurs through L{reset}, this flag is reset as well. 350 @type headerSent: C{bool} 351 352 @ivar footerSent: Flag set when a stream footer would have been sent 353 explicitly. Note that it is not set when a stream error is sent 354 using L{sendStreamError}. 355 @type footerSent: C{bool} 356 """ 357 358 359 def __init__(self, authenticator): 360 xmlstream.XmlStream.__init__(self, authenticator) 361 self.headerSent = False 362 self.footerSent = False 363 self.output = [] 364 365 366 def reset(self): 367 xmlstream.XmlStream.reset(self) 368 self.headerSent = False 369 370 371 def sendHeader(self): 372 self.headerSent = True 373 374 375 def sendFooter(self): 376 self.footerSent = True 377 378 379 def sendStreamError(self, streamError): 380 """ 381 Log a stream error. 382 383 If this is called from a Twisted Trial test case, the stream error 384 will be observed by the Trial logging observer. If it is not explicitly 385 tested for (i.e. flushed), this will cause the test case to 386 automatically fail. See L{assertStreamError} for a convenience method 387 to test for stream errors. 388 389 @type streamError: L{error.StreamError} 390 """ 391 log.err(streamError) 392 393 394 @staticmethod 395 def assertStreamError(testcase, condition=None, exc=None): 396 """ 397 Check if a stream error was sent out. 398 399 To check for stream errors sent out by L{sendStreamError}, this method 400 will flush logged stream errors and inspect the last one. If 401 C{condition} was passed, the logged error is asserted to match that 402 condition. If C{exc} was passed, the logged error is asserted to be 403 identical to it. 404 405 Note that this is takes the calling test case as the first argument, to 406 be able to hook into its methods for flushing errors and making 407 assertions. 408 409 @param testcase: The test case instance that is calling this method. 410 @type testcase: {twisted.trial.unittest.TestCase} 411 412 @param condition: The optional stream error condition to match against. 413 @type condition: C{unicode}. 414 415 @param exc: The optional stream error to check identity against. 416 @type exc: L{error.StreamError} 417 """ 418 419 loggedErrors = testcase.flushLoggedErrors(error.StreamError) 420 testcase.assertTrue(loggedErrors, "No stream error was sent") 421 streamError = loggedErrors[-1].value 422 if condition: 423 testcase.assertEqual(condition, streamError.condition) 424 elif exc: 425 testcase.assertIdentical(exc, streamError) 426 427 428 def send(self, obj): 429 """ 430 Buffer all outgoing stanzas. 431 432 @type obj: L{domish.Element} 433 """ 434 self.output.append(obj) 435 436 437 438 class BaseReceivingInitializer(object): 439 """ 440 Base stream initializer for receiving entities. 441 """ 442 implements(IReceivingInitializer) 443 444 required = False 445 446 def __init__(self, name, xs): 447 self.name = name 448 self.xmlstream = xs 449 self.deferred = defer.Deferred() 450 451 452 def getFeatures(self): 453 raise NotImplementedError() 454 455 456 def initialize(self): 457 return self.deferred 458 459 460 461 class FeatureListenAuthenticator(xmlstream.ListenAuthenticator): 462 """ 463 Authenticator for receiving entities with support for initializers. 464 """ 465 466 def __init__(self): 467 self.completedInitializers = [] 468 469 470 def _onElementFallback(self, element): 471 """ 472 Fallback observer that rejects XML Stanzas. 473 474 This observer is active while stream feature negotiation has not yet 475 completed. 476 """ 477 # ignore elements that are not XML Stanzas 478 if (element.uri not in (self.namespace) or 479 element.name not in ('iq', 'message', 'presence')): 480 return 481 482 # ignore elements that have already been handled 483 if element.handled: 484 return 485 486 exc = error.StreamError('not-authorized') 487 self.xmlstream.sendStreamError(exc) 488 489 490 def connectionMade(self): 491 """ 492 Called when the connection has been made. 493 494 Adds an observer to reject XML Stanzas until stream feature negotiation 495 has completed. 496 """ 497 xmlstream.ListenAuthenticator.connectionMade(self) 498 self.xmlstream.addObserver(XPATH_ALL, self._onElementFallback, -1) 499 500 501 def _cbInit(self, result): 502 """ 503 Mark the initializer as completed and continue to the next. 504 """ 505 result, index = result 506 self.completedInitializers.append(self._initializers[index].name) 507 del self._initializers[index] 508 509 if result is xmlstream.Reset: 510 # The initializer initiated a stream restart, bail. 511 return 512 else: 513 self._initializeStream() 514 515 516 def _ebInit(self, failure): 517 """ 518 Called when an initializer raises an exception. 519 520 If the exception is a L{error.StreamError} it is sent out, otherwise 521 the error is logged and a stream error with condition 522 C{'internal-server-error'} is sent out instead. 523 """ 524 firstError = failure.value 525 subFailure = firstError.subFailure 526 if subFailure.check(error.StreamError): 527 exc = subFailure.value 528 else: 529 log.err(subFailure, index=firstError.index) 530 exc = error.StreamError('internal-server-error') 531 532 self.xmlstream.sendStreamError(exc) 533 534 535 def _initializeStream(self): 536 """ 537 Initialize the stream. 538 539 This walks all initializers to retrieve their features and determine 540 if there is at least one required initializer. If not, the stream is 541 ready for the exchange of stanzas. The features are sent out and each 542 initializer will have its C{initialize} method called. 543 544 If negation has completed, L{xmlstream.STREAM_AUTHD_EVENT} is 545 dispatched and the observer rejecting incoming stanzas is removed. 546 """ 547 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 548 ds = [] 549 required = False 550 for initializer in self._initializers: 551 required = required or initializer.required 552 for feature in initializer.getFeatures(): 553 features.addChild(feature) 554 d = initializer.initialize() 555 ds.append(d) 556 557 self.xmlstream.send(features) 558 559 if not required: 560 # There are no required initializers anymore. This stream is 561 # now ready for the exchange of stanzas. 562 self.xmlstream.removeObserver(XPATH_ALL, self._onElementFallback) 563 self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) 564 565 if ds: 566 d = defer.DeferredList(ds, fireOnOneCallback=True, 567 fireOnOneErrback=True, 568 consumeErrors=True) 569 d.addCallbacks(self._cbInit, self._ebInit) 570 571 572 def getInitializers(self): 573 """ 574 Get the initializers for the current stage of stream negotiation. 575 576 This will be called at the start of each stream start to retrieve 577 the initializers for which the features are advertised and initiated. 578 579 @rtype: C{list} of C{IReceivingInitializer} instances. 580 """ 581 raise NotImplementedError() 582 583 584 def checkStream(self): 585 """ 586 Check the stream before sending out a stream header and initialization. 587 588 This is the place to inspect the stream properties and raise a relevant 589 L{error.StreamError} if needed. 590 """ 591 # Check stream namespace 592 if self.xmlstream.namespace != self.namespace: 593 self.xmlstream.namespace = self.namespace 594 raise error.StreamError('invalid-namespace') 595 596 597 def streamStarted(self, rootElement): 598 """ 599 Called when the stream header has been received. 600 601 Check the stream properties through L{checkStream}, send out 602 a stream header, and retrieve and initialize the stream initializers. 603 """ 604 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 605 606 try: 607 self.checkStream() 608 except error.StreamError, exc: 609 self.xmlstream.sendStreamError(exc) 610 return 611 612 self.xmlstream.sendHeader() 613 614 self._initializers = self.getInitializers() 615 self._initializeStream() -
wokkel/iwokkel.py
diff --git a/wokkel/iwokkel.py b/wokkel/iwokkel.py
a b 11 11 'IPubSubClient', 'IPubSubService', 'IPubSubResource', 12 12 'IMUCClient', 'IMUCStatuses'] 13 13 14 from zope.interface import Interface14 from zope.interface import Attribute, Interface 15 15 from twisted.python.deprecate import deprecatedModuleAttribute 16 16 from twisted.python.versions import Version 17 17 from twisted.words.protocols.jabber.ijabber import IXMPPHandler … … 982 982 """ 983 983 Return the number of status conditions. 984 984 """ 985 986 987 988 class IReceivingInitializer(Interface): 989 """ 990 Interface for XMPP stream initializers for receiving entities. 991 """ 992 993 required = Attribute( 994 """ 995 This initializer is required to complete feature negotiation. 996 """) 997 name = Attribute( 998 """ 999 Identifier for this initializer. 1000 1001 This identifier is included in 1002 L{wokkel.generic.FeatureListenAuthenticator} when an initializer has 1003 completed. 1004 """) 1005 xmlstream = Attribute( 1006 """ 1007 The XML Stream. 1008 """) 1009 deferred = Attribute( 1010 """ 1011 The deferred returned from initialize. 1012 """) 1013 1014 1015 def getFeatures(): 1016 """ 1017 Get stream features for this initializer. 1018 1019 @rtype: C{list} of L{twisted.words.xish.domish.Element} 1020 """ 1021 1022 1023 def initialize(): 1024 """ 1025 Initialize the initializer. 1026 1027 This is where observers for feature negotiation are set up. When 1028 the returned deferred fires, it is assumed to have completed. 1029 1030 @rtype: L{twisted.internet.defer.Deferred} 1031 """ -
wokkel/test/test_generic.py
diff --git a/wokkel/test/test_generic.py b/wokkel/test/test_generic.py
a b 5 5 Tests for L{wokkel.generic}. 6 6 """ 7 7 8 from zope.interface import verify 9 10 from twisted.internet import defer 11 from twisted.test import proto_helpers 8 12 from twisted.trial import unittest 9 13 from twisted.words.xish import domish 10 14 from twisted.words.protocols.jabber.jid import JID 15 from twisted.words.protocols.jabber import error, xmlstream 11 16 12 from wokkel import generic 17 from wokkel import generic, iwokkel 13 18 from wokkel.test.helpers import XmlStreamStub 14 19 15 20 NS_VERSION = 'jabber:iq:version' … … 268 273 The default is no timeout. 269 274 """ 270 275 self.assertIdentical(None, self.request.timeout) 276 277 278 279 class BaseReceivingInitializerTest(unittest.TestCase): 280 """ 281 Tests for L{generic.BaseReceivingInitializer}. 282 """ 283 284 def setUp(self): 285 self.init = generic.BaseReceivingInitializer('init', None) 286 287 288 def test_interface(self): 289 verify.verifyObject(iwokkel.IReceivingInitializer, self.init) 290 291 292 def test_getFeatures(self): 293 self.assertRaises(NotImplementedError, self.init.getFeatures) 294 295 296 def test_initialize(self): 297 d = self.init.initialize() 298 self.init.deferred.callback(None) 299 return d 300 301 302 303 class TestableReceivingInitializer(generic.BaseReceivingInitializer): 304 """ 305 Testable initializer for receiving entities. 306 307 This initializer advertises support for a stream feature denoted by 308 C{uri} and C{name}. Its C{deferred} should be fired to complete 309 initialization for this initializer. 310 311 @ivar uri: Namespace of the stream feature. 312 @ivar localname: Element localname for the stream feature. 313 """ 314 required = True 315 316 def __init__(self, name, xs, uri, localname): 317 generic.BaseReceivingInitializer.__init__(self, name, xs) 318 self.uri = uri 319 self.localname = localname 320 self.deferred = defer.Deferred() 321 322 323 def getFeatures(self): 324 return [domish.Element((self.uri, self.localname))] 325 326 327 328 class FeatureListenAuthenticatorTest(unittest.TestCase): 329 """ 330 Tests for L{generic.FeatureListenAuthenticator}. 331 """ 332 333 def setUp(self): 334 self.gotAuthenticated = False 335 self.initFailure = None 336 self.authenticator = generic.FeatureListenAuthenticator() 337 self.authenticator.namespace = 'jabber:server' 338 self.xmlstream = generic.TestableXmlStream(self.authenticator) 339 self.xmlstream.addObserver('//event/stream/authd', 340 self.onAuthenticated) 341 self.xmlstream.addObserver('//event/xmpp/initfailed', 342 self.onInitFailed) 343 344 self.init = TestableReceivingInitializer('init', self.xmlstream, 345 'testns', 'test') 346 347 def getInitializers(): 348 return [self.init] 349 350 self.authenticator.getInitializers = getInitializers 351 352 353 def onAuthenticated(self, obj): 354 self.gotAuthenticated = True 355 356 357 def onInitFailed(self, failure): 358 self.initFailure = failure 359 360 361 def test_getInitializers(self): 362 """ 363 Unoverridden getInitializers raises NotImplementedError. 364 """ 365 authenticator = generic.FeatureListenAuthenticator() 366 self.assertRaises( 367 NotImplementedError, 368 authenticator.getInitializers) 369 370 371 def test_streamStarted(self): 372 """ 373 Upon stream start, stream initializers are set up. 374 375 The method getInitializers will determine the available stream 376 initializers given the current state of stream initialization. 377 hen, each of the returned initializers will be called to set 378 themselves up. 379 """ 380 xs = self.xmlstream 381 xs.makeConnection(proto_helpers.StringTransport()) 382 xs.dataReceived("<stream:stream xmlns='jabber:server' " 383 "xmlns:stream='http://etherx.jabber.org/streams' " 384 "from='example.com' to='example.org' id='12345' " 385 "version='1.0'>") 386 387 self.assertTrue(xs.headerSent) 388 389 # Check if features were sent 390 features = xs.output[-1] 391 self.assertEquals(xmlstream.NS_STREAMS, features.uri) 392 self.assertEquals('features', features.name) 393 feature = features.elements().next() 394 self.assertEqual('testns', feature.uri) 395 self.assertEqual('test', feature.name) 396 397 self.assertFalse(self.gotAuthenticated) 398 399 self.init.deferred.callback(None) 400 self.assertTrue(self.gotAuthenticated) 401 402 403 def test_streamStartedStreamError(self): 404 """ 405 A stream error raised by the initializer is sent out. 406 """ 407 xs = self.xmlstream 408 xs.makeConnection(proto_helpers.StringTransport()) 409 xs.dataReceived("<stream:stream xmlns='jabber:server' " 410 "xmlns:stream='http://etherx.jabber.org/streams' " 411 "from='example.com' to='example.org' id='12345' " 412 "version='1.0'>") 413 414 self.assertTrue(xs.headerSent) 415 416 xs.output = [] 417 exc = error.StreamError('policy-violation') 418 self.init.deferred.errback(exc) 419 420 self.xmlstream.assertStreamError(self, exc=exc) 421 self.assertFalse(xs.output) 422 self.assertFalse(self.gotAuthenticated) 423 424 425 def test_streamStartedOtherError(self): 426 """ 427 Initializer exceptions are logged and yield a internal-server-error. 428 """ 429 xs = self.xmlstream 430 xs.makeConnection(proto_helpers.StringTransport()) 431 xs.dataReceived("<stream:stream xmlns='jabber:server' " 432 "xmlns:stream='http://etherx.jabber.org/streams' " 433 "from='example.com' to='example.org' id='12345' " 434 "version='1.0'>") 435 436 self.assertTrue(xs.headerSent) 437 438 xs.output = [] 439 class Error(Exception): 440 pass 441 self.init.deferred.errback(Error()) 442 443 self.xmlstream.assertStreamError(self, condition='internal-server-error') 444 self.assertFalse(xs.output) 445 self.assertFalse(self.gotAuthenticated) 446 self.assertEqual(1, len(self.flushLoggedErrors(Error))) 447 448 449 def test_streamStartedInitializerCompleted(self): 450 """ 451 Succesfully finished initializers are recorded. 452 """ 453 xs = self.xmlstream 454 xs.makeConnection(proto_helpers.StringTransport()) 455 xs.dataReceived("<stream:stream xmlns='jabber:server' " 456 "xmlns:stream='http://etherx.jabber.org/streams' " 457 "from='example.com' to='example.org' id='12345' " 458 "version='1.0'>") 459 460 xs.output = [] 461 self.init.deferred.callback(None) 462 self.assertEqual(['init'], self.authenticator.completedInitializers) 463 464 465 def test_streamStartedInitializerCompletedFeatures(self): 466 """ 467 After completing an initializer, stream features are sent again. 468 469 In this case, with only one initializer, there are no more features. 470 """ 471 xs = self.xmlstream 472 xs.makeConnection(proto_helpers.StringTransport()) 473 xs.dataReceived("<stream:stream xmlns='jabber:server' " 474 "xmlns:stream='http://etherx.jabber.org/streams' " 475 "from='example.com' to='example.org' id='12345' " 476 "version='1.0'>") 477 478 xs.output = [] 479 self.init.deferred.callback(None) 480 481 self.assertEqual(1, len(xs.output)) 482 features = xs.output[-1] 483 self.assertEqual('features', features.name) 484 self.assertEqual(xmlstream.NS_STREAMS, features.uri) 485 self.assertFalse(features.children) 486 487 488 def test_streamStartedInitializerCompletedReset(self): 489 """ 490 If an initializer completes with Reset, no features are sent. 491 """ 492 xs = self.xmlstream 493 xs.makeConnection(proto_helpers.StringTransport()) 494 xs.dataReceived("<stream:stream xmlns='jabber:server' " 495 "xmlns:stream='http://etherx.jabber.org/streams' " 496 "from='example.com' to='example.org' id='12345' " 497 "version='1.0'>") 498 499 xs.output = [] 500 self.init.deferred.callback(xmlstream.Reset) 501 502 self.assertEqual(0, len(xs.output)) 503 504 505 def test_streamStartedXmlStanzasRejected(self): 506 """ 507 XML Stanzas may not be sent before feature negotiation has completed. 508 """ 509 xs = self.xmlstream 510 xs.makeConnection(proto_helpers.StringTransport()) 511 xs.dataReceived("<stream:stream xmlns='jabber:server' " 512 "xmlns:stream='http://etherx.jabber.org/streams' " 513 "from='example.com' to='example.org' id='12345' " 514 "version='1.0'>") 515 516 xs.dataReceived("<iq to='example.org' from='example.com' type='set'>" 517 " <query xmlns='jabber:iq:version'/>" 518 "</iq>") 519 520 self.xmlstream.assertStreamError(self, condition='not-authorized') 521 522 523 def test_streamStartedCompleteXmlStanzasAllowed(self): 524 """ 525 XML Stanzas may sent after feature negotiation has completed. 526 """ 527 xs = self.xmlstream 528 xs.makeConnection(proto_helpers.StringTransport()) 529 xs.dataReceived("<stream:stream xmlns='jabber:server' " 530 "xmlns:stream='http://etherx.jabber.org/streams' " 531 "from='example.com' to='example.org' id='12345' " 532 "version='1.0'>") 533 534 self.init.deferred.callback(None) 535 536 xs.dataReceived("<iq to='example.org' from='example.com' type='set'>" 537 " <query xmlns='jabber:iq:version'/>" 538 "</iq>") 539 540 541 def test_streamStartedXmlStanzasHandledIgnored(self): 542 """ 543 XML Stanzas that have already been handled are ignored. 544 """ 545 xs = self.xmlstream 546 xs.makeConnection(proto_helpers.StringTransport()) 547 xs.dataReceived("<stream:stream xmlns='jabber:server' " 548 "xmlns:stream='http://etherx.jabber.org/streams' " 549 "from='example.com' to='example.org' id='12345' " 550 "version='1.0'>") 551 552 iq = generic.parseXml("<iq to='example.org' from='example.com' type='set'>" 553 " <query xmlns='jabber:iq:version'/>" 554 "</iq>") 555 iq.handled = True 556 xs.dispatch(iq) 557 558 559 def test_streamStartedNonXmlStanzasIgnored(self): 560 """ 561 Elements that are not XML Stranzas are not rejected. 562 """ 563 xs = self.xmlstream 564 xs.makeConnection(proto_helpers.StringTransport()) 565 xs.dataReceived("<stream:stream xmlns='jabber:server' " 566 "xmlns:stream='http://etherx.jabber.org/streams' " 567 "from='example.com' to='example.org' id='12345' " 568 "version='1.0'>") 569 570 xs.dataReceived("<test xmlns='myns'/>") 571 572 573 def test_streamStartedCheckStream(self): 574 """ 575 Stream errors raised by checkStream are sent out. 576 """ 577 def checkStream(): 578 raise error.StreamError('undefined-condition') 579 580 self.authenticator.checkStream = checkStream 581 xs = self.xmlstream 582 xs.makeConnection(proto_helpers.StringTransport()) 583 xs.dataReceived("<stream:stream xmlns='jabber:server' " 584 "xmlns:stream='http://etherx.jabber.org/streams' " 585 "from='example.com' to='example.org' id='12345' " 586 "version='1.0'>") 587 588 self.xmlstream.assertStreamError(self, condition='undefined-condition') 589 self.assertFalse(xs.headerSent) 590 591 592 def test_checkStreamNamespace(self): 593 """ 594 The stream namespace must match the pre-defined stream namespace. 595 """ 596 xs = self.xmlstream 597 xs.makeConnection(proto_helpers.StringTransport()) 598 xs.dataReceived("<stream:stream xmlns='jabber:client' " 599 "xmlns:stream='http://etherx.jabber.org/streams' " 600 "from='example.com' to='example.org' id='12345' " 601 "version='1.0'>") 602 603 self.xmlstream.assertStreamError(self, condition='invalid-namespace')
Note: See TracBrowser
for help on using the repository browser.