概要
本記事では、Databricks と Confluent を連携して活用する方法について整理します。近年、Kafka を中心としたストリーミング基盤の需要が高まり、Confluent Cloud と Databricks はどちらも高いスケーラビリティと豊富な機能を提供するため、組み合わせることで強力なデータ活用基盤を実現できます。
引用元:Databricks Real-Time Apps with Confluent | Databricks Blog
Databricks と Confluent の概要
Databricks は Apache Spark をベースにした高性能なデータ分析プラットフォームです。一方、Confluent は、Apache Kafka を中核とするデータストリーミングプラットフォームを提供しており、Confluent Cloud やオンプレミス向けの Confluent Platform など複数の形態が存在します。
両者を組み合わせることで、リアルタイムのデータ収集・蓄積・分析をスムーズに行うことができるようになります。
Confluent には Kafka の動作を補完する機能として Confluent Schema Registry が用意されています。Kafka で扱うデータ形式(Avro、Protobuf、JSON など)のスキーマを管理できるため、データの進化やスキーマの不整合を防ぐ役割を担います。Kafka を使ってデータを運用する場合は、Schema Registry の連携が非常に重要です。
引用元:Schema Registry for Confluent Platform | Confluent Documentation
Confluent 社の公式ページでも Databricks との連携方法が紹介されており、いわゆるテクニカルパートナーとして協業関係にあります。また、Confluent はフレームワークとして Flink も利用可能なため、Spark Streaming と機能が部分的に競合する面もあります。
上記図では Confluent → Databricks の一方向の連携イメージが示されていますが、実際には Databricks → Confluent の双方向連携が可能です。例えば「Confluent → Databricks → Confluent」というエンドツーエンドのフローを下記記事で紹介しておありますので、興味のある方は下記記事を参考にしてください。
引用元:Confluent と Databricks を連携したリアルタイム分析の実践 #Python - Qiita
Databricks と Confluent の連携方法
連携方法の概要
Databricks 上で Confluent と連携する場合、多くは Kafka フォーマットを利用した構造化ストリーミングでの処理となります。必要なコンポーネントは Databricks Runtime にインストール済みのため、追加の準備は比較的少なくて済むようです。
引用元:Apache Kafka と Azure Databricks を使用したストリーム処理 - Azure Databricks | Microsoft Learn
Confluent Schema Registry と連携する方法もあり、2024年12月30日時点では Avro 形式 と Protobuf 形式のシリアライズ/デシリアライズがサポートされています。
引用元:ストリーミング Avro データの読み取りと書き込み - Azure Databricks | Microsoft Learn
引用元:プロトコル バッファーの読み取りと書き込みを行う - Azure Databricks | Microsoft Learn
Databricks で Confluent からデータを取得(Consume)する方法
Databricks から Confluent のデータを取得する際は、Kafka の設定値をどう指定するかが重要です。Kafka には Consumer Group を設定することが多いですが、Spark 側で状態管理するため、Databricks では推奨されない場合もあります。
引用元:Apache Kafka と Azure Databricks を使用したストリーム処理 - Azure Databricks | Microsoft Learn
Confluent の Topic を読み込むと、以下のように key
と value
が binary 型のスキーマで取得されるため、目的のデータ形式に合わせてデシリアライズが必要です。
引用元:Apache Kafka と Azure Databricks を使用したストリーム処理 - Azure Databricks | Microsoft Learn
Databricks 上で Confluent の Topic データを取得する具体的な方法は、投稿済みの下記記事で紹介しています。
- Confluent Schema Regsitry を利用する方法
- スキーマを手動で指定する方法
Databricks で Confluent へデータを送信(Produce)する方法
Databricks から Confluent へデータを書き込む場合も、Kafka の構造化ストリーミングの下記の設定が必要です。
引用元:Apache Kafka と Azure Databricks を使用したストリーム処理 - Azure Databricks | Microsoft Learn
Confluent へ書き込む前に、Spark データフレーム側で次のスキーマを持たせておく必要があります。value
列のみ必須で、それ以外は任意となります。
引用元:Apache Kafka と Azure Databricks を使用したストリーム処理 - Azure Databricks | Microsoft Learn
具体的な実装方法は、以下の記事で紹介しています。
- Confluent Schema Regsitry を利用する方法
- スキーマを手動で指定する方法
Databricks から Confluent のオブジェクトを管理する方法
Confluent のオブジェクトを管理するには、Confluent 社が提供する confluent-kafka
という Python ライブラリを利用できます。Confluent Schema Registry に対して各種操作を行うサンプルも公開していますので、詳しくは以下をご確認ください。
引用元:Python × Confluent Schema Registry:Confluent Schema Registry 入門 #Kafka - Qiita
Confluent で Databricks と連携する方法
Confluent には Databricks Delta Lake Sink Connector が提供されており、Confluent Cloud 上から Databricks への書き込みが可能です。現時点では AWS 環境のみがサポートされていますが、興味のある方は以下の公式ドキュメントを参照してください。
参考リンク
- Kafka 連携関連
-
Avro 形式関連
- Avro ファイル - Azure Databricks | Microsoft Learn
- ストリーミング Avro データの読み取りと書き込み - Azure Databricks | Microsoft Learn
- to_avro 関数 - Azure Databricks - Databricks SQL | Microsoft Learn
- from_avro 関数 - Azure Databricks - Databricks SQL | Microsoft Learn
- pyspark.sql.avro.functions.from_avro — PySpark master documentation
- pyspark.sql.avro.functions.to_avro — PySpark master documentation
-
JSON 形式関連
- JSON ファイル - Azure Databricks | Microsoft Learn
- to_json 関数 - Azure Databricks - Databricks SQL | Microsoft Learn
- from_json 関数 - Azure Databricks - Databricks SQL | Microsoft Learn
- pyspark.sql.functions.from_json — PySpark master documentation
- pyspark.sql.functions.to_json — PySpark master documentation
- Protobuf 形式関連