純粋関数型の Streaming IO ライブラリ FS2 でいろんなストリームを作ってみる。
はじめに
Scala 開発で、http4s や doobie といった関数型なライブラリがもっと使われるようになってほしいのだけど、だんだん普及してきた気がする一方で1、いまいち伸び悩んでる気もする2。
仮にそうだとして理由を考えてみると、関数型プログラミング自体の難度だけではなく、http4s や doobie でも使われている Streaming I/O ライブラリ FS2 の難しさにも一因がある気がしないでもない3。
そこで FS2 入門者向けの記事を書いてみる。Cats/Cats Effect が何となくわかれば読めると思う。
切り口として、そもそもストリームを使うには、まずストリームを得ることができなければいけないので、ここではストリームの作り方だけいろいろ集めてみる。その過程でストリームがどんなものかもイメージしやすくなればいいと思う。
★ 2022/06/21 に、Scala 3系、Cats 3系に書き直した。
趣向
以下のような各種のデータソースから、Stream[F, String]
型のストリームを作ってみる。
- 単一の値
- 複数の値
- Either
- エフェクト
F[_]
内の値 - 標準入力
- ファイル
- タイマー
- DBへのクエリ結果
- HTTPレスポンス
- Kafka コンシューマ
方針として、以下の共通コードの抽象メソッド stream
をデータソースごとに実装し、run
メソッドで5要素だけとって標準出力に表示してみる 4(ソース)。
trait StreamDemoApp extends IOApp.Simple:
def run: IO[Unit] = stream[IO]
.take(5)
.map(s => s"$s\n")
.through(text.utf8.encode)
.through(io.stdout)
.compile.drain
def stream[F[_]: Async] : Stream[F, String]
Scala やライブラリのバージョンはこの辺り
実装
一つの値から
まずは一番単純に、与えられた一つの値からストリームを作ってみる。
与えられた値から、要素が一つだけのストリームを作るには、emit
を使う。
Stream.emit("apple")
このように型アノテーションをつけない場合、型は Stream[Pure, String]
となるが、type Pure[A] <: Nothing
のように定義されているので、任意の F[_]
のStream[F, String]
に変換できる。
与えられた一つの値を繰り返し流すストリームを作るには、constant
を使う。
Stream.constant("apple")
これだけだと "apple" を無限に繰り返すストリームになるが、最初に提示した共通コード上で実行すると "apple" を5回出力して終了する。
複数の値から
fs2.Stream
のコンストラクタは可変長引数を受け取るので、これを使うと与えた要素が順に流れてくるストリームが作れる。
Stream("apple", "banana", "chocolate")
Seq
上の複数要素からストリームを作るには メソッド emits
を使う。例として定番のフィボナッチでやってみると以下のようになる。
val fib = LazyList.iterate((0, 1))((a, b) => (b, a + b)).take(100)
Stream.emits(fib).map(_._1.show)
Stream.fromIterator
を使うと Iterator
をストリームに変換できる。後述のファイルからストリームを得るサンプルコードでは、scala.io.Source#getLines
で得た Iterator
からストリームを作っている。
Either から
Stream.fromEither
でEither[Throwable, A]
からストリームが得られる。Right値 から Stream[F, String]
を得るには以下のようになる。
Stream.fromEither[F]("hello".asRight)
Left 値からも同様に Stream が得られるが、そのまま先の共通コードで評価すると実行時例外が送出される。
Stream.fromEither[F](Exception("test").asLeft)
実は fromEither
の F[_]
は、ApplicativeError
であることが前提となっていて、例外は ApplicativeError[F]
の文脈で保持されているので、.compile.drain
などで F[_]
にしてから、Cats の applicativeError
構文でハンドリングできる。たとえば下例のようになる。
object FromEitherIO extends IOApp.Simple:
def run: IO[Unit] =
Stream.fromEither[IO](Exception("test").asLeft)
.compile.drain
.handleErrorWith(_.getMessage pipe IO.println)
F[A]
エフェクト F[_]
に入った A
から Stream[F, A]
を得るには、Stream.eval
を使う。
例えば Stream.eval(IO("Hello"))
とすると、型 Stream[IO, String]
のストリームが得られる。
F[Seq[_]]
を値域とする evalSeq
も提供されていて、例えば以下のように書ける。
val strings: F[List[String]] = Sync[F].delay(List("apple", "banana", "chocolate"))
Stream.evalSeq(strings)
標準入力から
標準入力からのストリームを得るには fs2-io モジュールの fs2.io.stdin*
メソッドを使う。以下のコードは、入力した文字列をそのままエコーする。5
io.stdinUtf8(4096).through(text.lines)
ファイルから
例えば、
ファイル内のテキストを一行ずつ区切って
Stream[F, String]
とする。ただし使用後のファイルは確実に閉じたい。
といった状況で、Cats Effect の Resource
が使える。
F[A] が F[_]: Sync
、A <: AutoCloseable
であれば Resource.fromAutoCloseable
を使って F[A] から Resource
が生成でき、得られた Resource
は Stream.resource
でストリームにできる。
例えば以下のように書ける。
val file: Resource[F, Source] =
Resource.fromAutoCloseable(sync[F].delay(Source.fromFile("README.md")))
Stream.resource(file) >>= (s => Stream.fromIterator(s.getLines, 4096))
タイマーから
たとえば次のような「2秒スリープしてスリープ前後の時刻をエミットする」ようなタスクもストリームとして表現できる。
val currentSec: F[String] =
Clock[F].realTime.map(n => (n.toSeconds % 60).toString)
val task: Stream[F, String] = for {
s <- Stream.eval(currentSec) // 開始秒
_ <- Stream.sleep[F](2 seconds) // 2秒スリープ
e <- Stream.eval(currentSec) // 終了秒
} yield s"$s ---> $e" // 開始秒と終了秒を書式化
さらに FS2 のタイマー関連メソッドを使うとこれを定期実行することもでき、たとえば fixedDelay
を使うと以下のように書ける。
Stream.fixedDelay[F](3 seconds) zipRight task.repeat // タスクが終了するごとに3秒待つ
repeat
で task を無限に繰り返すストリームにした上で、3秒間隔の fixedDelay
と zipRight
しており、実行すると以下のような結果になる。
13 ---> 15
18 ---> 20
23 ---> 25
28 ---> 30
33 ---> 35
DBクエリ結果から
doobie を使うと、データベースへのクエリ結果をストリームとして得ることができる。たとえば以下のように書ける。
val xa = Transactor.fromDriverManager[F](
"org.postgresql.Driver", // driver classname
"jdbc:postgresql:world", // connect URL (driver-specific)
"postgres", // user
"" // password
)
sql"select name from country where population > 50000000 order by population desc"
.query[String] // Query0[String]
.stream // Stream[ConnectionIO, String]
.transact(xa) // Stream[F, String]
※ doobie サンプルの PostgreSQL 上の world DB を使った。セットアップ手順はここ
HTTP レスポンスから
http4s のクライアントを使って、Twitter のサンプルレスポンスをストリームにしてみる。
val req = Request[F](Method.GET, uri"https://stream.twitter.com/1.1/statuses/sample.json")
// リクエストに署名するメソッド(詳細は下に別記)
val sign: Request[F] => F[Request[F]] = ???
for {
client <- BlazeClientBuilder[F].stream // Blaze クライアントを得る
signedReq <- Stream.eval(sign(req)) // eval で F[Request] から Stream[F, Request]に
res <- client.stream(signedReq) // Stream[F, Response[F]] を得る
.flatMap(_.body.chunks.parseJsonStream) // Stream[F, Json]を得る
} yield res.spaces2 // いい感じに文字列化する
【補足】リクエストの署名の仕方
val env = (F: Async[F]) ?=> (key: String) => F.delay(
sys.env.get(key).toRight(RuntimeException(s"no env value for key: $key"))
) >>= F.fromEither
val sign: Request[F] => F[Request[F]] = req => for {
consumerKey <- env("consumerKey")
consumerSecret <- env("consumerSecret")
accessToken <- env("accessToken")
accessSecret <- env("accessSecret")
signedReq <- oauth1.signRequest(
req = req,
consumer = Consumer(consumerKey, consumerSecret),
token = Token(accessToken, accessSecret).some,
realm = None,
timestampGenerator = Timestamp.now,
nonceGenerator = Nonce.now)
} yield signedReq
実際に動かすには、Twitter のディベロッパーアカウントで App を作って、環境変数に consumerKey, consumerSecret, accessToken, accessSecret を設定しておく。
Kafka Consumer から
fs2-kafka を使うと、Kafka の Consumer からストリームを得ることができる。例えば以下のようなコードになる。
val consumerSettings = ConsumerSettings[F, String, String]
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withGroupId("group")
KafkaConsumer.stream(consumerSettings)
.evalTap(_.subscribeTo("topic"))
.flatMap(_.stream) // ここで KafkaConsumer から stream を得る
.map(c => s"${c.record.key}->${c.record.value}")
上のコードを起動すると待ち状態になるので、たとえば Python の REPL 上で Producer を作ってメッセージを送ると、一個ずつ読み込んでコンソールに表示し、5つのメッセージを処理すると終了する。下のコードでは kafka-python を使った例。
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('topic', key=b'kkk', value=bytes(f"hello", 'utf-8'))
【補足】Docker で Kafka を起動するやり方
services:
...
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-
Scala 2019(3月) のアンケートでは、RDBMS ライブラリ部門で Doobie が初めて一番人気になったという。 ↩
-
同アンケートで、http4s/fs2 の得票はまだまだ akka http/streams より低い。ちなみに日本の Scala 案件/求人情報なんかだと、akka http どころか未だに Play が大半だったりする。 ↩
-
自分が『FP in Scala』で勉強していたときも、'Chapter 15. Stream processing and incremental I/O' の辺りが特にむずかしかった気がする。 ↩
-
旧サンプルでは
IOApp
を継承していたが、Cats Effect 3 では、このようにIOApp.Simple
が使えて少しスッキリする。。 ↩ -
旧fs2 では、
io.stdin
を使うためにBlocker
が必要だったが、現在は不要。 ↩