#!/usr/bin/python2
from __future__ import division
# User-Agent header value attached to every outgoing HTTP request.
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"

# Stream MIME types this script is willing to download, mapped to the
# filename extension used for the saved video.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Stream quality labels this script is willing to download, mapped to a rank
# (a higher rank is preferred over a lower one).
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}
class VideoUnavailable(Exception):
    """Raised when a page offers no video to download.

    The first argument is a human-readable reason; the CGI front end shows it
    to the user.
    """
def print_form(url="", msg=""):
    """Write the download form to stdout as a complete CGI response.

    url -- text pre-filled into the URL input box. It is HTML-escaped here
           because it can come straight from the query string.
    msg -- XHTML fragment inserted verbatim below the heading. Callers must
           escape any untrusted text inside it themselves.

    Reads HTTP_HOST and REQUEST_URI from the environment to build the
    bookmarklet link, so a KeyError is raised outside a CGI environment.
    """
    # REQUEST_URI includes the current query string; drop it so that the
    # bookmarklet can append its own "?url=" parameter to a clean path.
    script_path = os.environ["REQUEST_URI"].split("?", 1)[0]
    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], script_path)

    # The page is served as application/xhtml+xml, so a raw "&", "<" or
    # double quote in an attribute value makes the whole document malformed
    # (and would let a crafted URL inject markup).
    safe_url = cgi.escape(url, True)
    safe_script_url = cgi.escape(script_url, True)

    sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
    sys.stdout.write("""
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <title>delx.net.au - YouTube Scraper</title>
    <link rel="stylesheet" type="text/css" href="/style.css"/>
    <style type="text/css">
        input[type="text"] {
            width: 100%;
        }
        .error {
            color: red;
        }
    </style>
</head>
<body>
    <h1>delx.net.au - YouTube Scraper</h1>
    {0}
    <form action="" method="get">
    <p>This page will let you easily download YouTube videos to watch offline. It
    will automatically grab the highest quality version.</p>
    <div><input type="text" name="url" value="{1}"/></div>
    <div><input type="submit" value="Download!"/></div>
    </form>
    <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
    to easily download videos. Right-click the link and add it to bookmarks,
    then when you're looking at a YouTube page select that bookmark from your
    browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
""".replace("{0}", msg).replace("{1}", safe_url).replace("{2}", safe_script_url))
# URL of the first request made through urlopen(); it is sent as the Referer
# header on every later request.
referrer = ""


def urlopen(url, offset=None):
    """Open url with the module-level urlopener and return the response.

    url    -- URL to fetch; a protocol-relative "//host/path" URL is fetched
              over http.
    offset -- optional byte offset; when truthy a "Range: bytes=<offset>-"
              header is sent so that a download can be resumed.

    Raises ValueError if the server answers with a Content-Range header that
    does not start at the requested offset. Errors raised by the opener
    itself propagate unchanged.
    """
    global referrer

    if url.startswith("//"):
        url = "http:" + url

    req = urllib2.Request(url)
    if referrer:
        req.add_header("Referer", referrer)
    else:
        # Remember the first URL only; it becomes the Referer from now on.
        referrer = url
    req.add_header("User-Agent", USER_AGENT)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    content_range = res.info().getheader("Content-Range")
    if content_range:
        # Expected form: "bytes <start>-<end>/<total>".
        tokens = content_range.split()
        # Explicit checks rather than assert, so they survive "python -O".
        # A missing offset means the data is expected to start at byte 0.
        if tokens[0] != "bytes" or int(tokens[1].split("-")[0]) != (offset or 0):
            res.close()
            raise ValueError("Unexpected Content-Range: %s" % content_range)
    return res
def parse_url(url):
    """Fetch url and return it parsed as an HTML document.

    The page is decoded as UTF-8 with the parser's recover mode enabled.
    The HTTP response is closed even when parsing raises.
    """
    f = urlopen(url)
    try:
        return html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
    finally:
        f.close()
def append_to_qs(url, params):
    """Return url with the query parameters in params added.

    url    -- the URL to extend.
    params -- dict of parameter name to value (a string or a sequence of
              strings). A name already present in url is replaced.

    Existing parameters keep their order, and parameters with an empty value
    are preserved rather than silently dropped.
    """
    parts = list(urlparse.urlsplit(url))
    # Index 3 of the split result is the query string. The second argument
    # turns on keep_blank_values so that "a=" survives the round trip.
    pairs = [
        (key, value)
        for key, value in urlparse.parse_qsl(parts[3], True)
        if key not in params
    ]
    pairs.extend(params.items())
    parts[3] = urllib.urlencode(pairs, True)
    return urlparse.urlunsplit(parts)
