Changeset 39:d6a0f8cbabf3 in ralphm-patches for xmpp_client_service.patch
- Timestamp:
- Feb 21, 2010, 2:29:49 PM (11 years ago)
- Branch:
- default
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
xmpp_client_service.patch
r38 r39 1 diff -r 1644083ca235doc/examples/client_service.tac1 diff -r 62f841ed2a99 doc/examples/client_service.tac 2 2 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3 +++ b/doc/examples/client_service.tac S at Feb 13 19:01:072010 +01004 @@ -0,0 +1, 64 @@3 +++ b/doc/examples/client_service.tac Sun Feb 21 14:28:09 2010 +0100 4 @@ -0,0 +1,74 @@ 5 5 +from twisted.application import service, strports 6 6 +from twisted.internet import defer 7 7 + 8 +from wokkel import client 8 +from wokkel import client, xmppim 9 +from wokkel.component import InternalComponent, Router 9 10 +from wokkel.generic import FallbackHandler 10 +from wokkel. subprotocols import XMPPHandler11 +from wokkel.xmppim import RosterItem , RosterServerProtocol11 +from wokkel.ping import PingHandler 12 +from wokkel.xmppim import RosterItem 12 13 + 13 14 +from twisted.words.protocols.jabber.jid import internJID as JID … … 16 17 +domain = socket.gethostname() 17 18 + 18 +class StaticRoster(RosterServerProtocol): 19 + 20 + def __init__(self): 21 + RosterServerProtocol.__init__(self) 22 + self.roster = {'ralphm': [ 23 + RosterItem(JID('intosi@'+domain), 24 + subscriptionTo=True, 25 + subscriptionFrom=True, 26 + name='Intosi', 27 + groups=set(['Friends'])), 28 + RosterItem(JID('termie@'+domain), 29 + subscriptionTo=True, 30 + subscriptionFrom=True, 31 + name='termie'), 32 + ], 33 + 'test': [], 34 + } 19 +RALPHM = JID('ralphm@'+domain) 20 +INTOSI = JID('intosi@'+domain) 21 +TERMIE = JID('termie@'+domain) 22 + 23 +roster = { 24 + 'ralphm': { 25 + INTOSI: RosterItem(INTOSI, 26 + subscriptionTo=True, 27 + subscriptionFrom=True, 28 + name='Intosi', 29 + groups=set(['Friends'])), 30 + TERMIE: RosterItem(TERMIE, 31 + subscriptionTo=True, 32 + subscriptionFrom=True, 33 + name='termie'), 34 + }, 35 + 'termie': { 36 + RALPHM: RosterItem(RALPHM, 37 + subscriptionTo=True, 38 + subscriptionFrom=True, 39 + name='ralphm'), 40 + } 41 + } 42 + 43 +accounts = set(roster.keys()) 44 + 45 + 46 +class StaticRoster(xmppim.RosterServerProtocol): 47 + 48 + def __init__(self, roster): 49 + xmppim.RosterServerProtocol.__init__(self) 50 + self.roster = roster 35 51 + 36 52 + def getRoster(self, entity): 37 + return defer.succeed(self.roster[entity.user]) 38 + 39 + 40 +class Hello(XMPPHandler): 41 + 42 + def q(self): 43 + from wokkel.xmppim import AvailabilityPresence 44 + p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat') 45 + self.parent.send(p.toElement()) 46 + 47 + 48 + def connectionInitialized(self): 49 + from twisted.internet import reactor 50 + reactor.callLater(5, self.q) 53 + return defer.succeed(self.roster[entity.user].values()) 51 54 + 52 55 + … … 54 57 +application = service.Application("Jabber server") 55 58 + 56 +sessionManager = client.SessionManager() 57 +FallbackHandler().setHandlerParent(sessionManager) 58 +StaticRoster().setHandlerParent(sessionManager) 59 +Hello().setHandlerParent(sessionManager) 60 + 61 +clientService = client.ClientService(sessionManager, domain) 62 +clientService.logTraffic = True 63 + 64 + 65 +c2sFactory = client.XMPPC2SServerFactory(clientService) 59 +router = Router() 60 +component = InternalComponent(router, domain) 61 +component.setServiceParent(application) 62 + 63 +sessionManager = client.SessionManager(domain, accounts) 64 +sessionManager.setHandlerParent(component) 65 + 66 +xmppim.AccountIQHandler(sessionManager).setHandlerParent(component) 67 +xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component) 68 +xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 69 +FallbackHandler().setHandlerParent(component) 70 +StaticRoster(roster).setHandlerParent(component) 71 +PingHandler().setHandlerParent(component) 72 + 73 +c2sFactory = client.XMPPC2SServerFactory(sessionManager) 66 74 +c2sFactory.logTraffic = True 67 75 +c2sService = strports.service('5224', c2sFactory) 68 76 +c2sService.setServiceParent(application) 69 diff -r 1644083ca235 wokkel/client.py 70 --- a/wokkel/client.py Sat Feb 13 18:57:27 2010 +0100 71 +++ b/wokkel/client.py Sat Feb 13 19:01:07 2010 +0100 72 @@ -17,11 +17,12 @@ 77 + 78 +sessionManager.connectionManager = c2sFactory 79 diff -r 62f841ed2a99 wokkel/client.py 80 --- a/wokkel/client.py Sat Feb 13 18:57:26 2010 +0100 81 +++ b/wokkel/client.py Sun Feb 21 14:28:09 2010 +0100 82 @@ -10,13 +10,27 @@ 83 that should probably eventually move there. 84 """ 85 86 +import base64 87 + 88 from twisted.application import service 89 -from twisted.internet import reactor 90 +from twisted.internet import defer, reactor 73 91 from twisted.names.srvconnect import SRVConnector 74 from twisted.python import log 75 from twisted.words.protocols.jabber import client, error, sasl, xmlstream 76 -from twisted.words.protocols.jabber.jid import internJID as JID 92 -from twisted.words.protocols.jabber import client, sasl, xmlstream 93 +from twisted.python import log, randbytes 94 +from twisted.words.protocols.jabber import client, error, sasl, xmlstream 77 95 +from twisted.words.protocols.jabber.jid import JID, internJID 78 96 +from twisted.words.xish import domish 79 97 80 98 from wokkel import generic 81 99 -from wokkel.subprotocols import StreamManager 82 100 +from wokkel.compat import XmlStreamServerFactory 83 +from wokkel.subprotocols import StreamManager, XMPPHandlerCollection 84 85 NS_CLIENT = 'jabber:client' 86 87 @@ -311,6 +312,7 @@ 88 89 # TODO: check for resource conflicts 90 91 + print self.username, self.domain, self.resource 92 newJID = JID(tuple=(self.username, self.domain, self.resource)) 93 94 reply = domish.Element((None, 'iq')) 95 @@ -350,3 +352,166 @@ 96 reply['id'] = iq['id'] 97 reply.addElement((client.NS_XMPP_SESSION, 'session')) 98 self.xmlstream.send(reply) 99 + 100 + 101 + 102 +class XMPPC2SServerFactory(XmlStreamServerFactory): 101 +from wokkel.subprotocols import StreamManager, XMPPHandler 102 + 103 +NS_CLIENT = 'jabber:client' 104 + 105 +XPATH_ALL = "/*" 106 +XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL 107 +XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND 108 +XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \ 109 + client.NS_XMPP_SESSION 110 111 class CheckAuthInitializer(object): 112 """ 113 @@ -51,7 +65,7 @@ 114 autentication. 115 """ 116 117 - namespace = 'jabber:client' 118 + namespace = NS_CLIENT 119 120 def __init__(self, jid, password): 121 xmlstream.ConnectAuthenticator.__init__(self, jid.host) 122 @@ -186,3 +200,338 @@ 123 c = XMPPClientConnector(reactor, domain, factory) 124 c.connect() 125 return factory.deferred 126 + 127 + 128 + 129 +class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator): 130 + namespace = NS_CLIENT 103 131 + 104 132 + def __init__(self, service): 105 133 + self.service = service 134 + self.failureGrace = 3 135 + self.state = 'auth' 136 + 137 + 138 + def associateWithStream(self, xs): 139 + xmlstream.ListenAuthenticator.associateWithStream(self, xs) 140 + self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1) 141 + 142 + 143 + def onElementFallback(self, element): 144 + if element.handled: 145 + return 146 + 147 + exc = error.StreamError('not-authorized') 148 + self.xmlstream.sendStreamError(exc) 149 + 150 + 151 + def streamStarted(self, rootElement): 152 + xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 153 + 154 + # check namespace 155 + #if self.xmlstream.namespace != self.namespace: 156 + # self.xmlstream.namespace = self.namespace 157 + # exc = error.StreamError('invalid-namespace') 158 + # self.xmlstream.sendStreamError(exc) 159 + # return 160 + 161 + # TODO: check domain (self.service.domain) 162 + 163 + self.xmlstream.sendHeader() 164 + 165 + try: 166 + stateHandlerName = 'streamStarted_' + self.state 167 + stateHandler = getattr(self, stateHandlerName) 168 + except AttributeError: 169 + log.msg('streamStarted handler for', self.state, 'not found') 170 + else: 171 + stateHandler() 172 + 173 + 174 + def toState(self, state): 175 + self.state = state 176 + if state == 'initialized': 177 + self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback) 178 + self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1) 179 + self.xmlstream.dispatch(self.xmlstream, 180 + xmlstream.STREAM_AUTHD_EVENT) 181 + 182 + 183 + def streamStarted_auth(self): 184 + features = domish.Element((xmlstream.NS_STREAMS, 'features')) 185 + features.addElement((sasl.NS_XMPP_SASL, 'mechanisms')) 186 + features.mechanisms.addElement('mechanism', content='PLAIN') 187 + self.xmlstream.send(features) 188 + self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 189 + 190 + 191 + def onAuth(self, auth): 192 + auth.handled = True 193 + 194 + if auth.getAttribute('mechanism') != 'PLAIN': 195 + failure = domish.Element((sasl.NS_XMPP_SASL, 'failure')) 196 + failure.addElement('invalid-mechanism') 197 + self.xmlstream.send(failure) 198 + 199 + # Close stream on too many failing authentication attempts 200 + self.failureGrace -= 1 201 + if self.failureGrace == 0: 202 + self.xmlstream.sendFooter() 203 + else: 204 + self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 205 + 206 + return 207 + 208 + initialResponse = base64.b64decode(unicode(auth)) 209 + authzid, authcid, passwd = initialResponse.split('\x00') 210 + 211 + # TODO: check passwd 212 + 213 + # authenticated 214 + 215 + self.username = authcid 216 + 217 + success = domish.Element((sasl.NS_XMPP_SASL, 'success')) 218 + self.xmlstream.send(success) 219 + self.xmlstream.reset() 220 + 221 + self.toState('bind') 222 + 223 + 224 + def streamStarted_bind(self): 225 + features = domish.Element((xmlstream.NS_STREAMS, 'features')) 226 + features.addElement((client.NS_XMPP_BIND, 'bind')) 227 + features.addElement((client.NS_XMPP_SESSION, 'session')) 228 + self.xmlstream.send(features) 229 + self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind) 230 + 231 + 232 + def onBind(self, iq): 233 + def cb(boundJID): 234 + self.xmlstream.otherEntity = boundJID 235 + self.toState('initialized') 236 + 237 + response = xmlstream.toResponse(iq, 'result') 238 + response.addElement((client.NS_XMPP_BIND, 'bind')) 239 + response.bind.addElement((client.NS_XMPP_BIND, 'jid'), 240 + content=boundJID.full()) 241 + 242 + return response 243 + 244 + def eb(failure): 245 + if not isinstance(failure, error.StanzaError): 246 + log.msg(failure) 247 + exc = error.StanzaError('internal-server-error') 248 + else: 249 + exc = failure.value 250 + 251 + return exc.toResponse(iq) 252 + 253 + iq.handled = True 254 + resource = unicode(iq.bind) or None 255 + d = self.service.bindResource(self.username, 256 + self.service.domain, 257 + resource) 258 + d.addCallback(cb) 259 + d.addErrback(eb) 260 + d.addCallback(self.xmlstream.send) 261 + 262 + 263 + def onSession(self, iq): 264 + iq.handled = True 265 + 266 + reply = domish.Element((None, 'iq')) 267 + reply['type'] = 'result' 268 + if iq.getAttribute('id'): 269 + reply['id'] = iq['id'] 270 + reply.addElement((client.NS_XMPP_SESSION, 'session')) 271 + self.xmlstream.send(reply) 272 + 273 + 274 + 275 +class RecipientUnavailable(Exception): 276 + """ 277 + The addressed entity is not, or no longer, available. 278 + """ 279 + 280 + 281 + 282 +class XMPPC2SServerFactory(XmlStreamServerFactory): 283 + 284 + def __init__(self, service): 285 + self.service = service 106 286 + 107 287 + def authenticatorFactory(): 108 + return XMPPClientListenAuthenticator(service .domain)288 + return XMPPClientListenAuthenticator(service) 109 289 + 110 290 + XmlStreamServerFactory.__init__(self, authenticatorFactory) … … 115 295 + 116 296 + self.serial = 0 297 + self.streams = {} 117 298 + 118 299 + … … 148 329 + xs.addObserver('/*', self.onElement, 0, xs) 149 330 + 150 + self.service.connectionInitialized(xs) 331 + # Record this stream as bound to the authenticated JID 332 + self.streams[xs.otherEntity] = xs 151 333 + 152 334 + … … 154 336 + log.msg("Client connection %d disconnected" % xs.serial) 155 337 + 156 + self.service.connectionLost(xs, reason) 338 + entity = xs.otherEntity 339 + self.service.unbindResource(entity.user, 340 + entity.host, 341 + entity.resource, 342 + reason) 343 + 344 + # If the lost connections had been bound, remove the reference 345 + if xs.otherEntity in self.streams: 346 + del self.streams[xs.otherEntity] 157 347 + 158 348 + … … 161 351 + 162 352 + 163 + def onElement(self, xs, element):353 + def onElement(self, xs, stanza): 164 354 + """ 165 355 + Called when an element was received from one of the connected streams. 166 356 + 167 357 + """ 168 + if element.handled:358 + if stanza.handled: 169 359 + return 170 360 + else: 171 + self.service.dispatch(xs, element) 172 + 173 + 174 + 175 +class SessionManager(XMPPHandlerCollection): 176 + 177 + def __init__(self): 178 + XMPPHandlerCollection.__init__(self) 179 + self.xmlstream = None 180 + 181 + 182 + def makeConnection(self, xs): 183 + self.xmlstream = xs 184 + 185 + for handler in self: 186 + handler.makeConnection(xs) 187 + handler.connectionInitialized() 188 + 189 + 190 + def send(self, obj): 191 + if self.xmlstream: 192 + self.xmlstream.send(obj) 193 + 194 + 195 +class ClientService(object): 361 + self.service.onElement(stanza, xs.otherEntity) 362 + 363 + 364 + def deliverStanza(self, element, recipient): 365 + if recipient in self.streams: 366 + self.streams[recipient].send(element) 367 + else: 368 + raise RecipientUnavailable(u"There is no connection for %s" % 369 + recipient.full()) 370 + 371 + 372 +class Session(object): 373 + def __init__(self, entity): 374 + self.entity = entity 375 + self.connected = False 376 + self.interested = False 377 + self.presence = None 378 + 379 + 380 + 381 +class SessionManager(XMPPHandler): 382 + 196 383 + """ 197 + Service for accepting XMPP client connections. 198 + 199 + Incoming client connections are first authenticated using the 200 + L{XMPPClientListenAuthenticator}, and then kept in a dictionary that is 201 + keyed with the JID that was bound to that connection. 202 + 203 + Where L{StreamManager} manages one connection at a time, this 204 + service demultiplexes incoming connections. For the subprotocol handlers 205 + (objects providing {ijabber.IXMPPHandler}), their stream is always 206 + connected and initialized, but they only have to deal with one stream. This 207 + makes it easier to create adapters. 208 + 209 + As an L{xmlstream.XMPPHandlerCollection}, this service creates a fake XML 210 + stream that is passed to the XMPP subprotocol handlers. The received 211 + stanzas from the incoming client connections are passed on to the handlers 212 + using this fake XML, while having their C{from} attribute set to the JID of 213 + the client stream. Stanzas sent by the handlers are forwarded to the 214 + matching client stream, selected by the stanzas C{to} attribute. 384 + Session Manager. 385 + 386 + @ivar xmlstream: XML Stream to inject incoming stanzas from client 387 + connections into. Stanzas where the C{'to'} attribute is not set 388 + or is directed at the local domain are injected as if received on 389 + the XML Stream (using C{dispatch}), other stanzas are injected as if 390 + they were sent from the XML Stream (using C{send}). 215 391 + """ 216 392 + 217 + logTraffic = False 218 + 219 + def __init__(self, sessionManager, domain): 393 + def __init__(self, domain, accounts): 394 + XMPPHandler.__init__(self) 395 + self.domain = domain 396 + self.accounts = accounts 397 + 398 + self.connectionManager = None 399 + self.sessions = {} 400 + 401 + 402 + def bindResource(self, localpart, domain, resource): 403 + if domain != self.domain: 404 + raise Exception("I don't host this domain!") 405 + 406 + try: 407 + userSessions = self.sessions[localpart] 408 + except KeyError: 409 + userSessions = self.sessions[localpart] = {} 410 + 411 + if resource is None: 412 + resource = randbytes.secureRandom(8).encode('hex') 413 + elif resource in self.userSessions: 414 + resource = resource + ' ' + randbytes.secureRandom(8).encode('hex') 415 + 416 + entity = JID(tuple=(localpart, domain, resource)) 417 + session = Session(entity) 418 + session.connected = True 419 + userSessions[resource] = session 420 + 421 + return defer.succeed(entity) 422 + 423 + 424 + def unbindResource(self, localpart, domain, resource, reason=None): 425 + try: 426 + session = self.sessions[localpart][resource] 427 + except KeyError: 428 + pass 429 + else: 430 + session.connected = False 431 + del self.sessions[localpart][resource] 432 + if not self.sessions[localpart]: 433 + del self.sessions[localpart] 434 + 435 + return defer.succeed(None) 436 + 437 + 438 + def onElement(self, element, sender): 439 + # Make sure each stanza has a sender address 440 + if (element.name == 'presence' and 441 + element.getAttribute('type') in ('subscribe', 'subscribed', 442 + 'unsubscribe', 'unsubscribed')): 443 + element['from'] = sender.userhost() 444 + else: 445 + element['from'] = sender.full() 446 + 447 + if (not element.hasAttribute('to') or 448 + internJID(element['to']).host == self.domain): 449 + # This stanza is for local delivery 450 + self.xmlstream.dispatch(element) 451 + else: 452 + # This stanza is for remote delivery 453 + self.xmlstream.send(element) 454 + 455 + 456 + def deliverStanza(self, element, recipient): 457 + if self.connectionManager: 458 + self.connectionManager.deliverStanza(element, recipient) 459 + else: 460 + raise Exception("No connection manager set") 461 diff -r 62f841ed2a99 wokkel/component.py 462 --- a/wokkel/component.py Sat Feb 13 18:57:26 2010 +0100 463 +++ b/wokkel/component.py Sun Feb 21 14:28:09 2010 +0100 464 @@ -313,12 +313,24 @@ 465 """ 466 destination = JID(stanza['to']) 467 468 - log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml())) 469 470 if destination.host in self.routes: 471 + msg = "Routing to %s (default route): %r" 472 + log.msg("Routing to %s: %r" % (destination.full(), 473 + stanza.toXml())) 474 self.routes[destination.host].send(stanza) 475 + elif None in self.routers: 476 + log.msg("Routing to %s (default route): %r" % (destination.full(), 477 + stanza.toXml())) 478 + self.routes[None].send(stanza) 479 else: 480 - self.routes[None].send(stanza) 481 + log.msg("No route to %s: %r" % (destination.full(), 482 + stanza.toXml())) 483 + if stanza.getAttribute('type') not in ('result', 'error'): 484 + # No route, send back error 485 + exc = error.StanzaError('remote-server-timeout') 486 + response = exc.toResponse(stanza) 487 + self.route(response) 488 489 490 491 diff -r 62f841ed2a99 wokkel/xmppim.py 492 --- a/wokkel/xmppim.py Sat Feb 13 18:57:26 2010 +0100 493 +++ b/wokkel/xmppim.py Sun Feb 21 14:28:09 2010 +0100 494 @@ -12,8 +12,11 @@ 495 All of it should eventually move to Twisted. 496 """ 497 498 +import copy 499 + 500 +from twisted.python import log 501 from twisted.words.protocols.jabber import error, xmlstream 502 -from twisted.words.protocols.jabber.jid import JID 503 +from twisted.words.protocols.jabber.jid import JID, internJID 504 from twisted.words.xish import domish 505 506 from wokkel.compat import IQ 507 @@ -85,7 +88,7 @@ 508 handler(presence) 509 510 def _onPresenceAvailable(self, presence): 511 - entity = JID(presence["from"]) 512 + entity = internJID(presence["from"]) 513 514 show = unicode(presence.show or '') 515 if show not in ['away', 'xa', 'chat', 'dnd']: 516 @@ -101,23 +104,23 @@ 517 self.availableReceived(entity, show, statuses, priority) 518 519 def _onPresenceUnavailable(self, presence): 520 - entity = JID(presence["from"]) 521 + entity = internJID(presence["from"]) 522 523 statuses = self._getStatuses(presence) 524 525 self.unavailableReceived(entity, statuses) 526 527 def _onPresenceSubscribed(self, presence): 528 - self.subscribedReceived(JID(presence["from"])) 529 + self.subscribedReceived(internJID(presence["from"])) 530 531 def _onPresenceUnsubscribed(self, presence): 532 - self.unsubscribedReceived(JID(presence["from"])) 533 + self.unsubscribedReceived(internJID(presence["from"])) 534 535 def _onPresenceSubscribe(self, presence): 536 - self.subscribeReceived(JID(presence["from"])) 537 + self.subscribeReceived(internJID(presence["from"])) 538 539 def _onPresenceUnsubscribe(self, presence): 540 - self.unsubscribeReceived(JID(presence["from"])) 541 + self.unsubscribeReceived(internJID(presence["from"])) 542 543 544 def availableReceived(self, entity, show=None, statuses=None, priority=0): 545 @@ -125,7 +128,7 @@ 546 Available presence was received. 547 548 @param entity: entity from which the presence was received. 549 - @type entity: {JID} 550 + @type entity: L{JID} 551 @param show: detailed presence information. One of C{'away'}, C{'xa'}, 552 C{'chat'}, C{'dnd'} or C{None}. 553 @type show: C{str} or C{NoneType} 554 @@ -143,7 +146,7 @@ 555 Unavailable presence was received. 556 557 @param entity: entity from which the presence was received. 558 - @type entity: {JID} 559 + @type entity: L{JID} 560 @param statuses: dictionary of natural language descriptions of the 561 availability status, keyed by the language 562 descriptor. A status without a language 563 @@ -156,7 +159,7 @@ 564 Subscription approval confirmation was received. 565 566 @param entity: entity from which the confirmation was received. 567 - @type entity: {JID} 568 + @type entity: L{JID} 569 """ 570 571 def unsubscribedReceived(self, entity): 572 @@ -164,7 +167,7 @@ 573 Unsubscription confirmation was received. 574 575 @param entity: entity from which the confirmation was received. 576 - @type entity: {JID} 577 + @type entity: L{JID} 578 """ 579 580 def subscribeReceived(self, entity): 581 @@ -172,7 +175,7 @@ 582 Subscription request was received. 583 584 @param entity: entity from which the request was received. 585 - @type entity: {JID} 586 + @type entity: L{JID} 587 """ 588 589 def unsubscribeReceived(self, entity): 590 @@ -180,7 +183,7 @@ 591 Unsubscription request was received. 592 593 @param entity: entity from which the request was received. 594 - @type entity: {JID} 595 + @type entity: L{JID} 596 """ 597 598 def available(self, entity=None, show=None, statuses=None, priority=0): 599 @@ -188,7 +191,7 @@ 600 Send available presence. 601 602 @param entity: optional entity to which the presence should be sent. 603 - @type entity: {JID} 604 + @type entity: L{JID} 605 @param show: optional detailed presence information. One of C{'away'}, 606 C{'xa'}, C{'chat'}, C{'dnd'}. 607 @type show: C{str} 608 @@ -207,7 +210,7 @@ 609 Send unavailable presence. 610 611 @param entity: optional entity to which the presence should be sent. 612 - @type entity: {JID} 613 + @type entity: L{JID} 614 @param statuses: dictionary of natural language descriptions of the 615 availability status, keyed by the language 616 descriptor. A status without a language 617 @@ -221,7 +224,7 @@ 618 Send subscription request 619 620 @param entity: entity to subscribe to. 621 - @type entity: {JID} 622 + @type entity: L{JID} 623 """ 624 self.send(Presence(to=entity, type='subscribe')) 625 626 @@ -230,7 +233,7 @@ 627 Send unsubscription request 628 629 @param entity: entity to unsubscribe from. 630 - @type entity: {JID} 631 + @type entity: L{JID} 632 """ 633 self.send(Presence(to=entity, type='unsubscribe')) 634 635 @@ -239,7 +242,7 @@ 636 Send subscription confirmation. 637 638 @param entity: entity that subscribed. 639 - @type entity: {JID} 640 + @type entity: L{JID} 641 """ 642 self.send(Presence(to=entity, type='subscribed')) 643 644 @@ -248,7 +251,7 @@ 645 Send unsubscription confirmation. 646 647 @param entity: entity that unsubscribed. 648 - @type entity: {JID} 649 + @type entity: L{JID} 650 """ 651 self.send(Presence(to=entity, type='unsubscribed')) 652 653 @@ -478,7 +481,7 @@ 654 655 @param recipient: Optional Recipient to which the presence should be 656 sent. 657 - @type recipient: {JID} 658 + @type recipient: L{JID} 659 660 @param show: Optional detailed presence information. One of C{'away'}, 661 C{'xa'}, C{'chat'}, C{'dnd'}. 662 @@ -503,7 +506,7 @@ 663 Send unavailable presence. 664 665 @param recipient: Optional entity to which the presence should be sent. 666 - @type recipient: {JID} 667 + @type recipient: L{JID} 668 669 @param statuses: dictionary of natural language descriptions of the 670 availability status, keyed by the language descriptor. A status 671 @@ -520,7 +523,7 @@ 672 Send subscription request 673 674 @param recipient: Entity to subscribe to. 675 - @type recipient: {JID} 676 + @type recipient: L{JID} 677 """ 678 presence = SubscriptionPresence(recipient=recipient, sender=sender) 679 presence.stanzaType = 'subscribe' 680 @@ -532,7 +535,7 @@ 681 Send unsubscription request 682 683 @param recipient: Entity to unsubscribe from. 684 - @type recipient: {JID} 685 + @type recipient: L{JID} 686 """ 687 presence = SubscriptionPresence(recipient=recipient, sender=sender) 688 presence.stanzaType = 'unsubscribe' 689 @@ -544,7 +547,7 @@ 690 Send subscription confirmation. 691 692 @param recipient: Entity that subscribed. 693 - @type recipient: {JID} 694 + @type recipient: L{JID} 695 """ 696 presence = SubscriptionPresence(recipient=recipient, sender=sender) 697 presence.stanzaType = 'subscribed' 698 @@ -556,7 +559,7 @@ 699 Send unsubscription confirmation. 700 701 @param recipient: Entity that unsubscribed. 702 - @type recipient: {JID} 703 + @type recipient: L{JID} 704 """ 705 presence = SubscriptionPresence(recipient=recipient, sender=sender) 706 presence.stanzaType = 'unsubscribed' 707 @@ -568,7 +571,7 @@ 708 Send presence probe. 709 710 @param recipient: Entity to be probed. 711 - @type recipient: {JID} 712 + @type recipient: L{JID} 713 """ 714 presence = ProbePresence(recipient=recipient, sender=sender) 715 self.send(presence.toElement()) 716 @@ -652,7 +655,7 @@ 717 718 719 def _parseRosterItem(self, element): 720 - jid = JID(element['jid']) 721 + jid = internJID(element['jid']) 722 item = RosterItem(jid) 723 item.name = element.getAttribute('name') 724 subscription = element.getAttribute('subscription') 725 @@ -715,7 +718,7 @@ 726 itemElement = iq.query.item 727 728 if unicode(itemElement['subscription']) == 'remove': 729 - self.onRosterRemove(JID(itemElement['jid'])) 730 + self.onRosterRemove(internJID(itemElement['jid'])) 731 else: 732 item = self._parseRosterItem(iq.query.item) 733 self.onRosterSet(item) 734 @@ -763,7 +766,7 @@ 735 def _onRosterGet(self, iq): 736 iq.handled = True 737 738 - d = self.getRoster(JID(iq["from"])) 739 + d = self.getRoster(internJID(iq["from"])) 740 d.addCallback(self._toRosterReply, iq) 741 d.addErrback(lambda _: error.ErrorStanza('internal-error').toResponse(iq)) 742 d.addBoth(self.send) 743 @@ -808,3 +811,380 @@ 744 """ 745 Called when a message stanza was received. 746 """ 747 + 748 + 749 + 750 +class AccountIQHandler(XMPPHandler): 751 + 752 + def __init__(self, sessionManager): 753 + XMPPHandler.__init__(self) 754 + self.sessionManager = sessionManager 755 + 756 + 757 + def connectionMade(self): 758 + self.xmlstream.addObserver('/iq', self.onIQ, 1) 759 + 760 + 761 + def onIQ(self, iq): 762 + """ 763 + Handler for iq stanzas to user accounts' connected resources. 764 + 765 + If the recipient is a bare JID or there is no associated user, this 766 + handler ignores the stanza, so that other handlers have a chance 767 + to pick it up. If used, L{generic.FallbackHandler} will respond with a 768 + C{'service-unavailable'} stanza error if no other handlers handle 769 + the iq. 770 + """ 771 + 772 + if iq.handled: 773 + return 774 + 775 + try: 776 + recipient = internJID(iq['to']) 777 + except KeyError: 778 + return 779 + 780 + if not recipient.user: 781 + # This is not for an account, ignore it 782 + return 783 + elif recipient.user not in self.sessionManager.accounts: 784 + # This is not a user, ignore it 785 + return 786 + elif not recipient.resource: 787 + # Bare JID at local domain, ignore it 788 + return 789 + elif recipient.user in self.sessionManager.sessions: 790 + # Full JID with connected resource, deliver the stanza 791 + self.sessionManager.deliverStanza(iq, recipient) 792 + else: 793 + # Full JID without connected resource, return error 794 + exc = error.StanzaError('service-unavailable') 795 + if iq['type'] in ('result', 'error'): 796 + log.err(exc, 'Could not deliver IQ response') 797 + else: 798 + self.send(exc.toResponse(iq)) 799 + 800 + iq.handled = True 801 + 802 + 803 + 804 +class AccountMessageHandler(XMPPHandler): 805 + 806 + def __init__(self, sessionManager): 807 + XMPPHandler.__init__(self) 808 + self.sessionManager = sessionManager 809 + 810 + 811 + def connectionMade(self): 812 + self.xmlstream.addObserver('/message', self.onMessage, 1) 813 + 814 + 815 + def onMessage(self, message): 816 + """ 817 + Handler for message stanzas to user accounts. 818 + """ 819 + 820 + if message.handled: 821 + return 822 + 823 + try: 824 + recipient = internJID(message['to']) 825 + except KeyError: 826 + return 827 + 828 + stanzaType = message.getAttribute('type', 'normal') 829 + 830 + try: 831 + if not recipient.user: 832 + # This is not for an account, ignore it 833 + return 834 + elif recipient.user not in self.sessionManager.accounts: 835 + # This is not a user, ignore it 836 + return 837 + elif recipient.resource: 838 + userSessions = self.sessionManager.sessions.get(recipient.user, 839 + {}) 840 + if recipient.resource in userSessions: 841 + self.sessionManager.deliverStanza(message, recipient) 842 + else: 843 + if stanzaType in ('normal', 'chat', 'headline'): 844 + self.onMessageBareJID(message, recipient.userhostJID()) 845 + elif stanzaType == 'error': 846 + log.msg("Dropping message to unconnected resource %r" % 847 + recipient.full()) 848 + elif stanzaType == 'groupchat': 849 + raise error.StanzaError('service-unavailable') 850 + else: 851 + self.onMessageBareJID(message, recipient) 852 + except error.StanzaError, exc: 853 + if stanzaType == 'error': 854 + log.err(exc, "Undeliverable error") 855 + else: 856 + self.send(exc.toResponse(message)) 857 + 858 + message.handled = True 859 + 860 + 861 + def onMessageBareJID(self, message, bareJID): 862 + stanzaType = message.getAttribute('type', 'normal') 863 + 864 + userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 865 + print userSessions 866 + 867 + recipients = set() 868 + 869 + if stanzaType == 'headline': 870 + for session in userSessions: 871 + if session.presence.priority >= 0: 872 + recipients.add(session.entity) 873 + elif stanzaType in ('chat', 'normal'): 874 + priorities = {} 875 + for session in userSessions.itervalues(): 876 + if not session.presence or not session.presence.available: 877 + continue 878 + priority = session.presence.priority 879 + if priority >= 0: 880 + priorities.setdefault(priority, set()).add(session.entity) 881 + maxPriority = max(priorities.keys()) 882 + recipients.update(priorities[maxPriority]) 883 + elif stanzaType == 'groupchat': 884 + raise error.StanzaError('service-unavailable') 885 + 886 + if recipients: 887 + for recipient in recipients: 888 + self.sessionManager.deliverStanza(message, recipient) 889 + elif stanzaType in ('chat', 'normal'): 890 + raise error.StanzaError('service-unavailable') 891 + else: 892 + # silently discard 893 + log.msg("Discarding message to %r" % message['to']) 894 + 895 + 896 + 897 + 898 +def clonePresence(presence): 899 + """ 900 + Make a deep copy of a presence stanza. 901 + 902 + The returned presence stanza is an orphaned deep copy of the given 903 + original. 904 + 905 + @note: Since the reference to the original parent, if any, is gone, 906 + inherited attributes like C{xml:lang} are not preserved. 907 + """ 908 + element = presence.element 909 + 910 + parent = element.parent 911 + element.parent = None 912 + newElement = copy.deepcopy(element) 913 + element.parent = parent 914 + return newElement 915 + 916 + 917 + 918 +class PresenceServerHandler(PresenceProtocol): 919 + 920 + def __init__(self, sessionManager, domain, roster): 921 + PresenceProtocol.__init__(self) 220 922 + self.sessionManager = sessionManager 221 923 + self.domain = domain 222 + 223 + self.streams = {} 224 + 225 + pipe = generic.XmlPipe() 226 + self.xmlstream = pipe.source 227 + self.sessionManager.makeConnection(pipe.sink) 228 + self.xmlstream.addObserver('/*', self.send) 229 + 230 + 231 + def connectionInitialized(self, xs): 232 + self.streams[xs.otherEntity] = xs 233 + 234 + 235 + def connectionLost(self, xs, reason): 236 + if xs.otherEntity in self.streams: 237 + del self.streams[xs.otherEntity] 238 + 239 + 240 + def send(self, stanza): 924 + self.roster = roster 925 + self.presences = {} # user -> resource -> presence 926 + self.offlinePresences = {} # user -> presence 927 + self.remotePresences = {} # user -> remote entity -> presence 928 + 929 + 930 + def _broadcastToOtherResources(self, presence): 241 931 + """ 242 + Send stanza to the proper XML Stream. 243 + 244 + This uses addressing embedded in the element to find the correct stream 245 + to forward the element to. 932 + Broadcast presence to other available resources. 246 933 + """ 247 + destination = internJID(stanza["to"]) 248 + 249 + if destination not in self.streams: 250 + log.msg("Euh") 251 + raise Exception("Destination unreachable") 252 + 253 + self.streams[destination].send(stanza) 254 + 255 + 256 + def dispatch(self, xs, stanza): 934 + fromJID = presence.sender 935 + for otherResource in self.presences[fromJID.user]: 936 + if otherResource == fromJID.resource: 937 + continue 938 + 939 + resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource)) 940 + outPresence = clonePresence(presence) 941 + outPresence['to'] = resourceJID.full() 942 + self.sessionManager.deliverStanza(outPresence, resourceJID) 943 + 944 + 945 + def _broadcastToContacts(self, presence): 257 946 + """ 258 + Called when an element was received from one of the connected streams.947 + Broadcast presence to subscribed entities. 259 948 + """ 260 + stanza["from"] = xs.otherEntity.full() 261 + self.xmlstream.send(stanza) 949 + fromJID = presence.sender 950 + roster = self.roster[fromJID.user] 951 + 952 + for item in roster.itervalues(): 953 + if not item.subscriptionFrom: 954 + continue 955 + 956 + outPresence = clonePresence(presence) 957 + outPresence['to'] = item.jid.full() 958 + 959 + if item.jid.host == self.domain: 960 + # local contact 961 + if item.jid.user in self.presences: 962 + # broadcast to contact's available resources 963 + for itemResource in self.presences[item.jid.user]: 964 + resourceJID = JID(tuple=(item.jid.user, 965 + item.jid.host, 966 + itemResource)) 967 + self.sessionManager.deliverStanza(outPresence, 968 + resourceJID) 969 + else: 970 + # remote contact 971 + self.send(outPresence) 972 + 973 + 974 + def _on_availableBroadcast(self, presence): 975 + fromJID = presence.sender 976 + user, resource = fromJID.user, fromJID.resource 977 + roster = self.roster[user] 978 + 979 + if user not in self.presences: 980 + # initial presence 981 + self.presences[user] = {} 982 + self.remotePresences[user] = {} 983 + 984 + # send out probes 985 + for item in roster.itervalues(): 986 + if item.subscriptionTo and item.jid.host != self.domain: 987 + self.probe(item.jid, fromJID) 988 + else: 989 + if resource not in self.presences[user]: 990 + # initial presence with another available resource 991 + 992 + # send last known presences from remote contacts 993 + remotePresences = self.remotePresences[user] 994 + for entity, remotePresence in remotePresences.iteritems(): 995 + self.sessionManager.deliverStanza(remotePresence.element, 996 + fromJID) 997 + 998 + # send presence to other resources 999 + self._broadcastToOtherResources(presence) 1000 + 1001 + # Send last known local presences 1002 + if user not in self.presences or resource not in self.presences[user]: 1003 + for item in roster.itervalues(): 1004 + if item.subscriptionTo and \ 1005 + item.jid.host == self.domain and \ 1006 + item.jid.user in self.presences: 1007 + for contactPresence in \ 1008 + self.presences[item.jid.user].itervalues(): 1009 + outPresence = clonePresence(contactPresence) 1010 + outPresence['to'] = fromJID.userhost() 1011 + self.sessionManager.deliverStanza(outPresence, fromJID) 1012 + 1013 + # broadcast presence 1014 + self._broadcastToContacts(presence) 1015 + 1016 + # save presence 1017 + self.presences[user][resource] = presence 1018 + self.sessionManager.sessions[user][resource].presence = presence 1019 + 1020 + 1021 + #def _on_availableDirected(self, presence): 1022 + # pass 1023 + 1024 + 1025 + def _on_availableInbound(self, presence): 1026 + fromJID = presence.sender 1027 + toJID = presence.recipient 1028 + if (toJID.user in self.roster and 1029 + toJID.user in self.presences): 1030 + for resource in self.presences[toJID.user]: 1031 + resourceJID = JID(tuple=(toJID.user, toJID.host, resource)) 1032 + self.sessionManager.deliverStanza(presence.element, resourceJID) 1033 + self.remotePresences[toJID.user][fromJID] = presence 1034 + else: 1035 + # no such user or no available resource, ignore this stanza 1036 + pass 1037 + 1038 + 1039 + def _on_unavailableBroadcast(self, presence): 1040 + fromJID = presence.sender 1041 + user, resource = fromJID.user, fromJID.resource 1042 + 1043 + # broadcast presence 1044 + self._broadcastToContacts(presence) 1045 + 1046 + if user in self.presences: 1047 + # send presence to other resources 1048 + self._broadcastToOtherResources(presence) 1049 + 1050 + # update stored presences 1051 + if resource in self.presences[user]: 1052 + del self.presences[user][resource] 1053 + 1054 + if not self.presences[user]: 1055 + # last resource to become unavailable 1056 + del self.presences[user] 1057 + 1058 + # TODO: save last unavailable presence 1059 + 1060 + 1061 +# def _on_unavailableDirected(self, presence): 1062 +# pass 1063 + 1064 + 1065 +# def _on_unavailableInbound(self, presence): 1066 +# pass 1067 + 1068 + 1069 + def getDirection(self, presence): 1070 + if not presence.recipient: 1071 + if presence.sender.host == self.domain: 1072 + # broadcast presence from local domain 1073 + return 'Broadcast' 1074 + else: 1075 + raise Exception("Unexpected missing to address") 1076 + else: 1077 + if presence.sender.host == self.domain: 1078 + # directed presence from local domain 1079 + return 'Directed' 1080 + elif presence.recipient.host == self.domain: 1081 + # incoming remote presence 1082 + return 'Inbound' 1083 + else: 1084 + raise Exception("Badly routed presence") 1085 + 1086 + def availableReceived(self, presence): 1087 + direction = self.getDirection(presence) 1088 + handler = getattr(self, "_on_available%s" % direction) 1089 + if handler: 1090 + handler(presence) 1091 + presence.handled = True 1092 + else: 1093 + print "Unhandled: %r" % presence.element.toXml() 1094 + 1095 + 1096 + def unavailableReceived(self, presence): 1097 + direction = self.getDirection(presence) 1098 + handler = getattr(self, "_on_unavailable%s" % direction) 1099 + if handler: 1100 + handler(presence) 1101 + presence.handled = True 1102 + else: 1103 + print "Unhandled: %r" % presence.element.toXml() 1104 + 1105 + 1106 + 1107 + def probeReceived(self, presence): 1108 + fromJID = presence.sender 1109 + toJID = presence.recipient 1110 + 1111 + if toJID.user not in self.roster or \ 1112 + fromJID.userhost() not in self.roster[toJID.user] or \ 1113 + not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 1114 + # send unsubscribed 1115 + pass 1116 + elif toJID.user not in self.presences: 1117 + # send last unavailable or nothing 1118 + pass 1119 + else: 1120 + for resourcePresence in self.presences[toJID.user].itervalues(): 1121 + outPresence = clonePresence(resourcePresence) 1122 + outPresence['to'] = fromJID.userhost() 1123 + self.send(outPresence)
Note: See TracChangeset
for help on using the changeset viewer.