# Created November 13, 2021 23:06
# GitHub gist: DoMINAToR98/58197cb7913ed344f4b53a349df24154
# Slack Automation
# Note: GitHub flagged this file as containing bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review, open the
# file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python3 | |
import requests | |
import json | |
import argparse | |
import sys | |
# Slack Web API endpoint URLs.
conversationList = "https://slack.com/api/conversations.list"
postMessage = "https://slack.com/api/chat.postMessage"
fileUpload = "https://slack.com/api/files.upload"

# Placeholder value sent as the Bearer token in the Authorization header.
apiKey = "key"

# Maps the channel name accepted on the command line to the channel ID
# that is sent to Slack in request payloads.
channelIDs = dict(channel_name="channel_id")

# Headers for JSON-encoded requests.
headers = {
    'Authorization': "Bearer " + apiKey,
    'Content-Type': 'application/json',
}

# Headers for form-encoded requests (used for file uploads).
headersForFile = {
    'Authorization': "Bearer " + apiKey,
    'Content-Type': 'application/x-www-form-urlencoded',
}
def firstNotification(target: str):
    """Start a new thread titled *target* in every configured channel.

    Posts ``*target*`` to each channel in ``channelIDs`` and then overwrites
    the ``.timestamps`` file with one JSON line mapping each channel *name*
    to the ``ts`` of the message posted there.

    Args:
        target: Text of the opening message; it is wrapped in ``*...*``.

    Any failure (network error, Slack error response, file error) is printed
    and swallowed; in that case ``.timestamps`` is left untouched unless the
    failure happened while writing it.
    """
    try:
        timestamps = {}
        for name, channel_id in channelIDs.items():
            payload = json.dumps({"channel": channel_id, "text": "*{target}*".format(target=target)})
            con = requests.post(url=postMessage, headers=headers, data=payload)
            try:
                resp = json.loads(con.text)
            finally:
                # Release the connection even when the body is not valid JSON.
                con.close()
            # print(resp)
            if not resp.get("ok"):
                # Slack reports API-level failures in the body, not the HTTP status.
                raise Exception("Slack API error: " + str(resp.get("error")))
            # Key by channel name: the timestamp is later looked up by name,
            # not by channel ID.
            timestamps[name] = resp['ts']
        with open('.timestamps', 'wb') as o:
            # Newline-terminate the record so later appended records start
            # on their own line and the file stays one JSON object per line.
            o.write(json.dumps(timestamps).encode() + b"\n")
        print("Thread {target} started in all channels.".format(target=target))
    except Exception as e:
        print("Error in web request. Try again later.")
        print(e)
def startThread(channel: str, target: str):
    """Start a new thread titled *target* in a single channel.

    Posts ``*target*`` to the channel and appends one JSON line
    ``{channel: ts}`` to the ``.timestamps`` file.

    Args:
        channel: Channel name; must be a key of ``channelIDs``.
        target: Text of the opening message; it is wrapped in ``*...*``.

    An unknown channel is reported and nothing is sent. Any other failure is
    printed and swallowed.
    """
    if channel not in channelIDs:
        print("Invalid Channel. Aborting.")
        return
    try:
        payload = json.dumps({"channel": channelIDs.get(channel), "text": "*{target}*".format(target=target)})
        con = requests.post(url=postMessage, headers=headers, data=payload)
        try:
            resp = json.loads(con.text)
        finally:
            # Release the connection even when the body is not valid JSON.
            con.close()
        if not resp.get("ok"):
            # Slack reports API-level failures in the body, not the HTTP status.
            raise Exception("Slack API error: " + str(resp.get("error")))
        record = {channel: resp['ts']}
        with open('.timestamps', 'ab') as o:
            # One JSON object per line, appended after any existing records.
            o.write(json.dumps(record).encode() + b"\n")
        print("Thread {target} started in channel {channel}.".format(target=target, channel=channel))
    except Exception as e:
        print("Error in web request. Try again later.")
        print(e)
def getTimeStamp(channel: str):
    """Return the most recently recorded thread timestamp for *channel*.

    Each non-empty line of the ``.timestamps`` file is parsed as a JSON
    object. Lines are searched from last to first, so a record appended
    later takes precedence over an older record for the same channel.

    Args:
        channel: Key to look up in the recorded JSON objects.

    Returns:
        The value stored under *channel* in the newest record containing it.

    Exits the process with status 69 (after printing a message) when the
    file cannot be read, a line is not valid JSON, or no record contains
    *channel*.
    """
    try:
        with open('.timestamps', 'rb') as f:
            # Skip blank lines so a trailing newline never breaks parsing.
            lines = [raw.strip() for raw in f if raw.strip()]
    except OSError as e:
        print("File .timestamps not found. Was the first notification ever sent?\n"+str(e))
        sys.exit(69)
    try:
        # Newest record first: records are appended, so the last line wins.
        for line in reversed(lines):
            record = json.loads(line)
            if isinstance(record, dict) and channel in record:
                return record[channel]
    except ValueError as e:
        print("File .timestamps could not be parsed.\n"+str(e))
        sys.exit(69)
    print("No timestamp recorded for channel. Invalid Channel ?")
    sys.exit(69)
def notify(msg: str, channel: str):
    """Post *msg* as a reply in the recorded thread of *channel*.

    The thread is identified by the timestamp returned by ``getTimeStamp``.

    Args:
        msg: Message text to send.
        channel: Channel name; must be a key of ``channelIDs``.

    Prints "Notification Sent" only when Slack confirms the message was
    accepted. Any send failure is printed and recorded through ``logError``.
    An unknown channel is reported and nothing is sent.
    """
    if channel not in channelIDs:
        print("Invalid Channel. Aborting.")
        return
    try:
        ts = getTimeStamp(channel)
        payload = json.dumps({"channel": channelIDs[channel], "thread_ts": ts, "text": msg})
        con = requests.post(url=postMessage, headers=headers, data=payload)
        try:
            if con.status_code != 200:
                raise Exception("Non 200 Status Code: "+str(con.status_code))
            resp = json.loads(con.text)
        finally:
            # Always release the connection, including on the error paths.
            con.close()
        if not resp.get("ok"):
            # Slack reports API-level failures in the body of a 200 response.
            raise Exception("Slack API error: " + str(resp.get("error")))
        print("Notification Sent")
    except Exception as e:
        # Network failures raised by requests are OSError subclasses; they are
        # logged like any other send failure rather than reported as a
        # missing file.
        print("Error sending notification. Logging.")
        logError(e, channel, msg)
def sendReport(channel: str, reportFile: str, fileName: str, nonThread: bool):
    """Upload the contents of *reportFile* to *channel*.

    Args:
        channel: Channel name; must be a key of ``channelIDs``.
        reportFile: Path of the file whose bytes are sent as ``content``.
        fileName: Value sent as the ``filename`` field of the upload.
        nonThread: When false, the upload is attached to the recorded thread
            (``thread_ts`` from ``getTimeStamp``); when true it is posted
            outside any thread.

    Prints "Report sent successfully" only when Slack confirms the upload.
    Any failure is printed and recorded through ``logError``. An unknown
    channel is reported and nothing is sent.
    """
    if channel not in channelIDs:
        print("Invalid Channel. Aborting.")
        return
    try:
        # Resolve the thread first so a missing timestamp aborts before the
        # report file is read.
        ts = None if nonThread else getTimeStamp(channel)
        with open(reportFile, 'rb') as report:
            content = report.read()
        data = {"channels": channelIDs[channel], "content": content, "filename": fileName}
        if not nonThread:
            data["thread_ts"] = ts
        conn = requests.post(fileUpload, headers=headersForFile, data=data)
        try:
            if conn.status_code != 200:
                raise Exception("Non 200 Status Code: "+str(conn.status_code))
            resp = json.loads(conn.text)
        finally:
            # Always release the connection, including on the error paths.
            conn.close()
        if not resp.get("ok"):
            # Slack reports API-level failures in the body of a 200 response.
            raise Exception("Slack API error: " + str(resp.get("error")))
        print("Report sent successfully")
    except Exception as e:
        print("Error sending report file.")
        logError(e, channel, reportFile)
