はじめに
Apache Cassandraで大量データをロードする際には、sstable形式がサポートされているのですが、それ以外のフォーマットはサポートされていないと、マニュアルに記載されています。
CSVについては、cqlshのCOPYコマンドでロードすることが可能なのですが、データ量によっては適切ではない事態になる可能性もあります。
そのため、全てのケースに適用できる方法ではありません。
https://cassandra.apache.org/doc/stable/cassandra/operating/bulk_loading.html
Bulk loading Apache Cassandra data is supported by different tools. The data to bulk load must be in the form of SSTables. Cassandra does not support loading data in any other format such as CSV, JSON, and XML directly. Although the cqlsh COPY command can load CSV data, it is not a good option for amounts of data. Bulk loading is used to:
- Restore incremental backups and snapshots. Backups and snapshots are already in the form of SSTables.
- Load existing SSTables into another cluster. The data can have a different number of nodes or replication strategy.
- Load external data to a cluster.
他のクラスタからデータをコピーしてくるだけであれば問題ないのですが、本番データを開発環境に運ぶ事は組織によってはなかなか難易度が高く、テストデータを用いるシーンが存在すると思います。
私自身もそのような状況に直面し、元となるsstableをどのように作成すれば良いのかが分からず、その方法を調査したので結果を共有します。
バージョン情報
今回利用したバージョンは下記です。
パッケージ | バージョン |
---|---|
Cassandra | 3.11.10 |
SSTableLoader | 3.11.10(cassandraのrpm同梱) |
どんな人の どんな課題を解決するか
このドキュメントは、テストデータをcassandraにロードしたいが、レコード数が多くて単純にはロードしきれないという苦境に立たされている人の課題を解決する内容となっております。
利用するツール選定
Cassandraのコードを直接解析し、愚直にテストデータをsstableに変換するのは非常に手間がかかるため、すでに世の中にあるそういったツールを調べました。
いくつかの候補がヒットしましたが、メンテナンスが十分に行われていないものや、うまく機能しないものが多く見つかりました。その中で唯一利用できたのがinstaclustrのcassandra-sstable-generatorでした。
したがって、この記事では、instaclustrのcassandra-sstable-generatorを使ったsstableの作成方法について詳しく説明します。
sstableを作る前に
いよいよ本題のsstable作成方法についてといきたいのですが、実は私が利用したコードはそのままでは動かず、微妙な修正が必要でした。
したがって、まずはその修正ポイントについて記述します。
このドキュメントを読んでいる時点で不具合が解消している可能性もあるため、以下は必ず必要な修正というわけではありません。
コード修正
不具合
最初にこのツールを実行した際に、何故かデータが全く入らない状態に遭遇しました。
「なぜこんなことになるのか?」と調査しているとき、一部の条件判定が逆であるのではないかと思い、その条件を反転させてみたところ、うまく動作しました。
そのため、以下の修正が必要になります。
diff --git a/generator/src/main/java/com/instaclustr/sstable/generator/loader/CSVGenerator.java b/generator/src/main/java/com/instaclustr/sstable/generator/loader/CSVGenerator.java
index e27549b..9a9b742 100644
--- a/generator/src/main/java/com/instaclustr/sstable/generator/loader/CSVGenerator.java
+++ b/generator/src/main/java/com/instaclustr/sstable/generator/loader/CSVGenerator.java
@@ -64,7 +64,7 @@ public class CSVGenerator implements Generator {
try {
lastReadRow = csvListReader.read();
- return lastReadRow == null;
+ return lastReadRow != null;
} catch (IOException e) {
throw new IllegalStateException("Unable to determine if there is next row!", e);
}
上記は不具合的な修正ですが、ロード完了の待ちも無駄に長かったので短くします。
diff --git a/generator/src/main/java/com/instaclustr/sstable/generator/BulkLoader.java b/generator/src/main/java/com/instaclustr/sstable/generator/BulkLoader.java
index 753cb4a..80679f7 100644
--- a/generator/src/main/java/com/instaclustr/sstable/generator/BulkLoader.java
+++ b/generator/src/main/java/com/instaclustr/sstable/generator/BulkLoader.java
@@ -45,7 +45,7 @@ public abstract class BulkLoader implements Runnable {
while (!Arrays.stream(threads).allMatch(val -> val.status)) {
try {
- Thread.sleep(60 * 1000);
+ Thread.sleep(6 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
スキーマ定義の作成
デフォルトで生成するsstableのスキーマをそのまま使うことは無いと思うので、下記のファイル追加も必要です。
こちらで記載しているスキーマ定義もサンプルなので、適時必要な内容へ書き換えての作成をしてください。
package com.instaclustr.sstable.generator;
import static java.lang.String.format;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
public class RowMapperMySchema implements RowMapper {
public static String KEYSPACE = "mykeyspace";
public static String TABLE = "mytable";
public static final UUID UUID_1 = UUID.randomUUID();
public static final UUID UUID_2 = UUID.randomUUID();
public static final UUID UUID_3 = UUID.randomUUID();
@Override
public List<Object> map(final List<String> row) {
UUID id = UUID.fromString(row.get(0));
String name = row.get(1);
String address = row.get(2);
List<Object> mappedList = new ArrayList<Object>();
mappedList.add(id);
mappedList.add(name);
mappedList.add(address);
return mappedList;
}
@Override
public Stream<List<Object>> get() {
return Stream.of(
new ArrayList<Object>() {{
add(UUID_1);
add("John");
add("Doe");
}},
new ArrayList<Object>() {{
add(UUID_2);
add("Marry");
add("Poppins");
}},
new ArrayList<Object>() {{
add(UUID_3);
add("Jim");
add("Jack");
}});
}
@Override
public List<Object> random() {
return new ArrayList<Object>() {{
add(UUID.randomUUID());
add(UUID.randomUUID().toString());
add(UUID.randomUUID().toString());
}};
}
@Override
public String insertStatement() {
return format("INSERT INTO %s.%s (id, name, address) VALUES (?, ?, ?);", KEYSPACE, TABLE);
}
}
作成した定義を利用するように修正します。
diff --git a/impl/src/main/resources/META-INF/services/com.instaclustr.sstable.generator.RowMapper b/impl/src/main/resources/META-INF/services/com.instaclustr.sstable.generator.RowMapper
index 78b8bf4..cd9175c 100644
--- a/impl/src/main/resources/META-INF/services/com.instaclustr.sstable.generator.RowMapper
+++ b/impl/src/main/resources/META-INF/services/com.instaclustr.sstable.generator.RowMapper
@@ -1 +1 @@
-com.instaclustr.sstable.generator.RowMapper1
\ No newline at end of file
+com.instaclustr.sstable.generator.RowMapperMySchema
ビルドに向けて依存関係の追加も必要だったので対応します。
diff --git a/pom.xml b/pom.xml
index 417b379..d31378d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,6 +8,14 @@
<version>1.5</version>
<packaging>pom</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>3.11.10</version>
+ </dependency>
+ </dependencies>
+
<modules>
<module>api</module>
<module>impl</module>
ビルド
コード修正が終わったら次はビルドします。
mvn clean install \
-pl '!:sstable-generator-cassandra-2-2-19,!:sstable-generator-cassandra-3-0-28,!:sstable-generator-cassandra-4-0-7,!:sstable-generator-cassandra-4-1-0' \
-DskipTests
動かしてみて、ちゃんとヘルプメッセージが出てくれればOKです。
$ java \
-cp impl/target/sstable-generator-impl.jar:generator/target/sstable-generator.jar:cassandra-3/target/sstable-generator-cassandra-3.jar \
com.instaclustr.sstable.generator.LoaderApplication
// 出力例
Usage: <main class> [-V] COMMAND
-V, --version print version information and exit
Commands:
csv tool for bulk-loading of data from csv
random tool for bulk-loading of random data
fixed tool for bulk-loading of fixed data
Cassandraにスキーマ作成
コード追加したスキーマと同じ定義でcassandraにスキーマ作成します。
CREATE KEYSPACE IF NOT EXISTS myschema
WITH REPLICATION = {
'class' : 'NetworkTopologyStrategy',
'dc1' : 3
};
CREATE TABLE IF NOT EXISTS myschema.mytable (
id uuid,
name text,
address text,
PRIMARY KEY (id , address)
);
テストデータ作成
適当なCSVを作成します。
for i in {1..1000}; do echo `uuidgen`,test${i},address${i} >> testdata.csv; done
sstable生成
ようやくsstableの生成にたどり着きました。
java \
-cp cassandra-sstable-generator/impl/target/sstable-generator-impl.jar:cassandra-sstable-generator/generator/target/sstable-generator.jar:cassandra-sstable-generator/cassandra-3/target/sstable-generator-cassandra-3.jar \
com.instaclustr.sstable.generator.LoaderApplication \
csv \
--keyspace=mykeyspace \
--table=mytable \
--output-dir=cassandra-sstable-generator/output/csv/ \
--schema=cassandra-sstable-generator/myschema.mytable.cql \
--file=cassandra-sstable-generator/testdata.csv
確認
sstableが出来たら、それがちゃんとロードできるのか確認します。
sstabledumpを使うので、リンク先から取得しておきます。
sstabledump output/csv/myschema/mytable/md-1-big-Data.db > output/csv/myschema/mytable/md-1-big-Data.db.json
// データを1つ確認
cat output/csv/myschema/mytable/md-1-big-Data.db.json | jq .[0]
{
"partition": {
"key": [
"b242b2ad-456b-4273-b34b-853ae6869c9f"
],
"position": 0
},
"rows": [
{
"type": "row",
"position": 30,
"clustering": [
"address239"
],
"liveness_info": {
"tstamp": "2024-03-01T10:51:18.252Z"
},
"cells": [
{
"name": "name",
"value": "test239"
}
]
}
]
}
// 件数確認
$ cat output/csv/myschema/mytable/md-1-big-Data.db.json | jq ".|length"
1000
sstableloader実行
長かったですが、sstableの生成に成功したので、あとはsstableloaderでテストデータをロードすれば目的達成です。
sstableloader \
--nodes <HOSTNAME> \
--username <USERNAME> \
--password <PASSWORD> \
--conf-path /etc/cassandra/conf/cassandra.yaml \
cassandra-sstable-generator/output/csv/myschema/mytable
// 出力例
Summary statistics:
Connections per host : 1
Total files transferred : 9
Total bytes transferred : 282.990KiB
Total duration : 3955 ms
Average transfer rate : 71.535KiB/s
Peak transfer rate : 72.136KiB/s
確認
cqlsh> select * from myschema.mytable where id = b242b2ad-456b-4273-b34b-853ae6869c9f;
id | address | name
--------------------------------------+------------+---------
b242b2ad-456b-4273-b34b-853ae6869c9f | address239 | test239
(1 rows)
cqlsh> select count(1) from myschema.mytable ;
count
-------
1000
(1 rows)
まとめ
長かったですがsstableを作成することが出来ました。
途中作成したスキーマの定義を自由に書き換えれば、任意のスキーマのテストデータをsstable化することが出来るので、大量データをcassandraにロードすることが出来るようになります。
また、今回はRandomでも動かせるようなサンプルとしているので、csvの用意を飛ばしてrandomでの実行も出来るかと思います。
データの質は問わず、レコード数やデータ量に重きをおいたテストに利用いただけます。
今回共有した内容が、直接sstableを持ってこれない環境でテストをしたい方に刺さる内容であれば幸いです。