本記事はBIPROGY / ユニアデックス社内AWSコミュニティ「BIPROGY AWS Ambassador」の
定期投稿企画第9回目の記事です。他の定期投稿企画の記事は、
#BIPROGY_AWS_Ambassadorタグ または Organizationページをご覧ください。
目次
- 背景
- やったこと
- アーキテクチャ
- 各リソースの実装方法
4.1. Athenaテーブルの作成
4.2. Pythonプログラムの作成 - 動作確認
- まとめ・今後の展望
1.背景
突然ですが、AWSを使っていると「EC2インスタンス間やVPC内の通信状況をAWS側で確認したい!」と思うこと、ありませんか?そんな時に便利なのがVPCフローログです!
VPCフローログとは、VPC内のEC2インスタンスや他のリソース間でやり取りされる通信(送信元IP、宛先IP、ポート番号、プロトコル、通信の許可/拒否など)をまとめて記録し、Amazon S3に保存できるログ機能です。
早速、マネージメントコンソールから、通信が行われたタイミング付近で出力された、VPCフローログを1つダウンロードし、中身を確認してみました。
正直、ぱっと見では分かりづらい...。
しかも画像では数行しか映っていませんが、実際は何百行ものログが1つのファイルに詰まっています。
この無数の行から目的の通信ログを目視で探すのは大変、、
ということで、「これを何とかしたい!」と思い工夫してみることにしました。
2.やったこと
今回やったことは大きく2つです。
1.Amazon Athena(以下、Athena)でVPCフローログをSQL検索
S3に保存されたVPCフローログを、Athenaというサービスを使ってSQLクエリで検索できるようにしました。これにより、膨大なログの中から必要な通信ログだけを簡単に抽出できるようになりました。
2.Python+Amazon Bedrock(以下、Bedrock)でSQLクエリ作成と結果抽出を自動化
上記1.のAthenaによるログ検索を、さらに効率化・自動化するために、PythonプログラムとBedrock(Claude 3 Haikuモデル)を組み合わせました。Claude 3 Haikuモデルの選定理由は、Claudeシリーズの中でも応答性が速く、低コストであるためです。
コマンドラインから条件を入力するだけで、Bedrockが自動でSQLクエリを生成し、Athenaに渡し結果を取得・表示できる仕組みを作りました。
これにより、SQLの知識が無くても、目的の通信ログを効率よく検索できるようになりました。
3.アーキテクチャ
今回の構成は下記になります。
処理概要
アーキテクチャ図内の処理の流れは以下になります。
1. EC2インスタンス間でICMP通信が発生する
2. VPCフローログが自動でS3に保存される
3. ユーザーがPythonプログラムに条件を入力する
4. Pythonプログラム内でBedrockが呼び出され、入力された条件からSQLクエリを自動で生成し、Athenaへ渡す
5. AthenaからS3に対しSQLクエリを送信する
6. S3からフローログを抽出する
7. 抽出結果をBedrockへ渡す
8. ユーザーに結果を表示する
予めVPCの設定でVPCフローログを有効化にし、保存先として任意のS3バケットを指定する必要があります。
構成のポイント
構成のポイントは下記3点になります。
・S3+Athenaの組み合わせで、大量のログデータからSQLクエリでログを検索可能にした(処理概要5,6)
・BedrockでSQLを自動で作成させることで、ユーザーにSQLの知識が無くても、簡単にログ抽出が出来るようにした(処理概要4)
・Pythonプログラムで一連の処理(処理概要3~8の過程)を自動化し、作業効率を大幅に向上するようにした
4.各リソースの実装方法
各リソースの実装方法は下記になります。
4.1.Athenaテーブルの作成
AWSのVPCフローログは、ネットワーク通信記録をテキストファイルとしてS3に保存します。しかし、このままでは「1.背景」で記載したように目的の通信ログを効率よく検索することが困難です。
Athenaは、S3上のデータをSQLで検索できるサービスですが、Athenaで検索するためには「どんな形式のデータがS3にあるか」をテーブル定義で教える必要があります。
具体的には、以下を実施します。
・ログファイルの各項目(送信元IP、宛先IP、プロトコル番号等)をカラムとして定義
・ログの1行がどのような形式(区切り文字や正規表現)で記録されているかを指定
・S3上のどのパスにログが保存されているかを指定
これにより、AthenaはS3上の通信ログを「データベースのテーブル」として認識し、SQLで柔軟に検索できるようになります。
■実際のテーブル定義例
まず、テーブルを定義する前にデータベースを作成しておく必要があります。
マネージメントコンソールからAthenaのクエリエディタに「create_database.sql」クエリを記載し、実行します。
CREATE DATABASE IF NOT EXISTS my_logs;
これにより、マネージメントコンソールから「my_logs」というデータベースが、作成されたことを確認します。
次に、データベースとして「my_logs」を指定し、「create_vpc_flow_logs_regex.sql」クエリを記載し、実行します。
これにより、「vpc_flow_logs_regex(任意のテーブル名)」を作成することが出来ます。
CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_regex ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol int, packets int, bytes bigint, `start` bigint, `end` bigint, action string, log_status string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ( "input.regex" = "^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)$" ) LOCATION 's3://[バケット名]/[パス]/';
LOCATIONのS3のパス名はVPCフローログが格納されているS3バケットを指します。
マネージメントコンソールでは下記のようにテーブルを設定できます。
クエリを実行すると、画面左側のテーブルに作成されたテーブル名が設定されます。
既存のデータベースやテーブルに同じ構造のログデータが格納されている場合、そのまま利用可能です。ただし、既存テーブルに、VPCフローログとは異なるログ形式のデータが混在している場合、誤検出のリスクがあります。
4.2.Pythonプログラムの作成
このPythonプログラムは、ユーザーがコマンドラインから検索条件(テーブル名、プロトコル番号、送信元/宛先IP、通信の成否を表すアクション)を入力するだけで、Amazon Bedrockを使ってSQLクエリを自動生成し、Athenaに投げて結果を取得・表示する一連の処理を自動化します。
今回の実践では、入力パラメータ数を最低限にしております。また、本記事ではプログラム内でマッピング処理を入れていない為、プロトコル名ではなくプロトコル番号(ICMP=1,TCP=6,UDP=17)を利用するといった仕様になっております。
AthenaやBedrockを利用するPythonプログラムを実行するには、AWS SDK(boto3)がAWS環境にアクセスできるように認証情報を設定する必要があります。本記事では、「~/.aws/credentials」ファイルにAWS環境への認証情報が設定されている状態を前提としており、認証設定の詳細については省きます。
全体の処理フローは下記になります。
1. コマンドライン引数で検索条件を受け取る
2. Bedrock APIにプロンプトを送信し、SQLクエリを自動生成
3. AthenaでSQLクエリを実行
4. クエリ結果を取得・表示
Pythonプログラムは下記になります。
import boto3
import json
import time
import sys
import argparse
REGION = "ap-northeast-1"
BEDROCK_MODEL_ID = 'anthropic.claude-3-haiku-20240307-v1:0'
ATHENA_DATABASE = "<データベース名>"
ATHENA_OUTPUT_BUCKET = "s3://<バケット名>"
VPC_FLOW_LOG_TABLE = "<テーブル名>"
import re
def extract_sql(text):
# ```sql ... ``` の中身を抽出
match = re.search(r"```sql\s*(.*?)\s*```", text, re.DOTALL)
if match:
sql = match.group(1)
else:
#fallback:最初に "SELECT" が出てくる行から抽出
lines = text.splitlines()
for i, line in enumerate(lines):
if line.strip().upper().startswith("SELECT"):
sql = "\n".join(lines[i:])
break
else:
sql = "SQLが作成されませんでした。"
#不要な文字を除去
sql = re.sub(r"--.*", "", sql) #コメント削除
sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL) #ブロックコメント削除
sql = sql.replace("`", "") #バッククォート削除
sql = sql.strip()
#1行に整形
sql = " ".join(sql.split())
return sql
#パラメータの引き渡し
def parse_arguments():
parser = argparse.ArgumentParser(description="Generate SQL prompt for Bedrock Claude")
parser.add_argument("--table", required=True, help="テーブル名")
parser.add_argument("--protocolnum", required=True, help="通信プロトコル番号(例: ICMP=1)")
parser.add_argument("--srcaddr", nargs="+", required=True, help="送信元IPアドレス(複数可)")
parser.add_argument("--dstaddr", nargs="+", required=True, help="宛先IPアドレス(複数可)")
parser.add_argument("--action", required=True, help="アクション(例: ACCEPT)")
return parser.parse_args()
def generate_sql_with_bedrock():
args = parse_arguments()
bedrock = boto3.client("bedrock-runtime", region_name="ap-northeast-1")
#IP条件の組み立て
ip_conditions = []
for src in args.srcaddr:
for dst in args.dstaddr:
ip_conditions.append(f"((srcaddr = '{src}' AND dstaddr = '{dst}'))")
ip_condition_text = " OR ".join(ip_conditions)
#Claudeに渡すプロンプト
user_prompt = (
f"Athenaで実行可能なSQLクエリを生成してください。\n"
f"対象はVPCフローログで、テーブル名は '{args.table}' です。\n"
f"以下の条件を満たすレコードを抽出してください:\n"
f"- protocol ={args.protocolnum}\n"
f"- 指定されたIPアドレス間の片方向通信(srcaddrからdstaddrの方向のみ)を対象とします。\n"
f"- 通信条件:{ip_condition_text}\n"
f"- action = '{args.action}'\n"
f"SQLクエリのみを返してください。説明文や装飾(Markdownなど)は不要です。"
)
messages = [{"role": "user", "content": user_prompt}] #「ユーザーが入力したプロンプト」として定義
#Claude APiに送るために少し加工
body = json.dumps({
"messages": messages,
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"temperature": 0.2
})
response = bedrock.invoke_model(
modelId=BEDROCK_MODEL_ID, #BEDROCK_MODEL_ID = 'anthropic.claude-3-haiku-20240307-v1:0'を定義済み
contentType="application/json",
accept="application/json",
body=body
)
response_body = json.loads(response['body'].read())
raw_text = response_body["content"][0]["text"] if isinstance(response_body.get("content"), list) else response_body.get("content", "")
sql_query = extract_sql(raw_text) #返答テキストからSQLクエリ部分だけを抽出する関数「extract_sql」を実行
print("Generated SQL Query:\n", sql_query)
return sql_query
#生成したSQLクエリをAthenaに投げて実行する関数
def execute_athena_query(sql_query):
athena = boto3.client("athena", region_name=REGION)
response = athena.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={"Database": ATHENA_DATABASE},
ResultConfiguration={"OutputLocation": ATHENA_OUTPUT_BUCKET}
)
return response["QueryExecutionId"]
#Athenaのクエリが完了するまで待機し、ステータスが「SUCCEEDED」となれば、結果を取得する関数を呼び出す
def wait_for_query_completion(query_execution_id):
athena = boto3.client("athena", region_name=REGION)
while True:
result = athena.get_query_execution(QueryExecutionId=query_execution_id)
state = result["QueryExecution"]["Status"]["State"]
if state in ["SUCCEEDED", "FAILED", "CANCELLED"]:
return state
time.sleep(2)
#Atheaで実行したクエリの結果を取得し、出力の表示をする関数
def fetch_query_results(query_execution_id):
athena = boto3.client("athena", region_name=REGION)
result = athena.get_query_results(QueryExecutionId=query_execution_id)
print("\nQuery Results:")
for row in result["ResultSet"]["Rows"]:
print([col.get("VarCharValue", "") for col in row["Data"]])
def main():
sql_query = generate_sql_with_bedrock()
query_execution_id = execute_athena_query(sql_query)
status = wait_for_query_completion(query_execution_id)
if status == "SUCCEEDED":
fetch_query_results(query_execution_id)
else:
print(f"Query failed with status: {status}")
if __name__ == '__main__':
main()
上記プログラムを加工する場合、下記パラメータへ任意の値を入れてください。
・REGION:リージョン名
・BEDROCK_MODEL_ID:使用するAmazon BedrockのモデルID
*マネージメントコンソールでAmazon Bedrockのページにアクセスし、「プレイグラウンド」からモデルが使用できるか確認してください。使用できない場合、必要情報を送信すると2~3分でモデルが利用可能になります。
・ATHENA_DATABASE:データベース名
・ATHENA_OUTPUT_BUCKET:バケット名
・VPC_FLOW_LOG_TABLE:テーブル名
各関数の処理内容
関数ごとに行われている処理を、以下にまとめました。
1. extract_sql(text):12~29行目
Amazon BedrockのClaudeモデルから返ってきたレスポンスから、SQLクエリ部分だけを抽出する関数です。この関数は、後述する「3.generate_sql_with_bedrock()」から呼び出されます。
主な処理:
・sql ... で囲まれた部分を抽出
・もし見つからなければ、最初に"SELECT"で始まる行から下を抜き出す
・コメントやバッククォート等の不要な文字を削除し、1行に整形する
2. parse_arguments():30~37行目
コマンドライン引数をパースし、テーブル名やプロトコル番号、IPアドレスなどのパラメータを取得する関数です。
主な処理:
・--table(テーブル名)、--protocolnum(プロトコル番号)、--srcaddr(送信元IP)、--dstaddr(宛先IP)、--action(アクション)を受け取る
3. generate_sql_with_bedrock():38~73行目
引数で受け取った条件をもとに、Amazon Bedrock(Claudeモデル)に「この条件でAthenaに渡す用のSQLクエリを作成して」とプロンプトを送り、返ってきたレスポンスを1.の関数に渡してSQLクエリのみを抽出する関数です。
主な処理:
・コマンドライン引数を取得
・IPアドレスの組み合わせ条件を作成
・あらかじめ作成しておいた自然言語と、引数で取得したパラメータを組み合わせたプロンプトをClaudeモデルに送信し、SQLクエリを作成
・返答(レスポンス)からSQL部分だけを抽出して返す
4. execute_athena_query(sql_query):74~81行目
3.の関数でreturnされた(生成された)SQLクエリをAthenaに投げて実行する関数です。
主な処理:
・Athenaのクライアントを作成
・指定したデータベース・出力先バケットでクエリを実行
・クエリ実行IDを返す
5. wait_for_query_completion(query_execution_id):82~89行目
4.の関数で実行しているAthenaのクエリが完了するまで待機をさせる関数です
主な処理:
・クエリの状態を定期的にチェック
・状態が「SUCCEEDED」,「FAILED」,「CANCELLED」のいずれかになるまでループ
6. fetch_query_results(query_execution_id):90~95行目
Athenaで実行したクエリの結果を取得し、出力する関数です。
主な処理:
・クエリ結果の取得
・各行を表示
7. main():96~105行目
1~6の関数を順番に呼び出し、全体の処理を制御する関数です。
主な処理:
・3.の関数で取得したSQLクエリを「sql_query」パラメータに代入
・4.の関数で取得したクエリ実行IDを「query_execution_id 」パラメータに代入
・5.の関数で取得したクエリの状態を「status」パラメータに代入
・「status」が「SUCCEEDED」であればクエリ結果を出力する関数を呼び出し、結果の表示を行う
5.動作確認
実際にPythonプログラムを実行し、VPCフローログの検索が自動化できることを確認します。
■実際コマンド例
各パラメータに以下のような値を指定します。
・--table:Athenaで作成したテーブル名
・--protocolnum:プロトコル番号(画像ではICMP通信ログを確認したかったため「1」を入力)
・--srcaddr:送信元IPアドレス(172.24.20.208)
・--dstaddr:宛先IPアドレス(172.24.30.35)
・--action:アクション(許可された通信ならば「ACCEPT」、拒否された通信ならば「REJECT」)
■実行結果
コマンドを実行すると、Amazon Bedrockによって自動作成されたSQLクエリが「Generated SQL Query:」(上記画像赤枠)として出力されます。続いて、そのSQLがAthenaに送信され、クエリ結果が「Query Results:」(上記画像緑枠)として表示されます。
出力された結果を確認すると、指定した条件に合致するVPCフローログのみが抽出されていることが分かります。これにより、従来は手作業で行っていた煩雑なログ検索作業を、簡単なコマンド実行だけで効率的に実現できるようになりました。
6.まとめ・今後の展望
■まとめ
本記事ではVPCフローログをAthenaとAmazon Bedrockを組み合わせて効率よく(”いい感じ”に)抽出する仕組みを構築してみました。従来は手作業や複雑なSQLが必要だったログ検索も、自然言語ベースで条件を入力するだけで、Amazon Bedrockが自動でSQLを生成し、Athenaで高速に検索できるようになりました。
■今後の展望
今後の展望としては下記を考えています。
・UIのブラウザ化
CLIだけでなくWeb UI化することで、より直感的に操作できる仕組みを作成したい
・柔軟な条件指定への対応
日付範囲やプロトコル名、ポート番号など、さらに細かい条件での検索にも対応できるように拡張したい
・出力形式の多様化
検索結果を、コマンドラインでの表示ではなく、CSVやExcel形式でダウンロードできるようにすることで、レポート作成やデータ分析に活用できるようにしたい
今後もAWSのサービスを触りながら、AWSの知識をどんどんインプット、アウトプットして楽しいAWSライフを過ごしていきたいと思います!
最後までお読み頂きありがとうございました!
We Are Hiring!




