]> code.delx.au - youtube-cgi/blob - youtube.cgi
ef7b6084433884ec75d0402a823d19c5278d35a7
[youtube-cgi] / youtube.cgi
1 #!/usr/bin/env python3
2
3 import cgi
4 import html.parser
5 import http.cookiejar
6 import json
7 import os
8 import re
9 import shutil
10 import subprocess
11 import sys
12 import time
13 import urllib.error
14 import urllib.parse
15 import urllib.request
16
17
# Browser-like User-Agent string sent with outgoing HTTP requests.
USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) "
    "Gecko/20100101 Firefox/67.0"
)

# Video MIME types that may be downloaded, mapped to their file extension.
MIMETYPES = dict([
    ("video/mp4", "mp4"),
    ("video/x-flv", "flv"),
    ("video/3gpp", "3gp"),
])

# Stream quality labels mapped to a rank; a larger rank means better quality.
QUALITIES = dict(zip(
    ("hd1080", "hd720", "large", "medium", "small"),
    range(5, 0, -1),
))
33
34
class VideoUnavailable(Exception):
    """Raised when no usable player configuration is found for a video page."""
37
class NotYouTube(Exception):
    """Raised when a URL fails the YouTube scheme/host validation."""
40
def print_form(url="", msg=""):
    """Write the HTML download form to stdout as a complete CGI response.

    Args:
        url: Text pre-filled into the URL input box. It is treated as
            untrusted input and HTML-escaped before being embedded.
        msg: Ready-made HTML fragment shown above the form (for example an
            error paragraph). It is inserted verbatim, so the caller must
            escape any untrusted text it places inside it.

    The ``HTTP_HOST`` and ``REQUEST_URI`` environment variables are read to
    build the address used by the bookmarklet link.
    """
    # REQUEST_URI carries the query string of the current request. Drop it so
    # the bookmarklet does not end up with two "?url=" parts, and
    # percent-encode anything (quotes, angle brackets, ...) that could break
    # out of the single-quoted JavaScript string the address is embedded in.
    host = urllib.parse.quote(os.environ["HTTP_HOST"], safe=":[]")
    path = urllib.parse.quote(
        os.environ["REQUEST_URI"].partition("?")[0], safe="/%"
    )
    script_url = html.escape("https://%s%s" % (host, path), quote=True)

    # The URL comes straight from the request; escape it for use inside the
    # double-quoted value="..." attribute.
    safe_url = html.escape(url, quote=True)

    sys.stdout.write("Content-Type: text/html\r\n\r\n")
    # str.replace() is used instead of str.format() because the template
    # contains literal CSS/JavaScript braces.
    sys.stdout.write("""
<!DOCTYPE html>
<html>
<head>
    <title>delx.net.au - YouTube Scraper</title>
    <link rel="stylesheet" type="text/css" href="/style.css">
    <style type="text/css">
        input[type="text"] {
            width: 100%;
        }
        .error {
            color: red;
        }
    </style>
</head>
<body>
    <h1>delx.net.au - YouTube Scraper</h1>
    {0}
    <form action="" method="get">
    <p>This page will let you easily download YouTube videos to watch offline. It
    will automatically grab the highest quality version.</p>
    <div><input type="text" name="url" value="{1}"/></div>
    <div><input type="submit" value="Download!"/></div>
    </form>
    <p>Tip! Use this bookmarklet: <a href="javascript:(function(){window.location='{2}?url='+escape(location);})()">YouTube Download</a>
    to easily download videos. Right-click the link and add it to bookmarks,
    then when you're looking at a YouTube page select that bookmark from your
    browser's bookmarks menu to download the video straight away.</p>
</body>
</html>
""".replace("{0}", msg).replace("{1}", safe_url).replace("{2}", script_url))
75
# Module-wide HTTP state: a cookie jar, an opener that stores cookies in it,
# and the referrer string (empty until a request records one).
cookiejar = http.cookiejar.CookieJar()
urlopener = urllib.request.build_opener(
    urllib.request.HTTPCookieProcessor(cookiejar=cookiejar),
)
referrer = ""
79
def urlopen(url, offset=None):
    """Open *url* through the shared opener and return the response.

    Protocol-relative URLs (``//host/...``) are given an ``https:`` scheme and
    any other URL without an http(s) scheme is resolved against
    ``https://www.youtube.com``. The first URL ever requested is remembered
    in the module-level ``referrer`` and sent as the Referer header of every
    later request.

    Args:
        url: Absolute, protocol-relative or site-relative URL.
        offset: Optional byte offset; when truthy a ``Range: bytes=<offset>-``
            header is sent.

    Returns:
        The response object returned by the opener.

    Raises:
        AssertionError: if the response has a Content-Range header that is
            not in bytes or does not start at *offset*.
    """
    global referrer

    if url.startswith("//"):
        url = "https:" + url
    if not (url.startswith("http://") or url.startswith("https://")):
        url = "https://www.youtube.com" + url

    request = urllib.request.Request(url)
    if referrer:
        request.add_header("Referer", referrer)
    else:
        # Very first request: remember it, send no Referer.
        referrer = url
    request.add_header("User-Agent", USER_AGENT)
    if offset:
        request.add_header("Range", "bytes=%d-" % offset)

    response = urlopener.open(request)

    # When the server answers with a partial range, make sure it begins at
    # the position that was asked for.
    range_header = response.getheader("Content-Range")
    if range_header:
        fields = range_header.split()
        assert fields[0] == "bytes"
        first_byte = fields[1].partition("-")[0]
        assert int(first_byte) == offset
    return response
