この記事の内容
これはまだ構想段階ではありますが、将来ECSで立てたコンテナ内のDigdagから、Lambdaをキックさせたいです。
その動作検証の意味で、まずはdocker-composeを使ってローカル環境にDigdagサーバーとPostgreSQLサーバーを立てて、その後ローカルのDigdagからLambda(API Gateway)を実行させました。
当記事はその備忘録です。
※ Lambdaの作成手順・API Gatewayの設定手順は記載しておりません。API Gatewayのエンドポイントへリクエストを送るとLambdaがキックされる状態であることを前提に話を進めております。
備忘録
ローカル環境構築
ローカルのディレクトリ構成ですが以下の通りです。
※ docker-compose実行後に「./digdag/bin/digdag」が生成されるはずです。
$ tree
.
├── digdag
│ ├── Dockerfile
│ ├── etc
│ │ └── digdag.properties
│ └── startup.sh
├── docker-compose.yml
├── postgres
│ └── Dockerfile
各ファイルの内容は以下の通りです。
- ./digdag/Dockerfile
FROM openjdk:8
# パッケージ管理システムのアップデート
RUN apt-get -y update && apt-get -y upgrade
# localeの設定
RUN apt-get -y install locales && \
localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
# timezone (Asia/Tokyo)の設定
ENV TZ JST-9
# 必要なものインストール
RUN apt-get install -y vim postgresql-client
WORKDIR /root
- ./digdag/etc/digdag.properties
database.type = postgresql
database.user = fuka_user
database.password = fuka_pass
database.host = postgres_etl
database.port = 5432
database.database = digdag
- ./digdag/startup.sh
# !/bin/sh
# digdagをインストール(http://docs.digdag.io/getting_started.html)
curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
chmod +x ~/bin/digdag
echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
. ~/.bashrc
digdag server --config ./etc/digdag.properties --bind 0.0.0.0 --port 65432 --task-log /var/log/digdag/ --access-log /var/log/digdag
- ./postgres/Dockerfile
FROM postgres:latest
- ./docker-compose.yml
version: '3'
services:
dbserver:
container_name: postgres_etl
build: ./postgres
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- PGPASSWORD=${PGPASSWORD}
- DATABASE_HOST=${DATABASE_HOST}
- POSTGRES_DB=${POSTGRES_DB}
ports:
- 5432:5432
tty: true
digdag:
container_name: digdag_etl
build: ./digdag
volumes:
- ./digdag:/root
# /tmpディレクトリがないとDigdagサーバーのホーム画面が起動しないので明示的にマウント。
- /tmp:/tmp
ports:
- "0.0.0.0:65432:65432"
entrypoint: >
sh -c "/root/startup.sh && /bin/bash"
tty: true
networks:
etl_networks:
以下の通りdocker-composeを実行して、Digdagコンテナ・Digdagサーバー・PostgreSQLコンテナを起動します。
$ docker-compose build --no-cache
$ docker-compose up -d
Digdagコンテナの確認
ブラウザからDigdagサーバーにアクセスすると以下のホーム画面が表示されます。
ターミナルからDigdagコンテナに接続して、
1. ワークフローを作成
2. Digdagサーバーへプロジェクト名を指定してワークフローをPUSH
3. ワークフロー実行
の手順で進めることもできますが、今回はブラウザから進めていきます。
まずは画面右上の「New project」ボタンを押します。
すると以下のプロジェクト画面になるので、ここで好きなプロジェクト名を記入し、ワークフローを追加→保存(Add file→Save)していきます。
以下の通り、今回はLambdaに紐づいたAPI Gatewayのエンドポイントを実行する処理だけを追加(Add file)し、保存(Save)します。
PostgreSQLコンテナの確認
以下のようにdigdag DB内にアクセスし、
root@5dddd03ba95a:/# psql -d digdag -U fuka_user -h localhost
psql (13.4 (Debian 13.4-1.pgdg100+1))
Type "help" for help.
digdag-#
digdag-#
digdag-#
テーブル一覧を確認します。
するとDigdag用のテーブル達がデフォルトで作成されていることがわかります。
digdag-# \z
Access privileges
Schema | Name | Type | Access privileges | Column privileges | Policies
--------+-----------------------------+----------+-------------------+-------------------+----------
public | delayed_session_attempts | table | | |
public | projects | table | | |
public | projects_id_seq | sequence | | |
public | queue_settings | table | | |
public | queue_settings_id_seq | sequence | | |
public | queued_task_locks | table | | |
public | queued_task_locks_id_seq | sequence | | |
public | queued_tasks | table | | |
public | queued_tasks_id_seq | sequence | | |
public | queues | table | | |
public | queues_id_seq | sequence | | |
public | resuming_tasks | table | | |
public | resuming_tasks_id_seq | sequence | | |
public | revision_archives | table | | |
public | revision_archives_id_seq | sequence | | |
public | revisions | table | | |
public | revisions_id_seq | sequence | | |
public | schedules | table | | |
public | schedules_id_seq | sequence | | |
public | schema_migrations | table | | |
public | secrets | table | | |
public | secrets_id_seq | sequence | | |
public | session_attempts | table | | |
public | session_attempts_id_seq | sequence | | |
public | session_monitors | table | | |
public | session_monitors_id_seq | sequence | | |
public | sessions | table | | |
public | sessions_id_seq | sequence | | |
public | task_archives | table | | |
public | task_dependencies | table | | |
public | task_dependencies_id_seq | sequence | | |
public | task_details | table | | |
public | task_state_details | table | | |
public | tasks | table | | |
public | tasks_id_seq | sequence | | |
public | workflow_configs | table | | |
public | workflow_configs_id_seq | sequence | | |
public | workflow_definitions | table | | |
public | workflow_definitions_id_seq | sequence | | |
(39 rows)
ちなみにDigdagで実行するワークフローは、queuesテーブルなどに一時的に入れられロックされることで、重複して実行されないようになっています。
Digdagワークフローの実行(API Gatewayの実行)
以下の通り、先ほどDigdagホーム画面から作成したワークフローを「Workflows」より確認できます。
上記の右上のRUNボタンを押すと、ワークフローが実行され、Lambdaがキックされることになります。
※ 実行中にPostgreSQLのqueuesテーブルなどの中身を見てみると、実行中のワークフローがレコードとして登録されているので見てみると面白いかもしれません。
また処理失敗などしてしまい、再度途中から実行したい時などは、このAttemptという単位で途中から実行することができます。(実行履歴がPostgres内のテーブルに入っているため可能)
※ PostgreSQLでなくメモリでDigdagを起動している場合には多分不可能。