0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CloudTrailで検知したSSM→EC2のアクセスログを読みやすい形にしてSNSで通知したい

Posted at

概要

今回はCloudTrailとSNSを連携させてアクションがあった際に通知する仕組みをつくりました。
しかし、json生ファイルで見れたものじゃなかったので工夫をしてみましたので紹介します。

現状の構成

ssm-loginarert-sns.drawio.png

配信される内容の例

.json
{
	"version": "0",
	"id": ""
	"detail-type": "AWS API Call via CloudTrail",
	"source": "",
	"account": "",
	"time": "2023-04-25T09:47:10Z",
	"region": "ap-northeast-1",
	"resources": [],
	"detail": {
		"eventVersion": "1.08",
		"userIdentity": {
			"type": "",
			"principalId": "",
			"arn": "",
			"accountId": "",
			"accessKeyId": "",
			"userName": ""
		},
		"eventTime": "2023-04-25T09:47:10Z",
		"eventSource": "ssm.amazonaws.com",
		"eventName": "StartSession",
		"awsRegion": "ap-northeast-1",
		"sourceIPAddress": "",
		"userAgent": "aws-cli/2.11.8 Python/3.11.3 Darwin/21.4.0 source/arm64 prompt/off command/ssm.start-session",
		"requestParameters": {
			"target": ""
		},
		"responseElements": {
			"sessionId": "",
			"tokenValue": "Value hidden due to security reasons.",
			"streamUrl": ""
		},
		"requestID": "",
		"eventID": "",
		"readOnly": false,
		"eventType": "AwsApiCall",
		"managementEvent": true,
		"recipientAccountId": "",
		"eventCategory": "Management",
		"tlsDetails": {
			"tlsVersion": "TLSv1.2",
			"cipherSuite": "",
			"clientProvidedHostHeader": "ssm.ap-northeast-1.amazonaws.com"
		}
	}
}

このままだとこのjsonがそのままメールでおくられてきちゃいます。読みにくくてどうにもなりません。
なので一度Lambdaに突っ込むことにしました。

構成図

ssm-loginarert-sns-lambda.drawio.png

  • この追加したLambda関数では以下のことをさせています。
    • UTCをJSTに変換
    • 受け取ったJsonからEC2へのアクセス通知を把握する上で必要な項目のみを抽出 ついでに余計な""も消します。
    • 抽出した項目を日本語に変換
      • region⇨リージョン
      • source⇨イベント
      • eventName⇨イベントネーム
      • userName⇨ユーザネーム(IAMユーザ)
      • sourceIPAddress⇨接続元IPアドレス
      • target⇨対象リソース

以下がLambda関数です。 Pythonで記述しています

lambda.py
import json
import boto3
import re
from datetime import datetime, timedelta

def lambda_handler(event, context):
    formatted_message = '' # フォーマットされたメッセージを格納する変数を初期化
    timechange = '' # UTCからJSTに変換するための変数を初期化
    utc_format = "%Y-%m-%d %H:%M:%SZ" # UTC時間を表すフォーマットを指定

    # イベントの各行に対してループを行う
    for line in json.dumps(event,indent=0).split('\n'):
        stripped_line = line.strip() # 各行の前後の空白文字を除去して、対象文字列を検索しやすくする
        if '"eventTime"' in stripped_line: # イベント発生時刻が含まれる行の場合
            timechange += line.replace('"eventTime"', '発生時刻') + '\n' # 行の文字列を日本語に置換して、UTCからJSTに変換するための変数に追加
            match = re.search(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', timechange) # 正規表現でUTC時間を表す文字列を検索
            event_time_utc = match.group(0) # UTC時間を表す文字列を取得
            utc_dt = datetime.strptime(event_time_utc, utc_format) # UTC時間をdatetimeオブジェクトに変換
            jst_dt = utc_dt + timedelta(hours=9) # UTCから9時間進めたdatetimeオブジェクトをJST時間に変換
            jst_format = "%Y-%m-%dT%H:%M:%S%z" # JST時間を表すフォーマットを指定
            jst_str = jst_dt.strftime(jst_format) # JST時間をフォーマットに従って文字列に変換
            formatted_message += timechange.replace(event_time_utc, jst_str) # フォーマットされたメッセージに、UTCからJSTに変換されたイベント発生時刻を追加
        elif '"region"' in stripped_line: # AWSリージョンが含まれる行の場合
            formatted_message += line.replace('"region"', 'リージョン') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加
        elif '"source"' in stripped_line: # イベントソースが含まれる行の場合
            formatted_message += line.replace('"source"', 'イベント') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加
        elif '"eventName"' in stripped_line: # イベント名が含まれる行の場合
            formatted_message += line.replace('"eventName"', 'イベントネーム') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加
        elif '"userName"' in stripped_line: # ユーザ名が含まれる行の場合
            formatted_message += line.replace('"userName"', 'ユーザネーム') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加
        elif '"sourceIPAddress"' in stripped_line: # イベントが発生したIPアドレスが含まれる行の場合
            formatted_message += line.replace('"sourceIPAddress"', '接続元IPアドレス') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加
        elif '"target"' in stripped_line: # 対象リソースが含まれる行の場合
            formatted_message += line.replace('"target"', '対象リソース') + '\n' # 行の文字列を日本語に置換して、フォーマットされたメッセージに追加

    # SNSトピックにフォーマットされたメッセージを送信する
    sns = boto3.client('sns') # AWS SDKを使用して、SNSクライアントを作成
    topic_arn = 'AWS-SNSのarnを記述' # AWS-SNSのARNを指定
    response = sns.publish(
    TopicArn=topic_arn,
    Message=formatted_message,
    Subject='通知メールの件名を記述',
    ) # SNSトピックにメッセージを送信する

    return {
        'statusCode': 200,
        'body': json.dumps('Successfully sent formatted message to SNS') # Lambda関数の実行結果を返す
    }

Lambdaを使って整形された通知の例

イベント: "aws.ssm",
リージョン: "ap-northeast-1",
ユーザネーム: "hogehoge"
発生時刻: "2023-04-25T09:47:10Z",
イベントネーム: "StartSession",
接続元IPアドレス: "xxx.xxx.xxx.xxx",
対象リソース: "i-xxxxxxxxxxxxxxx"

最後に

Lambda関数を初めてつかいました。
Pythonは少し触れたことがあったのでそこまで難易度は高くなかった気がします。
AWSは目的に対して到達する手段が多すぎるので何がベストプラクティスか毎日考え悩んでます。

誰かの参考というよりも自分のお勉強メモなのであくまで参考程度にお願いします。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?