Created March 30, 2023 00:33
-
-
Save jacksonlee-civis/60dfddb92640715c6e2c770ec16162a5 to your computer and use it in GitHub Desktop.
Sample code using the async-graph-data-flow package
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This Python script was tested with Python 3.11. | |
# It requires the async-graph-data-flow package plus several other third-party dependencies. | |
# Before running this script, we recommend installing all these dependencies with pip: | |
# pip install async-graph-data-flow==1.1.0 aiocsv==1.2.3 aiofile==3.8.1 aiohttp==3.8.4 | |
import aiocsv | |
import aiofile | |
import aiohttp | |
from async_graph_data_flow import AsyncGraph, AsyncExecutor | |
# Open Brewery DB list endpoint. API doc: https://www.openbrewerydb.org/documentation
URL = "https://api.openbrewerydb.org/v1/breweries"

# Output column names, in the exact order they appear in the CSV.
# Each API record is written as one row keyed by these names.
CSV_HEADER = [
    "id", "name", "brewery_type",
    "address_1", "address_2", "address_3",
    "city", "state_province", "postal_code", "country",
    "longitude", "latitude",
    "phone", "website_url",
    "state", "street",
]

# Destination CSV file (a relative path, so resolved against the current
# working directory).
OUTPUT_FILENAME = "breweries_us_async.csv"

# Flipped to True by write_to_csv once the header row has been written in
# this process.
has_written_csv_header = False
async def get_open_brewery_data():
    """Yield successive pages of US brewery records from the Open Brewery DB API.

    Requests ``URL`` filtered to ``by_country="United States"`` with 200
    records per page, starting at page 1. Each non-empty JSON body is yielded
    as-is, and paging stops at the first empty page. Any exception raised by
    ``raise_for_status()`` on an HTTP error status propagates to the caller.
    """
    current_page = 1
    async with aiohttp.ClientSession() as session:
        more_pages = True
        while more_pages:
            query = {"by_country": "United States", "page": current_page, "per_page": 200}
            async with session.get(URL, params=query) as resp:
                resp.raise_for_status()
                page_rows = await resp.json()
                # An empty page marks the end of the result set.
                more_pages = bool(page_rows)
                if more_pages:
                    yield page_rows
                    current_page += 1
async def write_to_csv(data: list[dict[str, object]] | None):
    """Write one page of brewery records to ``OUTPUT_FILENAME`` as CSV rows.

    The first write of this process creates or truncates the output file and
    emits the ``CSV_HEADER`` row. Later writes append rows only. Because of
    this, re-running the script replaces an old output file instead of
    appending a second header and duplicate rows to it.

    Args:
        data: Records from one API page, keyed by names in ``CSV_HEADER``.
            Values need not be ``str``; the csv module writes ``None`` as an
            empty field and other values via ``str()``. ``None`` or an empty
            list is skipped.

    Yields:
        ``None`` once, after writing. In ``main``'s graph this is the
        terminal node, so nothing consumes the value.
    """
    global has_written_csv_header
    if data:
        # Fresh file on the first write of this run, append afterwards.
        mode = "a" if has_written_csv_header else "w"
        async with aiofile.async_open(OUTPUT_FILENAME, mode=mode, encoding="utf8") as f:
            # extrasaction="ignore": fields the API returns beyond CSV_HEADER are
            # dropped instead of raising ValueError partway through the download.
            csv_writer = aiocsv.AsyncDictWriter(f, CSV_HEADER, extrasaction="ignore")
            if not has_written_csv_header:
                await csv_writer.writeheader()
                has_written_csv_header = True
            await csv_writer.writerows(data)
    yield
def main():
    """Run the brewery download pipeline and report where the CSV was written.

    Connects ``get_open_brewery_data`` (the source) to ``write_to_csv`` (the
    sink) in an ``AsyncGraph``, executes it with ``AsyncExecutor``, and then
    prints the output filename.
    """
    pipeline = AsyncGraph()
    for node_fn in (get_open_brewery_data, write_to_csv):
        pipeline.add_node(node_fn)
    pipeline.add_edge(get_open_brewery_data, write_to_csv)
    AsyncExecutor(pipeline).execute()
    print("data downloaded:", OUTPUT_FILENAME)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.