はじめに
以前書いた記事 の続きで、今回は Reladomo の MT Loader を使ってみたお話です。
MT Loader(Multi-Threaded Matcher Loader)とは?
Multi-Threaded matcher Loader (MT Loader) は、別のソース(file, feed, other DB, etc.)からの変更を出力先のDBにマージするための機能です。ドキュメントを読んだ感じだと、Reladomo で扱うデータ量が多い場合に MT Loader が推奨されることが多いです。
MT Loader の動作イメージ
- Input と Output(Database) の2つのデータセットのどちらにも存在するデータを検出する
- どちらにも存在するデータはすべて
UPDATE
する(ただし、データに変更があった場合) - Input にはあるが Output(Database) にはないデータセットは
INSERT
する - Output(Database) にあるが Input にないものはすべてクローズする(実装により、
DELETE
するか有効期間を期限切れにするか)
MT Loader のアーキテクチャ
読み取り、比較、書き込みに複数のスレッドを使用して、IOを分散できる仕組みになっています。
MT Loader の特徴
- File-to-Database である必要はない。 Database-to-Database または Memory-to-Database などに簡単に変更できる。
- MatcherThread または SingleQueueExecutor のサブクラス化により、さまざまな要件に合わせたチューニングが可能。
- 書き込みは1つのテーブルへの書き込みのみ。複数テーブルへの書き込みも可能だが、トランザクションの保証はない。 大量データを扱うユースケースで強みを発揮するものなので、トランザクション管理が重要なケースでは使えない。(使い所には注意が必要)
- 設計で冪等性を担保することができる。
MT Loader を使ってみる
Spring Boot と組み合わせて API を書くことが多いので、その組み合わせで書いてみます。
実際のコードは https://github.com/amtkxa/spring-boot-reladomo-mt-loader にあります。
テストコードを書く
事前準備
テストコード実行前にテスト用のDBにあらかじめテストデータの読み込みが完了している状態を作りたいと思います。テストデータの読み込みには MithraTestResource.addTestDataToDatabase
を使うことを想定しているので、以下のようなファイルを用意します。
class com.amtkxa.domain.entity.Customer
customerId, name, country, businessDateFrom, businessDateTo, processingDateFrom, processingDateTo
1,"Liam","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
2,"Emma","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
3,"Noah","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
4,"Olivia","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
5,"William","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
6,"Ava","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
7,"James","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
H2 Database に事前にテストデータを取り込むための abstract なクラスも用意しました。
public abstract class AbstractReladomoTest {
private static Logger log = LoggerFactory.getLogger(AbstractReladomoTest.class);
private MithraTestResource mithraTestResource;
protected abstract String[] getTestDataFilenames();
protected String getMithraConfigXmlFilename() {
return "reladomo/config/TestReladomoRuntimeConfiguration.xml";
}
@BeforeEach
public void setUp() {
log.info("Setting up reladomo on h2");
this.mithraTestResource = new MithraTestResource(this.getMithraConfigXmlFilename());
ConnectionManagerForTests connectionManager = ConnectionManagerForTests.getInstanceForDbName("testdb");
this.mithraTestResource.createSingleDatabase(connectionManager);
for (String filename : this.getTestDataFilenames()) {
this.mithraTestResource.addTestDataToDatabase(filename, connectionManager);
}
this.mithraTestResource.setUp();
}
@AfterEach
public void tearDown() {
this.mithraTestResource.tearDown();
}
}
MT Loader をテストコードで動かす
MT Loader に以下のような入力データを与えて DB 更新をしてみます。
- customerId: 6 のユーザの country を更新
- 新しいユーザを1人分追加
- それ以外のユーザは入力データに存在しない(該当ユーザの有効期限が期限切れに更新されることを期待)
実際に書いたテストコードを以下のようになりました。
public class SingleQueueExecutorParallelLoadTest extends AbstractReladomoTest {
private static int NUMBER_OF_THREADS = 2;
private static int BATCH_SIZE = 5;
private static int INSERT_THREADS = 3;
@Override
public String[] getTestDataFilenames() {
return new String[] { "testdata/customer_data.txt" };
}
private List<Customer> getInputData() {
Timestamp businessDate = DateUtil.parse("2019-12-05 00:00:00");
CustomerList customerList = new CustomerList();
customerList.add(new Customer(6, "Ava", "JPN", businessDate));
customerList.add(new Customer(8, "Arthur", "USA", businessDate));
return customerList;
}
private CustomerList getDbRecords() {
return CustomerFinder.findMany(
CustomerFinder.all()
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
}
@Test
public void testLoadDataParallel() {
try {
QueueExecutor queueExecutor = new SingleQueueExecutor(
NUMBER_OF_THREADS,
CustomerFinder.customerId().ascendingOrderBy(),
BATCH_SIZE,
CustomerFinder.getFinderInstance(),
INSERT_THREADS
);
MatcherThread<Customer> matcherThread = new MatcherThread<>(
queueExecutor,
new Extractor[] { CustomerFinder.customerId() }
);
matcherThread.start();
// Database data load: Parallel
DbLoadThread dbLoadThread = new DbLoadThread(getDbRecords(), null, matcherThread);
dbLoadThread.start();
// Input data load: Parallel
PlainInputThread inputThread = new PlainInputThread(new InputDataLoader(), matcherThread);
inputThread.run();
matcherThread.waitTillDone();
// Assert
checkResult(queueExecutor);
} catch (Exception e) {
throw new ReladomoMTLoaderException("Failed to load data. " + e.getMessage(), e.getCause());
}
}
private void checkResult(QueueExecutor queueExecutor) {
// Whatever is in Output Set but not in Input Set will be closed out (terminated).
CustomerList customerList = getDbRecords();
assertEquals(2, customerList.count());
// Whatever is in the intersection, will be updated (but only if something changed)
Customer customer = CustomerFinder.findOne(
CustomerFinder.customerId().eq(6)
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
assertAll("Check updated customer data",
() -> assertEquals("Ava", customer.getName()),
() -> assertEquals("JPN", customer.getCountry()) // Updated from USD to JPN
);
// Whatever in in Input Set but not in Output Set will be inserted
Customer customer8 = CustomerFinder.findOne(
CustomerFinder.customerId().eq(8)
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
assertAll("Check inserted customer data",
() -> assertEquals("Arthur", customer8.getName()),
() -> assertEquals("USA", customer8.getCountry())
);
assertAll("Check the count of inserts, updates, terminates",
() -> assertEquals(1, queueExecutor.getTotalInserts()),
() -> assertEquals(1, queueExecutor.getTotalUpdates()),
() -> assertEquals(6, queueExecutor.getTotalTerminates())
);
}
private class InputDataLoader implements InputLoader {
private boolean firstTime = true;
@Override
public List<? extends MithraTransactionalObject> getNextParsedObjectList() {
return getInputData();
}
@Override
public boolean isFileParsingComplete() {
if (firstTime) {
firstTime = false;
return false;
} else {
return true;
}
}
}
}
MT Loader による更新処理が動く前は、DB は以下の状態になっています。
-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name | country | business_date_from | business_date_to | processing_date_from | processing_date_to |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| 1 | Liam | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 2 | Emma | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 3 | Noah | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 4 | Olivia | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 5 | William | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 6 | Ava | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 7 | James | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
MT Loader による更新処理が動いた後は、DB は以下の状態になり、期待通りの結果になりました。
-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name | country | business_date_from | business_date_to | processing_date_from | processing_date_to |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| 6 | Ava | JPN | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
| 8 | Arthur | USA | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
さいごに
Reladomo はお仕事でよく利用していて、個人的に好きな技術のひとつなんですが、理解して使えるようになるところに到達するまでのハードルが少し高い印象があります。
例えば....... 比較的よく利用されるであろう Spring Boot に組み込んで動かすサンプルなどが kata になくて、使ってみようとすると DBConnectionManager ってどうやって組み込めばいいんだろ...とか考える必要があるので、恥ずかしながら、そもそも動くレベルのものを作るのにも結構苦労した思い出があります。
Reladomo 自体はとてもいい技術なのに、それついて書かれている記事ってあまり見つからないな......と感じていて、少しもったいないような気持ちになったので、今回は自分で調べたことをまとめて共有してみることにしました。