]> code.delx.au - webdl/blob - sbs.py
Download SBS player params from the API
[webdl] / sbs.py
1 import requests_cache
2 from common import grab_html, grab_json, grab_xml, download_hls, download_mpd, Node, append_to_qs
3
4 import json
5 import logging
6 import os
7 import sys
8
9 BASE = "https://www.sbs.com.au"
10 FULL_VIDEO_LIST = BASE + "/api/video_feed/f/Bgtm9B/sbs-section-programs/"
11 VIDEO_URL = BASE + "/ondemand/video/single/%s"
12 PARAMS_URL = BASE + "/api/video_pdkvars/id/%s?form=json"
13
14 NS = {
15 "smil": "http://www.w3.org/2005/SMIL21/Language",
16 }
17
18
19 class SbsVideoNode(Node):
20 def __init__(self, title, parent, url):
21 Node.__init__(self, title, parent)
22 self.video_id = url.split("/")[-1]
23 self.can_download = True
24
25 def download(self):
26 with requests_cache.disabled():
27 doc = grab_html(VIDEO_URL % self.video_id)
28 player_params = grab_json(PARAMS_URL % self.video_id)
29
30 error = player_params.get("error", None)
31 if error:
32 print("Cannot download:", error)
33 return False
34
35 release_url = player_params["releaseUrls"]["html"]
36 filename = self.title + ".ts"
37
38 hls_url = self.get_hls_url(release_url)
39 if hls_url:
40 return download_hls(filename, hls_url)
41 else:
42 return download_mpd(filename, release_url)
43
44 def get_hls_url(self, release_url):
45 with requests_cache.disabled():
46 doc = grab_xml("https:" + release_url.replace("http:", "").replace("https:", ""))
47 video = doc.xpath("//smil:video", namespaces=NS)
48 if not video:
49 return
50 video_url = video[0].attrib["src"]
51 return video_url
52
53 class SbsNavNode(Node):
54 def create_video_node(self, entry_data):
55 SbsVideoNode(entry_data["title"], self, entry_data["id"])
56
57 def find_existing_child(self, path):
58 for child in self.children:
59 if child.title == path:
60 return child
61
62 class SbsRootNode(SbsNavNode):
63 def __init__(self, parent):
64 Node.__init__(self, "SBS", parent)
65
66 def fill_children(self):
67 all_video_entries = self.load_all_video_entries()
68 category_and_entry_data = self.explode_videos_to_unique_categories(all_video_entries)
69 for category_path, entry_data in category_and_entry_data:
70 nav_node = self.create_nav_node(self, category_path)
71 nav_node.create_video_node(entry_data)
72
73 def load_all_video_entries(self):
74 channels = [
75 "Channel/SBS1",
76 "Channel/SBS Food",
77 "Channel/SBS VICELAND",
78 "Channel/SBS World Movies",
79 "Channel/Web Exclusive",
80 ]
81
82 all_entries = {}
83 for channel in channels:
84 self.load_all_video_entries_for_channel(all_entries, channel)
85
86 all_entries = list(all_entries.values())
87 print(" SBS fetched", len(all_entries))
88 return all_entries
89
90 def load_all_video_entries_for_channel(self, all_entries, channel):
91 offset = 1
92 page_size = 500
93 duplicate_warning = False
94
95 while True:
96 entries = self.fetch_entries_page(channel, offset, page_size)
97 if len(entries) == 0:
98 break
99
100 for entry in entries:
101 guid = entry["guid"]
102 if guid in entries and not duplicate_warning:
103 # https://bitbucket.org/delx/webdl/issues/102/recent-sbs-series-missing
104 logging.warn("SBS returned a duplicate response, data is probably missing. Try decreasing page_size.")
105 duplicate_warning = True
106
107 all_entries[guid] = entry
108
109 offset += page_size
110 if os.isatty(sys.stdout.fileno()):
111 sys.stdout.write(".")
112 sys.stdout.flush()
113
114 def fetch_entries_page(self, channel, offset, page_size):
115 url = append_to_qs(FULL_VIDEO_LIST, {
116 "range": "%s-%s" % (offset, offset+page_size-1),
117 "byCategories": channel,
118 })
119 data = grab_json(url)
120 if "entries" not in data:
121 raise Exception("Missing data in SBS response", data)
122 return data["entries"]
123
124 def explode_videos_to_unique_categories(self, all_video_entries):
125 for entry_data in all_video_entries:
126 for category_data in entry_data["media$categories"]:
127 category_path = self.calculate_category_path(
128 category_data["media$scheme"],
129 category_data["media$name"],
130 )
131 if category_path:
132 yield category_path, entry_data
133
134 def calculate_category_path(self, scheme, name):
135 if not scheme:
136 return
137 if scheme == name:
138 return
139 name = name.split("/")
140 if name[0] != scheme:
141 name.insert(0, scheme)
142 return name
143
144 def create_nav_node(self, parent, category_path):
145 if not category_path:
146 return parent
147
148 current_path = category_path[0]
149 current_node = parent.find_existing_child(current_path)
150 if not current_node:
151 current_node = SbsNavNode(current_path, parent)
152 return self.create_nav_node(current_node, category_path[1:])
153
154 def fill_nodes(root_node):
155 SbsRootNode(root_node)