107
def validate_url(url):
    """Check that *url* is an https link to youtube.com or youtu.be.

    An optional leading ``www.`` on the host is ignored.

    Args:
        url: The URL string to check.

    Returns:
        None when the URL is acceptable.

    Raises:
        NotYouTube: if the scheme is not https or the host is not recognised.
    """
    parsed_url = urllib.parse.urlparse(url)
    host = parsed_url.netloc
    # str.lstrip("www.") would strip any run of the characters 'w' and '.',
    # letting hosts such as "w.youtube.com" through; remove only the exact
    # "www." prefix instead.
    if host.startswith("www."):
        host = host[len("www."):]

    if parsed_url.scheme != "https" or host not in ("youtube.com", "youtu.be"):
        raise NotYouTube()
117
def parse_url(url, parser):
    """Fetch *url* and feed its UTF-8 decoded body to *parser*.

    Args:
        url: Address of the page to download (passed to ``urlopen``).
        parser: Object with ``feed(str)`` and ``close()`` methods, such as an
            ``html.parser.HTMLParser`` instance.
    """
    # The context manager closes the response even if reading fails.
    with urlopen(url) as response:
        body = response.read()
    parser.feed(body.decode("utf-8"))
    parser.close()
123
def append_to_qs(url, params):
    """Return *url* with *params* merged into its query string.

    Existing query parameters are kept; a key present in *params* replaces
    the existing value(s) for that key.
    """
    parts = urllib.parse.urlsplit(url)
    query = urllib.parse.parse_qs(parts.query)
    query.update(params)
    # doseq=True expands list values into repeated key=value pairs.
    new_query = urllib.parse.urlencode(query, True)
    return urllib.parse.urlunsplit(parts._replace(query=new_query))
131
def get_player_config(scripts):
    """Find and parse the ``ytplayer.config`` JSON object in *scripts*.

    Args:
        scripts: Iterable of script source strings.

    Returns:
        The decoded JSON object from the first line that contains
        ``ytplayer.config = {`` followed by ``};``, or None if there is no
        such line.
    """
    marker = "ytplayer.config = {"
    for script in scripts:
        for line in script.split("\n"):
            start = line.find(marker)
            if start < 0:
                continue
            # Step onto the opening brace, i.e. the marker's last character.
            start += len(marker) - 1
            # find() yields -1 when "};" is missing, which makes end == 0.
            end = line.find("};", start) + 1
            if end > 0:
                return json.loads(line[start:end])
    return None
142
def extract_js(script):
    """Return *script* without its fixed ``_yt_player`` wrapper.

    The script must start with the expected wrapper prefix and end with the
    expected suffix; otherwise an AssertionError is raised.
    """
    head = "var _yt_player={};(function(g){var window=this;"
    tail = ";})(_yt_player);\n"
    assert script.startswith(head)
    assert script.endswith(tail)

    body_start = len(head)
    body_end = len(script) - len(tail)
    return script[body_start:body_end]
150
def find_func_name(script):
    """Return the name of the first function called with a ``(... .s)`` argument.

    The search looks for an identifier made of letters, digits or ``$``,
    directly followed by a parenthesised argument list ending in ``.s`` and
    then one of ``,`` ``;`` ``)``. An AttributeError is raised when nothing
    matches.
    """
    pattern = (
        r"([a-zA-Z0-9$]+)"           # function name (captured)
        r"(\([a-zA-Z,\.]+\.s\))"     # argument list ending in ".s"
        r"[,;\)]"                    # terminator
    )
    found = re.search(pattern, script)
    return found.group(1)
