Skip to content

Instantly share code, notes, and snippets.

@julian-klode
Last active November 19, 2020 15:28
Show Gist options
  • Save julian-klode/bcef26e3e7f47f0b6813c8f43a478234 to your computer and use it in GitHub Desktop.
Save julian-klode/bcef26e3e7f47f0b6813c8f43a478234 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
#
# Copyright (C) 2019 Julian Andres Klode
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import json
import requests
import os
import sys
import time
import uuid
from collections import OrderedDict
from typing import Any, Dict, Iterator
from gi.repository import Secret # type: ignore
item = Secret.Service.get_sync(0).search_sync(
None, {"signon_realm": "https://app.n26.com/"}, 0, None
)[0]
item.load_secret_sync()
user: str = item.get_attributes()["username_value"]
password: str = item.get_secret().get_text()
LOGIN_CACHE = os.path.expanduser("~/.cache/n26-login.json")
with open("/etc/machine-id", "r") as machine_id_file:
MACHINE_ID = machine_id_file.read().strip()
# When syncing, reject cached transactions older than this
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/59.0.3071.86 Safari/537.36"
)
BASIC_HEADERS = {
"Authorization": "Basic bXktdHJ1c3RlZC13ZHBDbGllbnQ6c2VjcmV0",
"User-Agent": USER_AGENT,
"device-token": str(uuid.UUID(hex=MACHINE_ID)),
}
def authorize_mfa_request_token() -> str:
"""Request a new MFA token."""
data = [("username", user), ("password", password), ("grant_type", "password")]
response = requests.post(
"https://api.tech26.de/oauth2/token", data=data, headers=BASIC_HEADERS
)
try:
mfa_token = response.json()["mfaToken"]
assert isinstance(mfa_token, str)
return mfa_token
except KeyError as e:
response.raise_for_status()
raise e
def authorize_mfa_start(mfa_token: str) -> None:
"""Start the multi-factor authentication on the associated device."""
response = requests.post(
"https://api.tech26.de/api/mfa/challenge",
json={"challengeType": "oob", "mfaToken": mfa_token},
headers=BASIC_HEADERS,
)
response.raise_for_status()
def authorize_mfa_end(mfa_token: str) -> requests.Response:
"""Complete the multi-factor authentication.
This tries 6 times, waiting 10 seconds each.
"""
print("Waiting to approve login request", end="", file=sys.stderr, flush=True)
while True:
input()
print(".", end="", file=sys.stderr, flush=True)
response = requests.post(
"https://api.tech26.de/oauth2/token",
data={"grant_type": "mfa_oob", "mfaToken": mfa_token},
headers=BASIC_HEADERS,
)
if response.ok:
break
print(file=sys.stderr)
response.raise_for_status()
return response
def authorize_refresh() -> requests.Response:
"""Re-authorize from refresh_token"""
with open(LOGIN_CACHE, "r") as cachef:
cache = json.load(cachef)
response = requests.post(
"https://api.tech26.de/oauth2/token",
data={"grant_type": "refresh_token", "refresh_token": cache["refresh_token"]},
headers=BASIC_HEADERS,
)
response.raise_for_status()
return response
def authorize() -> str:
"Return the bearer."
try:
response = authorize_refresh()
except Exception as e:
print(e, file=sys.stderr)
mfa_token = authorize_mfa_request_token()
authorize_mfa_start(mfa_token)
response = authorize_mfa_end(mfa_token)
with open(LOGIN_CACHE, "w") as cachef:
result = response.json()
result["expires_at"] = result["expires_in"] + int(time.time())
json.dump(result, cachef)
access_token = response.json()["access_token"]
assert isinstance(access_token, str)
return access_token
def get_transactions(bearer: str) -> Iterator[Dict[str, Any]]:
last_id = ""
new_last_id = ""
while True:
print("Requesting transactions after %s" % last_id, file=sys.stderr)
response = requests.get(
"https://api.tech26.de/api/smrt/transactions?limit=50" + last_id,
headers={"Authorization": "bearer %s" % bearer, "User-Agent": USER_AGENT},
)
response.raise_for_status()
body = response.json()
for transaction in body:
yield transaction
new_last_id = "&lastId=%s" % transaction["id"]
if new_last_id == last_id:
return
last_id = new_last_id
def new_transactions(bearer: str, old: Dict[str, Dict[str, Any]]) -> None:
new_transactions = []
found_overlap = 0
for transaction in get_transactions(bearer):
if transaction["id"] not in old or transaction["type"] == "AA":
found_overlap = 0
elif transaction["id"] in old and transaction["type"] != "AA":
found_overlap += 1
last_overlap = transaction
new_transactions.append(transaction)
# We found 120 completed transactions, let's assume we are done
if found_overlap == 100:
found = False
for k in old:
old_transaction = old[k]
if old_transaction["id"] == last_overlap["id"]:
found = True
elif found:
new_transactions.append(old_transaction)
break
json.dump(new_transactions, fp=sys.stdout, indent=4)
return
with open(".cache/n26.json") as fobj:
old = OrderedDict((t["id"], t) for t in json.load(fobj))
bearer = authorize()
new_transactions(bearer, old)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment