]> code.delx.au - monosys/blob - opal-card-tool
Use '/usr/bin/env python?'
[monosys] / opal-card-tool
1 #!/usr/bin/env python3
2
3 import argparse
4 import csv
5 import datetime
6 import getpass
7 import itertools
8 import lxml.html
9 import os
10 import pickle
11 import requests
12 import subprocess
13 import sys
14 import tempfile
15
16
# Layout version stamped onto every new Opal object; upgrade_opal() runs
# upgrade steps on older pickles until they reach this number.
VERSION = 3

# Per-application cache directory.  XDG_CACHE_HOME names the *base* cache
# directory shared by all applications, so the tool's own sub-directory is
# appended in both the "set" and the "unset/empty" case.  Previously a set
# XDG_CACHE_HOME was used as-is, which placed the pickle directly in the
# shared cache root.
# NOTE(review): users who had XDG_CACHE_HOME set will find their old cache at
# $XDG_CACHE_HOME/pickle; it is not migrated automatically.
CACHE_DIR = os.path.join(
    os.environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache"),
    "opal-card-tool",
)
PICKLE_FILE = os.path.join(CACHE_DIR, "pickle")

# Endpoints on the Opal website used by the Opal class.
OPAL_BASE = "https://www.opal.com.au"
LOGIN_URL = OPAL_BASE + "/login/registeredUserUsernameAndPasswordLogin"
CARD_DETAILS_URL = OPAL_BASE + "/registered/getJsonCardDetailsArray"
# Takes two %d values: the card index and the (1-based) page index.
TRANSACTION_LIST_URL = OPAL_BASE + "/registered/opal-card-transactions/opal-card-activities-list?AMonth=-1&AYear=-1&cardIndex=%d&pageIndex=%d"
26
27
28
def stringify(el):
    """Return the text inside *el* as a single trimmed string.

    Every fragment produced by ``el.itertext()`` is stripped on its own, the
    fragments are joined with one space each, and the joined result is
    stripped once more.
    """
    fragments = [fragment.strip() for fragment in el.itertext()]
    joined = " ".join(fragments)
    return joined.strip()
31
def get_first(l):
    """Return the first item produced by iterating *l*, or None if it is empty."""
    return next(iter(l), None)
35
def is_weekday(d):
    """Return True when *d* falls on Monday to Friday.

    ``isoweekday()`` numbers Monday as 1 and Sunday as 7, so the weekend
    starts at 6.
    """
    first_weekend_day = 6
    return d.isoweekday() < first_weekend_day
38
def n_days_ago(days):
    """Return local midnight of the day that was *days* days before now."""
    offset = datetime.timedelta(days=days)
    moment = datetime.datetime.now() - offset
    # Drop the time-of-day part so the result sits exactly on midnight.
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)
43
44
class FatalError(Exception):
    """Exception type declared by this script; it adds nothing to Exception."""
47
class Transaction(object):
    """Plain attribute container; fields are assigned by whoever creates it."""
50
class Card(object):
    """One card and the transactions collected for it.

    New batches are placed in front of the existing ``transaction_list`` and
    the first element is taken to carry the highest transaction number.
    """

    def __init__(self):
        self.transaction_list = []

    def get_max_transaction(self):
        """Return the ``number`` of the first stored transaction, or -1 if none."""
        if not self.transaction_list:
            return -1
        head = self.transaction_list[0]
        return head.number

    def add_transactions(self, l):
        """Put the transactions in list *l* in front of those already stored."""
        combined = l + self.transaction_list
        self.transaction_list = combined
