Created
February 25, 2019 01:02
-
-
Save cicdw/749da99b68ed0c09790705d54902091a to your computer and use it in GitHub Desktop.
complete code of the standup Prefect flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from google.cloud.firestore import Client | |
import random | |
import requests | |
import prefect | |
from prefect import Flow, Parameter, task | |
from prefect.client import Secret | |
from prefect.schedules import CronSchedule | |
@task | |
def get_collection_name(): | |
""" | |
Returns the current date, formatted, which maps | |
to a Google Firestore collection name. | |
""" | |
date_format = "%Y-%m-%d" | |
now = prefect.context["scheduled_start_time"] | |
return now.strftime(date_format) | |
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1)) | |
def get_latest_updates(date): | |
""" | |
Returns dictionary of team members -> their updates from a given date. | |
""" | |
client = Client(project="marvin-standup") | |
collection = client.collection(f"standup/{date}/users") | |
updates = collection.get() | |
user_dict = {doc.id: (doc.to_dict() or {}).get("updates") for doc in updates} | |
return user_dict | |
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1)) | |
def post_standup(updates, channel): | |
""" | |
Given a dictionary of updates and a Slack Channel ID, | |
posts updates to the given channel as the Marvin user. | |
""" | |
public_msg = ( | |
f"<!here> are today's standup updates:\n" | |
+ "=" * 30 | |
) | |
items = list(updates.items()) | |
random.shuffle(items) | |
for user, update in items: | |
public_msg += f"\n*{user}*: {update}" | |
TOKEN = Secret("MARVIN_TOKEN").get() # use a Prefect Secret instead | |
params = { | |
"token": TOKEN, | |
"as_user": "true", | |
"link_names": "true", | |
"mrkdwn": "true", | |
"channel": channel, | |
"text": public_msg, | |
} | |
r = requests.post("https://slack.com/api/chat.postMessage", data=params) | |
r.raise_for_status() | |
if r.json()["ok"] is False: | |
raise ValueError(r.json().get("error", "Slack error")) | |
return r | |
weekday_schedule = CronSchedule("0 14 * * 1-5") | |
with Flow(name="post-standup", schedule=weekday_schedule) as flow: | |
standup_channel = Parameter("standup_channel", default="XXXXXXXXX") | |
res = post_standup(get_latest_updates(get_collection_name()), standup_channel) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment