Last active
April 3, 2024 14:48
-
-
Save jongan69/2310ff39b8572d17da539f6daaae67ba to your computer and use it in GitHub Desktop.
V1 of MEMEME Bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import re | |
| import json | |
| from dotenv import load_dotenv | |
| import requests | |
| import asyncio | |
| import aiohttp | |
| from telegram import Update | |
| from telegram.ext import Application, CommandHandler, ConversationHandler, MessageHandler, filters, ContextTypes | |
| import itertools | |
| from solana.rpc.api import Client | |
# Public site root. The trailing slash is kept because /site replies with this
# exact string.
Base_URL = 'https://MEMEME.ooo/'
# Strip the trailing slash before joining paths; otherwise every endpoint
# below would contain a double slash ("https://MEMEME.ooo//api/...").
_API_ROOT = Base_URL.rstrip('/')
Burn_URL = f'{_API_ROOT}/api/burn'
Balance_URL = f'{_API_ROOT}/api/mememeBalance'
Airdrop_URL = f'{_API_ROOT}/api/startAirdrop'
Price_URL = f'{_API_ROOT}/api/getPrice'

# Amount sent to the burn endpoint on every /autoburn tick.
Burn_AMOUNT = 1000
Contract_Address = "7Q8Q6QbxsgiwAgfGMWWm2wU4bi1sWELCC4Vt3ypw3BM2"
Burn_Address = "4cjrPocxTryHXka56qSnNPqJY5METi3UQKMs7EwwPKfs"
Token_URL = "https://www.pump.fun/7Q8Q6QbxsgiwAgfGMWWm2wU4bi1sWELCC4Vt3ypw3BM2"
LAUNCH_MESSAGE = "Hello, I am a bot that will help you burn your Solana NFTs. \n"
READY_MESSAGE = "Hey @mememeLady1, I think we're ready to do an airdrop"

load_dotenv()  # Take environment variables from .env.
YOUR_BOT_TOKEN = os.environ.get("YOUR_BOT_TOKEN")
# Candidate wallet addresses: 32-44 characters from the base58 alphabet.
SOLANA_ADDRESS_PATTERN = r"\b[1-9A-HJ-NP-Za-km-z]{32,44}\b"
ALC_API_KEY = os.environ.get("ALC_API_KEY")
http_client = Client(f"https://solana-mainnet.g.alchemy.com/v2/{ALC_API_KEY}")

TIMER_INTERVAL = 15  # Interval in seconds
# Alternates True, False, True, ... on each call; read by toggleYap.
toggle = itertools.cycle([True, False]).__next__
# When False, the bot stays quiet about rejections and confirmations.
YAP = False
# Filled by message_handler: sender key -> chat id of their latest message.
last_message_sender = {}
| # Configure logging | |
| # logging.basicConfig( | |
| # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| # level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
async def is_user_admin(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Return True when the sender may run admin-only commands.

    In a private chat the sender always qualifies. In any other chat the
    sender must appear in the administrator list fetched from the bot API.
    """
    sender_id = update.effective_user.id
    chat = update.effective_chat
    group_id = chat.id

    if chat.type == "private":
        return True

    # One API round-trip, then a linear scan for the sender's id.
    for member in await context.bot.get_chat_administrators(group_id):
        if member.user.id == sender_id:
            return True
    return False
async def message_handler(update, context: ContextTypes.DEFAULT_TYPE):
    """Save the first Solana address a user posts in a chat that accepts them.

    Every text message is scanned with SOLANA_ADDRESS_PATTERN. A match is
    stored only when the chat has been opened with /start and the sender has
    no address stored for that chat yet. Rejections are only announced when
    YAP is on.

    Args:
        update: incoming Telegram update.
        context: handler context (unused).
    """
    message = update.message
    user = update.effective_user
    # NOTE(review): update.message is presumably None for edited messages and
    # channel posts; without this guard those updates raised AttributeError.
    if message is None or user is None or not message.text:
        return

    chat_id = str(message.chat.id)
    # str(None) used to give every user without an @username the shared key
    # "None"; fall back to the numeric id so each user keeps a separate slot.
    user_id = str(user.username) if user.username else str(user.id)
    last_message_sender[user_id] = chat_id

    solana_matches = re.findall(SOLANA_ADDRESS_PATTERN, message.text)
    if not solana_matches:
        return

    addresses = read_addresses()
    chat_entry = addresses.get(chat_id)
    if not chat_entry or not chat_entry.get('adding_allowed', False):
        if YAP:
            await message.reply_text(
                "Adding new Solana addresses is currently not allowed in this chat."
            )
        return

    saved = chat_entry.setdefault("addresses", {})
    if user_id in saved:
        if YAP:
            await message.reply_text("Each user can only save one address.")
        return

    # Only one address per user is kept, so only the first match matters.
    # Looping over every match used to save the first and then reject the
    # rest within the same message.
    address = solana_matches[0]
    saved[user_id] = address
    write_addresses(addresses)
    await message.reply_text(f"Saved Solana address: {address}")
| # V1 FUNCTIONS | |
async def _set_adding_allowed(update, context, allowed, confirmation):
    """Admin-gated switch for whether this chat accepts new addresses.

    Args:
        update: incoming Telegram update carrying the command message.
        context: handler context, forwarded to is_user_admin.
        allowed: value to store under the chat's 'adding_allowed' key.
        confirmation: text sent back after the change when YAP is on.
    """
    if not await is_user_admin(update, context):
        if YAP:
            await update.message.reply_text(
                "This command can only be used by administrators.")
        return

    chat_id = str(update.message.chat.id)
    addresses = read_addresses()
    # Create the chat entry on first use, and make sure the address map
    # exists so later lookups never hit a missing key.
    chat_entry = addresses.setdefault(
        chat_id, {"adding_allowed": False, "addresses": {}})
    chat_entry.setdefault("addresses", {})
    chat_entry['adding_allowed'] = allowed
    write_addresses(addresses)

    if YAP:
        await update.message.reply_text(confirmation)


async def start_adding(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Allow adding of new addresses if the user is an admin."""
    await _set_adding_allowed(
        update, context, True, "Adding new Solana addresses is now allowed.")


async def stop_adding(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Stop adding of new addresses if the user is an admin."""
    await _set_adding_allowed(
        update, context, False, "Adding new Solana addresses is now disabled.")
def read_addresses():
    """Load the per-chat address store from ``solana_addresses_by_chat.json``.

    Returns:
        dict: the parsed store. An empty dict is returned when the file is
        missing, is empty or otherwise invalid JSON, or does not hold a JSON
        object, so that one damaged file cannot break every handler.
    """
    try:
        with open("solana_addresses_by_chat.json", "r",
                  encoding="utf-8") as file:
            data = json.load(file)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as error:
        # Say so loudly: the next write will replace the damaged file.
        print(f"Ignoring unreadable address store: {error}")
        return {}
    return data if isinstance(data, dict) else {}
def write_addresses(addresses):
    """Persist the address store to ``solana_addresses_by_chat.json``.

    Args:
        addresses: JSON-serializable store (chat id -> chat entry).

    Raises:
        TypeError: if ``addresses`` holds a value json cannot serialize. The
            existing file is left untouched in that case.
    """
    # Serialize first: opening the target with "w" truncates it immediately,
    # so a failure part-way through json.dump used to leave a broken store.
    serialized = json.dumps(addresses, indent=4)

    target = "solana_addresses_by_chat.json"
    temp_path = f"{target}.tmp"
    with open(temp_path, "w", encoding="utf-8") as file:
        file.write(serialized)
    # Swap the finished file into place in a single step.
    os.replace(temp_path, target)
def _escape_markdown_v2(text):
    """Backslash-escape the characters Telegram reserves in MarkdownV2 text."""
    return re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", text)


async def list_addresses(update, context: ContextTypes.DEFAULT_TYPE):
    """List all whitelisted Solana addresses for the chat, in chunks if necessary.

    Each chunk starts with the same header, stays within the 4096-character
    limit used by this handler, and is sent as MarkdownV2. When the chat has
    no stored addresses, the donation notice is sent instead.
    """
    print(f"{update.message.chat.username}")
    chat_id = str(update.message.chat.id)
    # Look at the address map itself: the chat entry is truthy as soon as it
    # holds 'adding_allowed', which used to yield no reply at all when the
    # map was empty.
    all_addresses = read_addresses().get(chat_id, {}).get("addresses", {})
    if not all_addresses:
        await update.message.reply_text(
            "No whitelisted Solana addresses found for this chat, to donate to the bot: `9ex8SZWhb2X5MRmqZt4Uu9UEbWtRbJDnMozbyN5sCU7N`",
            parse_mode='MarkdownV2')
        return

    base_message = "Whitelisted Solana addresses:\n"
    current_message = base_message
    for user, address in all_addresses.items():
        # Usernames may contain "_", which MarkdownV2 rejects unescaped.
        line = f"User @{_escape_markdown_v2(user)} has wallet `{address}`\n"
        # Measure the whole line, not just the address, so a chunk can never
        # grow past the limit.
        if len(current_message) + len(line) > 4096:
            await update.message.reply_text(current_message,
                                            parse_mode='MarkdownV2')
            current_message = base_message  # Repeat the header per chunk.
        current_message += line

    # Send any remaining addresses in the final chunk.
    if current_message != base_message:
        await update.message.reply_text(current_message,
                                        parse_mode='MarkdownV2')
async def export_addresses(update, context: ContextTypes.DEFAULT_TYPE):
    """Export whitelisted Solana addresses to a CSV file.

    Writes ``solana_addresses_export.csv`` (one header row, then one address
    per line) and sends it to the chat as a document. Replies with a notice
    instead when the chat has no stored addresses.
    """
    chat_id = str(update.message.chat.id)
    # Check the address map, not the chat entry: the entry is truthy even
    # when no address has been saved, which produced a header-only export.
    all_addresses = read_addresses().get(chat_id, {}).get("addresses", {})
    if not all_addresses:
        await update.message.reply_text(
            "No whitelisted Solana addresses found to export.")
        return

    rows = "".join(f"{address}\n" for address in all_addresses.values())
    csv_content = "Wallet Address\n" + rows
    with open("solana_addresses_export.csv", "w") as csv_file:
        csv_file.write(csv_content)

    # Hold the upload handle in a context manager so it is closed even when
    # sending fails; the bare open() call used to leak it.
    with open("solana_addresses_export.csv", "rb") as document:
        await update.message.reply_document(document=document)
async def toggleYap(update, context: ContextTypes.DEFAULT_TYPE):
    """Flip the global YAP (verbose replies) flag; admin only.

    Returns:
        The new value of YAP, or None when the caller is not an admin.
    """
    global YAP  # Declare the variable as global to modify it
    if not await is_user_admin(update, context):
        if YAP:
            await update.message.reply_text(
                "This command can only be used by administrators.")
        return

    # Negate the flag itself rather than advancing a separate True/False
    # cycle, whose position could drift out of step with YAP.
    YAP = not YAP
    await update.message.reply_text(f"Yapp is set to {YAP}")
    return YAP
| # END V1 FUNCTIONS | |
| # STATIC FUNCTIONS | |
async def wherethefuck(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the fixed "where is my $mememe" rant and return that text."""
    rant = (
        "Where the FUCK is my $mememe I have been WAITING for DAYS. "
        "I cannot wait any longer my bonkbot is loaded up and ready to ape. "
        "I just snorted a line and i'm ready to go to valhalla with my bags."
        "\nNow give me my $mememe or im fudding."
    )
    await update.message.reply_text(rant)
    return rant
async def launch(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with LAUNCH_MESSAGE and return the text that was sent."""
    greeting = LAUNCH_MESSAGE
    await update.message.reply_text(greeting)
    return greeting
async def CA(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the token contract address (sent as MarkdownV2) and return it."""
    await update.message.reply_text(Contract_Address, parse_mode='MarkdownV2')
    return Contract_Address
async def BA(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the burn wallet address (sent as MarkdownV2) and return it."""
    await update.message.reply_text(Burn_Address, parse_mode='MarkdownV2')
    return Burn_Address
async def site(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the project site URL (Base_URL) and return it."""
    await update.message.reply_text(Base_URL)
    return Base_URL
async def about(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the link to the site's about page and return it."""
    # Base_URL ends with "/"; strip it so the link is ".../about" rather
    # than "...//about".
    message = f"{Base_URL.rstrip('/')}/about"
    await update.message.reply_text(message)
    return message
async def pump(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the token's pump.fun link (Token_URL) and return it."""
    await update.message.reply_text(Token_URL)
    return Token_URL
async def ready(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with READY_MESSAGE and return the text that was sent."""
    await update.message.reply_text(READY_MESSAGE)
    return READY_MESSAGE
async def heyTellHim(update: Update,
                     context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send the canned retort, aimed at the replied-to message when there is one.

    The previous version looked the sender up by numeric id, although
    last_message_sender is keyed by the string message_handler stores, so the
    lookup never matched. It also passed a chat id as ``reply_to_message_id``,
    which is not a message id.
    """
    message = "shut the FUCK up"

    # When the command is itself a reply, answer the message it points at.
    target = update.message.reply_to_message
    if target is not None:
        await update.message.reply_text(
            message, reply_to_message_id=target.message_id)
        return

    user = update.effective_user
    # Accept either key form message_handler may have stored for the sender.
    known = (str(user.username) in last_message_sender
             or str(user.id) in last_message_sender)
    if known:
        # NOTE(review): the registry maps a sender to a chat id only, so no
        # specific message can be targeted here. Confirm the intended target.
        await update.message.reply_text(message)
    else:
        await update.message.reply_text(
            "Couldn't find a recent different user to reply to.")
async def dev(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the canned "dev" joke; on failure, reply with the error text."""
    try:
        await update.message.reply_text(
            "The dev wants to know if going to jail is bullish π°π°π°π°π°π°π°")
    except Exception as error:
        await update.message.reply_text(str(error))
async def ai(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the notice about the planned AI protocol functions.

    If sending fails, the error text is sent back instead.
    """
    notice = "Once a raydium liquidity pool is created, token holders will be able to invoke MEMEME ai protocol functions with MEMEME tokens"
    try:
        await update.message.reply_text(notice)
    except Exception as error:
        await update.message.reply_text(str(error))
async def generate(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply that the generate feature is not available yet.

    If sending fails, the error text is sent back instead.
    """
    notice = "This feature is stil under construction please wait"
    try:
        await update.message.reply_text(notice)
    except Exception as error:
        await update.message.reply_text(str(error))
async def meme(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the first canned meme line; on failure, reply with the error text."""
    joke = "If this project breaks 100k Market Cap before @mememeLady1 tweets about it he/she/they/dumbest fucking milady/it has to suck their own dick"
    try:
        await update.message.reply_text(joke)
    except Exception as error:
        await update.message.reply_text(str(error))
async def meme1(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the second canned meme line; on failure, reply with the error text."""
    joke = "Selling @mememeLady1 for 1 MEMEME"
    try:
        await update.message.reply_text(joke)
    except Exception as error:
        await update.message.reply_text(str(error))
async def meme2(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the third canned meme line; on failure, reply with the error text."""
    joke = "This is an ad for getting @mememeLady1 some bitches, unfortunately, they have yet to aquire any and needs to be aided by donation of 1 MEMEME"
    try:
        await update.message.reply_text(joke)
    except Exception as error:
        await update.message.reply_text(str(error))
async def thisguy(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the canned "this guy" line; on failure, reply with the error text."""
    joke = "The only thing hes fucking is stupid"
    try:
        await update.message.reply_text(joke)
    except Exception as error:
        await update.message.reply_text(str(error))
| # END STATIC FUNCTIONS | |
| # API FUNCTIONS | |
async def tokensInBurner():
    """Ask the balance endpoint how many tokens the burn wallet holds.

    Returns:
        The decoded JSON body of the response (for any status code), or None
        when the request or the decoding fails.
    """
    payload = {"walletAddress": Burn_Address}
    headers = {"Content-Type": "application/json"}
    loop = asyncio.get_running_loop()
    try:
        # requests is blocking: run it in the default executor so the bot
        # keeps serving other updates, and bound the wait with a timeout.
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(
                Balance_URL, json=payload, headers=headers, timeout=15),
        )
        if response.status_code == 200:
            print("Success")
        return response.json()
    except Exception as e:
        print(f"Failed: {e}")
        return None
async def left(update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the 'message' field of the burn-wallet balance response.

    Returns the text that was sent. Returns None when the balance lookup
    produced nothing, or when an error occurred; in the error case the error
    text is sent to the chat.
    """
    balance = await tokensInBurner()
    if not balance:
        return None

    try:
        text = str(balance['message'])
        await update.message.reply_text(text)
    except Exception as error:
        await update.message.reply_text(str(error))
    else:
        return text
# Async function to burn tokens
async def burn_tokens(amount):
    """POST a burn request for ``amount`` tokens to the burn endpoint.

    Args:
        amount: number of tokens to burn, sent as the JSON field "amount".

    Returns:
        The decoded JSON body on success; None when the request fails, the
        status is an error, or the response is not JSON.
    """
    endpoint_url = f"{Burn_URL}"  # Replace with your actual endpoint URL
    payload = {"amount": amount}
    headers = {"Content-Type": "application/json"}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint_url, json=payload,
                                    headers=headers) as response:
                response.raise_for_status()
                # Compare the parsed media type, not the raw header. Servers
                # commonly send "application/json; charset=utf-8", which the
                # exact string comparison treated as "not JSON", so a burn
                # that succeeded was reported as a failure.
                if response.content_type == 'application/json':
                    return await response.json()
                print("Response not in JSON format or request failed")
                return None
    except Exception as e:
        print(f"An exception occurred during the token burn operation: {e}")
        return None
# Define a single state for numeric input.
# `range(1)` bound the name to the range object itself; the handlers are
# annotated as returning an int state, so use the plain state value.
GET_NUMBER = 0
async def startBurnConvo(update: Update,
                         context: ContextTypes.DEFAULT_TYPE) -> int:
    """Open the burn conversation: prompt for an amount, then wait for it.

    NOTE(review): no admin check is made here, so any user can start a burn.
    Confirm that this is intended.
    """
    prompt = "Please enter a valid number. You have 3 attempts."
    await update.message.reply_text(prompt)
    return GET_NUMBER
async def get_number(update: Update,
                     context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle the amount typed during the burn conversation.

    A numeric input triggers one burn request. On success the result is
    posted and the conversation ends. An invalid input or a failed burn
    counts as a failed attempt; after three attempts the conversation ends.

    Returns:
        GET_NUMBER to keep waiting for input, or ConversationHandler.END.
    """
    user_input = (update.message.text or "").strip()
    # Initialize or increment the attempt count kept in the user's context.
    attempts_made = context.user_data.get("attempts", 0) + 1
    context.user_data["attempts"] = attempts_made

    # str.isdigit() also accepts characters such as "²" that int() rejects,
    # so require ASCII digits before converting.
    if user_input.isascii() and user_input.isdigit():
        amount = int(user_input)
        await update.message.reply_text(f"Attempting to burn {amount} tokens...")
        burnRes = await burn_tokens(amount)
        if burnRes:
            context.user_data.pop("attempts",
                                  None)  # Reset attempt count for next time
            lol = "π₯π₯π₯π₯π₯Burnπ₯π₯π₯π₯π₯"
            await update.message.reply_text(
                f"{lol}\n{burnRes['message']}\n{burnRes['url']}")
            return ConversationHandler.END
        # A failed burn used to fall through with no reply and no state
        # change; report it and count it as a failed attempt instead.
        await update.message.reply_text("Burn request failed.")

    if attempts_made < 3:
        await update.message.reply_text(
            f"Attempt {attempts_made} failed. Please enter a valid number.")
        return GET_NUMBER

    await update.message.reply_text(
        "Maximum attempts reached. Canceling operation.")
    context.user_data.pop("attempts", None)  # Ensure to reset for future use
    return ConversationHandler.END
async def cancelBurnConvo(update: Update,
                          context: ContextTypes.DEFAULT_TYPE) -> int:
    """Abort the burn conversation: notify the user and clear the attempt count."""
    farewell = 'Operation cancelled becasue you are retarded'
    await update.message.reply_text(farewell)
    # Drop the counter so the next /burn starts from zero attempts.
    context.user_data.pop("attempts", None)
    return ConversationHandler.END
# Running auto-burn tasks, keyed by chat id (filled by toggle_burn).
burn_tasks = {}


# Async task to schedule burning tokens
async def schedule_burn_tokens(bot, chat_id, amount, mint_address,
                               interval=None):
    """Burn ``amount`` tokens repeatedly and report each result to the chat.

    Runs until the task is cancelled or an exception escapes the loop.

    Args:
        bot: object whose ``send_message`` coroutine posts the report.
        chat_id: chat that receives the reports; also the registry key.
        amount: amount passed to burn_tokens on every iteration.
        mint_address: accepted for interface compatibility; not used here.
        interval: seconds to wait between burns; defaults to TIMER_INTERVAL,
            the value toggle_burn announces to the chat.
    """
    delay = TIMER_INTERVAL if interval is None else interval
    try:
        while True:
            result = await burn_tokens(amount)
            if result:
                lol = "π₯π₯π₯π₯π₯Burnπ₯π₯π₯π₯π₯"
                message = f"{lol}\n{result['message']}\n{result['url']}"
            else:
                message = f"Woopsy: {str(result)}"
            await bot.send_message(chat_id=chat_id, text=message)
            await asyncio.sleep(delay)  # Wait before the next call
    finally:
        # Remove only this task's own registry entry, so a task that is
        # shutting down cannot evict a newer task started for the same chat.
        if burn_tasks.get(chat_id) is asyncio.current_task():
            burn_tasks.pop(chat_id, None)
# Command handler to toggle burning tokens on and off
async def toggle_burn(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Start the periodic burn task for this chat, or cancel it if one is registered."""
    chat_id = update.effective_chat.id

    if chat_id not in burn_tasks:
        # Nothing registered for this chat yet: launch the loop and record it.
        burn_tasks[chat_id] = asyncio.create_task(
            schedule_burn_tokens(context.bot, chat_id, Burn_AMOUNT,
                                 Contract_Address))
        announcement = f"APRIL FOOLS YOU CUNT <3 Token burning {Burn_AMOUNT} MEMEME started every {TIMER_INTERVAL} seconds"
        await context.bot.send_message(chat_id=chat_id, text=announcement)
        return

    # A task is registered: request cancellation. The task removes its own
    # registry entry as it winds down.
    burn_tasks[chat_id].cancel()
    await context.bot.send_message(chat_id=chat_id,
                                   text="Token burning stopped.")
async def startAirdrop():
    """Ask the airdrop endpoint to start an airdrop for Contract_Address.

    Returns:
        The decoded JSON body when the response status is successful;
        otherwise None (error status, network failure, or undecodable body).
    """
    payload = {"contractAddress": Contract_Address}
    headers = {"Content-Type": "application/json"}
    loop = asyncio.get_running_loop()
    try:
        # requests is blocking: run it in the default executor so the event
        # loop stays responsive while the server works.
        # NOTE(review): the timeout is generous because an airdrop may take a
        # while server-side. Tune it to the endpoint's real latency.
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(
                Airdrop_URL, json=payload, headers=headers, timeout=120),
        )
        if response:  # Truthy only for non-error status codes.
            body = response.json()
            print(f"Response: {body}")
            return body
        print(f"Failed: HTTP {response.status_code}")
        return None
    except Exception as e:
        print(f"Failed: {str(e)}")
        return None
async def airdrop(update, context: ContextTypes.DEFAULT_TYPE):
    """Trigger an airdrop and post the outcome to the chat.

    Nothing is sent when the airdrop call yields no result. Any error raised
    while handling the result is reported back with an "Ooof:" prefix.

    NOTE(review): no admin check is made here. Confirm that this is intended.
    """
    try:
        outcome = await startAirdrop()
        if not outcome:
            return
        succeeded = outcome['success']
        banner = "π¦π¦π¦π¦Airdropπ¦π¦π¦π¦π¦"
        if succeeded:
            text = f"{banner}\n{outcome['message']}\nTransactions:\n{outcome['txIds']}"
        else:
            text = f"{banner}\n{outcome['error']}\n"
        await update.message.reply_text(text)
    except Exception as error:
        await update.message.reply_text(f"Ooof:\n{error}\n")
async def getPrice():
    """Fetch the current price payload from the price endpoint.

    Returns:
        The decoded JSON body when the response status is successful;
        otherwise None (error status, network failure, or undecodable body).
    """
    headers = {"Content-Type": "application/json"}
    loop = asyncio.get_running_loop()
    try:
        # requests is blocking: run it in the default executor and bound the
        # wait so a slow endpoint cannot stall the bot.
        response = await loop.run_in_executor(
            None,
            lambda: requests.get(Price_URL, headers=headers, timeout=15),
        )
        if response:  # Truthy only for non-error status codes.
            print("Success")
            return response.json()
        print(f"Failed: HTTP {response.status_code}")
        return None
    except Exception as e:
        print(f"Failed: {e}")
        return None
async def price(update, context: ContextTypes.DEFAULT_TYPE):
    """Post the current token price to the chat.

    Nothing is sent when the price lookup yields no result. Any error raised
    along the way is sent back as plain text.
    """
    try:
        quote = await getPrice()
        if not quote:
            return
        header = "This is the current price based on supply and USD Market Cap: "
        await update.message.reply_text(f"{header}\n${quote['price']}")
    except Exception as error:
        await update.message.reply_text(str(error))
| # END API FUNCTIONS | |
def main():
    """Build the Telegram application, register every handler, and poll.

    Raises:
        RuntimeError: if YOUR_BOT_TOKEN is not configured.
    """
    # Fail early with an actionable message instead of a library error.
    if not YOUR_BOT_TOKEN:
        raise RuntimeError(
            "YOUR_BOT_TOKEN is not set; add it to the environment or .env file.")

    application = Application.builder().token(YOUR_BOT_TOKEN).build()

    # The burn conversation goes first so that, while it is active, typed
    # amounts reach it before the generic address scanner below.
    conv_handler = ConversationHandler(
        entry_points=[CommandHandler('burn', startBurnConvo)],
        states={
            GET_NUMBER:
            [MessageHandler(filters.TEXT & ~filters.COMMAND, get_number)],
        },
        fallbacks=[CommandHandler('cancel', cancelBurnConvo)],
    )
    application.add_handler(conv_handler)
    application.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, message_handler))

    # Each command is listed once. The earlier duplicate registrations of
    # "ba", "stop" and "export" could never fire, because only the first
    # matching handler in a group handles an update.
    command_handlers = (
        # Useful
        ("about", about),
        ("site", site),
        ("yap", toggleYap),
        # Token Specific
        ("left", left),
        ("price", price),
        ("airdrop", airdrop),
        ("ba", BA),
        ("autoburn", toggle_burn),
        ("ca", CA),
        ("pump", pump),
        # Funny
        ("wherethefuck", wherethefuck),
        ("meme", meme),
        ("meme1", meme1),
        ("meme2", meme2),
        ("thisguyfucks", thisguy),
        ("dev", dev),
        # Need to Fix
        ("start", start_adding),
        ("stop", stop_adding),
        ("export", export_addresses),
        ("list", list_addresses),
        # To Do
        ("ai", ai),
        ("generate", generate),
        # Old
        ("heytellhim", heyTellHim),
        ("launch", launch),
        ("ready", ready),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))

    print("Bot started with enhanced functionality!", flush=True)
    application.run_polling()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment