Last active
May 29, 2024 18:33
-
-
Save vrbadev/8f808f86b6789ad6bab2582d44a168bc to your computer and use it in GitHub Desktop.
Simple Python script which gathers all available Bitstamp OHLC data for selected trading pair and interval.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Created on Sat Sep 23 14:04:42 2023
@author: Vojtech Vrba ([email protected])
Simple Python script which gathers all available Bitstamp OHLC data for a selected trading pair and interval.
The data is stored in a new CSV file; if the CSV file already exists, only new data is appended to its end.
"""
import datetime as dt | |
import os | |
import requests | |
import signal | |
import time | |
import tqdm | |
# Configuration
# Market symbol; it is inserted into the request URL and into the CSV file name.
CURRENCY_PAIR = "btceur"
# Number of OHLC rows asked for per download request (sent as "limit").
MAX_OHLC_ROWS_PER_REQUEST = 1000
# Candle width in seconds (sent as "step"); also the stride between saved rows.
OHLC_INTERVAL_SECONDS = 60
# Pause between two consecutive download requests, in seconds.
REQUEST_DELAY_SECONDS = 0.1
# Output file; the name encodes the trading pair and the interval.
CSV_FILEPATH = "./bitstamp_ohlc_pair-{}_int-{:d}.csv".format(
    CURRENCY_PAIR, int(OHLC_INTERVAL_SECONDS)
)
# CSV column order together with the converter applied to every value.
CSV_COLUMNS_TYPES = {
    "timestamp": int,
    "open": float,
    "high": float,
    "low": float,
    "close": float,
    "volume": float,
}
# An efficient way to get only the last line from a large CSV file
def get_csv_tail(filepath, encoding="UTF-8", chunk_size=4096):
    """Return the header line and the last non-blank data line of a CSV file.

    The file is read backwards from its end in blocks of ``chunk_size`` bytes,
    so only the tail of a large file is loaded.

    Args:
        filepath: Path of the CSV file to inspect.
        encoding: Text encoding used to decode the two returned lines.
        chunk_size: Number of bytes fetched per backwards read (must be >= 1).

    Returns:
        A ``(header_line, last_line)`` tuple of stripped strings.
        ``(None, None)`` is returned for a zero-length file, and ``last_line``
        is ``None`` when nothing but blank lines follows the header.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    with open(filepath, "rb") as f:
        header_raw = f.readline()
        if not header_raw:
            # Zero-length file: there is not even a header.
            return None, None
        header_line = header_raw.decode(encoding).strip()

        # Never scan back into the header itself.
        body_start = f.tell()
        f.seek(0, os.SEEK_END)
        pos = f.tell()

        tail = b""
        # Grow the buffer backwards until, ignoring trailing whitespace, it
        # holds a newline - everything after that newline is the complete last
        # line - or until the whole body has been read.
        while pos > body_start and b"\n" not in tail.rstrip():
            step = min(chunk_size, pos - body_start)
            pos -= step
            f.seek(pos)
            tail = f.read(step) + tail

    # rpartition yields the whole buffer when it contains no newline at all.
    last_line = tail.rstrip().rpartition(b"\n")[2].decode(encoding).strip()
    return header_line, (last_line or None)
# Request OHLC data from Bitstamp server using HTTP GET, parse as JSON
def get_ohlc_data(currency_pair, start_unix, num_values=1000, interval_sec=60, timeout=30):
    """Fetch OHLC rows for one trading pair from the Bitstamp HTTP API.

    Args:
        currency_pair: Market symbol placed into the request URL.
        start_unix: UNIX timestamp sent as the ``start`` query parameter.
        num_values: Value sent as the ``limit`` query parameter.
        interval_sec: Value sent as the ``step`` query parameter.
        timeout: Seconds to wait for the server before giving up; without it
            a stalled connection would block the script indefinitely.

    Returns:
        The object stored under ``["data"]["ohlc"]`` of the JSON reply
        (callers in this file treat it as a list of dict-like rows).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
        KeyError: If the JSON reply lacks the ``data``/``ohlc`` keys.
    """
    url = "https://www.bitstamp.net/api/v2/ohlc/%s/" % (currency_pair)
    params = {"step": interval_sec, "limit": num_values, "start": start_unix}
    response = requests.get(url, params=params, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of tripping over an unexpected payload.
    response.raise_for_status()
    return response.json()["data"]["ohlc"]
# Binary search to find the starting timestamp of a consistent remote OHLC data block
def find_absolute_starting_unix(currency_pair, interval_sec=60):
    """Locate the earliest timestamp for which the server returns OHLC data.

    Starting from the current time, the probe timestamp is moved by a step
    that is halved after every request:

    * no rows returned            -> move the probe forward,
    * two rows, or a single row that is the current candle -> move it back,
    * a single older row          -> that row's timestamp is the answer.

    NOTE(review): the branch logic presumes the API answers with exactly one
    row when ``start`` lies before the first available candle - confirm
    against the Bitstamp API documentation.

    Args:
        currency_pair: Market symbol forwarded to ``get_ohlc_data``.
        interval_sec: Candle width in seconds forwarded to ``get_ohlc_data``.

    Returns:
        The integer UNIX timestamp of the first available candle.

    Raises:
        RuntimeError: If the search converges without ever receiving a row.
    """
    now_timestamp = int(dt.datetime.now().timestamp())
    test_timestamp = now_timestamp
    step = test_timestamp // 2

    while True:
        response = get_ohlc_data(currency_pair, test_timestamp, num_values=2, interval_sec=interval_sec)
        if len(response) == 0:
            test_timestamp += step
        # A lone row no older than one interval is the candle currently being
        # formed, not the start of history; compare against the requested
        # interval rather than a fixed 60 seconds.
        elif len(response) == 2 or now_timestamp - int(response[0]["timestamp"]) <= interval_sec:
            test_timestamp -= step
        else:
            return int(response[0]["timestamp"])

        if step == 0:
            # The probe can no longer move; stop instead of repeating the
            # same request forever.
            if response:
                return int(response[0]["timestamp"])
            raise RuntimeError("Unable to locate the start of OHLC data for pair '%s'" % currency_pair)
        step //= 2
# Pretty formatting of a UNIX timestamp
def format_unix_str(timestamp):
    """Render a UNIX timestamp as ``YYYY-MM-DD HH:MM`` in local time."""
    local_moment = dt.datetime.fromtimestamp(timestamp)
    return format(local_moment, "%Y-%m-%d %H:%M")
if __name__ == "__main__":
    # Set to True by the SIGINT handler so that Ctrl+C lets the download loop
    # finish its current batch and then exit.
    terminated = False
    signal.signal(signal.SIGINT, lambda sig, frame: globals().update(terminated=True))
    now = dt.datetime.now()

    print("Checking Bitstamp data availability...")
    start_unix = find_absolute_starting_unix(CURRENCY_PAIR, interval_sec=OHLC_INTERVAL_SECONDS)
    print("Bistamp data for pair '%s' are available since timestamp %d (%s)" % (CURRENCY_PAIR, start_unix, format_unix_str(start_unix)))

    # Resume from an existing CSV file when it already holds data rows.
    if os.path.exists(CSV_FILEPATH):
        header_line, last_line = get_csv_tail(CSV_FILEPATH)
        if header_line is None:
            print("Found existing CSV file, but it is empty - removing the file.")
            os.remove(CSV_FILEPATH)
        elif last_line is None:
            # Checked before parsing: a header-only file has no row to split.
            print("Found existing CSV file, but it contains only header line!")
        else:
            last_row = {col_name: col_type(data) for (col_name, col_type), data in zip(CSV_COLUMNS_TYPES.items(), last_line.split(','))}
            start_unix = int(last_row["timestamp"])
            print("Found existing CSV file with last saved timestamp %d (%s)." % (start_unix, format_unix_str(start_unix)))
            # Continue with the interval following the last saved one.
            start_unix += OHLC_INTERVAL_SECONDS

    # Also reached right after an empty file was removed above.
    if not os.path.exists(CSV_FILEPATH):
        print("No existing CSV file found, creating a new one.")
        with open(CSV_FILEPATH, "w") as f:
            f.write(','.join(CSV_COLUMNS_TYPES.keys()) + '\n')

    missing_values = (int(now.timestamp()) - start_unix) // OHLC_INTERVAL_SECONDS
    if missing_values > 0:
        print("Program will start downloading data starting with timestamp: %d (%s)" % (start_unix, format_unix_str(start_unix)))
        print("Missing %d-second intervals upto now: %d" % (OHLC_INTERVAL_SECONDS, missing_values))
        print("Running...")

        last_unix = start_unix
        with tqdm.tqdm(total=missing_values) as pbar:
            while not terminated:
                data = get_ohlc_data(CURRENCY_PAIR, start_unix, num_values=MAX_OHLC_ROWS_PER_REQUEST, interval_sec=OHLC_INTERVAL_SECONDS)
                if len(data) == 0:
                    print("\nError: No data received!")
                    break
                elif len(data) == 1 and last_unix >= int(data[-1]["timestamp"]):
                    # The only row returned is not newer than what was
                    # already processed, so nothing more can be fetched.
                    print("\nReached the end of the currently available data!")
                    break

                if len(data) != MAX_OHLC_ROWS_PER_REQUEST:
                    print("\nWarning: Got only %d/%d intervals of data!" % (len(data), MAX_OHLC_ROWS_PER_REQUEST))

                new_lines = []
                for entry in data:
                    # Convert every field with its configured type, in column order.
                    row = {c: t(entry[c]) for c, t in CSV_COLUMNS_TYPES.items()}
                    new_lines.append(','.join([str(v) for v in row.values()]) + '\n')
                    pbar.update(1)
                    last_unix = row["timestamp"]
                    start_unix = last_unix + OHLC_INTERVAL_SECONDS

                # Append the whole batch with a single write call.
                with open(CSV_FILEPATH, "a") as f:
                    f.writelines(new_lines)

                pbar.set_description("Last saved: %s (timestamp %d) | Processed" % (format_unix_str(last_unix), last_unix))
                time.sleep(REQUEST_DELAY_SECONDS)
    else:
        print("No available data is missing from the CSV file.")

    print("\nProgram done!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment