X-Git-Url: https://code.delx.au/youtube-cgi/blobdiff_plain/8fd0fe2caf7348910fcac21738694104aff0befb..c7348757cc65852d4d82c7718f36a661268ab262:/youtube.cgi diff --git a/youtube.cgi b/youtube.cgi index 283e692..b57636b 100755 --- a/youtube.cgi +++ b/youtube.cgi @@ -1,285 +1,468 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -from __future__ import division - -import cookielib import cgi -import itertools +import html.parser +import http.cookiejar import json -from lxml import html import os import re -import resource import shutil import subprocess import sys import time -import urllib -import urllib2 -import urlparse +import urllib.error +import urllib.parse +import urllib.request MAX_MEMORY_BYTES = 128 * 1024*1024 USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1" MIMETYPES = { - "video/mp4": "mp4", - "video/x-flv": "flv", - "video/3gpp": "3gp", + "video/mp4": "mp4", + "video/x-flv": "flv", + "video/3gpp": "3gp", } QUALITIES = { - "hd1080": 5, - "hd720": 4, - "large": 3, - "medium": 2, - "small": 1, + "hd1080": 5, + "hd720": 4, + "large": 3, + "medium": 2, + "small": 1, } class VideoUnavailable(Exception): - pass + pass def print_form(url="", msg=""): - script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]) - sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n") - sys.stdout.write(""" - - + script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]) + sys.stdout.write("Content-Type: text/html\r\n\r\n") + sys.stdout.write(""" + + - delx.net.au - YouTube Scraper - - + delx.net.au - YouTube Scraper + + -

delx.net.au - YouTube Scraper

- {0} -
-

This page will let you easily download YouTube videos to watch offline. It - will automatically grab the highest quality version.

-
-
-
-

Tip! Use this bookmarklet: YouTube Download - to easily download videos. Right-click the link and add it to bookmarks, - then when you're looking at a YouTube page select that bookmark from your - browser's bookmarks menu to download the video straight away.

+

delx.net.au - YouTube Scraper

+ {0} +
+

This page will let you easily download YouTube videos to watch offline. It + will automatically grab the highest quality version.

+
+
+
+

Tip! Use this bookmarklet: YouTube Download + to easily download videos. Right-click the link and add it to bookmarks, + then when you're looking at a YouTube page select that bookmark from your + browser's bookmarks menu to download the video straight away.

