[55] | 1 | # -*- test-case-name: wokkel.test.test_server -*- |
---|
| 2 | # |
---|
| 3 | # Copyright (c) 2003-2008 Ralph Meijer |
---|
| 4 | # See LICENSE for details. |
---|
| 5 | |
---|
| 6 | """ |
---|
| 7 | XMPP Server-to-Server protocol. |
---|
| 8 | |
---|
| 9 | This module implements several aspects of XMPP server-to-server communications |
---|
| 10 | as described in XMPP Core (RFC 3920). Refer to that document for the meaning |
---|
| 11 | of the used terminology. |
---|
| 12 | """ |
---|
| 13 | |
---|
| 14 | # hashlib is new in Python 2.5, try that first. |
---|
| 15 | try: |
---|
| 16 | from hashlib import sha256 |
---|
| 17 | digestmod = sha256 |
---|
| 18 | except ImportError: |
---|
| 19 | import Crypto.Hash.SHA256 as digestmod |
---|
| 20 | sha256 = digestmod.new |
---|
| 21 | |
---|
| 22 | import hmac |
---|
| 23 | |
---|
| 24 | from zope.interface import implements |
---|
| 25 | |
---|
| 26 | from twisted.application import service |
---|
| 27 | from twisted.internet import defer, reactor |
---|
| 28 | from twisted.names.srvconnect import SRVConnector |
---|
| 29 | from twisted.python import log, randbytes |
---|
| 30 | from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream |
---|
| 31 | from twisted.words.xish import domish |
---|
| 32 | |
---|
| 33 | from wokkel.generic import DeferredXmlStreamFactory, XmlPipe |
---|
| 34 | from wokkel.compat import XmlStreamServerFactory |
---|
| 35 | |
---|
| 36 | NS_DIALBACK = 'jabber:server:dialback' |
---|
| 37 | |
---|
def _toBytes(value):
    """
    Return C{value} as a byte string.

    Byte strings are passed through untouched; text is encoded as UTF-8 so
    that non-ASCII secrets and internationalized host names can be hashed.
    """
    if isinstance(value, bytes):
        return value
    return value.encode('utf-8')



def generateKey(secret, receivingServer, originatingServer, streamID):
    """
    Generate a dialback key for server-to-server XMPP Streams.

    The dialback key is generated using the algorithm described in
    U{XEP-0185<http://www.xmpp.org/extensions/xep-0185.html>}. The used
    terminology for the parameters is described in RFC-3920.

    The key is the HMAC-SHA256 of the receiving server, originating server
    and stream ID (joined by single spaces), keyed with the hexadecimal
    SHA-256 digest of the secret. Text arguments are encoded as UTF-8 before
    hashing; byte strings are used as is.

    @param secret: the shared secret known to the Originating Server and
                   Authoritative Server.
    @type secret: C{str} or C{unicode}
    @param receivingServer: the Receiving Server host name.
    @type receivingServer: C{str} or C{unicode}
    @param originatingServer: the Originating Server host name.
    @type originatingServer: C{str} or C{unicode}
    @param streamID: the Stream ID as generated by the Receiving Server.
    @type streamID: C{str} or C{unicode}
    @return: hexadecimal digest of the generated key.
    @rtype: C{str}
    """

    # The HMAC key is the *hexadecimal* digest of the secret, not the raw
    # digest bytes.
    hashedSecret = sha256(_toBytes(secret)).hexdigest()
    message = b" ".join([_toBytes(receivingServer),
                         _toBytes(originatingServer),
                         _toBytes(streamID)])
    mac = hmac.HMAC(_toBytes(hashedSecret), message, digestmod=digestmod)
    return mac.hexdigest()
