Changeset 37:03cd0cb8548c in ralphm-patches for xmpp_client_service.patch
- Timestamp:
- Feb 14, 2010, 11:09:07 AM (12 years ago)
- Branch:
- default
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
xmpp_client_service.patch
r35 r37 1 diff -r 67a42d8a1c73 wokkel/client.py 2 --- a/wokkel/client.py Fri Feb 12 09:31:19 2010 +0100 3 +++ b/wokkel/client.py Fri Feb 12 11:46:41 2010 +0100 1 diff -r 1644083ca235 doc/examples/client_service.tac 2 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3 +++ b/doc/examples/client_service.tac Sat Feb 13 19:01:07 2010 +0100 4 @@ -0,0 +1,64 @@ 5 +from twisted.application import service, strports 6 +from twisted.internet import defer 7 + 8 +from wokkel import client 9 +from wokkel.generic import FallbackHandler 10 +from wokkel.subprotocols import XMPPHandler 11 +from wokkel.xmppim import RosterItem, RosterServerProtocol 12 + 13 +from twisted.words.protocols.jabber.jid import internJID as JID 14 + 15 +import socket 16 +domain = socket.gethostname() 17 + 18 +class StaticRoster(RosterServerProtocol): 19 + 20 + def __init__(self): 21 + RosterServerProtocol.__init__(self) 22 + self.roster = {'ralphm': [ 23 + RosterItem(JID('intosi@'+domain), 24 + subscriptionTo=True, 25 + subscriptionFrom=True, 26 + name='Intosi', 27 + groups=set(['Friends'])), 28 + RosterItem(JID('termie@'+domain), 29 + subscriptionTo=True, 30 + subscriptionFrom=True, 31 + name='termie'), 32 + ], 33 + 'test': [], 34 + } 35 + 36 + def getRoster(self, entity): 37 + return defer.succeed(self.roster[entity.user]) 38 + 39 + 40 +class Hello(XMPPHandler): 41 + 42 + def q(self): 43 + from wokkel.xmppim import AvailabilityPresence 44 + p = AvailabilityPresence(JID('ralphm@'+domain+'/default'), JID('termie@'+domain+'/Home'), show='chat') 45 + self.parent.send(p.toElement()) 46 + 47 + 48 + def connectionInitialized(self): 49 + from twisted.internet import reactor 50 + reactor.callLater(5, self.q) 51 + 52 + 53 + 54 +application = service.Application("Jabber server") 55 + 56 +sessionManager = client.SessionManager() 57 +FallbackHandler().setHandlerParent(sessionManager) 58 +StaticRoster().setHandlerParent(sessionManager) 59 +Hello().setHandlerParent(sessionManager) 60 + 61 +clientService = client.ClientService(sessionManager, domain) 62 +clientService.logTraffic = True 63 + 64 + 65 +c2sFactory = client.XMPPC2SServerFactory(clientService) 66 +c2sFactory.logTraffic = True 67 +c2sService = strports.service('5224', c2sFactory) 68 +c2sService.setServiceParent(application) 69 diff -r 1644083ca235 wokkel/client.py 70 --- a/wokkel/client.py Sat Feb 13 18:57:27 2010 +0100 71 +++ b/wokkel/client.py Sat Feb 13 19:01:07 2010 +0100 4 72 @@ -17,11 +17,12 @@ 5 73 from twisted.names.srvconnect import SRVConnector … … 7 75 from twisted.words.protocols.jabber import client, error, sasl, xmlstream 8 76 -from twisted.words.protocols.jabber.jid import internJID as JID 9 -from twisted.words.xish import domish10 77 +from twisted.words.protocols.jabber.jid import JID, internJID 11 +from twisted.words.xish import domish, utility 78 from twisted.words.xish import domish 12 79 13 80 from wokkel import generic … … 18 85 NS_CLIENT = 'jabber:client' 19 86 20 @@ -350,3 +351,132 @@ 87 @@ -311,6 +312,7 @@ 88 89 # TODO: check for resource conflicts 90 91 + print self.username, self.domain, self.resource 92 newJID = JID(tuple=(self.username, self.domain, self.resource)) 93 94 reply = domish.Element((None, 'iq')) 95 @@ -350,3 +352,166 @@ 21 96 reply['id'] = iq['id'] 22 97 reply.addElement((client.NS_XMPP_SESSION, 'session')) … … 25 100 + 26 101 + 27 +class DemuxedXmlStream(utility.EventDispatcher): 28 + """ 29 + Fake XML stream for demultiplexing incoming streams. 30 + 31 + Incoming traffic should have its C{from} attribute set to the JID of the 32 + sender and then L{dispatch<utility.EventDispatcher.dispatch>}ed. Outgoing 33 + traffic needs to have the C{to} attribute set. It is then passed on to 34 + the stream manager's C{send} method. 35 + """ 36 + 37 + def send(self, element): 38 + """ 39 + Send element out over the wire. 40 + 41 + This calls the stream manager to forward the element based on 42 + the embedded addressing information. 43 + """ 44 + self.manager.send(element) 45 + 46 + 47 + 48 +class ClientService(XMPPHandlerCollection, service.Service): 102 +class XMPPC2SServerFactory(XmlStreamServerFactory): 103 + 104 + def __init__(self, service): 105 + self.service = service 106 + 107 + def authenticatorFactory(): 108 + return XMPPClientListenAuthenticator(service.domain) 109 + 110 + XmlStreamServerFactory.__init__(self, authenticatorFactory) 111 + self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, 112 + self.onConnectionMade) 113 + self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, 114 + self.onAuthenticated) 115 + 116 + self.serial = 0 117 + 118 + 119 + def onConnectionMade(self, xs): 120 + """ 121 + Called when a client-to-server connection was made. 122 + 123 + This enables traffic debugging on incoming streams. 124 + """ 125 + xs.serial = self.serial 126 + self.serial += 1 127 + 128 + log.msg("Client connection %d made" % xs.serial) 129 + 130 + def logDataIn(buf): 131 + log.msg("RECV (%d): %r" % (xs.serial, buf)) 132 + 133 + def logDataOut(buf): 134 + log.msg("SEND (%d): %r" % (xs.serial, buf)) 135 + 136 + if self.logTraffic: 137 + xs.rawDataInFn = logDataIn 138 + xs.rawDataOutFn = logDataOut 139 + 140 + xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError) 141 + 142 + 143 + def onAuthenticated(self, xs): 144 + log.msg("Client connection %d authenticated" % xs.serial) 145 + 146 + xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 147 + 0, xs) 148 + xs.addObserver('/*', self.onElement, 0, xs) 149 + 150 + self.service.connectionInitialized(xs) 151 + 152 + 153 + def onConnectionLost(self, xs, reason): 154 + log.msg("Client connection %d disconnected" % xs.serial) 155 + 156 + self.service.connectionLost(xs, reason) 157 + 158 + 159 + def onError(self, reason): 160 + log.err(reason, "Stream Error") 161 + 162 + 163 + def onElement(self, xs, element): 164 + """ 165 + Called when an element was received from one of the connected streams. 166 + 167 + """ 168 + if element.handled: 169 + return 170 + else: 171 + self.service.dispatch(xs, element) 172 + 173 + 174 + 175 +class SessionManager(XMPPHandlerCollection): 176 + 177 + def __init__(self): 178 + XMPPHandlerCollection.__init__(self) 179 + self.xmlstream = None 180 + 181 + 182 + def makeConnection(self, xs): 183 + self.xmlstream = xs 184 + 185 + for handler in self: 186 + handler.makeConnection(xs) 187 + handler.connectionInitialized() 188 + 189 + 190 + def send(self, obj): 191 + if self.xmlstream: 192 + self.xmlstream.send(obj) 193 + 194 + 195 +class ClientService(object): 49 196 + """ 50 197 + Service for accepting XMPP client connections. … … 70 217 + logTraffic = False 71 218 + 72 + def __init__(self, domain, port=5222): 219 + def __init__(self, sessionManager, domain): 220 + self.sessionManager = sessionManager 73 221 + self.domain = domain 74 + self.port = port75 +76 + def authenticatorFactory():77 + return XMPPClientListenAuthenticator(self.domain)78 +79 + self.factory = XmlStreamServerFactory(authenticatorFactory)80 + self.factory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,81 + self.makeConnection)82 + self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT,83 + self.connectionInitialized)84 222 + 85 223 + self.streams = {} 86 224 + 87 + XMPPHandlerCollection.__init__(self) 88 + 89 + 90 + def startService(self): 91 + 92 + self.xmlstream = DemuxedXmlStream() 93 + self.xmlstream.manager = self 94 + self._initialized = True 95 + 96 + for handler in self: 97 + handler.makeConnection(self.xmlstream) 98 + handler.connectionInitialized() 99 + 100 + service.Service.startService(self) 101 + reactor.listenTCP(self.port, self.factory) 102 + 103 + 104 + def makeConnection(self, xs): 105 + def logDataIn(buf): 106 + log.msg("RECV: %r" % buf) 107 + 108 + def logDataOut(buf): 109 + log.msg("SEND: %r" % buf) 110 + 111 + if self.logTraffic: 112 + xs.rawDataInFn = logDataIn 113 + xs.rawDataOutFn = logDataOut 225 + pipe = generic.XmlPipe() 226 + self.xmlstream = pipe.source 227 + self.sessionManager.makeConnection(pipe.sink) 228 + self.xmlstream.addObserver('/*', self.send) 114 229 + 115 230 + 116 231 + def connectionInitialized(self, xs): 117 + self.streams[xs.otherEntity.full()] = xs 118 + xs.addObserver(xmlstream.STREAM_END_EVENT, 119 + lambda failure: self.connectionDisconnected(xs)) 120 + xs.addObserver('/*', lambda element: self.onElement(element, xs)) 121 + 122 + 123 + def connectionDisconnected(self, xs): 124 + del self.streams[xs.otherEntity.full()] 125 + 126 + 127 + def onElement(self, element, xs): 232 + self.streams[xs.otherEntity] = xs 233 + 234 + 235 + def connectionLost(self, xs, reason): 236 + if xs.otherEntity in self.streams: 237 + del self.streams[xs.otherEntity] 238 + 239 + 240 + def send(self, stanza): 241 + """ 242 + Send stanza to the proper XML Stream. 243 + 244 + This uses addressing embedded in the element to find the correct stream 245 + to forward the element to. 246 + """ 247 + destination = internJID(stanza["to"]) 248 + 249 + if destination not in self.streams: 250 + log.msg("Euh") 251 + raise Exception("Destination unreachable") 252 + 253 + self.streams[destination].send(stanza) 254 + 255 + 256 + def dispatch(self, xs, stanza): 128 257 + """ 129 258 + Called when an element was received from one of the connected streams. 130 + 131 + """ 132 + if element.handled: 133 + return 134 + 135 + element["from"] = xs.otherEntity.full() 136 + self.xmlstream.dispatch(element) 137 + 138 + 139 + def send(self, element): 140 + """ 141 + Send element to the proper XML Stream. 142 + 143 + This uses addressing embedded in the element to find the correct 144 + stream to forward the element to. 145 + """ 146 + 147 + destination = internJID(element["to"]).full() 148 + 149 + if destination not in self.streams: 150 + raise Exception("Destination unreachable") 151 + 152 + self.streams[destination].send(element) 259 + """ 260 + stanza["from"] = xs.otherEntity.full() 261 + self.xmlstream.send(stanza)
Note: See TracChangeset
for help on using the changeset viewer.