背景
Dagsterを使用してワークフローを作成しているときに、直前のフローのMetadataを受け取りたいという当たり前の欲求がわいてきたのですが、方法を見つけるのに苦労したので情報共有の意味でも記事投稿します。
例として画像で言うと、S3のアセットで動的にサイズや格納場所が決定されますが、その値をRedshiftのアセットで受け取りたい、という要件です。
普通のワークフローツールならそれくらい簡単にできるだろうと思って公式ドキュメントやその他ドキュメントを読み漁っていたのですが、全然見つかりませんでした……
ということで、頑張って自作しました。
解決策
専用の関数が用意されていない(気づいてないだけ?)のですが、メタデータ自体はContextの内部にちゃんとありました。
なので、オブジェクトの中の方を掘り下げて取得しています。
- context : Assetが受け取るcontextそのものです
- asset_prefix : Assetのdefsに存在する、依存アセットのPrefixです。私の場合、接尾語に
v1_1
のような値を自由に付けたかったので、このような実装にしています
また、ハードコーディングされている静的Metadataと、処理実行中に付与された動的メタデータとで取得の方法が異なっていたので、両方記述します。
静的Metadata
def get_source_metadata_from_asset_prefix(
context: dg.AssetExecutionContext, asset_prefix: str
) -> ArbitraryMetadataMapping:
# 依存元のAssetキーを取得
asset_key = _get_asset_key_from_asset_prefix(context, asset_prefix)
# Assetキーから、Assetの定義そのものを取得
asset_def = context.repository_def.assets_defs_by_key[asset_key]
metadata = asset_def.metadata_by_key[asset_key]
return metadata
# 依存AssetをPrefixで検索しているので紛らわしいですが、AssetKey型の値を取得しています。重要なのは①②とコメントしてある部分だけです。
def _get_asset_key_from_asset_prefix(
context: dg.AssetExecutionContext, asset_prefix: str
):
deps_names = context.assets_def.input_names # ①
deps_candidate_list = []
for dep_name in list(deps_names):
if dep_name.startswith(asset_prefix):
deps_candidate_list.append(dep_name)
if len(deps_candidate_list) == 0:
raise RuntimeError(f"Asset Key is not found. Prefix: {asset_prefix}")
elif len(deps_candidate_list) > 1:
print(f"{deps_candidate_list=}")
raise RuntimeError(f"Multiple Asst key is found. Prefix: {asset_prefix}")
key_candidate = context.assets_def.input_names.mapping.get(deps_candidate_list[0]) # ②
return key_candidate
動的Metadata
def get_dynamic_metadata_from_asset_prefix(
context: dg.AssetExecutionContext, asset_prefix: str
) -> ArbitraryMetadataMapping:
# 対象の依存元AssetKeyを取得
asset_key = _get_asset_key_from_asset_prefix(context, asset_prefix)
try:
# Assetキーから直近のイベントを取得
latest_event = context.instance.get_latest_materialization_event(asset_key)
if (
latest_event is not None
and latest_event.dagster_event is not None
and latest_event.dagster_event.event_specific_data is not None
):
# 最新イベントから動的メタデータを取得
materialize_event = (
latest_event.dagster_event.event_specific_data.materialization.metadata
)
else:
raise AttributeError("Metadata can not be found.")
except AttributeError:
raise RuntimeError("Metadata can not be found.")
return materialize_event
ドキュメントを見てもわからなかったので、デバッグしながら目当てのデータが存在するかを探していました。さながら、川で砂金を探すような作業でした。
コメント
正直、ワークフローのメタデータを取得するだけなのにこんな回りくどいのはちょっとおかしいと思います。
Dagsterの思想とは異なることをしているのか、それとも良い方法を見逃しているのか。
有識者の方は是非教えていただけると幸いです!