2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Sparkを用いたサンフランシスコ消防署の通報データの分析

Posted at

ノートブックギャラリーで公開されているAnalysis of the San Fransisco fire calls with Sparkをウォークスルーする内容となっています。

翻訳版ノートブックはこちらとなります。

このノートブックは書籍Learning Spark 2nd Editionの第3章のエンドツーエンドのサンプルを示すものであり、San Francisco Fire Department Calls データセットに対する一般的な分析パターンとオペレーションのために、どのようにデータフレームとSpark SQLを用いるのかを説明します。また、分析のためにどのようにETLやデータの検証、データのクエリーを行うのかをデモします。さらに、インメモリのSparkデータフレームをどのようにParquetファイルとして保存するのか、SparkがサポートするParquetデータソースとしてどのように読み込むのかを説明します。

データの確認

%fs ls /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

Screen Shot 2022-07-19 at 10.16.59.png

Python
from pyspark.sql.types import *
from pyspark.sql.functions import *

sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
%fs head /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

Screen Shot 2022-07-19 at 10.17.39.png

スキーマの定義

ファイルには400万レコードが含まれているのでスキーマを定義します。大規模ファイルにおけるスキーマ推定には大きなコストが必要となります。

Python
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

データの読み込み

Python
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

データに対して複数のオペレーションを実行するので、データフレームをキャッシュします。

Python
fire_df.cache()
Python
fire_df.count()

Screen Shot 2022-07-19 at 10.21.11.png

データフレームの最初の50レコードを表示します。以下ではグラフが設定されているので、呼び出しごとの遅延時間が棒グラフで表示されます。

Python
display(fire_df.limit(50))

Screen Shot 2022-07-19 at 10.22.04.png

フィルタリング

Call Typeの"Medical Incident"を除外します。

データフレームに対するfilter()where()メソッドは同じものであることに注意してください。引数のタイプに関してはドキュメントをチェックしてください。

Python
few_fire_df = (fire_df.select("IncidentNumber", "AvailableDtTm", "CallType") 
              .where(col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

Screen Shot 2022-07-19 at 10.23.15.png

Q-1) 消防署に対して何種類の呼び出しの電話がありましたか?

念のために、カラムに含まれる"null"文字列はカウントしないようにしましょう。

Python
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().count()

Screen Shot 2022-07-19 at 10.24.16.png

Q-2) 消防署に対してどのようなタイプの呼び出しがありましたか?

これらがSF Fire Departmentに対する全ての呼び出しタイプとなります。

Python
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(10, False)

Screen Shot 2022-07-19 at 10.25.24.png

display()メソッドを使用することもできます。

Python
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().display()

Screen Shot 2022-07-19 at 10.26.37.png

Q-3) 遅延時間が5分以上の全ての応答を見つけ出してください

  1. カラム名DelayReponseDelayedinMinsに変更します。
  2. 新たなデータフレームを返却します。
  3. 出火地点への反応時間が5分以上遅延した全ての電話を探し出します。
Python
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
new_fire_df.select("ResponseDelayedinMins").where(col("ResponseDelayedinMins") > 5).show(5, False)

Screen Shot 2022-07-19 at 10.27.48.png

ETL(Extract/Transform/Load)

いくつかのETL処理をやってみましょう:

  1. 後で時間に基づくクエリーを行えるように、文字列の日付をSparkのTimestampデータ型に変換します。
  2. 変換された結果が返却されます。
  3. 新たなデータフレームをキャッシュします。
Python
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
              .withColumn("OnWatchDate",   to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm"))        
Python
fire_ts_df.cache()
fire_ts_df.columns

Screen Shot 2022-07-19 at 10.29.43.png

変換されたカラムがSpark Timestamp型であることを確認します。

Python
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

Screen Shot 2022-07-19 at 10.30.20.png

Q-4) 最も多い呼び出しタイプは何ですか?

降順で並び替えをしましょう。

Python
(fire_ts_df
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

Screen Shot 2022-07-19 at 10.31.30.png

Q-4a) 呼び出しの大部分を占めているzipコードはなんですか?

サンフランシスコ消防署への通報においてどのzipコードが多いのか、どの場所でどのタイプが多いのかを調査してみましょう。

  1. CallTypeでフィルタリングします。
  2. CallTypeZip codeでグルーピングします。
  3. カウントを行い、降順で表示します。

最も共通する電話はMedical Incidentに関連するものであり、多いzipコードは9410294103です。

Python
display(fire_ts_df
 .select("CallType", "ZipCode")
 .where(col("CallType").isNotNull())
 .groupBy("CallType", "Zipcode")
 .count()
 .orderBy("count", ascending=False))

Screen Shot 2022-07-19 at 10.32.17.png

Q-4b) Zipコード94102と94103はサンフランシスコのどの地域ですか?

これらの2つのZipコードに関連づけられる地域を見つけ出しましょう。おそらく、通報率が高い地域は隣接しているケースがあるのでしょう。

Python
display(fire_ts_df.select("Neighborhood", "Zipcode").where((col("Zipcode") == 94102) | (col("Zipcode") == 94103)).distinct())

Screen Shot 2022-07-19 at 10.32.56.png

Q-5) 全ての呼び出しの合計、呼び出しに対する反応時間の平均値、最小値、最大値は何ですか?

いくつかのカラムに対して合計、平均値、最小値、最大値を計算するためにビルトインのSpark SQL関数を使いましょう。

  • 通報の合計数
  • 通報地点に消防隊員が到着するまでの反応時間の平均値、最小値、最大値
Python
fire_ts_df.select(sum("NumAlarms"), avg("ResponseDelayedinMins"), min("ResponseDelayedinMins"), max("ResponseDelayedinMins")).show()

Screen Shot 2022-07-19 at 10.33.34.png

Q-6a) CSVファイルには何年分のデータが含まれていますか?

Timestamp型のIncidentDateから年を取り出すためにSpark SQL関数year()を使うことができます。

全体的には2000-2018のデータが含まれていることがわかります。

Python
fire_ts_df.select(year('IncidentDate')).distinct().orderBy(year('IncidentDate')).show()

Screen Shot 2022-07-19 at 10.34.09.png

Q-6b) 2018年のどの週が最も通報が多かったですか?

注意: Week 1は新年の週で、Week 25は7/4の週となります。花火の季節を考えると、この週の通報が多いのは納得できます。

Python
fire_ts_df.filter(year('IncidentDate') == 2018).groupBy(weekofyear('IncidentDate')).count().orderBy('count', ascending=False).show()

Screen Shot 2022-07-19 at 10.34.46.png

Q-7) 2018年で最も反応が悪かったサンフランシスコの地域はどこですか?

Presidio Heightsに住んでいると消防隊員は3分以内に到着し、Mission Bayに住んでいるのであれば6分以上かかるようです。

Python
fire_ts_df.select("Neighborhood", "ResponseDelayedinMins").filter(year("IncidentDate") == 2018).show(10, False)

Screen Shot 2022-07-19 at 10.35.20.png

Q-8a) どのようにしてデータフレームをParquetファイルに保存し、読み戻すことができますか?

Python
fire_ts_df.write.format("parquet").mode("overwrite").save(work_path)

Q-8b) データを保存し、読み戻せるようにするためにどのようにSQLテーブルを使用できますか?

Python
fire_ts_df.write.format("parquet").mode("overwrite").saveAsTable("FireServiceCalls")
SQL
%sql
CACHE TABLE FireServiceCalls
SQL
%sql
SELECT * FROM FireServiceCalls LIMIT 10

Screen Shot 2022-07-19 at 10.37.18.png

Q-8c) どのようにParquetファイルを読み込むことができますか?

Parquetメタデータの一部にスキーマが格納されているので、スキーマを指定する必要がないことに注意してください。

Python
file_parquet_df = spark.read.format("parquet").load(work_path)
display(file_parquet_df.limit(10))

Screen Shot 2022-07-19 at 10.38.15.png

Databricks 無料トライアル

Databricks 無料トライアル

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?