PySparkを試す の続き
ソースはGist
やったこと
CSVの読み込み
RDDやPandasのDataFrameから変換できるけど2.0からはcsv()で読み込める。
spark = SparkSession.builder.getOrCreate()
# https://gihyo.jp/book/2015/978-4-7741-7631-4/support
df = spark.read.csv("click_data_sample.csv", header=True)
列名に .
が入るとうまく処理できず、回避方法がわからなかったので列名を置換した。もっといい方法があるはず。
# to_timestampの第一引数の指定方法がわからなかったので列名をリネーム
old_cols = df.columns
new_cols = [c.replace('.', '_') for c in df.columns]
for old,new in zip(old_cols, new_cols):
df = df.withColumnRenamed(old, new)
df.show(5)
+-------------------+-------+-----------+
| click_at|user_id|campaign_id|
+-------------------+-------+-----------+
|2015-04-27 20:40:40| 144012|Campaign077|
|2015-04-27 00:27:55| 24485|Campaign063|
|2015-04-27 00:28:13| 24485|Campaign063|
|2015-04-27 00:33:42| 24485|Campaign038|
|2015-04-27 01:00:04| 24485|Campaign063|
+-------------------+-------+-----------+
only showing top 5 rows
型の確認
dtypes かprintSchema()で見れた。
df.dtypes
[('click_at', 'string'), ('user_id', 'string'), ('campaign_id', 'string')]
df.printSchema()
root
|-- click_at: string (nullable = true)
|-- user_id: string (nullable = true)
|-- campaign_id: string (nullable = true)
Datetime型に変換
to_timestamp() を使った。
from pyspark.sql.functions import to_timestamp
df = df.withColumn("parsed", to_timestamp('click_at', 'yyyy-MM-dd HH:mm:ss'))
df.show(5)
+-------------------+-------+-----------+-------------------+
| click_at|user_id|campaign_id| parsed|
+-------------------+-------+-----------+-------------------+
|2015-04-27 20:40:40| 144012|Campaign077|2015-04-27 20:40:40|
|2015-04-27 00:27:55| 24485|Campaign063|2015-04-27 00:27:55|
|2015-04-27 00:28:13| 24485|Campaign063|2015-04-27 00:28:13|
|2015-04-27 00:33:42| 24485|Campaign038|2015-04-27 00:33:42|
|2015-04-27 01:00:04| 24485|Campaign063|2015-04-27 01:00:04|
+-------------------+-------+-----------+-------------------+
only showing top 5 rows
時間ごとに集計
date_format() で時間の文字列を作って、それごとにgroupBy()、count()した。
from pyspark.sql.functions import date_format
df.select(date_format('parsed', 'yyyy-MM-dd HH').alias('hour'), 'user_id').groupBy('hour').count().sort('hour').show(5)
+-------------+-----+
| hour|count|
+-------------+-----+
|2015-04-27 00| 2322|
|2015-04-27 01| 1571|
|2015-04-27 02| 943|
|2015-04-27 03| 631|
|2015-04-27 04| 438|
+-------------+-----+
only showing top 5 rows