LoginSignup
8
1

More than 3 years have passed since last update.

digdagでプラグイン作成時の注意点

Posted at

はじめに

digdagプラグイン作成時に期待しない挙動となり困った点がいくつかありました。
その中から2点ほど注意すべき点を紹介したいと思います。
digdagのバージョンはv0.9.37です。

注意する点

(1)CommandExecutorを使用する
(2)runTaskで投げる例外による挙動の違い

(1)CommandExecutorを使用する

Dockerコンテナ上での使用を想定するオペレータの場合、コマンドの実行はCommandExecutorを介して行いましょう。
CommandExecutorを使用しない場合コンテナ上で動かすことは出来ません。
CommandExecutorの実装はDockerCommandExecutorで、workflowにdockerが設定されているかどうかによってコンテナ実行するかの分岐処理が行われています。

DockerCommandExecutor.java
public Process start(Path projectPath, TaskRequest request, ProcessBuilder pb)
    throws IOException
{
    // TODO set TZ environment variable
    Config config = request.getConfig();
    if (config.has("docker")) {
        return startWithDocker(projectPath, request, pb);
    }
    else {
        return simple.start(projectPath.toAbsolutePath(), request, pb);
    }
}

以下CommandExecutorを使用したプラグインの簡単な例です。

HogePlugin.scala
import java.util.{Arrays => JArrays, List => JList}

import io.digdag.spi._
import javax.inject.Inject

object HogePlugin {
  class HogeOperatorProvider @Inject()(
      commandExecutor: CommandExecutor
  ) extends OperatorProvider {
    override def get(): JList[OperatorFactory] = JArrays.asList(
      new HogeOperatorFactory(commandExecutor)
    )
  }
}

class HogePlugin extends Plugin {
  override def getServiceProvider[T](aClass: Class[T]): Class[_ <: T] = {
    if (aClass ne classOf[OperatorProvider]) null
    else classOf[HogePlugin.HogeOperatorProvider].asSubclass(aClass)
  }
}
HogeOperatorFactory.scala
class HogeOperatorFactory(
    commandExecutor: CommandExecutor
) extends OperatorFactory {
  override def getType: String = "hoge"

  override def newOperator(context: OperatorContext): Operator = {
    new HogeOperatorFactory.HogeOperator(context, commandExecutor)
  }
}

object HogeOperatorFactory {
  private class HogeOperator(
      context: OperatorContext,
      commandExecutor: CommandExecutor,
  ) extends BaseOperator(context) {
    override def runTask(): TaskResult = {
      val pb = new ProcessBuilder("/bin/ls", "-l")
      commandExecutor.start(workspace.getPath, request, pb)
    }
  }
}

io.digdag.standards.operator. ShOperatorFactoryの実装がとても参考になります

(2)runTaskで投げる例外による挙動の違い

Operatorを実装する場合はBaseOperatorを継承してrunTaskを実装すると思いますが、ここでスローする例外によって挙動が変わります。
io.digdag.core.agent.OperatorManagerを見るとざっくり以下の3つに分かれているようです。

  1. TaskExecutionException
  2. ConfigException
  3. 上記以外のRuntimeExceptionと、AssertionError

2と3は出力されるログの内容が異なるくらいのようですが、リトライを行いたい場合は1のTaskExecutionExceptionを投げる必要があります。

An exception thrown when an expected exception happens.

またio.digdag.spi.TaskExecutionExceptionに以上のように記載されていますが、予想可能な例外発生の場合はTaskExecutionExceptionをスローするべきです。TaskExecutionExceptionの場合、stacktraceを表示せず、またタスクの再実行も行われます。

最初何も気にせず適当にExceptionを投げていたらタスクが失敗しているにも関わらずステータスがPendingのままになってしまってました。こんなことにならないように気をつけてください。

まとめ

ハマった場合はとりあえずdigdagのソースを読むのが一番の解決の近道でした。
io.digdag.plugin.example.ExamplePluginにプラグインの参考実装があります。
また、ワークフローエンジンDigdagのまとめ にプラグインがまとめられているので、参考に出来る実装はたくさんありそうです。

個人で作っているプラグインもあるのでよかったら参考にしてみてください。
https://github.com/tosametal/digdag-plugins

8
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
1