Last active
May 3, 2022 01:09
-
-
Save BdVade/6fa19c43e62d3e3a461d6116dabebb47 to your computer and use it in GitHub Desktop.
Two Scripts to compile the volume traded of a token on a pool from it's AMM . 1 through Solscan The other through an RPC Node
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
import json | |
def get_amm_historical_volume_from_rpc_node(amm_address: str): | |
url = 'https://api.mainnet-beta.solana.com/' | |
rerun = True | |
volumes = [] | |
volume = 0 | |
c = 0 | |
body = { | |
"jsonrpc": "2.0", | |
"id": 1, | |
"method": "getSignaturesForAddress", | |
"params": [ | |
amm_address, | |
{ | |
"limit": 1000 | |
} | |
] | |
} | |
headers = { | |
"Content-Type": "application/json", | |
"Accept": "*/*" | |
} | |
while rerun: | |
try: | |
response = requests.post(url, data=json.dumps(body), headers=headers) | |
except requests.exceptions.ConnectionError: | |
time.sleep(5) | |
continue | |
except ConnectionError: | |
time.sleep(5) | |
continue | |
try: | |
data = response.json().get("result") | |
except Exception as e: | |
print(e) | |
time.sleep(5) | |
continue | |
rerun = True if len(data) >= 1000 else False | |
for item in data: | |
while True: | |
try: | |
item_response = requests.post(url, data=json.dumps({ | |
"jsonrpc": "2.0", | |
"id": 1, | |
"method": "getTransaction", | |
"params": [ | |
item.get("signature"), | |
"json" | |
] | |
}), headers=headers).json() | |
if item_response.get("result").get("meta").get("err"): | |
pass | |
else: | |
after = item_response.get("result").get("meta").get("postBalances")[0] | |
before = item_response.get("result").get("meta").get("preBalances")[0] | |
balance_difference = abs(after-before) | |
volume += balance_difference | |
volumes.append(balance_difference) | |
print(item_response) | |
break | |
except Exception as e: | |
print(f"Attempt to get signature deets. Error is : {e}") | |
time.sleep(3) | |
continue | |
body = { | |
"jsonrpc": "2.0", | |
"id": 1, | |
"method": "getSignaturesForAddress", | |
"params": [ | |
amm_address, | |
{ | |
"limit": 1000, | |
"before": data[-1].get("signature") | |
} | |
] | |
} | |
c += 1 | |
print(c) | |
print(volume) | |
print(f"the volume is {sum(volumes)}") | |
return volumes | |
get_amm_historical_volume_from_rpc_node("7XawhbbxtsRcQA8KTkHT9f9nc6d69UwqCDh6U5EEbEmX") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
def get_amm_historical_volume(amm_address: str): | |
url = f"https://api.solscan.io/amm/txs?address={amm_address}&type=swap&offset=0&limit=10&" \ | |
f"&tx_status=success" | |
rerun = True | |
volumes = [] | |
volume = 0 | |
c = 0 | |
while rerun: | |
try: | |
response = requests.get(url) | |
except requests.exceptions.ConnectionError: | |
time.sleep(5) | |
continue | |
except ConnectionError: | |
time.sleep(5) | |
continue | |
try: | |
data = response.json().get("data") | |
except Exception as e: | |
print(e) | |
time.sleep(5) | |
continue | |
rerun = (data.get("hasNext")) | |
items = data.get("items") | |
for item in items: | |
volumes.append(float(item.get("volume"))) | |
volume += float(item.get("volume")) | |
unix_time = items[-1].get("blockUnixTime") | |
_id = items[-1].get("_id") | |
url = f"https://api.solscan.io/amm/txs?address={amm_address}&type=swap&offset=0&limit=10&" \ | |
f"&tx_status=success&before_id={_id}&before_time={unix_time}" | |
c += 1 | |
print(c) | |
print(volume) | |
print(sum(volumes)) | |
return volumes | |
get_amm_historical_volume("7XawhbbxtsRcQA8KTkHT9f9nc6d69UwqCDh6U5EEbEmX") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment