はじめに
業務アプリケーションの開発に携わっているとCSV形式でデータを取り扱う場面は多くあると思います。
一般的にCSV形式のデータを取り扱うユースケースでは、分析用途など大規模なデータであることが多いと思います。
大規模データの場合、全てのデータを一括で操作するとデータ量に応じてメモリの圧迫やアプリケーションに負荷をかけることが懸念されます。この課題はStreamでデータを取り扱うことでデータを一括で処理することなく逐次的に処理することが可能です。
本記事ではCSV出力処理をStream処理する方法とメリットについてご紹介します。
Stream処理とは
ストリーム処理とは、大量のデータを一度に取り扱うのではなく、小さな単位(chunk)に分けて順次、逐次処理を行う仕組みです。
CSV出力におけるStream処理の例
例えば、大規模データをCSVファイルとして出力するケースを考えます。
Stream処理でない場合
- 全てのデータをデータベースから読み込み
- データをCSV形式に変換
- 一時的な書き込み領域に格納
- ユーザーにレスポンスを返す
上記の場合、以下の課題があります。
- メモリの圧迫
- データベースから読み込んだ大量のデータをメモリが保持するためメモリ領域を圧迫します
- CSV出力の遅延
- 大量のデータを全て処理してからユーザーに返すため、ユーザーの待ち時間が長くなります
- 書き込み領域の用意
- 一時的なCSVファイルの書き込み先を用意する必要があり、ストレージを圧迫する要因となります
Stream処理の場合
- データをデータベースから行ごとに取得
- データを都度CSV形式に変換
- 変換データを出力先に送る(HTTPレスポンスなど)
データベースからの取得処理をchunkに分けて取り扱います。
そのため、メモリ効率が向上しより効率的にデータを取り扱うことが可能となります。
また、一時的な書き込み領域を用意する必要もなく、I/Oコストやストレージの圧迫を防ぐことが可能です。
変換データを出力先に送るタイミングも全てのデータを処理してから返す必要がなく、完了したデータから都度出力先に返すことが可能です。そのためユーザーにより早く結果を返すことが可能となります。
Scalaにおける実装
私は開発言語としてScalaが好きなので、Scalaにおける実装例をご紹介します。
ScalaにおけるStream処理の代表格となるライブラリとしてAkka Streamがあります。Akkaのエコシステムの一部でありバックプレッシャーの仕組みや、Source(データの起点)、Flow(データの変換、処理)、Sink(データの終点)を組み合わせて柔軟にパイプラインを実現可能です。
ただし、Akkaプロジェクトは2022年にBSLライセンスへ移行が行われ商用プロジェクトでAkkaを利用する場合、一定以上の売り上げがある企業にはライセンス料の支払いが必要となりました。そのため、本記事ではオープンソースであるApache PekkoというAkkaのフォークライブラリを引用し実装例をご紹介します。
実装例
@Singleton
class CsvStreamController @Inject()(val controllerComponents: ControllerComponents) extends BaseController {
def exportCsv() = Action { implicit request: Request[AnyContent] =>
implicit val system: ActorSystem = ActorSystem("csvStreamer")
implicit val ec: ExecutionContext = system.dispatcher
val dummyData = (1 to 100).map(i => DummyData(UUID.randomUUID(), "test", i))
val fromDbData: Source[DummyData, _] = Source(dummyData)
def toCsvLine[T](data: T)(using csvWritable: CsvWritable[T]): String =
csvWritable.toFields(data).mkString(",") + "\n"
val headerSource: Source[ByteString, _] = Source.single(ByteString("id,name,age\n"))
val csvFlow: Flow[DummyData, ByteString, _] =
Flow[DummyData].map(data => ByteString(toCsvLine(data)))
val csvSource: Source[ByteString, _] = headerSource.concat(fromDbData.via(csvFlow))
Ok.chunked(
csvSource,
Some("text/csv")
)
}
}
データの取得
実装例では簡略化のために擬似的に用意した、DummyData型をSourceに展開して定義しています。
val dummyData = (1 to 100).map(i => DummyData(UUID.randomUUID(), "test", i))
val fromDbData: Source[DummyData, _] = Source(dummyData)
実際にデータベースからデータを取得する場合にはデータをStreamとして取得することが必要です。例えば、ScalaのDBライブラリであるSlickの場合ではStreaming Actionが提供されており、クエリの結果をStreamとして取得することが可能です。
// DBからデータをstreamとして取得
val quqery = for (c <- dummy_datas) yield c.name
val records = query.result
val publisher: DatabasePublisher[DummyData] = db.stream(records)
DBから取得した値はDatabasePublisher型で定義されます。この型はPekko StreamのSource.fromPublisherメソッドに渡すことでSourceに変換することが可能です。
// DatabasePublisherをPekko Streamsに載せる
val source: Source[DummyData, _] = Source.fromPublisher(publisher)
Sourceの定義
Pekko StreamではSource(データの起点)、Flow(データの変換、処理)、Sink(データの終点)のパイプラインを組むことでデータの流れを定義することができます。
基礎概念については下記の記事が分かりやすかったです。
Sourceとはデータの起点を表す概念となります。例えば、CSVの場合にはCSVヘッダーの値とレコードの値を用意する必要があり、その値をSourceとして定義します。
また、データベースから取得したデータについては、CSV形式に変換する必要性があります。そのためヘッダーのSourceとデータベースから取得したデータのSourceを分けて管理し、結合することで1つのSourceを生成することが可能です。
val headerSource: Source[ByteString, _] = Source.single(ByteString("id,name,age\n"))
val csvFlow: Flow[DummyData, ByteString, _] =
Flow[DummyData].map(data => ByteString(toCsvLine(data)))
val csvSource: Source[ByteString, _] = headerSource.concat(fromDbData.via(csvFlow))
上記の処理ですが、csvFlowの処理が全て完了してからconcatが走るわけではありません。
headerSource.concat(fromDbData.via(csvFlow)) によって、以下のような流れが定義されます。
- headerSourceのデータを消費する: ストリームの最初の要素としてヘッダーが下流に流れます
- fromDbDataのデータを消費する: fromDbData(データベースからのソース)の各要素が csvFlow を通過し、1 行ずつCSVの文字列として変換され下流に流れます
HTTP ResponseでCSVを返す
ScalaでHTTPサーバーを作成する場合にはPlayframeworkがフレームワークとして一般的です。PlayはHTTPレスポンスとしてScala Streamをサポートしています。大量のデータをクライアントに送信する場合、Playはデータを順次読み込みレスポンスとして返すことが可能です。
Chunked Response
本記事ではChunked Responseを使用してHTTP Responseとして返す例を紹介します。
Chunked Responseはメリットとしてデータを即座にChunkとして返すことが可能な反面、データのcontent sizeをブラウザが把握できないため、プログレスバーなどを実装する場合に注意が必要となります。
メソッドの定義は下記となります。引数の先頭にあるcontentの型がSourceとなっている点に注目してください。Playの3系からはPekko Streamに対応するSourceが引数の型として定義されているため、Pekko Streamで作成したSourceを受け渡すことが可能です。
def chunked[C](content: Source[C, _], contentType: Option[String] = None)(
implicit writeable: Writeable[C]
): Result = {
Result(
header = header,
body = HttpEntity
.Chunked(content.map(c => HttpChunk.Chunk(writeable.transform(c))), contentType.orElse(writeable.contentType))
)
}
実際にレスポンスを返す箇所の処理は以下となります。Pekko Streamで定義したSourceの値を直接受け渡すことでHTTPレスポンスをChunked形式で返すことが出来ています。
Ok.chunked(
csvSource,
Some("text/csv")
)
まとめ
この記事ではCSVの出力処理をStream処理する利点と実装例についてご紹介しました。
実装を進めるには概念が難しいものではありますが、Akkaのエコシステムによってシンプルに実装することが可能です。
Stream処理は大規模データに相性が良くメリットも多くあるため気になった方はぜひ試してみて頂ければ幸いです。
おまけ
型クラスの利用
データをCSV形式に変換する場合には、型クラスを利用することで既存のデータの型に影響を与えることなくCSVの変換処理を実装可能です。これにより型と振る舞いを分離して定義を追加することが可能です。
trait CsvWritable[T] {
extension (t: T)
def toFields: Seq[String]
}
object CsvWritable {
given CsvWritable[DummyData] with {
extension (value: DummyData)
def toFields: Seq[String] = Seq(value.id.toString, value.name, value.age.toString)
}
}
型クラスを使用しないケースだと下記のようになります。この場合はDummyDataがCsvWritableを継承する必要があり、特定の振る舞いに型の定義が依存する形となり重要なドメインである場合は密結合となる懸念があります。型クラスをうまく利用することでこの状態を回避することが可能です。
case class DummyData(id: Int, name: String, age: Int) extends CsvWritable[DummyData] {
def toFields: Seq[String] = Seq(id.toString, name, age.toString)
}
利用する箇所では定義した型クラスをインポートして使用します。
import CsvWritable.given_CsvWritable_DummyData
def toCsvLine[T](data: T)(using csvWritable: CsvWritable[T]): String =
csvWritable.toFields(data).mkString(",") + "\n"