Last active
May 24, 2022 19:55
-
-
Save dudanogueira/1e032b0500d00a0a6a0e3abbca53142d to your computer and use it in GitHub Desktop.
Simple script to remove a user and recreated it inactive for Rocket.Chat
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# LICENSE: MIT | |
# Author: [email protected] | |
# | |
# Required permissions: | |
# - delete-user | |
# - create-user | |
# | |
# Description: This script will: | |
# - Get provided users from a Rocket.Chat Server | |
# - Remove those users | |
# - Create those users, with same email, username, but with random password and deactivated | |
# $ python3 rocketchat.user.remove.deactivate.py -h | |
# usage: rocketchat.user.remove.deactivate.py [-h] [-dry] usernames | |
# Rocket.Chat User Pruner | |
# positional arguments: | |
# usernames Usernames to inspect, optionally comma separated | |
# optional arguments: | |
# -h, --help show this help message and exit | |
# -dry, --dry-run dry/test run. Only shows what will be done (default: False) | |
import argparse | |
import requests | |
import os | |
import random | |
import string | |
ROCKETCHAT_HOST = os.environ.get( | |
"ROCKETCHAT_USER_DEACTIVATOR_HOST", "http://localhost:3000") | |
ROCKETCHAT_USERID = os.environ.get( | |
"ROCKETCHAT_USER_DEACTIVATOR_USERID", "avmX38wqkezv55obK") | |
ROCKETCHAT_TOKEN = os.environ.get( | |
"ROCKETCHAT_USER_DEACTIVATOR_TOKEN", "Dzv4ETunGY5YlzJNiN23GutG_dfGcp27GO9ISO6fDn9") | |
headers = { | |
"X-Auth-Token": ROCKETCHAT_TOKEN, | |
"X-User-Id": ROCKETCHAT_USERID, | |
} | |
parser = argparse.ArgumentParser(description="Rocket.Chat User Pruner", | |
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument( | |
"usernames", help="Usernames to inspect, optionally comma separated") | |
parser.add_argument("-dry", "--dry-run", | |
help="dry/test run. Only shows what will be done", action="store_true") | |
args = parser.parse_args() | |
config = vars(args) | |
def get_user(username): | |
url_get_user = ROCKETCHAT_HOST + \ | |
'/api/v1/users.info?fields={"userRooms": 1}&username=' + username | |
return requests.get(url_get_user, headers=headers) | |
def delete_user(user_id): | |
url_update_user = ROCKETCHAT_HOST + "/api/v1/users.delete" | |
payload = { | |
"userId": user_id, | |
"data": { | |
"active": False | |
} | |
} | |
return requests.post(url_update_user, headers=headers, json=payload) | |
def create_user(payload): | |
url_update_user = ROCKETCHAT_HOST + "/api/v1/users.create" | |
return requests.post(url_update_user, headers=headers, json=payload) | |
if __name__ == "__main__": | |
print("Config: " + str(config)) | |
for user in config["usernames"].split(","): | |
response = get_user(user) | |
if response.status_code == 200: | |
print( | |
"#" * 20 + " USERNAME FOUND {0} ({1})".format(user, response.json()["user"]["_id"])) | |
# remove user | |
if not config["dry_run"]: | |
print("Deleting user...") | |
deleted = delete_user(response.json()["user"]["_id"]) | |
if deleted.json()["success"]: | |
print("User {0} removed".format(user)) | |
else: | |
print(deleted.json()) | |
else: | |
print("DRYRUN: USER GETS DELETED") | |
# create an inactive user with bogus password | |
if deleted.json()["success"]: | |
user = response.json()["user"] | |
random_pwd = ''.join(random.choices(string.ascii_lowercase, k=15)) | |
creation_payload = { | |
"name": user["name"], | |
"email": user["emails"][0]["address"], | |
"roles": [], | |
"password": random_pwd, | |
"username": user["username"], | |
"active": False, | |
"verified": True, | |
"joinDefaultChannels": False, | |
} | |
if not config["dry_run"]: | |
print("Creating user...") | |
new_user = create_user(creation_payload) | |
if new_user.json()["success"]: | |
print("user created...") | |
else: | |
print(new_user.json()) | |
else: | |
print("DRYRUN: NEW USER PAYLOAD TO CREATE ", creation_payload) | |
elif response.status_code == 400: | |
print("#" * 20 + " USER {0} NOT FOUND!".format(user)) | |
if not response.ok: | |
print("Error: " + str(response.status_code) + " " + response.text) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment