#!/usr/bin/env python3
"""YouTube video scraper.

Runs either as a CGI script (serves an HTML form and streams the chosen
video back as an attachment) or as a command line tool (downloads the
highest-quality available stream to a local file, with resume support).
"""

import html
import html.parser
import http.cookiejar
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

# NOTE(review): defined but never referenced in this file; kept for
# compatibility in case an external wrapper imports it.
MAX_MEMORY_BYTES = 128 * 1024 * 1024

USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"

# Mapping of acceptable stream MIME types to the file extension used
# when saving the download.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Relative ranking of YouTube quality labels; higher wins.
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}


class VideoUnavailable(Exception):
    """Raised when the page exists but no downloadable video can be found."""
    pass


def print_form(url="", msg=""):
    """Write the HTML landing page (or an error page) to stdout as a CGI response.

    url -- value to prefill in the URL input box
    msg -- HTML fragment shown above the form (e.g. an error message)
    """
    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
    sys.stdout.write("Content-Type: text/html\r\n\r\n")
    # NOTE(review): the original markup was mangled in this copy of the file;
    # this template preserves the surviving visible text and the {0}/{1}/{2}
    # substitution slots (msg, url, script_url). Confirm against the deployed
    # page before relying on exact markup.
    sys.stdout.write("""
<!DOCTYPE html>
<html>
<head>
<title>delx.net.au - YouTube Scraper</title>
</head>
<body>
<h1>delx.net.au - YouTube Scraper</h1>
{0}
<form action="{2}" method="get">
<p>This page will let you easily download YouTube videos to watch offline. It will automatically grab the highest quality version.</p>
<div><input type="text" name="url" value="{1}"/> <input type="submit" value="Download!"/></div>
<p>Tip! Use this bookmarklet: <a href="javascript:window.location='{2}?url='+escape(window.location);">YouTube Download</a> to easily download videos. Right-click the link and add it to bookmarks, then when you're looking at a YouTube page select that bookmark from your browser's bookmarks menu to download the video straight away.</p>
</form>
</body>
</html>
""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))


# Shared opener so YouTube cookies persist across requests; referrer is
# remembered from the first fetched URL and sent on subsequent requests.
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
referrer = ""


def urlopen(url, offset=None):
    """Open *url* with the shared opener, faking a browser User-Agent.

    Relative and protocol-relative URLs are resolved against
    https://www.youtube.com. If *offset* is a positive integer a Range
    header is sent, and the server's Content-Range reply is checked to
    confirm it honoured the requested start offset.
    """
    if url.startswith("//"):
        url = "https:" + url
    if not url.startswith("http://") and not url.startswith("https://"):
        url = "https://www.youtube.com" + url

    global referrer
    req = urllib.request.Request(url)
    if not referrer:
        # First request: remember it as the referrer for later requests.
        referrer = url
    else:
        req.add_header("Referer", referrer)

    req.add_header("User-Agent", USER_AGENT)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)
    content_range = res.getheader("Content-Range")
    if content_range:
        tokens = content_range.split()
        assert tokens[0] == "bytes"
        start = int(tokens[1].split("-")[0])
        assert start == offset
    return res


def parse_url(url, parser):
    """Fetch *url* and feed the decoded document through *parser*."""
    f = urlopen(url)
    parser.feed(f.read().decode("utf-8"))
    parser.close()
    f.close()


def append_to_qs(url, params):
    """Return *url* with *params* merged into its query string."""
    r = list(urllib.parse.urlsplit(url))
    qs = urllib.parse.parse_qs(r[3])
    qs.update(params)
    r[3] = urllib.parse.urlencode(qs, True)
    return urllib.parse.urlunsplit(r)


def get_player_config(scripts):
    """Scan inline page scripts for the `ytplayer.config = {...}` JSON blob.

    Returns the parsed dict, or None when no script contains it.
    """
    for script in scripts:
        for line in script.split("\n"):
            s = "ytplayer.config = {"
            if s in line:
                p1 = line.find(s) + len(s) - 1  # index of the opening "{"
                p2 = line.find("};", p1) + 1    # index one past the closing "}"
                if p1 >= 0 and p2 > 0:
                    return json.loads(line[p1:p2])


def extract_js(script):
    """Strip the known player-script wrapper, returning the inner JS body.

    Asserts the wrapper is exactly the expected prefix/suffix so format
    changes fail loudly rather than silently producing broken JS.
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"
    assert script.startswith(PREFIX)
    assert script.endswith(SUFFIX)
    return script[len(PREFIX):-len(SUFFIX)]


def find_func_name(script):
    """Find the name of the signature-transform function in the player JS.

    Looks for the first call shaped like `name(x.s)` followed by a
    terminator character.
    """
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    FUNC_PARAMS = R"(\([a-zA-Z]+\.s\))"
    TERMINATOR = R"[,;\)]"
    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR
    match = re.search(PATTERN, script)
    return match.groups()[0]


def decode_signature(js_url, signature):
    """Run YouTube's own JS (via node) to transform an obfuscated signature.

    Downloads the player script from *js_url*, extracts its body, and
    executes it in a node `vm` sandbox with just enough fake browser
    globals for it to load, then calls the transform function on
    *signature*. Raises Exception if node exits non-zero.
    """
    f = urlopen(js_url)
    script = f.read().decode("utf-8")
    f.close()

    func_name = find_func_name(script)
    params = {
        "func_name": func_name,
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    # shell=True with a constant command string ("node") — no untrusted
    # input reaches the shell; the untrusted JS runs inside node's vm.
    p = subprocess.Popen(
        "node",
        shell=True,
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    js_decode_script = ("""
var vm = require('vm');
var sandbox = {
    location: { hash: '', href: '', protocol: 'http:' },
    history: { pushState: function(){} },
    document: {},
    navigator: { userAgent: '' },
    matchMedia: () => ({matches: () => {}, media: ''}),
    signature: %(signature)s,
    transformed_signature: null,
    g: function(){} // this is _yt_player
};
sandbox.window = sandbox;
var code_string = %(code)s + ';';
var exec_string = 'transformed_signature = %(func_name)s(signature);';
vm.runInNewContext(code_string + exec_string, sandbox);
console.log(sandbox.transformed_signature);
""" % params)
    p.stdin.write(js_decode_script.encode("utf-8"))
    p.stdin.close()
    transformed_signature = p.stdout.read().decode("utf-8").strip()
    if p.wait() != 0:
        raise Exception("js failed to execute: %d" % p.returncode)
    return transformed_signature


def get_best_video(player_config):
    """Pick the best (url, extension) from the player config's stream map.

    Skips stereo3d streams and anything with an unknown quality label or
    MIME type; among the rest, the highest QUALITIES rank wins (later
    entries win ties). Signatures are appended to the URL, decoding
    obfuscated ones ("s") through node when needed. Returns (None, None)
    when nothing usable is found.
    """
    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
    js_url = player_config["assets"]["js"]

    best_url = None
    best_quality = None
    best_extension = None
    for url_data in url_data_list:
        url_data = urllib.parse.parse_qs(url_data)
        mimetype = url_data["type"][0].split(";")[0]
        quality = url_data["quality"][0]
        if "stereo3d" in url_data:
            continue
        if quality not in QUALITIES:
            continue
        if mimetype not in MIMETYPES:
            continue

        extension = MIMETYPES[mimetype]
        quality = QUALITIES.get(quality, -1)
        if best_quality is not None and quality < best_quality:
            continue

        video_url = url_data["url"][0]
        if "sig" in url_data:
            signature = url_data["sig"][0]
        elif "s" in url_data:
            signature = decode_signature(js_url, url_data["s"][0])
        else:
            signature = None
        if signature:
            video_url = append_to_qs(video_url, {"signature": signature})

        best_url = video_url
        best_quality = quality
        best_extension = extension

    return best_url, best_extension


def sanitize_filename(filename):
    """Collapse whitespace and strip characters unsafe in filenames."""
    # r"\s+" — the original used a non-raw "\s+" (invalid escape sequence).
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )


def get_video_url(page):
    """Resolve a parsed page into (video_url, filename).

    Raises VideoUnavailable when the player config is missing; returns
    (None, None) when the config exists but offers no usable stream.
    """
    player_config = get_player_config(page.scripts)
    if not player_config:
        raise VideoUnavailable(page.unavailable_message or "Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    filename = sanitize_filename(page.title)
    filename += "." + extension
    return video_url, filename


class YouTubeVideoPageParser(html.parser.HTMLParser):
    """Pull the title, unavailable-message div, and inline scripts from a watch page.

    Works by swapping `self.handle_data` to a tag-specific handler when an
    interesting start tag is seen, and back to a no-op on any end tag.
    """

    def __init__(self):
        super().__init__()
        self.title = None                # <title> text, stripped
        self.unavailable_message = None  # text of id="unavailable-message"
        self.scripts = []                # bodies of all <script> elements

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        self._handle_title(tag, attrs)
        self._handle_unavailable_message(tag, attrs)
        self._handle_script(tag, attrs)

    def handle_endtag(self, tag):
        # Stop capturing data once the current element closes.
        self.handle_data = self._ignore_data

    def _ignore_data(self, _):
        pass

    def _handle_title(self, tag, attrs):
        if tag == "title":
            self.handle_data = self._handle_title_data

    def _handle_title_data(self, data):
        self.title = data.strip()

    def _handle_unavailable_message(self, tag, attrs):
        if attrs.get("id", None) == "unavailable-message":
            self.handle_data = self._handle_unavailable_message_data

    def _handle_unavailable_message_data(self, data):
        self.unavailable_message = data.strip()

    def _handle_script(self, tag, attrs):
        if tag == "script":
            self.handle_data = self._handle_script_data

    def _handle_script_data(self, data):
        if data:
            self.scripts.append(data)


def write_video(filename, video_data):
    """Stream *video_data* to stdout as an attachment download (CGI mode).

    NOTE(review): assumes the upstream response carries a Content-Length
    header; a missing header would raise AttributeError here.
    """
    quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
    sys.stdout.buffer.write(
        b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n"
        .replace(b"{0}", quoted_filename.encode("utf-8"))
    )
    sys.stdout.buffer.write(
        b"Content-Length: {0}\r\n"
        .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8"))
    )
    sys.stdout.buffer.write(b"\r\n")
    shutil.copyfileobj(video_data, sys.stdout.buffer)
    video_data.close()


def cgimain():
    """CGI entry point: show the form, or fetch and stream the video."""
    # The cgi module was removed in Python 3.13 (PEP 594); for GET requests
    # cgi.parse() is equivalent to parsing QUERY_STRING. The form above
    # submits via GET, so this is a drop-in replacement.
    args = urllib.parse.parse_qs(os.environ.get("QUERY_STRING", ""))
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        page = YouTubeVideoPageParser()
        parse_url(url, page)
        video_url, filename = get_video_url(page)
        video_data = urlopen(video_url)
    except VideoUnavailable as e:
        # html.escape replaces the removed cgi.escape (and also escapes quotes).
        print_form(
            url=url,
            msg="<p class=\"error\">Sorry, there was an error: %s</p>" % html.escape(e.args[0]),
        )
        # Bug fix: without this return, execution fell through to
        # write_video() with video_data/filename undefined (NameError).
        return
    except Exception:
        print_form(
            url=url,
            msg="<p class=\"error\">Sorry, there was an error. Check your URL?</p>",
        )
        return
    write_video(filename, video_data)


def pp_size(size):
    """Pretty-print a byte count, e.g. 2048 -> '2.00 KiB'."""
    suffixes = ["", "KiB", "MiB", "GiB"]
    for suffix in suffixes:
        if size < 1024:
            break
        size /= 1024
    return "%.2f %s" % (size, suffix)


def copy_with_progress(content_length, infile, outfile):
    """Copy *infile* to *outfile* in 32 KiB chunks with a one-line progress display.

    The status line (bytes copied / total, throughput) is refreshed at most
    every half second, then printed once more at the end.
    """
    def print_status():
        rate = 0
        if now != last_ts:
            rate = last_bytes_read / (now - last_ts)
        sys.stdout.write("\33[2K\r")  # clear line, return carriage
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    last_ts = 0
    last_bytes_read = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            print_status()
            last_ts = now
            last_bytes_read = 0

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        last_bytes_read += len(buf)
        bytes_read += len(buf)

    # Newline at the end
    print_status()
    print()


def main():
    """CLI entry point: download the video named on the command line, resuming if possible."""
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    page = YouTubeVideoPageParser()
    parse_url(url, page)
    video_url, filename = get_video_url(page)

    print("Downloading", filename)
    outfile = open(filename, "ab")
    offset = outfile.tell()
    if offset > 0:
        print("Resuming download from", pp_size(offset))

    total_size = None
    while True:
        try:
            video_data = urlopen(video_url, offset)
        except urllib.error.HTTPError as e:
            if e.code == 416:
                # Requested range not satisfiable: file already complete.
                print("File is complete!")
                break
            else:
                raise

        content_length = int(video_data.getheader("Content-Length"))
        if total_size is None:
            total_size = content_length

        try:
            copy_with_progress(content_length, video_data, outfile)
        except IOError:
            print()
        video_data.close()

        if outfile.tell() != total_size:
            # Short read: retry from where we stopped; back off if no progress.
            old_offset = offset
            offset = outfile.tell()
            if old_offset == offset:
                time.sleep(1)
            print("Restarting download from", pp_size(offset))
        else:
            break
    outfile.close()


if __name__ == "__main__":
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)