import csv
import datetime
import gzip
import logging
import os
import sys
import time
from typing import List

import requests
import simplejson as json

logging.basicConfig()
_logger: logging.Logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)

API_URL = "https://core-hsr.dune.com/v1/graphql"
QUERY_ID = 283432  # 283763
HEADERS = {"x-hasura-api-key": "<get_it_from_kms>"}
CSV_COLUMNS = [
    "trade_date",
    "project",
    "version",
    "category",
    "token_a_symbol",
    "token_b_symbol",
    "exchange_contract_address",
    "token_a_amount",
    "token_b_amount",
    "usd_amount",
    "trades",
    "max_trade_usd_amount",
]
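
# POST a GraphQL query with optional headers; returns the response's "data"
# payload when present (otherwise the raw parsed response), retrying on
# exceptions up to maxRetry times before giving up.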
def execute_with_retry(url, query, variables, headers=None, maxRetry=5):
    # Avoid a mutable default argument for the request headers.
    headers = headers or {}
    curRetry = 1
    while curRetry <= maxRetry:
        if curRetry == maxRetry:
            raise Exception("Reached max retry. Exit now.")
        try:
            req = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
            result = json.loads(req.text)
            if "data" in result:
                return result["data"]
            else:
                return result
        except Exception as e:
            print(sys.exc_info()[0])
            print(e)
            print(f"Error retry {curRetry}/{maxRetry}")
            curRetry += 1
            time.sleep(5)
    return None


def download_dune_query_to_csv(query_id: int, start_date: str, end_date: str, csv_columns: List, output_dir: str):
    """
    The GraphQL logic is based on https://gist.github.com/mewwts/7ec78380018ed0581d45172761d546ba

    The Dune query referenced by query_id is:

    SELECT
        date(block_time) trade_date
        , project
        , version
        , category
        , token_a_symbol
        , token_b_symbol
        , encode(exchange_contract_address, 'hex') as exchange_contract_address
        , SUM(token_a_amount) as token_a_amount
        , SUM(token_b_amount) as token_b_amount
        , SUM(usd_amount) as usd_amount
        , COUNT(*) as trades
        , MAX(usd_amount) as max_trade_usd_amount
    FROM dex."trades" t
    WHERE category = 'DEX'
        AND block_time >= '{{start_date}}'::date
        AND block_time < '{{end_date}}'::date
    GROUP BY 1,2,3,4,5,6,7
    """
    gquery_get_result = """
    query GetResult ($query_id: Int!, $parameters: [Parameter!]){
        get_result_v2(query_id: $query_id, parameters: $parameters) {
            job_id
            result_id
            error_id
            __typename
        }
    }
    """

    dex_volume_params = {
        "query_id": query_id,
        "parameters": [
            {"key": "start_date", "type": "datetime", "value": f"{start_date}"},
            {"key": "end_date", "type": "datetime", "value": f"{end_date}"},
        ],
    }
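
    # GraphQL query that fetches the stored result rows (plus metadata such as
    # runtime and generated_at) for a given result_id.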
    gquery_get_result_by_result_id = """
    query FindResultDataByResult($result_id: uuid!){
        query_results(where: {id: {_eq: $result_id}}) {
            id
            job_id
            error
            runtime
            generated_at
            columns
            __typename
        }
        get_result_by_result_id(args: {want_result_id: $result_id}) {
            data
            __typename
        }
    }
    """
    # Main logic: poll get_result_v2 until it reports a result_id for this
    # query/parameter combination, surfacing any GraphQL errors along the way.
    result = None
    while result is None or "get_result_v2" not in result or result["get_result_v2"]["result_id"] is None:
        result = execute_with_retry(API_URL, gquery_get_result, dex_volume_params)
        _logger.info(result)
        if "errors" in result:
            raise RuntimeError(result["errors"][0])
        time.sleep(5)

    result_id = result["get_result_v2"]["result_id"]
    error_id = result["get_result_v2"].get("error_id")
    if error_id is not None:
        # When the most recent execution of a query returns an error, get_result_v2 returns
        # the query_error.id along with the most recent successful result.
        _logger.warning(f"The most recent execution resulted in an error with id: {error_id}")
    _logger.info(f"Waiting for RESULT_ID: {result_id}")
    get_result_by_result_id_params = {"result_id": result_id}
    result = execute_with_retry(
        url=API_URL,
        query=gquery_get_result_by_result_id,
        variables=get_result_by_result_id_params,
        headers=HEADERS,
    )
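    # Each row comes back wrapped in a "data" key; unwrap into plain dicts.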
    record_count = len(result["get_result_by_result_id"])
    OUTPUT_ARRAY = []
    for data in result["get_result_by_result_id"]:
        OUTPUT_ARRAY.append(data["data"])

    tag = start_date[0:7].replace("-", "")
    csv_file = f"{output_dir}/dex_daily_volume.month={tag}.csv.gz"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
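    # Write all rows to a gzipped CSV, one file per month of data.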
    try:
        with gzip.open(csv_file, "wt") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            writer.writeheader()
            for data in OUTPUT_ARRAY:
                writer.writerow(data)
    except Exception:
        raise
    _logger.info(f"{csv_file} downloaded with {record_count:>12} records.")


if __name__ == "__main__":
    # Generate a list of [start_date, end_date) month boundaries to query.
    start_date = "2021-12-01"
    end_date = "2022-02-01"
    all_query_dates = []
    s = datetime.datetime.strptime(start_date, "%Y-%m-%d").replace(day=1)
    end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    while s < end_date:
        # Adding 32 days and snapping back to day 1 lands on the first day of the next month.
        e = (s + datetime.timedelta(days=32)).replace(day=1)
        all_query_dates.append(s.strftime("%Y-%m-%d 00:00:00") + "|" + e.strftime("%Y-%m-%d 00:00:00"))
        s = e
    all_query_dates.sort(reverse=True)

    # Download one gzipped CSV per month, newest month first.
    for i, pair in enumerate(all_query_dates):
        (start_date, end_date) = pair.split("|")
        _logger.info(f"{i:>4} Trying [{start_date}, {end_date}):")
        download_dune_query_to_csv(
            query_id=QUERY_ID,
            start_date=start_date,
            end_date=end_date,
            csv_columns=CSV_COLUMNS,
            output_dir="/var/tmp/dune",
        )