# Copyright (c) Ralph Meijer.
# See LICENSE for details.
"""
Tests for L{wokkel.generic}.
"""
from twisted.trial import unittest
from twisted.words.xish import domish
from twisted.words.protocols.jabber.jid import JID
from wokkel import generic
from wokkel.test.helpers import XmlStreamStub
NS_VERSION = 'jabber:iq:version'
class VersionHandlerTest(unittest.TestCase):
"""
Tests for L{wokkel.generic.VersionHandler}.
"""
def test_onVersion(self):
"""
Test response to incoming version request.
"""
self.stub = XmlStreamStub()
self.protocol = generic.VersionHandler('Test', '0.1.0')
self.protocol.xmlstream = self.stub.xmlstream
self.protocol.send = self.stub.xmlstream.send
self.protocol.connectionInitialized()
iq = domish.Element((None, 'iq'))
iq['from'] = 'user@example.org/Home'
iq['to'] = 'example.org'
iq['type'] = 'get'
iq.addElement((NS_VERSION, 'query'))
self.stub.send(iq)
response = self.stub.output[-1]
self.assertEquals('user@example.org/Home', response['to'])
self.assertEquals('example.org', response['from'])
self.assertEquals('result', response['type'])
self.assertEquals(NS_VERSION, response.query.uri)
elements = list(domish.generateElementsQNamed(response.query.children,
'name', NS_VERSION))
self.assertEquals(1, len(elements))
self.assertEquals('Test', unicode(elements[0]))
elements = list(domish.generateElementsQNamed(response.query.children,
'version', NS_VERSION))
self.assertEquals(1, len(elements))
self.assertEquals('0.1.0', unicode(elements[0]))
class XmlPipeTest(unittest.TestCase):
"""
Tests for L{wokkel.generic.XmlPipe}.
"""
def setUp(self):
self.pipe = generic.XmlPipe()
def test_sendFromSource(self):
"""
Send an element from the source and observe it from the sink.
"""
def cb(obj):
called.append(obj)
called = []
self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
element = domish.Element(('testns', 'test'))
self.pipe.source.send(element)
self.assertEquals([element], called)
def test_sendFromSink(self):
"""
Send an element from the sink and observe it from the source.
"""
def cb(obj):
called.append(obj)
called = []
self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
element = domish.Element(('testns', 'test'))
self.pipe.sink.send(element)
self.assertEquals([element], called)
class StanzaTest(unittest.TestCase):
"""
Tests for L{generic.Stanza}.
"""
def test_fromElement(self):
xml = """
"""
stanza = generic.Stanza.fromElement(generic.parseXml(xml))
self.assertEqual('chat', stanza.stanzaType)
self.assertEqual(JID('other@example.org'), stanza.sender)
self.assertEqual(JID('user@example.org'), stanza.recipient)
def test_fromElementChildParser(self):
"""
Child elements for which no parser is defined are ignored.
"""
xml = """
"""
class Message(generic.Stanza):
childParsers = {('http://example.org/', 'x'): '_childParser_x'}
elements = []
def _childParser_x(self, element):
self.elements.append(element)
message = Message.fromElement(generic.parseXml(xml))
self.assertEqual(1, len(message.elements))
def test_fromElementChildParserAll(self):
"""
Child elements for which no parser is defined are ignored.
"""
xml = """
"""
class Message(generic.Stanza):
childParsers = {None: '_childParser'}
elements = []
def _childParser(self, element):
self.elements.append(element)
message = Message.fromElement(generic.parseXml(xml))
self.assertEqual(1, len(message.elements))
def test_fromElementChildParserUnknown(self):
"""
Child elements for which no parser is defined are ignored.
"""
xml = """
"""
generic.Stanza.fromElement(generic.parseXml(xml))
class RequestTest(unittest.TestCase):
"""
Tests for L{generic.Request}.
"""
def setUp(self):
self.request = generic.Request()
def test_requestParser(self):
"""
The request's child element is passed to requestParser.
"""
xml = """
"""
class VersionRequest(generic.Request):
elements = []
def parseRequest(self, element):
self.elements.append((element.uri, element.name))
request = VersionRequest.fromElement(generic.parseXml(xml))
self.assertEqual([(NS_VERSION, 'query')], request.elements)
def test_toElementStanzaKind(self):
"""
A request is an iq stanza.
"""
element = self.request.toElement()
self.assertIdentical(None, element.uri)
self.assertEquals('iq', element.name)
def test_toElementStanzaType(self):
"""
The request has type 'get'.
"""
self.assertEquals('get', self.request.stanzaType)
element = self.request.toElement()
self.assertEquals('get', element.getAttribute('type'))
def test_toElementStanzaTypeSet(self):
"""
The request has type 'set'.
"""
self.request.stanzaType = 'set'
element = self.request.toElement()
self.assertEquals('set', element.getAttribute('type'))
def test_toElementStanzaID(self):
"""
A request, when rendered, has an identifier.
"""
element = self.request.toElement()
self.assertNotIdentical(None, self.request.stanzaID)
self.assertEquals(self.request.stanzaID, element.getAttribute('id'))
def test_toElementRecipient(self):
"""
A request without recipient, has no 'to' attribute.
"""
self.request = generic.Request(recipient=JID('other@example.org'))
self.assertEquals(JID('other@example.org'), self.request.recipient)
element = self.request.toElement()
self.assertEquals(u'other@example.org', element.getAttribute('to'))
def test_toElementRecipientNone(self):
"""
A request without recipient, has no 'to' attribute.
"""
element = self.request.toElement()
self.assertFalse(element.hasAttribute('to'))
def test_toElementSender(self):
"""
A request with sender, has a 'from' attribute.
"""
self.request = generic.Request(sender=JID('user@example.org'))
self.assertEquals(JID('user@example.org'), self.request.sender)
element = self.request.toElement()
self.assertEquals(u'user@example.org', element.getAttribute('from'))
def test_toElementSenderNone(self):
"""
A request without sender, has no 'from' attribute.
"""
element = self.request.toElement()
self.assertFalse(element.hasAttribute('from'))
def test_timeoutDefault(self):
"""
The default is no timeout.
"""
self.assertIdentical(None, self.request.timeout)
class PrepareIDNNameTests(unittest.TestCase):
"""
Tests for L{wokkel.generic.prepareIDNName}.
"""
def test_bytestring(self):
"""
An ASCII-encoded byte string is left as-is.
"""
name = b"example.com"
result = generic.prepareIDNName(name)
self.assertEqual(b"example.com", result)
def test_unicode(self):
"""
A unicode all-ASCII name is converted to an ASCII byte string.
"""
name = u"example.com"
result = generic.prepareIDNName(name)
self.assertEqual(b"example.com", result)
def test_unicodeNonASCII(self):
"""
A unicode with non-ASCII is converted to its ACE equivalent.
"""
name = u"\u00e9chec.example.com"
result = generic.prepareIDNName(name)
self.assertEqual(b"xn--chec-9oa.example.com", result)
def test_unicodeHalfwidthIdeographicFullStop(self):
"""
Exotic dots in unicode names are converted to Full Stop.
"""
name = u"\u00e9chec.example\uff61com"
result = generic.prepareIDNName(name)
self.assertEqual(b"xn--chec-9oa.example.com", result)
def test_unicodeTrailingDot(self):
"""
Unicode names with trailing dots retain the trailing dot.
L{encodings.idna.ToASCII} doesn't allow the empty string as the input,
hence the implementation needs to strip a trailing dot, and re-add it
after encoding the labels.
"""
name = u"example.com."
result = generic.prepareIDNName(name)
self.assertEqual(b"example.com.", result)