# Run with --help for cli options
#
# Look below for the section marked XXX on how to enable deletes
#
# python2 old_slack_files.py --aggregate-by-type --domain example path/to/my/example.oauth.token.file
#
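# A second, hypothetical example (the user, domain, and path are placeholders):
# a read-only pass over one author's files older than 180 days, printing
# per-file details:
#
# python2 old_slack_files.py --info --only-user someone@example.com --days-old 180 --domain example path/to/my/example.oauth.token.file
#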
# Original content from
#
# https://www.shiftedup.com/2014/11/13/how-to-bulk-remove-files-from-slack
#
# Modifications (c) Sean Busbey and licensed under ALv2
#
# changes:
# * don't delete unless opt-in
# * add arguments for limiting to just one user
# * handle result pagination
# * don't delete starred items unless told to
# * don't delete private items unless told to
# * add arguments for domain and OAuth token
# * add summary of bytes for requested deletes
# * add summary of things we skip
# * don't delete pinned items unless told to
# * add argument for how old files must be to be eligible
# * optionally print file information
# * optionally provide breakdown of files by author and type of file
# * take user arg optionally as an email address
#
import argparse
import pprint
import requests
import json
import sys
import calendar
from datetime import datetime, timedelta
import locale

# locale is only used to group digits when printing byte totals below
locale.setlocale(locale.LC_ALL, 'en_US')
parser = argparse.ArgumentParser()
parser.add_argument('--delete', action='store_true', help='delete summarized files. default is read-only.')
# TODO make this a list of users
parser.add_argument('-u', '--only-user', help='only include files from the given user (use email or internal Slack ID)')
parser.add_argument('-i', '--include-private', action='store_true', help='include non-public items.')
parser.add_argument('-s', '--include-starred', action='store_true', help='include starred items.')
parser.add_argument('--domain', default="theparlour", help='slack domain, eg https://theparlour.slack.com/ => theparlour')
parser.add_argument('-d', '--days-old', type=int, default=90, help='only select files that are at least this age in days. pass 0 for "everything". default 90.')
parser.add_argument('-p', '--include-pinned', action='store_true', help='include pinned items.')
parser.add_argument('-l', '--info', action='store_true', help='print out information about selected files.')
parser.add_argument('--debug', action='store_true', help='give more details about in-progress effort.')
parser.add_argument('token_file', type=argparse.FileType('r'), help='should contain just the OAuth token from your installed app.')
parser.add_argument('-t', '--only-type', default="images,videos,pdfs", help='only include files of the given type. valid options: all, images, videos, zips, pdfs, spaces, snippets, gdocs. default: images,videos,pdfs')
parser.add_argument('--aggregate-by-type', action='store_true', help='provide counts aggregated by file type')
parser.add_argument('--aggregate-by-user', action='store_true', help='provide counts aggregated by authoring user ID')
parser.add_argument('--user-names', action='store_true', help='when printing file info or aggregates, use user name instead of slack ID')

args = parser.parse_args()
# TODO list of tokens, one per line in the file.
# For now the file should contain just the OAuth token from the installed app.
_token = args.token_file.readline().strip()

def lookup_user(id, cache, token):
    """Look up a Slack user by ID via users.info, caching results."""
    if id not in cache:
        result = requests.post('https://slack.com/api/users.info', data = {
            'token': token,
            'user': id
        }).json()
        if result["ok"]:
            cache[id] = result["user"]
        else:
            # fall back to a minimal record so callers can still read a "name"
            cache[id] = {'id': id, 'name': id}
    return cache[id]
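
# Illustration (made-up ID and name) of what lookup_user hands back; on
# success it is the "user" object from Slack's users.info response, on
# failure a minimal stand-in, so "id" and "name" are always safe to read:
#
#   lookup_user('U012ABCDEF', users, _token)
#   # => {'id': 'U012ABCDEF', 'name': 'alice', ...}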

if __name__ == '__main__':
    if args.only_user:
        if '@' in args.only_user:
            request_url = 'https://slack.com/api/users.lookupByEmail'
            request_data = { 'token' : _token, 'email' : args.only_user }
        else:
            request_url = 'https://slack.com/api/users.info'
            request_data = { 'token' : _token, 'user' : args.only_user }
        result = requests.post(request_url, data = request_data).json()
        if result["ok"]:
            author = result["user"]
        else:
            print "Couldn't find user given in the --only-user option."
            if args.debug:
                pprint.pprint(result)
            sys.exit(-1)
    if args.debug:
        if args.only_user:
            print "Only include files from %s (id %s)" % (author["name"], author["id"])
        if args.only_type:
            print "Only include files of type %s" % (args.only_type)
        if args.days_old:
            print "picking out files that are older than %i days" % (args.days_old)
    if not args.delete:
        print "Read only mode. Will summarize files but delete nothing. Pass --delete to attempt removing files."

    page = 1
    pages = 1
    users = {}
    files_to_delete = []
    # TODO summary by user
    bytes_deleted = 0
    files_skipped_star = 0
    bytes_skipped_star = 0
    files_skipped_private = 0
    bytes_skipped_private = 0
    files_skipped_pinned = 0
    bytes_skipped_pinned = 0
    files_aggregate = {'total' : {'count': 0, 'bytes': 0}} if args.aggregate_by_type or args.aggregate_by_user else None
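    # Illustrative shape of files_aggregate (made-up counts) when both
    # --aggregate-by-user and --aggregate-by-type are passed; with only one
    # of them, the corresponding level of nesting is dropped:
    #
    #   {'total': {'count': 12, 'bytes': 3456789},
    #    'U012ABCDEF': {'total': {'count': 5, 'bytes': 123456},
    #                   'png': {'count': 3, 'bytes': 100000},
    #                   'pdf': {'count': 2, 'bytes': 23456}}}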

    while page <= pages:
        files_list_url = 'https://slack.com/api/files.list'
        # ts_to is a unix timestamp args.days_old days in the past; files.list
        # only returns files created before it
        date = str(calendar.timegm((datetime.now() + timedelta(-1 * args.days_old))
                                   .utctimetuple()))
        data = {"token": _token, "ts_to": date, "page": page, "types": args.only_type}
        if args.only_user:
            data["user"] = author["id"]
        response = requests.post(files_list_url, data = data)
        result = response.json()
        if len(result["files"]) == 0:
            break
        if args.debug:
            print "results include %i files on page %i, of total %i files on %i pages" % (len(result["files"]), result["paging"]["page"], result["paging"]["total"], result["paging"]["pages"])
        pages = result["paging"]["pages"]

        for f in result["files"]:
            if args.only_user and author["id"] != f["user"]:
                continue
            if not args.include_starred and "num_stars" in f and f["num_stars"] > 0:
                files_skipped_star += 1
                bytes_skipped_star += f["size"]
                continue
            if not args.include_private and not f["is_public"]:
                files_skipped_private += 1
                bytes_skipped_private += f["size"]
                continue
            if not args.include_pinned and "pinned_to" in f and len(f["pinned_to"]) > 0:
                files_skipped_pinned += 1
                bytes_skipped_pinned += f["size"]
                continue
            bytes_deleted += f["size"]
            files_to_delete.append(f)
            if args.info:
                print "Info on '" + f["name"] + "':"
                print "\tFile is %s" % ("public" if f["is_public"] else "private")
                print "\tFile title: %s" % (f["title"])
                print "\tFile author: %s" % (lookup_user(f["user"], users, _token)["name"] if args.user_names else f["user"])
                print "\tFile type: %s" % (f["filetype"])
                print "\tFile size: %i" % (f["size"])
                print "\tFile has %i comments" % (f["comments_count"])
                print "\tFile is in %i channels" % (len(f["channels"]))
                print "\tFile is in %i groups" % (len(f["groups"]))
                print "\tFile is in %i ims" % (len(f["ims"]) if "ims" in f else 0)
                print "\tFile has %i stars" % (f["num_stars"] if "num_stars" in f else 0)
                print "\tFile is pinned in %i places" % (len(f["pinned_to"]) if "pinned_to" in f else 0)
            if files_aggregate:
                aggregate_dict = files_aggregate
                increments = [files_aggregate['total']]
                if args.aggregate_by_user:
                    user_key = lookup_user(f["user"], users, _token)["name"] if args.user_names else f["user"]
                    if user_key not in aggregate_dict:
                        aggregate_dict[user_key] = {'total' : {'count': 0, 'bytes': 0}} if args.aggregate_by_type else {'count': 0, 'bytes': 0}
                    aggregate_dict = aggregate_dict[user_key]
                    if args.aggregate_by_type:
                        increments.append(aggregate_dict['total'])
                    else:
                        increments.append(aggregate_dict)
                if args.aggregate_by_type:
                    if f["filetype"] not in aggregate_dict:
                        aggregate_dict[f["filetype"]] = {'count': 0, 'bytes': 0}
                    increments.append(aggregate_dict[f["filetype"]])
                for incr in increments:
                    incr['count'] += 1
                    incr['bytes'] += f['size']
        page += 1

    if files_skipped_star > 0:
        print "skipped %i files because they were starred. would have claimed %s additional bytes. pass --include-starred to include them." % (files_skipped_star, locale.format("%d", bytes_skipped_star, grouping=True))
    if files_skipped_private > 0:
        print "skipped %i files because they were private. would have claimed %s additional bytes. pass --include-private to include them." % (files_skipped_private, locale.format("%d", bytes_skipped_private, grouping=True))
    if files_skipped_pinned > 0:
        print "skipped %i files because they were pinned. would have claimed %s additional bytes. pass --include-pinned to include them." % (files_skipped_pinned, locale.format("%d", bytes_skipped_pinned, grouping=True))
    if args.delete:
        print "Attempting to reclaim %s bytes from %i files." % (locale.format("%d", bytes_deleted, grouping=True), len(files_to_delete))
        failed_count = 0
        failed_bytes = 0
        for f in files_to_delete:
            if args.debug:
                print "Deleting file " + f["name"] + "..."
            timestamp = str(calendar.timegm(datetime.now().utctimetuple()))
            delete_url = "https://" + args.domain + ".slack.com/api/files.delete?t=" + timestamp
            # XXX Delete this line and the one below, then uncomment the following lines when ready for file deletion
            delete_response = { "ok" : False }
            # delete_response = requests.post(delete_url, data = {
            #     "token": _token,
            #     "file": f["id"],
            #     "set_active": "true",
            #     "_attempts": "1"}).json()
            if not delete_response["ok"]:
                failed_count += 1
                failed_bytes += f["size"]
                if args.debug:
                    print "Failed to delete file %s" % (f["name"])
                    pprint.pprint(delete_response)
        if failed_count > 0:
            print "Failed to remove %i files, for a total of %s bytes." % (failed_count, locale.format("%d", failed_bytes, grouping=True))
        print "DONE!"
    else:
        print "Total: %s bytes from %i files." % (locale.format("%d", bytes_deleted, grouping=True), len(files_to_delete))
    if files_aggregate:
        print "breakdown of files:"
        pprint.pprint(files_aggregate)