]> code.delx.au - youtube-cgi/blob - youtube.cgi
Google seems to care about user agents now...
[youtube-cgi] / youtube.cgi
1 #!/usr/bin/env python3
2
3 import cgi
4 import html.parser
5 import http.cookiejar
6 import json
7 import os
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import time
13 import urllib.error
14 import urllib.parse
15 import urllib.request
16
17
# Page that is scraped for the current Firefox version number.
MOZILLA_RELEASE_URL = "https://www.mozilla.org/en-US/firefox/releases/"

# User-Agent header template; the single %s receives the Firefox version.
USER_AGENT_TEMPLATE = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:83.0) "
    "Gecko/20100101 Firefox/%s"
)
20
# Downloadable container MIME types mapped to the file extension used when
# saving the video; formats with any other MIME type are ignored.
MIMETYPES = dict([
    ("video/mp4", "mp4"),
    ("video/x-flv", "flv"),
    ("video/3gpp", "3gp"),
])
26
# Quality labels mapped to a rank; a larger rank means a better stream.
# Formats whose label is not listed here are ignored.
QUALITIES = dict([
    ("hd1080", 5),
    ("hd720", 4),
    ("large", 3),
    ("medium", 2),
    ("small", 1),
])
34
35
class VideoUnavailable(Exception):
    """Raised when no player configuration can be found for a video page.

    The first argument carries the message that is shown to the user.
    """
38
class NotYouTube(Exception):
    """Raised when a submitted URL is not an https YouTube address."""
41
def print_form(url="", msg=""):
    """Write the HTML download form to stdout as a complete CGI response.

    url -- value pre-filled in the text box.  It is HTML-escaped here because
           it normally comes straight from the request's query string.
    msg -- HTML fragment inserted verbatim above the form; the caller is
           responsible for escaping any untrusted text inside it.

    Reads HTTP_HOST and REQUEST_URI from the environment (KeyError if either
    is missing) to build the address used by the bookmarklet.
    """
    # REQUEST_URI still contains the query string of the current request;
    # drop it so the bookmarklet can append its own "?url=" parameter.
    script_path = os.environ["REQUEST_URI"].split("?", 1)[0]
    script_url = "https://%s%s" % (os.environ["HTTP_HOST"], script_path)
    # The address ends up inside a JavaScript string inside an HTML attribute:
    # percent-encode quotes and friends, then escape for HTML.
    script_url = html.escape(
        urllib.parse.quote(script_url, safe=":/%[]@"), quote=True
    )

    page = """
<!DOCTYPE html>
<html>
<head>
<title>delx.net.au - YouTube Scraper</title>
<link rel="stylesheet" type="text/css" href="/style.css">
<style type="text/css">
input[type="text"] {
width: 100%;
}
.error {
color: red;
}
</style>
</head>
<body>
<h1>delx.net.au - YouTube Scraper</h1>
{0}
<form action="" method="get">
<p>This page will let you easily download YouTube videos to watch offline. It
will automatically grab the highest quality version.</p>
<div><input type="text" name="url" value="{1}"/></div>
<div><input type="submit" value="Download!"/></div>
</form>
<p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
to easily download videos. Right-click the link and add it to bookmarks,
then when you're looking at a YouTube page select that bookmark from your
browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
"""
    substitutions = {
        "{0}": msg,
        "{1}": html.escape(url, quote=True),
        "{2}": script_url,
    }

    sys.stdout.write("Content-Type: text/html\r\n\r\n")
    # Substitute in a single pass so that placeholder-looking text inside
    # msg or url is never substituted a second time.
    sys.stdout.write(
        re.sub(r"\{[012]\}", lambda m: substitutions[m.group(0)], page)
    )
