1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ScalaAdvent Calendar 2024

Day 4

Scala 3 +fs2で 1.5TのWikidata JSONをParquetに変換した

Posted at

今年ある仕事のため Wikidata データセットからのデータマイニングをやってみました。Scalaで元データを変換する部分が中核ではないが面白かったので紹介したいと思います。

背景と入力データ

Wikidata はWikimedia Foundationのオープン知識データベースであり、ここでデータフォマットと簡単な例だけを紹介します。

EntityごとにStatementが複数ついていて、Statement内部にはProperty、Value、Reference(参考か出処)、Qualifier(そのStatementの限定条件)が複数存在します。Entity、Statement、PropertyがそれぞれID (P... または Q... が振られていて複雑なデータスキーマになっています。

やはり抽象すぎるので例を見てみましょう。例えば「Taylor Swiftという人間がいて、SNSでのフォロワー数が2023.12時点で9千万人ある」を含めたEntityだとこんなイメージです:

{
  entityId: 'Q12345',
  labels: {
    en: 'Taylor Swift',
  },
  statements: [
    {
      id: (UUID),
      propertyId: 職業
      value: {
        type: 'entity',
        value: 'Q334455' // 「歌手」EntityのID
    },
    {
      id: (UUID)
      propertyId: 'P32000', //  SNSフォロワー数
      value: {
        type: 'number',
        value: '9,000,000'
      },
      references: [
        {
          propertyId: 'P54321' // Twitter Screen ID
          value: {
            type: 'string',
            value: 'taylorswift13'
          }
      ],
      qualifier: [
        {
          propertyId: 'P11111', // sampled time
          value: {
            type: 'date',
            value: '2023-12'
          }
        }
      ]
    }
  ]
}

お分かりになりましたでしょうか?とりあえず細かくてなんでもあるデータセットになっています。

Wikidataが提供したダンプはこれみたいなEntityが大量に含めたJSONファイル1つで、解凍すると1.5TBになります。同時にこのダンプを扱うJava SDKも提供されていて、JSONをスキーマありのJavaオブジェクトに読み込むことが簡単にできます。

ただしこれでも探索とデータマイニングにどうも辛いので、全データをキープしながら変換して、検索できるデータベースに保存することにしました。(探索のためなら、サンプリングも考えられなくはないが、当時はデータ全貌が把握してなくて、あえて情報をなくさないやり方にと考えていました)

検索のための変換

結論から言うと、JSONをLongフォマットに崩して、BigQueryに保存することにしました。BigQueryテーブルの1 rowは元JSONの1つのvalueに該当して、valueとそのvalueのパスを含めて、こんなイメージです:

entityId :: string
statementId :: string
-- valueのタイプによって、 *propertyId は1つだけNON NULL
statementPropertyId :: string
referencePropertyId :: string
qualifierPropertyId :: string
-- valueのタイプによって、 *value は1つだけNON NULL
numberValue :: number
dateValue :: date
entityIdValue :: string
... -- 他のvalue columnは省略

崩すことによってデータのコピーが増えたので、BQでのLogicical Sizeが確か2.2TBくらいに増えた。でもこれでSQLで検索・集計できるようになって、BigQueryの力で結果がリアルタイムで出ます。

変換処理

変換処理自体は Scala 3と fs2 で書いていて、parquet4sやHadoopのいくつかのパッケージに依存しています。

中核コードはこんなイメージです:

import cats.effect.IO
import fs2.Stream
import org.wikidata.wdtk.datamodel.implementation.EntityDocumentImpl

// 出力Parquetのrow。data class定義すれば
case class ParquetRow(entityId: String, statementId: String, ...)

def readJson(jsonPath: String): Stream[IO, EntityDocumentImpl) = ???

def breakEntityDoc(entityDoc: EntityDocumentImpl): Stream[IO, ParquetRow]] = ???

def writeParquet(Stream[IO, ParquetRow])(outputPath: String): IO[Unit] = ???

def main(): IO[Unit] = {
  val threadCount = 48
  // JSONファイルをEntityDocumentImplに読み込んで
  val jsonPath: Stream[IO, String] = ... 
  val entityDocs: Stream[IO, EntityDocumentImpl] = ...

  // breakEntityDocに通しParquetRowのStreamに変換して
  val parquetRows: Stream[IO, ParquetRow] = jsonPath.parEvalMap(threadCount)(breakEntityDoc)

  // chunkして作った短いStreamを一個ずつParquetファイルに書き込む
  val writeParquetFiles: Stream[IO, Unit] = parquetRows.chunkMin(10_000_000).evalMap(writeParquet)

  writeParquetFiles.compile.drain // 書き込みStreamを消費することがmainのIOになる
}

うまく行ったこと

  • 入力データを複数 ND-JSONファイル(改行分割のJSON)に分割すること

JSONを一行ずつ読み込んで、パースする処理だとJVMが意外と早くないです(信じていたのに)。ここ私の作りが簡単すぎたかもしれませんが、とりあえずIOもJSONパースも早くはなかった。JSON読み込みがボトルネックにならないよう、入力データを大量のファイル(1つ 50MB程度)に分割して、複数ファイルを同時に読み込むことで複数コアを同時に回すことができた。

  • Parquetフォマットを使ったこと

Parquetはスキーマありのコラムストレージフォマットで、BigQuery含めて広くサポートされています。ファイル圧縮と豊富かスキーマに対応して、複数言語・複数システムを跨ぐデータ交換において今は個人の選択肢no.1です。

ScalaからParquetを生成する時、上記イメージのようにdata class定義すれば parquet4sがparquet encoderを生成してくれてすごく便利でした。

ちなみにParquetファイルをVSCodeでみるならdata wranglerをお勧めします。

  • fs2を使ったこと

fs2はcats-effectをベースにしていて、これを使って少ないコードで複数コアを駆使できたし、処理の並行数(同時になんこのファイルを開くなど)を簡単に調整できるのが便利でした。

うまくいかなかったこと

  • メモリプレシャーが激しかった

当時96T / 192G memのVMでやっていたが、メモリが満タンに使われいて、CPU利用率が高くいかず60%くらいでした。GCログとFlamegraph 出してみると、そのCPU時間の1/3くらいがGCでした。

コードイメージのように今回の並行処理を複数ファイルを同時に読み込んで、さらに複数スレードで変換して最後のParquet用ストリームにしています。これら一連の処理に消費順番をあまり考慮していなくて、おそらくJVMオブジェクトが長く残ってメモリを多く食ってしまいました。つまりGCに向けて改善できる余地はあるはず。

  • Scala3 LTSバージョンの対応

私の今の仕事では常にScala使うわけではなくて、でもいつになるかわからない後続の開発を簡単にしたいと考えていました。それで最初はLTSのScala 3.3使いたかったが、ライブラリの対応バージョンがバラバラでして、結局どのライブラリの都合のためか(確かfs2-compress)Scala3.4.2に上げることになりました。

最後に

やってることのサイズと比べて、各種SDKとパッケージ自分で少なく書いて済ませられたと思うし、JVM・Scalaのデータ処理エコシステムに感謝しないといけないです。

あとJVMのメモリ周りが深いことを再度感じました。

雑に書いていましたが誰かの参考になれると嬉しいです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?