Skip to content

Instantly share code, notes, and snippets.

@moskomule
Last active January 11, 2025 09:15
Show Gist options
  • Save moskomule/12f6a3cc9beab7ce6253b67dedfff14e to your computer and use it in GitHub Desktop.
Save moskomule/12f6a3cc9beab7ce6253b67dedfff14e to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "Authlib",
# "httpx",
# "rich",
# ]
# ///
import asyncio
import json
import logging
import pathlib
import httpx
from authlib.integrations.httpx_client import AsyncOAuth1Client
def extract_tweet_ids_from_archive(path: str) -> list[str]:
path = pathlib.Path(path)
tweet_ids = []
with path.open() as f:
data = f.read()
data = data.replace("window.YTD.tweets.part0 = ", "")
tweets_data = json.loads(data)
for tweet in tweets_data:
tweet_id = tweet["tweet"]["id_str"]
tweet_ids.append(tweet_id)
return tweet_ids
async def delete_tweet(
session: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
id: str,
) -> bool | None:
async with semaphore:
try:
ret = await session.post(f"https://api.twitter.com/1.1/statuses/destroy/{id}.json")
if ret.status_code == httpx.codes.OK:
logging.info(f"Successfully deleted tweet ID: {id}")
return True
else:
logging.error(f"Failed to delete tweet ID: {id}. Status: {ret.status_code}")
except httpx.RequestError as e:
logging.error(f"Request failed for tweet ID: {id}. Error: {e}")
async def main(
tweets: list[str],
args,
):
semaphore = asyncio.Semaphore(100)
limits = httpx.Limits(max_connections=100, max_keepalive_connections=50)
async with AsyncOAuth1Client(
args.api_key, args.api_key_secret, args.access_token, args.access_token_secret, limits=limits
) as client:
tasks = [delete_tweet(client, semaphore, id) for id in tweets]
results = await asyncio.gather(*tasks, return_exceptions=True)
results = [res for res in results if res is not None]
logging.info(f"Finished processing {len(results)}/{len(tweets)} tweets.")
if __name__ == "__main__":
import argparse
from rich.logging import RichHandler
logging.basicConfig(level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()])
logging.getLogger("httpx").setLevel(logging.WARNING)
p = argparse.ArgumentParser()
p.add_argument("api_key")
p.add_argument("api_key_secret")
p.add_argument("access_token")
p.add_argument("access_token_secret")
p.add_argument("archive_path")
args = p.parse_args()
logging.info(f"args: {args}")
tweets = extract_tweet_ids_from_archive(args.archive_path)
asyncio.run(main(tweets, args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment