LoginSignup
17
3

会社のデータ分析環境をAWSで整えた話

Last updated at Posted at 2023-12-14

この記事はWano Group Advent Calendar 2023の15日目の記事となります。

経緯

私は2023年2月にTuneCore Japan1人目のData Analyticsチームメンバーとして株式会社WANOに入社しました。

当時は、開発チームがMySQLのテーブルデータをS3へ格納したものを見たいメンバーがTableauで見るという状態で、適切な分析の為のテーブル環境が整っていませんでした。

そこで、みんながデータ活用できるような環境をAWSで作っています。
現時点でまだ足らない箇所もありますが、現時点の内容を紹介したいと思います。

構成図

Tableau利用環境(旧).drawio (4).png

以前は

データを見たいメンバー -> Tableau ServerへアクセスしAthenaやGoogle Driveのデータを閲覧
開発メンバー      -> MySQLテーブルデータをS3へアップロードしAthenaテーブルのデータを更新

する形でした。

ここでの問題点は以下です。

  • テーブルが開発の為のものだからデータを見る側に適していない
  • Tableauで集計するまでにいくつものテーブルを結合する必要がある
  • データが重い
  • Tableau Serverを実用面で管理する人がいない

新(20231202現在)

Tableau利用環境(20231202現在).drawio (4).png

今は上記のような環境になっています。
行ったこととしては

  • データを見る側に適したテーブルを作るためのAnalytics AWS環境を作成し、Tableauから接続
  • S3に保存されているデータをparquet.snappyへ変更しクエリ実行時間を短縮
  • Tableau ServerのLB変更やインスタンスを追加し動作を快適に
  • BigQueryのみで使用しているデータを、Tableauからログデータも見れるように設定

です。

AWS TuneCore Japan Analyticsについて

Tableau利用環境(20231202現在).drawio (4) - コピー.png

この部分についてのお話です。

私が入社時、新たにAWS TuneCore Japan Analyticsアカウントを作成していただきました。
以降このAWSアカウントでDWH/Datamartテーブルを作成するPipeline環境を作っています。

テーブル作成Pipelineについて

基本的にはAWS TuneCore Japan ProdアカウントにあるAthenaテーブルやS3データを定期的に読み込んでCTASクエリを実行し、Analytics AWSアカウントのテーブルを更新するスタイルにしています。

例えば、Production AWSアカウント Athenaテーブルを用いて、Tableauで描画する為の集計用KPI確認テーブルを作るとします。
その場合、流れはStep Functionsで構築していて、

image.png

これが基本形になっています。

  1. クエリを解析し、CTASクエリかどうか・テーブルが存在するかどうか・S3の保存先URIはどこかを確認
  2. CTASクエリである場合、そのテーブルをDROP TABLEし、テーブルデータの保存先S3URI オブジェクトを削除します。
    1. CTASクエリでない場合は基本的に想定していません。
  3. CTASクエリを実行
  4. SNSへPublish(Slack通知用途)

という流れです。トリガーはEventBridge Schedulerで行っています。

DROP TABLE→CREATE TABLEで一時的にテーブルが削除されるので、実行は影響の少ない時間に行っています。
ですが、作り直しをすることでINSERTによる増分更新などと比較して、失敗した時の対処が楽という利点があります。

また、全体の構成として基本サーバーレスのサービスを用いているため、更新時のみ動きます。
そのためEC2インスタンス等で更新を行うよりも安価に稼働することができます。

これ以外にもSQLだけでは処理が行えないようなテーブルを作る時がありますが、その際はLambdaを立ち上げPythonで処理しています。EC2インスタンスは1つも使用していません。

CTASクエリが複数ある場合

上記の基本形のStep Functionsの流れを、Step Functions Distributed Map機能を用いて並列実行しています。

実際の運用ではこちらを主に使っています。

202312034.png

さきほどのフローと比較するとMapがついています。
デフォルトで1000の子ワークフローを実行することができます。
(とはいえAthenaはActive DDL/DML queriesのQuotaがあるので、そこも確認する必要があります)

並列実行対象の.sqlファイルがどこに入っているかを表すcsvファイルを読み取り実行しています。

csv
bucket,file_key
bucket-name,aggregate1.sql
bucket-name,aggregate2.sql
bucket-name,aggregate3.sql

リソース管理で後述しますが、このcsvファイルはGithubにあるResourceをAWS CodePipelineで読み取ってCodeBuildにより作成されます。

DWH/Datamartの階層化処理について

データソースの呼び名と用途や粒度の関係ですが、以下の様に私は認識しています。

Data Lake Data Ware House Data Mart
データ粒度
データ量
どんなもの 開発で用いているテーブルを書き出したもの。ログ。生データ データ活用の視点で使いやすい形に非正規化等されたテーブルデータ 単純な描画用。集計済みのテーブルデータ

よって、Data Ware HouseやData Mart用のテーブルを作成する際、

  • Data Ware Houseテーブルを作るとき、FROM句に他のData Ware House,Data Martテーブルを用いない
  • Data Martテーブルを作る時、From句に他のData Martテーブルを用いない

ように、sqlファイルをAWSへ配置する前にGithubのGithub Actionsで制御しています。

また、「CTASクエリが複数ある場合」のStep Functionsも、以下の様に別のStep Functionsステートマシンを用いてData Ware House→Data Martの順番で実行されるようにしています。

image.png

高速化されたテーブルの作成

MySQLからProduction AWSアカウントへ置かれるデータは、一部Tableauで用いるにはクエリ実行速度の面で使いづらいものでした。

また、2023年8月頃に極端にクエリ実行速度が遅くなる事象が発生していたため、Analytics AWSアカウント側で、列指向のデータファイル形式であるparquet.snappyに差し替えたテーブルを作成しました。

単一オブジェクトの場合

Athenaテーブルで参照しているオブジェクトが単一である場合、Lambda S3 Trigger機能を用いて

sql
CREATE TABLE tunecorejapan.parquet_snappy_table_xxx WITH (
    format = 'PARQUET',
    external_location = 's3://bucket/parquet_snappy_table_xxx/',
    parquet_compression = 'SNAPPY'
) AS
SELECT * FROM table_xxx

のような拡張子だけ変更しテーブルを作成するクエリを単一クエリをDROP→CTASし更新するStep Functionsで実行し更新しています。

複数オブジェクトの場合

ものによっては、HIVEスタイルのパーティション分割を利用し、1つのテーブルに対して複数のオブジェクトからなるものがあります。

この場合、Lambda S3 Triggerではオブジェクトの分だけトリガーが発生してしまうので、AWS Glue ETL Jobsで以下のようなジョブを作成して定期実行しています。

image.png

  1. Data Source - S3 BucketにはProduction AWS アカウントのHIVE式パーティションされているS3 URIを指定
  2. Change Schemaを定義
  3. AWS Analytics S3にparquet.snappyでデータを書き出しテーブルも作成

の流れです。

Change Schemaについては、型によって間違った定義をするとデータが消えたりしますので慎重に行っています。

最後のData TargetでS3にオブジェクトを置くのと同時にAthenaへテーブルも作ることができるので、テーブルを作成します。

image.png

Bookmark機能をONにすれば、一度処理したデータについては再度処理されませんので、それで高速化テーブルを更新しています。

(初回で過去分データを一気に拡張子変換し、その後は更新分だけ変換できる)
202312035.png

リソース管理

リソース管理は全てGitHubを用いています。

Tableau利用環境(リソース管理).drawio.png

Githubで特定ブランチへPushした時、AWS CodePipelineがそれをResourceとして、AWS Codebuildが走るようになっています。
そして、buildspec.ymlから、実行に必要なリソースをS3へ配置したり、aws cloudformation package,aws cloudformation deployコマンドを実行しAWSリソースを作成・更新しています。

Lambda,Step Functionsの更新

CloudFormationでLambda Zipファイル内のlambda_functions.py中身を変更してもCloudFormationスタックから見ると「更新がある」と判定されない為、工夫が必要です。

私が行った工夫はリソースを配置するS3バケットに対してバージョニングを有効にし、Buildspecが走るたびに置かれるリソースの最新バージョンIDを取得し、それを用いてaws cloudformation deployを実行しています。

簡単な例としては以下のような形です。

cfn.yml
Parameters: 
  DeleteS3ObjectsParam: 
    Type: String
    Default: '' -- オブジェクトの最新バージョンIDが入る
Resources: 
  DeleteS3Objects:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        S3Bucket: !Sub ${ResourceBucket}
        S3Key: lambda/src/delete-s3-objects.zip
        S3ObjectVersion: !Ref DeleteS3ObjectsParam -- S3keyの最新バージョンIDが入る
      FunctionName: delete-s3-objects
      Handler: lambda_function.lambda_handler
      Role: !GetAtt CommonServiceRole.Arn
      Runtime: python3.9
      Timeout: 900
buildspec.yml
version: 0.2


phases:
  pre_build:
    commands:
      - echo check aws cli version
      - aws --version
  build:
    commands:
      - echo Create Lambda Function zip
      - bash aws/script/create_zip_files.sh -- lambdaのzipファイルを作成するスクリプト
      - echo get Lambda Zip Version ID -- zipファイルの最新オブジェクトバージョンIDを取得
      - DELETE_S3_OBJECTS=$(aws s3api list-object-versions --bucket bucket-name --prefix lambda/src/delete-s3-objects.zip | jq -r '.Versions[0].VersionId')
  post_build:
    commands:
      - echo deploy dwh/datamart environment cloudformation
      - aws cloudformation package --template-file aws/cloudformation/dwh-operation-environment.yml --s3-bucket $target_bucket --s3-prefix cloudformation/package --output-template-file /tmp/cloudformation-$dwh_stack_name.yml --force-upload
      - | 
        aws cloudformation deploy --no-fail-on-empty-changeset --template-file /tmp/cloudformation-$dwh_stack_name.yml --stack-name $dwh_stack_name --capabilities CAPABILITY_NAMED_IAM CAPABILITY_IAM \
        --parameter-overrides \
        "DeleteS3ObjectsParam=$DELETE_S3_OBJECTS"  -- パラメータを最新バージョンIDでoverride

これにより、Githubで管理しているLambda,Step Functionsのコードを変更し、CloudFormationのスタック更新で反映させるようにしています。

Tableau Serverについて

Tableau利用環境(20231202現在).drawio (4).png

この部分についてのお話です。

Tableau Server Version

2021.1を使用していたので、2023.1にアップグレードしました。
アクションやパラメータがServerで使えなかったのと、Tableau app for Slackを導入したかったのでアップグレードを実施しました。

その他変更点で良かったものは、

  • データに聞く」がViewerでも使用可能
  • 「アクション」がServerでも使用可能
  • 「パラメータアクション」がServerでも使用可能
  • 「セットアクション」がServerでも使用可能
  • 「上位 or 下位」フィルタがServerでも使用可能
  • Tableau ServerのSlackワークスペース連携
  • 「ダッシュボードオブジェクトの並べ替え」がServerでも使用可能
  • ライブ接続における更新頻度を設定可能(データ鮮度ポリシー
  • Viewの読み込み時間高速化
  • ワークブックオプティマイザーによるパフォーマンスの評価が使用可能
  • シートの詳細が使用可能
  • アクセラレーション機能が使用可能

です。

Load Balancer

Lightsail Load Balancerを通じてインスタンスを動かしていましたが、ConnectionのTimtout設定が60秒から変更できず、Tableau ServerでのAthena接続時に60秒経過するとエラーを吐くためnginxのLoad Balancingに切り替えました。

Tableau ServerとAthenaの接続

AWSアカウントが2つ存在することにより、TableauからAthenaへの接続用アクセスキーが2つ存在していて、使いづらくなっていました。

Tableau利用環境(20231202現在).drawio (3) - コピー.png

これを1つにする対処として、まずはAnalytics AWSアカウント側からProduction AWSアカウントのAthenaテーブルを読み取れるようにしました。
これにより、Analytics AWSアカウントへ接続した場合には、Production AWSアカウント側で見れるデータを包含できます。

ただし、注意点としてAthena データソースをTableau側で変更するには、athena.propertiesファイルを作成し所定の場所に置く必要があります。
これはTableau Server,Desktop,Prep Builder全て該当するので、Desktop,Prep Builderを用いるユーザーには、ローカルPCにathena.propertiesファイルを所定の場所に置いてもらうという案内が必要です。

最後に、Production AWSアカウント側のアクセスキーを既に使っているTableauワークブックについて対処する必要があります。
これについては対象ワークブックの数も50程度であり、数が少なかったので全て手作業で移行しました。
Tableau REST APIを用いて接続情報を変更するという手もありますが、ワークブックによってはアクション機能が失われたり、接続の関係が失われたりするので、手作業は免れませんでした。

GCP BigQueryについて

202312031.png

一部、BigQueryにもテーブルを作成している為、これをTableauでも使えるようにしています。

Tableau Serverへの接続

TableauはBigQueryとの接続もサポートされているので、接続用の権限を設定し使えるようにしました。

Amazon Athenaとの接続

Amazon Athena Google BigQuery Connectorの環境を作成し、Athenaへデータソースを作成しています。

これにより、BigQueryのデータとAthenaのデータを突合することができます。
ですが内部的にはLambdaを動かしているためクエリの実行時間は15分までの制約があります。

さいごに

以上が現在のデータ分析環境の概要です。

環境構築の部分で未だ問題だと思っている点は

  • Production AWSアカウント、Analytics AWSアカウント、Tableau 抽出機能でデータ更新タイミングを完全同期していない
    • 求められるデータ鮮度の要件にもよりますが、スケジューラによる日次更新を行っているだけなので、Tableau上では1日~最悪2日程度データ更新にラグが発生します。
    • Tableau 抽出機能については、Tableau REST APIを用いてAnalytics AWSアカウントのテーブル更新時に対象データソースの抽出を更新するようすればできなくはないと思っています。
  • 高速化されたテーブルの更新がDROP TABLE -> CREATE TABLEの為、実行中にクエリ実行されるとエラーになる
    • UNLOADを用いることで回避できますが、そうするとMySQLテーブルのスキーマが変更されたときに対応できません。いい方法を探しています。

です。今後はこの環境をもとにデータ利活用の促進をしていこうと思います。

ここまで閲覧いただきありがとうございました。何かしら参考になれば幸いです。

人材募集

現在、Wanoグループでは人材募集をしています。興味のある方は下記を参照してください。
Wano | Wano Group JOBS
TuneCore Japan | TuneCore Japan JOBS

17
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
3