3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watsonx.dataのSparkエンジンでオブジェクト・ストレージのデータを使って機械学習モデルを作ってみた

Posted at

watsonx.dataPrestoエンジンに加えて以下の3つのエンジンがサポートされ、用途に応じて、watsonx.dataで管理しているデータにアクセスするエンジンを選択できるようになりました。

今回は、IBM Analytics Engine (Spark) エンジンを使って、PySparkで、オブジェクト・ストレージに保存されたデータにアクセスして、機械学習モデルを作ってみました。

この記事の内容

  • 今回試した環境

    • IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
      • watsonx.data (SW版) 1.1.0
      • Analytics Engine powered by Apache Spark 4.8.0
        • Spark 3.3 と 3.4 をサポート
      • Watson Studio 4.8.0
      • Watson Machine Learning 4.8.0
        • Spark 3.3 をサポート
  • 機械学習モデルとしては、以下のサイトに記載されているデータを使って二項分類モデルをランダム・フォレストとLightGBMで作成

  • 学習&テスト・データはwatsonx.dataを使って、オブジェクト・ストレージ上に保存

    • 学習&テスト・データファイルを小さめのファイル(1000レコード、100KB)と残りの2つに分割した。小さめのファイルを使ってファイルから表を作成する機能でテーブルの作成とデータの投入を行い、取り込みジョブの作成機能で残りのデータを投入した。
  • Spark環境でのプログラムの実行は、ストレージ・ボリュームを作成し、そこにプログラムをアップロードし、REST APIのPOSTメソッドで実行要求

  • ランダム・フォレストのモデルは、Watson Studioのプロジェクトに保存後、Watson Machine Learning (WML)のデプロイメント・スペースにプロモート&デプロイ (これらをモデルを作成するプログラムに組み込んだ)

  • LightGBMのモデルをWMLにデプロイするためには、そのモデルで使用しているパッケージをWMLの環境に組み込む必要があるが、現時点ではWMLのSpark環境にパッケージを組み込む機能がサポートされていなかった。別のCloud環境にデプロイして使うシナリオに利用したい。

  • LightGBMに必要なパッケージは以下のようにすることで導入できた。

    • パッケージを導入するために指定するプロパティ値はプログラム内ではなく、プログラムを実行要求する際に設定する
    • パッケージのキャッシュに使用するパスはプロセス間で共有しないとエラーとなるため、ストレージ・ボリュームを使って共有させる(spark.jars.ivyプロパティの設定)

この記事の流れ

この記事は、以下の流れで説明する。

  1. 学習&テスト・データの投入
  2. Sparkの導入と構成
  3. Sparkでwatsonx.dataのデータにアクセスしてみる
  4. ランダム・フォレストのモデルを作成してみる
  5. LightGBMのモデルを作成してみる

1. 学習&テスト・データの投入

別の記事で紹介したファイルから表を作成する機能を使うと、ファイルからテーブルを作成し、データをロードできるが、許容できるファイルの最大サイズが2MBとなっている。今回使用する学習&テスト・データはそのサイズを超えている(約3.6MB)。

この記事で紹介する取り込みジョブの作成機能を使うと既存のテーブルにデータを投入することができ、最大サイズも特に明記されてない。しかしながら、新規にテーブルを作成する機能は現時点では提供されていない。

そこで、学習&テスト・データファイルを小さめのファイル(1000レコード、約0.1MB)と残りの2つに分割し、最初のファイルでファイルから表を作成する機能を使ってテーブルの作成とデータのロードを行い、残りの学習&テスト・データは取り込みジョブの作成機能を使ってロードすることとした。

ちなみに、テーブルをあらかじめ作成しておき、その後、全データを取り込みジョブの作成でロードすることも可能である。今回のテーブルを作成するためには、以下のCREATE TABLE文を発行する。

CREATE TABLE "icos1"."ai"."adult_census" (
	"age" INT,
	"workclass" VARCHAR,
	"fnlwgt" INT,
	"education" VARCHAR,
	"education_num" INT,
	"Marital" VARCHAR,
	"occupation" VARCHAR,
	"relationship" VARCHAR,
	"race" VARCHAR,
	"sex" VARCHAR,
	"capitalgain" INT,
	"loss" INT,
	"hoursper" INT,
	"citizen_status" VARCHAR,
	"label" VARCHAR
);

1-1. ICOSのバケットをwatsonx.dataのカタログとして組み込む

以下の手順に従う。以下の記事同様、バケット名をwatsonx-data-mo-icos-2023、カタログ名をICOS1とした。

1-2. 学習&テスト・データを用意し、watsonx.dataでICOSにテーブルを作成し、データを投入する

まず、以下のコマンドでデータを取得する
(あるいは、以下のURLをブラウザで開き、表示されたcsvファイルをダウンロードする)

wget https://raw.githubusercontent.com/IBM/watson-openscale-samples/main/IBM%20Cloud/WML/assets/data/adult_census/Indirect_bias_AdultCensusdata.csv

上記ファイルを、最初の1000行と残りのデータに分割する。残りのデータに対するヘッダーは付けても付けなくても良い。

内容 ファイル名 サイズ(バイト)
全て Indirect_bias_AdultCensusdata.csv 3551143
最初の1000行 Indirect_bias_AdultCensusdata_1000.csv 109027
残り Indirect_bias_AdultCensusdata_other_than_1000.csv 3442248

分割した1000行のデータファイルを使って、以下の手順に従い、テーブルの作成とデータのロードを行う(スキーマ名はai、テーブル名はadult_censusとした)。

1-3. 作成したテーブルに残りのデータをロードする

ロードするデータはwatsonx.dataに登録されているバケットに保存する必要がある。そのため、バケットを別途作成し、ロードするデータをそのバケットに保存した。バケットの名前はwatsonx-data-mo-icos-2023-2とした。

次に、watsonx.dataにバケットを登録する(アクティブ化は不要)。カタログの名前はicos2とした。

次に、バケットに保存したデータをテーブルにロードするために、データ・マネージャー取り込みジョブタブを選択し、右側にある取り込みジョブの作成ボタンをクリックする。

watsonx_data_ingest_data_1.png

Iceberg copy loaderが選択されていることを確認して、Nextをクリックする。
(Sparkが構成されていればSparkで行うことも可能)

watsonx_data_ingest_data_2.png

ソース・バケットで作成したバケットwatsonx-data-mo-icos-2023-2を選択し、ソース・ディレクトリーでアップロードしたIndirect_bias_AdultCensusdata_other_than_1000.csvをチェックする。
ヘッダーを付けなかった場合は、右下のHas headerのチェックを外す。
CSV file configurationで定義されている内容を確認して、Nextをクリックする。

watsonx_data_ingest_data_3.png

Targetとして、以下を選択して、Nextをクリックする。

項目
Catalog icos1
Schema ai
Table adult_census

watsonx_data_ingest_data_4.png

内容を確認して、取り込みをクリックする。

watsonx_data_ingest_data_5.png

ジョブが開始されるので、終了するのを待つ。

watsonx_data_ingest_data_6.png

照会ワークスペースでテーブルのデータ数をカウントしてみる。

watsonx_data_ingest_data_7.png

データ数が32561と増えており、データの取り込みが成功している。

Sparkが構成されていれば、Sparkでデータを投入することもできる。Sparkの設定画面例を以下に示す。
(Executor数等が指定可能)

watsonx_data_ingest_data_8_spark.png

2. Sparkの導入と構成

2-1. Sparkインスタンスを作成する

Spark (Analytics Engine powered by Apache Spark) は、他のサービス同様、コマンドで導入する。

その後、以下のように、GUIを使ってインスタンスを作成する。

サービス・カタログからAnalytics Engineで検索すると以下の画面が表示されるので、そこに表示されているAnalytics Engine powered by Apache Sparkをクリックする。

spark_1.png

画面右のインスタンスの右をクリックし、プルダウンで表示された新規インスタンスをクリックする。

spark_2.png

適当な名前を入力し、名前空間を選択して、次へをクリックする。今回は、以下を入力した。

項目
名前 analytics-engine-spark
名前空間 cpd-instance

ストレージについての情報を入力し、次へをクリックする。今回は、以下を入力した。
(このストレージにプログラム実行時の標準出力(stdout)ファイル等が保存される)

項目
Select storage 新規ストレージの作成
ボリューム名 spark-storage
ストレージ・クラス ocs-storagecluster-cephfs
GB単位のサイズ 200

内容を確認して作成をクリックする。

spark_3.png

インスタンスのリストに登録され、状況が実行中になったことを確認する。

spark_4.png

2-2. watsonx.dataにSparkを登録する

watsonx.dataのインフラストラクチャー・マネージャー画面の右にあるコンポーネントの追加をクリックし、Add engineをクリックする。

watsonx_data_spark_1.png

以下のような情報を入力し、Registerをクリックする。

項目
タイプ IBM Analytics Engine (Spark)
表示名 spark-01
Registration mode Select a co-located instance
Instance analytics-engine-spark

watsonx_data_spark_2.png

以下のように、Sparkエンジンが追加され、エンジンが2つになった。

watsonx_data_spark_3.png

2-3. 実行するプログラム等を保存するストレージ・ボリュームを作成する

CP4Dの左上メニューからストレージ・ボリュームをクリックする。
Sparkの実行ログ等が保存されるボリュームcpd-instance::spark-storageがリストされている。ここで、実行するプログラム等を保存するストレージ・ボリュームを作成するため、画面の右に表示されている新しいボリュームをクリックする。

storage_volume_1.png

以下のような情報を入力し、追加をクリックする。

項目
名前空間 cpd-instance (default)
ボリューム名 spark01
ボリューム・タイプ 新しいPVC
ストレージ・クラス ocs-storagecluster-cephfs
GB単位のサイズ 200
マウント・パス /mnts/spark01

storage_volume_2.png

追加したストレージ・ボリュームの状況が実行中になるまで待つ。

storage_volume_3.png

2-4. watsonx.dataのデータにアクセスするために必要な共通のプロパティ値

以下のURLを参照する。

上記のURLに記載されているプロパティ値のうちwatsonx.data環境にアクセスする上で共通なものは以下と思われる。
(usernameはadminを使用した)

項目
spark.driver.extraClassPath /opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar
spark.sql.catalogImplementation hive
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.iceberg.vectorization.enabled false
spark.hive.metastore.client.auth.mode PLAIN
spark.hive.metastore.client.plain.username admin
spark.hive.metastore.client.plain.password xxx
spark.hive.metastore.use.SSL true
spark.hive.metastore.truststore.type JKS
spark.hive.metastore.truststore.path file:///opt/ibm/jdk/lib/security/cacerts
spark.hive.metastore.truststore.password changeit

Spark環境としてのデフォルト値を上記のURLに記載のREST APIコールで登録することができる。しかしながら、試行錯誤で不要となったプロパティ値を削除してみたところ、実際には削除されずその値にnullがセットされ、nullによってエラーとなるケースもあった(恐らくバグ)。回避策として、この記事ではデフォルト値をセットせず、プログラム実行時に指定するようにした。最終的に全ての処理で共通となる値をデフォルト値として登録すると良いと思う。今回は、nullを除去するためにSparkのインスタンスを再作成した。

以下の画面は、インスタンスのリスト画面からSparkをクリックすることで得られる。デフォルトのプロパティ値は、この画面の右下に表示され、その枠の右上にあるEditをクリックすることで編集できる。

spark_5.png

右上のAccess endpointsにあるSpark jobs V4 endpointは、プログラムの実行要求先となるURLである。

https://cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud
/v4/analytics_engines/b13d7305-79c0-47f0-82b7-9a7da804937a/spark_applications

上記で/v4/analytics_engines/に続くIDがSparkのインスタンスIDである。
(上記の場合、b13d7305-79c0-47f0-82b7-9a7da804937a)

3. Sparkでwatsonx.dataのデータにアクセスしてみる

以下のように、オブジェクト・ストレージのバケットにアクセスするための情報と、カタログ(Iceberg)に関する情報が必要となる。今回、これらの情報はプログラム内に定義した。

このプログラムでアクセスするバケット名はwatsonx-data-mo-icos-2023、カタログ名はicos1である。xxx.access.keyxxx.secret.keyは、以下のURLで説明している値をセットする(access_key_idsecret_access_key)。

プロパティ
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint s3.jp-tok.cloud-object-storage.appdomain.cloud
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key xxx
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key xxx
spark.sql.catalog.icos1 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.icos1.type hive
spark.sql.catalog.icos1.uri thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083

