先日、Qiita に上がった記事『ページネーションを利用したAPIと再帰関数の相性が抜群だった』の中で、無限リストを使えないかという面白そうなお題があった。ちょうど何か FS2 を使った記事を書きたかったので、今回はそれをやってみる。
お題と方針
Google Calendar API を使って、以下のような趣向でカレンダーのイベントリストを取得してみる。
- Google で公開されている「日本の祝日」カレンダー1を対象とする。
- 祝日(=イベント)の数が十分に多い期間を指定、または期間指定なしで検索する。
- 5件ごとのページネーションで12件まで取得する。
- 祝日ストリームに終わりがあるかどうか分からない「てい」でやってみる。つまりページネーションを繰り返して全件取得してから12件だけ取り出すことは避けて、なるべく必要な分だけの API 呼び出しに抑える。最短では、5件 + 5件 + 2件 = 12件で 3回の呼び出しになる。
- Scala 標準の Stream と、FS2 の Stream の2つの方式で実装してみる。
※ 使用するライブラリのバージョンはこのあたり
実装
見通しを良くするため、Java で書かれた Google Calendar API へのアクセスは、以下のような専用のモジュールに隔離しておく2。使う側では calendar.retrieveEvents(...)
を、EventsFunc
型(下記コード参照)の関数として扱う形になる。
object CalendarApiOps {
import collection.JavaConverters._
... 略
val calendar: Calendar = new Calendar.Builder(httpTransport, jacksonFactory, credential)
.setApplicationName(appName)
.build()
type PageToken = String
type EventsFunc = Option[PageToken] => Events
implicit class CalendarOps(val cal: Calendar) extends AnyVal {
def retrieveEvents(calendarId: String, modifier: EventsModifier = identity): EventsFunc =
maybeToken => {
println("********* calling API ***********") // カレンダーAPI呼び出し確認用
modifier(cal.events().list(calendarId))
.setTokenOrNull(maybeToken)
.setMaxResults(5) // 5件ずつ Event を得る
.execute()
}
}
}
Scala 標準 Stream 版
Scala 標準 Stream
は Haskell の Data.List
におおよそ対応するが、Haskell の方では以下のような関数 unfoldr がある3。簡単に言うと、リストを畳み込んで値を得る fold
と逆に、ある値から出発してリスト(無限かもしれない)を生成する働きの関数になるが、詳しいことはリンク先を読むとわかる。
unfoldr :: (b -> Maybe (a, b)) -> b -> [a]
ここでは Haskell 版のシグネーチャを参考にして定義しておく4。
def unfoldr[B, A](f: B => Option[(A, B)], b: B): Stream[A] = f(b) match {
case Some((a, newB)) => a #:: unfoldr(f, newB)
case None => Stream.empty
}
これを用いて祝日ストリーム Stream[Event]
を得る関数は以下のようになる。見通しを得るため Try
などでラップせずに直接 EventsFunc
型の関数を評価してみた。
def eventStream(f: EventsFunc): Stream[Event] = {
val g: Option[Events] => Option[Events] = { // 前回の f の結果にマッチ
case Some(e) if !e.hasNext => None // 前回トークンがなければ終了
case _ => f(prev >>= (_.nextToken)).some // unfoldr を継続
}
unfoldr[Option[Events], Events](g(_).map(e => (e, e.some)), None)
.flatMap(_.items.toStream)
}
以下のようなコードを実行して標準出力を見てみると、、、
eventStream(calendar.retrieveEvents(calendarId, _.setStartDate(start).setEndDate(end)))
.take(12)
.map(e => s"start:${e.getStart}, summary: ${e.getSummary}")
.foreach(println)
12件のみが3回の呼び出しで取得されていることが確認できる。
********* calling API ***********
start:{"date":"2018-01-01"}, summary: 元日
start:{"date":"2018-01-08"}, summary: 成人の日
start:{"date":"2018-02-11"}, summary: 建国記念の日
start:{"date":"2018-02-12"}, summary: 建国記念の日 振替休日
start:{"date":"2018-03-21"}, summary: 春分の日
********* calling API ***********
start:{"date":"2018-04-29"}, summary: 昭和の日
start:{"date":"2018-04-30"}, summary: 昭和の日 振替休日
start:{"date":"2018-05-03"}, summary: 憲法記念日
start:{"date":"2018-05-04"}, summary: みどりの日
start:{"date":"2018-05-05"}, summary: こどもの日
********* calling API ***********
start:{"date":"2018-07-16"}, summary: 海の日
start:{"date":"2018-08-11"}, summary: 山の日
Process finished with exit code 0
FS2 の Stream 版
次に Doobie や http4s などでも使われている、関数型ストリーミングライブラリ FS2 の Stream
を試してみる。ストリームの型は、エフェクトF[_]
を加味して fs2.Stream[F, Event]
のようになるが、F[_]
は Cats Effect の Sync
、ContextShift
のインスタンスを持つ必要がある。
前節で書いた unfoldr
相当の関数は Stream
のメソッドとして最初から提供されていて、F[_]
を除けば上で書いた Scala標準 Stream 版の eventStream
とほぼ同じコードで祝日ストリームを得ることができる。
implicit val SyncF: Sync[F]
implicit val ContextShiftF: ContextShift[F]
def eventStream(f: EventsFunc): Stream[F, Event] = {
val g: Option[Events] => Option[Events] = {
case Some(e) if !e.hasNext => None
case prev => f(prev >>= (_.nextToken)).some
}
Stream.unfold[F, Option[Events], Events](None)(g(_).map(e => (e, e.some)))
.flatMap(e => Stream(e.items: _*))
}
このストリームから 12個の祝日を取り出して標準出力に書き出すために、通常の println
を使えなくもないが、せっかくなので FS2 の fs2.io.stdin
と Cats Effect の Resource
を使ってみる。以下のようなコードになる。
def blockingEC: Resource[F, ExecutionContextExecutorService] =
Resource.make(
SyncF.delay(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
)(ec => SyncF.delay(ec.shutdown()))
def program(calendarId: String)(modifier: EventsModifier): Stream[F, Unit] =
Stream.resource(blockingEC) flatMap { ec =>
eventStream(calendar.retrieveEvents(calendarId, modifier))
.take(12)
.map(e => s"start:${e.getStart}, summary: ${e.getSummary}\n")
.through(text.utf8Encode)
.through(io.stdout(ec))
}
実行するには F[_]
を具体的に与える必要があるが、例えば IO
だとすれば以下のように書ける。
object FS2main extends IOApp with Program[IO] {
val SyncF: Sync[IO] = Sync[IO]
val ContextShiftF: ContextShift[IO] = ContextShift[IO]
...
def run(args: List[String]): IO[ExitCode] =
program(calendarId)(_.setStartDate(start).setEndDate(end))
.compile
.drain
.as(ExitCode.Success)
}
動かしてみると、ほとんど同じような実装ながら FS2 の場合一回だけ多く先読みして、都合 4回呼ばれているのがわかる(コメントは後で書き足した)。
********* calling API *********** // 1) '18年元旦から'18年春分の日まで
********* calling API *********** // 2) '18年昭和の日から'18年こどもの日まで
start:{"date":"2018-01-01"}, summary: 元日
start:{"date":"2018-01-08"}, summary: 成人の日
start:{"date":"2018-02-11"}, summary: 建国記念の日
start:{"date":"2018-02-12"}, summary: 建国記念の日 振替休日
start:{"date":"2018-03-21"}, summary: 春分の日
********* calling API *********** // 3) '18年海の日から'18年秋分の日まで
start:{"date":"2018-04-29"}, summary: 昭和の日
start:{"date":"2018-04-30"}, summary: 昭和の日 振替休日
start:{"date":"2018-05-03"}, summary: 憲法記念日
start:{"date":"2018-05-04"}, summary: みどりの日
start:{"date":"2018-05-05"}, summary: こどもの日
********* calling API *********** // 4) '18年体育の日から'18年天皇誕生日振替休日まで
start:{"date":"2018-07-16"}, summary: 海の日
start:{"date":"2018-08-11"}, summary: 山の日
Process finished with exit code 0
謝辞
@DaisukeKURATA お題お借りしました。ありがとうございました。
-
Calendar ID = ja.japanese#holiday@group.v.calendar.google.com ↩
-
クラスパスに credentials.json がないと NullPointerException が発生したりするが、そのあたりは適当に妥協した。 ↩
-
余談ながら Project Euler のような数学系のプログラミングパズルだと、お題となる無限数列から何かを探したり、合計したりする問題が多いので、Haskell で挑戦していると
unfoldr
は結構多用する。 ↩ -
Scalaz では同様の関数が提供されている ↩