Building a Change History Table with a Debezium Listener

Posted at 2025-02-20

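Background

Purpose

We want to record the change history of the database by saving the data before and after each change to a history table. This makes it possible to track exactly what changed, which helps with auditing and data recovery.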

How Debezium works

Debezium reads change data from the database's binary log (binlog).
[Image: 画像.jpeg]
Image source: https://github.com/Hornetlabs/synchdb?tab=readme-ov-file
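
To make the shape of the captured data more concrete: each row change arrives in an envelope with `before`, `after`, and `op` fields, where `op` is `c` (create), `u` (update), `d` (delete), or `r` (snapshot read). The sketch below maps those codes in plain Java. The `ChangeKind` enum is a hypothetical name introduced for illustration and is not part of Debezium.

```java
/** Hypothetical helper (not a Debezium class): maps the envelope's "op" code to a change kind. */
enum ChangeKind {
  INSERT, UPDATE, DELETE, SNAPSHOT_READ;

  /** Debezium op codes: "c" = create, "u" = update, "d" = delete, "r" = snapshot read. */
  static ChangeKind fromOp(String op) {
    switch (op) {
      case "c": return INSERT;
      case "u": return UPDATE;
      case "d": return DELETE;
      case "r": return SNAPSHOT_READ;
      // Other codes are simply rejected in this sketch.
      default: throw new IllegalArgumentException("Unsupported op code: " + op);
    }
  }
}
```

Reading `op` directly is one way to tell an insert from a snapshot read, which both have an empty `before` state.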

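Advantages

・Data changes are captured independently of application processing, so the impact on business operations is kept to a minimum.
・Changes arrive in a unified data format, so they convert smoothly to JSON.
・Fields whose value is null are left out of the stored JSON (the listener below filters them), which reduces unnecessary data processing.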
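
The null-omission point can be sketched in plain Java as follows. This is only an illustration under assumptions: `NullFreePayload` is a hypothetical helper, and the row is modeled as a `Map` rather than a Kafka Connect `Struct`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: copies a row into a JSON-ready map, skipping null-valued columns. */
final class NullFreePayload {

  static Map<String, Object> of(Map<String, Object> row) {
    // LinkedHashMap keeps the column order of the input map.
    Map<String, Object> payload = new LinkedHashMap<>();
    row.forEach((column, value) -> {
      if (value != null) {
        payload.put(column, value);
      }
    });
    return payload;
  }
}
```

Filtering before collecting also matters for a practical reason: `Collectors.toMap` throws a `NullPointerException` when it meets a null value.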

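Disadvantages

・The binlog must be enabled on the database, which may not be possible in some production environments.
・Debezium is commonly deployed together with Kafka, which requires coordination with the infrastructure team and adds the work of operating and managing a message broker.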
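
For the binlog requirement, the Debezium MySQL connector expects `log_bin` to be `ON`, `binlog_format` to be `ROW`, and `binlog_row_image` to be `FULL`. A minimal pre-flight check could look like the sketch below. It assumes the system variables have already been collected (for example from `SHOW VARIABLES`) into a map; `BinlogPreflight` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check; class and method names are illustrative only. */
final class BinlogPreflight {

  /**
   * @param variables MySQL system variables, e.g. collected from SHOW VARIABLES
   * @return human-readable problems; empty when the settings look ready for CDC
   */
  static List<String> check(Map<String, String> variables) {
    List<String> problems = new ArrayList<>();
    requireValue(variables, "log_bin", "ON", problems);
    requireValue(variables, "binlog_format", "ROW", problems);
    requireValue(variables, "binlog_row_image", "FULL", problems);
    return problems;
  }

  private static void requireValue(Map<String, String> variables, String name,
      String expected, List<String> problems) {
    String actual = variables.get(name);
    // equalsIgnoreCase(null) is false, so a missing variable is reported as well.
    if (!expected.equalsIgnoreCase(actual)) {
      problems.add(name + " should be " + expected + " but was " + actual);
    }
  }
}
```

Running a check like this at startup gives a clearer error than a connector failure later on.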

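Actual implementation

In this case, coordinating with the infrastructure team was difficult, so we chose to have the application receive the change data directly (through the embedded Debezium engine) and save it to the DB.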

Reference implementation

DebeziumConnectorConfig.java

(Debezium connector settings; adjust how the DB information is read to match your application.yml)

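import java.io.File;
import java.io.IOException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
public class DebeziumConnectorConfig {

  @Bean
  public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
    // Note: temp files are recreated on every start, so saved offsets and schema history
    // do not survive a restart and the connector starts over with a fresh snapshot.
    // Use fixed file paths if the capture position must be kept across restarts.
    var offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    var dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");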
    return io.debezium.config.Configuration.create()
        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-name
        .with("name", "customer_mysql_connector")

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-connector-class
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")

        // https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=the%20MySQL%20connector.-,offset.storage,-%3C%E2%80%A6%E2%80%8B%3E.FileOffsetBackingStore
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")

        // https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=implement%20%3C%E2%80%A6%E2%80%8B%3E.OffsetBackingStore%20interface.-,offset.storage.file.filename,-%22%22
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())

        // https://debezium.io/documentation/reference/stable/development/engine.html#:~:text=upon%20time%20intervals.-,offset.flush.interval.ms,-60000
        .with("offset.flush.interval.ms", "60000")

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-hostname
        .with("database.hostname", env.getProperty("DB.datasource.host"))

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-port
        .with("database.port", env.getProperty("DB.datasource.port")) // defaults to 3306

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-user
        .with("database.user", env.getProperty("DB.datasource.username"))

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-password
        .with("database.password", env.getProperty("DB.datasource.password"))

        // https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-property-database-dbname
        .with("database.dbname", env.getProperty("DB.datasource.database"))

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-include-list
        .with("database.include.list", env.getProperty("DB.datasource.database"))

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-include-schema-changes
        .with("include.schema.changes", "true")

        // allowPublicKeyRetrieval=true allows the client to automatically request the
        // public key from the mysql server
        // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-security.html#:~:text=5.1.31-,allowPublicKeyRetrieval,-Allows%20special%20handshake
        .with("database.allowPublicKeyRetrieval", "true")

        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-server-id
        .with("database.server.id", "10181")

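        // Note: database.server.name and database.history.* are the Debezium 1.x property
        // names; Debezium 2.x renamed them to topic.prefix and schema.history.internal.*.
        // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-server-name
        .with("database.server.name", "customer-mysql-db-server")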

        // https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-class
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")

        // https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-database-history-file-filename
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }
}
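
The configuration above creates its offset and history files with `File.createTempFile`, so every start gets new, empty files. If the capture position should survive restarts, one option is to resolve fixed paths instead. The following is a minimal sketch under assumptions: `DebeziumStateFiles`, the directory layout, and the file names are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper; the directory layout and file names are assumptions. */
final class DebeziumStateFiles {

  /** Creates the state directory if needed and returns a fixed absolute path inside it. */
  static Path resolve(Path stateDir, String fileName) {
    try {
      Files.createDirectories(stateDir);
    } catch (IOException e) {
      throw new UncheckedIOException("Cannot create state directory " + stateDir, e);
    }
    return stateDir.resolve(fileName).toAbsolutePath();
  }
}
```

The result could then be passed to the existing properties, for example `.with("offset.storage.file.filename", DebeziumStateFiles.resolve(stateDir, "offsets.dat").toString())`, where `stateDir` is a directory of your choosing.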
application.yml (DB settings, matching the keys read in DebeziumConnectorConfig)
DB:
  datasource:
    host: localhost
    port: 3306
    username: root
    password: root
    database: xxx
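DebeziumListener.java (Debezium event listener)
import static java.util.stream.Collectors.toMap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct; // jakarta.annotation.* on Spring Boot 3
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;
// RevisionInfo, ChangeType, and RevisionInfoRepository are project classes (not shown here).

@Slf4j
@Component
public class DebeziumListener {

  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
  // Not declared in the original listing; assumed to be Spring-managed beans.
  private final ObjectMapper objectMapper;
  private final RevisionInfoRepository revisionInfoRepository;

  public DebeziumListener(Configuration customerConnectorConfiguration,
      ObjectMapper objectMapper, RevisionInfoRepository revisionInfoRepository) {
    this.objectMapper = objectMapper;
    this.revisionInfoRepository = revisionInfoRepository;
    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(customerConnectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();
  }

  private void handleChangeEvent(RecordChangeEvent<SourceRecord> changeEvent) {
    try {
      var sourceRecord = changeEvent.record();
      // Skip records that are not row changes: tombstones have a null value, and
      // schema change events (include.schema.changes=true) carry no "op" field.
      if (!(sourceRecord.value() instanceof Struct)) {
        return;
      }
      var changeValue = (Struct) sourceRecord.value();
      if (changeValue.schema().field("op") == null) {
        return;
      }

      // Primary key column names and values (joined with "_" for composite keys)
      StringBuilder keyName = new StringBuilder();
      StringBuilder keyValue = new StringBuilder();
      if (sourceRecord.key() instanceof Struct) {
        Struct key = (Struct) sourceRecord.key();
        for (Field field : key.schema().fields()) {
          if (keyName.length() > 0) {
            keyName.append("_");
            keyValue.append("_");
          }
          keyName.append(field.name());
          keyValue.append(key.get(field));
        }
      }

      // Entity before the change ("before" is null for inserts)
      Map<String, Object> oldData = toPayload(changeValue.getStruct("before"));
      // Entity after the change ("after" is null for deletes)
      Map<String, Object> newData = toPayload(changeValue.getStruct("after"));

      // Changing user: update_user_id if present, otherwise create_user_id
      Object userId = newData.getOrDefault("update_user_id", newData.get("create_user_id"));
      String changeUserId = userId == null ? "" : userId.toString();

      // Create the RevisionInfo history record and set its values
      RevisionInfo revisionInfo = new RevisionInfo();
      revisionInfo.setChangeUserId(changeUserId);
      revisionInfo.setChangeDateTime(LocalDateTime.now());
      revisionInfo.setTargetTable(sourceRecord.topic());
      revisionInfo.setPrimaryKey(keyName.toString());
      revisionInfo.setPrimaryValue(keyValue.toString());
      revisionInfo.setOldData(toJson(oldData));
      revisionInfo.setNewData(toJson(newData));

      if (oldData.isEmpty()) {
        revisionInfo.setChangeType(ChangeType.INSERT.getValue());
      } else if (newData.isEmpty()) {
        revisionInfo.setChangeType(ChangeType.DELETE.getValue());
      } else {
        revisionInfo.setChangeType(ChangeType.UPDATE.getValue());
      }

      revisionInfoRepository.save(revisionInfo);
    } catch (RuntimeException e) {
      log.error("DebeziumListener Change Data Capture Failed", e);
      throw e;
    }
  }

  /** Copies the non-null fields of a row Struct into a map; returns an empty map for null. */
  private static Map<String, Object> toPayload(Struct struct) {
    if (struct == null) {
      return Map.of();
    }
    return struct.schema().fields().stream()
        .filter(field -> struct.get(field) != null)
        .collect(toMap(Field::name, field -> struct.get(field)));
  }

  /** Serializes a payload to JSON; falls back to "{}" (and logs) if serialization fails. */
  private String toJson(Map<String, Object> payload) {
    try {
      return objectMapper.writeValueAsString(payload);
    } catch (JsonProcessingException e) {
      log.warn("Failed to serialize change payload; storing an empty object", e);
      return "{}";
    }
  }

  @PostConstruct
  private void start() {
    this.executor.execute(debeziumEngine);
  }

  @PreDestroy
  private void stop() throws IOException {
    try {
      this.debeziumEngine.close();
    } finally {
      // Release the worker thread so it does not keep the JVM alive on shutdown.
      this.executor.shutdown();
    }
  }
}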

With this, entity change information can be captured and saved to the change history table.
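
The listener refers to `RevisionInfo` and `ChangeType`, which are not shown in this article. As a rough idea of what they might look like, here is a plain-Java sketch containing only the properties the listener sets. Everything in it is an assumption: the numeric codes, the field types, and the absence of persistence annotations (in practice this would presumably be a JPA entity).

```java
import java.time.LocalDateTime;

/** Hypothetical change-type codes; the numeric values are assumptions. */
enum ChangeType {
  INSERT(1), UPDATE(2), DELETE(3);

  private final int value;

  ChangeType(int value) {
    this.value = value;
  }

  int getValue() {
    return value;
  }
}

/** Hypothetical history row holding the properties set by the listener. */
class RevisionInfo {
  private String changeUserId;
  private LocalDateTime changeDateTime;
  private String targetTable;  // Debezium topic name: <server name>.<database>.<table>
  private String primaryKey;   // key column names joined with "_"
  private String primaryValue; // key values joined with "_"
  private String oldData;      // JSON of the row before the change
  private String newData;      // JSON of the row after the change
  private int changeType;

  void setChangeUserId(String changeUserId) { this.changeUserId = changeUserId; }
  void setChangeDateTime(LocalDateTime changeDateTime) { this.changeDateTime = changeDateTime; }
  void setTargetTable(String targetTable) { this.targetTable = targetTable; }
  void setPrimaryKey(String primaryKey) { this.primaryKey = primaryKey; }
  void setPrimaryValue(String primaryValue) { this.primaryValue = primaryValue; }
  void setOldData(String oldData) { this.oldData = oldData; }
  void setNewData(String newData) { this.newData = newData; }
  void setChangeType(int changeType) { this.changeType = changeType; }

  String getChangeUserId() { return changeUserId; }
  LocalDateTime getChangeDateTime() { return changeDateTime; }
  String getTargetTable() { return targetTable; }
  String getPrimaryKey() { return primaryKey; }
  String getPrimaryValue() { return primaryValue; }
  String getOldData() { return oldData; }
  String getNewData() { return newData; }
  int getChangeType() { return changeType; }
}
```

Because `targetTable` is filled from the Debezium topic name, it includes the server name and database as a prefix. Strip the prefix before saving if only the table name is wanted.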

A different implementation for the same purpose

Building a change history table with a Hibernate Event Listener
https://qiita.com/teaco/items/9a9cd5a53d22a1416df0
