LoginSignup
2
0

データ活用基盤を作ってみたその8(構築・データ収集-Lambda)

Last updated at Posted at 2023-11-02

はじめに

こんにちは。NTTデータ先端技術の@tsuchidahrkです。
本記事は、データ活用基盤を作ってみた連載記事のその8です。
本シリーズの取り組みの内容についてはその1(構成シナリオ)をご覧ください。
前の記事は、その7(構築・IAM)からご覧ください。
本記事では、今回作成したデータ活用基盤データ収集処理の設計、構築方法、データ収集処理の実装について紹介します。

目次

  • データ収集処理設計
  • データ収集処理の流れ
  • 構築方法
  • データ収集処理の実装
  • まとめ

データ収集処理の設計

データ活用基盤を構築する際に欠かせないデータ収集処理の設計について紹介します。
本システムでは、その1の機能・非機能要望を満たせるように設計しました。
データ収集処理に関連する機能・非機能要望は以下になります。

項目 要望
機能 - 月次ジョブLambdaで日本郵便からzipファイルダウンロードしてS3に保存する
非機能 可用性 新構成のデータ活用基盤側は全てマネージドサービスを使用すること
性能/拡張性 SmallStart時点で具体的な性能目標は定義しない。ただし、今後の拡大かに備えて、スケール可能なリソースを利用する
運用・保守性 営業時間帯 10:30~18:00にオンライン作業ができること。

上記の機能、非機能要望から以下のように設計しました。

項目 設計内容 理由
収集サービス 収集については、Lambdaを利用。月次ジョブの実行には、EventBridgeとStepFunctionを利用。 収集については、機能要望で指定されていたため。月次ジョブの実行については、マネージドサービスを利用する必要があったため。
収集タイミング 毎月1日の0:00に収集。タイムゾーンはAsia/Tokyo オンライン作業に影響を与えない時間帯を選んだため。
収集先 日本郵便のサイト 機能要望で指定されていたため。
収集対象 都道府県一覧の全国一括データ
ken_all.zip
機能要望で指定されていたため。
収集対象の格納先 S3バケット 機能要望で指定されていたため。

データ収集処理の流れ

構成図

TechLearning記事_構成図.PNG

手順

  1. 毎月1日の0:00にEventBridge Schedulerが起動し、Step Functionのワークフローを動作させます。
  2. ワークフロージョブのLambdaを起動し、処理を実行します。
  3. Lambdaは、郵便局サイトにアクセスして、「ken_all」というファイル名のZIPファイルをローカルストレージにダウンロードします。その後、S3上の加工前バケットにダウンロードしたZIPファイルをLambdaのローカルストレージからアップロードします。
  4. 後続のデータ加工処理にワークフロージョブが遷移します。

構築方法

今回作成しているAWSリソースはTerraformを用いたInfrastructure as Code(IaC)で作成しています。以下に作成したTerraform構成ファイル(jobs.tf)の内容について記載します。

データ収集処理

Lambdaリソースの作成

Lambdaリソースで実行するスクリプトが配置されたS3バケットとファイル名を記載しています。
また、スクリプトの動作環境となるランタイムのバージョン設定とスクリプト内のハンドラー登録を行うよう定義しています。

jobs.tf
resource "aws_lambda_function" "biwa-prod-lambda-function-zip-001" {
  function_name = "biwa-prod-lambda-function-zip-001"
  role = aws_iam_role.biwa-prod-iam-role-lambda-execution-001.arn

  s3_bucket = "biwa-prd-s3-bucket-scripts-001"
  s3_key = "lambda_function.zip"

  runtime = "python3.9"
  handler = "lambda_function.lambda_handler"
}

月次ジョブ

EventBridge Schedulerリソースの作成

スケジュール設定と実行ターゲットの設定をしています。
スケジュール設定では、タイムゾーンに東京を指定し、cron形式で毎月1日の0:00にジョブが実行するよう定義しています。
また、実行ターゲットの設定では、Step Functions WorkFlowを実行対象として定義しています。

jobs.tf
resource "aws_scheduler_schedule" "biwa-prod-EventBridge-scheduler-monthly-001" {
  name = "biwa-prod-EventBridge-scheduler-monthly-001"

  flexible_time_window {
    mode = "OFF"
  }

  schedule_expression          = "cron(0 1 * ? *)"
  schedule_expression_timezone = "Asia/Tokyo"

  target {
    arn = "arn:aws:states:ap-northeast-1:125336375984:stateMachine:biwa-prod-sfn-statemachine-zip-001"
    role_arn = aws_iam_role.biwa-prod-iam-role-EventBridge-execution-001.arn
  }
}

Step Functionsリソースの作成

リソース作成時に、データ収集処理を行うLambda関数の実行と後続処理のデータ加工処理のジョブも定義しています。
データ加工処理のジョブについては、データ活用基盤を作ってみたその10で紹介します。

jobs.tf
resource "aws_sfn_state_machine" "biwa-prod-sfn-statemachine-zip-001" {
  name = "biwa-prod-sfn-statemachine-zip-001"
  role_arn = aws_iam_role.biwa-prod-iam-role-stepFunctions-execution-001.arn

  definition = <<EOF
  {
      "StartAt": "RunExtractionJob",
      "States": {
        "RunExtractionJob": {
          "Type": "Task",
          "Resource": "${aws_lambda_function.biwa-prod-lambda-function-zip-001.arn}",
          "Next": "RunPreprocessingJob"
        },
        "RunPreprocessingJob": {
          "Type": "Task",
          "Resource": "arn:aws:states:::glue:startJobRun.sync",
          "Parameters": {
            "JobName": "biwa-prod-glue-job-zip-001"
          },
          "Retry": [
            {
              "ErrorEquals": ["States.ALL"],
              "IntervalSeconds": 1,
              "MaxAttempts": 3,
              "BackoffRate": 2.0
            }
          ],
          "Next": "WaitForPreprocessingJob"
        },
        "WaitForPreprocessingJob": {
          "Type": "Wait",
          "Seconds": 60,
          "Next" : "RunCrawler"
        },
        "RunCrawler": {
          "Type": "Task",
          "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
          "Parameters": {
            "Name": "biwa-prod-glue-crawler-analytics-001"
          },
          "End" : true
        }
      }
  }
  EOF
}

データ収集処理の実装

データ収集処理の流れで説明した手順3の郵便局サイトからZIPファイルを収集しS3バケットへ格納する処理の実装が以下になります。

lambda_function.py
import urllib
import boto3
from botocore.config import Config
import botocore.exceptions
import os
import logging

URL = 'https://www.post.japanpost.jp/zipcode/dl/kogaki/zip/ken_all.zip'
FILENAME = 'ken_all.zip'
DOWNLOAD_PATH = os.path.join('/tmp', FILENAME)

AWS_CONFIG = Config(
    connect_timeout = 120,
    read_timeout = 120
)

S3 = boto3.resource('s3', config=AWS_CONFIG)
BUCKET = S3.Bucket('biwa-prod-s3-bucket-datastorage-001')

def download_zipcode(url, download_path):
    """郵便局データファイルのダウンロード

    郵便局サイトにアクセスし、郵便番号データファイルをダウンロードする。

    :param String url: 郵便局サイトのURL
    :param String download_path: ファイルを書き込むパス
    :return: 戻り値なし
    :raises: URLError: URLが存在しない
    """
    try:
        urllib.request.urlretrieve(url, download_path)
        logging.info('file download success!')
        return None
    except urllib.error.URLError as e:
         print(e)

def upload_s3(bucket, download_path, filename):
    """S3へ郵便局データファイルのアップロード

    S3にアクセスし、ローカルから郵便番号データファイルをアップロードする。

    :param boto3.resources.factory.s3.Bucket Bucket: 郵便局サイトのURL
    :param String download_path: ローカルの郵便番号データファイルのパス
    :param String filename: 郵便番号データファイルのファイル名
    :return: 戻り値なし
    :raises: S3UploadFailedError: S3へのファイルアップロードに失敗
    """
    try:
        bucket.upload_file(download_path, filename)
        logging.info('file upload success!')
        return None
    except boto3.exceptions.S3UploadFailedError as e:
            print(e)

def lambda_handler(event, context):
    """郵便局サイトから郵便番号データファイルをダウンロードし、S3へファイルをアップロードする。

    メイン処理

    :param Dict event: テストの値
    :param awslambdaric.lambda_context.LambdaContext context: ランタイム情報
    :return: 戻り値なし
    :raises: 例外処理なし
    """
    download_zipcode(URL, DOWNLOAD_PATH)
    upload_s3(BUCKET, DOWNLOAD_PATH, FILENAME)

    if os.path.isfile(DOWNLOAD_PATH):
        os.remove(DOWNLOAD_PATH)
        logging.info('file remove success!')
        return None
    
    return None

上記、データ収集処理のプログラムは以下の流れで動作します。

  1. lambda_handlerメソッドからdownload_zipメソッドが呼び出されます。download_zipメソッドより、urllib.request.urlretrieve関数を用いて郵便局サイトにアクセスし、URLリンクへのアクセスができれば郵便番号データファイルをLambdaのローカルストレージにダウンロードします。
  2. lambda_handlerメソッドに戻り、upload_s3メソッドが呼び出されます。upload_s3メソッドより、指定されたS3バケットへLambdaのローカルストレージからアップロードします。
  3. Lambdaのローカルストレージにある郵便番号データファイルを削除します。

まとめ

今回はデータ収集処理の設計や、TerraformのIacを用いてLambdaリソースを作成する方法について紹介しました。
Lambdaを利用したことやAWS上のリソース間を連携させて処理を実装したことは無かったので、AWS上のマネージドサービスを組み合わせて処理を作る楽しさを知ることが出来ました。
今後は複数のデータソースから様々な方法でデータ収集処理を実現出来るようにしていきたいです。

次の記事は、その9(構築・データ蓄積-S3)です。
最後までお読みいただきありがとうございました。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0