概要
- JavaとApache Sparkを使った機械学習入門編です
- 「価格推定」を題材にApache Sparkの使い方から実際の機械学習(学習、回帰)までステップアップ方式でハンズオンします
- 機械学習アルゴリズムには教師あり学習の勾配ブースティングツリーを使います
- ステップアップ方式に複数回に分けて投稿します
- 全ソースコードはこちら https://github.com/riversun/spark-gradient-boosting-tree-regression-example
環境
- Apache Spark 2.4.3
- Java 8~
データの種類を意識して前処理をする
前回は、CSVデータからSparkの表形式データとして読み込むところまでやりました。
さて、ここで、今回学習に使うデータをもういちど眺めてみます。
データはこちらにあります。
こんなデータになっています。
id,material,shape,weight,brand,shop,price
0,シルバー,ブレスレット,56,海外有名ブランド,百貨店,40864
1,ゴールド,指輪,48,国内有名ブランド,直営店,63055
2,ダイア,イアリング,37,国内有名ブランド,直営店,112159
3,ダイア,ネックレス,20,海外有名ブランド,直営店,216053
4,ダイア,ネックレス,33,海外有名ブランド,百貨店,219666
5,シルバー,ブローチ,55,国内有名ブランド,百貨店,16482
6,プラチナ,ブローチ,58,海外超有名ブランド,直営店,377919
7,ゴールド,イアリング,49,国内有名ブランド,直営店,60484
8,シルバー,ネックレス,59,ノーブランド,激安ショップ,6256
・・・・
質的データと量的データ
さて、変数名(materialとかshapeとか)ごとに、どんな値があるか見ていくと以下のように整理できました。
こうしてみると、左側のデータ(material,brand,shop,shape)はそのカテゴリーを示す文字列の値になっています。このように、数値として直接測定できないものを質的データと呼びます。
一方、右側のデータ(weight,price)は連続的な値をとる数値になっています。このように、数値として直接測定したり四則演算できるようなデータを量的データと呼びます。
カテゴリ変数(Categorical variable)と連続変数(Continuous variable)
前述のとおり、質的データをとるようなmaterial,brand,shop,shapeといった変数は、**カテゴリ変数(Categorical variable)**と呼びます。
また、量的データ、つまり連続する値(連続値)をとるようなweight,priceといった変数は、**連続変数(Categorical variable)**と呼びます。
カテゴリ変数(Categorical variable)の取り扱い
機械学習は、結局のところ数値計算なので、カテゴリ変数がもつ質的データは何らかの数値に変換してから処理することになります。
ここでもう少しカテゴリ変数について見てみます。
カテゴリ変数のmaterialとshapeはアクセサリーの材質(material)と加工後の形態(shape)ですが、これらには意味的な違いがあります。
materialはダイア、プラチナ、ゴールド、シルバーですが、わたしたちは経験的にダイアはシルバーよりも高価であることを知っていますので、materialがもつデータには序列がありそうです。
一方のshapeは指輪、ネックレス、イアリング、ブローチ、ブレスレットですが、指輪よりもブレスレットのほうが上等みたいな序列は存在せず、どの選択肢も対等な関係にあるように思えます。
カテゴリ変数は順序変数(Ordinal variable)と名義変数(Nominal variable)に分けられる
このようにカテゴリー変数の値に序列がつくもの、大小を比較できるものを順序変数(Ordinal variable)(または順序尺度の変数)といいます。1
反対にカテゴリー変数の値どうしに序列がつかないものは名義変数(Nominal variable)(または名義尺度の変数)と呼びます。
統計学の教科書のはじめの方に出てくる内容ですが、機械学習を進めていく上でも重要な知識なので覚えておきます。
今回のデータを整理してみると以下のような感じでしょうか。
今回はいったんカテゴリ変数を順序変数と名義変数に明確に区別せずカテゴリ変数(Categorical variable)と連続変数(Continuous variable)というくくりで処理していきます。
カテゴリ変数(Categorical variable)を数値化(インデックス化)する
上でも述べたように、機械学習するためにはカテゴリ変数を数値化する必要があります。
数値化の技法はいくつかありますが、今回は各変数にシンプルにインデックスをつけていきます。
↓こんなイメージ。
Sparkでカテゴリ変数を扱う
さて、カテゴリ変数を数値化ようコードを書いていきましょう。
まず以下のコードのようにします。
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class GBTRegressionStep02 {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "c:\\Temp\\winutil\\");
org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.ERROR);
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("GradientBoostingTreeGegression")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("dataset/gem_price_ja.csv");
StringIndexer materialIndexer = new StringIndexer()// (1)
.setInputCol("material")// (2)
.setOutputCol("materialIndex");// (3)
Dataset<Row> materialIndexAddedDataSet = materialIndexer.fit(dataset).transform(dataset);// (4)
materialIndexAddedDataSet.show(10);// (5)
}
}
コード解説
(1)・・・カテゴリ変数を数値化(インデックス化)するためにはStringIndexerを使います。ある変数を入力して、エンコード(変換)した結果を別の変数として出力する役割をもっています。ここでのエンコードは、単なる文字列を数値に変換する処理になります。
(2) .setInputCol("material")・・・このStringIndexerに入力する変数名を指定します。CSVデータをSparkに読み込んだときに表形式のDatasetになっているので、変数は表のカラムと考えることができます。colは**カラム(column)のcolですね。つまり、StringIndexerの入力側のカラム名は"material"***あ
(3) .setOutputCol("materialIndex")・・・こちらは、出力側のカラム名を指定する。
(4)・・・**fit()**メソッドでカテゴリ変数の数値化のための学習モデルをつくる。その結果得られた学習モデルに対してtransformメソッドでDatasetを与えると新しいDatasetを生成する。
fitメソッドでカテゴリ変数の数値化のための学習モデルをつくる
ですが、
ここでmaterialIndexerという名前を付けたStringIndexerオブジェクトのfit()メソッドでは当たり前ですが実際には学習はしていません。単にカテゴリ変数として与えられた文字列を数値化する仕掛け2を作っているだけです。
transformメソッドでは実際に入力となったDatasetにあるmaterialというカラム名から、materialIndexというインデックス化された新しい変数を作る処理が実行されます。
ここが一番意味不明だとおもいますが、あとで意義がわかるので、今は呪文でOKです。
(5)・・・StringIndexerを処理した後のDatasetを表示します。
コードを実行すると以下のようになります。
materialIndexというカラムが右に追加されました!
このように、カテゴリ変数は何らかの形で数値化しないと機械学習で使えませんので、ほかのカテゴリ変数も数値化します。
ということで、おなじノリでほかのカテゴリ変数shape,brand,shopも同じように*StringIndexerを使って数値化(インデックス化)してみます。
すると以下のようになります
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class GBTRegressionStep02_part02 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("GradientBoostingTreeGegression")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("dataset/gem_price_ja.csv");
StringIndexer materialIndexer = new StringIndexer()// (1)
.setInputCol("material")
.setOutputCol("materialIndex");
StringIndexer shapeIndexer = new StringIndexer()// (2)
.setInputCol("shape")
.setOutputCol("shapeIndex");
StringIndexer brandIndexer = new StringIndexer()// (3)
.setInputCol("brand")
.setOutputCol("brandIndex");
StringIndexer shopIndexer = new StringIndexer()// (4)
.setInputCol("shop")
.setOutputCol("shopIndex");
Dataset<Row> dataset1 = materialIndexer.fit(dataset).transform(dataset);// (5)
Dataset<Row> dataset2 = shapeIndexer.fit(dataset).transform(dataset1);// (6)
Dataset<Row> dataset3 = brandIndexer.fit(dataset).transform(dataset2);// (7)
Dataset<Row> dataset4 = shopIndexer.fit(dataset).transform(dataset3);// (8)
dataset4.show(10);
}
}
(1)~(4)は、それぞれの変数ごとにStringIndexer**を準備しています。
**(5)~(8)**は、materialIndexer→shapeIndexer→brandIndexer→shopIndexerという順番で次々とインデックス化をしては、新しいDatasetを作る処理をしています。
これを実行すると以下のようになります。
右にmaterialIndex,shapeIndex,brandIndex,shopIndexが追加されたことがわかります。
ところで、↓これは何とかならないのでしょうか。
Dataset<Row> dataset1 = materialIndexer.fit(dataset).transform(dataset);// (5)
Dataset<Row> dataset2 = shapeIndexer.fit(dataset).transform(dataset1);// (6)
Dataset<Row> dataset3 = brandIndexer.fit(dataset).transform(dataset2);// (7)
Dataset<Row> dataset4 = shopIndexer.fit(dataset).transform(dataset3);// (8)
何とかなります。
materialIndexer→shapeIndexer→brandIndexer→shopIndexerのように、ある処理がおわったら、その処理を入力として次の処理をしてというのをもっとスマートに書くことができます。
それが次に見るPipelineという仕組みです。
データの前処理をパイプライン化する
↓処理は
Dataset<Row> dataset1 = materialIndexer.fit(dataset).transform(dataset);
Dataset<Row> dataset2 = shapeIndexer.fit(dataset).transform(dataset1);
Dataset<Row> dataset3 = brandIndexer.fit(dataset).transform(dataset2);
Dataset<Row> dataset4 = shopIndexer.fit(dataset).transform(dataset3);
Pipelineをつかって↓のように書き換えることができます。
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] { materialIndexer, shapeIndexer, brandIndexer, shopIndexer });// (1)
PipelineModel pipelineModel = pipeline.fit(dataset);// (2)
Dataset<Row> indexedDataset = pipelineModel.transform(dataset);// (3)
(1)・・・このようにPipelineオブジェクトを作って、setStagesで処理を実行したい順にStringIndexerを記述していきます。こうして並べるだけで、これら一連の処理をPipelineとして、ひとかたまりで実行することができます。
(2)・・・pipeline#fitで、pipelineModelを取得します。現段階でもまだStringIndexerしか使っていないので学習用の処理は入れていませんが、学習用の処理が入るとfit()メソッドで得られるPipelineModelは指定したDatasetを元にした学習モデルを意味することになります。
(3)・・・PipelineModel#transformを実行すると一連のStringIndexer処理をまとめて実行し、結果として新しいDatasetを作ります。こちらも、Pipelineに学習用の処理を入れていた場合には、指定したDatasetに学習モデルを適用することを意味することになります。
以下にソースコードと実行結果を掲載します
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class GBTRegressionStep02_part03 {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "c:\\Temp\\winutil\\");
org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.ERROR);
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("GradientBoostingTreeGegression")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("dataset/gem_price_ja.csv");
StringIndexer materialIndexer = new StringIndexer()
.setInputCol("material")
.setOutputCol("materialIndex");
StringIndexer shapeIndexer = new StringIndexer()
.setInputCol("shape")
.setOutputCol("shapeIndex");
StringIndexer brandIndexer = new StringIndexer()
.setInputCol("brand")
.setOutputCol("brandIndex");
StringIndexer shopIndexer = new StringIndexer()
.setInputCol("shop")
.setOutputCol("shopIndex");
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] { materialIndexer, shapeIndexer, brandIndexer, shopIndexer });// (1)
PipelineModel pipelineModel = pipeline.fit(dataset);// (2)
Dataset<Row> indexedDataset = pipelineModel.transform(dataset);// (3)
indexedDataset.show(10);
}
}
実行結果
意味的に同じことをしているので、実行結果は同じです。
インデクス化処理も、もう少しスマートに書く
さて、処理の実行の部分はPipelineをつかってスッキリしましたが、以下の部分もちょっと冗長です。
StringIndexer materialIndexer = new StringIndexer()
.setInputCol("material")
.setOutputCol("materialIndex");
StringIndexer shapeIndexer = new StringIndexer()
.setInputCol("shape")
.setOutputCol("shapeIndex");
StringIndexer brandIndexer = new StringIndexer()
.setInputCol("brand")
.setOutputCol("brandIndex");
StringIndexer shopIndexer = new StringIndexer()
.setInputCol("shop")
.setOutputCol("shopIndex");
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] { materialIndexer, shapeIndexer, brandIndexer, shopIndexer });/
ここはSparkというよりはJavaの書き方だけの問題ですが、少しスッキリさせてみます。
以下のように書き換えました。
List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");// (1)
List<StringIndexer> stringIndexers = categoricalColNames.stream()
.map(col -> new StringIndexer()
.setInputCol(col)
.setOutputCol(col + "Index"))// (2)
.collect(Collectors.toList());
Pipeline pipeline = new Pipeline()
.setStages(stringIndexers.toArray(new PipelineStage[0]));// (3)
(1)・・・カテゴリ変数名をListにします
(2)・・・List#streamをつかってStringIndexerを生成します。setOutputColでは出力側のカラム名をカテゴリ変数名+"Index"(つまりmaterialIndex,shapeIndex,brandIndex,shopIndex)となるようにしています。処理結果はStringIndexerのListになります。
(3)・・・PipelineのステージにStringIndexerの配列になるようにセットします
ということで、今回のソースコードをまとめると以下のようになります。
だいぶスッキリしました。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class GBTRegressionStep02_part04 {
public static void main(String[] args) {
// System.setProperty("hadoop.home.dir", "c:\\Temp\\winutil\\");//for windows
org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.ERROR);
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("GradientBoostingTreeGegression")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("dataset/gem_price_ja.csv");
List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");
List<StringIndexer> stringIndexers = categoricalColNames.stream()
.map(col -> new StringIndexer()
.setInputCol(col)
.setOutputCol(col + "Index"))
.collect(Collectors.toList());
Pipeline pipeline = new Pipeline()
.setStages(stringIndexers.toArray(new PipelineStage[0]));
PipelineModel pipelineModel = pipeline.fit(dataset);
Dataset<Row> indexedDataset = pipelineModel.transform(dataset);
indexedDataset.show(10);
}
}
これを実行するれば、先ほどと同じく、カテゴリ変数はすべてインデックス化された状態のdatasetを得ることができます。
StringIndexerの数値化ポリシー
カテゴリ変数は順序変数(Ordinal variable)と名義変数に分けられる(Nominal variable)という話題を先にしましたが、SparkのStringIndexerはどのようなポリシーでインデックスの数字を振っているのでしょうか。
StringIndexerはインデックスとして数字をつけて順序付けをしますが、その順序付けのルールはデフォルトだと、出現頻度となります。
出現頻度以外には、アルファベット順を指定することもできます。
以下に設定例を示します。
StringIndexer materialIndexer1 = new StringIndexer()
.setStringOrderType("frequencyDesc")
.setInputCol("material")
.setOutputCol("materialIndex");
StringIndexer materialIndexer2 = new StringIndexer()
.setStringOrderType("alphabetDesc")
.setInputCol("material")
.setOutputCol("materialIndex");
StringIndexer materialIndexer3 = new StringIndexer()
.setStringOrderType("alphabetAsc")
.setInputCol("material")
.setOutputCol("materialIndex");
今回は使いませんが、インデックスの序列に意味を持たせたいときには、変数名を調整してsetStringOrderTypeで並び替えをする方法もあります。
Apache Sparkのspark.mlのパイプライン(pipeline)処理まとめ
次回「 #3 訓練データで学習させ、【価格推定エンジン】を作る」へと続く
次回は実際にデータを学習させて「価格推定エンジン」をつくります
-
今回の例だと、現在同じ重量のゴールドとプラチナだとゴールドのほうが高価なのでのどちらが上みたいなのがハッキリわからない感もありますが、わかりやすい例だと上、中、下とか高、中、低などがつく変数は順序変数と明確に定義することできるとおもいます。 ↩
-
StringIndexerはEstimatorというクラスを継承しています。Estimatorは本来は学習器のためのクラスですが、spark.mlの特徴であるパイプライン処理をする際に、StringIndexerのような前処理系と学習器に同じインタフェースを備えておくことで 前処理→前処理→学習という流れをパイプラインとして実現するためにこのようなインタフェースになっています。 ↩