code.delx.au - monosys/blob - opal-card-tool
opal-card-tool: Use Firefox user-agent
[monosys] / opal-card-tool
1 #!/usr/bin/env python3
2
3 import argparse
4 import csv
5 import datetime
6 import getpass
7 import itertools
8 import lxml.html
9 import os
10 import pickle
11 import requests
12 import subprocess
13 import sys
14 import tempfile
15
16
# Layout version of the pickled Opal object. upgrade_opal() looks up a
# function named upgrade_opal_v<N> for every stored version below this one.
VERSION = 3

# Keep the cache in an application-specific directory underneath the XDG
# cache base. An unset *or empty* XDG_CACHE_HOME falls back to ~/.cache, and
# the "opal-card-tool" component is always appended so the generic file name
# "pickle" never lands directly in the shared cache directory.
CACHE_DIR = os.path.join(
    os.environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache"),
    "opal-card-tool",
)
PICKLE_FILE = os.path.join(CACHE_DIR, "pickle")

# Sent with every request made through the Opal session.
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0"
OPAL_BASE = "https://www.opal.com.au"
LOGIN_URL = OPAL_BASE + "/login/registeredUserUsernameAndPasswordLogin"
CARD_DETAILS_URL = OPAL_BASE + "/registered/getJsonCardDetailsArray"
# %-formatted with (card index, page index).
TRANSACTION_LIST_URL = OPAL_BASE + "/registered/opal-card-transactions/opal-card-activities-list?AMonth=-1&AYear=-1&cardIndex=%d&pageIndex=%d"
27
28
29
def stringify(el):
    """Return all text inside *el* as a single trimmed, space-joined string.

    Every text fragment produced by ``el.itertext()`` is stripped on its own
    before the fragments are joined with single spaces.
    """
    fragments = []
    for text in el.itertext():
        fragments.append(text.strip())
    return " ".join(fragments).strip()
32
def get_first(l):
    """Return the first item of the iterable *l*, or None when it is empty."""
    return next(iter(l), None)
36
def is_weekday(d):
    """Return True when *d* falls on Monday through Friday."""
    # isoweekday() numbers Monday as 1 and Sunday as 7.
    return d.isoweekday() not in (6, 7)
39
def n_days_ago(days):
    """Return local midnight of the day that lies *days* days before now."""
    shifted = datetime.datetime.now() - datetime.timedelta(days=days)
    return shifted.replace(hour=0, minute=0, second=0, microsecond=0)
44
45
class FatalError(Exception):
    """Application-specific exception type."""
48
class Transaction(object):
    """Plain attribute container for one row of the card activity table.

    Attributes are assigned by the parser after construction; the instance
    ``__dict__`` is what gets written out as a CSV row.
    """
51
class Card(object):
    """One Opal card together with its locally cached transactions."""

    def __init__(self):
        # Presumably kept newest-first: get_max_transaction() reads index 0
        # and add_transactions() prepends -- confirm against the site order.
        self.transaction_list = []

    def get_max_transaction(self):
        """Return the number of the first cached transaction, or -1 if none."""
        return self.transaction_list[0].number if self.transaction_list else -1

    def add_transactions(self, l):
        """Put the transactions in list *l* in front of the cached ones."""
        combined = l + self.transaction_list
        self.transaction_list = combined
64
class Opal(object):
    """Client for the Opal website plus the card data fetched from it.

    The whole object is pickled by the surrounding script, so everything
    stored on it -- including the plaintext password -- ends up in the cache
    file.
    """

    def __init__(self, username, password):
        self.version = VERSION
        self.username = username
        self.password = password
        self.cards = []

        self.init()

    def init(self):
        """Create a fresh HTTP session carrying the configured User-Agent.

        Called on construction and again by the script after unpickling.
        """
        self.session = requests.Session()
        self.session.headers["User-Agent"] = USER_AGENT

    def login(self):
        """Log in with the stored credentials.

        Raises:
            Exception: on a non-OK HTTP status or when the response body
                carries a non-empty ``errorMessage``.
        """
        r = self.session.post(LOGIN_URL, {
            "h_username": self.username,
            "h_password": self.password,
        })
        if not r.ok:
            raise Exception("Failed to login, error code: %d" % r.status_code)

        body = r.json()
        if body["errorMessage"]:
            raise Exception("Failed to login: %s" % body["errorMessage"])

    def load(self):
        """Refresh the card list, then fetch new transactions for every card."""
        self.load_cards()
        for card in self.cards:
            self.load_transactions(card)

    def resolve_card_number(self, card_number):
        """Translate a card index into a card number.

        *card_number* may be a position in ``self.cards`` (int or numeric
        string) or an actual card number. Only values in the range
        ``0 <= index < len(self.cards)`` are treated as an index; anything
        else, including negative or non-numeric input, is returned
        unchanged.
        """
        try:
            index = int(card_number)
        except (TypeError, ValueError):
            return card_number

        if 0 <= index < len(self.cards):
            return self.cards[index].number
        return card_number

    def get_transaction_list_for_card(self, card_number):
        """Return the cached transactions of the card with *card_number*.

        Returns an empty list when no card has that number.
        """
        for card in self.cards:
            if card.number == card_number:
                return card.transaction_list

        return []

    def load_cards(self):
        """Fetch the card details and update or extend ``self.cards``.

        Cards already known (matched by number) are updated in place so that
        their cached transactions are kept.

        Raises:
            Exception: on a non-OK HTTP status.
        """
        r = self.session.get(CARD_DETAILS_URL)
        if not r.ok:
            raise Exception("Failed to load cards, error code: %d" % r.status_code)

        for index, card_json in enumerate(r.json()):
            card_number = card_json["cardNumber"]

            card = next((c for c in self.cards if c.number == card_number), None)
            if card is None:
                card = Card()
                self.cards.append(card)

            card.number = card_number
            card.name = card_json["cardNickName"]
            card.index = index

    def load_transactions(self, card):
        """Fetch the transactions of *card* that are not cached yet.

        Pages are requested starting at 1. Paging stops at the first
        transaction whose number is not above the card's current maximum, or
        at a page that yields nothing. The new transactions are handed to
        ``card.add_transactions`` in the order they were read.
        """
        print("Loading transactions for", card.number, "", end="", flush=True)
        max_transaction = card.get_max_transaction()
        new_transactions = []

        for page in itertools.count(1):
            print(".", end="", flush=True)
            page_had_new = False
            reached_known = False

            for transaction in self.fetch_transaction_page(card.index, page):
                if transaction.number <= max_transaction:
                    reached_known = True
                    break

                new_transactions.append(transaction)
                page_had_new = True

            if reached_known or not page_had_new:
                break

        print(" done")
        card.add_transactions(new_transactions)

    def parse_transaction(self, cells):
        """Build a Transaction from *cells*, a mapping of header text to cell.

        The header keys are the lower-cased column titles of the activity
        table. Only the number and timestamp are converted; the remaining
        fields are kept as display strings.
        """
        t = Transaction()
        t.number = int(stringify(cells["transaction number"]))
        # NOTE(review): %a is locale-dependent -- assumes English day names.
        t.timestamp = datetime.datetime.strptime(stringify(cells["date/time"]), "%a %d/%m/%Y %H:%M")
        t.mode = get_first(cells["mode"].xpath("img/@alt"))
        t.details = stringify(cells["details"])
        t.journey_number = stringify(cells["journey number"])
        t.fare_applied = stringify(cells["fare applied"])
        t.fare = stringify(cells["fare"])
        t.fare_discount = stringify(cells["discount"])
        t.amount = stringify(cells["amount"])
        return t

    def fetch_transaction_page(self, card, page):
        """Yield the transactions found on one page of a card's activity list.

        *card* is the card index used in the URL and *page* the page index.
        Yields nothing when the page contains no table header.

        Raises:
            Exception: on a non-OK HTTP status (raised on first iteration,
                because this is a generator), or re-raised from
                ``parse_transaction`` after the offending row was logged.
        """
        url = TRANSACTION_LIST_URL % (card, page)
        r = self.session.get(url)
        if not r.ok:
            raise Exception("Failed to fetch transactions, error code: %d" % r.status_code)

        doc = lxml.html.fromstring(r.text)
        headers = [stringify(th).lower() for th in doc.xpath("//table/thead//th")]

        if not headers:
            return

        for tr in doc.xpath("//table/tbody/tr"):
            try:
                # list(tr) gives the row's child cells (getchildren() is deprecated).
                yield self.parse_transaction(dict(zip(headers, list(tr))))
            except Exception:
                print("Failed to parse:", headers, lxml.html.tostring(tr), file=sys.stderr)
                raise
182
183
class CommuterGraph(object):
    """Plot weekday departure times with gnuplot.

    Morning and afternoon points are written to two temporary data files and
    the plot commands to a third; all three are closed (and thereby deleted)
    by ``cleanup()``, which ``graph()`` always calls.
    """

    # Timestamp layout declared to gnuplot via ``set timefmt``; everything
    # substituted into the plot script as a time value must use it.
    TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"

    class gnuplot_dialect(csv.excel):
        # Space-separated columns for the gnuplot data files.
        delimiter = " "

    def __init__(self):
        self.data_am_csv, self.data_am_file = self.new_csv()
        self.data_pm_csv, self.data_pm_file = self.new_csv()
        self.plot_file = self.new_tempfile()
        self.files = [self.data_am_file, self.data_pm_file, self.plot_file]

        # Earliest and latest timestamp that was plotted; None until a point
        # has been written.
        self.xrange_start = None
        self.xrange_end = None

    def is_plottable(self):
        """Return True once at least one point has set the x range."""
        return self.xrange_start is not None and self.xrange_end is not None

    def graph(self, transaction_list):
        """Write data and plot script for *transaction_list* and run gnuplot.

        Prints a message and skips plotting when no transaction qualifies.
        The temporary files are cleaned up in every case.
        """
        try:
            self.write_points(transaction_list)
            if not self.is_plottable():
                print("No transactions!", file=sys.stderr)
                return
            self.write_plot_command()
            self.flush_files()
            self.run_gnuplot()
        finally:
            self.cleanup()

    def new_tempfile(self):
        """Open a named UTF-8 text temp file that is deleted when closed."""
        return tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            prefix="opal-card-tool-",
            delete=True,
        )

    def new_csv(self):
        """Return ``(csv writer, file)`` for a new temporary data file."""
        f = self.new_tempfile()
        out = csv.writer(f, dialect=self.gnuplot_dialect)
        return out, f

    def update_xrange(self, ts):
        """Widen the recorded x range so that it includes *ts*."""
        if self.xrange_start is None or ts < self.xrange_start:
            self.xrange_start = ts
        if self.xrange_end is None or ts > self.xrange_end:
            self.xrange_end = ts

    def generate_point(self, transaction):
        """Return ``[x_date, y_time, y_label]`` for *transaction*.

        x is the transaction's date at midnight; y is its time of day moved
        onto the fixed date 2000-01-01 so that all days share one y axis.
        """
        ts = transaction.timestamp
        x_date = ts.strftime("%Y-%m-%dT00:00:00")
        y_time = ts.strftime("2000-01-01T%H:%M:00")
        y_label = ts.strftime("%H:%M")
        return [x_date, y_time, y_label]

    def write_point(self, ts, point):
        """Append *point* to the morning file before noon, else the afternoon file."""
        if ts.time() < datetime.time(12):
            out_csv = self.data_am_csv
        else:
            out_csv = self.data_pm_csv

        out_csv.writerow(point)

    def write_points(self, transaction_list):
        """Write a point for every commuter transaction in *transaction_list*."""
        for transaction in transaction_list:
            if not self.is_commuter_transaction(transaction):
                continue

            self.update_xrange(transaction.timestamp)
            point = self.generate_point(transaction)
            self.write_point(transaction.timestamp, point)

    def is_commuter_transaction(self, transaction):
        """Return True for weekday transactions that are not auto top-ups."""
        if not is_weekday(transaction.timestamp):
            return False
        if transaction.details.startswith("Auto top up"):
            return False
        return True

    def write_plot_command(self):
        """Write the gnuplot script to ``self.plot_file``.

        The x range is padded by 24 hours on both sides. The bounds are
        rendered with TIME_FORMAT because ``str(datetime)`` separates date
        and time with a space, which does not match the declared timefmt.
        """
        one_day = datetime.timedelta(hours=24)
        d = {
            "data_am_filename": self.data_am_file.name,
            "data_pm_filename": self.data_pm_file.name,
            "xrange_start": (self.xrange_start - one_day).strftime(self.TIME_FORMAT),
            "xrange_end": (self.xrange_end + one_day).strftime(self.TIME_FORMAT),
        }
        self.plot_file.write(R"""
set timefmt '%Y-%m-%dT%H:%M:%S'

set xlabel 'Date'
set xdata time
set format x '%a %d'
set xtics 86400 scale 1.0,0.0
set xrange [ '{xrange_start}' : '{xrange_end}' ]

set ylabel 'Time'
set ydata time
set format y '%H:%M'
set yrange [ '2000-01-01T06:00:00' : '2000-01-01T23:00:00' ]

set key box opaque
set terminal qt \
persist \
title 'opal-card-tool graph' \
font 'Sans,10' \
enhanced \
size 1000,700

plot \
'{data_pm_filename}' \
using 1:2 \
with line \
title 'Afternoon departure time' \
, \
'{data_pm_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
, \
'{data_am_filename}' \
using 1:2 \
with line \
title 'Morning departure time' \
, \
'{data_am_filename}' \
using 1:2:3 \
with labels \
offset 0,1 \
notitle \
""".format(**d))

    def flush_files(self):
        """Flush all temp files so gnuplot sees their full content."""
        for f in self.files:
            f.flush()

    def cleanup(self):
        """Close (and thereby delete) all temp files, best effort.

        A failure to close one file is reported on stderr and does not stop
        the remaining files from being closed. Only ``Exception`` is caught,
        so KeyboardInterrupt and SystemExit still propagate.
        """
        for f in self.files:
            try:
                f.close()
            except Exception as e:
                print("Failed to clean up", f.name, e, file=sys.stderr)

    def run_gnuplot(self):
        """Run gnuplot on the plot script; raises CalledProcessError on failure."""
        subprocess.check_call([
            "gnuplot",
            self.plot_file.name,
        ])
