はじめに
digdagプラグイン作成時に期待しない挙動となり困った点がいくつかありました。
その中から2点ほど注意すべき点を紹介したいと思います。
digdagのバージョンはv0.9.37です。
注意する点
(1)CommandExecutorを使用する
(2)runTaskで投げる例外による挙動の違い
(1)CommandExecutorを使用する
Dockerコンテナ上での使用を想定するオペレータの場合、コマンドの実行はCommandExecutorを介して行いましょう。
CommandExecutorを使用しない場合コンテナ上で動かすことは出来ません。
CommandExecutorの実装はDockerCommandExecutorで、workflowにdocker
が設定されているかどうかによってコンテナ実行するかの分岐処理が行われています。
public Process start(Path projectPath, TaskRequest request, ProcessBuilder pb)
throws IOException
{
// TODO set TZ environment variable
Config config = request.getConfig();
if (config.has("docker")) {
return startWithDocker(projectPath, request, pb);
}
else {
return simple.start(projectPath.toAbsolutePath(), request, pb);
}
}
以下CommandExecutorを使用したプラグインの簡単な例です。
import java.util.{Arrays => JArrays, List => JList}
import io.digdag.spi._
import javax.inject.Inject
object HogePlugin {
class HogeOperatorProvider @Inject()(
commandExecutor: CommandExecutor
) extends OperatorProvider {
override def get(): JList[OperatorFactory] = JArrays.asList(
new HogeOperatorFactory(commandExecutor)
)
}
}
class HogePlugin extends Plugin {
override def getServiceProvider[T](aClass: Class[T]): Class[_ <: T] = {
if (aClass ne classOf[OperatorProvider]) null
else classOf[HogePlugin.HogeOperatorProvider].asSubclass(aClass)
}
}
class HogeOperatorFactory(
commandExecutor: CommandExecutor
) extends OperatorFactory {
override def getType: String = "hoge"
override def newOperator(context: OperatorContext): Operator = {
new HogeOperatorFactory.HogeOperator(context, commandExecutor)
}
}
object HogeOperatorFactory {
private class HogeOperator(
context: OperatorContext,
commandExecutor: CommandExecutor,
) extends BaseOperator(context) {
override def runTask(): TaskResult = {
val pb = new ProcessBuilder("/bin/ls", "-l")
commandExecutor.start(workspace.getPath, request, pb)
}
}
}
io.digdag.standards.operator. ShOperatorFactory
の実装がとても参考になります
(2)runTaskで投げる例外による挙動の違い
Operatorを実装する場合はBaseOperatorを継承してrunTaskを実装すると思いますが、ここでスローする例外によって挙動が変わります。
io.digdag.core.agent.OperatorManager
を見るとざっくり以下の3つに分かれているようです。
- TaskExecutionException
- ConfigException
- 上記以外のRuntimeExceptionと、AssertionError
2と3は出力されるログの内容が異なるくらいのようですが、リトライを行いたい場合は1のTaskExecutionExceptionを投げる必要があります。
An exception thrown when an expected exception happens.
またio.digdag.spi.TaskExecutionException
に以上のように記載されていますが、予想可能な例外発生の場合はTaskExecutionExceptionをスローするべきです。TaskExecutionExceptionの場合、stacktraceを表示せず、またタスクの再実行も行われます。
最初何も気にせず適当にExceptionを投げていたらタスクが失敗しているにも関わらずステータスがPendingのままになってしまってました。こんなことにならないように気をつけてください。
まとめ
ハマった場合はとりあえずdigdagのソースを読むのが一番の解決の近道でした。
io.digdag.plugin.example.ExamplePlugin
にプラグインの参考実装があります。
また、ワークフローエンジンDigdagのまとめ にプラグインがまとめられているので、参考に出来る実装はたくさんありそうです。
個人で作っているプラグインもあるのでよかったら参考にしてみてください。
https://github.com/tosametal/digdag-plugins