Created
November 5, 2019 20:00
-
-
Save julian-klode/be46a2e9964f2ad8878c68c1e6ce04d7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import json | |
import requests | |
import os | |
import sys | |
import time | |
from collections import OrderedDict | |
from typing import Any, Dict, Iterator | |
user = "FIXME" | |
password = "FIXME" | |
LOGIN_CACHE = os.path.expanduser("~/.cache/n26-login.json") | |
# When syncing, reject cached transactions older than this | |
USER_AGENT = ( | |
"Mozilla/5.0 (X11; Linux x86_64) " | |
"AppleWebKit/537.36 (KHTML, like Gecko) " | |
"Chrome/59.0.3071.86 Safari/537.36" | |
) | |
BASIC_HEADERS = { | |
"Authorization": "Basic bXktdHJ1c3RlZC13ZHBDbGllbnQ6c2VjcmV0", | |
"User-Agent": USER_AGENT, | |
} | |
def authorize_mfa_request_token() -> str: | |
"""Request a new MFA token.""" | |
data = [("username", user), ("password", password), ("grant_type", "password")] | |
response = requests.post( | |
"https://api.tech26.de/oauth/token", data=data, headers=BASIC_HEADERS | |
) | |
try: | |
mfa_token = response.json()["mfaToken"] | |
assert isinstance(mfa_token, str) | |
return mfa_token | |
except KeyError as e: | |
response.raise_for_status() | |
raise e | |
def authorize_mfa_start(mfa_token: str) -> None: | |
"""Start the multi-factor authentication on the associated device.""" | |
response = requests.post( | |
"https://api.tech26.de/api/mfa/challenge", | |
json={"challengeType": "oob", "mfaToken": mfa_token}, | |
headers=BASIC_HEADERS, | |
) | |
response.raise_for_status() | |
def authorize_mfa_end(mfa_token: str) -> requests.Response: | |
"""Complete the multi-factor authentication. | |
This tries 6 times, waiting 10 seconds each. | |
""" | |
print("Waiting to approve login request", end="", file=sys.stderr, flush=True) | |
for _ in range(6): | |
time.sleep(10) | |
print(".", end="", file=sys.stderr, flush=True) | |
response = requests.post( | |
"https://api.tech26.de/oauth/token", | |
data={"grant_type": "mfa_oob", "mfaToken": mfa_token}, | |
headers=BASIC_HEADERS, | |
) | |
if response.ok: | |
break | |
print(file=sys.stderr) | |
response.raise_for_status() | |
return response | |
def authorize_refresh() -> requests.Response: | |
"""Re-authorize from refresh_token""" | |
with open(LOGIN_CACHE, "r") as cachef: | |
cache = json.load(cachef) | |
response = requests.post( | |
"https://api.tech26.de/oauth/token", | |
data={"grant_type": "refresh_token", "refresh_token": cache["refresh_token"]}, | |
headers=BASIC_HEADERS, | |
) | |
response.raise_for_status() | |
return response | |
def authorize() -> str: | |
"Return the bearer." | |
try: | |
response = authorize_refresh() | |
except Exception as e: | |
print(e, file=sys.stderr) | |
mfa_token = authorize_mfa_request_token() | |
authorize_mfa_start(mfa_token) | |
response = authorize_mfa_end(mfa_token) | |
with open(LOGIN_CACHE, "w") as cachef: | |
result = response.json() | |
result["expires_at"] = result["expires_in"] + int(time.time()) | |
json.dump(result, cachef) | |
access_token = response.json()["access_token"] | |
assert isinstance(access_token, str) | |
return access_token | |
def get_transactions(bearer: str) -> Iterator[Dict[str, Any]]: | |
last_id = "" | |
new_last_id = "" | |
while True: | |
print("Requesting transactions after %s" % last_id, file=sys.stderr) | |
response = requests.get( | |
"https://api.tech26.de/api/smrt/transactions?limit=50" + last_id, | |
headers={"Authorization": "bearer %s" % bearer, "User-Agent": USER_AGENT}, | |
) | |
response.raise_for_status() | |
body = response.json() | |
for transaction in body: | |
yield transaction | |
new_last_id = "&lastId=%s" % transaction["id"] | |
if new_last_id == last_id: | |
return | |
last_id = new_last_id | |
def new_transactions(bearer: str, old: Dict[str, Dict[str, Any]]) -> None: | |
new_transactions = [] | |
found_overlap = 0 | |
for transaction in get_transactions(bearer): | |
if transaction["id"] not in old or transaction["type"] == "AA": | |
found_overlap = 0 | |
elif transaction["id"] in old and transaction["type"] != "AA": | |
found_overlap += 1 | |
last_overlap = transaction | |
new_transactions.append(transaction) | |
# We found 120 completed transactions, let's assume we are done | |
if found_overlap == 100: | |
found = False | |
for k in old: | |
old_transaction = old[k] | |
if old_transaction["id"] == last_overlap["id"]: | |
found = True | |
elif found: | |
new_transactions.append(old_transaction) | |
break | |
json.dump(new_transactions, fp=sys.stdout, indent=4) | |
return | |
with open("n26.json") as fobj: | |
old = OrderedDict((t["id"], t) for t in json.load(fobj)) | |
bearer = authorize() | |
new_transactions(bearer, old) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Beware that the sync algorithm is not entirely accurate, it may occassionally produce wrong results, and you need to resync or increase the overlap value.