]> code.delx.au - monosys/blob - scripts/opal-card-tool
opal-card-tool: More command line options
[monosys] / scripts / opal-card-tool
1 #!/usr/bin/python3
2
3 import argparse
4 import csv
5 import datetime
6 import getpass
7 import itertools
8 import lxml.html
9 import os
10 import pickle
11 import requests
12 import subprocess
13 import sys
14 import tempfile
15
16
# Schema version stored on the pickled Opal object; upgrade_opal() walks older
# pickles forward until they reach this value.
VERSION = 3

# Always keep the cache in an application-specific sub-directory.  Previously,
# when XDG_CACHE_HOME was set, the pickle was written straight into the shared
# cache root (e.g. "$XDG_CACHE_HOME/pickle").
CACHE_DIR = os.path.join(
    os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache")),
    "opal-card-tool",
)
PICKLE_FILE = os.path.join(CACHE_DIR, "pickle")

# Endpoints on the Opal website used by the Opal class.
OPAL_BASE = "https://www.opal.com.au"
LOGIN_URL = OPAL_BASE + "/login/registeredUserUsernameAndPasswordLogin"
CARD_DETAILS_URL = OPAL_BASE + "/registered/getJsonCardDetailsArray"
# Filled in with (card index, page index) using the % operator.
TRANSACTION_LIST_URL = OPAL_BASE + "/registered/opal-card-transactions/opal-card-activities-list?AMonth=-1&AYear=-1&cardIndex=%d&pageIndex=%d"
26
27
28
def stringify(el):
    """Return all text inside *el* as one space-separated, trimmed string."""
    pieces = [text.strip() for text in el.itertext()]
    return " ".join(pieces).strip()
31
def get_first(l):
    """Return the first item of the iterable *l*, or None when it is empty."""
    return next(iter(l), None)
35
def is_weekday(d):
    """Return True when the date/datetime *d* falls on Monday to Friday."""
    saturday = 6  # ISO weekday numbering: Monday is 1, Sunday is 7
    return d.isoweekday() < saturday
38
def n_days_ago(days):
    """Return local midnight (naive datetime) of the day *days* days before now."""
    target_day = (datetime.datetime.now() - datetime.timedelta(days=days)).date()
    return datetime.datetime.combine(target_day, datetime.time.min)
43
44
class FatalError(Exception):
    """Application-specific exception type; carries no extra state."""
47
class Transaction(object):
    """Plain attribute bag for one Opal transaction.

    Attributes are assigned by whoever creates the instance; the class itself
    defines none.
    """
50
class Card(object):
    """One Opal card together with its cached transactions.

    ``transaction_list`` keeps the newest batch at the front: every call to
    add_transactions() prepends to what is already stored.
    """

    def __init__(self):
        self.transaction_list = []

    def get_max_transaction(self):
        """Return the number of the first stored transaction, or -1 if none."""
        stored = self.transaction_list
        return stored[0].number if stored else -1

    def add_transactions(self, l):
        """Prepend the list *l* to the stored transactions."""
        combined = l + self.transaction_list
        self.transaction_list = combined
