Last active
November 17, 2020 11:52
-
-
Save uneasyguy/049af33ecde76475cee66f27a4b06d6a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import dateparser | |
import pytz | |
import json | |
import csv | |
import datetime | |
from dateutil.rrule import rrule, MONTHLY | |
from binance.client import Client | |
import os | |
import sys | |
# Directory part of the path used to launch this script (may be relative or empty).
pathname = os.path.dirname(sys.argv[0])
# Absolute form of that directory with a trailing '/' so that file and
# directory names can be appended to it directly.
full_path = os.path.abspath(pathname) + '/'
def grab_currencies_list():
    """Ask which base currencies to download and return the matching symbols.

    All tickers are fetched through a ``Client`` built without credentials,
    then the user is prompted for a comma-separated list of currencies
    (whitespace is stripped and the text upper-cased). A ticker symbol is
    selected when it ends with one of the requested currencies.

    Returns:
        list: matching symbols, grouped by requested currency in the order
        the user typed them. A symbol is repeated if it matches more than
        one entry.
    """
    matching_symbols = []
    client = Client(None, None)
    # presumably a list of dicts that each carry a 'symbol' key -- verify
    # against the python-binance documentation
    tickers = client.get_all_tickers()
    answer = input('Which base currencies would you like to grab data for?(eg. BTC or ETH,BNB or BTC,ETH,BNB,USDT) ')
    wanted_suffixes = [piece.strip().upper() for piece in answer.split(',')]
    for suffix in wanted_suffixes:
        for ticker in tickers:
            ticker_symbol = ticker['symbol']
            # Compare the tail of the symbol with the requested currency.
            if ticker_symbol[-len(suffix):] == suffix:
                matching_symbols.append(ticker_symbol)
    return matching_symbols
def grab_date_interval():
    """Prompt for a start and end month and return them as a date range.

    Both answers are expected in ``MM/YYYY`` form; the end date may also be
    the word ``now``.

    Returns:
        tuple: ``(start_date, end_date)``. ``start_date`` is a
        ``datetime.date`` for the first day of the start month. ``end_date``
        is a ``datetime.date`` for the last day of the end month, or the
        current ``datetime.datetime`` when ``now`` was entered.

    Raises:
        ValueError: if a month or year is not an integer, or the month is
            outside 1-12.
        IndexError: if an answer contains no ``/`` separator.
    """
    # Function-scope import so this block does not depend on the file header.
    import calendar

    print ('What date range would you like to pull data from?\nIn MM/YYYY format,except you can enter now for end date to get most recent.')

    start_parts = input('Start date: ').strip().split('/')
    start_month = int(start_parts[0])
    start_year = int(start_parts[1])
    start_date = datetime.date(start_year, start_month, 1)

    end_text = input('End date: ').strip()
    if end_text.lower() == 'now':
        return start_date, datetime.datetime.now()

    end_parts = end_text.split('/')
    end_month = int(end_parts[0])
    end_year = int(end_parts[1])
    # monthrange() knows the real month lengths, including every leap year,
    # so no hand-maintained list of leap years is needed.
    last_day = calendar.monthrange(end_year, end_month)[1]
    return start_date, datetime.date(end_year, end_month, last_day)
def grab_kline_interval():
    """Prompt until the user enters one of the supported kline intervals.

    Surrounding whitespace in the answer is ignored. An invalid answer
    prints a message and asks again, so the function never returns None.

    Returns:
        str: one of 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h.
    """
    valid_intervals = ('1m','3m','5m','15m','30m','1h','2h','4h','6h','8h','12h')
    prompt = 'What Kline Interal would you prefer? Options: 1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h '
    while True:
        kline_interval = input(prompt).strip()
        if kline_interval in valid_intervals:
            return kline_interval
        print ('{} is an invalid option, please try again'.format(str(kline_interval)))
def create_directories(pair_list):
    """Create the output directory tree for the given trading pairs.

    Ensures ``<full_path>historical_price_data`` exists, plus one
    sub-directory per symbol in *pair_list*. Directories that already exist
    are left untouched, so the call is safe to repeat and tolerates
    duplicate symbols in the list.

    Args:
        pair_list: iterable of symbol names; each becomes a directory name.
    """
    historical_price_data_directory = '{}historical_price_data'.format(str(full_path))
    # exist_ok avoids scanning the tree first and cannot fail on a directory
    # that was created a moment ago (for example by a duplicate symbol).
    os.makedirs(historical_price_data_directory, exist_ok=True)
    for symbol in pair_list:
        os.makedirs('{}/{}'.format(historical_price_data_directory, str(symbol)), exist_ok=True)
def date_to_milliseconds(date_str):
    """Convert a date string to milliseconds since the Unix epoch.

    The string is parsed with ``dateparser.parse``. A result without usable
    timezone information is treated as UTC.

    Args:
        date_str: date text, e.g. ``'1 Jan, 2020'``.

    Returns:
        int: milliseconds between 1970-01-01 00:00 UTC and the parsed date.

    Raises:
        ValueError: if *date_str* cannot be parsed.
    """
    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
    parsed = dateparser.parse(date_str)
    if parsed is None:
        # Fail with a clear message instead of an AttributeError on None below.
        raise ValueError('Unable to parse date string: {!r}'.format(date_str))
    if parsed.tzinfo is None or parsed.tzinfo.utcoffset(parsed) is None:
        parsed = parsed.replace(tzinfo=pytz.utc)
    return int((parsed - epoch).total_seconds() * 1000.0)
