]> code.delx.au - monosys/blob - opal-card-tool
lib-ext-backup: set fs mountpoint and permissions
[monosys] / opal-card-tool
1 #!/usr/bin/env python3
2
3 import argparse
4 import csv
5 import datetime
6 import getpass
7 import itertools
8 import lxml.html
9 import os
10 import pickle
11 import requests
12 import subprocess
13 import sys
14 import tempfile
15 import time
16
17
# Version stamped on Opal objects; upgrade_opal() upgrades older pickles to it.
VERSION = 3

# XDG_CACHE_HOME names the *base* cache directory, so the tool's own
# subdirectory is appended whether or not the variable is set. An empty value
# is treated like an unset one.
_CACHE_BASE = os.environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache")
CACHE_DIR = os.path.join(_CACHE_BASE, "opal-card-tool")
PICKLE_FILE = os.path.join(CACHE_DIR, "pickle")

USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0"
OPAL_BASE = "https://www.opal.com.au"
LOGIN_URL = OPAL_BASE + "/login/registeredUserUsernameAndPasswordLogin"
CARD_DETAILS_URL = OPAL_BASE + "/registered/getJsonCardDetailsArray"
# Formatted with (card index, page index).
TRANSACTION_LIST_URL = OPAL_BASE + "/registered/opal-card-transactions/opal-card-activities-list?AMonth=-1&AYear=-1&cardIndex=%d&pageIndex=%d"
28
29
30
def stringify(el):
    """Return the text of ``el`` and its descendants as one string.

    Each fragment yielded by ``el.itertext()`` is stripped, the fragments are
    joined with single spaces, and the joined result is stripped once more.
    """
    fragments = [text.strip() for text in el.itertext()]
    joined = " ".join(fragments)
    return joined.strip()
33
def get_first(l):
    """Return the first item of the iterable ``l``, or None when it is empty."""
    return next(iter(l), None)
37
def is_weekday(d):
    """Return True when ``d.isoweekday()`` falls on Monday (1) to Friday (5)."""
    return d.isoweekday() < 6
40
def n_days_ago(days):
    """Return local midnight (naive datetime) of the day ``days`` days ago."""
    target = datetime.datetime.now() - datetime.timedelta(days=days)
    return datetime.datetime.combine(target.date(), datetime.time.min)
45
46
class FatalError(Exception):
    """Exception subclass declared by this script; adds no state or behaviour."""
49
class Transaction:
    """Plain attribute container; parsed transaction fields are set on instances."""
52
class Card:
    """One card and its stored transactions.

    The first entry of ``transaction_list`` is treated as the highest-numbered
    transaction, and new batches are placed in front of the existing ones.
    """

    def __init__(self):
        self.transaction_list = []

    def get_max_transaction(self):
        """Return the number of the first stored transaction, or -1 if none."""
        if not self.transaction_list:
            return -1
        newest = self.transaction_list[0]
        return newest.number

    def add_transactions(self, l):
        """Put the list ``l`` in front of the transactions already stored."""
        combined = l + self.transaction_list
        self.transaction_list = combined
