3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

IBM Cloud:Data EngineからAnalytics Engineへの移行・利用方法について

Last updated at Posted at 2024-03-28

はじめに

IBM Cloud Data Engineは2024年1月18日をもって非推奨となり、2024年2月18日以降、新規インスタンスの作成はできなくなります。またこのサービスは、2025年1月18日をもってEoSとなるため、現在ご利用のインスタンスから後継サービスであるAnalytics Engineの利用へ変更する必要があります。

本記事では、Analytics Engineの利用方法について記載します。

参考:

環境情報

  • Data Engine
    • EOS予定
  • Analytics Engine
    • 後継サービス
  • IBM Cloud Object Storage
    • データソース、実行スクリプト、クエリ実行結果データを保存
    • 単一のICOS利用を想定

複数のICOSを利用する場合、それぞれに対して認証情報・サービス名の定義が必要となります。詳しくは以下をご覧ください。
IBM Cloud docs:Object Storage の資格情報について

Analytics Engineでのクエリ実行手順

準備

Analytics Engineをオーダー

IBM Cloudポータル上で「Analytics Engine」を検索し、オーダーページへ移動します。
2024年3月現在DallasFrankfurtで利用可能です。
Screenshot 2024-03-05 at 17.34.04.jpg
Screenshot 2024-03-05 at 18.04.53.jpg

SQLバッチクエリーの実行

Pythonスクリプトをダウンロード・編集

Analytics Engineで実行するためのPythonスクリプトをダウンロード、または以下をコピーします。

read_write_sql_query_data.py
from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("read-write-data-to-cos-bucket").getOrCreate()
  sc = spark.sparkContext
  return spark,sc

def read_customer_data(spark,sc):
  print("starting reading data from given cos bucket")
  # read the data from cos bucket (Pass CosBucketPathWithFileName e.g cos://cos-dataengine.service/customer.csv)
  read_df = spark.read.option("header",True).csv("<#CosBucketPathWithFileName>")
  print("data successfully uploaded to cos bucket")
  return read_df

def create_table(read_df):
  # create table from the read data frame (pass tablename like customerTable)
  read_df.createOrReplaceTempView("<#TableName>")

def query_to_fetch_data(spark):
  # sql query to fetch data from the table ex: SELECT customerTable.companyName FROM customerTable
  query_df = spark.sql("<#SQlQuery>")
  # to print the data
  query_df.show()
  return query_df

def upload_data(query_df):
  print("starting uploading data to given cos bucket")
  # to write the data to the given cos bucket (pass CosBucket Filename to write the data e.g: cos://cos-dataengine.service/customer_query_data_3.csv)
  query_df.write.csv("<#CosBucketPathWithFileName>")
  print("data successfully uploaded to cos bucket")

def main():
  spark,sc = init_spark()
  read_df = read_customer_data(spark,sc)
  create_table(read_df)
  query_df = query_to_fetch_data(spark)
  upload_data(query_df)
  
if __name__ == '__main__':
  main()

以下の項目を修正します。

  • read_customer_data内
    • <#CosBucketPathWithFileName>:バケット内に置かれたソースデータファイルへのパス
      • 形式:cos://<バケット名>.<任意のサービス名>/<ファイル名>
      • ex. cos://takason-bucket-for-analytics-engine.myservice/row-data.csv
      • リージョン名は含めません
  • create_table内
    • #TableName:作成するテーブル名
  • query_to_fetch_data内
    • < #SQlQuery >:作成されたテーブルに対するクエリ
  • upload_data内
    • < #CosBucketPathWithFileName >:出力データファイルのパス
    • cos://takason-bucket-for-analytics-engine.myservice/sql-migrate.csv

ICOSへアップロード

修正したPythonスクリプトをICOSへアップロードします。
特に場所の指定はありませんが、今回はアーカイブ・データと同じICOSにアップロードしました。

Analytics Engine APIを実行

Analytics Engine APIを実行し、pythonスクリプトに記載されたクエリを実行します。

$ curl https://<APIエンドポイント>/spark_applications \
-H "Authorization: Bearer <iam-bearer-token>" \
-X POST -d '{ "application_details": {
   "conf": {
    "spark.hadoop.fs.cos.<サービス名>.endpoint": "<ICOS Direct エンドポイント>",
    "spark.hadoop.fs.cos.<サービス名>.iam.api.key": "<api_key>"
     },
   "application": "<スクリプトへのパス>",
   "runtime": {
       "spark_version": "<ランタイムバージョン>"
  }
}}'

各修正項目については、Service Credentials、ICOS詳細画面等より確認します。

  • < APIエンドポイント >:Analytics EngineのService Credentials内「applications_api」
    image.png

  • < iam-bearer-token >(ibm cloud docs) アクセス・トークンの取得

  • < サービス名 >:Pythonスクリプト内で指定したサービス名と同じものを指定する

  • < ICOS Direct エンドポイント >:Analytics Engine詳細画面内の「Manage」タブの「 Endipoint」(後述のバージョンも記載あり)
    image.png

  • < api_key >:ICOSインスタンスのService Credentials内「apikey」

  • < スクリプトへのパス >

    • 形式:cos://<バケット名>.<任意のサービス名>/<ファイル名>
    • ex. cos://takason-bucket-for-analytics-engine.myservice/read_write_sql_query_data.py
    • リージョン名は含めません
  • < ランタイムバージョン >:Analytics Engine詳細画面のConfigurationに記載されているバージョン (例:3.3)

以下のAPIにより、ジョブ状態をチェックすることも可能です。

$ curl https://<APIエンドポイント>/spark_applications \
-H "Authorization: Bearer <iam-bearer-token>" \
-X GET | jq

{"id":"5dea99d0-15ed-4812-800e-5865ee07e4f5","state":"finished","start_time":"2024-03-14T05:48:02.661Z","finish_time":"2024-03-14T05:48:31.228Z","auto_termination_time":"2024-03-17T05:48:02.661Z","end_time":"2024-03-14T05:48:31.228Z"}

実行結果

無事ファイルが作成されました。
image.png

Log Analysisでのログ出力

クエリ等を実行すると、Failedというステータスになる場合があります。
そのような時はLog Analysisによりログ詳細を見ることが可能です。

Platform Metricsを有効化

以下記事を参考に、Analytics Engineと同じリージョンのLog AnalysisのPlatform Metricsを有効化します。
IBM Cloud docs:Configuring platform logs through the Observability dashboard

Analytics Engineでソート

Log Analysis ダッシュボードを開き、 Sources をクリックし、 ibmanalyticsengine を選択するとログが出力されます。
image.png

以下はIAMの認証で失敗している例です(正しい権限が付与されたAPIキーに変えると成功しました)。
image.png

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?