---|
| 65 | |
---|
| 66 | |
---|
def trapStreamError(xs, observer):
    """
    Wrap an observer so that exceptions become stream errors.

    The returned callable invokes C{observer} with the element it receives.
    An L{error.StreamError} raised by the observer is sent over C{xs} as is.
    Any other exception is logged and reported to the peer as an
    C{'internal-server-error'} stream error.

    @param xs: the XML stream on which stream errors are sent.
    @param observer: the observer callable to be wrapped.
    @return: Wrapped observer
    """

    def wrappedObserver(element):
        try:
            observer(element)
        except error.StreamError as streamError:
            xs.sendStreamError(streamError)
        except:
            # Deliberately broad: whatever went wrong is logged and the peer
            # only learns that an internal error occurred.
            log.err()
            xs.sendStreamError(error.StreamError('internal-server-error'))

    return wrappedObserver
---|
| 90 | |
---|
| 91 | |
---|
class XMPPServerConnector(SRVConnector):
    """
    SRV connector for the C{xmpp-server} service of a domain.
    """

    def __init__(self, reactor, domain, factory):
        SRVConnector.__init__(self, reactor, 'xmpp-server', domain, factory)


    def pickServer(self):
        """
        Pick the host and port to connect to.

        Defers to L{SRVConnector.pickServer}, but overrides the port with
        5269 when there are no SRV records (left) to choose from.

        @return: tuple of host and port.
        """
        host, port = SRVConnector.pickServer(self)

        haveRecords = self.servers or self.orderedServers
        if not haveRecords:
            # no SRV record, fall back..
            port = 5269

        return host, port
---|
| 105 | |
---|
| 106 | |
---|
class DialbackFailed(Exception):
    """
    Server dialback did not result in a valid connection.
    """
