Skip to content

Instantly share code, notes, and snippets.

@rffontenelle
Created June 17, 2025 18:41
Show Gist options
  • Select an option

  • Save rffontenelle/fe2b6c9b0d18f25e3fc424bd94721ac3 to your computer and use it in GitHub Desktop.

Select an option

Save rffontenelle/fe2b6c9b0d18f25e3fc424bd94721ac3 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import os

from nio import AsyncClient, ErrorResponse, LoginResponse
# Configuration
# Each value can be supplied through the environment so that real credentials
# never have to be written into this file; the literals below are placeholders
# used only when the corresponding variable is unset.
HOMESERVER = os.environ.get("MATRIX_HOMESERVER", "YOUR_HOMESERVER")  # e.g. https://matrix.org
USERNAME = os.environ.get("MATRIX_USERNAME", "@your_user:HOMESERVER")  # e.g. @john_doe:matrix.org
PASSWORD = os.environ.get("MATRIX_PASSWORD", "your_password")
ROOM_ID = os.environ.get("MATRIX_ROOM_ID", "!room_id:HOMESERVER")  # or room name as #room_name:homeserver

# Emit INFO-level progress messages from this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my-app")
async def _kick_bridge_users(client):
    """Join the target room if needed, then kick its t2bot.io members.

    Expects *client* to be already logged in. Every failure is logged and
    ends the function early; nothing is raised for error responses.
    """
    # Ensure we are in the target room
    joined_rooms = await client.joined_rooms()
    if isinstance(joined_rooms, ErrorResponse):
        logger.error(f"Failed to list joined rooms: {joined_rooms}")
        return

    # ROOM_ID may be an alias (#name:server). The joined-rooms list holds
    # room IDs, so an alias never matches and we fall through to join(),
    # whose response carries the room ID used for the calls below.
    room_id = ROOM_ID
    if ROOM_ID not in joined_rooms.rooms:
        logger.info(f"Not in room {ROOM_ID}, attempting to join...")
        join_response = await client.join(ROOM_ID)
        if isinstance(join_response, ErrorResponse):
            logger.error(f"Failed to join the room: {join_response}")
            return
        logger.info("Successfully joined the room.")
        room_id = join_response.room_id

    # Fetch room members
    logger.info(f"Fetching members of room {ROOM_ID}...")
    members_response = await client.joined_members(room_id)
    if isinstance(members_response, ErrorResponse):
        logger.error(f"Failed to fetch members: {members_response}")
        return

    # Match on the server-name suffix rather than a substring so that IDs on
    # other servers such as "@x:t2bot.io.example.org" are not selected.
    t2bot_members = [
        member
        for member in members_response.members
        if member.user_id.endswith(":t2bot.io")
    ]

    print("Members:")
    for member in t2bot_members:
        print(f" - {member.user_id} ({member.display_name})")
    print(f'{len(t2bot_members)} t2bot.io users found')
    if not t2bot_members:
        return

    logger.info("Kicking Telegram-Matrix bridge users (t2bot.io)...")
    reason = "Removing Telegram-Matrix bridge accounts from t2bot"
    for member in t2bot_members:
        kick_response = await client.room_kick(room_id, member.user_id, reason)
        # Report failed kicks but keep going with the remaining users.
        if isinstance(kick_response, ErrorResponse):
            logger.error(f"Failed to kick {member.user_id}: {kick_response}")


async def main():
    """Kick all t2bot.io bridge accounts from the configured room.

    Logs in with the module-level HOMESERVER / USERNAME / PASSWORD, then
    delegates to ``_kick_bridge_users``. The session is logged out after a
    successful login and the client is closed on every exit path, including
    early returns and exceptions.
    """
    client = AsyncClient(HOMESERVER, USERNAME)
    try:
        # Authenticate with username and password
        print("Authenticating...")
        response = await client.login(PASSWORD)
        if not isinstance(response, LoginResponse):
            logger.error(f"Authentication failed: {response}")
            return
        logger.info("Successfully authenticated.")

        try:
            await _kick_bridge_users(client)
        finally:
            # Only log out once a login has actually succeeded.
            await client.logout()
    finally:
        await client.close()
# Run the coroutine only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment