社内で RDS, Kintone, Freee など様々なサービスを使っていて、それらの情報をー箇所で集約し、分析するためのデータ基盤の必要がありました。
そのために以下の手順でデータ基盤を構築してみました。
- データのまとめ:AWS Glue を用いてデータを S3 に抽出しカタログ化
- データ加工:dbt-athena でデータ変換
- クエリ実行:Athena でクエリ
- データ分析:Metabase でのデータ分析とお可視化
将来的に集めるデータ量が増加見込みなので、コスト効率と拡張性の面で S3 での保存は適切かと考えました。
また、開発チュー自体少人数なので、煩雑な管理作業を避けるためにサーバーレスや自動化に注力し、ETL(Extract, Transform, Load)ワークフローは AWS Glue、Athena、dbt で構築しました。
安全かつスマートにインフラを管理
インフラ管理には AWS コンソールで直接操作するのではなく、CloudFormation を活用していますので、CloudFormation と SaaS サービスの連携例も記載しています。
全体図
1. データのまとめ: Glue を使って データを S3 に格納
<>
の囲んだものは適切な設定で置き換えてください。
S3DataLakeBucket: # 今回はこのBucketにはデータ以外に、Glue Job の ファイルや他のライブラリも一緒におきます
Type: AWS::S3::Bucket
Properties:
BucketName: <BUCKET_NAME>
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
VersioningConfiguration:
Status: Enabled
ETLGlueJobRole: # Glue Job が S3やRDS にアクセスするための Roleです。
Type: AWS::IAM::Role
Properties:
RoleName: etl-glue-job-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: glue.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: MyGlueJobPolicy # サンプルのため一部のPolicyは全てのリソースに対して権限を付与
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: arn:aws:logs:*:*:*
- Effect: Allow
Action:
- ssm:GetParameter
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
- s3:ListBucket
- s3:GetBucketLocation
- s3:PutObject
- s3:DeleteObject
Resource:
- !GetAtt S3DataLakeBucket.Arn
- !Join
- ''
- - !GetAtt S3DataLakeBucket.Arn
- /*
- Effect: Allow
Action:
- glue:StartJobRun
Resource: arn:aws:glue:*:*:job/*
- Sid: ForGlueCrawler
Effect: Allow
Action:
- glue:*
Resource:
- arn:aws:glue:ap-northeast-1:*:catalog
- arn:aws:glue:ap-northeast-1:*:connection/*
- arn:aws:glue:ap-northeast-1:<AWS_ACCOUNT_ID>:database/*
- arn:aws:glue:ap-northeast-1:<AWS_ACCOUNT_ID>:table/*
- Effect: Allow
Action:
- secretsmanager:GetResourcePolicy
- secretsmanager:GetSecretValue
- secretsmanager:DescribeSecret
- secretsmanager:ListSecretVersionIds
Resource:
- arn:aws:secretsmanager:ap-northeast-1:<AWS_ACCOUNT_ID>:secret:*
- Action:
- ec2:DescribeVpcEndpoints
- ec2:DescribeRouteTables
- ec2:DescribeNetworkInterfaces
- ec2:CreateNetworkInterface
- ec2:DeleteNetworkInterface
- ec2:DescribeSecurityGroups
- ec2:DescribeSubnets
- ec2:DescribeVpcAttribute
Resource: '*'
Effect: Allow
- Effect: Allow
Action:
- ec2:CreateTags
- ec2:DeleteTags
Condition:
ForAllValues:StringEquals:
aws:TagKeys:
- aws-glue-service-resource
Resource:
- arn:aws:ec2:*:*:network-interface/*
- arn:aws:ec2:*:*:security-group/*
- arn:aws:ec2:*:*:instance/*
- Effect: Allow
Action:
- athena:StartQueryExecution
- athena:GetQueryExecution
- athena:GetQueryResults
Resource: '*'
RDS のデータを定期的に S3 にエクスポート
RDS のデータは Glue ETL を使って全てのデータを S3 に出力しています。
今回は MySQL のデータを S3 に Parquet として出力する template を例として見てみます。
SampleDBConnectionSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: 'for db connection'
GroupName: sgp-glue-connection
VpcId: <VPC_ID>
SecurityGroupEgress:
- IpProtocol: '-1'
CidrIp: 0.0.0.0/0
- IpProtocol: '-1'
CidrIpv6: '::/0'
SampleDBConnectionSecurityGroupIngress:
Type: AWS::EC2::SecurityGroupIngress
Properties:
GroupId: !Ref SampleDBConnectionSecurityGroup
IpProtocol: tcp
FromPort: '0'
ToPort: '65535'
SourceSecurityGroupId: !Ref SampleDBConnectionSecurityGroup
SampleDBConnection:
Type: AWS::Glue::Connection
Properties:
CatalogId: !Ref AWS::AccountId
ConnectionInput:
ConnectionType: JDBC
ConnectionProperties:
JDBC_CONNECTION_URL: 'jdbc:mysql://<DB_HOST>:<DB_PORT>/<DB_NAME>' # mysqlの場合
SECRET_ID: <SECRET_ID> # パスワードは SecretManger で保存されているので、SecretManager の IDで指定する - https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-glue-connection-connectioninput.html#cfn-glue-connection-connectioninput-connectionproperties
Name: glue-connection
PhysicalConnectionRequirements:
SecurityGroupIdList:
- !Ref SampleDBConnectionSecurityGroup
SubnetId: <SUBNET_ID>
SecurityGroup は自己参照ルールを追加する必要があります
CloudFormation で Glue Connection を作成すると成功にはなりますが、GlueJobでその connection を利用しようとすると Unable to resolve any valid connection
になる場合がありましたので、そうなった場合は AWS コンソールで対象 connection を選択→編集→何も変更せず保存 をするとちゃんと接続できるようになります。
次にGlue Data Catalog で Database を作成します。
履歴データも保存しておきたいので履歴データは別 DB として保存しています。
partition で最新データを管理するのも可能ですが、SQL で最新データを取得時にいちいち partition キーを指定したくないため、別データベースで管理した方が良さそうな気がしたので、別々で管理しています。
SampleDBDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: sample-db
Description: 'RDS'
SampleDBHistoryDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: sample-db-history
Description: 'DB_HISTORY'
s3://<BUCKET_NAME>/glueJobs/
に以下のファイル2つをアップロードします。
SampleDBGlueJob.py(このファイルはGlue Studio の Visual ETL をベースに全テーブルを出力する形にしています)
import datetime
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from utils_s3 import get_s3_history_prefix, get_s3_latest_prefix
args = getResolvedOptions(
sys.argv, ["JOB_NAME", "s3Bucket", "s3Folder", "dbConnection"]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
today_string = datetime.datetime.today().strftime("%Y%m%d")
# get all tables from app db
allTables = glueContext.create_dynamic_frame.from_options(
connection_type="mysql",
connection_options={
"useConnectionProperties": "true",
"dbtable": "information_schema.tables",
"connectionName": args["dbConnection"],
},
transformation_ctx="MySQLtable_node1",
)
targetTables = (
allTables.toDF()
.filter("TABLE_SCHEMA != 'information_schema'")
.filter("TABLE_TYPE = 'BASE TABLE'")
.collect()
)
for row in targetTables:
table_name = row["TABLE_NAME"]
schema_name = row["TABLE_SCHEMA"]
print(f"SCHEMA_NAME: {schema_name}, TABLE_NAME: {table_name}")
ds = glueContext.create_dynamic_frame.from_options(
connection_type="mysql",
connection_options={
"useConnectionProperties": "true",
"dbtable": table_name,
"connectionName": args["dbConnection"],
},
transformation_ctx="MySQLtable_node1",
)
# 最新情報として保存
latestDataPath = (
f's3://{args["s3Bucket"]}/{get_s3_latest_prefix(args["s3Folder"], table_name)}/'
)
glueContext.purge_s3_path(latestDataPath, {"retentionPeriod": 0})
glueContext.write_dynamic_frame.from_options(
frame=ds,
connection_type="s3",
connection_options={"path": latestDataPath},
format="parquet",
)
# 履歴情報として保存
historyDataPath = f's3://{args["s3Bucket"]}/{get_s3_history_prefix(args["s3Folder"], table_name)}/'
glueContext.purge_s3_path(historyDataPath, {"retentionPeriod": 0})
source_dyf = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
connection_options={"path": latestDataPath, "recurse": True},
format="parquet",
)
glueContext.write_dynamic_frame.from_options(
frame=source_dyf,
connection_type="s3",
connection_options={"path": historyDataPath, "overwrite": True},
format="parquet",
)
job.commit()
utils_s3.py も S3にアップロード
import datetime
import pytz
import boto3
tz = pytz.timezone('Asia/Tokyo')
now = datetime.datetime.now(tz)
history_year = now.strftime('%Y')
history_month = now.strftime('%m')
history_day = now.strftime('%d')
s3 = boto3.client("s3")
def get_s3_latest_prefix(prefix, tableName):
return f'{prefix}/latest/{tableName}';
def get_s3_history_prefix(prefix,tableName):
return f'{prefix}/history/{tableName}/history_year={history_year}/history_month={history_month}/history_day={history_day}';
def save_local_file_to_s3(bucket, prefix, tableName, fileName):
"""
ローカルファイルをS3にアップロードする(履歴も保存される)
- s3://{bucket}/{prefix}/latest/{tableName}/{fileName}
- s3://{bucket}/{prefix}/history/{tableName}/history_year=YYYY/history_month=MM/history_day=DD/{fileName}
"""
s3_latest_path = f'{get_s3_latest_prefix(prefix, tableName)}/{fileName}'
s3.upload_file(fileName, bucket, s3_latest_path)
s3_history_path = f'{get_s3_history_prefix(prefix, tableName)}/{fileName}'
s3.copy_object(
Bucket=bucket,
CopySource={'Bucket': bucket, 'Key': s3_latest_path},
Key=s3_history_path
)
次に、Workflow を作成し、Workflow で Glue Triggers、Glue Crawlers などが自動で発火されるようにします。
SampleDBETLWorkFlow:
Type: AWS::Glue::Workflow
Properties:
Name: sample-db-etl-workflow
SampleDBGlueJob: # RDS のデータを抽出する Glue Job です
Type: AWS::Glue::Job
Properties:
Name: sample-db-glue-job
Role: !GetAtt ETLGlueJobRole.Arn
Command:
Name: glueetl
ScriptLocation: !Join
- ''
- - s3://
- !Ref S3DataLakeBucket
- /glueJobs/SampleDBGlueJob.py
GlueVersion: '4.0'
WorkerType: G.1X
NumberOfWorkers: 2
Connections:
Connections:
- !Ref SampleDBConnection
DefaultArguments:
'--s3Bucket': !Ref S3DataLakeBucket
'--s3Folder': bronze/sample-db
'--dbConnection': !Ref SampleDBConnection
'--extra-py-files': !Join
- ''
- - s3://
- !Ref S3DataLakeBucket
- /glueJobs/utils_s3.py
SampleDBGlueJobTrigger: # 毎日出力するための Trigger
Type: AWS::Glue::Trigger
Properties:
Actions:
- JobName: !Ref SampleDBGlueJob
Name: sample-db-glue-job-trigger
Schedule: cron(0 * * * ? *)
Type: SCHEDULED
WorkflowName: !Ref SampleDBETLWorkFlow
StartOnCreation: true
SampleDBCrawler: # 出力されたファイルを Glue Data Catalog に反映するためには、Glue Crawler を使います
Type: AWS::Glue::Crawler
Properties:
DatabaseName: !Ref SampleDBDatabase
Role: !GetAtt ETLGlueJobRole.Arn
Targets:
S3Targets:
- Path: !Join
- ''
- - s3://
- !Ref S3DataLakeBucket
- /bronze/sample-db/latest/
Name: sample-db-crawler
Configuration: '{"Version":1,"Grouping":{"TableLevelConfiguration":5}}' # 基本こちら設定なくても Glue Crawler の方で階層を自動で判断してくれますが、念の為設定しています
SampleDBCrawlerTrigger:
Type: AWS::Glue::Trigger
Properties:
Actions:
- CrawlerName: !Ref SampleDBCrawler
Name: sample-db-crawler-trigger
Predicate:
Conditions:
- LogicalOperator: EQUALS
JobName: !Ref SampleDBGlueJob
State: SUCCEEDED # GlueJob のデータ抽出が成功した場合にのみ Crawler を発火させます
Logical: ANY
Type: CONDITIONAL
WorkflowName: !Ref SampleDBETLWorkFlow
StartOnCreation: true
SampleDBHistoryCrawler:
Type: AWS::Glue::Crawler
Properties:
DatabaseName: !Ref SampleDBHistoryDatabase
Role: !GetAtt ETLGlueJobRole.Arn
Targets:
S3Targets:
- Path: !Join
- ''
- - s3://
- !Ref S3DataLakeBucket
- /bronze/sample-db/history/
Name: sample-db-history-crawler
Configuration: '{"Version":1,"Grouping":{"TableLevelConfiguration":5,"TableGroupingPolicy":"CombineCompatibleSchemas"},"CreatePartitionIndex":true}' # サンプルDBは 日付で Partitionします
RecrawlPolicy:
RecrawlBehavior: CRAWL_NEW_FOLDERS_ONLY # 追加のpartition分だけクロールするように設定
SampleDBHistoryCrawlerTrigger:
Type: AWS::Glue::Trigger
Properties:
Actions:
- CrawlerName: !Ref SampleDBHistoryCrawler
Name: sample-db-history-crawler-trigger
Predicate:
Conditions:
- LogicalOperator: EQUALS
JobName: !Ref SampleDBGlueJob
State: SUCCEEDED
Logical: ANY
Type: CONDITIONAL
WorkflowName: !Ref SampleDBETLWorkFlow
StartOnCreation: true
上記でRDSのデータが毎日 S3 に出力されます。
WorkFlow でステータス確認
社内では簡易データベースなども EC2 で動いており、それらも上記と同じ方法で S3 に出力を行っています。
Kintone のデータを S3 に出力
Kintone データの出力は Glue Python Shell で行います。
Kintone API を叩いて S3 に格納する形です。
GlueJob 以外のリソースは上記の設定とかわりません。
KintoneGlueJob:
Type: AWS::Glue::Job
Properties:
Name: kintone-glue-job
Role: !GetAtt ETLGlueJobRole.Arn
Command:
Name: pythonshell
PythonVersion: '3.9'
ScriptLocation: !Join
- ''
- - s3://
- !Ref S3DataWarehouse
- /glueJobs/KintoneGlueJob.py
DefaultArguments:
'--s3Bucket': !Ref S3DataWarehouse
'--s3Folder': bronze/kintone
'--extra-py-files': !Join
- ''
- - s3://
- !Ref S3DataWarehouse
- /glueJobs/utils_s3.py
KintoneGlueJob.py
import json
import os
import sys
import boto3
import datetime
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from awsglue.utils import getResolvedOptions
import logging
from dateutil.parser import parse
from utils_s3 import save_local_file_to_s3
s3 = boto3.resource("s3")
ssm = boto3.client("ssm")
args = getResolvedOptions(
sys.argv,
["stage", "s3Bucket", "s3Folder", "ssmKintoneAPIUrlName", "ssmKintoneAPIKeysName"],
)
KINTONE_DOMAIN = ssm.get_parameter(Name=args["ssmKintoneAPIUrlName"])["Parameter"][
"Value"
]
KINTONE_API_KEYS = json.loads(
ssm.get_parameter(Name=args["ssmKintoneAPIKeysName"], WithDecryption=True)[
"Parameter"
]["Value"]
)
# 設定自体は DynamoDB に保存した方が良さそう
KINTONE_SYNC_LIST = [
{
"tableNameForGlue": "kintone_app",
"kintoneAppId": "0",
"kintoneQuery": "order by $id asc",
"description": "KINTONEテストアプリ",
"kintoneFields": [
{
"field_code": "id",
"label": "id",
},
{
"field_code": "Name",
"label": "Name",
},
{
"field_code": "Age",
"label": "Age",
"type": "NUMBER",
},
],
}
]
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# glue で logger を動くようにする
logger.info = lambda msg: print(f"{datetime.datetime.now()} INFO {msg}")
errors = []
for sync in KINTONE_SYNC_LIST:
logger.info(f'start - {sync["tableNameForGlue"]}')
# cursor 作成
url = f"{KINTONE_DOMAIN}/k/v1/records/cursor.json"
headers = {
"X-Cybozu-API-Token": f"{KINTONE_API_KEYS[sync['kintoneAppId']]}",
"Content-Type": "application/json",
}
fieldCodeList = ["$id"] + [
field["field_code"] for field in (sync["kintoneFields"] or [])
]
response = requests.post(
url,
headers=headers,
json={
"app": sync["kintoneAppId"],
"fields": fieldCodeList,
"query": sync["kintoneQuery"],
"size": 500,
},
)
response.raise_for_status()
cursor = response.json()
logger.info(cursor)
writer = None
fileName = f'{sync["tableNameForGlue"]}.parquet'
retrieved_records_total = 0
try:
while True:
# cursor からレコード取得
response = requests.get(url, headers=headers, json=cursor)
response.raise_for_status()
result = response.json()
retrieved_records_total += len(result["records"])
logger.info(
{
"retrieved_records_total": retrieved_records_total,
"next": result["next"],
}
)
schema = pa.schema([], metadata={"description": "test description"})
# 取得した分だけ parquetに書き込む
convertedRecordList = []
for record in result["records"]:
convertedRecord = {}
schema = pa.schema([])
# sort record property by fieldCode
sorted_items = sorted(
record.items(),
key=lambda key_value: (
fieldCodeList.index(key_value[0])
if key_value[0] in fieldCodeList
else len(fieldCodeList) + 1
),
)
# loop through fields
for originalFieldCode, typeObj in sorted_items:
customField = next(
(
field
for field in sync["kintoneFields"]
if field["field_code"] == originalFieldCode
),
{},
)
fieldCode = customField.get("label") or originalFieldCode
fieldType = customField.get("type") or typeObj["type"]
value = typeObj["value"]
try:
if fieldType in ["__ID__", "NUMBER"]:
convertedRecord[fieldCode] = int(float(value or 0))
schema = schema.append(pa.field(fieldCode, pa.int64()))
elif fieldType in ["DOUBLE"]:
convertedRecord[fieldCode] = float(value or 0)
schema = schema.append(pa.field(fieldCode, pa.float64()))
elif fieldType in ["DATE"]:
convertedRecord[fieldCode] = (
None
if value is None or value == ""
else parse(value).date()
)
schema = schema.append(pa.field(fieldCode, pa.date32()))
elif fieldType in ["DATETIME", "CREATED_TIME", "UPDATED_TIME"]:
convertedRecord[fieldCode] = (
None
if value is None or value == ""
else datetime.datetime.strptime(
value, "%Y-%m-%dT%H:%M:%SZ"
)
)
schema = schema.append(
pa.field(fieldCode, pa.timestamp("ns"))
)
elif fieldType in ["TIME"]:
# Athena does not support TIME type, so convert to TIMESTAMP
convertedRecord[fieldCode] = (
None
if value is None or value == ""
else datetime.datetime.utcfromtimestamp(0).strptime(
value, "%H:%M"
)
)
schema = schema.append(
pa.field(fieldCode, pa.timestamp("ns"))
)
elif fieldType in ["SUBTABLE"]:
print("--------------------")
print(fieldType)
convertedRecord[fieldCode] = (
None
if value is None or value == ""
else json.dumps(value)
)
schema = schema.append(pa.field(fieldCode, pa.string()))
else:
convertedRecord[fieldCode] = (
None if value is None or value == "" else str(value)
)
schema = schema.append(pa.field(fieldCode, pa.string()))
except Exception as e:
logger.info(record)
logger.error(
f"originalFieldCode: {originalFieldCode}, fieldType: {fieldType}, value: {value}"
)
raise e
convertedRecordList.append(convertedRecord)
if writer is None:
writer = pq.ParquetWriter(fileName, schema)
df = pd.DataFrame(convertedRecordList)
writer.write_table(pa.Table.from_pandas(df, schema=schema))
if result["next"] == False:
break
writer.close()
# upload file to s3
save_local_file_to_s3(
args["s3Bucket"], args["s3Folder"], sync["tableNameForGlue"], fileName
)
logger.info(f'end - {sync["tableNameForGlue"]}')
except Exception as e:
if writer is not None:
writer.close()
errors.append(e)
logger.info(f'errorOccurred - {sync["tableNameForGlue"]}')
finally:
# delete cursor
requests.delete(url, headers=headers, json=cursor)
logger.info("cursor deleted")
# delete local file
os.remove(fileName)
logger.info(f"local file deleted")
# log errors
if len(errors) > 0:
for error in errors:
logger.error(error)
Freee
Freee も Kintone 同様で Freee API を叩いて結果を S3 に格納しています
2. データ加工:dbt-athena でデータ変換
DbtGlueJob:
Type: AWS::Glue::Job
Properties:
Name: dbt-glue-job
Role: !GetAtt ETLGlueJobRole.Arn
Command:
Name: pythonshell
PythonVersion: '3.9'
ScriptLocation: !Join
- ''
- - s3://
- !Ref S3DataLakeBucket
- /glueJobs/dbt.py
ExecutionProperty:
MaxConcurrentRuns: 4
DefaultArguments:
'--additional-python-modules': dbt-athena-community
'--s3Bucket': !Ref S3DataLakeBucket
'--dbtRepositoryZipKey': glue/dbt/my_dbt_project.zip # dbt projectの設定は https://docs.getdbt.com/docs/introduction 参照
'--models': '*'
DbtTrigger:
Type: AWS::Glue::Trigger
Properties:
Actions:
- JobName: !Ref DbtGlueJob
Arguments:
'--models': sample-db
Name: dbt-trigger
Predicate:
Conditions:
- LogicalOperator: EQUALS
JobName: !Ref SampleDBGlueJob
State: SUCCEEDED
Logical: ANY
Type: CONDITIONAL
WorkflowName: !Ref SampleDBETLWorkFlow
StartOnCreation: true
dbt.py
import subprocess
import sys
import boto3
from dbt.cli.main import dbtRunner
from awsglue.utils import getResolvedOptions
import os
args = getResolvedOptions(
sys.argv, ["s3Bucket", "dbtRepositoryZipKey", "models"],
)
s3 = boto3.resource('s3')
s3.Bucket(args["s3Bucket"]).download_file(Filename="dbt.zip", Key=args["dbtRepositoryZipKey"])
cmd = r"unzip dbt.zip"
print(subprocess.check_call(cmd.split()))
dbt = dbtRunner()
dbtArgs = ['run']
silverPrefix = f'silver.{args["models"]}';
goldPrefix = f'gold.{args["models"]}';
print(f'running {silverPrefix} models')
dbtArgs.append('--models')
dbtArgs.append(silverPrefix)
dbtRunnerResult = dbt.invoke(dbtArgs)
print('silver completed')
print(f'running {goldPrefix} models')
dbtArgs.pop()
dbtArgs.append(f'gold.{args["models"]}')
dbtRunnerResult = dbt.invoke(dbtArgs)
print('gold completed')
dbt project で SQL を書いて bronze から silver や gold にデータ変換を行う感じです。
上記で、Workflow 自体は以下のような感じで動きます。
3.クエリ実行:Athena でクエリ
GlueJob で出力したデータや dbt で変換されたデータは Athena でクエリしています
4. データ分析:Metabase でのデータ分析とお可視化
現状 metabase を使ってデータ分析や・可視化を行っています。
(ただ、metabase もサーバの管理が必要なのと、いくつかのグラフで表示速度が最適ではないため、今後は QuickSight に移行するのも検討している状況です。)
まとめ
-
多様なデータソースの集約
- 今まで、AWS RDS, Kintone, Freee など、バラバラに散乱していいるデータを一元管理することで、ビジネスインサイトなの精度向上や意思決定の迅速化に繋ぐのではないかと思っています。
-
サーバレスアーキテクチャの採用
- AWS Glue と Athena を使用してサーバーレスな ETL ワークフローが実現できました
- 少人数の開発チームでも運用が容易であり、コスト効率と拡張性を重視したインフラ設計になっていると思います
- 今後は、GA や Google Search Console のデータも AWS AppFlow 経由で管理する形で、より容易は構築と正確なインサイトが得られるようにしていこうと考えております。