63
class Opal(object):
    """Client for the Opal website that accumulates cards and transactions.

    The whole object (credentials, HTTP session and cards) is what gets
    pickled to the cache file between runs.
    """

    def __init__(self, username, password):
        self.version = VERSION
        self.username = username
        # NOTE(security): the password is kept as a plain attribute, so it is
        # written in clear text whenever this object is pickled.
        self.password = password

        self.session = requests.Session()
        self.cards = []

    def login(self):
        """Log in with the stored credentials.

        Raises:
            Exception: on a non-OK HTTP status, or when the JSON reply carries
                a non-empty "errorMessage".
        """
        r = self.session.post(LOGIN_URL, {
            "h_username": self.username,
            "h_password": self.password,
        })
        if not r.ok:
            raise Exception("Failed to login, error code: %d" % r.status_code)

        payload = r.json()
        if payload["errorMessage"]:
            raise Exception("Failed to login: %s" % payload["errorMessage"])

    def load(self):
        """Refresh the card list, then fetch new transactions for each card."""
        self.load_cards()
        for card in self.cards:
            self.load_transactions(card)

    def resolve_card_number(self, card_number):
        """Translate a card index into a card number.

        *card_number* may be an index into ``self.cards`` (int or numeric
        string) or an actual card number.  A valid, non-negative index is
        replaced by the number of the card at that position; anything else,
        including non-numeric identifiers, is returned unchanged.
        """
        try:
            index = int(card_number)
        except (TypeError, ValueError):
            # Not numeric at all, so it cannot be an index.
            return card_number

        if 0 <= index < len(self.cards):
            return self.cards[index].number
        return card_number

    def get_transaction_list_for_card(self, card_number):
        """Return the transaction list of the card with *card_number*, else []."""
        for card in self.cards:
            if card.number == card_number:
                return card.transaction_list

        return []

    def load_cards(self):
        """Fetch the card list and update or create the matching Card objects.

        Raises:
            Exception: on a non-OK HTTP status.
        """
        r = self.session.get(CARD_DETAILS_URL)
        if not r.ok:
            raise Exception("Failed to load cards, error code: %d" % r.status_code)

        for index, card_json in enumerate(r.json()):
            card_number = card_json["cardNumber"]

            # Reuse an already known card so its cached transactions survive.
            for card in self.cards:
                if card.number == card_number:
                    break
            else:
                card = Card()
                self.cards.append(card)

            card.number = card_number
            card.name = card_json["cardNickName"]
            card.index = index

    def load_transactions(self, card):
        """Page through the transaction list of *card* and store the new ones.

        Paging stops at the first transaction whose number is not greater
        than the card's current maximum, or at the first page that yields
        nothing.  Progress dots are printed to stdout.
        """
        print("Loading transactions for", card.number, "", end="", flush=True)
        max_transaction = card.get_max_transaction()
        transaction_list = []

        for page in itertools.count(1):
            print(".", end="", flush=True)
            transaction_page = self.fetch_transaction_page(card.index, page)
            continue_paging = False

            for transaction in transaction_page:
                if transaction.number <= max_transaction:
                    # Reached a transaction that is already cached.
                    continue_paging = False
                    break

                transaction_list.append(transaction)
                continue_paging = True

            if not continue_paging:
                break

        print(" done")
        card.add_transactions(transaction_list)

    def parse_transaction(self, cells):
        """Build a Transaction from *cells*, a dict of header name -> cell element."""
        t = Transaction()
        t.number = int(stringify(cells["transaction number"]))
        t.timestamp = datetime.datetime.strptime(stringify(cells["date/time"]), "%a %d/%m/%Y %H:%M")
        t.mode = get_first(cells["mode"].xpath("img/@alt"))
        t.details = stringify(cells["details"])
        t.journey_number = stringify(cells["journey number"])
        t.fare_applied = stringify(cells["fare applied"])
        t.fare = stringify(cells["fare"])
        t.fare_discount = stringify(cells["discount"])
        t.amount = stringify(cells["amount"])
        return t

    def fetch_transaction_page(self, card, page):
        """Yield the transactions on page *page* for the card at index *card*.

        This is a generator: the HTTP request and the parsing only happen
        once iteration starts.

        Raises:
            Exception: on a non-OK HTTP status; parse failures are logged to
                stderr and re-raised.
        """
        url = TRANSACTION_LIST_URL % (card, page)
        r = self.session.get(url)
        if not r.ok:
            raise Exception("Failed to fetch transactions, error code: %d" % r.status_code)

        doc = lxml.html.fromstring(r.text)
        headers = [stringify(th).lower() for th in doc.xpath("//table/thead//th")]

        if not headers:
            # No table header on the page: yield nothing.
            return

        for tr in doc.xpath("//table/tbody/tr"):
            try:
                yield self.parse_transaction(dict(zip(headers, tr.getchildren())))
            except Exception:
                print("Failed to parse:", headers, lxml.html.tostring(tr), file=sys.stderr)
                raise
