Last active
November 29, 2017 03:56
-
-
Save iandexter/b18115265da2d42225b93d93dc57c1ad to your computer and use it in GitHub Desktop.
Clean up Slack workspace
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""Clean up Slack workspace""" | |
# -*- coding: utf-8 -*- | |
# pylint: disable=invalid-name,redefined-outer-name,logging-format-interpolation | |
from __future__ import print_function | |
import logging | |
import os | |
import sys | |
import requests | |
# Flip to False to log at INFO level instead of DEBUG.
DEBUG = True

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)

# Get the legacy Slack API token from
# https://api.slack.com/custom-integrations/legacy-tokens
# and export it as SLACK_TOKEN before running this script.
SLACK_TOKEN = os.getenv('SLACK_TOKEN')
if not SLACK_TOKEN:
    # Nothing below can work without a token, so stop right away.
    logger.error('No Slack API token')
    sys.exit(1)

# Only files strictly larger than this many bytes are deleted.
SIZE_LIMIT = 1048576  # 1 MiB; set to 0 to match every non-empty file
def encode_filename(filename):
    """Return the filename encoded as UTF-8."""
    # Joining onto an empty unicode string coerces the input to text first.
    as_text = u''.join(filename)
    return as_text.encode('utf-8')
def bytes_2_human_readable(number_of_bytes):
    """Format a byte count as a short string such as '1.5 KB'.

    Cribbed from https://stackoverflow.com/a/37423778

    The value is scaled by 1024 per unit, up to TB, and rounded to one
    decimal place. Raises ValueError when the count is negative.
    """
    if number_of_bytes < 0:
        raise ValueError("!!! numberOfBytes can't be smaller than 0 !!!")

    step = 1024.
    value = float(number_of_bytes)
    unit = 'bytes'
    for bigger_unit in ('KB', 'MB', 'GB', 'TB'):
        # Stop as soon as the value no longer fills one of the next unit.
        if not (value / step) >= 1:
            break
        value /= step
        unit = bigger_unit

    return str(round(value, 1)) + ' ' + unit
def to_plural(num=0, text=''):
    """Return '<num> <text>' with an 's' appended unless num equals 1."""
    suffix = "" if num == 1 else "s"
    return "%d %s%s" % (num, text, suffix)
def call_api(method, payload):
    """POST a request to a Slack Web API method and return the decoded reply.

    Args:
        method: API method name appended to the base URL, e.g. 'users.list'.
        payload: dict of form fields for the request. It is copied, so the
            caller's dict is left untouched.

    Returns:
        The decoded JSON response, or an empty dict when the request fails
        or the response body is not valid JSON.
    """
    api_url = 'https://slack.com/api'
    url = '{}/{}'.format(api_url, method)
    # Work on a copy so the token is not injected into the caller's dict.
    data = dict(payload or {})
    data['token'] = SLACK_TOKEN
    try:
        # An explicit timeout keeps a stalled connection from hanging forever.
        resp = requests.post(url, data=data, timeout=30)
        return resp.json()
    except (requests.exceptions.RequestException, ValueError) as err:
        # ValueError covers a body that is not JSON (e.g. an HTML error page).
        logger.error(err)
        return {}
def get_users():
    """Return every workspace user as {'id', 'name'} dicts, sorted by name.

    Pages through 'users.list' 500 members at a time, following the
    'response_metadata.next_cursor' value until it is empty. If a request
    fails or the reply carries no 'members', paging stops and whatever was
    collected so far is returned (an empty list when nothing was fetched).
    """
    users = []
    cursor = None
    while True:
        payload = {'limit': 500}
        if cursor:
            payload['cursor'] = cursor
        users_list = call_api('users.list', payload)
        # call_api returns {} on failure; an error reply has no 'members'.
        if not users_list or 'members' not in users_list:
            logger.error('users.list failed: {}'.format(
                users_list.get('error', 'unknown error')
                if users_list else 'no response'
            ))
            break
        for member in users_list['members']:
            users.append({'id': member['id'], 'name': member['name']})
        cursor = users_list.get('response_metadata', {}).get('next_cursor')
        if not cursor:
            break
    logger.info('Got {} users'.format(len(users)))
    return sorted(users, key=lambda k: k['name'])
def get_files(user):
    """List a user's files and delete the ones delete_file() accepts.

    Despite the name, this function has a side effect: every file on the
    returned page is passed to delete_file(). Only one page of up to 1000
    files is requested; files beyond that page are not examined.

    Args:
        user: dict with at least 'id' and 'name' keys.

    Returns:
        The 'files.list' reply with an extra 'for_deletion' key holding the
        number of files actually deleted, or [] when the call failed or the
        reply carries no paging information.
    """
    payload = {'count': 1000, 'user': user['id']}
    files_list = call_api('files.list', payload)
    # call_api returns {} on failure; an error reply is presumably missing
    # 'paging', so treat both as "nothing to report".
    if not files_list or 'paging' not in files_list:
        return []

    deleted = []
    for f in files_list.get('files', []):
        # delete_file returns the file id on success and None otherwise.
        if delete_file(f['id']):
            deleted.append(f['id'])

    total_deleted = len(deleted)
    if total_deleted > 0:
        logger.debug('Deleted {} from {}'.format(
            to_plural(total_deleted, 'file'),
            user['name']
        ))
    files_list['for_deletion'] = total_deleted
    return files_list
def get_user_files():
    """Return the users that own files, sorted by ascending file count.

    Each returned user dict gains two keys: 'files' (the total reported by
    the files listing) and 'for_deletion' (the count reported by
    get_files). Users whose listing failed are skipped.
    """
    users = []
    for user in get_users():
        files = get_files(user)
        if not files:
            # get_files returns [] on failure; indexing it would raise.
            logger.error('Could not list files for {}'.format(user['name']))
            continue
        user['files'] = files['paging']['total']
        user['for_deletion'] = files['for_deletion']
        if user['files'] > 0:
            users.append(user)
    return sorted(users, key=lambda k: k['files'])
def delete_file(filenum):
    """Delete one file if it is larger than SIZE_LIMIT.

    Args:
        filenum: Slack file id.

    Returns:
        filenum when the file was deleted; None when the file info could
        not be fetched, the file is not larger than SIZE_LIMIT, or the
        delete call did not report success.
    """
    payload = {'file': filenum}
    files_info = call_api('files.info', payload)
    # call_api returns {} on failure, and an error reply has no 'file' key.
    file_meta = files_info.get('file') if files_info else None
    if not file_meta or not file_meta.get('size', 0) > SIZE_LIMIT:
        return None

    logger.debug('{} ({}, {})'.format(
        encode_filename(file_meta['name']),
        file_meta['mimetype'],
        bytes_2_human_readable(file_meta['size'])
    ))
    files_delete = call_api('files.delete', payload)
    # .get() because a failed request yields {} with no 'ok' key.
    if files_delete.get('ok'):
        return filenum
    return None
if __name__ == '__main__':
    # One row per user: name, id, total files, files deleted.
    row_format = '{:24} {:12} {:>3} {:>3}'
    for user in get_user_files():
        row = row_format.format(
            user['name'], user['id'], user['files'], user['for_deletion']
        )
        print(row)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Delete the files Slack lists for one user, looked up by e-mail address.
# Requires curl and jq. Only the first 500 files returned by files.list
# are processed.

slack_token=GET_YOUR_OWN_LEGACY_API_TOKEN
slack_url="https://slack.com/api"
email=YOUR_EMAIL_GOES_HERE

# slack_get METHOD [KEY=VALUE ...]
# Issue a GET request; -G with --data-urlencode builds the query string and
# escapes each value (an e-mail containing '+' would otherwise be mangled).
slack_get() {
    local method=$1
    shift
    local args=(--data-urlencode "token=${slack_token}")
    local kv
    for kv in "$@"; do
        args+=(--data-urlencode "${kv}")
    done
    curl -s -G "${slack_url}/${method}" "${args[@]}" </dev/null
}

# jq -r prints the literal string "null" when a field is missing, so check
# for it before using the value in the next request.
team=$(slack_get team.info | jq -r '.team.id')
if [[ -z ${team} || ${team} == null ]]; then
    echo "Could not determine team id (check the API token)" >&2
    exit 1
fi

user=$(slack_get auth.findUser "team=${team}" "email=${email}" | jq -r '.user_id')
if [[ -z ${user} || ${user} == null ]]; then
    echo "No Slack user found for ${email}" >&2
    exit 1
fi

echo -n "Deleting files for ${email} "
# The jq filter is quoted so the shell never treats '[]' as a glob pattern;
# '[]?' yields nothing instead of an error when .files is absent.
slack_get files.list "count=500" "user=${user}" | jq -r '.files[]?.id' |
while IFS= read -r f; do
    slack_get files.delete "file=${f}" &>/dev/null
    echo -n "."
done
echo " done"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment