## Method
First, the directory structure looks like this:

```
├── parallel.dig
└── util
    └── __init__.py
```
Next, the digdag workflow looks like this.
The parallelism is set to 3.
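parallel.dig

```yaml
+set_params:
  _export:
    parallel_num: 3            # number of tasks to run at the same time
    docker:
      image: python:3.6
  py>: util.Utility.set_state  # stores ${parallel_param}

+do:
  # Outer loop: one row of parameters per iteration, rows run one after another
  for_each>:
    params: ${parallel_param}
  _parallel: false
  _do:
    +do_with_parallel:
      # Inner loop: every item in the current row runs concurrently
      for_each>:
        param: ${params}
      _parallel: true
      _do:
        +echo:
          sh>: echo ${param}
```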
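The key point in parallel.dig is to set `_parallel: false` on the outer loop and `_parallel: true` on the inner loop. The outer loop then processes one row of `parallel_num` parameters at a time, while the items inside that row run simultaneously, so no more than `parallel_num` tasks run at once.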
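To make the effect of the two flags concrete, here is a plain-Python sketch that mimics the nested `for_each>`. It is not digdag code: `run_rows` and `task` are hypothetical names, and Python threads stand in for digdag's task execution. The outer `for` is sequential like `_parallel: false`; each row gets its own thread pool like `_parallel: true`, and leaving the `with` block waits for the whole row before the next one starts.

```python
from concurrent.futures import ThreadPoolExecutor


def run_rows(rows, task):
    """Mimic the nested for_each>: rows run sequentially, items in a row run concurrently."""
    results = []
    for row in rows:  # outer loop: like _parallel: false
        # Inner loop: one worker per item, so the whole row starts at once (like _parallel: true).
        with ThreadPoolExecutor(max_workers=max(1, len(row))) as pool:
            # map() keeps input order; exiting the `with` block waits for every task in the row.
            results.append(list(pool.map(task, row)))
    return results


if __name__ == "__main__":
    rows = [["test_0_0", "test_0_1", "test_0_2"], ["test_1_0", "test_1_1", "test_1_2"]]
    print(run_rows(rows, str.upper))
    # [['TEST_0_0', 'TEST_0_1', 'TEST_0_2'], ['TEST_1_0', 'TEST_1_1', 'TEST_1_2']]
```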
The Python code called from digdag is as follows.
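__init__.py

```python
import digdag


class Utility(object):
    def set_state(self):
        # parallel_num is exported from parallel.dig (_export)
        parallel_num = digdag.env.params['parallel_num']
        # 120 rows, each holding parallel_num parameters:
        # [["test_0_0", "test_0_1", "test_0_2"], ["test_1_0", ...], ...]
        params = [["test_{}_{}".format(r_num, p_num) for p_num in range(parallel_num)]
                  for r_num in range(120)]
        # Store the nested list so later tasks can read it as ${parallel_param}
        digdag.env.store({"parallel_param": params})
```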
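The example above generates dummy `test_{}_{}` strings. With real work items (file names, table names and so on) you would typically have a flat list and need to cut it into rows of `parallel_num`. A minimal sketch, assuming such a flat list exists; `to_rows` is a hypothetical helper, not part of digdag:

```python
def to_rows(items, parallel_num):
    """Split a flat list of work items into rows of at most parallel_num items.

    Each row becomes one iteration of the outer for_each>, and the items in
    that row run concurrently in the inner for_each>.
    """
    if parallel_num < 1:
        raise ValueError("parallel_num must be >= 1")
    return [items[i:i + parallel_num] for i in range(0, len(items), parallel_num)]


if __name__ == "__main__":
    print(to_rows(["a.csv", "b.csv", "c.csv", "d.csv"], 3))
    # [['a.csv', 'b.csv', 'c.csv'], ['d.csv']]
```

Inside `set_state`, the result could then be stored the same way, e.g. `digdag.env.store({"parallel_param": to_rows(items, parallel_num)})`, where `items` is whatever list you need to process. The last row may be shorter than `parallel_num`, which simply means fewer tasks run in that final iteration.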
## Execution
Running the workflow produces a log like the following:
```
2018-12-09 02:00:33 +0900 [INFO] (main): Starting a new session project id=1 workflow name=parallel session_time=2018-12-08T00:00:00+00:00
2018-12-09 02:00:34 +0900 [INFO] (0018@[0:default]+parallel+set_params): py>: util.Utility.set_state
2018-12-09 02:00:37 +0900 [INFO] (0018@[0:default]+parallel+do): for_each>: {params=[["test_0_0","test_0_1","test_0_2"],["test_1_0","test_1_1","test_1_2"],["test_2_0","test_2_1","test_2_2"],["test_3_0","test_3_1","test_3_2"],["test_4_0","test_4_1","test_4_2"],["test_5_0","test_5_1","test_5_2"],["test_6_0","test_6_1","test_6_2"],["test_7_0","test_7_1","test_7_2"],["test_8_0","test_8_1","test_8_2"],["test_9_0","test_9_1","test_9_2"],["test_10_0","test_10_1","test_10_2"],["test_11_0","test_11_1","test_11_2"],["test_12_0","test_12_1","test_12_2"],["test_13_0","test_13_1","test_13_2"],["test_14_0","test_14_1","test_14_2"],["test_15_0","test_15_1","test_15_2"],["test_16_0","test_16_1","test_16_2"],["test_17_0","test_17_1","test_17_2"],["test_18_0","test_18_1","test_18_2"],["test_19_0","test_19_1","test_19_2"],["test_20_0","test_20_1","test_20_2"],["test_21_0","test_21_1","test_21_2"],["test_22_0","test_22_1","test_22_2"],["test_23_0","test_23_1","test_23_2"],["test_24_0","test_24_1","test_24_2"],["test_25_0","test_25_1","test_25_2"],["test_26_0","test_26_1","test_26_2"],["test_27_0","test_27_1","test_27_2"],["test_28_0","test_28_1","test_28_2"],["test_29_0","test_29_1","test_29_2"],["test_30_0","test_30_1","test_30_2"],["test_31_0","test_31_1","test_31_2"],["test_32_0","test_32_1","test_32_2"],["test_33_0","test_33_1","test_33_2"],["test_34_0","test_34_1","test_34_2"],["test_35_0","test_35_1","test_35_2"],["test_36_0","test_36_1","test_36_2"],["test_37_0","test_37_1","test_37_2"],["test_38_0","test_38_1","test_38_2"],["test_39_0","test_39_1","test_39_2"],["test_40_0","test_40_1","test_40_2"],["test_41_0","test_41_1","test_41_2"],["test_42_0","test_42_1","test_42_2"],["test_43_0","test_43_1","test_43_2"],["test_44_0","test_44_1","test_44_2"],["test_45_0","test_45_1","test_45_2"],["test_46_0","test_46_1","test_46_2"],["test_47_0","test_47_1","test_47_2"],["test_48_0","test_48_1","test_48_2"],["test_49_0","test_49_1","test_49_2"],["test_50_0","test_50_1","test_50_2"],["test_51_0","test_51_1","test_51_2"],["test_52_0","test_52_1","test_52_2"],["test_53_0","test_53_1","test_53_2"],["test_54_0","test_54_1","test_54_2"],["test_55_0","test_55_1","test_55_2"],["test_56_0","test_56_1","test_56_2"],["test_57_0","test_57_1","test_57_2"],["test_58_0","test_58_1","test_58_2"],["test_59_0","test_59_1","test_59_2"],["test_60_0","test_60_1","test_60_2"],["test_61_0","test_61_1","test_61_2"],["test_62_0","test_62_1","test_62_2"],["test_63_0","test_63_1","test_63_2"],["test_64_0","test_64_1","test_64_2"],["test_65_0","test_65_1","test_65_2"],["test_66_0","test_66_1","test_66_2"],["test_67_0","test_67_1","test_67_2"],["test_68_0","test_68_1","test_68_2"],["test_69_0","test_69_1","test_69_2"],["test_70_0","test_70_1","test_70_2"],["test_71_0","test_71_1","test_71_2"],["test_72_0","test_72_1","test_72_2"],["test_73_0","test_73_1","test_73_2"],["test_74_0","test_74_1","test_74_2"],["test_75_0","test_75_1","test_75_2"],["test_76_0","test_76_1","test_76_2"],["test_77_0","test_77_1","test_77_2"],["test_78_0","test_78_1","test_78_2"],["test_79_0","test_79_1","test_79_2"],["test_80_0","test_80_1","test_80_2"],["test_81_0","test_81_1","test_81_2"],["test_82_0","test_82_1","test_82_2"],["test_83_0","test_83_1","test_83_2"],["test_84_0","test_84_1","test_84_2"],["test_85_0","test_85_1","test_85_2"],["test_86_0","test_86_1","test_86_2"],["test_87_0","test_87_1","test_87_2"],["test_88_0","test_88_1","test_88_2"],["test_89_0","test_89_1","test_89_2"],["test_90_0","test_90_1","test_90_2"],["test_91_0","test_91_1","test_91_2"],["test_92_0","test_92_1","test_92_2"],["test_93_0","test_93_1","test_93_2"],["test_94_0","test_94_1","test_94_2"],["test_95_0","test_95_1","test_95_2"],["test_96_0","test_96_1","test_96_2"],["test_97_0","test_97_1","test_97_2"],["test_98_0","test_98_1","test_98_2"],["test_99_0","test_99_1","test_99_2"],["test_100_0","test_100_1","test_100_2"],["test_101_0","test_101_1","test_101_2"],["test_102_0","test_102_1","test_102_2"],["test_103_0","test_103_1","test_103_2"],["test_104_0","test_104_1","test_104_2"],["test_105_0","test_105_1","test_105_2"],["test_106_0","test_106_1","test_106_2"],["test_107_0","test_107_1","test_107_2"],["test_108_0","test_108_1","test_108_2"],["test_109_0","test_109_1","test_109_2"],["test_110_0","test_110_1","test_110_2"],["test_111_0","test_111_1","test_111_2"],["test_112_0","test_112_1","test_112_2"],["test_113_0","test_113_1","test_113_2"],["test_114_0","test_114_1","test_114_2"],["test_115_0","test_115_1","test_115_2"],["test_116_0","test_116_1","test_116_2"],["test_117_0","test_117_1","test_117_2"],["test_118_0","test_118_1","test_118_2"],["test_119_0","test_119_1","test_119_2"]]}
2018-12-09 02:00:38 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel): for_each>: {param=["test_0_0","test_0_1","test_0_2"]}
2018-12-09 02:00:39 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=0=test_0_0+echo): sh>: echo test_0_0
2018-12-09 02:00:39 +0900 [INFO] (0020@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=1=test_0_1+echo): sh>: echo test_0_1
test_0_0
2018-12-09 02:00:39 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=2=test_0_2+echo): sh>: echo test_0_2
test_0_1
test_0_2
2018-12-09 02:00:40 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel): for_each>: {param=["test_1_0","test_1_1","test_1_2"]}
2018-12-09 02:00:41 +0900 [INFO] (0020@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=1=test_1_1+echo): sh>: echo test_1_1
2018-12-09 02:00:41 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=0=test_1_0+echo): sh>: echo test_1_0
2018-12-09 02:00:41 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=2=test_1_2+echo): sh>: echo test_1_2
test_1_1
test_1_2
test_1_0
```
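As the log above shows, the tasks run three at a time: all three `echo` tasks of a row start in the same second (02:00:39 for the first row, 02:00:41 for the second), and the second row starts only after the first has finished.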
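However, the next iteration of the outer loop cannot start until all three tasks in the current row have finished, so the other slots can sit idle while the slowest task is still running.
Therefore, if execution time varies widely depending on the parameter, you need to do the scheduling in code instead.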
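One way to do "scheduling in code" is a fixed-size worker pool: at most `n` tasks run at once, and a new task starts as soon as any worker frees up, instead of waiting for a whole row. The sketch below assumes the work can be driven from Python (for example from inside a single `py>` task, which this article does not try). `makespan_batched`, `makespan_pooled` and `run_pooled` are hypothetical names, and the durations are invented unit times for illustration, not measurements.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor


def makespan_batched(durations, n):
    """Total time when tasks run in fixed rows of n and each row waits for its slowest task."""
    return sum(max(durations[i:i + n]) for i in range(0, len(durations), n))


def makespan_pooled(durations, n):
    """Total time when n workers each take the next task as soon as they become free."""
    if n < 1:
        raise ValueError("n must be >= 1")
    free_at = [0] * n  # times at which each worker becomes free (a valid min-heap)
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        end = start + d
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


def run_pooled(items, task, n):
    """Run task over items with at most n running at once; results keep input order."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        return list(pool.map(task, items))


if __name__ == "__main__":
    durations = [5, 1, 1, 1, 5, 1]  # hypothetical per-task times
    print(makespan_batched(durations, 3))  # 10: rows (5, 1, 1) and (1, 5, 1) each cost 5
    print(makespan_pooled(durations, 3))   # 6: short tasks fill the gaps next to the long ones
```

The pooled version is greedy list scheduling in input order: it never reorders tasks, it only avoids idle slots. `ThreadPoolExecutor` behaves the same way, since all items are queued up front and each idle thread picks up the next one.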
## Summary
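I needed something like the above because running multiple embulk tasks inside digdag at full throttle killed it with an OOM (out-of-memory) error.
One of the fun things about digdag is that you can give a simple workflow a bit more structure just by shaping the data.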