カタログのuri(spark.sql.catalog.icos1.uri)は、インフラストラクチャー・マネージャー画面の該当カタログをクリックして得られるメタストア・ホストを指定する(以下の画面の右下)。

watsonx_data_catalog_info.png

オブジェクト・ストレージのデータにアクセスするプログラム

watsonx_data_spark_sample.py
from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder \
        .appName("lh-hms-cloud") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
        .config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.icos1.type", "hive") \
        .config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def select_data(spark):
    sqlDF = spark.sql("select * from icos1.ai.adult_census")
    sqlDF.show()

def main():
    try:
        spark = init_spark()
        select_data(spark)
    finally:
        spark.stop()

if __name__ == '__main__':
  main()

上記プログラムを実行要求するコマンド

curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
        "application_details": {
            "application": "/spark01/watsonx_data_spark_sample.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
                "spark.sql.catalogImplementation": "hive",
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                "spark.sql.iceberg.vectorization.enabled": "false",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "admin",
                "spark.hive.metastore.client.plain.password": "xxx",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit"
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark01",
            "mount_path": "/spark01/",
            "source_sub_path": "spark"
        }]
    }'

CPD_URLはこのCP4DのベースURLで、INSTANCE_IDはSparkのインスタンスIDである。
TOKENは、ユーザー名とパスワードをコロン(:)で繋げてbase64でエンコードした値で、以下のように生成する。

$ echo admin:xxx | base64
YWRtaW46eHh4Cg==

実行するプログラム等を保存するストレージ・ボリュームを作成した際、マウント・パスは/mnts/spark01とした。その場合、上記のREST APIのデータのmount_pathの値は、/spark01/となる。
applicationにプログラムを配置したパス付きファイル名を指定するが、上記の場合、/spark01/watsonx_data_spark_sample.pyとなっている。ここで注意すべき値がsource_sub_pathsparkとなっている。結果として、実際にプログラムを配置すべき場所は、/mnts/spark01のストレージ・ボリュームのsparkディレクトリ配下となる。
下図はストレージ・ボリュームのリストからcpd-instance::spark01をクリックし、ファイル・ブラウザタブをクリックして表示された画面で、その左下の新規フォルダーをクリックし、sparkフォルダーを作成し、そのフォルダー配下にプログラムをアップロードする。

storage_volume_4.png

結果として、以下のようになる。

storage_volume_5.png

それでは実行してみる。

$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
        "application_details": {
            "application": "/spark01/watsonx_data_spark_sample.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
                "spark.sql.catalogImplementation": "hive",
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                "spark.sql.iceberg.vectorization.enabled": "false",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "admin",
                "spark.hive.metastore.client.plain.password": "xxx",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit"
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark01",
            "mount_path": "/spark01/",
            "source_sub_path": "spark"
        }]
    }'
{"application_id":"cd086293-38d5-4ab8-aedd-611ebe037d39","state":"ACCEPTED"}

実行ステータスは、インフラストラクチャー・マネージャーで該当するSparkエンジンをクリックして表示された画面で、applicationsタブをクリックすると実行したプログラム一覧が表示される。

spark_6.png

ログは、Sparkのインスタンス作成時に作成したストレージ・ボリューム上に保存される。
最初のフォルダー名がSparkのインスタンスIDで、次のフォルダー名がプログラムを実行した際に得られるアプリケーションIDである。
logsフォルダーにある、spark-driver-xxx-stdoutファイルにプログラムが標準出力に表示したテキストが保存されているので、こちらをダウンロードして確認してみる。

spark_7.png

以下のようにselect文で得られたデータの一部が表示されていることがわかる。

23/12/11 14:30:22 DRIVER INFO CodeGenerator: Code generated in 111.533375 ms
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+-----------+----+--------+--------------+-----+
|age|       workclass|fnlwgt|   education|education_num|             marital|       occupation| relationship|              race|   sex|capitalgain|loss|hoursper|citizen_status|label|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+-----------+----+--------+--------------+-----+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|       2174|   0|      40| United-States|<=50K|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|          0|   0|      13| United-States|<=50K|
| 38|         Private|215646|     HS-grad|            9|            Divorced|Handlers-cleaners|Not-in-family|             White|  Male|          0|   0|      40| United-States|<=50K|
...

4. ランダム・フォレストのモデルを作成してみる