76
# Module-wide HTTP state shared by every request made through urlopen().
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(
    urllib.request.HTTPCookieProcessor(cookiejar)
)
# URL of the first request made; sent as the Referer of later requests.
referrer = ""
# User-Agent string, filled in lazily on the first urlopen() call.
user_agent = None
81
def urlopen(url, offset=None):
    """Open url with the shared cookie-aware opener and return the response.

    url    -- absolute URL, protocol-relative URL ("//host/path") or a path,
              which is resolved against https://www.youtube.com.
    offset -- optional byte offset; when truthy a "Range: bytes=<offset>-"
              header is sent to resume a download.

    On the first call the Mozilla releases page is fetched to build the
    User-Agent string.  The first URL opened is remembered and sent as the
    Referer header of every later request.

    Raises ValueError when the server answers with a Content-Range that does
    not start at the requested offset; errors from urllib propagate.
    """
    global user_agent, referrer

    if not user_agent:
        page = MozillaReleasesPageParser()
        with urllib.request.urlopen(MOZILLA_RELEASE_URL) as f:
            page.feed(f.read().decode("utf-8"))
        page.close()
        user_agent = USER_AGENT_TEMPLATE % page.latest_release

    if url.startswith("//"):
        url = "https:" + url
    if not url.startswith(("http://", "https://")):
        url = "https://www.youtube.com" + url

    req = urllib.request.Request(url)
    if not referrer:
        referrer = url
    else:
        req.add_header("Referer", referrer)
    req.add_header("User-Agent", user_agent)
    if offset:
        req.add_header("Range", "bytes=%d-" % offset)

    res = urlopener.open(req)

    content_range = res.getheader("Content-Range")
    if content_range:
        # Expected shape: "bytes <start>-<end>/<total>".  Validate explicitly
        # instead of with assert so the check survives "python -O".
        tokens = content_range.split()
        try:
            unit_ok = tokens[0] == "bytes"
            start = int(tokens[1].split("-")[0])
        except (IndexError, ValueError):
            unit_ok = False
            start = None
        # A request without an offset implicitly starts at byte 0.
        if not unit_ok or start != (offset or 0):
            res.close()
            raise ValueError("Unexpected Content-Range: %r" % content_range)
    return res
117
def validate_url(url):
    """Return None if url is an https YouTube address, else raise NotYouTube.

    Accepted hosts are youtube.com and youtu.be, optionally preceded by a
    "www." and/or "m." label.
    """
    parsed_url = urllib.parse.urlparse(url)
    host = parsed_url.netloc
    # Remove the optional labels as whole prefixes.  str.lstrip() would strip
    # a *set of characters* and let hosts such as "wwwyoutube.com" through.
    for prefix in ("www.", "m."):
        if host.startswith(prefix):
            host = host[len(prefix):]

    if parsed_url.scheme != "https" or host not in ("youtube.com", "youtu.be"):
        raise NotYouTube()
128
def load_parse_url(url, parser):
    """Fetch url, feed its UTF-8 decoded body to parser, then close the parser.

    The response is closed even when reading, decoding or parsing fails.
    """
    with urlopen(url) as f:
        parser.feed(f.read().decode("utf-8"))
        parser.close()
134
def append_to_qs(url, params):
    """Return url with params merged into its query string.

    Existing query parameters are kept; a key present in params replaces the
    existing value(s) for that key.
    """
    parts = urllib.parse.urlsplit(url)
    query = urllib.parse.parse_qs(parts.query)
    query.update(params)
    merged = parts._replace(query=urllib.parse.urlencode(query, True))
    return urllib.parse.urlunsplit(merged)
142
def get_player_config(scripts):
    """Collect the player configuration embedded in a page's scripts.

    Each line of every script is searched for the known configuration
    assignments; the JSON object found on the line is parsed and merged into
    one dict.  Returns an empty dict when nothing is found.
    """
    # (start marker, chars of the marker belonging to the JSON,
    #  end marker, chars of the end marker belonging to the JSON)
    markers = (
        ("ytplayer.config = {", 1, "};", 1),
        ("ytcfg.set({\"", 2, "});", 1),
    )
    merged = {}
    all_lines = (line for script in scripts for line in script.split("\n"))
    for line in all_lines:
        for opener, keep_head, closer, keep_tail in markers:
            found_at = line.find(opener)
            if found_at < 0:
                continue
            begin = found_at + len(opener) - keep_head
            # find() yields -1 when the closer is missing, making end 0.
            end = line.find(closer, begin) + keep_tail
            if begin >= 0 and end > 0:
                merged.update(json.loads(line[begin:end]))
    return merged
158
def extract_js(script):
    """Return the body of the player script without its wrapper function.

    Raises ValueError when script does not start and end with the expected
    wrapper text (checked explicitly so the validation survives "python -O").
    """
    PREFIX = "var _yt_player={};(function(g){var window=this;"
    SUFFIX = ";})(_yt_player);\n"
    if not script.startswith(PREFIX):
        raise ValueError("player script does not start with the expected prefix")
    if not script.endswith(SUFFIX):
        raise ValueError("player script does not end with the expected suffix")

    return script[len(PREFIX):-len(SUFFIX)]
