初めに
本記事では、IAM ユーザーのアクセスキーを自動ローテーションし、メールで通知する方法について説明します。
■使用ツール
・Lambda
・EventBridge
■Lambda
【設定手順】
1.Lambda関数を作成
・関数名を入力(例: AccessKeyRotation)
・ランタイムはPythonの最新サポート対象を選択
・「実行ロールの作成」で「基本的なLambdaアクセス権限で新しいロールを作成」を選択
・「関数の作成」ボタンをクリック
2.Lambda関数のコードを入力
・以下のPythonコードを「コードソース」エリアに貼り付ける
※コード内の「IAMユーザー名」と「SNSトピック名」と「アカウントID」のみ修正して使用する
import boto3
import datetime
import time
import os
from botocore.exceptions import ClientError
def lambda_handler(event, context):
# 設定
max_age_minutes = 1 # アクセスキーの最大有効期間(分)
iam = boto3.client('iam')
# 処理対象のユーザー名リスト
target_users = ['IAMユーザー名'] # 実際のIAMユーザー名
# SNSトピックARNを直接指定
topic_arn = 'arn:aws:sns:ap-northeast-1:アカウントID:SNSトピック名' # 実際のアカウントID、SNSトピック名
print(f"SNSトピック '{topic_arn}' を使用します")
# 結果サマリー(通知用)
summary = []
for username in target_users:
print(f"ユーザー {username} のアクセスキーを確認中...")
user_result = f"ユーザー {username} の処理結果:"
try:
# ユーザーのアクセスキー一覧を取得
keys = iam.list_access_keys(UserName=username)['AccessKeyMetadata']
if not keys:
print(f"ユーザー {username} にはアクセスキーがありません")
user_result += "\n - アクセスキーがありません"
summary.append(user_result)
continue
# キーを日付でソート(古い順)
keys.sort(key=lambda k: k['CreateDate'])
# アクティブなキーと非アクティブなキーを分類
active_keys = [k for k in keys if k['Status'] == 'Active']
inactive_keys = [k for k in keys if k['Status'] == 'Inactive']
print(f"ユーザー {username} には {len(active_keys)} 個のアクティブなキーと {len(inactive_keys)} 個の非アクティブなキーがあります")
# 各キーの情報をログに記録
for key in keys:
key_id = key['AccessKeyId']
status = key['Status']
create_date = key['CreateDate']
# キーの経過時間を計算(分単位)
now = datetime.datetime.now(datetime.timezone.utc)
age_delta = now - create_date
age_minutes = age_delta.total_seconds() / 60
user_result += f"\n - キーID: {key_id}, 状態: {status}, 作成日: {create_date.strftime('%Y-%m-%d %H:%M:%S')}, 経過時間: {int(age_minutes)}分"
print(f" - キーID: {key_id}, 状態: {status}, 作成日: {create_date}, 経過時間: {int(age_minutes)}分")
# アクティブキーがなければ作成して終了
if not active_keys:
print(f"ユーザー {username} にはアクティブなキーがありません。新しいキーを作成します。")
# 非アクティブキーがあれば削除
for key in inactive_keys:
iam.delete_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId']
)
print(f" - 非アクティブなキーID {key['AccessKeyId']} を削除しました")
user_result += f"\n → 非アクティブなキーID {key['AccessKeyId']} を削除"
# 新しいキーを作成
new_key = iam.create_access_key(UserName=username)['AccessKey']
print(f" - 新しいキーを作成しました: {new_key['AccessKeyId']}")
user_result += f"\n → 新しいキーを作成: {new_key['AccessKeyId']}"
summary.append(user_result)
continue
# 最も古いアクティブなキーをチェック
oldest_active_key = active_keys[0]
key_id = oldest_active_key['AccessKeyId']
create_date = oldest_active_key['CreateDate']
# キーの経過時間を計算(分単位)
age_delta = now - create_date
age_minutes = age_delta.total_seconds() / 60
# 1分以上経過した最も古いアクティブなキーを処理
if age_minutes >= max_age_minutes:
print(f" - 最も古いキーID {key_id} は {int(age_minutes)}分経過しています。ローテーションします。")
user_result += f"\n → {int(age_minutes)}分経過したため、ローテーションを実行"
# ステップ1: 非アクティブなキーがあれば削除
for key in inactive_keys:
iam.delete_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId']
)
print(f" - 非アクティブなキーID {key['AccessKeyId']} を削除しました")
user_result += f"\n → 非アクティブなキーID {key['AccessKeyId']} を削除"
# ステップ2: 残ったアクティブなキーを無効化
iam.update_access_key(
UserName=username,
AccessKeyId=key_id,
Status='Inactive'
)
print(f" - アクティブなキーID {key_id} を無効化しました")
user_result += f"\n → アクティブなキーID {key_id} を無効化"
# ステップ3: 新たにアクセスキーを発行
new_key = iam.create_access_key(UserName=username)['AccessKey']
new_key_id = new_key['AccessKeyId']
new_secret = new_key['SecretAccessKey']
print(f" - 新しいキーを作成しました: {new_key_id}")
user_result += f"\n → 新しいキーを作成: {new_key_id}"
# 新しいキー情報を通知
try:
sns = boto3.client('sns')
message = f"""
IAMユーザー {username} のアクセスキーがローテーションされました。
■ ローテーション情報
- ユーザー名: {username}
- 新しいアクセスキーID: {new_key_id}
- 新しいシークレットキー: {new_secret}
- 作成日時: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
■ 無効化された古いキー情報
- アクセスキーID: {key_id}
- 作成日: {create_date.strftime('%Y-%m-%d %H:%M:%S')}
- 経過時間: {int(age_minutes)}分
※このキー情報は安全に管理し、関連するサービスやアプリケーションの設定を更新してください。
※古いキーは無効化されています。サービスの移行が完了したら、次回の実行時に自動的に削除されます。
"""
response = sns.publish(
TopicArn=topic_arn,
Subject=f"AWS アクセスキーローテーション通知 - {username}",
Message=message
)
message_id = response.get('MessageId')
print(f"ローテーション通知を SNSトピック '{topic_arn}' に送信しました (MessageId: {message_id})")
user_result += f"\n → 通知を SNSトピックに送信しました"
except Exception as sns_error:
print(f"SNS通知エラー: {str(sns_error)}")
user_result += f"\n → SNS通知エラー: {str(sns_error)}"
else:
print(f" - 最も古いキーID {key_id} は {int(age_minutes)}分経過していますが、ローテーション条件 ({max_age_minutes}分以上) に達していません。")
except Exception as e:
error_msg = str(e)
print(f"エラー ({username}): {error_msg}")
user_result += f"\n → エラー: {error_msg}"
summary.append(user_result)
# 処理結果サマリーをログに出力
print("\n=== 処理サマリー ===")
for result in summary:
print(result)
return {
'statusCode': 200,
'body': 'アクセスキーのチェックとローテーションが完了しました'
}
3.Lambdaの実行ロールにIAM権限を追加
・「設定」タブをクリック
・「アクセス権限」セクションで「実行ロール」の名前をクリック
・IAMコンソールが開くので、「アクセス権限を追加」をクリック
・「ポリシーをアタッチ」を選択
・検索ボックスに「IAM」と入力
・「IAMFullAccess」にチェックを入れる
・検索ボックスに「SNS」と入力
・「SNSFullAccess」にチェックを入れる
・「アクセス権限を追加」ボタンをクリック
※最終的に適用されているポリシーが3つ(IAMFullAccess、SNSFullAccess、Lambda関数で自動生成したもの)になっていればOK!
・タイムアウト設定を10分程度に設定
(極端に短いと、EventBridgeで実行したときに中途半端にしか処理できずに終わってしまう。)
■EventBridge
【設定手順】
- EventBridgeでスケジュール設定
・左側のナビゲーションから「スケジュール」を選択
・「スケジュールを作成」ボタンをクリック
・スケジュール名を入力
・説明を入力
・スケジュールパターンでrate ベースのスケジュールを30days(今回は30日ごとのローテーションで想定しています。適宜変更してください。)
・「次へ」をクリック
・ターゲットの詳細
・頻繁に使用される APIで「AWS Lambda Invoke」を選択
・関数で先ほど作成した AccessKeyRotation を選択
・「次へ」をクリック
・スケジュールを有効化
・スケジュール完了後のアクションで「NONE」
・アクセス許可でこのスケジュールの新しいロールを作成
・内容を確認し「スケジュールの保存」をクリック
まとめ
ボタン一つで自動ローテーションできる機能があれば便利だとは思うのですが、そもそもアクセスキーを使用することが非推奨だとされているので実装は望み薄のようです。ただ、設定がそこまで煩雑なわけではないので設定してみてください。
今回使用しているコードはAIに出力してもらいました。コードを一から書かなくても出してくれてとても便利。情報の正誤を判断することは必須となりますが、使いこなせると仕事の効率が上がります。