やりたいこと
CCoE の方をはじめ、AWS の運用に携わったことがある方なら誰しも、VPC フローログの分析を行ったことがあるのではないでしょうか?
慣れた方なら難しい作業ではないですが、Athena を使用するので多少なりとも SQL の知識が必要になります。
そこで、例えば以下のような投げかけをしただけで分析結果が返ってくるようなものがあったら素敵じゃないですか?
- 今日最も通信が発生していたIPアドレスを教えてください
- 先週と今週で最も通信量が増えたインスタンスを教えてください
今回は Claude3.7 のお力を借りてそんなウェブアプリを作ったのでご紹介します。
最終イメージ
先に最終的に作成したウェブアプリのイメージを載せておきます。
こちらに質問を入力し、実行します。
まずは実行するSQLクエリと実行結果が表示され、その後以下のように分析結果が表示されます。
実装
全体構成
全体の構成としては以下のようになっています。
事前準備
以下は完了済みの前提で進めます。
- ローカルのデフォルトプロファイル(IAM)が全体構成にあるサービスを利用可能なこと(リージョンはus-east-1)
- Athena の出力先設定がされていること
- Bedrock でモデル(Claude3.7)へのアクセスが付与されていること
実行ファイルの作成
以下のようなディレクトリ構成でファイルを作成します。
(README は任意です)
vpc-flow-logs-analyzer/
├── vpc_flow_logs_catalog.py
├── vpc_flow_logs_translator.py
├── frontend_visualization.py
└── README.md
それぞれのファイル内容は以下の通りです。
S3 バケット名などは適宜変更してください。(★は変更必須)
vpc_flow_logs_catalog.py
import boto3
import time
from botocore.exceptions import ClientError
def create_database_and_table(database_name, table_name, s3_flow_logs_path):
"""
AWS Glue を使用して VPC フローログ用のデータベースとテーブルを作成する
"""
glue_client = boto3.client('glue')
# データベースの作成
try:
glue_client.create_database(
DatabaseInput={
'Name': database_name,
'Description': 'Database for VPC Flow Logs analysis'
}
)
print(f"データベース '{database_name}' を作成しました")
except ClientError as e:
if e.response['Error']['Code'] == 'AlreadyExistsException':
print(f"データベース '{database_name}' は既に存在します")
else:
raise e
# テーブルの作成
try:
glue_client.create_table(
DatabaseName=database_name,
TableInput={
'Name': table_name,
'Description': 'VPC Flow Logs table',
'StorageDescriptor': {
'Columns': [
{'Name': 'version', 'Type': 'int'},
{'Name': 'account_id', 'Type': 'string'},
{'Name': 'interface_id', 'Type': 'string'},
{'Name': 'srcaddr', 'Type': 'string'},
{'Name': 'dstaddr', 'Type': 'string'},
{'Name': 'srcport', 'Type': 'int'},
{'Name': 'dstport', 'Type': 'int'},
{'Name': 'protocol', 'Type': 'int'},
{'Name': 'packets', 'Type': 'int'},
{'Name': 'bytes', 'Type': 'bigint'},
{'Name': 'start', 'Type': 'int'},
{'Name': 'end', 'Type': 'int'},
{'Name': 'action', 'Type': 'string'},
{'Name': 'log_status', 'Type': 'string'},
{'Name': 'vpc_id', 'Type': 'string'},
{'Name': 'subnet_id', 'Type': 'string'},
{'Name': 'instance_id', 'Type': 'string'},
{'Name': 'tcp_flags', 'Type': 'int'},
{'Name': 'type', 'Type': 'string'},
{'Name': 'pkt_srcaddr', 'Type': 'string'},
{'Name': 'pkt_dstaddr', 'Type': 'string'},
{'Name': 'region', 'Type': 'string'},
{'Name': 'az_id', 'Type': 'string'},
{'Name': 'sublocation_type', 'Type': 'string'},
{'Name': 'sublocation_id', 'Type': 'string'},
],
'Location': s3_flow_logs_path,
'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
'SerdeInfo': {
'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
'Parameters': {
'field.delim': ' ',
'serialization.format': ' '
}
},
},
'TableType': 'EXTERNAL_TABLE',
'Parameters': {
'classification': 'csv',
'skip.header.line.count': '1'
}
}
)
print(f"テーブル '{table_name}' を作成しました")
except ClientError as e:
if e.response['Error']['Code'] == 'AlreadyExistsException':
print(f"テーブル '{table_name}' は既に存在します")
else:
raise e
def test_athena_query(database_name, table_name, s3_output_location):
"""
Athena を使用してテストクエリを実行する
"""
athena_client = boto3.client('athena')
query = f"SELECT * FROM {database_name}.{table_name} LIMIT 10"
# クエリの実行
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database_name
},
ResultConfiguration={
'OutputLocation': s3_output_location
}
)
query_execution_id = response['QueryExecutionId']
print(f"クエリ実行 ID: {query_execution_id}")
# クエリ完了を待つ
while True:
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
state = response['QueryExecution']['Status']['State']
if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
print(f"クエリ実行中... 状態: {state}")
time.sleep(1)
if state == 'SUCCEEDED':
# 結果の取得
results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
print("クエリが正常に完了しました。VPC フローログのサンプルデータ:")
# ヘッダー行を取得
header = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
print(", ".join(header))
# データ行を取得
for row in results['ResultSet']['Rows'][1:]: # 最初の行はヘッダーなので飛ばす
data = [field.get('VarCharValue', '') for field in row['Data']]
print(", ".join(data))
else:
print(f"クエリは失敗しました。状態: {state}")
if 'StateChangeReason' in response['QueryExecution']['Status']:
print(f"理由: {response['QueryExecution']['Status']['StateChangeReason']}")
if __name__ == "__main__":
# パラメータを設定
★ACCOUNT_ID = '123456789123' # VPCフローログのアカウント
DATABASE_NAME = "vpc_flow_logs_db"
TABLE_NAME = "vpc_flow_logs"
★S3_FLOW_LOGS_PATH = f"s3://vpc-flow-logs-analyzer-bucket-xxxxxxxx/AWSLogs/{ACCOUNT_ID}/vpcflowlogs/" # VPCフローログ格納バケット
★S3_OUTPUT_LOCATION = "s3://vpc-flowlogs-athena-results-xxxxxxxx/" # Athenaの出力先バケット
# データベースとテーブルの作成
create_database_and_table(DATABASE_NAME, TABLE_NAME, S3_FLOW_LOGS_PATH)
# テストクエリの実行
test_athena_query(DATABASE_NAME, TABLE_NAME, S3_OUTPUT_LOCATION)
vpc_flow_logs_translator.py
import boto3
import json
import os
import time
import re
from datetime import datetime, timedelta
class VPCFlowLogsQueryTranslator:
def __init__(self, region_name='us-east-1'):
"""
VPC フローログの自然言語クエリを SQL に変換するクラス
"""
self.bedrock_runtime = boto3.client('bedrock-runtime', region_name=region_name)
self.athena_client = boto3.client('athena', region_name=region_name)
self.database_name = "vpc_flow_logs_db"
self.table_name = "vpc_flow_logs"
self.s3_output_location = "s3://vpc-flowlogs-athena-results-202503/"
# テーブルスキーマの情報
self.schema_info = """
VPC フローログスキーマ:
- version: フローログのバージョン
- account_id: AWS アカウント ID
- interface_id: ネットワークインターフェース ID
- srcaddr: 送信元 IP アドレス
- dstaddr: 宛先 IP アドレス
- srcport: 送信元ポート
- dstport: 宛先ポート
- protocol: IANA プロトコル番号 (例: 6=TCP, 17=UDP)
- packets: パケット数
- bytes: バイト数
- start: 開始時間(Unix 秒)
- end: 終了時間(Unix 秒)
- action: アクション(ACCEPT または REJECT)
- log_status: ログステータス
- vpc_id: VPC ID
- subnet_id: サブネット ID
- instance_id: インスタンス ID
- tcp_flags: TCP フラグ
- type: トラフィックタイプ
- region: リージョン
- az_id: アベイラビリティーゾーン ID
"""
# 一般的なプロトコル番号とポート番号の参照
self.protocol_reference = """
一般的なプロトコル番号:
- 1: ICMP
- 6: TCP
- 17: UDP
一般的なポート番号:
- 22: SSH
- 80: HTTP
- 443: HTTPS
- 3389: RDP
- 53: DNS
"""
def natural_language_to_sql(self, user_query):
"""
自然言語クエリを SQL に変換する
"""
# 現在の時刻情報を計算(クエリの相対時間参照用)
now = datetime.now()
yesterday = now - timedelta(days=1)
last_week = now - timedelta(days=7)
last_month = now - timedelta(days=30)
time_context = f"""
現在の日時: {now.strftime('%Y-%m-%d %H:%M:%S')}
昨日: {yesterday.strftime('%Y-%m-%d')}
先週: {last_week.strftime('%Y-%m-%d')} から {now.strftime('%Y-%m-%d')}
先月: {last_month.strftime('%Y-%m-%d')} から {now.strftime('%Y-%m-%d')}
注意: VPC フローログの start/end フィールドは Unix 秒形式です。
"""
# Bedrock へのプロンプト
prompt = f"""
あなたは VPC フローログを分析する SQL クエリを生成するエキスパートです。
以下の情報に基づいて、ユーザーの自然言語クエリを Athena SQL クエリに変換してください:
{self.schema_info}
{self.protocol_reference}
{time_context}
テーブル情報:
データベース名: {self.database_name}
テーブル名: {self.table_name}
ユーザーからの質問: "{user_query}"
出力形式:
以下のJSON形式でのみ回答してください。
{{
"sql_query": "生成されたSQL文",
"explanation": "このSQLクエリがどのようにユーザーの質問に答えるかの説明"
}}
必ず有効な SQL クエリを生成し、テーブルの実際のカラム名を使用してください。
ただし、AWS Athenaで使用できる関数のみを利用するよう注意してください。
以下のような制約が主にあります。
・時刻の比較をする際には「from_unixtime(start, 9, 0) >= cast('2025-03-23 00:00:00 +09:00' as timestamp with time zone)」のような記述方法で作成してください。
・カラム名にSQLとして意味のある単語がある場合はダブルクォーテーションで囲ってください。(start,endなど)
"""
# Bedrock を使用して Bedrock モデルを呼び出す
response = self.bedrock_runtime.invoke_model(
★modelId="arn:aws:bedrock:us-east-1:123456789123:inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"messages": [
{
"role": "user",
"content": prompt
}
]
})
)
# レスポンスをパースする
response_body = json.loads(response['body'].read().decode('utf-8'))
response_content = response_body['content'][0]['text']
try:
json_text = re.sub(r'^```json\s*\n|\n```$', '', response_content.strip())
# JSON レスポンスを抽出する
result = json.loads(json_text)
return result
except json.JSONDecodeError:
# JSON 解析に失敗した場合、テキスト全体を返す
return {"error": "SQL の抽出に失敗しました", "raw_response": response_content}
def execute_query(self, sql_query):
"""
Athena でクエリを実行する
"""
# クエリの実行
response = self.athena_client.start_query_execution(
QueryString=sql_query,
QueryExecutionContext={
'Database': self.database_name
},
ResultConfiguration={
'OutputLocation': self.s3_output_location
}
)
query_execution_id = response['QueryExecutionId']
print(f"クエリ実行 ID: {query_execution_id}")
# クエリ完了を待つ
while True:
response = self.athena_client.get_query_execution(QueryExecutionId=query_execution_id)
state = response['QueryExecution']['Status']['State']
if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
print(f"クエリ実行中... 状態: {state}")
time.sleep(1)
if state == 'SUCCEEDED':
# 結果の取得
results = self.athena_client.get_query_results(QueryExecutionId=query_execution_id)
# ヘッダー行とデータ行を整形
headers = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
rows = []
for row in results['ResultSet']['Rows'][1:]: # 最初の行はヘッダーなので飛ばす
data = [field.get('VarCharValue', '') for field in row['Data']]
rows.append(dict(zip(headers, data)))
return {
'status': 'success',
'headers': headers,
'rows': rows,
'row_count': len(rows)
}
else:
error_message = response['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
return {
'status': 'error',
'message': f"クエリは失敗しました: {error_message}"
}
def process_natural_language_query(self, nl_query):
"""
自然言語クエリを処理してSQL変換と実行を行う
"""
# 自然言語をSQLに変換
translation_result = self.natural_language_to_sql(nl_query)
if 'error' in translation_result:
return translation_result
sql_query = translation_result['sql_query']
explanation = translation_result['explanation']
print(f"生成されたSQL: {sql_query}")
print(f"説明: {explanation}")
# SQLを実行
query_result = self.execute_query(sql_query)
# 結果を返す
return {
'original_query': nl_query,
'generated_sql': sql_query,
'explanation': explanation,
'query_result': query_result
}
# 使用例
if __name__ == "__main__":
translator = VPCFlowLogsQueryTranslator()
# テスト用の自然言語クエリ
test_queries = [
"先週一番トラフィックの多かった EC2 インスタンスは?",
"ポート 22 への接続要求のうち拒否されたものを一覧表示して",
"一昨日最も通信が多かったポート番号は??"
]
for query in test_queries:
print(f"\n-----\n自然言語クエリ: '{query}'")
result = translator.process_natural_language_query(query)
print(json.dumps(result, indent=2, ensure_ascii=False))
frontend_visualization.py
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import boto3
import time
from datetime import datetime, timedelta
import numpy as np
import matplotlib.dates as mdates
from io import BytesIO
import base64
import re
# VPCFlowLogsQueryTranslator クラスのインポート(前のコードからインポート)
from vpc_flow_logs_translator import VPCFlowLogsQueryTranslator
# Bedrock を使った分析結果の自然言語解説生成
class BedrockAnalysisExplainer:
def __init__(self, region_name='us-east-1'):
self.bedrock_runtime = boto3.client('bedrock-runtime', region_name=region_name)
def generate_explanation(self, query, sql, results):
"""
Bedrock を使用してクエリ結果の解説を生成
"""
# 結果をテキスト形式に変換
result_text = ""
if isinstance(results, dict) and 'query_result' in results:
query_result = results['query_result']
if query_result['status'] == 'success':
headers = query_result['headers']
rows = query_result['rows']
# テーブル形式のデータを文字列に変換
result_text += f"結果の行数: {len(rows)}\n\n"
if len(rows) > 0:
# ヘッダー行
result_text += " | ".join(headers) + "\n"
result_text += "-" * (sum(len(h) for h in headers) + 3 * (len(headers) - 1)) + "\n"
# データ行(最大10行まで)
for i, row in enumerate(rows[:10]):
result_text += " | ".join([str(row.get(h, "")) for h in headers]) + "\n"
if len(rows) > 10:
result_text += "... (さらに行が続きます) ...\n"
# Bedrock へのプロンプト
prompt = f"""
あなたはネットワークセキュリティと AWS VPC フローログの専門家です。
以下のクエリとその結果について、日本語で詳細な分析と洞察を提供してください。
ユーザーの質問:
{query}
実行された SQL クエリ:
{sql}
クエリの結果:
{result_text}
この結果から読み取れる重要なパターン、異常、セキュリティ上の懸念事項、または最適化の機会があれば指摘してください。
以下のポイントを考慮してください:
1. トラフィックの分布パターン
2. 潜在的なセキュリティリスク
3. ネットワークパフォーマンスの改善点
4. 追加で実行すべき分析クエリの提案
あなたの回答は、IT マネージャーにも理解できる簡潔な日本語で提供してください。
回答は Markdown 形式で構造化し、セクションごとに見出しを付けてください。
"""
# Bedrock を使用して Bedrock モデルを呼び出す
response = self.bedrock_runtime.invoke_model(
★modelId="arn:aws:bedrock:us-east-1:123456789123:inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
contentType="application/json",
accept="application/json",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2000,
"messages": [
{
"role": "user",
"content": prompt
}
]
})
)
# レスポンスをパースする
response_body = json.loads(response['body'].read().decode('utf-8'))
explanation = response_body['content'][0]['text']
return explanation
# 可視化のヘルパー関数
def generate_visualizations(results):
"""
クエリ結果に基づいて適切な可視化を生成
"""
visualizations = []
try:
if isinstance(results, dict) and 'query_result' in results:
query_result = results['query_result']
if query_result['status'] == 'success' and len(query_result['rows']) > 0:
# 結果を DataFrame に変換
df = pd.DataFrame(query_result['rows'])
if not df.empty:
# データ型を変換(可能な場合)
for col in df.columns:
try:
if df[col].str.match(r'^\d+$').all():
df[col] = pd.to_numeric(df[col])
elif df[col].str.match(r'^\d+\.\d+$').all():
df[col] = pd.to_numeric(df[col])
except:
pass
# plotに日本語対応フォントを設定
plt.rcParams['font.family'] = 'Yu Gothic'
# 1. バーチャートの作成(最初の数値列と最初の文字列列を使用)
numeric_cols = df.select_dtypes(include=['number']).columns
string_cols = df.select_dtypes(include=['object']).columns
if len(numeric_cols) > 0 and len(string_cols) > 0:
plt.figure(figsize=(10, 6))
# 最大10個の項目に制限
plot_df = df.nlargest(10, numeric_cols[0]) if len(df) > 10 else df
sns.barplot(x=string_cols[0], y=numeric_cols[0], data=plot_df)
plt.title(f'{string_cols[0]} ごとの {numeric_cols[0]}')
plt.xticks(rotation=45)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
img1 = base64.b64encode(buf.read()).decode()
plt.close()
visualizations.append({
'type': 'bar_chart',
'title': f'{string_cols[0]} ごとの {numeric_cols[0]}',
'image': img1
})
# 2. 時系列プロット(日時カラムが含まれている場合)
time_cols = [col for col in df.columns if 'time' in col.lower() or 'date' in col.lower()]
if len(time_cols) > 0 and len(numeric_cols) > 0:
try:
df[time_cols[0]] = pd.to_datetime(df[time_cols[0]])
plt.figure(figsize=(10, 6))
plt.plot(df[time_cols[0]], df[numeric_cols[0]], marker='o')
plt.title(f'{time_cols[0]} に対する {numeric_cols[0]} の変化')
plt.xticks(rotation=45)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
img2 = base64.b64encode(buf.read()).decode()
plt.close()
visualizations.append({
'type': 'time_series',
'title': f'{time_cols[0]} に対する {numeric_cols[0]} の変化',
'image': img2
})
except:
pass
# 3. 円グラフ(パーセンテージデータがある場合)
if len(string_cols) > 0 and len(numeric_cols) > 0:
plt.figure(figsize=(10, 6))
plot_df = df.nlargest(10, numeric_cols[0]) if len(df) > 10 else df
plt.pie(plot_df[numeric_cols[0]], labels=plot_df[string_cols[0]], autopct='%1.1f%%')
plt.title(f'{string_cols[0]} の分布')
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
img3 = base64.b64encode(buf.read()).decode()
plt.close()
visualizations.append({
'type': 'pie_chart',
'title': f'{string_cols[0]} の分布',
'image': img3
})
except Exception as e:
st.error(f"可視化の生成中にエラーが発生しました: {str(e)}")
return visualizations
# Streamlit インターフェースの構築
def main():
st.set_page_config(page_title="VPC フローログ分析ツール", layout="wide")
st.title("VPC フローログ自然言語分析ツール")
st.markdown("""
このアプリケーションでは、自然言語で VPC フローログに関する質問ができます。
Amazon Bedrock と AWS のサービスを組み合わせて、質問を SQL に変換し、結果を分析します。
""")
# 初期化
if 'translator' not in st.session_state:
st.session_state.translator = VPCFlowLogsQueryTranslator()
if 'explainer' not in st.session_state:
st.session_state.explainer = BedrockAnalysisExplainer()
if 'bedrock_runtime' not in st.session_state:
st.session_state.bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')
# サンプルクエリのリスト
sample_queries = [
"先週最も通信量の多かった EC2 インスタンスは?",
"ポート 22 への拒否された接続をすべて表示して",
"外部 IP アドレス 203.0.113.x からの接続を一覧表示して",
"先月と比較して、トラフィックパターンに変化のあるサブネットを示して"
]
# サイドバーにサンプルクエリ表示
st.sidebar.header("サンプルクエリ")
for query in sample_queries:
if st.sidebar.button(query):
st.session_state.query = query
# クエリ入力
query = st.text_input("VPC フローログについて質問してください(日本語で)", key="query")
col1, col2 = st.columns([1, 3])
with col1:
execute_button = st.button("実行")
if execute_button and query:
with st.spinner("クエリを処理中..."):
try:
# 自然言語クエリの処理
results = st.session_state.translator.process_natural_language_query(query)
print(results)
# 結果の表示
st.subheader("SQL クエリ")
st.code(results['generated_sql'])
st.subheader("クエリの説明")
st.write(results['explanation'])
if 'query_result' in results and results['query_result']['status'] == 'success':
query_result = results['query_result']
st.subheader("クエリ結果")
if len(query_result['rows']) > 0:
df = pd.DataFrame(query_result['rows'])
st.dataframe(df)
else:
st.info("クエリは成功しましたが、結果が 0 件でした。")
# 可視化の生成と表示
visualizations = generate_visualizations(results)
if visualizations:
st.subheader("データの可視化")
viz_cols = st.columns(min(len(visualizations), 2))
for i, viz in enumerate(visualizations):
col_idx = i % 2
with viz_cols[col_idx]:
st.subheader(viz['title'])
st.image(f"data:image/png;base64,{viz['image']}")
# Bedrock による解説の生成
with st.spinner("AI による分析を生成中..."):
explanation = st.session_state.explainer.generate_explanation(
query, results['generated_sql'], results)
st.subheader("AI による分析")
st.markdown(explanation)
else:
st.error(f"クエリの実行中にエラーが発生しました: {results['query_result']['message']}")
except Exception as e:
st.error(f"処理中にエラーが発生しました: {str(e)}")
if __name__ == "__main__":
main()
カタログ化
vpc_flow_logs_catalog.py を実行します。
実行後、Glue にテーブルが作成されていることを確認します。
Streamlit起動
下記実行し、Streamlit を起動します。
streamlit run frontend_visualization.py
正常に起動すると、冒頭に紹介したような画面が開きます。
実装の不足点と展望
以下の点が考慮されていないので、必要に応じてコードを変更したり、仕組み作りが必要です。(他にもいろいろあるかと思います。)
- 複数アカウントに対応していない
- カタログ化のCI/CD
- Athena特有のSQL制約への対応
- セキュリティ面の検討
さいごに
今回は AWS 運用に携わったことがある人ならほとんどの人がめんどくさいと思ったことがあるであろう VPC フローログの解析を簡単に行う方法を検証してみました。
他にも CloudTrail のログに対して同じような仕組みで「昨日○○インスタンスを終了させたのは誰?」といった感じで聞けたら楽でいいですね!
Claude を使用して、このアプリは半日程度で作れました。(ちなみに非課金ユーザーです)
そのほとんどは軽微なエラー修正でしたので、本当に簡単に作れちゃいます!
是非皆さんも色々試してみてはいかがでしょうか!
弊社では一緒に働く仲間を募集中です!
現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!
募集内容等詳細は、是非採用サイトをご確認ください。
https://engineer.po-holdings.co.jp/