Last active
July 30, 2019 04:27
-
-
Save cicdw/cad4bc7e116130691dc20b51fc3cfd07 to your computer and use it in GitHub Desktop.
choppy version of the Prefect reminder Flow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@task
def get_collection_name():
    """
    Build the name of the Google Firestore collection for the current run.

    Returns:
        - the "scheduled_start_time" found in the Prefect context,
          rendered as a `YYYY-MM-DD` string
    """
    scheduled_start = prefect.context["scheduled_start_time"]
    return scheduled_start.strftime("%Y-%m-%d")
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def get_latest_updates(date):
    """
    Fetch the standup updates stored in Firestore under the given date.

    Args:
        - date: the date string used to build the `standup/{date}/users`
          collection path

    Returns:
        - a dictionary mapping each document ID to that document's
          "updates" field (None when the field or the document body is missing)
    """
    firestore_client = Client(project="marvin-standup")
    user_docs = firestore_client.collection(f"standup/{date}/users").get()

    latest = {}
    for doc in user_docs:
        # An empty document yields a falsy body; treat it as "no update".
        latest[doc.id] = (doc.to_dict() or {}).get("updates")
    return latest
@task
def get_team():
    """
    Look up the team members stored in the Firestore "users" collection,
    keeping only those whose "office" field equals "DC".

    Returns:
        - a list of (user name, Slack User ID) tuples
    """
    firestore_client = google.cloud.firestore.Client(project="marvin-standup")
    records = [doc.to_dict() for doc in firestore_client.collection("users").get()]

    team = []
    for record in records:
        if record["office"] == "DC":
            team.append((record["name"], record["slack"]))
    return team
@task
def is_reminder_needed(user_info, current_updates):
    """
    Let a user through only when no update is recorded for them yet.

    Args:
        - user_info: a (user name, Slack User ID) tuple
        - current_updates: a dictionary of updates, looked up by the
          lower-cased user name

    Returns:
        - `user_info`, unchanged, when the user has no update

    Raises:
        - SKIP: when a non-None update already exists for the user
    """
    user_name, _ = user_info
    if current_updates.get(user_name.lower()) is None:
        return user_info
    raise SKIP(f"{user_name} has already provided an update")
# Upper bound (seconds) on each Slack request so a stalled connection fails
# the task, letting the task's retry settings take over, instead of hanging.
_SLACK_TIMEOUT_SECONDS = 30


def _call_slack(url, payload):
    """
    POST a form-encoded payload to a Slack Web API method and return the
    decoded JSON reply.

    Args:
        - url: full URL of the Slack API method
        - payload: dictionary of form fields to send

    Returns:
        - the decoded JSON body of the reply

    Raises:
        - requests.HTTPError: if the reply carries an HTTP error status
        - ValueError: if the reply body does not report a truthy "ok"
    """
    response = requests.post(url, data=payload, timeout=_SLACK_TIMEOUT_SECONDS)
    response.raise_for_status()
    body = response.json()
    if not body.get("ok"):
        raise ValueError(body.get("error", "Requests error"))
    return body


@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def send_reminder(user_info):
    """
    Send a direct Slack message asking the user for their standup update.

    Args:
        - user_info: a (user name, Slack User ID) tuple

    Returns:
        - the user's name, once the reminder has been posted

    Raises:
        - requests.HTTPError: if either Slack call returns an HTTP error status
        - ValueError: if either Slack call reports a failure in its reply body
    """
    user_name, user_id = user_info
    token = Secret("MARVIN_TOKEN").get()

    ## get private channel ID for this user
    # The reply is validated before it is indexed, so a Slack-side failure
    # surfaces as Slack's own error code rather than a KeyError on "channel".
    # NOTE(review): Slack has been moving the im.* methods over to
    # conversations.open -- confirm this endpoint is still served.
    opened = _call_slack(
        "https://slack.com/api/im.open", {"token": token, "user": user_id}
    )
    channel_id = opened["channel"]["id"]

    text = (
        f"Hi {user_name}! I haven't heard from you yet; what updates do you have for the team today?"
        " Please respond by using the slash command `/standup`"
        " and remember: your response will be shared!"
    )
    _call_slack(
        "https://slack.com/api/chat.postMessage",
        {
            "token": token,
            "as_user": "true",
            "link_names": "true",
            "mrkdwn": "true",
            "channel": channel_id,
            "text": text,
        },
    )
    return user_name
@task(skip_on_upstream_skip=False)
def report(users):
    """
    Post a summary of the reminders that were sent to the Slack webhook.

    Args:
        - users: the results of the mapped `send_reminder` task; entries
          equal to `NoResult` are left out of the summary

    Raises:
        - requests.HTTPError: if the webhook replies with an HTTP error status
    """
    url = Secret("SLACK_WEBHOOK_URL").get()

    user_string = ", ".join(user for user in users if user != NoResult)
    if not user_string.strip():
        # Nobody was reminded; show a placeholder emoji instead of an empty list.
        user_string = ":marvin-parrot:"

    message = f"Reminders sent via Prefect `v{prefect.__version__}`: {user_string}"
    r = requests.post(
        url,
        json={"text": message, "mrkdwn": "true", "link_names": "true"},
        timeout=30,  # fail rather than hang if the webhook never answers
    )
    # Surface a rejected webhook call instead of silently reporting success.
    r.raise_for_status()
# Cron expression: minute 30, hour 13, days-of-week 1 through 5.
weekday_schedule = CronSchedule("30 13 * * 1-5")

with Flow(name="standup-reminder", schedule=weekday_schedule) as flow:
    # Updates already recorded under the collection for this run's date.
    updates = get_latest_updates(date=get_collection_name)

    # One mapped child per team member; the updates dict is shared, not mapped over.
    reminder_flag = is_reminder_needed.map(
        user_info=get_team, current_updates=unmapped(updates)
    )
    result = send_reminder.map(user_info=reminder_flag)

    final = report(users=result)

# The flow's reference tasks are the reminder sends, not the summary post.
flow.set_reference_tasks([result])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment