1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS S3, Athena, Glue を活用した統合データ分析基盤 - AWS RDS, Kintone, Freee のデータを一元管理

Last updated at Posted at 2024-05-29

社内で RDS, Kintone, Freee など様々なサービスを使っていて、それらの情報をー箇所で集約し、分析するためのデータ基盤の必要がありました。
そのために以下の手順でデータ基盤を構築してみました。

  1. データのまとめ:AWS Glue を用いてデータを S3 に抽出しカタログ化
  2. データ加工:dbt-athena でデータ変換
  3. クエリ実行:Athena でクエリ
  4. データ分析:Metabase でのデータ分析とお可視化

将来的に集めるデータ量が増加見込みなので、コスト効率と拡張性の面で S3 での保存は適切かと考えました。
また、開発チュー自体少人数なので、煩雑な管理作業を避けるためにサーバーレスや自動化に注力し、ETL(Extract, Transform, Load)ワークフローは AWS Glue、Athena、dbt で構築しました。

安全かつスマートにインフラを管理

インフラ管理には AWS コンソールで直接操作するのではなく、CloudFormation を活用していますので、CloudFormation と SaaS サービスの連携例も記載しています。

全体図

overall_view.jpeg

1. データのまとめ: Glue を使って データを S3 に格納

<> の囲んだものは適切な設定で置き換えてください。

先に S3 Bucket と Glue Job で利用するロールを作成します。
S3DataLakeBucket: # 今回はこのBucketにはデータ以外に、Glue Job の ファイルや他のライブラリも一緒におきます
  Type: AWS::S3::Bucket
  Properties:
    BucketName: <BUCKET_NAME>
    PublicAccessBlockConfiguration:
      BlockPublicAcls: true
      BlockPublicPolicy: true
      IgnorePublicAcls: true
      RestrictPublicBuckets: true
    VersioningConfiguration:
      Status: Enabled
ETLGlueJobRole: # Glue Job が S3やRDS にアクセスするための Roleです。
  Type: AWS::IAM::Role
  Properties:
    RoleName: etl-glue-job-role
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: glue.amazonaws.com
          Action: sts:AssumeRole
    Policies:
      - PolicyName: MyGlueJobPolicy # サンプルのため一部のPolicyは全てのリソースに対して権限を付与
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: arn:aws:logs:*:*:*
            - Effect: Allow
              Action:
                - ssm:GetParameter
              Resource: '*'
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:ListBucket
                - s3:GetBucketLocation
                - s3:PutObject
                - s3:DeleteObject
              Resource:
                - !GetAtt S3DataLakeBucket.Arn
                - !Join
                  - ''
                  - - !GetAtt S3DataLakeBucket.Arn
                    - /*
            - Effect: Allow
              Action:
                - glue:StartJobRun
              Resource: arn:aws:glue:*:*:job/*
            - Sid: ForGlueCrawler
              Effect: Allow
              Action:
                - glue:*
              Resource:
                - arn:aws:glue:ap-northeast-1:*:catalog
                - arn:aws:glue:ap-northeast-1:*:connection/*
                - arn:aws:glue:ap-northeast-1:<AWS_ACCOUNT_ID>:database/*
                - arn:aws:glue:ap-northeast-1:<AWS_ACCOUNT_ID>:table/*
            - Effect: Allow
              Action:
                - secretsmanager:GetResourcePolicy
                - secretsmanager:GetSecretValue
                - secretsmanager:DescribeSecret
                - secretsmanager:ListSecretVersionIds
              Resource:
                - arn:aws:secretsmanager:ap-northeast-1:<AWS_ACCOUNT_ID>:secret:*
            - Action:
                - ec2:DescribeVpcEndpoints
                - ec2:DescribeRouteTables
                - ec2:DescribeNetworkInterfaces
                - ec2:CreateNetworkInterface
                - ec2:DeleteNetworkInterface
                - ec2:DescribeSecurityGroups
                - ec2:DescribeSubnets
                - ec2:DescribeVpcAttribute
              Resource: '*'
              Effect: Allow
            - Effect: Allow
              Action:
                - ec2:CreateTags
                - ec2:DeleteTags
              Condition:
                ForAllValues:StringEquals:
                  aws:TagKeys:
                    - aws-glue-service-resource
              Resource:
                - arn:aws:ec2:*:*:network-interface/*
                - arn:aws:ec2:*:*:security-group/*
                - arn:aws:ec2:*:*:instance/*
            - Effect: Allow
              Action:
                - athena:StartQueryExecution
                - athena:GetQueryExecution
                - athena:GetQueryResults
              Resource: '*'

RDS のデータを定期的に S3 にエクスポート

RDS のデータは Glue ETL を使って全てのデータを S3 に出力しています。
今回は MySQL のデータを S3 に Parquet として出力する template を例として見てみます。

RDS に接続するための Glue Connection を作成
SampleDBConnectionSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    GroupDescription: 'for db connection'
    GroupName: sgp-glue-connection
    VpcId: <VPC_ID>
    SecurityGroupEgress:
      - IpProtocol: '-1'
        CidrIp: 0.0.0.0/0
      - IpProtocol: '-1'
        CidrIpv6: '::/0'
SampleDBConnectionSecurityGroupIngress:
  Type: AWS::EC2::SecurityGroupIngress
  Properties:
    GroupId: !Ref SampleDBConnectionSecurityGroup
    IpProtocol: tcp
    FromPort: '0'
    ToPort: '65535'
    SourceSecurityGroupId: !Ref SampleDBConnectionSecurityGroup
SampleDBConnection:
  Type: AWS::Glue::Connection
  Properties:
    CatalogId: !Ref AWS::AccountId
    ConnectionInput:
      ConnectionType: JDBC
      ConnectionProperties:
        JDBC_CONNECTION_URL: 'jdbc:mysql://<DB_HOST>:<DB_PORT>/<DB_NAME>' # mysqlの場合
        SECRET_ID: <SECRET_ID> # パスワードは SecretManger で保存されているので、SecretManager の IDで指定する - https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-glue-connection-connectioninput.html#cfn-glue-connection-connectioninput-connectionproperties
      Name: glue-connection
      PhysicalConnectionRequirements:
        SecurityGroupIdList:
          - !Ref SampleDBConnectionSecurityGroup
        SubnetId: <SUBNET_ID>

SecurityGroup は自己参照ルールを追加する必要があります

CloudFormation で Glue Connection を作成すると成功にはなりますが、GlueJobでその connection を利用しようとすると Unable to resolve any valid connection になる場合がありましたので、そうなった場合は AWS コンソールで対象 connection を選択→編集→何も変更せず保存 をするとちゃんと接続できるようになります。

次にGlue Data Catalog で Database を作成します。
履歴データも保存しておきたいので履歴データは別 DB として保存しています。
partition で最新データを管理するのも可能ですが、SQL で最新データを取得時にいちいち partition キーを指定したくないため、別データベースで管理した方が良さそうな気がしたので、別々で管理しています。

SampleDBDatabase:
  Type: AWS::Glue::Database
  Properties:
    CatalogId: !Ref AWS::AccountId
    DatabaseInput:
      Name: sample-db
      Description: 'RDS'
SampleDBHistoryDatabase:
  Type: AWS::Glue::Database
  Properties:
    CatalogId: !Ref AWS::AccountId
    DatabaseInput:
      Name: sample-db-history
      Description: 'DB_HISTORY'

s3://<BUCKET_NAME>/glueJobs/ に以下のファイル2つをアップロードします。

SampleDBGlueJob.py(このファイルはGlue Studio の Visual ETL をベースに全テーブルを出力する形にしています)
import datetime
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from utils_s3 import get_s3_history_prefix, get_s3_latest_prefix

args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "s3Bucket", "s3Folder", "dbConnection"]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

today_string = datetime.datetime.today().strftime("%Y%m%d")

# get all tables from app db
allTables = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "information_schema.tables",
        "connectionName": args["dbConnection"],
    },
    transformation_ctx="MySQLtable_node1",
)
targetTables = (
    allTables.toDF()
    .filter("TABLE_SCHEMA != 'information_schema'")
    .filter("TABLE_TYPE = 'BASE TABLE'")
    .collect()
)

for row in targetTables:
    table_name = row["TABLE_NAME"]
    schema_name = row["TABLE_SCHEMA"]
    print(f"SCHEMA_NAME: {schema_name}, TABLE_NAME: {table_name}")

    ds = glueContext.create_dynamic_frame.from_options(
        connection_type="mysql",
        connection_options={
            "useConnectionProperties": "true",
            "dbtable": table_name,
            "connectionName": args["dbConnection"],
        },
        transformation_ctx="MySQLtable_node1",
    )

    # 最新情報として保存
    latestDataPath = (
        f's3://{args["s3Bucket"]}/{get_s3_latest_prefix(args["s3Folder"], table_name)}/'
    )
    glueContext.purge_s3_path(latestDataPath, {"retentionPeriod": 0})
    glueContext.write_dynamic_frame.from_options(
        frame=ds,
        connection_type="s3",
        connection_options={"path": latestDataPath},
        format="parquet",
    )

    # 履歴情報として保存
    historyDataPath = f's3://{args["s3Bucket"]}/{get_s3_history_prefix(args["s3Folder"], table_name)}/'
    glueContext.purge_s3_path(historyDataPath, {"retentionPeriod": 0})

    source_dyf = glueContext.create_dynamic_frame_from_options(
        connection_type="s3",
        connection_options={"path": latestDataPath, "recurse": True},
        format="parquet",
    )

    glueContext.write_dynamic_frame.from_options(
        frame=source_dyf,
        connection_type="s3",
        connection_options={"path": historyDataPath, "overwrite": True},
        format="parquet",
    )

job.commit()
utils_s3.py も S3にアップロード
import datetime
import pytz
import boto3

tz = pytz.timezone('Asia/Tokyo')
now = datetime.datetime.now(tz)
history_year = now.strftime('%Y')
history_month = now.strftime('%m')
history_day = now.strftime('%d')

s3 = boto3.client("s3")

def get_s3_latest_prefix(prefix, tableName):
    return f'{prefix}/latest/{tableName}';

def get_s3_history_prefix(prefix,tableName):
    return f'{prefix}/history/{tableName}/history_year={history_year}/history_month={history_month}/history_day={history_day}';

def save_local_file_to_s3(bucket, prefix, tableName, fileName):
    """
    ローカルファイルをS3にアップロードする(履歴も保存される)
    - s3://{bucket}/{prefix}/latest/{tableName}/{fileName}
    - s3://{bucket}/{prefix}/history/{tableName}/history_year=YYYY/history_month=MM/history_day=DD/{fileName}
    """
    s3_latest_path = f'{get_s3_latest_prefix(prefix, tableName)}/{fileName}'
    s3.upload_file(fileName, bucket, s3_latest_path)

    s3_history_path = f'{get_s3_history_prefix(prefix, tableName)}/{fileName}'
    s3.copy_object(
        Bucket=bucket,
        CopySource={'Bucket': bucket, 'Key': s3_latest_path},
        Key=s3_history_path
    )

次に、Workflow を作成し、Workflow で Glue Triggers、Glue Crawlers などが自動で発火されるようにします。

SampleDBETLWorkFlow:
  Type: AWS::Glue::Workflow
  Properties:
    Name: sample-db-etl-workflow
SampleDBGlueJob: # RDS のデータを抽出する Glue Job です
  Type: AWS::Glue::Job
  Properties:
    Name: sample-db-glue-job
    Role: !GetAtt ETLGlueJobRole.Arn
    Command:
      Name: glueetl
      ScriptLocation: !Join
        - ''
        - - s3://
          - !Ref S3DataLakeBucket
          - /glueJobs/SampleDBGlueJob.py
    GlueVersion: '4.0'
    WorkerType: G.1X
    NumberOfWorkers: 2
    Connections:
      Connections:
        - !Ref SampleDBConnection
    DefaultArguments:
      '--s3Bucket': !Ref S3DataLakeBucket
      '--s3Folder': bronze/sample-db
      '--dbConnection': !Ref SampleDBConnection
      '--extra-py-files': !Join
        - ''
        - - s3://
          - !Ref S3DataLakeBucket
          - /glueJobs/utils_s3.py
SampleDBGlueJobTrigger: # 毎日出力するための Trigger
  Type: AWS::Glue::Trigger
  Properties:
    Actions:
      - JobName: !Ref SampleDBGlueJob
    Name: sample-db-glue-job-trigger
    Schedule: cron(0 * * * ? *)
    Type: SCHEDULED
    WorkflowName: !Ref SampleDBETLWorkFlow
    StartOnCreation: true
SampleDBCrawler: # 出力されたファイルを Glue Data Catalog に反映するためには、Glue Crawler を使います
  Type: AWS::Glue::Crawler
  Properties:
    DatabaseName: !Ref SampleDBDatabase
    Role: !GetAtt ETLGlueJobRole.Arn
    Targets:
      S3Targets:
        - Path: !Join
            - ''
            - - s3://
              - !Ref S3DataLakeBucket
              - /bronze/sample-db/latest/
    Name: sample-db-crawler
    Configuration: '{"Version":1,"Grouping":{"TableLevelConfiguration":5}}' # 基本こちら設定なくても Glue Crawler の方で階層を自動で判断してくれますが、念の為設定しています
SampleDBCrawlerTrigger:
  Type: AWS::Glue::Trigger
  Properties:
    Actions:
      - CrawlerName: !Ref SampleDBCrawler
    Name: sample-db-crawler-trigger
    Predicate:
      Conditions:
        - LogicalOperator: EQUALS
          JobName: !Ref SampleDBGlueJob
          State: SUCCEEDED # GlueJob のデータ抽出が成功した場合にのみ Crawler を発火させます
      Logical: ANY
    Type: CONDITIONAL
    WorkflowName: !Ref SampleDBETLWorkFlow
    StartOnCreation: true
SampleDBHistoryCrawler:
  Type: AWS::Glue::Crawler
  Properties:
    DatabaseName: !Ref SampleDBHistoryDatabase
    Role: !GetAtt ETLGlueJobRole.Arn
    Targets:
      S3Targets:
        - Path: !Join
            - ''
            - - s3://
              - !Ref S3DataLakeBucket
              - /bronze/sample-db/history/
    Name: sample-db-history-crawler
    Configuration: '{"Version":1,"Grouping":{"TableLevelConfiguration":5,"TableGroupingPolicy":"CombineCompatibleSchemas"},"CreatePartitionIndex":true}' # サンプルDBは 日付で Partitionします
    RecrawlPolicy:
      RecrawlBehavior: CRAWL_NEW_FOLDERS_ONLY # 追加のpartition分だけクロールするように設定
SampleDBHistoryCrawlerTrigger:
  Type: AWS::Glue::Trigger
  Properties:
    Actions:
      - CrawlerName: !Ref SampleDBHistoryCrawler
    Name: sample-db-history-crawler-trigger
    Predicate:
      Conditions:
        - LogicalOperator: EQUALS
          JobName: !Ref SampleDBGlueJob
          State: SUCCEEDED
      Logical: ANY
    Type: CONDITIONAL
    WorkflowName: !Ref SampleDBETLWorkFlow
    StartOnCreation: true

上記でRDSのデータが毎日 S3 に出力されます。
WorkFlow でステータス確認
Screenshot 2024-05-29 at 18.45.13.png

Athenaからもクエリすることができます。
Screenshot 2024-05-29 at 18.26.52.png

社内では簡易データベースなども EC2 で動いており、それらも上記と同じ方法で S3 に出力を行っています。

Kintone のデータを S3 に出力

Kintone データの出力は Glue Python Shell で行います。
Kintone API を叩いて S3 に格納する形です。
GlueJob 以外のリソースは上記の設定とかわりません。

KintoneGlueJob:
  Type: AWS::Glue::Job
  Properties:
    Name: kintone-glue-job
    Role: !GetAtt ETLGlueJobRole.Arn
    Command:
      Name: pythonshell
      PythonVersion: '3.9'
      ScriptLocation: !Join
        - ''
        - - s3://
          - !Ref S3DataWarehouse
          - /glueJobs/KintoneGlueJob.py
    DefaultArguments:
      '--s3Bucket': !Ref S3DataWarehouse
      '--s3Folder': bronze/kintone

      '--extra-py-files': !Join
        - ''
        - - s3://
          - !Ref S3DataWarehouse
          - /glueJobs/utils_s3.py
KintoneGlueJob.py
import json
import os
import sys
import boto3
import datetime
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from awsglue.utils import getResolvedOptions
import logging
from dateutil.parser import parse
from utils_s3 import save_local_file_to_s3

s3 = boto3.resource("s3")
ssm = boto3.client("ssm")

args = getResolvedOptions(
    sys.argv,
    ["stage", "s3Bucket", "s3Folder", "ssmKintoneAPIUrlName", "ssmKintoneAPIKeysName"],
)

KINTONE_DOMAIN = ssm.get_parameter(Name=args["ssmKintoneAPIUrlName"])["Parameter"][
    "Value"
]
KINTONE_API_KEYS = json.loads(
    ssm.get_parameter(Name=args["ssmKintoneAPIKeysName"], WithDecryption=True)[
        "Parameter"
    ]["Value"]
)

# 設定自体は DynamoDB に保存した方が良さそう
KINTONE_SYNC_LIST = [
    {
        "tableNameForGlue": "kintone_app",
        "kintoneAppId": "0",
        "kintoneQuery": "order by $id asc",
        "description": "KINTONEテストアプリ",
        "kintoneFields": [
            {
                "field_code": "id",
                "label": "id",
            },
            {
                "field_code": "Name",
                "label": "Name",
            },
            {
                "field_code": "Age",
                "label": "Age",
                "type": "NUMBER",
            },
        ],
    }
]

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# glue で logger を動くようにする
logger.info = lambda msg: print(f"{datetime.datetime.now()}	INFO	{msg}")

errors = []

for sync in KINTONE_SYNC_LIST:
    logger.info(f'start - {sync["tableNameForGlue"]}')

    # cursor 作成
    url = f"{KINTONE_DOMAIN}/k/v1/records/cursor.json"
    headers = {
        "X-Cybozu-API-Token": f"{KINTONE_API_KEYS[sync['kintoneAppId']]}",
        "Content-Type": "application/json",
    }
    fieldCodeList = ["$id"] + [
        field["field_code"] for field in (sync["kintoneFields"] or [])
    ]
    response = requests.post(
        url,
        headers=headers,
        json={
            "app": sync["kintoneAppId"],
            "fields": fieldCodeList,
            "query": sync["kintoneQuery"],
            "size": 500,
        },
    )
    response.raise_for_status()
    cursor = response.json()
    logger.info(cursor)

    writer = None
    fileName = f'{sync["tableNameForGlue"]}.parquet'
    retrieved_records_total = 0

    try:
        while True:
            # cursor からレコード取得
            response = requests.get(url, headers=headers, json=cursor)
            response.raise_for_status()
            result = response.json()

            retrieved_records_total += len(result["records"])
            logger.info(
                {
                    "retrieved_records_total": retrieved_records_total,
                    "next": result["next"],
                }
            )

            schema = pa.schema([], metadata={"description": "test description"})

            # 取得した分だけ parquetに書き込む
            convertedRecordList = []
            for record in result["records"]:
                convertedRecord = {}
                schema = pa.schema([])

                # sort record property by fieldCode
                sorted_items = sorted(
                    record.items(),
                    key=lambda key_value: (
                        fieldCodeList.index(key_value[0])
                        if key_value[0] in fieldCodeList
                        else len(fieldCodeList) + 1
                    ),
                )

                # loop through fields
                for originalFieldCode, typeObj in sorted_items:
                    customField = next(
                        (
                            field
                            for field in sync["kintoneFields"]
                            if field["field_code"] == originalFieldCode
                        ),
                        {},
                    )

                    fieldCode = customField.get("label") or originalFieldCode
                    fieldType = customField.get("type") or typeObj["type"]
                    value = typeObj["value"]
                    try:
                        if fieldType in ["__ID__", "NUMBER"]:
                            convertedRecord[fieldCode] = int(float(value or 0))
                            schema = schema.append(pa.field(fieldCode, pa.int64()))
                        elif fieldType in ["DOUBLE"]:
                            convertedRecord[fieldCode] = float(value or 0)
                            schema = schema.append(pa.field(fieldCode, pa.float64()))
                        elif fieldType in ["DATE"]:
                            convertedRecord[fieldCode] = (
                                None
                                if value is None or value == ""
                                else parse(value).date()
                            )
                            schema = schema.append(pa.field(fieldCode, pa.date32()))
                        elif fieldType in ["DATETIME", "CREATED_TIME", "UPDATED_TIME"]:
                            convertedRecord[fieldCode] = (
                                None
                                if value is None or value == ""
                                else datetime.datetime.strptime(
                                    value, "%Y-%m-%dT%H:%M:%SZ"
                                )
                            )
                            schema = schema.append(
                                pa.field(fieldCode, pa.timestamp("ns"))
                            )
                        elif fieldType in ["TIME"]:
                            # Athena does not support TIME type, so convert to TIMESTAMP
                            convertedRecord[fieldCode] = (
                                None
                                if value is None or value == ""
                                else datetime.datetime.utcfromtimestamp(0).strptime(
                                    value, "%H:%M"
                                )
                            )
                            schema = schema.append(
                                pa.field(fieldCode, pa.timestamp("ns"))
                            )
                        elif fieldType in ["SUBTABLE"]:
                            print("--------------------")
                            print(fieldType)
                            convertedRecord[fieldCode] = (
                                None
                                if value is None or value == ""
                                else json.dumps(value)
                            )
                            schema = schema.append(pa.field(fieldCode, pa.string()))
                        else:
                            convertedRecord[fieldCode] = (
                                None if value is None or value == "" else str(value)
                            )
                            schema = schema.append(pa.field(fieldCode, pa.string()))
                    except Exception as e:
                        logger.info(record)
                        logger.error(
                            f"originalFieldCode: {originalFieldCode}, fieldType: {fieldType}, value: {value}"
                        )
                        raise e

                convertedRecordList.append(convertedRecord)

            if writer is None:
                writer = pq.ParquetWriter(fileName, schema)

            df = pd.DataFrame(convertedRecordList)
            writer.write_table(pa.Table.from_pandas(df, schema=schema))

            if result["next"] == False:
                break

        writer.close()

        # upload file to s3
        save_local_file_to_s3(
            args["s3Bucket"], args["s3Folder"], sync["tableNameForGlue"], fileName
        )

        logger.info(f'end - {sync["tableNameForGlue"]}')
    except Exception as e:
        if writer is not None:
            writer.close()

        errors.append(e)
        logger.info(f'errorOccurred - {sync["tableNameForGlue"]}')
    finally:
        # delete cursor
        requests.delete(url, headers=headers, json=cursor)
        logger.info("cursor deleted")

        # delete local file
        os.remove(fileName)
        logger.info(f"local file deleted")

# log errors
if len(errors) > 0:
    for error in errors:
        logger.error(error)

Freee

Freee も Kintone 同様で Freee API を叩いて結果を S3 に格納しています

2. データ加工:dbt-athena でデータ変換

glue job で dbt を設定し dbt でデータの処理や変換を行います
DbtGlueJob:
  Type: AWS::Glue::Job
  Properties:
    Name: dbt-glue-job
    Role: !GetAtt ETLGlueJobRole.Arn
    Command:
      Name: pythonshell
      PythonVersion: '3.9'
      ScriptLocation: !Join
        - ''
        - - s3://
          - !Ref S3DataLakeBucket
          - /glueJobs/dbt.py
    ExecutionProperty:
      MaxConcurrentRuns: 4
    DefaultArguments:
      '--additional-python-modules': dbt-athena-community
      '--s3Bucket': !Ref S3DataLakeBucket
      '--dbtRepositoryZipKey': glue/dbt/my_dbt_project.zip # dbt projectの設定は https://docs.getdbt.com/docs/introduction 参照
      '--models': '*'
DbtTrigger:
  Type: AWS::Glue::Trigger
  Properties:
    Actions:
      - JobName: !Ref DbtGlueJob
        Arguments:
          '--models': sample-db
    Name: dbt-trigger
    Predicate:
      Conditions:
        - LogicalOperator: EQUALS
          JobName: !Ref SampleDBGlueJob
          State: SUCCEEDED
      Logical: ANY
    Type: CONDITIONAL
    WorkflowName: !Ref SampleDBETLWorkFlow
    StartOnCreation: true
dbt.py
import subprocess
import sys
import boto3
from dbt.cli.main import dbtRunner
from awsglue.utils import getResolvedOptions
import os

args = getResolvedOptions(
    sys.argv, ["s3Bucket", "dbtRepositoryZipKey", "models"],
)

s3 = boto3.resource('s3')
s3.Bucket(args["s3Bucket"]).download_file(Filename="dbt.zip", Key=args["dbtRepositoryZipKey"])

cmd = r"unzip dbt.zip"
print(subprocess.check_call(cmd.split()))

dbt = dbtRunner()
dbtArgs = ['run']

silverPrefix = f'silver.{args["models"]}';
goldPrefix = f'gold.{args["models"]}';

print(f'running {silverPrefix} models')
dbtArgs.append('--models')
dbtArgs.append(silverPrefix)
dbtRunnerResult = dbt.invoke(dbtArgs)
print('silver completed')

print(f'running {goldPrefix} models')
dbtArgs.pop()
dbtArgs.append(f'gold.{args["models"]}')
dbtRunnerResult = dbt.invoke(dbtArgs)
print('gold completed')

dbt project で SQL を書いて bronze から silver や gold にデータ変換を行う感じです。

上記で、Workflow 自体は以下のような感じで動きます。

3.クエリ実行:Athena でクエリ

GlueJob で出力したデータや dbt で変換されたデータは Athena でクエリしています

4. データ分析:Metabase でのデータ分析とお可視化

現状 metabase を使ってデータ分析や・可視化を行っています。
(ただ、metabase もサーバの管理が必要なのと、いくつかのグラフで表示速度が最適ではないため、今後は QuickSight に移行するのも検討している状況です。)

まとめ

  1. 多様なデータソースの集約

    • 今まで、AWS RDS, Kintone, Freee など、バラバラに散乱していいるデータを一元管理することで、ビジネスインサイトなの精度向上や意思決定の迅速化に繋ぐのではないかと思っています。
  2. サーバレスアーキテクチャの採用

    • AWS Glue と Athena を使用してサーバーレスな ETL ワークフローが実現できました
    • 少人数の開発チームでも運用が容易であり、コスト効率と拡張性を重視したインフラ設計になっていると思います
    • 今後は、GA や Google Search Console のデータも AWS AppFlow 経由で管理する形で、より容易は構築と正確なインサイトが得られるようにしていこうと考えております。
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?