データセットの前処理と言えば、オープンソースのPandasでDataframeとSeriesを操作するのが一般的ですが、データのサイズが大きいほど、大量のデータを読み込もうとすると、メモリ不足でエラーになる可能性、もしくは長時間待たされる可能性も高くなります。
実際のデータ分析業務の中で、かならずしもビッグデータの規模とは言えないですが、本記事では、ビッグデータの前処理に焦点をあてながら、PySpark(Apache SparkのPythonインタフェース)で2次元テーブル、そしてテーブルの列からデータの取り扱い方について、展開させて頂きたいと思います。
事前準備
まず、E-Mapreduce(Hadoop)のクラスタを構築します。構築の詳細手順はAlibaba Cloud公式ドキュメントに記載されておりますので、ご参照頂ければと思います。
次は、検証に使うデータセットをダウンロードします。こちらはAnalytics Vidhya社の一般公開データセットをダウンロードしました。とあるリテール会社の商品販売データです。最後ダウンロードしたデータセット(csvファイル)を一旦Alibaba Cloud OSSにアップロードしておきます。
データフレームの作成
E-Mapreduceのマスターノードにsshログインし、下記のコマンドでPySpark Shellを起動します。
pyspark --master yarn
ossutilやosscmdなどのECSからOSSへのアクセスツール要らず、下記コマンドのように、sparkよりossバケットにtrain.csvファイルを参照することによって、直接データフレームを作成することができるのでかなり楽です。
train = spark.read.option("inferSchema","true").option("header","true").csv("oss://xxxxxxxxxxxxxxxx/train.csv")
データフレームの操作
データ型の確認
コラムのデータ型を確認するために、printSchema()を利用して、木構造のスキーマをプリントすることができます。
>>>train.printSchema()
root
|-- User_ID: integer (nullable = true)
|-- Product_ID: string (nullable = true)
|-- Gender: string (nullable = true)
|-- Age: string (nullable = true)
|-- Occupation: integer (nullable = true)
|-- City_Category: string (nullable = true)
|-- Stay_In_Current_City_Years: string (nullable = true)
|-- Marital_Status: integer (nullable = true)
|-- Product_Category_1: integer (nullable = true)
|-- Product_Category_2: integer (nullable = true)
|-- Product_Category_3: integer (nullable = true)
|-- Purchase: integer (nullable = true)
先頭要素の確認
headを使って、先頭のN行を表示させることができ、Pandasのheadオペレーションと似ています。
>>> train.head(5)
[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200), Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), Row(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), Row(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]
さらに、2次元テーブルの形式で返したい場合は、showオペレーションを使うことも可能です。
>>> train.show(2,truncate=True)
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042| F|0-17| 10| A| 2| 0| 3| null| null| 8370|
|1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
データ集計
特定の列に対して、要約統計量(平均値、標準偏差、最大値、最小値、カウント)を取得したい場合、descrie()を利用します。
>>> train.describe('Purchase').show()
+-------+-----------------+
|summary| Purchase|
+-------+-----------------+
| count| 550068|
| mean|9263.968712959126|
| stddev|5023.065393820575|
| min| 12|
| max| 23961|
+-------+-----------------+
コラムのサブセット
列をサブセットにするには、選択操作を使用する必要があり、またコンマで区切られた列名を渡す必要があります。例えば、「User_ID」と「年齢」の先頭の5行を選択してみましょう。
>>> train.select('User_ID','Age').show(5)
+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+
only showing top 5 rows
フィルタ操作
DataFrame のPurchase列にフィルタ操作を適用し、15000を超える値を持つ行を選出ことができます。例えば、条件を通す必要があるデータフレームのPurchase列にフィルタをかけて、15000を超える購入数をプリントしてみましょう。
>>> train.filter(train.Purchase > 15000).show(5)
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00248942| F| 0-17| 10| A| 2| 0| 1| 6| 14| 15200|
|1000003| P00193542| M|26-35| 15| A| 3| 0| 1| 2| null| 15227|
|1000004| P00184942| M|46-50| 7| B| 2| 1| 1| 8| 17| 19215|
|1000004| P00346142| M|46-50| 7| B| 2| 1| 1| 15| null| 15854|
|1000004| P0097242| M|46-50| 7| B| 2| 1| 1| 16| null| 15686|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
マップ操作
マップ操作を使用して、DataFrame の各行に関数を適用できます。この関数を適用した後、RDDの形式で結果を取得します。TrainのUser_ID列にマップ操作を適用し、関数を適用した後にマップされたRDD(x,1)の最初の5 つの要素をプリントしてみましょう。
>>> train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
[(Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000002), 1)]
SQL操作
RDDとは異なり、SparkはDataFrameでSQLクエリを実行することができます。DataFrameにSQLクエリを適用するには、DataFrameをテーブルとして登録する必要があります。まず、データフレームをテーブルとして登録してみましょう。
train.registerTempTable('train_table')
上記のコードでは、registerTempTable操作の助けを借りて、テーブル('train_table')として'train'を登録しました。'train_table' にSQLクエリを適用してProduct_ID を選択してみましょう。結果を得るには、show()アクションを適用する必要があります。
>>> spark.sql('select Product_ID from train_table').show(5)
+----------+
|Product_ID|
+----------+
| P00069042|
| P00248942|
| P00087842|
| P00085442|
| P00285442|
+----------+
only showing top 5 rows
上記のコードでは、SQLクエリを指定するためにspark.sql を使いしました。次はもっと複雑なクエリで、train_tableで各年齢グループの最大購入を取得してみましょう。
>>> spark.sql('select Age, max(Purchase) from train_table group by Age').show()
+-----+-------------+
| Age|max(Purchase)|
+-----+-------------+
|18-25| 23958|
|26-35| 23961|
| 0-17| 23955|
|46-50| 23960|
|51-55| 23960|
|36-45| 23960|
| 55+| 23960|
+-----+-------------+
最後
この記事では、E-Mapreduceを利用して、Apache SparkのDataFrameの最も一般的な操作のいくつかについて紹介しました、もし皆さん普段の分析業務にお役に立てば幸いです。実はDataFrameで定義されている操作は数多くあり、1つの記事ですべてをカバーするのは難しいですが、DataFrame操作の詳細については、PythonのPyspark.sqlモジュールドキュメントをご参照いただければと思います。