def interval_to_milliseconds(interval):
    """Convert an interval string such as ``'15m'`` or ``'4h'`` to milliseconds.

    The last character is the unit (``m`` minutes, ``h`` hours, ``d`` days,
    ``w`` weeks) and everything before it is an integer count.

    Args:
        interval: interval text, e.g. ``'1m'``, ``'12h'``, ``'1d'``.

    Returns:
        int or None: the interval length in milliseconds, or None when the
        string is empty, the unit is unknown, or the count is not an integer.
    """
    seconds_per_unit = {'m': 60,'h': 60 * 60,'d': 24 * 60 * 60,'w': 7 * 24 * 60 * 60}
    # Slicing (rather than indexing) yields '' for an empty string, so empty
    # input falls through to None instead of raising IndexError.
    unit = interval[-1:]
    if unit not in seconds_per_unit:
        return None
    try:
        return int(interval[:-1]) * seconds_per_unit[unit] * 1000
    except ValueError:
        return None
def get_historical_klines(symbol, interval, start_str, end_str=None):
    """Download klines for *symbol* between two dates, paging through results.

    Requests are made with ``Client.get_klines`` in pages of up to 500
    entries. After each non-empty page the next request starts one interval
    after the open time (element 0) of the last kline received. Paging stops
    as soon as a page holds fewer than 500 entries.

    Args:
        symbol: trading pair name passed to the API.
        interval: kline interval string, e.g. ``'1m'``.
        start_str: start date text, converted with ``date_to_milliseconds``.
        end_str: optional end date text; no end bound is sent when falsy.

    Returns:
        list: every kline received, in the order the pages arrived.
    """
    api = Client(None, None)
    collected = []
    page_size = 500
    step_ms = interval_to_milliseconds(interval)
    cursor_ms = date_to_milliseconds(start_str)
    stop_ms = date_to_milliseconds(end_str) if end_str else None
    requests_made = 0
    have_data = False
    while True:
        try:
            page = api.get_klines(symbol=symbol, interval=interval, limit=page_size, startTime=cursor_ms, endTime=stop_ms)
            if not have_data and len(page):
                have_data = True
            if have_data:
                collected.extend(page)
                # Raises IndexError on an empty page; handled below.
                cursor_ms = page[-1][0] + step_ms
            else:
                # Nothing received yet: nudge the window forward one interval.
                cursor_ms += step_ms
        except IndexError:
            pass
        requests_made += 1
        if len(page) < page_size:
            break
        # Pause for a second after every third request.
        if requests_made % 3 == 0:
            time.sleep(1)
    return collected
def _klines_date_label(day):
    """Format a date as the 'D Mon, YYYY' text passed to get_historical_klines."""
    month_abbreviations = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sept','Oct','Nov','Dec')
    return '{} {}, {}'.format(day.day, month_abbreviations[day.month - 1], day.year)


def _kline_csv_row(kline):
    """Build the (Date, Open, High, Low, Close, Volume) CSV row for one kline."""
    open_time = datetime.datetime.fromtimestamp(float(kline[0]) / 1000, datetime.timezone.utc)
    return (open_time.strftime('%Y-%m-%d %H:%M:%S.%f'), kline[1], kline[2], kline[3], kline[4], kline[5])


def grab_data(pair_list,dates,start_date,end_date,interval):
    """Download klines day by day and append them to per-day CSV files.

    For every symbol and every month in *dates*, each calendar day of that
    month is requested separately from ``get_historical_klines`` (from that
    day to the following day). When data comes back, a header row and the
    klines are appended to
    ``<full_path>historical_price_data/<symbol>/<YYYY-MM-DD>_<interval>.csv``.
    Days with no data produce no file.

    Args:
        pair_list: iterable of symbol names.
        dates: iterable of date-like objects whose ``str()`` starts with
            ``YYYY-MM``; only the year and month of each entry are used.
        start_date: unused; kept for interface compatibility.
        end_date: unused; kept for interface compatibility.
        interval: kline interval string, e.g. ``'1m'``.
    """
    titles = ('Date','Open','High','Low','Close','Volume')
    one_day = datetime.timedelta(days=1)
    for symbol in pair_list:
        for month_start in dates:
            year_text, month_text = str(month_start).split('-')[:2]
            day = datetime.date(int(year_text), int(month_text), 1)
            month = day.month
            # Date arithmetic handles month lengths, leap years and the
            # December -> January rollover without any lookup tables.
            while day.month == month:
                following_day = day + one_day
                start = _klines_date_label(day)
                end = _klines_date_label(following_day)
                print ('symbol {} start {} end {}'.format(str(symbol),str(start),str(end)))
                klines = get_historical_klines(symbol, interval, start, end)
                if len(klines)>0:
                    csv_path = '{}historical_price_data/{}/{}_{}.csv'.format(str(full_path),str(symbol),day.isoformat(),str(interval))
                    # One open per file; newline='' lets the csv module
                    # control line endings itself.
                    with open(csv_path, 'a', newline='') as f:
                        writer = csv.writer(f)
                        writer.writerow(titles)
                        # NOTE(review): the final kline is skipped, as before --
                        # presumably it is the first candle of the following day
                        # returned by an inclusive end bound; confirm, since it
                        # also drops the newest candle of an incomplete day.
                        writer.writerows(_kline_csv_row(kline) for kline in klines[:-1])
                day = following_day
def main():
    """Run the interactive download: ask for inputs, prepare folders, fetch data.

    Prompts, in order, for the base currencies, the kline interval and the
    date range. One entry per month of the range is generated with ``rrule``
    before handing everything to ``grab_data``.
    """
    pair_list = grab_currencies_list()
    interval = grab_kline_interval()
    create_directories(pair_list)
    start_date, end_date = grab_date_interval()
    dates = list(rrule(MONTHLY, dtstart=start_date, until=end_date))
    grab_data(pair_list, dates, start_date, end_date, interval)
# Start the interactive downloader only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment