初版:2020/3/18
著者:高重 聡一, 伊藤 雅博, 株式会社 日立製作所
はじめに
この投稿では、機械学習モデルを組み込んだシステム設計を行う際の、データ前処理の設計ノウハウとデータ前処理の性能検証結果について紹介します。
第3回目は、並列分散処理基盤であるSparkを用いたデータ前処理における性能向上ノウハウと検証結果について紹介します。
投稿一覧:
- 機械学習を利用するシステムのデータ前処理について
- 機械学習向けデータ前処理の性能検証(数値データ編)(その1)
- 機械学習向けデータ前処理の性能検証(数値データ編)(その2) (本投稿)
データ前処理におけるSparkの活用
前回の投稿では、データ量が大きい場合に シングルノードによるPythonによるデータ前処理を実施すると、メモリ不足が起きてしまうことを示しました。このような場合には並列分散処理基盤を利用することが有効であることが多いです。今回は、代表的な並列分散処理基盤であるSparkによるデータ前処理について紹介していきます。
データ処理方式に応じた処理の書き換え
機械学習を活用するシステムの開発では、第1回の投稿に記載したとおり最初に機械学習の有効性を確認するためのPoCを行ってから、その結果をもとに本番システムの開発に入るケースが多いですが、このPoCの段階ではデータ前処理がPythonで実装されていることが多いです。したがって、本番システムの開発の際にSparkを利用する場合は、PythonコードをSpark向けに書き換える必要があります。
今回はBigBenchの業務シナリオ#5に対するPandasのデータフレームによるデータ前処理を、以下のような指針に従ってSparkの処理に書き換えました。
-
初期化
Before(pandas)
import numpy as np
import pandas as pd
```python:After(spark)
import pyspark
import pyspark.sql
form pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
spark = SparkSession.builder.master(…) #Sparkとの接続の設定を記載
sc = spark.getContext()
hive_context = HiveContext(sc)
-
データフレーム作成
Before(pandas)
df = pd.DataFrame()
```python:After(spark)
df = spark.createDataFrame(data, schema)
-
ファイルの読み込み(CSV)
Before(pandas)
df = pd.read_csv(filename)
```python:After(spark)
df = spark.read.csv(filename)
-
値がNULLではない行の取得
Before(pandas)
df.loc[df[“columnname”]notnull(),:]
```python:After(spark)
df.filter(df[“columnname”].isNotNull())
-
特定のデータ行へのアクセス
Before(pandas)
df.loc[rownumber]
```python:After(spark)
# 特定の行へのアクセスは時間がかかるため行アクセスが要らないようにロジックを変更する
-
データ列の指定
特定のデータ列へのアクセス方法は、pandasでもsparkでも変わりません。
df[‘columnname’]
* ある列の値を検索条件とした行絞り込み(filterなど)
```python:Before(pandas)
df.loc[df[‘columnname’] == value]
df.filter(df[‘columnname’] == value)
-
複数のテーブルの結合(merge / inner join)
Before(pandas)
pd.merge(df1, df2, how=’inner’, left_on=”l-key” right_on=”r-key”)
```python:After(spark)
df1.join(df2, df1.l_key == df2.r_key, ‘inner’)
-
データのグルーピング
Before(pandas)
def function(df):
#何か処理を行う
return data
df.groupby(“key”).apply(function)
```python:After(spark)
out_schema = StructType(...) #out_schemaは関数が出力するデータスキーマの定義
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def function(df):
#何か処理を行う ※Pandas UDFの仕様により、書き換える必要がある場合有り(後述)
return data
df.groupby(“key”).apply(function)
Pandas UDFによる処理の書き換え
Pandasを用いたPythonの処理を、Sparkで実行する方法の1つとしてPandas UDFが挙げられます。Pandas UDF は、Spark が提供する Python と Spark の連携の仕組みで、Python(Pandas)で実装された処理を、Spark で並列分散処理することが可能です。Pandas UDFでは表 1に示す3種類の関数が利用可能で、Sparkのデータフレーム操作から呼び出すことが出来ます。
表 1 Pandas UDFで利用できるPython関数の種類
# | 処理関数種類 | 関数で実施できる処理の説明 | 利用シーン |
---|---|---|---|
1 | SCALAR | 1つのpandas.Series を受け取り、処理を行ったうえで、1つのpandas.Series を返す。入力と出力で要素名と型が一致する必要がある。 |
あるデータ列に対する次の繰り替え処理に適用
|
2 | AGG | 1つ以上のpandas.Series を受け取り,何らかの処理をしたうえで、1つのpandas.Series を返す。入力と出力のデータ数は一致する必要はない。 |
集計処理に対して適用
|
3 | GROUPED_MAP | 1つのpandas.DataFrame を受け取り、各要素に対して何らかの処理を行い、その結果として別のpandas.DataFrame を1つ返す。入力と出力のデータ行数、列名などが一致する必要はない |
グルーピングしたデータセットの各データセットに対して適用
|
Pandasでgroupby.apply
処理を書いている場合、そのapply
に渡しているPythonの関数を、上記のGROUPED_MAP関数として登録することで、再実装などの工数をかけることなくSparkによる並列実行が実現出来ます。
ただし、PandasとSparkのapply
関数では若干仕様が異なるため、少し手直しが必要なる場合があります。今回の実装では次の2つの仕様の差異に対応する必要があり、書き換えを行いました。なお、説明中のdf
はデータフレームを渡される変数名、keyname
はデータフレームの列名、function
は繰り返し処理を定義する関数名です。
-
引数の値の違い
- 挙動の違い
- `df.groupby(“keyname”).apply(function)`を実行すると、最初にデータは`keyname`の値が同一のものでグルーピングされ、グループ単位で1つのデータフレームにまとめられます。`apply`関数で指定した関数`function(df)`の引数には、Pandas, Sparkのいずれにおいてもそれぞれのライブラリでの`DataFrame`オブジェクトが渡されます。Pandasでは、この`DataFrame`には「`df.name`」というプロパティが設定され、グループにおける`keyname`の値を取得することが出来ます。一方、Sparkではそういったプロパティはありません。
- 対策
- 関数`function(df)`に渡されるデータフレームには、原則として`keyname`の要素も含まれています。`name = df[“keyname”][0]`などとすることで、`name`の値を取得することが出来ます。
-
戻り値の扱いの違い
- 挙動の違い
- Pandasでは`df.groupby(“keyname”).apply(function)`では、結果の戻り値に含まれる列の名称、データ型に従って、解釈が変わってきます。特に、出力されたデータに対して、`groupby`関数が`keyname`の列をインデックスとして追加したデータを自動的に作成し、それらを集約した`Series`もしくは`DataFrame`が作られます。一方で、Sparkではこういった自動的な列の補完などの動作は行いません。
- 対策
- `function(df)`の戻り値には必ず`keyname`を列名として、その値として元のデータの`keyname`の値を含むように構成します。
BigBenchの業務シナリオ#5のSpark化コードの例
第2回の投稿で示した、PythonによるBigBenchの業務シナリオ#5のデータ前処理の実装に対して、これまでに挙げてきたノウハウを用いてSparkに書き換えた結果、最終的に 下図 1のようなコードになりました。
import pandas as pd
import numpy as np
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, when
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)
web_clickstreams = hive_context.read.table("bigbench.web_clickstreams")
item = hive_context.read.table("bigbench.item")
customer = hive_context.read.table("bigbench.customer")
customer_demographics = hive_context.read.table("bigbench.customer_demographics")
# 処理①
data = web_clickstreams.filter(web_clickstreams[“wcs_user_sk”].isNotNull())
data = data.join(item, data["wcs_item_sk"] == item["i_item_sk"], 'inner')
# 処理②:groupbyでユーザIDごとにグルーピング
grouped_users = data.groupby('wcs_user_sk')
# 処理③型定義: 繰り返し処理の出力データ型を定義
types = ["wcs_user_sk", "clicks_in_category"]+["clicks_in_%d"%i for i in range(1,8)]
out_schema = StructType([StructField(i, IntegerType(), True) for i in types])
# 処理③登録: Pandas UDFに繰り返し処理の内容を関数として登録
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize_per_user(wcs_user_sk_contents):
wcs_user_sk_index = wcs_user_sk_contents['wcs_user_sk'][0]
#処理③-1, ③-2
clicks_in_category = \
len(wcs_user_sk_contents[wcs_user_sk_contents['i_category'] == i_category_index])
clicks_in = [0] * 8
for name, df in wcs_user_sk_contents.groupby('i_category_id'):# ループを1回に最適化
if name < len(clicks_in):
clicks_in[name] = len(df.index)
#処理③-3
return pd.DataFrame([wcs_user_sk_index, clicks_in_category] + clicks_in[1:],\
columns=types)
# 処理③実行
i_category_index = 'Books'
data = grouped_users.apply(summarize_per_user)
# 処理④
data = data.join(customer, data["wcs_user_sk"] == customer["c_customer_sk"], 'inner')
# 処理⑤
data = data.join(customer_demographics, \
data["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], 'inner')
# 処理⑥
data.withColumn('college_education',
when(data["cd_education_status"] == 'Advanced Degree', 1)\
.when(data["cd_education_status"] == 'College', 1)\
.when(data[“cd_education_status”] == '4 yr Degree', 1)\
.when(data[“cd_education_status”] == '2 yr Degree', 1)\
.otherwise(0))
data.withColumn('male', when(data[“cd_gender”] == 'M', 1).otherwise(0))
# 結果の保存
data.write.mode('append').parquet('answer_q05_0100.parquet')
図 1 SparkによるBigBenchシナリオ#5のデータ前処理ソースコード
Sparkを使った効果検証
ここから、図 1で実装したSparkによるデータ前処理の性能検証を行っていきます。以下の図 2に今回の検証で並列分散処理用の環境として構築したSparkクラスタのサービス配置を示します。今回は、オンプレミスでのユースケースを想定し、Clouderaディストリビューションを用いてSparkクラスタを構築しています。なお、第2回の投稿で行った、Pythonによるシングルノード処理の検証にはWorker Nodeの1ノードを使用しています。また、Sparkで処理を行う際に実際の並列分散処理が行われるのは3台のWorker Nodeになります。
図 2 今回の検証環境のサービス配置
次に、表 2に今回の検証環境のスペックを示します。今回は、検証マシンとしてAWS上のIaaS(EC2インスタンス)を使用しています。Worker Node用のEC2のインスタンスには1TBのHDD(EBS)を5台接続しており、ノード当り5TBの容量を接続しています。ただし、HDFSでは3多重でデータを書き込むため、実効上のデータ容量は5TB程度です。
表 2 検証環境のハードウェアスペック
Manager Node | Master Node | Worker Node×3 | |
---|---|---|---|
検証環境 | AWS EC2 | AWS EC2 | AWS EC2 |
OS | CentOS 7 64bit | CentOS 7 64bit | CentOS 7 64bit |
CPU(コア数) | 2 | 4 | 96 (32×3ノード) |
Memory(GB) | 16 | 32 | 768 (256GB×3ノード) |
HDD(GB) | 80GB | 80GB | 15TB※(1TB×5 HDD×3ノード) |
また、検証で利用しているソフトウェアのバージョンは次の表 3のとおりです。
表 3 検証環境のソフトウェアバージョン
ソフトウェア | バージョン |
---|---|
Clouderaディストリビューション | CDH 6.3.0 |
Spark | 2.4.0 |
Hive | 2.1.1 |
YARN | 2.5.0 |
HDFS | 3.0.0 |
Python | 3.7.3 |
Pandas | 0.24.2 |
Numpy | 1.16.4 |
比較する処理方式
前回第2回の投稿で検証した、下記1.と2.の処理方式での性能測定結果に加えて、Sparkによる並列分散処理を行った場合の性能(3.)を測定して比較します。
-
Pythonによるシングルノード処理(第2回の投稿の図 5のロジック最適化なし)
第2回の投稿の図 4のコードをPython上で実行します。
-
Pythonによるシングルノード処理(第2回の投稿の図 5のロジック最適化あり)
第2回の投稿の図 4のコードに対して第2回の投稿の図 5の最適化を実施したコードを、Python上で実行します。
-
Sparkによる並列分散処理(第2回の投稿の図 5のロジック最適化あり)
2.で使用したロジック最適化済のPythonによるシングルノード処理向けコードを、Sparkでの処理向けに変更したコード(図 1に示したもの)を用いて、Spark上で並列分散処理を実行します。
Sparkでの処理時の実行パラメータ
タスクを実行する際のSparkの実行パラメータは表 4のように設定しています。各Worker Nodeで1つずつのワーカープロセス(Executor)を起動し、その中でメモリやコアを独占的に利用できるようにする方針で割り当てています。
表 4 Sparkの実行パラメータ
# | 項目 | 設定値 | 備考 |
---|---|---|---|
1 | Executorの数 | 3 | 各ノードに1つずつ起動させることを想定 |
2 | Executorのメモリサイズ | 128 GB | |
3 | Executor当りのコア数 | 30 | マシンの32コアのうち、30コアを割り当てて利用 |
測定の対象とする処理内容とデータ
測定では、第2回の投稿の際の測定と同様に、次の3つの処理に要した合計時間を計測します。
データソースからのデータのメモリ上への読み込み
読み込んだデータに対する、データの結合、集計などの前処理
処理結果のデータストアへの書き込み
また、測定対象のデータは第2回と同じ設定(第2回の投稿中の表 3)にしました。
性能測定結果
BigBenchの業務シナリオ#5に対して、4種類の処理のそれぞれを、データサイズごとに実行して処理時間を評価した結果は図 3の通りとなりました。
図 3 入力データサイズごとのデータ前処理時間測定結果
Pythonによるシングルノードでの処理では、第2回の投稿での結果の通り、入力データサイズを増加させた場合に本番想定のデータセットのサイズ(約50GB)よりも小規模なデータセットの段階でメモリ不足により処理が出来なくなってしまいます。一方、Sparkで並列分散処理した場合は本番想定のデータセットでも処理が正常に終了し、なおかつそれ以上のサイズにおいても処理を完了することが出来ました。
図 4に、サイズ22GBのデータをSparkで処理した場合のCPU, メモリ、ディスクI/O使⽤量の経過を示します。各ワーカーノードのCPUがすべて100%利用されているために計算時間を大きく短縮できたことが確認できます。
メモリも同様に各ノードに分散されています。3ノードのそれぞれにベースとなるOS機能のオーバーヘッドがあるためクラスタ全体でのメモリ使用量は単一マシンの場合よりも大きくなっており、業務シナリオ#5のプログラムを起動後に使用したメモリはワーカノードあたり130~140GB程度、3ノード合計で410GB程度です。
また、処理①Inner joinや処理②groupbyの処理(第2回の投稿図 3参照)結果、ディスク書き込みのI/Oが発生している様子がわかります。このI/Oは結合、あるいはソートしたデータをディスク上に保存しているために発生しています。
図 4 Spark環境でのCPU、メモリ、I/O使用量の時間的変化
性能評価結果の考察
Spark導入による効果
【並列分散化の効果①: 大規模データへの対応】
Sparkを導入することによって、単一サーバのPython上で実行するとメモリ不足で強制終了するような大規模のデータに対して前処理を行うことが出来るようになりました。
1台のマシンのメモリ搭載量の限界を超えたデータを扱う場合には、Sparkの導入は適切であると考えられます。
【並列分散化の効果②: 処理時間の短縮】
PythonはマルチCPUを活用できないため、データサイズが増加しても単一のCPUで逐次的に処理を実行しています。一方で、Sparkではデータがノード毎に分割され、それぞれのデータへの処理を異なるノードやCPUコアに割り当てて並列に処理するため、処理時間を大幅に短縮できています。
PythonとSparkの特徴について
-
Python上でPandasを利用する場合、処理対象のデータはすべてメモリに読み込まれ、単一のCPUで逐次処理されます。また、今回の業務シナリオ#5の例では、繰り返し時間は純粋にデータサイズに比例して増加しています。
-
Sparkでの処理では、データを一定のサイズで区切った区画ごとにメモリへの読み込み、ディスクへの書き出しが行われます。また、分割されたノード間のデータの交換が必要な処理(JOINやGROUPBY、ソートなど)が行われると、その結果をディスクに書き出します。このように、大きなデータを区画ごとに処理することにより、取り扱うデータサイズが物理メモリの搭載量を超えた場合でも、処理を完了することが出来る一方で、処理時間がディスクIOネックとなり、突然処理時間が線形以上に増加することがあります。
- 図 3では、入力データサイズが150GB程度の際に、突然非線形に処理時間が増加していますが、これは扱うデータサイズが搭載メモリ量を超えたため、ディスクIOが発生してボトルネックになっています。
まとめ
大規模なテーブルデータに対してデータ結合・集計などの前処理を行うサンプル業務を対象として、性能評価を行った結果と、その際の性能向上ノウハウをご紹介しました。
処理対象データがそれほど大きくない場合(~数十 GB)には、シングルノードでの Pythonでの前処理でも数時間程度で処理が終了可能なため、Python での前処理コードでデータ前処理を実施可能です。その際、本連載で紹介したロジック最適化を実施することで処理時間を短縮することが可能です。
一方、処理対象データが大量な場合、Pythonでは処理に時間がかかりすぎる、または処理に失敗してしまうことがあるため、Sparkなどの分散処理基盤の適用が現実的な選択肢となります。機械学習で利用する学習対象のデータが22GB程度の場合には前処理にSparkによる並列分散処理を利用することで、処理時間をシングルノード(シングルスレッド)で動作するPythonに比べ94%程度短縮できることを確認しました。また、Worker3ノードのSparkを利用した場合はPythonがシングルノードで処理可能なデータの4倍程度のデータ量まで性能を保ったまま処理可能なことを確認しました。