]> code.delx.au - youtube-cgi/blob - youtube.cgi
Fix for Google changes
[youtube-cgi] / youtube.cgi
1 #!/usr/bin/env python3
2
3 import cgi
4 import html.parser
5 import http.cookiejar
6 import json
7 import os
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import time
13 import urllib.error
14 import urllib.parse
15 import urllib.request
16
17
18 USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0"
19
20 MIMETYPES = {
21 "video/mp4": "mp4",
22 "video/x-flv": "flv",
23 "video/3gpp": "3gp",
24 }
25
26 QUALITIES = {
27 "hd1080": 5,
28 "hd720": 4,
29 "large": 3,
30 "medium": 2,
31 "small": 1,
32 }
33
34
35 class VideoUnavailable(Exception):
36 pass
37
38 class NotYouTube(Exception):
39 pass
40
41 def print_form(url="", msg=""):
42 script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
43 sys.stdout.write("Content-Type: text/html\r\n\r\n")
44 sys.stdout.write("""
45 <!DOCTYPE html>
46 <html>
47 <head>
48 <title>delx.net.au - YouTube Scraper</title>
49 <link rel="stylesheet" type="text/css" href="/style.css">
50 <style type="text/css">
51 input[type="text"] {
52 width: 100%;
53 }
54 .error {
55 color: red;
56 }
57 </style>
58 </head>
59 <body>
60 <h1>delx.net.au - YouTube Scraper</h1>
61 {0}
62 <form action="" method="get">
63 <p>This page will let you easily download YouTube videos to watch offline. It
64 will automatically grab the highest quality version.</p>
65 <div><input type="text" name="url" value="{1}"/></div>
66 <div><input type="submit" value="Download!"/></div>
67 </form>
68 <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
69 to easily download videos. Right-click the link and add it to bookmarks,
70 then when you're looking at a YouTube page select that bookmark from your
71 browser's bookmarks menu to download the video straight away.</p>
72 </body>
73 </html>
74 """.replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))
75
76 cookiejar = http.cookiejar.CookieJar()
77 urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
78 referrer = ""
79
80 def urlopen(url, offset=None):
81 if url.startswith("//"):
82 url = "https:" + url
83 if not url.startswith("http://") and not url.startswith("https://"):
84 url = "https://www.youtube.com" + url
85
86 global referrer
87 req = urllib.request.Request(url)
88 if not referrer:
89 referrer = url
90 else:
91 req.add_header("Referer", referrer)
92
93 req.add_header("User-Agent", USER_AGENT)
94
95 if offset:
96 req.add_header("Range", "bytes=%d-" % offset)
97
98 res = urlopener.open(req)
99
100 content_range = res.getheader("Content-Range")
101 if content_range:
102 tokens = content_range.split()
103 assert tokens[0] == "bytes"
104 start = int(tokens[1].split("-")[0])
105 assert start == offset
106 return res
107
108 def validate_url(url):
109 parsed_url = urllib.parse.urlparse(url)
110 scheme_ok = parsed_url.scheme == "https"
111 host_ok = parsed_url.netloc.lstrip("www.") in ["youtube.com", "youtu.be"]
112
113 if scheme_ok and host_ok:
114 return
115 else:
116 raise NotYouTube()
117
118 def parse_url(url, parser):
119 f = urlopen(url)
120 parser.feed(f.read().decode("utf-8"))
121 parser.close()
122 f.close()
123
124 def append_to_qs(url, params):
125 r = list(urllib.parse.urlsplit(url))
126 qs = urllib.parse.parse_qs(r[3])
127 qs.update(params)
128 r[3] = urllib.parse.urlencode(qs, True)
129 url = urllib.parse.urlunsplit(r)
130 return url
131
132 def get_player_config(scripts):
133 player_config = None
134 for script in scripts:
135 for line in script.split("\n"):
136 s = "ytplayer.config = {"
137 if s in line:
138 p1 = line.find(s) + len(s) - 1
139 p2 = line.find("};", p1) + 1
140 if p1 >= 0 and p2 > 0:
141 return json.loads(line[p1:p2])
142
143 def extract_js(script):
144 PREFIX = "var _yt_player={};(function(g){var window=this;"
145 SUFFIX = ";})(_yt_player);\n"
146 assert script.startswith(PREFIX)
147 assert script.endswith(SUFFIX)
148
149 return script[len(PREFIX):-len(SUFFIX)]
150
151 def find_func_name(script):
152 FUNC_NAME = R"([a-zA-Z0-9$]+)"
153 DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"
154 FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
155 TERMINATOR = R"[,;\)]"
156 PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR
157
158 match = re.search(PATTERN, script)
159 func_name = match.groups()[0]
160 return func_name
161
162 def decode_signature(js_url, signature):
163 f = urlopen(js_url)
164 script = f.read().decode("utf-8")
165 f.close()
166
167 func_name = find_func_name(script)
168
169 params = {
170 "func_name": func_name,
171 "signature": json.dumps(signature),
172 "code": json.dumps(extract_js(script)),
173 }
174 p = subprocess.Popen(
175 "node",
176 shell=True,
177 close_fds=True,
178 stdin=subprocess.PIPE,
179 stdout=subprocess.PIPE
180 )
181 js_decode_script = ("""
182 const vm = require('vm');
183
184 const sandbox = {
185 location: {
186 hash: '',
187 href: '',
188 protocol: 'http:'
189 },
190 history: {
191 pushState: function(){}
192 },
193 document: {},
194 navigator: {
195 userAgent: ''
196 },
197 XMLHttpRequest: class XMLHttpRequest {},
198 matchMedia: () => ({matches: () => {}, media: ''}),
199 signature: %(signature)s,
200 transformed_signature: null,
201 g: function(){} // this is _yt_player
202 };
203 sandbox.window = sandbox;
204
205 const code_string = %(code)s + ';';
206 const exec_string = 'transformed_signature = %(func_name)s(signature);';
207 vm.runInNewContext(code_string + exec_string, sandbox);
208
209 console.log(sandbox.transformed_signature);
210 """ % params)
211
212 p.stdin.write(js_decode_script.encode("utf-8"))
213 p.stdin.close()
214
215 transformed_signature = p.stdout.read().decode("utf-8").strip()
216 transformed_signature = urllib.parse.unquote(transformed_signature)
217 if p.wait() != 0:
218 raise Exception("js failed to execute: %d" % p.returncode)
219
220 return transformed_signature
221
222 def get_best_video(player_config):
223 js_url = player_config["assets"]["js"]
224
225 player_args = player_config["args"]
226 player_response = json.loads(player_args["player_response"])
227 formats = player_response["streamingData"]["formats"]
228
229 best_url = None
230 best_quality = None
231 best_extension = None
232 for format_data in formats:
233 mimetype = format_data["mimeType"].split(";")[0]
234 quality = format_data["quality"]
235
236 if quality not in QUALITIES:
237 continue
238 if mimetype not in MIMETYPES:
239 continue
240
241 extension = MIMETYPES[mimetype]
242 quality = QUALITIES.get(quality, -1)
243
244 if best_quality is not None and quality < best_quality:
245 continue
246
247 if "cipher" in format_data:
248 cipher = urllib.parse.parse_qs(format_data["cipher"])
249 video_url = cipher["url"][0]
250 if "sig" in cipher:
251 signature = cipher["sig"][0]
252 elif "s" in cipher:
253 signature = decode_signature(js_url, cipher["s"][0])
254 sp = cipher.get("sp", ["signature"])[0]
255 video_url = append_to_qs(video_url, {sp: signature})
256 else:
257 video_url = format_data["url"]
258
259 best_url = video_url
260 best_quality = quality
261 best_extension = extension
262
263 return best_url, best_extension
264
265 def sanitize_filename(filename):
266 return (
267 re.sub("\s+", " ", filename.strip())
268 .replace("\\", "-")
269 .replace("/", "-")
270 .replace("\0", " ")
271 )
272
273 def get_video_url(page):
274 player_config = get_player_config(page.scripts)
275 if not player_config:
276 raise VideoUnavailable(page.unavailable_message or "Could not find video URL")
277
278 video_url, extension = get_best_video(player_config)
279 if not video_url:
280 return None, None
281
282 title = player_config["args"].get("title", None)
283 if not title:
284 title = json.loads(player_config["args"]["player_response"])["videoDetails"]["title"]
285 if not title:
286 title = "Unknown title"
287
288 filename = sanitize_filename(title) + "." + extension
289
290 return video_url, filename
291
292 class YouTubeVideoPageParser(html.parser.HTMLParser):
293 def __init__(self):
294 super().__init__()
295 self.unavailable_message = None
296 self.scripts = []
297
298 def handle_starttag(self, tag, attrs):
299 attrs = dict(attrs)
300 self._handle_unavailable_message(tag, attrs)
301 self._handle_script(tag, attrs)
302
303 def handle_endtag(self, tag):
304 self.handle_data = self._ignore_data
305
306 def _ignore_data(self, _):
307 pass
308
309 def _handle_unavailable_message(self, tag, attrs):
310 if attrs.get("id", None) == "unavailable-message":
311 self.handle_data = self._handle_unavailable_message_data
312
313 def _handle_unavailable_message_data(self, data):
314 self.unavailable_message = data.strip()
315
316 def _handle_script(self, tag, attrs):
317 if tag == "script":
318 self.handle_data = self._handle_script_data
319
320 def _handle_script_data(self, data):
321 if data:
322 self.scripts.append(data)
323
324 def write_video(filename, video_data):
325 quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
326 sys.stdout.buffer.write(
327 b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n"
328 .replace(b"{0}", quoted_filename.encode("utf-8"))
329 )
330 sys.stdout.buffer.write(
331 b"Content-Length: {0}\r\n"
332 .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8"))
333 )
334 sys.stdout.buffer.write(b"\r\n")
335 shutil.copyfileobj(video_data, sys.stdout.buffer)
336 video_data.close()
337
338 def cgimain():
339 args = cgi.parse()
340 try:
341 url = args["url"][0]
342 except:
343 print_form(url="https://www.youtube.com/watch?v=FOOBAR")
344 return
345
346 try:
347 page = YouTubeVideoPageParser()
348 validate_url(url)
349 parse_url(url, page)
350 video_url, filename = get_video_url(page)
351 video_data = urlopen(video_url)
352 except VideoUnavailable as e:
353 print_form(
354 url=url,
355 msg="<p class='error'>Sorry, there was an error: %s</p>" % cgi.escape(e.args[0])
356 )
357 except NotYouTube:
358 print_form(
359 url=url,
360 msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
361 )
362 except Exception as e:
363 print_form(
364 url=url,
365 msg="<p class='error'>Sorry, there was an unknown error.</p>"
366 )
367 return
368
369 write_video(filename, video_data)
370
371 def pp_size(size):
372 suffixes = ["", "KiB", "MiB", "GiB"]
373 for i, suffix in enumerate(suffixes):
374 if size < 1024:
375 break
376 size /= 1024
377 return "%.2f %s" % (size, suffix)
378
379 def copy_with_progress(content_length, infile, outfile):
380 def print_status():
381 rate = 0
382 if now != last_ts:
383 rate = last_bytes_read / (now - last_ts)
384 sys.stdout.write("\33[2K\r")
385 sys.stdout.write("%s / %s (%s/sec)" % (
386 pp_size(bytes_read),
387 pp_size(content_length),
388 pp_size(rate),
389 ))
390 sys.stdout.flush()
391
392 last_ts = 0
393 last_bytes_read = 0
394 bytes_read = 0
395 while True:
396 now = time.time()
397 if now - last_ts > 0.5:
398 print_status()
399 last_ts = now
400 last_bytes_read = 0
401
402 buf = infile.read(32768)
403 if not buf:
404 break
405 outfile.write(buf)
406 last_bytes_read += len(buf)
407 bytes_read += len(buf)
408
409 # Newline at the end
410 print_status()
411 print()
412
413 def main():
414 try:
415 url = sys.argv[1]
416 except:
417 print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
418 sys.exit(1)
419
420 page = YouTubeVideoPageParser()
421 parse_url(url, page)
422 video_url, filename = get_video_url(page)
423 print("Downloading", filename)
424
425 outfile = open(filename, "ab")
426 offset = outfile.tell()
427 if offset > 0:
428 print("Resuming download from", pp_size(offset))
429 total_size = None
430
431 while True:
432 try:
433 video_data = urlopen(video_url, offset)
434 except urllib.error.HTTPError as e:
435 if e.code == 416:
436 print("File is complete!")
437 break
438 else:
439 raise
440
441 content_length = int(video_data.getheader("Content-Length"))
442 if total_size is None:
443 total_size = content_length
444
445 try:
446 copy_with_progress(content_length, video_data, outfile)
447 except IOError as e:
448 print()
449
450 video_data.close()
451 if outfile.tell() != total_size:
452 old_offset = offset
453 offset = outfile.tell()
454 if old_offset == offset:
455 time.sleep(1)
456 print("Restarting download from", pp_size(offset))
457 else:
458 break
459
460 outfile.close()
461
462
463 if __name__ == "__main__":
464 if "SCRIPT_NAME" in os.environ:
465 cgimain()
466 else:
467 try:
468 main()
469 except KeyboardInterrupt:
470 print("\nExiting...")
471 sys.exit(1)
472