watsonx.dataでPrestoエンジンに加えて以下の3つのエンジンがサポートされ、用途に応じて、watsonx.dataで管理しているデータにアクセスするエンジンを選択できるようになりました。
- IBM Analytics Engine (Spark)
- IBM Db2 Warehouse
- IBM Netteza
今回は、IBM Analytics Engine (Spark) エンジンを使って、PySparkで、オブジェクト・ストレージに保存されたデータにアクセスして、機械学習モデルを作ってみました。
この記事の内容
-
今回試した環境
-
IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
- watsonx.data (SW版) 1.1.0
- Analytics Engine powered by Apache Spark 4.8.0
- Spark 3.3 と 3.4 をサポート
- Watson Studio 4.8.0
- Watson Machine Learning 4.8.0
- Spark 3.3 をサポート
-
IBM Cloud Pak for Data (CP4D) 4.8.0 上に以下のサービスを導入
-
機械学習モデルとしては、以下のサイトに記載されているデータを使って二項分類モデルをランダム・フォレストとLightGBMで作成
-
学習&テスト・データはwatsonx.dataを使って、オブジェクト・ストレージ上に保存
- 学習&テスト・データファイルを小さめのファイル(1000レコード、100KB)と残りの2つに分割した。小さめのファイルを使って
ファイルから表を作成する
機能でテーブルの作成とデータの投入を行い、取り込みジョブの作成
機能で残りのデータを投入した。
- 学習&テスト・データファイルを小さめのファイル(1000レコード、100KB)と残りの2つに分割した。小さめのファイルを使って
-
Spark環境でのプログラムの実行は、ストレージ・ボリュームを作成し、そこにプログラムをアップロードし、REST APIのPOSTメソッドで実行要求
-
ランダム・フォレストのモデルは、Watson Studioのプロジェクトに保存後、Watson Machine Learning (WML)のデプロイメント・スペースにプロモート&デプロイ (これらをモデルを作成するプログラムに組み込んだ)
-
LightGBMのモデルをWMLにデプロイするためには、そのモデルで使用しているパッケージをWMLの環境に組み込む必要があるが、現時点ではWMLのSpark環境にパッケージを組み込む機能がサポートされていなかった。別のCloud環境にデプロイして使うシナリオに利用したい。
-
LightGBMに必要なパッケージは以下のようにすることで導入できた。
- パッケージを導入するために指定するプロパティ値はプログラム内ではなく、プログラムを実行要求する際に設定する
- パッケージのキャッシュに使用するパスはプロセス間で共有しないとエラーとなるため、ストレージ・ボリュームを使って共有させる(
spark.jars.ivy
プロパティの設定)
この記事の流れ
この記事は、以下の流れで説明する。
- 学習&テスト・データの投入
- Sparkの導入と構成
- Sparkでwatsonx.dataのデータにアクセスしてみる
- ランダム・フォレストのモデルを作成してみる
- LightGBMのモデルを作成してみる
1. 学習&テスト・データの投入
別の記事で紹介したファイルから表を作成する
機能を使うと、ファイルからテーブルを作成し、データをロードできるが、許容できるファイルの最大サイズが2MBとなっている。今回使用する学習&テスト・データはそのサイズを超えている(約3.6MB)。
この記事で紹介する取り込みジョブの作成
機能を使うと既存のテーブルにデータを投入することができ、最大サイズも特に明記されてない。しかしながら、新規にテーブルを作成する機能は現時点では提供されていない。
そこで、学習&テスト・データファイルを小さめのファイル(1000レコード、約0.1MB)と残りの2つに分割し、最初のファイルでファイルから表を作成する
機能を使ってテーブルの作成とデータのロードを行い、残りの学習&テスト・データは取り込みジョブの作成
機能を使ってロードすることとした。
ちなみに、テーブルをあらかじめ作成しておき、その後、全データを取り込みジョブの作成
でロードすることも可能である。今回のテーブルを作成するためには、以下のCREATE TABLE
文を発行する。
CREATE TABLE "icos1"."ai"."adult_census" (
"age" INT,
"workclass" VARCHAR,
"fnlwgt" INT,
"education" VARCHAR,
"education_num" INT,
"Marital" VARCHAR,
"occupation" VARCHAR,
"relationship" VARCHAR,
"race" VARCHAR,
"sex" VARCHAR,
"capitalgain" INT,
"loss" INT,
"hoursper" INT,
"citizen_status" VARCHAR,
"label" VARCHAR
);
1-1. ICOSのバケットをwatsonx.dataのカタログとして組み込む
以下の手順に従う。以下の記事同様、バケット名をwatsonx-data-mo-icos-2023
、カタログ名をICOS1
とした。
1-2. 学習&テスト・データを用意し、watsonx.dataでICOSにテーブルを作成し、データを投入する
まず、以下のコマンドでデータを取得する
(あるいは、以下のURLをブラウザで開き、表示されたcsvファイルをダウンロードする)
wget https://raw.githubusercontent.com/IBM/watson-openscale-samples/main/IBM%20Cloud/WML/assets/data/adult_census/Indirect_bias_AdultCensusdata.csv
上記ファイルを、最初の1000行と残りのデータに分割する。残りのデータに対するヘッダーは付けても付けなくても良い。
内容 | ファイル名 | サイズ(バイト) |
---|---|---|
全て | Indirect_bias_AdultCensusdata.csv | 3551143 |
最初の1000行 | Indirect_bias_AdultCensusdata_1000.csv | 109027 |
残り | Indirect_bias_AdultCensusdata_other_than_1000.csv | 3442248 |
分割した1000行のデータファイルを使って、以下の手順に従い、テーブルの作成とデータのロードを行う(スキーマ名はai
、テーブル名はadult_census
とした)。
1-3. 作成したテーブルに残りのデータをロードする
ロードするデータはwatsonx.dataに登録されているバケットに保存する必要がある。そのため、バケットを別途作成し、ロードするデータをそのバケットに保存した。バケットの名前はwatsonx-data-mo-icos-2023-2
とした。
次に、watsonx.dataにバケットを登録する(アクティブ化
は不要)。カタログの名前はicos2
とした。
次に、バケットに保存したデータをテーブルにロードするために、データ・マネージャー
の取り込みジョブ
タブを選択し、右側にある取り込みジョブの作成
ボタンをクリックする。
Iceberg copy loader
が選択されていることを確認して、Next
をクリックする。
(Sparkが構成されていればSparkで行うことも可能)
ソース・バケット
で作成したバケットwatsonx-data-mo-icos-2023-2
を選択し、ソース・ディレクトリー
でアップロードしたIndirect_bias_AdultCensusdata_other_than_1000.csv
をチェックする。
ヘッダーを付けなかった場合は、右下のHas header
のチェックを外す。
CSV file configuration
で定義されている内容を確認して、Next
をクリックする。
Target
として、以下を選択して、Next
をクリックする。
項目 | 値 |
---|---|
Catalog | icos1 |
Schema | ai |
Table | adult_census |
内容を確認して、取り込み
をクリックする。
ジョブが開始されるので、終了するのを待つ。
照会ワークスペース
でテーブルのデータ数をカウントしてみる。
データ数が32561と増えており、データの取り込みが成功している。
Sparkが構成されていれば、Sparkでデータを投入することもできる。Sparkの設定画面例を以下に示す。
(Executor数等が指定可能)
2. Sparkの導入と構成
2-1. Sparkインスタンスを作成する
Spark (Analytics Engine powered by Apache Spark) は、他のサービス同様、コマンドで導入する。
その後、以下のように、GUIを使ってインスタンスを作成する。
サービス・カタログ
からAnalytics Engine
で検索すると以下の画面が表示されるので、そこに表示されているAnalytics Engine powered by Apache Spark
をクリックする。
画面右のインスタンス
の右をクリックし、プルダウンで表示された新規インスタンス
をクリックする。
適当な名前を入力し、名前空間を選択して、次へ
をクリックする。今回は、以下を入力した。
項目 | 値 |
---|---|
名前 | analytics-engine-spark |
名前空間 | cpd-instance |
ストレージについての情報を入力し、次へ
をクリックする。今回は、以下を入力した。
(このストレージにプログラム実行時の標準出力(stdout)ファイル等が保存される)
項目 | 値 |
---|---|
Select storage | 新規ストレージの作成 |
ボリューム名 | spark-storage |
ストレージ・クラス | ocs-storagecluster-cephfs |
GB単位のサイズ | 200 |
内容を確認して作成
をクリックする。
インスタンス
のリストに登録され、状況が実行中になったことを確認する。
2-2. watsonx.dataにSparkを登録する
watsonx.dataのインフラストラクチャー・マネージャー
画面の右にあるコンポーネントの追加
をクリックし、Add engine
をクリックする。
以下のような情報を入力し、Register
をクリックする。
項目 | 値 |
---|---|
タイプ | IBM Analytics Engine (Spark) |
表示名 | spark-01 |
Registration mode | Select a co-located instance |
Instance | analytics-engine-spark |
以下のように、Sparkエンジンが追加され、エンジンが2つになった。
2-3. 実行するプログラム等を保存するストレージ・ボリュームを作成する
CP4Dの左上メニューからストレージ・ボリューム
をクリックする。
Sparkの実行ログ等が保存されるボリュームcpd-instance::spark-storage
がリストされている。ここで、実行するプログラム等を保存するストレージ・ボリュームを作成するため、画面の右に表示されている新しいボリューム
をクリックする。
以下のような情報を入力し、追加
をクリックする。
項目 | 値 |
---|---|
名前空間 | cpd-instance (default) |
ボリューム名 | spark01 |
ボリューム・タイプ | 新しいPVC |
ストレージ・クラス | ocs-storagecluster-cephfs |
GB単位のサイズ | 200 |
マウント・パス | /mnts/spark01 |
追加したストレージ・ボリュームの状況が実行中になるまで待つ。
2-4. watsonx.dataのデータにアクセスするために必要な共通のプロパティ値
以下のURLを参照する。
上記のURLに記載されているプロパティ値のうちwatsonx.data環境にアクセスする上で共通なものは以下と思われる。
(usernameはadmin
を使用した)
項目 | 値 |
---|---|
spark.driver.extraClassPath | /opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar |
spark.sql.catalogImplementation | hive |
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.sql.iceberg.vectorization.enabled | false |
spark.hive.metastore.client.auth.mode | PLAIN |
spark.hive.metastore.client.plain.username | admin |
spark.hive.metastore.client.plain.password | xxx |
spark.hive.metastore.use.SSL | true |
spark.hive.metastore.truststore.type | JKS |
spark.hive.metastore.truststore.path | file:///opt/ibm/jdk/lib/security/cacerts |
spark.hive.metastore.truststore.password | changeit |
Spark環境としてのデフォルト値を上記のURLに記載のREST APIコールで登録することができる。しかしながら、試行錯誤で不要となったプロパティ値を削除してみたところ、実際には削除されずその値にnullがセットされ、nullによってエラーとなるケースもあった(恐らくバグ)。回避策として、この記事ではデフォルト値をセットせず、プログラム実行時に指定するようにした。最終的に全ての処理で共通となる値をデフォルト値として登録すると良いと思う。今回は、nullを除去するためにSparkのインスタンスを再作成した。
以下の画面は、インスタンスのリスト画面からSparkをクリックすることで得られる。デフォルトのプロパティ値は、この画面の右下に表示され、その枠の右上にあるEdit
をクリックすることで編集できる。
右上のAccess endpoints
にあるSpark jobs V4 endpoint
は、プログラムの実行要求先となるURLである。
https://cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud
/v4/analytics_engines/b13d7305-79c0-47f0-82b7-9a7da804937a/spark_applications
上記で/v4/analytics_engines/
に続くIDがSparkのインスタンスIDである。
(上記の場合、b13d7305-79c0-47f0-82b7-9a7da804937a
)
3. Sparkでwatsonx.dataのデータにアクセスしてみる
以下のように、オブジェクト・ストレージのバケットにアクセスするための情報と、カタログ(Iceberg)に関する情報が必要となる。今回、これらの情報はプログラム内に定義した。
このプログラムでアクセスするバケット名はwatsonx-data-mo-icos-2023
、カタログ名はicos1
である。xxx.access.key
とxxx.secret.key
は、以下のURLで説明している値をセットする(access_key_id
とsecret_access_key
)。
プロパティ | 値 |
---|---|
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint | s3.jp-tok.cloud-object-storage.appdomain.cloud |
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key | xxx |
spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key | xxx |
spark.sql.catalog.icos1 | org.apache.iceberg.spark.SparkCatalog |
spark.sql.catalog.icos1.type | hive |
spark.sql.catalog.icos1.uri | thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083 |
カタログのuri(spark.sql.catalog.icos1.uri
)は、インフラストラクチャー・マネージャー
画面の該当カタログをクリックして得られるメタストア・ホスト
を指定する(以下の画面の右下)。
オブジェクト・ストレージのデータにアクセスするプログラム
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder \
.appName("lh-hms-cloud") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
.config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.icos1.type", "hive") \
.config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
.enableHiveSupport() \
.getOrCreate()
return spark
def select_data(spark):
sqlDF = spark.sql("select * from icos1.ai.adult_census")
sqlDF.show()
def main():
try:
spark = init_spark()
select_data(spark)
finally:
spark.stop()
if __name__ == '__main__':
main()
上記プログラムを実行要求するコマンド
curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
"application_details": {
"application": "/spark01/watsonx_data_spark_sample.py",
"conf": {
"spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
"spark.sql.catalogImplementation": "hive",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.iceberg.vectorization.enabled": "false",
"spark.hive.metastore.client.auth.mode": "PLAIN",
"spark.hive.metastore.client.plain.username": "admin",
"spark.hive.metastore.client.plain.password": "xxx",
"spark.hive.metastore.use.SSL": "true",
"spark.hive.metastore.truststore.type": "JKS",
"spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
"spark.hive.metastore.truststore.password": "changeit"
}
},
"volumes": [{
"name": "cpd-instance::spark01",
"mount_path": "/spark01/",
"source_sub_path": "spark"
}]
}'
CPD_URL
はこのCP4DのベースURLで、INSTANCE_ID
はSparkのインスタンスIDである。
TOKEN
は、ユーザー名とパスワードをコロン(:)で繋げてbase64でエンコードした値で、以下のように生成する。
$ echo admin:xxx | base64
YWRtaW46eHh4Cg==
実行するプログラム等を保存するストレージ・ボリュームを作成した際、マウント・パスは/mnts/spark01
とした。その場合、上記のREST APIのデータのmount_path
の値は、/spark01/
となる。
application
にプログラムを配置したパス付きファイル名を指定するが、上記の場合、/spark01/watsonx_data_spark_sample.py
となっている。ここで注意すべき値がsource_sub_path
でspark
となっている。結果として、実際にプログラムを配置すべき場所は、/mnts/spark01
のストレージ・ボリュームのspark
ディレクトリ配下となる。
下図はストレージ・ボリューム
のリストからcpd-instance::spark01
をクリックし、ファイル・ブラウザ
タブをクリックして表示された画面で、その左下の新規フォルダー
をクリックし、spark
フォルダーを作成し、そのフォルダー配下にプログラムをアップロードする。
結果として、以下のようになる。
それでは実行してみる。
$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
"application_details": {
"application": "/spark01/watsonx_data_spark_sample.py",
"conf": {
"spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
"spark.sql.catalogImplementation": "hive",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.iceberg.vectorization.enabled": "false",
"spark.hive.metastore.client.auth.mode": "PLAIN",
"spark.hive.metastore.client.plain.username": "admin",
"spark.hive.metastore.client.plain.password": "xxx",
"spark.hive.metastore.use.SSL": "true",
"spark.hive.metastore.truststore.type": "JKS",
"spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
"spark.hive.metastore.truststore.password": "changeit"
}
},
"volumes": [{
"name": "cpd-instance::spark01",
"mount_path": "/spark01/",
"source_sub_path": "spark"
}]
}'
{"application_id":"cd086293-38d5-4ab8-aedd-611ebe037d39","state":"ACCEPTED"}
実行ステータスは、インフラストラクチャー・マネージャー
で該当するSparkエンジンをクリックして表示された画面で、applications
タブをクリックすると実行したプログラム一覧が表示される。
ログは、Sparkのインスタンス作成時に作成したストレージ・ボリューム上に保存される。
最初のフォルダー名がSparkのインスタンスIDで、次のフォルダー名がプログラムを実行した際に得られるアプリケーションIDである。
logs
フォルダーにある、spark-driver-xxx-stdout
ファイルにプログラムが標準出力に表示したテキストが保存されているので、こちらをダウンロードして確認してみる。
以下のようにselect
文で得られたデータの一部が表示されていることがわかる。
23/12/11 14:30:22 DRIVER INFO CodeGenerator: Code generated in 111.533375 ms
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+-----------+----+--------+--------------+-----+
|age| workclass|fnlwgt| education|education_num| marital| occupation| relationship| race| sex|capitalgain|loss|hoursper|citizen_status|label|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+-----------+----+--------+--------------+-----+
| 39| State-gov| 77516| Bachelors| 13| Never-married| Adm-clerical|Not-in-family| White| Male| 2174| 0| 40| United-States|<=50K|
| 50|Self-emp-not-inc| 83311| Bachelors| 13| Married-civ-spouse| Exec-managerial| Husband| White| Male| 0| 0| 13| United-States|<=50K|
| 38| Private|215646| HS-grad| 9| Divorced|Handlers-cleaners|Not-in-family| White| Male| 0| 0| 40| United-States|<=50K|
...
4. ランダム・フォレストのモデルを作成してみる
ランダム・フォレストのモデルを作成し、そのモデルをプロジェクトに保存した後、デプロイメント・スペースにプロモード&デプロイするプログラムを以下に示す。
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from ibm_watson_machine_learning import APIClient
WML_CREDENTIALS = {
"url": "https://cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud",
"username": "admin",
"password": "xxx",
"instance_id": "openshift",
"version": "4.8"
}
PROJECT_ID = "de382df6-765f-468e-b4de-b87d875117ef"
DEPLOYMENT_SPACE_ID = "c4cd6278-3fe5-438c-8870-08691f8e270c"
MODEL_NAME = "adult_census_model_rf"
def init_spark():
spark = SparkSession.builder \
.appName("lightgbm-model-creation") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
.config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.icos1.type", "hive") \
.config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
.enableHiveSupport() \
.getOrCreate()
return spark
def prepare_data(spark):
spark_df = spark.sql("select * from icos1.ai.adult_census")
protected_attributes = ["race", "age", "sex"]
for attr in protected_attributes:
spark_df = spark_df.drop(attr)
train_data, test_data = spark_df.randomSplit([0.85, 0.15], seed=24)
return train_data, test_data
def build_model(spark, train_data, test_data):
cat_features = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'citizen_status']
num_features = ["fnlwgt", "education_num", "capitalgain", "loss", "hoursper"]
stages = []
cat_features_idx = [col + "_idx" for col in cat_features]
str_indexer = StringIndexer(inputCols=cat_features, outputCols=cat_features_idx)
stages.append(str_indexer)
str_indexer_label = StringIndexer(inputCol="label", outputCol="encoded_label").fit(train_data)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=str_indexer_label.labels)
stages.append(str_indexer_label)
va_features = VectorAssembler(
inputCols=cat_features_idx + num_features, outputCol="features"
)
stages.append(va_features)
classifier = RandomForestClassifier(labelCol="encoded_label", featuresCol="features", maxBins=42)
stages.append(classifier)
stages.append(label_converter)
pipeline = Pipeline(stages=stages)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="encoded_label", rawPredictionCol="rawPrediction")
roc = evaluator.evaluate(predictions)
print(f"Area under the ROC = {roc}")
return model, pipeline
def get_wml_client(wml_credentials):
wml_client = APIClient(wml_credentials)
return wml_client
def store_model(wml_client, project_id, model_name, model, pipeline, train_data):
wml_client.set.default_project(project_id)
software_spec_uid = wml_client.software_specifications.get_id_by_name("spark-mllib_3.3")
print("Software Specification ID: {}".format(software_spec_uid))
model_props = {
wml_client._models.ConfigurationMetaNames.NAME:"{}".format(model_name),
wml_client._models.ConfigurationMetaNames.TYPE: "mllib_3.3",
wml_client._models.ConfigurationMetaNames.SOFTWARE_SPEC_UID: software_spec_uid,
wml_client._models.ConfigurationMetaNames.LABEL_FIELD: "label",
}
print("Storing model ...")
model_details = wml_client.repository.store_model(
model=model,
meta_props=model_props,
training_data=train_data,
pipeline=pipeline)
model_id = wml_client.repository.get_model_id(model_details)
print(f"Model ID: {model_id}")
return model_id
def promote_and_deploy_model(wml_client, project_id, deployment_space_id, model_name, model_id):
wml_client.set.default_project(project_id)
# space_model_id = wml_client.repository.promote_model(model_id=model_id, source_project_id=project_id, target_space_id=deployment_space_id)
space_model_id = wml_client.spaces.promote(asset_id=model_id, source_project_id=project_id, target_space_id=deployment_space_id)
wml_client.set.default_space(deployment_space_id)
metadata = {
"name": model_name
}
meta_props={
wml_client.deployments.ConfigurationMetaNames.NAME: model_name,
wml_client.deployments.ConfigurationMetaNames.ONLINE: {}
}
deployment_details = wml_client.deployments.create(artifact_uid=space_model_id, meta_props=meta_props, target_space_id=deployment_space_id)
print(f"Deployment space model asset ID: {space_model_id}")
print(f'Deployment space model deployment ID: {deployment_details["metadata"]["id"]}')
def main():
try:
spark = init_spark()
train_data, test_data = prepare_data(spark)
model, pipeline = build_model(spark, train_data, test_data)
wml_client = get_wml_client(WML_CREDENTIALS)
model_id = store_model(wml_client, PROJECT_ID, MODEL_NAME, model, pipeline, train_data)
promote_and_deploy_model(wml_client, PROJECT_ID, DEPLOYMENT_SPACE_ID, MODEL_NAME, model_id)
finally:
spark.stop()
if __name__ == '__main__':
main()
プログラムの最初に以下の4つの値をセットする。
変数名 | 値 |
---|---|
WML_CREDENTIALS | CP4Dのurl/username/passwordをセットする。その他は変更不要 |
PROJECT_ID | xxx |
DEPLOYMENT_SPACE_ID | xxx |
MODEL_NAME | adult_census_model_rf |
PROJECT_ID
は、該当プロジェクトを開き、管理
タブに記載されているプロジェクトID
の値をセットする。
DEPLOYMENT_SPACE_ID
は、該当デプロイメント・スペースを開き、管理
タブに記載されているスペースGUID
の値をセットする。
上記のプログラムをspark01
のストレージ・ボリュームにアップロードし、そのプログラムを実行要求する以下のコマンドを実行してみる。
$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
"application_details": {
"application": "/spark01/watsonx_data_model_creation_random_forest.py",
"conf": {
"spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
"spark.sql.catalogImplementation": "hive",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.iceberg.vectorization.enabled": "false",
"spark.hive.metastore.client.auth.mode": "PLAIN",
"spark.hive.metastore.client.plain.username": "admin",
"spark.hive.metastore.client.plain.password": "xxx",
"spark.hive.metastore.use.SSL": "true",
"spark.hive.metastore.truststore.type": "JKS",
"spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
"spark.hive.metastore.truststore.password": "changeit"
}
},
"volumes": [{
"name": "cpd-instance::spark01",
"mount_path": "/spark01/",
"source_sub_path": "spark"
}]
}'
{"application_id":"167dfc01-2a35-4be8-ba25-9e6f5a52e8b1","state":"ACCEPTED"}
プログラムの実行が終了すると、デプロイメント・スペースにモデルがデプロイされる。
モデルをテストしてみる。
問題なく推論結果が得られた。
5. LightGBMのモデルを作成してみる
LightGBMのモデルの作成に必要なパッケージの導入は以下のURLを参照した。
結果として、以下のプロパティ値を設定することで導入できた。
(以下はSpark 3.3で必要となるパッケージ)
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3"
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven"
さらに冒頭で述べたように、spark.jars.ivy
プロパティに共有ストレージのパスをセットする必要があり、今回は/spark01/ivy2
をセットしている。
これらの設定は、プログラムを実行要求するコマンドで設定する必要がある。
LightGBMのモデルを作成するプログラムを以下に示す。
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from synapse.ml.lightgbm import LightGBMClassifier
MODEL_NAME = "adult_census_model_lightgbm"
def init_spark():
spark = SparkSession.builder \
.appName("lightgbm-model-creation") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.endpoint" ,"s3.jp-tok.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.access.key" ,"xxx") \
.config("spark.hadoop.fs.s3a.bucket.watsonx-data-mo-icos-2023.secret.key" ,"xxx") \
.config("spark.sql.catalog.icos1", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.icos1.type", "hive") \
.config("spark.sql.catalog.icos1.uri", "thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-instance.svc.cluster.local:9083") \
.enableHiveSupport() \
.getOrCreate()
return spark
def prepare_data(spark):
spark_df = spark.sql("select * from icos1.ai.adult_census")
protected_attributes = ["race", "age", "sex"]
for attr in protected_attributes:
spark_df = spark_df.drop(attr)
train_data, test_data = spark_df.randomSplit([0.85, 0.15], seed=1)
return train_data, test_data
def build_model(spark, train_data, test_data):
cat_features = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'citizen_status']
num_features = ["fnlwgt", "education_num", "capitalgain", "loss", "hoursper"]
stages = []
cat_features_idx = [col + "_idx" for col in cat_features]
str_indexer = StringIndexer(
inputCols=cat_features, outputCols=cat_features_idx
)
stages.append(str_indexer)
str_indexer_label = StringIndexer(inputCol="label", outputCol="encoded_label").fit(train_data)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=str_indexer_label.labels)
stages.append(str_indexer_label)
va_features = VectorAssembler(
inputCols=cat_features_idx + num_features, outputCol="features"
)
stages.append(va_features)
params = {
"learningRate": .2,
"numIterations": 200,
"numLeaves": 31
}
classifier = LightGBMClassifier(objective="binary", featuresCol="features", categoricalSlotNames=cat_features_idx, labelCol="encoded_label", **params)
stages.append(classifier)
stages.append(label_converter)
pipeline = Pipeline(stages=stages)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="encoded_label", rawPredictionCol="rawPrediction")
roc = evaluator.evaluate(predictions)
print(f"Area under the ROC = {roc}")
return model, pipeline
def save_model(model, model_name):
model.save(model_name)
def main():
try:
spark = init_spark()
train_data, test_data = prepare_data(spark)
model, pipeline = build_model(spark, train_data, test_data)
save_model(model, MODEL_NAME)
finally:
spark.stop()
if __name__ == '__main__':
main()
上記のプログラムをspark01
のストレージ・ボリュームにアップロードし、そのプログラムを実行要求する以下のコマンドを実行してみる。
$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
"application_details": {
"application": "/spark01/watsonx_data_model_creation_lightgbm.py",
"conf": {
"spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
"spark.sql.catalogImplementation": "hive",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.iceberg.vectorization.enabled": "false",
"spark.hive.metastore.client.auth.mode": "PLAIN",
"spark.hive.metastore.client.plain.username": "admin",
"spark.hive.metastore.client.plain.password": "5TvBgyaTqjzE",
"spark.hive.metastore.use.SSL": "true",
"spark.hive.metastore.truststore.type": "JKS",
"spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
"spark.hive.metastore.truststore.password": "changeit",
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
"spark.jars.ivy": "/spark01/ivy2"
}
},
"volumes": [{
"name": "cpd-instance::spark01",
"mount_path": "/spark01/",
"source_sub_path": "spark"
}]
}'
{"application_id":"09bbc7e7-2b2d-47a5-8bd0-56e0f3430566","state":"ACCEPTED"}
プログラムの実行が終了すると、モデルがログとともに保存される。
(下図のadult_census_model_lightgbm
フォルダーにモデルが保存されている)
今回のプログラムではモデルを保存する際にパスを指定していないため、ログ・ファイル等が保存されるフォルダーと同じところにモデルが保存されている。モデルを保存する際に、/spark01/
パスを指定すると、cpd-instance::spark01
の方に保存される。
補足: Spark3.3 から Spark3.4 への切り替え
以下のURLに従う。
実際に Spark 3.4 に変えてみる。
(環境変数名やその値は今まで述べてきたやり方に従っており、上記のURLとは少し異なる)
$ export CPD_URL=cpd-cpd-instance.mycluster-jp-tok-1-bx2-16-309ee4c04392c50ffd16fd0a4c2688e1-0000.jp-tok.containers.appdomain.cloud
$ export INSTANCE_ID=b13d7305-79c0-47f0-82b7-9a7da804937a
$ export TOKEN=xxx
$ curl -X PUT https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/default_runtime \
--header "Authorization: ZenApiKey ${TOKEN}" \
--header "Content-Type: application/json" \
--data-raw '{
"spark_version": "3.4"
}'
{"spark_version":"3.4"}
Spark3.4に変更した環境を使って、LightGBMのパッケージをSpark3.4用に変えて、LightGBMのモデルを作成するプログラムを実行してみる。
(Spark3.4用のLightGBMパッケージ: com.microsoft.azure:synapseml_2.12:1.0.2
)
$ curl -k -X POST https://${CPD_URL}/v4/analytics_engines/${INSTANCE_ID}/spark_applications -H "Authorization: ZenApiKey $TOKEN" -X POST -d '{
"application_details": {
"application": "/spark01/watsonx_data_model_creation_lightgbm.py",
"conf": {
"spark.driver.extraClassPath": "/opt/ibm/connectors/iceberg-lakehouse/iceberg-3.3.2-1.2.1-hms-4.0.0-shaded.jar",
"spark.sql.catalogImplementation": "hive",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.iceberg.vectorization.enabled": "false",
"spark.hive.metastore.client.auth.mode": "PLAIN",
"spark.hive.metastore.client.plain.username": "admin",
"spark.hive.metastore.client.plain.password": "5TvBgyaTqjzE",
"spark.hive.metastore.use.SSL": "true",
"spark.hive.metastore.truststore.type": "JKS",
"spark.hive.metastore.truststore.path": "file:///opt/ibm/jdk/lib/security/cacerts",
"spark.hive.metastore.truststore.password": "changeit",
"spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2",
"spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
"spark.jars.ivy": "/spark01/ivy2"
}
},
"volumes": [{
"name": "cpd-instance::spark01",
"mount_path": "/spark01/",
"source_sub_path": "spark"
}]
}'
{"application_id":"5df9920a-d56c-4d2a-a88f-d2e8cc9581a2","template_id":"spark-3.4-cp4d-template","state":"ACCEPTED","runtime":{"spark_version":"3.4"}}
プログラムの実行結果は以下のように成功し、Spark version
が 3.4 となっている。
Spark3.3での実行結果と同様、LightGBMモデルもログと共に保存されている。