前説
この方の記事を読んでみて、RaspberryPiで測定した温湿度データをDatabricksのUnity Catalogカタログ上で加工し、最終的にはDatabricksのダッシュボードで可視化したいと思いました。しかし、私はAzureもAzure Databricksも経験が浅く、やってみるとなかなか難しい事が多かったため、成功までの過程・学んだことを残しておきたく記事にしました。同じようなことをしたい方の参考になると幸いです。
この記事では、RaspberryPiのセンサーデータ → (Pythonスクリプトで転送)→ ADLS コンテナ → Azure Databricksのワークスペースでデータ参照、までを実現します。
Azure初心者の方でもこの手順通りにすれば上記が実現できるかと思います。
1. 外部ボリューム用のコンテナ準備
Unity Catalogのロケーションとしてセットアップされたストレージアカウントに、コンテナ(例: temp-humidity-external
)を1つ作成します。このコンテナはRaspberryPiから送られた温湿度データの置き場になります。
もし、Unity Catalogのロケーションとしてストレージアカウントを設定する方法がわからない方はこの前回の私の記事を参考にしてみてください。
ストレージアカウントのShared Access Signature (SAS) を控えておきます。
データを置きたいストレージアカウントの画面 > セキュリティとネットワーク > Shared Access Signatureを押下し、下記設定値でSASキーを生成します。
- 使用できるリソースの種類:コンテナ、オブジェクト
- アクセス許可:書き込み
ただ、今後いちいちSASキーを定期的に作り変えるのは手間ですし、セキュリティ的にもよろしくないので、運用面で検討の余地はあります。
2. RaspberryPiからADLSにテストアップロード(逐次実行)
ADLSストレージのコンテナーにPythonコードでファイルをアップロードします。
pip install azure-storage-blob
import os, datetime
from azure.storage.blob import BlobClient
# SAS URLにコンテナ名を含める
STORAGE_ACCOUNT = "ストレージアカウント名"
CONTAINER_NAME = "{コンテナ名}"
LOCAL_FILE_PATH = "{LOCAL_FILE_PATH}"
SAS_URL = f"https://{ストレージアカウント名}.blob.core.windows.net/{CONTAINER_NAME}?sv=xxxxx"
def upload_csv(local_path: str):
# blob name like 2025-04-05_12-30-00.csv
ts = datetime.datetime.utcnow().strftime("temp_humidity_%Y-%m-%d_%H-%M-%S")
# blob_name = f"{directory_path}/{ts}.csv"
blob_name = f"{ts}.csv"
# 正しいBlob URLを構築
blob_url = f"https://{STORAGE_ACCOUNT}.blob.core.windows.net/{CONTAINER_NAME}/{blob_name}?{SAS_URL.split('?')[1]}"
blob = BlobClient.from_blob_url(blob_url)
with open(local_path, "rb") as f:
blob.upload_blob(f, overwrite=False) # 10 MB file ≈ 1 s on home fibre
print("uploaded", blob.url)
if __name__ == "__main__":
upload_csv(LOCAL_FILE_PATH)
これでアップロードができたら、PythonスクリプトでのADLSへのアップロードは問題ないので、次で定期実行を行います。
3. RaspberryPiからADLSにテストアップロード(定期実行)
import time
import csv
import os, sys
from datetime import datetime, timedelta
# 別ディレクトリのupload_to_adls.pyをインポートするために
# プロジェクトのルートディレクトリをsys.pathに追加
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from upload import upload_to_adls # ディレクトリ構成によって変更してください。
measurement_interval_seconds = 120.0 # 計測間隔時間
csv_rotation_interval_minutes = 2880 # 書き込みファイル再作成間隔
device0 = "/sys/bus/iio/devices/iio:device0"
csv_directory = os.path.join(os.getcwd(), 'local_raw_data')
# ファイルの先頭行を読み取って整数値を返す関数
def readFirstLine(filename):
try:
with open(filename, "rt") as f:
value = int(f.readline())
return True, value
except ValueError:
return False, -1
except OSError:
return False, 0
# 新しいCSVファイルを作成する関数
def create_new_csv():
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = f"temp_humidity_{timestamp}.csv"
filepath = os.path.join(csv_directory, filename)
with open(filepath, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Timestamp", "Temperature (℃)", "Humidity (%)"])
return filepath
# メイン処理
try:
current_csv = create_new_csv()
start_time = datetime.now()
while True:
# 温度を取得
Flag, Temperature = readFirstLine(device0 + "/in_temp_input")
temperature = Temperature // 1000 if Flag else "N.A."
# 湿度を取得
Flag, Humidity = readFirstLine(device0 + "/in_humidityrelative_input")
humidity = Humidity // 1000 if Flag else "N.A."
# 現在時刻を取得
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# データをCSVに記録
with open(current_csv, "a", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow([timestamp, temperature, humidity])
print(f"[{timestamp}] Temperature: {temperature} ℃, Humidity: {humidity} %")
# 設定時間経過したらファイルをADLSに送信し、新しいCSVファイルを作成
if datetime.now() - start_time >= timedelta(minutes=csv_rotation_interval_minutes):
upload_to_adls.upload_csv(current_csv)
current_csv = create_new_csv()
start_time = datetime.now()
time.sleep(measurement_interval_seconds)
except KeyboardInterrupt:
print("\n計測を終了しました。")
この部分はディレクトリ構成によって変更してください。
from upload import upload_to_adls
timedelta型にすることで時間の長さに比較演算子が使えるようになります。
if datetime.now() - start_time >= timedelta(minutes=csv_rotation_interval_minutes):
このファイルをただ実行するだけだと、ターミナルが閉じると処理が中断されます。私の場合、RaspberryPiとssh接続していたMacが長めにスリープすると、ssh接続が切れて処理が中断されました。
そのため、nohup
というコマンドでバックグラウンド実行します。(no hung upという意味だそうです)
使い方
$ nohup {実行し続けたいコマンド} > {出力先ファイル} &
バックグラウンドで実行:
nohup python3 /${file_path}/measure_upload.py > output.log 2>&1 &
2>&1
とすることで、標準出力(1)だけでなく、標準エラー(2)もoutput.log
にリダイレクトするようにしています。
バックグラウンド実行を止める場合
プロセス確認:
ps aux | grep measure_upload.py
mech 12345 0.0 1.2 123456 65432 ? S 10:00 0:00 python3 /home/mech/src/raspi-temp-humidity-pipeline/temp_humidity_measure/measure_upload.py
mech 12346 0.0 0.0 1234 456 pts/0 S+ 10:00 0:00 grep --color=auto measure_upload.py
ここで、12345
がスクリプトのプロセスID(PID)です。
プロセスを停止:
kill 12345
停止したことを確認:
ps aux | grep measure_upload.py
4. ADLS ⇨ Azure Databricks
前工程でADLSのコンテナにアップロードしたファイルをDatabricksで参照・いじることができるように、外部ボリュームを作成します。
Azure Databricks画面で、カタログ > 外部データ > 外部ロケーションタブを押下します。
下記のように必要事項を入力し外部ロケーションを作成します。
外部ロケーション名:`temp-humidity-external`(任意の命名)
URL:`abfss://{コンテナ名}@{ストレージアカウント名}.dfs.core.windows.net`
ストレージ資格情報:`{このストレージアカウントの資格情報}`
データを置きたいカタログに、外部ボリュームを作成します。
その際に、作成した外部ロケーションを指定すると、そこにアップロードされたファイルが外部ボリュームから参照できるようになるかと思います。
終わりに
これでようやくAzure DatabricksのUnity Catalog上に温湿度データがアップロードされました!今後はデータパイプラインを構築し、csvファイルをテーブル化したいです!