Last active
July 31, 2023 07:11
-
-
Save javipus/eeef7b961f068627e21b142ae4ba97cf to your computer and use it in GitHub Desktop.
Price oracle manipulation strategies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import OrderedDict | |
import json | |
import requests | |
from web3 import Web3 | |
from gql import gql, Client | |
from gql.transport.requests import RequestsHTTPTransport | |
from keys import API_KEYS | |
## CONSTANTS ## | |
tokens = ( | |
"WETH", # calling ETH directly gives error, probably because it's not ERC20 compliant | |
"WBTC", | |
"DAI", | |
"USDC", | |
) | |
# Unit conversion | |
wei = 18 | |
# WBTC & USDC not expressed in weis; i calculated these factors empirically, don't know where they come from | |
units = { | |
'WBTC': 8, | |
'USDC': 6, | |
} | |
# see on etherscan https://etherscan.io/address/0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f | |
uniswap_factory_address = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f" | |
## h/t https://towardsdatascience.com/exploring-decentraland-marketplace-sales-with-thegraph-and-graphql-2f5e8e7199b5 | |
# Select your transport with a defined url endpoint | |
transport = RequestsHTTPTransport( | |
url="https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v2") | |
# Create a GraphQL client using the defined transport | |
client = Client(transport=transport, fetch_schema_from_transport=True) | |
# Use infura to fetch and call smart contracts via HTTP | |
web3 = Web3(Web3.HTTPProvider(f'https://mainnet.infura.io/v3/{API_KEYS["infura"]}')) | |
def get_reserves(token0, token1, cache='reserves.json'): | |
_token0, _token1, pool = get_uniswap_pool(token0, token1) | |
reserve0, reserve1, blockTimeStampLast = pool.functions.getReserves().call() | |
decimals0, decimals1 = map(get_decimals, (_token0, _token1)) | |
ret = OrderedDict({ | |
_token0: reserve0 / 10**decimals0, | |
_token1: reserve1 / 10**decimals1, | |
}) | |
# TODO switch to csv cache so you don't have to parse the whole file every time you want to append | |
if cache: | |
with open(cache, 'r') as f: | |
data = json.load(f) | |
data[f'{_token0}/{_token1}'] = ret | |
with open(cache, 'w') as f: | |
json.dump(data, f) | |
return ret | |
def get_marginal_price(token0, token1): | |
"""Marginal price of `token0` in units of `token1`""" | |
_token0, _token1, pool = get_uniswap_pool(token0, token1) | |
reserve0, reserve1, blockTimeStampLast = pool.functions.getReserves().call() | |
decimals0, decimals1 = map(get_decimals, (_token0, _token1)) | |
reserve0 /= 10**decimals0 | |
reserve1 /= 10**decimals1 | |
return reserve0 / reserve1, f"{_token0}/{_token1}" | |
def get_uniswap_pool(token0, token1): | |
""" | |
Get contract object representing the Uniswap pool between `token0` and `token1`. | |
@param token0, token1: String e.g. "DAI" or "WETH". Token symbols are searched on Uniswap's subgraph. The contract with that symbol and highest number of transactions is returned. | |
@return (tokenA, tokenB, pool): `(tokenA, tokenB)` are the pool tokens _in the order they are defined by the contract_. `pool` is the contract object. | |
""" | |
address0, address1 = map(get_token_address, (token0, token1)) | |
uniswap_factory = get_contract_from_address(uniswap_factory_address) | |
pool_address = uniswap_factory.functions.getPair(address0, address1).call() | |
pool = get_contract_from_address(pool_address) | |
_address0, _address1 = pool.functions.token0().call(), pool.functions.token1().call() | |
if _address0 == address0 and \ | |
_address1 == address1: | |
return (token0, token1, pool) | |
elif _address0 == address1 and \ | |
_address1 == address0: | |
print(f"WARNING: You requested {token0}/{token1} but this pool is {token1}/{token0}") | |
return (token1, token0, pool) | |
else: | |
raise Exception("Pool symbols don't match") | |
def get_decimals(token, graphql_client=client, web3_provider=web3): | |
contract = get_contract_from_address(get_token_address(token, graphql_client=graphql_client), web3_provider=web3_provider) | |
try: | |
return contract.functions.decimals().call() | |
except ABIFunctionNotFound: | |
print(f"{token} ERC20 contract has no `decimals` function. Defaulting to {units.get(token, wei)}.") | |
return units.get(token, wei) | |
def get_token_address(token, graphql_client=client): | |
# graphQL magic | |
# TODO use graphene, it's safer | |
query = gql(f""" | |
{{ | |
tokens(where: {{symbol:"{token}"}}, orderBy:txCount, orderDirection: desc, first: 1){{ | |
id, | |
txCount, | |
symbol, | |
}} | |
}}""") | |
result = client.execute(query) | |
assert len(result['tokens']) == 1, f"Found more than one token with name {token}" | |
# TODO addresses are not checksum | |
# I don't know if this is TheGraph's or Uniswap's fault | |
# I'm fixing it myself but this is a hack and it's not safe | |
return Web3.toChecksumAddress(result['tokens'][0]['id']) | |
def get_contract_from_address(address, web3_provider=web3): | |
"""Get web3 contract object from address. Calls etherscan's API to retrieve ABI.""" | |
abi = get_abi(address) | |
contract = web3.eth.contract(address=address, abi=abi) | |
return contract | |
def get_abi(address): | |
# TODO work around API limit of 5 calls / sec | |
response = requests.get( | |
f'https://api.etherscan.io/api?module=contract&action=getabi&apikey={API_KEYS["etherscan"]}&address={address}') | |
rjson = response.json() | |
if rjson['status']=='1' and rjson['message']=='OK': | |
return rjson['result'] | |
print(rjson) | |
raise Exception(rjson['message']) | |
def get_twap(pool, t0, t1): | |
""" | |
Get time-weighted average price of a token pair between times `t0` and `t1`. | |
@param t0 | |
@param t1 | |
""" | |
# TODO see https://uniswap.org/docs/v2/smart-contract-integration/building-an-oracle/ | |
pass | |
if __name__=='__main__': | |
reserves = {} | |
for i, t0 in enumerate(tokens): | |
for j, t1 in enumerate(tokens): | |
if j<=i: continue | |
reserves[f'{t0}/{t1}'] = get_reserves(t0, t1) | |
with open('reserves.json', 'w') as f: | |
json.dump(reserves, f) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This looks like a great start!