LoginSignup
3
3

More than 3 years have passed since last update.

Web API のページネーションへの FS2 Stream の応用

Posted at

先日、Qiita に上がった記事『ページネーションを利用したAPIと再帰関数の相性が抜群だった』の中で、無限リストを使えないかという面白そうなお題があった。ちょうど何か FS2 を使った記事を書きたかったので、今回はそれをやってみる。

お題と方針

Google Calendar API を使って、以下のような趣向でカレンダーのイベントリストを取得してみる。

  • Google で公開されている「日本の祝日」カレンダー1を対象とする。
  • 祝日(=イベント)の数が十分に多い期間を指定、または期間指定なしで検索する。
  • 5件ごとのページネーションで12件まで取得する。
  • 祝日ストリームに終わりがあるかどうか分からない「てい」でやってみる。つまりページネーションを繰り返して全件取得してから12件だけ取り出すことは避けて、なるべく必要な分だけの API 呼び出しに抑える。最短では、5件 + 5件 + 2件 = 12件で 3回の呼び出しになる。
  • Scala 標準の Stream と、FS2 の Stream の2つの方式で実装してみる。

※ 使用するライブラリのバージョンはこのあたり

実装

見通しを良くするため、Java で書かれた Google Calendar API へのアクセスは、以下のような専用のモジュールに隔離しておく2。使う側では calendar.retrieveEvents(...) を、EventsFunc 型(下記コード参照)の関数として扱う形になる。

CalendarApiOps.scala
object CalendarApiOps {
  import collection.JavaConverters._

  ... 

  val calendar: Calendar = new Calendar.Builder(httpTransport, jacksonFactory, credential)
    .setApplicationName(appName)
    .build()

  type PageToken  = String
  type EventsFunc = Option[PageToken] => Events

  implicit class CalendarOps(val cal: Calendar) extends AnyVal {
    def retrieveEvents(calendarId: String, modifier: EventsModifier = identity): EventsFunc =
      maybeToken => {
        println("********* calling API ***********") // カレンダーAPI呼び出し確認用
        modifier(cal.events().list(calendarId))
          .setTokenOrNull(maybeToken)
          .setMaxResults(5) // 5件ずつ Event を得る
          .execute()
      }
  }
}

ソース: CalendarApiOps.scala

Scala 標準 Stream 版

Scala 標準 Stream は Haskell の Data.List におおよそ対応するが、Haskell の方では以下のような関数 unfoldr がある3。簡単に言うと、リストを畳み込んで値を得る fold と逆に、ある値から出発してリスト(無限かもしれない)を生成する働きの関数になるが、詳しいことはリンク先を読むとわかる。

unfoldr :: (b -> Maybe (a, b)) -> b -> [a]

ここでは Haskell 版のシグネーチャを参考にして定義しておく4

def unfoldr[B, A](f: B => Option[(A, B)], b: B): Stream[A] = f(b) match {
  case Some((a, newB)) => a #:: unfoldr(f, newB)
  case None            => Stream.empty
}

これを用いて祝日ストリーム Stream[Event] を得る関数は以下のようになる。見通しを得るため Try などでラップせずに直接 EventsFunc 型の関数を評価してみた。

def eventStream(f: EventsFunc): Stream[Event] = {
  val g: Option[Events] => Option[Events] = {                    // 前回の f の結果にマッチ
    case Some(e) if !e.hasNext => None                           // 前回トークンがなければ終了
    case _                     => f(prev >>= (_.nextToken)).some // unfoldr を継続
  }
  unfoldr[Option[Events], Events](g(_).map(e => (e, e.some)), None)
    .flatMap(_.items.toStream)
}

以下のようなコードを実行して標準出力を見てみると、、、

eventStream(calendar.retrieveEvents(calendarId, _.setStartDate(start).setEndDate(end)))
  .take(12)
  .map(e => s"start:${e.getStart}, summary: ${e.getSummary}")
  .foreach(println)

12件のみが3回の呼び出しで取得されていることが確認できる。

********* calling API ***********
start:{"date":"2018-01-01"}, summary: 元日
start:{"date":"2018-01-08"}, summary: 成人の日
start:{"date":"2018-02-11"}, summary: 建国記念の日
start:{"date":"2018-02-12"}, summary: 建国記念の日 振替休日
start:{"date":"2018-03-21"}, summary: 春分の日
********* calling API ***********
start:{"date":"2018-04-29"}, summary: 昭和の日
start:{"date":"2018-04-30"}, summary: 昭和の日 振替休日
start:{"date":"2018-05-03"}, summary: 憲法記念日
start:{"date":"2018-05-04"}, summary: みどりの日
start:{"date":"2018-05-05"}, summary: こどもの日
********* calling API ***********
start:{"date":"2018-07-16"}, summary: 海の日
start:{"date":"2018-08-11"}, summary: 山の日

Process finished with exit code 0

ソース: UnfoldMain.scala

FS2 の Stream 版

次に Doobie や http4s などでも使われている、関数型ストリーミングライブラリ FS2 の Stream を試してみる。ストリームの型は、エフェクトF[_] を加味して fs2.Stream[F, Event] のようになるが、F[_] は Cats Effect の SyncContextShift のインスタンスを持つ必要がある。

前節で書いた unfoldr 相当の関数は Stream のメソッドとして最初から提供されていて、F[_]を除けば上で書いた Scala標準 Stream 版の eventStream とほぼ同じコードで祝日ストリームを得ることができる。

implicit val SyncF: Sync[F]
implicit val ContextShiftF: ContextShift[F]

def eventStream(f: EventsFunc): Stream[F, Event] = {
  val g: Option[Events] => Option[Events] = {
    case Some(e) if !e.hasNext => None
    case prev                  => f(prev >>= (_.nextToken)).some
  }
  Stream.unfold[F, Option[Events], Events](None)(g(_).map(e => (e, e.some)))
        .flatMap(e => Stream(e.items: _*))
}

このストリームから 12個の祝日を取り出して標準出力に書き出すために、通常の println を使えなくもないが、せっかくなので FS2 の fs2.io.stdin と Cats Effect の Resource を使ってみる。以下のようなコードになる。

def blockingEC: Resource[F, ExecutionContextExecutorService] =
  Resource.make(
    SyncF.delay(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)))
  )(ec => SyncF.delay(ec.shutdown()))

def program(calendarId: String)(modifier: EventsModifier): Stream[F, Unit] =
  Stream.resource(blockingEC) flatMap { ec =>
    eventStream(calendar.retrieveEvents(calendarId, modifier))
      .take(12)
      .map(e => s"start:${e.getStart}, summary: ${e.getSummary}\n")
      .through(text.utf8Encode)
      .through(io.stdout(ec))
  }

実行するには F[_] を具体的に与える必要があるが、例えば IO だとすれば以下のように書ける。

object FS2main extends IOApp with Program[IO] {
  val SyncF:         Sync[IO]         = Sync[IO]
  val ContextShiftF: ContextShift[IO] = ContextShift[IO]
  ...

  def run(args: List[String]): IO[ExitCode] =
    program(calendarId)(_.setStartDate(start).setEndDate(end))
      .compile
      .drain
      .as(ExitCode.Success)
}

動かしてみると、ほとんど同じような実装ながら FS2 の場合一回だけ多く先読みして、都合 4回呼ばれているのがわかる(コメントは後で書き足した)。

********* calling API ***********     // 1) '18年元旦から'18年春分の日まで
********* calling API ***********     // 2) '18年昭和の日から'18年こどもの日まで
start:{"date":"2018-01-01"}, summary: 元日
start:{"date":"2018-01-08"}, summary: 成人の日
start:{"date":"2018-02-11"}, summary: 建国記念の日
start:{"date":"2018-02-12"}, summary: 建国記念の日 振替休日
start:{"date":"2018-03-21"}, summary: 春分の日
********* calling API ***********     // 3) '18年海の日から'18年秋分の日まで
start:{"date":"2018-04-29"}, summary: 昭和の日
start:{"date":"2018-04-30"}, summary: 昭和の日 振替休日
start:{"date":"2018-05-03"}, summary: 憲法記念日
start:{"date":"2018-05-04"}, summary: みどりの日
start:{"date":"2018-05-05"}, summary: こどもの日
********* calling API ***********     // 4) '18年体育の日から'18年天皇誕生日振替休日まで
start:{"date":"2018-07-16"}, summary: 海の日
start:{"date":"2018-08-11"}, summary: 山の日

Process finished with exit code 0

ソース: FS2Main.scala

謝辞

@DaisukeKURATA お題お借りしました。ありがとうございました。


  1. Calendar ID = ja.japanese#holiday@group.v.calendar.google.com 

  2. クラスパスに credentials.json がないと NullPointerException が発生したりするが、そのあたりは適当に妥協した。 

  3. 余談ながら Project Euler のような数学系のプログラミングパズルだと、お題となる無限数列から何かを探したり、合計したりする問題が多いので、Haskell で挑戦していると unfoldr は結構多用する。 

  4. Scalaz では同様の関数が提供されている 

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3