]> code.delx.au - webdl/blob - sbs.py
Failsafe in case channel 10 returns bad results again
[webdl] / sbs.py
1 import requests_cache
2 from common import grab_html, grab_json, grab_xml, download_hls, download_mpd, Node, append_to_qs
3
4 import json
5 import logging
6 import sys
7
8 BASE = "https://www.sbs.com.au"
9 FULL_VIDEO_LIST = BASE + "/api/video_feed/f/Bgtm9B/sbs-section-programs/"
10 VIDEO_URL = BASE + "/ondemand/video/single/%s"
11
12 NS = {
13 "smil": "http://www.w3.org/2005/SMIL21/Language",
14 }
15
16
17 class SbsVideoNode(Node):
18 def __init__(self, title, parent, url):
19 Node.__init__(self, title, parent)
20 self.video_id = url.split("/")[-1]
21 self.can_download = True
22
23 def download(self):
24 with requests_cache.disabled():
25 doc = grab_html(VIDEO_URL % self.video_id)
26 player_params = self.get_player_params(doc)
27
28 error = player_params.get("error", None)
29 if error:
30 print("Cannot download:", error)
31 return False
32
33 release_url = player_params["releaseUrls"]["html"]
34 filename = self.title + ".ts"
35
36 hls_url = self.get_hls_url(release_url)
37 if hls_url:
38 return download_hls(filename, hls_url)
39 else:
40 return download_mpd(filename, release_url)
41
42 def get_player_params(self, doc):
43 for script in doc.xpath("//script"):
44 if not script.text:
45 continue
46 for line in script.text.split("\n"):
47 s = "var playerParams = {"
48 if s in line:
49 p1 = line.find(s) + len(s) - 1
50 p2 = line.find("};", p1) + 1
51 if p1 >= 0 and p2 > 0:
52 return json.loads(line[p1:p2])
53 raise Exception("Unable to find player params for %s: %s" % (self.video_id, self.title))
54
55 def get_hls_url(self, release_url):
56 with requests_cache.disabled():
57 doc = grab_xml("https:" + release_url.replace("http:", "").replace("https:", ""))
58 video = doc.xpath("//smil:video", namespaces=NS)
59 if not video:
60 return
61 video_url = video[0].attrib["src"]
62 return video_url
63
64 class SbsNavNode(Node):
65 def create_video_node(self, entry_data):
66 SbsVideoNode(entry_data["title"], self, entry_data["id"])
67
68 def find_existing_child(self, path):
69 for child in self.children:
70 if child.title == path:
71 return child
72
73 class SbsRootNode(SbsNavNode):
74 def __init__(self, parent):
75 Node.__init__(self, "SBS", parent)
76
77 def fill_children(self):
78 all_video_entries = self.load_all_video_entries()
79 category_and_entry_data = self.explode_videos_to_unique_categories(all_video_entries)
80 for category_path, entry_data in category_and_entry_data:
81 nav_node = self.create_nav_node(self, category_path)
82 nav_node.create_video_node(entry_data)
83
84 def load_all_video_entries(self):
85 offset = 1
86 page_size = 500
87 results = {}
88 duplicate_warning = False
89
90 while True:
91 entries = self.fetch_entries_page(offset, page_size)
92 if len(entries) == 0:
93 break
94
95 for entry in entries:
96 guid = entry["guid"]
97 if guid in results and not duplicate_warning:
98 # https://bitbucket.org/delx/webdl/issues/102/recent-sbs-series-missing
99 logging.warn("SBS returned a duplicate response, data is probably missing. Try decreasing page_size.")
100 duplicate_warning = True
101
102 results[guid] = entry
103
104 offset += page_size
105 sys.stdout.write(".")
106 sys.stdout.flush()
107
108 print()
109 return list(results.values())
110
111 def fetch_entries_page(self, offset, page_size):
112 url = append_to_qs(FULL_VIDEO_LIST, {"range": "%s-%s" % (offset, offset+page_size-1)})
113 data = grab_json(url)
114 if "entries" not in data:
115 raise Exception("Missing data in SBS response", data)
116 return data["entries"]
117
118 def explode_videos_to_unique_categories(self, all_video_entries):
119 for entry_data in all_video_entries:
120 for category_data in entry_data["media$categories"]:
121 category_path = self.calculate_category_path(
122 category_data["media$scheme"],
123 category_data["media$name"],
124 )
125 if category_path:
126 yield category_path, entry_data
127
128 def calculate_category_path(self, scheme, name):
129 if not scheme:
130 return
131 if scheme == name:
132 return
133 name = name.split("/")
134 if name[0] != scheme:
135 name.insert(0, scheme)
136 return name
137
138 def create_nav_node(self, parent, category_path):
139 if not category_path:
140 return parent
141
142 current_path = category_path[0]
143 current_node = parent.find_existing_child(current_path)
144 if not current_node:
145 current_node = SbsNavNode(current_path, parent)
146 return self.create_nav_node(current_node, category_path[1:])
147
148 def fill_nodes(root_node):
149 SbsRootNode(root_node)