def get_player_config(doc):
    """Return the player configuration embedded in a watch page.

    doc -- parsed HTML document; its <script> elements are searched for a
           line containing "ytplayer.config = {".

    Returns the decoded JSON object that follows the assignment, or None when
    no script line holds a decodable configuration.
    """
    marker = "ytplayer.config = {"
    decoder = json.JSONDecoder()
    for script in doc.xpath("//script"):
        if not script.text:
            continue
        for line in script.text.split("\n"):
            pos = line.find(marker)
            if pos < 0:
                continue
            # The marker ends with the opening brace of the JSON object.
            start = pos + len(marker) - 1
            try:
                # raw_decode stops at the end of the first complete JSON
                # value, so trailing JavaScript and any "};" inside a string
                # value cannot truncate the object.
                config, _ = decoder.raw_decode(line[start:])
            except ValueError:
                continue
            return config
    return None
def extract_js(script):
    """Return the body of a script wrapped as "(function(){ ... })();".

    Trailing whitespace after the wrapper is ignored.

    Raises ValueError when the script is not wrapped this way.
    """
    PREFIX = "(function(){"
    SUFFIX = "})();"

    wrapped = script.rstrip()
    # Explicit check rather than assert: with "python -O" an assert would be
    # skipped and the slice below would silently return garbage.
    if not (wrapped.startswith(PREFIX) and wrapped.endswith(SUFFIX)):
        raise ValueError("script is not wrapped in %s ... %s" % (PREFIX, SUFFIX))

    return wrapped[len(PREFIX):-len(SUFFIX)]
def find_func_name(script):
    """Return the name of the function that is called as "NAME(x.s);".

    script -- JavaScript source to search; the first call whose only
              argument is the ".s" attribute of some object is used.

    Raises ValueError when no such call exists.
    """
    FUNC_NAME = r"([a-zA-Z0-9$]+)"
    FUNC_PARAMS = r"(\([a-zA-Z]+\.s\))"
    PATTERN = FUNC_NAME + FUNC_PARAMS + ";"

    match = re.search(PATTERN, script)
    if match is None:
        raise ValueError("Could not find the signature function in the script")
    return match.group(1)