---|
| 109 | |
---|
| 110 | |
---|
| 111 | |
---|
class OriginatingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Originating Server.

    Sends a dialback key to the Receiving Server and waits for the result.

    @ivar xmlstream: the XML stream to the Receiving Server.
    @ivar thisHost: the domain of this (Originating) server.
    @ivar otherHost: the domain of the Receiving Server.
    @ivar secret: the secret used to generate the dialback key.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, secret):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret


    def initialize(self):
        """
        Send the dialback key and wait for the verdict.

        @return: deferred that fires with C{None} when the key was reported
                 valid, or fails with L{DialbackFailed} or the stream error
                 failure otherwise.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        key = generateKey(self.secret, self.otherHost,
                          self.thisHost, self.xmlstream.sid)

        result = domish.Element((NS_DIALBACK, 'result'))
        result['from'] = self.thisHost
        result['to'] = self.otherHost
        result.addContent(key)

        self.xmlstream.send(result)

        return self._deferred


    def _removeObservers(self):
        """
        Remove both observers, as the deferred can only be fired once.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        self.xmlstream.removeObserver("/result[@xmlns='%s']" % NS_DIALBACK,
                                      self.onResult)


    def onResult(self, result):
        """
        Handle the dialback result sent by the Receiving Server.

        A result without a C{type} attribute is treated as a failure.
        """
        self._removeObservers()
        # getAttribute instead of item access: a malformed result without a
        # type must fail the initialization, not raise KeyError and leave
        # the deferred hanging.
        if result.getAttribute('type') == 'valid':
            self.xmlstream.otherEntity = jid.internJID(self.otherHost)
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail the initialization with the received stream error.
        """
        self._removeObservers()
        self._deferred.errback(failure)
---|
| 162 | |
---|
| 163 | |
---|
| 164 | |
---|
class ReceivingDialbackInitializer(object):
    """
    Server Dialback Initializer for the Receiving Server.

    Sends a verification request for a received dialback key over the given
    stream and waits for the response.

    @ivar xmlstream: the XML stream the verify request is sent over.
    @ivar thisHost: the domain of this (Receiving) server.
    @ivar otherHost: the domain the verify request is addressed to.
    @ivar originalStreamID: the stream ID of the connection being verified.
    @ivar key: the dialback key to be verified.
    """

    implements(ijabber.IInitiatingInitializer)

    _deferred = None

    def __init__(self, xs, thisHost, otherHost, originalStreamID, key):
        self.xmlstream = xs
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key


    def initialize(self):
        """
        Send the verify request and wait for the response.

        @return: deferred that fires with C{None} when the key was reported
                 valid, or fails with L{DialbackFailed} or the stream error
                 failure otherwise.
        """
        self._deferred = defer.Deferred()
        self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
                                   self.onStreamError)
        self.xmlstream.addObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                   self.onVerify)

        verify = domish.Element((NS_DIALBACK, 'verify'))
        verify['from'] = self.thisHost
        verify['to'] = self.otherHost
        verify['id'] = self.originalStreamID
        verify.addContent(self.key)

        self.xmlstream.send(verify)
        return self._deferred


    def _removeObservers(self):
        """
        Remove both observers, as the deferred can only be fired once.
        """
        self.xmlstream.removeObserver(xmlstream.STREAM_ERROR_EVENT,
                                      self.onStreamError)
        self.xmlstream.removeObserver("/verify[@xmlns='%s']" % NS_DIALBACK,
                                      self.onVerify)


    def onVerify(self, verify):
        """
        Handle the response to the verify request.

        The response must carry the original stream ID and the addressing of
        the request mirrored; otherwise a stream error is sent and the
        initialization fails. Missing attributes never match and so are
        handled the same way as wrong values.
        """
        self._removeObservers()

        # getAttribute yields None for absent attributes, so a malformed
        # response ends up in one of the failure branches instead of raising
        # KeyError and leaving the deferred hanging.
        if verify.getAttribute('id') != self.originalStreamID:
            self.xmlstream.sendStreamError(error.StreamError('invalid-id'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('to') != self.thisHost:
            self.xmlstream.sendStreamError(error.StreamError('host-unknown'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('from') != self.otherHost:
            self.xmlstream.sendStreamError(error.StreamError('invalid-from'))
            self._deferred.errback(DialbackFailed())
        elif verify.getAttribute('type') == 'valid':
            self._deferred.callback(None)
        else:
            self._deferred.errback(DialbackFailed())


    def onStreamError(self, failure):
        """
        Fail the initialization with the received stream error.
        """
        self._removeObservers()
        self._deferred.errback(failure)
---|
| 221 | |
---|
| 222 | |
---|
| 223 | |
---|
class XMPPServerConnectAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing XMPP server-to-server connection.

    After connecting to C{otherHost} (the Receiving Server), dialback is
    initiated on behalf of C{thisHost} (the Originating Server) by means of
    an L{OriginatingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Originating
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Receiving Server).
    @ivar secret: The shared secret that is used for verifying the validity
                  of this new connection.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, secret):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.secret = secret
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes before the stream starts.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)
        prefixes = {}
        prefixes[xmlstream.NS_STREAMS] = 'stream'
        prefixes[NS_DIALBACK] = 'db'
        xs.prefixes = prefixes
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Install the dialback initializer as the stream's only initializer.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            OriginatingDialbackInitializer(xs, self.thisHost,
                                           self.otherHost, self.secret)]
---|
| 260 | |
---|
| 261 | |
---|
| 262 | |
---|
class XMPPServerVerifyAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator for an outgoing connection to verify an incoming connection.

    After connecting to C{otherHost} (the Authoritative Server), the key is
    offered for verification on behalf of C{thisHost} (the Receiving Server)
    by means of a L{ReceivingDialbackInitializer}.

    @ivar thisHost: The domain this server connects from (the Receiving
                    Server).
    @ivar otherHost: The domain of the server this server connects to (the
                     Authoritative Server).
    @ivar originalStreamID: The stream ID of the incoming connection that is
                            being verified.
    @ivar key: The dialback key to be verified.
    """
    namespace = 'jabber:server'

    def __init__(self, thisHost, otherHost, originalStreamID, key):
        self.thisHost = thisHost
        self.otherHost = otherHost
        self.originalStreamID = originalStreamID
        self.key = key
        xmlstream.ConnectAuthenticator.__init__(self, otherHost)


    def connectionMade(self):
        """
        Set the local entity and namespace prefixes before the stream starts.
        """
        xs = self.xmlstream
        xs.thisEntity = jid.internJID(self.thisHost)
        prefixes = {}
        prefixes[xmlstream.NS_STREAMS] = 'stream'
        prefixes[NS_DIALBACK] = 'db'
        xs.prefixes = prefixes
        xmlstream.ConnectAuthenticator.connectionMade(self)


    def associateWithStream(self, xs):
        """
        Install the verifying initializer as the stream's only initializer.
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.initializers = [
            ReceivingDialbackInitializer(xs, self.thisHost, self.otherHost,
                                         self.originalStreamID, self.key)]
---|
| 301 | |
---|
| 302 | |
---|
| 303 | |
---|
class XMPPServerListenAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for an incoming XMPP server-to-server connection.

    Two kinds of incoming connections are handled. The regular kind comes
    from an Originating Server, with this server being the Receiving Server:
    a dialback key is received, verified with the Authoritative Server and,
    if valid, the stream is marked as authenticated.

    The other kind comes from a Receiving Server that wants an outgoing
    connection of this server verified, with this server being the
    Authoritative Server: a verification request is received, the key is
    checked and the verdict is sent back.

    @ivar service: The service that keeps the list of domains we accept
                   connections for.
    """
    namespace = 'jabber:server'

    def __init__(self, service):
        xmlstream.ListenAuthenticator.__init__(self)
        self.service = service


    def streamStarted(self, rootElement):
        """
        Check the incoming stream header and respond to it.

        A stream with unexpected namespaces or an unknown target domain is
        answered with a stream error. Otherwise the dialback observers are
        set up and our own stream header is sent, followed by an empty
        features element for streams of version 1.0 or higher.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        # Compatibility fix for pre-8.2 implementations of ListenAuthenticator
        if not self.xmlstream.sid:
            self.xmlstream.sid = randbytes.secureRandom(8).encode('hex')

        if self.xmlstream.thisEntity:
            targetDomain = self.xmlstream.thisEntity.host
        else:
            targetDomain = self.service.defaultDomain

        def prepareStream(domain):
            self.xmlstream.namespace = self.namespace
            self.xmlstream.prefixes = {xmlstream.NS_STREAMS: 'stream',
                                       NS_DIALBACK: 'db'}
            if domain:
                self.xmlstream.thisEntity = jid.internJID(domain)

        # Determine the stream error condition, if any. The namespace checks
        # take precedence over the check of the target domain.
        if (rootElement.uri != xmlstream.NS_STREAMS or
            self.xmlstream.namespace != self.namespace or
            ('db', NS_DIALBACK) not in rootElement.localPrefixes.iteritems()):
            condition = 'invalid-namespace'
        elif targetDomain and targetDomain not in self.service.domains:
            condition = 'host-unknown'
        else:
            condition = None

        if condition is not None:
            prepareStream(self.service.defaultDomain)
            self.xmlstream.sendStreamError(error.StreamError(condition))
            return

        verifyObserver = trapStreamError(self.xmlstream, self.onVerify)
        self.xmlstream.addObserver("//verify[@xmlns='%s']" % NS_DIALBACK,
                                   verifyObserver)
        self.xmlstream.addObserver("//result[@xmlns='%s']" % NS_DIALBACK,
                                   self.onResult)

        prepareStream(targetDomain)
        self.xmlstream.sendHeader()

        if self.xmlstream.version >= (1, 0):
            self.xmlstream.send(
                domish.Element((xmlstream.NS_STREAMS, 'features')))


    def onVerify(self, verify):
        """
        Handle a verification request, acting as the Authoritative Server.

        The key in the request is compared with the key calculated from our
        own secret and the verdict is sent back.

        @raise error.StreamError: if the addressing is missing or malformed
            (C{'improper-addressing'}), the addressed domain is not ours
            (C{'host-unknown'}) or the sender does not match the stream's
            other entity (C{'invalid-from'}).
        """
        try:
            receivingServer = jid.JID(verify['from']).host
            originatingServer = jid.JID(verify['to']).host
        except (KeyError, jid.InvalidFormat):
            raise error.StreamError('improper-addressing')

        if originatingServer not in self.service.domains:
            raise error.StreamError('host-unknown')

        otherEntity = self.xmlstream.otherEntity
        if otherEntity and receivingServer != otherEntity.host:
            raise error.StreamError('invalid-from')

        streamID = verify.getAttribute('id', '')
        offeredKey = unicode(verify)
        expectedKey = generateKey(self.service.secret, receivingServer,
                                  originatingServer, streamID)

        if offeredKey == expectedKey:
            validity = 'valid'
        else:
            validity = 'invalid'

        reply = domish.Element((NS_DIALBACK, 'verify'))
        reply['from'] = originatingServer
        reply['to'] = receivingServer
        reply['id'] = streamID
        reply['type'] = validity
        self.xmlstream.send(reply)


    def onResult(self, result):
        """
        Handle a dialback key offered by an Originating Server.

        The service is asked to validate the connection. On success a valid
        result is sent and the stream is marked as authenticated; on failure
        the failure is logged and an invalid result is sent.

        @return: the deferred of the validation.
        """
        receivingServer = result['to']
        originatingServer = result['from']
        key = unicode(result)

        def reply(validity):
            response = domish.Element((NS_DIALBACK, 'result'))
            response['from'] = result['to']
            response['to'] = result['from']
            response['type'] = validity
            self.xmlstream.send(response)

        def valid(xs):
            reply('valid')
            if not self.xmlstream.thisEntity:
                self.xmlstream.thisEntity = jid.internJID(receivingServer)
            self.xmlstream.otherEntity = jid.internJID(originatingServer)
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)

        def invalid(failure):
            log.err(failure)
            reply('invalid')

        d = self.service.validateConnection(receivingServer, originatingServer,
                                            self.xmlstream.sid, key)
        d.addCallbacks(valid, invalid)
        return d
---|
| 433 | |
---|
| 434 | |
---|
| 435 | |
---|
class DeferredS2SClientFactory(DeferredXmlStreamFactory):
    """
    Deferred firing factory for initiating XMPP server-to-server connection.

    The deferred has its callbacks called upon successful authentication with
    the other server. In case of failed authentication or connection, the
    deferred will have its errbacks called instead.

    @ivar logTraffic: whether the raw traffic of new streams is logged.
    @ivar serial: the serial number handed to the next connected stream.
    """

    logTraffic = False

    def __init__(self, authenticator):
        DeferredXmlStreamFactory.__init__(self, authenticator)

        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                          self.onConnectionMade)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Number the newly connected stream and optionally log its traffic.
        """
        xs.serial = self.serial
        self.serial += 1

        if not self.logTraffic:
            return

        def logDataIn(buf):
            log.msg("RECV (%d): %r" % (xs.serial, buf))

        def logDataOut(buf):
            log.msg("SEND (%d): %r" % (xs.serial, buf))

        xs.rawDataInFn = logDataIn
        xs.rawDataOutFn = logDataOut