63
class Opal(object):
    """HTTP session with the Opal website plus the card data fetched from it.

    Attributes:
        version: value of VERSION at construction time.
        username, password: credentials posted by login().
        session: requests.Session used for every HTTP request.
        cards: list of Card objects maintained by load_cards().
    """

    def __init__(self, username, password):
        self.version = VERSION
        self.username = username
        self.password = password

        self.session = requests.Session()
        self.cards = []

    def login(self):
        """Post the stored credentials to LOGIN_URL.

        Raises:
            Exception: on a non-OK HTTP status, or when the JSON reply has a
                non-empty "errorMessage".
        """
        r = self.session.post(LOGIN_URL, {
            "h_username": self.username,
            "h_password": self.password,
        })
        if not r.ok:
            raise Exception("Failed to login, error code: %d" % r.status_code)

        reply = r.json()
        if reply["errorMessage"]:
            raise Exception("Failed to login: %s" % reply["errorMessage"])

    def load(self):
        """Refresh the card list, then fetch new transactions for every card."""
        self.load_cards()
        for card in self.cards:
            self.load_transactions(card)

    def resolve_card_number(self, card_number):
        """Translate a card index into a card number.

        If *card_number* parses as an integer that is a valid index into
        ``self.cards``, the number of that card is returned.  Anything else
        (a full card number, a non-numeric string, an out-of-range or
        negative index) is returned unchanged.
        """
        try:
            index = int(card_number)
        except (TypeError, ValueError):
            # Not an index at all; let the caller look it up as a card number.
            return card_number

        if 0 <= index < len(self.cards):
            return self.cards[index].number
        return card_number

    def get_transaction_list_for_card(self, card_number):
        """Return the transaction list of the card with *card_number*, or []."""
        for card in self.cards:
            if card.number == card_number:
                return card.transaction_list

        return []

    def load_cards(self):
        """Fetch CARD_DETAILS_URL and create or update the entries in ``self.cards``.

        Cards already known (matched on card number) are updated in place so
        their stored transactions are kept.

        Raises:
            Exception: on a non-OK HTTP status.
        """
        r = self.session.get(CARD_DETAILS_URL)
        if not r.ok:
            # The message used to say "Failed to login", copied from login().
            raise Exception("Failed to load cards, error code: %d" % r.status_code)

        for index, card_json in enumerate(r.json()):
            card_number = card_json["cardNumber"]

            for card in self.cards:
                if card.number == card_number:
                    break
            else:
                # No existing card matched: start a new one.
                card = Card()
                self.cards.append(card)

            card.number = card_number
            card.name = card_json["cardNickName"]
            card.index = index

    def load_transactions(self, card):
        """Fetch transactions newer than those already stored on *card*.

        Pages are requested starting at 1.  Paging stops at the first
        transaction whose number is not greater than the card's current
        maximum, or at the first page that yields nothing.  Progress is
        printed to stdout.
        """
        print("Loading transactions for", card.number, "", end="", flush=True)
        max_transaction = card.get_max_transaction()
        transaction_list = []

        for page in itertools.count(1):
            print(".", end="", flush=True)
            transaction_page = self.fetch_transaction_page(card.index, page)
            continue_paging = False

            for transaction in transaction_page:
                if transaction.number <= max_transaction:
                    # Reached data we already have; nothing older is needed.
                    continue_paging = False
                    break

                transaction_list.append(transaction)
                continue_paging = True

            if not continue_paging:
                break

        print(" done")
        card.add_transactions(transaction_list)

    def parse_transaction(self, cells):
        """Build a Transaction from *cells*, a dict of header text -> table cell.

        Raises whatever the conversions raise (e.g. ValueError from int() or
        strptime(), KeyError for a missing column).
        """
        t = Transaction()
        t.number = int(stringify(cells["transaction number"]))
        t.timestamp = datetime.datetime.strptime(stringify(cells["date/time"]), "%a %d/%m/%Y %H:%M")
        t.mode = get_first(cells["mode"].xpath("img/@alt"))
        t.details = stringify(cells["details"])
        t.journey_number = stringify(cells["journey number"])
        t.fare_applied = stringify(cells["fare applied"])
        t.fare = stringify(cells["fare"])
        t.fare_discount = stringify(cells["discount"])
        t.amount = stringify(cells["amount"])
        return t

    def fetch_transaction_page(self, card, page):
        """Yield the Transactions found on one page of a card's activity list.

        Args:
            card: card index substituted into TRANSACTION_LIST_URL.
            page: page index substituted into TRANSACTION_LIST_URL.

        Raises:
            Exception: on a non-OK HTTP status; parse errors are reported on
                stderr and re-raised.
        """
        url = TRANSACTION_LIST_URL % (card, page)
        r = self.session.get(url)
        if not r.ok:
            raise Exception("Failed to fetch transactions, error code: %d" % r.status_code)

        doc = lxml.html.fromstring(r.text)
        headers = [stringify(th).lower() for th in doc.xpath("//table/thead//th")]

        if not headers:
            # This is a generator, so a bare return simply ends the iteration.
            return

        for tr in doc.xpath("//table/tbody/tr"):
            try:
                # list(tr) gives the row's child elements (replaces the
                # deprecated getchildren()).
                yield self.parse_transaction(dict(zip(headers, list(tr))))
            except Exception:
                print("Failed to parse:", headers, lxml.html.tostring(tr), file=sys.stderr)
                raise
174 except Exception:
175 print("Failed to parse:", headers, lxml.html.tostring(tr), file=sys.stderr)
176 raise
177
178
class CommuterGraph(object):
    """Plot weekday travel times with gnuplot.

    Morning and afternoon points are written to two temporary data files, a
    gnuplot script referencing them is written to a third, and gnuplot is
    run on that script.  All three files are closed by cleanup().
    """

    class gnuplot_dialect(csv.excel):
        # Data columns are separated by a single space.
        delimiter = " "

    def __init__(self):
        self.data_am_csv, self.data_am_file = self.new_csv()
        self.data_pm_csv, self.data_pm_file = self.new_csv()
        self.plot_file = self.new_tempfile()
        self.files = [self.data_am_file, self.data_pm_file, self.plot_file]

        # Earliest and latest timestamp seen so far; None until a point is added.
        self.xrange_start = None
        self.xrange_end = None

    def is_plottable(self):
        """Return True once at least one point has set both range bounds."""
        return self.xrange_start is not None and self.xrange_end is not None

    def graph(self, transaction_list):
        """Write data and script for *transaction_list* and run gnuplot.

        Prints "No transactions!" to stderr and returns when nothing in the
        list qualifies.  The temporary files are always closed afterwards.
        """
        try:
            self.write_points(transaction_list)
            if not self.is_plottable():
                print("No transactions!", file=sys.stderr)
                return
            self.write_plot_command()
            self.flush_files()
            self.run_gnuplot()
        finally:
            self.cleanup()

    def new_tempfile(self):
        """Open a UTF-8 text temporary file that is deleted when closed."""
        return tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            prefix="opal-card-tool-",
            delete=True,
        )

    def new_csv(self):
        """Return ``(csv_writer, file)`` for a fresh temporary data file."""
        f = self.new_tempfile()
        out = csv.writer(f, dialect=self.gnuplot_dialect)
        return out, f

    def update_xrange(self, ts):
        """Widen the stored range so that it includes timestamp *ts*."""
        if self.xrange_start is None or ts < self.xrange_start:
            self.xrange_start = ts
        if self.xrange_end is None or ts > self.xrange_end:
            self.xrange_end = ts

    def generate_point(self, transaction):
        """Return ``[x_date, y_time, y_label]`` for *transaction*.

        x is the transaction's date at midnight, y is its time of day placed
        on the fixed date 2000-01-01, and the label is ``HH:MM``.
        """
        ts = transaction.timestamp
        x_date = ts.strftime("%Y-%m-%dT00:00:00")
        y_time = ts.strftime("2000-01-01T%H:%M:00")
        y_label = ts.strftime("%H:%M")
        return [x_date, y_time, y_label]

    def write_point(self, ts, point):
        """Append *point* to the morning file if *ts* is before 12:00, else the afternoon file."""
        if ts.time() < datetime.time(12):
            out_csv = self.data_am_csv
        else:
            out_csv = self.data_pm_csv

        out_csv.writerow(point)

    def write_points(self, transaction_list):
        """Write a point for every commuter transaction in *transaction_list*."""
        for transaction in transaction_list:
            if not self.is_commuter_transaction(transaction):
                continue

            self.update_xrange(transaction.timestamp)
            point = self.generate_point(transaction)
            self.write_point(transaction.timestamp, point)

    def is_commuter_transaction(self, transaction):
        """Return False for weekend transactions and "Auto top up" entries."""
        if not is_weekday(transaction.timestamp):
            return False
        if transaction.details.startswith("Auto top up"):
            return False
        return True

    def write_plot_command(self):
        """Write the gnuplot script to ``self.plot_file``.

        Requires is_plottable() to be true.  The x range runs from midnight
        one day before the first point to midnight one day after the last.
        """
        # Render the bounds explicitly in the script's timefmt.  Interpolating
        # the datetime objects directly produced str() output with a space
        # between date and time, which does not match the 'T' in timefmt.
        # Midnight is used because the x values from generate_point() are
        # dates at midnight as well.
        range_format = "%Y-%m-%dT00:00:00"
        one_day = datetime.timedelta(hours=24)
        d = {
            "data_am_filename": self.data_am_file.name,
            "data_pm_filename": self.data_pm_file.name,
            "xrange_start": (self.xrange_start - one_day).strftime(range_format),
            "xrange_end": (self.xrange_end + one_day).strftime(range_format),
        }
        self.plot_file.write(R"""
set timefmt '%Y-%m-%dT%H:%M:%S'

set xlabel 'Date'
set xdata time
set format x '%a %d'
set xtics 86400 scale 1.0,0.0
set xrange [ '{xrange_start}' : '{xrange_end}' ]

set ylabel 'Time'
set ydata time
set format y '%H:%M'
set yrange [ '2000-01-01T06:00:00' : '2000-01-01T23:00:00' ]

set key box opaque
set terminal qt \
    persist \
    title 'opal-card-tool graph' \
    font 'Sans,10' \
    enhanced \
    size 1000,700

plot \
    '{data_pm_filename}' \
    using 1:2 \
    with line \
    title 'Afternoon departure time' \
    , \
    '{data_pm_filename}' \
    using 1:2:3 \
    with labels \
    offset 0,1 \
    notitle \
    , \
    '{data_am_filename}' \
    using 1:2 \
    with line \
    title 'Morning departure time' \
    , \
    '{data_am_filename}' \
    using 1:2:3 \
    with labels \
    offset 0,1 \
    notitle \
""".format(**d))

    def flush_files(self):
        """Flush all temporary files so gnuplot sees their full contents."""
        for f in self.files:
            f.flush()

    def cleanup(self):
        """Close every temporary file, ignoring errors from individual closes."""
        for f in self.files:
            try:
                f.close()
            except Exception:
                # Best effort: one failing close must not stop the others.
                # Unlike a bare except this lets KeyboardInterrupt and
                # SystemExit propagate.
                pass

    def run_gnuplot(self):
        """Run ``gnuplot`` on the script file; raises CalledProcessError on failure."""
        subprocess.check_call([
            "gnuplot",
            self.plot_file.name,
        ])
