#!/usr/bin/python2
"""Download the best available rendition of a YouTube video.

The script has two entry points, selected at the bottom of the file:

* ``cgimain()`` when the ``SCRIPT_NAME`` environment variable is set (CGI):
  prints a form, or streams the video back as an attachment.
* ``main()`` otherwise (command line): downloads the video to a file named
  after the page title, resuming a partial download when one exists.
"""

from __future__ import division
from __future__ import print_function

import cookielib
import cgi
import json
import os
import re
import resource
import shutil
import subprocess
import sys
import time
import urllib
import urllib2
import urlparse

from lxml import html


MAX_MEMORY_BYTES = 128 * 1024*1024
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"

# Mimetypes we are willing to download, mapped to the file extension to use.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Known quality labels, mapped to a rank (higher is better).
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}


class VideoUnavailable(Exception):
    """Raised when the page reports no video or no player config is found."""
    pass


def print_form(url="", msg=""):
    """Write the CGI response containing the download form to stdout.

    url -- value substituted for the ``{1}`` placeholder of the page.
    msg -- markup substituted for ``{0}``; callers are expected to have
           escaped any untrusted text inside it already.

    Reads ``HTTP_HOST`` and ``REQUEST_URI`` from the environment to build the
    script's own URL, substituted for ``{2}``.
    """
    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])

    # Both values come from the request and are placed into an XHTML
    # document, so they must be escaped (quotes included).
    safe_url = cgi.escape(url, True)
    safe_script_url = cgi.escape(script_url, True)

    # NOTE(review): this template looks like it has lost its markup -- the
    # response is declared as XHTML and "{1}"/"{2}" are substituted below,
    # yet neither tags nor those placeholders are present. The text is kept
    # exactly as found; TODO restore the original page from version control.
    page = """ delx.net.au - YouTube Scraper

delx.net.au - YouTube Scraper

{0}

This page will let you easily download YouTube videos to watch offline. It will automatically grab the highest quality version.

Tip! Use this bookmarklet: YouTube Download to easily download videos. Right-click the link and add it to bookmarks, then when you're looking at a YouTube page select that bookmark from your browser's bookmarks menu to download the video straight away.

"""

    sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
    sys.stdout.write(
        page.replace("{0}", msg)
            .replace("{1}", safe_url)
            .replace("{2}", safe_script_url)
    )


cookiejar = cookielib.CookieJar()
urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
# The first URL fetched; sent as the Referer header of every later request.
referrer = ""


def urlopen(url, offset=None):
    """Open ``url`` with the shared cookie jar and return the response.

    url    -- absolute URL; a scheme-relative ``//host/path`` gets ``http:``.
    offset -- when truthy, request the body starting at this byte offset.

    Raises IOError when the server's Content-Range does not match the
    requested offset, or when a range was requested but no Content-Range came
    back. Errors from ``urllib2`` (e.g. ``HTTPError``) propagate unchanged.
    """
    global referrer

    if url.startswith("//"):
        url = "http:" + url

    req = urllib2.Request(url)
    if not referrer:
        referrer = url
    else:
        req.add_header("Referer", referrer)
    req.add_header("User-Agent", USER_AGENT)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    # Explicit checks instead of asserts: asserts vanish under "python -O".
    content_range = res.info().getheader("Content-Range")
    if content_range:
        tokens = content_range.split()
        if len(tokens) < 2 or tokens[0] != "bytes":
            res.close()
            raise IOError("Unexpected Content-Range: %s" % content_range)
        start = int(tokens[1].split("-")[0])
        # No requested offset means we expect the body to start at byte 0.
        if start != (offset or 0):
            res.close()
            raise IOError(
                "Requested offset %s but server sent %s" % (offset, content_range)
            )
    elif offset:
        # Without this check a full body would be appended to a partial file.
        res.close()
        raise IOError("Server ignored the Range request for offset %d" % offset)

    return res


def parse_url(url):
    """Fetch ``url`` and return it parsed as an lxml HTML document."""
    f = urlopen(url)
    try:
        return html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
    finally:
        # Close the response even if parsing raises.
        f.close()


def append_to_qs(url, params):
    """Return ``url`` with ``params`` merged into its query string.

    Keys already present in the query string are overwritten.
    """
    r = list(urlparse.urlsplit(url))
    qs = urlparse.parse_qs(r[3])
    qs.update(params)
    r[3] = urllib.urlencode(qs, True)
    return urlparse.urlunsplit(r)