65
class Opal(object):
    """Client state for the Opal website: credentials, HTTP session and cards.

    Instances are pickled by this script, so every attribute set here,
    including the plain-text password, ends up in the cache file.
    """

    def __init__(self, username, password):
        self.version = VERSION
        self.username = username
        self.password = password
        self.cards = []

        self.init()

    def init(self):
        """Create a fresh HTTP session that sends the configured User-Agent."""
        self.session = requests.Session()
        self.session.headers["User-Agent"] = USER_AGENT

    def login(self, attempts=10, delay=1):
        """Log in, retrying until check_login() succeeds.

        Args:
            attempts: maximum number of login attempts.
            delay: seconds to wait between two attempts.

        Raises:
            FatalError: if every attempt failed, or login_once() reported an
                error.
        """
        print("Attempting login ", end="", flush=True)
        for attempt in range(attempts):
            if attempt:
                # Only wait between attempts, not after the final one.
                time.sleep(delay)
            print(".", end="", flush=True)
            self.login_once()
            if self.check_login():
                print(" ok")
                return

        # Previously this fell through silently and left the caller with an
        # unauthenticated session.
        print(" failed")
        raise FatalError("Failed to login after %d attempts" % attempts)

    def login_once(self):
        """Post the credentials once.

        Raises:
            FatalError: on a non-OK HTTP status or a non-empty "errorMessage"
                in the JSON reply.
        """
        r = self.session.post(LOGIN_URL, {
            "h_username": self.username,
            "h_password": self.password,
        })
        if not r.ok:
            raise FatalError("Failed to login, error code: %d" % r.status_code)

        reply = r.json()
        if reply["errorMessage"]:
            raise FatalError("Failed to login: %s" % reply["errorMessage"])

    def check_login(self):
        """Return True if the card-details endpoint answers with valid JSON."""
        r = self.session.get(CARD_DETAILS_URL)
        try:
            r.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; anything else
            # (e.g. KeyboardInterrupt) must not be mistaken for "not logged in".
            return False
        return True

    def load(self):
        """Refresh the card list, then fetch new transactions for each card."""
        self.load_cards()
        for card in self.cards:
            self.load_transactions(card)

    def resolve_card_number(self, card_number):
        """Translate a card index into a card number.

        A value that converts to an integer in ``range(len(self.cards))`` is
        used as an index into ``self.cards``; anything else (a full card
        number, a negative or a non-numeric value) is returned unchanged.
        """
        try:
            index = int(card_number)
        except (TypeError, ValueError):
            return card_number

        if 0 <= index < len(self.cards):
            return self.cards[index].number
        return card_number

    def get_transaction_list_for_card(self, card_number):
        """Return the transactions of the card with this number, or []."""
        for card in self.cards:
            if card.number == card_number:
                return card.transaction_list

        return []

    def load_cards(self):
        """Fetch the card details and update or extend ``self.cards``.

        Existing Card objects are matched by card number so their stored
        transactions are kept; unknown numbers get a new Card.

        Raises:
            FatalError: on a non-OK HTTP status.
        """
        r = self.session.get(CARD_DETAILS_URL)
        if not r.ok:
            raise FatalError("Failed to fetch card details, error code: %d" % r.status_code)

        for index, card_json in enumerate(r.json()):
            card_number = card_json["cardNumber"]

            card = next((c for c in self.cards if c.number == card_number), None)
            if card is None:
                card = Card()
                self.cards.append(card)

            card.number = card_number
            card.name = card_json["cardNickName"]
            card.index = index

    def load_transactions(self, card):
        """Fetch the transactions of ``card`` that are not stored yet.

        Pages are requested from 1 upwards. Paging stops at the first
        transaction whose number is not above the card's current maximum, or
        at a page that yields no transactions at all.
        """
        print("Loading transactions for", card.number, "", end="", flush=True)
        max_transaction = card.get_max_transaction()
        transaction_list = []

        for page in itertools.count(1):
            print(".", end="", flush=True)
            found_only_new = False

            for transaction in self.fetch_transaction_page(card.index, page):
                if transaction.number <= max_transaction:
                    # Reached a transaction that is already stored.
                    found_only_new = False
                    break

                transaction_list.append(transaction)
                found_only_new = True

            if not found_only_new:
                break

        print(" done")
        card.add_transactions(transaction_list)

    def parse_transaction(self, cells):
        """Build a Transaction from ``cells``.

        ``cells`` maps the lower-cased column header text to the table cell
        element of one row.
        """
        t = Transaction()
        t.number = int(stringify(cells["transaction number"]))
        t.timestamp = datetime.datetime.strptime(stringify(cells["date/time"]), "%a %d/%m/%Y %H:%M")
        t.mode = get_first(cells["mode"].xpath("img/@alt"))
        t.details = stringify(cells["details"])
        t.journey_number = stringify(cells["journey number"])
        t.fare_applied = stringify(cells["fare applied"])
        t.fare = stringify(cells["fare"])
        t.fare_discount = stringify(cells["discount"])
        t.amount = stringify(cells["amount"])
        return t

    def fetch_transaction_page(self, card, page):
        """Yield the transactions found on one page of the activity list.

        Args:
            card: index of the card, as used in the URL's cardIndex field.
            page: page index, as used in the URL's pageIndex field.

        Raises:
            FatalError: on a non-OK HTTP status.
        """
        url = TRANSACTION_LIST_URL % (card, page)
        r = self.session.get(url)
        if not r.ok:
            raise FatalError("Failed to fetch transactions, error code: %d" % r.status_code)

        doc = lxml.html.fromstring(r.text)
        headers = [stringify(th).lower() for th in doc.xpath("//table/thead//th")]

        if not headers:
            # No table header on the page: nothing to yield.
            return

        for tr in doc.xpath("//table/tbody/tr"):
            try:
                yield self.parse_transaction(dict(zip(headers, tr.getchildren())))
            except Exception:
                # Show the offending row before letting the error propagate.
                print("Failed to parse:", headers, lxml.html.tostring(tr), file=sys.stderr)
                raise
202
203
class CommuterGraph(object):
    """Plot weekday transaction times per day using gnuplot.

    Points are written to two temporary data files (before noon / from noon
    on) and a temporary gnuplot script; all three are closed by cleanup().
    """

    class gnuplot_dialect(csv.excel):
        # Data columns are separated by a space instead of a comma.
        delimiter = " "

    def __init__(self):
        self.data_am_csv, self.data_am_file = self.new_csv()
        self.data_pm_csv, self.data_pm_file = self.new_csv()
        self.plot_file = self.new_tempfile()
        self.files = [self.data_am_file, self.data_pm_file, self.plot_file]

        # Earliest / latest timestamp written so far; None until a point exists.
        self.xrange_start = None
        self.xrange_end = None

    def is_plottable(self):
        """Return True once at least one point has set both x-range ends."""
        return self.xrange_start is not None and self.xrange_end is not None

    def graph(self, transaction_list):
        """Write the data and script files, run gnuplot, then clean up.

        Prints "No transactions!" to stderr and skips gnuplot when no
        transaction qualified as a point.
        """
        try:
            self.write_points(transaction_list)
            if not self.is_plottable():
                print("No transactions!", file=sys.stderr)
                return
            self.write_plot_command()
            self.flush_files()
            self.run_gnuplot()
        finally:
            self.cleanup()

    def new_tempfile(self):
        """Return a new UTF-8 text temp file that is deleted when closed."""
        return tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            prefix="opal-card-tool-",
            delete=True,
        )

    def new_csv(self):
        """Return ``(csv_writer, file)`` for a new temp data file."""
        f = self.new_tempfile()
        out = csv.writer(f, dialect=self.gnuplot_dialect)
        return out, f

    def update_xrange(self, ts):
        """Widen the stored x range so that it includes ``ts``."""
        if self.xrange_start is None or ts < self.xrange_start:
            self.xrange_start = ts
        if self.xrange_end is None or ts > self.xrange_end:
            self.xrange_end = ts

    def generate_point(self, transaction):
        """Return ``[x_date, y_time, y_label]`` for a transaction.

        x is the transaction's date at midnight; y is its time of day placed
        on the fixed date 2000-01-01; the label is the time as HH:MM.
        """
        ts = transaction.timestamp
        x_date = ts.strftime("%Y-%m-%dT00:00:00")
        y_time = ts.strftime("2000-01-01T%H:%M:00")
        y_label = ts.strftime("%H:%M")
        return [x_date, y_time, y_label]

    def write_point(self, ts, point):
        """Append ``point`` to the AM file if ``ts`` is before noon, else PM."""
        if ts.time() < datetime.time(12):
            out_csv = self.data_am_csv
        else:
            out_csv = self.data_pm_csv

        out_csv.writerow(point)

    def write_points(self, transaction_list):
        """Write one point for every commuter transaction in the list."""
        for transaction in transaction_list:
            if not self.is_commuter_transaction(transaction):
                continue

            self.update_xrange(transaction.timestamp)
            point = self.generate_point(transaction)
            self.write_point(transaction.timestamp, point)

    def is_commuter_transaction(self, transaction):
        """Return True for weekday transactions that are not auto top-ups."""
        if not is_weekday(transaction.timestamp):
            return False
        if transaction.details.startswith("Auto top up"):
            return False
        return True

    def write_plot_command(self):
        """Write the gnuplot script, padding the x range by 24 hours each side."""
        # NOTE(review): the datetimes are interpolated via str(), which uses a
        # space where timefmt has 'T' -- confirm gnuplot parses this as
        # intended.
        template = r"""
set timefmt '%Y-%m-%dT%H:%M:%S'

set xlabel 'Date'
set xdata time
set format x '%a %d'
set xtics 86400 scale 1.0,0.0
set xrange [ '{xrange_start}' : '{xrange_end}' ]

set ylabel 'Time'
set ydata time
set format y '%H:%M'
set yrange [ '2000-01-01T06:00:00' : '2000-01-01T23:00:00' ]

set key box opaque
set terminal qt \
persist \
title 'opal-card-tool graph' \
font 'Sans,10' \
enhanced \
size 1000,700

plot \
'{data_pm_filename}' \
using 1:2 \
with line \
title 'Afternoon departure time' \
, \
'{data_pm_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
, \
'{data_am_filename}' \
using 1:2 \
with line \
title 'Morning departure time' \
, \
'{data_am_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
"""
        one_day = datetime.timedelta(hours=24)
        self.plot_file.write(template.format(
            data_am_filename=self.data_am_file.name,
            data_pm_filename=self.data_pm_file.name,
            xrange_start=self.xrange_start - one_day,
            xrange_end=self.xrange_end + one_day,
        ))

    def flush_files(self):
        """Flush all temp files so gnuplot sees their full contents."""
        for f in self.files:
            f.flush()

    def cleanup(self):
        """Close every temp file, ignoring I/O errors from closing."""
        for f in self.files:
            try:
                f.close()
            except OSError:
                # Best effort: a failed close must not hide the real outcome,
                # but KeyboardInterrupt/SystemExit are no longer swallowed.
                pass

    def run_gnuplot(self):
        """Run gnuplot on the script; raises CalledProcessError on failure."""
        subprocess.check_call([
            "gnuplot",
            self.plot_file.name,
        ])
351
def restrict_days(transaction_list, num_days):
    """Yield transactions until one is older than midnight ``num_days`` ago.

    Iteration stops at the first transaction whose timestamp lies before the
    cutoff; later items are not examined.
    """
    cutoff = n_days_ago(num_days)
    yield from itertools.takewhile(
        lambda transaction: not transaction.timestamp < cutoff,
        transaction_list,
    )
358
def graph_commuter(transaction_list):
    """Draw the commuter graph for ``transaction_list`` with a new CommuterGraph."""
    CommuterGraph().graph(transaction_list)
362
def print_transaction_list(transaction_list):
    """Write the transactions to stdout as CSV.

    Columns are "number" and "timestamp" first, then the remaining attribute
    names of the first transaction in sorted order. The list must be non-empty.
    """
    leading = ["number", "timestamp"]
    remaining = sorted(
        name for name in vars(transaction_list[0]) if name not in leading
    )

    writer = csv.DictWriter(sys.stdout, leading + remaining)
    writer.writeheader()
    writer.writerows(vars(transaction) for transaction in transaction_list)
372
def print_cards(opal):
    """Print index, number, name and transaction count of each card in ``opal``."""
    for position, card in enumerate(opal.cards):
        print("Card", position)
        print(" number:", card.number)
        print(" name:", card.name)
        transaction_count = len(card.transaction_list)
        print(" transactions:", transaction_count)
        print()
380
def try_unpickle():
    """Return the object stored in PICKLE_FILE, or None if it is not a file."""
    if os.path.isfile(PICKLE_FILE):
        with open(PICKLE_FILE, "rb") as f:
            # NOTE: unpickling runs code embedded in the file, so the cache
            # file must only ever be written by this tool.
            return pickle.load(f)

    return None
387
def save_pickle(opal):
    """Pickle ``opal`` into PICKLE_FILE, creating CACHE_DIR when missing.

    The pickled object includes the account password, so a newly created file
    gets owner-only permissions (0600). The mode is only applied on creation;
    an already existing file keeps its permissions.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)

    # O_BINARY only exists on Windows, where it prevents newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(PICKLE_FILE, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        pickle.dump(opal, f)
393
394
395
def upgrade_opal_v2(opal):
    """Example upgrade step: only moves ``opal`` from version 2 to version 3."""
    opal.version = 3
399
def upgrade_opal(opal):
    """Apply ``upgrade_opal_v<N>`` steps until ``opal.version`` reaches VERSION.

    Each step is looked up by name among the module globals and is expected to
    raise ``opal.version`` itself.
    """
    while opal.version < VERSION:
        current = opal.version
        print("Upgrading from version", current, file=sys.stderr)
        step = globals()["upgrade_opal_v%d" % current]
        step(opal)
405
406
407
def load_opal():
    """Return the cached Opal object, or a new one built from prompted credentials.

    A cached object is upgraded and given a fresh session; in both cases the
    result is written back to the cache before it is returned.
    """
    opal = try_unpickle()

    if not opal:
        # Nothing cached: ask for the username first, then the password.
        username = input("Username: ")
        password = getpass.getpass()
        opal = Opal(username, password)
    else:
        upgrade_opal(opal)
        opal.init()

    save_pickle(opal)
    return opal
421
def do_load():
    """Handle --load: log in, fetch cards and transactions, save the cache."""
    client = load_opal()
    client.login()
    client.load()
    save_pickle(client)
427
def do_show_cards():
    """Handle --show-cards: print the cached cards, then save the cache again."""
    client = load_opal()
    print_cards(client)
    save_pickle(client)
432
def do_print(args):
    """Handle --show-transactions / --graph-commuter for the selected card.

    The card defaults to index 0. The day limit defaults to 14 for the
    commuter graph and 365 otherwise.
    """
    opal = load_opal()

    requested_card = args.card_number if args.card_number else 0
    card_number = opal.resolve_card_number(requested_card)

    if args.num_days:
        num_days = int(args.num_days)
    else:
        num_days = 14 if args.graph_commuter else 365

    all_transactions = opal.get_transaction_list_for_card(card_number)
    transaction_list = list(restrict_days(all_transactions, num_days))

    if not transaction_list:
        print("No transactions!", file=sys.stderr)
        return

    if args.show_transactions:
        print_transaction_list(transaction_list)
        return

    if args.graph_commuter:
        graph_commuter(transaction_list)
        return

    print("Missing display function!", file=sys.stderr)
462
def parse_args(argv=None):
    """Parse the command line and return the argparse namespace.

    Args:
        argv: argument list to parse; None (the default) makes argparse read
            ``sys.argv[1:]``.

    Exactly one of --load, --show-cards, --show-transactions and
    --graph-commuter must be given. --num-days and --card-number are optional
    and are returned as strings.
    """
    parser = argparse.ArgumentParser(description="Opal card activity fetcher")

    parser.add_argument("--num-days",
        help="restrict to NUM_DAYS of output"
    )
    parser.add_argument("--card-number",
        help="Opal card number or index (eg: 0,1,etc)"
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--load", action="store_true",
        help="load any new data from the Opal website"
    )
    group.add_argument("--show-cards", action="store_true",
        help="show a list of cards"
    )
    group.add_argument("--show-transactions", action="store_true",
        help="show transactions for card"
    )
    group.add_argument("--graph-commuter", action="store_true",
        help="draw commuter graph for card with gnuplot"
    )

    return parser.parse_args(argv)
490
def main():
    """Parse the command line and run the selected action."""
    args = parse_args()

    if args.load:
        do_load()
    elif args.show_cards:
        do_show_cards()
    else:
        # --show-transactions and --graph-commuter share one code path.
        do_print(args)
504
if __name__ == "__main__":
    # Ctrl-C or a closed output pipe ends the run with exit status 1.
    try:
        main()
    except (BrokenPipeError, KeyboardInterrupt) as exc:
        print("Exiting:", exc, file=sys.stderr)
        raise SystemExit(1)
511