""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url)) -cookiejar = cookielib.CookieJar() -urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar)) +cookiejar = http.cookiejar.CookieJar() +urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar)) referrer = "" -def urlopen(url): - global referrer - req = urllib2.Request(url) - if referrer: - req.add_header("Referer", referrer) - referrer = url - req.add_header("User-Agent", USER_AGENT) - return urlopener.open(req) - -def parse_url(url): - f = urlopen(url) - doc = html.parse(f, html.HTMLParser(encoding="utf-8", recover=True)) - f.close() - return doc +def urlopen(url, offset=None): + if url.startswith("//"): + url = "https:" + url + if not url.startswith("http://") and not url.startswith("https://"): + url = "https://www.youtube.com" + url + + global referrer + req = urllib.request.Request(url) + if not referrer: + referrer = url + else: + req.add_header("Referer", referrer) + + req.add_header("User-Agent", USER_AGENT) + + if offset: + req.add_header("Range", "bytes=%d-" % offset) + + res = urlopener.open(req) + + content_range = res.getheader("Content-Range") + if content_range: + tokens = content_range.split() + assert tokens[0] == "bytes" + start = int(tokens[1].split("-")[0]) + assert start == offset + return res + +def parse_url(url, parser): + f = urlopen(url) + parser.feed(f.read().decode("utf-8")) + parser.close() + f.close() def append_to_qs(url, params): - r = list(urlparse.urlsplit(url)) - qs = urlparse.parse_qs(r[3]) - qs.update(params) - r[3] = urllib.urlencode(qs, True) - url = urlparse.urlunsplit(r) - return url - -def convert_from_old_itag(player_config): - url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"]) - url_data["url"] = [] - for itag_url in url_data["itag"]: - pos = itag_url.find("url=") - url_data["url"].append(itag_url[pos+4:]) - player_config["args"]["url_encoded_fmt_stream_map"] = urllib.urlencode(url_data, True) - -def get_player_config(doc): - player_config = None - for script in doc.xpath("//script"): - if not script.text: - continue - for line in script.text.split("\n"): - if "yt.playerConfig =" in line: - p1 = line.find("=") - p2 = line.rfind(";") - if p1 >= 0 and p2 > 0: - return json.loads(line[p1+1:p2]) - if "'PLAYER_CONFIG': " in line: - p1 = line.find(":") - if p1 >= 0: - player_config = json.loads(line[p1+1:]) - convert_from_old_itag(player_config) - return player_config + r = list(urllib.parse.urlsplit(url)) + qs = urllib.parse.parse_qs(r[3]) + qs.update(params) + r[3] = urllib.parse.urlencode(qs, True) + url = urllib.parse.urlunsplit(r) + return url + +def get_player_config(scripts): + player_config = None + for script in scripts: + for line in script.split("\n"): + s = "ytplayer.config = {" + if s in line: + p1 = line.find(s) + len(s) - 1 + p2 = line.find("};", p1) + 1 + if p1 >= 0 and p2 > 0: + return json.loads(line[p1:p2]) + +def extract_js(script): + PREFIX = "var _yt_player={};(function(g){var window=this;" + SUFFIX = ";})(_yt_player);\n" + assert script.startswith(PREFIX) + assert script.endswith(SUFFIX) + + return script[len(PREFIX):-len(SUFFIX)] + +def find_func_name(script): + FUNC_NAME = R"([a-zA-Z0-9$]+)" + FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))" + TERMINATOR = R"[,;\)]" + PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR + + match = re.search(PATTERN, script) + func_name = match.groups()[0] + return func_name + +def decode_signature(js_url, signature): + f = urlopen(js_url) + script = f.read().decode("utf-8") + f.close() + + func_name = find_func_name(script) + + params = { + "func_name": func_name, + "signature": json.dumps(signature), + "code": json.dumps(extract_js(script)), + } + p = subprocess.Popen( + "node", + shell=True, + close_fds=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + js_decode_script = (""" + const vm = require('vm'); + + const sandbox = { + location: { + hash: '', + href: '', + protocol: 'http:' + }, + history: { + pushState: function(){} + }, + document: {}, + navigator: { + userAgent: '' + }, + matchMedia: () => ({matches: () => {}, media: ''}), + signature: %(signature)s, + transformed_signature: null, + g: function(){} // this is _yt_player + }; + sandbox.window = sandbox; + + const code_string = %(code)s + ';'; + const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);'; + vm.runInNewContext(code_string + exec_string, sandbox); + + function findSignature(obj) { + if (typeof obj !== 'object') { + return; + } + for (const [key, value] of Object.entries(obj)) { + if (key === 'MARKER') { + return value; + } + const result = findSignature(value); + if (result) { + return result; + } + } + } + console.log(findSignature(sandbox.transformed_signature)); + """ % params) + + p.stdin.write(js_decode_script.encode("utf-8")) + p.stdin.close() + + transformed_signature = p.stdout.read().decode("utf-8").strip() + if p.wait() != 0: + raise Exception("js failed to execute: %d" % p.returncode) + + return transformed_signature def get_best_video(player_config): - url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"]) - url_data = itertools.izip_longest( - url_data["url"], - url_data["type"], - url_data["quality"], - url_data.get("sig", []), - ) - best_url = None - best_quality = None - best_extension = None - for video_url, mimetype, quality, signature in url_data: - mimetype = mimetype.split(";")[0] - if mimetype not in MIMETYPES: - continue - extension = MIMETYPES[mimetype] - quality = QUALITIES.get(quality.split(",")[0], -1) - if best_quality is None or quality > best_quality: - if signature: - video_url = append_to_qs(video_url, {"signature": signature}) - best_url = video_url - best_quality = quality - best_extension = extension - - return best_url, best_extension + url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",") + js_url = player_config["assets"]["js"] + + best_url = None + best_quality = None + best_extension = None + for url_data in url_data_list: + url_data = urllib.parse.parse_qs(url_data) + mimetype = url_data["type"][0].split(";")[0] + quality = url_data["quality"][0] + + if "stereo3d" in url_data: + continue + if quality not in QUALITIES: + continue + if mimetype not in MIMETYPES: + continue + + extension = MIMETYPES[mimetype] + quality = QUALITIES.get(quality, -1) + + if best_quality is not None and quality < best_quality: + continue + + video_url = url_data["url"][0] + if "sig" in url_data: + signature = url_data["sig"][0] + elif "s" in url_data: + signature = decode_signature(js_url, url_data["s"][0]) + else: + signature = None + + if signature: + video_url = append_to_qs(video_url, {"signature": signature}) + + best_url = video_url + best_quality = quality + best_extension = extension + + return best_url, best_extension def sanitize_filename(filename): - return ( - re.sub("\s+", " ", filename.strip()) - .replace("\\", "-") - .replace("/", "-") - .replace("\0", " ") - ) + return ( + re.sub("\s+", " ", filename.strip()) + .replace("\\", "-") + .replace("/", "-") + .replace("\0", " ") + ) + +def get_video_url(page): + player_config = get_player_config(page.scripts) + if not player_config: + raise VideoUnavailable(page.unavailable_message or "Could not find video URL") + + video_url, extension = get_best_video(player_config) + if not video_url: + return None, None + + filename = sanitize_filename(page.title) + filename += "." + extension + + return video_url, filename + +class YouTubeVideoPageParser(html.parser.HTMLParser): + def __init__(self): + super().__init__() + self.title = None + self.unavailable_message = None + self.scripts = [] + + def handle_starttag(self, tag, attrs): + attrs = dict(attrs) + self._handle_title(tag, attrs) + self._handle_unavailable_message(tag, attrs) + self._handle_script(tag, attrs) + + def handle_endtag(self, tag): + self.handle_data = self._ignore_data + + def _ignore_data(self, _): + pass + + def _handle_title(self, tag, attrs): + if tag == "title": + self.handle_data = self._handle_title_data -def get_video_url(doc): - unavailable = doc.xpath("//div[@id='unavailable-message']/text()") - if unavailable: - raise VideoUnavailable(unavailable[0].strip()) + def _handle_title_data(self, data): + self.title = data.strip() - player_config = get_player_config(doc) - if not player_config: - raise VideoUnavailable("Could not find video URL") + def _handle_unavailable_message(self, tag, attrs): + if attrs.get("id", None) == "unavailable-message": + self.handle_data = self._handle_unavailable_message_data - video_url, extension = get_best_video(player_config) - if not video_url: - return None, None + def _handle_unavailable_message_data(self, data): + self.unavailable_message = data.strip() - title = doc.xpath("/html/head/title/text()")[0] - filename = sanitize_filename(title) - filename += "." + extension + def _handle_script(self, tag, attrs): + if tag == "script": + self.handle_data = self._handle_script_data - return video_url, filename + def _handle_script_data(self, data): + if data: + self.scripts.append(data) def write_video(filename, video_data): - httpinfo = video_data.info() - encoded_filename = urllib.quote(filename.encode("utf-8")) - sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename) - sys.stdout.write("Content-Length: %s\r\n" % httpinfo.getheader("Content-Length")) - sys.stdout.write("\r\n") - shutil.copyfileobj(video_data, sys.stdout) - video_data.close() + quoted_filename = urllib.parse.quote(filename.encode("utf-8")) + sys.stdout.buffer.write( + b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n" + .replace(b"{0}", quoted_filename.encode("utf-8")) + ) + sys.stdout.buffer.write( + b"Content-Length: {0}\r\n" + .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8")) + ) + sys.stdout.buffer.write(b"\r\n") + shutil.copyfileobj(video_data, sys.stdout.buffer) + video_data.close() def cgimain(): - args = cgi.parse() - try: - url = args["url"][0] - except: - print_form(url="http://www.youtube.com/watch?v=FOOBAR") - return - - try: - doc = parse_url(url) - video_url, filename = get_video_url(doc) - video_data = urlopen(video_url) - write_video(filename, video_data) - except VideoUnavailable, e: - print_form( - url=url, - msg="

