#!/usr/bin/env python
"""Download the best-quality version of a YouTube video.

The script has two entry points, selected at start-up:

* CGI mode (``SCRIPT_NAME`` present in the environment): reads the ``url``
  query parameter and streams the video back as an attachment, or prints
  an XHTML form when no URL was given or an error occurred.
* Command-line mode: takes the watch-page URL as the first argument and
  saves the video into the current directory with a progress display.
"""

from __future__ import division

import cgi
import cookielib
import itertools
import json
import os
import re
import resource
import shutil
import subprocess
import sys
import time
import urllib
import urllib2
import urlparse

from lxml import html


# Address-space limit applied to the process before any work is done.
MAX_MEMORY_BYTES = 128 * 1024*1024

USER_AGENT = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:15.0) Gecko/20100101 Firefox/15.0.1"

# Stream MIME types we are willing to download, mapped to file extensions.
MIMETYPES = {
    "video/mp4": "mp4",
    "video/x-flv": "flv",
    "video/3gpp": "3gp",
}

# Quality labels mapped to a rank; a higher rank is preferred.
QUALITIES = {
    "hd1080": 5,
    "hd720": 4,
    "large": 3,
    "medium": 2,
    "small": 1,
}


class VideoUnavailable(Exception):
    """Raised when the watch page does not yield a downloadable video."""
    pass


def print_form(url="", msg=""):
    """Write the CGI response containing the download page.

    ``msg`` is inserted verbatim (callers pass ready-made markup), while
    ``url`` and the script's own URL are escaped before substitution.
    Reads ``HTTP_HOST`` and ``REQUEST_URI`` from the environment.
    """
    script_url = "http://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"])

    # Both values originate from the request. The response is served as
    # application/xhtml+xml, so an unescaped '&', '<' or '"' would either
    # break the document or allow markup injection.
    safe_url = cgi.escape(url, True)
    safe_script_url = cgi.escape(script_url, True)

    sys.stdout.write("Content-Type: application/xhtml+xml\r\n\r\n")
    # NOTE(review): the template below contains no markup and no {1}/{2}
    # placeholders even though they are substituted -- the original XHTML
    # looks to have been lost from this copy of the file; restore it from
    # version control.
    sys.stdout.write(""" delx.net.au - YouTube Scraper

delx.net.au - YouTube Scraper

{0}

This page will let you easily download YouTube videos to watch offline. It will automatically grab the highest quality version.

Tip! Use this bookmarklet: YouTube Download to easily download videos. Right-click the link and add it to bookmarks, then when you're looking at a YouTube page select that bookmark from your browser's bookmarks menu to download the video straight away.

""".replace("{0}", msg).replace("{1}", safe_url).replace("{2}", safe_script_url))


# One shared opener so cookies set by the watch page are sent back when the
# video itself is requested.
cookiejar = cookielib.CookieJar()
urlopener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookiejar))
# URL of the previous request; sent as the Referer of the next one.
referrer = ""


def urlopen(url):
    """Open ``url`` with the shared cookie jar, User-Agent and Referer.

    Each call records ``url`` in the module-level ``referrer`` so that the
    following request names it as its Referer.
    """
    global referrer
    req = urllib2.Request(url)
    if referrer:
        req.add_header("Referer", referrer)
    referrer = url
    req.add_header("User-Agent", USER_AGENT)
    return urlopener.open(req)


def parse_url(url):
    """Fetch ``url`` and return it parsed as an lxml HTML document."""
    f = urlopen(url)
    try:
        return html.parse(f, html.HTMLParser(encoding="utf-8", recover=True))
    finally:
        # Close the response even when parsing raises.
        f.close()


def append_to_qs(url, params):
    """Return ``url`` with ``params`` merged into its query string.

    ``params`` maps names to values; a name already present in the query
    string is replaced.
    """
    r = list(urlparse.urlsplit(url))
    qs = urlparse.parse_qs(r[3])
    qs.update(params)
    r[3] = urllib.urlencode(qs, True)
    return urlparse.urlunsplit(r)