166
def find_cipher_func(script):
    """Return the name of the function that is called with a ".s" argument.

    The first call of the form NAME(x.s) or NAME(decodeURIComponent(x.s)) in
    script supplies the name.
    """
    pattern = re.compile(
        R"([a-zA-Z0-9$]+)"            # function name
        R"(\(decodeURIComponent)?"    # optional decodeURIComponent wrapper
        R"(\([a-zA-Z,\.]+\.s\))"      # argument list ending in ".s"
        R"[,;\)]"                     # terminator
    )
    found = pattern.search(script)
    return found.group(1)
177
def find_url_func(script):
    """Return the name of the function used in "this.url = NAME(this)"."""
    pattern = re.compile(
        R"this\.url\s*=\s*" + R"([a-zA-Z0-9$]+)" + R"\s*\(\s*this\s*\)"
    )
    found = pattern.search(script)
    return found.group(1)
185
def decode_cipher_url(js_url, cipher):
    """Turn a "signatureCipher" value into a playable URL.

    js_url -- location of the player JavaScript.
    cipher -- query-string encoded value carrying "url", "sp" and "s".

    The player script is downloaded, the names of its cipher and URL
    functions are located, and the script is run under node inside a vm
    context with a stubbed browser environment.  Returns whatever the script
    prints as the resulting URL, stripped of surrounding whitespace.

    Raises KeyError when cipher lacks one of the expected fields and
    Exception when node exits with a non-zero status.

    NOTE(review): the embedded script assigns fakeGlobal.document twice, so
    the second (empty) object is the one in effect; kept as found.
    """
    cipher = urllib.parse.parse_qs(cipher)
    args = [
        cipher["url"][0],
        cipher["sp"][0],
        cipher["s"][0],
    ]

    with urlopen(js_url) as f:
        script = f.read().decode("utf-8")

    cipher_func_name = find_cipher_func(script)
    url_func_name = find_url_func(script)

    # Build the call here and hand it to node as one JSON string literal.
    # Splicing the JSON-encoded args into a hand-quoted JavaScript string
    # would let a quote or backslash in the cipher data break out of the
    # literal and run outside the vm context.
    exec_string = "result_url = %s(%s(...%s));" % (
        url_func_name,
        cipher_func_name,
        json.dumps(args),
    )
    params = {
        "code": json.dumps(extract_js(script)),
        "exec_string": json.dumps(exec_string),
    }
    js_decode_script = ("""
const vm = require('vm');

const fakeGlobal = {};
fakeGlobal.window = fakeGlobal;
fakeGlobal.location = {
hash: '',
host: 'www.youtube.com',
hostname: 'www.youtube.com',
href: 'https://www.youtube.com',
origin: 'https://www.youtube.com',
pathname: '/',
protocol: 'https:'
};
fakeGlobal.history = {
pushState: function(){}
};
fakeGlobal.document = {
location: fakeGlobal.location
};
fakeGlobal.document = {};
fakeGlobal.navigator = {
userAgent: ''
};
fakeGlobal.XMLHttpRequest = class XMLHttpRequest {};
fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''});
fakeGlobal.result_url = null;
fakeGlobal.g = function(){}; // this is _yt_player
fakeGlobal.TimeRanges = function(){};

const code_string = %(code)s + ';';
const exec_string = %(exec_string)s;
vm.runInNewContext(code_string + exec_string, fakeGlobal);

console.log(fakeGlobal.result_url);
""" % params)

    # No shell is needed to start node; the script is passed on stdin.
    completed = subprocess.run(
        ["node"],
        input=js_decode_script.encode("utf-8"),
        stdout=subprocess.PIPE,
    )
    if completed.returncode != 0:
        raise Exception("js failed to execute: %d" % completed.returncode)

    return completed.stdout.decode("utf-8").strip()
259
def get_best_video(player_config):
    """Pick the highest-quality downloadable format from player_config.

    Only formats whose quality label is in QUALITIES and whose MIME type is
    in MIMETYPES are considered; among equally ranked formats the later one
    wins.  Returns (url, extension), or (None, None) when no format matches.
    """
    player_args = player_config["args"]
    player_response = json.loads(player_args["player_response"])
    formats = player_response["streamingData"]["formats"]

    best_format = None
    best_quality = None
    for format_data in formats:
        mimetype = format_data["mimeType"].split(";")[0]
        quality = QUALITIES.get(format_data["quality"])
        if quality is None or mimetype not in MIMETYPES:
            continue
        if best_quality is None or quality >= best_quality:
            best_format = format_data
            best_quality = quality

    if best_format is None:
        return None, None

    # Resolve the URL once, for the winner only: deciphering downloads the
    # player script and spawns node, which is wasted work for formats that
    # end up being replaced by a better candidate.
    if "signatureCipher" in best_format:
        js_url = player_config["PLAYER_JS_URL"]
        best_url = decode_cipher_url(js_url, best_format["signatureCipher"])
    else:
        best_url = best_format["url"]

    best_extension = MIMETYPES[best_format["mimeType"].split(";")[0]]
    return best_url, best_extension