331
def restrict_days(transaction_list, num_days):
    """Yield transactions until the first one older than *num_days* days.

    Iteration stops at the first transaction whose timestamp lies before
    midnight *num_days* days ago; later items are not inspected.
    """
    cutoff = n_days_ago(num_days)
    yield from itertools.takewhile(
        lambda transaction: not (transaction.timestamp < cutoff),
        transaction_list,
    )
338
def graph_commuter(transaction_list):
    """Draw the commuter graph for *transaction_list* with a new CommuterGraph."""
    CommuterGraph().graph(transaction_list)
342
def print_transaction_list(transaction_list):
    """Write *transaction_list* to stdout as CSV with a header row.

    The columns are ``number`` and ``timestamp`` followed by the remaining
    attribute names of the first transaction in sorted order. Nothing is
    printed for an empty list, since the columns cannot be determined.
    """
    if not transaction_list:
        return

    leading = ["number", "timestamp"]
    remaining = sorted(
        key for key in transaction_list[0].__dict__ if key not in leading
    )
    headers = leading + remaining

    out = csv.DictWriter(sys.stdout, headers)
    out.writeheader()
    for transaction in transaction_list:
        out.writerow(transaction.__dict__)
352
def print_cards(opal):
    """Print index, number, name and transaction count of every card."""
    for position, card in enumerate(opal.cards):
        print(f"Card {position}")
        print(f" number: {card.number}")
        print(f" name: {card.name}")
        print(f" transactions: {len(card.transaction_list)}")
        print()
360
def try_unpickle():
    """Return the object stored in PICKLE_FILE, or None if there is no such file."""
    # NOTE: pickle.load executes whatever the file encodes; the cache file
    # must only ever be written by this tool.
    if os.path.isfile(PICKLE_FILE):
        with open(PICKLE_FILE, "rb") as cache_file:
            return pickle.load(cache_file)

    return None
