source:
ralphm-patches/xmpp_client_service.patch
@
52:a6ed3b9703cb
Last change on this file since 52:a6ed3b9703cb was 52:a6ed3b9703cb, checked in by Ralph Meijer <ralphm@…>, 11 years ago | |
---|---|
File size: 35.2 KB |
-
new file doc/examples/client_service.tac
diff -r 5c11baa0ef4c doc/examples/client_service.tac
- + 1 from twisted.application import service, strports 2 from twisted.internet import defer 3 4 from wokkel import client, im 5 from wokkel.component import InternalComponent, Router 6 from wokkel.generic import FallbackHandler 7 from wokkel.ping import PingHandler 8 from wokkel.im import RosterItem 9 10 from twisted.words.protocols.jabber.jid import internJID as JID 11 12 import socket 13 domain = socket.gethostname() 14 15 RALPHM = JID('ralphm@'+domain) 16 INTOSI = JID('intosi@'+domain) 17 TERMIE = JID('termie@'+domain) 18 19 roster = { 20 'ralphm': { 21 INTOSI: RosterItem(INTOSI, 22 subscriptionTo=True, 23 subscriptionFrom=True, 24 name='Intosi', 25 groups=set(['Friends'])), 26 TERMIE: RosterItem(TERMIE, 27 subscriptionTo=True, 28 subscriptionFrom=True, 29 name='termie'), 30 }, 31 'termie': { 32 RALPHM: RosterItem(RALPHM, 33 subscriptionTo=True, 34 subscriptionFrom=True, 35 name='ralphm'), 36 } 37 } 38 39 accounts = set(roster.keys()) 40 41 42 class StaticRoster(im.RosterServerProtocol): 43 44 def __init__(self, roster): 45 im.RosterServerProtocol.__init__(self) 46 self.roster = roster 47 48 def getRoster(self, request): 49 user = request.sender.user 50 return defer.succeed(self.roster[user].values()) 51 52 53 54 application = service.Application("Jabber server") 55 56 router = Router() 57 component = InternalComponent(router, domain) 58 component.setServiceParent(application) 59 60 sessionManager = client.SessionManager(domain, accounts) 61 sessionManager.setHandlerParent(component) 62 63 im.AccountIQHandler(sessionManager).setHandlerParent(component) 64 im.AccountMessageHandler(sessionManager).setHandlerParent(component) 65 im.PresenceServerHandler(sessionManager, domain, roster).setHandlerParent(component) 66 FallbackHandler().setHandlerParent(component) 67 StaticRoster(roster).setHandlerParent(component) 68 PingHandler().setHandlerParent(component) 69 70 c2sFactory = client.XMPPC2SServerFactory(sessionManager) 71 c2sFactory.logTraffic = True 72 c2sService = strports.service('5224', c2sFactory) 73 c2sService.setServiceParent(application) 74 75 sessionManager.connectionManager = c2sFactory -
wokkel/client.py
diff -r 5c11baa0ef4c wokkel/client.py
a b 10 10 that should probably eventually move there. 11 11 """ 12 12 13 import base64 14 13 15 from twisted.application import service 14 from twisted.internet import reactor16 from twisted.internet import defer, reactor 15 17 from twisted.names.srvconnect import SRVConnector 16 from twisted.words.protocols.jabber import client, sasl, xmlstream 18 from twisted.python import log, randbytes 19 from twisted.words.protocols.jabber import client, error, sasl, xmlstream 20 from twisted.words.protocols.jabber.jid import JID, internJID 21 from twisted.words.xish import domish, utility 17 22 18 23 from wokkel import generic 19 from wokkel.subprotocols import StreamManager 24 from wokkel.compat import XmlStreamServerFactory 25 from wokkel.subprotocols import StreamManager, XMPPHandler 26 27 NS_CLIENT = 'jabber:client' 28 29 XPATH_ALL = "/*" 30 XPATH_AUTH = "/auth[@xmlns='%s']" % sasl.NS_XMPP_SASL 31 XPATH_BIND = "/iq[@type='set']/bind[@xmlns='%s']" % client.NS_XMPP_BIND 32 XPATH_SESSION = "/iq[@type='set']/session[@xmlns='%s']" % \ 33 client.NS_XMPP_SESSION 20 34 21 35 class CheckAuthInitializer(object): 22 36 """ … … 51 65 autentication. 52 66 """ 53 67 54 namespace = 'jabber:client'68 namespace = NS_CLIENT 55 69 56 70 def __init__(self, jid, password): 57 71 xmlstream.ConnectAuthenticator.__init__(self, jid.host) … … 186 200 c = XMPPClientConnector(reactor, domain, factory) 187 201 c.connect() 188 202 return factory.deferred 203 204 205 206 class XMPPClientListenAuthenticator(xmlstream.ListenAuthenticator): 207 namespace = NS_CLIENT 208 209 def __init__(self, service): 210 self.service = service 211 self.failureGrace = 3 212 self.state = 'auth' 213 214 215 def associateWithStream(self, xs): 216 xmlstream.ListenAuthenticator.associateWithStream(self, xs) 217 self.xmlstream.addObserver(XPATH_ALL, self.onElementFallback, -1) 218 219 220 def onElementFallback(self, element): 221 if element.handled: 222 return 223 224 exc = error.StreamError('not-authorized') 225 self.xmlstream.sendStreamError(exc) 226 227 228 def streamStarted(self, rootElement): 229 xmlstream.ListenAuthenticator.streamStarted(self, rootElement) 230 231 # check namespace 232 #if self.xmlstream.namespace != self.namespace: 233 # self.xmlstream.namespace = self.namespace 234 # exc = error.StreamError('invalid-namespace') 235 # self.xmlstream.sendStreamError(exc) 236 # return 237 238 # TODO: check domain (self.service.domain) 239 240 self.xmlstream.sendHeader() 241 242 try: 243 stateHandlerName = 'streamStarted_' + self.state 244 stateHandler = getattr(self, stateHandlerName) 245 except AttributeError: 246 log.msg('streamStarted handler for', self.state, 'not found') 247 else: 248 stateHandler() 249 250 251 def toState(self, state): 252 self.state = state 253 if state == 'initialized': 254 self.xmlstream.removeObserver(XPATH_ALL, self.onElementFallback) 255 self.xmlstream.addOnetimeObserver(XPATH_SESSION, self.onSession, 1) 256 self.xmlstream.dispatch(self.xmlstream, 257 xmlstream.STREAM_AUTHD_EVENT) 258 259 260 def streamStarted_auth(self): 261 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 262 features.addElement((sasl.NS_XMPP_SASL, 'mechanisms')) 263 features.mechanisms.addElement('mechanism', content='PLAIN') 264 self.xmlstream.send(features) 265 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 266 267 268 def onAuth(self, auth): 269 auth.handled = True 270 271 if auth.getAttribute('mechanism') != 'PLAIN': 272 failure = domish.Element((sasl.NS_XMPP_SASL, 'failure')) 273 failure.addElement('invalid-mechanism') 274 self.xmlstream.send(failure) 275 276 # Close stream on too many failing authentication attempts 277 self.failureGrace -= 1 278 if self.failureGrace == 0: 279 self.xmlstream.sendFooter() 280 else: 281 self.xmlstream.addOnetimeObserver(XPATH_AUTH, self.onAuth) 282 283 return 284 285 initialResponse = base64.b64decode(unicode(auth)) 286 authzid, authcid, passwd = initialResponse.split('\x00') 287 288 # TODO: check passwd 289 290 # authenticated 291 292 self.username = authcid 293 294 success = domish.Element((sasl.NS_XMPP_SASL, 'success')) 295 self.xmlstream.send(success) 296 self.xmlstream.reset() 297 298 self.toState('bind') 299 300 301 def streamStarted_bind(self): 302 features = domish.Element((xmlstream.NS_STREAMS, 'features')) 303 features.addElement((client.NS_XMPP_BIND, 'bind')) 304 features.addElement((client.NS_XMPP_SESSION, 'session')) 305 self.xmlstream.send(features) 306 self.xmlstream.addOnetimeObserver(XPATH_BIND, self.onBind) 307 308 309 def onBind(self, iq): 310 def cb(boundJID): 311 self.xmlstream.otherEntity = boundJID 312 self.toState('initialized') 313 314 response = xmlstream.toResponse(iq, 'result') 315 response.addElement((client.NS_XMPP_BIND, 'bind')) 316 response.bind.addElement((client.NS_XMPP_BIND, 'jid'), 317 content=boundJID.full()) 318 319 return response 320 321 def eb(failure): 322 if not isinstance(failure, error.StanzaError): 323 log.msg(failure) 324 exc = error.StanzaError('internal-server-error') 325 else: 326 exc = failure.value 327 328 return exc.toResponse(iq) 329 330 iq.handled = True 331 resource = unicode(iq.bind) or None 332 d = self.service.bindResource(self.username, 333 self.service.domain, 334 resource) 335 d.addCallback(cb) 336 d.addErrback(eb) 337 d.addCallback(self.xmlstream.send) 338 339 340 def onSession(self, iq): 341 iq.handled = True 342 343 reply = domish.Element((None, 'iq')) 344 reply['type'] = 'result' 345 if iq.getAttribute('id'): 346 reply['id'] = iq['id'] 347 reply.addElement((client.NS_XMPP_SESSION, 'session')) 348 self.xmlstream.send(reply) 349 350 351 352 class RecipientUnavailable(Exception): 353 """ 354 The addressed entity is not, or no longer, available. 355 """ 356 357 358 359 class XMPPC2SServerFactory(XmlStreamServerFactory): 360 361 def __init__(self, service): 362 self.service = service 363 364 def authenticatorFactory(): 365 return XMPPClientListenAuthenticator(service) 366 367 XmlStreamServerFactory.__init__(self, authenticatorFactory) 368 self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 369 self.onConnectionMade) 370 self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 371 self.onAuthenticated) 372 373 self.serial = 0 374 self.streams = {} 375 376 377 def onConnectionMade(self, xs): 378 """ 379 Called when a client-to-server connection was made. 380 381 This enables traffic debugging on incoming streams. 382 """ 383 xs.serial = self.serial 384 self.serial += 1 385 386 log.msg("Client connection %d made" % xs.serial) 387 388 def logDataIn(buf): 389 log.msg("RECV (%d): %r" % (xs.serial, buf)) 390 391 def logDataOut(buf): 392 log.msg("SEND (%d): %r" % (xs.serial, buf)) 393 394 if self.logTraffic: 395 xs.rawDataInFn = logDataIn 396 xs.rawDataOutFn = logDataOut 397 398 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 399 400 401 def onAuthenticated(self, xs): 402 log.msg("Client connection %d authenticated" % xs.serial) 403 404 xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 405 0, xs) 406 xs.addObserver('/*', self.onElement, 0, xs) 407 408 # Record this stream as bound to the authenticated JID 409 self.streams[xs.otherEntity] = xs 410 411 412 def onConnectionLost(self, xs, reason): 413 log.msg("Client connection %d disconnected" % xs.serial) 414 415 entity = xs.otherEntity 416 self.service.unbindResource(entity.user, 417 entity.host, 418 entity.resource, 419 reason) 420 421 # If the lost connections had been bound, remove the reference 422 if xs.otherEntity in self.streams: 423 del self.streams[xs.otherEntity] 424 425 426 def onError(self, reason): 427 log.err(reason, "Stream Error") 428 429 430 def onElement(self, xs, stanza): 431 """ 432 Called when an element was received from one of the connected streams. 433 434 """ 435 if stanza.handled: 436 return 437 else: 438 self.service.onElement(stanza, xs.otherEntity) 439 440 441 def deliverStanza(self, element, recipient): 442 if recipient in self.streams: 443 self.streams[recipient].send(element) 444 else: 445 raise RecipientUnavailable(u"There is no connection for %s" % 446 recipient.full()) 447 448 449 class Session(object): 450 def __init__(self, entity): 451 self.entity = entity 452 self.connected = False 453 self.interested = False 454 self.presence = None 455 456 457 458 class SessionManager(XMPPHandler): 459 460 """ 461 Session Manager. 462 463 @ivar xmlstream: XML Stream to inject incoming stanzas from client 464 connections into. Stanzas where the C{'to'} attribute is not set 465 or is directed at the local domain are injected as if received on 466 the XML Stream (using C{dispatch}), other stanzas are injected as if 467 they were sent from the XML Stream (using C{send}). 468 """ 469 470 def __init__(self, domain, accounts): 471 XMPPHandler.__init__(self) 472 self.domain = domain 473 self.accounts = accounts 474 475 self.connectionManager = None 476 self.sessions = {} 477 self.clientStream = utility.EventDispatcher() 478 self.clientStream.addObserver('/*', self.routeOrDeliver, -1) 479 480 481 def bindResource(self, localpart, domain, resource): 482 if domain != self.domain: 483 raise Exception("I don't host this domain!") 484 485 try: 486 userSessions = self.sessions[localpart] 487 except KeyError: 488 userSessions = self.sessions[localpart] = {} 489 490 if resource is None: 491 resource = randbytes.secureRandom(8).encode('hex') 492 elif resource in self.userSessions: 493 resource = resource + ' ' + randbytes.secureRandom(8).encode('hex') 494 495 entity = JID(tuple=(localpart, domain, resource)) 496 session = Session(entity) 497 session.connected = True 498 userSessions[resource] = session 499 500 return defer.succeed(entity) 501 502 503 def unbindResource(self, localpart, domain, resource, reason=None): 504 try: 505 session = self.sessions[localpart][resource] 506 except KeyError: 507 pass 508 else: 509 session.connected = False 510 del self.sessions[localpart][resource] 511 if not self.sessions[localpart]: 512 del self.sessions[localpart] 513 514 return defer.succeed(None) 515 516 517 def onElement(self, element, sender): 518 # Make sure each stanza has a sender address 519 if (element.name == 'presence' and 520 element.getAttribute('type') in ('subscribe', 'subscribed', 521 'unsubscribe', 'unsubscribed')): 522 element['from'] = sender.userhost() 523 else: 524 element['from'] = sender.full() 525 526 self.clientStream.dispatch(element) 527 528 529 def routeOrDeliver(self, element): 530 if element.handled: 531 return 532 533 if (not element.hasAttribute('to') or 534 internJID(element['to']).host == self.domain): 535 # This stanza is for local delivery 536 log.msg("Delivering locally: %r" % element.toXml()) 537 self.xmlstream.dispatch(element) 538 else: 539 # This stanza is for remote routing 540 log.msg("Routing remotely: %r" % element.toXml()) 541 self.xmlstream.send(element) 542 543 544 def deliverStanza(self, element, recipient): 545 if self.connectionManager: 546 self.connectionManager.deliverStanza(element, recipient) 547 else: 548 raise Exception("No connection manager set") -
wokkel/component.py
diff -r 5c11baa0ef4c wokkel/component.py
a b 313 313 """ 314 314 destination = JID(stanza['to']) 315 315 316 log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))317 316 318 317 if destination.host in self.routes: 318 msg = "Routing to %s (default route): %r" 319 log.msg("Routing to %s: %r" % (destination.full(), 320 stanza.toXml())) 319 321 self.routes[destination.host].send(stanza) 322 elif None in self.routes: 323 log.msg("Routing to %s (default route): %r" % (destination.full(), 324 stanza.toXml())) 325 self.routes[None].send(stanza) 320 326 else: 321 self.routes[None].send(stanza) 327 log.msg("No route to %s: %r" % (destination.full(), 328 stanza.toXml())) 329 if stanza.getAttribute('type') not in ('result', 'error'): 330 # No route, send back error 331 exc = error.StanzaError('remote-server-timeout', type='wait') 332 exc.code = '504' 333 response = exc.toResponse(stanza) 334 self.route(response) 322 335 323 336 324 337 -
wokkel/im.py
diff -r 5c11baa0ef4c wokkel/im.py
a b 10 10 U{RFC 6121<http://www.xmpp.org/rfcs/rfc6121.html>} (XMPP IM). 11 11 """ 12 12 13 import copy 14 13 15 from twisted.internet import defer 16 from twisted.python import log 14 17 from twisted.words.protocols.jabber import jid 15 18 from twisted.words.protocols.jabber import error 16 19 from twisted.words.xish import domish … … 168 171 self.xmlstream.addObserver("/presence", self._onPresence) 169 172 170 173 171 def _onPresence(self, element): 174 def parsePresence(self, element): 175 """ 176 Parse presence. 177 """ 172 178 stanza = Stanza.fromElement(element) 173 179 174 180 presenceType = stanza.stanzaType or 'available' … … 178 184 except KeyError: 179 185 return 180 186 181 presence = parser.fromElement(element) 187 return parser.fromElement(element) 188 189 190 def _onPresence(self, element): 191 presence = self.parsePresence(element) 192 presenceType = presence.stanzaType or 'available' 182 193 183 194 try: 184 195 handler = getattr(self, '%sReceived' % presenceType) 185 196 except AttributeError: 186 197 return 187 198 else: 188 handler(presence)199 element.handled = handler(presence) 189 200 190 201 191 202 def errorReceived(self, presence): … … 553 564 554 565 555 566 567 class AccountIQHandler(XMPPHandler): 568 569 def __init__(self, sessionManager): 570 XMPPHandler.__init__(self) 571 self.sessionManager = sessionManager 572 573 574 def connectionMade(self): 575 self.xmlstream.addObserver('/iq', self.onIQ, 1) 576 577 578 def onIQ(self, iq): 579 """ 580 Handler for iq stanzas to user accounts' connected resources. 581 582 If the recipient is a bare JID or there is no associated user, this 583 handler ignores the stanza, so that other handlers have a chance 584 to pick it up. If used, L{generic.FallbackHandler} will respond with a 585 C{'service-unavailable'} stanza error if no other handlers handle 586 the iq. 587 """ 588 589 if iq.handled: 590 return 591 592 try: 593 recipient = jid.internJID(iq['to']) 594 except KeyError: 595 return 596 597 if not recipient.user: 598 # This is not for an account, ignore it 599 return 600 elif recipient.user not in self.sessionManager.accounts: 601 # This is not a user, ignore it 602 return 603 elif not recipient.resource: 604 # Bare JID at local domain, ignore it 605 return 606 607 userSessions = self.sessionManager.sessions.get(recipient.user, 608 {}) 609 if recipient.resource in userSessions: 610 self.sessionManager.deliverStanza(iq, recipient) 611 else: 612 # Full JID without connected resource, return error 613 exc = error.StanzaError('service-unavailable') 614 if iq['type'] in ('result', 'error'): 615 log.err(exc, 'Could not deliver IQ response') 616 else: 617 self.send(exc.toResponse(iq)) 618 619 iq.handled = True 620 621 622 623 class AccountMessageHandler(XMPPHandler): 624 625 def __init__(self, sessionManager): 626 XMPPHandler.__init__(self) 627 self.sessionManager = sessionManager 628 629 630 def connectionMade(self): 631 self.xmlstream.addObserver('/message', self.onMessage, 1) 632 633 634 def onMessage(self, message): 635 """ 636 Handler for message stanzas to user accounts. 637 """ 638 639 if message.handled: 640 return 641 642 try: 643 recipient = jid.internJID(message['to']) 644 except KeyError: 645 return 646 647 stanzaType = message.getAttribute('type', 'normal') 648 649 try: 650 if not recipient.user: 651 # This is not for an account, ignore it 652 return 653 elif recipient.user not in self.sessionManager.accounts: 654 # This is not a user, ignore it 655 return 656 elif recipient.resource: 657 userSessions = self.sessionManager.sessions.get(recipient.user, 658 {}) 659 if recipient.resource in userSessions: 660 self.sessionManager.deliverStanza(message, recipient) 661 else: 662 if stanzaType in ('normal', 'chat', 'headline'): 663 self.onMessageBareJID(message, recipient.userhostJID()) 664 elif stanzaType == 'error': 665 log.msg("Dropping message to unconnected resource %r" % 666 recipient.full()) 667 elif stanzaType == 'groupchat': 668 raise error.StanzaError('service-unavailable') 669 else: 670 self.onMessageBareJID(message, recipient) 671 except error.StanzaError, exc: 672 if stanzaType == 'error': 673 log.err(exc, "Undeliverable error") 674 else: 675 self.send(exc.toResponse(message)) 676 677 message.handled = True 678 679 680 def onMessageBareJID(self, message, bareJID): 681 stanzaType = message.getAttribute('type', 'normal') 682 683 userSessions = self.sessionManager.sessions.get(bareJID.user, {}) 684 685 recipients = set() 686 687 if stanzaType == 'headline': 688 for session in userSessions: 689 if session.presence.priority >= 0: 690 recipients.add(session.entity) 691 elif stanzaType in ('chat', 'normal'): 692 priorities = {} 693 for session in userSessions.itervalues(): 694 if not session.presence or not session.presence.available: 695 continue 696 priority = session.presence.priority 697 if priority >= 0: 698 priorities.setdefault(priority, set()).add(session.entity) 699 maxPriority = max(priorities.keys()) 700 recipients.update(priorities[maxPriority]) 701 elif stanzaType == 'groupchat': 702 raise error.StanzaError('service-unavailable') 703 704 if recipients: 705 for recipient in recipients: 706 self.sessionManager.deliverStanza(message, recipient) 707 elif stanzaType in ('chat', 'normal'): 708 raise error.StanzaError('service-unavailable') 709 else: 710 # silently discard 711 log.msg("Discarding message to %r" % message['to']) 712 713 714 715 716 def clonePresence(presence): 717 """ 718 Make a deep copy of a presence stanza. 719 720 The returned presence stanza is an orphaned deep copy of the given 721 original. 722 723 @note: Since the reference to the original parent, if any, is gone, 724 inherited attributes like C{xml:lang} are not preserved. 725 """ 726 element = presence.element 727 728 parent = element.parent 729 element.parent = None 730 newElement = copy.deepcopy(element) 731 element.parent = parent 732 return newElement 733 734 735 736 class PresenceServerHandler(PresenceProtocol): 737 738 def __init__(self, sessionManager, domain, roster): 739 PresenceProtocol.__init__(self) 740 self.sessionManager = sessionManager 741 self.domain = domain 742 self.roster = roster 743 self.presences = {} # user -> resource -> presence 744 self.offlinePresences = {} # user -> presence 745 self.remotePresences = {} # user -> remote entity -> presence 746 747 self.sessionManager.clientStream.addObserver('/presence', 748 self._onPresenceOutbound) 749 750 751 def _onPresenceOutbound(self, element): 752 log.msg("Got outbound presence: %r" % element.toXml()) 753 presence = self.parsePresence(element) 754 755 presenceType = presence.stanzaType or 'available' 756 method = '%sReceivedOutbound' % presenceType 757 print method 758 759 try: 760 handler = getattr(self, method) 761 except AttributeError: 762 return 763 else: 764 element.handled = handler(presence) 765 766 767 def _broadcastToOtherResources(self, presence): 768 """ 769 Broadcast presence to other available resources. 770 """ 771 fromJID = presence.sender 772 for otherResource in self.presences[fromJID.user]: 773 if otherResource == fromJID.resource: 774 continue 775 776 resourceJID = jid.JID(tuple=(fromJID.user, 777 fromJID.host, 778 otherResource)) 779 outPresence = clonePresence(presence) 780 outPresence['to'] = resourceJID.full() 781 self.sessionManager.deliverStanza(outPresence, resourceJID) 782 783 784 def _broadcastToContacts(self, presence): 785 """ 786 Broadcast presence to subscribed entities. 787 """ 788 fromJID = presence.sender 789 roster = self.roster[fromJID.user] 790 791 for item in roster.itervalues(): 792 if not item.subscriptionFrom: 793 continue 794 795 outPresence = clonePresence(presence) 796 outPresence['to'] = item.entity.full() 797 798 if item.entity.host == self.domain: 799 # local contact 800 if item.entity.user in self.presences: 801 # broadcast to contact's available resources 802 for itemResource in self.presences[item.entity.user]: 803 resourceJID = jid.JID(tuple=(item.entity.user, 804 item.entity.host, 805 itemResource)) 806 self.sessionManager.deliverStanza(outPresence, 807 resourceJID) 808 else: 809 # remote contact 810 self.send(outPresence) 811 812 813 def _on_availableBroadcast(self, presence): 814 fromJID = presence.sender 815 user, resource = fromJID.user, fromJID.resource 816 roster = self.roster[user] 817 818 if user not in self.presences: 819 # initial presence 820 self.presences[user] = {} 821 self.remotePresences[user] = {} 822 823 # send out probes 824 for item in roster.itervalues(): 825 if item.subscriptionTo and item.entity.host != self.domain: 826 self.probe(item.entity, fromJID) 827 else: 828 if resource not in self.presences[user]: 829 # initial presence with another available resource 830 831 # send last known presences from remote contacts 832 remotePresences = self.remotePresences[user] 833 for entity, remotePresence in remotePresences.iteritems(): 834 self.sessionManager.deliverStanza(remotePresence.element, 835 fromJID) 836 837 # send presence to other resources 838 self._broadcastToOtherResources(presence) 839 840 # Send last known local presences 841 if user not in self.presences or resource not in self.presences[user]: 842 for item in roster.itervalues(): 843 if item.subscriptionTo and \ 844 item.entity.host == self.domain and \ 845 item.entity.user in self.presences: 846 for contactPresence in \ 847 self.presences[item.entity.user].itervalues(): 848 outPresence = clonePresence(contactPresence) 849 outPresence['to'] = fromJID.userhost() 850 self.sessionManager.deliverStanza(outPresence, fromJID) 851 852 # broadcast presence 853 self._broadcastToContacts(presence) 854 855 # save presence 856 self.presences[user][resource] = presence 857 self.sessionManager.sessions[user][resource].presence = presence 858 859 return True 860 861 862 def _on_availableDirected(self, presence): 863 self.send(presence.element) 864 return True 865 866 867 def availableReceivedOutbound(self, presence): 868 if presence.recipient: 869 return self._on_availableDirected(presence) 870 else: 871 return self._on_availableBroadcast(presence) 872 873 874 def availableReceived(self, presence): 875 fromJID = presence.sender 876 toJID = presence.recipient 877 878 if toJID.user not in self.roster: 879 return False 880 881 if toJID.user in self.presences: 882 for resource in self.presences[toJID.user]: 883 resourceJID = jid.JID(tuple=(toJID.user, 884 toJID.host, 885 resource)) 886 self.sessionManager.deliverStanza(presence.element, resourceJID) 887 self.remotePresences[toJID.user][fromJID] = presence 888 else: 889 # no such user or no available resource, ignore this stanza 890 pass 891 892 return True 893 894 895 def _on_unavailableBroadcast(self, presence): 896 fromJID = presence.sender 897 user, resource = fromJID.user, fromJID.resource 898 899 # broadcast presence 900 self._broadcastToContacts(presence) 901 902 if user in self.presences: 903 # send presence to other resources 904 self._broadcastToOtherResources(presence) 905 906 # update stored presences 907 if resource in self.presences[user]: 908 del self.presences[user][resource] 909 910 if not self.presences[user]: 911 # last resource to become unavailable 912 del self.presences[user] 913 914 # TODO: save last unavailable presence 915 916 return True 917 918 919 def _on_unavailableDirected(self, presence): 920 self.send(presence.element) 921 return True 922 923 924 def unavailableReceivedOutbound(self, presence): 925 if presence.recipient: 926 return self._on_unavailableDirected(presence) 927 else: 928 return self._on_unavailableBroadcast(presence) 929 930 # def unavailableReceived(self, presence): 931 932 933 def subscribedReceivedOutbound(self, presence): 934 log.msg("%r subscribed %s to its presence" % (presence.sender, 935 presence.recipient)) 936 self.send(presence.element) 937 return True 938 939 940 def subscribedReceived(self, presence): 941 log.msg("%r subscribed %s to its presence" % (presence.sender, 942 presence.recipient)) 943 944 945 def unsubscribedReceivedOutbound(self, presence): 946 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 947 presence.recipient)) 948 self.send(presence.element) 949 return True 950 951 952 def unsubscribedReceived(self, presence): 953 log.msg("%r unsubscribed %s from its presence" % (presence.sender, 954 presence.recipient)) 955 956 957 def subscribeReceivedOutbound(self, presence): 958 log.msg("%r requests subscription to %s" % (presence.sender, 959 presence.recipient)) 960 self.send(presence.element) 961 return True 962 963 964 def subscribeReceived(self, presence): 965 log.msg("%r requests subscription to %s" % (presence.sender, 966 presence.recipient)) 967 968 969 def unsubscribeReceivedOutbound(self, presence): 970 log.msg("%r requests unsubscription from %s" % (presence.sender, 971 presence.recipient)) 972 self.send(presence.element) 973 return True 974 975 976 def unsubscribeReceived(self, presence): 977 log.msg("%r requests unsubscription from %s" % (presence.sender, 978 presence.recipient)) 979 980 981 def probeReceived(self, presence): 982 fromJID = presence.sender 983 toJID = presence.recipient 984 985 if toJID.user not in self.roster or \ 986 fromJID.userhost() not in self.roster[toJID.user] or \ 987 not self.roster[toJID.user][fromJID.userhost()].subscriptionFrom: 988 # send unsubscribed 989 pass 990 elif toJID.user not in self.presences: 991 # send last unavailable or nothing 992 pass 993 else: 994 for resourcePresence in self.presences[toJID.user].itervalues(): 995 outPresence = clonePresence(resourcePresence) 996 outPresence['to'] = fromJID.userhost() 997 self.send(outPresence) 998 999 1000 556 1001 class RosterServerProtocol(XMPPHandler, IQHandlerMixin): 557 1002 """ 558 1003 XMPP subprotocol handler for the roster, server side. -
wokkel/test/test_im.py
diff -r 5c11baa0ef4c wokkel/test/test_im.py
a b 13 13 from twisted.words.xish import domish, utility 14 14 15 15 from wokkel import im 16 from wokkel.generic import ErrorStanza, parseXml16 from wokkel.generic import ErrorStanza, Stanza, parseXml 17 17 from wokkel.test.helpers import TestableRequestHandlerMixin, XmlStreamStub 18 18 19 19 NS_XML = 'http://www.w3.org/XML/1998/namespace' … … 846 846 847 847 848 848 849 class AccountIQHandlerTest(unittest.TestCase): 850 """ 851 Tests for L{im.AccountIQHandler}. 852 """ 853 854 def setUp(self): 855 self.stub = XmlStreamStub() 856 self.protocol = im.AccountIQHandler(None) 857 self.protocol.makeConnection(self.stub.xmlstream) 858 self.protocol.connectionInitialized() 859 860 861 def test_onIQNotUser(self): 862 """ 863 IQs to JIDs without local part are ignored. 864 """ 865 xml = """ 866 <iq to='example.org'> 867 <query xmlns='jabber:iq:version'/> 868 </iq> 869 """ 870 871 iq = parseXml(xml) 872 self.stub.send(iq) 873 874 self.assertFalse(getattr(iq, 'handled')) 875 876 877 878 class AccountMessageHandlerTest(unittest.TestCase): 879 """ 880 Tests for L{im.AccountMessageHandler}. 881 """ 882 883 def setUp(self): 884 self.stub = XmlStreamStub() 885 self.protocol = im.AccountMessageHandler(None) 886 self.protocol.makeConnection(self.stub.xmlstream) 887 self.protocol.connectionInitialized() 888 889 890 def test_onMessageNotUser(self): 891 """ 892 Messages to JIDs without local part are ignored. 893 """ 894 xml = """ 895 <message to='example.org'> 896 <body>Hello</body> 897 </message> 898 """ 899 900 message = parseXml(xml) 901 self.stub.send(message) 902 903 self.assertFalse(getattr(message, 'handled')) 904 905 906 907 class ClonePresenceTest(unittest.TestCase): 908 """ 909 Tests for L{im.clonePresence}. 910 """ 911 912 def test_rootElement(self): 913 """ 914 The copied presence stanza is not identical, but renders identically. 915 """ 916 originalElement = domish.Element((None, 'presence')) 917 stanza = Stanza.fromElement(originalElement) 918 copyElement = im.clonePresence(stanza) 919 920 self.assertNotIdentical(copyElement, originalElement) 921 self.assertEquals(copyElement.toXml(), originalElement.toXml()) 922 923 924 849 925 class RosterServerProtocolTest(unittest.TestCase, TestableRequestHandlerMixin): 850 926 """ 851 927 Tests for L{im.RosterServerProtocol}.
Note: See TracBrowser
for help on using the repository browser.