当記事は2016/10月に投稿しましたが、現時点(【2019/2月】)では既に古くなっています。記事自体はアーカイブの目的でこのまま残しますが、当記事の内容を参考になさらないようにお願いいたします。代替の記事としては下記などがございます。
こんにちわ!石田です。2016年初にQiitaでBluemixのPredictive Analyticsを使ってみたという記事を書きましたが、その続編です。当時はスコアリングはリアルタイム(1件毎)しかできなかったのですが、その後まとめて「バッチ」で実行できるようになりましたので、その機能を紹介します。
注) 2016/10/25付でPredictive Analyticsの名前はWatson Machine Learningになりました。
今後どんどん機能が拡張されるようです。お楽しみに!
Predictive Analyticsの「バッチ」とは?
要は多数のレコードに対して予測分析をしたいときに、「一度にまとめてスコアリングを行う」ことで性能を上げる実行モードです。例えば前回の記事に倣って、10万人の顧客の各々に対して融資可否の判定をしたいとします。これをリアルタイム・スコアリングで行う場合は、顧客1件毎のスコアリング=RESTのリクエストを10万回投げることになるので処理時間が非常にかかります。(※1) それに対してバッチ・スコアリングの場合は事前にデータベースのテーブルに10万件の顧客情報を格納しておき「このテーブルのスコアリングを実行せよ」というREST要求を1回出すだけで済みます。あとはサービス側でバックグラウンドでスコアリングのジョブが実行され、その結果は別のテーブルに書き出されます。なおバッチの場合は処理は非同期になります。(=ジョブの実行リクエストを受け付けたらBluemix上でジョブが開始され、実行リクエストはいったん完結します。ジョブが終わったかどうかは別の状況照会リクエストで確認する流れになります)
※1..正確には、リアルタイム・スコアリングでも複数件のレコードを配列で渡せますが、限界があるでしょう。
「バッチ」の機能
参考文献:Predictive Analyticsの「バッチ」のドキュメント
バッチAPIを使って、以下のようなことができます。
- バッチ・スコアリング (Batch Score)
- それ以外にも学習 (Training),評価 (Evaluation),自動最新表示 (Auto-Refresh),ストリームの実行 (Run Stream)など
- 現在は入出力ともリレーショナルDBのみサポート。(Swiftなどの)ストレージ上のファイルはサポートされません。
- RDBはBluemixのPredictive Analyticsからアクセスできること( dashDBなどは最適かと)
バッチ実行の作業の流れ(概要)
以下は作業の流れです。
①. スコアリング対象のテーブルとデータの準備
②. ストリームのファイル(*.str) をアップロード
③. ジョブの実行要求 → 受付
④. ジョブの実行開始 ( テーブルを読んでスコアリングして結果を別テーブルに格納)
⑤. ジョブのステータスを定期的に確認
⑥. ジョブが終わったら結果のテーブルを参照
⑦. (オプション) ジョブを削除
やってみる
今回は以前の記事で使ったものと同じく、融資審査の例をバッチ・スコアリングでやってみます。記述の重複を避けるため、既に以前の記事での「スコアリング・ブランチの設定」までの作業が完了しており、予測モデルのファイル(*.str)が出来ているものとします。
1. スコアリング対象のテーブルとデータの準備
今回はストリームの入力となるtree_credit.savと出力となるTableをdashDB上のテーブルで置換して実行します。なおこの置換はジョブ実行時の定義(JSON)で指定します。実行時にRDBを使うからといって、元のストリームファイルを修正する必要は、ありません。
要はdashDB上にtree_credit.savと同じ形式のテーブルとデータを準備すればいいです。dashDBの細かい操作は割愛しますが、テスト的に行うなら以下の手順が楽かと思います。先に一点だけ注意点ですが、作成するテーブルのカラム名はストリームの対象ノードのフィールド名と(大文字・小文字/空白など含め) 完全に同じものにする必要があります。 異なっていると、ジョブ実行時にエラーになります。
1.SPSS Modeler上でtree_credit.saveを「編集」で開き、以下の操作でcsvファイルとしてエクスポート
2.お好みのテキスト・エディターで、エクスポートしたファイルの一行目のフィールド名を下記のようにダブル・クオーテーションで囲むように編集して保存します。
これを行わないと、次の3.の手順でdashDB上のテーブルのカラム名が大文字に変換されてしまい、実行時に不一致になってしまいす。
3.dashDBのコンソールの「Tables」で以下の操作にてCSVファイルを指定してRUN DDLするとテーブルが作成されます。
前述の通り、テーブルをご自分で定義する場合は、カラムの名前をオリジナルのtree_credit.savのフィールド名と完全に一致するようにしてください。(テーブル名は任意)
4.dashDBのコンソールの「Load」で以下の操作にてCSVファイルを指定して進んでいくとテーブルにデータが10件ロードされます。(詳細は割愛)
2. ストリームのファイル(*.str) をアップロード
参考文献: Predictive Analytics サービスのバッチ・ジョブ API
ここからはRESTでバッチジョブAPIをたたく操作となります。詳細は上記のドキュメントをご参照ください。当記事では前回同様、Postmanを使っていきますが、実際はアプリからAPIを呼び出します。まずはSPSS Modeler上で作成したstreamのファイルをアップロードします。リアルタイム・スコアリングの場合はコンソールでの操作が可能でしたが、バッチでは、あのコンソールは利用できません。 自分でファイルをPUTする必要があります。
PUT http://{service instance}/pm/v1/file/{fileid}?accesskey={access_key}
- {}部分は環境により適宜置換
- Content-Type:multipart/form-dataであること
以下がPostmanでの指定・実行例です。始めなので細かくご説明します。
① HTTPのPUT
② FileIDは任意。ここでは「loanstr01」としました。※この名前は後で使います
③ 「form-data」を選択
④ HeaderはデフォルトがContent-Type:multipart/form-dataなので指定不要です。あえて指定する場合は(ファイルをPUTする際に区切りが必要なため)以下のようにboudndaryも指定が必要です。
Content-Type:multipart/form-data; boundary=----WebKitFormBoundaryB2fBBB0AIY21wD38
⑤ bodyで送信するファイルを指定します
⑥ keyの名前は何でもいいみたいです
⑦ ストリーム・ファイルを指定します。PostmanではUIで簡単に指定できますが、以下の記述でも動きました。
{ value: fs.createReadStream("d:\\tmp\\modelingintro.str")}
以下でアップロードの状況を確認できます
GET http://{service instance}/pm/v1/file/{fileid}?accesskey={access_key}
- {}部分は環境により適宜置換
3. ジョブの実行要求 → 受付
参考文献: ジョブ定義JSON
次はジョブの実行要求です。ジョブの実行要求自体はRESTですが、その内容はジョブ定義JSONをbodyに指定して引き渡します。JobIDは自動的に割り振られるのではなく、要求時にURL上で明示的に指定します。ちなみに同じJOBIDで複数回の要求を出した場合、重複エラーにはならず上書き・再利用されるようです。
PUT http://{service instance}/pm/v1/jobs/{jobid}?accesskey={access_key}
- {}部分は環境により適宜置換
- {jobid}は任意の文字列
以下の図でバッチスコアリングをするジョブ定義JSONとストリームの関係を説明します
①実行する内容(ジョブタイプ)を指定します。BATCH_SCORE以外にもTRAINING/EVALUATION/AUTO_REFRESH/RUN_STREAMなどが指定できます。
②実行するストリーム・ファイルのid. 先ほどPUTした際に指定した{fileid}です。nameはオプションでなくてもいいです。
③使用するRDBの接続情報。Bluemix上のDBの場合はVCAP_SERVICESで提供される接続情報を指定します。
④入力(ソース・ノード)の指定。ここでストリーム上のノードとRDB上のテーブルを関連付けます。node:で指定するノードの名前はCase-Sensitiveです。 大文字・小文字まで正確に指定してください。
⑤出力(エクスポート・ノード)の指定。同様にストリーム上のノードとRDB上のテーブルを関連付けます。こちらのノードの名前もCase-Sensitiveです。
- なお出力のテーブルですが、insertMode=Dropの場合は新規に作ってくれますので、未定義でもOKです
insertMode 属性 | 説明 |
---|---|
Append | 表が存在し、挿入のための互換性がある必要があります |
Create | 表が作成されます (表が既に存在する場合はエラーが返されます) |
Drop | 参照される表が存在する場合は除去され、再作成されます |
Refresh | 表の既存の行は、新しい行が挿入される前に削除されます |
- 入力と出力で異なるDBを使う場合は③のdbDefinitionsにエントリーを追加してdbRefでその名前を指定します
- スコアリングの場合、実行が成功すると以下のように元のテーブルに予測結果カラムが追加されたテーブルに結果が書き込まれます。
コピペ用に下記に定義を貼っておきます。
{
"action": "BATCH_SCORE",
"model": {
"id": "loanstr01",
"name": "modelingintro1.str"
},
"dbDefinitions": {
"db": {
"type": "DashDB",
"host": "awh-yp-small02.services.dal.bluemix.net",
"port": 50000,
"db": "BLUDB",
"username": "dash110891",
"password": "xxxxxxxxx",
"options": ""
}
},
"setting": {
"inputs": [{
"odbc": {
"dbRef": "db",
"table": "TREE_CREDIT"
},
"node": "tree_credit.sav",
"attributes": []
}],
"exports": [{
"odbc": {
"dbRef": "db",
"table": "CREDIT_RESULT",
"insertMode": "Drop"
},
"node": "Table",
"attributes": []
}]
}
}
Postmanでの実行結果(jobidとして「job01」を指定しています)
5. ジョブのステータスを定期的に確認
個々のジョブのステータス確認(/jobs/{jobid})も、一式まとめての確認(/jobs)もできます。
GET http://{service instance}/pm/v1/jobs/{jobid}?accesskey={access_key}
- {}部分は環境により適宜置換
- {jobid}は実行要求時に指定したジョブのID
GET http://{service instance}/pm/v1/jobs?accesskey={access_key}
- {}部分は環境により適宜置換
上記はジョブが正常に終了(SUCCESS)した場合のレスポンスです。ジョブのステータスには以下がありえます。
- 保留中(Pending)
- 実行中(Running)
- キャンセル中 (Canceling)
- キャンセル済み(Canceled)
- 失敗(Failed)
- 成功(Success)
ご参考までに末尾にOK/NGケースのレスポンスの例を掲載しておきますね。
6. ジョブが終わったら結果のテーブルを参照
dashDBの出力テーブルを確認します。(今回はDROPを指定したので新たにテーブルが作成されています。) 新たに$R-Credit ratingと$RC-Credit ratingのカラムが追加され、各々の予測結果がセットされていることがおわかりいただけるかと思います。
7. (オプション) ジョブを削除
ジョブの実行情報は明示的に消さない限り残っています。
以下の要求で削除できます。
DELETE http://{service instance}/pm/v1/jobs/{jobid}?accesskey={access_key}
- {}部分は環境により適宜置換
- {jobid}は実行要求時に指定したジョブのID
成功した場合削除されたジョブの数(この場合は1)が返ります。
終わりに
いかがでしたでしょうか。今回はバッチの代表的なユースケースであるスコアリングのみご紹介しましたが、他にも色々とできることがあります。
- スコアリングでパラメーターを渡す
- 学習 (Training)/評価 (Evaluation)/自動最新表示 (Auto-Refresh)/ストリームの実行 (Run Stream)
- 今回ご紹介できていないバッチAPI
これらの情報はPredictive Analyticsの「バッチ」のドキュメントをご参照ください。ではでは。
ご参考ージョブ実行のレスポンスの例(JSON)
成功した例
{
"result": {
"jobId": "job01",
"jobStatus": "SUCCESS"
},
"namespace": "us-south$33303cff-1cd5-4546-8016-6803bd44f5e3",
"storageId": "us-south$33303cff-1cd5-4546-8016-6803bd44f5e3/job01",
"action": "BATCH_SCORE",
"createdAt": 1476448558361,
"creater": "Tenant [instanceId=33303cff-1cd5-4546-8016-6803bd44f5e3, planId=3f6acf43-ede8-413a-ac69-f8af3bb0cbfe, appId=26d6c84e-dfdf-4a09-ae84-af855358ac2c, enable=true]",
"finishedAt": 1476448572146,
"id": "job01",
"model": {
"id": "loanstr01",
"name": "modelingintro1.str"
},
"status": "SUCCESS",
"setting": {
"exports": [
{
"odbc": {
"insertMode": "Drop",
"dbRef": "db",
"table": "CREDIT_RESULT"
},
"node": "Table",
"attributes": []
}
],
"inputs": [
{
"odbc": {
"dbRef": "db",
"table": "TREE_CREDIT"
},
"node": "tree_credit.sav",
"attributes": []
}
],
"parameterOverride": []
},
"dbDefinitions": {
"db": {
"db": "BLUDB",
"host": "awh-yp-small02.services.dal.bluemix.net",
"options": "",
"password": "xxxxxxxx",
"port": 50000,
"username": "dash110891",
"type": "DashDB"
}
}
}
失敗した例-メッセージが返ります。
{
"result": {
"failureMsg": "Failed to execute job, com.spss.psapi.session.SessionException: Model refers to undefined field 'Age', details: java.lang.RuntimeException: com.spss.psapi.session.SessionException: Model refers to undefined field 'Age'\n\tat com.ibm.spss.internal.PredictiveModelImpl.scoreBatch(PredictiveModelImpl.java:199)\n\tat com.ibm.spss.internal.PredictiveModelImpl.scoreBatch(PredictiveModelImpl.java:177)\n\tat com.ibm.spss.blackbox.job.executor.impl.BatchScoreExecutorImpl.run(BatchScoreExecutorImpl.java:35)\n\tat com.ibm.spss.blackbox.job.executor.impl.AbstractExecutor.execute(AbstractExecutor.java:74)\n\tat com.ibm.spss.blackbox.job.director.JobDirector.run(JobDirector.java:430)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:277)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.lang.Thread.run(Thread.java:785)\nCaused by: com.spss.psapi.session.SessionException: Model refers to undefined field 'Age'\n\tat com.spss.clementine.psapi.task.TaskManager.execute(Unknown Source)\n\tat com.spss.clementine.psapi.task.SessionTaskManager.execute(Unknown Source)\n\tat com.spss.clementine.session.DefaultSession.runSelected(Unknown Source)\n\tat com.spss.clementine.session.DefaultSession.run(Unknown Source)\n\tat com.ibm.spss.internal.util.StreamNodeHelper.executeTerminalNode(StreamNodeHelper.java:354)\n\tat com.ibm.spss.internal.PredictiveModelImpl.scoreBatch(PredictiveModelImpl.java:187)\n\t... 9 more\nCaused by: com.spss.clementine.component.ExecuteException: Model refers to undefined field 'Number of credit cards'\n\tat com.spss.clementine.component.StreamExecutionJob.validate(Unknown Source)\n\tat com.spss.clementine.component.StreamExecutionJob.makeConsistent(Unknown Source)\n\tat com.spss.clementine.component.StreamExecutionJob.makeConsistent(Unknown Source)\n\tat com.spss.clementine.component.StreamExecutionJob.execute(Unknown Source)\n\tat com.spss.clementine.component.Stream.execute(Unknown Source)\n\tat com.spss.clementine.session.ExecutionManager.executeNodes(Unknown Source)\n\tat com.spss.clementine.session.ExecutionManager.executeNodes(Unknown Source)\n\tat com.spss.clementine.session.ExecutionManager.access$300(Unknown Source)\n\tat com.spss.clementine.session.ExecutionManager$12.execute(Unknown Source)\n\tat com.spss.clementine.session.ExecutionManager$ExecutionTask.work(Unknown Source)\n\tat com.spss.clementine.session.util.Task.callWork(Unknown Source)\n\tat com.spss.clementine.session.util.DefaultTaskHarness.taskWork(Unknown Source)\n\tat com.spss.clementine.session.util.DefaultTaskHarness$1.run(Unknown Source)\n\t... 1 more\n",
"jobId": "job01",
"jobStatus": "FAILED"
},
"namespace": "us-south$33303cff-1cd5-4546-8016-6803bd44f5e3",
"storageId": "us-south$33303cff-1cd5-4546-8016-6803bd44f5e3/job01",
"action": "BATCH_SCORE",
"createdAt": 1476422737855,
"creater": "Tenant [instanceId=33303cff-1cd5-4546-8016-6803bd44f5e3, planId=3f6acf43-ede8-413a-ac69-f8af3bb0cbfe, appId=26d6c84e-dfdf-4a09-ae84-af855358ac2c, enable=true]",
"finishedAt": 1476422746017,
"id": "job01",
"model": {
"id": "loanstr01",
"name": "modelingintro1.str"
},
"status": "FAILED",
"setting": {
"exports": [
{
"odbc": {
"insertMode": "Drop",
"dbRef": "db",
"table": "CREDIT_RESULT"
},
"node": "Table",
"attributes": []
}
],
"inputs": [
{
"odbc": {
"dbRef": "db",
"table": "TREE_CREDIT"
},
"node": "tree_credit.sav",
"attributes": []
}
],
"parameterOverride": []
},
"dbDefinitions": {
"db": {
"db": "BLUDB",
"host": "awh-yp-small02.services.dal.bluemix.net",
"options": "",
"password": "xxxxxxxx",
"port": 50000,
"username": "dash110891",
"type": "DashDB"
}
}
}