2024/4/12に翔泳社よりApache Spark徹底入門を出版します!
書籍のサンプルノートブックをウォークスルーしていきます。Python/Chapter10/10-1 Data Cleansing
となります。
翻訳ノートブックのリポジトリはこちら。
ノートブックはこちら
いくつかの探索的データ分析やクレンジングからスタートします。Inside AirbnbのSF Airbnbレンタルデータセットを使用します。
SF Airbnbデータセットをロードしましょう(オプションの動作を確認したい場合にはコメントアウトしてください)。
filePath = "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"
rawDF = spark.read.csv(filePath, header="true", inferSchema="true", multiLine="true", escape='"')
display(rawDF)
rawDF.columns
['id',
'listing_url',
'scrape_id',
'last_scraped',
'name',
'summary',
'space',
'description',
'experiences_offered',
'neighborhood_overview',
'notes',
'transit',
'access',
'interaction',
'house_rules',
'thumbnail_url',
'medium_url',
'picture_url',
'xl_picture_url',
'host_id',
'host_url',
'host_name',
'host_since',
'host_location',
'host_about',
'host_response_time',
'host_response_rate',
'host_acceptance_rate',
'host_is_superhost',
'host_thumbnail_url',
'host_picture_url',
'host_neighbourhood',
'host_listings_count',
'host_total_listings_count',
'host_verifications',
'host_has_profile_pic',
'host_identity_verified',
'street',
'neighbourhood',
'neighbourhood_cleansed',
'neighbourhood_group_cleansed',
'city',
'state',
'zipcode',
'market',
'smart_location',
'country_code',
'country',
'latitude',
'longitude',
'is_location_exact',
'property_type',
'room_type',
'accommodates',
'bathrooms',
'bedrooms',
'beds',
'bed_type',
'amenities',
'square_feet',
'price',
'weekly_price',
'monthly_price',
'security_deposit',
'cleaning_fee',
'guests_included',
'extra_people',
'minimum_nights',
'maximum_nights',
'minimum_minimum_nights',
'maximum_minimum_nights',
'minimum_maximum_nights',
'maximum_maximum_nights',
'minimum_nights_avg_ntm',
'maximum_nights_avg_ntm',
'calendar_updated',
'has_availability',
'availability_30',
'availability_60',
'availability_90',
'availability_365',
'calendar_last_scraped',
'number_of_reviews',
'number_of_reviews_ltm',
'first_review',
'last_review',
'review_scores_rating',
'review_scores_accuracy',
'review_scores_cleanliness',
'review_scores_checkin',
'review_scores_communication',
'review_scores_location',
'review_scores_value',
'requires_license',
'license',
'jurisdiction_names',
'instant_bookable',
'is_business_travel_ready',
'cancellation_policy',
'require_guest_profile_picture',
'require_guest_phone_verification',
'calculated_host_listings_count',
'calculated_host_listings_count_entire_homes',
'calculated_host_listings_count_private_rooms',
'calculated_host_listings_count_shared_rooms',
'reviews_per_month']
シンプルにするために、このデータセットの特定のカラムのみを保持します。あとで、特徴量選択について言及します。
columnsToKeep = [
"host_is_superhost",
"cancellation_policy",
"instant_bookable",
"host_total_listings_count",
"neighbourhood_cleansed",
"latitude",
"longitude",
"property_type",
"room_type",
"accommodates",
"bathrooms",
"bedrooms",
"beds",
"bed_type",
"minimum_nights",
"number_of_reviews",
"review_scores_rating",
"review_scores_accuracy",
"review_scores_cleanliness",
"review_scores_checkin",
"review_scores_communication",
"review_scores_location",
"review_scores_value",
"price"]
baseDF = rawDF.select(columnsToKeep)
baseDF.cache().count()
display(baseDF)
データ型の修正
上のスキーマを確認してみます。price
フィールドが文字列として判定されていることがわかります。我々のタスクでは、これを数値型のフィールド(double型)にする必要があります。
修正しましょう。
from pyspark.sql.functions import col, translate
fixedPriceDF = baseDF.withColumn("price", translate(col("price"), "$,", "").cast("double"))
display(fixedPriceDF)
サマリー統計情報
2つのオプションがあります:
- describe
- summary (describe + IQR)
display(fixedPriceDF.describe())
display(fixedPriceDF.summary())
Null値
Null値を取り扱うための数多くの手法があります。時には、nullは実際には予測しようとする事柄のキーのインジケータとなることがあります(例: フォームの特定の割合を記入しない場合、承認される確率が減少する)。
nullを取り扱う方法として:
- nullを含むすべてのレコードを削除
- 数値型:
- mean/median/zeroなどで補完
- カテゴリー型:
- モードで置換
- nullに対する特殊なカテゴリーを作成
- 欠損地を挿入するために設計されたALSのようなテクニックを使う
カテゴリー特徴量/数値特徴量に何かしらの補完テクニックを用いる場合、当該フィールドが保管されたことを示す追加フィールドを含めるべきです(なぜこれが必要なのかを考えてみてください)
カテゴリー特徴量host_is_superhost
にいくつかnullが含まれています。これらのカラムのいずれかがnullである行を除外しましょう。
SparkMLのImputer(この後カバーします)は、カテゴリー特徴量の補完をサポートしていませんので、この時点ではこれがもっともシンプルなアプローチとなります。
noNullsDF = fixedPriceDF.na.drop(subset=["host_is_superhost"])
補完: Doubleへのキャスト
SparkMLのImputer
は、すべてのフィールドがdouble型である必要があります Python/Scala。すべてのintegerフィールドをdoubleにキャストしましょう。
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
integerColumns = [x.name for x in baseDF.schema.fields if x.dataType == IntegerType()]
doublesDF = noNullsDF
for c in integerColumns:
doublesDF = doublesDF.withColumn(c, col(c).cast("double"))
columns = "\n - ".join(integerColumns)
print(f"Columns converted from Integer to Double:\n - {columns}")
Columns converted from Integer to Double:
- host_total_listings_count
- accommodates
- bedrooms
- beds
- minimum_nights
- number_of_reviews
- review_scores_rating
- review_scores_accuracy
- review_scores_cleanliness
- review_scores_checkin
- review_scores_communication
- review_scores_location
- review_scores_value
値を補完したかどうかを示すダミー変数を追加します。
from pyspark.sql.functions import when
imputeCols = [
"bedrooms",
"bathrooms",
"beds",
"review_scores_rating",
"review_scores_accuracy",
"review_scores_cleanliness",
"review_scores_checkin",
"review_scores_communication",
"review_scores_location",
"review_scores_value"
]
for c in imputeCols:
doublesDF = doublesDF.withColumn(c + "_na", when(col(c).isNull(), 1.0).otherwise(0.0))
display(doublesDF.describe())
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
imputedDF = imputer.fit(doublesDF).transform(doublesDF)
外れ値を排除
price
カラムの min と max の値を見てみましょう:
display(imputedDF.select("price").describe())
いくつか非常に高価な物件があります。しかし、これらに対して何をすべきかを決めるのはデータサイエンティストの仕事です。しかし、「無料」のAirbnbはフィルタリングします。
priceがゼロである物件がいくつあるのかを見てみましょう。
imputedDF.filter(col("price") == 0).count()
1
厳密に正のpriceを持つ行のみを保持します。
posPricesDF = imputedDF.filter(col("price") > 0)
minimum_nightsカラムの min と max を見てみましょう:
display(posPricesDF.select("minimum_nights").describe())
display(posPricesDF
.groupBy("minimum_nights").count()
.orderBy(col("count").desc(), col("minimum_nights"))
)
1年という最小滞在期間は合理的な制限のように見えます。minimum_nightsが365を上回るレコードを除外しましょう:
cleanDF = posPricesDF.filter(col("minimum_nights") <= 365)
display(cleanDF)
OK、データが綺麗になりました。これを用いてモデルの構築をスタートできるように、このデータフレームをファイルに保存しましょう。
outputPath = "/tmp/sf-airbnb/sf-airbnb-clean.parquet"
cleanDF.write.mode("overwrite").parquet(outputPath)