2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ScalaのZIOやZQueryについて日々の知見

Last updated at Posted at 2023-03-14

概要

SalaのZIOやZQueryで出てくる色んな方や日々の発見について自分用のメモした記事です。
随時更新していければと思います。

型について

ZIOのChunk型について

ZIOは、Scala向けの効率的で安全な非同期プログラミングライブラリで、Chunk型はその中で使用されるデータ構造の1つです。

Chunkは、メモリの塊を表現する不変のデータ構造で、大量のデータを効率的に処理するために設計されています。
Chunkは、リストや配列などの他のデータ構造と同様に、複数の要素を保持できますが、一般的には連続するメモリ領域に要素を格納することで高速なデータアクセスを可能にします。

Chunkは、特にZIOの並列処理や非同期I/O処理において、高速なデータの読み書きや変換を可能にするために使用されます。
また、Chunkは不変であるため、同期やロックなどの同期の必要性を減らし、並列処理をより簡単に実現できます。

例えば、以下のようにChunkを使用して、大量のデータを並列で処理することができます。

.scala
import zio._
import zio.stream._

val data: Chunk[Int] = Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

val processed: Task[Chunk[Int]] =
  ZStream.fromChunk(data)
    .mapMPar(4)(i => Task(i * 2))
    .runCollect

この例では、Chunkに格納された整数のリストを4つの並列タスクで2倍に変換し、最終的に処理された整数のリストを収集しています。

ZQueryのバッチ処理: DataSource.fromFunctionBatchedMについて

DataSource.fromFunctionBatchedMは、ZIO-Queryのデータソース構築用の関数の1つであり、
効率的なバッチ読み込みを可能にする方法を提供します。
この関数は、バッチ単位で要素を読み込む関数を提供することで、バッチ処理を可能にするデータソースを構築します。

DataSource.fromFunctionBatchedMを使用してデータソースを構築するには、まず、各要素の識別子と、バッチ単位で要素を読み込む関数を定義する必要があります。

たとえば、以下のような識別子と関数を持つデータソースを定義することができます。

.scala
  case class User(id: Long, name: String)

  val users = List(
    User(1L, "Alice"),
    User(2L, "Bob"),
    User(3L, "Charlie"),
    User(4L, "David"),
    User(5L, "Eve"),
  )

  def loadUsers(ids: Chunk[Long]): Task[Chunk[User]] = Task {
    ids.flatMap(id => users.find(_.id == id))
  }

loadUsersは、Chunk[Long]の識別子のバッチを受け取り、その識別子に対応する要素のバッチを返すタスクを返す関数です。
この関数は、usersリストから、識別子に対応する要素を検索して返します。

DataSource.fromFunctionBatchedMを使用して、
上記のloadUsers関数からバッチ処理可能なデータソースを構築するには、以下のようにします。

.scala
  case class UserRequest(id: Long) extends Request[Throwable, User]
  val usersDataSource: DataSource[Any, UserRequest] =
    DataSource.fromFunctionBatchedM("Users") { ids: Chunk[UserRequest] =>
      for {
        users <- loadUsers(ids.map(_.id))
      } yield {
        users
      }
    }.batchN(1000)

DataSource.fromFunctionBatchedMの第1引数は、データソースの名前を示す文字列です。
第2引数は、バッチ単位で要素を読み込む関数です。この関数は、Chunk[Long]の識別子のバッチを受け取り、その識別子に対応する要素のバッチを含むマップを返します。

このデータソースを使用して、ZQueryを構築することができます。
たとえば、以下のように、usersDataSourceを使用して、ユーザーのIDに対応するユーザー名を取得するクエリを定義することができます。

.scala
  val userNameQuery: ZQuery[Any, Throwable, Set[User]] =
    ZQuery.foreachPar(List(1L, 2L, 3L, 4L, 5L).toSet)(x => ZQuery.fromRequest(UserRequest(x))(usersDataSource))

このクエリは、ZQuery.foreachParを使用して、ユーザーIDのセットを取得し、
それぞれのIDに対応するユーザーの名前をデータソースから取得します。
ZQuery.foreachParは、ユーザーIDのセットをバッチに分割し、並列でデータソースから要素を取得するため、効率的なバッチ読み込みが行われます。

userNameQueryを実行するには、ZIO環境を提供する必要があります。
たとえば、以下のようにして、ZIO Appを使用して実行できます。

.scala
object Example extends App {
  override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = {
    val program = for {
      userNames <- userNameQuery.provideLayer(Blocking.live).run
      _ <- console.putStrLn(userNames.toString)
    } yield ()

    program.exitCode
  }
}

この例では、provideLayerメソッドを使用して、Blockingレイヤーを提供し、ZQueryを実行します。console.putStrLnを使用して、ユーザー名のマップを標準出力に出力します。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?