Skip to content

Instantly share code, notes, and snippets.

View anna-geller's full-sized avatar
🎯
Focusing

Anna Geller anna-geller

🎯
Focusing
View GitHub Profile
import prefect
from prefect import task, Flow
from prefect.storage import GitHub
from prefect.run_configs import LocalRun
FLOW_NAME = "03_dashboards"
STORAGE = GitHub(
repo="anna-geller/flow-of-flows",
path=f"flows/{FLOW_NAME}.py",
access_token_secret="GITHUB_ACCESS_TOKEN",
from prefect import Flow
from prefect.storage import GitHub
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.run_configs import LocalRun
FLOW_NAME = "04_orchestrating_flow"
STORAGE = GitHub(
repo="anna-geller/flow-of-flows",
path=f"flows/{FLOW_NAME}.py",
access_token_secret="GITHUB_ACCESS_TOKEN",
from prefect import Flow
from prefect.storage import GitHub
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import LocalRun
FLOW_NAME = "04_orchestrating_flow"
STORAGE = GitHub(
repo="anna-geller/flow-of-flows",
path=f"flows/{FLOW_NAME}.py",
access_token_secret="GITHUB_ACCESS_TOKEN",
from prefect import Flow, Parameter, task
from prefect.tasks.postgres import PostgresExecute
from prefect.tasks.secrets import PrefectSecret
# Shared PostgresExecute instance with the connection target fixed up front:
# database "abc", user "peter", host "my_host", port 5432.
# NOTE(review): no password or query is passed here - presumably they are
# supplied when the instance is invoked inside a flow; confirm against the
# rest of the file.
postgres_execute = PostgresExecute(
db_name="abc", user="peter", host="my_host", port=5432,
)
@task
import requests

# CryptoCompare "pricemulti" endpoint: current USD price for BTC, ETH, REP
# and DASH in a single request.
url = (
    "https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC,ETH,REP,DASH&tsyms=USD"
)

# Without a timeout, requests waits indefinitely on a stalled connection.
r = requests.get(url, timeout=10)
# Fail loudly on 4xx/5xx instead of parsing an error body as price data.
r.raise_for_status()
data = r.json()
print(data)
{
"BTC": {"USD": 62289.47},
"ETH": {"USD": 4600.19},
"REP": {"USD": 24.92},
"DASH": {"USD": 192.05}
}
# Pull the USD quote for each tracked coin out of the API response, which is
# indexed as data[<symbol>]["USD"].
btc, eth, rep, dash = (
    data[symbol]["USD"] for symbol in ("BTC", "ETH", "REP", "DASH")
)
import boto3
write_client = boto3.client("timestream-write")
db_arn = write_client.create_database(DatabaseName="demo")
tbl_arn = write_client.create_table(
DatabaseName="demo",
TableName="data",
RetentionProperties={
"MemoryStoreRetentionPeriodInHours": 1,
"MagneticStoreRetentionPeriodInDays": 365,
records = [
{
"Time": now,
"TimeUnit": "MILLISECONDS",
"Dimensions": [{"Name": "crypto", "Value": "BTC"}],
"MeasureName": "Price",
"MeasureValue": str(btc),
"MeasureValueType": "DOUBLE",
},
{
import boto3

# Client for the Timestream write API.
write_client = boto3.client("timestream-write")

# Send the whole batch to table "data" in database "demo" in one call.
# CommonAttributes is left empty, so every record carries its own fields.
# NOTE(review): despite its name, this variable holds whatever write_records
# returns - confirm against the boto3 docs how rejected records are reported.
rejected_records = write_client.write_records(
    DatabaseName="demo",
    TableName="data",
    Records=records,
    CommonAttributes={},
)
print(rejected_records)