326
def restrict_days(transaction_list, num_days):
    """Yield the leading transactions that are not older than *num_days* days.

    Iteration ends at the first transaction whose timestamp lies before
    midnight *num_days* days ago; later entries are not examined.
    """
    cutoff = n_days_ago(num_days)

    def recent_enough(transaction):
        return not transaction.timestamp < cutoff

    yield from itertools.takewhile(recent_enough, transaction_list)
333
def graph_commuter(transaction_list):
    """Draw the commuter graph for *transaction_list* with a new CommuterGraph."""
    CommuterGraph().graph(transaction_list)
337
def print_transaction_list(transaction_list):
    """Write *transaction_list* to stdout as CSV with a header row.

    The columns are "number" and "timestamp" followed by the remaining
    attribute names of the first transaction in sorted order.  Nothing is
    written for an empty list (previously this raised IndexError).

    Raises:
        ValueError: from csv.DictWriter when a later transaction carries an
            attribute the first one does not have.
    """
    if not transaction_list:
        return

    headers = ["number", "timestamp"]
    headers.extend(h for h in sorted(vars(transaction_list[0])) if h not in headers)

    out = csv.DictWriter(sys.stdout, headers)
    out.writeheader()
    for transaction in transaction_list:
        out.writerow(vars(transaction))
347
def print_cards(opal):
    """Print index, number, name and transaction count of every card in *opal*."""
    for position, card in enumerate(opal.cards):
        summary = [
            "Card %s" % (position,),
            " number: %s" % (card.number,),
            " name: %s" % (card.name,),
            " transactions: %s" % (len(card.transaction_list),),
        ]
        for line in summary:
            print(line)
        # Blank line between cards.
        print()
355
def try_unpickle():
    """Return the object stored in PICKLE_FILE, or None when no such file exists."""
    if os.path.isfile(PICKLE_FILE):
        # NOTE(review): unpickling runs code chosen by whoever wrote the file,
        # so the cache file must only be writable by the user.
        with open(PICKLE_FILE, "rb") as cache:
            return pickle.load(cache)

    return None
