Declarative Streaming Data Pipelines with Delta Live Tables and Apache Kafka - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Live Tables (DLT)は、バッチやストリーミングデータに対する高信頼なデータパイプラインを作成し、大規模インフラストラクチャを完全に管理するためにシンプルな宣言型のアプローチを使用する初めてのETLフレームワークです。多くのユースケースでは、アクション可能な洞察をニアリアルタイムのデータから導き出す必要があります。Delta Live Tablesは、Apache Kafka、AWS Kinesis、Confluent Cloud、Amazon MSK、Azure Event Hubsのようなイベントバスからデータを直接取り込むことで、低レーテンシーを必要とするそのようなユースケースをサポートするために低レーテンシーのストリーミングデータパイプラインを実現します。
本書では、ストリームを取り込むために必要なPythonコードを提供しながらも、Apache KafkaとDLTを用いてウォークスルーを行います。推奨のシステムアーキテクチャが説明され、途中では検討の価値があるDLTの関連設定を探索していきます。
ストリーミングプラットフォーム
イベントバスやメッセージバスは、メッセージのプロデューサーをコンシューマーから分離します。よくあるストリーミングのユースケースには、すべてのユーザーインタラクションがApache Kafkaのイベントとして格納される、ウェブサイトのナビゲーションによるクリックスルーのコレクションがあります。Kafkaからのイベントストリームは、リアルタイムのストリーミングデータ分析に活用されます。複数のメッセージコンシューマーは、Kafkaから同じデータを読み込むことができ、聴衆の興味やコンバージョン率、離脱の理由を学ぶためにデータを活用することができます。また、ユーザーインタラクションによるリアルタイムのストリーミングイベントは、多くの場合、請求データベースに格納される実際の購入と関連づけられる必要があります。
Apache Kafka
Apache Kafkaは、オープンソースのイベントバスです。Kafkaは、特定の期間でメッセージがバッファされる、追記のみのイベントの分散ログであるトピックの概念を使用しています。Kafkaのメッセージは消費されても削除されませんが、これらは永久に保存されるわけでもありません。Kafkaのメッセージ保持期間はトピックごとに設定され、デフォルトは7日間です。有効期限が切れたメッセージは最終的には削除されます。
本書では、Apache Kafkaにフォーカスします。しかし、ここで議論されるコンセプトは、多くのイベントバスやメッセージングシステムにも適用することができます。
ストリーミングデータパイプライン
データフローパイプラインにおいては、Delta Live Tablesとそれらの依存関係は標準的なSQLのCreate Table As Select (CTAS)文とDLTのキーワードlive
で宣言することができます。
PythonでDLTを開発する際には、Delta Liveテーブルを作成するために@dlt.table
デコレーターを使用します。パイプラインのデータ品質を保証するために、DLTは不正なレコードを持つパイプラインの挙動を定義するシンプルなSQLの制約であるエクスペクテーションを使用します。
ストリーミングワークロードは、多くの場合予期できないデータボリュームを伴うので、Databricksでは、不要なインフラストラクチャをシャットダウンすることでコストを削減しつつも、全体的なエンドツーエンドのレーテンシーを最小化するために、データフローに対して強化オートスケーリングを適用します。
Delta Live Tablesは、それぞれのパイプラインの実行において正確に一度のみ、適切な順序で完全に再計算されます。
逆に、ストリーミングDelta Live Tablesはステートフルであり、最後のパイプライン実行以降に追加されたデータのみをインクリメンタルに処理します。ストリーミングライブテーブルを定義するクエリーが変更された際、新たなクエリーに基づいて新規のデータは処理されますが、既存データは再計算されません。ストリーミングライブテーブルは常にストリーミングソースを使用し、Kafka、Kinesis、Auto Loaderのような追加のみのストリームに対してのみ動作します。ストリーミングDLTはSpark構造化ストリーミングをベースとしています。
例えば、非常に大量のデータを取り扱うワークロードと低レーテンシー要件というように、複数のストリーミングパイプラインをチェーンすることができます。
ストリーミングエンジンからの直接取り込み
Pythonで記述されたDelta Live Tableは、Spark構造化ストリーミングを用いることでKafkaのようなイベントバスから直接データを取り込むことができます。コンプライアンスの問題を回避し、コストを削減するためにKafkaのトピックの保持期間を短くすることで、Deltaが提供する安価かつ弾力性のあるガバナンスの効いたストレージのメリットを享受することができます。
パイプラインにおける第一歩として、ブロンズ(生データ)テーブルにデータを取り込み、重要なデータを削除してしまう様な複雑な変換処理を避けることをお勧めします。他のDeltaテーブルと同じ様に、ブロンズテーブルは履歴を保持し、GDPRや他のコンプライアンスタスクを実行することを可能とします。
Apache Kafkaからのストリーミングデータの取り込み
PythonでDLTパイプラインを記述する際には、DLTテーブルを作成するために@dlt.table
のアノテーションを使用します。PythonでストリーミングDLTであることを示すための特別な属性はありません。ストリームにアクセスするには、シンプルにspark.readStream()
を使用します。以下にKafkaのトピックを消費するkafka_bronze
という名前のDLTテーブルを作成するサンプルコードを示します。
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
TOPIC = "tracker-events"
KAFKA_BROKER = spark.conf.get("KAFKA_SERVER")
# subscribe to TOPIC at KAFKA_BROKER
raw_kafka_events = (spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", KAFKA_BROKER)
.option("startingOffsets", "earliest")
.load()
)
@dlt.table(table_properties={"pipelines.reset.allowed":"false"})
def kafka_bronze():
return raw_kafka_events
pipelines.reset.allowed
Deltaはデータを永久的に保持するのとは異なり、イベントバスは通常、特定の期間の後にメッセージを破棄することに注意してください。
このため、DLTパイプラインのフルリフレッシュを実行する際に、Kafkaのソースデータがすでに削除されているという状況が起こり得ます。この場合、メッセージングプラットフォームからすべての履歴データがバックフィルされない場合があり、DLTテーブルでデータの欠損が起こり得ます。データ削除を防ぐためには、以下のDLTテーブルのプロパティを使用します。
pipelines.reset.allowed=false
pipelines.reset.allowed
をfalse
に設定することで、テーブルのリフレッシュをガードしますが、テーブルに流れ込む新規データやテーブルへのインクリメンタルの書き込みは許可します。
チェックポイント
あなたが経験豊富なSpark構造化ストリーミング開発者であれば、上述のコードでチェックポイント作成が無いことに気づいたでしょう。Spark構造化ストリーミングにおいては、どのデータが処理に成功のか失敗したのかに関する進捗情報を永続化するためにチェックポイントの作成が必要となり、失敗したクエリーを失敗した時点から再開するためにこのメタデータを使用します。
Spark構造化ストリーミングでは一度のみの処理を保障するために障害復旧のためにチェックポイントが必要となりますが、DLTは手動の設定や明示的なチェックポイントを必要とすることなしに、自動で状態をハンドリングします。
DLTパイプラインにおけるSQLとPythonの混在
複数のノートブックからDLTパイプラインを構成することができますが、一つのDLTノートブックは、全体がSQLかPythonで記述される必要があります(単一のノートブックでセルごとに異なる言語を記述できるDatabricksノートブックとは異なります)。
このため、SQLが好みであるならば、PythonのノートブックでApache Kafkaからデータ取り込みのコーディングを行い、別のSQLノートブックでデータパイプラインの変換ロジックを記述することになります。
スキーママッピング
メッセージングプラットフォームからデータを読み取る際、データストリームは不明瞭でありスキーマを提供する必要があります。
以下のPythonサンプルでは、フィットネストラッカーからのイベントのスキーマ定義と、どのようにKafkaメッセージの値の部分がこのスキーマにマッピングされるのかを示しています。
event_schema = StructType([ \
StructField("time", TimestampType(),True) , \
StructField("version", StringType(),True), \
StructField("model", StringType(),True) , \
StructField("heart_bpm", IntegerType(),True), \
StructField("kcal", IntegerType(),True) \
])
# temporary table, visible in pipeline but not in data browser,
# cannot be queried interactively
@dlt.table(comment="real schema for Kakfa payload",
temporary=True)
def kafka_silver():
return (
# kafka streams are (timestamp,value)
# value contains the kafka payload
dlt.read_stream("kafka_bronze")
.select(col("timestamp"),from_json(col("value")
.cast("string"), event_schema).alias("event"))
.select("timestamp", "event.*")
)
メリット
DLTでメッセージブローカーから直接ストリーミングデータを読み込むことで、アーキテクチャ上の複雑性を最小化し、データはメッセージングブローカーから直接ストリーミングされ、中間ステップを含んでいないので、低レーテンシーを実現することができます。
中間クラウドオブジェクトストレージを用いたストリーミング取り込み
特定のユースケースにおいては、Kafkaコネクターを用いて、中間層としてクラウドオブジェクトにストリーミングデータを保存し、Apache Kafkaからデータをオフロードしたいと考えるかもしれません。Databricksワークスペースでは、クラウドベンダー固有のオブジェクトストアは、クラウドに依存しないフォルダーとしてDatabricksファイルシステム(DBFS)にマッピングされます。データがオフロードされたら、Databricksのオートローダーでファイルを取り込むことができます。
Auto Loaderは1行のSQLコードでデータを取り込むことができます。DLTテーブルへのJSONファイルの取り込みの構文を以下に示します(可読性のために2行で記述しています)。
-- INGEST with Auto Loader
create or replace streaming live table raw
as select * FROM cloud_files("dbfs:/data/twitter", "json")
Auto Loader自体がストリーミングデータソースであり、新たに到着したファイルは1回のみ処理され、当該テーブルにインクリメンタルにデータが取り込まれることを示すキーワードstreaming
がraw
テーブルに指定されていることに注意してください。
ストリーミングデータをクラウドオブジェクトストアにオフロードすることで、あなたのシステムアーキテクチャに追加のステップを導入することになるので、エンドツーエンドのレーテンシーを引き上げ、追加のストレージコストを発生させます。クラウドオブジェクトストアにイベントデータを書き込むKafkaコネクターは管理する必要がるために、オペレーション上の複雑性を増加させることに注意してください。
このため、上述した様にSpark構造化ストリーミングを用いてDLTからイベントバスのデータに直接アクセスすることをベストプラクティスとしてお勧めします。
他のイベントバスやメッセージングシステム
本書はApache Kafkaにフォーカスしています。しかし、議論されているコンセプトは他のイベントバスやメッセージングシステムに適用することができます。DLTでは、Databricksランタイムが直接サポートしているデータソースをサポートしています。
Amazon Kinesis
Kinesisでは、完全に管理されたサーバレスストリームにメッセージを書き込みます。Kafkaと同じ様に、Kinesisはメッセージの永続化は行いません。Kinesisのデフォルトのメッセージ保持期間は1日です。
Amazon Kinesisを使用する際、上述したストリーミング取り込みのPythonコードでformat("kafka")
をformat("kinesis")
で置き換え、option()
を用いてAmazon Kinesis固有の設定を追加します。詳細に関しては、Spark構造化ストリーミングのKinesis Integrationのセクションを参照ください。
Azure Event Hubs
Azure Event Hubsの設定に関しては、MicrosoftのドキュメントとDelta Live Tables recipes: Consuming from Azure Event Hubsをご覧ください。
サマリー
DLTはETLのT
以上のものです。DLTを用いることで、任意のクラウド上のDatabricksレイクハウスプラットフォーム上で、データ品質を担保しつつストリーミングとバッチソースから容易に取り込むことができます。
PythonのDLTノートブックを用いて、Kafkaブローカーに直接接続することで、Apache Kafkaからのデータを取り込むことができます。Kafkaのストリーミングレイヤーのソースデータが破棄されたとしても、パイプラインのフルリフレッシュによってデータの損失を防ぐことができます。
使い始める
Databricksを利用されているのであれば、スタートガイドに従ってください。このGAリリースに何が含まれているのかに関してはリリースノートをご覧ください。Databricksを利用されていないのであれば、フリートライアルにサインアップいただき、こちらのDLT価格の詳細を確認ください。
Data + AI Summit 2022に関して議論しているDatabricks Communityに参加ください。
さらには、サミットのDive Deeper into Data Engineeringをご覧ください。このセッションでは、Twitterのライブストリーム、Auto Loader、SQLにおけるDelta Live Tables、Hugging Faceの感情分析を用いたストリーミングデータのサンプルコードをウォークスルーしています。