はじめに
ある意味こちらの続き
ここではSpark(pyspark)を使ってデータクレンジングをしていた時にやらかして手戻りなど色々あったので、あのとき気を付ければなーと思ったことを纏めます。
また、当たり前な感もありますが、個人的tipsを合わせて纏めます。
特に明言していなければPythonコードはpyspark(df=DataFrame)を想定して記載しています。
pythonは3系、pysparkは2.0系を想定しています。
気を付けたい事
処理前にデータの型を確認する
私の中で忘れがち&やらかしがち1位。そして気づくのが遅れるとやべぇやつ。
読み込んだデータのカラムが文字列なのか数値なのかでsortやmax、minの処理が変わります。
csvを読み込む際はカラムの型を推定するようにoption(inferSchema, True)
とすると幸せになれます。
from pyspark.sql.functions import *
from pyspark.sql.types import *
# csvからデータを読み込み。読み込む際はinferSchemaで推定する
df = spark.read.option("inferSchema", True).csv("csvファイル")
# 後からカラムをキャストする場合はこんな感じ(int型(IntegerType)にキャスト)
# キャストする型はpyspark.sql.typesのxxxTypeを参照
df = df.withColumn("キャストする", df["キャストする"].cast(IntegerType()))
処理前後でデータの量を確認する
私の中で忘れがち&やらかしがち2位。気づくのが遅れるとそこそこやべぇやつ。
where(フィルタ)やjoinした前後でデータ量が想定より上下していると、想定していないデータが増えた(減った)可能性があります。
また、joinの種類(inner/outer/left/...)やカラムの数によって想定していない結果が出てきたりします。
# 処理前のデータのカウント
df.count()
# データをフィルタリング。その後
df = df.where(df.columns > 10)
df.count()
# データのjoin
# dfのidとdf2のidでleft joinする例
# df2のidに重複がある場合はデータが増えることに注意
df = df.join(df2, df.id == df2.id, "left").drop(df2.id)
df.count()
個人的tips
idを再度振る
データにいわゆるDBの主キー(id)となるカラムが存在しているが、そのidが実は一意になっていない、なんてことがあります(ちゃんとデータ作ってりゃー基本無いはずですなんですけどね、、、)。
データのイメージとしては↓こんな感じ。本来はid - A が1対1の関係になっているが、新たにidとして3が振られているなどです。
id | A |
---|---|
1 | 1 |
2 | 2 |
3 ↑は1であってほしい |
1 |
そんな時にidを再度振る方法です。
# 新たにnew_idとしてidを振りなおす
# dfは↑のデータを読み込んでいるDataFrameである前提
# monotonically_increasing_id()は1から増加する訳ではないことに注意
tmp = df.withColumn("new_id", monotonically_increasing_id())
# idに対して唯一のnew_idを決める。
# new_idは重複がないのでここではminを取ってidに対して唯一のnew_idを取得する
tmp = tmp.groupBy("id").agg(min("new_id").alias("new_id"))
# idでjoin。必要に応じてnew_idをidにリネームなどすればok
df = df.join(tmp, df.id == tmp.id, "inner").drop(tmp.id)
複数カラムの情報を1カラムに
複数のカラムの入力の状態を元に、例えばstateとして1つのカラムに纏めたい、ということがあります。
複数のマスタデータから一つに統合されたデータなどにありがちなデータでしょうか。
データのイメージとしては↓こんな感じ。カラムAとBの値の組み合わせで、ある状態を表している場合などは、その状態を表すカラムを追加するほうが分かりやすく混乱も少なくなります。
id | A | B |
---|---|---|
1 | True | True |
2 | False | True |
3 | True | False |
そんな時に新たにカラムを追加する方法です。
簡単に言えば、一般的なプログラムと同じように2進数でデータを振り、統合します。
# カラムAに対する状態を取得。Aを統合用にリネーム
tmp_a = df.select("id", "A").withColumnRenamed("A","state")
# 値に対して0/1で置き換える
tmp_a = tmp_a.withColumn("state", when(tmp_a.A == True, 1).otherwise(0))
# カラムBも同様に処理。ただし数値に置き換える場合は10/0で置き換え
tmp_b = df.select("id", "B").withColumnRenamed("B","state")
tmp_b = tmp_a.withColumn("state", when(tmp_b.B == True, 10).otherwise(0))
# unionで上下にデータを合体
# idでstateをsumすることでA/Bの組み合わせ(0/1/10/11)が分かる
tmp = tmp_a.union(tmp_b)
tmp = tmp.groupBy("id").agg(sum("state").alias("state"))
〆
他にもいろいろあったはず、、、なんですが忘れてしまっているので今後も拡張していきたい。。。