Skip to content

Instantly share code, notes, and snippets.

@pangyuteng
Last active October 3, 2024 16:46
Show Gist options
  • Save pangyuteng/cffda14bba8b1a0526ec6992118b9fa4 to your computer and use it in GitHub Desktop.
Save pangyuteng/cffda14bba8b1a0526ec6992118b9fa4 to your computer and use it in GitHub Desktop.
Triggering HTTP DAG jobs with asyncio and aiohttp

Sample code for running DAG jobs with Python's asyncio and the aiohttp library.

example DAG

    D______
A _/  C    \
   \_/ \_E__\_F
     \ /
      B

The first script, workflow-asyncio.py, uses only asyncio.

The second script, workflow-asyncio-aiohttp.py, uses both asyncio and aiohttp.

'''
    D______
A _/  C    \
   \_/ \_E__\_F
     \ /
      B
'''
import asyncio
import aiohttp
import time
# Declared as a method on a class for ease of testing and importing.
class Job(object):
    """Namespace holding the simulated HTTP job coroutine used by the DAG."""

    @staticmethod
    async def job(url, sec, poll_interval=2):
        """Simulate a remote job: fetch once, then poll until ``sec`` has elapsed.

        Args:
            url: Label for the job; only used in the progress messages.
            sec: Polling continues while the elapsed time is below this many
                seconds.
            poll_interval: Seconds to sleep between two polls (default 2).

        Returns:
            The character count of the initial response body plus the
            character counts of every polled response body.
        """
        print('started', url)
        get_url = 'https://example.com/'
        # A single session is shared by the initial request and every poll so
        # that connections are pooled instead of re-established per request.
        async with aiohttp.ClientSession() as session:
            async with session.get(get_url) as response:
                text = await response.text()
            # simulate goto-url by assigning get-url
            goto_url = get_url
            # monotonic clock: elapsed time is unaffected by system clock changes
            start_time = time.monotonic()
            wait_time = 0
            val = 0
            # simulate polling to get status
            while wait_time < sec and isinstance(goto_url, str):
                async with session.get(goto_url) as response:
                    text2 = await response.text()
                val += len(text2)
                await asyncio.sleep(poll_interval)
                wait_time = time.monotonic() - start_time
        print('done', url)
        return len(text) + val
# Module-level alias so that main() can call job(...) without the Job. prefix.
job = Job.job
async def main():
    """Run jobs A-F in the order required by the DAG and print each result.

    A runs first. D is then started as a background task while B and C run
    concurrently, followed by E. F starts only after both E and D are done.
    """
    # Root of the DAG: nothing else may start before A has finished.
    result_a = await asyncio.create_task(job('A', 1))
    print(result_a, 'A')

    # D depends only on A, so it is scheduled now and collected later.
    pending_d = asyncio.create_task(job('D', 20))

    # B and C both depend on A and are independent of each other.
    results_bc = await asyncio.gather(job('B', 5), job('C', 1))
    print(results_bc, 'BC')

    # E needs both B and C; gather() keeps the result a one-element list.
    results_e = await asyncio.gather(job('E', 2))
    print(results_e, 'E')

    # Join the background branch before the final job.
    result_d = await pending_d
    print(result_d, 'D')

    result_f = await asyncio.create_task(job('F', 1))
    print(result_f, 'F')
# Start the workflow: asyncio.run() drives main() to completion.
asyncio.run(main())
'''
    D______
A _/  C    \
   \_/ \_E__\_F
     \ /
      B
'''
import asyncio
async def job(name, sec):
    """Stand-in for a unit of work: announce, sleep, announce again.

    Args:
        name: Label printed in the start and completion messages.
        sec: Delay in seconds handed to ``asyncio.sleep``.
    """
    print("starting", name)
    # Awaiting the sleep hands control back to the event loop.
    await asyncio.sleep(sec)
    print("complete", name)
async def main():
    """Run jobs A-F in the order required by the DAG.

    A runs first. D is then started as a background task while B and C run
    concurrently, followed by E. F starts only after both E and D are done.
    """
    # Root of the DAG: nothing else may start before A has finished.
    await asyncio.create_task(job('A', 1))

    # D depends only on A, so it is scheduled now and joined later.
    pending_d = asyncio.create_task(job('D', 10))

    # B and C are independent of each other; E needs both of them.
    await asyncio.gather(job('B', 1), job('C', 1))
    await asyncio.gather(job('E', 2))

    # Join the background branch before the final job.
    await pending_d
    await asyncio.create_task(job('F', 1))
# Start the workflow: asyncio.run() drives main() to completion.
asyncio.run(main())
@pangyuteng
Copy link
Author

For file access, use aiofiles (an alternative is aiofile).

aiofiles: https://github.com/Tinche/aiofiles
aiofile: https://github.com/mosquito/aiofile

import aiofiles
import aiofiles.os

async def test(folder):
    """Create ``folder`` via aiofiles; an already existing folder is not an error."""
    await aiofiles.os.makedirs(
        folder,
        exist_ok=True,
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment