+ if url.startswith("//"):
+ url = "https:" + url
+ if not url.startswith("http://") and not url.startswith("https://"):
+ url = "https://www.youtube.com" + url
+
+ global referrer
+ req = urllib.request.Request(url)
+ if not referrer:
+ referrer = url
+ else:
+ req.add_header("Referer", referrer)
+
+ req.add_header("User-Agent", USER_AGENT)
+
+ if offset:
+ req.add_header("Range", "bytes=%d-" % offset)
+
+ res = urlopener.open(req)
+
+ content_range = res.getheader("Content-Range")
+ if content_range:
+ tokens = content_range.split()
+ assert tokens[0] == "bytes"
+ start = int(tokens[1].split("-")[0])
+ assert start == offset
+ return res
+
def validate_url(url):
    """Check that *url* is an HTTPS link to YouTube.

    The host must be exactly ``youtube.com`` or ``youtu.be``, optionally
    preceded by a single ``www.`` prefix.

    Returns None when the URL is acceptable.
    Raises NotYouTube otherwise.
    """
    parsed_url = urllib.parse.urlparse(url)

    host = parsed_url.netloc
    # Strip the literal "www." prefix only.  str.lstrip("www.") would remove
    # any run of leading 'w' and '.' characters, which lets look-alike hosts
    # such as "wyoutube.com" slip through.
    if host.startswith("www."):
        host = host[len("www."):]

    scheme_ok = parsed_url.scheme == "https"
    host_ok = host in ("youtube.com", "youtu.be")

    if scheme_ok and host_ok:
        return
    raise NotYouTube()
+
def parse_url(url, parser):
    """Fetch *url* and feed the UTF-8 decoded body to *parser*.

    *parser* must provide ``feed(text)`` and ``close()`` methods; it is fed
    the whole document in one call and then closed.
    """
    f = urlopen(url)
    try:
        body = f.read()
    finally:
        # Release the connection even if reading fails.
        f.close()

    parser.feed(body.decode("utf-8"))
    parser.close()
+
def append_to_qs(url, params):
    """Return *url* with *params* merged into its query string.

    Existing query parameters are kept unless *params* provides the same
    key, in which case the value from *params* replaces them.
    """
    parts = urllib.parse.urlsplit(url)

    query = urllib.parse.parse_qs(parts.query)
    query.update(params)

    # doseq=True expands list values into repeated "key=value" pairs.
    encoded = urllib.parse.urlencode(query, True)
    return urllib.parse.urlunsplit(parts._replace(query=encoded))
+
def get_player_config(scripts):
    """Extract the ``ytplayer.config`` JSON object from inline scripts.

    Each script is scanned line by line for ``ytplayer.config = {``; the
    first line that contains it and a later ``};`` is parsed.

    Returns the decoded object, or None when no script contains the
    assignment.
    Raises json.JSONDecodeError if the object is not valid JSON.
    """
    marker = "ytplayer.config = {"
    decoder = json.JSONDecoder()

    for script in scripts:
        for line in script.split("\n"):
            marker_pos = line.find(marker)
            if marker_pos < 0:
                continue

            # Index of the opening "{" (the last character of the marker).
            start = marker_pos + len(marker) - 1
            # Lines where the statement is not terminated are skipped.
            if line.find("};", start) < 0:
                continue

            # raw_decode stops at the real end of the object, so a "};"
            # inside a JSON string value cannot truncate the config.
            config, _end = decoder.raw_decode(line, start)
            return config

    return None