def decode_signature(js_url, signature):
    """Transform an encoded stream signature using the player's own script.

    js_url    -- URL of the player JavaScript; it is downloaded on each call.
    signature -- the encoded signature string to transform.

    The downloaded script is run by the "nodejs" executable together with a
    call to the function located by find_func_name(); whatever that function
    returns is printed by node and returned here with surrounding whitespace
    stripped.

    Raises Exception when nodejs exits with a non-zero status, and OSError
    when nodejs cannot be started.

    NOTE(security): this executes code downloaded from the network. It runs
    through node's "vm" module, which should not be relied on as a security
    boundary - review before exposing this to untrusted pages.
    """
    response = urlopen(js_url)
    try:
        script = response.read()
    finally:
        response.close()
    func_name = find_func_name(script)

    params = {
        "func_name": func_name,
        # json.dumps produces valid JavaScript string literals.
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    js_decode_script = ("""
        var vm = require('vm');

        var sandbox = {
            window: {
                location: {
                    hash: '',
                    href: ''
                },
                history: {
                    pushState: function(){}
                },
                navigator: {}
            },
            document: {},
            navigator: {},
            signature: %(signature)s,
            transformed_signature: null
        };

        var code_string = %(code)s + ';';
        var exec_string = 'transformed_signature = %(func_name)s(signature);';
        vm.runInNewContext(code_string + exec_string, sandbox);

        console.log(sandbox.transformed_signature);
    """ % params)

    # An argument list (no shell) is enough to start node; the program text
    # is fed through stdin.
    p = subprocess.Popen(
        ["nodejs"],
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    # communicate() writes stdin, reads stdout to the end, closes both pipes
    # and waits for the process.
    output, _ = p.communicate(js_decode_script)
    if p.returncode != 0:
        raise Exception("js failed to execute: %d" % p.returncode)

    return output.strip()
def get_best_video(player_config):
    """Pick the highest-quality downloadable stream from a player config.

    player_config -- dict as returned by get_player_config(). The
                     comma-separated stream list is read from
                     ["args"]["url_encoded_fmt_stream_map"], and
                     ["assets"]["js"] is read only when a stream carries an
                     encoded ("s") signature.

    Streams are skipped when they are marked stereo3d, lack a type, quality
    or url field, or use a quality or MIME type not listed in QUALITIES or
    MIMETYPES. Among streams of equal quality the first one listed wins.

    Returns (url, extension), or (None, None) when nothing is acceptable.
    """
    stream_map = player_config["args"]["url_encoded_fmt_stream_map"]

    best_url = None
    best_quality = None
    best_extension = None
    for entry in stream_map.split(","):
        url_data = urlparse.parse_qs(entry)
        if "stereo3d" in url_data:
            continue

        try:
            mimetype = url_data["type"][0].split(";")[0]
            quality_name = url_data["quality"][0]
            video_url = url_data["url"][0]
        except KeyError:
            # Incomplete entry (for example an empty stream map).
            continue

        if quality_name not in QUALITIES or mimetype not in MIMETYPES:
            continue
        quality = QUALITIES[quality_name]

        # "<=" rather than "<": a tie must not replace the current pick, and
        # must not trigger another signature decode (a download plus a
        # nodejs run).
        if best_quality is not None and quality <= best_quality:
            continue

        if "sig" in url_data:
            signature = url_data["sig"][0]
        elif "s" in url_data:
            js_url = player_config["assets"]["js"]
            signature = decode_signature(js_url, url_data["s"][0])
        else:
            signature = None

        if signature:
            video_url = append_to_qs(video_url, {"signature": signature})

        best_url = video_url
        best_quality = quality
        best_extension = MIMETYPES[mimetype]

    return best_url, best_extension
def sanitize_filename(filename):
    """Return filename made safe to use as a single path component.

    Leading and trailing whitespace is removed, each run of whitespace
    becomes one space, backslashes and slashes become "-", and NUL
    characters become spaces.
    """
    collapsed = re.sub(r"\s+", " ", filename.strip())
    return (
        collapsed
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )
def get_video_url(doc):
    """Work out what to download for a parsed watch page.

    doc -- parsed HTML document of the video page.

    Returns (video_url, filename). The filename is the sanitized page title
    plus the stream's extension, with "video" used when the page has no
    usable title. Returns (None, None) when the player configuration
    contains no acceptable stream.

    Raises VideoUnavailable when the page shows an "unavailable" message or
    contains no player configuration.
    """
    unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
    if unavailable:
        raise VideoUnavailable(unavailable[0].strip())

    player_config = get_player_config(doc)
    if not player_config:
        raise VideoUnavailable("Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    # A page without a <title> must not abort the download with IndexError.
    titles = doc.xpath("/html/head/title/text()")
    title = titles[0] if titles else ""
    filename = sanitize_filename(title) or "video"
    filename += "." + extension

    return video_url, filename
def write_video(filename, video_data):
    """Stream a video to stdout as a CGI file-download response.

    filename   -- name offered to the browser; sent UTF-8 percent-encoded in
                  the Content-Disposition header.
    video_data -- open HTTP response to copy from. It is always closed, even
                  when writing fails.
    """
    try:
        httpinfo = video_data.info()
        encoded_filename = urllib.quote(filename.encode("utf-8"))
        sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
        # Only forward the length when upstream supplied one; otherwise the
        # header would read "Content-Length: None".
        content_length = httpinfo.getheader("Content-Length")
        if content_length is not None:
            sys.stdout.write("Content-Length: %s\r\n" % content_length)
        sys.stdout.write("\r\n")
        shutil.copyfileobj(video_data, sys.stdout)
    finally:
        video_data.close()
def cgimain():
    """CGI entry point: show the form, or stream the requested video.

    Reads the "url" query parameter. Without it the empty form is shown.
    With it the page is fetched and the best stream is written to stdout as
    a download; on failure the form is shown again with an error message.
    """
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
        return

    generic_error = "<p class='error'>Sorry, there was an error. Check your URL?</p>"

    # The URL comes from the request, so only fetch web URLs; this keeps
    # other schemes (such as file://) away from the opener.
    if not url.lower().startswith(("http://", "https://")):
        print_form(url=url, msg=generic_error)
        return

    try:
        doc = parse_url(url)
        video_url, filename = get_video_url(doc)
        if not video_url:
            # get_video_url() returns (None, None) when no stream qualifies.
            raise VideoUnavailable("No downloadable format found")
        video_data = urlopen(video_url)
        write_video(filename, video_data)
    except VideoUnavailable as e:
        reason = e.args[0] if e.args else ""
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(reason)
        )
    except Exception:
        # Top-level boundary of the CGI script: hide the details from the
        # user and show the generic message.
        print_form(url=url, msg=generic_error)
def pp_size(size):
    """Format a byte count for display, e.g. 1536 -> "1.50 KiB".

    size -- non-negative number of bytes (int or float).

    Values below 1024 are printed with an empty unit. GiB is the largest
    unit, so bigger values are printed as more than 1024 GiB.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    last = len(suffixes) - 1
    index = 0
    # Stop dividing at the largest unit; otherwise the value would be scaled
    # once more than its label says.
    while size >= 1024 and index < last:
        size /= 1024.0
        index += 1
    return "%.2f %s" % (size, suffixes[index])
def copy_with_progress(content_length, infile, outfile):
    """Copy infile to outfile while drawing a progress line on stdout.

    content_length -- number of bytes expected from infile; only displayed.
    infile         -- object with read(n); copied in 32 KiB chunks until a
                      read returns nothing.
    outfile        -- object with write(data).

    The status line is redrawn at most about twice per second and shows the
    bytes copied so far, the expected total and the transfer rate since the
    previous redraw. A newline is written once the copy has finished.
    """
    def print_status(now):
        elapsed = now - last_ts
        rate = 0
        if elapsed:
            rate = last_bytes_read / elapsed
        # "ESC [ 2 K" erases the current terminal line and "\r" returns to
        # its start, so the status is redrawn in place.
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    last_ts = 0
    last_bytes_read = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            print_status(now)
            last_ts = now
            last_bytes_read = 0

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        last_bytes_read += len(buf)
        bytes_read += len(buf)

    # Final status uses a fresh timestamp so that the last rate covers the
    # time actually spent, followed by the closing newline.
    print_status(time.time())
    sys.stdout.write("\n")
def main():
    """Command-line entry point: download the video at sys.argv[1].

    The video is saved in the current directory under a name derived from
    the page title. An existing file of that name is treated as a partial
    download and resumed from its current size; interrupted transfers are
    restarted from the last byte written until the file is complete.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        sys.stderr.write("Usage: %s http://youtube.com/watch?v=FOOBAR\n" % sys.argv[0])
        sys.exit(1)

    doc = parse_url(url)
    video_url, filename = get_video_url(doc)
    if not video_url:
        # get_video_url() returns (None, None) when no stream qualifies.
        sys.stderr.write("No downloadable format found\n")
        sys.exit(1)
    print("Downloading %s" % filename.encode("utf-8"))

    # Binary append: the data is not text, and an existing partial file must
    # be extended rather than truncated.
    outfile = open(filename, "ab")
    try:
        # Position explicitly at the end (whence=2) so that tell() reports
        # the existing size regardless of platform.
        outfile.seek(0, 2)
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from %s" % pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib2.HTTPError as e:
                # 416: the requested range starts at or past the end of the
                # file, i.e. everything is already on disk.
                if e.code == 416:
                    print("File is complete!")
                    break
                raise

            content_length = int(video_data.info().getheader("Content-Length"))
            if total_size is None:
                # Content-Length covers only the bytes still to come, so the
                # bytes already on disk have to be added.
                total_size = offset + content_length

            try:
                copy_with_progress(content_length, video_data, outfile)
            except IOError:
                # Best effort: end the progress line; the loop below retries
                # from the last byte written.
                print("")

            video_data.close()
            if outfile.tell() == total_size:
                break

            old_offset = offset
            offset = outfile.tell()
            if old_offset == offset:
                # No progress at all; pause before trying again.
                time.sleep(1)
            print("Restarting download from %s" % pp_size(offset))
    finally:
        outfile.close()
if __name__ == "__main__":
    # NOTE: the address-space limit is currently disabled.
    # resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))

    # SCRIPT_NAME in the environment means the script is running as a CGI
    # program; otherwise it is being used from the command line.
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)