Skip to content

Instantly share code, notes, and snippets.

@jacksonlee-civis
Created March 30, 2023 00:33
Show Gist options
  • Save jacksonlee-civis/60dfddb92640715c6e2c770ec16162a5 to your computer and use it in GitHub Desktop.
Save jacksonlee-civis/60dfddb92640715c6e2c770ec16162a5 to your computer and use it in GitHub Desktop.
Sample code using the async-graph-data-flow package
# This Python script was tested with Python 3.11.
# It requires the async-graph-data-flow package plus several other third-party dependencies.
# Before running this script, we recommend installing all these dependencies with pip:
# pip install async-graph-data-flow==1.1.0 aiocsv==1.2.3 aiofile==3.8.1 aiohttp==3.8.4
import aiocsv
import aiofile
import aiohttp
from async_graph_data_flow import AsyncGraph, AsyncExecutor
# --- Remote API -------------------------------------------------------------
# API doc: https://www.openbrewerydb.org/documentation
URL = "https://api.openbrewerydb.org/v1/breweries"

# --- CSV output -------------------------------------------------------------
# Field names handed to the CSV dict writer; they fix the column order of
# the output file.
CSV_HEADER = [
    # identity
    "id",
    "name",
    "brewery_type",
    # address
    "address_1",
    "address_2",
    "address_3",
    "city",
    "state_province",
    "postal_code",
    "country",
    # coordinates
    "longitude",
    "latitude",
    # contact
    "phone",
    "website_url",
    # additional location fields
    "state",
    "street",
]

# Path of the CSV file produced by this script.
OUTPUT_FILENAME = "breweries_us_async.csv"

# Module-level flag: set to True by write_to_csv() once the header row has
# been written, so the header is emitted only once per run.
has_written_csv_header = False
async def get_open_brewery_data():
    """Yield the brewery listing from ``URL`` one page at a time.

    Starting at page 1, requests 200 records per page filtered by
    ``by_country="United States"`` and yields the decoded JSON body of each
    page. Iteration ends at the first page whose body is falsy (for example
    an empty list).

    Yields:
        The decoded JSON body of one page (presumably a list of brewery
        records -- verify against the API documentation).

    Raises:
        Whatever ``response.raise_for_status()`` raises when the server
        answers with an error status.
    """
    page_number = 1
    # A single session is shared by every page request.
    async with aiohttp.ClientSession() as http:
        while True:
            query = dict(
                by_country="United States",
                page=page_number,
                per_page=200,
            )
            async with http.get(URL, params=query) as resp:
                resp.raise_for_status()
                batch = await resp.json()
                if not batch:
                    # Empty page: nothing further to fetch.
                    return
                yield batch
            page_number += 1
async def write_to_csv(data: list[dict[str, str]]|None):
global has_written_csv_header
async with aiofile.async_open(OUTPUT_FILENAME, mode="a", encoding="utf8") as f:
csv_writer = aiocsv.AsyncDictWriter(f, CSV_HEADER)
if not has_written_csv_header:
await csv_writer.writeheader()
has_written_csv_header = True
await csv_writer.writerows(data)
yield
def main():
    """Assemble the fetch -> write graph, execute it, and print the output path."""
    pipeline = AsyncGraph()

    # Register both stages, then connect the producer to the consumer.
    for stage in (get_open_brewery_data, write_to_csv):
        pipeline.add_node(stage)
    pipeline.add_edge(get_open_brewery_data, write_to_csv)

    AsyncExecutor(pipeline).execute()
    print("data downloaded:", OUTPUT_FILENAME)


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment