Created
April 24, 2019 03:23
-
-
Save peketamin/632b79039deca2fb72584f351bb6cdb6 to your computer and use it in GitHub Desktop.
Slack notification
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Slack notification | |
Requirements:
- requests==2.11.1
- slackclient==1.3.1
""" | |
import logging | |
import os | |
from socket import gethostname | |
from textwrap import dedent | |
import requests | |
from slackclient import SlackClient | |
logger = logging.getLogger(__name__) | |
# Slack {{{ | |
# ref: https://api.slack.com/apps/AJ4TM84UW/general | |
class SlackAPI:
    """Post messages to a Slack channel through the Web API.

    ref: https://api.slack.com/methods/chat.postMessage
    """

    def __init__(self, channel_name: str):
        """Create a client bound to ``channel_name``.

        Args:
            channel_name: Sent as the ``channel`` argument of every
                ``chat.postMessage`` call made by :meth:`post`.

        Raises:
            ValueError: If the ``SLACK_API_TOKEN`` environment variable is
                unset or empty.
        """
        # Use getenv rather than os.environ[...]: an unset variable must reach
        # the ValueError below instead of escaping as a bare KeyError.
        _slack_token = os.getenv("SLACK_API_TOKEN")
        if not _slack_token:
            raise ValueError('ENV: SLACK_API_TOKEN must be set.')
        self._sc = SlackClient(_slack_token)
        self.channel_name = channel_name

    def post(self, message: str) -> None:
        """Send ``message`` to the channel, prefixed with this host's name.

        The text is built directly instead of via ``dedent()`` on an
        already-interpolated f-string: a multi-line ``message`` would change
        the common margin and leave the header line indented.

        Args:
            message: Body of the notification; may span several lines.
        """
        response = self._sc.api_call(
            "chat.postMessage",
            channel=self.channel_name,
            text=f"*[{gethostname()}]*\n{message}\n",
        )
        # NOTE(review): slackclient 1.x is expected to return the decoded JSON
        # payload as a dict with an "ok" flag -- confirm against the pinned
        # version. The isinstance guard keeps this best-effort if it does not.
        if isinstance(response, dict) and not response.get("ok"):
            logger.error("[SlackAPI.post] Failed to post: %s", response)
class SlackWebhook:
    """Post messages to Slack through an incoming webhook.

    ref: https://api.slack.com/messaging/webhooks
    """

    def __init__(self):
        """Read the webhook URL from the environment.

        Raises:
            ValueError: If the ``SLACK_WEBHOOK_ENDPOINT`` environment variable
                is unset or empty.
        """
        _webhook_url = os.getenv('SLACK_WEBHOOK_ENDPOINT')
        if not _webhook_url:
            raise ValueError('ENV: SLACK_WEBHOOK_ENDPOINT must be set.')
        self.webhook_url = _webhook_url

    def post(self, message: str, *, timeout: float = 10.0) -> None:
        """Send ``message`` to the webhook, prefixed with this host's name.

        A non-2xx response is logged rather than raised. Exceptions raised by
        ``requests`` itself (connection errors, timeouts) propagate to the
        caller.

        Args:
            message: Body of the notification; may span several lines.
            timeout: Seconds to wait for Slack before giving up. Without a
                timeout ``requests`` can block indefinitely.
        """
        r = requests.post(
            self.webhook_url,
            json={
                # Built directly instead of dedent()-ing an interpolated
                # f-string: a multi-line message would change the common
                # margin and leave the header line indented.
                "text": f"*[{gethostname()}]*\n{message}\n",
            },
            timeout=timeout,
        )
        if not r.ok:
            logger.error('[SlackWebhook.post] Failed to post: %s', r.text)
# }}} /Slack | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment