LoginSignup
9
6

More than 1 year has passed since last update.

FS2 Stream の作り方アラカルト

Last updated at Posted at 2019-09-28

純粋関数型の Streaming IO ライブラリ FS2 でいろんなストリームを作ってみる。

はじめに

Scala 開発で、http4sdoobie といった関数型なライブラリがもっと使われるようになってほしいのだけど、だんだん普及してきた気がする一方で1、いまいち伸び悩んでる気もする2

仮にそうだとして理由を考えてみると、関数型プログラミング自体の難度だけではなく、http4s や doobie でも使われている Streaming I/O ライブラリ FS2 の難しさにも一因がある気がしないでもない3

そこで FS2 入門者向けの記事を書いてみる。Cats/Cats Effect が何となくわかれば読めると思う。

切り口として、そもそもストリームを使うには、まずストリームを得ることができなければいけないので、ここではストリームの作り方だけいろいろ集めてみる。その過程でストリームがどんなものかもイメージしやすくなればいいと思う。

★ 2022/06/21 に、Scala 3系、Cats 3系に書き直した。

趣向

以下のような各種のデータソースから、Stream[F, String] 型のストリームを作ってみる。

  • 単一の値
  • 複数の値
  • Either
  • エフェクトF[_]内の値
  • 標準入力
  • ファイル
  • タイマー
  • DBへのクエリ結果
  • HTTPレスポンス
  • Kafka コンシューマ

方針として、以下の共通コードの抽象メソッド stream をデータソースごとに実装し、run メソッドで5要素だけとって標準出力に表示してみる 4ソース)。

trait StreamDemoApp extends IOApp.Simple:
  def run: IO[Unit] = stream[IO]
    .take(5)
    .map(s => s"$s\n")
    .through(text.utf8.encode)
    .through(io.stdout)
    .compile.drain

  def stream[F[_]: Async] : Stream[F, String]

Scala やライブラリのバージョンはこの辺り

実装

一つの値から

まずは一番単純に、与えられた一つの値からストリームを作ってみる。

与えられた値から、要素が一つだけのストリームを作るには、emit を使う。

Stream.emit("apple")

このように型アノテーションをつけない場合、型は Stream[Pure, String] となるが、type Pure[A] <: Nothing のように定義されているので、任意の F[_]Stream[F, String] に変換できる。

与えられた一つの値を繰り返し流すストリームを作るには、constant を使う。

Stream.constant("apple")

これだけだと "apple" を無限に繰り返すストリームになるが、最初に提示した共通コード上で実行すると "apple" を5回出力して終了する。

ソース

複数の値から

fs2.Stream のコンストラクタは可変長引数を受け取るので、これを使うと与えた要素が順に流れてくるストリームが作れる。

Stream("apple", "banana", "chocolate")

Seq 上の複数要素からストリームを作るには メソッド emits を使う。例として定番のフィボナッチでやってみると以下のようになる。

val fib = LazyList.iterate((0, 1))((a, b) => (b, a + b)).take(100)
Stream.emits(fib).map(_._1.show)

Stream.fromIterator を使うと Iterator をストリームに変換できる。後述のファイルからストリームを得るサンプルコードでは、scala.io.Source#getLines で得た Iterator からストリームを作っている。

ソース

Either から

Stream.fromEitherEither[Throwable, A] からストリームが得られる。Right値 から Stream[F, String] を得るには以下のようになる。

Stream.fromEither[F]("hello".asRight)

Left 値からも同様に Stream が得られるが、そのまま先の共通コードで評価すると実行時例外が送出される。

Stream.fromEither[F](Exception("test").asLeft)

実は fromEitherF[_] は、ApplicativeError であることが前提となっていて、例外は ApplicativeError[F] の文脈で保持されているので、.compile.drain などで F[_] にしてから、Cats の applicativeError 構文でハンドリングできる。たとえば下例のようになる。

object FromEitherIO extends IOApp.Simple:
  def run: IO[Unit] =
    Stream.fromEither[IO](Exception("test").asLeft)
      .compile.drain
      .handleErrorWith(_.getMessage pipe IO.println)

ソース

F[A]

エフェクト F[_] に入った A から Stream[F, A] を得るには、Stream.eval を使う。
例えば Stream.eval(IO("Hello")) とすると、型 Stream[IO, String] のストリームが得られる。

F[Seq[_]] を値域とする evalSeq も提供されていて、例えば以下のように書ける。

val strings: F[List[String]] = Sync[F].delay(List("apple", "banana", "chocolate"))
Stream.evalSeq(strings)

ソース

標準入力から

標準入力からのストリームを得るには fs2-io モジュールの fs2.io.stdin* メソッドを使う。以下のコードは、入力した文字列をそのままエコーする。5

io.stdinUtf8(4096).through(text.lines)

ソース

ファイルから

例えば、

ファイル内のテキストを一行ずつ区切ってStream[F, String]とする。ただし使用後のファイルは確実に閉じたい。

といった状況で、Cats Effect の Resource が使える。

F[A] が F[_]: SyncA <: AutoCloseable であれば Resource.fromAutoCloseable を使って F[A] から Resource が生成でき、得られた ResourceStream.resource でストリームにできる。

例えば以下のように書ける。

val file: Resource[F, Source] =
  Resource.fromAutoCloseable(sync[F].delay(Source.fromFile("README.md")))

