]> code.delx.au - youtube-cgi/blobdiff - youtube.cgi
Google seems to care about user agents now...
[youtube-cgi] / youtube.cgi
index ef7b6084433884ec75d0402a823d19c5278d35a7..212ceae83234d81ccc0c63b7681f00d9fadbd92f 100755 (executable)
@@ -15,7 +15,8 @@ import urllib.parse
 import urllib.request
 
 
-USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0"
+MOZILLA_RELEASE_URL = "https://www.mozilla.org/en-US/firefox/releases/"  # page fetched by urlopen() to discover the latest Firefox version
+USER_AGENT_TEMPLATE = "Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/%s"  # %s receives the scraped version; NOTE(review): rv: stays pinned at 83.0 while Firefox/ varies
 
 MIMETYPES = {
     "video/mp4": "mp4",
@@ -76,8 +77,17 @@ def print_form(url="", msg=""):
 cookiejar = http.cookiejar.CookieJar()
 urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
 referrer = ""
+user_agent = None  # User-Agent header value; built lazily by the first urlopen() call
 
 def urlopen(url, offset=None):
+    global user_agent
+    if not user_agent:
+        page = MozillaReleasesPageParser()
+        with urllib.request.urlopen(MOZILLA_RELEASE_URL) as f:
+            page.feed(f.read().decode("utf-8"))
+            page.close()
+        user_agent = USER_AGENT_TEMPLATE % page.latest_release
+
     if url.startswith("//"):
         url = "https:" + url
     if not url.startswith("http://") and not url.startswith("https://"):
@@ -90,7 +100,7 @@ def urlopen(url, offset=None):
     else:
         req.add_header("Referer", referrer)
 
-    req.add_header("User-Agent", USER_AGENT)
+    req.add_header("User-Agent", user_agent)
 
     if offset:
         req.add_header("Range", "bytes=%d-" % offset)
@@ -108,14 +118,15 @@ def urlopen(url, offset=None):
 def validate_url(url):
     parsed_url = urllib.parse.urlparse(url)
     scheme_ok = parsed_url.scheme == "https"
-    host_ok = parsed_url.netloc.lstrip("www.") in ["youtube.com", "youtu.be"]
+    host = parsed_url.netloc.lstrip("www.").lstrip("m.")
+    host_ok = host in ["youtube.com", "youtu.be"]
 
     if scheme_ok and host_ok:
         return
     else:
         raise NotYouTube()
 
-def parse_url(url, parser):
+def load_parse_url(url, parser):
     f = urlopen(url)
     parser.feed(f.read().decode("utf-8"))
     parser.close()
@@ -130,15 +141,20 @@ def append_to_qs(url, params):
     return url
 
 def get_player_config(scripts):
-    player_config = None
+    config_strings = [  # rows of (start marker, chars to back up from the marker's end, end marker, chars of the end marker to keep)
+        ("ytplayer.config = {", 1, "};", 1),
+        ("ytcfg.set({\"", 2, "});", 1),
+    ]
+    player_config = {}  # merged result; stays empty when no marker is found
     for script in scripts:
         for line in script.split("\n"):
-            s = "ytplayer.config = {"
-            if s in line:
-                p1 = line.find(s) + len(s) - 1
-                p2 = line.find("};", p1) + 1
-                if p1 >= 0 and p2 > 0:
-                    return json.loads(line[p1:p2])
+            for s1, off1, s2, off2 in config_strings:
+                if s1 in line:
+                    p1 = line.find(s1) + len(s1) - off1  # index of the opening "{" of the JSON object
+                    p2 = line.find(s2, p1) + off2  # one past the closing "}"; 0 when s2 is missing (-1 + 1)
+                    if p1 >= 0 and p2 > 0:
+                        player_config.update(json.loads(line[p1:p2]))  # later matches overwrite earlier keys
+    return player_config
 
 def extract_js(script):
     PREFIX = "var _yt_player={};(function(g){var window=this;"
@@ -148,26 +164,44 @@ def extract_js(script):
 
     return script[len(PREFIX):-len(SUFFIX)]
 
-def find_func_name(script):
+def find_cipher_func(script):  # returns the identifier captured by the FUNC_NAME group in the player script
     FUNC_NAME = R"([a-zA-Z0-9$]+)"
+    DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"  # optional literal "(decodeURIComponent" between the name and its arguments
     FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
     TERMINATOR = R"[,;\)]"
-    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR
+    PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR  # NOTE(review): re.search() returns None on no match, so groups() below would raise AttributeError
 
     match = re.search(PATTERN, script)
     func_name = match.groups()[0]
     return func_name
 
-def decode_signature(js_url, signature):
+def find_url_func(script):
+    """Return the name of the function the player script calls as `this.url = NAME(this)`."""
+    PATTERN = R"this\.url\s*=\s*([a-zA-Z0-9$]+)\s*\(\s*this\s*\)"
+    match = re.search(PATTERN, script)
+    if match is None:  # fail with a clear message instead of AttributeError on None
+        raise Exception("could not find url function in player script")
+    return match.group(1)
+
+def decode_cipher_url(js_url, cipher):
+    """Turn a signatureCipher query string into a stream URL using the player JS.
+
+    Fetches js_url and pipes a generated script (it uses Node's vm module) to a
+    subprocess; returns what it prints. Raises Exception on a non-zero exit."""
+    cipher = urllib.parse.parse_qs(cipher)
+    args = [cipher["url"][0], cipher.get("sp", ["signature"])[0], cipher["s"][0]]  # "sp" falls back to the pre-patch default
+
     f = urlopen(js_url)
     script = f.read().decode("utf-8")
     f.close()
 
-    func_name = find_func_name(script)
+    cipher_func_name = find_cipher_func(script)
+    url_func_name = find_url_func(script)
 
     params = {
-        "func_name": func_name,
-        "signature": json.dumps(signature),
+        "cipher_func_name": cipher_func_name,
+        "url_func_name": url_func_name,
+        "args": json.dumps(args),
         "code": json.dumps(extract_js(script)),
     }
     p = subprocess.Popen(
@@ -180,72 +214,61 @@ def decode_signature(js_url, signature):
     js_decode_script = ("""
         const vm = require('vm');
 
-        const sandbox = {
-            location: {
-                hash: '',
-                href: '',
-                protocol: 'http:'
-            },
-            history: {
-                pushState: function(){}
-            },
-            document: {},
-            navigator: {
-                userAgent: ''
-            },
-            XMLHttpRequest: class XMLHttpRequest {},
-            matchMedia: () => ({matches: () => {}, media: ''}),
-            signature: %(signature)s,
-            transformed_signature: null,
-            g: function(){} // this is _yt_player
+        const fakeGlobal = {};
+        fakeGlobal.window = fakeGlobal;
+        fakeGlobal.location = {
+            hash: '',
+            host: 'www.youtube.com',
+            hostname: 'www.youtube.com',
+            href: 'https://www.youtube.com',
+            origin: 'https://www.youtube.com',
+            pathname: '/',
+            protocol: 'https:'
+        };
+        fakeGlobal.history = {
+            pushState: function(){}
+        };
+        fakeGlobal.document = {
+            // must stay the only document assignment so location survives
+            location: fakeGlobal.location
+        };
+        fakeGlobal.navigator = {
+            userAgent: ''
         };
-        sandbox.window = sandbox;
+        fakeGlobal.XMLHttpRequest = class XMLHttpRequest {};
+        fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''});
+        fakeGlobal.result_url = null;
+        fakeGlobal.g = function(){}; // this is _yt_player
+        fakeGlobal.TimeRanges = function(){};
 
         const code_string = %(code)s + ';';
-        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
-        vm.runInNewContext(code_string + exec_string, sandbox);
-
-        function findSignature(obj) {
-            if (typeof obj !== 'object') {
-                return;
-            }
-            for (const [key, value] of Object.entries(obj)) {
-                if (key === 'MARKER') {
-                    return value;
-                }
-                const result = findSignature(value);
-                if (result) {
-                    return result;
-                }
-            }
-        }
-        console.log(findSignature(sandbox.transformed_signature));
+        const exec_string = 'result_url = %(url_func_name)s(%(cipher_func_name)s(...%(args)s));';
+        vm.runInNewContext(code_string + exec_string, fakeGlobal);
+
+        console.log(fakeGlobal.result_url);
     """ % params)
 
     p.stdin.write(js_decode_script.encode("utf-8"))
     p.stdin.close()
 
-    transformed_signature = p.stdout.read().decode("utf-8").strip()
-    transformed_signature = urllib.parse.unquote(transformed_signature)
+    result_url = p.stdout.read().decode("utf-8").strip()  # whatever the script printed via console.log
     if p.wait() != 0:
         raise Exception("js failed to execute: %d" % p.returncode)
 
-    return transformed_signature
+    return result_url
 
 def get_best_video(player_config):
-    url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
-    js_url = player_config["assets"]["js"]
+    player_args = player_config["args"]
+    player_response = json.loads(player_args["player_response"])
+    formats = player_response["streamingData"]["formats"]
 
     best_url = None
     best_quality = None
     best_extension = None
-    for url_data in url_data_list:
-        url_data = urllib.parse.parse_qs(url_data)
-        mimetype = url_data["type"][0].split(";")[0]
-        quality = url_data["quality"][0]
+    for format_data in formats:
+        mimetype = format_data["mimeType"].split(";")[0]
+        quality = format_data["quality"]
 
-        if "stereo3d" in url_data:
-            continue
         if quality not in QUALITIES:
             continue
         if mimetype not in MIMETYPES:
@@ -257,17 +280,11 @@ def get_best_video(player_config):
         if best_quality is not None and quality < best_quality:
             continue
 
-        video_url = url_data["url"][0]
-        if "sig" in url_data:
-            signature = url_data["sig"][0]
-        elif "s" in url_data:
-            signature = decode_signature(js_url, url_data["s"][0])
+        if "signatureCipher" in format_data:
+            js_url = player_config["PLAYER_JS_URL"]
+            video_url = decode_cipher_url(js_url, format_data["signatureCipher"])
         else:
-            signature = None
-
-        if signature:
-            sp = url_data.get("sp", ["signature"])[0]
-            video_url = append_to_qs(video_url, {sp: signature})
+            video_url = format_data["url"]
 
         best_url = video_url
         best_quality = quality
@@ -334,6 +351,16 @@ class YouTubeVideoPageParser(html.parser.HTMLParser):
         if data:
             self.scripts.append(data)
 
+class MozillaReleasesPageParser(html.parser.HTMLParser):
+    """HTML parser that remembers the last non-empty data-latest-firefox attribute."""
+    def __init__(self):
+        super().__init__()
+        self.latest_release = "1.0"  # value reported when no such attribute is seen
+
+    def handle_starttag(self, tag, attrs):
+        found = dict(attrs).get("data-latest-firefox")
+        self.latest_release = found or self.latest_release
+
 def write_video(filename, video_data):
     quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
     sys.stdout.buffer.write(
@@ -359,7 +386,9 @@ def cgimain():
     try:
         page = YouTubeVideoPageParser()
         validate_url(url)
-        parse_url(url, page)
+        with urlopen(url) as f:
+            page.feed(f.read().decode("utf-8"))
+            page.close()
         video_url, filename = get_video_url(page)
         video_data = urlopen(video_url)
     except VideoUnavailable as e:
@@ -431,7 +460,9 @@ def main():
         sys.exit(1)
 
     page = YouTubeVideoPageParser()
-    parse_url(url, page)
+    with urlopen(url) as f:
+        page.feed(f.read().decode("utf-8"))
+        page.close()
     video_url, filename = get_video_url(page)
     print("Downloading", filename)