def get_player_config(doc):
    """Return the decoded ``ytplayer.config`` object, or None if not found.

    Scans every ``<script>`` element line by line for the assignment and
    JSON-decodes the object literal up to the first ``};`` on that line.
    """
    marker = "ytplayer.config = {"
    for script in doc.xpath("//script"):
        if not script.text:
            continue
        for line in script.text.split("\n"):
            if marker not in line:
                continue
            # Start on the opening brace, end on the closing brace (inclusive).
            p1 = line.find(marker) + len(marker) - 1
            p2 = line.find("};", p1) + 1
            if p2 > 0:
                return json.loads(line[p1:p2])
    return None


def extract_js(script):
    """Strip the outer ``_yt_player`` wrapper from the player script.

    Raises ValueError if the script does not have the expected wrapper.
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"
    if not script.startswith(PREFIX):
        raise ValueError("Player script does not start with the expected prefix")
    if not script.endswith(SUFFIX):
        raise ValueError("Player script does not end with the expected suffix")
    return script[len(PREFIX):-len(SUFFIX)]


def find_func_name(script):
    """Return the name of the function called as ``name(x.s);`` in ``script``.

    Raises ValueError if no such call is present.
    """
    FUNC_NAME = r"([a-zA-Z0-9$]+)"
    FUNC_PARAMS = r"(\([a-zA-Z]+\.s\))"
    PATTERN = FUNC_NAME + FUNC_PARAMS + ";"

    match = re.search(PATTERN, script)
    if match is None:
        raise ValueError("Could not find the signature function in player script")
    return match.group(1)


def decode_signature(js_url, signature):
    """Transform an encoded signature by running the player's own JS.

    Downloads the script at ``js_url``, locates the transform function and
    evaluates it on ``signature`` inside a ``nodejs`` ``vm`` context.

    Raises Exception if ``nodejs`` exits with a non-zero status.
    """
    script = urlopen(js_url).read()
    func_name = find_func_name(script)

    params = {
        "func_name": func_name,
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    js_decode_script = ("""
        var vm = require('vm');

        var sandbox = {
            location: {
                hash: '',
                href: '',
                protocol: 'http:'
            },
            history: {
                pushState: function(){}
            },
            document: {},
            navigator: {
                userAgent: ''
            },
            signature: %(signature)s,
            transformed_signature: null,
            g: function(){} // this is _yt_player
        };
        sandbox.window = sandbox;

        var code_string = %(code)s + ';';
        var exec_string = 'transformed_signature = %(func_name)s(signature);';
        vm.runInNewContext(code_string + exec_string, sandbox);

        console.log(sandbox.transformed_signature);
    """ % params)

    # Argument list rather than a shell string: no shell is needed here.
    p = subprocess.Popen(
        ["nodejs"],
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    # communicate() feeds stdin, drains stdout and waits for the process.
    output, _ = p.communicate(js_decode_script)
    if p.returncode != 0:
        raise Exception("js failed to execute: %d" % p.returncode)

    return output.strip()


def get_best_video(player_config):
    """Pick the highest-ranked stream from the player config.

    Returns ``(url, extension)``, or ``(None, None)`` when no stream has a
    known quality and mimetype. Among streams of equal rank the last wins.
    """
    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
    js_url = player_config["assets"]["js"]

    best_url = None
    best_quality = None
    best_extension = None
    for url_data in url_data_list:
        url_data = urlparse.parse_qs(url_data)

        # Skip malformed entries (e.g. an empty stream map) instead of
        # failing with KeyError.
        if "type" not in url_data or "quality" not in url_data or "url" not in url_data:
            continue

        mimetype = url_data["type"][0].split(";")[0]
        quality = url_data["quality"][0]

        if "stereo3d" in url_data:
            continue
        if quality not in QUALITIES:
            continue
        if mimetype not in MIMETYPES:
            continue

        extension = MIMETYPES[mimetype]
        quality = QUALITIES[quality]

        if best_quality is not None and quality < best_quality:
            continue

        video_url = url_data["url"][0]
        if "sig" in url_data:
            signature = url_data["sig"][0]
        elif "s" in url_data:
            # Encoded signature: must be transformed by the player script.
            signature = decode_signature(js_url, url_data["s"][0])
        else:
            signature = None

        if signature:
            video_url = append_to_qs(video_url, {"signature": signature})

        best_url = video_url
        best_quality = quality
        best_extension = extension

    return best_url, best_extension


def sanitize_filename(filename):
    """Collapse whitespace and replace path separators and NUL characters."""
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )


def get_video_url(doc):
    """Return ``(video_url, filename)`` for a parsed watch page.

    Returns ``(None, None)`` when no suitable stream exists.

    Raises VideoUnavailable when the page carries an unavailable message or
    no player config can be found.
    """
    unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
    if unavailable:
        raise VideoUnavailable(unavailable[0].strip())

    player_config = get_player_config(doc)
    if not player_config:
        raise VideoUnavailable("Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    title = doc.xpath("/html/head/title/text()")[0]
    filename = sanitize_filename(title)
    filename += "." + extension

    return video_url, filename


def write_video(filename, video_data):
    """Stream ``video_data`` to stdout as a CGI attachment named ``filename``."""
    try:
        httpinfo = video_data.info()
        encoded_filename = urllib.quote(filename.encode("utf-8"))
        sys.stdout.write(
            "Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename
        )
        # Only forward the length when upstream supplied one; otherwise the
        # header would read "Content-Length: None".
        content_length = httpinfo.getheader("Content-Length")
        if content_length is not None:
            sys.stdout.write("Content-Length: %s\r\n" % content_length)
        sys.stdout.write("\r\n")
        shutil.copyfileobj(video_data, sys.stdout)
    finally:
        video_data.close()


def cgimain():
    """CGI entry point: show the form, or stream the requested video."""
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
        return

    # NOTE(review): the two messages below are kept exactly as found; like
    # the form template they look as if surrounding markup was lost -- TODO
    # confirm against version control.
    try:
        doc = parse_url(url)
        video_url, filename = get_video_url(doc)
        if not video_url:
            # get_video_url returns (None, None) when no stream is usable.
            raise VideoUnavailable("Could not find video URL")
        video_data = urlopen(video_url)
        write_video(filename, video_data)
    except VideoUnavailable as e:
        # e.args[0] rather than the deprecated e.message.
        reason = e.args[0] if e.args else ""
        print_form(
            url=url,
            msg="\n\nSorry, there was an error: %s\n\n" % cgi.escape(reason)
        )
    except Exception:
        # Top-level boundary: never leak a traceback into the CGI response.
        print_form(
            url=url,
            msg="\n\nSorry, there was an error. Check your URL?\n\n"
        )
        return


def pp_size(size):
    """Format a byte count using binary suffixes, e.g. ``1.50 MiB``.

    Values below 1024 get an empty suffix; values of 1024 GiB and above are
    expressed in GiB.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    last = len(suffixes) - 1
    for i, suffix in enumerate(suffixes):
        # Stop at the largest suffix: dividing again would label a value
        # that is already in TiB as GiB.
        if size < 1024 or i == last:
            break
        size /= 1024
    return "%.2f %s" % (size, suffix)