Stream.resource(file) >>= (s => Stream.fromIterator(s.getLines, 4096))

ソース

タイマーから

たとえば次のような「2秒スリープしてスリープ前後の時刻をエミットする」ようなタスクもストリームとして表現できる。

val currentSec: F[String] =
  Clock[F].realTime.map(n => (n.toSeconds % 60).toString)

val task: Stream[F, String] = for {
  s <- Stream.eval(currentSec)    // 開始秒
  _ <- Stream.sleep[F](2 seconds) // 2秒スリープ
  e <- Stream.eval(currentSec)    // 終了秒
} yield s"$s ---> $e"             // 開始秒と終了秒を書式化

さらに FS2 のタイマー関連メソッドを使うとこれを定期実行することもでき、たとえば fixedDelay を使うと以下のように書ける。

Stream.fixedDelay[F](3 seconds) zipRight task.repeat // タスクが終了するごとに3秒待つ

repeat で task を無限に繰り返すストリームにした上で、3秒間隔の fixedDelayzipRight しており、実行すると以下のような結果になる。

13 ---> 15
18 ---> 20
23 ---> 25
28 ---> 30
33 ---> 35

ソース

DBクエリ結果から

doobie を使うと、データベースへのクエリ結果をストリームとして得ることができる。たとえば以下のように書ける。

val xa = Transactor.fromDriverManager[F](
  "org.postgresql.Driver", // driver classname
  "jdbc:postgresql:world", // connect URL (driver-specific)
  "postgres",              // user
  ""                       // password
)
sql"select name from country where population > 50000000 order by population desc"
  .query[String] // Query0[String]
  .stream        // Stream[ConnectionIO, String]
  .transact(xa)  // Stream[F, String]

※ doobie サンプルの PostgreSQL 上の world DB を使った。セットアップ手順はここ

ソース

HTTP レスポンスから

http4s のクライアントを使って、Twitter のサンプルレスポンスをストリームにしてみる。

val req = Request[F](Method.GET, uri"https://stream.twitter.com/1.1/statuses/sample.json")

// リクエストに署名するメソッド(詳細は下に別記)
val sign: Request[F] => F[Request[F]] = ???

for {
  client    <- BlazeClientBuilder[F].stream // Blaze クライアントを得る
  signedReq <- Stream.eval(sign(req))   // eval で F[Request] から Stream[F, Request]に
  res       <- client.stream(signedReq) // Stream[F, Response[F]] を得る
                 .flatMap(_.body.chunks.parseJsonStream) // Stream[F, Json]を得る
} yield res.spaces2 // いい感じに文字列化する
【補足】リクエストの署名の仕方
以下のようなコードで Request に署名できる。
val env = (F: Async[F]) ?=> (key: String) => F.delay(
  sys.env.get(key).toRight(RuntimeException(s"no env value for key: $key"))
) >>= F.fromEither

val sign: Request[F] => F[Request[F]] = req => for {
  consumerKey    <- env("consumerKey")
  consumerSecret <- env("consumerSecret")
  accessToken    <- env("accessToken")
  accessSecret   <- env("accessSecret")
  signedReq      <- oauth1.signRequest(
    req                = req,
    consumer           = Consumer(consumerKey, consumerSecret),
    token              = Token(accessToken, accessSecret).some,
    realm              = None,
    timestampGenerator = Timestamp.now,
    nonceGenerator     = Nonce.now)
  } yield signedReq

実際に動かすには、Twitter のディベロッパーアカウントで App を作って、環境変数に consumerKey, consumerSecret, accessToken, accessSecret を設定しておく。

ソース

Kafka Consumer から

fs2-kafka を使うと、Kafka の Consumer からストリームを得ることができる。例えば以下のようなコードになる。

val consumerSettings = ConsumerSettings[F, String, String]
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group")

 KafkaConsumer.stream(consumerSettings)
   .evalTap(_.subscribeTo("topic"))
   .flatMap(_.stream)  // ここで KafkaConsumer から stream を得る
   .map(c => s"${c.record.key}->${c.record.value}")

上のコードを起動すると待ち状態になるので、たとえば Python の REPL 上で Producer を作ってメッセージを送ると、一個ずつ読み込んでコンソールに表示し、5つのメッセージを処理すると終了する。下のコードでは kafka-python を使った例。

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('topic', key=b'kkk', value=bytes(f"hello", 'utf-8'))
【補足】Docker で Kafka を起動するやり方
docker-compose.yaml に以下のように書いておけば、`$ docker-compose up -d` で手軽にすぐ起動できる。
services:
  ...
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

ソース

  1. Scala 2019(3月) のアンケートでは、RDBMS ライブラリ部門で Doobie が初めて一番人気になったという。

  2. 同アンケートで、http4s/fs2 の得票はまだまだ akka http/streams より低い。ちなみに日本の Scala 案件/求人情報なんかだと、akka http どころか未だに Play が大半だったりする。

  3. 自分が『FP in Scala』で勉強していたときも、'Chapter 15. Stream processing and incremental I/O' の辺りが特にむずかしかった気がする。

  4. 旧サンプルでは IOApp を継承していたが、Cats Effect 3 では、このように IOApp.Simple が使えて少しスッキリする。。

  5. 旧fs2 では、io.stdinを使うためにBlockerが必要だったが、現在は不要。

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6