Created
November 3, 2022 13:18
-
-
Save charlesdarkwind/3fc5d12f89bce93b43b0b2554a6ad9f3 to your computer and use it in GitHub Desktop.
binance2.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python 3.9 | |
import asyncio | |
import hashlib | |
import aiohttp | |
from aiofile import async_open, AIOFile, Reader | |
BASE_URL = 'https://data.binance.vision' | |
def get_file_name(symbol, interval, year, month, ext): | |
return f'{symbol}-{interval}-{year}-{month}.{ext}' | |
async def request(session: aiohttp.ClientSession, symbol, interval, file_name): | |
url = f'{BASE_URL}/data/spot/monthly/klines/{symbol}/{interval}/' + file_name | |
checksum_url = url + '.CHECKSUM' | |
res = await session.get(url) | |
if res.status == 404 and 'NoSuchKey' in await res.text(): | |
return 'NoSuchKey', None | |
if res.status != 200: | |
raise Exception(await res.text()) | |
checksum_res = await session.get(checksum_url) | |
if checksum_res.status != 200: | |
raise Exception(await checksum_res.text()) | |
checksum = (await checksum_res.text()).split(' ')[0] | |
return res, checksum | |
async def write_zip(zip_data, path): | |
async with async_open(path, 'wb') as f: | |
await f.write(zip_data) | |
async def generate_checksum(file_path): | |
sha256_hash = hashlib.sha256() | |
async with AIOFile(file_path, 'rb') as f: | |
reader = Reader(f) | |
async for chunk in reader: | |
sha256_hash.update(chunk) | |
return sha256_hash.hexdigest() | |
async def verify_checksum(file_path, expected, generated): | |
if expected != generated: | |
raise ValueError( | |
f'Wrong checksum for file {file_path}. expected {expected}, got {generated}.') | |
async def main(): | |
async with aiohttp.ClientSession() as session: | |
symbol, interval, year, month = 'LTCBTC', '1m', '2022', '10' | |
file_name = get_file_name(symbol, interval, year, month, 'zip') | |
res, checksum = await request(session, symbol, interval, file_name) | |
if res == 'NoSuchKey': | |
print(f'No data') | |
print(f'Found data') | |
zip_data = await res.read() | |
await write_zip(zip_data, './data.zip') | |
generated_checksum = await generate_checksum('./data.zip') | |
await verify_checksum('./data.zip', checksum, generated_checksum) | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment