Changeset 35:eb020b49a77d
- Timestamp:
- Oct 10, 2008, 5:24:28 PM (14 years ago)
- Branch:
- default
- Convert:
- svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@90
- Location:
- wokkel
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
wokkel/component.py
r6 r35 1 # Copyright (c) 2003-2007 Ralph Meijer 1 # -*- test-case-name: wokkel.test.test_component -*- 2 # 3 # Copyright (c) 2003-2008 Ralph Meijer 2 4 # See LICENSE for details. 3 5 … … 8 10 from twisted.application import service 9 11 from twisted.internet import reactor 10 from twisted.words.protocols.jabber import component 12 from twisted.python import log 13 from twisted.words.protocols.jabber.jid import internJID as JID 14 from twisted.words.protocols.jabber import component, error, xmlstream 11 15 from twisted.words.xish import domish 12 16 13 17 from wokkel.subprotocols import StreamManager 18 19 NS_COMPONENT_ACCEPT = 'jabber:component:accept' 14 20 15 21 class Component(StreamManager, service.Service): … … 57 63 def _getConnection(self): 58 64 return reactor.connectTCP(self.host, self.port, self.factory) 65 66 67 68 class ListenComponentAuthenticator(xmlstream.ListenAuthenticator): 69 """ 70 Authenticator for accepting components. 71 """ 72 namespace = NS_COMPONENT_ACCEPT 73 74 def __init__(self, secret): 75 self.secret = secret 76 xmlstream.ListenAuthenticator.__init__(self) 77 78 79 def associateWithStream(self, xs): 80 xs.version = (0, 0) 81 xmlstream.ListenAuthenticator.associateWithStream(self, xs) 82 83 84 def streamStarted(self, rootElement): 85 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 86 87 if rootElement.defaultUri != self.namespace: 88 exc = error.StreamError('invalid-namespace') 89 self.xmlstream.sendStreamError(exc) 90 return 91 92 # self.xmlstream.thisEntity is set to the address the component 93 # wants to assume. This should probably be checked. 94 if not self.xmlstream.thisEntity: 95 exc = error.StreamError('improper-addressing') 96 self.xmlstream.sendStreamError(exc) 97 return 98 99 self.xmlstream.sid = 'random' # FIXME 100 101 self.xmlstream.sendHeader() 102 self.xmlstream.addOnetimeObserver('/*', self.onElement) 103 104 105 def onElement(self, element): 106 if (element.uri, element.name) == (self.namespace, 'handshake'): 107 self.onHandshake(unicode(element)) 108 else: 109 exc = error.streamError('not-authorized') 110 self.xmlstream.sendStreamError(exc) 111 112 113 def onHandshake(self, handshake): 114 calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret) 115 if handshake != calculatedHash: 116 exc = error.StreamError('not-authorized', text='Invalid hash') 117 self.xmlstream.sendStreamError(exc) 118 else: 119 self.xmlstream.send('<handshake/>') 120 self.xmlstream.dispatch(self.xmlstream, 121 xmlstream.STREAM_AUTHD_EVENT) 122 123 124 125 class RouterService(service.Service): 126 """ 127 XMPP Server's Router Service. 128 129 This service connects the different components of the XMPP service and 130 routes messages between them based on the given routing table. 131 132 Connected components are trusted to have correct addressing in the 133 stanzas they offer for routing. 134 135 A route destination of C{None} adds a default route. Traffic for which no 136 specific route exists, will be routed to this default route. 137 138 @ivar routes: Routes based on the host part of JIDs. Maps host names to the 139 L{EventDispatcher<utility.EventDispatcher>}s that should 140 receive the traffic. A key of C{None} means the default 141 route. 142 @type routes: C{dict} 143 """ 144 145 def __init__(self): 146 self.routes = {} 147 148 149 def addRoute(self, destination, xs): 150 """ 151 Add a new route. 152 153 The passed XML Stream C{xs} will have an observer for all stanzas 154 added to route its outgoing traffic. In turn, traffic for 155 C{destination} will be passed to this stream. 156 157 @param destination: Destination of the route to be added as a host name 158 or C{None} for the default route. 159 @type destination: C{str} or C{NoneType}. 160 @param xs: XML Stream to register the route for. 161 @type xs: L{EventDispatcher<utility.EventDispatcher>}. 162 """ 163 self.routes[destination] = xs 164 xs.addObserver('/*', self.route) 165 166 167 def removeRoute(self, destination, xs): 168 """ 169 Remove a route. 170 171 @param destination: Destination of the route that should be removed. 172 @type destination: C{str}. 173 @param xs: XML Stream to remove the route for. 174 @type xs: L{EventDispatcher<utility.EventDispatcher>}. 175 """ 176 xs.removeObserver('/*', self.route) 177 if (xs == self.routes[destination]): 178 del self.routes[destination] 179 180 181 def route(self, stanza): 182 """ 183 Route a stanza. 184 185 @param stanza: The stanza to be routed. 186 @type stanza: L{domish.Element}. 187 """ 188 if not list(stanza.elements()): 189 return 190 191 destination = JID(stanza['to']) 192 193 log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml())) 194 195 if destination.host in self.routes: 196 self.routes[destination.host].send(stanza) 197 else: 198 self.routes[None].send(stanza) 199 200 201 202 class ComponentServer(service.Service): 203 """ 204 XMPP Component Server service. 205 206 This service accepts XMPP external component connections and makes 207 the router service route traffic for a component's bound domain 208 to that component. 209 """ 210 211 logTraffic = False 212 213 def __init__(self, router, port=5347, secret='secret'): 214 self.router = router 215 self.port = port 216 self.secret = secret 217 218 def authenticatorFactory(): 219 return ListenComponentAuthenticator(self.secret) 220 221 self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory) 222 self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 223 self.makeConnection) 224 self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 225 self.connectionInitialized) 226 227 self.serial = 0 228 229 230 def startService(self): 231 service.Service.startService(self) 232 reactor.listenTCP(self.port, self.factory) 233 234 235 def makeConnection(self, xs): 236 """ 237 Called when a component connection was made. 238 239 This enables traffic debugging on incoming streams. 240 """ 241 xs.serial = self.serial 242 self.serial += 1 243 244 def logDataIn(buf): 245 log.msg("RECV (%d): %r" % (xs.serial, buf)) 246 247 def logDataOut(buf): 248 log.msg("SEND (%d): %r" % (xs.serial, buf)) 249 250 if self.logTraffic: 251 xs.rawDataInFn = logDataIn 252 xs.rawDataOutFn = logDataOut 253 254 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 255 256 257 def connectionInitialized(self, xs): 258 """ 259 Called when a component has succesfully authenticated. 260 261 Add the component to the routing table and establish a handler 262 for a closed connection. 263 """ 264 destination = xs.thisEntity.host 265 266 self.router.addRoute(destination, xs) 267 xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0, 268 destination, xs) 269 270 271 def onError(self, reason): 272 log.err(reason, "Stream Error") 273 274 275 def connectionLost(self, destination, xs, reason): 276 self.router.removeRoute(destination, xs) -
wokkel/generic.py
r30 r35 13 13 from twisted.words.protocols.jabber import error 14 14 from twisted.words.protocols.jabber.xmlstream import toResponse 15 from twisted.words.xish import domish 15 from twisted.words.xish import domish, utility 16 16 17 17 from wokkel import disco … … 121 121 def getDiscoItems(self, requestor, target, node): 122 122 return defer.succeed([]) 123 124 125 126 class XmlPipe(object): 127 """ 128 XML stream pipe. 129 130 Connects two objects that communicate stanzas through an XML stream like 131 interface. Each of the ends of the pipe (sink and source) can be used to 132 send XML stanzas to the other side, or add observers to process XML stanzas 133 that were sent from the other side. 134 135 XML pipes are usually used in place of regular XML streams that are 136 transported over TCP. This is the reason for the use of the names source 137 and sink for both ends of the pipe. The source side corresponds with the 138 entity that initiated the TCP connection, whereas the sink corresponds with 139 the entity that accepts that connection. In this object, though, the source 140 and sink are treated equally. 141 142 Unlike Jabber 143 L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink 144 and source objects are assumed to represent an eternal connected and 145 initialized XML stream. As such, events corresponding to connection, 146 disconnection, initialization and stream errors are not dispatched or 147 processed. 148 149 @ivar source: Source XML stream. 150 @ivar sink: Sink XML stream. 151 """ 152 153 def __init__(self): 154 self.source = utility.EventDispatcher() 155 self.sink = utility.EventDispatcher() 156 self.source.send = lambda obj: self.sink.dispatch(obj) 157 self.sink.send = lambda obj: self.source.dispatch(obj) -
wokkel/test/test_generic.py
r20 r35 49 49 self.assertEquals(1, len(elements)) 50 50 self.assertEquals('0.1.0', unicode(elements[0])) 51 52 53 54 class XmlPipeTest(unittest.TestCase): 55 """ 56 Tests for L{wokkel.generic.XmlPipe}. 57 """ 58 59 def setUp(self): 60 self.pipe = generic.XmlPipe() 61 62 63 def test_sendFromSource(self): 64 """ 65 Send an element from the source and observe it from the sink. 66 """ 67 def cb(obj): 68 called.append(obj) 69 70 called = [] 71 self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb) 72 element = domish.Element(('testns', 'test')) 73 self.pipe.source.send(element) 74 self.assertEquals([element], called) 75 76 77 def test_sendFromSink(self): 78 """ 79 Send an element from the sink and observe it from the source. 80 """ 81 def cb(obj): 82 called.append(obj) 83 84 called = [] 85 self.pipe.source.addObserver('/test[@xmlns="testns"]', cb) 86 element = domish.Element(('testns', 'test')) 87 self.pipe.sink.send(element) 88 self.assertEquals([element], called)
Note: See TracChangeset
for help on using the changeset viewer.