Dagster について
https://dagster.io/
Dagster は Asset ベースの Data Orchestration ツールになります。
Asset を Python コードで定義することで、データの Lineage を見ることができたり 定期実行する Job や、イベントをトリガーとした Job の実行などを管理できるツールです。
Dagster Univerity とは
https://courses.dagster.io/courses/dagster-essentials
Dagster University は、Dagster 公式で用意されている Learning 用のコンテンツです。
チュートリアル的に手を動かして学べるのと、各章の最後に Quiz があり理解度チェックをすることができます。
基礎的な概念から実際に手を動かすパートなどあり、Dagster を使うとどんなことができるかのイメージが付きます。
Dagster Essentialsの内容
- Lesson 1: Introduction
- Lesson 2: Prerequisites & setup
- Lesson 3: Software-defined Assets
- Lesson 4: Asset dependencies
- Lesson 5: Definitions and code locations
- Lesson 6: Resources
- Lesson 7: Schedules
- Lesson 8: Partitions and backfills
- Lesson 9: Sensors
- Capstone
- Extra credit: Metadata
すべて英語な上、サイトの作りの影響なのか、ブラウザの翻訳が効かないです。
内容を翻訳サイトにコピペして、日本語で内容把握していきました。
修了証
つまったところ
Lesson9: Sensors - Configuring asset creation
class AdhocRequestConfig(Config):
filename: str
borough: str
start_date: str
end_date: str
を書くと、
Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its basesMypymisc
が発生。
解決方法
正しいかは分からないですが、dagsterのバージョンを1.8系に上げるとエラー発生しなくなりました。(※使用しているpythonは、3.12.5)
手順では、 pip install 'dagster~=1.7'
とあり、1.7系をinstallすることになっていますが、このバージョンだとエラーとなり、1.8系にあげるとエラー発生しなくなりました。
やったこと
install_requires=[
"dagster==1.8.*", ←1.8に変更
に変更した後、
dagster_university % pip install -e ".[dev]"
です。
Lesson9: Sensors - Creating a job
adhoc_request = AssetSelection.assets(["adhoc_request"])
adhoc_request_job = define_asset_job(
name="adhoc_request_job",
selection=adhoc_request,
)
をした後、dagster UIで、adhoc_requestが見つからないというエラーが出る。
解決方法
Lesson 7: Schedules - Updating the Definitions object で、
all_jobs = [trip_update_job, weekly_update_job]
などの追加をしているが、requests.pyを同様に追加していない。
この手順が、Lesson 9に書かれていないので、手順通りにやってもエラーになってしまいます。
top levelの__init__.pyはこのようになるはずです。
# fmt: off
from dagster import Definitions, load_assets_from_modules
from .assets import metrics, trips, requests
from .resources import database_resource
from .jobs import trip_update_job, weekly_update_job
from .schedules import trip_update_schedule, weekly_update_schedule
trip_assets = load_assets_from_modules([trips])
metric_assets = load_assets_from_modules([metrics])
requests_assets = load_assets_from_modules([requests])
all_jobs = [trip_update_job, weekly_update_job]
all_schedules = [trip_update_schedule, weekly_update_schedule]
defs = Definitions(
assets=[*trip_assets, *metric_assets, *requests_assets],
resources={
"database": database_resource,
},
jobs=all_jobs,
schedules=all_schedules,
)