def convert_from_old_itag(player_config):
    """Rewrite an old-style stream map in place so it has a ``url`` list.

    In the old format every ``itag`` value carries the stream URL after a
    ``url=`` marker; the URLs are extracted into a separate ``url`` key and
    the map is re-encoded back into ``player_config``.
    """
    url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
    url_data["url"] = []
    for itag_url in url_data["itag"]:
        pos = itag_url.find("url=")
        url_data["url"].append(itag_url[pos+4:])
    player_config["args"]["url_encoded_fmt_stream_map"] = urllib.urlencode(url_data, True)


def get_player_config(doc):
    """Extract the player configuration dict from the page's scripts.

    Returns the decoded JSON object, or None when no script line carries
    one. A ``yt.playerConfig =`` assignment is returned as soon as it is
    found; a ``'PLAYER_CONFIG': `` entry is converted from the old itag
    format and returned if nothing better turns up.
    """
    player_config = None
    for script in doc.xpath("//script"):
        if not script.text:
            continue
        for line in script.text.split("\n"):
            if "yt.playerConfig =" in line:
                p1 = line.find("=")
                p2 = line.rfind(";")
                if p1 >= 0 and p2 > 0:
                    return json.loads(line[p1+1:p2])
            if "'PLAYER_CONFIG': " in line:
                p1 = line.find(":")
                if p1 >= 0:
                    player_config = json.loads(line[p1+1:])
                    convert_from_old_itag(player_config)
    return player_config


def get_best_video(player_config):
    """Pick the highest-ranked stream of a supported MIME type.

    Returns ``(url, extension)``; both are None when no stream qualifies.
    """
    url_data = urlparse.parse_qs(player_config["args"]["url_encoded_fmt_stream_map"])
    streams = itertools.izip_longest(
        url_data.get("url", []),
        url_data.get("type", []),
        url_data.get("quality", []),
        url_data.get("sig", []),
    )

    best_url = None
    best_quality = None
    best_extension = None
    for video_url, mimetype, quality, signature in streams:
        # izip_longest pads the shorter lists with None; an entry without a
        # URL or a type cannot be downloaded, so skip it instead of crashing.
        if not video_url or not mimetype:
            continue
        mimetype = mimetype.split(";")[0]
        if mimetype not in MIMETYPES:
            continue
        extension = MIMETYPES[mimetype]
        # Unknown or missing quality labels rank below every known one.
        quality = QUALITIES.get((quality or "").split(",")[0], -1)
        if best_quality is None or quality > best_quality:
            if signature:
                video_url = append_to_qs(video_url, {"signature": signature})
            best_url = video_url
            best_quality = quality
            best_extension = extension

    return best_url, best_extension


def sanitize_filename(filename):
    """Collapse whitespace and replace path separators and NUL bytes."""
    return (
        re.sub(r"\s+", " ", filename.strip())
        .replace("\\", "-")
        .replace("/", "-")
        .replace("\0", " ")
    )


def get_video_url(doc):
    """Return ``(video_url, filename)`` for a parsed watch page.

    Raises VideoUnavailable when the page shows an unavailable message or
    has no player configuration. Returns ``(None, None)`` when the
    configuration lists no stream of a supported type.
    """
    unavailable = doc.xpath("//div[@id='unavailable-message']/text()")
    if unavailable:
        raise VideoUnavailable(unavailable[0].strip())

    player_config = get_player_config(doc)
    if not player_config:
        raise VideoUnavailable("Could not find video URL")

    video_url, extension = get_best_video(player_config)
    if not video_url:
        return None, None

    # Fall back to a generic name rather than failing on a title-less page.
    titles = doc.xpath("/html/head/title/text()")
    title = titles[0] if titles else "video"
    filename = sanitize_filename(title)
    filename += "." + extension

    return video_url, filename


def write_video(filename, video_data):
    """Stream ``video_data`` to stdout as a CGI attachment response.

    ``video_data`` is the open HTTP response for the video; it is closed
    before returning, including when the copy fails.
    """
    try:
        httpinfo = video_data.info()
        encoded_filename = urllib.quote(filename.encode("utf-8"))
        sys.stdout.write("Content-Disposition: attachment; filename*=UTF-8''%s\r\n" % encoded_filename)
        content_length = httpinfo.getheader("Content-Length")
        # Omit the header instead of sending the literal text "None".
        if content_length is not None:
            sys.stdout.write("Content-Length: %s\r\n" % content_length)
        sys.stdout.write("\r\n")
        shutil.copyfileobj(video_data, sys.stdout)
    finally:
        video_data.close()


