#!/usr/bin/env python3
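"""Scrape a YouTube watch page and download the best available video.

The script runs in two modes: as a CGI script (when SCRIPT_NAME is set by the
web server) it serves a small HTML form and streams the chosen video back as
an attachment; from the command line it downloads the video to the current
directory and can resume a partially downloaded file.
"""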

import cgi
import html.parser
import http.cookiejar
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request


USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:82.0) Gecko/20100101 Firefox/82.0"

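# Containers we know how to save, mapped to file extensions, and a ranking of
# YouTube's legacy quality labels (higher number = better quality).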
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}


class VideoUnavailable(Exception):
    pass

class NotYouTube(Exception):
    pass

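# Render the landing/error form. The {0}/{1}/{2} placeholders are filled in
# with str.replace() rather than str.format() so the literal braces in the
# embedded CSS and bookmarklet JavaScript don't need escaping.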
def print_form(url="", msg=""):
    script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])
    sys.stdout.write("Content-Type: text/html\r\n\r\n")
    sys.stdout.write("""
<!DOCTYPE html>
<html>
<head>
    <title>delx.net.au - YouTube Scraper</title>
    <link rel="stylesheet" type="text/css" href="/style.css">
    <style type="text/css">
        input[type="text"] {
            width: 100%;
        }
        .error {
            color: red;
        }
    </style>
</head>
<body>
<h1>delx.net.au - YouTube Scraper</h1>
{0}
<form action="" method="get">
<p>This page will let you easily download YouTube videos to watch offline. It
will automatically grab the highest quality version.</p>
<div><input type="text" name="url" value="{1}"/></div>
<div><input type="submit" value="Download!"/></div>
</form>
<p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
to easily download videos. Right-click the link and add it to bookmarks,
then when you're looking at a YouTube page select that bookmark from your
browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url))

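# A single opener with a shared CookieJar is reused for every request so
# cookies persist across the scrape, and the first URL fetched becomes the
# Referer header for all later requests.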
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
referrer = ""

def urlopen(url, offset=None):
    if url.startswith("//"):
        url = "https:" + url
    if not url.startswith("http://") and not url.startswith("https://"):
        url = "https://www.youtube.com" + url

    global referrer
    req = urllib.request.Request(url)
    if not referrer:
        referrer = url
    else:
        req.add_header("Referer", referrer)

    req.add_header("User-Agent", USER_AGENT)

    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    content_range = res.getheader("Content-Range")
    if content_range:
        tokens = content_range.split()
        assert tokens[0] == "bytes"
        start = int(tokens[1].split("-")[0])
        assert start == offset
    return res

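# Only https:// URLs on youtube.com or youtu.be (optionally behind a www. or
# m. prefix) are accepted; anything else raises NotYouTube.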
def validate_url(url):
    parsed_url = urllib.parse.urlparse(url)
    scheme_ok = parsed_url.scheme == "https"
    # str.lstrip() strips a set of characters, not a prefix, so remove the
    # optional "www." / "m." hostname prefixes explicitly.
    host = parsed_url.netloc
    for prefix in ("www.", "m."):
        if host.startswith(prefix):
            host = host[len(prefix):]
    host_ok = host in ["youtube.com", "youtu.be"]

    if scheme_ok and host_ok:
        return
    else:
        raise NotYouTube()

def parse_url(url, parser):
    f = urlopen(url)
    parser.feed(f.read().decode("utf-8"))
    parser.close()
    f.close()

def append_to_qs(url, params):
    r = list(urllib.parse.urlsplit(url))
    qs = urllib.parse.parse_qs(r[3])
    qs.update(params)
    r[3] = urllib.parse.urlencode(qs, True)
    url = urllib.parse.urlunsplit(r)
    return url

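# The watch page embeds the player configuration as JSON in inline scripts,
# either as "ytplayer.config = {...};" or "ytcfg.set({...});". Pull out the
# JSON object(s) and merge them into a single dict.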
def get_player_config(scripts):
    config_strings = [
        ("ytplayer.config = {", 1, "};", 1),
        ("ytcfg.set({\"", 2, "});", 1),
    ]
    player_config = {}
    for script in scripts:
        for line in script.split("\n"):
            for s1, off1, s2, off2 in config_strings:
                if s1 in line:
                    p1 = line.find(s1) + len(s1) - off1
                    p2 = line.find(s2, p1) + off2
                    if p1 >= 0 and p2 > 0:
                        player_config.update(json.loads(line[p1:p2]))
    return player_config

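# The player JavaScript ships wrapped in an IIFE:
#   var _yt_player={};(function(g){var window=this; ... ;})(_yt_player);
# Strip that wrapper so the body can be executed inside a Node.js vm sandbox.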
def extract_js(script):
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"
    assert script.startswith(PREFIX)
    assert script.endswith(SUFFIX)

    return script[len(PREFIX):-len(SUFFIX)]

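# Find the name of the signature-decipher function: a call like name(a.s) or
# name(decodeURIComponent(a.s)) followed by ',', ';' or ')'.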
def find_cipher_func(script):
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?"
    FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))"
    TERMINATOR = R"[,;\)]"
    PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR

    match = re.search(PATTERN, script)
    func_name = match.groups()[0]
    return func_name

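# Find the helper that builds the final URL; it appears in the player JS as
# an assignment of the form "this.url = name(this)".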
def find_url_func(script):
    FUNC_NAME = R"([a-zA-Z0-9$]+)"
    PATTERN = R"this\.url\s*=\s*" + FUNC_NAME + R"\s*\(\s*this\s*\)"

    match = re.search(PATTERN, script)
    func_name = match.groups()[0]
    return func_name

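# Protected formats carry a "signatureCipher" query string (url, sp, s)
# instead of a plain URL. Decoding it requires running the player's own
# JavaScript, so the player JS is downloaded and executed under Node.js
# inside a vm sandbox with a minimal fake browser environment.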
def decode_cipher_url(js_url, cipher):
    cipher = urllib.parse.parse_qs(cipher)
    args = [
        cipher["url"][0],
        cipher["sp"][0],
        cipher["s"][0],
    ]

    f = urlopen(js_url)
    script = f.read().decode("utf-8")
    f.close()

    cipher_func_name = find_cipher_func(script)
    url_func_name = find_url_func(script)

    params = {
        "cipher_func_name": cipher_func_name,
        "url_func_name": url_func_name,
        "args": json.dumps(args),
        "code": json.dumps(extract_js(script)),
    }
    p = subprocess.Popen(
        "node",
        shell=True,
        close_fds=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE
    )
    js_decode_script = ("""
const vm = require('vm');

const fakeGlobal = {};
fakeGlobal.window = fakeGlobal;
fakeGlobal.location = {
    hash: '',
    host: 'www.youtube.com',
    hostname: 'www.youtube.com',
    href: 'https://www.youtube.com',
    origin: 'https://www.youtube.com',
    pathname: '/',
    protocol: 'https:'
};
fakeGlobal.history = {
    pushState: function(){}
};
fakeGlobal.document = {
    location: fakeGlobal.location
};
fakeGlobal.document = {};
fakeGlobal.navigator = {
    userAgent: ''
};
fakeGlobal.XMLHttpRequest = class XMLHttpRequest {};
fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''});
fakeGlobal.result_url = null;
fakeGlobal.g = function(){}; // this is _yt_player

const code_string = %(code)s + ';';
const exec_string = 'result_url = %(url_func_name)s(%(cipher_func_name)s(...%(args)s));';
vm.runInNewContext(code_string + exec_string, fakeGlobal);

console.log(fakeGlobal.result_url);
""" % params)

    p.stdin.write(js_decode_script.encode("utf-8"))
    p.stdin.close()

    result_url = p.stdout.read().decode("utf-8").strip()
    if p.wait() != 0:
        raise Exception("js failed to execute: %d" % p.returncode)

    return result_url

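# Walk streamingData.formats from the player response and keep the best
# format whose quality label and MIME type we recognise, resolving
# signatureCipher-protected URLs via the player JS as needed.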
def get_best_video(player_config):
    player_args = player_config["args"]
    player_response = json.loads(player_args["player_response"])
    formats = player_response["streamingData"]["formats"]

    best_url = None
    best_quality = None
    best_extension = None
    for format_data in formats:
        mimetype = format_data["mimeType"].split(";")[0]
        quality = format_data["quality"]

        if quality not in QUALITIES:
            continue
        if mimetype not in MIMETYPES:
            continue

        extension = MIMETYPES[mimetype]
        quality = QUALITIES.get(quality, -1)

        if best_quality is not None and quality < best_quality:
            continue

        if "signatureCipher" in format_data:
            js_url = player_config["PLAYER_JS_URL"]
            video_url = decode_cipher_url(js_url, format_data["signatureCipher"])
        else:
            video_url = format_data["url"]

        best_url = video_url
        best_quality = quality
        best_extension = extension

    return best_url, best_extension

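# Collapse whitespace and replace path separators and NUL bytes so the video
# title is safe to use as a filename.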
def sanitize_filename(filename):
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )

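# Tie it together for one page: find the player config, pick the best format,
# and build a sensible filename from the video title.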
def get_video_url(page):
    player_config = get_player_config(page.scripts)
    if not player_config:
        raise VideoUnavailable(page.unavailable_message or "Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    title = player_config["args"].get("title", None)
    if not title:
        title = json.loads(player_config["args"]["player_response"])["videoDetails"]["title"]
    if not title:
        title = "Unknown title"

    filename = sanitize_filename(title) + "." + extension

    return video_url, filename

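# HTML parser that collects the body of every inline <script> element and the
# text of the element with id="unavailable-message". handle_data is swapped
# per start tag so only the text immediately inside an interesting element is
# captured.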
class YouTubeVideoPageParser(html.parser.HTMLParser):
    def __init__(self):
        super().__init__()
        self.unavailable_message = None
        self.scripts = []

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        self._handle_unavailable_message(tag, attrs)
        self._handle_script(tag, attrs)

    def handle_endtag(self, tag):
        self.handle_data = self._ignore_data

    def _ignore_data(self, _):
        pass

    def _handle_unavailable_message(self, tag, attrs):
        if attrs.get("id", None) == "unavailable-message":
            self.handle_data = self._handle_unavailable_message_data

    def _handle_unavailable_message_data(self, data):
        self.unavailable_message = data.strip()

    def _handle_script(self, tag, attrs):
        if tag == "script":
            self.handle_data = self._handle_script_data

    def _handle_script_data(self, data):
        if data:
            self.scripts.append(data)

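# Stream the video to the client with a UTF-8 percent-encoded filename*
# Content-Disposition header and the upstream Content-Length, copying the
# body straight to stdout.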
def write_video(filename, video_data):
    quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
    sys.stdout.buffer.write(
        b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n"
        .replace(b"{0}", quoted_filename.encode("utf-8"))
    )
    sys.stdout.buffer.write(
        b"Content-Length: {0}\r\n"
        .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8"))
    )
    sys.stdout.buffer.write(b"\r\n")
    shutil.copyfileobj(video_data, sys.stdout.buffer)
    video_data.close()

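# CGI entry point: show the form when no URL is given, otherwise resolve the
# video and stream it back, reporting scrape failures on the form itself.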
def cgimain():
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        page = YouTubeVideoPageParser()
        validate_url(url)
        parse_url(url, page)
        video_url, filename = get_video_url(page)
        video_data = urlopen(video_url)
    except VideoUnavailable as e:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(e.args[0])
        )
        return
    except NotYouTube:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
        )
        return
    except Exception:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an unknown error.</p>"
        )
        return

    write_video(filename, video_data)

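# Pretty-print a byte count, e.g. pp_size(1536) -> "1.50 KiB".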
def pp_size(size):
    suffixes = ["B", "KiB", "MiB", "GiB"]
    for suffix in suffixes:
        if size < 1024:
            break
        size /= 1024
    return "%.2f %s" % (size, suffix)

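# Copy in 32 KiB chunks, redrawing a single status line ("\33[2K\r" clears
# it) at most every half second with the bytes copied so far, the total, and
# the current transfer rate.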
def copy_with_progress(content_length, infile, outfile):
    def print_status():
        rate = 0
        if now != last_ts:
            rate = last_bytes_read / (now - last_ts)
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    last_ts = 0
    last_bytes_read = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            print_status()
            last_ts = now
            last_bytes_read = 0

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        last_bytes_read += len(buf)
        bytes_read += len(buf)

    # Newline at the end
    print_status()
    print()

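# Command-line entry point: download to the current directory, appending to
# any existing file and retrying with Range requests until the server answers
# 416 (range not satisfiable), which means the file is complete.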
def main():
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    page = YouTubeVideoPageParser()
    parse_url(url, page)
    video_url, filename = get_video_url(page)
    print("Downloading", filename)

    outfile = open(filename, "ab")
    offset = outfile.tell()
    if offset > 0:
        print("Resuming download from", pp_size(offset))
    total_size = None

    while True:
        try:
            video_data = urlopen(video_url, offset)
        except urllib.error.HTTPError as e:
            if e.code == 416:
                print("File is complete!")
                break
            else:
                raise

        content_length = int(video_data.getheader("Content-Length"))
        if total_size is None:
            total_size = content_length

        try:
            copy_with_progress(content_length, video_data, outfile)
        except IOError:
            print()

        video_data.close()
        if outfile.tell() != total_size:
            old_offset = offset
            offset = outfile.tell()
            if old_offset == offset:
                time.sleep(1)
            print("Restarting download from", pp_size(offset))
        else:
            break

    outfile.close()


if __name__ == "__main__":
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)