---|
| 469 | |
---|
| 470 | |
---|
| 471 | |
---|
def initiateS2S(factory):
    """
    Start connecting to the other host of the factory's authenticator.

    @param factory: factory with an C{authenticator} that has an
                    C{otherHost} attribute, and a C{deferred} attribute.
    @return: the factory's deferred.
    """
    connector = XMPPServerConnector(reactor,
                                    factory.authenticator.otherHost,
                                    factory)
    connector.connect()
    return factory.deferred
---|
| 477 | |
---|
| 478 | |
---|
| 479 | |
---|
class XMPPS2SServerFactory(XmlStreamServerFactory):
    """
    XMPP Server-to-Server Server factory.

    This factory accepts XMPP server-to-server connections.

    @ivar service: the service that received elements are dispatched to.
    @ivar logTraffic: whether the raw traffic of new streams is logged.
    @ivar serial: the serial number handed to the next connected stream.
    """

    logTraffic = False

    def __init__(self, service):
        self.service = service

        def authenticatorFactory():
            return XMPPServerListenAuthenticator(service)

        XmlStreamServerFactory.__init__(self, authenticatorFactory)

        bootstraps = [
            (xmlstream.STREAM_CONNECTED_EVENT, self.onConnectionMade),
            (xmlstream.STREAM_AUTHD_EVENT, self.onAuthenticated),
            ]
        for event, handler in bootstraps:
            self.addBootstrap(event, handler)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Called when a server-to-server connection was made.

        This numbers the stream, enables traffic debugging on incoming
        streams if requested, and starts logging stream errors.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def onAuthenticated(self, xs):
        """
        Called when an incoming stream has been authenticated.

        From here on, the end of the stream and all received elements are
        observed.
        """
        log.msg("Incoming connection %d from %r to %r established" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))

        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost,
                                                   0, xs)
        xs.addObserver('/*', self.onElement, 0, xs)


    def onConnectionLost(self, xs, reason):
        """
        Called when an authenticated incoming stream has ended.
        """
        log.msg("Incoming connection %d from %r to %r disconnected" %
                (xs.serial, xs.otherEntity.host, xs.thisEntity.host))


    def onError(self, reason):
        """
        Log a stream error.
        """
        log.err(reason, "Stream Error")


    def onElement(self, xs, element):
        """
        Called when an element was received from one of the connected streams.

        Elements not yet marked as handled are passed on to the service.
        """
        if not element.handled:
            self.service.dispatch(xs, element)
---|
| 559 | |
---|
| 560 | |
---|
| 561 | |
---|
class ServerService(object):
    """
    Service for managing XMPP server to server connections.

    @ivar router: the router this service registers its default route with.
    @ivar defaultDomain: the domain passed at construction, possibly C{None}.
    @ivar domains: the set of domains handled by this service.
    @ivar secret: the secret used for generating dialback keys.
    @ivar logTraffic: whether the raw traffic of outgoing streams is logged.
    """

    logTraffic = False

    def __init__(self, router, domain=None, secret=None):
        """
        @param router: router to exchange stanzas with; the service adds
                       itself as the default (C{None}) route.
        @param domain: optional default domain, added to C{domains}.
        @param secret: optional dialback secret. If C{None}, a random secret
                       is generated.
        """
        self.router = router

        self.defaultDomain = domain
        self.domains = set()
        if self.defaultDomain:
            self.domains.add(self.defaultDomain)

        if secret is not None:
            self.secret = secret
        else:
            self.secret = randbytes.secureRandom(16).encode('hex')

        # All three are keyed by (thisHost, otherHost).
        self._outgoingStreams = {}
        self._outgoingQueues = {}
        self._outgoingConnecting = set()
        self.serial = 0

        pipe = XmlPipe()
        self.xmlstream = pipe.source
        self.router.addRoute(None, pipe.sink)
        self.xmlstream.addObserver('/*', self.send)


    def outgoingInitialized(self, xs):
        """
        Called when an outgoing stream has been authenticated.

        The stream is registered for its pair of hosts and the stanzas that
        were queued for that pair are sent over it.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r established" %
                (xs.serial, thisHost, otherHost))

        self._outgoingStreams[thisHost, otherHost] = xs
        xs.addObserver(xmlstream.STREAM_END_EVENT,
                       lambda _: self.outgoingDisconnected(xs))

        if (thisHost, otherHost) in self._outgoingQueues:
            for element in self._outgoingQueues[thisHost, otherHost]:
                xs.send(element)
            del self._outgoingQueues[thisHost, otherHost]


    def outgoingDisconnected(self, xs):
        """
        Called when an outgoing stream has ended; forgets the stream.
        """
        thisHost = xs.thisEntity.host
        otherHost = xs.otherEntity.host

        log.msg("Outgoing connection %d from %r to %r disconnected" %
                (xs.serial, thisHost, otherHost))

        del self._outgoingStreams[thisHost, otherHost]


    def initiateOutgoingStream(self, thisHost, otherHost):
        """
        Initiate an outgoing XMPP server-to-server connection.

        Nothing is done if a connection attempt for this pair of hosts is
        already in progress.

        @return: deferred of the connection attempt, or C{None} if an attempt
                 was already in progress.
        """

        def resetConnecting(_):
            self._outgoingConnecting.remove((thisHost, otherHost))

        if (thisHost, otherHost) in self._outgoingConnecting:
            return

        authenticator = XMPPServerConnectAuthenticator(thisHost,
                                                       otherHost,
                                                       self.secret)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,
                             self.outgoingInitialized)
        factory.logTraffic = self.logTraffic

        self._outgoingConnecting.add((thisHost, otherHost))

        d = initiateS2S(factory)
        d.addBoth(resetConnecting)
        return d


    def validateConnection(self, thisHost, otherHost, sid, key):
        """
        Validate an incoming XMPP server-to-server connection.

        A separate connection to C{otherHost} is made to have the key
        verified. That connection is dropped once the verification has
        finished, whatever the outcome.

        @param thisHost: the domain of this server.
        @param otherHost: the domain the key is to be verified with.
        @param sid: the stream ID of the incoming connection.
        @param key: the dialback key to be verified.
        @return: deferred of the verification connection.
        """

        def connected(xs):
            # Set up stream for immediate disconnection.
            def disconnect(_):
                xs.transport.loseConnection()
            xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, disconnect)
            xs.addObserver(xmlstream.INIT_FAILED_EVENT, disconnect)

        authenticator = XMPPServerVerifyAuthenticator(thisHost, otherHost,
                                                      sid, key)
        factory = DeferredS2SClientFactory(authenticator)
        factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, connected)
        factory.logTraffic = self.logTraffic

        d = initiateS2S(factory)
        return d


    def send(self, stanza):
        """
        Send stanza to the proper XML Stream.

        This uses addressing embedded in the stanza to find the correct stream
        to forward the stanza to. If there is no such stream yet, the stanza
        is queued and a connection is initiated.
        """

        otherHost = jid.internJID(stanza["to"]).host
        thisHost = jid.internJID(stanza["from"]).host
        hosts = (thisHost, otherHost)

        if hosts in self._outgoingStreams:
            self._outgoingStreams[hosts].send(stanza)
        else:
            # There is no connection with the destination (yet). Cache the
            # outgoing stanza until the connection has been established.
            # XXX: If the connection cannot be established, the queue should
            #      be emptied at some point.
            self._outgoingQueues.setdefault(hosts, []).append(stanza)
            self.initiateOutgoingStream(thisHost, otherHost)


    def dispatch(self, xs, stanza):
        """
        Send on element to be routed within the server.

        Stanzas lacking a sender or recipient address, or with a sender whose
        domain differs from that of the stream's other entity, result in a
        stream error on C{xs}. Stanzas with a malformed address are logged
        and dropped.

        @param xs: the stream the stanza was received from.
        @param stanza: the received stanza.
        """
        stanzaFrom = stanza.getAttribute('from')
        stanzaTo = stanza.getAttribute('to')

        if not stanzaFrom or not stanzaTo:
            xs.sendStreamError(error.StreamError('improper-addressing'))
            return

        try:
            sender = jid.internJID(stanzaFrom)
            # Only parsed to validate the recipient address.
            jid.internJID(stanzaTo)
        except jid.InvalidFormat:
            log.msg("Dropping error stanza with malformed JID")
            # Really drop it: without this return, the check below would
            # fail on the unbound sender.
            return

        if sender.host != xs.otherEntity.host:
            xs.sendStreamError(error.StreamError('invalid-from'))
        else:
            self.xmlstream.send(stanza)
---|