def copy_with_progress(content_length, infile, outfile):
    """Copy ``infile`` to ``outfile`` while printing a status line to stdout.

    content_length -- number of bytes expected, shown as the total.

    The status line is redrawn at most about twice a second.
    """
    def print_status():
        # Reads the enclosing function's counters at call time.
        rate = 0
        if now != last_ts:
            rate = last_bytes_read / (now - last_ts)
        # ANSI "erase line", then return to column 0.
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    last_ts = 0
    last_bytes_read = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            print_status()
            last_ts = now
            last_bytes_read = 0

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        last_bytes_read += len(buf)
        bytes_read += len(buf)

    # Newline at the end
    print_status()
    print()


def main():
    """Command-line entry point: download ``sys.argv[1]`` to a local file.

    The file is opened for appending, so an interrupted download is resumed
    from the bytes already on disk.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    doc = parse_url(url)
    video_url, filename = get_video_url(doc)
    if not video_url:
        # get_video_url returns (None, None) when no stream is usable.
        print("Could not find video URL", file=sys.stderr)
        sys.exit(1)
    print("Downloading", filename.encode("utf-8"))

    # Binary append: the payload is video data, not text.
    outfile = open(filename, "ab")
    try:
        # The initial position of an append-mode stream is platform-dependent,
        # so move to the end explicitly before asking how much we have.
        outfile.seek(0, os.SEEK_END)
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from", pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib2.HTTPError as e:
                # 416: nothing lies beyond our offset, the file is whole.
                if e.code == 416:
                    print("File is complete!")
                    break
                else:
                    raise

            content_length = int(video_data.info().getheader("Content-Length"))
            if total_size is None:
                # Content-Length covers only the remaining bytes; the final
                # file size also includes what is already on disk.
                total_size = offset + content_length

            try:
                copy_with_progress(content_length, video_data, outfile)
            except IOError:
                # Connection dropped mid-copy: end the status line and retry.
                print()

            video_data.close()
            if outfile.tell() != total_size:
                old_offset = offset
                offset = outfile.tell()
                if old_offset == offset:
                    # No progress was made; back off before retrying.
                    time.sleep(1)
                print("Restarting download from", pp_size(offset))
            else:
                break
    finally:
        outfile.close()


if __name__ == "__main__":
    # Address-space limit, disabled in the original source:
    ### resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)