Skip to content

Instantly share code, notes, and snippets.

@nghuyenthevinh2000
Created May 19, 2023 11:44
Show Gist options
  • Save nghuyenthevinh2000/6cf396c77700f50dc5c54db8547430ea to your computer and use it in GitHub Desktop.
Save nghuyenthevinh2000/6cf396c77700f50dc5c54db8547430ea to your computer and use it in GitHub Desktop.
Script to crawl smart contract addresses on Terra Classic.
import csv
import json
import os
import time

import requests
# Crawler for contract addresses created by `instantiate_contract` txs.
#
# Starting at START_CODE_ID, the script asks the LCD endpoint (through the
# ScrapeOps proxy) for every tx whose events match
# `instantiate_contract.code_id=<code_id>`, appends one
# `code_id,contract_address` row per match to OUTPUT_PATH, then moves on to
# the next code id.

BATCH_SIZE = 100      # print progress and flush the CSV every BATCH_SIZE rows
MAX_RETRIES = 10      # consecutive failures tolerated before the crawl stops
# Offset step between pages. The request does not set a page size itself, so
# this assumes the endpoint returns 100 txs per page -- TODO confirm.
ITEMS_PER_PAGE = 100
REQUEST_TIMEOUT = 30  # seconds; without it a stalled connection blocks forever

START_CODE_ID = 1643
OUTPUT_PATH = 'file.csv'
PROXY_URL = 'https://proxy.scrapeops.io/v1/'

# NOTE(review): a credential literal is committed in this file. It is kept
# only as a fallback so existing runs behave the same; prefer exporting
# SCRAPEOPS_API_KEY and rotating/removing the literal.
SCRAPEOPS_API_KEY = os.environ.get(
    'SCRAPEOPS_API_KEY',
    '83e4456c-d9fe-4e6b-8d3e-3ba0ca9c8dec',
)


def fetch_tx_page(code_id, offset):
    """Fetch one page of instantiate txs for *code_id* and return the JSON.

    Args:
        code_id: Code id used in the `instantiate_contract.code_id` filter.
        offset: Value sent as `pagination.offset`.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the status code is not 200 or the body is empty.
        Exception: Whatever `requests` or JSON decoding raises (including a
            timeout after REQUEST_TIMEOUT seconds).
    """
    url = f'https://lcd.terrarebels.net/cosmos/tx/v1beta1/txs?events=instantiate_contract.code_id={code_id}&pagination.offset={offset}'
    response = requests.get(
        url=PROXY_URL,
        params={
            'api_key': SCRAPEOPS_API_KEY,
            'url': url,
        },
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        raise RuntimeError(f'Status code is not 200: {response.status_code} code_id: {code_id}')

    data = response.json()
    # A JSON `null` / empty body is treated as a failure so the caller retries.
    if not data:
        raise RuntimeError(f'No data found for code_id: {code_id}')
    return data


def extract_contract_addresses(tx_responses):
    """Return the contract addresses found in a list of tx responses.

    For every `instantiate_contract` event in every log of every tx, the
    value of the first attribute whose key is `contract_address` is
    collected. Events without such an attribute contribute nothing.

    NOTE(review): only the first `contract_address` attribute of each event
    is taken; if a single event can carry several, the rest are ignored --
    confirm this is intended.

    Args:
        tx_responses: Iterable of dicts, each with a `logs` list whose items
            have an `events` list.

    Returns:
        A list of address strings in encounter order.

    Raises:
        KeyError: If a tx, log, or matching event/attribute lacks an
            expected key.
    """
    addresses = []
    for tx_response in tx_responses:
        for log in tx_response['logs']:
            for event in log['events']:
                if event.get('type') != 'instantiate_contract':
                    continue
                attribute = next(
                    (
                        item
                        for item in event['attributes']
                        if item.get('key') == 'contract_address'
                    ),
                    None,
                )
                if attribute:
                    addresses.append(attribute['value'])
    return addresses


def crawl(writer, out_file, code_id=START_CODE_ID):
    """Crawl code ids upward from *code_id*, writing one CSV row per contract.

    The loop has no natural end: it advances to the next code id after each
    success and stops only once MAX_RETRIES failures have accumulated for the
    current code id.

    Args:
        writer: `csv.writer`-like object receiving `[code_id, address]` rows.
        out_file: The file behind *writer*; flushed every BATCH_SIZE rows.
        code_id: First code id to query.

    Returns:
        The number of rows written.
    """
    count_wrote = 0
    retries = 0

    while True:
        offset = 0
        total = 1  # placeholder so the first page is always requested

        while retries < MAX_RETRIES:
            try:
                # `offset` survives a failed attempt, so a retry resumes at
                # the page that failed instead of restarting the code id.
                while offset < total:
                    data = fetch_tx_page(code_id, offset)

                    # Parse the whole page before writing anything: if the
                    # page is malformed, the retry must not re-append rows
                    # that were already written from its first half.
                    addresses = extract_contract_addresses(data['tx_responses'])
                    page_total = int(data['pagination']['total'])

                    for address in addresses:
                        writer.writerow([code_id, address])
                        count_wrote += 1
                        if count_wrote % BATCH_SIZE == 0:
                            print(count_wrote)
                            out_file.flush()

                    total = page_total
                    offset += ITEMS_PER_PAGE

                retries = 0
                code_id += 1
                break
            except Exception as e:
                # Deliberately broad: any network, decoding, or schema error
                # is treated as transient and retried.
                print(f"An error occurred: {e}")
                print("Retrying in some seconds...")
                # Linear back-off: 3 s, 5 s, 7 s, ... (3 + 2 * failures so far).
                time.sleep(3 + retries * 2)
                retries += 1

        if retries == MAX_RETRIES:
            print("Max retries reached attempts:", retries, " code_id:", code_id, " count_wrote:", count_wrote)
            break

    return count_wrote


def main():
    """Open OUTPUT_PATH for appending and run the crawl."""
    # Append mode: repeated runs extend the same file, so no header row is
    # written here.
    with open(OUTPUT_PATH, 'a', newline='') as file:
        crawl(csv.writer(file), file)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment