1.概要
Databricksのストリーム処理を机上で理解したものの、なかなか実際に手を動かす機会がないため、手軽なセンシングデバイスであるRaspberry Pi Zero5台で取得した自宅の温度データをAzure Event Hubs経由でAzure Databricksに流し、Lakeflow Spark Declarative PipelinesでのリアルタイムETL処理の後にDatabirkcs Appsの超簡易デジタルツインでリアルタイム表示することで、実際に手を動かしながら理解を深める。
- 完成したアプリ
- Raspberry Pi Zeroの端末たち
1.1本記事で取り扱うこと
- Raspberry Pi Zeroを始点とした、Databricksでのストリーム処理の実装方法
- Lakeflow Spark Declarative Pipelinesによるリアルタイム変換の簡易な記述方法
- Databricks AppsやAzure Event Hubsの操作
※掲載のコードについては公式ドキュメントや生成AIも活用しつつ、理解しながら編集した。
1.2本記事で取り扱わないこと
- Databricks Appsの具体的なコーディング(今回はCursorのバイブコーディングで実装)
- 運用、保守性、可用性およびエラーハンドリングに関すること
- Raspberry Piの基本的なセットアップ
2.全体構成
3.Raspberry Pi Zeroで作成する温度センシングデバイス
今回は身近に入手できる機材を中心とした以下を利用する。
-
Raspberry Pi Zero 2 WH電源
3.1 センサーの接続
Raspberry Pi Zeroのピンヘッダは以下画像の配置となっている。

