LoginSignup
0
0

6つのドメインを1つのドメインに統合いたしました

Posted at

こんにちは。ムシンサコミュニティ開発チームのバックエンドエンジニア、ジョ・ユシンと申します。コミュニティ開発チームは、ムシンサでコミュニティに関連するドメインを担当しており、スナップ、いいね、インフルエンサーマーケティングなどを手掛けております。この度のポストでは、6つのコンテンツドメインを統合された単一ドメインモデルへ移行した事例を共有させていただきたく存じます。

レガシードメインのマイグレーション

レガシードメインを統合された単一モデルとして提供するために、APIで抽象化したり集計して提供することも可能ですが、分散されて異なる形式で保存されたデータには実装上の限界があります。ドメインモデルのライフサイクル管理やフィルタ、推薦機能などの実装を容易にするためには、データの統合が必要です。

Strangler Fig Pattern

The most important reason to consider a strangler fig application over a cut-over rewrite is reduced risk. A strangler fig can give value steadily and the frequent releases allow you to monitor its progress more carefully. Many people still don't consider a strangler fig since they think it will cost more - I'm not convinced about that. Since you can use shorter release cycles with a strangler fig you can avoid a lot of the unnecessary features that cut over rewrites often generate.

サービスの方向性がどのように変わるか分からないため、DBのDMLや単純なバッチ処理を通じたマイグレーションよりも、準リアルタイムに近いレガシードメインデータのマイグレーションパイプラインを構築し、以前のドメインの下位互換性を保ちながら新しい統合ドメインを導入し、統合ドメインモデルの提供範囲を広げていくことを決定いたしました。

Data Migration Pipeline

一般的にデータマイグレーションパイプラインの構成時には、選択肢が大きく(1)Batch(2)Lambda Architecture(3)Kappa Architectureで構成されますが、これらの構成について簡単にご紹介します。

(1) Batch

通常、バックエンドの開発者がすべて経験した配置は、一定時間ごとに動作するシステムを意味します。 配置は、一般的にBatch Jobを具現するものと、Jobを一定の周期で動作させるSchedulerで構成されます。 弊社の配置インフラはSpring BatchとArgo Workflowを使用していたため、高可用性スケジューラと共に配置ジョブにインスタンスリソースを自由に調節して使用できる環境を備えていました。

Screen Shot 2024-06-26 at 4.18.46 PM.png

しかし、配置は上図のように一定の周期で動作するため、以前の配置の動作以降、次の配置動作前までの新規データはリアルタイムで移行されないという問題点がありました。 つまり、実装に慣れて容易ですが、準リアルタイム性の保障が難しいという短所があります。

(2) Lambda Architecture

Lambda Architectureは、以前の配置構成とともにStream Layer(Speed Layer)というものを一緒に維持するアーキテクチャです。 配置が一定周期で動作し、システムの整合性を合わせ、配置動作間の時間ではStream Layerが最新データを処理してくれたリアルタイム性を保障するアーキテクチャです。

Screen Shot 2024-06-26 at 4.19.43 PM.png

一般的にStream Layerに配信されるデータは、コードが発行するドメインイベント、あるいはWAL(Write-Ahead Log、MySQLならbinlog、MongoDBならoplog)データになることがあります。 Lambda Architectureは、配置とストリームの長所をすべて使用するアーキテクチャであり、配置とストリームレイヤーを利用して容易な復元配置、準リアルタイム性の保障が可能ですが、二つのレイヤーをすべて実装および管理しなければならないという短所があります。

(3) Kappa Architecture

Screen Shot 2024-06-26 at 4.20.10 PM.png

Kappa ArchitectureはLambda ArchitectureでBatch Layerを除いてSpeed Layer(以下Stream)だけを残して使用するアーキテクチャで、Lambda Architectureが複数のレイヤーを同時に維持しなければならないという短所を除去したアーキテクチャです。 現在までの実装事例を見ると、ほとんどがKafkaと一緒に実装され、それだけKafkaの利点をうまく活用できるアーキテクチャです。 Kappa ArchitectureでSource Layerは一般的にKafka Clusterです。 Kafka Topicに保存されたデータをプロセッシングしてServing LayerのDatasourceに加工して積載するもので、これは俗に言うKafka Consumerを意味します。 Kafkaで提供するTopic Partitioningを利用して並列処理を容易にすることができ、Consumer Offsetを利用してAt-Least Once実行を保障することができます。 ただし、まだ他のアーキテクチャに比べて実装事例が不足しており、ランニングカーブが高いという欠点があります。

Screen Shot 2024-06-26 at 4.21.41 PM.png

アーキテクチャ決定

私たちのチームは初期に実装経験の多いLambda Architectureを利用してレガシーデータマイグレーションパイプラインを構築しようとしました。 しかし、6つのドメインソースから派生するテーブルの数は非常に多く、Polyglot MSAで多様な言語とサービスからなるソースコードにドメインのルートテーブルと派生テーブルのすべてのデータ変化を追跡するようにドメインイベント発行コードを埋め込むことは不可能に近く、CDC技術を導入することにしました。 以前に述べたように、Kafka Connect Clusterを構築し、ルートテーブルと派生テーブルをすべてKafka Topicに保存すると、Lambda Architectureの配置レイヤーからRDBMSにアクセスして再びリソースを取得することがリソースの無駄だと感じました。 それで、Kafka Streamsを利用してテーブルのCDC Topicのイベントデータを逆正規化してマイグレーションするロジックを作成し、結果的にKappa Architectureと似た形を帯びるようになりました。 そこで、この導入過程でStrimzi Operatorと一緒に悩んだことを紹介します。


Strimzi Operator 紹介

Screen Shot 2024-06-26 at 4.22.50 PM.png
Strimzi Operatorは、クバネティスの上でKafka ClusterおよびKafkaに関連するリソースをCRDを利用してクバネティスオブジェクトとして管理できるようにするオープンソースで、現在CNCFプロジェクトにも登録されています。 この文を作成する基準0.41バージョンまで公開されています。 Kafka Connect Clusterを構築する当時に既存に使用していたAWS DMSサービスもありましたが、メッセージに対する設定、SMTなど機能的制約があったため排除しました。 その後、AWS MSK ConnectサービスとStrimzi Operatorのどちらかで悩みましたが、AWS MSK ConnectはTerraform、Cloudformationを通じて管理が必要であり、コネクタの設定を変更する際に既存のコネクタタスクを直接削除しなければなりませんでした。 クバネティスのYAML形式が開発者が使用するにはランニングカーブが低く容易だと判断し、Strimzi Operatorはコネクタの設定が変更されても再配布を利用してコネクタを再生成することができたため、Strimzi Operatorを使用しようと決めました。

Strimzi Operatorを利用したKafka Connect Cluster構築

Strimzi Operatorを現在運営中のクバネティスクラスターに成功的に構築し、RBACを適切に割り当てたら、下記のCRDを利用してKafka Connect Clusterを構築することができます。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect

# strimzi KafkaConnectSpec : https://strimzi.io/docs/operators/latest/configuring#type-KafkaConnectSpec-reference
# Debezium : https://debezium.io/documentation/reference/stable/operations/kubernetes.html
metadata:
  name: # Kafka Connect Cluster 이름
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: # Connector Plugin 이 설치된 이미지
  replicas: # Connect Cluster를 구성하는 파드 개수
  bootstrapServers: # kafka Connect의 상태를 저장할 Kafka Cluster

  resources:
    limits:
      memory: #
    requests:
      cpu: #
      memory: #

  config:
    group.id: #

    # Secret을 Kubernetes Secret을 사용한다면 설정
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider

    # Connect Cluster의 상태 토픽
    offset.storage.topic: offset.storage.topic
    status.storage.topic: status.storage.topic
    config.storage.topic: config.storage.topic
    offset.storage.replication.factor: 2
    status.storage.replication.factor: 2
    config.storage.replication.factor: 2

  # 로깅 설정
  logging:

  template: # Kafka Connect Cluster Pod의 템플릿 설정
    pod:

重要な設定だけを残していますが、思ったより設定が簡潔であることが確認できます。 Kafka Connectで重要な設定は、実際にKafka Connect Clusterを構成するパッドの設定と、Kafka Connectで管理する状態トピック、そしてセキュリティをどのように管理するかについてのものです。 弊社はExternal Secret Operatorを利用してAWS SecretをKubernetes SecretでSyncしてコンテナに注入して使用しており、ファイアウォールのような設定はPod Selectorを利用して注入しています。 これらの設定は、実際にCMやSecretなどでプロビジョニングされ、配布したときにStrimzi Operatorがこれらの設定を利用して、パドとタスクを自動的に生成してくれます。 そして、Kafka Connect Clusterを生成する時、FadのイメージにはKafka Connector Pluginがあらかじめインストールされていなければなりませんが、build Templateという設定を利用してこれも自動化することができます。

Connector生成

私たちはDebeziumを利用してSource Connectorを構成しました。 事前にKafka Connect Clusterコンテナイメージに必要なDebezium Pluginを全てインストールしており、以下がその中の一つのソースコネクタに対する例です。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector

metadata:
  name: # 커넥터 이름
  labels:
    strimzi.io/cluster: # Kafka Connect Cluster 이름
spec:
  # MySQLConnector
  class: io.debezium.connector.mysql.MySqlConnector

  # WAL 처리시 작업 수는 1개를 권장함
  tasksMax: 1

  # Auto Restart
  autoRestart:
    enabled: true
    maxRestarts: 10

  config:
    # DB 연결 정보
    database.hostname: 
    database.port: 
    database.user: ${secrets:k8s시크릿}
    database.password: ${secrets:k8s시크릿}
    database.server.id: 

    # 데이터베이스 스키마 기록용 토픽. 내부용으로만 사용되며 Consumer가 사용해서는 안됨
    schema.history.internal.kafka.bootstrap.servers: 
    schema.history.internal.kafka.topic: 
    schema.history.internal.producer.security.protocol: 
    schema.history.internal.consumer.security.protocol: 

    # Debezium MySQL 커넥터가 처음 시작되면 데이터베이스의 일관된 초기 스냅샷을 수행
    # 스냅샷 중에 커넥터가 테이블 잠금을 획득하지 못하도록 설정
    # 스냅샷이 실행되는 동안 스키마 변경이 발생하지 않는 경우에만 사용하는 것이 안전
    snapshot.mode: initial
    snapshot.locking.mode: none

    # 스키마 변경에 대해 포함 여부 true 일 경우 토픽이 생성되며 스키마 변경이 기록됨
    include.schema.changes: false

    # 메세지 키에 스키마 정보 포함 여부
    key.converter.schemas.enable: 
    key.converter: 

    # Signal 허용 채널 설정
    signal.data.collection: 

    # 메세지 변경을 위한 SMT(single message transformations)
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    transforms.unwrap.delete.tombstone.handling.mode: tombstone

    # CDC 토픽 Prefix
    topic.prefix: 

    # 대상 데이터베이스 목록
    database.include.list: 
    table.include.list: 

    # 토픽 생성 설정
    topic.creation.enable: 
    topic.creation.default.replication.factor: 
    topic.creation.default.partitions: 
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: snappy

    # notification
    notification.enabled.channels: sink
    notification.sink.topic.name: 

基本的にSource DBにアクセスできる権限や接続情報が必要であり、トピックに発行されるメッセージがどのような形で積載されるか、またSMT(Single Message Transformation)設定を利用して、希望する形で発行させることができます。 その他にもsnapshot.modeを利用して、コネクタの初期生成時にデータベースのデータをどのように積載するかを決定することができ、コネクタが自動的に生成してくれるトピックの設定もあらかじめ設定することができます。 私たちはCDCトピックを利用してSSOTを具現し、これを利用してデータマイグレーションパイプラインを構築しようとしたため、トピックのcleanup.ploicyをcompactに設定して作成しました。

その他の設定は、以下のリンクをご参照いただければ幸いです。

Strimzi Operator Kafka Connector Spec : Link
Debezium MySQL Connector : Link

Kafka Event Denormalization 具現

Screen Shot 2024-06-26 at 4.25.16 PM.png

Strimzi Operatorを利用して成功的にKafka Connect Clusterを構築し、マイグレーション対象ドメインのテーブルのCDCログを利用して各トピックを生成しました。

Screen Shot 2024-06-26 at 4.25.39 PM.png

実際に私たちが構築したパイプラインを簡単にご紹介しますと、Source TopicをすべてKTableにして、このKTableのChangelogをDeduplication Processorによって同じキーの変化に対して重複除去をした後、このStreamと既存に生成したKTableをJoinして統合されたデータモデルを作り、このモデルをSinkしました。 コパティショニングのためにCDC Topicをもう一度別のTopicにSinkしてパーティション数を流動的に持っていけるように構成しました。

Data Replay

開発中または運用中に移行要件が変更されたり、追加のデータ移行が必要な場合は、最初からもう一度移行が必要です。 上記のようなアーキテクチャでパイプライン全体をデータに再移行するには、次のような方法があります。

  • Debezium Signal機能を利用したIncremental Snapshotを利用
  • Streams Application Consumer Offset 初期化, Application ID 変更
  • Rich Data StreamのJoin Triggerとして利用するDeduplication Processor以降のトピックについて、希望するソースストリームのルートストリームに再発行

私たちは上記の3つの過程をすべて具現しました。 弊社のサービスはユーザートラフィックをリアルタイムで受信し、リアルタイムデータに対する処理保障も必要なので、ストリームズアプリケーションと最終Sink Consumer/ConnectorのConsumer Lagが発生しない速度で調節し、全体データを再マイグレーションできるように全体データを再実行する過程で適切な設定と速度を付与しました。 もう一つの方法としては、このようなパイプラインを二つのバージョンに維持する方法がありますが、Sink対象であるRDBMSの限界とコードを二元化させずに解決する方法が思い浮かばず、まず保留しておきました。 現在、これらをよりスムーズにできるようにするチケットがKafka Streamsに発行されています。

この他にも詳細は以下の通りです

  • KotlinのExtension Function文法を利用すれば、Kafka Streamsコードを可読性高く作成することができます
  • Source Datasourceの実際のデータサイズよりも、パイプラインで使用されるデータのサイズが大きいです
  • Kafka Connect Cluster のログはVector を用いて収集します

Refernce

  • Confluent : Event Data Normalization vs Denromalization
  • 무신사 : Kafka Streams를 활용한 실시간 이상 로그인 감지 시스템 도입하기
  • Strimzi, Debezium, Apach Kafka Streams
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0