294
def sanitize_filename(filename):
    """Return filename made safe to use as a single path component.

    Leading and trailing whitespace is removed, inner whitespace runs are
    collapsed to one space, slashes and backslashes become "-" and NUL bytes
    become spaces.
    """
    # Raw string: "\s" in a plain literal is an invalid escape sequence.
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )
302
def get_video_url(page):
    """Return (video_url, filename) for a parsed video page.

    Raises VideoUnavailable when the page holds no player configuration.
    Returns (None, None) when no suitable format is found.
    """
    player_config = get_player_config(page.scripts)
    if not player_config:
        reason = page.unavailable_message or "Could not find video URL"
        raise VideoUnavailable(reason)

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    player_args = player_config["args"]
    # Prefer the title in the player arguments, then the one in the player
    # response, and finally a fixed placeholder.
    title = (
        player_args.get("title", None)
        or json.loads(player_args["player_response"])["videoDetails"]["title"]
        or "Unknown title"
    )

    return video_url, sanitize_filename(title) + "." + extension
321
class YouTubeVideoPageParser(html.parser.HTMLParser):
    """HTML parser that collects inline scripts and the unavailable message.

    After feeding a page, ``scripts`` holds the text of each script element
    and ``unavailable_message`` the stripped text of the element whose id is
    "unavailable-message" (None when absent).

    The parser works by re-binding ``handle_data`` on the instance: a start
    tag selects the data handler, any end tag switches back to ignoring data.
    """

    def __init__(self):
        super().__init__()
        self.scripts = []
        self.unavailable_message = None

    def handle_starttag(self, tag, attrs):
        attr_map = dict(attrs)
        # Order matters: the script check runs last and therefore wins.
        for hook in (self._handle_unavailable_message, self._handle_script):
            hook(tag, attr_map)

    def handle_endtag(self, tag):
        self.handle_data = self._ignore_data

    def _ignore_data(self, _):
        return None

    def _handle_unavailable_message(self, tag, attrs):
        if attrs.get("id", None) != "unavailable-message":
            return
        self.handle_data = self._handle_unavailable_message_data

    def _handle_unavailable_message_data(self, data):
        self.unavailable_message = data.strip()

    def _handle_script(self, tag, attrs):
        if tag != "script":
            return
        self.handle_data = self._handle_script_data

    def _handle_script_data(self, data):
        if not data:
            return
        self.scripts.append(data)
353
class MozillaReleasesPageParser(html.parser.HTMLParser):
    """HTML parser that reads the Firefox version from a releases page.

    ``latest_release`` starts as "1.0" and is replaced by the value of any
    non-empty "data-latest-firefox" attribute seen on a start tag.
    """

    def __init__(self):
        super().__init__()
        self.latest_release = "1.0"

    def handle_starttag(self, tag, attrs):
        version = dict(attrs).get("data-latest-firefox", None)
        if version:
            self.latest_release = version
363
def write_video(filename, video_data):
    """Stream video_data to stdout as a CGI file-download response.

    filename   -- name offered to the browser (sent RFC 5987 encoded).
    video_data -- open HTTP response; it is always closed before returning,
                  including when writing fails.

    The upstream Content-Type is passed through (falling back to
    application/octet-stream) because a CGI response with a body must carry
    one.  Content-Length is only sent when the upstream response has it.
    """
    out = sys.stdout.buffer
    try:
        content_type = (
            video_data.getheader("Content-Type") or "application/octet-stream"
        )
        out.write(b"Content-Type: " + content_type.encode("utf-8") + b"\r\n")

        quoted_filename = urllib.parse.quote(filename.encode("utf-8"))
        out.write(
            b"Content-Disposition: attachment; filename*=UTF-8''"
            + quoted_filename.encode("utf-8")
            + b"\r\n"
        )

        content_length = video_data.getheader("Content-Length")
        if content_length is not None:
            out.write(
                b"Content-Length: " + content_length.encode("utf-8") + b"\r\n"
            )

        out.write(b"\r\n")
        shutil.copyfileobj(video_data, out)
    finally:
        video_data.close()
