1.はじめに
こんにちは。(株) 日立製作所のLumada Data Science Lab.の山崎 建です。
私は入社1年目の新人データサイエンティストであり、普段の業務では、データ分析による課題解決や、データ利活用のためのコンサルティングを行っています。
今回は 自身のデータサイエンスの知識と生成AIを用いて、自宅で活用できるデバイスを短期間・低コストで開発できる ことを実体験を紹介しながらお伝えします。
本記事が、ご覧くださっている方々が気軽にデバイスの開発に取り組み、その面白さを体感するきっかけとなれば嬉しいです。データサイエンスを学んでいる方は自身の知識をモノに昇華することができ、初心者の方はChatGPTに質問しながら開発を行うことで副次的に知識習得につながると考えます。
2.デバイス開発におけるChatGPTの活用
今回デバイス開発においてChatGPTを用いましたが、主にコードの生成に活用しています。
マイコンをプログラミングする際、リソースの問題からライブラリの使用は可能な限り抑えることが望ましいですが、スクラッチでコードをすべて書くのは労力がかかります。そこで、ChatGPTにコードを書かせることで、短期間で簡単に実装を行います。
労力を減らすという観点でマイコンとChatGPTの組み合わせは相性が良いと考えます。
3.目的 ~サーキュレーターの故障の異常検知~
今回はサーキュレーターの故障の異常検知を行うデバイスを開発します。最終的にはマイクロ風車の異常検知まで行いたいと考えており、その一歩目として自宅のサーキュレーターを対象としました。
サーキュレーターに異常を検知するデバイスを設置し、異常があったらLEDの点灯でユーザーにアラートを伝えます。今回は異常の検知に加速度センサを使用し、振動を活用して異常を検知します。データの処理や制御にはRasberry Pi Pico Wを使用します。
サーキュレーターの稼働条件は以下を想定します。
- サーキュレーターの風向は首振りではなく固定状態
- デバイス設置時には異常はなく正常状態
4.システム構成
今回は以下を用いてデバイスを作成しました。
非常に低コストで異常検知が可能なデバイスを実現できます。
・Rasberry Pi Pico W
Raspberry Pi PicoにInfineon CYW43439無線チップを追加搭載し、無線LAN機能が利用可能なモデルです。Micro Pythonでコーデイングします。
※今回は無線LAN機能は使用していないため、Raspberry Pi Picoで代替可能です。
・3軸加速度センサ:KXR94-2050(Kionix製)
3軸の加速度をアナログ出力するセンサです。
・LEDライト
一般的なLEDライトです。デバイスの動作状態や異常検知のアラートを表すインターフェイスとして活用します。
回路構成
後述のコード中のピン配置に従い回路を組んだデバイスは以下になります。
5.実装 ~ChatGPTとの共創~
ChatGPTでコードを生成しながら実装していきます。
今回はGPT-3.5を使用します。
ユーザーは要件を考えてプロンプト入力だけでコードが生成されるため 短時間で実装できます。
① センサ値のモニタリング
初めに、加速度センサが正常に測定できているか確認します。
今回は以下の理由からセンサから得られた値を変換せずにそのまま使用します。
1.フィードバックの想定はないため、可読性のある物理的な値への変換は不必要
2.センサから得られる値の、変換時の数字の丸まりによる情報損失を防止
センサ値出力を確認するコードを生成します。
上記で生成したコードを実行すると、正しくセンサ値が出力されました。
② 異常検知
加速度センサの値が正常に取れることが確認できたため、次に異常検知のコードを生成します。
今回のサーキュレーターの想定は下記です。
サーキュレーターの稼働条件は以下を想定します。
- サーキュレーターの風向は首振りではなく固定状態
- デバイス設置時には異常はなく正常状態
デバイス設置時は正常状態であり、サーキュレーターの実際の動きに応じた閾値を設定するために、最初にサンプルデータを取得し、そのサンプルデータを活用して閾値を決定する仕様にします。
また、センサ値はノイズがあるものの基本的に固定状態であるため、X軸、Y軸、Z軸それぞれのセンサ値は正規分布に従うと考えます。よって、3軸間の相関関係も考慮できる マハラノビス距離 を用いた異常検知とします。
マハラノビス距離:
母集団の分布を考慮し,特徴量の間に強い相関がある場合でも正常データと異常データを分離しやすい特徴を持った距離の定義方法。データが多変量正規分布に従うことを前提に計算される。異常検知の分野ではマハラノビス距離がよく使用される。
左図:ユークリッド距離で分離する場合 右図:マハラノビス距離で分離する場合
参考 :scipyを使って特徴量の相関を考慮したマハラノビス距離を計算する
※実際に①で取得できたセンサ値からヒストグラムを作成すると正規分布に近い形をしていました。
上記を踏まえて異常検知のコード生成を生成します。
マハラノビス距離算出における共分散の逆行列の計算はコード量が多くなると予期していましたが、生成された上記コードを確認すると該当処理部分が1行で書かれていることに違和感があり、実際にデバッグするとマハラノビス距離が意図しない値を出力しました。
そこで、コード中の共分散の逆行列の計算に着目すると、行列の各要素の値を逆数にする仕様で書かれていたため、正確ではありませんでした。
正しく計算するようコードを修正します。
余因子を用いた逆行列の計算を行うよう修正されました。
上記のコードではマハラノビス距離の閾値が0で異常を検知する仕様です。これをサンプルデータを活用した閾値設定に変更します。
サンプルデータ取得時は正常状態であるため、多少の許容を加え、閾値はサンプルデータのマハラノビス距離の最大値の1.05倍にします。
また、サンプルをより多く取得するためにサンプルデータ取得中のウェイトは0.1秒に変更し、より敏感に異常検知ができるようメインループのウェイトは0.01秒に変更します。
最後に、異常検知をLEDにより視覚で確認できるようにします。
LEDの状態は下記4種類に分けます。
1.サンプルデータ取得中
2,メインループ中
3.異常発生後10秒間
4.システム的なエラー発生時
以下を自身で修正します。
- 見えやすいようにLEDのピン配置を2,3,4,5に変更
- システムエラー時LED(led_system_errorピン)を点灯する仕様がないため、追加
- 異常検知後10秒間、異常検知LED(led_anomaly_detectedピン)が点灯している間は、メインループ中LED(led_main_loopピン)を消灯するコードを追加
以下が最終的なコードです。
from machine import ADC, Pin
import utime
import math
# センサが接続されているアナログ入力ピン
analog_pin_x = ADC(Pin(26))
analog_pin_y = ADC(Pin(27))
analog_pin_z = ADC(Pin(28))
# LEDが接続されているピン(例:GP15、GP16、GP17、GP18)
led_sample_data = Pin(2, Pin.OUT)
led_main_loop = Pin(3, Pin.OUT)
led_anomaly_detected = Pin(4, Pin.OUT)
led_system_error = Pin(5, Pin.OUT)
# サンプルデータを格納するリスト
sample_data = []
# サンプル取得時間(秒)
sample_duration = 180 # 3分間
# 閾値係数(調整可能)
threshold_coefficient = 1.05
# Mahalanobis距離の閾値を初期化
threshold = None
# Mahalanobis距離を計算する関数
def mahalanobis_distance(sample, mean, covariance):
diff = [sample[i] - mean[i] for i in range(len(sample))]
inv_covariance = inverse_matrix(covariance)
dot_product = sum(diff[i] * sum(inv_covariance[i][j] * diff[j] for j in range(len(diff))) for i in range(len(diff)))
return math.sqrt(dot_product)
# 逆行列を計算する関数
def inverse_matrix(matrix):
determinant = matrix[0][0] * matrix[1][1] * matrix[2][2] + matrix[0][1] * matrix[1][2] * matrix[2][0] + matrix[0][2] * matrix[1][0] * matrix[2][1] - matrix[0][0] * matrix[1][2] * matrix[2][1] - matrix[0][1] * matrix[1][0] * matrix[2][2] - matrix[0][2] * matrix[1][1] * matrix[2][0]
# 行列式が0の場合は逆行列が存在しない
if determinant == 0:
raise ValueError("逆行列が存在しません。")
inv_determinant = 1 / determinant
# 逆行列を計算
result = [
[
(matrix[1][1] * matrix[2][2] - matrix[1][2] * matrix[2][1]) * inv_determinant,
(matrix[0][2] * matrix[2][1] - matrix[0][1] * matrix[2][2]) * inv_determinant,
(matrix[0][1] * matrix[1][2] - matrix[0][2] * matrix[1][1]) * inv_determinant
],
[
(matrix[1][2] * matrix[2][0] - matrix[1][0] * matrix[2][2]) * inv_determinant,
(matrix[0][0] * matrix[2][2] - matrix[0][2] * matrix[2][0]) * inv_determinant,
(matrix[0][2] * matrix[1][0] - matrix[0][0] * matrix[1][2]) * inv_determinant
],
[
(matrix[1][0] * matrix[2][1] - matrix[1][1] * matrix[2][0]) * inv_determinant,
(matrix[0][1] * matrix[2][0] - matrix[0][0] * matrix[2][1]) * inv_determinant,
(matrix[0][0] * matrix[1][1] - matrix[0][1] * matrix[1][0]) * inv_determinant
]
]
return result
try:
# サンプルデータを取得
print("サンプルデータを取得中...")
led_sample_data.value(1) # サンプルデータ取得中のLEDを点灯
start_time = utime.time()
while utime.time() - start_time < sample_duration:
x, y, z = [analog_pin_x.read_u16(), analog_pin_y.read_u16(), analog_pin_z.read_u16()]
sample_data.append((x, y, z))
utime.sleep_ms(100) # サンプルデータ取得中の待機時間
led_sample_data.value(0) # サンプルデータ取得中のLEDを消灯
print("サンプルデータの取得が完了しました.")
# サンプルデータから平均と共分散行列を計算
mean_vector = [sum(data[i] for data in sample_data) / len(sample_data) for i in range(3)]
covariance_matrix = [[sum((data[i] - mean_vector[i]) * (data[j] - mean_vector[j]) for data in sample_data) / (len(sample_data) - 1) for j in range(3)] for i in range(3)]
# 閾値を設定
threshold = threshold_coefficient * max(mahalanobis_distance(data, mean_vector, covariance_matrix) for data in sample_data)
print("異常検知を開始します。")
while True:
# センサ値を取得
x, y, z = [analog_pin_x.read_u16(), analog_pin_y.read_u16(), analog_pin_z.read_u16()]
# Mahalanobis距離を計算
mahalanobis_dist = mahalanobis_distance((x, y, z), mean_vector, covariance_matrix)
# メインループ中のLEDを点灯
led_main_loop.value(1)
# 異常が検知された場合
if mahalanobis_dist > threshold:
print("異常が検知されました! Mahalanobis距離:", mahalanobis_dist)
# 異常検知後10秒間のLEDを点灯
led_anomaly_detected.value(1)
led_main_loop.value(0)
utime.sleep(10)
led_anomaly_detected.value(0) # LEDを消灯
led_main_loop.value(1)
# 適切な待機時間を設定
utime.sleep_ms(10) # メインループでのウェイト
except Exception as e:
print("システムエラーが発生しました: {}".format(e))
led_system_error.value(1)
utime.sleep(10)
led_system_error.value(0)
6.デモンストレーション
上記で完成したコードを書き込み、実際にサーキュレーターに取り付けました。
異常現象として細い紐をサーキュレーターの羽根に接触させたところ、正しく異常を検知しました。
※異常発生をシミュレーションするために本デモでは「細い紐」を羽根に接触させています。
(サーキュレーター本来の用途目的以外であるため推奨するものではありません。)
7.デバイス開発におけるCharGPTの活用の所感
今回デバイス開発において主にコードの生成にChatGPTを用いました。
メリットは冒頭で紹介した通りスクラッチでコードを書く手間が省けることです。最後に自身で少し修正しましたが、コードのほとんどはChatGPTが生成するため、効率的に開発を進めることができました。また、コード実行時のエラー対応にもChatGPTを活用可能でした。今回の記事では紹介していませんが、同じ質問でどの程度異なるコードが生成されるか確認していたところ、生成されたコードを実行するとマイコンにてメモリエラーが発生しました。その際にエラー文をそのままプロンプトとして与えると、原因の説明とその対策案とコードの候補を生成しました。ユーザーがエラー対応について考える時間を短縮することができます。
一方、ユーザーもデータサイエンスやエンジニアリングの基礎的な知識は必要だと実感しました。今回のケースでは、逆行列の計算が実データを想定していない計算だったため、ユーザーがそれに気づいて修正する必要がありました。ChatGPTは常に絶対に正しい回答をしないことを念頭に置き、一定の知識を持ってコードをチェックすることが必要不可欠だと考えます。
また、今回は私の事前知識をもとにマハラノビス距離を使用しましたが、「加速度センサを用いた異常検知にはどのような手法がありますか」のような根本の手法から尋ねるプロンプトを与えることで、結果として事前知識がなくともマハラノビス距離を使用するコードになることも考えられます。しかし、マハラノビス距離が正しく計算されているかチェックするためには知識が必要になります。すなわち、会話の中でユーザーも知識を習得しそれをもとに次の指示を出す、といったChatGPTとのインタラクションを重ねることで洗練されたコードが生成できるのではないかと考えます。
8.おわりに
今回は自身のデータサイエンスの知識と生成AIを用いて、サーキュレーターの異常検知を行うデバイスを短期間・低コストで開発しました。
さらに高度な異常検知の実現には、ChatGPT活用にあたるプロンプトエンジニアリングや、サンプルデータ数の調整、閾値の設定方法などの改善点が考えられます。
今後は、今回のサーキュレーターの異常検知から マイクロ風車の異常検知までの対象拡大 や、 自宅で活用できる他のデバイス開発 などに挑戦してみたいと考えております。
自身の知識を活用してモノを作る経験は大変楽しく、生成AIが普及している現在は気軽に取り組むことができます。今回の体験の紹介が、ご覧くださっている方々のデバイス開発のきっかけになればと思います。