def nonThreadNotify(channel: str, msg: str):
    """Post ``*msg*`` to *channel* as a standalone message (no thread).

    Args:
        channel: Channel name; must be a key of ``channelIDs``.
        msg: Message text; it is wrapped in ``*...*``.

    Any failure is printed and recorded through ``logError``. An unknown
    channel is reported and nothing is sent.
    """
    if channel not in channelIDs:
        print("Invalid Channel. Aborting.")
        return
    try:
        # Send the mapped channel ID, as the other senders in this file do,
        # rather than the command-line channel name.
        payload = json.dumps({"channel": channelIDs[channel], "text": "*{message}*".format(message=msg)})
        con = requests.post(url=postMessage, headers=headers, data=payload)
        try:
            if con.status_code != 200:
                raise Exception("Non 200 Status Code: "+str(con.status_code))
            resp = json.loads(con.text)
        finally:
            # Always release the connection, including on the error paths.
            con.close()
        if not resp.get("ok"):
            # Slack reports API-level failures in the body of a 200 response.
            raise Exception("Slack API error: " + str(resp.get("error")))
    except Exception as e:
        print("An error has occurred.")
        logError(e, channel, msg)
def logError(e: Exception, channel: str, msg: str):
    """Record a failed send in ``ts-<channel>.log``.

    The log file is opened in write mode, so it holds only the most recent
    failure for that channel: *msg*, a newline, then ``Error: `` followed by
    the text of *e*.
    """
    log_path = 'ts-{channel}.log'.format(channel=channel)
    entry = msg + '\nError: ' + str(e)
    with open(log_path, 'w') as handle:
        handle.write(entry)
# Command-line interface: exactly one action is chosen per run, in this
# priority order: --first, --report, --single-channel, --non-thread-notify,
# and otherwise a threaded notification.
parser = argparse.ArgumentParser(description="Send alerts and updates on Slack.")
parser.add_argument("--target", nargs=1, type=str, action='store', dest="target")
parser.add_argument("--channel", nargs=1, type=str, action='store', dest="channel", choices=channelIDs.keys())
parser.add_argument("--report", nargs=1, type=str, action='store', dest="fileName")
parser.add_argument("--message", nargs=1, type=str, action='store', dest="message")
parser.add_argument("--first", action="store_true")
parser.add_argument("--single-channel", action="store_true")
parser.add_argument("--non-thread-notify", action="store_true")

# With no arguments at all there is nothing to do: show usage and fail.
if len(sys.argv) <= 1:
    parser.print_help()
    sys.exit(1)

args = parser.parse_args()


def _require(value, flag):
    """Return the single value of a ``nargs=1`` option.

    Aborts with an argparse usage error (exit status 2) when the option was
    not supplied, instead of failing later on ``None[0]``.
    """
    if value is None:
        parser.error("{flag} is required for this action".format(flag=flag))
    return value[0]


# Dispatch on the parsed values rather than scanning sys.argv, so forms such
# as ``--report=FILE`` and the unambiguous abbreviations argparse accepts
# select the right action.
if args.first:
    firstNotification(_require(args.target, "--target"))
    sys.exit(0)
elif args.fileName is not None:
    sendReport(channel=_require(args.channel, "--channel"), fileName="sample",
               reportFile=args.fileName[0], nonThread=args.non_thread_notify)
    sys.exit(0)
elif args.single_channel:
    startThread(target=_require(args.target, "--target"), channel=_require(args.channel, "--channel"))
    sys.exit(0)
elif args.non_thread_notify:
    nonThreadNotify(channel=_require(args.channel, "--channel"), msg=_require(args.message, "--message"))
else:
    notify(msg=_require(args.message, "--message"), channel=_require(args.channel, "--channel"))
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.