はじめに
個人的に推していたIBM Cloudのサービス、Data Engine (旧SQL Query)が、2025年1月18日にEOSを迎えることになりました。こちらのガイドにあるように、Analytics Engine へのマイグレーションが必要になる、とのことです。
とはいえData Engineの使い勝手の良さを忘れられず、watsonx.dataへの移行について以前記事にしてみました。
しかし、SQLをそこそこ書き換える必要があったり、インスタンスに固定のコストがかかったりと様々な制約がありました。
そこで、Analytics Engineへの移行も検証してみたいと思います。Analytics Engineのオーダー方法など、基本的な情報についてはこちらの記事がとても参考になりますので、まずはそちらをご一読ください。
Data Engineで出来ていたこと
例えば、Activity TrackerのアーカイブログをICOSバケットに保管しているとして、そこから2024年4月のICOSに関連するログをCSV形式で抽出したい場合、Data Engineのコンソールを開いて、以下のようなSQLを1本実行すれば結果が得られました。
WITH logs (
SELECT from_unixtime((get_json_object(value, "$._source._ts") / 1000) + 32400, 'yyyy-MM-dd HH:mm:ss') as timestamp,
get_json_object(value, "$._source._host") as host,
get_json_object(value, "$._source.message") as message
FROM cos://jp-tok/[Activity Trackerのアーカイブ先バケット名]/year=2024/month=04/* STORED AS TEXT
)
SELECT timestamp, host, message FROM logs
WHERE host = 'cloud-object-storage'
ORDER BY timestamp
LIMIT 10
INTO cos://jp-tok/[結果出力先のICOSバケット名]/[適当なプレフィックス] JOBPREFIX NONE STORED AS CSV
watsonx.dataの記事同様、今回もこのSQLをAnalytics Engineで動かすにはどうすれば良いのかを検証してみました。
Pythonスクリプトの用意
参考記事にあるように、Pythonスクリプトを用意します。こちらにあるサンプルコードを少しいじったものが以下です。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.types import *
# 実行するクエリ
query="""
WITH logs (
SELECT from_unixtime((get_json_object(value, "$._source._ts") / 1000) + 32400, 'yyyy-MM-dd HH:mm:ss') as timestamp,
get_json_object(value, "$._source._host") as host,
get_json_object(value, "$._source.message") as message
FROM aetemptable
)
SELECT timestamp, host, message FROM logs
WHERE host = 'cloud-object-storage'
ORDER BY timestamp
LIMIT 10
"""
# 入力ICOSパス(cos://[バケット名].input/[オブジェクトパス])
input_icos_path="cos://[Activity Trackerのアーカイブ先バケット名].input/year=2024/month=04/*"
# 出力ICOSパス(cos://[バケット名].output/[オブジェクトパス])
output_icos_path="cos://[結果出力先のICOSバケット名].output/logana-out.csv"
# 初期化
def init_spark():
spark = SparkSession.builder.appName("logana").enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)
#spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo") ### from_unixtimeのため(クエリ内で"+32400"する場合はコメントアウト)
sc = spark.sparkContext
return spark,sc
# 入力となるログを指定
def read_customer_data(spark,sc):
print("starting reading data from given cos bucket")
read_df = spark.read.text(input_icos_path) ### クエリ内でget_json_objectするのであえてtextで読み込む
return read_df
# 入力テーブル作成
def create_table(read_df):
read_df.createOrReplaceTempView("aetemptable")
# クエリ実行
def query_to_fetch_data(spark):
print("starting query")
query_df = spark.sql(query)
return query_df
# クエリ実行結果の保存
def upload_data(query_df):
print("starting uploading data to given cos bucket")
query_df.write.option("header", True).mode("overwrite").csv(output_icos_path)
print("data successfully uploaded to cos bucket")
# メインルーチン
def main():
spark,sc = init_spark()
read_df = read_customer_data(spark,sc)
create_table(read_df)
query_df = query_to_fetch_data(spark)
upload_data(query_df)
if __name__ == '__main__':
main()
まず冒頭で変数query
を定義し、SQLを記述します。Data Engineで動かしていたものからの変更点は以下です。
- FROM節をテーブルaetemptableに変更(スクリプト内でテーブルを作成します)
- INTO節を削除(スクリプト内で別途CSV出力します)
# 実行するクエリ
query="""
WITH logs (
SELECT from_unixtime((get_json_object(value, "$._source._ts") / 1000) + 32400, 'yyyy-MM-dd HH:mm:ss') as timestamp,
get_json_object(value, "$._source._host") as host,
get_json_object(value, "$._source.message") as message
FROM aetemptable
)
SELECT timestamp, host, message FROM logs
WHERE host = 'cloud-object-storage'
ORDER BY timestamp
LIMIT 10
"""
他に変更すべき点は無いので、Data Engineからの移行という観点では、結果的にwatsonx.dataより楽でした。
次に入出力に使用するICOSのパスをそれぞれinput_icos_path
とoutput_icos_path
に定義します。Data Engineの際と書式が変わります。まずリージョンの記載は不要です。さらに今回はICOSバケットを複数扱うため、バケット名の後ろにそれぞれ.input
、.output
と付けておきます。
# 入力ICOSパス(cos://[バケット名].input/[オブジェクトパス])
input_icos_path="cos://[Activity Trackerのアーカイブ先ICOSバケット名].input/year=2024/month=08/*"
# 出力ICOSパス(cos://[バケット名].output/[オブジェクトパス])
output_icos_path="cos://[結果出力先のICOSバケット名].output/logana-out.csv"
初期化の部分は、サンプルコードに対して、spark.sql.caseSensitive
の設定を追加しています。Log Analysisのログは、英大文字小文字の違いを除いてスペルが全く同じフィールド名が混在していることがあるためです。また、今回は使用しませんが、UNIX時間をローカル時間に変換する場合、spark.sql.session.timeZone
でタイムゾーンを設定しておくといいケースがあるので、必要に応じて使用してください。
# 初期化
def init_spark():
spark = SparkSession.builder.appName("logana").enableHiveSupport().getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)
#spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo") ### from_unixtimeのため(クエリ内で"+32400"する場合はコメントアウト)
sc = spark.sparkContext
return spark,sc
JSON形式のログの処理ですが、クエリの中でget_json_object
を使用して実施しているため、あえてread.text
で読み取ります。
# 入力となるログを指定
def read_customer_data(spark,sc):
print("starting reading data from given cos bucket")
read_df = spark.read.text(input_icos_path) ### クエリ内でget_json_objectするのであえてtextで読み込む
return read_df
そして読み取り用のテーブルを作成します。ここで指定したテーブル名をSQLのFROM節で使用しています。
# 入力テーブル作成
def create_table(read_df):
read_df.createOrReplaceTempView("aetemptable")
さらにSQLを実行します。
# クエリ実行
def query_to_fetch_data(spark):
print("starting query")
query_df = spark.sql(query)
return query_df
最後にSQL実行結果をCSV形式で保存します。Data Engineの場合、同じ出力先をINTOで指定していた場合は上書きされていましたが、Analytics Engineの場合、デフォルトでは上書き出来ずエラー(already exist)になりました。今回はData Engineと同じ振る舞いにしたかったため、mode("overwrite")
を追加して上書きするようにしていますが、上書きさせたくない場合はこの部分を削除してください。
# クエリ実行結果の保存
def upload_data(query_df):
print("starting uploading data to given cos bucket")
query_df.write.option("header", True).mode("overwrite").csv(output_icos_path)
print("data successfully uploaded to cos bucket")
作成したPythonスクリプトを、ICOSバケットにアップロードすれば、あとは実行です。
実行
Analytics Engine APIを実行するために必要な情報を環境変数にセットします。
今回は、Pythonスクリプトを保存したバケット、処理するログが保管されているバケット、CSVファイルを出力するバケットを全て別に出来るように環境変数を分けているので、それぞれのAPIキーとエンドポイントを設定します。
### 環境変数設定
# Analytics Engineの資格情報にあるapplication_api エンドポイント
ANALYTICS_ENGINE_URL=https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/*****/spark_applications
# Analytics Engineの資格情報にあるAPIキー
export IBMCLOUD_API_KEY=*****
# 使用するPythonスクリプト内で指定したappName
PYTHON_APPNAME=logana
# Pythonスクリプトを保管したICOSバケットのAPIキーとエンドポイント
PYTHON_ICOS_APIKEY=*****
PYTHON_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
# 使用するPythonスクリプトのパス
# cos://<バケット名>.<任意のサービス名>/<ファイル名>
PYTHON_SCRIPTPATH=cos://[バケット名].${PYTHON_APPNAME}/logana.py
# 入出力ICOSバケットのAPIキーとエンドポイント
INPUT_ICOS_APIKEY=*****
INPUT_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
OUTPUT_ICOS_APIKEY=*****
OUTPUT_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
そして、ibmcloud CLIを使ってトークンを取得し、それを使用してAnalytics Engine APIを実行します。
ibmcloud login -r jp-tok
IAM_BEARER_TOKEN=$(ibmcloud iam oauth-tokens -o json | jq -r .iam_token)
ANALYTICS_ENGINE_ID=$(curl -s ${ANALYTICS_ENGINE_URL} \
-H "Authorization: ${IAM_BEARER_TOKEN}" \
-X POST -d "{ \"application_details\": {
\"conf\": {
\"spark.hadoop.fs.cos.input.iam.api.key\": \"${INPUT_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.input.endpoint\": \"${INPUT_ICOS_ENDPOINT}\",
\"spark.hadoop.fs.cos.output.iam.api.key\": \"${OUTPUT_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.output.endpoint\": \"${OUTPUT_ICOS_ENDPOINT}\",
\"spark.hadoop.fs.cos.${PYTHON_APPNAME}.iam.api.key\": \"${PYTHON_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.${PYTHON_APPNAME}.endpoint\": \"${PYTHON_ICOS_ENDPOINT}\"
},
\"application\": \"${PYTHON_SCRIPTPATH}\"
}}" | jq -r .id)
API呼び出しが成功すると、ANALYTICS_ENGINE_ID
にアプリケーションIDがセットされるので、以下のコマンドでステータスを確認することができます。もちろん、IBM Cloud Webコンソールでも確認可能です。
curl ${ANALYTICS_ENGINE_URL} -H "Authorization: ${IAM_BEARER_TOKEN}" -X GET -s | jq -r ".applications[] | select( .id | test(\"${ANALYTICS_ENGINE_ID}\")) | .state"
APIを呼び出して、処理が完了するまで確認する一連の処理をシェルで実装すると、以下のような形になりますので、参考にしてみてください。
### 環境変数設定
# Analytics Engineの資格情報にあるapplication_api エンドポイント
ANALYTICS_ENGINE_URL=https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/*****/spark_applications
# Analytics EngineのSparkバージョン
ANALYTICS_ENGINE_RUNTIMEVERSION=3.4
# Analytics Engineの資格情報にあるAPIキー
export IBMCLOUD_API_KEY=*****
# 使用するPythonスクリプト内で指定したappName
PYTHON_APPNAME=logana
# Pythonスクリプトを保管したICOSバケットのAPIキーとエンドポイント
PYTHON_ICOS_APIKEY=*****
PYTHON_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
# 使用するPythonスクリプトのパス
# cos://<バケット名>.<任意のサービス名>/<ファイル名>
PYTHON_SCRIPTPATH=cos://[バケット名].${PYTHON_APPNAME}/logana.py
# 入出力ICOSバケットのAPIキーとエンドポイント
INPUT_ICOS_APIKEY=*****
INPUT_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
OUTPUT_ICOS_APIKEY=*****
OUTPUT_ICOS_ENDPOINT=s3.direct.jp-tok.cloud-object-storage.appdomain.cloud
### 処理
# 開始
echo "$(date '+%Y/%m/%d %H:%M:%S') START"
# IBM Cloudにログインし、トークンを取得
ibmcloud config --check-version false
ibmcloud login -r jp-tok
IAM_BEARER_TOKEN=$(ibmcloud iam oauth-tokens -o json | jq -r .iam_token)
# トークンが取れていなければエラー
if [ "${IAM_BEARER_TOKEN}" = "" ]; then
echo "$(date '+%Y/%m/%d %H:%M:%S') ERROR on login to IBM Cloud."
exit 1
fi
# 処理を実行
ANALYTICS_ENGINE_ID=$(curl -s ${ANALYTICS_ENGINE_URL} \
-H "Authorization: ${IAM_BEARER_TOKEN}" \
-X POST -d "{ \"application_details\": {
\"conf\": {
\"spark.hadoop.fs.cos.input.iam.api.key\": \"${INPUT_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.input.endpoint\": \"${INPUT_ICOS_ENDPOINT}\",
\"spark.hadoop.fs.cos.output.iam.api.key\": \"${OUTPUT_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.output.endpoint\": \"${OUTPUT_ICOS_ENDPOINT}\",
\"spark.hadoop.fs.cos.${PYTHON_APPNAME}.iam.api.key\": \"${PYTHON_ICOS_APIKEY}\",
\"spark.hadoop.fs.cos.${PYTHON_APPNAME}.endpoint\": \"${PYTHON_ICOS_ENDPOINT}\"
},
\"application\": \"${PYTHON_SCRIPTPATH}\",
\"runtime\": {
\"spark_version\": \"${ANALYTICS_ENGINE_RUNTIMEVERSION}\"
}
}}" | jq -r .id)
# idが取得できなかった場合は正常に処理を実行できていないのでエラー
if [ "${ANALYTICS_ENGINE_ID}" = "" ]; then
echo "$(date '+%Y/%m/%d %H:%M:%S') ERROR on starting. See platform logs."
exit 1
fi
echo "$(date '+%Y/%m/%d %H:%M:%S') ID: ${ANALYTICS_ENGINE_ID}"
# ループしてステータスが成功か失敗になるまで確認
ANALYTICS_ENGINE_STATE=
while [ "${ANALYTICS_ENGINE_STATE}" != "finished" ] && [ "${ANALYTICS_ENGINE_STATE}" != "failed" ]; do
sleep 5
ANALYTICS_ENGINE_STATE=$(curl ${ANALYTICS_ENGINE_URL} -H "Authorization: ${IAM_BEARER_TOKEN}" -X GET -s | jq -r ".applications[] | select( .id | test(\"${ANALYTICS_ENGINE_ID}\")) | .state")
echo "$(date '+%Y/%m/%d %H:%M:%S') STATE: ${ANALYTICS_ENGINE_STATE}"
done
# ステータスがfailedの場合は失敗
if [ "${ANALYTICS_ENGINE_STATE}" = "failed" ]; then
echo "$(date '+%Y/%m/%d %H:%M:%S') ERROR on execution. See platform logs."
exit 1
fi
# 正常終了
echo "$(date '+%Y/%m/%d %H:%M:%S') OK"
まとめ
正直最初はとっつきにくかったAnalytics Engineですが、一度動くPythonスクリプトとシェルが用意できれば、これまでData Engineで動かしていたクエリを移行することが出来そうな気がしてきました。
ただ、実運用として、Data EngineをWatson Studioから呼び出す使い方をしているケースも多いかもしれません。そうすると、今回シェルで用意した部分をPython化すると、同じようにWatson StudioからAnalytics Engineを呼び出すように出来るかもしれませんが、それはまた別途・・。