@serihiro
Last active September 6, 2018 19:54
digdag task parallelization experiments
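# Python module that the workflows reference as `tasks` (py>: tasks.ParamSet.set_param).
import os
import time

import digdag


class ParamSet(object):
    def set_param(self):
        # Wait for the number of seconds passed in through the task's _env.
        time.sleep(int(os.environ['sleep']))
        # Store this task's name under the shared key 'value'.
        digdag.env.store({'value': os.environ['value']})
        print(f"{os.environ['value']} finished")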
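To try `set_param` without a digdag server, one option is to swap in a stand-in for the `digdag` module. The sketch below is only an assumption-laden harness: `fake_digdag` and `stored` are hypothetical names, and the stand-in just records what is passed to `env.store` in a plain dict. It does not reproduce how digdag itself handles stored parameters.

```python
import os
import sys
import time
import types

# Hypothetical stand-in for the real digdag module: env.store() only
# records its argument in the `stored` dict (an assumption for local runs).
stored = {}
fake_digdag = types.ModuleType("digdag")
fake_digdag.env = types.SimpleNamespace(store=stored.update)
sys.modules["digdag"] = fake_digdag

import digdag  # resolves to the stand-in registered above


class ParamSet(object):
    def set_param(self):
        time.sleep(int(os.environ['sleep']))
        digdag.env.store({'value': os.environ['value']})
        print(f"{os.environ['value']} finished")


# Mimic the _env block of a single workflow task.
os.environ["value"] = "task1"
os.environ["sleep"] = "0"
ParamSet().set_param()
print(stored)
```

Environment variables are always strings, which is why `set_param` converts `sleep` with `int()` before sleeping.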

# Workflow 1: for_each> over five values with _parallel enabled and no sleep.
+top_task:
  echo>: "top_task"

+loop_task:
  for_each>:
    value: [task1, task2, task3, task4, task5]
  _parallel: true
  _do:
    py>: tasks.ParamSet.set_param
    _env:
      value: ${value}
      sleep: 0

+show_value:
  sh>: echo ${value}

# Workflow 2: the same five tasks written out under one parallel group,
# with sleeps running from 5 seconds (task1) down to 1 second (task5).
+top_task:
  echo>: "top_task"

+loop_task:
  _parallel: true

  +task1:
    py>: tasks.ParamSet.set_param
    _env:
      value: task1
      sleep: 5

  +task2:
    py>: tasks.ParamSet.set_param
    _env:
      value: task2
      sleep: 4

  +task3:
    py>: tasks.ParamSet.set_param
    _env:
      value: task3
      sleep: 3

  +task4:
    py>: tasks.ParamSet.set_param
    _env:
      value: task4
      sleep: 2

  +task5:
    py>: tasks.ParamSet.set_param
    _env:
      value: task5
      sleep: 1

+show_value:
  sh>: echo ${value}
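The staggered sleeps above make the tasks finish in a different order from the one in which they are declared, so the final `echo ${value}` shows which write survives. As a rough illustration of that question only, here is a toy model using threads and a shared dict. It assumes plain last-writer-wins semantics, which may or may not match how digdag merges parameters stored by parallel tasks; `SCALE`, `store`, and `finish_order` are hypothetical names, and the delays are shortened so the sketch runs quickly.

```python
import threading
import time

SCALE = 0.05  # seconds per "sleep" unit, shortened for a quick run
sleeps = {"task1": 5, "task2": 4, "task3": 3, "task4": 2, "task5": 1}

store = {}          # toy shared parameter store (last writer wins)
finish_order = []   # names in the order the simulated tasks finished
lock = threading.Lock()


def set_param(name, sleep_units):
    # Simulate the task body: wait, then write the shared key.
    time.sleep(sleep_units * SCALE)
    with lock:
        store["value"] = name
        finish_order.append(name)


threads = [
    threading.Thread(target=set_param, args=item) for item in sleeps.items()
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("finish order:", finish_order)
print("final value:", store["value"])
```

In this model the final value is always whichever task wrote last. Comparing that with what the workflow actually prints is the point of the experiment.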
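
# Workflow 3: td_run> inside a parallel loop>, echoing td.last_job_id
# both inside the loop body and after the loop has finished.
_export:
  td:
    database: serizawa_test

+loop_td_run:
  loop>: 10
  _parallel: true
  _do:
    +td_run:
      td_run>: 99999

    +show_last_job_id:
      sh>: echo ${td.last_job_id}

+show_last_job_id_after_loop:
  sh>: echo ${td.last_job_id}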