#!/usr/bin/env python
"""Download a YouTube video, either as a CGI script or from the command line.

When the SCRIPT_NAME environment variable is set the script behaves as a CGI
program: it shows a form, or streams the requested video back to the client.
Otherwise it downloads the video named on the command line into the current
directory, resuming a partial download if the file already exists.
"""

from __future__ import division
from __future__ import print_function

import cookielib
import cgi
import json
from lxml import html
import os
import re
import resource
import shutil
import subprocess
import sys
import time
import urllib
import urllib2
import urlparse


MAX_MEMORY_BYTES = 128 * 1024*1024
USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"

# Container MIME type -> file extension for the formats we accept.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Quality label -> rank; a higher rank is preferred.
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}


class VideoUnavailable(Exception):
    """Raised when the page reports, or we conclude, that no video can be fetched."""
    pass


def print_form(url="", msg=""):
    """Write the CGI response containing the download form.

    url -- value substituted for the "{1}" placeholder (user supplied).
    msg -- markup substituted for the "{0}" placeholder; callers are
           responsible for escaping any untrusted text inside it.
    """
    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
    sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
    # NOTE(review): the template below contains no markup and no "{1}"/"{2}"
    # placeholders even though the response is declared as XHTML and both
    # placeholders are substituted - the tags look like they were stripped
    # from this copy of the file. Confirm against the deployed version.
    #
    # url comes straight from the query string and script_url from request
    # headers, so both are escaped (including quotes) before substitution.
    sys.stdout.write(""" delx.net.au - YouTube Scraper

delx.net.au - YouTube Scraper

{0}

This page will let you easily download YouTube videos to watch offline. It will automatically grab the highest quality version.

Tip! Use this bookmarklet: YouTube Download to easily download videos. Right-click the link and add it to bookmarks, then when you're looking at a YouTube page select that bookmark from your browser's bookmarks menu to download the video straight away.

""".replace("{0}", msg)
   .replace("{1}", cgi.escape(url, True))
   .replace("{2}", cgi.escape(script_url, True)))


cookiejar = cookielib.CookieJar()
urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
referrer = ""


def urlopen(url, offset=None):
    """Open url with the shared cookie jar and return the response object.

    Each request sends the previously requested URL as its Referer. When
    offset is non-zero a "Range: bytes=<offset>-" header is sent and the
    response is checked to really start at that offset.

    Raises IOError if the server's Content-Range does not match the request
    or if it ignored the Range header (appending such a response to a partial
    file would corrupt it). urllib2.HTTPError propagates unchanged.
    """
    global referrer
    req = urllib2.Request(url)
    if referrer:
        req.add_header("Referer", referrer)
    referrer = url
    req.add_header("User-Agent", USER_AGENT)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    content_range = res.info().getheader("Content-Range")
    if content_range:
        # Expected form: "bytes <start>-<end>/<total>"
        tokens = content_range.split()
        try:
            unit = tokens[0]
            start = int(tokens[1].split("-")[0])
        except (IndexError, ValueError):
            res.close()
            raise IOError("Malformed Content-Range header: %r" % content_range)
        if unit != "bytes" or start != (offset or 0):
            res.close()
            raise IOError("Unexpected Content-Range header: %r" % content_range)
    elif offset:
        res.close()
        raise IOError("Server ignored the Range request for offset %d" % offset)
    return res


def parse_url(url):
    """Fetch url and return it parsed as an lxml HTML document."""
    f = urlopen(url)
    try:
        return html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
    finally:
        f.close()


def append_to_qs(url, params):
    """Return url with params merged into its query string.

    Existing keys with the same name are replaced.
    """
    parts = list(urlparse.urlsplit(url))
    qs = urlparse.parse_qs(parts[3])
    qs.update(params)
    parts[3] = urllib.urlencode(qs, True)
    return urlparse.urlunsplit(parts)


def convert_from_old_itag(player_config):
    """Rewrite an old-style stream map in place.

    In the old format each "itag" value carries the stream URL after a
    "url=" marker; this extracts those URLs into a separate "url" list and
    re-encodes the map.
    """
    url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
    url_data["url"] = []
    for itag_url in url_data["itag"]:
        pos = itag_url.find("url=")
        url_data["url"].append(itag_url[pos+4:])
    player_config["args"]["url_encoded_fmt_stream_map"] = urllib.urlencode(url_data, True)


def get_player_config(doc):
    """Return the player configuration dict embedded in the page, or None.

    Scans every <script> element line by line for one of the three known
    assignments and decodes the JSON that follows it.
    """
    for script in doc.xpath("//script"):
        if not script.text:
            continue
        for line in script.text.split("\n"):
            if "yt.playerConfig =" in line:
                p1 = line.find("=")
                p2 = line.rfind(";")
                if p1 >= 0 and p2 > 0:
                    return json.loads(line[p1+1:p2])
            if "ytplayer.config =" in line:
                marker = "ytplayer.config ="
                p1 = line.find(marker)
                p2 = line.rfind(";")
                if p1 >= 0 and p2 > 0:
                    # Slice right after the marker; json.loads tolerates the
                    # optional whitespace that follows the "=".
                    return json.loads(line[p1+len(marker):p2])
            if "'PLAYER_CONFIG': " in line:
                p1 = line.find(":")
                if p1 >= 0:
                    # NOTE(review): the collapsed source does not show whether
                    # the original returned here or kept scanning; returning
                    # at the first match is assumed.
                    player_config = json.loads(line[p1+1:])
                    convert_from_old_itag(player_config)
                    return player_config
    return None


def extract_function(output, script, func_name, seen=None):
    """Append the JS source of func_name, and of everything it calls, to output.

    output    -- list receiving the extracted "function ...{...}" snippets.
    script    -- full JavaScript source to search.
    func_name -- name of the function to extract.
    seen      -- names already visited; used internally so that recursive or
                 mutually recursive functions are extracted only once.

    Names that are not defined as "function <name>(" in script (for example
    JS keywords such as "if" picked up by the call regex) are skipped.
    The body is taken to end at the first "}" after the definition, so
    functions containing nested braces are truncated.
    """
    if seen is None:
        seen = set()
    if func_name in seen:
        return
    seen.add(func_name)

    match = re.search(r"function\s+%s\s*\(" % re.escape(func_name), script)
    if match is None:
        return
    p1 = match.start()
    p2 = script.find("}", p1)
    if p2 < 0:
        return
    code = script[p1:p2+1]
    output.append(code)

    # Every identifier followed by "(" and not preceded by "." is treated as
    # a call to another top-level function.
    for dep in set(re.findall(R"[^\.]\b([a-zA-Z]+)\(", code)):
        extract_function(output, script, dep, seen)


def decode_signature(js_url, s):
    """Decode the scrambled signature s using the player's own JavaScript.

    Downloads the player script, extracts the decoding function together
    with its helpers and evaluates it with the external "js" interpreter.

    Raises VideoUnavailable if the decoding function cannot be located and
    Exception if the interpreter exits with a non-zero status.
    """
    if js_url.startswith("//"):
        # Protocol-relative URLs cannot be opened by urllib2.
        js_url = "http:" + js_url

    res = urlopen(js_url)
    try:
        script = res.read()
    finally:
        res.close()

    match = re.search(R"\b([a-zA-Z]+)\([a-zA-Z]+\.s\);", script)
    if match is None:
        raise VideoUnavailable("Could not find signature decoding function")
    func_name = match.group(1)

    codes = []
    extract_function(codes, script, func_name)
    # json.dumps yields a correctly quoted JS string literal, so a signature
    # containing quotes or backslashes cannot break out of the call.
    codes.append("console.log(%s(%s));" % (func_name, json.dumps(s)))
    program = "\n".join(codes) + "\n"

    p = subprocess.Popen(
        ["js"],
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    stdout_data, _ = p.communicate(program)
    if p.returncode != 0:
        raise Exception("js failed to execute: %d" % p.returncode)
    return stdout_data.strip()


def get_best_video(player_config):
    """Return (url, extension) of the best stream, or (None, None).

    Only streams whose quality is listed in QUALITIES and whose container is
    listed in MIMETYPES are considered; among equally ranked streams the last
    one in the map wins. The signature is resolved for the chosen stream
    only, so the JS decoder runs at most once.
    """
    stream_map = player_config["args"]["url_encoded_fmt_stream_map"]

    best = None  # (rank, extension, parsed stream entry)
    for entry in stream_map.split(","):
        url_data = urlparse.parse_qs(entry)
        try:
            mimetype = url_data["type"][0].split(";")[0]
            quality = url_data["quality"][0]
        except KeyError:
            # Empty or malformed entry
            continue
        if quality not in QUALITIES or mimetype not in MIMETYPES:
            continue
        rank = QUALITIES[quality]
        if best is not None and rank < best[0]:
            continue
        best = (rank, MIMETYPES[mimetype], url_data)

    if best is None:
        return None, None

    _, extension, url_data = best
    video_url = url_data["url"][0]
    if "sig" in url_data:
        signature = url_data["sig"][0]
    else:
        js_url = player_config["assets"]["js"]
        signature = decode_signature(js_url, url_data["s"][0])
    video_url = append_to_qs(video_url, {"signature": signature})
    return video_url, extension


def sanitize_filename(filename):
    """Collapse whitespace and replace path separators and NULs in filename."""
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )


