LoginSignup
0
0

More than 3 years have passed since last update.

E-Mapreduceでデータの前処理を実践

Last updated at Posted at 2019-10-21

データセットの前処理と言えば、オープンソースのPandasでDataframeとSeriesを操作するのが一般的ですが、データのサイズが大きいほど、大量のデータを読み込もうとすると、メモリ不足でエラーになる可能性、もしくは長時間待たされる可能性も高くなります。

実際のデータ分析業務の中で、かならずしもビッグデータの規模とは言えないですが、本記事では、ビッグデータの前処理に焦点をあてながら、PySpark(Apache SparkのPythonインタフェース)で2次元テーブル、そしてテーブルの列からデータの取り扱い方について、展開させて頂きたいと思います。

事前準備

まず、E-Mapreduce(Hadoop)のクラスタを構築します。構築の詳細手順はAlibaba Cloud公式ドキュメントに記載されておりますので、ご参照頂ければと思います。

次は、検証に使うデータセットをダウンロードします。こちらはAnalytics Vidhya社の一般公開データセットをダウンロードしました。とあるリテール会社の商品販売データです。最後ダウンロードしたデータセット(csvファイル)を一旦Alibaba Cloud OSSにアップロードしておきます。

f:id:sbc_kou:20190913161520p:plain

データフレームの作成

E-Mapreduceのマスターノードにsshログインし、下記のコマンドでPySpark Shellを起動します。

pyspark --master yarn

f:id:sbc_kou:20190913162357p:plain

ossutilosscmdなどのECSからOSSへのアクセスツール要らず、下記コマンドのように、sparkよりossバケットにtrain.csvファイルを参照することによって、直接データフレームを作成することができるのでかなり楽です。

train = spark.read.option("inferSchema","true").option("header","true").csv("oss://xxxxxxxxxxxxxxxx/train.csv")

データフレームの操作

データ型の確認

コラムのデータ型を確認するために、printSchema()を利用して、木構造のスキーマをプリントすることができます。

>>>train.printSchema()
root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
先頭要素の確認

headを使って、先頭のN行を表示させることができ、Pandasのheadオペレーションと似ています。

>>> train.head(5)
[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200), Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), Row(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), Row(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]

さらに、2次元テーブルの形式で返したい場合は、showオペレーションを使うことも可能です。

>>> train.show(2,truncate=True)
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
データ集計

特定の列に対して、要約統計量(平均値、標準偏差、最大値、最小値、カウント)を取得したい場合、descrie()を利用します。

>>> train.describe('Purchase').show()
+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+
コラムのサブセット

列をサブセットにするには、選択操作を使用する必要があり、またコンマで区切られた列名を渡す必要があります。例えば、「User_ID」と「年齢」の先頭の5行を選択してみましょう。

>>> train.select('User_ID','Age').show(5)
+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+
only showing top 5 rows
フィルタ操作

DataFrame のPurchase列にフィルタ操作を適用し、15000を超える値を持つ行を選出ことができます。例えば、条件を通す必要があるデータフレームのPurchase列にフィルタをかけて、15000を超える購入数をプリントしてみましょう。

>>> train.filter(train.Purchase > 15000).show(5)
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000003| P00193542|     M|26-35|        15|            A|                         3|             0|                 1|                 2|              null|   15227|
|1000004| P00184942|     M|46-50|         7|            B|                         2|             1|                 1|                 8|                17|   19215|
|1000004| P00346142|     M|46-50|         7|            B|                         2|             1|                 1|                15|              null|   15854|
|1000004|  P0097242|     M|46-50|         7|            B|                         2|             1|                 1|                16|              null|   15686|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
マップ操作

マップ操作を使用して、DataFrame の各行に関数を適用できます。この関数を適用した後、RDDの形式で結果を取得します。TrainのUser_ID列にマップ操作を適用し、関数を適用した後にマップされたRDD(x,1)の最初の5 つの要素をプリントしてみましょう。

>>> train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
[(Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000001), 1), (Row(User_ID=1000002), 1)]
SQL操作

RDDとは異なり、SparkはDataFrameでSQLクエリを実行することができます。DataFrameにSQLクエリを適用するには、DataFrameをテーブルとして登録する必要があります。まず、データフレームをテーブルとして登録してみましょう。

train.registerTempTable('train_table')

上記のコードでは、registerTempTable操作の助けを借りて、テーブル('train_table')として'train'を登録しました。'train_table' にSQLクエリを適用してProduct_ID を選択してみましょう。結果を得るには、show()アクションを適用する必要があります。

>>> spark.sql('select Product_ID from train_table').show(5)
+----------+
|Product_ID|
+----------+
| P00069042|
| P00248942|
| P00087842|
| P00085442|
| P00285442|
+----------+
only showing top 5 rows

上記のコードでは、SQLクエリを指定するためにspark.sql を使いしました。次はもっと複雑なクエリで、train_tableで各年齢グループの最大購入を取得してみましょう。

>>> spark.sql('select Age, max(Purchase) from train_table group by Age').show()
+-----+-------------+
|  Age|max(Purchase)|
+-----+-------------+
|18-25|        23958|
|26-35|        23961|
| 0-17|        23955|
|46-50|        23960|
|51-55|        23960|
|36-45|        23960|
|  55+|        23960|
+-----+-------------+

最後

この記事では、E-Mapreduceを利用して、Apache SparkのDataFrameの最も一般的な操作のいくつかについて紹介しました、もし皆さん普段の分析業務にお役に立てば幸いです。実はDataFrameで定義されている操作は数多くあり、1つの記事ですべてをカバーするのは難しいですが、DataFrame操作の詳細については、PythonのPyspark.sqlモジュールドキュメントをご参照いただければと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0