source:
ralphm-patches/xmpp_client_service.patch
@
39:d6a0f8cbabf3
Last change on this file since 39:d6a0f8cbabf3 was 39:d6a0f8cbabf3, checked in by Ralph Meijer <ralphm@…>, 11 years ago | |
---|---|
File size: 37.4 KB |
-
new file doc/examples/client_service.tac
diff -r 62f841ed2a99 doc/examples/client_service.tac
- + 1 from twisted.application import service, strports 2 from twisted.internet import defer 3 4 from wokkel import client, xmppim 5 from wokkel.component import InternalComponent, Router 6 from wokkel.generic import FallbackHandler 7 from wokkel.ping import PingHandler 8 from wokkel.xmppim import RosterItem 9 10 from twisted.words.protocols.jabber.jid import internJID as JID 11 12 import socket 13 domain = socket.gethostname() 14 15 RALPHM = JID('ralphm@'+domain) 16 INTOSI = JID('intosi@'+domain) 17 TERMIE = JID('termie@'+domain) 18 19 roster = { 20 'ralphm': { 21 INTOSI: RosterItem(INTOSI, 22 subscriptionTo=True, 23 subscriptionFrom=True, 24 name='Intosi', 25 groups=set(['Friends'])), 26 TERMIE: RosterItem(TERMIE, 27 subscriptionTo=True, 28 subscriptionFrom=True, 29 name='termie'), 30 }, 31 'termie': { 32 RALPHM: RosterItem(RALPHM, 33 subscriptionTo=True, 34 subscriptionFrom=True, 35 name='ralphm'), 36 } 37 } 38 39 accounts = set(roster.keys()) 40 41 42 class StaticRoster(xmppim.RosterServerProtocol): 43 44 def __init__(self, roster): 45 xmppim.RosterServerProtocol.__init__(self) 46 self.roster = roster 47 48 def getRoster(self, entity): 49 return defer.succeed(self.roster[entity.user].values()) 50 51 52 53 application = service.Application("Jabber server") 54 55 router = Router() 56 component = InternalComponent(router, domain) 57 component.setServiceParent(application) 58 59 sessionManager = client.SessionManager(domain, accounts) 60 sessionManager.setHandlerParent(component) 61 62 xmppim.AccountIQHandler(sessionManager).setHandlerParent(component) 63 xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component) 64 xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 65 FallbackHandler().setHandlerParent(component) 66 StaticRoster(roster).setHandlerParent(component) 67 PingHandler().setHandlerParent(component) 68 69 c2sFactory = client.XMPPC2SServerFactory(sessionManager) 70 c2sFactory.logTraffic = True 71 c2sService = strports.service('5224', c2sFactory) 72 c2sService.setServiceParent(application) 73 74 sessionManager.connectionManager = c2sFactory -
wokkel/client.py
diff -r 62f841ed2a99 wokkel/client.py
a b 10 10 that should probably eventually move there. 11 11 """ 12 12 13 import base64 14 13 15 from twisted.application import service 14 from twisted.internet import reactor16 from twisted.internet import defer, reactor 15 17 from twisted.names.srvconnect import SRVConnector 16 from twisted.words.protocols.jabber import client, sasl, xmlstream 18 from twisted.python import log, randbytes 19 from twisted.words.protocols.jabber import client, error, sasl, xmlstream 20 from twisted.words.protocols.jabber.jid import JID, internJID 21 from twisted.words.xish import domish 17 22 18 23 from wokkel import generic 19 from wokkel.subprotocols import StreamManager 24 from wokkel.compat import XmlStreamServerFactory 25 from wokkel.subprotocols import StreamManager, XMPPHandler 26 27 NS_CLIENT = 'jabber:client' 28 29 XPATH_ALL = "/*" 30 XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL 31 XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND 32 XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \ 33 client.NS_XMPP_SESSION 20 34 21 35 class CheckAuthInitializer(object): 22 36 """ … … 51 65 autentication. 52 66 """ 53 67 54 namespace = 'jabber:client'68 namespace = NS_CLIENT 55 69 56 70 def __init__(self, jid, password): 57 71 xmlstream.ConnectAuthenticator.__init__(self, jid.host) … … 186 200 c = XMPPClientConnector(reactor, domain, factory) 187 201 c.connect() 188 202 return factory.deferred 203 204 205 206 class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator): 207 namespace = NS_CLIENT 208 209 def __init__(self, service): 210 self.service = service 211 self.failureGrace = 3 212 self.state = 'auth' 213 214 215 def associateWithStream(self, xs): 216 xmlstream.ListenAuthenticator.associateWithStream(self, xs) 217 self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1) 218 219 220 def onElementFallback(self, element): 221 if element.handled: 222 return 223 224 exc = error.StreamError('not-authorized') 225 self.xmlstream.sendStreamError(exc) 226 227 228 def streamStarted(self, rootElement): 229 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 230 231 # check namespace 232 #if self.xmlstream.namespace != self.namespace: 233 # self.xmlstream.namespace = self.namespace 234 # exc = error.StreamError('invalid-namespace') 235 # self.xmlstream.sendStreamError(exc) 236 # return 237 238 # TODO: check domain (self.service.domain) 239 240 self.xmlstream.sendHeader() 241 242 try: 243 stateHandlerName = 'streamStarted_' + self.state 244 stateHandler = getattr(self, stateHandlerName) 245 except AttributeError: 246 log.msg('streamStarted handler for', self.state, 'not found') 247 else: 248 stateHandler() 249 250 251 def toState(self, state): 252 self.state = state 253 if state == 'initialized': 254 self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback) 255 self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1) 256 self.xmlstream.dispatch(self.xmlstream, 257 xmlstream.STREAM_AUTHD_EVENT) 258 259 260 def streamStarted_auth(self): 261 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 262 features.addElement((sasl.NS_XMPP_SASL, 'mechanisms')) 263 features.mechanisms.addElement('mechanism', content='PLAIN') 264 self.xmlstream.send(features) 265 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 266 267 268 def onAuth(self, auth): 269 auth.handled = True 270 271 if auth.getAttribute('mechanism') != 'PLAIN': 272 failure = domish.Element((sasl.NS_XMPP_SASL, 'failure')) 273 failure.addElement('invalid-mechanism') 274 self.xmlstream.send(failure) 275 276 # Close stream on too many failing authentication attempts 277 self.failureGrace -= 1 278 if self.failureGrace == 0: 279 self.xmlstream.sendFooter() 280 else: 281 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 282 283 return 284 285 initialResponse = base64.b64decode(unicode(auth)) 286 authzid, authcid, passwd = initialResponse.split('\x00') 287 288 # TODO: check passwd 289 290 # authenticated 291 292 self.username = authcid 293 294 success = domish.Element((sasl.NS_XMPP_SASL, 'success')) 295 self.xmlstream.send(success) 296 self.xmlstream.reset() 297 298 self.toState('bind') 299 300 301 def streamStarted_bind(self): 302 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 303 features.addElement((client.NS_XMPP_BIND, 'bind')) 304 features.addElement((client.NS_XMPP_SESSION, 'session')) 305 self.xmlstream.send(features) 306 self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind) 307 308 309 def onBind(self, iq): 310 def cb(boundJID): 311 self.xmlstream.otherEntity = boundJID 312 self.toState('initialized') 313 314 response = xmlstream.toResponse(iq, 'result') 315 response.addElement((client.NS_XMPP_BIND, 'bind')) 316 response.bind.addElement((client.NS_XMPP_BIND, 'jid'), 317 content=boundJID.full()) 318 319 return response 320 321 def eb(failure): 322 if not isinstance(failure, error.StanzaError): 323 log.msg(failure) 324 exc = error.StanzaError('internal-server-error') 325 else: 326 exc = failure.value 327 328 return exc.toResponse(iq) 329 330 iq.handled = True 331 resource = unicode(iq.bind) or None 332 d = self.service.bindResource(self.username, 333 self.service.domain, 334 resource) 335 d.addCallback(cb) 336 d.addErrback(eb) 337 d.addCallback(self.xmlstream.send) 338 339 340 def onSession(self, iq): 341 iq.handled = True 342 343 reply = domish.Element((None, 'iq')) 344 reply['type'] = 'result' 345 if iq.getAttribute('id'): 346 reply['id'] = iq['id'] 347 reply.addElement((client.NS_XMPP_SESSION, 'session')) 348 self.xmlstream.send(reply) 349 350 351 352 class RecipientUnavailable(Exception): 353 """ 354 The addressed entity is not, or no longer, available. 355 """ 356 357 358 359 class XMPPC2SServerFactory(XmlStreamServerFactory): 360 361 def __init__(self, service): 362 self.service = service 363 364 def authenticatorFactory(): 365 return XMPPClientListenAuthenticator(service) 366 367 XmlStreamServerFactory.__init__(self, authenticatorFactory) 368 self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 369 self.onConnectionMade) 370 self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 371 self.onAuthenticated) 372 373 self.serial = 0 374 self.streams = {} 375 376 377 def onConnectionMade(self, xs): 378 """ 379 Called when a client-to-server connection was made. 380 381 This enables traffic debugging on incoming streams. 382 """ 383 xs.serial = self.serial 384 self.serial += 1 385 386 log.msg("Client connection %d made" % xs.serial) 387 388 def logDataIn(buf): 389 log.msg("RECV (%d): %r" % (xs.serial, buf)) 390 391 def logDataOut(buf): 392 log.msg("SEND (%d): %r" % (xs.serial, buf)) 393 394 if self.logTraffic: 395 xs.rawDataInFn = logDataIn 396 xs.rawDataOutFn = logDataOut 397 398 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 399 400 401 def onAuthenticated(self, xs): 402 log.msg("Client connection %d authenticated" % xs.serial) 403 404 xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 405 0, xs) 406 xs.addObserver('/*', self.onElement, 0, xs) 407 408 # Record this stream as bound to the authenticated JID 409 self.streams[xs.otherEntity] = xs 410 411 412 def onConnectionLost(self, xs, reason): 413 log.msg("Client connection %d disconnected" % xs.serial) 414 415 entity = xs.otherEntity 416 self.service.unbindResource(entity.user, 417 entity.host, 418 entity.resource, 419 reason) 420 421 # If the lost connections had been bound, remove the reference 422 if xs.otherEntity in self.streams: 423 del self.streams[xs.otherEntity] 424 425 426 def onError(self, reason): 427 log.err(reason, "Stream Error") 428 429 430 def onElement(self, xs, stanza): 431 """ 432 Called when an element was received from one of the connected streams. 433 434 """ 435 if stanza.handled: 436 return 437 else: 438 self.service.onElement(stanza, xs.otherEntity) 439 440 441 def deliverStanza(self, element, recipient): 442 if recipient in self.streams: 443 self.streams[recipient].send(element) 444 else: 445 raise RecipientUnavailable(u"There is no connection for %s" % 446 recipient.full()) 447 448 449 class Session(object): 450 def __init__(self, entity): 451 self.entity = entity 452 self.connected = False 453 self.interested = False 454 self.presence = None 455 456 457 458 class SessionManager(XMPPHandler): 459 460 """ 461 Session Manager. 462 463 @ivar xmlstream: XML Stream to inject incoming stanzas from client 464 connections into. Stanzas where the C{'to'} attribute is not set 465 or is directed at the local domain are injected as if received on 466 the XML Stream (using C{dispatch}), other stanzas are injected as if 467 they were sent from the XML Stream (using C{send}). 468 """ 469 470 def __init__(self, domain, accounts): 471 XMPPHandler.__init__(self) 472 self.domain = domain 473 self.accounts = accounts 474 475 self.connectionManager = None 476 self.sessions = {} 477 478 479 def bindResource(self, localpart, domain, resource): 480 if domain != self.domain: 481 raise Exception("I don't host this domain!") 482 483 try: 484 userSessions = self.sessions[localpart] 485 except KeyError: 486 userSessions = self.sessions[localpart] = {} 487 488 if resource is None: 489 resource = randbytes.secureRandom(8).encode('hex') 490 elif resource in self.userSessions: 491 resource = resource + ' ' + randbytes.secureRandom(8).encode('hex') 492 493 entity = JID(tuple=(localpart, domain, resource)) 494 session = Session(entity) 495 session.connected = True 496 userSessions[resource] = session 497 498 return defer.succeed(entity) 499 500 501 def unbindResource(self, localpart, domain, resource, reason=None): 502 try: 503 session = self.sessions[localpart][resource] 504 except KeyError: 505 pass 506 else: 507 session.connected = False 508 del self.sessions[localpart][resource] 509 if not self.sessions[localpart]: 510 del self.sessions[localpart] 511 512 return defer.succeed(None) 513 514 515 def onElement(self, element, sender): 516 # Make sure each stanza has a sender address 517 if (element.name == 'presence' and 518 element.getAttribute('type') in ('subscribe', 'subscribed', 519 'unsubscribe', 'unsubscribed')): 520 element['from'] = sender.userhost() 521 else: 522 element['from'] = sender.full() 523 524 if (not element.hasAttribute('to') or 525 internJID(element['to']).host == self.domain): 526 # This stanza is for local delivery 527 self.xmlstream.dispatch(element) 528 else: 529 # This stanza is for remote delivery 530 self.xmlstream.send(element) 531 532 533 def deliverStanza(self, element, recipient): 534 if self.connectionManager: 535 self.connectionManager.deliverStanza(element, recipient) 536 else: 537 raise Exception("No connection manager set") -
wokkel/component.py
diff -r 62f841ed2a99 wokkel/component.py
a b 313 313 """ 314 314 destination = JID(stanza['to']) 315 315 316 log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))317 316 318 317 if destination.host in self.routes: 318 msg = "Routing to %s (default route): %r" 319 log.msg("Routing to %s: %r" % (destination.full(), 320 stanza.toXml())) 319 321 self.routes[destination.host].send(stanza) 322 elif None in self.routers: 323 log.msg("Routing to %s (default route): %r" % (destination.full(), 324 stanza.toXml())) 325 self.routes[None].send(stanza) 320 326 else: 321 self.routes[None].send(stanza) 327 log.msg("No route to %s: %r" % (destination.full(), 328 stanza.toXml())) 329 if stanza.getAttribute('type') not in ('result', 'error'): 330 # No route, send back error 331 exc = error.StanzaError('remote-server-timeout') 332 response = exc.toResponse(stanza) 333 self.route(response) 322 334 323 335 324 336 -
wokkel/xmppim.py
diff -r 62f841ed2a99 wokkel/xmppim.py
a b 12 12 All of it should eventually move to Twisted. 13 13 """ 14 14 15 import copy 16 17 from twisted.python import log 15 18 from twisted.words.protocols.jabber import error, xmlstream 16 from twisted.words.protocols.jabber.jid import JID 19 from twisted.words.protocols.jabber.jid import JID, internJID 17 20 from twisted.words.xish import domish 18 21 19 22 from wokkel.compat import IQ … … 85 88 handler(presence) 86 89 87 90 def _onPresenceAvailable(self, presence): 88 entity = JID(presence["from"])91 entity = internJID(presence["from"]) 89 92 90 93 show = unicode(presence.show or '') 91 94 if show not in ['away', 'xa', 'chat', 'dnd']: … … 101 104 self.availableReceived(entity, show, statuses, priority) 102 105 103 106 def _onPresenceUnavailable(self, presence): 104 entity = JID(presence["from"])107 entity = internJID(presence["from"]) 105 108 106 109 statuses = self._getStatuses(presence) 107 110 108 111 self.unavailableReceived(entity, statuses) 109 112 110 113 def _onPresenceSubscribed(self, presence): 111 self.subscribedReceived( JID(presence["from"]))114 self.subscribedReceived(internJID(presence["from"])) 112 115 113 116 def _onPresenceUnsubscribed(self, presence): 114 self.unsubscribedReceived( JID(presence["from"]))117 self.unsubscribedReceived(internJID(presence["from"])) 115 118 116 119 def _onPresenceSubscribe(self, presence): 117 self.subscribeReceived( JID(presence["from"]))120 self.subscribeReceived(internJID(presence["from"])) 118 121 119 122 def _onPresenceUnsubscribe(self, presence): 120 self.unsubscribeReceived( JID(presence["from"]))123 self.unsubscribeReceived(internJID(presence["from"])) 121 124 122 125 123 126 def availableReceived(self, entity, show=None, statuses=None, priority=0): … … 125 128 Available presence was received. 126 129 127 130 @param entity: entity from which the presence was received. 128 @type entity: {JID}131 @type entity: L{JID} 129 132 @param show: detailed presence information. One of C{'away'}, C{'xa'}, 130 133 C{'chat'}, C{'dnd'} or C{None}. 131 134 @type show: C{str} or C{NoneType} … … 143 146 Unavailable presence was received. 144 147 145 148 @param entity: entity from which the presence was received. 146 @type entity: {JID}149 @type entity: L{JID} 147 150 @param statuses: dictionary of natural language descriptions of the 148 151 availability status, keyed by the language 149 152 descriptor. A status without a language … … 156 159 Subscription approval confirmation was received. 157 160 158 161 @param entity: entity from which the confirmation was received. 159 @type entity: {JID}162 @type entity: L{JID} 160 163 """ 161 164 162 165 def unsubscribedReceived(self, entity): … … 164 167 Unsubscription confirmation was received. 165 168 166 169 @param entity: entity from which the confirmation was received. 167 @type entity: {JID}170 @type entity: L{JID} 168 171 """ 169 172 170 173 def subscribeReceived(self, entity): … … 172 175 Subscription request was received. 173 176 174 177 @param entity: entity from which the request was received. 175 @type entity: {JID}178 @type entity: L{JID} 176 179 """ 177 180 178 181 def unsubscribeReceived(self, entity): … … 180 183 Unsubscription request was received. 181 184 182 185 @param entity: entity from which the request was received. 183 @type entity: {JID}186 @type entity: L{JID} 184 187 """ 185 188 186 189 def available(self, entity=None, show=None, statuses=None, priority=0): … … 188 191 Send available presence. 189 192 190 193 @param entity: optional entity to which the presence should be sent. 191 @type entity: {JID}194 @type entity: L{JID} 192 195 @param show: optional detailed presence information. One of C{'away'}, 193 196 C{'xa'}, C{'chat'}, C{'dnd'}. 194 197 @type show: C{str} … … 207 210 Send unavailable presence. 208 211 209 212 @param entity: optional entity to which the presence should be sent. 210 @type entity: {JID}213 @type entity: L{JID} 211 214 @param statuses: dictionary of natural language descriptions of the 212 215 availability status, keyed by the language 213 216 descriptor. A status without a language … … 221 224 Send subscription request 222 225 223 226 @param entity: entity to subscribe to. 224 @type entity: {JID}227 @type entity: L{JID} 225 228 """ 226 229 self.send(Presence(to=entity, type='subscribe')) 227 230 … … 230 233 Send unsubscription request 231 234 232 235 @param entity: entity to unsubscribe from. 233 @type entity: {JID}236 @type entity: L{JID} 234 237 """ 235 238 self.send(Presence(to=entity, type='unsubscribe')) 236 239 … … 239 242 Send subscription confirmation. 240 243 241 244 @param entity: entity that subscribed. 242 @type entity: {JID}245 @type entity: L{JID} 243 246 """ 244 247 self.send(Presence(to=entity, type='subscribed')) 245 248 … … 248 251 Send unsubscription confirmation. 249 252 250 253 @param entity: entity that unsubscribed. 251 @type entity: {JID}254 @type entity: L{JID} 252 255 """ 253 256 self.send(Presence(to=entity, type='unsubscribed')) 254 257 … … 478 481 479 482 @param recipient: Optional Recipient to which the presence should be 480 483 sent. 481 @type recipient: {JID}484 @type recipient: L{JID} 482 485 483 486 @param show: Optional detailed presence information. One of C{'away'}, 484 487 C{'xa'}, C{'chat'}, C{'dnd'}. … … 503 506 Send unavailable presence. 504 507 505 508 @param recipient: Optional entity to which the presence should be sent. 506 @type recipient: {JID}509 @type recipient: L{JID} 507 510 508 511 @param statuses: dictionary of natural language descriptions of the 509 512 availability status, keyed by the language descriptor. A status … … 520 523 Send subscription request 521 524 522 525 @param recipient: Entity to subscribe to. 523 @type recipient: {JID}526 @type recipient: L{JID} 524 527 """ 525 528 presence = SubscriptionPresence(recipient=recipient, sender=sender) 526 529 presence.stanzaType = 'subscribe' … … 532 535 Send unsubscription request 533 536 534 537 @param recipient: Entity to unsubscribe from. 535 @type recipient: {JID}538 @type recipient: L{JID} 536 539 """ 537 540 presence = SubscriptionPresence(recipient=recipient, sender=sender) 538 541 presence.stanzaType = 'unsubscribe' … … 544 547 Send subscription confirmation. 545 548 546 549 @param recipient: Entity that subscribed. 547 @type recipient: {JID}550 @type recipient: L{JID} 548 551 """ 549 552 presence = SubscriptionPresence(recipient=recipient, sender=sender) 550 553 presence.stanzaType = 'subscribed' … … 556 559 Send unsubscription confirmation. 557 560 558 561 @param recipient: Entity that unsubscribed. 559 @type recipient: {JID}562 @type recipient: L{JID} 560 563 """ 561 564 presence = SubscriptionPresence(recipient=recipient, sender=sender) 562 565 presence.stanzaType = 'unsubscribed' … … 568 571 Send presence probe. 569 572 570 573 @param recipient: Entity to be probed. 571 @type recipient: {JID}574 @type recipient: L{JID} 572 575 """ 573 576 presence = ProbePresence(recipient=recipient, sender=sender) 574 577 self.send(presence.toElement()) … … 652 655 653 656 654 657 def _parseRosterItem(self, element): 655 jid = JID(element['jid'])658 jid = internJID(element['jid']) 656 659 item = RosterItem(jid) 657 660 item.name = element.getAttribute('name') 658 661 subscription = element.getAttribute('subscription') … … 715 718 itemElement = iq.query.item 716 719 717 720 if unicode(itemElement['subscription']) == 'remove': 718 self.onRosterRemove( JID(itemElement['jid']))721 self.onRosterRemove(internJID(itemElement['jid'])) 719 722 else: 720 723 item = self._parseRosterItem(iq.query.item) 721 724 self.onRosterSet(item) … … 763 766 def _onRosterGet(self, iq): 764 767 iq.handled = True 765 768 766 d = self.getRoster( JID(iq["from"]))769 d = self.getRoster(internJID(iq["from"])) 767 770 d.addCallback(self._toRosterReply, iq) 768 771 d.addErrback(lambda _: error.ErrorStanza('internal-error').toResponse(iq)) 769 772 d.addBoth(self.send) … … 808 811 """ 809 812 Called when a message stanza was received. 810 813 """ 814 815 816 817 class AccountIQHandler(XMPPHandler): 818 819 def __init__(self, sessionManager): 820 XMPPHandler.__init__(self) 821 self.sessionManager = sessionManager 822 823 824 def connectionMade(self): 825 self.xmlstream.addObserver('/iq', self.onIQ, 1) 826 827 828 def onIQ(self, iq): 829 """ 830 Handler for iq stanzas to user accounts' connected resources. 831 832 If the recipient is a bare JID or there is no associated user, this 833 handler ignores the stanza, so that other handlers have a chance 834 to pick it up. If used, L{generic.FallbackHandler} will respond with a 835 C{'service-unavailable'} stanza error if no other handlers handle 836 the iq. 837 """ 838 839 if iq.handled: 840 return 841 842 try: 843 recipient = internJID(iq['to']) 844 except KeyError: 845 return 846 847 if not recipient.user: 848 # This is not for an account, ignore it 849 return 850 elif recipient.user not in self.sessionManager.accounts: 851 # This is not a user, ignore it 852 return 853 elif not recipient.resource: 854 # Bare JID at local domain, ignore it 855 return 856 elif recipient.user in self.sessionManager.sessions: 857 # Full JID with connected resource, deliver the stanza 858 self.sessionManager.deliverStanza(iq, recipient) 859 else: 860 # Full JID without connected resource, return error 861 exc = error.StanzaError('service-unavailable') 862 if iq['type'] in ('result', 'error'): 863 log.err(exc, 'Could not deliver IQ response') 864 else: 865 self.send(exc.toResponse(iq)) 866 867 iq.handled = True 868 869 870 871 class AccountMessageHandler(XMPPHandler): 872 873 def __init__(self, sessionManager): 874 XMPPHandler.__init__(self) 875 self.sessionManager = sessionManager 876 877 878 def connectionMade(self): 879 self.xmlstream.addObserver('/message', self.onMessage, 1) 880 881 882 def onMessage(self, message): 883 """ 884 Handler for message stanzas to user accounts. 885 """ 886 887 if message.handled: 888 return 889 890 try: 891 recipient = internJID(message['to']) 892 except KeyError: 893 return 894 895 stanzaType = message.getAttribute('type', 'normal') 896 897 try: 898 if not recipient.user: 899 # This is not for an account, ignore it 900 return 901 elif recipient.user not in self.sessionManager.accounts: 902 # This is not a user, ignore it 903 return 904 elif recipient.resource: 905 userSessions = self.sessionManager.sessions.get(recipient.user, 906 {}) 907 if recipient.resource in userSessions: 908 self.sessionManager.deliverStanza(message, recipient) 909 else: 910 if stanzaType in ('normal', 'chat', 'headline'): 911 self.onMessageBareJID(message, recipient.userhostJID()) 912 elif stanzaType == 'error': 913 log.msg("Dropping message to unconnected resource %r" % 914 recipient.full()) 915 elif stanzaType == 'groupchat': 916 raise error.StanzaError('service-unavailable') 917 else: 918 self.onMessageBareJID(message, recipient) 919 except error.StanzaError, exc: 920 if stanzaType == 'error': 921 log.err(exc, "Undeliverable error") 922 else: 923 self.send(exc.toResponse(message)) 924 925 message.handled = True 926 927 928 def onMessageBareJID(self, message, bareJID): 929 stanzaType = message.getAttribute('type', 'normal') 930 931 userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 932 print userSessions 933 934 recipients = set() 935 936 if stanzaType == 'headline': 937 for session in userSessions: 938 if session.presence.priority >= 0: 939 recipients.add(session.entity) 940 elif stanzaType in ('chat', 'normal'): 941 priorities = {} 942 for session in userSessions.itervalues(): 943 if not session.presence or not session.presence.available: 944 continue 945 priority = session.presence.priority 946 if priority >= 0: 947 priorities.setdefault(priority, set()).add(session.entity) 948 maxPriority = max(priorities.keys()) 949 recipients.update(priorities[maxPriority]) 950 elif stanzaType == 'groupchat': 951 raise error.StanzaError('service-unavailable') 952 953 if recipients: 954 for recipient in recipients: 955 self.sessionManager.deliverStanza(message, recipient) 956 elif stanzaType in ('chat', 'normal'): 957 raise error.StanzaError('service-unavailable') 958 else: 959 # silently discard 960 log.msg("Discarding message to %r" % message['to']) 961 962 963 964 965 def clonePresence(presence): 966 """ 967 Make a deep copy of a presence stanza. 968 969 The returned presence stanza is an orphaned deep copy of the given 970 original. 971 972 @note: Since the reference to the original parent, if any, is gone, 973 inherited attributes like C{xml:lang} are not preserved. 974 """ 975 element = presence.element 976 977 parent = element.parent 978 element.parent = None 979 newElement = copy.deepcopy(element) 980 element.parent = parent 981 return newElement 982 983 984 985 class PresenceServerHandler(PresenceProtocol): 986 987 def __init__(self, sessionManager, domain, roster): 988 PresenceProtocol.__init__(self) 989 self.sessionManager = sessionManager 990 self.domain = domain 991 self.roster = roster 992 self.presences = {} # user -> resource -> presence 993 self.offlinePresences = {} # user -> presence 994 self.remotePresences = {} # user -> remote entity -> presence 995 996 997 def _broadcastToOtherResources(self, presence): 998 """ 999 Broadcast presence to other available resources. 1000 """ 1001 fromJID = presence.sender 1002 for otherResource in self.presences[fromJID.user]: 1003 if otherResource == fromJID.resource: 1004 continue 1005 1006 resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource)) 1007 outPresence = clonePresence(presence) 1008 outPresence['to'] = resourceJID.full() 1009 self.sessionManager.deliverStanza(outPresence, resourceJID) 1010 1011 1012 def _broadcastToContacts(self, presence): 1013 """ 1014 Broadcast presence to subscribed entities. 1015 """ 1016 fromJID = presence.sender 1017 roster = self.roster[fromJID.user] 1018 1019 for item in roster.itervalues(): 1020 if not item.subscriptionFrom: 1021 continue 1022 1023 outPresence = clonePresence(presence) 1024 outPresence['to'] = item.jid.full() 1025 1026 if item.jid.host == self.domain: 1027 # local contact 1028 if item.jid.user in self.presences: 1029 # broadcast to contact's available resources 1030 for itemResource in self.presences[item.jid.user]: 1031 resourceJID = JID(tuple=(item.jid.user, 1032 item.jid.host, 1033 itemResource)) 1034 self.sessionManager.deliverStanza(outPresence, 1035 resourceJID) 1036 else: 1037 # remote contact 1038 self.send(outPresence) 1039 1040 1041 def _on_availableBroadcast(self, presence): 1042 fromJID = presence.sender 1043 user, resource = fromJID.user, fromJID.resource 1044 roster = self.roster[user] 1045 1046 if user not in self.presences: 1047 # initial presence 1048 self.presences[user] = {} 1049 self.remotePresences[user] = {} 1050 1051 # send out probes 1052 for item in roster.itervalues(): 1053 if item.subscriptionTo and item.jid.host != self.domain: 1054 self.probe(item.jid, fromJID) 1055 else: 1056 if resource not in self.presences[user]: 1057 # initial presence with another available resource 1058 1059 # send last known presences from remote contacts 1060 remotePresences = self.remotePresences[user] 1061 for entity, remotePresence in remotePresences.iteritems(): 1062 self.sessionManager.deliverStanza(remotePresence.element, 1063 fromJID) 1064 1065 # send presence to other resources 1066 self._broadcastToOtherResources(presence) 1067 1068 # Send last known local presences 1069 if user not in self.presences or resource not in self.presences[user]: 1070 for item in roster.itervalues(): 1071 if item.subscriptionTo and \ 1072 item.jid.host == self.domain and \ 1073 item.jid.user in self.presences: 1074 for contactPresence in \ 1075 self.presences[item.jid.user].itervalues(): 1076 outPresence = clonePresence(contactPresence) 1077 outPresence['to'] = fromJID.userhost() 1078 self.sessionManager.deliverStanza(outPresence, fromJID) 1079 1080 # broadcast presence 1081 self._broadcastToContacts(presence) 1082 1083 # save presence 1084 self.presences[user][resource] = presence 1085 self.sessionManager.sessions[user][resource].presence = presence 1086 1087 1088 #def _on_availableDirected(self, presence): 1089 # pass 1090 1091 1092 def _on_availableInbound(self, presence): 1093 fromJID = presence.sender 1094 toJID = presence.recipient 1095 if (toJID.user in self.roster and 1096 toJID.user in self.presences): 1097 for resource in self.presences[toJID.user]: 1098 resourceJID = JID(tuple=(toJID.user, toJID.host, resource)) 1099 self.sessionManager.deliverStanza(presence.element, resourceJID) 1100 self.remotePresences[toJID.user][fromJID] = presence 1101 else: 1102 # no such user or no available resource, ignore this stanza 1103 pass 1104 1105 1106 def _on_unavailableBroadcast(self, presence): 1107 fromJID = presence.sender 1108 user, resource = fromJID.user, fromJID.resource 1109 1110 # broadcast presence 1111 self._broadcastToContacts(presence) 1112 1113 if user in self.presences: 1114 # send presence to other resources 1115 self._broadcastToOtherResources(presence) 1116 1117 # update stored presences 1118 if resource in self.presences[user]: 1119 del self.presences[user][resource] 1120 1121 if not self.presences[user]: 1122 # last resource to become unavailable 1123 del self.presences[user] 1124 1125 # TODO: save last unavailable presence 1126 1127 1128 # def _on_unavailableDirected(self, presence): 1129 # pass 1130 1131 1132 # def _on_unavailableInbound(self, presence): 1133 # pass 1134 1135 1136 def getDirection(self, presence): 1137 if not presence.recipient: 1138 if presence.sender.host == self.domain: 1139 # broadcast presence from local domain 1140 return 'Broadcast' 1141 else: 1142 raise Exception("Unexpected missing to address") 1143 else: 1144 if presence.sender.host == self.domain: 1145 # directed presence from local domain 1146 return 'Directed' 1147 elif presence.recipient.host == self.domain: 1148 # incoming remote presence 1149 return 'Inbound' 1150 else: 1151 raise Exception("Badly routed presence") 1152 1153 def availableReceived(self, presence): 1154 direction = self.getDirection(presence) 1155 handler = getattr(self, "_on_available%s" % direction) 1156 if handler: 1157 handler(presence) 1158 presence.handled = True 1159 else: 1160 print "Unhandled: %r" % presence.element.toXml() 1161 1162 1163 def unavailableReceived(self, presence): 1164 direction = self.getDirection(presence) 1165 handler = getattr(self, "_on_unavailable%s" % direction) 1166 if handler: 1167 handler(presence) 1168 presence.handled = True 1169 else: 1170 print "Unhandled: %r" % presence.element.toXml() 1171 1172 1173 1174 def probeReceived(self, presence): 1175 fromJID = presence.sender 1176 toJID = presence.recipient 1177 1178 if toJID.user not in self.roster or \ 1179 fromJID.userhost() not in self.roster[toJID.user] or \ 1180 not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 1181 # send unsubscribed 1182 pass 1183 elif toJID.user not in self.presences: 1184 # send last unavailable or nothing 1185 pass 1186 else: 1187 for resourcePresence in self.presences[toJID.user].itervalues(): 1188 outPresence = clonePresence(resourcePresence) 1189 outPresence['to'] = fromJID.userhost() 1190 self.send(outPresence)
Note: See TracBrowser
for help on using the repository browser.