はじめに
皆さんこんにちは。今日も元気にジョブを実行しているでしょうか。
さて、ジョブのワークフローツールと言ったら、Airflowは定番ですよね。
pythonでフロー定義が書ける、なんともイカしたOSSです。
そのAirflowのハンズオンワークショップに参加したのですが、当日はあまり進められなかったので家でリベンジしました。
その中で詰まったポイントを記事にしようと思います。
参加したハンズオンワークショップの内容は以下に載っているので、ぜひ皆さんも試してみてください。
(資料はpdfでダウンロードすると、コマンドがコピペできて良い感じです)
ちなみに、課題 #3については以下にアップロードされていました。
動作環境
Windows 10 Home バージョン 21H2
WSL2(Ubuntu 20.04)
詰まったポイント
環境準備
Airflowを立ち上げなおすとうまく動かない
事象
一度airflow standaloneコマンドを打って立ち上げた後に、ctrl+cで抜けて再度同じコマンドを打って立ち上げなおすと、うまく動きませんでした。具体的には、コンソール画面にはアクセスできますが、dagsのpythonファイルを更新しても全く読み込まなくなってしまいました。
また、logsフォルダの配下にジョブの実行ログが出力されていくはずですが、これも出力されなくなってしまいました。
その際は、以下のようなログが出力されていました。
webserver | [2022-11-05 13:21:26 +0900] [22010] [INFO] Starting gunicorn 20.1.0
webserver | [2022-11-05 13:21:27 +0900] [22010] [ERROR] Connection in use: ('0.0.0.0', 8080)
webserver | [2022-11-05 13:21:27 +0900] [22010] [ERROR] Retrying in 1 second.
scheduler | [2022-11-05 13:21:27 +0900] [22008] [ERROR] Connection in use: ('::', 8793)
scheduler | [2022-11-05 13:21:27 +0900] [22008] [ERROR] Retrying in 1 second.
Airflow側が、ローカルのフォルダ内のファイルを認識しなくなっているように見えます。
解決策
結局、一度venvの仮想環境を削除して、再度作り直すことで解決しました。(原因わからず…)
このエラー内容から、この2つのポート(8080, 8793)のプロセスがまだ生きているのでは?と思い、
- lsof -iコマンドでポートとプロセスIDの紐づけを確認
- killコマンドで8080と8793のポートのプロセスをkill
してみましたが、状況変わらずでした。
課題 #1
特になし。ログがちゃんと見れました。
課題 #2
1時間に1回起動設定になっているが、数秒に1回起動する
事象
以下のコードを実行したとき、「schedule="0 * * * *"」となっているため1時間に1回の起動設定になっています。
(参考:https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html)
import datetime
from airflow import DAG
with DAG(
dag_id="exercise_02",
description="A simple tutorial DAG",
schedule="0 * * * *",
# schedule_interval="0 * * * *", # Airflow 2.4未満の場合
start_date=datetime.datetime(2021, 11, 7),
tags=["example"],
) as dag:
@dag.task
def compute_random_number():
import random
num = random.randint(1, 10)
return num
random_number = compute_random_number()
しかし、実際には3-5秒に1回どんどん実行されて行ってしまいました。
以下の図のStart Dateの列を見るとわかると思います。
原因
これは、コードの中で
start_date=datetime.datetime(2021, 11, 7)
過去の日付を指定していることが原因です。
start_dateは2021年11月7日となっていますが、もうそれは過去の日付です。
そのため、とにかく実行しまくって、現在の時刻に追いつこうとしているのです。
ただし、実際のスケジューリングは1時間に1回なので、Logical Date(Airflowが認識している論理的な時間)はstart_dateの2021年11月7日から1時間ずつ実行されたことになっているわけです。
おそらく、これが現在の時刻に追いつけば、想定通り1時間に1度の実行になるはずです。
解決策
今回はテスト動作が分かればいいので、以下の方法でスケジュール実行をしないようにして、手動実行のみにすることで解決できます。
- start_dateを未来の日付にする
- scheduleオプションを指定しない
ただ、今回は数秒に1回実行されていても特に問題はないので、このまま放置しました。笑
messageが定義されていないエラー
事象
これは、以下の1文が追加されたときに発生したエラーです。
そのままコピペしてスケジューリング実行させると、エラーが発生しました。
bash_task_2 = BashOperator(
task_id="bash_task_2",
bash_command="echo 'message: {{ dag_run.conf[\"message\"] }}'",
)
原因
dag_run.conf["message"] は、実行するときに渡した引数の値を取得するものでした。
スケジュール実行では、特に何も渡さないで実行することになっていたので、エラーとなっていました。
解決策
DUGの右上にある三角ボタンから、Trigger DAG w/ configボタンを押します。
その後、引数入力ができるので、JSON形式でパラメータを設定して実行すると解決しました。
課題 #3
SQLite3に接続できない
事象
課題 #3は、自分で建てたSQLiteにデータを追加していくワークフローです。
そのために、事前準備としてSQLiteを自身の環境にinstallして、データベースを作成しておきます。
参考:https://www.javadrive.jp/sqlite/database/index1.html
その後、githubのコードをコピペして実行すると、接続できないエラーが発生しました。
原因
接続情報を作成していないことが原因でした。
以下がDBへの接続の部分ですが、「sqlite_conn_id=SQLITE_CONN_ID」と接続情報のIDを指定しています。これに対応する接続情報を前もって作成しておく必要がありました。
SQLITE_CONN_ID = "sqlite_test_conn"
~~~省略~~~
create_db_table = SqliteOperator(
task_id="create_table_sqlite",
sqlite_conn_id=SQLITE_CONN_ID,
sql="""
CREATE TABLE IF NOT EXISTS Users (
id INT PRIMARY KEY,
name TEXT,
email TEXT,
timestamp TEXT
);
""",
)
解決策
メニューから、Admin→Connectionsを選択します。その画面から、新規に接続情報を作成します。
ここで、埋めなくてはいけない情報は3つです。
- Connection Id:コード側で指定した名前です。今回はsqlite_test_connです。
- Connection Type:Sqlite
- Host:事前に作成しているSQLiteのデータベースファイルの絶対パス
- (例)/root/work/airflow_handson/db_add/test_db
これらを埋めて再度実行したところ、エラーが解消されました。
おわりに
いくつかハマりポイントはありましたが、実際にAirflowを動かせて感覚がつかめた気がします。
特に、DAG同士の変数のやり取りや、実行時の引数のやり取りが楽にPythonで書けるのが良いところだなと思いました。
このあたり、他のワークフローツールだと独特の記法があったり、なにかファイルを介さないといけなかったり少し面倒な印象があります。
今度は、そのあたりのところを他のツールと比較しながら、実装レベルの理解を深めていきたいなと思います。