Skip to content

Instantly share code, notes, and snippets.

@0xlxy
Created September 8, 2022 21:52
Show Gist options
  • Save 0xlxy/d6e57e049f4d048cd9e3a3da5757eabd to your computer and use it in GitHub Desktop.
Save 0xlxy/d6e57e049f4d048cd9e3a3da5757eabd to your computer and use it in GitHub Desktop.
Sudoswap API
import boto3
import requests
class UpdateDB:
def __init__(self):
self.result = []
self.get_sudoswap()
self.update_db()
def update_db(self):
client_dynamo = boto3.resource("dynamodb")
table_dynamo = client_dynamo.Table("sudoswap")
try:
with table_dynamo.batch_writer() as batch:
for item in self.result:
batch.put_item(Item=item)
except Exception:
raise
def get_sudoswap(self):
page = 1
tmp = []
while True:
response = requests.get(
f"https://sudoapi.xyz/v1/collections?sort=volume_all_time&desc=true&pageNumber={page}",
timeout=10,
).json()["collections"]
if not response:
break
res = []
for i in range(len(response)):
if response[i]["address"] in tmp:
continue
try:
res.append(
{
"address": response[i]["address"].lower(),
"sell_quote": response[i]["sell_quote"],
}
)
tmp.append(response[i]["address"])
except:
pass
self.result += res
page += 1
def lambda_handler(event, context):
UpdateDB()
return "success"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment