11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OCI の監査ログを AIデータ・プラットフォーム (AIDP) を使って分析してみた(Part 1/3 - 監査ログを収集する)

11
Last updated at Posted at 2026-01-05

はじめに

最近、とあるプロジェクトで1ヶ月分(23億+ APIコール)の OCI 監査ログを集計・分析する機会があり、その時は実績重視で OCI データ・フロー と OCI データ・サイエンス を使用しました。既に OCI AIデータ・プラットフォーム (AIDP) がリリースされていたので、本当は AIDP を使いたかったのですが、ぶっつけ本番で Try & Error しながら進めるには緊急度が高いミッションだったので、AIDP はお預けにしていました。

ということで、今回はデータ・フロー & データ・サイエンスを使用して行った監査ログの分析を、そのまま丸々 AIDP でやってみました。結論から言うと、AIDP のデータ分析統合環境としての完成度の高さを実感できました。

全体像は下図の通りなのですが、結構な長丁場なので、Qiita の記事は 3本に分けて掲載します。

image.png

尚、AIデータ・プラットフォーム を使っていますが、Generaitive AI とか Machine Learning とかは一切出てきません。

では「Part 1 - 監査ログを収集する」から始めましょう!

全編を通して、ociaidp と言うコンパートメント内で作業を行なっています。

監査ログ分析の動機

そもそも何で監査ログを分析する必要があったのか?

  • エラー(HTTP Status: 5xx)が多く発生している API リクエストの特定
  • 時間あたりの呼び出し回数の多い API リクエストや時間帯の特定

等が、主たる目的でした。また、分析の結果、稼働中のアプリケーションの裏側で呼び出される API の状況が可視化でき、オペレーションの方式やツールの最適化にも役立ちました。

監査ログの収集方法

二つ方法があります。

  1. 過去の特定の期間の監査ログを収集する
  2. リアルタイムに監査ログを収集する

それぞれの方法について以下に解説しますが、今回の分析では、後者のリアルタイムな方法を用います。

方法1 - 過去の特定の期間の監査ログを収集する

API を使って任意の期間の監査ログを収集することができます。
ピンポイントで特定の時間のログを取得するには便利ですが、 処理が遅いのが難点です。
監査ログの1レコードは CloudEvents 形式の Json データです。
下記のサンプル・コードでは、複数のJsonレコードを改行を入れながら1ファイルに保存します。

# get_audit_logs.py
import oci, os, json, argparse, sys
from datetime import datetime, timedelta, timezone

def get_audit_logs(start_time: datetime, 
                   end_time: datetime, 
                   compartment_id: str,
                   output_file: str,
                   overwrite: bool,
                   config_path: str = "~/.oci/config",
                   profile_name: str = "DEFAULT") -> int:
    """指定期間の監査ログを取得する"""
    
    # AuditClient を作成
    config = oci.config.from_file(config_path, profile_name)
    audit_client = oci.audit.audit_client.AuditClient(config)
    
    if os.path.exists(output_file):
        if overwrite:
            with open(output_file, 'w') as f:
                f.truncate(0)
        else:
            raise FileExistsError(output_file)

    with open(output_file, 'a+') as f:
        counter = 0
        page = None
        while True:
            # 監査ログを取得
            response = audit_client.list_events(
                compartment_id=compartment_id,
                start_time=start_time,
                end_time=end_time,
                page=page,
                retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY # Rate Limit 対策
            )

            logs = response.data # type:list[oci.audit.models.AuditEvent]

            # 監査ログをファイルに追加 - json形式で改行して複数レコードを格納
            for log in logs:
                logdata = json.loads(f"{log}")
                f.write(json.dumps(logdata) + "\n")        
            counter = counter + len(logs)

            # 次のページがなければ終了
            if response.has_next_page:
                page = response.next_page
                print(".", end="", file=sys.stderr, flush=True)
            else:
                print("", file=sys.stderr)
                break

    return counter

if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Get OCI audit logs.")
    parser.add_argument("--start", help="default = 24h ago")
    parser.add_argument("--end", help="default = now")
    parser.add_argument("--compartment-id", help="default = ${COMPERTMENT_ID}")
    parser.add_argument("--out", required=True, help="output file")
    parser.add_argument("--overwrite", action='store_true')
    args = parser.parse_args()

    # デフォルト過去24時間分のログを取得する
    end_time_str = args.end
    end_time = datetime.fromisoformat(end_time_str) if end_time_str else datetime.now(timezone.utc)
    start_time_str = args.start
    start_time = datetime.fromisoformat(start_time_str) if start_time_str else end_time - timedelta(days=1)

    output_file = args.out # "../data/audit_logs.json"
    compartment_id = args.compartment_id
    if not compartment_id:
        compartment_id = os.getenv("COMPARTMENT_ID")

    print(f"start={start_time}, end={end_time}, compartment-id={compartment_id}, out={output_file}")
    counter = get_audit_logs(start_time, end_time, compartment_id, output_file, args.overwrite)
    print(f"Saved {counter:,} audit log entries.")

方法2 - リアルタイムに監査ログを収集する

リアルタイムに発生する監査ログをコネクタ・ハブを経由してオブジェクト・ストレージに保存する方法です。大量の監査ログをほぼ遅延なく収集することができます。

バケットの準備

OCI コンソールの ストレージ > オブジェクト・ストレージとアーカイブ・ストレージ バケット からバケット (audit_log) を作成します。

image.png

コネクタ・ハブの設定

アナリティクスとAI > メッセージング コネクタ・ハブ から「コネクタの作成」を実行します。 

image.png

ソースを ロギング に、ターゲットを オブジェクト・ストレージ に設定します。

image.png

次に、ソース&ターゲットの構成を設定していきます。

ソースの構成では、適切なコンパートメントを選択し、ログ・グループは、監査ログなので _Audit を選択します。

image.png

ターゲットの設定では、先ほど作成したバケット audit_log を選択します。

オプションでバッチ・サイズとバッチ時間を指定できます。いずれかの条件に合致したタイミングでターゲットへの送信がトリガーされます。

image.png

必要なポリシーが設定されていない場合、ポリシーの作成を促されますので「作成」ボタンを押します。

image.png

ポリシーの内容は、以下のようなものです。

allow any-user to manage objects in compartment id <コンパートメントID> where all {request.principal.type='serviceconnector', target.bucket.name='audit_log', request.principal.compartment.id='<コンパートメントID>'}

最後に「作成」ボタンを押すと、サービス・コネクタが作成されます。

image.png

この状態で、コネクタはアクティブなので、監査ログが順次オブジェクト・ストレージに格納されていきます。
一時的にログの収集を中断したい場合は、非アクティブ化することができます。

image.png

オブジェクト・ストレージに保存された監査ログ

ターゲットに指定したバケットの中に サービス。コネクタIDの名前のついたフォルダがされ、そこに監査ログが順次保存されていきます。

image.png

各ファイルは、.gz で圧縮されていますが、AIDP ではこのファイルをそのまま読み込むことができます。
ファイルの中身は、Json 形式のデータが1行1レコードの形式で格納されています。この Json データは、API で取得した CloudEvents 形式と若干フォーマットが異なるので注意して下さい。

まとめ

監査ログ分析の第一歩として、生の監査ログをオブジェクト・ストレージに格納しました。
Part 2 では、AIDP を使って監査ログを分析するための前処理を行います。

11
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?