177
178
class CommuterGraph(object):
    """Plot weekday departure times with gnuplot.

    Points are written to two temporary data files (before noon / from noon
    onwards) plus a temporary gnuplot script, gnuplot is run on the script,
    and all temporary files are closed afterwards.
    """

    class gnuplot_dialect(csv.excel):
        delimiter = " "

    # Must stay in sync with the "set timefmt" line of the plot script.
    TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def __init__(self):
        self.data_am_csv, self.data_am_file = self.new_csv()
        self.data_pm_csv, self.data_pm_file = self.new_csv()
        self.plot_file = self.new_tempfile()
        self.files = [self.data_am_file, self.data_pm_file, self.plot_file]

        # Earliest / latest timestamp seen so far; None until a point is written.
        self.xrange_start = None
        self.xrange_end = None

    def graph(self, transaction_list):
        """Write the data and script files for *transaction_list* and run gnuplot.

        When no transaction passes the commuter filter there is nothing to
        plot; a message is printed to stderr and gnuplot is not started.
        The temporary files are closed in every case.
        """
        try:
            self.write_points(transaction_list)
            if self.xrange_start is None:
                # Without any point the x range is undefined.
                print("No commuter transactions to graph!", file=sys.stderr)
                return
            self.write_plot_command()
            self.flush_files()
            self.run_gnuplot()
        finally:
            self.cleanup()

    def new_tempfile(self):
        """Return a new text-mode temporary file that is deleted when closed."""
        return tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            prefix="opal-card-tool-",
            delete=True,
        )

    def new_csv(self):
        """Return a (csv writer, temporary file) pair using the gnuplot dialect."""
        f = self.new_tempfile()
        out = csv.writer(f, dialect=self.gnuplot_dialect)
        return out, f

    def update_xrange(self, ts):
        """Widen the recorded x range so that it includes *ts*."""
        if self.xrange_start is None or ts < self.xrange_start:
            self.xrange_start = ts
        if self.xrange_end is None or ts > self.xrange_end:
            self.xrange_end = ts

    def generate_point(self, transaction):
        """Return [x date, y time, label] strings for *transaction*.

        The x value is the day at midnight; the y value puts the time of day
        on the fixed date 2000-01-01 so all days share one time axis.
        """
        ts = transaction.timestamp
        x_date = ts.strftime("%Y-%m-%dT00:00:00")
        y_time = ts.strftime("2000-01-01T%H:%M:00")
        y_label = ts.strftime("%H:%M")
        return [x_date, y_time, y_label]

    def write_point(self, ts, point):
        """Append *point* to the morning file if *ts* is before noon, else afternoon."""
        if ts.time() < datetime.time(12):
            out_csv = self.data_am_csv
        else:
            out_csv = self.data_pm_csv

        out_csv.writerow(point)

    def write_points(self, transaction_list):
        """Write one point for every commuter transaction in *transaction_list*."""
        for transaction in transaction_list:
            if not self.is_commuter_transaction(transaction):
                continue

            self.update_xrange(transaction.timestamp)
            point = self.generate_point(transaction)
            self.write_point(transaction.timestamp, point)

    def is_commuter_transaction(self, transaction):
        """Return True for weekday transactions that are not automatic top-ups."""
        if not is_weekday(transaction.timestamp):
            return False
        if transaction.details.startswith("Auto top up"):
            return False
        return True

    def write_plot_command(self):
        """Write the gnuplot script to the plot file.

        The x range is padded by 24 hours on each side.  Its bounds are
        formatted with TIME_FORMAT: str(datetime) uses a space instead of
        "T" and so would not match the script's timefmt.
        """
        padding = datetime.timedelta(hours=24)
        d = {
            "data_am_filename": self.data_am_file.name,
            "data_pm_filename": self.data_pm_file.name,
            "xrange_start": (self.xrange_start - padding).strftime(self.TIME_FORMAT),
            "xrange_end": (self.xrange_end + padding).strftime(self.TIME_FORMAT),
        }
        self.plot_file.write(R"""
set timefmt '%Y-%m-%dT%H:%M:%S'

set xlabel 'Date'
set xdata time
set format x '%a %d'
set xtics 86400 scale 1.0,0.0
set xrange [ '{xrange_start}' : '{xrange_end}' ]

set ylabel 'Time'
set ydata time
set format y '%H:%M'
set yrange [ '2000-01-01T06:00:00' : '2000-01-01T23:00:00' ]

set key box opaque
set terminal qt \
persist \
title 'opal-card-tool graph' \
font 'Sans,10' \
enhanced \
size 1000,700

plot \
'{data_pm_filename}' \
using 1:2 \
with line \
title 'Afternoon departure time' \
, \
'{data_pm_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
, \
'{data_am_filename}' \
using 1:2 \
with line \
title 'Morning departure time' \
, \
'{data_am_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
""".format(**d))

    def flush_files(self):
        """Flush all temporary files so gnuplot sees their full content."""
        for f in self.files:
            f.flush()

    def cleanup(self):
        """Close (and thereby delete) every temporary file, best effort."""
        for f in self.files:
            try:
                f.close()
            except OSError:
                # Deliberately ignored: one failing close must not keep the
                # remaining files from being closed.
                pass

    def run_gnuplot(self):
        """Run gnuplot on the script; raises CalledProcessError on failure."""
        subprocess.check_call([
            "gnuplot",
            self.plot_file.name,
        ])
