IBM Cloud Data Engine(旧SQL Query)は2024年1月18日をもって非推奨となり、2024年2月18日以降、新規インスタンスの作成が出来なくなりました。さらに、2025年1月18日をもってEoSとなるため、現在ご利用のインスタンスから後継サービスであるAnalytics Engineなどの利用へ変更する必要があります。
はじめに
過去記事で首題について記述しましたが、以下のマニュアルや、Data Engine(SQL Query)に追加されたサンプル「Read as text queries」で、LogDNAのアーカイブログを検索するためのより良い方法が示されたので、改めて。
Query data with an unstable schema
https://cloud.ibm.com/docs/sql-query?topic=sql-query-overview#query-logs
必要な環境
- LogDNAアーカイブ出力先ICOSバケット
- Data Engine (SQL Query)
- クエリー実行結果保存先ICOSバケット
- Watson Studio
- Python 3.10
- 上記全てを利用可能なAPIキー
Notebook
過去記事同様、まずSQL Queryのモジュールを導入、importします。
# 事前処理
!pip -qqq install ibmcloudsql
try:
import ibmcloudsql
except Exception as e:
print(e)
次に、実行中のメッセージに日本時間のタイムスタンプをつけて見やすくしたいので、必要な定義をします。
# Notebook実行時のメッセージ表示用
from datetime import datetime, date
import pytz
import os
# 日本時間にするための事前設定
JST = pytz.timezone('Asia/Tokyo')
# 時刻付きメッセージ表示
def printmsg(_messagestr):
print("[" + str(datetime.now().astimezone(JST)) + "] " + _messagestr)
return
続いて、諸々の変数を設定しておきます。それぞれの意味はソース内のコメントを参照ください。
target_date_path_list
には、処理したいアーカイブされたログのプレフィックスをリスト形式で指定します。記載例のように日単位で指定するか、'year=2023/month=04', 'year=2023/month=05'
と月単位でも指定可能ですが、1つのクエリーの処理量があまり大量になると処理時間の上限超過によるエラーになったりするので、その場合は適当に分散されるようにします。
# 環境設定
cloud_api_key='*********' # APIキー
sql_crn = 'crn:**************::' # SQL QueryのCRN
# 入力
target_cos_endpoint='cos://jp-tok/**************' # LogDNAアーカイブ先バケット名
target_date_path_list = [ # アーカイブされたオブジェクトのプレフィックスリスト
'year=2023/month=04/day=29/',
'year=2023/month=04/day=30/',
]
# 出力
output_cos_endpoint = 'cos://jp-tok/*********************/' # 結果出力先バケットへのパス
output_prefix='test-search-0501/' # 結果出力先パス
output_tmp=output_cos_endpoint+output_prefix+'tmp/' # プレフィックスリスト単位の一時出力先
output_result=output_cos_endpoint+output_prefix+'result/' # 最終結果出力先
ここで、Data Engine(SQL Query)でクエリーを実行するための関数を定義します。関数内で結構頑張ってリトライしているので、たまにInternal errorになったり、最大同時実行数を超過してエラーになってもへこたれません。ただ、どんなエラーでもなり振り構わずリトライしまくるので、クエリー自体が正常かどうかは、最初に流す際に見ておきます。
# SQL Queryログイン
sqlClient = ibmcloudsql.SQLQuery(cloud_api_key, sql_crn)
sqlClient.logon()
# SQL Query呼び出し関数
import time
def call_sqlquery(query):
printmsg('以下のクエリーを実行します')
print(query)
print()
# 試行回数とインターバル
tries = 10
interval = 180
for tc in range(0, tries+1):
try:
# ジョブ投入 (例外が発生すれば、forループでリトライ)
jobid = sqlClient.submit_sql(query)
time.sleep(10)
# クエリー実行状況チェックループ
while True:
# このチェックで例外が発生すれば、forループでリトライ
chkstatus = sqlClient.get_job(jobid)['status']
# ステータスが終了時のものなら、チェックwhileループを抜ける
if chkstatus == "failed" or chkstatus == "completed":
printmsg('done')
break
# チェックループのスリープ
printmsg('running')
time.sleep(10)
# ステータスがfailed
if chkstatus == "failed":
if ('Path does not exist' in sqlClient.get_job(jobid)['error_message']):
# ログが存在しない場合はループを抜ける
printmsg('skipped')
printmsg(sqlClient.get_job(jobid)['error_message'])
break
else:
# それ以外は例外を上げて、forループ内でリトライ
printmsg('failed')
printmsg(sqlClient.get_job(jobid)['error_message'])
raise Exception(sqlClient.get_job(jobid)['error_message'])
# ステータスがcompletedなら、リトライforループを抜ける
elif chkstatus == "completed":
printmsg('completed')
break
else:
printmsg('unknown status : ' + chkstatus)
except Exception as e:
printmsg('error')
printmsg(str(e))
if tc < tries-1:
printmsg('will retry after ' + str(interval) + 's')
time.sleep(interval)
printmsg('retry count : ' + str(tc+1) + ' / ' + str(tries-1))
continue
# 最大リトライ数に達していたらエラー
if tc == tries:
printmsg('NG, max retried')
failedflgz = True
else:
printmsg('OK')
return jobid
準備ができたので、まずはリストで与えられたプレフィックスを順に処理します。もし目的のフィールドが無い場合は、クエリーに追加してください。WHERE文節も適宜変更します。
# リストで設定されたプレフィックスのオブジェクトを順に処理
for target_date in target_date_path_list:
# 実行するクエリー
query = """
WITH logs (
SELECT from_unixtime((get_json_object(value, "$._source._ts") / 1000) + 32400, 'yyyy-MM-dd HH:mm:ss') as timestamp, --- UNIXミリ秒を日本時間に変換
get_json_object(value, "$._source._app") as app,
get_json_object(value, "$._source._host") as host,
get_json_object(value, "$._source._file") as file,
get_json_object(value, "$._source._line") as line
FROM """+target_cos_endpoint+"""/"""+target_date+"""* STORED AS TEXT
ORDER BY get_json_object(value, "$._source._ts")
)
SELECT * FROM logs
WHERE host = 'vs-hirokuda' AND file = '/var/log/audit/audit.log' --- 特定ホストの特定ログを抽出
INTO """+output_tmp+target_date+""" JOBPREFIX NONE STORED AS CSV"""
# 実行
jobid=call_sqlquery(query)
# 結果サマリー
printmsg(str(sqlClient.get_job(jobid)))
最後に結果を1つにまとめて、その一部を表示します。変数output_result
で設定したICOSパスに結果が保管されます。また、UTCとの時差の関係で、月を跨ぐアーカイブログファイルを集計し、最後に目的の月のみ抜粋したい場合は、ここでWHERE timestamp LIKE '2023-04%'
とすると、その月のログのみ抽出されます。
# 複数オブジェクトに対する結果を1つにまとめる
query = """
SELECT *
FROM """+output_tmp+""" STORED AS CSV
ORDER BY timestamp
INTO """+output_result+""" JOBPREFIX NONE STORED AS CSV"""
# 実行
jobid=call_sqlquery(query)
# 結果サマリー
printmsg(str(sqlClient.get_job(jobid)))
# 結果内容(一部)表示
sqlClient.get_result(jobid)
JSON形式のログを、STORED AS JSON
ではなく、STORED AS TEXT
で処理して必要なフィールドをget_json_object
で抽出するこの方式は、面倒ですが、(過去記事が残念に思えるほど)超高速なので、おすすめです。