+
def extract_js(script):
    """Return the player script body without its wrapper.

    The script must begin with the ``_yt_player`` closure header and end
    with the matching closure footer; both are asserted and then removed.
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"

    has_prefix = script.startswith(PREFIX)
    assert has_prefix
    has_suffix = script.endswith(SUFFIX)
    assert has_suffix

    body_start = len(PREFIX)
    body_end = len(script) - len(SUFFIX)
    return script[body_start:body_end]
+
def find_func_name(script):
    """Return the name of the first function called with a ``.s`` argument.

    The search looks for ``NAME(<args ending in .s>)`` followed by ``,``,
    ``;`` or ``)`` and returns NAME.

    Raises ValueError if the script contains no such call.
    """
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
    TERMINATOR = R"[,;\)]"
    PATTERN = FUNC_NAME + FUNC_PARAMS + TERMINATOR

    match = re.search(PATTERN, script)
    if match is None:
        # Fail with a clear message instead of AttributeError on None.
        raise ValueError("signature function not found in player script")
    return match.group(1)
+
def decode_signature(js_url, signature):
    """Transform an encrypted stream signature using the player script.

    Downloads the player JavaScript from *js_url*, locates the signature
    function with find_func_name(), and runs it on *signature* inside a
    Node.js ``vm`` context with a stubbed browser environment.

    Returns the transformed signature, URL-unquoted.
    Raises Exception if the Node.js process exits with a non-zero status.

    NOTE: this executes downloaded JavaScript through the ``node`` binary.
    """
    f = urlopen(js_url)
    try:
        script = f.read().decode("utf-8")
    finally:
        # Release the connection even if reading or decoding fails.
        f.close()

    func_name = find_func_name(script)

    params = {
        "func_name": func_name,
        # json.dumps yields valid JavaScript string literals.
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    js_decode_script = ("""
        const vm = require('vm');

        const sandbox = {
            location: {
                hash: '',
                href: '',
                protocol: 'http:'
            },
            history: {
                pushState: function(){}
            },
            document: {},
            navigator: {
                userAgent: ''
            },
            XMLHttpRequest: class XMLHttpRequest {},
            matchMedia: () => ({matches: () => {}, media: ''}),
            signature: %(signature)s,
            transformed_signature: null,
            g: function(){} // this is _yt_player
        };
        sandbox.window = sandbox;

        const code_string = %(code)s + ';';
        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
        vm.runInNewContext(code_string + exec_string, sandbox);

        function findSignature(obj) {
            if (typeof obj !== 'object') {
                return;
            }
            for (const [key, value] of Object.entries(obj)) {
                if (key === 'MARKER') {
                    return value;
                }
                const result = findSignature(value);
                if (result) {
                    return result;
                }
            }
        }
        console.log(findSignature(sandbox.transformed_signature));
    """ % params)

    # The command is a constant string, so shell=True carries no injection
    # risk here; it is kept so a missing "node" still surfaces as a non-zero
    # exit status rather than a different exception type.
    p = subprocess.Popen(
        "node",
        shell=True,
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    # communicate() writes the script, closes stdin, drains stdout and closes
    # both pipes, so no file descriptors are left open.
    stdout_data, _ = p.communicate(js_decode_script.encode("utf-8"))
    if p.returncode != 0:
        raise Exception("js failed to execute: %d" % p.returncode)

    transformed_signature = stdout_data.decode("utf-8").strip()
    return urllib.parse.unquote(transformed_signature)
+
def get_best_video(player_config):
    """Pick the highest-quality supported stream from *player_config*.

    Reads the comma-separated ``url_encoded_fmt_stream_map`` from
    ``player_config["args"]`` and the player script URL from
    ``player_config["assets"]["js"]``.  Streams flagged ``stereo3d`` or
    whose quality / MIME type is not listed in QUALITIES / MIMETYPES are
    ignored, as are malformed entries.  On equal quality the later entry
    wins.

    Returns a ``(url, extension)`` tuple, or ``(None, None)`` when no
    stream qualifies.
    """
    stream_map = player_config["args"]["url_encoded_fmt_stream_map"]
    js_url = player_config["assets"]["js"]

    best_data = None
    best_quality = None
    best_extension = None
    for entry in stream_map.split(","):
        url_data = urllib.parse.parse_qs(entry)

        # Skip malformed entries (e.g. an empty stream map) instead of
        # failing with KeyError.
        if "type" not in url_data or "quality" not in url_data or "url" not in url_data:
            continue
        if "stereo3d" in url_data:
            continue

        mimetype = url_data["type"][0].split(";")[0]
        quality_name = url_data["quality"][0]
        if quality_name not in QUALITIES or mimetype not in MIMETYPES:
            continue

        quality = QUALITIES[quality_name]
        if best_quality is not None and quality < best_quality:
            continue

        best_data = url_data
        best_quality = quality
        best_extension = MIMETYPES[mimetype]

    if best_data is None:
        return None, None

    # Resolve the signature only for the winning stream: decode_signature()
    # downloads the player script and spawns a Node.js process.
    best_url = best_data["url"][0]
    if "sig" in best_data:
        signature = best_data["sig"][0]
    elif "s" in best_data:
        signature = decode_signature(js_url, best_data["s"][0])
    else:
        signature = None

    if signature:
        # "sp" names the query parameter that carries the signature.
        sp = best_data.get("sp", ["signature"])[0]
        best_url = append_to_qs(best_url, {sp: signature})

    return best_url, best_extension