はじめに
OCI 監査ログを OCI AIデータ・プラットフォーム (AIDP) を使って集計・分析する連載の Part 3 です。
Qiita の記事は 3本に分けて掲載します。
Part 3 では、Part 2 で整えられた監査ログを使って分析作業を進めていきます。
また、分析結果を Volume や Autonomous AI Database に書き出すことも行います。
尚、全編を通して、ociaidp と言うコンパートメント内で作業を行なっています。
分析作業の確認
監査ログの分析には色々な切り口がありますが、今回はシンプルに以下の2つを例に挙げて話を進めていきます。
- エラー(HTTP Status: 5xx)が多く発生している API リクエストの特定
- 秒間あたりの呼び出し回数の多い API リクエストの特定
前者は、Excelなどで扱えるように CSV ファイルに落とし、後者は、Autonomous AI Lakehouse (Oracle Autonomous Data Warehouse の改称なので ADW と省略します) のテーブルにエクスポートして、さらにそこから分析作業を進めていきます。
Notebook の作成と監査ログのロード
では、新しく Notebook を作成し、Paart 2 で前処理された監査ログの Parquet ファイルを読み込んで内容を確認するところから始めましょう。
$\small \textsf{script}$
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
os_namespace = "xxxxx"
audit_log_bucket = "audit_log"
parsed_file = f"oci://{audit_log_bucket}@{os_namespace}/analysis/parsed.parquet"
df = spark.read.format("parquet").load(parsed_file)
print(f"Loaded {df.count():,} records.")
df.printSchema()
(分析1) エラーが多く発生している API リクエストの特定
集計作業
やり方は複数ありますが、ここでは Spark SQL を使ってみます。
$\small \textsf{script}$
df.createOrReplaceTempView("LOG_DATA")
errors_by_type = spark.sql("""
SELECT compartmentName, type, status, count(*) as count FROM LOG_DATA
WHERE status >= 500
GROUP BY compartmentName, type, status
ORDER BY count desc
""")
errors_by_type.show(truncate=False)
(監査ログの量が僅かなので、エラーがほとんど出ていません...)
Volume を使用したオブジェクト・ストレージへの CSV ファイル書き出し
オブジェクト・ファイルの書き出しは、従来通り oci://~ の形式でも構いませんが、AIDP の Volume を構成すると、オブジェクト・ストレージを抽象化してファイルシステムのパス形式で読み書きできるようになりますので、ここでは Volume を使ったファイルの保存方法を試します。
AIDP には カタログ と スキーマ という基本的な概念が存在し、AIDP の様々な構成要素は、この名前空間の中に整理されて存在します。ボリュームも特定のカタログ/スキーマの下に存在します。
ファイルにアクセスする際のパスは、
/Volumes/<catalog_name>/<schema_name>/<volume_name>/<file_name>
となります。
Volume について大まかに理解したところで、早速作成してみましょう。
Master Catalog の画面右上の Create catalog をクリックして新しいカタログを作成します。
"my_catalog" という名前でカタログを作成します。
"my_catalog" が作成されましたが、同時に "default" というスキーマも作成されています。Create schema で新しくスキーマを作成してもいいですが、ここでは "default" スキーマの中に Volume を作成します。
"default" スキーマを開いて、右上 Add to schema から Volume を作成します。
"my_volume" という名前で Volume を作成します。
今回は External Volume としてオブジェクト・ストレージを使用しますので、あらかじめ作成しておいた "aidp_volume" というバケットを指定します。
ボリュームが作成されました。
では DataFrame をこの Volume 上の CSV ファイルに書き出します。
$\small \textsf{script}$
catalog_name = "my_catalog"
schema_name = "default"
volume_name = "my_volume"
file_name = "errors_by_type.csv"
csv_file = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/{file_name}"
errors_by_type.coalesce(1).write.format("csv")\
.mode('overwrite').option("header", "true").save(csv_file)
(分析2) 秒間あたりの呼び出し回数の多い API リクエストの特定
秒間あたりの API コール数 (RPS) を、compartmentName, type 別に集計してみます。
$\small \textsf{script}$
from pyspark.sql.functions import (
col, max, max_by, desc, date_trunc, count
)
print("requests per second by compartmentName and type")
rps_by_type = (
df
.withColumn("ts_sec", date_trunc("second", col("time")))
.groupBy("compartmentName", "type", "ts_sec")
.agg(
count("ts_sec").alias("rps"),
)
.orderBy(desc("rps"))
.select("rps", "compartmentName", "type", "ts_sec")
)
# 表示は rps >= 10 に絞る
rps_by_type.where(col("rps") >= 10).show(100, truncate=False)
Autonomous AI Lakehouse (ADW) への書き出し
この RPS 集計結果を Autonomous AI Lakehouse (ADW) に書き出してみます。
ADW 側の準備
まず、ADW 側でユーザを作成しますが、ここでは "OML" という名前のユーザを作成します。分析作業用に Oracle Machine Learning を利用可能にしておきましょう。
AIDP 側の準備
次に AIDP 側で、この ADW に対応する External catalog を作成します。
Master catalog 画面の右上 Create Catalog をクリックします。
Catalog name は ここでは "lakehouse" にしておきます。
Catalog type を "External catalog" に、External source type を "Oracle Autonomous Data Warehouse" にして、必要な情報を埋めていきます。
Test connection ボタンを押して接続の確認を行なってから Create を押して下さい。
カタログの準備ができました。
ADW テーブルにデータを保存
準備が整ったので、集計結果の DataFrame を ADWのテーブルに書き込みます。
$\small \textsf{script}$
"""
テーブルは <カタログ名>.<スキーマ名(Oracle DBユーザ名)>.<テーブル名> で表す
テーブル名、カラム名は大文字にしておかないと、Oracle DB 側で "" をつけないと操作できなくなるので注意
例) SELECT "rps" FROM "rps_by_type"
"""
(
rps_by_type
.withColumnRenamed("rps", "RPS")
.withColumnRenamed("compartmentName", "COMPARTMENTNAME")
.withColumnRenamed("type", "TYPE")
.withColumnRenamed("ts_sec", "TS_SEC")
.write.mode("overwrite").saveAsTable("lakehouse.oml.RPS_BY_TYPE")
)
書き出したテーブルのデータは、同じくカタログを使って参照が可能です。
-
DataFrame パターン
$\small \textsf{script}$
spark.table("lakehouse.oml.rps_by_type").withColumn("rps", col("rps").cast("int")) \ .sort("rps", ascending=False).show(3, truncate=False) -
SQL パターン
$\small \textsf{script}$
%sql SELECT * FROM lakehouse.oml.rps_by_type ORDER BY rps DESC
集計データを分析する
AIDP で分析する
AIDP の Cluster にはライブラリを追加することができます。Python の
pip install -r requirements.txt
と同じ要領で、matplotlib をリストに入れた requirements.txt ファイルを作成して Cluster の "Library" のところから追加すれば、 Notebook でグラフが描けるようになります。
API コール数の多いタイプを抽出して円グラフにしてみました。
$\small \textsf{script}$
import matplotlib.pyplot as plt
from pyspark.sql.functions import sum
pd = (
rps_by_type.groupBy("type").agg(sum("rps").alias("total"))
.filter("total >= 150").sort("total", ascending=False)
.toPandas()
)
print(pd)
plt.pie(pd["total"], labels=pd["type"])
plt.show()
ADW で分析する
詳細は省きますが、ADW に取り込まれた集計データは、ADW の備える各種ツールを使って、分析をすることができます。
Database Actions - Charts を使ったビジュアライズ
Oralce Machime Learning (OML) Notebook を使った分析
$\small \textsf{script}$
%python
import oml
import matplotlib.pyplot as plt
query = """
select (ts_sec + 9/24) as jst, sum(rps) as rps from oml.rps_by_type
group by ts_sec order by ts_sec
"""
oml_rps = oml.sync(query = query)
pd_rps = oml_rps.pull()
plt.figure(figsize=(32, 8))
plt.scatter(pd_rps["JST"], pd_rps["RPS"])
plt.show()
外部ツールを使って分析する
Cluster に接続するための JDBCドライバ、ODBCドライバが利用できますので、これを使って外部の分析ツールから AIDP が管理するデータにアクセスすることができます。
まとめ
OCIの生の監査ログから分析結果をビジュアライズするところまでを一通り実施しました。
実際の現場でとてつもないログの量を処理するケースでは、監査ログを整えるフェーズでの Cluster のオートスケールの状況などが確認できると思います。
AIDP を使えば、分析に関する様々な処理を単一プラットフォームで完結でき、作業効率がグンと上がります。 Oracle Database とのデータ連携も非常にスムースです。さらに今後 AI 関連の機能が強化されていくロードマップもあるので期待が膨らみますね。






















