はじめに
こんにちは。NTTデータ先端技術の@tsuchidahrkです。
本記事は、データ活用基盤を作ってみた連載記事のその8です。
本シリーズの取り組みの内容についてはその1(構成シナリオ)をご覧ください。
前の記事は、その7(構築・IAM)からご覧ください。
本記事では、今回作成したデータ活用基盤データ収集処理の設計、構築方法、データ収集処理の実装について紹介します。
目次
- データ収集処理設計
- データ収集処理の流れ
- 構築方法
- データ収集処理の実装
- まとめ
データ収集処理の設計
データ活用基盤を構築する際に欠かせないデータ収集処理の設計について紹介します。
本システムでは、その1の機能・非機能要望を満たせるように設計しました。
データ収集処理に関連する機能・非機能要望は以下になります。
項目 | 要望 | |
---|---|---|
機能 | - | 月次ジョブLambdaで日本郵便からzipファイルダウンロードしてS3に保存する |
非機能 | 可用性 | 新構成のデータ活用基盤側は全てマネージドサービスを使用すること |
性能/拡張性 | SmallStart時点で具体的な性能目標は定義しない。ただし、今後の拡大かに備えて、スケール可能なリソースを利用する | |
運用・保守性 | 営業時間帯 10:30~18:00にオンライン作業ができること。 |
上記の機能、非機能要望から以下のように設計しました。
項目 | 設計内容 | 理由 |
---|---|---|
収集サービス | 収集については、Lambdaを利用。月次ジョブの実行には、EventBridgeとStepFunctionを利用。 | 収集については、機能要望で指定されていたため。月次ジョブの実行については、マネージドサービスを利用する必要があったため。 |
収集タイミング | 毎月1日の0:00に収集。タイムゾーンはAsia/Tokyo | オンライン作業に影響を与えない時間帯を選んだため。 |
収集先 | 日本郵便のサイト | 機能要望で指定されていたため。 |
収集対象 | 都道府県一覧の全国一括データ ken_all.zip |
機能要望で指定されていたため。 |
収集対象の格納先 | S3バケット | 機能要望で指定されていたため。 |
データ収集処理の流れ
構成図
手順
- 毎月1日の0:00にEventBridge Schedulerが起動し、Step Functionのワークフローを動作させます。
- ワークフロージョブのLambdaを起動し、処理を実行します。
- Lambdaは、郵便局サイトにアクセスして、「ken_all」というファイル名のZIPファイルをローカルストレージにダウンロードします。その後、S3上の加工前バケットにダウンロードしたZIPファイルをLambdaのローカルストレージからアップロードします。
- 後続のデータ加工処理にワークフロージョブが遷移します。
構築方法
今回作成しているAWSリソースはTerraformを用いたInfrastructure as Code(IaC)で作成しています。以下に作成したTerraform構成ファイル(jobs.tf)の内容について記載します。
データ収集処理
Lambdaリソースの作成
Lambdaリソースで実行するスクリプトが配置されたS3バケットとファイル名を記載しています。
また、スクリプトの動作環境となるランタイムのバージョン設定とスクリプト内のハンドラー登録を行うよう定義しています。
resource "aws_lambda_function" "biwa-prod-lambda-function-zip-001" {
function_name = "biwa-prod-lambda-function-zip-001"
role = aws_iam_role.biwa-prod-iam-role-lambda-execution-001.arn
s3_bucket = "biwa-prd-s3-bucket-scripts-001"
s3_key = "lambda_function.zip"
runtime = "python3.9"
handler = "lambda_function.lambda_handler"
}
月次ジョブ
EventBridge Schedulerリソースの作成
スケジュール設定と実行ターゲットの設定をしています。
スケジュール設定では、タイムゾーンに東京を指定し、cron形式で毎月1日の0:00にジョブが実行するよう定義しています。
また、実行ターゲットの設定では、Step Functions WorkFlowを実行対象として定義しています。
resource "aws_scheduler_schedule" "biwa-prod-EventBridge-scheduler-monthly-001" {
name = "biwa-prod-EventBridge-scheduler-monthly-001"
flexible_time_window {
mode = "OFF"
}
schedule_expression = "cron(0 1 * ? *)"
schedule_expression_timezone = "Asia/Tokyo"
target {
arn = "arn:aws:states:ap-northeast-1:125336375984:stateMachine:biwa-prod-sfn-statemachine-zip-001"
role_arn = aws_iam_role.biwa-prod-iam-role-EventBridge-execution-001.arn
}
}
Step Functionsリソースの作成
リソース作成時に、データ収集処理を行うLambda関数の実行と後続処理のデータ加工処理のジョブも定義しています。
データ加工処理のジョブについては、データ活用基盤を作ってみたその10で紹介します。
resource "aws_sfn_state_machine" "biwa-prod-sfn-statemachine-zip-001" {
name = "biwa-prod-sfn-statemachine-zip-001"
role_arn = aws_iam_role.biwa-prod-iam-role-stepFunctions-execution-001.arn
definition = <<EOF
{
"StartAt": "RunExtractionJob",
"States": {
"RunExtractionJob": {
"Type": "Task",
"Resource": "${aws_lambda_function.biwa-prod-lambda-function-zip-001.arn}",
"Next": "RunPreprocessingJob"
},
"RunPreprocessingJob": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "biwa-prod-glue-job-zip-001"
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Next": "WaitForPreprocessingJob"
},
"WaitForPreprocessingJob": {
"Type": "Wait",
"Seconds": 60,
"Next" : "RunCrawler"
},
"RunCrawler": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
"Parameters": {
"Name": "biwa-prod-glue-crawler-analytics-001"
},
"End" : true
}
}
}
EOF
}
データ収集処理の実装
データ収集処理の流れで説明した手順3の郵便局サイトからZIPファイルを収集しS3バケットへ格納する処理の実装が以下になります。
import urllib
import boto3
from botocore.config import Config
import botocore.exceptions
import os
import logging
URL = 'https://www.post.japanpost.jp/zipcode/dl/kogaki/zip/ken_all.zip'
FILENAME = 'ken_all.zip'
DOWNLOAD_PATH = os.path.join('/tmp', FILENAME)
AWS_CONFIG = Config(
connect_timeout = 120,
read_timeout = 120
)
S3 = boto3.resource('s3', config=AWS_CONFIG)
BUCKET = S3.Bucket('biwa-prod-s3-bucket-datastorage-001')
def download_zipcode(url, download_path):
"""郵便局データファイルのダウンロード
郵便局サイトにアクセスし、郵便番号データファイルをダウンロードする。
:param String url: 郵便局サイトのURL
:param String download_path: ファイルを書き込むパス
:return: 戻り値なし
:raises: URLError: URLが存在しない
"""
try:
urllib.request.urlretrieve(url, download_path)
logging.info('file download success!')
return None
except urllib.error.URLError as e:
print(e)
def upload_s3(bucket, download_path, filename):
"""S3へ郵便局データファイルのアップロード
S3にアクセスし、ローカルから郵便番号データファイルをアップロードする。
:param boto3.resources.factory.s3.Bucket Bucket: 郵便局サイトのURL
:param String download_path: ローカルの郵便番号データファイルのパス
:param String filename: 郵便番号データファイルのファイル名
:return: 戻り値なし
:raises: S3UploadFailedError: S3へのファイルアップロードに失敗
"""
try:
bucket.upload_file(download_path, filename)
logging.info('file upload success!')
return None
except boto3.exceptions.S3UploadFailedError as e:
print(e)
def lambda_handler(event, context):
"""郵便局サイトから郵便番号データファイルをダウンロードし、S3へファイルをアップロードする。
メイン処理
:param Dict event: テストの値
:param awslambdaric.lambda_context.LambdaContext context: ランタイム情報
:return: 戻り値なし
:raises: 例外処理なし
"""
download_zipcode(URL, DOWNLOAD_PATH)
upload_s3(BUCKET, DOWNLOAD_PATH, FILENAME)
if os.path.isfile(DOWNLOAD_PATH):
os.remove(DOWNLOAD_PATH)
logging.info('file remove success!')
return None
return None
上記、データ収集処理のプログラムは以下の流れで動作します。
- lambda_handlerメソッドからdownload_zipメソッドが呼び出されます。download_zipメソッドより、urllib.request.urlretrieve関数を用いて郵便局サイトにアクセスし、URLリンクへのアクセスができれば郵便番号データファイルをLambdaのローカルストレージにダウンロードします。
- lambda_handlerメソッドに戻り、upload_s3メソッドが呼び出されます。upload_s3メソッドより、指定されたS3バケットへLambdaのローカルストレージからアップロードします。
- Lambdaのローカルストレージにある郵便番号データファイルを削除します。
まとめ
今回はデータ収集処理の設計や、TerraformのIacを用いてLambdaリソースを作成する方法について紹介しました。
Lambdaを利用したことやAWS上のリソース間を連携させて処理を実装したことは無かったので、AWS上のマネージドサービスを組み合わせて処理を作る楽しさを知ることが出来ました。
今後は複数のデータソースから様々な方法でデータ収集処理を実現出来るようにしていきたいです。
次の記事は、その9(構築・データ蓄積-S3)です。
最後までお読みいただきありがとうございました。