本記事は「AWSを触り始めたばかりで、Lambda/EventBridge/IAMにまだ慣れていない方」を想定しています。
こんにちは。
ソーイ株式会社、入社1年目の村上です。
会社の業務として稼働中のサービスから出力されるCloudWatchLogsのログファイルを、タスクを担当することになりました。
実装を行う理由としては、運用上、CloudWatch Logsの保持期間だけでは不十分で、長期保管・調査用にS3へ定期保存する必要があったためです。
ソフトウェアエンジニア経験1年未満のAWS初心者としては歯応えのあるタスクでしたが、無事に実装できたため備忘録として流れを残したいという試みです。
この記事でわかること
- CloudWatch LogsをLambdaでS3に定期保存する方法
- EventBridge(cron)で毎日00:00(JST)に実行する設定
- AWS初心者がハマった権限・時刻・ページネーションの注意点
要件を実現するための実装手順
1.CloudWatchLogsからS3にログファイルを送信するLambda関数の作成
2.EventBridgeを使用し毎日00:00にログの保存処理が行われるように起動周期を設定
全体構成
今回使用するAWSサービスとしては、以下の通りです。
Amazon CloudWatch Logs
AWS上のログを集中管理し、後から検索・分析できるサービス。
AWS S3
ログやデータを長期保存・共有するためのスケーラブルなストレージサービス。
AWS Lambda
サーバーレスでコードを実行できるサービス。今回作成するのは、
・Amazon CloudWatch Logsに保存されているログファイルをS3に保存する関数
Amazon EventBridge
イベントや定期実行をトリガーに処理を自動化するサービス
全体処理フロー
全体の流れとして、以上のものを実装していきます。
Lambdaコードを作る
まずLambda関数を作成しないと話にならないので作成します。
今回はPython 3.14.0を使用しました。
要件として2つ、
・CloudWatchLogs内特定のログファイルを指定でき、S3へファイル輸送できる
・ファイル輸送時、作成されるディレクトリはログファイル名/年月日とする
を考えて実装しました(全体コードは記事末尾に掲載)。
特に下記のような処理を実装し、要件を満たしました。
CloudWatch Logsエクスポート時の注意点・制限(公式より)
実装の前に把握しておくべきことがあります。
def create_export_task(
s3_bucket_name: str,
log_group_name: str,
from_datetime: datetime,
to_datetime: datetime,
environment_name: str,
date_str: str,
):
"""
CloudWatch Logs を S3 にエクスポートする
"""
# ロググループ名を S3 ディレクトリ用に簡略化
log_group_simple = log_group_name.split('/')[-1]
# S3 出力先:
# {env}/logs/{log_group}/{YYYY-MM-DD}/
s3_path_prefix = (
f"{environment_name}/logs/{log_group_simple}/{date_str}"
)
response = client.create_export_task(
logGroupName=log_group_name,
fromTime=int(from_datetime.timestamp() * 1000),
to=int(to_datetime.timestamp() * 1000),
destination=s3_bucket_name,
destinationPrefix=s3_path_prefix,
)
return response["taskId"]
CloudWatch LogsのS3エクスポートは同時に1件しか実行できない仕様です。
今回は「1日1回の定期実行」かつ「対象ロググループ数が限定的」という前提で運用する想定だったため、同時実行制御は行っていません。
CloudWatch Logs のエクスポート処理は、
1つでも実行中(RUNNING)または開始待ち(PENDING)のタスクが存在すると、
次のエクスポートを「予約」することすらできません。
この制限を超えて複数のエクスポートタスクを同時に作成しようとすると、
LimitExceededException が発生します。
これは create_export_task API の公式仕様として定められています。
ドキュメントの中では、複数のロググループを頻繁にエクスポートする場合や、
処理完了の保証が必要な場合は、
describe_export_tasksを使い一定間隔で状態を確認する仕組み(ポーリング)を追加することが推奨されます。
稼働させた場合に作成されるディレクトリを可視化するとこんな感です。
s3://example-log-backup-bucket/
└ production/
└ logs/
└ ExampleAppProdWorker/
└ 2025-01-12/
├ aws-logs-write-test(動作時に自動で作成されるファイル)
├ 000000000000.gz
ここで身に覚えのないaws-logs-write-testとの記載があります。
私個人、自分で設定していないファイルができるもので戸惑いました。
結果としては、LambdaがS3にアクセス可能か確認するために自動で作成されるテスト用ファイルのようで、問題ないとのこと。
環境ファイルごとにマッピングして関数登録
# 環境判定用マッピング
ENVIRONMENT_MAPPING = {
"local": ["ExampleAppLocal"],
"dev": ["ExampleAppDevApi", "ExampleAppDevWorker"],
"staging": ["ExampleAppStg"],
"production": ["ExampleAppProdApi", "ExampleAppProdWorker"],
}
def get_environment_from_log_group(log_group_name: str) -> str | None:
"""
ロググループ名から環境を判定する
"""
for env, keywords in ENVIRONMENT_MAPPING.items():
for keyword in keywords:
if keyword in log_group_name:
return env
return None
こちらはロググループ命名規則を前提にした環境自動判定を行っています。
今回はファイルの追加が今後ないと考えられる箇所だったため、ハードコーディングしようかと考えもしましたが、万が一改修が行われることもあり得たので可読性も加味してこの形に落ち着きました。
UTC -> JSTへの対応
JST = timezone(timedelta(hours=9), "JST")
now = datetime.now().astimezone(JST)
yesterday = now.date() - timedelta(days=1)
from_time = datetime.combine(
yesterday, datetime.min.time(), JST
)
to_time = datetime.combine(
yesterday, datetime.max.time(), JST
)
00:00時に動く際、機能のデータを取得する際にJSTで参照できないことが気になりました。
そこでUTC->JSTへコード上で変換し、
昨日の00:00 ~ 23:59までのデータを閲覧し反映させることに成功しました。
generatorを用いたスマートな実装
CloudWatch Logsに存在するロググループ一覧を取得するためのAWS APIであるdescribe_log_groupsは
1回で最大50件、nextToken によるページネーションが必要ということで、
def get_all_log_groups():
all_groups = []
while True:
response = logs_client.describe_log_groups(...)
all_groups.extend(response["logGroups"])
if "nextToken" not in response:
break
return all_groups
というような全部リストに詰め込む実装だと、
全ロググループをメモリへ格納しないとループ処理が始まらず、
結果処理に遅れが生まれることを予想できました。
そんな問題を回避するために、
Python言語機能の1つであるgeneratorを使用しました。
def iter_log_groups(next_token: str | None = None):
params = {"limit": 50}
if next_token:
params["nextToken"] = next_token
response = logs_client.describe_log_groups(**params)
# 今回のページ分を返す
yield from response.get("logGroups", [])
# 次のページがあれば再帰的に取得
if "nextToken" in response:
yield from iter_log_groups(response["nextToken"])
generatorを使用することで、
1件確認し処理->次へ進むというフローになり、
同時に持つデータ量を最小化することに成功、
処理時間の短縮に貢献できました。
コードファイルをLambdaにデプロイ&テスト
コードが無事実装できたので、
Lambdaにコードを反映させてテストを行いました。
コードエディタ上でコードを書いた場合、Lambdaに反映する必要があります。
今回の自分がこのパターンでした。
反映する方法としては
・AWSコンソールでLambda関数のページにあるコードソースへ手動貼り付け
・AWS CLIを使用してソースファイルをデプロイ
と便利な方法もありましたが、
自分はAWS CLIの存在を完全に忘れていたため
手動で張り付ける方法を使い実装しました。
手動でコンソールにコードを反映させる
Lambdaで関数作成後、コードを手動で反映させた方法ですが、
古典的な方法です。コピー&ペーストした後にデプロイを行いました。
手順
- コードソースのファイルに作成したコードを張り付ける
- 貼り付けを行った後「Deploy」ボタンを押下(Ctrl+Shift+Uでも可)
これでLambdaの実行環境にコードが反映され、関数が利用可能になります。

左側がコード編集画面、右上の「Deploy」ボタンを押すことで反映されます
IAM Roleの作成
Lambdaを動作させる前に、まず権限の設定をしなければなりません。
必要な権限は
・CloudWatch Logsからログ情報を取得・エクスポートする権限
・S3にログファイルを書き込むための権限
これら2つの要素をカバーするIAM Roleの作成が必要でした。
今回は本タスク用に新規でIAM Roleを作成します。
既存のIAM Roleを変更する方法もありますが、
権限の影響範囲を限定し、将来的な変更や削除の安全性を考慮すると、
用途ごとに新規で追加する方が安全かつ扱いやすいと判断しました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups",
"logs:CreateExportTask",
"logs:DescribeExportTasks"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::<log-backup-bucket>/*",
"arn:aws:s3:::<log-backup-bucket>"
]
}
]
}
以上の権限を設定したRoleに名前をつけて保存。
何の権限かわかりやすいRole名にすることをおすすめします(1敗)。
動作テスト
設定が完了したら動作テストを行いましょう。
Lambda関数の動作テストは、テストケースを作成した後、それにのっとり動かす形となります。ケース作成は簡単で、Lambdaコンソール画面の「Test」ボタン(またはCtrl+Shift+I)を押下すると、「テストイベント新規作成タブ」が開きます。


Testを開き、イベントの選択か、新規作成を行う。
テストイベント新規作成タブ(Create new test event)
上記で新規作成を選んだ場合、
Create new test eventというイベントを新規作成するページが出てきます。
Create new test eventは、実際のトリガーを用意せずに、
Lambdaに渡されるeventを擬似的に作成し、関数の動作を確認できるテスト機能です。
Create new test eventのメリット・デメリット
Create new test eventは手軽にLambdaの動作確認ができる便利な機能ですが、
すべてを確認できるわけではありません。
用途を理解したうえで使い分けることが重要です。
メリット
- 実際のトリガーを用意せずに、Lambda関数単体の動作確認ができる
- IAM権限が不足している場合、実行時にエラーとして早期に気づける
デメリット
- API GatewayやEventBridgeなど、実トリガー経由の挙動は確認できない
- 認証処理やネットワーク(VPCなど)を含めた動作確認には向かない
- 本番環境と完全に同じ条件でのテストにはならない
今回は主に、ロジックの分岐やIAM権限が正しいかを確認する目的で使用します。
テスト設定
Event Name
このイベントの名前入力欄です。
イベント内容の判別ができるわかりやすい名前を心がけてください。
Invocation type
Lambda関数をどの方式で実行するかを指定します。
指定方式として2種類、
-
Synchronous(同期実行)
実行が終了次第、成功/エラーがその場で表示される。
-> デバックに向いている。 -
Asynchronous(非同期実行)
結果がCloudWatch Logsなどに記述される。
-> 本番に近い形の動作確認に向いている。
今回はSynchronousを選択しました。
Event sharing settings
作成したテストイベントを誰が使えるか、使用権限の設定です。
こちらも2種類あり、
-
Private(プライベート)
自分だけ使える設定。 -
Shareable(共有)
同じAWSアカウント内のIAMユーザーと共有可能
チームで同じテストイベントを使い回せる
今回はどちらでも良かったですが、Shareableを選択しました。
Template - optional
よくあるイベント形式のサンプルJSONを自動生成してくれます。
event は、「Lambda に何が起きたか」を伝えるためのデータです。
どこから呼ばれたか(API、定期実行、S3など)によって、中身が変わります。
詳しくはドキュメントを確認してください。
今回はLambda関数の処理を確認したかっただけなので空で行いました。
これらの設定完了後、画面右上にある「Save」ボタン押下時に保存と設定が完了します。
もう1度「Test」ボタンを押下すると、テストが行われます。
結果として、以下のものが表示されればOKです(動作はテンプレートのもの)。
Status: Succeeded
Test Event Name: example_test
Response:
{
"statusCode": 200,
"body": "\"Hello from Lambda!\""
}
このようにコード反映->設定->動作テストという流れでコードと権限の確認ができます。
テスト動作時ハマったポイント(IAM / S3)
最初S3の保存がうまくいきませんでした。
問題はIAM Role設定、下記の場所で起こっていました。
"Resource": [
"arn:aws:s3:::<log-backup-bucket>/*",
"arn:aws:s3:::<log-backup-bucket>"
]
/*付きの方はオブジェクトARNと呼ばれ、
バケット内部のオブジェクトに関する権限を持つものです。
当時バケット用権限と、オブジェクト用権限が別になっている事実を理解していなかったため、権限設定が足りておらず、オブジェクトの操作ができない状態でした。
EventBridgeの設定
テスト稼働を行い、実際にS3へ保存されていることを確認しました。
コードが正常だとわかったので、
毎日決まった時間に処理が動くようにスケジューラーの設定を行いました。
今回はLambdaを定期的に動かすだけなので、
EventBridge用のIAM権限追加は必要ありません。
ルールの作成
EventBridgeにアクセスし、
「スケジュールされたルール (レガシー)」を選択。

「スケジュールされたルールを作成」から作成を行う。
設定時気を付ける個所としてはスケジュールパターンの設定である。

今回の場合は「特定の時刻に実行されるきめ細かいスケジュール」を選択します。
cron式といい分 時 日 月 曜日 年の入力が求められます。
そしてAWSの時間関係はUTCです。
よって今回「毎日00:00に動かす」とするならば、以下の設定を行う必要がありました。
cron(0 15 * * ? *)
ここでJSTの要領で
cron(0 0 * * ? *)
と設定してしまうと、日本時間で09:00に動く設定となるため注意です。
もう1つ、スケジュールパターンとして通常レートで実行されるスケジュールがあります。
こちらはrate式で設定できるもので、
「前回実行からの経過時間基準」で設定可能です。
rate(1 day)
上記だと前回実行から1日経過したタイミングで動作する設定ができます。
cron式、rate式はAWSのドキュメントにも詳細な記載があります。
cron式の作成にはツールを使うと便利です。
今回は00:00ぴったりに動かしたかったため、cronによる実装を行いました。
設定ができたら後は00:00にエラーなく動くことを願って祈るのみです。
今回はうまくいきました。
まとめ
今回は、CloudWatch Logsに保存されている特定のログをS3に保存するためのLambda関数を実装し、EventBridgeによるスケジュール実装までを行いました。
AWS初心者ということもあり、特にIAM権限の設計には苦戦しましたが、「なぜこの権限が必要なのか」を都度確認しながら設定を進めたことで、単に動かすだけでなく、権限とサービスの関係を理解する良い機会になりました。
Lambdaの実行方式やテスト方法についても理解が深まりました。
今後については、業務を通してAWSの基礎や運用に関する理解を深めつつ、個人としては、自分の強みであるAI分野の知識を活かしたAWS開発にも自主学習の一環として取り組んでいきたいと考えています。
今回の実装を通して多くのことを学ぶ一方で、自身の知識や経験の足りなさも改めて実感しました。今後も実装と振り返りを繰り返しながら、経験を積んでいきたいと考えます。
付録
from datetime import datetime, timedelta, timezone
import boto3
import os
# ========= 設定 =========
AWS_REGION = "ap-northeast-1"
# S3 バケット名(環境変数優先)
S3_BUCKET_NAME = os.environ.get(
"LOG_EXPORT_BUCKET",
"example-log-backup-bucket"
)
# JST タイムゾーン
JST = timezone(timedelta(hours=9), "JST")
logs_client = boto3.client("logs", region_name=AWS_REGION)
# ロググループ名 → 環境名のマッピング(サンプル)
ENVIRONMENT_MAPPING = {
"local": ["ExampleAppLocal"],
"dev": ["ExampleAppDevApi", "ExampleAppDevWorker"],
"staging": ["ExampleAppStg"],
"production": ["ExampleAppProdApi", "ExampleAppProdWorker"],
}
# ========= 環境判定 =========
def get_environment_from_log_group(log_group_name: str) -> str | None:
"""
ロググループ名から環境名を判定する
"""
for env, keywords in ENVIRONMENT_MAPPING.items():
for keyword in keywords:
if keyword in log_group_name:
return env
return None
# ========= ロググループ取得(generator) =========
def iter_log_groups(next_token: str | None = None):
"""
CloudWatch Logs のロググループを generator として取得する
"""
params = {"limit": 50}
if next_token:
params["nextToken"] = next_token
response = logs_client.describe_log_groups(**params)
yield from response.get("logGroups", [])
if "nextToken" in response:
yield from iter_log_groups(response["nextToken"])
# ========= Export タスク作成 =========
def create_export_task(
log_group_name: str,
environment: str,
from_datetime: datetime,
to_datetime: datetime,
date_str: str,
):
"""
CloudWatch Logs を S3 にエクスポートする
"""
# ロググループ名をディレクトリ用に簡略化
log_group_simple = log_group_name.split("/")[-1]
# S3 パス:
# {env}/logs/{log_group}/{YYYY-MM-DD}/
s3_prefix = (
f"{environment}/logs/"
f"{log_group_simple}/"
f"{date_str}"
)
response = logs_client.create_export_task(
logGroupName=log_group_name,
fromTime=int(from_datetime.timestamp() * 1000),
to=int(to_datetime.timestamp() * 1000),
destination=S3_BUCKET_NAME,
destinationPrefix=s3_prefix,
)
return response["taskId"]
# ========= Lambda エントリーポイント =========
def lambda_handler(event, context):
now = datetime.now().astimezone(JST)
# 前日(JST)を対象にする
yesterday = now.date() - timedelta(days=1)
from_time = datetime.combine(
yesterday, datetime.min.time(), JST
)
to_time = datetime.combine(
yesterday, datetime.max.time(), JST
)
date_str = yesterday.strftime("%Y-%m-%d")
# 環境ごとにロググループをまとめる
log_groups_by_env: dict[str, list[str]] = {}
for log_group in iter_log_groups():
name = log_group["logGroupName"]
env = get_environment_from_log_group(name)
if not env:
# 想定外のロググループはスキップ
continue
log_groups_by_env.setdefault(env, []).append(name)
# Export タスクを作成
for env, log_groups in log_groups_by_env.items():
for log_group_name in log_groups:
task_id = create_export_task(
log_group_name,
env,
from_time,
to_time,
date_str,
)
print(
f"Created export task: {task_id} "
f"(env={env}, log_group={log_group_name})"
)
return {
"statusCode": 200,
"body": "Export tasks created"
}
お知らせ
技術ブログを週1〜2本更新中、ソーイをフォローして最新記事をチェック!
https://qiita.com/organizations/sewii