ランダム・フォレストのモデルを作成し、そのモデルをプロジェクトに保存した後、デプロイメント・スペースにプロモード&デプロイするプログラムを以下に示す。

watsonx_data_model_creation_random_forest.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from ibm_watson_machine_learning import APIClient

WML_CREDENTIALS = {
    "url": "https://cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud",
    "username": "admin",
    "password": "xxx",
    "instance_id": "openshift",
    "version": "4.8"
}
PROJECT_ID = "de382df6-765f-468e-b4de-b87d875117ef"
DEPLOYMENT_SPACE_ID = "c4cd6278-3fe5-438c-8870-08691f8e270c"
MODEL_NAME = "adult_census_model_rf"

def init_spark():
    spark = SparkSession.builder \
        .appName("lightgbm-model-creation") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
        .config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.icos1.type", "hive") \
        .config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def prepare_data(spark):
    spark_df = spark.sql("select * from icos1.ai.adult_census")
    protected_attributes = ["race", "age", "sex"]
    for attr in protected_attributes:
        spark_df = spark_df.drop(attr)
    train_data, test_data = spark_df.randomSplit([0.85, 0.15], seed=24)
    return train_data, test_data

def build_model(spark, train_data, test_data):
    cat_features = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'citizen_status'] 
    num_features = ["fnlwgt", "education_num", "capitalgain", "loss", "hoursper"]
    stages = []

    cat_features_idx = [col + "_idx" for col in cat_features]
    str_indexer = StringIndexer(inputCols=cat_features, outputCols=cat_features_idx)
    stages.append(str_indexer)

    str_indexer_label = StringIndexer(inputCol="label", outputCol="encoded_label").fit(train_data)
    label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=str_indexer_label.labels)
    stages.append(str_indexer_label)

    va_features = VectorAssembler(
        inputCols=cat_features_idx + num_features, outputCol="features"
    )
    stages.append(va_features)

    classifier = RandomForestClassifier(labelCol="encoded_label", featuresCol="features", maxBins=42)
    stages.append(classifier)
    stages.append(label_converter)

    pipeline = Pipeline(stages=stages)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(labelCol="encoded_label", rawPredictionCol="rawPrediction")
    roc = evaluator.evaluate(predictions)
    print(f"Area under the ROC = {roc}")
    return model, pipeline

def get_wml_client(wml_credentials):
    wml_client = APIClient(wml_credentials)
    return wml_client

def store_model(wml_client, project_id, model_name, model, pipeline, train_data):
    wml_client.set.default_project(project_id)
    software_spec_uid = wml_client.software_specifications.get_id_by_name("spark-mllib_3.3")
    print("Software Specification ID: {}".format(software_spec_uid))
    model_props = {
        wml_client._models.ConfigurationMetaNames.NAME:"{}".format(model_name),
        wml_client._models.ConfigurationMetaNames.TYPE: "mllib_3.3",
        wml_client._models.ConfigurationMetaNames.SOFTWARE_SPEC_UID: software_spec_uid,
        wml_client._models.ConfigurationMetaNames.LABEL_FIELD: "label",
    }
    print("Storing model ...")
    model_details = wml_client.repository.store_model(
        model=model, 
        meta_props=model_props, 
        training_data=train_data, 
        pipeline=pipeline)
    model_id = wml_client.repository.get_model_id(model_details)
    print(f"Model ID: {model_id}")
    return model_id 

def promote_and_deploy_model(wml_client, project_id, deployment_space_id, model_name, model_id):
    wml_client.set.default_project(project_id)
    # space_model_id = wml_client.repository.promote_model(model_id=model_id, source_project_id=project_id, target_space_id=deployment_space_id)
    space_model_id = wml_client.spaces.promote(asset_id=model_id, source_project_id=project_id, target_space_id=deployment_space_id)
    wml_client.set.default_space(deployment_space_id)
    metadata = {
        "name": model_name
    }
    meta_props={
        wml_client.deployments.ConfigurationMetaNames.NAME: model_name,
        wml_client.deployments.ConfigurationMetaNames.ONLINE: {}
    }
    deployment_details = wml_client.deployments.create(artifact_uid=space_model_id, meta_props=meta_props, target_space_id=deployment_space_id)
    print(f"Deployment space model asset ID: {space_model_id}")
    print(f'Deployment space model deployment ID: {deployment_details["metadata"]["id"]}')

def main():
    try:
        spark = init_spark()
        train_data, test_data = prepare_data(spark)
        model, pipeline = build_model(spark, train_data, test_data)
        wml_client = get_wml_client(WML_CREDENTIALS)
        model_id = store_model(wml_client, PROJECT_ID, MODEL_NAME, model, pipeline, train_data)
        promote_and_deploy_model(wml_client, PROJECT_ID, DEPLOYMENT_SPACE_ID, MODEL_NAME, model_id)
    finally:
        spark.stop()

if __name__ == '__main__':
    main()

プログラムの最初に以下の4つの値をセットする。

変数名
WML_CREDENTIALS CP4Dのurl/username/passwordをセットする。その他は変更不要
PROJECT_ID xxx
DEPLOYMENT_SPACE_ID xxx
MODEL_NAME adult_census_model_rf

PROJECT_IDは、該当プロジェクトを開き、管理タブに記載されているプロジェクトIDの値をセットする。

watson_project_id.png

DEPLOYMENT_SPACE_IDは、該当デプロイメント・スペースを開き、管理タブに記載されているスペースGUIDの値をセットする。

watson_deployment_space_id.png

上記のプログラムをspark01のストレージ・ボリュームにアップロードし、そのプログラムを実行要求する以下のコマンドを実行してみる。

$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
        "application_details": {
            "application": "/spark01/watsonx_data_model_creation_random_forest.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
                "spark.sql.catalogImplementation": "hive",
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                "spark.sql.iceberg.vectorization.enabled": "false",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "admin",
                "spark.hive.metastore.client.plain.password": "xxx",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit"
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark01",
            "mount_path": "/spark01/",
            "source_sub_path": "spark"
        }]
    }'
{"application_id":"167dfc01-2a35-4be8-ba25-9e6f5a52e8b1","state":"ACCEPTED"}

プログラムの実行が終了すると、デプロイメント・スペースにモデルがデプロイされる。

watson_model_deployed.png

モデルをテストしてみる。

watson_model_test.png

問題なく推論結果が得られた。

watson_model_test_result.png

5. LightGBMのモデルを作成してみる

LightGBMのモデルの作成に必要なパッケージの導入は以下のURLを参照した。

結果として、以下のプロパティ値を設定することで導入できた。
(以下はSpark 3.3で必要となるパッケージ)

"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3"
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven"

さらに冒頭で述べたように、spark.jars.ivyプロパティに共有ストレージのパスをセットする必要があり、今回は/spark01/ivy2をセットしている。

これらの設定は、プログラムを実行要求するコマンドで設定する必要がある。

LightGBMのモデルを作成するプログラムを以下に示す。

watsonx_data_model_creation_lightgbm.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from synapse.ml.lightgbm import LightGBMClassifier

MODEL_NAME = "adult_census_model_lightgbm"

def init_spark():
    spark = SparkSession.builder \
        .appName("lightgbm-model-creation") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
        .config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
        .config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.icos1.type", "hive") \
        .config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def prepare_data(spark):
    spark_df = spark.sql("select * from icos1.ai.adult_census")
    protected_attributes = ["race", "age", "sex"]
    for attr in protected_attributes:
        spark_df = spark_df.drop(attr)
    train_data, test_data = spark_df.randomSplit([0.85, 0.15], seed=1)
    return train_data, test_data

def build_model(spark, train_data, test_data):
    cat_features = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'citizen_status'] 
    num_features = ["fnlwgt", "education_num", "capitalgain", "loss", "hoursper"]
    stages = []

    cat_features_idx = [col + "_idx" for col in cat_features]
    str_indexer = StringIndexer(
        inputCols=cat_features, outputCols=cat_features_idx
    )
    stages.append(str_indexer)

    str_indexer_label = StringIndexer(inputCol="label", outputCol="encoded_label").fit(train_data)
    label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=str_indexer_label.labels)
    stages.append(str_indexer_label)

    va_features = VectorAssembler(
        inputCols=cat_features_idx + num_features, outputCol="features"
    )
    stages.append(va_features)

    params = {
        "learningRate": .2,
        "numIterations": 200,
        "numLeaves": 31
    }
    classifier = LightGBMClassifier(objective="binary", featuresCol="features", categoricalSlotNames=cat_features_idx, labelCol="encoded_label", **params)
    stages.append(classifier)
    stages.append(label_converter)

    pipeline = Pipeline(stages=stages)
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(labelCol="encoded_label", rawPredictionCol="rawPrediction")
    roc = evaluator.evaluate(predictions)
    print(f"Area under the ROC = {roc}")
    return model, pipeline

def save_model(model, model_name):
    model.save(model_name)

def main():
    try:
        spark = init_spark()
        train_data, test_data = prepare_data(spark)
        model, pipeline = build_model(spark, train_data, test_data)
        save_model(model, MODEL_NAME)
    finally:
        spark.stop()

if __name__ == '__main__':
    main()

上記のプログラムをspark01のストレージ・ボリュームにアップロードし、そのプログラムを実行要求する以下のコマンドを実行してみる。

$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
        "application_details": {
            "application": "/spark01/watsonx_data_model_creation_lightgbm.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
                "spark.sql.catalogImplementation": "hive",
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                "spark.sql.iceberg.vectorization.enabled": "false",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "admin",
                "spark.hive.metastore.client.plain.password": "5TvBgyaTqjzE",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit",
                "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
                "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
                "spark.jars.ivy": "/spark01/ivy2"
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark01",
            "mount_path": "/spark01/",
            "source_sub_path": "spark"
        }]
    }'
{"application_id":"09bbc7e7-2b2d-47a5-8bd0-56e0f3430566","state":"ACCEPTED"}

プログラムの実行が終了すると、モデルがログとともに保存される。
(下図のadult_census_model_lightgbmフォルダーにモデルが保存されている)

storage_volume_lightgbm_model.png

今回のプログラムではモデルを保存する際にパスを指定していないため、ログ・ファイル等が保存されるフォルダーと同じところにモデルが保存されている。モデルを保存する際に、/spark01/パスを指定すると、cpd-instance::spark01の方に保存される。

補足: Spark3.3 から Spark3.4 への切り替え

以下のURLに従う。

実際に Spark 3.4 に変えてみる。
(環境変数名やその値は今まで述べてきたやり方に従っており、上記のURLとは少し異なる)

$ export CPD_URL=cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud
$ export INSTANCE_ID=b13d7305-79c0-47f0-82b7-9a7da804937a
$ export TOKEN=xxx
$ curl -X PUT https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/default_runtime \
    --header "Authorization: ZenApiKey ${TOKEN}" \
    --header "Content-Type: application/json" \
    --data-raw '{
        "spark_version": "3.4"
    }'
{"spark_version":"3.4"}

Spark3.4に変更した環境を使って、LightGBMのパッケージをSpark3.4用に変えて、LightGBMのモデルを作成するプログラムを実行してみる。
(Spark3.4用のLightGBMパッケージ: com.microsoft.azure:synapseml_2.12:1.0.2)

$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
        "application_details": {
            "application": "/spark01/watsonx_data_model_creation_lightgbm.py",
            "conf": {
                "spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
                "spark.sql.catalogImplementation": "hive",
                "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                "spark.sql.iceberg.vectorization.enabled": "false",
                "spark.hive.metastore.client.auth.mode": "PLAIN",
                "spark.hive.metastore.client.plain.username": "admin",
                "spark.hive.metastore.client.plain.password": "5TvBgyaTqjzE",
                "spark.hive.metastore.use.SSL": "true",
                "spark.hive.metastore.truststore.type": "JKS",
                "spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password": "changeit",
                "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2",
                "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
                "spark.jars.ivy": "/spark01/ivy2"
            }
        },
        "volumes": [{
            "name": "cpd-instance::spark01",
            "mount_path": "/spark01/",
            "source_sub_path": "spark"
        }]
    }'
{"application_id":"5df9920a-d56c-4d2a-a88f-d2e8cc9581a2","template_id":"spark-3.4-cp4d-template","state":"ACCEPTED","runtime":{"spark_version":"3.4"}}

プログラムの実行結果は以下のように成功し、Spark versionが 3.4 となっている。

spark34_lightgbm_result.png

Spark3.3での実行結果と同様、LightGBMモデルもログと共に保存されている。

storage_volume_lightgbm_model_spark34.png

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?