はじめに
IBM Cloud Data Engineは2024年1月18日をもって非推奨となり、2024年2月18日以降、新規インスタンスの作成はできなくなります。またこのサービスは、2025年1月18日をもってEoSとなるため、現在ご利用のインスタンスから後継サービスであるAnalytics Engineの利用へ変更する必要があります。
本記事では、Log Analysisでアーカイブされたデータに対し、Analytics Engineの利用してクエリ実行する方法について記載します。
参考:
- Migrating to IBM Analytics Engine (IBM Cloud docs)
- IBM Log Analysis with LogDNAで収集したログをアーカイブする (Qiita)
環境情報
- Log Analysis
- データソース
- 収集したログを
json.gz
形式で保存
- Data Engine
- EOS予定
- Analytics Engine
- 後継サービス
- IBM Cloud Object Storage
- データソース、実行スクリプト、クエリ実行結果データを保存
- 単一のICOS利用を想定
複数のICOSを利用する場合、それぞれに対して認証情報・サービス名の定義が必要となります。詳しくは以下をご覧ください。
IBM Cloud docs:Object Storage の資格情報について
Analytics Engineでのクエリ実行手順
準備
Analytics Engineをオーダー
IBM Cloudポータル上で「Analytics Engine」を検索し、オーダーページへ移動します。
2024年3月現在Dallas
、Frankfurt
で利用可能です。
SQLバッチクエリーの実行
Pythonスクリプトをダウンロード・編集
Analytics Engineで実行するためのPythonスクリプトをダウンロード、または以下をコピーします。
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-data-to-cos-bucket").getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True) ### 追記
sc = spark.sparkContext
return spark,sc
def read_customer_data(spark,sc):
print("starting reading data from given cos bucket")
# read the data from cos bucket (Pass CosBucketPathWithFileName e.g cos://cos-dataengine.service/customer.csv)
read_df = spark.read.option("header",True).json("<#CosBucketPathWithFileName>") ### json用関数へ変更
print("data successfully uploaded to cos bucket")
return read_df
def create_table(read_df):
# create table from the read data frame (pass tablename like customerTable)
read_df.createOrReplaceTempView("<#TableName>")
def query_to_fetch_data(spark):
# sql query to fetch data from the table ex: SELECT customerTable.companyName FROM customerTable
query_df = spark.sql("<#SQlQuery>")
# to print the data
query_df.show()
return query_df
def upload_data(query_df):
print("starting uploading data to given cos bucket")
# to write the data to the given cos bucket (pass CosBucket Filename to write the data e.g: cos://cos-dataengine.service/customer_query_data_3.csv)
query_df.write.json("<#CosBucketPathWithFileName>")
print("data successfully uploaded to cos bucket")
def main():
spark,sc = init_spark()
read_df = read_customer_data(spark,sc)
create_table(read_df)
query_df = query_to_fetch_data(spark)
upload_data(query_df)
if __name__ == '__main__':
main()
以下の項目を修正します。
- read_customer_data内
- <#CosBucketPathWithFileName>:バケット内に置かれたソースデータファイルへのパス
- 形式:
cos://<バケット名>.<任意のサービス名>/<ファイル名>
- ex.
cos://takason-bucket-for-analytics-engine.myservice/row-data.json.gz
- リージョン名は含めません
- 形式:
- <#CosBucketPathWithFileName>:バケット内に置かれたソースデータファイルへのパス
- create_table内
- #TableName:作成するテーブル名
- query_to_fetch_data内
- < #SQlQuery >:作成されたテーブルに対するクエリ
- ex.
"SELECT * FROM customerTable LIMIT 50"
- upload_data内
- < #CosBucketPathWithFileName >:出力データファイルのパス
cos://takason-bucket-for-analytics-engine.myservice/sql-migrate.json
ICOSへアップロード
修正したPythonスクリプトをICOSへアップロードします。
特に場所の指定はありませんが、今回はアーカイブ・データと同じICOSにアップロードしました。
Analytics Engine APIを実行
Analytics Engine APIを実行し、pythonスクリプトに記載されたクエリを実行します。
$ curl https://<APIエンドポイント>/spark_applications \
-H "Authorization: Bearer <iam-bearer-token>" \
-X POST -d '{ "application_details": {
"conf": {
"spark.hadoop.fs.cos.<サービス名>.endpoint": "<ICOS Direct エンドポイント>",
"spark.hadoop.fs.cos.<サービス名>.iam.api.key": "<api_key>"
},
"application": "<スクリプトへのパス>",
"runtime": {
"spark_version": "<ランタイムバージョン>"
}
}}'
各修正項目については、Service Credentials、ICOS詳細画面等より確認します。
-
< APIエンドポイント >:Analytics Engineの
Service Credentials
内「applications_api」
-
< iam-bearer-token >:(ibm cloud docs) アクセス・トークンの取得
-
< サービス名 >:Pythonスクリプト内で指定したサービス名と同じものを指定する
-
< ICOS Direct エンドポイント >:Analytics Engine詳細画面内の「Manage」タブの「 Endipoint」(後述のバージョンも記載あり)
-
< api_key >:ICOSインスタンスの
Service Credentials
内「apikey」 -
< スクリプトへのパス >:
- 形式:
cos://<バケット名>.<任意のサービス名>/<ファイル名>
- ex. cos://takason-bucket-for-analytics-engine.myservice/read_write_sql_query_data.py
- リージョン名は含めません
- 形式:
-
< ランタイムバージョン >:Analytics Engine詳細画面の
Configuration
に記載されているバージョン (例:3.3)
以下のAPIにより、ジョブ状態をチェックすることも可能です。
$ curl https://<APIエンドポイント>/spark_applications \
-H "Authorization: Bearer <iam-bearer-token>" \
-X GET | jq
{"id":"5dea99d0-15ed-4812-800e-5865ee07e4f5","state":"finished","start_time":"2024-03-14T05:48:02.661Z","finish_time":"2024-03-14T05:48:31.228Z","auto_termination_time":"2024-03-17T05:48:02.661Z","end_time":"2024-03-14T05:48:31.228Z"}
実行結果
Log Analysisでのログ出力
クエリ等を実行すると、Failed
というステータスになる場合があります。
そのような時はLog Analysisによりログ詳細を見ることが可能です。
Platform Metricsを有効化
以下記事を参考に、Analytics Engineと同じリージョンのLog AnalysisのPlatform Metricsを有効化します。
IBM Cloud docs:Configuring platform logs through the Observability dashboard
Analytics Engineでソート
Log Analysis ダッシュボードを開き、 Sources
をクリックし、 ibmanalyticsengine
を選択するとログが出力されます。