- url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
- url_data = itertools.izip_longest(
- url_data["url"],
- url_data["type"],
- url_data["quality"],
- url_data.get("sig", []),
- )
- best_url = None
- best_quality = None
- best_extension = None
- for video_url, mimetype, quality, signature in url_data:
- mimetype = mimetype.split(";")[0]
- if mimetype not in MIMETYPES:
- continue
- extension = "." + MIMETYPES[mimetype]
- quality = QUALITIES.get(quality.split(",")[0], -1)
- if best_quality is None or quality > best_quality:
- if signature:
- video_url = append_to_qs(video_url, {"signature": signature})
- best_url = video_url
- best_quality = quality
- best_extension = extension
-
- return best_url, best_extension
-
-def get_video_url(doc):
- unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
- if unavailable:
- raise VideoUnavailable(unavailable[0].strip())
-
- player_config = get_player_config(doc)
- if not player_config:
- raise VideoUnavailable("Could not find video URL")
-
- video_url, extension = get_best_video(player_config)
- if not video_url:
- return None, None
-
- title = doc.xpath("/html/head/title/text()")[0]
- title = re.sub("\s+", " ", title.strip())
- valid_chars = frozenset("-_.() abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
- filename = "".join(c for c in title.encode("ascii", "ignore") if c in valid_chars)
- filename += extension
-
- return video_url, filename
+ url_data_list = player_config["args"]["url_encoded_fmt_stream_map"].split(",")
+ js_url = player_config["assets"]["js"]
+
+ best_url = None
+ best_quality = None
+ best_extension = None
+ for url_data in url_data_list:
+ url_data = urllib.parse.parse_qs(url_data)
+ mimetype = url_data["type"][0].split(";")[0]
+ quality = url_data["quality"][0]
+
+ if "stereo3d" in url_data:
+ continue
+ if quality not in QUALITIES:
+ continue
+ if mimetype not in MIMETYPES:
+ continue
+
+ extension = MIMETYPES[mimetype]
+ quality = QUALITIES.get(quality, -1)
+
+ if best_quality is not None and quality < best_quality:
+ continue
+
+ video_url = url_data["url"][0]
+ if "sig" in url_data:
+ signature = url_data["sig"][0]
+ elif "s" in url_data:
+ signature = decode_signature(js_url, url_data["s"][0])
+ else:
+ signature = None
+
+ if signature:
+ video_url = append_to_qs(video_url, {"signature": signature})
+
+ best_url = video_url
+ best_quality = quality
+ best_extension = extension
+
+ return best_url, best_extension
+
+def sanitize_filename(filename):
+    """Return *filename* cleaned up for use as a single file name.
+
+    Backslashes and forward slashes are replaced with "-", NUL characters
+    with a space, and every run of whitespace is then collapsed to a single
+    space with leading/trailing whitespace removed.
+    """
+    # Substitute the problematic characters first: the spaces produced from
+    # NUL characters must take part in the whitespace normalisation below,
+    # otherwise "a \0 b" would come out with three consecutive spaces.
+    replaced = (
+        filename
+        .replace("\\", "-")
+        .replace("/", "-")
+        .replace("\0", " ")
+    )
+    # Raw string: "\s" in a normal string literal is an invalid escape.
+    return re.sub(r"\s+", " ", replaced).strip()
+
+def get_video_url(page):
+    """Return ``(video_url, filename)`` for a parsed video page.
+
+    *page* must expose ``scripts``, ``title`` and ``unavailable_message``
+    attributes (presumably a fed ``YouTubeVideoPageParser`` -- confirm
+    against the caller).
+
+    Raises ``VideoUnavailable`` when no player configuration can be found in
+    the page scripts, using the page's own unavailable message if it has one.
+    Returns ``(None, None)`` when the player configuration yields no usable
+    video URL.
+    """
+    player_config = get_player_config(page.scripts)
+    if not player_config:
+        raise VideoUnavailable(page.unavailable_message or "Could not find video URL")
+
+    video_url, extension = get_best_video(player_config)
+    if not video_url:
+        return None, None
+
+    # The title may be None (no <title> text seen) or sanitise down to an
+    # empty string; fall back to a generic stem rather than crashing or
+    # producing a name that consists of the extension only.
+    stem = sanitize_filename(page.title or "") or "video"
+    filename = stem + "." + extension
+
+    return video_url, filename
+
+class YouTubeVideoPageParser(html.parser.HTMLParser):
+    """Collect the title, unavailable message and inline scripts of a page.
+
+    After feeding a document (and calling ``close()``):
+
+    * ``title`` is the stripped text of the first non-empty ``<title>``
+      element, or None if there was none;
+    * ``unavailable_message`` is the stripped text found in the element
+      whose ``id`` is ``unavailable-message``, or None;
+    * ``scripts`` is a list with the text of every non-empty ``<script>``.
+
+    Text is buffered and committed once per element, because HTMLParser may
+    deliver the content of a single element through several ``handle_data``
+    calls (for example when a script contains "</" inside a string literal).
+    """
+
+    def __init__(self):
+        super().__init__()
+        self.title = None
+        self.unavailable_message = None
+        self.scripts = []
+        # Name of the field currently being captured, or None when text is
+        # being ignored, plus the text chunks gathered for it so far.
+        self._target = None
+        self._chunks = []
+
+    def handle_starttag(self, tag, attrs):
+        attrs = dict(attrs)
+        # Later checks deliberately override earlier ones.
+        target = None
+        if tag == "title":
+            target = "title"
+        if attrs.get("id") == "unavailable-message":
+            target = "unavailable_message"
+        if tag == "script":
+            target = "script"
+
+        # Unrelated start tags leave any capture in progress untouched.
+        if target is not None:
+            self._commit()
+            self._target = target
+
+    def handle_endtag(self, tag):
+        # Any end tag terminates the capture in progress.
+        self._commit()
+        self._target = None
+
+    def handle_data(self, data):
+        if self._target is not None:
+            self._chunks.append(data)
+
+    def close(self):
+        super().close()
+        # Keep text from an element left unclosed at the end of the input.
+        self._commit()
+
+    def _commit(self):
+        """Store the buffered text in the field being captured, if any."""
+        target, chunks = self._target, self._chunks
+        self._chunks = []
+        if target is None or not chunks:
+            return
+
+        text = "".join(chunks)
+        if target == "script":
+            if text:
+                self.scripts.append(text)
+        elif target == "title":
+            # The document title is the first <title>; do not let later
+            # ones (e.g. inside inline SVG) replace it.
+            if not self.title:
+                self.title = text.strip()
+        else:
+            self.unavailable_message = text.strip()
+
+def write_video(filename, video_data):
+    """Write *video_data* to stdout as a downloadable attachment.
+
+    Emits a ``Content-Disposition`` header carrying *filename* as a
+    percent-encoded UTF-8 ``filename*`` value, a ``Content-Length`` header
+    when the upstream response supplies one, a blank line, and then the body
+    copied from *video_data*.
+
+    *video_data* must provide ``getheader()``, ``read()`` and ``close()``
+    (presumably an ``http.client.HTTPResponse`` -- confirm against the
+    caller). It is always closed, even if writing fails.
+    """
+    out = sys.stdout.buffer
+    try:
+        # safe="" so that "/" is escaped too; it is not permitted unescaped
+        # in an RFC 5987 extended parameter value.
+        quoted_filename = urllib.parse.quote(filename, safe="")
+        header_lines = [
+            "Content-Disposition: attachment; filename*=UTF-8''" + quoted_filename,
+        ]
+
+        # The upstream response may carry no Content-Length (e.g. chunked
+        # transfer); omit the header instead of failing on None.
+        content_length = video_data.getheader("Content-Length")
+        if content_length is not None:
+            header_lines.append("Content-Length: " + content_length)
+
+        header_block = "".join(line + "\r\n" for line in header_lines) + "\r\n"
+        out.write(header_block.encode("utf-8"))
+        shutil.copyfileobj(video_data, out)
+    finally:
+        video_data.close()