def cgimain():
    """CGI entry point: serve the requested video or the download form."""
    args = cgi.parse()
    try:
        url = args["url"][0]
    except (KeyError, IndexError):
        # No URL supplied yet: show the form with a placeholder.
        print_form(url="http://www.youtube.com/watch?v=FOOBAR")
        return

    try:
        doc = parse_url(url)
        video_url, filename = get_video_url(doc)
        if not video_url:
            raise VideoUnavailable("Could not find a downloadable video format")
        video_data = urlopen(video_url)
        write_video(filename, video_data)
    except VideoUnavailable as e:
        # e.args[0] keeps the message as the (possibly unicode) object that
        # was raised; BaseException.message is deprecated.
        reason = e.args[0] if e.args else ""
        print_form(
            url=url,
            msg="\n\nSorry, there was an error: %s\n\n" % cgi.escape(reason)
        )
    except Exception:
        # Top-level boundary of the CGI request: anything else is reported
        # to the user as a generic failure.
        print_form(
            url=url,
            msg="\n\nSorry, there was an error. Check your URL?\n\n"
        )
    return


def copy_with_progress(total_size, infile, outfile):
    """Copy ``infile`` to ``outfile`` while printing progress to stdout.

    ``total_size`` is the expected number of bytes, or None when unknown.
    The status line is redrawn at most every half second and once more
    when the copy finishes.
    """
    def pp_size(size):
        suffixes = ["", "KiB", "MiB", "GiB"]
        # Stop dividing at the largest suffix so values of 1 TiB and above
        # are still labelled with the unit they are expressed in.
        suffix = suffixes[-1]
        for candidate in suffixes[:-1]:
            if size < 1024:
                suffix = candidate
                break
            size /= 1024
        return "%.2f %s" % (size, suffix)

    def report(now):
        elapsed = now - start_ts
        # The first report can happen with no measurable time elapsed.
        rate = bytes_read / elapsed if elapsed > 0 else 0
        total = pp_size(total_size) if total_size is not None else "unknown"
        # ESC[2K clears the current terminal line before redrawing it.
        sys.stdout.write("\33[2K\r")
        sys.stdout.write("%s / %s (%s/sec)" % (
            pp_size(bytes_read),
            total,
            pp_size(rate),
        ))
        sys.stdout.flush()

    start_ts = time.time()
    last_ts = 0
    bytes_read = 0
    while True:
        now = time.time()
        if now - last_ts > 0.5:
            last_ts = now
            report(now)

        buf = infile.read(32768)
        if not buf:
            break
        outfile.write(buf)
        bytes_read += len(buf)

    # Show the final totals and leave the cursor on a fresh line.
    report(time.time())
    sys.stdout.write("\n")


def main():
    """Command-line entry point: download the video named by argv[1]."""
    try:
        url = sys.argv[1]
    except IndexError:
        sys.stderr.write("Usage: %s http://youtube.com/watch?v=FOOBAR\n" % sys.argv[0])
        sys.exit(1)

    try:
        doc = parse_url(url)
        video_url, filename = get_video_url(doc)
        if not video_url:
            raise VideoUnavailable("Could not find a downloadable video format")
    except VideoUnavailable as e:
        reason = e.args[0] if e.args else ""
        sys.stderr.write("Error: %s\n" % reason.encode("utf-8"))
        sys.exit(1)

    video_data = urlopen(video_url)
    try:
        content_length = video_data.info().getheader("Content-Length")
        total_size = int(content_length) if content_length else None
        sys.stdout.write("Downloading %s\n" % filename.encode("utf-8"))
        # Binary mode: the payload is video data, not text.
        with open(filename, "wb") as outfile:
            copy_with_progress(total_size, video_data, outfile)
    finally:
        video_data.close()


if __name__ == "__main__":
    resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_BYTES, MAX_MEMORY_BYTES))
    # Web servers set SCRIPT_NAME for CGI programs; its absence means the
    # script was started from a shell.
    if "SCRIPT_NAME" in os.environ:
        cgimain()
    else:
        try:
            main()
        except KeyboardInterrupt:
            sys.stdout.write("\nExiting...\n")
            sys.exit(1)