
Set up Airflow with Docker and quickly build a DAG that works with BigQuery


Background

  • I want to manage GCP tasks with Airflow
  • The workload doesn't justify Cloud Composer
  • I want to spin up an environment quickly with Docker
  • Or, I want a Docker-based Celery environment to use as a development environment for Cloud Composer

Prepare docker-airflow

  • Use docker-airflow
  • Clone docker-airflow and move into the project directory
  • Pull the image as described in the README
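The clone step above might look like this (the repository URL is inferred from the image name):

```shell
# Clone puckel/docker-airflow and enter the project directory
git clone https://github.com/puckel/docker-airflow.git
cd docker-airflow
```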

docker pull puckel/docker-airflow

Edit the Dockerfile

  • Inside the RUN instruction, add pip install lines for the libraries needed to work with BigQuery and other GCP services

# (snip)

 RUN set -ex \
     && buildDeps=' \
         freetds-dev \
         libkrb5-dev \
         libsasl2-dev \
         libssl-dev \
         libffi-dev \
         libpq-dev \
         git \
     ' \
     && apt-get update -yqq \
     && apt-get upgrade -yqq \
     && apt-get install -yqq --no-install-recommends \
         $buildDeps \
         freetds-bin \
         build-essential \
         default-libmysqlclient-dev \
         apt-utils \
         curl \
         rsync \
         netcat \
         locales \
     && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
     && locale-gen \
     && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
     && useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
     && pip install -U pip setuptools wheel \
     && pip install pytz \
     && pip install pyOpenSSL \
     && pip install ndg-httpsclient \
     && pip install pyasn1 \
     && pip install apache-airflow[crypto,celery,postgres,hive,jdbc,mysql,ssh${AIRFLOW_DEPS:+,}${AIRFLOW_DEPS}]==${AIRFLOW_VERSION} \
     && pip install 'redis==3.2' \
     && pip install httplib2 \
# ↓ additions start here
     && pip install google \
     && pip install google-cloud \
     && pip install google-cloud-vision \
     && pip install google-auth-httplib2 \
     && pip install --upgrade google-api-python-client \
     && pip install --upgrade google-cloud-storage \
# ↑ additions end here
     && if [ -n "${PYTHON_DEPS}" ]; then pip install ${PYTHON_DEPS}; fi \
     && apt-get purge --auto-remove -yqq $buildDeps \
     && apt-get autoremove -yqq --purge \
     && apt-get clean \
     && pip install pandas-gbq \
     && rm -rf \
         /var/lib/apt/lists/* \
         /tmp/* \
         /var/tmp/* \
         /usr/share/man \
         /usr/share/doc \
         /usr/share/doc-base

 COPY script/entrypoint.sh /entrypoint.sh

# (snip)

Build the Docker image

  • Build so that the resulting image is tagged puckel/docker-airflow:for_gcp

docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow:for_gcp .
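After the build, a quick smoke test that the GCP libraries are importable inside the new image can save debugging later (the module names correspond to the pip packages added in the Dockerfile above):

```shell
# Import the GCP client libraries inside the freshly built image
docker run --rm puckel/docker-airflow:for_gcp \
    python -c "import google.cloud.storage, googleapiclient.discovery, pandas_gbq"
```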

Edit docker-compose.yml

  • Since we want to use the CeleryExecutor this time, edit docker-compose-CeleryExecutor.yml
  • By default the image is set to puckel/docker-airflow:1.10.4, so change it to use puckel/docker-airflow:for_gcp
version: '2.1'
 services:
     redis:
         image: 'redis:5.0.5'
         # command: redis-server --requirepass redispass
# == snip ==
     webserver:
      # image: puckel/docker-airflow:1.10.4 ## <- here
          image: puckel/docker-airflow:for_gcp ## <- here
         restart: always
         depends_on:
             - postgres
             - redis
         environment:
             - LOAD_EX=n
             - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
             - EXECUTOR=Celery
             # - POSTGRES_USER=airflow
             # - POSTGRES_PASSWORD=airflow
             # - POSTGRES_DB=airflow
             # - REDIS_PASSWORD=redispass
         volumes:
             - ./dags:/usr/local/airflow/dags
             # Uncomment to include custom plugins
             # - ./plugins:/usr/local/airflow/plugins
         ports:
             - "8080:8080"
         command: webserver
         healthcheck:
             test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
             interval: 30s
             timeout: 30s
             retries: 3

     flower:
      # image: puckel/docker-airflow:1.10.4 ## <- here
          image: puckel/docker-airflow:for_gcp ## <- here

# same change for the remaining services
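Note that the BigQueryOperator also needs GCP credentials inside the containers, which the compose file does not provide out of the box. One possible sketch (not from the original setup; the ./keys path and key filename are assumptions) is to mount a service-account key and point GOOGLE_APPLICATION_CREDENTIALS at it in each Airflow service:

```yaml
     webserver:
         image: puckel/docker-airflow:for_gcp
         environment:
             - EXECUTOR=Celery
             # Assumed path: mount your own service-account key here
             - GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/keys/service-account.json
         volumes:
             - ./dags:/usr/local/airflow/dags
             - ./keys:/usr/local/airflow/keys:ro
```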

Create a simple DAG that uses BigQuery

  • Place a file like the following in $AIRFLOW_HOME/dags
bq_test.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 6, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG("bq_test", default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

t3 = BigQueryOperator(
    task_id='bq_writing',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql='''
    #standardSQL
    SELECT
      *,
      CASE
        WHEN ENDS_WITH(email, "@gmail.com") AND REGEXP_CONTAINS(email, r".*hoge.*@gmail.com") THEN concat("ok", '@example.com')
        WHEN ENDS_WITH(email, "@gmail.com") AND NOT REGEXP_CONTAINS(email, r".*hoge.*@gmail.com") THEN concat("no", '@example.com')
        ELSE concat("ii", '@hoge.com')
      END
    FROM `project.dataset.table`
    ''',
    destination_dataset_table='proj.datasource.table',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
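The CASE expression in the query rewrites each email into one of three constant addresses. As a plain-Python illustration of the same branching (REGEXP_CONTAINS behaves like re.search):

```python
import re

def rewrite_email(email):
    """Mirror the CASE expression from the DAG's query (illustration only)."""
    if email.endswith("@gmail.com"):
        # REGEXP_CONTAINS(email, r".*hoge.*@gmail.com")
        if re.search(r".*hoge.*@gmail.com", email):
            return "ok@example.com"   # concat("ok", '@example.com')
        return "no@example.com"       # concat("no", '@example.com')
    return "ii@hoge.com"              # concat("ii", '@hoge.com')
```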

Start it with docker-compose


docker-compose -f docker-compose-CeleryExecutor.yml up -d
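Once the containers are up, the Airflow UI should be reachable at http://localhost:8080 (the port mapped in the compose file). A couple of follow-up commands, as a sketch (the webserver service name comes from the compose file, and trigger_dag is the Airflow 1.10 CLI form):

```shell
# Confirm all services are running
docker-compose -f docker-compose-CeleryExecutor.yml ps

# Manually trigger the example DAG from inside the webserver container
docker-compose -f docker-compose-CeleryExecutor.yml exec webserver airflow trigger_dag bq_test
```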