362
def save_pickle(opal):
    """Pickle *opal* into PICKLE_FILE, creating CACHE_DIR if necessary.

    The pickled object includes the account password in clear text, so the
    file is created with (and tightened to) mode 0o600 instead of the
    process default.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    # exist_ok avoids the race between an isdir() check and makedirs().
    os.makedirs(CACHE_DIR, exist_ok=True)

    # O_BINARY only exists on Windows, where it prevents newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(PICKLE_FILE, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        # The mode given to os.open() only applies to newly created files;
        # restrict a pre-existing file before any data is written to it.
        os.chmod(PICKLE_FILE, 0o600)
        pickle.dump(opal, f)
368
369
370
def upgrade_opal_v2(opal):
    """Example upgrade step: mark a version 2 object as version 3."""
    upgraded_version = 3
    opal.version = upgraded_version
374
def upgrade_opal(opal):
    """Run upgrade steps on *opal* until its version is no longer below VERSION.

    The step for version N is the module-level function ``upgrade_opal_vN``;
    each step is expected to raise ``opal.version`` itself.  A missing step
    raises KeyError.
    """
    while opal.version < VERSION:
        from_version = opal.version
        print("Upgrading from version", from_version, file=sys.stderr)
        step_name = "upgrade_opal_v%d" % from_version
        step = globals()[step_name]
        step(opal)
380
381
382
def load_opal():
    """Return the cached Opal object, or a new one built from prompted credentials.

    A cached object is upgraded first; when there is none the user is asked
    for a username and password.  Either way the object is saved before it
    is returned.
    """
    opal = try_unpickle()

    if not opal:
        username = input("Username: ")
        password = getpass.getpass()
        opal = Opal(username, password)
    else:
        upgrade_opal(opal)

    save_pickle(opal)
    return opal
395
def do_load():
    """Handle --load: log in, fetch cards and transactions, then save the result."""
    account = load_opal()
    account.login()
    account.load()
    save_pickle(account)
401
def do_show_cards():
    """Handle --show-cards: print the cached cards, then save the object again."""
    account = load_opal()
    print_cards(account)
    save_pickle(account)
406
def do_print(args):
    """Handle --show-transactions and --graph-commuter for one card.

    The card defaults to index 0.  The look-back window is --num-days when
    given, otherwise 14 days for the graph and 365 days for the listing.
    """
    opal = load_opal()

    requested_card = args.card_number if args.card_number else 0
    card_number = opal.resolve_card_number(requested_card)

    if args.num_days:
        num_days = int(args.num_days)
    else:
        num_days = 14 if args.graph_commuter else 365

    all_transactions = opal.get_transaction_list_for_card(card_number)
    transaction_list = list(restrict_days(all_transactions, num_days))

    if not transaction_list:
        print("No transactions!", file=sys.stderr)
        return

    if args.show_transactions:
        print_transaction_list(transaction_list)
        return

    if args.graph_commuter:
        graph_commuter(transaction_list)
        return

    print("Missing display function!", file=sys.stderr)
436
def parse_args(argv=None):
    """Parse the command line and return the argparse namespace.

    Args:
        argv: list of argument strings to parse; None (the default) makes
            argparse read ``sys.argv[1:]``, as before.

    Exactly one of --load, --show-cards, --show-transactions and
    --graph-commuter is required.  --num-days and --card-number are returned
    as strings (or None when absent).
    """
    parser = argparse.ArgumentParser(description="Opal card activity fetcher")

    parser.add_argument("--num-days",
        help="restrict to NUM_DAYS of output"
    )
    parser.add_argument("--card-number",
        help="Opal card number or index (eg: 0,1,etc)"
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--load", action="store_true",
        help="load any new data from the Opal website"
    )
    group.add_argument("--show-cards", action="store_true",
        help="show a list of cards"
    )
    group.add_argument("--show-transactions", action="store_true",
        help="show transactions for card"
    )
    group.add_argument("--graph-commuter", action="store_true",
        help="draw commuter graph for card with gnuplot"
    )

    return parser.parse_args(argv)
464
def main():
    """Parse the command line and dispatch to the selected action."""
    args = parse_args()

    if args.load:
        do_load()
    elif args.show_cards:
        do_show_cards()
    else:
        # --show-transactions and --graph-commuter are both handled here.
        do_print(args)
478
# Script entry point: Ctrl-C and a closed output pipe end the program with
# exit status 1 and a short message instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except (KeyboardInterrupt, BrokenPipeError) as exc:
        print("Exiting:", exc, file=sys.stderr)
        raise SystemExit(1)
485