背景
- Airflowを使ってGCPのタスクを管理したい
- CloudComposerを使うほどでもない
- Docker使ってサクッと環境作りたい
- または、CloudComposerの開発環境としてDockerでCerery環境作りたい
Docker-Airflowを用意する
- docker-airflowを使う
- docker-airflow を clone して、プロジェクト内に移動
- README通りに image を pull する
docker pull puckel/docker-airflow
- Dockerがない場合はここから -> Get started with Docker Desktop for Mac
- Dockerがわからない場合はこの6回までを1通り読むのがオススメ -> Docker入門(第一回)~Dockerとは何か、何が良いのか~
Dockerfileを編集する
-
RUN
の中に BigQueryなどのGCP周りを触るためのライブラリをpip install
する
# 略
RUN set -ex \
&& buildDeps=' \
freetds-dev \
libkrb5-dev \
libsasl2-dev \
libssl-dev \
libffi-dev \
libpq-dev \
git \
' \
&& apt-get update -yqq \
&& apt-get upgrade -yqq \
&& apt-get install -yqq --no-install-recommends \
$buildDeps \
freetds-bin \
build-essential \
default-libmysqlclient-dev \
apt-utils \
curl \
rsync \
netcat \
locales \
&& sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
&& locale-gen \
&& update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
&& useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
&& pip install -U pip setuptools wheel \
&& pip install pytz \
&& pip install pyOpenSSL \
&& pip install ndg-httpsclient \
&& pip install pyasn1 \
&& pip install apache-airflow[crypto,celery,postgres,hive,jdbc,mysql,ssh${AIRFLOW_DEPS:+,}${AIRFLOW_DEPS}]==${AIRFLOW_VERSION} \
&& pip install 'redis==3.2' \
&& pip install httplib2 \
# ここから↓
&& pip install google \
&& pip install google-cloud \
&& pip install google-cloud-vision \
&& pip install google-auth-httplib2 \
&& pip install --upgrade google-api-python-client \
&& pip install --upgrade google-cloud-storage \
# ここまで追加
&& if [ -n "${PYTHON_DEPS}" ]; then pip install ${PYTHON_DEPS}; fi \
&& apt-get purge --auto-remove -yqq $buildDeps \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& pip install pandas-gbq \
&& rm -rf \
/var/lib/apt/lists/* \
/tmp/* \
/var/tmp/* \
/usr/share/man \
/usr/share/doc \
/usr/share/doc-base
COPY script/entrypoint.sh /entrypoint.sh
# 略
DockerをビルドしてImage作成する
-
puckel/docker-airflow:for_gcp
っていうimageになるように指定してbuildする
docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow:for_gcp .
docker-composer.ymlを編集する
- 今回はCeleryExecutorを使いたいので
docker-compose-CeleryExecutor.yml
を編集する - defaultでは image が
puckel/docker-airflow:1.10.4
が指定されているので、puckel/docker-airflow:for_gcp
を使うようにする
version: '2.1'
services:
redis:
image: 'redis:5.0.5'
# command: redis-server --requirepass redispass
# == 略 ==
webserver:
# image: puckel/docker-airflow:1.10.4 ## <- ここ
image: puckel/docker-airflow:for_gcp ## <- ここ
restart: always
depends_on:
- postgres
- redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
# - POSTGRES_USER=airflow
# - POSTGRES_PASSWORD=airflow
# - POSTGRES_DB=airflow
# - REDIS_PASSWORD=redispass
volumes:
- ./dags:/usr/local/airflow/dags
# Uncomment to include custom plugins
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
flower:
# image: puckel/docker-airflow:1.10.4 ## <- ここ
image: puckel/docker-airflow:for_gcp ## <- ここ
# 以下同様
BigQuery使う簡単なDagを作る
- 以下のような
$AIRFLOW_HOME/dags
に置く
bq_test.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2015, 6, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG("bq_test", default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
t3 = BigQueryOperator(
task_id='bq_writing',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
#standardSQL
SELECT
*,
CASE
WHEN ENDS_WITH(email, "@gmail.com") AND REGEXP_CONTAINS(email, r".*hoge.*@gmail.com") THEN concat("ok", '@example.com')
WHEN ENDS_WITH(email, "@gmail.com") AND NOT REGEXP_CONTAINS(email, r".*hoge.*@gmail.com") THEN concat("no", '@example.com')
ELSE concat("ii", '@hoge.com')
END
FROM `project.dataset.table`
''',
destination_dataset_table='proj.datasource.table',
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
docker-compose 起動
docker-compose -f docker-compose-CeleryExecutor.yml up -d