LoginSignup
1
0

IBM CloudのLogDNAで収集した、30日以上過去のログを検索する方法(改善版)

Last updated at Posted at 2023-05-01

IBM Cloud Data Engine(旧SQL Query)は2024年1月18日をもって非推奨となり、2024年2月18日以降、新規インスタンスの作成が出来なくなりました。さらに、2025年1月18日をもってEoSとなるため、現在ご利用のインスタンスから後継サービスであるAnalytics Engineなどの利用へ変更する必要があります。

はじめに

過去記事で首題について記述しましたが、以下のマニュアルや、Data Engine(SQL Query)に追加されたサンプル「Read as text queries」で、LogDNAのアーカイブログを検索するためのより良い方法が示されたので、改めて。

Query data with an unstable schema
https://cloud.ibm.com/docs/sql-query?topic=sql-query-overview#query-logs

必要な環境

  • LogDNAアーカイブ出力先ICOSバケット
  • Data Engine (SQL Query)
  • クエリー実行結果保存先ICOSバケット
  • Watson Studio
    • Python 3.10
  • 上記全てを利用可能なAPIキー

Notebook

過去記事同様、まずSQL Queryのモジュールを導入、importします。

Cell-01
# 事前処理
!pip -qqq install ibmcloudsql
try:
    import ibmcloudsql
except Exception as e:
    print(e)

次に、実行中のメッセージに日本時間のタイムスタンプをつけて見やすくしたいので、必要な定義をします。

Cell-02
# Notebook実行時のメッセージ表示用
from datetime import datetime, date
import pytz
import os

# 日本時間にするための事前設定
JST = pytz.timezone('Asia/Tokyo')

# 時刻付きメッセージ表示
def printmsg(_messagestr):
    print("[" + str(datetime.now().astimezone(JST)) + "] " + _messagestr)
    return

続いて、諸々の変数を設定しておきます。それぞれの意味はソース内のコメントを参照ください。

target_date_path_listには、処理したいアーカイブされたログのプレフィックスをリスト形式で指定します。記載例のように日単位で指定するか、'year=2023/month=04', 'year=2023/month=05'と月単位でも指定可能ですが、1つのクエリーの処理量があまり大量になると処理時間の上限超過によるエラーになったりするので、その場合は適当に分散されるようにします。

Cell-03
# 環境設定
cloud_api_key='*********' # APIキー
sql_crn = 'crn:**************::' # SQL QueryのCRN

# 入力
target_cos_endpoint='cos://jp-tok/**************'  #  LogDNAアーカイブ先バケット名
target_date_path_list = [ # アーカイブされたオブジェクトのプレフィックスリスト
    'year=2023/month=04/day=29/',
    'year=2023/month=04/day=30/',
]

# 出力
output_cos_endpoint = 'cos://jp-tok/*********************/' # 結果出力先バケットへのパス
output_prefix='test-search-0501/' # 結果出力先パス
output_tmp=output_cos_endpoint+output_prefix+'tmp/' # プレフィックスリスト単位の一時出力先
output_result=output_cos_endpoint+output_prefix+'result/' # 最終結果出力先

ここで、Data Engine(SQL Query)でクエリーを実行するための関数を定義します。関数内で結構頑張ってリトライしているので、たまにInternal errorになったり、最大同時実行数を超過してエラーになってもへこたれません。ただ、どんなエラーでもなり振り構わずリトライしまくるので、クエリー自体が正常かどうかは、最初に流す際に見ておきます。

Cell-04
# SQL Queryログイン
sqlClient = ibmcloudsql.SQLQuery(cloud_api_key, sql_crn)
sqlClient.logon()

# SQL Query呼び出し関数
import time

def call_sqlquery(query):
    printmsg('以下のクエリーを実行します')
    print(query)
    print()

    # 試行回数とインターバル
    tries = 10
    interval = 180

    for tc in range(0, tries+1):
        try:
            # ジョブ投入 (例外が発生すれば、forループでリトライ)
            jobid = sqlClient.submit_sql(query)    
            time.sleep(10)

            # クエリー実行状況チェックループ
            while True:
                # このチェックで例外が発生すれば、forループでリトライ
                chkstatus = sqlClient.get_job(jobid)['status']

                # ステータスが終了時のものなら、チェックwhileループを抜ける
                if chkstatus == "failed" or  chkstatus == "completed":
                    printmsg('done')
                    break

                # チェックループのスリープ
                printmsg('running')
                time.sleep(10)

            # ステータスがfailed
            if chkstatus == "failed":
                if ('Path does not exist' in sqlClient.get_job(jobid)['error_message']):
                    # ログが存在しない場合はループを抜ける
                    printmsg('skipped')
                    printmsg(sqlClient.get_job(jobid)['error_message'])
                    break
                else:
                    # それ以外は例外を上げて、forループ内でリトライ
                    printmsg('failed')
                    printmsg(sqlClient.get_job(jobid)['error_message'])
                    raise Exception(sqlClient.get_job(jobid)['error_message'])
            # ステータスがcompletedなら、リトライforループを抜ける
            elif chkstatus == "completed":
                printmsg('completed')
                break
            else:
                printmsg('unknown status : ' + chkstatus)

        except Exception as e:
            printmsg('error')
            printmsg(str(e))
            if tc < tries-1:
                printmsg('will retry after ' + str(interval) + 's')
                time.sleep(interval)
                printmsg('retry count : ' + str(tc+1) + ' / ' + str(tries-1))
            continue

    # 最大リトライ数に達していたらエラー
    if tc == tries:
        printmsg('NG, max retried')
        failedflgz = True
    else:
        printmsg('OK')   
    
    return jobid

準備ができたので、まずはリストで与えられたプレフィックスを順に処理します。もし目的のフィールドが無い場合は、クエリーに追加してください。WHERE文節も適宜変更します。

Cell-05
# リストで設定されたプレフィックスのオブジェクトを順に処理
for target_date in target_date_path_list:
    # 実行するクエリー
    query = """
    WITH logs (
    SELECT from_unixtime((get_json_object(value, "$._source._ts") / 1000) + 32400, 'yyyy-MM-dd HH:mm:ss') as timestamp, --- UNIXミリ秒を日本時間に変換
    get_json_object(value, "$._source._app") as app,
    get_json_object(value, "$._source._host") as host,
    get_json_object(value, "$._source._file") as file,
    get_json_object(value, "$._source._line") as line
    FROM """+target_cos_endpoint+"""/"""+target_date+"""*  STORED AS TEXT
    ORDER BY get_json_object(value, "$._source._ts")
    )
    SELECT * FROM logs
    WHERE host = 'vs-hirokuda' AND file = '/var/log/audit/audit.log'   --- 特定ホストの特定ログを抽出
    INTO """+output_tmp+target_date+""" JOBPREFIX NONE STORED AS CSV"""

    # 実行
    jobid=call_sqlquery(query)
    
    # 結果サマリー
    printmsg(str(sqlClient.get_job(jobid)))

最後に結果を1つにまとめて、その一部を表示します。変数output_resultで設定したICOSパスに結果が保管されます。また、UTCとの時差の関係で、月を跨ぐアーカイブログファイルを集計し、最後に目的の月のみ抜粋したい場合は、ここでWHERE timestamp LIKE '2023-04%'とすると、その月のログのみ抽出されます。

Cell-06
# 複数オブジェクトに対する結果を1つにまとめる
query = """
SELECT *
FROM """+output_tmp+"""  STORED AS CSV
ORDER BY timestamp
INTO """+output_result+""" JOBPREFIX NONE STORED AS CSV"""

# 実行
jobid=call_sqlquery(query)

# 結果サマリー
printmsg(str(sqlClient.get_job(jobid)))

# 結果内容(一部)表示
sqlClient.get_result(jobid)

JSON形式のログを、STORED AS JSONではなく、STORED AS TEXTで処理して必要なフィールドをget_json_objectで抽出するこの方式は、面倒ですが、(過去記事が残念に思えるほど)超高速なので、おすすめです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0