160
def decode_signature(js_url, signature):
    """Transform *signature* by running the player's JavaScript in node.

    The player script is downloaded from *js_url*, the name of its transform
    function is located with ``find_func_name``, and the unwrapped script is
    executed with node's ``vm`` module inside a stub browser environment.
    The transform function is called with the key "MARKER" and the value
    stored under that key in its result is printed by node and read back.

    Args:
        js_url: URL of the player JavaScript.
        signature: The scrambled signature string.

    Returns:
        The transformed signature, URL-unquoted.

    Raises:
        Exception: if the node process exits with a non-zero status.
    """
    f = urlopen(js_url)
    try:
        script = f.read().decode("utf-8")
    finally:
        f.close()

    func_name = find_func_name(script)

    # json.dumps() turns the Python strings into valid JavaScript string
    # literals, so they can be pasted into the script below.
    params = {
        "func_name": func_name,
        "signature": json.dumps(signature),
        "code": json.dumps(extract_js(script)),
    }
    # NOTE(review): this executes remotely fetched JavaScript. Node's `vm`
    # module is documented as not being a security boundary -- confirm this
    # level of isolation is acceptable for the deployment.
    js_decode_script = ("""
        const vm = require('vm');

        const sandbox = {
            location: {
                hash: '',
                href: '',
                protocol: 'http:'
            },
            history: {
                pushState: function(){}
            },
            document: {},
            navigator: {
                userAgent: ''
            },
            XMLHttpRequest: class XMLHttpRequest {},
            matchMedia: () => ({matches: () => {}, media: ''}),
            signature: %(signature)s,
            transformed_signature: null,
            g: function(){} // this is _yt_player
        };
        sandbox.window = sandbox;

        const code_string = %(code)s + ';';
        const exec_string = 'transformed_signature = %(func_name)s("", "MARKER", signature);';
        vm.runInNewContext(code_string + exec_string, sandbox);

        function findSignature(obj) {
            if (typeof obj !== 'object') {
                return;
            }
            for (const [key, value] of Object.entries(obj)) {
                if (key === 'MARKER') {
                    return value;
                }
                const result = findSignature(value);
                if (result) {
                    return result;
                }
            }
        }
        console.log(findSignature(sandbox.transformed_signature));
    """ % params)

    # Run node directly (argument list, no shell) and pass the script on
    # stdin; run() handles the pipes and waits for the process to exit.
    result = subprocess.run(
        ["node"],
        input=js_decode_script.encode("utf-8"),
        stdout=subprocess.PIPE,
        close_fds=True,
    )
    if result.returncode != 0:
        raise Exception("js failed to execute: %d" % result.returncode)

    transformed_signature = result.stdout.decode("utf-8").strip()
    return urllib.parse.unquote(transformed_signature)
234
def get_best_video(player_config):
    """Pick the highest-quality downloadable stream from *player_config*.

    Each comma-separated entry of ``args.url_encoded_fmt_stream_map`` is a
    URL-encoded field set. Entries flagged ``stereo3d`` and entries whose
    quality or MIME type is not listed in ``QUALITIES`` / ``MIMETYPES`` are
    ignored. Among the rest the highest ``QUALITIES`` rank wins; on a tie the
    later entry wins.

    Args:
        player_config: Decoded player configuration with ``args`` and
            ``assets`` entries.

    Returns:
        ``(url, extension)`` for the chosen stream, with the signature
        parameter appended to the URL when the entry carries one, or
        ``(None, None)`` if no entry qualifies.
    """
    stream_map = player_config["args"]["url_encoded_fmt_stream_map"]
    js_url = player_config["assets"]["js"]

    # First pass: only choose the winner. Decoding a signature downloads the
    # player script and starts a node process, so it is done once for the
    # final choice rather than for every candidate.
    best = None  # (rank, parsed fields, extension)
    for encoded in stream_map.split(","):
        url_data = urllib.parse.parse_qs(encoded)
        mimetype = url_data["type"][0].split(";")[0]
        quality = url_data["quality"][0]

        if "stereo3d" in url_data:
            continue
        if quality not in QUALITIES or mimetype not in MIMETYPES:
            continue

        rank = QUALITIES[quality]
        # Strictly worse entries are skipped; equal ones replace the current
        # best, so the last entry of the top quality is kept.
        if best is not None and rank < best[0]:
            continue
        best = (rank, url_data, MIMETYPES[mimetype])

    if best is None:
        return None, None

    _, url_data, extension = best
    video_url = url_data["url"][0]
    if "sig" in url_data:
        signature = url_data["sig"][0]
    elif "s" in url_data:
        # Scrambled signature: needs the player's JavaScript to decode.
        signature = decode_signature(js_url, url_data["s"][0])
    else:
        signature = None

    if signature:
        # "sp" names the query parameter that carries the signature.
        sp = url_data.get("sp", ["signature"])[0]
        video_url = append_to_qs(video_url, {sp: signature})

    return video_url, extension
