はじめに
皆さん、こんにちは。ABEJAアドベントカレンダー2021の1日目の記事です。最近車を購入しました。組み込み技術とかクラウドとかAIとかが好きなので、世界に一つだけのMyドライブレコーダーを作りたいなーと思い、趣味で色々やりはじめました。
※社内勉強会ABECONにおける発表資料から抜粋
最近、お遊びでOBD2経由で取得した車両情報をBIツール上で振り返られるようにしたので、その部分の取り組みについてご紹介できればと思います。
OBDについて
- OBD = On Board Diagnosis (車の自己診断機能)
- CAN通信というプロトコルを拡張している。(CANに関しては後述)
- OBDの歴史は深い
- 1971年 FI(Fuel Injection: 電子制御式燃料噴射装置)搭載車の登場
- 1980年代 自己診断機能で修理しやすく
- 1982年 自己診断機能の統一化からOBDの誕生 => OBDのベースに
- 1996年 OBD-II(OBD2)の登場
- 実は20年以上前からある枯れてきていえるプロトコル
- 車両に関わるめっちゃいろんなデータが取れたり、制御ができる
- OBD2端子は車のどこかにあって、OBD車検などでも活用される
OBD2を使う事によって、車自体の情報(速度・エンジン回転数・冷却材温度・負荷・センサー温度)を記録できる。今回は、OBD2を使って車両情報の取得・蓄積・可視化について検討
CAN通信について
- Controller Area Networkの略。
- ドイツのBosch社が開発したシリアル通信プロトコル
- 1994年に国際標準化機構(ISO)により標準規格(ISO11898/ISO11519)に。
- 現在では、ほぼすべての自動車に採用さ
- 車だけではなく、人工衛星の通信バスや工場のFA(ファクトリーオートメーション)にも利用される
OBD2でやりとりされる通信はCAN通信を拡張して作られている為、CAN通信ができるようなハードウェアを用意する事で、車のECUと通信ができる
システム全体像
OBD2データの取得部
-
RaspberryPi側
- OBD2情報取得プログラム (obd_recorder.go)
- 役割1:後述のCAN-BUS Moduleと通信をし、ECU側にデータ送信要求を投げる
- 例: エンジンの回転数をくれー
- 役割2:受信したデータをデコードして、記憶装置にファイルとして書き出し
- 役割1:後述のCAN-BUS Moduleと通信をし、ECU側にデータ送信要求を投げる
- OBD2情報取得プログラム (obd_recorder.go)
-
USB⇔CAN通信側
- 役割: RaspberryPi側からOBD2端子経由で車のBUSにアクセスできるようにする。橋渡し的役割。
取得データの転送・可視化部
-
RaspberryPi側
- データ転送部分 (Fluentd)
- 役割: 前述のOBD2情報取得プログラムによって吐き出されたデータをBigQueryに書き出す
- データ転送部分 (Fluentd)
-
GCP側
- DWH部分 (BigQuery)
- 役割: OBD2情報を継続的に蓄積し、分析できる土壌を提供する
- BIツール部分 (Redash)
- 役割: 蓄積されたデータを良い感じにグラフィカルに表示する&分析できるようにする
- DWH部分 (BigQuery)
構築メモ
OBD2データの取得部分
ハードウェア部分
-
Serial CAN-BUS Module + FT232HL USBシリアル変換モジュールを利用
- USB経由でCAN通信(受信・送信)ができる。
-
有名どころのICであるMCP2551やMCP2515を直接RaspberryPiに接続する事も可能だが、今回は楽するためにモジュールを利用
ソフトウェア部分
-
CAN通信モジュールの使い方は、下記マニュアルを参照
-
obd_recorder.goでやっている事
- ATコマンドを発行してCAN通信モジュール自体の設定を行う (初回起動時のみ)
- 14バイトのバイト列を送る事で、モジュール経由でECUにメッセージを送信
- ECUからのメッセージ(12バイトが基本)をリングバッファーで受け取る
- 受け取ったデータを実際の物理量などに換算して、ファイルに書き出し
※とりあえず動かすこと最優先で、コードはさくっと。
package main
import (
"os"
"fmt"
"github.com/jacobsa/go-serial/serial"
"io"
"time"
"path/filepath"
"encoding/json"
)
// OBD2で使うPIDの定義。(車種・メーカーによって異なるので注意)
var (
CAR_NAME = "TAGURO_RAIZE"
AT_CMD_MODE = "AT_CMD_MODE"
DATA_MODE = "DATA_MODE"
CAN_ID_PID = int32(0x7DF)
PID_ENGINE_LOAD = byte(0x04)
PID_ENGINE_RPM = byte(0x0C)
PID_VEHICLE_SPEED = byte(0x0D)
PID_COOLANT_TEMP = byte(0x05)
PID_AXEL_PERCENT = byte(0x11)
PID_REMAIN_FUEL = byte(0x2F)
PID_OUTSIDE_TEMP = byte(0x46)
PID_DISTANCE_TRAVELED = byte(0x31)
PID_RUNTIME_SINCE_ENGINE_START = byte(0x1F)
PID_INTAKE_AIR_TEMP = byte(0x0F)
PID_ENGINE_OIL_TEMP = byte(0x5C)
PID_CONTROL_MODULE_VOLT = byte(0x42)
PID_ABS_BAROMETRIC_PRESSURE = byte(0x33)
PID_SHORT_TERM_FUEL_TRIM = byte(0x06)
PID_LONG_TERM_FUEL_TRIM = byte(0x07)
PID_TIMING_ADVANCE = byte(0x0E)
PID_INTAKE_MANIFOLD_ABS_PRESSURE = byte(0x0B)
)
func makeRequest(port io.ReadWriteCloser, canIdPid int32, pid byte) {
port.Write([]byte{
(byte)(canIdPid >> 24),
(byte)((canIdPid >> 16) & 0xFF),
(byte)((canIdPid >> 8) & 0xFF),
(byte)(canIdPid & 0xFF),
0x00, 0x00,
0x02, 0x01,
pid,
0x00,
0x00, 0x00, 0x00, 0x00,
})
}
// ATコマンドを発行してCAN通信を初期化
func initCanBus(port io.ReadWriteCloser) {
// ATコマンドスタートに合図
fmt.Println("+++")
port.Write([]byte("+++"))
time.Sleep(200 * time.Microsecond)
// シリアル通信の速度設定
fmt.Println("AT+S=0")
port.Write([]byte("AT+S=0\n"))
time.Sleep(200 * time.Microsecond)
// CAN通信の速度設定
fmt.Println("AT+C=16")
port.Write([]byte("AT+C=16\n"))
time.Sleep(200 * time.Microsecond)
// CAN通信モジュールのMask設定
for i := 0; i < 2; i++ {
str := "AT+M=[" + string("0"[0]+(byte)(i)) + "][0][000007FC]\n"
fmt.Println(str)
port.Write([]byte(str))
time.Sleep(200 * time.Microsecond)
}
// CAN通信モジュールのFilter設定
for i := 0; i < 6; i++ {
str := "AT+F=[" + string("0"[0]+(byte)(i)) + "][0][000007E8]\n"
fmt.Println(str)
port.Write([]byte(str))
time.Sleep(200 * time.Microsecond)
}
fmt.Println("AT+Q")
// ATコマンド終了合図
port.Write([]byte("AT+Q\n"))
time.Sleep(1000 * time.Microsecond)
}
type Record struct {
Timestamp string `json:"timestamp"`
DataType string `json:"dataType"`
Value float64 `json:"value"`
Unit string `json:"unit"`
}
func writeRecord(tStr string, dataType string, value interface{}, unit string) {
// int32もfloat64もどっちも処理できるように。
newValue := 0.0
switch value.(type) {
case int32:
newValue = (float64)(value.(int32))
case float64:
newValue = value.(float64)
default:
fmt.Printf("[WARN] Can't handle this type of value : %T\n", value)
}
// JSONLとして書き出す (書き出しファイルはFluentdが解釈してBQ側に送信)
jsonRecord, err := json.Marshal(Record{
Timestamp: tStr,
DataType: dataType,
Value: newValue,
Unit: unit,
})
if err != nil {
panic(err)
}
jsonRecordString := string(jsonRecord) + "\n"
jsonFilePath := filepath.Join("./records", fmt.Sprintf("./record_%s.jsonl", dataType))
f, err := os.OpenFile(jsonFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
defer f.Close()
if _, err = f.WriteString(jsonRecordString); err != nil {
panic(err)
}
}
func parseDataFromECU (tmpBuf []byte) {
// ECUからのバイト列を解釈する
// 参照: https://en.wikipedia.org/wiki/OBD-II_PIDs
tStr := time.Now().Format(time.RFC3339)
if tmpBuf[6] == PID_ENGINE_LOAD {
writeRecord(tStr, "EngineLoad", (int32)(tmpBuf[7]) * 100.0 / 255.0, "%")
}
if tmpBuf[6] == PID_ENGINE_RPM {
writeRecord(tStr, "EngineRpm", ((int32)(tmpBuf[7])*256+(int32)(tmpBuf[8]))/4, "RPM")
}
if tmpBuf[6] == PID_COOLANT_TEMP {
writeRecord(tStr, "CoolantTemp", ((int32)(tmpBuf[7]) - 40), "C")
}
if tmpBuf[6] == PID_VEHICLE_SPEED {
writeRecord(tStr, "VehicleSpeed", ((int32)(tmpBuf[7])), "km/h")
}
if tmpBuf[6] == PID_AXEL_PERCENT {
writeRecord(tStr, "Axel", ((int32)(tmpBuf[7]) * 100.0 / 255.0), "%")
}
if tmpBuf[6] == PID_REMAIN_FUEL {
writeRecord(tStr, "RemainFuel", ((int32)(tmpBuf[7]) * 100.0 / 255.0), "%")
}
if tmpBuf[6] == PID_OUTSIDE_TEMP {
writeRecord(tStr, "OutsideTemp", ((int32)(tmpBuf[7]) - 40), "C")
}
if tmpBuf[6] == PID_INTAKE_AIR_TEMP {
writeRecord(tStr, "IntakeAirTemp", ((int32)(tmpBuf[7]) - 40), "C")
}
if tmpBuf[6] == PID_DISTANCE_TRAVELED {
writeRecord(tStr, "DistanceTraveled", ((int32)(tmpBuf[7])*256 + (int32)(tmpBuf[8])), "km")
}
if tmpBuf[6] == PID_RUNTIME_SINCE_ENGINE_START {
writeRecord(tStr, "RunTimeEngineStarted", ((int32)(tmpBuf[7])*256 + (int32)(tmpBuf[8])), "second")
}
if tmpBuf[6] == PID_CONTROL_MODULE_VOLT {
writeRecord(tStr, "ContorlModuleVolt", (float64)((int32)(tmpBuf[7])*256+(int32)(tmpBuf[8]))/1000.0, "V")
}
if tmpBuf[6] == PID_ABS_BAROMETRIC_PRESSURE {
writeRecord(tStr, "AbsoluteBarometricPressure", ((int32)(tmpBuf[7])), "kPa")
}
if tmpBuf[6] == PID_SHORT_TERM_FUEL_TRIM {
writeRecord(tStr, "ShortTermFuelTrim", (float64)(tmpBuf[7]) / 1.28 - 100.0 , "%")
}
if tmpBuf[6] == PID_LONG_TERM_FUEL_TRIM {
writeRecord(tStr, "LongTermFuelTrim", (float64)(tmpBuf[7]) / 1.28 - 100.0 , "%")
}
if tmpBuf[6] == PID_TIMING_ADVANCE {
writeRecord(tStr, "TimingAdvance", (float64)(tmpBuf[7]) / 2.0 - 64.0 , "%")
}
if tmpBuf[6] == PID_INTAKE_MANIFOLD_ABS_PRESSURE {
writeRecord(tStr, "IntakeManifoldAbsPressure", (int32)(tmpBuf[7]) , "kPa")
}
}
func main() {
var err error
// シリアルポートの初期化
options := serial.OpenOptions{
PortName: "/dev/ttyUSB0",
BaudRate: 9600,
DataBits: 8,
StopBits: 1,
InterCharacterTimeout: 1,
MinimumReadSize: 1,
}
port, err := serial.Open(options)
if err != nil {
panic(err.Error())
}
defer port.Close()
go func() {
tmpStr := ""
tmpBuf := make([]byte, 12)
bufPtr := 0
//mode := AT_CMD_MODE
mode := DATA_MODE
for {
buf := make([]byte, 100)
//fmt.Print("Recving..(", mode, ")")
n, err := port.Read(buf)
if err != nil {
fmt.Println(err.Error())
break
}
//fmt.Printf("%d byte received.\n", n)
// ECUからのデータを処理するリングバッファー
for i := 0; i < n && mode == DATA_MODE; i++ {
ptr := (bufPtr + i) % 12
//fmt.Printf("tmpBuf[%d]=%d\n", ptr, buf[i])
tmpBuf[ptr] = buf[i]
if ptr == (12 - 1) {
//fmt.Println("**Received**", hex.EncodeToString((tmpBuf)))
//src := hex.EncodeToString(tmpBuf[0:3])
//dataLength := hex.EncodeToString([]byte{tmpBuf[4]})
//dataMode := hex.EncodeToString([]byte{tmpBuf[5]})
//pid := hex.EncodeToString([]byte{tmpBuf[6]})
//data := hex.EncodeToString(tmpBuf[7:12])
//fmt.Println(hex.EncodeToString(tmpBuf))
//fmt.Println("src=", src)
//fmt.Println("dataLength=", dataLength)
//fmt.Println("dataMode=", dataMode)
//fmt.Println("pid=", pid)
//fmt.Println("data=", data)
parseDataFromECU(tmpBuf)
}
}
bufPtr = (bufPtr + n) % 12
// ATコマンドのレスポンスを解釈
for i := 0; i < n && mode == AT_CMD_MODE; i++ {
//fmt.Println(buf[i])
if buf[i] == 0x0D {
continue
}
if buf[i] == 0x0A {
fmt.Println("tmpStr=[", tmpStr, "]", len(tmpStr))
if tmpStr == "INIT OK" {
mode = DATA_MODE
fmt.Println("Change to DataMode.")
}
tmpStr = ""
continue
}
tmpStr += string(buf[i])
}
}
fmt.Println("Reader closed")
}()
// CAN通信モジュールを初期化する (初回のみ)
//initCanBus(port)
for {
// ECUへデータ取得リクエストを送信
makeRequest(port, CAN_ID_PID, PID_ENGINE_LOAD)
makeRequest(port, CAN_ID_PID, PID_ENGINE_RPM)
makeRequest(port, CAN_ID_PID, PID_COOLANT_TEMP)
makeRequest(port, CAN_ID_PID, PID_VEHICLE_SPEED)
makeRequest(port, CAN_ID_PID, PID_AXEL_PERCENT)
//makeRequest(port, CAN_ID_PID, PID_REMAIN_FUEL)
makeRequest(port, CAN_ID_PID, PID_OUTSIDE_TEMP)
makeRequest(port, CAN_ID_PID, PID_DISTANCE_TRAVELED)
makeRequest(port, CAN_ID_PID, PID_RUNTIME_SINCE_ENGINE_START)
makeRequest(port, CAN_ID_PID, PID_INTAKE_AIR_TEMP)
//makeRequest(port, CAN_ID_PID, PID_ENGINE_OIL_TEMP)
makeRequest(port, CAN_ID_PID, PID_CONTROL_MODULE_VOLT)
makeRequest(port, CAN_ID_PID, PID_ABS_BAROMETRIC_PRESSURE)
makeRequest(port, CAN_ID_PID, PID_SHORT_TERM_FUEL_TRIM)
makeRequest(port, CAN_ID_PID, PID_LONG_TERM_FUEL_TRIM)
makeRequest(port, CAN_ID_PID, PID_TIMING_ADVANCE)
makeRequest(port, CAN_ID_PID, PID_INTAKE_MANIFOLD_ABS_PRESSURE)
// データ取得周期の設定
time.Sleep(time.Second * 1)
}
select {}
}
取得データの転送部分
-
Fluentd x fluent-plugin-bigqueryプラグインベースで構築
-
fluent.confの動作
- ①Tailプラグインでobd_recorder.goが吐き出したファイルをInputとして扱う
- ポジションファイルを使うことで、トラブった時にResumeできるようにする
- ②BigQueryInsertプラグインを用いてテーブル自動作成
- ③ファイル内容の変更(追記部分)のストリーミングインサートを実施
- ①Tailプラグインでobd_recorder.goが吐き出したファイルをInputとして扱う
※本来であれば、ちゃんと分割テーブル設計やスキーマ設計をすべきですが、今回はさくっと。
<source>
@type tail
format json
path /home/pi/records/*
pos_file /home/pi/records/pos
tag result.*
</source>
<match result.**>
@type bigquery_insert
auth_method json_key
json_key /home/pi/gcp-serviceaccount.json
project <GCP_PROJECT_ID>
dataset taguro_car
auto_create_table true
table metrics_%Y%m%d
<buffer time>
flush_interval 10
total_limit_size 10g
flush_thread_count 4
timekey 1d
</buffer>
<inject>
time_key timestamp
time_type string
time_format %Y-%m-%dT%H:%M:%S%:z
utc true
</inject>
schema [
{ "name": "timestamp", "type": "TIMESTAMP" },
{ "name": "dataType", "type": "STRING" },
{ "name": "value", "type": "FLOAT"},
{ "name": "unit", "type": "STRING"}
]
</match>
取得データの可視化部分
- ①Redashサーバーの構築
- GCEを利用。CloudShell上で下記コマンドを叩くと、Redash8のホスティングされたインスタンスが立ち上がる
$ gcloud compute images create "redash-8-0-0" --source-uri gs://redash-images/redash.8.0.0-b32245-1.tar.gz
$ gcloud compute instances create redash --image redash-8-0-0
参考: https://redash.io/help/open-source/setup#-Google-Compute-Engine
- ②Redashサーバーのファイアウォール設定
- HTTPトラフィックを許可(この時、IP制限を付与できるとベター)
- ③初期設定の実施
- 立ち上がったGCEのマシンのグローバルIPでアクセスすると、Redashの初期設定ができる
- ④GCP側でサービスアカウントの発行
- 注意点: 権限はBigQureyアクセスに絞っておく。
- ⑤データソースの設定&サービスアカウント設定
- GCPのProjectID及びサービスアカウントのアップロードを行う
- ⑥クエリーやダッシュボードの作成
可視化したいデータの分だけQueryを作り、それをまとめたDashboardを作る。
(BQへ投げるクエリーが全スキャンになってしまってえいるので、どこかで治す)
デモンストレーション
高速道路を走行。OBD端子経由で取得したデータをクラウド側で確認する事ができた。
(一秒に一回はデータとりすぎ感。BIツールが重い、、、)
今回のまとめ
-
OBD2端子からデータを引っこ抜くことができた
-
引っこ抜いたデータをBQに転送する事ができた
-
BQ上のデータをRedashを用いて可視化する事ができた
今後の展望 (残課題)
-
多分タイヤの回転数から走行距離を算出しているせいか、微妙に走行距離が合わない
- 係数1.2ぐらいかけるとちょうどいいかも。
-
TOYOTA RAIZEでは動作確認できたが、他メーカーの車で試してみる
- おそらくECUのアドレスとか変わってくる気がする
-
分割テーブル設計やスキーマ設計ちゃんとやる。クエリーもちゃんとする。
- BigQueryへのクエリーのスキャン量の適正化等
-
データ点数が多くなりすぎてBIツールが重い問題
- データ取得頻度の変更(今は一秒に一回)
- クラウド側で10分平均等、随時集計をする
-
燃費情報とか、ガソリン消費量や残量等をOBD取得データから可視化する
- ガソリン切れそうだったらSlackで自動通知とか
余談
- ABEJAではIoTとかBigDataとかAIとかWeb大好きって人を探しております。ぜひお気軽にご連絡ください!