Created
February 13, 2021 21:06
-
-
Save amb/c1ade483248eb090992cc69241aa66db to your computer and use it in GitHub Desktop.
Simple chain analysis with Python and Blockcypher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from blockcypher import get_address_details, get_transaction_details | |
import json | |
import datetime | |
def open_chain_info(addr, func):
    """Return API data for ``addr``, caching it locally as JSON.

    The cache file is ``store/<addr>.json``. If it exists and parses, its
    contents are returned without calling ``func``. If it is missing or holds
    invalid JSON, ``func(addr)`` is called and its result is written to the
    cache file (overwriting any faulty one).

    Args:
        addr: Key used both as the cache file name and as the single argument
            passed to ``func``.
        func: Callable invoked as ``func(addr)`` on a cache miss.

    Returns:
        The parsed cache file on a hit, otherwise the object returned by
        ``func(addr)``. On a miss the fresh object is returned as-is, so any
        ``datetime`` values in it are still ``datetime`` objects, whereas a
        later cache hit yields them as strings.
    """
    import os  # local import so the file's import header stays untouched

    def _datetime_conv(o):
        # Fallback serializer for json.dump: datetimes are stored as their
        # str() form; any other unsupported object is stored as null.
        if isinstance(o, datetime.datetime):
            return o.__str__()

    filepath = "store/" + addr + ".json"
    try:
        # The context manager closes the handle on the faulty-JSON path too.
        with open(filepath, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        print("File not found, creating:", filepath)
    except json.decoder.JSONDecodeError:
        print("Faulty JSON, reloading...")

    # Fetch before opening the file for writing, so a failing fetch does not
    # leave an empty (and therefore corrupt) cache file behind.
    contents = func(addr)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, "w") as f:
        json.dump(contents, f, default=_datetime_conv)
    return contents
class BitcoinAddressInputs:
    """Inputs recorded for one destination address.

    Attributes (see ``__slots__``):
        address: The destination address these inputs point at.
        transactions: List of ``(input_address, value)`` tuples.
        input_addresses: Set of every input address added so far.
        total_received: Target total used by ``normalize_inputs``
            (defaults to 1.0).
    """

    __slots__ = "transactions", "input_addresses", "total_received", "address", "all_addresses"

    def __init__(self, address):
        self.address = address
        self.transactions = []
        self.input_addresses = set()
        self.total_received = 1.0

    def pretty_transactions(self):
        """Return one ``source => destination value`` line per transaction."""
        return "".join(
            f"{source} \t=> {self.address} \t{value}\n"
            for source, value in self.transactions
        )

    def list_transactions(self):
        """Return the transactions as ``(input, output, amount)`` tuples."""
        return [(source, self.address, value) for source, value in self.transactions]

    def add_input(self, a, val):
        """Record that address ``a`` contributed ``val`` to this address."""
        self.input_addresses.add(a)
        self.transactions.append((a, val))

    def normalize_inputs(self):
        """Rescale all input values so that they sum to ``total_received``.

        Each value is multiplied by ``total_received / sum(values)``. When the
        values sum to zero there is nothing to scale by, so the transactions
        are left unchanged instead of raising ``ZeroDivisionError``.
        """
        current_sum = sum(value for _, value in self.transactions)
        if not current_sum:
            return
        target = self.total_received
        self.transactions = [
            (source, value * target / current_sum)
            for source, value in self.transactions
        ]

    def get_all_addresses(self):
        """Return a new set holding this address plus all input addresses."""
        return {self.address} | self.input_addresses
def address_all_inputs(walk_address, max_total=2.0, filtered=frozenset()):
    """Read all inputs that funded a given Bitcoin address.

    Address and transaction details are fetched through ``open_chain_info``
    (and therefore cached on disk).

    Args:
        walk_address: The address whose incoming transactions are inspected.
        max_total: If the address's ``total_received`` (converted by dividing
            by 100000000) exceeds this value, the address is skipped.
        filtered: Collection of input addresses to ignore.

    Returns:
        A ``BitcoinAddressInputs`` with normalized inputs, or ``None`` when
        the address received more than ``max_total``.
    """
    satoshi_per_btc = 100000000

    adi = BitcoinAddressInputs(walk_address)
    res = open_chain_info(walk_address, get_address_details)
    total_btc = res["total_received"] / satoshi_per_btc
    if total_btc > max_total:
        return None
    adi.total_received = total_btc

    # NOTE(review): "txrefs" is treated as optional here on the assumption
    # that the API can omit it for addresses without confirmed transactions
    # -- confirm against the Blockcypher address endpoint documentation.
    for txref in res.get("txrefs") or []:
        deets = open_chain_info(txref["tx_hash"], get_transaction_details)

        # Outputs of this transaction that pay walk_address.
        atest = [
            out
            for out in deets["outputs"]
            if walk_address in (out.get("addresses") or ())
        ]
        assert len(atest) < 2
        if not atest:
            continue

        # Every input is credited with the full output value; the values are
        # rescaled to total_received by normalize_inputs() below.
        received = atest[0]["value"] / satoshi_per_btc
        for tx_input in deets["inputs"]:
            addresses = tx_input.get("addresses")
            if not addresses or addresses[0] in filtered:
                # Input without addresses, or address in the filter list.
                continue
            assert len(addresses) == 1
            adi.add_input(addresses[0], received)

    adi.normalize_inputs()
    return adi
# Addresses that must never be followed or plotted.
filter_these = {""}

# Number of backtracking steps from the start address (14 max).
backtrack_steps = 5

# Address the analysis starts from.
addr = ""

# Input transactions of the start address.
btc_i = address_all_inputs(addr, filtered=filter_these)
if btc_i is None:
    # address_all_inputs returns None when the address exceeds max_total.
    raise SystemExit("Start address received more than the allowed maximum")

all_addr = btc_i.get_all_addresses()
# Every discovered link as an (input, output, amount) tuple.
links = list(btc_i.list_transactions())

already_scanned = {addr}
inputs_loop = btc_i.input_addresses
for _ in range(backtrack_steps):
    new_inputs = set()
    for candidate in inputs_loop:
        if candidate in already_scanned:
            continue
        already_scanned.add(candidate)
        new_i = address_all_inputs(candidate, filtered=filter_these)
        if new_i is None:
            # hit max received
            continue
        all_addr |= new_i.get_all_addresses()
        new_inputs |= new_i.input_addresses
        # Keep every transaction of the address, not just the first one; an
        # address without recorded inputs simply contributes no links.
        links.extend(new_i.list_transactions())
    inputs_loop = new_inputs

# Format data for plotting: addresses become node indices.
indices = {address: position for position, address in enumerate(all_addr)}
plabel = list(indices)
psource = [indices[source] for source, _, _ in links]
pdest = [indices[dest] for _, dest, _ in links]
pvalue = [amount for _, _, amount in links]
def transform_a(tx_inputs, value):
    """Map the module-level start address to its inputs, each paired with ``value``.

    Args:
        tx_inputs: Input addresses of the start address ``addr``.
        value: Amount attached to every input.

    Returns:
        A dict ``{addr: [(input_address, value), ...]}``.
    """
    # NOTE: the original contained a ``for _ in range(0)`` loop meant to walk
    # further back through the inputs. It never executed, and its body
    # unpacked ``address_all_inputs(i)`` as a 2-tuple, which does not match
    # that function's return value, so the unreachable code was removed.
    return {addr: [(source, value) for source in tx_inputs]}
def plot_transactions(labels, psource, pdest, pvalue, filename="html/transactions.html"):
    """Render the transaction links as a vertical Sankey diagram in an HTML file.

    Args:
        labels: Node labels; link endpoints refer to positions in this list.
        psource: Source node index of each link.
        pdest: Target node index of each link.
        pvalue: Value (link width) of each link.
        filename: Output HTML path; defaults to the original fixed location.
    """
    import os  # local import so the file's import header stays untouched

    import plotly.graph_objects as go
    from plotly.offline import plot as offline_plot

    sankey = go.Sankey(
        orientation="v",
        node=dict(pad=15, thickness=20, label=labels),
        link=dict(source=psource, target=pdest, value=pvalue),
    )
    fig = go.Figure(data=[sankey])
    fig.update_layout(title_text="Blockchain transactions", font_size=10)

    # Make sure the output directory exists before the file is written.
    out_dir = os.path.dirname(filename)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    offline_plot(fig, filename=filename)
def plot_input_graph(g_inputs):
    """Plot a ``{destination: [(source, value), ...]}`` mapping as a Sankey diagram.

    Every address is assigned one node index in first-seen order, the links
    are translated to index lists, and the result is handed to
    ``plot_transactions``.

    Args:
        g_inputs: Dict mapping each destination address to a list of
            ``(source_address, value)`` pairs.
    """
    indexing = {}
    labels = []

    def _register(address):
        # Assign each address exactly one node. The original re-indexed a
        # destination that had already been seen as a source, which left a
        # duplicate, unconnected node label in the plot.
        if address not in indexing:
            indexing[address] = len(labels)
            labels.append(address)

    # Index all discovered addresses as numbers.
    for dest, sources in g_inputs.items():
        _register(dest)
        for source, _value in sources:
            _register(source)

    # Create the parallel lists plotly expects.
    psource, pdest, pvalue = [], [], []
    for dest, sources in g_inputs.items():
        for source, value in sources:
            psource.append(indexing[source])
            pdest.append(indexing[dest])
            pvalue.append(value)

    plot_transactions(labels, psource, pdest, pvalue)
# Draw the Sankey diagram from the node labels and link lists built above.
plot_transactions(labels=plabel, psource=psource, pdest=pdest, pvalue=pvalue)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment