LoginSignup
8
4

More than 1 year has passed since last update.

逆引きPySpark (2.日付時刻編)

Last updated at Posted at 2022-09-04

PySparkでこういう場合はどうしたらいいのかをまとめた逆引きPySparkシリーズの日付時刻編です。
(随時更新予定です。)

  • 原則としてApache Spark 3.3のPySparkのAPIに準拠していますが、一部、便利なDatabricks限定の機能も利用しています(利用しているところはその旨記載しています)。
  • Databricks Runtime 11.0 上で動作することを確認しています。
  • ノートブックをこちらのリポジトリ からReposにてご使用のDatabricksの環境にダウンロードできます。
  • 逆引きPySparkの他の章については、こちらの記事をご覧ください。

例文の前提条件

  • SparkSessionオブジェクトがsparkという変数名で利用可能なこと

2-1 現在の取得

2-1-1 現在の時刻を取得する

current_timestamp()関数を使って、現在の時刻を取得します。

# 構文
df.withColumn( <追加するカラム名>, F.current_timestamp() )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "current_time_utc", F.current_timestamp() )
display( df.select( "device_id", "current_time_utc" ) )

出力例

device_id current_time_utc
1 1 2022-09-03T13:43:25.514+0000

2-1-2 現在の日付を取得する

current_date()関数を使って、現在の日付を取得します。

# 構文
df.withColumn( <追加するカラム名>, F.current_date() )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "today", F.current_date() )
display( df.select( "device_id", "today" ) )

出力例

device_id today
1 1 2022-09-03

2-2 抽出

2-2-1 時刻から年の値を取り出す

year()関数を使って、時刻から年の値を取り出す。

# 構文
df.withColumn( <追加するカラム名>, F.year(<時刻型のカラム>) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "year", F.year( "time" ) )
display( df.select( "time", "year" ) )
time year
1 2019-03-14T22:07:35.910+0000 2019
2 2016-09-29T07:34:26.932+0000 2016

2-2-2 時刻から月の値を取り出す

month()関数を使って、時刻から年の値を取り出す。

# 構文
df.withColumn( <追加するカラム名>, F.month(<時刻型のカラム>) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "month", F.month( "time" ) )
display( df.select( "time", "month" ) )
time month
1 2019-03-14T22:07:35.910+0000 3
2 2016-09-29T07:34:26.932+0000 9

2-2-3 時刻から時の値を取り出す

hour()関数を使って、時刻から時の値を取り出す。

# 構文
df.withColumn( <追加するカラム名>, F.hour(<時刻型のカラム>) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "hour", F.hour( "time" ) )
display( df.select( "time", "hour" ) )
time hour
1 2018-06-02T19:41:21.216+0000 19
2 2017-09-04T02:34:43.919+0000 2

2-2-4 時刻から分の値を取り出す

minute()関数を使って、時刻から分の値を取り出す。

# 構文
df.withColumn( <追加するカラム名>, F.minute(<時刻型のカラム>) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "minute", F.minute( "time" ) )
display( df.select( "time", "minute" ) )
time minute
1 2018-06-02T19:41:21.216+0000 41
2 2017-09-04T02:34:43.919+0000 34

2-2-5 時刻から秒の値を取り出す

second()関数を使って、時刻から秒の値を取り出す。

# 構文
df.withColumn( <追加するカラム名>, F.second(<時刻型のカラム>) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "second", F.second( "time" ) )
display( df.select( "time", "second" ) )
time second
1 2018-06-02T19:41:21.216+0000 21
2 2017-09-04T02:34:43.919+0000 43

2-2-6 時刻を特定の単位で切り捨てる

date_trunc()関数を使って、時刻を特定の単位で切り捨てます。例えば、"year"を指定すると、年より小さい単位の情報が切り捨てられ一番小さい値に置き換えられます。

# 構文
df.withColumn( <追加するカラム名>, F.date_trunc( <切り捨てる単位>, <時刻型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "date_trunc", F.date_trunc( "month", "time" ) )
display( df.select( "time","date_trunc" ) )
time date_trunc
1 2018-06-02T19:41:21.216+0000 2016-03-01T00:00:00.000+0000
2 2017-09-04T02:34:43.919+0000 2016-03-01T00:00:00.000+0000

2-2-7 日付を特定の単位で切り捨てる

trunc()関数を使って、日付を特定の単位で切り捨てます。例えば、"year"を指定すると、年より小さい単位の情報が切り捨てられ一番小さい値に置き換えられます。

# 構文
df.withColumn( <追加するカラム名>, F.trunc( <切り捨てる単位>, <時刻型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "trunc", F.trunc( "date", "month" ) )
display( df.select( "date", "trunc" ) )
date trunc
1 2018-06-02 2018-06-01
2 2017-09-04 2017-09-01

2-2-8 日付が年の何日目かに該当するかを求める

dayofyear()関数を使って、日付が年の何日目かに該当するかを求めます。1月1日は1になります。

# 構文
df.withColumn( <追加するカラム名>, F.dayofyear( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "dayofyear", F.dayofyear( "date" ) )
display( df.select( "date", "dayofyear" ) )
date dayofyear
1 2018-06-02 153
2 2017-09-04 247

2-2-9 日付が年の何週目に該当するかを求める

weekofyear()関数を使って、日付が年の何週目かに該当するかを求めます。1月1日が属する週は1になります。

# 構文
df.withColumn( <追加するカラム名>, F.weekofyear( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "weekofyear", F.weekofyear( "date" ) )
display( df.select( "date", "weekofyear" ) )
date weekofyear
1 2018-06-02 22
2 2017-09-04 36

2-2-10 日付が年の第何四半期に該当するかを求める

quarter()関数を使って、日付が年の第何四半期に該当するかを求めます。

# 構文
df.withColumn( <追加するカラム名>, F.quarter( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "quarter", F.quarter( "date" ) )
display( df.select( "date", "quarter" ) )
date quarter
1 2018-06-02 2
2 2017-09-04 3

2-2-11 日付が月の何日目かに該当するかを求める

dayofmonth()関数を使って、日付が年の何日目に該当するかを求めます。

# 構文
df.withColumn( <追加するカラム名>, F.dayofmonth( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "day", F.dayofmonth( "date" ) )
display( df.select( "date", "day" ) )
date day
1 2018-06-02 2
2 2017-09-04 4

2-2-12 日付が週の何日目かに該当するかを求める

dayofweek()関数を使って、日付が週の何日目かに該当するかを求めます。日曜日が1となります。

# 構文
df.withColumn( <追加するカラム名>, F.dayofweek( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "dayofweek", F.dayofweek( "date" ) )
display( df.select( "date", "dayofweek" ) )
date dayofweek
1 2018-06-02 7
2 2017-09-04 2

2-2-13 その月の月末に該当する日付を求める

last_day()関数を使って、日付から、その月の月末に該当する日付を求める。

# 構文
df.withColumn( <追加するカラム名>, F.last_day( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "last_day_of_month", F.last_day( "date" ) )
display( df.select( "date", "last_day_of_month" ) )
date last_day_of_month
1 2018-06-02 2018-06-30
2 2017-09-04 2017-09-30

2-3 変換

2-3-1 UNIX時間から時刻へ変換する

timestamp_seconds()関数を使って、UNIX時間を時刻に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.timestamp_seconds( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "time", F.timestamp_seconds("timestamp") )
display( df.select( "timestamp", "time" ) )
timestamp time
1 1527968481.2164252 2018-06-02T19:41:21.216+0000
2 1504492483.9199414 2017-09-04T02:34:43.919+0000

2-3-2 時刻からUNIX時間へ変換する

unix_timestamp()関数を使って、時刻をUNIX時間に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.unix_timestamp( <日付型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "unixtime", F.unix_timestamp("time") )
display( df.select( "time", "unixtime" ) )
time unixtime
1 2018-06-02T19:41:21.216+0000 1527968481
2 2017-09-04T02:34:43.919+0000 1504492483

2-3-3 時刻をUTCから特定のタイムゾーンへ変換する

from_utc_timestamp()関数を使って、時刻をUTCから特定のタイムゾーンへ変換します。タイムゾーンは、以下の4つの形式のいずれかで指定できます。

フォーマット 備考
<地域>/<都市名> Asia/Tokyo
(+|-)HH:mm +09:00
UTC ‘+00:00’のエイリアス
Z ‘+00:00’のエイリアス
# 構文
df.withColumn( <追加するカラム名>, F.from_utc_timestamp( <時刻型のカラム>, <タイムゾーン> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "time_jst", F.from_utc_timestamp("time", "Asia/Tokyo") )
display( df.select( "time", "time_jst" ) )
time time_jst
1 2018-06-02T19:41:21.216+0000 2018-06-03T04:41:21.216+0000
2 2017-09-04T02:34:43.919+0000 2017-09-04T11:34:43.919+0000

2-3-4 時刻を特定のタイムゾーンからUTCへ変換する

to_utc_timestamp()関数を使って、時刻を特定のタイムゾーンからUTCへ変換します。タイムゾーンは、以下の4つの形式のいずれかで指定できます。

フォーマット 備考
<地域>/<都市名> Asia/Tokyo
(+|-)HH:mm +09:00
UTC ‘+00:00’のエイリアス
Z ‘+00:00’のエイリアス
# 構文
df.withColumn( <追加するカラム名>, F.to_utc_timestamp( <時刻型のカラム>, <タイムゾーン> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "time_utc", F.to_utc_timestamp("time_jst", "Asia/Tokyo") )
display( df.select( "time_jst", "time_utc" ) )
time_jst time_utc
1 2018-06-03T04:41:21.216+0000 2018-06-02T19:41:21.216+0000
2 2017-09-04T11:34:43.919+0000 2017-09-04T02:34:43.919+0000

2-3-5 時刻/日付から文字列へ変換する

date_format()関数を使って、時刻や日付を、指定したフォーマットに従って文字列に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.date_format( <時刻型/日付型のカラム>, <フォーマット> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "timestamp_string", F.date_format( "time", "yyyyMMdd-HHmmss" ) )
display( df.select( "time", "timestamp_string" ) )
time timestamp_string
1 2018-06-02T19:41:21.216+0000 20180602-194121
2 2017-09-04T02:34:43.919+0000 20170904-023443

2-3-6 文字列から時刻へ変換する

to_timestamp()関数を使って、文字列を、指定したフォーマットに従って時刻に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.to_timestamp( <文字列のカラム>, <フォーマット> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "to_timestamp", F.to_timestamp( "timestamp_string", "yyyyMMdd-HHmmss" ) )
display( df.select( "timestamp_string", "to_timestamp" ) )
timestamp_string to_timestamp
1 20180602-194121 2018-06-02T19:41:21.216+0000
2 20170904-023443 2017-09-04T02:34:43.919+0000

2-3-7 文字列から日付へ変換する

to_date()関数を使って、文字列を、指定したフォーマットに従って日付に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.to_date( <文字列のカラム>, <フォーマット> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "to_date", F.to_date( "timestamp_string", "yyyyMMdd-HHmmss" ) )
display( df.select( "timestamp_string", "to_date" ) )
timestamp_string to_date
1 20180602-194121 2018-06-02
2 20170904-023443 2017-09-04

2-3-8 時刻を日付に変換する

to_date()関数を使って、時刻を日付に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.to_date( <時刻型のカラム> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "date", F.to_date( "time_jst" ) )
display( df.select( "time_jst", "date" ) )
time_jst date
1 2018-06-03T04:41:21.216+0000 2018-06-03
2 2017-09-04T11:34:43.919+0000 2017-09-04

2-3-9 年、月、日から、日付を作成する

make_date()関数を使って、時刻を日付に変換します。

# 構文
df.withColumn( <追加するカラム名>, F.make_date( <年を表す文字列>, <月を表す文字列>, <日を表す文字列> ) )

# 例文
from pyspark.sql import functions as F

df = df.withColumn( "made_date", F.make_date( "year", "month", "day" ) )
display( df.select( "year", "month", "day", "made_date" ) )
year month day make_date
1 2018 6 3 2018-06-03
2 2017 9 4 2017-09-04

2-4 日付の足し算、引き算

2-4-1 2つの日付の間の日数を求める

datediff()関数を使って、2つの日付の間の日数を求めます。前の日付が2022-01-01で、後の日付が2022-01-02の場合は1になります。前の日付が2022-01-02で、後の日付が2022-01-01の場合は-1になります。

# 構文
df.withColumn( <追加するカラム名>, F.datediff( <後の日付の日付型カラム>, <前の日付の日付型カラム> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "datediff", F.datediff( "end_date", "start_date" ) )
display( df2.select( "start_date", "end_date", "datediff" ) )
start_date end_date datediff
1 2017-11-07 2017-10-01 -37
2 2016-11-17 2016-05-30 -171

2-4-2 2つの日付の間の月数を求める

months_between()関数を使って、2つの日付の間の月数を求めます。どちらの日付も、月の中で同じ日、あるいは月末である場合は整数が返ります。それ以外の場合は、月を31日として計算された浮動小数点数が返ります。

# 構文
df.withColumn( <追加するカラム名>, F.months_between( <後の日付の日付型カラム>, <前の日付の日付型カラム> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "months_between", F.months_between( "end_date", "start_date" ) )
display( df2.select( "start_date", "end_date", "months_between" ) )
start_date end_date months_between
1 2017-11-07 2017-10-01 -1.19354839
2 2016-11-17 2016-05-30 -5.58064516

2-4-3 次の○曜日の日付を求める(例:次の月曜日の日付)

next_day()関数を使って、入力となる日付の次の○曜日(例:月曜日)の日付を求めます。曜日として“Mon”、“Tue”、“Wed”、“Thu”、“Fri”、“Sat”、“Sun”を設定可能です(大文字小文字不問)。

# 構文
df.withColumn( <追加するカラム名>, F.next_day( <日付型のカラム>, <曜日> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "next_day_of_end_date", F.next_day( "end_date", "Mon" ) )
display( df2.select( "end_date", "next_day_of_end_date" )  )
end_date next_day_of_end_date
1 2018-05-23 2018-05-28
2 2018-12-22 2018-12-24

2-4-4 ○日後の日付を求める

date_add()関数を使って、入力となる日付の○日後の日付を求めます。

# 構文
df.withColumn( <追加するカラム名>, F.date_add( <日付型のカラム>, <日数> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "three_days_after_end_date", F.date_add( "end_date", 3 ) )
display( df2.select( "end_date", "three_days_after_end_date" )  )
end_date three_days_after_end_date
1 2018-05-23 2018-05-26
2 2018-12-22 2018-12-25

2-4-5 ○日前の日付を求める

date_sub()関数を使って、入力となる日付の○日前の日付を求めます。

# 構文
df.withColumn( <追加するカラム名>, F.date_sub( <日付型のカラム>, <日数> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "three_days_before_start_date", F.date_sub( "start_date", 3 ) )
display( df2.select( "start_date", "three_days_before_start_date" )  )
end_date three_days_before_start_date
1 2018-05-23 2018-05-24
2 2018-12-22 2018-02-13

2-4-6 ○ヶ月後の日付を求める

add_months()関数を使って、入力となる日付の○日前の日付を求めます。

# 構文
df.withColumn( <追加するカラム名>, F.add_months( <日付型のカラム>, <日数> ) )

# 例文
from pyspark.sql import functions as F

df2 = df2.withColumn( "two_months_after_end_date", F.add_months( "end_date", 2 ) )
display( df2.select( "end_date", "two_months_after_end_date" )  )
end_date two_months_after_end_date
1 2018-05-23 2018-07-23
2 2018-12-22 2019-02-22

2-5 その他

2-5-1 特定の長さのウィンドウについて統計値を集計する

window()関数を使って、特定の長さの期間を表すウィンドウを作成します。groupBy()の引数としてウィンドウを与えることで、そのウィンドウについて統計値を集計することができます。引数としてスライディングウィンドウの間隔を設定することで、設定した間隔ごとのスライディングウィンドウが作成されます。設定されない場合はタンブリングウィンドウが作成されます。

# 構文
window( <時刻型のカラム>, <ウィンドウの長さ>[, <スライディングウィンドウの間隔>[, <オフセット>]] )

# 例文
from pyspark.sql import functions as F

df_group = df2.groupBy( F.window("end_time", "5 days", "1 day" ) ).agg( F.avg("humidity").alias("average_humidity") )
display( df_group )
window average_humidity
1 {"start": "2017-04-11T00:00:00.000+0000", "end": "2017-04-16T00:00:00.000+0000"} 61.60972716488731
2 {"start": "2016-08-30T00:00:00.000+0000", "end": "2016-09-04T00:00:00.000+0000"} 62.39860950173812

(以下略)

2-5-2 セッションウィンドウについて統計値を集計する

session_window()関数を使って、セッションウィンドウを作成します。groupBy()の引数としてセッションウィンドウを与えることで、そのセッションウィンドウについて統計値を集計することができます。

参考:Spark構造化ストリーミングにおけるセッションウィンドウのネイティブサポート

# 構文
session_window( <時刻型のカラム>, <セッションのタイムアウト時間> )

# 例文
from pyspark.sql import functions as F

df_group = df2.groupBy( F.session_window( "end_time", "1 hour" ) ).count()
display( df_group )
session_window count
1 {"start": "2016-03-20T03:27:37.620+0000", "end": "2016-04-27T07:21:58.678+0000"} 6659
2 {"start": "2016-04-27T07:42:01.350+0000", "end": "2016-05-01T02:43:31.451+0000"} 679

(以下略)

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4