How to Use Databricks to Scale Modern Industrial IoT Analytics - Part 2 - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
イントロダクション
シリーズのパート1のAzureにおけるモダンなインダストリアルIoT分析では、ビッグデータユースケースとモダンなIIoT分析のゴールをウォークスルーし、大規模IIoTをデプロイするために企業によって活用されているリアルワールドの再現可能なアーキテクチャを共有し、モダンなIIoT分析に求められるデータレイクの機能のそれぞれに対するDeltaフォーマットのメリットを探索しました。
デプロイメント
リアルワールドのマシンツーマシンのセンサーの読み取りデータをシミュレートするためにAzureのRaspberry PI IoT Simulatorを用い、これらをAzure IoT Hubに送信します。
データ取り込み: データレイクへのAzure IoT Hub
我々のデプロイメントでは、IoTクラウドコンピューティングハブに送信される気候に対するセンサーデータ(風速と風向、気温、湿度)と風力タービンのテレマティクス(角度とRPM)があります。Azure DatabricksはネイティブにIoT Hubから直接ADLS上のDeltaテーブルにデータをストリーミングすることができ、入力に対するデータの処理速度を表示することができます。
# Read directly from IoT Hubs using the EventHubs library for Azure Databricks
iot_stream = (
spark.readStream.format("eventhubs") # Read from IoT Hubs directly
.options(**ehConf) # Use the Event-Hub-enabled connect string
.load() # Load the data
.withColumn('reading', F.from_json(F.col('body').cast('string'), schema)) # Extract the payload from the messages
.select('reading.*', F.to_date('reading.timestamp').alias('date')) # Create a "date" field for partitioning
)
# Split our IoT Hubs stream into separate streams and write them both into their own Delta locations
write_turbine_to_delta = (
iot_stream.filter('temperature is null') # Filter out turbine telemetry from other streams
.select('date','timestamp','deviceId','rpm','angle') # Extract the fields of interest
.writeStream.format('delta') # Write our stream to the Delta format
.partitionBy('date') # Partition our data by Date for performance
.option("checkpointLocation", ROOT_PATH + "/bronze/cp/turbine") # Checkpoint so we can restart streams gracefully
.start(ROOT_PATH + "/bronze/data/turbine_raw") # Stream the data into an ADLS Path
)
Deltaを用いることで、IoT Hubでキャプチャされてから数秒のうちにIoTデータをクエリーすることができます。
%sql
-- We can query the data directly from storage immediately as it streams into Delta
SELECT * FROM delta.`/tmp/iiot/bronze/data/turbine_raw` WHERE deviceid = 'WindTurbine-1'
これで、データ分析のためのIIoTアプリケーション向けのデータを補強、集計する後段のパイプラインを構築することができます。
データ蓄積と処理: Azure DatabricksとDelta Lake
Deltaは、パイプラインを通じてデータ品質と集計度合いが増加するデータエンジニアリングに対して、マルチホップのパイプラインアプローチをサポートしています。我々の時系列データは、以下のブロンズ、シルバー、ゴールドデータレベルを流れていきます。
我々のブロンズからシルバーへのパイプラインは、シンプルにタービンのセンサーデータを1時間間隔で集計します。シルバーのDeltaテーブルに対して集計したレコードをupsertするために、ストリーミングMERGEコマンドを使用します。
# Create functions to merge turbine and weather data into their target Delta tables
def merge_records(incremental, target_path):
incremental.createOrReplaceTempView("incremental")
# MERGE consists of a target table, a source table (incremental),
# a join key to identify matches (deviceid, time_interval), and operations to perform
# (UPDATE, INSERT, DELETE) when a match occurs or not
incremental._jdf.sparkSession().sql(f"""
MERGE INTO turbine_hourly t
USING incremental i
ON i.date=t.date AND i.deviceId = t.deviceid AND i.time_interval = t.time_interval
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Perform the streaming merge into our data stream
turbine_stream = (
spark.readStream.format('delta').table('turbine_raw') # Read data as a stream from our source Delta table
.groupBy('deviceId','date',F.window('timestamp','1 hour')) # Aggregate readings to hourly intervals
.agg({"rpm":"avg","angle":"avg"})
.writeStream
.foreachBatch(merge_records) # Pass each micro-batch to a function
.outputMode("update") # Merge works with update mod
.start()
)
シルバーからゴールドへのパイプラン位は、2つのストリーミングをjoinし、時間ごとの気候とタービンの計測値を持つ単一のテーブルにまとめます。
# Read streams from Delta Silver tables
turbine_hourly = spark.readStream.format('delta').option("ignoreChanges", True).table("turbine_hourly")
weather_hourly = spark.readStream.format('delta').option("ignoreChanges", True).table("weather_hourly")
# Perform a streaming join to enrich the data
turbine_enriched = turbine_hourly.join(weather_hourly, ['date','time_interval'])
# Perform a streaming merge into our Gold data stream
merge_gold_stream = (
turbine_enriched.writeStream
.foreachBatch(merge_records)
.start()
)
すぐにゴールドのDeltaテーブルにクエリーを実行することができます。
ノートブックには、モデルトレーニングに使える時間あたりの電力値と日次のメンテナンスログを生成するセルも含まれています。このセルを実行することで、以下のことが行われます。
- turbine_enrichedテーブルに対して1年間過去の読み取り値をバックフィル
- power_outputテーブルで、それぞれのタービンの過去の電力値を生成
- turbine_maintenanceテーブルで、それぞれのタービンの過去のメンテナンスログを生成
これで、資産の使用率を最適化するために、データサイエンスのモデリングに流し込むことができるAzure Data Lake上の高性能かつ信頼できるフォーマットで補強され人工知能(AI)に活用できるデータを手に入れたことになります。
%sql
-- Query all 3 tables together
CREATE OR REPLACE VIEW gold_readings AS
SELECT r.*,
p.power,
m.maintenance as maintenance
FROM turbine_enriched r
JOIN turbine_power p ON (r.date=p.date AND r.time_interval=p.time_interval AND r.deviceid=p.deviceid)
LEFT JOIN turbine_maintenance m ON (r.date=m.date AND r.deviceid=m.deviceid);
SELECT * FROM gold_readings
これで我々のデータエンジニアリングパイプラインは完成です!今やデータはIoT Hubからブロンズ(生データ)、シルバー(集計データ)、そしてゴールド(補強データ)に流れていきます。これで、データに対する分析を行う準備が整いました。
サマリー
まとめると、ここまでで以下のことを達成しました。
- フィールドデバイスからAzureにリアルタイムIIoTデータを取り込み
- 直接データレイクに対して複雑な時系列処理を実施
すべてを結びつけるキーテクノロジーはDelta Lakeです。ADLS上のDeltaは信頼できるストリーミングデータパイプラインと、大規模時系列データに対する非常に高速なデータサイエンス、分析クエリーを提供します。最後になりますが、これによって企業はベストオブブリードのAzureツールに一度のみの書き込み、高頻度アクセスのデータストアを持ち込むことで、真のレイクハウスパターンを導入することができます。
次は?
こちらのノートブックをトライしてみてください。この3パートのトレーニングシリーズでAzure Databricksを学んでみてください。また、こちらのウェビナーに参加してモダンなデータアーキテクチャの構築方法をご覧ください。