source:
ralphm-patches/xmpp_client_service.patch
@
51:6edeb69e910c
Last change on this file since 51:6edeb69e910c was 51:6edeb69e910c, checked in by Ralph Meijer <ralphm@…>, 11 years ago | |
---|---|
File size: 32.2 KB |
-
new file doc/examples/client_service.tac
diff -r d7fa09914b70 doc/examples/client_service.tac
- + 1 from twisted.application import service, strports 2 from twisted.internet import defer 3 4 from wokkel import client, xmppim 5 from wokkel.component import InternalComponent, Router 6 from wokkel.generic import FallbackHandler 7 from wokkel.ping import PingHandler 8 from wokkel.xmppim import RosterItem 9 10 from twisted.words.protocols.jabber.jid import internJID as JID 11 12 import socket 13 domain = socket.gethostname() 14 15 RALPHM = JID('ralphm@'+domain) 16 INTOSI = JID('intosi@'+domain) 17 TERMIE = JID('termie@'+domain) 18 19 roster = { 20 'ralphm': { 21 INTOSI: RosterItem(INTOSI, 22 subscriptionTo=True, 23 subscriptionFrom=True, 24 name='Intosi', 25 groups=set(['Friends'])), 26 TERMIE: RosterItem(TERMIE, 27 subscriptionTo=True, 28 subscriptionFrom=True, 29 name='termie'), 30 }, 31 'termie': { 32 RALPHM: RosterItem(RALPHM, 33 subscriptionTo=True, 34 subscriptionFrom=True, 35 name='ralphm'), 36 } 37 } 38 39 accounts = set(roster.keys()) 40 41 42 class StaticRoster(xmppim.RosterServerProtocol): 43 44 def __init__(self, roster): 45 xmppim.RosterServerProtocol.__init__(self) 46 self.roster = roster 47 48 def getRoster(self, entity): 49 return defer.succeed(self.roster[entity.user].values()) 50 51 52 53 application = service.Application("Jabber server") 54 55 router = Router() 56 component = InternalComponent(router, domain) 57 component.setServiceParent(application) 58 59 sessionManager = client.SessionManager(domain, accounts) 60 sessionManager.setHandlerParent(component) 61 62 xmppim.AccountIQHandler(sessionManager).setHandlerParent(component) 63 xmppim.AccountMessageHandler(sessionManager).setHandlerParent(component) 64 xmppim.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 65 FallbackHandler().setHandlerParent(component) 66 StaticRoster(roster).setHandlerParent(component) 67 PingHandler().setHandlerParent(component) 68 69 c2sFactory = client.XMPPC2SServerFactory(sessionManager) 70 c2sFactory.logTraffic = True 71 c2sService = strports.service('5224', c2sFactory) 72 c2sService.setServiceParent(application) 73 74 sessionManager.connectionManager = c2sFactory -
wokkel/client.py
diff -r d7fa09914b70 wokkel/client.py
a b 10 10 that should probably eventually move there. 11 11 """ 12 12 13 import base64 14 13 15 from twisted.application import service 14 from twisted.internet import reactor16 from twisted.internet import defer, reactor 15 17 from twisted.names.srvconnect import SRVConnector 16 from twisted.words.protocols.jabber import client, sasl, xmlstream 18 from twisted.python import log, randbytes 19 from twisted.words.protocols.jabber import client, error, sasl, xmlstream 20 from twisted.words.protocols.jabber.jid import JID, internJID 21 from twisted.words.xish import domish, utility 17 22 18 23 from wokkel import generic 19 from wokkel.subprotocols import StreamManager 24 from wokkel.compat import XmlStreamServerFactory 25 from wokkel.subprotocols import StreamManager, XMPPHandler 26 27 NS_CLIENT = 'jabber:client' 28 29 XPATH_ALL = "/*" 30 XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL 31 XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND 32 XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \ 33 client.NS_XMPP_SESSION 20 34 21 35 class CheckAuthInitializer(object): 22 36 """ … … 51 65 autentication. 52 66 """ 53 67 54 namespace = 'jabber:client'68 namespace = NS_CLIENT 55 69 56 70 def __init__(self, jid, password): 57 71 xmlstream.ConnectAuthenticator.__init__(self, jid.host) … … 186 200 c = XMPPClientConnector(reactor, domain, factory) 187 201 c.connect() 188 202 return factory.deferred 203 204 205 206 class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator): 207 namespace = NS_CLIENT 208 209 def __init__(self, service): 210 self.service = service 211 self.failureGrace = 3 212 self.state = 'auth' 213 214 215 def associateWithStream(self, xs): 216 xmlstream.ListenAuthenticator.associateWithStream(self, xs) 217 self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1) 218 219 220 def onElementFallback(self, element): 221 if element.handled: 222 return 223 224 exc = error.StreamError('not-authorized') 225 self.xmlstream.sendStreamError(exc) 226 227 228 def streamStarted(self, rootElement): 229 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 230 231 # check namespace 232 #if self.xmlstream.namespace != self.namespace: 233 # self.xmlstream.namespace = self.namespace 234 # exc = error.StreamError('invalid-namespace') 235 # self.xmlstream.sendStreamError(exc) 236 # return 237 238 # TODO: check domain (self.service.domain) 239 240 self.xmlstream.sendHeader() 241 242 try: 243 stateHandlerName = 'streamStarted_' + self.state 244 stateHandler = getattr(self, stateHandlerName) 245 except AttributeError: 246 log.msg('streamStarted handler for', self.state, 'not found') 247 else: 248 stateHandler() 249 250 251 def toState(self, state): 252 self.state = state 253 if state == 'initialized': 254 self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback) 255 self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1) 256 self.xmlstream.dispatch(self.xmlstream, 257 xmlstream.STREAM_AUTHD_EVENT) 258 259 260 def streamStarted_auth(self): 261 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 262 features.addElement((sasl.NS_XMPP_SASL, 'mechanisms')) 263 features.mechanisms.addElement('mechanism', content='PLAIN') 264 self.xmlstream.send(features) 265 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 266 267 268 def onAuth(self, auth): 269 auth.handled = True 270 271 if auth.getAttribute('mechanism') != 'PLAIN': 272 failure = domish.Element((sasl.NS_XMPP_SASL, 'failure')) 273 failure.addElement('invalid-mechanism') 274 self.xmlstream.send(failure) 275 276 # Close stream on too many failing authentication attempts 277 self.failureGrace -= 1 278 if self.failureGrace == 0: 279 self.xmlstream.sendFooter() 280 else: 281 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 282 283 return 284 285 initialResponse = base64.b64decode(unicode(auth)) 286 authzid, authcid, passwd = initialResponse.split('\x00') 287 288 # TODO: check passwd 289 290 # authenticated 291 292 self.username = authcid 293 294 success = domish.Element((sasl.NS_XMPP_SASL, 'success')) 295 self.xmlstream.send(success) 296 self.xmlstream.reset() 297 298 self.toState('bind') 299 300 301 def streamStarted_bind(self): 302 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 303 features.addElement((client.NS_XMPP_BIND, 'bind')) 304 features.addElement((client.NS_XMPP_SESSION, 'session')) 305 self.xmlstream.send(features) 306 self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind) 307 308 309 def onBind(self, iq): 310 def cb(boundJID): 311 self.xmlstream.otherEntity = boundJID 312 self.toState('initialized') 313 314 response = xmlstream.toResponse(iq, 'result') 315 response.addElement((client.NS_XMPP_BIND, 'bind')) 316 response.bind.addElement((client.NS_XMPP_BIND, 'jid'), 317 content=boundJID.full()) 318 319 return response 320 321 def eb(failure): 322 if not isinstance(failure, error.StanzaError): 323 log.msg(failure) 324 exc = error.StanzaError('internal-server-error') 325 else: 326 exc = failure.value 327 328 return exc.toResponse(iq) 329 330 iq.handled = True 331 resource = unicode(iq.bind) or None 332 d = self.service.bindResource(self.username, 333 self.service.domain, 334 resource) 335 d.addCallback(cb) 336 d.addErrback(eb) 337 d.addCallback(self.xmlstream.send) 338 339 340 def onSession(self, iq): 341 iq.handled = True 342 343 reply = domish.Element((None, 'iq')) 344 reply['type'] = 'result' 345 if iq.getAttribute('id'): 346 reply['id'] = iq['id'] 347 reply.addElement((client.NS_XMPP_SESSION, 'session')) 348 self.xmlstream.send(reply) 349 350 351 352 class RecipientUnavailable(Exception): 353 """ 354 The addressed entity is not, or no longer, available. 355 """ 356 357 358 359 class XMPPC2SServerFactory(XmlStreamServerFactory): 360 361 def __init__(self, service): 362 self.service = service 363 364 def authenticatorFactory(): 365 return XMPPClientListenAuthenticator(service) 366 367 XmlStreamServerFactory.__init__(self, authenticatorFactory) 368 self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 369 self.onConnectionMade) 370 self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 371 self.onAuthenticated) 372 373 self.serial = 0 374 self.streams = {} 375 376 377 def onConnectionMade(self, xs): 378 """ 379 Called when a client-to-server connection was made. 380 381 This enables traffic debugging on incoming streams. 382 """ 383 xs.serial = self.serial 384 self.serial += 1 385 386 log.msg("Client connection %d made" % xs.serial) 387 388 def logDataIn(buf): 389 log.msg("RECV (%d): %r" % (xs.serial, buf)) 390 391 def logDataOut(buf): 392 log.msg("SEND (%d): %r" % (xs.serial, buf)) 393 394 if self.logTraffic: 395 xs.rawDataInFn = logDataIn 396 xs.rawDataOutFn = logDataOut 397 398 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 399 400 401 def onAuthenticated(self, xs): 402 log.msg("Client connection %d authenticated" % xs.serial) 403 404 xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 405 0, xs) 406 xs.addObserver('/*', self.onElement, 0, xs) 407 408 # Record this stream as bound to the authenticated JID 409 self.streams[xs.otherEntity] = xs 410 411 412 def onConnectionLost(self, xs, reason): 413 log.msg("Client connection %d disconnected" % xs.serial) 414 415 entity = xs.otherEntity 416 self.service.unbindResource(entity.user, 417 entity.host, 418 entity.resource, 419 reason) 420 421 # If the lost connections had been bound, remove the reference 422 if xs.otherEntity in self.streams: 423 del self.streams[xs.otherEntity] 424 425 426 def onError(self, reason): 427 log.err(reason, "Stream Error") 428 429 430 def onElement(self, xs, stanza): 431 """ 432 Called when an element was received from one of the connected streams. 433 434 """ 435 if stanza.handled: 436 return 437 else: 438 self.service.onElement(stanza, xs.otherEntity) 439 440 441 def deliverStanza(self, element, recipient): 442 if recipient in self.streams: 443 self.streams[recipient].send(element) 444 else: 445 raise RecipientUnavailable(u"There is no connection for %s" % 446 recipient.full()) 447 448 449 class Session(object): 450 def __init__(self, entity): 451 self.entity = entity 452 self.connected = False 453 self.interested = False 454 self.presence = None 455 456 457 458 class SessionManager(XMPPHandler): 459 460 """ 461 Session Manager. 462 463 @ivar xmlstream: XML Stream to inject incoming stanzas from client 464 connections into. Stanzas where the C{'to'} attribute is not set 465 or is directed at the local domain are injected as if received on 466 the XML Stream (using C{dispatch}), other stanzas are injected as if 467 they were sent from the XML Stream (using C{send}). 468 """ 469 470 def __init__(self, domain, accounts): 471 XMPPHandler.__init__(self) 472 self.domain = domain 473 self.accounts = accounts 474 475 self.connectionManager = None 476 self.sessions = {} 477 self.clientStream = utility.EventDispatcher() 478 self.clientStream.addObserver('/*', self.routeOrDeliver, -1) 479 480 481 def bindResource(self, localpart, domain, resource): 482 if domain != self.domain: 483 raise Exception("I don't host this domain!") 484 485 try: 486 userSessions = self.sessions[localpart] 487 except KeyError: 488 userSessions = self.sessions[localpart] = {} 489 490 if resource is None: 491 resource = randbytes.secureRandom(8).encode('hex') 492 elif resource in self.userSessions: 493 resource = resource + ' ' + randbytes.secureRandom(8).encode('hex') 494 495 entity = JID(tuple=(localpart, domain, resource)) 496 session = Session(entity) 497 session.connected = True 498 userSessions[resource] = session 499 500 return defer.succeed(entity) 501 502 503 def unbindResource(self, localpart, domain, resource, reason=None): 504 try: 505 session = self.sessions[localpart][resource] 506 except KeyError: 507 pass 508 else: 509 session.connected = False 510 del self.sessions[localpart][resource] 511 if not self.sessions[localpart]: 512 del self.sessions[localpart] 513 514 return defer.succeed(None) 515 516 517 def onElement(self, element, sender): 518 # Make sure each stanza has a sender address 519 if (element.name == 'presence' and 520 element.getAttribute('type') in ('subscribe', 'subscribed', 521 'unsubscribe', 'unsubscribed')): 522 element['from'] = sender.userhost() 523 else: 524 element['from'] = sender.full() 525 526 self.clientStream.dispatch(element) 527 528 529 def routeOrDeliver(self, element): 530 if element.handled: 531 return 532 533 if (not element.hasAttribute('to') or 534 internJID(element['to']).host == self.domain): 535 # This stanza is for local delivery 536 log.msg("Delivering locally: %r" % element.toXml()) 537 self.xmlstream.dispatch(element) 538 else: 539 # This stanza is for remote routing 540 log.msg("Routing remotely: %r" % element.toXml()) 541 self.xmlstream.send(element) 542 543 544 def deliverStanza(self, element, recipient): 545 if self.connectionManager: 546 self.connectionManager.deliverStanza(element, recipient) 547 else: 548 raise Exception("No connection manager set") -
wokkel/component.py
diff -r d7fa09914b70 wokkel/component.py
a b 313 313 """ 314 314 destination = JID(stanza['to']) 315 315 316 log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))317 316 318 317 if destination.host in self.routes: 318 msg = "Routing to %s (default route): %r" 319 log.msg("Routing to %s: %r" % (destination.full(), 320 stanza.toXml())) 319 321 self.routes[destination.host].send(stanza) 322 elif None in self.routes: 323 log.msg("Routing to %s (default route): %r" % (destination.full(), 324 stanza.toXml())) 325 self.routes[None].send(stanza) 320 326 else: 321 self.routes[None].send(stanza) 327 log.msg("No route to %s: %r" % (destination.full(), 328 stanza.toXml())) 329 if stanza.getAttribute('type') not in ('result', 'error'): 330 # No route, send back error 331 exc = error.StanzaError('remote-server-timeout', type='wait') 332 exc.code = '504' 333 response = exc.toResponse(stanza) 334 self.route(response) 322 335 323 336 324 337 -
wokkel/im.py
diff -r d7fa09914b70 wokkel/im.py
a b 170 170 self.xmlstream.addObserver("/presence", self._onPresence) 171 171 172 172 173 def _onPresence(self, element): 173 def parsePresence(self, element): 174 """ 175 Parse presence. 176 """ 174 177 stanza = Stanza.fromElement(element) 175 178 176 179 presenceType = stanza.stanzaType or 'available' … … 180 183 except KeyError: 181 184 return 182 185 183 presence = parser.fromElement(element) 186 return parser.fromElement(element) 187 188 189 def _onPresence(self, element): 190 presence = self.parsePresence(element) 191 presenceType = presence.stanzaType or 'available' 184 192 185 193 try: 186 194 handler = getattr(self, '%sReceived' % presenceType) 187 195 except AttributeError: 188 196 return 189 197 else: 190 handler(presence)198 element.handled = handler(presence) 191 199 192 200 193 201 def errorReceived(self, presence): … … 583 591 584 592 585 593 594 class AccountIQHandler(XMPPHandler): 595 596 def __init__(self, sessionManager): 597 XMPPHandler.__init__(self) 598 self.sessionManager = sessionManager 599 600 601 def connectionMade(self): 602 self.xmlstream.addObserver('/iq', self.onIQ, 1) 603 604 605 def onIQ(self, iq): 606 """ 607 Handler for iq stanzas to user accounts' connected resources. 608 609 If the recipient is a bare JID or there is no associated user, this 610 handler ignores the stanza, so that other handlers have a chance 611 to pick it up. If used, L{generic.FallbackHandler} will respond with a 612 C{'service-unavailable'} stanza error if no other handlers handle 613 the iq. 614 """ 615 616 if iq.handled: 617 return 618 619 try: 620 recipient = internJID(iq['to']) 621 except KeyError: 622 return 623 624 if not recipient.user: 625 # This is not for an account, ignore it 626 return 627 elif recipient.user not in self.sessionManager.accounts: 628 # This is not a user, ignore it 629 return 630 elif not recipient.resource: 631 # Bare JID at local domain, ignore it 632 return 633 634 userSessions = self.sessionManager.sessions.get(recipient.user, 635 {}) 636 if recipient.resource in userSessions: 637 self.sessionManager.deliverStanza(iq, recipient) 638 else: 639 # Full JID without connected resource, return error 640 exc = error.StanzaError('service-unavailable') 641 if iq['type'] in ('result', 'error'): 642 log.err(exc, 'Could not deliver IQ response') 643 else: 644 self.send(exc.toResponse(iq)) 645 646 iq.handled = True 647 648 649 650 class AccountMessageHandler(XMPPHandler): 651 652 def __init__(self, sessionManager): 653 XMPPHandler.__init__(self) 654 self.sessionManager = sessionManager 655 656 657 def connectionMade(self): 658 self.xmlstream.addObserver('/message', self.onMessage, 1) 659 660 661 def onMessage(self, message): 662 """ 663 Handler for message stanzas to user accounts. 664 """ 665 666 if message.handled: 667 return 668 669 try: 670 recipient = internJID(message['to']) 671 except KeyError: 672 return 673 674 stanzaType = message.getAttribute('type', 'normal') 675 676 try: 677 if not recipient.user: 678 # This is not for an account, ignore it 679 return 680 elif recipient.user not in self.sessionManager.accounts: 681 # This is not a user, ignore it 682 return 683 elif recipient.resource: 684 userSessions = self.sessionManager.sessions.get(recipient.user, 685 {}) 686 if recipient.resource in userSessions: 687 self.sessionManager.deliverStanza(message, recipient) 688 else: 689 if stanzaType in ('normal', 'chat', 'headline'): 690 self.onMessageBareJID(message, recipient.userhostJID()) 691 elif stanzaType == 'error': 692 log.msg("Dropping message to unconnected resource %r" % 693 recipient.full()) 694 elif stanzaType == 'groupchat': 695 raise error.StanzaError('service-unavailable') 696 else: 697 self.onMessageBareJID(message, recipient) 698 except error.StanzaError, exc: 699 if stanzaType == 'error': 700 log.err(exc, "Undeliverable error") 701 else: 702 self.send(exc.toResponse(message)) 703 704 message.handled = True 705 706 707 def onMessageBareJID(self, message, bareJID): 708 stanzaType = message.getAttribute('type', 'normal') 709 710 userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 711 712 recipients = set() 713 714 if stanzaType == 'headline': 715 for session in userSessions: 716 if session.presence.priority >= 0: 717 recipients.add(session.entity) 718 elif stanzaType in ('chat', 'normal'): 719 priorities = {} 720 for session in userSessions.itervalues(): 721 if not session.presence or not session.presence.available: 722 continue 723 priority = session.presence.priority 724 if priority >= 0: 725 priorities.setdefault(priority, set()).add(session.entity) 726 maxPriority = max(priorities.keys()) 727 recipients.update(priorities[maxPriority]) 728 elif stanzaType == 'groupchat': 729 raise error.StanzaError('service-unavailable') 730 731 if recipients: 732 for recipient in recipients: 733 self.sessionManager.deliverStanza(message, recipient) 734 elif stanzaType in ('chat', 'normal'): 735 raise error.StanzaError('service-unavailable') 736 else: 737 # silently discard 738 log.msg("Discarding message to %r" % message['to']) 739 740 741 742 743 def clonePresence(presence): 744 """ 745 Make a deep copy of a presence stanza. 746 747 The returned presence stanza is an orphaned deep copy of the given 748 original. 749 750 @note: Since the reference to the original parent, if any, is gone, 751 inherited attributes like C{xml:lang} are not preserved. 752 """ 753 element = presence.element 754 755 parent = element.parent 756 element.parent = None 757 newElement = copy.deepcopy(element) 758 element.parent = parent 759 return newElement 760 761 762 763 class PresenceServerHandler(PresenceProtocol): 764 765 def __init__(self, sessionManager, domain, roster): 766 PresenceProtocol.__init__(self) 767 self.sessionManager = sessionManager 768 self.domain = domain 769 self.roster = roster 770 self.presences = {} # user -> resource -> presence 771 self.offlinePresences = {} # user -> presence 772 self.remotePresences = {} # user -> remote entity -> presence 773 774 self.sessionManager.clientStream.addObserver('/presence', 775 self._onPresenceOutbound) 776 777 778 def _onPresenceOutbound(self, element): 779 log.msg("Got outbound presence: %r" % element.toXml()) 780 presence = self.parsePresence(element) 781 782 presenceType = presence.stanzaType or 'available' 783 method = '%sReceivedOutbound' % presenceType 784 print method 785 786 try: 787 handler = getattr(self, method) 788 except AttributeError: 789 return 790 else: 791 element.handled = handler(presence) 792 793 794 def _broadcastToOtherResources(self, presence): 795 """ 796 Broadcast presence to other available resources. 797 """ 798 fromJID = presence.sender 799 for otherResource in self.presences[fromJID.user]: 800 if otherResource == fromJID.resource: 801 continue 802 803 resourceJID = JID(tuple=(fromJID.user, fromJID.host, otherResource)) 804 outPresence = clonePresence(presence) 805 outPresence['to'] = resourceJID.full() 806 self.sessionManager.deliverStanza(outPresence, resourceJID) 807 808 809 def _broadcastToContacts(self, presence): 810 """ 811 Broadcast presence to subscribed entities. 812 """ 813 fromJID = presence.sender 814 roster = self.roster[fromJID.user] 815 816 for item in roster.itervalues(): 817 if not item.subscriptionFrom: 818 continue 819 820 outPresence = clonePresence(presence) 821 outPresence['to'] = item.jid.full() 822 823 if item.jid.host == self.domain: 824 # local contact 825 if item.jid.user in self.presences: 826 # broadcast to contact's available resources 827 for itemResource in self.presences[item.jid.user]: 828 resourceJID = JID(tuple=(item.jid.user, 829 item.jid.host, 830 itemResource)) 831 self.sessionManager.deliverStanza(outPresence, 832 resourceJID) 833 else: 834 # remote contact 835 self.send(outPresence) 836 837 838 def _on_availableBroadcast(self, presence): 839 fromJID = presence.sender 840 user, resource = fromJID.user, fromJID.resource 841 roster = self.roster[user] 842 843 if user not in self.presences: 844 # initial presence 845 self.presences[user] = {} 846 self.remotePresences[user] = {} 847 848 # send out probes 849 for item in roster.itervalues(): 850 if item.subscriptionTo and item.jid.host != self.domain: 851 self.probe(item.jid, fromJID) 852 else: 853 if resource not in self.presences[user]: 854 # initial presence with another available resource 855 856 # send last known presences from remote contacts 857 remotePresences = self.remotePresences[user] 858 for entity, remotePresence in remotePresences.iteritems(): 859 self.sessionManager.deliverStanza(remotePresence.element, 860 fromJID) 861 862 # send presence to other resources 863 self._broadcastToOtherResources(presence) 864 865 # Send last known local presences 866 if user not in self.presences or resource not in self.presences[user]: 867 for item in roster.itervalues(): 868 if item.subscriptionTo and \ 869 item.jid.host == self.domain and \ 870 item.jid.user in self.presences: 871 for contactPresence in \ 872 self.presences[item.jid.user].itervalues(): 873 outPresence = clonePresence(contactPresence) 874 outPresence['to'] = fromJID.userhost() 875 self.sessionManager.deliverStanza(outPresence, fromJID) 876 877 # broadcast presence 878 self._broadcastToContacts(presence) 879 880 # save presence 881 self.presences[user][resource] = presence 882 self.sessionManager.sessions[user][resource].presence = presence 883 884 return True 885 886 887 def _on_availableDirected(self, presence): 888 self.send(presence.element) 889 return True 890 891 892 def availableReceivedOutbound(self, presence): 893 if presence.recipient: 894 return self._on_availableDirected(presence) 895 else: 896 return self._on_availableBroadcast(presence) 897 898 899 def availableReceived(self, presence): 900 fromJID = presence.sender 901 toJID = presence.recipient 902 903 if toJID.user not in self.roster: 904 return False 905 906 if toJID.user in self.presences: 907 for resource in self.presences[toJID.user]: 908 resourceJID = JID(tuple=(toJID.user, toJID.host, resource)) 909 self.sessionManager.deliverStanza(presence.element, resourceJID) 910 self.remotePresences[toJID.user][fromJID] = presence 911 else: 912 # no such user or no available resource, ignore this stanza 913 pass 914 915 return True 916 917 918 def _on_unavailableBroadcast(self, presence): 919 fromJID = presence.sender 920 user, resource = fromJID.user, fromJID.resource 921 922 # broadcast presence 923 self._broadcastToContacts(presence) 924 925 if user in self.presences: 926 # send presence to other resources 927 self._broadcastToOtherResources(presence) 928 929 # update stored presences 930 if resource in self.presences[user]: 931 del self.presences[user][resource] 932 933 if not self.presences[user]: 934 # last resource to become unavailable 935 del self.presences[user] 936 937 # TODO: save last unavailable presence 938 939 return True 940 941 942 def _on_unavailableDirected(self, presence): 943 self.send(presence.element) 944 return True 945 946 947 def unavailableReceivedOutbound(self, presence): 948 if presence.recipient: 949 return self._on_unavailableDirected(presence) 950 else: 951 return self._on_unavailableBroadcast(presence) 952 953 # def unavailableReceived(self, presence): 954 955 956 def subscribedReceivedOutbound(self, presence): 957 log.msg("%r subscribed %s to its presence" % (presence.sender, 958 presence.recipient)) 959 self.send(presence.element) 960 return True 961 962 963 def subscribedReceived(self, presence): 964 log.msg("%r subscribed %s to its presence" % (presence.sender, 965 presence.recipient)) 966 967 968 def unsubscribedReceivedOutbound(self, presence): 969 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 970 presence.recipient)) 971 self.send(presence.element) 972 return True 973 974 975 def unsubscribedReceived(self, presence): 976 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 977 presence.recipient)) 978 979 980 def subscribeReceivedOutbound(self, presence): 981 log.msg("%r requests subscription to %s" % (presence.sender, 982 presence.recipient)) 983 self.send(presence.element) 984 return True 985 986 987 def subscribeReceived(self, presence): 988 log.msg("%r requests subscription to %s" % (presence.sender, 989 presence.recipient)) 990 991 992 def unsubscribeReceivedOutbound(self, presence): 993 log.msg("%r requests unsubscription from %s" % (presence.sender, 994 presence.recipient)) 995 self.send(presence.element) 996 return True 997 998 999 def unsubscribeReceived(self, presence): 1000 log.msg("%r requests unsubscription from %s" % (presence.sender, 1001 presence.recipient)) 1002 1003 1004 def probeReceived(self, presence): 1005 fromJID = presence.sender 1006 toJID = presence.recipient 1007 1008 if toJID.user not in self.roster or \ 1009 fromJID.userhost() not in self.roster[toJID.user] or \ 1010 not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 1011 # send unsubscribed 1012 pass 1013 elif toJID.user not in self.presences: 1014 # send last unavailable or nothing 1015 pass 1016 else: 1017 for resourcePresence in self.presences[toJID.user].itervalues(): 1018 outPresence = clonePresence(resourcePresence) 1019 outPresence['to'] = fromJID.userhost() 1020 self.send(outPresence) 1021 1022 1023 586 1024 class RosterServerProtocol(XMPPHandler, IQHandlerMixin): 587 1025 """ 588 1026 XMPP subprotocol handler for the roster, server side.
Note: See TracBrowser
for help on using the repository browser.