Skip to content

Instantly share code, notes, and snippets.

@cicdw
Last active July 30, 2019 04:27
Show Gist options
  • Save cicdw/cad4bc7e116130691dc20b51fc3cfd07 to your computer and use it in GitHub Desktop.
Save cicdw/cad4bc7e116130691dc20b51fc3cfd07 to your computer and use it in GitHub Desktop.
choppy version of the Prefect reminder Flow
@task
def get_collection_name():
"""
Returns the current date, formatted, which maps
to a Google Firestore collection name.
"""
date_format = "%Y-%m-%d"
now = prefect.context["scheduled_start_time"]
return now.strftime(date_format)
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def get_latest_updates(date):
"""
Returns dictionary of team members -> their updates from a given date.
"""
client = Client(project="marvin-standup")
collection = client.collection(f"standup/{date}/users")
updates = collection.get()
user_dict = {doc.id: (doc.to_dict() or {}).get("updates") for doc in updates}
return user_dict
@task
def get_team():
"""
Retrieve all current full-time Slack users.
Returns:
- a list of (user name, Slack User ID) tuples
"""
client = google.cloud.firestore.Client(project="marvin-standup")
collection = client.collection("users")
users = [u.to_dict() for u in collection.get()]
return [(u["name"], u["slack"]) for u in users if u["office"] == "DC"]
@task
def is_reminder_needed(user_info, current_updates):
user_name, user_id = user_info
if current_updates.get(user_name.lower()) is not None:
raise SKIP(f"{user_name} has already provided an update")
else:
return user_info
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def send_reminder(user_info):
user_name, user_id = user_info
TOKEN = Secret("MARVIN_TOKEN").get()
## get private channel ID for this user
params = {"token": TOKEN, "user": user_id}
r = requests.post("https://slack.com/api/im.open", data=params)
channel_id = json.loads(r.text)["channel"]["id"]
params.pop("user")
text = (
f"Hi {user_name}! I haven't heard from you yet; what updates do you have for the team today?"
" Please respond by using the slash command `/standup`"
" and remember: your response will be shared!"
)
params.update(
{
"as_user": "true",
"link_names": "true",
"mrkdwn": "true",
"channel": channel_id,
"text": text,
}
)
r = requests.post("https://slack.com/api/chat.postMessage", data=params)
r.raise_for_status()
if r.json()["ok"] is False:
raise ValueError(r.json().get("error", "Requests error"))
return user_name
@task(skip_on_upstream_skip=False)
def report(users):
url = Secret("SLACK_WEBHOOK_URL").get()
user_string = ", ".join([user for user in users if user != NoResult])
if user_string.strip() == "":
user_string = ":marvin-parrot:"
message = f"Reminders sent via Prefect `v{prefect.__version__}`: {user_string}"
r = requests.post(
url, json={"text": message, "mrkdwn": "true", "link_names": "true"}
)
weekday_schedule = CronSchedule("30 13 * * 1-5")
with Flow(name="standup-reminder", schedule=weekday_schedule) as flow:
updates = get_latest_updates(get_collection_name)
reminder_flag = is_reminder_needed.map(get_team, unmapped(updates))
result = send_reminder.map(reminder_flag)
final = report(result)
flow.set_reference_tasks([result])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment