最近kotlinでORCファイルを直接読み書きする機会があったのでメモ
ORCファイルとは
ORCファイルはHiveの処理に最適化された列指向のファイルフォーマットです。HiveのテーブルデータをORCファイルにするだけで様々な最適化が行われ、クエリの実行速度を大幅に改善することができます。
また、ORCは列指向ファイルフォーマット(同じ種類のデータで構成)なので圧縮効率が非常に高く、データ量の削減にもつながります。zlibとsnappyなどがサポートされています。(非圧縮も可能)
使用するライブラリ
以下のライブラリを使用します
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation group: 'org.apache.orc', name: 'orc-core', version: '1.6.3'
implementation group: 'org.apache.hadoop', name: 'hadoop-core', version: '1.2.1'
}
ORCファイルの書き込み
書き込むデータ
以下のようなデータをorcへ書き込んでみます。
data class OrcFields(
val numberField: Int,
val stringField: String,
val dateField: Date,
val timestampField: Timestamp,
val nullableField: String?
)
fun dataList() = listOf(
OrcFields(1, "str1", Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 01:02:03"), null),
OrcFields(2, "str2", Date.valueOf("2020-01-02"), Timestamp.valueOf("2020-01-02 01:02:03"), "not null")
)
コード
fun writeOrcFile() {
// TypeDescription.createStruct()でschemaを定義します
val schema = TypeDescription.createStruct()
.addField("number_field", TypeDescription.createInt())
.addField("string_field", TypeDescription.createString())
.addField("date_field", TypeDescription.createDate())
.addField("timestamp_field", TypeDescription.createTimestamp())
.addField("nullable_field", TypeDescription.createString())
// configやoptionを作成し、それらを使用して OrcFile.createWriter()でWriterを作成します
val path = Path("test.orc")
val conf = Configuration()
// optionにschemaをセットしておかないとwriter作成時にNullPointerExceptionになります
val option = OrcFile.writerOptions(conf).setSchema(schema)
val writer = OrcFile.createWriter(path, option)
// schemaから書き込みバッチを生成します
// batch.colはschema定義に応じたColumnVectorのリストです
val batch = schema.createRowBatch()
val numberFieldCol = batch.cols[0] as LongColumnVector
val stringFiledCol = batch.cols[1] as BytesColumnVector
val dateFieldCol = batch.cols[2] as DateColumnVector
val timestampFieldCol = batch.cols[3] as TimestampColumnVector
val nullableFieldCol = batch.cols[4] as BytesColumnVector
dataList().forEach { data ->
// 各列に対する書き込み位置を取得します
val row = batch.size
// 各列の書き込み位置に値を設定します
numberFieldCol.vector[row] = data.numberField.toLong()
stringFiledCol.setVal(row, data.stringField.toByteArray())
dateFieldCol.vector[row] = data.dateField.toLocalDate().toEpochDay()
timestampFieldCol.set(row, data.timestampField)
if (data.nullableField == null) {
// null値を書き込む場合は書き込み位置のColumnVector.isNullにtrueをセットし
// ColumnVector.noNullsにfalseをセットします
nullableFieldCol.isNull[row] = true
nullableFieldCol.noNulls = false
} else {
nullableFieldCol.setVal(row, data.nullableField.toByteArray())
}
// batchがいっぱいになったらwriter.addRowBatch()でファイルへ書き出します
batch.size += 1
if (batch.maxSize <= batch.size) {
writer.addRowBatch(batch)
batch.reset()
}
}
// 書き込まれていないデータをファイルへ書き出します
if (batch.size != 0) {
writer.addRowBatch(batch)
batch.reset()
}
writer.close()
}
結果
orcファイルの中身は orc-tools
というツールで確認できます。
※orc-toolsは以下からダウンロードできます。
https://repo1.maven.org/maven2/org/apache/orc/orc-tools/1.6.3/
orc-tools-1.6.3-uber.jar
をダウンロード
使い方 -> https://orc.apache.org/docs/java-tools.html
# 型確認
$ java -jar orc-tools-1.5.5-uber.jar meta test.orc
# ... 略 ...
Processing data file test.orc [length: 688]
Structure for test.orc
File Version: 0.12 with FUTURE
Rows: 2
Compression: ZLIB
Compression size: 262144
Type: struct<number_field:int,string_field:string,date_field:date,timestamp_field:timestamp,nullable_field:string>
# ... 略 ...
# データ確認
$ java -jar orc-tools-1.5.5-uber.jar data test.orc
# ... 略 ...
Processing data file test.orc [length: 688]
{"number_field":1,"string_field":"str1","date_field":"2020-01-01","timestamp_field":"2020-01-01 01:02:03.123456789","nullable_field":null}
{"number_field":2,"string_field":"str2","date_field":"2020-01-02","timestamp_field":"2020-01-02 01:02:03.123456789","nullable_field":"not null"}
データ型と格納されているデータが想定通りであることが確認できます。
ORCファイルの読み込み
先ほど作成したファイルを読み込んでみます
コード
fun readOrcFile() {
// configやoptionを作成し、それらを使用して OrcFile.createReader()でReaderを作成します
val conf = Configuration()
val path = Path("test.orc")
val options = OrcFile.readerOptions(conf)
val reader = OrcFile.createReader(path, options)
// schemaを読み込み、読込batchを作成します
// batch.colはschema定義に応じたColumnVectorのリストです
val rows = reader.rows()
val batch = reader.schema.createRowBatch()
val numberFieldCol = batch.cols[0] as LongColumnVector
val stringFiledCol = batch.cols[1] as BytesColumnVector
val dateFieldCol = batch.cols[2] as DateColumnVector
val timestampFieldCol = batch.cols[3] as TimestampColumnVector
val nullableFieldCol = batch.cols[4] as BytesColumnVector
// batch単位でファイルから読み込みます
while (rows.nextBatch(batch)) {
// batchの中身を一行ずつ取得します
0.until(batch.size).forEach { i ->
val numberValue = numberFieldCol.vector[i].toInt()
val stringValue = stringFiledCol.toString(i)
val dateValue = Date.valueOf(LocalDate.ofEpochDay(dateFieldCol.vector[i]))
val timestampValue = Timestamp(timestampFieldCol.time[i]).apply { nanos = timestampFieldCol.nanos[i] }
val nullableValue = if (!nullableFieldCol.noNulls && nullableFieldCol.isNull[i]) {
null
} else {
nullableFieldCol.toString(i)
}
println("numberValue = $numberValue")
println("stringValue = $stringValue")
println("dateValue = $dateValue")
println("timestampValue = $timestampValue")
println("nullableValue = $nullableValue")
// 取得した各値を用いていい感じに処理します
val data = OrcFields(
numberField = numberValue,
stringField = stringValue,
dateField = dateValue,
timestampField = timestampValue,
nullableField = nullableValue
)
println(data)
}
}
rows.close();
}
出力
numberValue = 1
stringValue = str1
dateValue = 2020-01-01
timestampValue = 2020-01-01 01:02:03.123456789
nullableValue = null
OrcFields(numberField=1, stringField=str1, dateField=2020-01-01, timestampField=2020-01-01 01:02:03.123456789, nullableField=null)
numberValue = 2
stringValue = str2
dateValue = 2020-01-02
timestampValue = 2020-01-02 01:02:03.123456789
nullableValue = not null
OrcFields(numberField=2, stringField=str2, dateField=2020-01-02, timestampField=2020-01-02 01:02:03.123456789, nullableField=not null)
先ほど書き込んだ値が読み込めていることが確認できます。
まとめ
それなりに簡単にORC形式のファイルの読み書きができます。
データ型によって読み書きするメソッドが微妙に違う場合があるので、そこだけ気を付ける必要があります。