search
LoginSignup
1

More than 3 years have passed since last update.

Organization

PySparkを試す 2日目

PySparkを試す の続き

ソースはGist

やったこと

CSVの読み込み

RDDやPandasのDataFrameから変換できるけど2.0からはcsv()で読み込める。

spark = SparkSession.builder.getOrCreate()
# https://gihyo.jp/book/2015/978-4-7741-7631-4/support
df  = spark.read.csv("click_data_sample.csv", header=True)

列名に . が入るとうまく処理できず、回避方法がわからなかったので列名を置換した。もっといい方法があるはず。

# to_timestampの第一引数の指定方法がわからなかったので列名をリネーム
old_cols = df.columns
new_cols = [c.replace('.', '_') for c in df.columns]
for old,new in zip(old_cols, new_cols):
    df = df.withColumnRenamed(old, new)
df.show(5)
+-------------------+-------+-----------+
|           click_at|user_id|campaign_id|
+-------------------+-------+-----------+
|2015-04-27 20:40:40| 144012|Campaign077|
|2015-04-27 00:27:55|  24485|Campaign063|
|2015-04-27 00:28:13|  24485|Campaign063|
|2015-04-27 00:33:42|  24485|Campaign038|
|2015-04-27 01:00:04|  24485|Campaign063|
+-------------------+-------+-----------+
only showing top 5 rows

型の確認

dtypesprintSchema()で見れた。

df.dtypes
[('click_at', 'string'), ('user_id', 'string'), ('campaign_id', 'string')]

df.printSchema()
root
 |-- click_at: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)

Datetime型に変換

to_timestamp() を使った。

from pyspark.sql.functions import to_timestamp

df = df.withColumn("parsed", to_timestamp('click_at', 'yyyy-MM-dd HH:mm:ss'))
df.show(5)
+-------------------+-------+-----------+-------------------+
|           click_at|user_id|campaign_id|             parsed|
+-------------------+-------+-----------+-------------------+
|2015-04-27 20:40:40| 144012|Campaign077|2015-04-27 20:40:40|
|2015-04-27 00:27:55|  24485|Campaign063|2015-04-27 00:27:55|
|2015-04-27 00:28:13|  24485|Campaign063|2015-04-27 00:28:13|
|2015-04-27 00:33:42|  24485|Campaign038|2015-04-27 00:33:42|
|2015-04-27 01:00:04|  24485|Campaign063|2015-04-27 01:00:04|
+-------------------+-------+-----------+-------------------+
only showing top 5 rows

時間ごとに集計

date_format() で時間の文字列を作って、それごとにgroupBy()count()した。

from pyspark.sql.functions import date_format

df.select(date_format('parsed', 'yyyy-MM-dd HH').alias('hour'), 'user_id').groupBy('hour').count().sort('hour').show(5)
+-------------+-----+
|         hour|count|
+-------------+-----+
|2015-04-27 00| 2322|
|2015-04-27 01| 1571|
|2015-04-27 02|  943|
|2015-04-27 03|  631|
|2015-04-27 04|  438|
+-------------+-----+
only showing top 5 rows

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
1