def get_video_url(doc):
    """Return (video_url, filename) for a parsed watch page.

    Returns (None, None) when the page has a player config but no acceptable
    stream. Raises VideoUnavailable when the page carries an unavailable
    message or no player config can be found.
    """
    unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
    if unavailable:
        raise VideoUnavailable(unavailable[0].strip())

    player_config = get_player_config(doc)
    if not player_config:
        raise VideoUnavailable("Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    titles = doc.xpath("/html/head/title/text()")
    # Fall back to a fixed name rather than crash on a page without <title>.
    filename = sanitize_filename(titles[0]) if titles else "video"
    filename += "." + extension
    return video_url, filename


def write_video(filename, video_data):
    """Stream video_data to stdout as a CGI attachment named filename.

    video_data is always closed, even if the copy fails part-way.
    """
    try:
        httpinfo = video_data.info()
        encoded_filename = urllib.quote(filename.encode("utf-8"))
        sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
        content_length = httpinfo.getheader("Content-Length")
        if content_length is not None:
            # Omit the header instead of sending "Content-Length: None".
            sys.stdout.write("Content-Length: %s\r\n" % content_length)
        sys.stdout.write("\r\n")
        shutil.copyfileobj(video_data, sys.stdout)
    finally:
        video_data.close()


def cgimain():
    """CGI entry point: show the form, or stream the video named by ?url=."""
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        doc = parse_url(url)
        video_url, filename = get_video_url(doc)
        if not video_url:
            raise VideoUnavailable("Could not find video URL")
        video_data = urlopen(video_url)
        write_video(filename, video_data)
    except VideoUnavailable as e:
        reason = e.args[0] if e.args else ""
        print_form(
            url=url,
            msg="\n\nSorry, there was an error: %s\n\n" % cgi.escape(reason)
        )
    except Exception:
        # Top-level CGI boundary: never show a traceback to the client.
        print_form(
            url=url,
            msg="\n\nSorry, there was an error. Check your URL?\n\n"
        )
        return


def pp_size(size):
    """Format a byte count using binary units, e.g. 2048 -> "2.00 KiB".

    Values of 1024 GiB or more are reported in GiB.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    suffix = suffixes[-1]
    # Only divide when a larger unit is available, so the value and the
    # suffix always agree.
    for candidate in suffixes[:-1]:
        if size < 1024:
            suffix = candidate
            break
        size /= 1024
    return "%.2f %s" % (size, suffix)


def copy_with_progress(content_length, infile, outfile):
    """Copy infile to outfile, redrawing a progress line about twice a second.

    content_length is only used for display. IOError raised while reading or
    writing propagates to the caller.
    """
    def print_status(now):
        rate = 0
        if now != last_ts:
            rate = last_bytes_read / (now - last_ts)
        # Erase the current terminal line, then redraw it.
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    last_ts = 0
    last_bytes_read = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            print_status(now)
            last_ts = now
            last_bytes_read = 0

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        last_bytes_read += len(buf)
        bytes_read += len(buf)

    # Final status, followed by a newline to end the progress line
    print_status(now)
    print()


def main():
    """Command-line entry point: download the video at sys.argv[1].

    The file is opened for appending, so an existing partial download is
    resumed from its current size. Interrupted transfers are retried until
    the file reaches the expected size or the server answers 416.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    doc = parse_url(url)
    video_url, filename = get_video_url(doc)
    if not video_url:
        print("Could not find video URL", file=sys.stderr)
        sys.exit(1)
    print("Downloading", filename.encode("utf-8"))

    outfile = open(filename, "ab")
    try:
        # Make the reported position independent of how the platform
        # positions a freshly opened append-mode file.
        outfile.seek(0, os.SEEK_END)
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from", pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib2.HTTPError as e:
                if e.code == 416:
                    # Requested range starts at or past the end of the video.
                    print("File is complete!")
                    break
                else:
                    raise

            content_length = int(video_data.info().getheader("Content-Length"))
            if total_size is None:
                # Content-Length of a ranged response covers only the
                # remaining bytes, so add what is already on disk.
                total_size = offset + content_length

            try:
                copy_with_progress(content_length, video_data, outfile)
            except IOError:
                # Transfer interrupted: end the progress line and retry below.
                print()

            video_data.close()
            if outfile.tell() != total_size:
                old_offset = offset
                offset = outfile.tell()
                if old_offset == offset:
                    # No progress at all; back off before retrying.
                    time.sleep(1)
                print("Restarting download from", pp_size(offset))
            else:
                break
    finally:
        outfile.close()


if __name__ == "__main__":
    ### resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)