これに対してBMP280とRaspberry Pi Zeroのピンヘッダを次のように接続する。
| BMP280のピン | Raspberry Pi Zeroのピン | 役割 |
|---|---|---|
| VDD | 1 (3.3V PWR) | 電源3.3V |
| SDA | 3 (GPIO2 / SDA1) | I2Cデータ |
| GND | 6 (GND) | グラウンド |
| SCL | 5 (GPIO3 / SCL1) | I2Cクロック |
3.2 センシングプログラム
-
board.I2C()
→ マイコンボード上の I2C ピン(SCL/SDA)を初期化する。 -
Adafruit_BMP280_I2C(i2c, address=0x77)
→ センサーを I2C 経由で制御するためのインスタンスを作成する。
一部のモジュールではデフォルトアドレスが 0x76の場合もあるため、必要に応じて変更する。 temperature = float(bmp.temperature)-
pressure = float(bmp.pressure)
→ センサーデータをfloat値で取得する。
from adafruit_bmp280 import Adafruit_BMP280_I2C
# BMP280 初期化
i2c = board.I2C()
bmp = Adafruit_BMP280_I2C(i2c, address=0x77)
temperature = float(bmp.temperature)
pressure = float(bmp.pressure)
そのため、以下のようにループ処理でセンサーデータを取得して、ペイロードを組み立てkafkaに流すことで、一定間隔でストリームにデータを流すことができる。
import time
import json
import board
import configparser
import datetime
from zoneinfo import ZoneInfo
from kafka import KafkaProducer
# Kafka Producer 初期化
producer = KafkaProducer(
bootstrap_servers=BOOTSTRAP_SERVERS,
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=SASL_USERNAME,
sasl_plain_password=SASL_PASSWORD,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# 一定間隔ごとにkafkaにデータを流す処理
while True:
temperature = float(bmp.temperature)
pressure = float(bmp.pressure)
timestamp = int(time.time() * 1000)
dt_jst = dt_jst = datetime.datetime.fromtimestamp(timestamp / 1000, tz=ZoneInfo("Asia/Tokyo"))
payload = {
"deviceId": DEVICE_ID,
"temperature": temperature,
"pressure": pressure,
"timestamp": timestamp,
"datetime": dt_jst.isoformat()
}
producer.send(KAFKA_TOPIC, value=payload)
producer.flush()
print("sent:", payload)
time.sleep(SEND_INTERVAL_SEC)
センシングのためのプログラムの全体像は以下の通り。
このプログラムをサービス化することで、Raspberry Pi Zeroの電源を入れるだけで実行されるようにする。
import time
import json
import board
import configparser
import datetime
from zoneinfo import ZoneInfo
from adafruit_bmp280 import Adafruit_BMP280_I2C
from kafka import KafkaProducer
# configparserの宣言とiniファイルの読み込み
config_ini = configparser.ConfigParser()
config_ini.read('/home/ryota/environment_monitor-main/config.ini', encoding='utf-8')
# Azure Event Hubs (Kafka) 接続情報
BOOTSTRAP_SERVERS = config_ini['DEFAULT']['BOOTSTRAP_SERVERS']
# 作成済みのEvent Hub名
KAFKA_TOPIC = config_ini['DEFAULT']['KAFKA_TOPIC']
SASL_USERNAME = config_ini['DEFAULT']['SASL_USERNAME']
SASL_PASSWORD = config_ini['DEFAULT']['SASL_PASSWORD']
DEVICE_ID = config_ini['DEFAULT']['DEVICE_ID']
# 送信間隔(秒)
SEND_INTERVAL_SEC = int(config_ini['DEFAULT']['SEND_INTERVAL_SEC'])
# BMP280 初期化
i2c = board.I2C()
bmp = Adafruit_BMP280_I2C(i2c, address=0x77)
# Kafka Producer 初期化
producer = KafkaProducer(
bootstrap_servers=BOOTSTRAP_SERVERS,
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=SASL_USERNAME,
sasl_plain_password=SASL_PASSWORD,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
print("Start sending BMP280 data to Azure Event Hubs (Kafka)...")
try:
while True:
temperature = float(bmp.temperature)
pressure = float(bmp.pressure)
timestamp = int(time.time() * 1000)
dt_jst = dt_jst = datetime.datetime.fromtimestamp(timestamp / 1000, tz=ZoneInfo("Asia/Tokyo"))
payload = {
"deviceId": DEVICE_ID,
"temperature": temperature,
"pressure": pressure,
"timestamp": timestamp,
"datetime": dt_jst.isoformat()
}
producer.send(KAFKA_TOPIC, value=payload)
producer.flush()
print("sent:", payload)
time.sleep(SEND_INTERVAL_SEC)
except KeyboardInterrupt:
print("Stopping...")
finally:
producer.close()
[DEFAULT]
BOOTSTRAP_SERVERS = irucastream.servicebus.windows.net:9093
KAFKA_TOPIC = bmp280
SASL_USERNAME = $ConnectionString
SASL_PASSWORD = Endpoint=sb://irucastream.servicebus.windows.net/・・・・
DEVICE_ID = temp-sensor1
# 今回は一旦遅めの20秒更新とした
SEND_INTERVAL_SEC = 20
4.Azure Event Hubsの構成
今回は以下の画像の設定でデプロイした。
※スループットユニットは1あたり次のキャパシティがある。
- 書き込み : 1 秒あたり最大約 1 MB または 1,000 イベントのどちらかに達するまで
- 読み取り : 1 秒あたり最大約 2 MB または 4,096 イベントのどちらかに達するまで
Event Hubsの[設定] > [エンティティ] > [Event Hubs]から[+イベントハブ]を選択するとEvent Hubが作成できる。今回は名前をbmp280としている。
bmp280のEvent Hubの概要画面からデータが流れていることを確認できる。

bmp280のEvent Hubのメニューから[Data Explorer]を選択すると実際に流れているデータを確認できる。

5.Lakeflow Spark Declarative PipelinesでのリアルタイムIngest/ETL処理
Lakeflow Spark Declarative PipelinesはバッチとストリーミングのETLを、SQL/Pythonで 「こうなってほしい状態」だけを書けば、実行順やリトライなどは自動でやってくれるフレームワーク。
参考:Lakeflow Spark 宣言型パイプラインの概念
今回はこれにリアルタイムIngest/ETLを宣言しながら変換パイプラインを構築する。
5.1Lakeflow Spark Declarative Pipelinesの作成
Databricksワークスペースのメニューから[ジョブとパイプライン]を選択し、[ETLパイプライン]を選択する。

パイプラインとPythonファイルに名前をつける。このPythonファイルに定義を記載していく。

5.2Lakeflow Spark Declarative PipelinesによるEvent Hubsの取り込み
以下のドキュメントに記載されたコードを元にEvent Hubsのストリームをストリーミングテーブルに記録する。
参考:Azure Event Hubs をパイプライン データ ソースとして使用する
以下でKafkaメッセージのvalueをJSONとしてパースし、bmp280_schemaに沿ってセンサー値のカラムに展開している。@dp.create_tableデコレータを使ってbmp280_stream関数の戻り値をtemp_sensor.digital_twin.bmp280_bronze_streamというBronze層のDeltaテーブルとして宣言的に定義する。中では spark.readStream.format("kafka")でKafkaからBMP280のセンサーのデータをストリーミング取得し、そのままDeltaテーブルに永続化することで、後続のSilver/Goldテーブルから参照できる生データのストリームを構成しする。
※サンプルコードにはデータ検査の定義expectがあるが、今回は割愛するためコメントアウトする。
# JSONを分解したレコードをテーブルに登録
def parse_bmp280(df):
return (
df
.withColumn("json_str", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("json_str"), bmp280_schema))
.select("parsed_records.*")
)
# ストリーミングで流れてくるデータの結果をDeltaテーブル(Bronze)として保存して管理する
@dp.create_table(
# テーブル名
name="temp_sensor.digital_twin.bmp280_bronze_stream",
comment="BMP280 sensor raw stream",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false"
},
)
#@dp.expect("valid_device_id", "deviceId IS NOT NULL")
#@dp.expect("valid_temperature", "temperature IS NOT NULL")
def bmp280_stream():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse_bmp280)
)
5.3 Silver/Goldマテリアライズドビューの宣言
続けて1日分のデータだけを取得したSilverテーブル(マテリアライズドビュー)temp_sensor.digital_twin.bmp280_silver_1dayと各Raspberry Pi Zeroの最新のデータだけをGoldテーブル(マテリアライズドビュー)temp_sensor.digital_twin.bmp280_gold_latestを定義していく。
各レイヤーの関数部分でレイヤー内での操作を定義する。
# Blonzeテーブルより、1日分のレコードを記録したSilverテーブルを作成する
@dp.create_table(
name="temp_sensor.digital_twin.bmp280_silver_1day",
comment="BMP280 sensor last 24 hours",
table_properties={
"quality": "silver",
"pipelines.reset.allowed": "false"
}
)
def bmp280_silver_1day():
# Bronze テーブル(ストリーミング)を読み込み
bronze_df = dp.read("temp_sensor.digital_twin.bmp280_bronze_stream")
# 現在時刻から24時間前を計算
threshold = current_timestamp() - expr("INTERVAL 24 HOURS")
# datetime が直近24時間のレコードだけを残す
filtered_df = bronze_df.filter(col("datetime") >= threshold)
return filtered_df
# Silverテーブルより最新のデータをGoldテーブルとして保管する
@dp.create_table(
name="temp_sensor.digital_twin.bmp280_gold_latest",
comment="BMP280 sensor latest per device",
table_properties={
"quality": "silver",
"pipelines.reset.allowed": "false"
}
)
def bmp280_latest():
# Silver テーブルをソースとして読み込む
bronze_df = dp.read("temp_sensor.digital_twin.bmp280_silver_1day")
# deviceId 単位で datetime が最大の行だけを残す
w = Window.partitionBy("deviceId").orderBy(col("datetime").desc())
latest_df = (
bronze_df
.withColumn("rn", row_number().over(w))
.filter(col("rn") == 1)
.drop("rn")
)
return latest_df
5.4 Lakeflow Spark Declarative Pipelinesの宣言の全体像
ここまでの宣言をまとめると、全体像は以下の通りとなる。
※今回は時間の都合上接続文字列もベタ書きする。
from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")
# Event Hubs configuration
EH_NAMESPACE = "irucastream"
EH_NAME = "bmp280"
# EventHubsの接続文字列(本来はsecretsから取得するのが推奨)
EH_CONN_STR = (
"Endpoint=sb://irucastream.servicebus.windows.net/;・・・"
)
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f"required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";"
),
"failOnDataLoss": "false",
"startingOffsets": "earliest"
}
# BMP280センサーから届くJSONのスキーマを定義
bmp280_schema = T.StructType([
T.StructField("deviceId", T.StringType(), True),
T.StructField("temperature",T.DoubleType(), True),
T.StructField("pressure", T.DoubleType(), True),
T.StructField("timestamp", T.LongType(), True),
T.StructField("datetime", T.TimestampType(), True),
])
# JSONを分解したレコードをテーブルに登録
def parse_bmp280(df):
return (
df
.withColumn("json_str", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("json_str"), bmp280_schema))
.select("parsed_records.*")
)
# ストリーミングで流れてくるデータの結果をDeltaテーブル(Bronze)として保存して管理する
@dp.create_table(
# テーブル名
name="temp_sensor.digital_twin.bmp280_bronze_stream",
comment="BMP280 sensor raw stream",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false"
},
)
#@dp.expect("valid_device_id", "deviceId IS NOT NULL")
#@dp.expect("valid_temperature", "temperature IS NOT NULL")
def bmp280_stream():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse_bmp280)
)
# Blonzeテーブルより、1日分のレコードを記録したSilverテーブルを作成する
@dp.create_table(
name="temp_sensor.digital_twin.bmp280_silver_1day",
comment="BMP280 sensor last 24 hours",
table_properties={
"quality": "silver",
"pipelines.reset.allowed": "false"
}
)
def bmp280_silver_1day():
# Bronze テーブル(ストリーミング)を読み込み
bronze_df = dp.read("temp_sensor.digital_twin.bmp280_bronze_stream")
# 現在時刻から24時間前を計算
threshold = current_timestamp() - expr("INTERVAL 24 HOURS")
# datetime が直近24時間のレコードだけを残す
filtered_df = bronze_df.filter(col("datetime") >= threshold)
return filtered_df
# Silverテーブルより最新のデータをGoldテーブルとして保管する
@dp.create_table(
name="temp_sensor.digital_twin.bmp280_gold_latest",
comment="BMP280 sensor latest per device",
table_properties={
"quality": "silver",
"pipelines.reset.allowed": "false"
}
)
def bmp280_latest():
# Silver テーブルをソースとして読み込む
bronze_df = dp.read("temp_sensor.digital_twin.bmp280_silver_1day")
# deviceId 単位で datetime が最大の行だけを残す
w = Window.partitionBy("deviceId").orderBy(col("datetime").desc())
latest_df = (
bronze_df
.withColumn("rn", row_number().over(w))
.filter(col("rn") == 1)
.drop("rn")
)
return latest_df
5.5パイプラインの構築
宣言が完成したら右上の[開始]ボタンを選択する。
右側のパイプライングラフが動作していることがわかる

カタログにもBronzeのストリーミングテーブルとSilver/Goldのマテリアライズドビューが作成された

Goldテーブルを読み取ってみると、Raspberry Pi Zeroのホストごとに最新のデータが受信できているのがわかる。

6.Databricks Appsによるデータの可視化
5章で完成したテーブルを元にDatabricks Appsでデータの可視化を行う。
[コンピュート]>[アプリ]>[アプリを作成]から空のアプリを作成する。
※ソースコードはCursorのバイブコーディングで作成したため、今回は割愛する。
コードが完成したらDatabricks Appsの画面下部にある「今後の編集内容をDatabricksに同期します」として記載されているコマンドを実行して、ソースコードを同期する。

ソースコードを同期後、画面上部の[デプロイ]を選択してアプリをデプロイ後、アプリのURLを選択してアプリを表示する。

今回はリアルタイムの各部屋の温度と24時間の温度グラフを表示した。

6.まとめ
このように、センシングデータのストリーム処理もDatabricksのサービス群、Azure Event HubsおよびRaspberry Pi Zeroを利用することで簡単に手を動かしながら学べることが確認できた。




