背景
実装目的
データベースの変更履歴を記録し、変更前後のデータを履歴テーブルに保存したい。これにより、データの変更内容を正確に追跡し、監査やデータ復旧に役立てる。
Debeziumの仕組み
データベースのバイナリログ(log bin)から変更データを取得する。
画像出典
https://github.com/Hornetlabs/synchdb?tab=readme-ov-file
メリット
・アプリケーションの処理とは独立してデータ変更を取得できるため、業務への影響を最小限に抑えられる。
・統一されたデータフォーマットで取得できるので、JSONデータにスムーズに変換できる。
・値がnullのデータは自動的に省略されるため、不要なデータの処理を削減できる。
デメリット
・DBのlog binを有効化する必要があるため、運用環境によっては制限がある。
・Kafka を利用する構成が一般的なため、インフラチームとの調整が必要でメッセージブローカーの運用や管理が発生する。
実際の実装
今回は、インフラチームとの調整が困難なため、アプリケーション側で直接データを受け取り、DB に保存する方式を採用。
参考実装
DebeziumConnectorConfig.java
(Debezium コネクタの設定 application.ymlに合わせてDB情報の取得を変更)
@Configuration
public class DebeziumConnectorConfig {
@Bean
public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
var offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
var dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-name
.with("name", "customer_mysql_connector")
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-connector-class
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
// https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=the%20MySQL%20connector.-,offset.storage,-%3C%E2%80%A6%E2%80%8B%3E.FileOffsetBackingStore
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
// https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=implement%20%3C%E2%80%A6%E2%80%8B%3E.OffsetBackingStore%20interface.-,offset.storage.file.filename,-%22%22
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
// https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=upon%20time%20intervals.-,offset.flush.interval.ms,-60000
.with("offset.flush.interval.ms", "60000")
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-hostname
.with("database.hostname", env.getProperty("DB.datasource.host"))
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-port
.with("database.port", env.getProperty("DB.datasource.port")) // defaults to 3306
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-user
.with("database.user", env.getProperty("DB.datasource.username"))
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-password
.with("database.password", env.getProperty("DB.datasource.password"))
// https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-database-dbname
.with("database.dbname", env.getProperty("DB.datasource.database"))
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-include-list
.with("database.include.list", env.getProperty("DB.datasource.database"))
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-include-schema-changes
.with("include.schema.changes", "true")
// allowPublicKeyRetrieval=true allows the client to automatically request the
// public key from the mysql server
// https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-security.html#:~:text=5.1.31-,allowPublicKeyRetrieval,-Allows%20special%20handshake
.with("database.allowPublicKeyRetrieval", "true")
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-server-id
.with("database.server.id", "10181")
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-server-name
.with("database.server.name", "customer-mysql-db-server")
// https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-class
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
// https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-file-filename
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
}
}
application.yml (DebeziumConnectorConfigに合わせてDB情報を記載)
DB:
datasource:
host: localhost
port: 3306
username: root
password: root
database: xxx
DebeziumListener.java (Debeziumイベントリスナー)
@Slf4j
@Component
public class DebeziumListener {
private final Executor executor = Executors.newSingleThreadExecutor();
private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
public DebeziumListener(Configuration customerConnectorConfiguration) {
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(customerConnectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
}
private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
try {
var sourceRecord = sourceRecordRecordChangeEvent.record();
var sourceRecordChangeValue = (Struct) sourceRecord.value();
// 複合キーとその値(複数存在の場合は"_"でつなげる)
StringBuilder keyName = new StringBuilder();
StringBuilder keyValue = new StringBuilder();
if (sourceRecord.key() instanceof Struct) {
Struct key = (Struct) sourceRecord.key();
Schema schema = key.schema();
boolean isFirstField = true;
for (Field field : schema.fields()) {
if (isFirstField) {
isFirstField = false;
} else {
keyName.append("_");
keyValue.append("_");
}
keyName.append(field.name());
keyValue.append(key.get(field.name()));
}
}
// 変更前エンティティ
String oldDataJson = "{}";
try {
Struct struct = (Struct) sourceRecordChangeValue.get("before");
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
oldDataJson = objectMapper.writeValueAsString(payload);
} catch (Exception e) {
}
// 変更後エンティティと変更ユーザID
String newDataJson = "{}";
String changeUserId = "";
try {
Struct struct = (Struct) sourceRecordChangeValue.get("after");
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
newDataJson = objectMapper.writeValueAsString(payload);
if (payload.get("update_user_id") != null) {
changeUserId = payload.get("update_user_id").toString();
} else {
changeUserId = payload.get("create_user_id").toString();
}
} catch (Exception e) {
}
// 変更履歴 RevisionInfoオブジェクトを作成し、値を設定
RevisionInfo revisionInfo = new RevisionInfo();
revisionInfo.setChangeUserId(changeUserId);
revisionInfo.setChangeDateTime(LocalDateTime.now());
revisionInfo.setTargetTable(sourceRecord.topic());
revisionInfo.setPrimaryKey(keyName.toString());
revisionInfo.setPrimaryValue(keyValue.toString());
revisionInfo.setOldData(oldDataJson);
revisionInfo.setNewData(newDataJson);
if (oldDataJson.equals("{}")) {
revisionInfo.setChangeType(ChangeType.INSERT.getValue());
} else {
if (newDataJson.equals("{}")) {
revisionInfo.setChangeType(ChangeType.DELETE.getValue());
} else {
revisionInfo.setChangeType(ChangeType.UPDATE.getValue());
}
}
revisionInfoRepository.save(revisionInfo);
} catch (AbstractMethodError e) {
log.error("DebeziumListener Change Data Capture Failed", e);
throw e;
}
}
@PostConstruct
private void start() {
this.executor.execute(debeziumEngine);
}
@PreDestroy
private void stop() throws IOException {
if (Objects.nonNull(this.debeziumEngine)) {
this.debeziumEngine.close();
}
}
}
これでエンティティ変更の情報を取得して、変更履歴テーブルに保存することができる。
同じ目的の異なる実装方法
Hibernate Event Listener を用いた変更履歴テーブルの構築
https://qiita.com/teaco/items/9a9cd5a53d22a1416df0