scalaz.zio.stream
scalaz.zioのストリーム処理用package
scalaz-zio-streams に含まれている
ZIOについてはマイクロサイトと他の方が書いてくださっている記事を読めばだいたい理解できそうな感じだったので
本記事ではStreamに重点を置き、各APIを解説してみる
ZIOについても必要な部分では少し触れる
全体
zioのStreamは主に ZStream [source] と ZSink[source] と言うクラスによって構成されている
簡単に説明するならば ZStream はPublisherで ZSink はConsumerだ
それぞれ Stream Sink という型第一引数を Any にしたエイリアスが提供されている(型第一引数はZIOのDI的機能を利用するときに必要だが、使わない場合は気にしなくていい、本記事では説明しない)
Streamの実行
基本的には Stream#run 関数に Sink を渡すことで ZIO インスタンスを得ることができる
ZIO インスタンスとは Runtime#unsafeRun に渡すなどで実行することのできる再利用可能なタスクのようなもの
ZIOアプリケーション( scalaz.zio.App を継承したメインクラス)ではアプリケーション全体がひとつの大きなZIOインスタンスとなる(これはHaskellのmainメソッドが大きなIOモナドであることと同じ)
Publisher
この項ではStreamのファクトリメソッドを紹介していく
- fromIterable
def fromIterable = {
Stream.fromIterable(1 to 3).foreach{ i => putStr(s"$i ") } // run task => 1 2 3
}
引数の scala.collection.Iterable の要素を一つずつ渡すStreamを生成する
すべての要素を渡し終えるとStreamが終了する
foreach はStreamを実行可能状態にする一番カンタンな方法(多分)
- bracket
def bracket(filePath: String) = {
Stream.bracket(openFile(filePath)) { bf => IO.effectTotal(bf.close()) } { bf => IO.effect(readNext(bf)) }
.foreach(putStrLn) // output file content for std out
}
外部リソースから要素を取得するStreamを作成する
第一引数はリソースの確保、第二引数はリソースの開放処理、第三引数はリソースから要素を取得する処理になる
第三引数の関数の戻り値は ZIO[R, E, Option[A]] となり、取得した要素をOptionで渡し、値がNoneの場合はStreamが終了する
- fromQueue
def fromQueue = {
for {
q <- Queue.unbounded[Int]
f <- Stream.fromQueue(q).take(3).foreach{ i => putStr(s"$i ") } .fork
_ <- q.offer(1) *> q.offer(2) *> q.offer(3) *> q.offer(4)
_ <- f.join
} yield () // run task => 1 2 3
}
ZQueue より要素を取得する( ZQueue はZIOコンテキストで使えるインメモリキュー)
ZQueue がシャットダウンした場合にStreamが終了するが、基本的に永続的に流れ続けるStreamの上流として使うことが多いだろう
ZIO#fork は対象ZIOタスクを非同期で行うためのもの(Streamの実行はそのままではBlocking処理のため)
- fromEffect
def fromEffect = {
val effect = IO(100)
Stream.fromEffect(effect).foreach{ i => putStr(s"$i ") } // run task => 100
}
ZIOインスタンスの結果値より要素を取得する
ZIO[R, E, List[A]] を引数に渡しても List[A] 型の要素が一つだけ渡されるStreamが生成されることになる
Stream[E, List[A]] を Stream[A] に変換するのは後述の Stream#mapConcat を使うといい
Consumer
この項ではStreamに流れてくる要素を消費/収集するConsumerであるSinkのファクトリメソッドを紹介する
- collect
def collect_sink = {
pub.run(Sink.collect[Int]) // run task => List(1, 2, 3)
}
流れてきた要素をすべて収集し、Listとして返すコンシューマ
- readWhile
def readWhile = {
pub.run(Sink.readWhile[Int](_ < 3)) // run task => List(1, 2)
}
引数の関数がfalseになるまで収集するコンシューマ
Stream#run の引数として渡している場合は、falseになった場合Streamが残っていても処理を終了する
(後述するがSinkは Stream#run のほか Stream#transduce に渡して使用することもある)
- drain
def drain_sink = {
pub.tap { i => putStr(s"$i ") }.run(Sink.drain) // run task => List(1, 2)
}
流れてくる要素をすべて捨てる
Stream中に行われる副作用のみを必要としているときなどに使う
- identity
def identity = {
pub.run(Sink.identity[Int]) // run task => 1
}
要素を一つだけ収集するコンビネータ
Stream#run の引数として渡している場合は、1要素取得した時点でStreamが終了する
- foldLeft
def foldLeft = {
pub.run(Sink.foldLeft[Int, Int, Int](0)(_ + _)) // run task => 6
}
流れてくる要素に対して畳み込みを実行するコンビネータ
Operator
この項ではStreamのに流れる要素を加工するコンビネータ関数を紹介していく
- map
def map = {
pub.map(_ + 1).foreach{ i => putStr(s"$i ") } // run task => 2 3 4
}
いわゆる Functor#map
説明するまでもないが A => B の関数を引数に渡すことで流れてくる要素を加工することができる
- filter
def filter = {
pub.filter(_ % 2 != 0).foreach{ i => putStr(s"$i ") } // run task => 1 3
}
A => Boolean の関数を渡すことで流れてくる要素をフィルタリングする
- collect
def collect_stream = {
pub.collect {
case 1 => "one"
case 2 => "two"
}.foreach{ i => putStr(s"$i ") } // run task => one two
}
パターンマッチを使うことでFilterと値の加工の両方をすることができる
- take/drop
def take = {
pub.take(2).foreach{ i => putStr(s"$i ") } // run task => 1 2
}
def drop = {
pub.drop(2).foreach{ i => putStr(s"$i ") } // run task => 3
}
先頭からn個の要素のみを取得するStreamに変換する/先頭からn個の要素を捨てるStreamに変換する
- drain
def drain_stream = {
pub.tap { i => putStr(s"$i ") }.drain.run(Sink.collect) // run task => 1 2 3 (return is Nil)
}
流れてきた要素を捨てる(下流に流れなくする)
- ++
def ++ = {
(pub ++ Stream.fromIterable(4 to 6)).foreach{ i => putStr(s"$i ") } // run task => 1 2 3 4 5 6
}
ふたつのStreamを結合する、ただし、先頭Streamが流れ切らない限り、後続Streamは流れない(直列の合成)
- zip
def zip = {
pub.zip(Stream.fromIterable(4 to 6)).foreach{ i => putStr(s"$i ") } // run task => (1, 4) (2, 5) (3, 6)
}
ふたつのStreamを結合する、要素は2つのStreamから一つずつ取得したペアになる
- merge
def merge = {
import scalaz.zio.duration._
def countup(start: Ref[Int], q: Queue[Int]) = {
for {
i <- start.get
_ <- q.offer(i)
_ <- start.set(i + 1)
} yield ()
}
for {
q1 <- Queue.unbounded[Int]
q2 <- Queue.unbounded[Int]
s1Start <- Ref.make(1)
s2Start <- Ref.make(4)
_ <- countup(s1Start, q1).repeat(Schedule.spaced(1000.millis) && Schedule.recurs(4)).fork
_ <- countup(s2Start, q2).delay(500.millis).repeat(Schedule.spaced(1000.millis) && Schedule.recurs(4)).fork
_ <- Stream.fromQueue(q1).merge(Stream.fromQueue(q2)).take(6).foreach{ i => putStr(s"$i ") } // run task => 1 4 2 5 3 4
} yield ()
}
ふたつのStreamを結合する、Trigger自体もマージされ、流れる順番は保証されない(並列の合成)
- mapAccum
def mapAccum = {
pub.mapAccum(0) { case (s, i) => i -> s"$s -> $i"}.foreach{ i => putStr(s"$i, ") } // run task => 0 -> 1, 1 -> 2, 2 -> 3
}
Stream中に更新可能な状態付きのmap処理
- mapConcat
def mapConcat = {
Stream.fromIterable(List("abcd", "cdef")).mapConcat(s => Chunk.fromArray(s.toCharArray)).run(Sink.collect[Char]) // run task => List(a, b, c, d, c, d, e, f)
}
A => Chunk[B] の関数を渡すことで大きな塊の流れてくるStreamを小さな値のStreamに変換することができる
Chunk はZIOライブラリにて提供されている効率的な Array のようなもの
- transduce
def transduce = {
(pub ++ Stream(4)).transduce {
Sink.fold[Int, Int, List[Int]](Nil) { case (xs, i) => if (xs.length < 2) Step.more(i :: xs) else Step.done(xs, Chunk(i)) }.map(_.sum)
}.foreach{ i => putStr(s"$i ") } // run task => 3 7
// (pub ++ Stream(4)).transduce { // same behavior
// for {
// a <- Sink.identity[Int]
// b <- Sink.identity[Int]
// } yield a + b
// }.foreach{ i => putStr(s"$i ") } // run task => 3 7
}
Sinkを引数にわたすことでStream内の複数の値の合成ができるコンビネータ
Sinkを工夫することで様々な凝った加工ができる
一部のSinkはdoシンタックスで書ける(つまり、flatMapを実装している)ためコメントアウト箇所の用に直感的に記述することも可能になっている
- mapM
def mapM = {
for {
q <- Queue.unbounded[Int]
_ <- pub.mapM { i => q.offer(i).map(_ => s"offer $i") }.foreach{ i => putStr(s"$i ") }
res <- q.takeAll
} yield res // run task => offer 1 offer 2 offer 3 (return List(1, 2, 3))
}
通常のmapから渡す関数の戻り値が ZIO[R, E, A] になったもの
map時にZIOコンテキストでの処理を行うことができる
mapに限らず、他の関数にも渡す関数の戻り値が ZIO になっている末尾 M versionの関数が多く存在している、使い方は一緒
他にも様々な関数が存在しているが、ひとまず自分が調べたのは以上だ
最後に各種関数を実行するメインメソッドとそれを含むソースコードを貼り付けておく、よければ手元で動かしてみて頂けると嬉しいです、それによりさらなる理解が得られることを願います
object Main extends App {
import Examples._
def run(args: List[String]) = {
def withLog[R <: Clock with Console, E, A](name: String, f: ZIO[R, E, A]): ZIO[R, E, Unit] = {
for {
_ <- putStrLn(s"-- ${name}")
res <- f
_<- putStrLn(s"${name} res: $res \n")
} yield ()
}
(for {
_ <- withLog("fromIterable", fromIterable)
_ <- withLog("bracket", bracket("test.txt"))
_ <- withLog("fromQueue", fromIterable)
_ <- withLog("fromEffect", fromEffect)
_ <- withLog("collect_sink", collect_sink)
_ <- withLog("readWhile", readWhile)
_ <- withLog("drain_sink", drain_sink)
_ <- withLog("identity", identity)
_ <- withLog("foldLeft", foldLeft)
_ <- withLog("map", map)
_ <- withLog("filter", filter)
_ <- withLog("collect_stream", collect_stream)
_ <- withLog("take", take)
_ <- withLog("drop", drop)
_ <- withLog("++", ++)
_ <- withLog("zip", zip)
_ <- withLog("mapM", mapM)
_ <- withLog("mapConcat", mapConcat)
_ <- withLog("merge", merge)
_ <- withLog("mapAccum", mapAccum)
_ <- withLog("transduce", transduce)
} yield ()).fold(_ => 1, _ => 0)
}
}
- 結果出力
-- fromIterable
1 2 3 fromIterable res: ()
-- bracket
aaa
bbb
ccc
bracket res: ()
-- fromQueue
1 2 3 fromQueue res: ()
-- fromEffect
100 fromEffect res: ()
-- collect_sink
collect_sink res: List(1, 2, 3)
-- readWhile
readWhile res: List(1, 2)
-- drain_sink
1 2 3 drain_sink res: ()
-- identity
identity res: 1
-- foldLeft
foldLeft res: 6
-- map
2 3 4 map res: ()
-- filter
1 3 filter res: ()
-- collect_stream
one two collect_stream res: ()
-- take
1 2 take res: ()
-- drop
3 drop res: ()
-- ++
1 2 3 4 5 6 ++ res: ()
-- zip
(1,4) (2,5) (3,6) zip res: ()
-- mapM
offer 1 offer 2 offer 3 mapM res: List(1, 2, 3)
-- mapConcat
mapConcat res: List(a, b, c, d, c, d, e, f)
-- merge
1 4 2 5 3 4 merge res: ()
-- mapAccum
0 -> 1, 1 -> 2, 2 -> 3, mapAccum res: ()
-- transduce
3 7 transduce res: ()