]> code.delx.au - pymsnt/blob - src/ft.py
Fixed file receiving.
[pymsnt] / src / ft.py
1 # Copyright 2005 James Bunton <james@delx.cjb.net>
2 # Licensed for distribution under the GPL version 2, check COPYING for details
3
4 from tlib.xmlw import Element
5 from twisted.internet import protocol
6
7 import disco
8 from debug import LogEvent, INFO, WARN, ERROR
9 import config
10 import utils
11
12 import random
13 import sys
14
15
16 ###########
17 # Sending #
18 ###########
19
20 class FTSend:
21 """ For file transfers going from Jabber to MSN. """
22 def __init__(self, startTransfer, cancelTransfer, filename, filesize):
23 self.startTransfer = startTransfer
24 self.cancelTransfer = cancelTransfer
25 self.filename = filename
26 self.filesize = filesize
27
28 def accept(self, legacyFileSend):
29 self.startTransfer(legacyFileSend)
30
31 def reject(self):
32 del self.startTransfer
33 self.cancelTransfer()
34
35
36 try:
37 from twisted.web import http
38 except ImportError:
39 try:
40 from twisted.protocols import http
41 except ImportError:
42 print "Couldn't find http.HTTPClient. If you're using Twisted 2.0, make sure that you've installed twisted.web"
43 raise
44
45
46 class OOBHeaderHelper(http.HTTPClient):
47 """ Makes a HEAD request and grabs the length """
48 def connectionMade(self):
49 self.sendCommand("HEAD", self.factory.path.encode("utf-8"))
50 self.sendHeader("Host", (self.factory.host + ":" + str(self.factory.port)).encode("utf-8"))
51 self.endHeaders()
52
53 def handleEndHeaders(self):
54 self.factory.gotLength(self.length)
55
56 def handleResponse(self, data):
57 pass
58
59
60 class OOBSendConnector(http.HTTPClient):
61 def connectionMade(self):
62 self.sendCommand("GET", self.factory.path.encode("utf-8"))
63 self.sendHeader("Host", (self.factory.host + ":" + str(self.factory.port)).encode("utf-8"))
64 self.endHeaders()
65 self.first = True
66
67 def handleResponsePart(self, data):
68 self.factory.consumer.write(data)
69
70 def handleResponseEnd(self):
71 # This is called once before writing is finished, and once when the
72 # connection closes. We only consumer.close() on the second.
73 if self.first:
74 self.first = False
75 else:
76 self.factory.consumer.close()
77 self.factory.consumer = None
78 self.factory.finished()
79
80
81
82
83
84 #############
85 # Receiving #
86 #############
87
88 class FTReceive:
89 """ For file transfers going from MSN to Jabber. """
90
91 """
92 Plan of action for this class:
93 * Determine the FT support of the Jabber client.
94 * If we find a common protocol, then send the invitation.
95 * Tell the legacyftp object the result of the invitation.
96 * If it was accepted, then start the transfer.
97
98 """
99
100 def __init__(self, session, senderJID, legacyftp):
101 self.session = session
102 self.toJID = self.session.jabberID + "/" + self.session.highestResource()
103 self.senderJID = senderJID
104 self.ident = (self.toJID, self.senderJID)
105 self.legacyftp = legacyftp
106 LogEvent(INFO)
107 self.checkSupport()
108
109 def checkSupport(self):
110 def discoDone(features):
111 LogEvent(INFO, self.ident)
112 enabledS5B = hasattr(self.session.pytrans, "ftSOCKS5Receive")
113 enabledOOB = hasattr(self.session.pytrans, "ftOOBReceive")
114 hasFT = features.count(disco.FT)
115 hasS5B = features.count(disco.S5B)
116 hasOOB = features.count(disco.IQOOB)
117 LogEvent(INFO, self.ident, "Choosing transfer mode.")
118 if hasFT > 0 and hasS5B > 0 and enabledS5B:
119 self.socksMode()
120 elif hasOOB > 0 and enabledOOB:
121 self.oobMode()
122 elif enabledOOB:
123 self.messageOobMode()
124 else:
125 # No support
126 self.legacyftp.reject()
127 del self.legacyftp
128
129 def discoFail(err=None):
130 LogEvent(INFO, self.ident, str(err))
131 self.messageOobMode()
132
133 d = disco.DiscoRequest(self.session.pytrans, self.toJID).doDisco()
134 d.addCallbacks(discoDone, discoFail)
135
136 def socksMode(self):
137 def ftReply(el):
138 if el.getAttribute("type") != "result":
139 ftDeclined()
140 return
141 self.session.pytrans.ftSOCKS5.addConnection(utils.socks5Hash(self.sid, self.senderJID, self.toJID), self.legacyftp)
142 LogEvent(INFO, self.ident)
143 iq = Element((None, "iq"))
144 iq.attributes["type"] = "set"
145 iq.attributes["to"] = self.toJID
146 iq.attributes["from"] = self.senderJID
147 query = iq.addElement("query")
148 query.attributes["xmlns"] = disco.S5B
149 query.attributes["sid"] = self.sid
150 query.attributes["mode"] = "tcp"
151 streamhost = query.addElement("streamhost")
152 streamhost.attributes["jid"] = self.senderJID
153 streamhost.attributes["host"] = config.ip
154 streamhost.attributes["port"] = config.ftJabberPort
155 d = self.session.pytrans.discovery.sendIq(iq)
156 d.addErrback(ftDeclined) # Timeout
157
158 def ftDeclined(el):
159 self.legacyftp.reject()
160 del self.legacyftp
161
162 LogEvent(INFO, self.ident)
163 self.sid = str(random.randint(1000, sys.maxint))
164 iq = Element((None, "iq"))
165 iq.attributes["type"] = "set"
166 iq.attributes["to"] = self.toJID
167 iq.attributes["from"] = self.senderJID
168 si = iq.addElement("si")
169 si.attributes["xmlns"] = disco.SI
170 si.attributes["profile"] = disco.FT
171 si.attributes["id"] = self.sid
172 file = si.addElement("file")
173 file.attributes["xmlns"] = disco.FT
174 file.attributes["size"] = str(self.legacyftp.filesize)
175 file.attributes["name"] = self.legacyftp.filename
176 # Feature negotiation
177 feature = si.addElement("feature")
178 feature.attributes["xmlns"] = disco.FEATURE_NEG
179 x = feature.addElement("x")
180 x.attributes["xmlns"] = disco.XDATA
181 x.attributes["type"] = "form"
182 field = x.addElement("field")
183 field.attributes["type"] = "list-single"
184 field.attributes["var"] = "stream-method"
185 option = field.addElement("option")
186 value = option.addElement("value")
187 value.addContent(disco.S5B)
188 d = self.session.pytrans.discovery.sendIq(iq, 60*3)
189 d.addCallback(ftReply)
190 d.addErrback(ftDeclined)
191
192 def oobMode(self):
193 def cb(el):
194 if el.getAttribute("type") != "result":
195 self.legacyftp.reject()
196 del self.legacyftp
197 self.session.pytrans.ftOOB.remFile(filename)
198
199 def ecb(ignored=None):
200 self.legacyftp.reject()
201 del self.legacyftp
202
203 LogEvent(INFO, self.ident)
204 filename = self.session.pytrans.ftOOB.putFile(self, self.legacyftp.filename)
205 iq = Element((None, "iq"))
206 iq.attributes["to"] = self.toJID
207 iq.attributes["from"] = self.senderJID
208 query = m.addElement("query")
209 query.attributes["xmlns"] = disco.IQOOB
210 query.addElement("url").addContent(config.ftOOBRoot + "/" + filename)
211 d = self.session.send(iq)
212 d.addCallbacks(cb, ecb)
213
214 def messageOobMode(self):
215 LogEvent(INFO, self.ident)
216 filename = self.session.pytrans.ftOOB.putFile(self, self.legacyftp.filename)
217 m = Element((None, "message"))
218 m.attributes["to"] = self.session.jabberID
219 m.attributes["from"] = self.senderJID
220 m.addElement("body").addContent(config.ftOOBRoot + "/" + filename)
221 x = m.addElement("x")
222 x.attributes["xmlns"] = disco.XOOB
223 x.addElement("url").addContent(config.ftOOBRoot + "/" + filename)
224 self.session.pytrans.send(m)
225
226 def error(self, ignored=None):
227 # FIXME
228 LogEvent(WARN)
229
230
231
232 # SOCKS5
233
234 from tlib import socks5
235 import struct
236
237 class JEP65ConnectionSend(protocol.Protocol):
238 # TODO, clean up and move this to tlib.socks5
239 STATE_INITIAL = 1
240 STATE_WAIT_AUTHOK = 2
241 STATE_WAIT_CONNECTOK = 3
242 STATE_READY = 4
243
244 def __init__(self):
245 self.state = self.STATE_INITIAL
246 self.buf = ""
247
248 def connectionMade(self):
249 self.transport.write(struct.pack("!BBB", 5, 1, 0))
250 self.state = self.STATE_WAIT_AUTHOK
251
252 def connectionLost(self, reason):
253 if self.state == self.STATE_READY:
254 self.factory.consumer.close()
255
256 def _waitAuthOk(self):
257 ver, method = struct.unpack("!BB", self.buf[:2])
258 if ver != 5 or method != 0:
259 self.transport.loseConnection()
260 return
261 self.buf = self.buf[2:] # chop
262
263 # Send CONNECT request
264 length = len(self.factory.hash)
265 self.transport.write(struct.pack("!BBBBB", 5, 1, 0, 3, length))
266 self.transport.write("".join([struct.pack("!B" , ord(x))[0] for x in self.factory.hash]))
267 self.transport.write(struct.pack("!H", 0))
268 self.state = self.STATE_WAIT_CONNECTOK
269
270 def _waitConnectOk(self):
271 ver, rep, rsv, atyp = struct.unpack("!BBBB", self.buf[:4])
272 if not (ver == 5 and rep == 0):
273 self.transport.loseConnection()
274 return
275
276 self.state = self.STATE_READY
277 self.factory.madeConnection(self.transport.addr[0])
278
279 def dataReceived(self, buf):
280 if self.state == self.STATE_READY:
281 self.factory.consumer.write(buf)
282
283 self.buf += buf
284 if self.state == self.STATE_WAIT_AUTHOK:
285 self._waitAuthOk()
286 elif self.state == self.STATE_WAIT_CONNECTOK:
287 self._waitConnectOk()
288
289
290 class JEP65ConnectionReceive(socks5.SOCKSv5):
291 def __init__(self, listener):
292 socks5.SOCKSv5.__init__(self)
293 self.listener = listener
294 self.supportedAuthMechs = [socks5.AUTHMECH_ANON]
295 self.supportedAddrs = [socks5.ADDR_DOMAINNAME]
296 self.enabledCommands = [socks5.CMD_CONNECT]
297 self.addr = ""
298
299 def connectRequested(self, addr, port):
300 # So that the legacyftp can close the connection
301 self.transport.close = self.transport.loseConnection
302
303 # Check for special connect to the namespace -- this signifies that
304 # the client is just checking that it can connect to the streamhost
305 if addr == disco.S5B:
306 self.connectCompleted(addr, 0)
307 self.transport.loseConnection()
308 return
309
310 self.addr = addr
311
312 if self.listener.isActive(addr):
313 self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED)
314 return
315
316 if self.listener.addConnection(addr, self):
317 self.connectCompleted(addr, 0)
318 else:
319 self.sendErrorReply(socks5.REPLY_CONN_REFUSED)
320
321 def connectionLost(self, reason):
322 if self.state == socks5.STATE_CONNECT_PENDING:
323 self.listener.removePendingConnection(self.addr, self)
324 else:
325 self.transport.unregisterProducer()
326 if self.peersock != None:
327 self.peersock.peersock = None
328 self.peersock.transport.unregisterProducer()
329 self.peersock = None
330 self.listener.removeActiveConnection(self.addr)
331
332 class Proxy65(protocol.Factory):
333 def __init__(self, port):
334 LogEvent(INFO)
335 reactor.listenTCP(port, self)
336 self.pendingConns = {}
337 self.activeConns = {}
338
339 def buildProtocol(self, addr):
340 return JEP65ConnectionReceive(self)
341
342 def isActive(self, address):
343 return address in self.activeConns
344
345 def activateStream(self, address):
346 if address in self.pendingConns:
347 olist = self.pendingConns[address]
348 if len(olist) != 2:
349 LogEvent(WARN, '', "Not exactly two!")
350 return
351
352 assert address not in self.activeConns
353 self.activeConns[address] = None
354
355 if not isinstance(olist[0], (JEP65ConnectionReceive, JEP65ConnectionSend)):
356 legacyftp = olist[0]
357 connection = olist[1]
358 elif not isinstance(olist[1], (JEP65ConnectionReceive, JEP65ConnectionSend)):
359 legacyftp = olist[1]
360 connection = olist[0]
361 else:
362 LogEvent(WARN, '', "No JEP65Connection")
363 return
364
365 legacyftp.accept(connection.transport)
366 else:
367 LogEvent(WARN, '', "No pending connection.")
368
369 def addConnection(self, address, connection):
370 olist = self.pendingConns.get(address, [])
371 if len(olist) <= 1:
372 olist.append(connection)
373 self.pendingConns[address] = olist
374 if len(olist) == 2:
375 self.activateStream(address)
376 return True
377 else:
378 return False
379
380 def removePendingConnection(self, address, connection):
381 olist = self.pendingConns[address]
382 if len(olist) == 1:
383 del self.pendingConns[address]
384 else:
385 olist.remove(connection)
386
387 def removeActiveConnection(self, address):
388 del self.activeConns[address]
389
390
391 # OOB download server
392
393 from twisted.web import server, resource, error
394 from twisted.internet import reactor
395
396 from debug import LogEvent, INFO, WARN, ERROR
397
398 class OOBReceiveConnector:
399 def __init__(self, ftReceive, ftHttpPush):
400 self.ftReceive, self.ftHttpPush = ftReceive, ftHttpPush
401 self.ftReceive.legacyftp.accept(self)
402
403 def write(self, data):
404 self.ftHttpPush.write(data)
405
406 def close(self):
407 self.ftHttpPush.finish()
408
409 def error(self):
410 self.ftHttpPush.finish()
411 self.ftReceive.error()
412
413 class FileTransferOOBReceive(resource.Resource):
414 def __init__(self, port):
415 LogEvent(INFO)
416 self.isLeaf = True
417 self.files = {}
418 self.oobSite = server.Site(self)
419 reactor.listenTCP(port, self.oobSite)
420
421 def putFile(self, file, filename):
422 path = str(random.randint(100000000, 999999999))
423 filename = (path + "/" + filename).replace("//", "/")
424 self.files[filename] = file
425 return filename
426
427 def remFile(self, filename):
428 if self.files.has_key(filename):
429 del self.files[filename]
430
431 def render_GET(self, request):
432 filename = request.path[1:] # Remove the leading /
433 if self.files.has_key(filename):
434 file = self.files[filename]
435 request.setHeader("Content-Length", str(file.legacyftp.filesize))
436 request.setHeader("Content-Disposition", "attachment; filename=\"%s\"" % file.legacyftp.filename.encode("utf-8"))
437 OOBReceiveConnector(file, request)
438 del self.files[filename]
439 return server.NOT_DONE_YET
440 else:
441 page = error.NoResource(message="404 File Not Found")
442 return page.render(request)
443
444 def render_HEAD(self, request):
445 filename = request.path[1:] # Remove the leading /
446 if self.files.has_key(filename):
447 file = self.files[filename]
448 request.setHeader("Content-Length", str(file.legacyftp.filesize))
449 request.setHeader("Content-Disposition", "attachment; filename=\"%s\"" % file.legacyftp.filename.encode("utf-8"))
450 return ""
451 else:
452 page = error.NoResource(message="404 File Not Found")
453 return page.render(request)
454
455