320
def restrict_days(transaction_list, num_days):
    """Yield transactions until the first one older than *num_days* days.

    Iteration stops at the first transaction whose timestamp is before
    midnight *num_days* days ago; later items are never inspected.
    """
    oldest_date = n_days_ago(num_days)
    yield from itertools.takewhile(
        lambda item: not item.timestamp < oldest_date,
        transaction_list,
    )
327
def graph_commuter(transaction_list):
    """Draw the commuter graph for *transaction_list* with a fresh CommuterGraph."""
    CommuterGraph().graph(transaction_list)
331
def print_transaction_list(transaction_list):
    """Write *transaction_list* to stdout as CSV.

    Columns are "number" and "timestamp" first, followed by the remaining
    attribute names of the first transaction in sorted order.  The list must
    not be empty.
    """
    leading = ["number", "timestamp"]
    remaining = [
        key for key in sorted(transaction_list[0].__dict__.keys())
        if key not in leading
    ]

    writer = csv.DictWriter(sys.stdout, leading + remaining)
    writer.writeheader()
    for entry in transaction_list:
        writer.writerow(entry.__dict__)
341
def print_cards(opal):
    """Print index, number, name and transaction count of every card in *opal*."""
    for position, card in enumerate(opal.cards):
        print("Card", position)
        rows = (
            (" number:", card.number),
            (" name:", card.name),
            (" transactions:", len(card.transaction_list)),
        )
        for label, value in rows:
            print(label, value)
        print()
349
def try_unpickle():
    """Return the object stored in PICKLE_FILE, or None if the file is absent."""
    if os.path.isfile(PICKLE_FILE):
        # NOTE: pickle executes whatever the file contains; the cache file
        # must only ever be written by this tool.
        with open(PICKLE_FILE, "rb") as stream:
            return pickle.load(stream)

    return None
356
def save_pickle(opal):
    """Pickle *opal* to PICKLE_FILE, creating CACHE_DIR when needed.

    The pickled object carries the account password, so the directory is
    created as 0700 and the file as 0600 instead of relying on the umask.
    """
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(CACHE_DIR, mode=0o700, exist_ok=True)

    # O_BINARY only exists on Windows, where it prevents newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(PICKLE_FILE, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        # The mode passed to os.open() only applies to newly created files;
        # tighten a pre-existing cache file as well before writing to it.
        os.chmod(PICKLE_FILE, 0o600)
        pickle.dump(opal, f)
362
363
364
def upgrade_opal_v2(opal):
    """Example upgrade step: move *opal* from version 2 to version 3."""
    upgraded_version = 3
    opal.version = upgraded_version
368
def upgrade_opal(opal):
    """Apply upgrade_opal_v<N> steps until *opal* has reached VERSION.

    Each step is looked up by name among the module globals and is expected
    to raise ``opal.version`` itself.
    """
    while opal.version < VERSION:
        current = opal.version
        print("Upgrading from version", current, file=sys.stderr)
        step = globals()["upgrade_opal_v%d" % current]
        step(opal)
374
375
376
def load_opal():
    """Return the cached Opal object, or create one by prompting for credentials.

    A cached object is upgraded to the current version first.  The result is
    written back to the cache before it is returned.
    """
    opal = try_unpickle()

    if not opal:
        # Nothing cached yet: ask for the login details.
        username = input("Username: ")
        password = getpass.getpass()
        opal = Opal(username, password)
    else:
        upgrade_opal(opal)

    save_pickle(opal)
    return opal
389
def do_load():
    """Log in, fetch cards and new transactions, then save the cache."""
    session = load_opal()
    session.login()
    session.load()
    save_pickle(session)
395
def do_show_cards():
    """Print the cached cards, then save the cache again."""
    cached = load_opal()
    print_cards(cached)
    save_pickle(cached)
400
def do_print(args):
    """Show or graph the cached transactions of one card.

    The card defaults to index 0.  The look-back window is --num-days when
    given, otherwise 14 days for the commuter graph and 365 days for the
    transaction listing.
    """
    opal = load_opal()

    requested_card = args.card_number if args.card_number else 0
    card_number = opal.resolve_card_number(requested_card)

    if args.num_days:
        num_days = int(args.num_days)
    else:
        num_days = 14 if args.graph_commuter else 365

    all_transactions = opal.get_transaction_list_for_card(card_number)
    transaction_list = list(restrict_days(all_transactions, num_days))

    if not transaction_list:
        print("No transactions!", file=sys.stderr)
        return

    if args.show_transactions:
        print_transaction_list(transaction_list)
        return

    if args.graph_commuter:
        graph_commuter(transaction_list)
        return

    print("Missing display function!", file=sys.stderr)
430
def parse_args():
    """Parse the command line (sys.argv) and return the argparse namespace.

    Exactly one of --load, --show-cards, --show-transactions and
    --graph-commuter is required; --num-days and --card-number are optional
    and are returned as strings (None when absent).
    """
    parser = argparse.ArgumentParser(description="Opal card activity fetcher")

    parser.add_argument("--num-days",
        help="restrict to NUM_DAYS of output"
    )
    parser.add_argument("--card-number",
        help="Opal card number or index (eg: 0,1,etc)"
    )

    # The four actions exclude each other and one of them must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--load", action="store_true",
        help="load any new data from the Opal website"
    )
    group.add_argument("--show-cards", action="store_true",
        help="show a list of cards"
    )
    group.add_argument("--show-transactions", action="store_true",
        help="show transactions for card"
    )
    group.add_argument("--graph-commuter", action="store_true",
        help="draw commuter graph for card with gnuplot"
    )

    return parser.parse_args()
458
def main():
    """Parse the command line and run the selected action."""
    args = parse_args()

    if args.load:
        # do_load() takes no arguments; the former do_load(opal) referenced an
        # undefined name and made --load fail with NameError.
        do_load()
    elif args.show_cards:
        do_show_cards()
    else:
        # --show-transactions and --graph-commuter are both handled here.
        do_print(args)
472
if __name__ == "__main__":
    # Ctrl-C and a closed output pipe end the program with exit status 1
    # and a short message instead of a traceback.
    try:
        main()
    except (KeyboardInterrupt, BrokenPipeError) as exc:
        print("Exiting:", exc, file=sys.stderr)
        sys.exit(1)
479