X-Git-Url: https://code.delx.au/youtube-cgi/blobdiff_plain/c7348757cc65852d4d82c7718f36a661268ab262..88e031fe0052c3be591bf129b778698de4fceba5:/youtube.cgi diff --git a/youtube.cgi b/youtube.cgi index b57636b..0aadef3 100755 --- a/youtube.cgi +++ b/youtube.cgi @@ -15,8 +15,7 @@ import urllib.parse import urllib.request -MAX_MEMORY_BYTES = 128 * 1024*1024 -USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1" +USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:82.0) Gecko/20100101 Firefox/82.0" MIMETYPES = { "video/mp4": "mp4", @@ -36,8 +35,11 @@ QUALITIES = { class VideoUnavailable(Exception): pass +class NotYouTube(Exception): + pass + def print_form(url="", msg=""): - script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]) + script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]) sys.stdout.write("Content-Type: text/html\r\n\r\n") sys.stdout.write(""" @@ -103,6 +105,17 @@ def urlopen(url, offset=None): assert start == offset return res +def validate_url(url): + parsed_url = urllib.parse.urlparse(url) + scheme_ok = parsed_url.scheme == "https" + host = parsed_url.netloc.lstrip("www.").lstrip("m.") + host_ok = host in ["youtube.com", "youtu.be"] + + if scheme_ok and host_ok: + return + else: + raise NotYouTube() + def parse_url(url, parser): f = urlopen(url) parser.feed(f.read().decode("utf-8")) @@ -118,15 +131,20 @@ def append_to_qs(url, params): return url def get_player_config(scripts): - player_config = None + config_strings = [ + ("ytplayer.config = {", 1, "};", 1), + ("ytcfg.set({\"", 2, "});", 1), + ] + player_config = {} for script in scripts: for line in script.split("\n"): - s = "ytplayer.config = {" - if s in line: - p1 = line.find(s) + len(s) - 1 - p2 = line.find("};", p1) + 1 - if p1 >= 0 and p2 > 0: - return json.loads(line[p1:p2]) + for s1, off1, s2, off2 in config_strings: + if s1 in line: + p1 = line.find(s1) + len(s1) - off1 + p2 = line.find(s2, p1) + off2 + if p1 >= 0 and p2 > 0: + player_config.update(json.loads(line[p1:p2])) + return player_config def extract_js(script): PREFIX = "var _yt_player={};(function(g){var window=this;" @@ -136,26 +154,44 @@ def extract_js(script): return script[len(PREFIX):-len(SUFFIX)] -def find_func_name(script): +def find_cipher_func(script): FUNC_NAME = R"([a-zA-Z0-9$]+)" + DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?" FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))" TERMINATOR = R"[,;\)]" - PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR + PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR match = re.search(PATTERN, script) func_name = match.groups()[0] return func_name -def decode_signature(js_url, signature): +def find_url_func(script): + FUNC_NAME = R"([a-zA-Z0-9$]+)" + PATTERN = R"this\.url\s*=\s*" + FUNC_NAME + R"\s*\(\s*this\s*\)" + + match = re.search(PATTERN, script) + func_name = match.groups()[0] + return func_name + +def decode_cipher_url(js_url, cipher): + cipher = urllib.parse.parse_qs(cipher) + args = [ + cipher["url"][0], + cipher["sp"][0], + cipher["s"][0], + ] + f = urlopen(js_url) script = f.read().decode("utf-8") f.close() - func_name = find_func_name(script) + cipher_func_name = find_cipher_func(script) + url_func_name = find_url_func(script) params = { - "func_name": func_name, - "signature": json.dumps(signature), + "cipher_func_name": cipher_func_name, + "url_func_name": url_func_name, + "args": json.dumps(args), "code": json.dumps(extract_js(script)), } p = subprocess.Popen( @@ -168,70 +204,60 @@ def decode_signature(js_url, signature): js_decode_script = (""" const vm = require('vm'); - const sandbox = { - location: { - hash: '', - href: '', - protocol: 'http:' - }, - history: { - pushState: function(){} - }, - document: {}, - navigator: { - userAgent: '' - }, - matchMedia: () => ({matches: () => {}, media: ''}), - signature: %(signature)s, - transformed_signature: null, - g: function(){} // this is _yt_player + const fakeGlobal = {}; + fakeGlobal.window = fakeGlobal; + fakeGlobal.location = { + hash: '', + host: 'www.youtube.com', + hostname: 'www.youtube.com', + href: 'https://www.youtube.com', + origin: 'https://www.youtube.com', + pathname: '/', + protocol: 'https:' }; - sandbox.window = sandbox; + fakeGlobal.history = { + pushState: function(){} + }; + fakeGlobal.document = { + location: fakeGlobal.location + }; + fakeGlobal.document = {}; + fakeGlobal.navigator = { + userAgent: '' + }; + fakeGlobal.XMLHttpRequest = class XMLHttpRequest {}; + fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''}); + fakeGlobal.result_url = null; + fakeGlobal.g = function(){}; // this is _yt_player const code_string = %(code)s + ';'; - const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);'; - vm.runInNewContext(code_string + exec_string, sandbox); - - function findSignature(obj) { - if (typeof obj !== 'object') { - return; - } - for (const [key, value] of Object.entries(obj)) { - if (key === 'MARKER') { - return value; - } - const result = findSignature(value); - if (result) { - return result; - } - } - } - console.log(findSignature(sandbox.transformed_signature)); + const exec_string = 'result_url = %(url_func_name)s(%(cipher_func_name)s(...%(args)s));'; + vm.runInNewContext(code_string + exec_string, fakeGlobal); + + console.log(fakeGlobal.result_url); """ % params) p.stdin.write(js_decode_script.encode("utf-8")) p.stdin.close() - transformed_signature = p.stdout.read().decode("utf-8").strip() + result_url = p.stdout.read().decode("utf-8").strip() if p.wait() != 0: raise Exception("js failed to execute: %d" % p.returncode) - return transformed_signature + return result_url def get_best_video(player_config): - url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",") - js_url = player_config["assets"]["js"] + player_args = player_config["args"] + player_response = json.loads(player_args["player_response"]) + formats = player_response["streamingData"]["formats"] best_url = None best_quality = None best_extension = None - for url_data in url_data_list: - url_data = urllib.parse.parse_qs(url_data) - mimetype = url_data["type"][0].split(";")[0] - quality = url_data["quality"][0] + for format_data in formats: + mimetype = format_data["mimeType"].split(";")[0] + quality = format_data["quality"] - if "stereo3d" in url_data: - continue if quality not in QUALITIES: continue if mimetype not in MIMETYPES: @@ -243,16 +269,11 @@ def get_best_video(player_config): if best_quality is not None and quality < best_quality: continue - video_url = url_data["url"][0] - if "sig" in url_data: - signature = url_data["sig"][0] - elif "s" in url_data: - signature = decode_signature(js_url, url_data["s"][0]) + if "signatureCipher" in format_data: + js_url = player_config["PLAYER_JS_URL"] + video_url = decode_cipher_url(js_url, format_data["signatureCipher"]) else: - signature = None - - if signature: - video_url = append_to_qs(video_url, {"signature": signature}) + video_url = format_data["url"] best_url = video_url best_quality = quality @@ -277,21 +298,24 @@ def get_video_url(page): if not video_url: return None, None - filename = sanitize_filename(page.title) - filename += "." + extension + title = player_config["args"].get("title", None) + if not title: + title = json.loads(player_config["args"]["player_response"])["videoDetails"]["title"] + if not title: + title = "Unknown title" + + filename = sanitize_filename(title) + "." + extension return video_url, filename class YouTubeVideoPageParser(html.parser.HTMLParser): def __init__(self): super().__init__() - self.title = None self.unavailable_message = None self.scripts = [] def handle_starttag(self, tag, attrs): attrs = dict(attrs) - self._handle_title(tag, attrs) self._handle_unavailable_message(tag, attrs) self._handle_script(tag, attrs) @@ -301,13 +325,6 @@ class YouTubeVideoPageParser(html.parser.HTMLParser): def _ignore_data(self, _): pass - def _handle_title(self, tag, attrs): - if tag == "title": - self.handle_data = self._handle_title_data - - def _handle_title_data(self, data): - self.title = data.strip() - def _handle_unavailable_message(self, tag, attrs): if attrs.get("id", None) == "unavailable-message": self.handle_data = self._handle_unavailable_message_data @@ -342,11 +359,12 @@ def cgimain(): try: url = args["url"][0] except: - print_form(url="http://www.youtube.com/watch?v=FOOBAR") + print_form(url="https://www.youtube.com/watch?v=FOOBAR") return try: page = YouTubeVideoPageParser() + validate_url(url) parse_url(url, page) video_url, filename = get_video_url(page) video_data = urlopen(video_url) @@ -355,10 +373,15 @@ def cgimain(): url=url, msg="

Sorry, there was an error: %s

" % cgi.escape(e.args[0]) ) + except NotYouTube: + print_form( + url=url, + msg="

Sorry, that does not look like a YouTube page!

" + ) except Exception as e: print_form( url=url, - msg="

Sorry, there was an error. Check your URL?

" + msg="

Sorry, there was an unknown error.

" ) return @@ -410,7 +433,7 @@ def main(): try: url = sys.argv[1] except: - print("Usage: %s http://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr) + print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr) sys.exit(1) page = YouTubeVideoPageParser()