-
-
Save jongan69/d615720f647e2ffea4113ed93203cb2f to your computer and use it in GitHub Desktop.
/lock_in buy bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
TELEGRAM_BOT_TOKEN=your-telegram-bot-token | |
PRIVATE_KEY=your-wallet-private-key | |
SOLANA_RPC_ENDPOINT=https://api.mainnet-beta.solana.com | |
FIXED_AMOUNT=1 # Replace with the fixed amount you want to buy every time |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import os | |
from dotenv import load_dotenv | |
from solana.rpc.async_api import AsyncClient | |
from solana.rpc.commitment import Processed | |
from solana.rpc.types import TxOpts | |
from solders.keypair import Keypair | |
from solders import message | |
from solders.transaction import VersionedTransaction | |
from telegram import Update | |
from telegram.ext import Application, CommandHandler, ContextTypes | |
from jupiter_python_sdk.jupiter import Jupiter | |
import base58 | |
# Load environment variables | |
load_dotenv() | |
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") | |
PRIVATE_KEY = Keypair.from_bytes(base58.b58decode(os.getenv("PRIVATE_KEY"))) | |
SOLANA_RPC_ENDPOINT = os.getenv("SOLANA_RPC_ENDPOINT") | |
FIXED_AMOUNT = int(os.getenv("FIXED_AMOUNT")) | |
# Mint addresses | |
INPUT_TOKEN_MINT = "So11111111111111111111111111111111111111112" | |
OUTPUT_TOKEN_MINT = "8Ki8DpuWNxu9VsS3kQbarsCWMcFGWkzzA8pUPto9zBd5" | |
# Initialize AsyncClient and Jupiter | |
async_client = AsyncClient(SOLANA_RPC_ENDPOINT) | |
jupiter = Jupiter( | |
async_client=async_client, | |
keypair=PRIVATE_KEY, | |
quote_api_url="https://quote-api.jup.ag/v6/quote?", | |
swap_api_url="https://quote-api.jup.ag/v6/swap", | |
open_order_api_url="https://jup.ag/api/limit/v1/createOrder", | |
cancel_orders_api_url="https://jup.ag/api/limit/v1/cancelOrders", | |
query_open_orders_api_url="https://jup.ag/api/limit/v1/openOrders?wallet=", | |
query_order_history_api_url="https://jup.ag/api/limit/v1/orderHistory", | |
query_trade_history_api_url="https://jup.ag/api/limit/v1/tradeHistory") | |
async def buy_token(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle the /buy command to buy a fixed amount of a token.""" | |
try: | |
# Execute the swap transaction | |
transaction_data = await jupiter.swap( | |
input_mint=INPUT_TOKEN_MINT, | |
output_mint=OUTPUT_TOKEN_MINT, | |
amount=FIXED_AMOUNT, | |
slippage_bps=1, | |
) | |
raw_transaction = VersionedTransaction.from_bytes( | |
base64.b64decode(transaction_data)) | |
signature = PRIVATE_KEY.sign_message( | |
message.to_bytes_versioned(raw_transaction.message)) | |
signed_txn = VersionedTransaction.populate(raw_transaction.message, | |
[signature]) | |
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) | |
result = await async_client.send_raw_transaction(txn=bytes(signed_txn), | |
opts=opts) | |
transaction_id = json.loads(result.to_json())['result'] | |
# Respond with the transaction details | |
await update.message.reply_text( | |
f"Purchased token. Transaction ID: https://explorer.solana.com/tx/{transaction_id}" | |
) | |
except Exception as e: | |
await update.message.reply_text(f"An error occurred: {str(e)}") | |
def main(): | |
# Initialize the Telegram bot application | |
application = Application.builder().token(BOT_TOKEN).build() | |
# Add command handler for /buy | |
application.add_handler(CommandHandler("lock_in", buy_token)) | |
# Start the bot | |
print("Bot started with /buy command functionality!") | |
application.run_polling() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment