1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【30日でAWSをマスターするハンズオン問題集】Day12:AWS Batch(Fargate)でサーバレスなアクセスログの集計バッチを作成してみよう

Last updated at Posted at 2025-12-06

はじめに

こちらの投稿は2025 Japan AWS Jr.Championsの有志メンバーで作成した『30日間で主要AWSサービスを構築できるようになる』をテーマにした初学者向けのハンズオン問題集のDAY12になります!
問題集の趣旨や作成に至るまでの経緯は以下の記事をご覧いただければと思います。

📝 概要

項目 内容
所要時間 約2時間
メインサービス AWS Batch(Fargate), Amazon S3, Amazon ECR, AWS Lambda
学べること AWS Batch(Fargate)の基礎知識, S3イベント通知とLambdaを利用したAWS Batchジョブの起動方法
想定費用 100円以下

🎯 課題内容

アクセスログを集計・分析し、システムへのアクセス履歴の集計結果を出力するバッチ処理機能を実装します。

📊 アーキテクチャ図

image.png

🔧 実装機能

  • EC2等のアプリケーション・システム等から日次で出力されるアクセスログを取得し、アクセス回数・ステータスコード(成功・失敗)等を集計します
  • S3へのログ出力をトリガーにAWS Batchを起動し、AWS Batchがログを集計します
  • AWS Batchはログの集計結果を集計結果出力用のS3バケットに出力します
  • ユーザは本機能実装にあたり、実行サーバ・OSの管理を望んでおらず、マネージドで実行したいと考えています
  • また、本機能は実行時間の期限が無いため、リソースが利用可能なタイミングでの実行で問題ありません

💡 実装のヒント

AWS Batchの設定方法

AWS Batchでジョブを実行するためには、「環境」「ジョブキュー」「ジョブ定義」の3つを作成する必要があります。「環境」→「ジョブキュー」→「ジョブ定義」の順で作成してみましょう。
また、AWSBatchでは処理内容を定義したDockerイメージを指定する必要があります。AWS Batch作成の前にAmazon ECRを作成し、GitHub Actionでイメージをビルド・Amazon ECRにプッシュしましょう。

AWS Batchの起動方法

ログ出力をトリガーにAWS Batchを起動する際は、S3のイベント通知機能を利用してみましょう。ただし、イベント通知で直接AWS Batchは起動出来ないため、AWS LambdaからAWS Batchを起動する必要があります。

リソースの権限設定の方法

AWS Lmbda・AWS BatchのそれぞれでIAMロールを設定する必要があります。AWS LambdaはAWS Batchを起動するための権限、AWS BatchのECSタスク実行ロールではECSタスク基盤がECRやCloudWatchにアクセスするための権限、タスクロールでは、実装する処理の中でS3からファイルをダウンロードして再度アップロードする権限を追加する必要があります。

✅ 完成後のチェックポイント

  • S3にアクセスログを格納すると、AWS Batchでジョブが実行される
  • AWS Batchのジョブ完了後、S3にサマリ結果が出力されている
  • プラットフォーム機能として、「Fargate」が選択されている

🧰 使用資材

No.1 GitHub Action実行用Yamlファイル(オリジナルで作成しても良いですが、こちらのサンプルを使用しても構いません。)

★ecr_push.yaml

name: Build and Push to ECR

on:
push:
  branches:
    - main
  paths:
    - 'Dockerfile'
    - '**/*.py'

jobs:
build-and-push:
  name: Build & Push Image to ECR
  runs-on: ubuntu-latest
  permissions:
    id-token: write   # OIDC 利用に必須
    contents: read    # ソース取得用

  env:
    # ======== 作成したリソース・アカウントIDを記載する =========
    AWS_ACCOUNT_ID: "AWSアカウントIDを入力"
    AWS_REGION: "デプロイ先リージョンを入力"
    ECR_REPOSITORY: "ECRリポジトリ名を入力"
    IAM_ROLE_NAME: "GitHub Action用IAMロールを入力"
    IMAGE_TAG: "latest"
    # ================================================================
  steps:
    - name: Checkout repository
      uses: actions/checkout@v4

    # GitHub→AWS OIDCでロールを引き受け
    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v4
      with:
        role-to-assume: arn:aws:iam::${{ env.AWS_ACCOUNT_ID }}:role/${{ env.IAM_ROLE_NAME }}
        aws-region: ${{ env.AWS_REGION }}

    # ECRへログイン
    - name: Login to Amazon ECR
      id: login-ecr
      uses: aws-actions/amazon-ecr-login@v2

    # Docker Build & Push
    - name: Build, Tag, and Push Image
      run: |
        IMAGE_URI=${{ steps.login-ecr.outputs.registry }}/${{ env.ECR_REPOSITORY }}:${{ env.IMAGE_TAG }}
        echo "Building image: $IMAGE_URI"
        docker build -t $IMAGE_URI .
        docker push $IMAGE_URI

このコードを使用して、Git Hub Actionからアクセスログ集計用のソースコードをビルド・Amazon ECRにプッシュ出来ます。

No2. アクセスログ集計用ソースコード(オリジナルで作成しても良いですが、こちらのサンプルを使用しても構いません。)

★aggregate_accesslog.py

# アクセスログ集計バッチ(S3上のアクセスログを集計し、集計結果をS3に出力する)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import boto3
import csv
import io
from datetime import datetime

s3 = boto3.client("s3")

TS_FORMATS = [
  "%Y-%m-%d %H:%M:%S",  # 例: 2025-09-18 00:00:00
  "%Y-%m-%d %H:%M",     # 例: 2025-09-18 00:00
  "%Y/%m/%d %H:%M:%S",  # 例: 2025/9/18 0:00:00
  "%Y/%m/%d %H:%M",     # 例: 2025/9/18 0:00
]

def parse_ts(s: str):
  s = s.strip()
  for fmt in TS_FORMATS:
      try:
          return datetime.strptime(s, fmt)
      except ValueError:
          continue
  raise ValueError(f"unsupported timestamp format: {s}")

def aggregate(bucket: str, prefix: str):
  min_time = None
  max_time = None
  total = 0
  ok_200 = 0
  ng_other = 0
  skipped = 0

  kwargs = {"Bucket": bucket, "Prefix": prefix}
  while True:
      resp = s3.list_objects_v2(**kwargs)
      for obj in resp.get("Contents", []):
          key = obj["Key"]
          if obj.get("Size", 0) == 0:
              continue
          body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8-sig", errors="ignore")

          reader = csv.reader(io.StringIO(body))
          header = next(reader, None)
          if not header or not header[0].strip().lower().startswith("timestamp"):
              reader = csv.reader(io.StringIO(body))

          for row in reader:
              if not row or len(row) < 3:
                  skipped += 1
                  continue
              ts_str, _, status_str = row[0], row[1], row[2]
              try:
                  ts = parse_ts(ts_str)
                  status = int(str(status_str).strip())
              except Exception:
                  skipped += 1
                  continue

              total += 1
              if status == 200:
                  ok_200 += 1
              else:
                  ng_other += 1

              if min_time is None or ts < min_time:
                  min_time = ts
              if max_time is None or ts > max_time:
                  max_time = ts

      if resp.get("IsTruncated"):
          kwargs["ContinuationToken"] = resp["NextContinuationToken"]
      else:
          break

  return {
      "period_start": min_time.strftime("%Y-%m-%d %H:%M:%S") if min_time else "",
      "period_end":   max_time.strftime("%Y-%m-%d %H:%M:%S") if max_time else "",
      "total": total,
      "status_200": ok_200,
      "status_other": ng_other,
      "skipped_rows": skipped,
  }

def upload_result(bucket: str, key: str, result: dict):
  output = io.StringIO()
  writer = csv.writer(output, lineterminator="\n")
  writer.writerow(["metric","value"])
  for k, v in result.items():
      writer.writerow([k, v])
  s3.put_object(
      Bucket=bucket,
      Key=key,
      Body=output.getvalue().encode("utf-8"),
      ContentType="text/csv; charset=utf-8"
  )

def main():
  parser = argparse.ArgumentParser(description="access log aggregator")
  parser.add_argument("--input-bucket", required=True)
  parser.add_argument("--input-prefix", required=True)
  parser.add_argument("--output-bucket", required=True)
  parser.add_argument("--output-key", required=True, help="例: reports/summary.csv")
  args = parser.parse_args()

  result = aggregate(args.input_bucket, args.input_prefix)
  upload_result(args.output_bucket, args.output_key, result)
  print("[OK] 集計完了:", result)

if __name__ == "__main__":
  main()

このソースコードをビルドしてイメージを生成します。

No3. Dockerイメージビルド用Dockerファイル(オリジナルで作成しても良いですが、こちらのサンプルを使用しても構いません。)

★Dockerfile

# Dockerfile
FROM public.ecr.aws/amazonlinux/amazonlinux:2023

# OSパッケージ
RUN dnf -y update && dnf -y install python3 python3-pip && dnf clean all

# Python依存(boto3 だけでOK)
RUN pip3 install --no-cache-dir boto3

# アプリ配置
WORKDIR /app
COPY aggregate_accesslog.py /app/

# 実行
ENTRYPOINT ["python3", "/app/aggregate_accesslog.py"]

このソースコードをビルドしてイメージを生成します。

No.4 AWS Lambda用ソースコード(オリジナルで作成しても良いですが、こちらのサンプルを使用しても構いません。)
import os
import json
import boto3
from datetime import datetime
from urllib.parse import unquote_plus

batch = boto3.client("batch")

# 環境変数で設定
JOB_QUEUE_ARN       = os.environ["BATCH_JOB_QUEUE_ARN"]
JOB_DEFINITION_ARN  = os.environ["BATCH_JOB_DEFINITION_ARN"]
OUTPUT_BUCKET       = os.environ["OUTPUT_BUCKET"]
JOB_NAME_PREFIX     = os.environ.get("JOB_NAME_PREFIX", "accesslog-aggregate")

def lambda_handler(event, context):
  print("Event:", json.dumps(event))

  results = []

  for rec in event.get("Records", []):
      bucket = rec["s3"]["bucket"]["name"]
      key    = unquote_plus(rec["s3"]["object"]["key"])  # URLエンコード解除

      # 入力ファイル名から日付を抽出
      # ファイル形式: accesslog_YYYYMMDD.csv
      base_name = os.path.basename(key)
      date_str = base_name.replace("accesslog_", "").replace(".csv", "")
      
      # 出力ファイル: YYYYMMDD/summary_YYYYMMDD.csv
      output_key = f"{date_str}/summary_{date_str}.csv"

      # ジョブ名はユニークに(日付+timestamp)
      job_name = f"{JOB_NAME_PREFIX}-{date_str}-{int(datetime.utcnow().timestamp())}"

      response = batch.submit_job(
          jobName=job_name,
          jobQueue=JOB_QUEUE_ARN,
          jobDefinition=JOB_DEFINITION_ARN,
          containerOverrides={
              "command": [
                  "--input-bucket", bucket,
                  "--input-prefix", key,
                  "--output-bucket", OUTPUT_BUCKET,
                  "--output-key", output_key
              ]
          }
      )
      print("Submitted job:", response["jobId"], "for key:", key)
      results.append({
          "jobName": job_name,
          "jobId": response["jobId"],
          "inputKey": key,
          "outputKey": output_key
      })

  return {"submitted": results}

このソースコードをAWS Lambda関数にデプロイ・実行することで、AWS Batchジョブを起動出来ます。

No.5 テスト実施用アクセスログ(オリジナルで作成しても良いですが、提供したアクセスログ集計用ソースコードを使用している場合は本ファイルを利用してください。)

↓以下のようなイメージのフォーマットでCSVファイルを作成。
(生成AIを利用してテストデータを作成すると便利です。)
image.png

このアクセスログファイルをS3に格納して実装した機能の動作確認を実施してください。

🔗 リファレンスリンク

🛠️ 解答・構築手順(クリックで開く)

解答と構築手順を見る

✅ ステップ1:事前準備①(GitHub Action用IDプロバイダ・IAMロールの作成・アクセスログ出力用S3バケット作成)

IDプロバイダ作成

  • AWS マネジメントコンソールを開く
  • 「IAM」>「IDプロバイダ」>「プロバイダを追加」を押下し、以下の値を入力してIDプロバイダを作成
項目 設定値
プロバイダのタイプ OpenID Connect
プロバイダのURL https://token.actions.githubusercontent.com
対象者 sts.amazonaws.com

IAMロール作成

  • 「IAM」>「ロール」>「ロールを作成」を押下し、以下値を入力する
項目 設定値
信頼されたエンティティタイプ ウェブアイデンティティ
アイデンティティプロバイダー https://token.actions.githubusercontent.com
Audience sts.amazonaws.com
GitHub organization 自分のGitHubアカウント名
  • GitHub ActionからECRにイメージをPushするため、ECRのフルアクセス権限を付与したポリシーを追加する。(IAMでの細かい権限制御は本問題の本筋から逸れるため割愛。)

  • IAMロール名を入力し、IAMロールを作成。

S3バケット作成

  • 「S3」>「汎用バケット」>「バケットを作成」を押下
  • バケットの作成画面にて任意のバケット名を入力し、バケットを作成(その他設定はデフォルトで問題無い)
    ※ 機能実装後、テストの際に本S3バケットへアクセスログをアップロードします。

✅ ステップ2:集計結果の格納用S3バケット作成

  • AWSマネジメントコンソールを開く
  • 「S3」>「汎用バケット」>「バケットを作成」を押下
  • バケットの作成画面にて任意のバケット名を入力し、バケットを作成(その他設定はデフォルトで問題無い)
    ※ 機能実装後、テストの際に本S3バケットに集計結果が出力されます。

✅ ステップ3:ECRリポジトリの作成・Dockerイメージをプッシュ

  • AWSマネジメントコンソールを開く
  • 「Elastic Container Registry」>「プライベートレジストリ」>「リポジトリ」>「リポジトリを作成」を押下
  • リポジトリ名を入力し、リポジトリを作成
  • Git Hubにて利用するリポジトリに「利用するGitリポジトリ名/.github/workflows/ecr_push.yaml」を配置
    (使用資材のNo.1を利用する)
  • 利用するGitリポジトリの直下に「aggregate_accesslog.py」と「Dockerfile」をコミットする。
    (使用資材のNo.2とNo3を利用する)
  • GitリポジトリのActionタブより、Workflowが正常終了していることを確認後、再度AWSコンソールより作成したリポジトリのページに遷移し、イメージがプッシュされていることを確認する。

※ 今回はGitHubからイメージをプッシュする方法を取りましたが、イメージをECRにプッシュする方法として、Day11の以下ハンズオン問題集も参考になります!

【30日でAWSをマスターするハンズオン問題集】Day11:ECR 脆弱性スキャンと CodePipeline で安全な自動デプロイを構築しよう

✅ ステップ4:AWS Batch用のIAMロールの作成

  1. AWSマネジメントコンソールを開く
  2. IAMコンソールより、以下IAMロールを作成する
項目 設定値
信頼されたエンティティタイプ/ユースケース AWSのサービス / Elastic Container Service Task
ポリシー AmazonECSTaskExecutionRolePolicy, AmazonS3FullAccess(今回はAWSマネージドのIAMポリシーを利用する)

✅ ステップ5:AWS Batchの設定

  • AWSマネジメントコンソールを開く

  • 「AWS Batch」>「環境」>「環境を作成」>「コンピューティング環境」を選択
    image.jpg

  • 以下設定値を入力して環境を作成

項目 設定値
コンピューティング環境設定 Fargate
名前 任意の名前を設定
サービスリンクロール デフォルト(AWSServiceRoleForBatch)

image.png

項目 設定値
Fargate Spot容量を使用 有効化
最大vCPU 1(今回は最小でOK)

image.png

項目 設定値
仮想プライベートクラウド(VPC)ID 任意のVPCを選択(Defaultでも問題無い)
サブネット インターネット通信可能なサブネットを選択(Defaultで問題無い)
セキュリティグループ アウトバウンドルールでPort:443 送信先0.0.0.0/0のアウトバウンドルールを持つセキュリティグループを指定。(Defaultで問題無い)

image.png

  • 「AWS Batch」>「ジョブキュー」>「作成」を選択
    image.png

  • 以下設定値を入力してジョブキューを作成

項目 設定値
オーケストレーションタイプ Fargate
名前 任意の名前を設定
優先度 1(今回は1つのジョブキューのみを作成するため
接続されたコンピューティング環境 前の手順で作成したコンピューティング環境を設定

image.png

image.png

  • 「AWS Batch」>「ジョブ定義」>「作成」を押下
    image.png

  • 以下の値を入力してジョブ定義を作成する(記載の無い項目はデフォルト値でOK)

項目 設定値
オーケストレーションタイプ Fargate
オーケストレーションタイプ Fargate
エフェメラルドストレージ 21(今回は最小値で問題無い)
実行ロール ステップ4にて作成したIAMロールを選択
ジョブの試行 1

image.png

image.png

image.png

項目 設定値
イメージ  ECRにPushしたイメージのURIを指定
ジョブロール  ステップ4にて作成したIAMロールを選択
vCPU 1.0(最小でOK)
メモリ 2GiB(最小でOK)

image.png
image.png

項目 設定値
ログドライバー  awslogs

image.png

✅ ステップ6:AWS Lambdaの設定・S3イベント通知の有効化

  • Lambda関数を作成(記載の無い項目はデフォルト設定で問題無い)
項目 設定値
ランタイム  awslogs
実行ロール  作成したIAMロール
  • AWS Lambda用ソースコードをデプロイする(使用資材のNo.4を使用)
  • 環境変数で以下を設定する
項目 設定値
BATCH_JOB_DEFINITION_ARN  作成したAWS Batchのジョブ定義のARNを設定
BATCH_JOB_QUEUE_ARN  作成したジョブキューのARNを設定
OUTPUT_BUCKET  作成した集計結果格納用のS3バケット名を設定

image.png

  • 「S3」>「集計結果格納用のS3バケット」を選択

  • 「プロパティ」>「イベント通知」>「イベント通知を作成」を選択
    image.png

  • 以下設定値を入力し、イベント通知を作成

項目 設定値
名前  任意の名前を設定
イベントタイプ  PUT(s3:ObjectCreated:Put)
送信先  Lambda関数(前の手順で作成したLambda関数)

✅ ステップ6:動作確認

  • アクセスログ出力用のS3バケットにアクセスログのサンプルを格納する(使用資材のNo.5を利用)
  • AWS Batchコンソールを開き、「ジョブ」>「結果を更新」を押下して、ジョブが開始していることを確認する
    image.png
  • ジョブを選択>ジョブ設定>プラットフォーム機能を確認し、「Fargate」となっていることを確認する
    image.png
  • ジョブのステータスがSucceededになったら、集計結果格納用のS3バケットに集計結果が出力されていることを確認する
    image.png

🧹 片付け(リソース削除)

  • Lambda 関数を削除
  • AWS Batchを削除(「ジョブ定義」→「ジョブキュー」→「環境」の順番で削除)
  • S3を削除(アクセスログ出力用・集計結果格納用)
  • ECRリポジトリを削除
  • IAMを削除(ステップ①で作成したIAMロール・IDプロバイダ、ステップ④で作成したIAMロール)

🏁 おつかれさまでした!

この課題ではAWS Batchの基本的な設定方法及び、S3イベント通知・AWS Lambdaと連携したAWS Batchジョブの起動方法を学ぶことが出来ます。
今回は簡単な設定・処理内容でしたが、AWS Batchを使えば処理時間に左右されないバッチ処理をサーバレスに実行することが出来ます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?