Changeset 35:eb020b49a77d for wokkel/component.py
- Timestamp:
- Oct 10, 2008, 5:24:28 PM (14 years ago)
- Branch:
- default
- Convert:
- svn:b33ecbfc-034c-dc11-8662-000475d9059e/trunk@90
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
wokkel/component.py
r6 r35 1 # Copyright (c) 2003-2007 Ralph Meijer 1 # -*- test-case-name: wokkel.test.test_component -*- 2 # 3 # Copyright (c) 2003-2008 Ralph Meijer 2 4 # See LICENSE for details. 3 5 … … 8 10 from twisted.application import service 9 11 from twisted.internet import reactor 10 from twisted.words.protocols.jabber import component 12 from twisted.python import log 13 from twisted.words.protocols.jabber.jid import internJID as JID 14 from twisted.words.protocols.jabber import component, error, xmlstream 11 15 from twisted.words.xish import domish 12 16 13 17 from wokkel.subprotocols import StreamManager 18 19 NS_COMPONENT_ACCEPT = 'jabber:component:accept' 14 20 15 21 class Component(StreamManager, service.Service): … … 57 63 def _getConnection(self): 58 64 return reactor.connectTCP(self.host, self.port, self.factory) 65 66 67 68 class ListenComponentAuthenticator(xmlstream.ListenAuthenticator): 69 """ 70 Authenticator for accepting components. 71 """ 72 namespace = NS_COMPONENT_ACCEPT 73 74 def __init__(self, secret): 75 self.secret = secret 76 xmlstream.ListenAuthenticator.__init__(self) 77 78 79 def associateWithStream(self, xs): 80 xs.version = (0, 0) 81 xmlstream.ListenAuthenticator.associateWithStream(self, xs) 82 83 84 def streamStarted(self, rootElement): 85 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 86 87 if rootElement.defaultUri != self.namespace: 88 exc = error.StreamError('invalid-namespace') 89 self.xmlstream.sendStreamError(exc) 90 return 91 92 # self.xmlstream.thisEntity is set to the address the component 93 # wants to assume. This should probably be checked. 94 if not self.xmlstream.thisEntity: 95 exc = error.StreamError('improper-addressing') 96 self.xmlstream.sendStreamError(exc) 97 return 98 99 self.xmlstream.sid = 'random' # FIXME 100 101 self.xmlstream.sendHeader() 102 self.xmlstream.addOnetimeObserver('/*', self.onElement) 103 104 105 def onElement(self, element): 106 if (element.uri, element.name) == (self.namespace, 'handshake'): 107 self.onHandshake(unicode(element)) 108 else: 109 exc = error.streamError('not-authorized') 110 self.xmlstream.sendStreamError(exc) 111 112 113 def onHandshake(self, handshake): 114 calculatedHash = xmlstream.hashPassword(self.xmlstream.sid, self.secret) 115 if handshake != calculatedHash: 116 exc = error.StreamError('not-authorized', text='Invalid hash') 117 self.xmlstream.sendStreamError(exc) 118 else: 119 self.xmlstream.send('<handshake/>') 120 self.xmlstream.dispatch(self.xmlstream, 121 xmlstream.STREAM_AUTHD_EVENT) 122 123 124 125 class RouterService(service.Service): 126 """ 127 XMPP Server's Router Service. 128 129 This service connects the different components of the XMPP service and 130 routes messages between them based on the given routing table. 131 132 Connected components are trusted to have correct addressing in the 133 stanzas they offer for routing. 134 135 A route destination of C{None} adds a default route. Traffic for which no 136 specific route exists, will be routed to this default route. 137 138 @ivar routes: Routes based on the host part of JIDs. Maps host names to the 139 L{EventDispatcher<utility.EventDispatcher>}s that should 140 receive the traffic. A key of C{None} means the default 141 route. 142 @type routes: C{dict} 143 """ 144 145 def __init__(self): 146 self.routes = {} 147 148 149 def addRoute(self, destination, xs): 150 """ 151 Add a new route. 152 153 The passed XML Stream C{xs} will have an observer for all stanzas 154 added to route its outgoing traffic. In turn, traffic for 155 C{destination} will be passed to this stream. 156 157 @param destination: Destination of the route to be added as a host name 158 or C{None} for the default route. 159 @type destination: C{str} or C{NoneType}. 160 @param xs: XML Stream to register the route for. 161 @type xs: L{EventDispatcher<utility.EventDispatcher>}. 162 """ 163 self.routes[destination] = xs 164 xs.addObserver('/*', self.route) 165 166 167 def removeRoute(self, destination, xs): 168 """ 169 Remove a route. 170 171 @param destination: Destination of the route that should be removed. 172 @type destination: C{str}. 173 @param xs: XML Stream to remove the route for. 174 @type xs: L{EventDispatcher<utility.EventDispatcher>}. 175 """ 176 xs.removeObserver('/*', self.route) 177 if (xs == self.routes[destination]): 178 del self.routes[destination] 179 180 181 def route(self, stanza): 182 """ 183 Route a stanza. 184 185 @param stanza: The stanza to be routed. 186 @type stanza: L{domish.Element}. 187 """ 188 if not list(stanza.elements()): 189 return 190 191 destination = JID(stanza['to']) 192 193 log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml())) 194 195 if destination.host in self.routes: 196 self.routes[destination.host].send(stanza) 197 else: 198 self.routes[None].send(stanza) 199 200 201 202 class ComponentServer(service.Service): 203 """ 204 XMPP Component Server service. 205 206 This service accepts XMPP external component connections and makes 207 the router service route traffic for a component's bound domain 208 to that component. 209 """ 210 211 logTraffic = False 212 213 def __init__(self, router, port=5347, secret='secret'): 214 self.router = router 215 self.port = port 216 self.secret = secret 217 218 def authenticatorFactory(): 219 return ListenComponentAuthenticator(self.secret) 220 221 self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory) 222 self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 223 self.makeConnection) 224 self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 225 self.connectionInitialized) 226 227 self.serial = 0 228 229 230 def startService(self): 231 service.Service.startService(self) 232 reactor.listenTCP(self.port, self.factory) 233 234 235 def makeConnection(self, xs): 236 """ 237 Called when a component connection was made. 238 239 This enables traffic debugging on incoming streams. 240 """ 241 xs.serial = self.serial 242 self.serial += 1 243 244 def logDataIn(buf): 245 log.msg("RECV (%d): %r" % (xs.serial, buf)) 246 247 def logDataOut(buf): 248 log.msg("SEND (%d): %r" % (xs.serial, buf)) 249 250 if self.logTraffic: 251 xs.rawDataInFn = logDataIn 252 xs.rawDataOutFn = logDataOut 253 254 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 255 256 257 def connectionInitialized(self, xs): 258 """ 259 Called when a component has succesfully authenticated. 260 261 Add the component to the routing table and establish a handler 262 for a closed connection. 263 """ 264 destination = xs.thisEntity.host 265 266 self.router.addRoute(destination, xs) 267 xs.addObserver(xmlstream.STREAM_END_EVENT, self.connectionLost, 0, 268 destination, xs) 269 270 271 def onError(self, reason): 272 log.err(reason, "Stream Error") 273 274 275 def connectionLost(self, destination, xs, reason): 276 self.router.removeRoute(destination, xs)
Note: See TracChangeset
for help on using the changeset viewer.