LoginSignup
18
14

More than 1 year has passed since last update.

初めてECS+Digdag+Embulkでデータ分析基盤を作った話

Last updated at Posted at 2022-09-28

こんにちは、theLetterの荻田です。

データ分析基盤を作る機会があり、拡張のしやすさ・現状のデータ量や仕様に合うか・予算問題などを考えた結果どう判断したのかという過程と実装を紹介します。
今後運用する上で出てきた改善点や課題などは半年後くらいに振り返りの記事を書こうと思います。

気になることがあれば気軽にDM(@kai_ogita)してください
一緒に技術選定から実装までゴリゴリやりたい人募集中です! theLetter採用ページ

About me

  • サーバーサイドエンジニアの人
  • TreasureDataやBigqueryは本当に少し触ったことある
  • ETLやデータ分析基盤などの知識は0
  • GCPよりAWSに触れてきた

About theLetter

theLetter はニュースレターメディアを誰もがつくれるプラットフォームで、現在はリリース数ヶ月で読者数15 万人を突破しており、初期フェーズの小規模スタートアップだとしてもトラフィック対策や分析基盤構築の優先度が上がってきている状況です。

データ分析基盤を何に使いたいか

  • ユーザーにサービス内のダッシュボードで、PVやその他の数字をフィードバックしたい
    • イメージ画像(引用元https://ghostboard.io/)
    • よくある日別PV数とは違い、流入経路別や特殊なエンゲージメント計算、メールのログの集計、コホート、etcなど表示したい項目が将来的にも多岐にわたるイメージです。
      sample
  • 障害時にログを追いやすくしたい
  • 事業上必要な数字を俯瞰的にダッシュボードで見たい
    などの理由があり、一番の理由はサービス内ダッシューボード機能の作成でした。

要望

  • 更新頻度は基本日次、種類によっては3時間程度で更新したい場合がある
  • データ基盤の専任はいないので、管理するものが少ない方がいい
  • 固定費はできるだけ下げたい(変動費は許容)
  • 今後様々なワークフローが増える予定
  • スケジューリングとリトライがある
  • データレイクはS3固定
    などのスタートアップ的な理由も多々入っています。

構成について

大きく考える構成部分は2つあり、ETLを何で組むか、データウェアハウス(クエリサービス)に何使うかになると思います。
データ基盤のプロでもなく調べ尽くしたわけでもないですが、比較した対象と比較項目を以下にまとめました。

1. ETLをどうするか

ETL SaaS系 CDPの機能 自前ECS+Embulk EC2+Embulk AWS Glue GCP Dataflow
料金 × Saas 使用料金初期10万/m~ 大抵30万/m~ × 弊社パターン試算 15万/m~ ◎ 1万/m~ ◎ 1万/m~ ◎ 0.1万/m~ ◎ 0.1万/m~
I/Oの拡張性 ◎ サービスによるがほぼ全て ◎ サービスによるがほぼ全て
学習コスト
インフラの管理コスト なし なし
リアルタイム反映 サービスによる サービスによる
インフラ構築難易度 なし なし なし なし

自前で用意するか、お金で解決かの天秤ぽくなった結果、ETLは自前でECS+Embulkを採用しました。

CDPやSaaSのプランを見るとすぐに月額30万~になりそうで、スタートアップにはこの費用が割と痛い(もしかすると割引などあるかも)
DataflowとGlueについて色々調べたが、Embulkと比べて採用事例などが少なく、学習コストも高くなりそうな不安があったため見送りました。
何もわからん状態だった初期はDataflow,Glueはとっつきにくかったですが、ベースの知識を持った今なら選択肢に入ります。

今後、資金調達などを行いエンジニアが増えた際には、ETL部分はSaaSに移動する可能性はあります。

重たい処理がなく、ETL基盤を拡張しない予定であれば一番安いプランのSaaSで始めるのが正解かと思います。

2. DWH(クエリサービス)に何を使うか

CDP BigQuery Elasticsearch Redshift
GUI
料金 × 15万/m~ ◎ 主にクエリ課金 △ インスタンス時間課金 △ インスタンス時間課金
構築のしやすさ ×
データ量が10xになった時 △ 設定変更必須

BigQueryを選択しました。

RedshiftやAthenaなどでAWS内で完結するのもよかったのですが、相談に乗ってくださったエンジニアの方や様々な企業がBigQueryに移行しており、将来同じ問題で移行するのは嫌なので除外しました。(Redshift Serverlessは期待しています。)

作りたいものが実現はできるためElasticsearch(Opensearch)を比較対象として入れていますが、リアルタイム性がそこまで必要がない、かつ容量問題やindexの面倒を見ないといけないため、今回のパターンではあまりメリットがなさそうでした。

例外として、全部MongoDBに入れられれば幸せだったかもしれません。

最終的な構成

インフラ周りにはECS/Fargateを採用しました。
ワークフローエンジンはDigdagで、DockerにDigdag serverを立てて、実行するワークフローのタスク毎に、新規でその処理にあったスペックのECSタスクを立ち上げる構成にしました。

構成図だと以下のようになっています。

Digdag構成図-展望v2-現状.drawio.png

構築して感じたメリット

  • Digdagから起動するECSタスクを選択できるため、ワークフローのタスク別にサーバーのスペックを変更でき料金の節約が可能
  • Dockerなのでローカルデバックが可能
  • 同時刻に処理が走っても個別ECSタスクなので影響を受けることがない(ハズ)
  • SaaSも裏側はDigdagだったりとベースの知識を入れることができた。

感じたデメリット

  • Digdagが割と学習コストがかかった
  • SaaS系と比べて見るところが多い
  • お金があればお金で解決がやっぱり正解カモ

実際の構築と使い方について

ファイル構成はこのようになっています(一部抜粋)

.
├── Dockerfile #ビルド用のDockerfile
├── config
│   ├── server.local.properties #digdag用で使用するproperties
│   ├── server.production.properties
│   └── server.staging.properties
├── docker-compose.yml
├── docker-entrypoint.sh
├── ses-etl                         ##project dir
│   ├── emaillog-to-bq.dig                 # workflow
│   ├── ses_log_s3_to_bq.yml.liquid #embulkのinput/outputのconfig
│   ├── setup
│   │   ├── __init__.py             #env読み込み用script
│   │   └── config.yml              #env読み込み用file

Dockerfile

次のようにして、Embulk,Digdag,Python3,google-cloud-sdk,その他必要なものをinstallしています。

javaは詳しくないですが、Emublkの対応する最新まで上げる方がセキュリティ的にいいと思います。

FROM openjdk:8-slim

ENV LANG=C.UTF-8 \
    PATH_TO_EMBULK=/opt/embulk \
    PATH=${PATH}:/opt/embulk

RUN ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime

RUN apt-get update && apt-get install -y curl vim
ARG CLOUD_SDK_VERSION=393.0.0
ENV CLOUD_SDK_VERSION=$CLOUD_SDK_VERSION
RUN apt-get update -qqy && apt-get install -qqy \
        curl \
        gcc \
        python3-dev \
        python3-pip \
        apt-transport-https \
        lsb-release \
        openssh-client \
        git \
        gnupg && \
    pip3 install -U crcmod && \
    export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" && \
    echo "deb https://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
    apt-get update && apt-get install -y google-cloud-sdk=${CLOUD_SDK_VERSION}-0 $INSTALL_COMPONENTS && \
    gcloud config set core/disable_usage_reporting true && \
    gcloud config set component_manager/disable_update_check true && \
    gcloud config set metrics/environment github_docker_image && \
    gcloud --version

# Embulk install
RUN mkdir -p ${PATH_TO_EMBULK} \
    && curl --create-dirs -o ${PATH_TO_EMBULK}/embulk -L "https://dl.embulk.org/embulk-0.9.24.jar" \
    && chmod +x ${PATH_TO_EMBULK}/embulk

RUN curl -o /usr/local/bin/digdag -L "https://dl.digdag.io/digdag-latest" && chmod +x /usr/local/bin/digdag

RUN embulk gem install public_suffix -v 4.0.7 && embulk gem install jwt:2.3.0 && embulk gem install embulk-output-bigquery embulk-filter-add_uuid embulk-parser-jsonpath embulk-input-s3 embulk-parser-jsonl embulk-input-union embulk-filter-row embulk-filter-column embulk-output-bigquery

RUN python3 -m pip install pyyaml

ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin

WORKDIR /app

ENTRYPOINT ["./docker-entrypoint.sh"]

docker-entrypoint.sh

entrypointでは基本的に認証情報の生成とコマンドの実行をやっています。

#!/bin/bash
# set -e

# aws credentialの登録
# ~~~~~
# gcp credentialの登録

exec "$@"

docker-compose.yml

Digdag用のコンテナとPostgresのコンテナを立ち上げています。
AWS環境だとログファイルの保存先はS3が追加になりますが、今回はローカル保存になります。

credential情報はAWS system manager parameter storeから取得しています。

version: '3'
services:
  digdag:
    container_name: local-digdag
    build:
      context: .
      dockerfile: Dockerfile
    tty: true
    volumes:
      - ./:/app/
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - 65432:65432
    environment:
      DIGDAG_ENV: dev
      AWS_ACCESS_KEY: 
      AWS_SECRET_ACCESS_KEY: 
      secret_gcp_credential: 
    command: "/bin/sh -c 'digdag server --config ./config/server.local.properties --task-log ./task_log"
  postgres:
    image: postgres:13.1-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: 
      POSTGRES_USER: 
      POSTGRES_PASSWORD: 

Digdag Taskの内容(サンプル)

ワークフロー中のタスク +sample は_export設定でECSのtask definitionを指定しているため、ここのタスクのみECSで新規コンテナが立ちその中で処理されます。

しかしローカル開発環境では、ECSと連携せずローカルで処理し、Digdagのワークフローも同じファイルを違う環境で使いまわしたいです。
その場合Digdag command executorにはfallbackが設定されているので、Digdag起動時に読み込ませるconfigにagent.command_executor.ecs.~~~~という設定を書かなければ、自動でローカル実行になります。(Dockerで実行することも可能です)

timezone: Asia/Tokyo
schedule:
  daily>: 10:00:00
_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.2
  webhook_url: 
  workflow_name: slack
  py:
    python: /usr/bin/python3

+sample:
  _export:
    s3_path_prefix: ${s3.path_prefix}
    ecs:
      task_definition_arn: "arn:aws:ecs:xxx"
  sh> echo "run!" #ECSで実行されLogに出力される

IAM Policy(一部抜粋)

S3への権限、ECSタスク起動の権限、Log取得の権限をそれぞれのディレクトリに最小限に対象を絞って付与しています。
ここあたりの設定が抜けていると、DigdagからECSタスクの起動やDigdagの実行に失敗します。

{
    "Version": "2012-10-17",
    "Statement": [
        ...
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "iam:PassRole",
                "ecs:RunTask",
                "ecs:ExecuteCommand",
                "logs:GetLogEvents",
                "ecs:StartTask",
                "s3:ListBucket",
                "ecs:StopTask",
                "s3:DeleteObject",
                "ecs:DescribeTasks",
                "ecs:DescribeClusters"
            ],
            "Resource": [
                ...
            ]
        }
    ]
}

ECSタスク・サービス

特別なことをしているわけではないので割愛

大きめのRDBのデータや大量のS3のログなどEmbulkで処理すると、その分ディスク容量*1.5倍ほどの容量必要となるため、それを見越したディスク容量を心がけると良さそうです。

現状の料金

固定で必須なのは、料金はELBとECSのDigdag serverの料金になり、
従量課金はBigQueryのクエリ課金・容量課金・ECSのタスクの起動時間課金やS3・外部サービスなどへの転送料などです。

将来構成

Digdag%E6%A7%8B%E6%88%90%E5%9B%B3-%E5%B1%95%E6%9C%9Bv2.drawio.png

  • HA構成
  • GitHub Actionsなどを用いたCD環境の構築
  • MYSQL,CloudWatchなどのその他INPUTソースの追加
  • 一部S3のGlacierに逃す
  • Emblukのアップデート
  • Digdagの起動時間を計算して、年払いかスポットインスタンス化させる
    など対応をフェーズに合わせて対応できればと思っています。

所感

何もわからん状態からちまちま作り、現在状況でのコスト最適化も行うことができ、
将来サービスが大きくなることを前提として、大きくなった時の対応も可能な構成になったのかなと思っています。
この基盤開発は「総合格闘技」と言われていた記事(※1)もあり、規模は違えど、「その通りだな」という一部を味わえました。

まだ運用を始めて日が浅いので、また半年後や1年後などに改善や問題などがあれば今後記事にしていきたいと思います。

余談

データ基盤・ログ基盤・データ分析基盤など呼び方が多々あってよくわからない・・・泣

先日、googleから「Datastream for BigQuery PREVIEW」が発表されました。
ETLなしでMysqlなどのデータソースからニアリアルタイムで直接データのレプリケーションをBigQueryに同期してくれるサービスです。
MYSQLの自社データをEmbulkを使ってBigqueryに同期している場合は、料金によっては検討してみても良いかもしれないです。
データのマスキングなどの処理する部分があれば最強かも?(自前でパイプライン組むのかな)
BigQueryへMySQLやPostgreSQLから直接ニアリアルタイムでレプリケーション可能に。「Datastream for BigQuery」登場

参考にさせていただいた記事

thanks
・はっさん(@hassasa3)

18
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
14