結論
結論だけサクッと書きます。
def get_partition_key(context: dg.AssetCheckExecutionContext) -> str:
step_context = context.get_step_execution_context()
partition = step_context.partition_key
return partition
AssetCheckに渡されるContextを上記の関数に渡せばOKです
以下、実際の`@asset`と`@asset_check`のコードも記述します。 (実践的には`@multi_asset_check`の方を使うことが多いらしいので、サンプルコードはそちらで書きました。どちらにしても挙動は変わらないはずです)
# Asset
@dg.asset(
compute_kind="duckdb",
group_name="ingestion",
partitions_def=vresion_partition,
metadata={"table_name": "products"},
)
def products(context: dg.AssetExecutionContext, duckdb: DuckDBResource) -> str:
context.add_output_metadata(
{"dynamic_add": "123"}
)
context.add_asset_metadata(
metadata={
"row_count": 567,
"preview": "preview",
}
)
return "products"
# 以下AssetCheckコード
@dg.multi_asset_check(
specs=[
dg.AssetCheckSpec(blocking=True, name="record_num_check", asset="products"),
dg.AssetCheckSpec(blocking=True, name="value_check", asset="products"),
]
)
def check_products(context: dg.AssetCheckExecutionContext) -> Iterable[dg.AssetCheckResult]:
# 現在のパーティションのメタデータ取得
partition = get_partition_key(context)
# チェックロジック(例としてダミー)
is_ok = True
yield dg.AssetCheckResult(
check_name="record_num_check",
passed=is_ok,
asset_key="products",
partition=partition
)
value_is_ok = True
yield dg.AssetCheckResult(
check_name="value_check",
passed=value_is_ok,
asset_key="products",
partition=partition
)
@asset
と@asset_check
ではcontextの型が違うので、そこは注意が必要です。
本当は親AssetのMetaデータまでは取得したかったのですが、深ぼっても見当たりませんでした。
やりようはある気がしますが、パーティションさえ取得できればデータチェックに必要なデータは揃うので、今回はここで撤退することにしました。
有識者の方がいらっしゃいましたら、是非ともご教授お願いいたします。