367
def save_pickle(opal):
    """Pickle *opal* to PICKLE_FILE, creating CACHE_DIR when needed.

    The pickled object contains the account password in plain text, so the
    directory and file are created readable by the owner only. The modes
    apply on creation; an already existing file is tightened with chmod.
    """
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(CACHE_DIR, mode=0o700, exist_ok=True)

    fd = os.open(PICKLE_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        pickle.dump(opal, f)

    # A file left over from an earlier run keeps its old permissions.
    os.chmod(PICKLE_FILE, 0o600)
373
374
375
def upgrade_opal_v2(opal):
    """Example upgrade step: move a version 2 object to version 3.

    No data needs converting; only the version marker is advanced.
    """
    opal.version = 3
379
def upgrade_opal(opal):
    """Apply upgrade functions until ``opal.version`` reaches VERSION.

    For a stored version N the module-level function ``upgrade_opal_vN`` is
    looked up and called with *opal*.

    Raises:
        FatalError: when no upgrade function exists for the stored version,
            or when an upgrade function does not advance the version (which
            would otherwise loop forever).
    """
    while opal.version < VERSION:
        current = opal.version
        print("Upgrading from version", current, file=sys.stderr)

        upgrade_func = globals().get("upgrade_opal_v%d" % current)
        if upgrade_func is None:
            raise FatalError("No upgrade available from cache version %d" % current)

        upgrade_func(opal)
        if opal.version <= current:
            raise FatalError("Upgrade from cache version %d did not advance the version" % current)
385
386
387
def load_opal():
    """Return the cached Opal object, or create one from prompted credentials.

    A cached object is upgraded and given a fresh session; without a cache
    the username and password are asked for interactively. Either way the
    object is written back to the cache before it is returned.
    """
    opal = try_unpickle()

    if not opal:
        opal = Opal(input("Username: "), getpass.getpass())
    else:
        upgrade_opal(opal)
        opal.init()

    save_pickle(opal)
    return opal
401
def do_load():
    """Handle --load: log in, fetch new data and store it in the cache."""
    account = load_opal()
    account.login()
    account.load()
    save_pickle(account)
407
def do_show_cards():
    """Handle --show-cards: print the cached cards, then re-save the cache."""
    account = load_opal()
    print_cards(account)
    save_pickle(account)
412
def do_print(args):
    """Handle --show-transactions and --graph-commuter for one card.

    The card defaults to index 0. The time window defaults to 14 days for
    the commuter graph and 365 days otherwise, unless --num-days is given.
    """
    opal = load_opal()

    # An omitted (or empty) --card-number selects the first card.
    card_number = opal.resolve_card_number(args.card_number or 0)

    if args.num_days:
        num_days = int(args.num_days)
    else:
        num_days = 14 if args.graph_commuter else 365

    recent = list(restrict_days(
        opal.get_transaction_list_for_card(card_number),
        num_days,
    ))

    if not recent:
        print("No transactions!", file=sys.stderr)
        return

    if args.show_transactions:
        print_transaction_list(recent)
        return

    if args.graph_commuter:
        graph_commuter(recent)
        return

    print("Missing display function!", file=sys.stderr)
442
def parse_args():
    """Parse the command line and return the argparse namespace.

    Exactly one of --load, --show-cards, --show-transactions and
    --graph-commuter must be given. --num-days and --card-number are
    optional and are returned as strings (None when omitted).
    """
    parser = argparse.ArgumentParser(description="Opal card activity fetcher")

    parser.add_argument("--num-days",
        help="restrict to NUM_DAYS of output"
    )
    parser.add_argument("--card-number",
        help="Opal card number or index (eg: 0,1,etc)"
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--load", action="store_true",
        help="load any new data from the Opal website"
    )
    group.add_argument("--show-cards", action="store_true",
        help="show a list of cards"
    )
    group.add_argument("--show-transactions", action="store_true",
        help="show transactions for card"
    )
    group.add_argument("--graph-commuter", action="store_true",
        help="draw commuter graph for card with gnuplot"
    )

    return parser.parse_args()
470
def main():
    """Parse the command line and run the selected action."""
    args = parse_args()

    if args.load:
        do_load()
    elif args.show_cards:
        do_show_cards()
    else:
        # --show-transactions and --graph-commuter share one code path.
        do_print(args)
484
if __name__ == "__main__":
    # Ctrl-C and a closed output pipe end the program with a short message
    # and exit status 1 instead of a traceback.
    try:
        main()
    except (KeyboardInterrupt, BrokenPipeError) as exc:
        print("Exiting:", exc, file=sys.stderr)
        raise SystemExit(1)
491