377
def cgimain():
    """Handle one CGI request.

    Without a "url" parameter the empty form is shown.  Otherwise the URL is
    validated, the page is fetched and parsed, and the best video is streamed
    back as a download.  Any failure re-displays the form with an error
    message instead of a traceback.
    """
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        page = YouTubeVideoPageParser()
        validate_url(url)
        with urlopen(url) as f:
            page.feed(f.read().decode("utf-8"))
        page.close()
        video_url, filename = get_video_url(page)
        video_data = urlopen(video_url)
    except VideoUnavailable as e:
        # html.escape replaces cgi.escape, which no longer exists in
        # Python 3.8 and later.
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(str(e.args[0]))
        )
    except NotYouTube:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
        )
    except Exception:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an unknown error.</p>"
        )
    else:
        # Only reached when every step above succeeded, so filename and
        # video_data are guaranteed to be bound.
        write_video(filename, video_data)
412
def pp_size(size):
    """Format a byte count as "<value> <unit>" with two decimals.

    Units are "" (plain bytes), KiB, MiB and GiB.  Values of 1024 GiB and
    above stay in GiB rather than being divided once more.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    for suffix in suffixes[:-1]:
        if size < 1024:
            break
        size /= 1024
    else:
        # Every smaller unit was too small: use the largest one as it is.
        suffix = suffixes[-1]
    return "%.2f %s" % (size, suffix)
420
def copy_with_progress(content_length, infile, outfile):
    """Copy infile to outfile in 32 KiB chunks, printing progress to stdout.

    The status line (bytes copied, content_length and transfer rate) is
    redrawn at most about twice per second and once more at the end,
    followed by a newline.
    """
    def show_status(current_ts, window_start, window_bytes, total_bytes):
        speed = 0
        if current_ts != window_start:
            speed = window_bytes / (current_ts - window_start)
        # Erase the current terminal line before redrawing it.
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(total_bytes),
            pp_size(content_length),
            pp_size(speed),
        ))
        sys.stdout.flush()

    window_start = 0
    window_bytes = 0
    total_bytes = 0
    while True:
        now = time.time()
        if now - window_start > 0.5:
            show_status(now, window_start, window_bytes, total_bytes)
            window_start, window_bytes = now, 0

        chunk = infile.read(32768)
        if not chunk:
            break
        outfile.write(chunk)
        chunk_size = len(chunk)
        window_bytes += chunk_size
        total_bytes += chunk_size

    # Final status, then a newline to finish the status line.
    show_status(now, window_start, window_bytes, total_bytes)
    print()
454
def main():
    """Command-line entry point: download the video at sys.argv[1].

    The video is appended to a file named after its title, so an interrupted
    download is resumed from the existing file size.  Read errors during the
    transfer are retried from the current position.
    """
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    page = YouTubeVideoPageParser()
    with urlopen(url) as f:
        page.feed(f.read().decode("utf-8"))
    page.close()
    video_url, filename = get_video_url(page)
    if not video_url:
        # get_video_url() reports "no suitable format" as (None, None).
        print("Could not find a downloadable video", file=sys.stderr)
        sys.exit(1)
    print("Downloading", filename)

    with open(filename, "ab") as outfile:
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from", pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib.error.HTTPError as e:
                if e.code == 416:
                    print("File is complete!")
                    break
                else:
                    raise

            content_length = int(video_data.getheader("Content-Length"))
            if total_size is None:
                # Content-Length only covers the requested range, so add the
                # bytes already on disk to get the size of the whole file.
                total_size = offset + content_length

            try:
                copy_with_progress(content_length, video_data, outfile)
            except IOError:
                # Deliberate best effort: finish the status line and let the
                # loop below retry from the current file position.
                print()

            video_data.close()
            if outfile.tell() != total_size:
                old_offset = offset
                offset = outfile.tell()
                if old_offset == offset:
                    # No progress was made; pause before retrying.
                    time.sleep(1)
                print("Restarting download from", pp_size(offset))
            else:
                break
505
506
if __name__ == "__main__":
    # SCRIPT_NAME in the environment selects CGI mode; otherwise the script
    # runs as a command-line downloader.
    if "SCRIPT_NAME" not in os.environ:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)
    else:
        cgimain()
516