初めに
DagsterのAssetCheckと、その完了をトリガにして後続処理を走らせるのに少し苦労したので備忘録に残します。
やりたいこと
- AssetにAssetCheckを行い、自動でテストを行う
- Upstreamの実行完了時、Downstreamを自動実行する
- UpstreamのCheckに失敗した場合、Downstreamは実行しない
うまくいかない例
import dagster as dg
@dg.asset(
group_name="test",
)
def upstream(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
return dg.MaterializeResult(
metadata={
"partition_2":f"____v2"
}
)
@dg.asset_check(
asset=upstream,
)
def upstream_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
return dg.AssetCheckResult(
passed=False,
metadata={"check": 12}
)
@dg.asset(
group_name="test",
automation_condition = dg.AutomationCondition.eager(), # Checkが失敗しても実行される
deps=[upstream],
)
def downstream(context: dg.AssetExecutionContext):
return ""
原因
2点あります。
- AssetCheckに
bloaking
属性を付与していないこと - AutomationConditionに
all_deps_blocking_checks_passed
条件が付与されていないこと
改善例
import dagster as dg
# カスタムAutomationを設定
condition = (
dg.AutomationCondition.eager()
& dg.AutomationCondition.all_deps_blocking_checks_passed()
)
@dg.asset(
group_name="test",
)
def upstream(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
return dg.MaterializeResult(
metadata={
"partition_2":f"____v2"
}
)
@dg.asset_check(
asset=upstream,
blocking=True, # ⇦追加
)
def upstream_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
return dg.AssetCheckResult(
passed=False,
metadata={"check": 12}
)
@dg.asset(
group_name="test",
automation_condition = condition # ⇦変更
deps=[upstream],
)
def downstream(context: dg.AssetExecutionContext):
return ""
以上です。
AssetCheckを導入したけれど、うまく動作しなかった場合h参考になるかと思います。