277
def sanitize_filename(filename):
    """Return *filename* made safe for use as a single path component.

    Leading and trailing whitespace is removed, every run of whitespace is
    collapsed to one space, slashes and backslashes become ``-`` and NUL
    characters become spaces.
    """
    # Raw string: "\s" in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    collapsed = re.sub(r"\s+", " ", filename.strip())
    return (
        collapsed
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )
285
def get_video_url(page):
    """Work out the download URL and output filename for a parsed page.

    Args:
        page: Parsed page object exposing ``scripts`` and
            ``unavailable_message``.

    Returns:
        ``(video_url, filename)``, or ``(None, None)`` when the player
        configuration yields no usable stream.

    Raises:
        VideoUnavailable: if no player configuration is found. The page's
            own unavailable message is used when it has one.
    """
    player_config = get_player_config(page.scripts)
    if not player_config:
        message = page.unavailable_message or "Could not find video URL"
        raise VideoUnavailable(message)

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    # Title lookup order: player args, then the embedded player response,
    # then a fixed placeholder.
    player_args = player_config["args"]
    title = player_args.get("title", None)
    if not title:
        player_response = json.loads(player_args["player_response"])
        title = player_response["videoDetails"]["title"]
    title = title or "Unknown title"

    return video_url, sanitize_filename(title) + "." + extension
304
class YouTubeVideoPageParser(html.parser.HTMLParser):
    """HTML parser that collects script bodies and the unavailable message.

    After parsing, ``scripts`` holds the non-empty text chunks found inside
    ``<script>`` elements and ``unavailable_message`` holds the stripped text
    of the element whose id is ``unavailable-message`` (None if absent).
    """

    def __init__(self):
        super().__init__()
        self.unavailable_message = None
        self.scripts = []
        # Callable that receives text data, or None to discard it. It is
        # chosen by start tags and cleared by every end tag.
        self._data_sink = None

    def handle_starttag(self, tag, attrs):
        attributes = dict(attrs)
        if attributes.get("id", None) == "unavailable-message":
            self._data_sink = self._store_unavailable_message
        # Checked second so that a <script> tag always selects script capture.
        if tag == "script":
            self._data_sink = self._store_script

    def handle_endtag(self, tag):
        self._data_sink = None

    def handle_data(self, data):
        if self._data_sink is not None:
            self._data_sink(data)

    def _store_unavailable_message(self, data):
        self.unavailable_message = data.strip()

    def _store_script(self, data):
        if data:
            self.scripts.append(data)
336
def write_video(filename, video_data):
    """Stream *video_data* to stdout as a CGI file-download response.

    Args:
        filename: Name offered to the browser for the downloaded file.
        video_data: Open HTTP response exposing ``getheader()``, ``read()``
            and ``close()``. It is always closed before returning.
    """
    quoted_filename = urllib.parse.quote(filename.encode("utf-8"))

    # A CGI response with a body must carry a Content-Type header: pass on
    # the upstream type, or fall back to a generic binary type.
    content_type = video_data.getheader("Content-Type") or "application/octet-stream"
    headers = [
        "Content-Type: %s" % content_type,
        "Content-Disposition: attachment; filename*=UTF-8''%s" % quoted_filename,
    ]
    # Upstream may omit Content-Length (e.g. chunked transfer); in that case
    # leave the header out rather than failing.
    content_length = video_data.getheader("Content-Length")
    if content_length is not None:
        headers.append("Content-Length: %s" % content_length)

    out = sys.stdout.buffer
    try:
        out.write(("\r\n".join(headers) + "\r\n\r\n").encode("utf-8"))
        shutil.copyfileobj(video_data, out)
    finally:
        video_data.close()
350
def cgimain():
    """CGI entry point: show the form, or stream the requested video.

    With no ``url`` request parameter the empty form is shown. Otherwise the
    URL is validated, the page is fetched and parsed, and the best stream is
    sent back as a download. Any failure re-displays the form with an error
    message instead.
    """
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        print_form(url="https://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        page = YouTubeVideoPageParser()
        validate_url(url)
        parse_url(url, page)
        video_url, filename = get_video_url(page)
        video_data = urlopen(video_url)
    except VideoUnavailable as e:
        # html.escape replaces cgi.escape, which was removed in Python 3.8.
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an error: %s</p>" % html.escape(str(e))
        )
        return
    except NotYouTube:
        print_form(
            url=url,
            msg="<p class='error'>Sorry, that does not look like a YouTube page!</p>"
        )
        return
    except Exception:
        # Top-level boundary: any other failure becomes a generic message.
        print_form(
            url=url,
            msg="<p class='error'>Sorry, there was an unknown error.</p>"
        )
        return

    # Only reached when every step above succeeded; each error branch
    # returns after printing the form so that nothing is written twice.
    write_video(filename, video_data)
383
def pp_size(size):
    """Format a byte count as a human-readable string using binary units.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (GiB) is reached, then shown with two decimals. Plain byte counts
    have an empty unit, e.g. ``"512.00 "``.

    Args:
        size: Number of bytes (int or float).

    Returns:
        A string such as ``"1.50 MiB"``.
    """
    suffixes = ["", "KiB", "MiB", "GiB"]
    # Only divide when a larger unit is still available; otherwise very
    # large values would be scaled one step too far while still being
    # labelled GiB.
    for suffix in suffixes[:-1]:
        if size < 1024:
            break
        size /= 1024
    else:
        suffix = suffixes[-1]
    return "%.2f %s" % (size, suffix)
391
def copy_with_progress(content_length, infile, outfile):
    """Copy *infile* to *outfile* in 32 KiB chunks, showing progress on stdout.

    A status line (bytes copied / *content_length* and the transfer rate) is
    redrawn at most about twice per second, plus once more at the end,
    followed by a newline.

    Args:
        content_length: Expected number of bytes, used only for display.
        infile: Object with ``read(n)``; a falsy result ends the copy.
        outfile: Object with ``write(data)``.
    """
    window_start = 0   # timestamp of the last status update
    window_bytes = 0   # bytes copied since the last status update
    total_bytes = 0    # bytes copied overall

    def print_status():
        # Reads the enclosing function's current values at call time.
        rate = 0
        if now != window_start:
            rate = window_bytes / (now - window_start)
        # "\33[2K\r" clears the terminal line and returns to column 0.
        sys.stdout.write("\33[2K\r" + "%s / %s (%s/sec)" % (
            pp_size(total_bytes),
            pp_size(content_length),
            pp_size(rate),
        ))
        sys.stdout.flush()

    while True:
        now = time.time()
        if now - window_start > 0.5:
            print_status()
            window_start = now
            window_bytes = 0

        chunk = infile.read(32768)
        if not chunk:
            break
        outfile.write(chunk)
        chunk_size = len(chunk)
        window_bytes += chunk_size
        total_bytes += chunk_size

    # Final status line, then move to a fresh line.
    print_status()
    print()
425
def main():
    """Command-line entry point: download the video at ``sys.argv[1]``.

    The video is appended to a file named after its title, so an interrupted
    download is resumed from the existing file size. The transfer is retried
    until the file reaches the expected size or the server answers 416
    (requested range not satisfiable).
    """
    try:
        url = sys.argv[1]
    except IndexError:
        print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr)
        sys.exit(1)

    page = YouTubeVideoPageParser()
    parse_url(url, page)
    video_url, filename = get_video_url(page)
    if not video_url:
        # get_video_url() returns (None, None) when no stream qualifies.
        print("Could not find a downloadable video", file=sys.stderr)
        sys.exit(1)
    print("Downloading", filename)

    with open(filename, "ab") as outfile:
        offset = outfile.tell()
        if offset > 0:
            print("Resuming download from", pp_size(offset))
        total_size = None

        while True:
            try:
                video_data = urlopen(video_url, offset)
            except urllib.error.HTTPError as e:
                if e.code == 416:
                    print("File is complete!")
                    break
                raise

            content_length = int(video_data.getheader("Content-Length"))
            if total_size is None:
                # Content-Length covers only the bytes still to come, so the
                # size of the finished file includes what is already on disk.
                total_size = offset + content_length

            try:
                copy_with_progress(content_length, video_data, outfile)
            except IOError:
                # Best effort: end the progress line and retry from the
                # current position below.
                print()
            finally:
                video_data.close()

            if outfile.tell() == total_size:
                break

            old_offset = offset
            offset = outfile.tell()
            if old_offset == offset:
                # No progress at all: wait briefly before retrying.
                time.sleep(1)
            print("Restarting download from", pp_size(offset))
474
475
if __name__ == "__main__":
    # SCRIPT_NAME in the environment selects CGI mode; otherwise the script
    # runs as a command-line tool.
    running_as_cgi = "SCRIPT_NAME" in os.environ
    if running_as_cgi:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(1)
485