Sorry, there was an error: %s

" % cgi.escape(e.message) - ) - except Exception, e: - print_form( - url=url, - msg="

Sorry, there was an error. Check your URL?

" - ) - return - -def copy_with_progress(total_size, infile, outfile): - def pp_size(size): - suffixes = ["", "KiB", "MiB", "GiB"] - for i, suffix in enumerate(suffixes): - if size < 1024: - break - size /= 1024 - return "%.2f %s" % (size, suffix) - - def print_status(): - sys.stdout.write("\33[2K\r") - sys.stdout.write("%s / %s (%s/sec)" % ( - pp_size(bytes_read), - pp_size(total_size), - pp_size(bytes_read / (now - start_ts)), - )) - sys.stdout.flush() - - start_ts = time.time() - last_ts = 0 - bytes_read = 0 - while True: - now = time.time() - if now - last_ts > 0.5: - last_ts = now - print_status() - - buf = infile.read(32768) - if not buf: - break - outfile.write(buf) - bytes_read += len(buf) - - # Newline at the end - print_status() - print + args = cgi.parse() + try: + url = args["url"][0] + except: + print_form(url="http://www.youtube.com/watch?v=FOOBAR") + return + + try: + page = YouTubeVideoPageParser() + parse_url(url, page) + video_url, filename = get_video_url(page) + video_data = urlopen(video_url) + except VideoUnavailable as e: + print_form( + url=url, + msg="

Sorry, there was an error: %s

" % cgi.escape(e.args[0]) + ) + except Exception as e: + print_form( + url=url, + msg="

Sorry, there was an error. Check your URL?

" + ) + return + + write_video(filename, video_data) + +def pp_size(size): + suffixes = ["", "KiB", "MiB", "GiB"] + for i, suffix in enumerate(suffixes): + if size < 1024: + break + size /= 1024 + return "%.2f %s" % (size, suffix) + +def copy_with_progress(content_length, infile, outfile): + def print_status(): + rate = 0 + if now != last_ts: + rate = last_bytes_read / (now - last_ts) + sys.stdout.write("\33[2K\r") + sys.stdout.write("%s / %s (%s/sec)" % ( + pp_size(bytes_read), + pp_size(content_length), + pp_size(rate), + )) + sys.stdout.flush() + + last_ts = 0 + last_bytes_read = 0 + bytes_read = 0 + while True: + now = time.time() + if now - last_ts > 0.5: + print_status() + last_ts = now + last_bytes_read = 0 + + buf = infile.read(32768) + if not buf: + break + outfile.write(buf) + last_bytes_read += len(buf) + bytes_read += len(buf) + + # Newline at the end + print_status() + print() def main(): - try: - url = sys.argv[1] - except: - print >>sys.stderr, "Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0] - sys.exit(1) - doc = parse_url(url) - video_url, filename = get_video_url(doc) - video_data = urlopen(video_url) - if os.path.isfile(filename): - print >>sys.stderr, "Error! File exists:", filename - sys.exit(1) - outfile = open(filename, "w") - total_size = int(video_data.info().getheader("Content-Length")) - print "Downloading", filename.encode("utf-8") - copy_with_progress(total_size, video_data, outfile) - video_data.close() - outfile.close() + try: + url = sys.argv[1] + except: + print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr) + sys.exit(1) + + page = YouTubeVideoPageParser() + parse_url(url, page) + video_url, filename = get_video_url(page) + print("Downloading", filename) + + outfile = open(filename, "ab") + offset = outfile.tell() + if offset > 0: + print("Resuming download from", pp_size(offset)) + total_size = None + + while True: + try: + video_data = urlopen(video_url, offset) + except urllib.error.HTTPError as e: + if e.code == 416: + print("File is complete!") + break + else: + raise + + content_length = int(video_data.getheader("Content-Length")) + if total_size is None: + total_size = content_length + + try: + copy_with_progress(content_length, video_data, outfile) + except IOError as e: + print() + + video_data.close() + if outfile.tell() != total_size: + old_offset = offset + offset = outfile.tell() + if old_offset == offset: + time.sleep(1) + print("Restarting download from", pp_size(offset)) + else: + break + + outfile.close() if __name__ == "__main__": - resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES)) - if os.environ.has_key("SCRIPT_NAME"): - cgimain() - else: - try: - main() - except KeyboardInterrupt: - print "\nExiting..." - sys.exit(1) + if "SCRIPT_NAME" in os.environ: + cgimain() + else: + try: + main() + except KeyboardInterrupt: + print("\nExiting...") + sys.exit(1)