Skip to content

Instantly share code, notes, and snippets.

@korrio
Created October 22, 2024 06:25
Show Gist options
  • Save korrio/bb799d64a0a91097c9190499f3078f8a to your computer and use it in GitHub Desktop.
Save korrio/bb799d64a0a91097c9190499f3078f8a to your computer and use it in GitHub Desktop.
swap_on_uniswap_v3.py
async def swap_on_uniswap_v3(w3, router_contract, token_in, token_out, amount, wallet_address, private_key, deadline):
try:
account = w3.eth.account.from_key(private_key)
# ใช้ quoteExactInputSingle เพื่อประมาณการ amounts_out
quoter_v3 = w3.eth.contract(address=QUOTER_ADDRESS, abi=quoter_abi)
quote = quoter_v3.functions.quoteExactInputSingle(
token_in,
token_out,
3000, # 0.3% fee tier
amount,
0 # sqrtPriceLimitX96
).call()
min_amount_out = int(quote * 0.995) # 0.5% slippage
# เตรียมพารามิเตอร์สำหรับ swap
params = {
'tokenIn': token_in,
'tokenOut': token_out,
'fee': 3000, # 0.3% fee tier
'recipient': account.address,
'deadline': deadline,
'amountIn': amount,
'amountOutMinimum': min_amount_out,
'sqrtPriceLimitX96': 0 # ไม่จำกัดราคา
}
# ฟังก์ชันสำหรับสร้างและส่งธุรกรรม
def create_and_send_transaction(gas_price_multiplier=1.1):
max_fee_per_gas = int(w3.eth.max_priority_fee * gas_price_multiplier)
max_priority_fee_per_gas = int(w3.eth.max_priority_fee * gas_price_multiplier)
swap_txn = router_contract.functions.exactInputSingle(params).build_transaction({
'from': account.address,
'nonce': w3.eth.get_transaction_count(account.address),
'gas': 300000,
'maxFeePerGas': max_fee_per_gas,
'maxPriorityFeePerGas': max_priority_fee_per_gas,
'type': '0x2'
})
signed_txn = account.sign_transaction(swap_txn)
return w3.eth.send_raw_transaction(signed_txn.rawTransaction)
# พยายามส่งธุรกรรมด้วยราคาแก๊สที่เพิ่มขึ้นหากเกิดข้อผิดพลาด
for attempt in range(3):
try:
tx_hash = create_and_send_transaction(1.1 ** attempt)
break
except Exception as e:
if "replacement transaction underpriced" in str(e) and attempt < 2:
print(f"Transaction underpriced, retrying with higher gas price (attempt {attempt + 1})")
continue
raise e
# Start monitoring the transaction in the background
asyncio.create_task(monitor_transaction(w3, tx_hash))
return {
'success': True,
'tx_hash': tx_hash.hex(),
'expected_amount_out': quote,
'estimated_tx_cost':'0.000106789',
'min_amount_out': min_amount_out,
'block_number': w3.eth.block_number
}
except Exception as e:
print(f"{Fore.RED}Error during swap on UniswapV3: {str(e)}")
return {'success': False, 'error': str(e)}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment