はじめに
時系列データでAIモデルを構築する際、スライディングウィンドウによる特徴量計算を行った上で各種アルゴリズムで学習を行う、という処理フローはよく実施されます。この時系列データの特徴量抽出ですが、メモリに乗りきらないような容量の大きいデータ(いわゆるビッグデータ)では、pandas
やnumpy
などの便利なパッケージが使えないため処理の難易度が上がります。このような場合、どのように実現していますか?
本記事ではその一例としてPySpark
を用いる方法をご紹介します。文量が多くなってしまいますので、2回に分けてご紹介していきます。
今回は基礎編ということで、一般的な「PySparkでスライディングウィンドウによる特徴量計算する方法」を紹介します。
検証環境
PySpark
の実行環境としてAzure Synapse Analyticsを使いました。
主要パッケージのバージョンは以下の通りです。(2020年8月20日時点のデフォルト設定)
Apache Spark 2.4
Python version 3.6.1
PySparkでスライディングウィンドウによる特徴量計算する方法
検証環境作成方法は割愛させていただきます。
Azure Synapse AnalyticsでどのようにSpark実行環境を構築するかについては、ご要望がありましたら記事にしたいと思います。コメントでご要望いただけると幸いです。
1. データの準備
適当なデータをPySparkのデータフレームとして定義します。
df = sqlContext.createDataFrame([
(1, 2.65,2.42,6.90,4.93),
(2, 2.57,8.50,2.40,5.37),
(3, 2.13,3.76,7.52,7.67),
(4, 3.09,7.28,3.59,6.34),
(5, 5.75,4.69,5.26,3.11),
(6, 6.91,4.04,2.03,6.28),
(7, 5.44,3.22,2.87,7.14),
(8, 4.86,7.47,3.68,0.32),
(9, 9.70,7.43,4.43,7.74),
(10,6.30,7.72,7.78,7.91),
],
["time", "data1", "data2", "data3", "data4"])
df.show()
# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# | 1| 2.65| 2.42| 6.9| 4.93|
# | 2| 2.57| 8.5| 2.4| 5.37|
# | 3| 2.13| 3.76| 7.52| 7.67|
# | 4| 3.09| 7.28| 3.59| 6.34|
# | 5| 5.75| 4.69| 5.26| 3.11|
# | 6| 6.91| 4.04| 2.03| 6.28|
# | 7| 5.44| 3.22| 2.87| 7.14|
# | 8| 4.86| 7.47| 3.68| 0.32|
# | 9| 9.7| 7.43| 4.43| 7.74|
# | 10| 6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+
仮に以下のようなデータが取れたものと考えてください。
カラム名 | 意味 |
---|---|
time | 記録時間(秒) |
data1~data6 | 計測データ |
2. スライディングウィンドウの定義
PySpark
のスライディングウィンドウはpyspark.sql.window
のWindows
で定義します。ここではウィンドウ幅5秒のスライディングウィンドウを定義しています。
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# sliding-windowの設定
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)
orderBy("カラム名")
でソートキーを指定しています。Spark
では処理順序が保証されないため、ソートキーを指定することはとても重要です。本例では記録時間を示すtime
の順、つまりtime
を昇順に並べた上で1レコード目から順に処理して欲しいため、orderBy(F.col("time"))
という指定をしています。ちなみにデフォルトではASC(昇順)
で処理されます。DESC(降順)
で処理したい場合は以下のように記述します。
sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)
F.col("time")
に.desc()
を付けると降順扱いとなります。
次にウィンドウ幅をrowsBetween(Window.currentRow, window_size-1)
で定義しています。第1引数は開始位置の定義であり、ここではWindow.currentRow
と現在の行を指定しています。第2引数は終了位置の定義であり、ここではwindow_size-1
と現在の行から4行先(4秒先)を指定しています。これで現在の行を含む4行先までの5行分(5秒分)のデータを一つのウィンドウとして定義できたことになります。
3. 特徴量計算
先ほど設定したスライディングウィンドウの定義を使った特徴量抽出を実施します。data1
に対してウィンドウ幅内のmax(最大値)
、min(最小値)
、avg(平均値)
を取得してみます。
df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
.withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
.withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
.select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
.show()
# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1| feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# | 1| 2.65| 5.75| 2.13|3.2379999999999995|
# | 2| 2.57| 6.91| 2.13| 4.09|
# | 3| 2.13| 6.91| 2.13| 4.664|
# | 4| 3.09| 6.91| 3.09| 5.21|
# | 5| 5.75| 9.7| 4.86| 6.531999999999999|
# | 6| 6.91| 9.7| 4.86| 6.642|
# | 7| 5.44| 9.7| 4.86| 6.575|
# | 8| 4.86| 9.7| 4.86| 6.953333333333333|
# | 9| 9.7| 9.7| 6.3| 8.0|
# | 10| 6.3| 6.3| 6.3| 6.3|
# +----+-----+--------------+--------------+------------------+
withColumn("カラム名", 処理内容)
でデータフレームの新規カラムとして、指定したカラム名で指定した処理内容の結果が追加されます。max
を計算する処理コードを見るとwithColumn('feat_max_data1', F.max('data1').over(sliding_window))
となっており、「data1
のmax
を取ります。over(sliding_window)
の条件で。結果はfeat_max_data1
カラムとして追加します。」という解釈になります。PySpark
でのスライディングウィンドウ指定はover()
で定義します。
PySpark
では1つずつ処理を定義していくため、この例の様に1つのカラムから複数の特徴量を取得する場合は複数の処理コードを羅列していく必要があります。
まとめ
以上、「PySparkでスライディングウィンドウによる特徴量計算する方法」の基礎編を紹介しました。
今回紹介した処理方法は一般的な手法であり、データが少ない場合や抽出する特徴量が少ない場合は、十分事足りると思います。しかし、データ量が多い場合、処理するカラムが多い場合、抽出する特徴量が多い場合など、この処理方法では処理効率が悪く処理コストが大きくなってしまいます。ですが、処理方法を工夫することで処理コストを劇的に改善することが可能です。
次回は応用編として、どのような工夫を行えばより効率的に処理を行えるのかをご紹介します。
ご一読いただき、ありがとうございました。