

How to run digdag tasks with an arbitrary degree of parallelism

Posted at 2018-12-09

Method

First, the directory structure looks like this:

├── parallel.dig
└── util
    └── __init__.py

Next, define the digdag workflow as follows.
The degree of parallelism is set to 3.

parallel.dig
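+set_params:
  _export:
    parallel_num: 3   # degree of parallelism
    docker:
      image: python:3.6
  # Builds ${parallel_param}: a list of chunks, each holding parallel_num params
  py>: util.Utility.set_state

+do:
  # Outer loop: process one chunk at a time
  for_each>:
    params: ${parallel_param}
  _parallel: false
  _do:

    +do_with_parallel:
      # Inner loop: run every param in the current chunk concurrently
      for_each>:
        param: ${params}
      _parallel: true
      _do:

        +echo:
          sh>: echo ${param}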

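The key point in parallel.dig is to set _parallel: false on the outer loop and _parallel: true on the inner loop. The outer loop walks through the chunks one by one, and the inner loop runs all parameters of the current chunk (at most parallel_num of them) at the same time. As a result, no more than parallel_num echo tasks run at once.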
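The following plain-Python sketch does not use digdag. It imitates the effect of the two _parallel settings with threads. Each chunk's items run concurrently, and the next chunk starts only after the whole current chunk has finished. ConcurrencyTracker and run_chunked are hypothetical names chosen for illustration. The sketch only models the observable behavior, not how digdag's scheduler works internally.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyTracker:
    """Stand-in task that records how many calls are running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __call__(self, param):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)
        time.sleep(0.01)  # pretend to do some work, like `sh>: echo ${param}`
        with self._lock:
            self.current -= 1
        return param


def run_chunked(chunks, task):
    """Outer loop is sequential (like _parallel: false); the items inside
    one chunk run concurrently (like _parallel: true)."""
    results = []
    for chunk in chunks:
        if not chunk:
            continue  # ThreadPoolExecutor rejects max_workers=0
        # Leaving the with-block waits for every item in this chunk,
        # so the next chunk cannot start early.
        with ThreadPoolExecutor(max_workers=len(chunk)) as pool:
            results.extend(pool.map(task, chunk))
    return results


if __name__ == "__main__":
    parallel_num = 3
    chunks = [["test_{}_{}".format(r, p) for p in range(parallel_num)]
              for r in range(5)]
    tracker = ConcurrencyTracker()
    print(run_chunked(chunks, tracker))
    print("peak concurrency:", tracker.peak)
```

The peak concurrency can never exceed the chunk size. This matches the "three at a time" behavior seen in the log below.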

The Python code called from digdag is as follows.

__init__.py
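import digdag

class Utility(object):
    def set_state(self):
        # parallel_num is exported from the +set_params task in parallel.dig
        parallel_num = digdag.env.params['parallel_num']
        # Build 120 chunks of parallel_num params each:
        # [["test_0_0", "test_0_1", "test_0_2"], ["test_1_0", ...], ...]
        params = [["test_{}_{}".format(r_num, p_num) for p_num in range(parallel_num)]
                  for r_num in range(120)]
        # Store the nested list so that later tasks can read it as ${parallel_param}
        digdag.env.store({"parallel_param": params})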
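set_state fills the chunks with generated dummy names. If you already have a flat list of real inputs, you can produce the same nested shape by slicing that list into groups of parallel_num. Below is a minimal sketch of this. chunk_params is a hypothetical helper and is not part of digdag. It is plain Python, so it can be tested without digdag installed.

```python
def chunk_params(items, parallel_num):
    """Split a flat list into consecutive groups of at most parallel_num items.

    Hypothetical helper for illustration; not part of digdag.
    The last group may be shorter, in which case the inner for_each> loop
    simply runs fewer tasks in that iteration.
    """
    if parallel_num < 1:
        raise ValueError("parallel_num must be at least 1")
    return [items[i:i + parallel_num] for i in range(0, len(items), parallel_num)]


if __name__ == "__main__":
    print(chunk_params(["a", "b", "c", "d", "e"], 2))  # → [['a', 'b'], ['c', 'd'], ['e']]
```

Inside set_state you could then store the result with digdag.env.store({"parallel_param": chunk_params(items, parallel_num)}). Here items stands for whatever list of inputs you want to process. It is an assumed variable, since the original code generates test names instead.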

Execution

Running the workflow produces a log like the following.

2018-12-09 02:00:33 +0900 [INFO] (main): Starting a new session project id=1 workflow name=parallel session_time=2018-12-08T00:00:00+00:00
2018-12-09 02:00:34 +0900 [INFO] (0018@[0:default]+parallel+set_params): py>: util.Utility.set_state
2018-12-09 02:00:37 +0900 [INFO] (0018@[0:default]+parallel+do): for_each>: {params=[["test_0_0","test_0_1","test_0_2"],["test_1_0","test_1_1","test_1_2"],["test_2_0","test_2_1","test_2_2"],["test_3_0","test_3_1","test_3_2"],["test_4_0","test_4_1","test_4_2"],["test_5_0","test_5_1","test_5_2"],["test_6_0","test_6_1","test_6_2"],["test_7_0","test_7_1","test_7_2"],["test_8_0","test_8_1","test_8_2"],["test_9_0","test_9_1","test_9_2"],["test_10_0","test_10_1","test_10_2"],["test_11_0","test_11_1","test_11_2"],["test_12_0","test_12_1","test_12_2"],["test_13_0","test_13_1","test_13_2"],["test_14_0","test_14_1","test_14_2"],["test_15_0","test_15_1","test_15_2"],["test_16_0","test_16_1","test_16_2"],["test_17_0","test_17_1","test_17_2"],["test_18_0","test_18_1","test_18_2"],["test_19_0","test_19_1","test_19_2"],["test_20_0","test_20_1","test_20_2"],["test_21_0","test_21_1","test_21_2"],["test_22_0","test_22_1","test_22_2"],["test_23_0","test_23_1","test_23_2"],["test_24_0","test_24_1","test_24_2"],["test_25_0","test_25_1","test_25_2"],["test_26_0","test_26_1","test_26_2"],["test_27_0","test_27_1","test_27_2"],["test_28_0","test_28_1","test_28_2"],["test_29_0","test_29_1","test_29_2"],["test_30_0","test_30_1","test_30_2"],["test_31_0","test_31_1","test_31_2"],["test_32_0","test_32_1","test_32_2"],["test_33_0","test_33_1","test_33_2"],["test_34_0","test_34_1","test_34_2"],["test_35_0","test_35_1","test_35_2"],["test_36_0","test_36_1","test_36_2"],["test_37_0","test_37_1","test_37_2"],["test_38_0","test_38_1","test_38_2"],["test_39_0","test_39_1","test_39_2"],["test_40_0","test_40_1","test_40_2"],["test_41_0","test_41_1","test_41_2"],["test_42_0","test_42_1","test_42_2"],["test_43_0","test_43_1","test_43_2"],["test_44_0","test_44_1","test_44_2"],["test_45_0","test_45_1","test_45_2"],["test_46_0","test_46_1","test_46_2"],["test_47_0","test_47_1","test_47_2"],["test_48_0","test_48_1","test_48_2"],["test_49_0","test_49_1","test_49_2"],["test_50_0","test_50_1","test_50_2"],["test_51_0","test_51_1","test_51_2"],["test_52_0","test_52_1","test_52_2"],["test_53_0","test_53_1","test_53_2"],["test_54_0","test_54_1","test_54_2"],["test_55_0","test_55_1","test_55_2"],["test_56_0","test_56_1","test_56_2"],["test_57_0","test_57_1","test_57_2"],["test_58_0","test_58_1","test_58_2"],["test_59_0","test_59_1","test_59_2"],["test_60_0","test_60_1","test_60_2"],["test_61_0","test_61_1","test_61_2"],["test_62_0","test_62_1","test_62_2"],["test_63_0","test_63_1","test_63_2"],["test_64_0","test_64_1","test_64_2"],["test_65_0","test_65_1","test_65_2"],["test_66_0","test_66_1","test_66_2"],["test_67_0","test_67_1","test_67_2"],["test_68_0","test_68_1","test_68_2"],["test_69_0","test_69_1","test_69_2"],["test_70_0","test_70_1","test_70_2"],["test_71_0","test_71_1","test_71_2"],["test_72_0","test_72_1","test_72_2"],["test_73_0","test_73_1","test_73_2"],["test_74_0","test_74_1","test_74_2"],["test_75_0","test_75_1","test_75_2"],["test_76_0","test_76_1","test_76_2"],["test_77_0","test_77_1","test_77_2"],["test_78_0","test_78_1","test_78_2"],["test_79_0","test_79_1","test_79_2"],["test_80_0","test_80_1","test_80_2"],["test_81_0","test_81_1","test_81_2"],["test_82_0","test_82_1","test_82_2"],["test_83_0","test_83_1","test_83_2"],["test_84_0","test_84_1","test_84_2"],["test_85_0","test_85_1","test_85_2"],["test_86_0","test_86_1","test_86_2"],["test_87_0","test_87_1","test_87_2"],["test_88_0","test_88_1","test_88_2"],["test_89_0","test_89_1","test_89_2"],["test_90_0","test_90_1","test_90_2"],["test_91_0","test_91_1","test_91_2"],["test_92_0","test_92_1","test_92_2"],["test_93_0","test_93_1","test_93_2"],["test_94_0","test_94_1","test_94_2"],["test_95_0","test_95_1","test_95_2"],["test_96_0","test_96_1","test_96_2"],["test_97_0","test_97_1","test_97_2"],["test_98_0","test_98_1","test_98_2"],["test_99_0","test_99_1","test_99_2"],["test_100_0","test_100_1","test_100_2"],["test_101_0","test_101_1","test_101_2"],["test_102_0","test_102_1","test_102_2"],["test_103_0","test_103_1","test_103_2"],["test_104_0","test_104_1","test_104_2"],["test_105_0","test_105_1","test_105_2"],["test_106_0","test_106_1","test_106_2"],["test_107_0","test_107_1","test_107_2"],["test_108_0","test_108_1","test_108_2"],["test_109_0","test_109_1","test_109_2"],["test_110_0","test_110_1","test_110_2"],["test_111_0","test_111_1","test_111_2"],["test_112_0","test_112_1","test_112_2"],["test_113_0","test_113_1","test_113_2"],["test_114_0","test_114_1","test_114_2"],["test_115_0","test_115_1","test_115_2"],["test_116_0","test_116_1","test_116_2"],["test_117_0","test_117_1","test_117_2"],["test_118_0","test_118_1","test_118_2"],["test_119_0","test_119_1","test_119_2"]]}
2018-12-09 02:00:38 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel): for_each>: {param=["test_0_0","test_0_1","test_0_2"]}
2018-12-09 02:00:39 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=0=test_0_0+echo): sh>: echo test_0_0
2018-12-09 02:00:39 +0900 [INFO] (0020@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=1=test_0_1+echo): sh>: echo test_0_1
test_0_0
2018-12-09 02:00:39 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=0=%5B%22test_0+do_with_parallel^sub+for-0=param=2=test_0_2+echo): sh>: echo test_0_2
test_0_1
test_0_2
2018-12-09 02:00:40 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel): for_each>: {param=["test_1_0","test_1_1","test_1_2"]}
2018-12-09 02:00:41 +0900 [INFO] (0020@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=1=test_1_1+echo): sh>: echo test_1_1
2018-12-09 02:00:41 +0900 [INFO] (0021@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=0=test_1_0+echo): sh>: echo test_1_0
2018-12-09 02:00:41 +0900 [INFO] (0018@[0:default]+parallel+do^sub+for-0=params=1=%5B%22test_1+do_with_parallel^sub+for-0=param=2=test_1_2+echo): sh>: echo test_1_2
test_1_1
test_1_2
test_1_0

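As the log shows, the tasks run three at a time.
However, the next iteration of the outer loop cannot start until all three tasks in the current chunk have finished. If one task in a chunk is slow, the other two slots sit idle until it completes.
So if execution time varies widely from one parameter to another, you need to do the scheduling in code instead.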
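To see how much time the chunked approach can lose, here is a small worked calculation under assumed task durations. The numbers are made up for illustration. It compares the nested-loop pattern with a pool of parallel_num workers, where each worker starts the next task as soon as it becomes free (greedy list scheduling). The pool model is one possible form of "scheduling in code". It is not the author's implementation, and the function names are hypothetical.

```python
import heapq


def chunked_makespan(durations, parallel_num):
    """Total time when tasks run in fixed groups of parallel_num and each
    group waits for its slowest task (the nested for_each> pattern)."""
    return sum(max(durations[i:i + parallel_num])
               for i in range(0, len(durations), parallel_num))


def pool_makespan(durations, parallel_num):
    """Total time when parallel_num workers each pick up the next task
    (in input order) as soon as they become free."""
    # Min-heap of the times at which each worker becomes free
    workers = [0] * min(parallel_num, len(durations))
    heapq.heapify(workers)
    for d in durations:
        start = heapq.heappop(workers)  # earliest free worker
        heapq.heappush(workers, start + d)
    return max(workers, default=0)


if __name__ == "__main__":
    durations = [10, 1, 1, 1, 10, 1]  # assumed per-task run times
    print(chunked_makespan(durations, 3), pool_makespan(durations, 3))  # → 20 11
```

With these durations, each chunk contains one slow task. The chunked pattern therefore pays the full 10 twice, while the pool overlaps the short tasks with the slow ones. In a real workflow, one way to get pool-style behavior would be a single py> task that uses concurrent.futures.ThreadPoolExecutor(max_workers=parallel_num) to launch each job. This is an assumption, and it moves concurrency control out of digdag's task graph, so the individual jobs no longer appear as separate digdag tasks.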

Summary

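When I ran several embulk tasks in digdag with no limit on concurrency, the process was killed by OOM, so I needed something like the setup above.
One of the fun things about digdag is that you can give a simple workflow a bit more structure just by changing the shape of its input data.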
