48
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ABEJAAdvent Calendar 2021

Day 1

OBD2 x RaspberryPiで車両情報をクラウド側で蓄積&BIで分析できるようにする

Last updated at Posted at 2021-12-01

はじめに

皆さん、こんにちは。ABEJAアドベントカレンダー2021の1日目の記事です。最近車を購入しました。組み込み技術とかクラウドとかAIとかが好きなので、世界に一つだけのMyドライブレコーダーを作りたいなーと思い、趣味で色々やりはじめました。

スクリーンショット_2021-12-01_12_39_45.png

68747470733a2f2f71696974612d696d6167652d73746f72652e73332e61702d6e6f727468656173742d312e616d617a6f6e6177732e636f6d2f302f3132383132332f37363163326263312d646162382d383565322d633235332d3566643762326631386263372e706e67.png

※社内勉強会ABECONにおける発表資料から抜粋

最近、お遊びでOBD2経由で取得した車両情報をBIツール上で振り返られるようにしたので、その部分の取り組みについてご紹介できればと思います。

OBDについて

  • OBD = On Board Diagnosis (車の自己診断機能)
    • CAN通信というプロトコルを拡張している。(CANに関しては後述)
  • OBDの歴史は深い
    • 1971年 FI(Fuel Injection: 電子制御式燃料噴射装置)搭載車の登場
    • 1980年代 自己診断機能で修理しやすく
    • 1982年 自己診断機能の統一化からOBDの誕生 => OBDのベースに
    • 1996年 OBD-II(OBD2)の登場
      • 実は20年以上前からある枯れてきていえるプロトコル
  • 車両に関わるめっちゃいろんなデータが取れたり、制御ができる
  • OBD2端子は車のどこかにあって、OBD車検などでも活用される

スクリーンショット 2021-11-20 22.16.29.png

スクリーンショット 2021-11-20 21.40.06.png
Fig: 実際のOBDの通信フレームとでデータ解釈の例

OBD2を使う事によって、車自体の情報(速度・エンジン回転数・冷却材温度・負荷・センサー温度)を記録できる。今回は、OBD2を使って車両情報の取得・蓄積・可視化について検討

CAN通信について

  • Controller Area Networkの略。
  • ドイツのBosch社が開発したシリアル通信プロトコル
  • 1994年に国際標準化機構(ISO)により標準規格(ISO11898/ISO11519)に。
    • 現在では、ほぼすべての自動車に採用さ
  • 車だけではなく、人工衛星の通信バスや工場のFA(ファクトリーオートメーション)にも利用される

OBD2でやりとりされる通信はCAN通信を拡張して作られている為、CAN通信ができるようなハードウェアを用意する事で、車のECUと通信ができる

システム全体像

スクリーンショット 2021-12-01 13.17.26.png

OBD2データの取得部

スクリーンショット 2021-12-01 13.17.36.png

  • RaspberryPi側

    • OBD2情報取得プログラム (obd_recorder.go)
      • 役割1:後述のCAN-BUS Moduleと通信をし、ECU側にデータ送信要求を投げる
        • 例: エンジンの回転数をくれー
      • 役割2:受信したデータをデコードして、記憶装置にファイルとして書き出し
  • USB⇔CAN通信側

    • 役割: RaspberryPi側からOBD2端子経由で車のBUSにアクセスできるようにする。橋渡し的役割。

取得データの転送・可視化部

スクリーンショット 2021-12-01 13.17.52.png

  • RaspberryPi側

    • データ転送部分 (Fluentd)
      • 役割: 前述のOBD2情報取得プログラムによって吐き出されたデータをBigQueryに書き出す
  • GCP側

    • DWH部分 (BigQuery)
      • 役割: OBD2情報を継続的に蓄積し、分析できる土壌を提供する
    • BIツール部分 (Redash)
      • 役割: 蓄積されたデータを良い感じにグラフィカルに表示する&分析できるようにする

構築メモ

OBD2データの取得部分

ハードウェア部分

スクリーンショット 2021-12-01 13.21.00.png

ソフトウェア部分

  • CAN通信モジュールの使い方は、下記マニュアルを参照

  • obd_recorder.goでやっている事

    • ATコマンドを発行してCAN通信モジュール自体の設定を行う (初回起動時のみ)
    • 14バイトのバイト列を送る事で、モジュール経由でECUにメッセージを送信
    • ECUからのメッセージ(12バイトが基本)をリングバッファーで受け取る
    • 受け取ったデータを実際の物理量などに換算して、ファイルに書き出し

※とりあえず動かすこと最優先で、コードはさくっと。

obd_recorder.go
package main

import (
	"os"
	"fmt"
	"github.com/jacobsa/go-serial/serial"
	"io"
	"time"
	"path/filepath"
	"encoding/json"
)

// OBD2で使うPIDの定義。(車種・メーカーによって異なるので注意)
var (
	CAR_NAME                       = "TAGURO_RAIZE"
	AT_CMD_MODE                    = "AT_CMD_MODE"
	DATA_MODE                      = "DATA_MODE"
	CAN_ID_PID                     = int32(0x7DF)
	PID_ENGINE_LOAD                = byte(0x04)
	PID_ENGINE_RPM                 = byte(0x0C)
	PID_VEHICLE_SPEED              = byte(0x0D)
	PID_COOLANT_TEMP               = byte(0x05)
	PID_AXEL_PERCENT               = byte(0x11)
	PID_REMAIN_FUEL                = byte(0x2F)
	PID_OUTSIDE_TEMP               = byte(0x46)
	PID_DISTANCE_TRAVELED          = byte(0x31)
	PID_RUNTIME_SINCE_ENGINE_START = byte(0x1F)
	PID_INTAKE_AIR_TEMP            = byte(0x0F)
	PID_ENGINE_OIL_TEMP            = byte(0x5C)
	PID_CONTROL_MODULE_VOLT        = byte(0x42)
	PID_ABS_BAROMETRIC_PRESSURE    = byte(0x33)
	PID_SHORT_TERM_FUEL_TRIM       = byte(0x06)
	PID_LONG_TERM_FUEL_TRIM        = byte(0x07)
	PID_TIMING_ADVANCE             = byte(0x0E)
    PID_INTAKE_MANIFOLD_ABS_PRESSURE = byte(0x0B)

)

func makeRequest(port io.ReadWriteCloser, canIdPid int32, pid byte) {
	port.Write([]byte{
		(byte)(canIdPid >> 24),
		(byte)((canIdPid >> 16) & 0xFF),
		(byte)((canIdPid >> 8) & 0xFF),
		(byte)(canIdPid & 0xFF),
		0x00, 0x00,
		0x02, 0x01,
		pid,
		0x00,
		0x00, 0x00, 0x00, 0x00,
	})
}

// ATコマンドを発行してCAN通信を初期化
func initCanBus(port io.ReadWriteCloser) {
	// ATコマンドスタートに合図
	fmt.Println("+++")
	port.Write([]byte("+++"))
	time.Sleep(200 * time.Microsecond)
	// シリアル通信の速度設定
	fmt.Println("AT+S=0")
	port.Write([]byte("AT+S=0\n"))
	time.Sleep(200 * time.Microsecond)
	// CAN通信の速度設定
	fmt.Println("AT+C=16")
	port.Write([]byte("AT+C=16\n"))
	time.Sleep(200 * time.Microsecond)
	// CAN通信モジュールのMask設定
	for i := 0; i < 2; i++ {
		str := "AT+M=[" + string("0"[0]+(byte)(i)) + "][0][000007FC]\n"
		fmt.Println(str)
		port.Write([]byte(str))
		time.Sleep(200 * time.Microsecond)
	}
	// CAN通信モジュールのFilter設定
	for i := 0; i < 6; i++ {
		str := "AT+F=[" + string("0"[0]+(byte)(i)) + "][0][000007E8]\n"
		fmt.Println(str)
		port.Write([]byte(str))
		time.Sleep(200 * time.Microsecond)
	}
	fmt.Println("AT+Q")
	// ATコマンド終了合図
	port.Write([]byte("AT+Q\n"))
	time.Sleep(1000 * time.Microsecond)
}

type Record struct {
	Timestamp string `json:"timestamp"`
	DataType string `json:"dataType"`
	Value float64 `json:"value"`
	Unit string `json:"unit"`
}

func writeRecord(tStr string, dataType string, value interface{}, unit string) {
	// int32もfloat64もどっちも処理できるように。
	newValue := 0.0
	switch value.(type) {
	case int32:
		newValue = (float64)(value.(int32))
	case float64:
		newValue = value.(float64)
	default:
		fmt.Printf("[WARN] Can't handle this type of value : %T\n", value)
	}
	// JSONLとして書き出す (書き出しファイルはFluentdが解釈してBQ側に送信)
	jsonRecord, err := json.Marshal(Record{
		Timestamp: tStr,
		DataType: dataType,
		Value: newValue,
		Unit: unit,
	})
	if err != nil {
		panic(err)
	}
	jsonRecordString := string(jsonRecord) + "\n"
	jsonFilePath := filepath.Join("./records", fmt.Sprintf("./record_%s.jsonl", dataType))
	f, err := os.OpenFile(jsonFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err = f.WriteString(jsonRecordString); err != nil {
		panic(err)
	}
}

func parseDataFromECU (tmpBuf []byte) {
	// ECUからのバイト列を解釈する
	// 参照: https://en.wikipedia.org/wiki/OBD-II_PIDs
	tStr := time.Now().Format(time.RFC3339)
	if tmpBuf[6] == PID_ENGINE_LOAD {
		writeRecord(tStr, "EngineLoad", (int32)(tmpBuf[7]) * 100.0 / 255.0, "%")
	}
	if tmpBuf[6] == PID_ENGINE_RPM {
		writeRecord(tStr, "EngineRpm", ((int32)(tmpBuf[7])*256+(int32)(tmpBuf[8]))/4, "RPM")
	}
	if tmpBuf[6] == PID_COOLANT_TEMP {
		writeRecord(tStr, "CoolantTemp", ((int32)(tmpBuf[7]) - 40), "C")
	}
	if tmpBuf[6] == PID_VEHICLE_SPEED {
		writeRecord(tStr, "VehicleSpeed", ((int32)(tmpBuf[7])), "km/h")
	}
	if tmpBuf[6] == PID_AXEL_PERCENT {
		writeRecord(tStr, "Axel", ((int32)(tmpBuf[7]) * 100.0 / 255.0), "%")
	}
	if tmpBuf[6] == PID_REMAIN_FUEL {
		writeRecord(tStr, "RemainFuel", ((int32)(tmpBuf[7]) * 100.0 / 255.0), "%")
	}
	if tmpBuf[6] == PID_OUTSIDE_TEMP {
		writeRecord(tStr, "OutsideTemp", ((int32)(tmpBuf[7]) - 40), "C")
	}
	if tmpBuf[6] == PID_INTAKE_AIR_TEMP {
		writeRecord(tStr, "IntakeAirTemp", ((int32)(tmpBuf[7]) - 40), "C")
	}
	if tmpBuf[6] == PID_DISTANCE_TRAVELED {
		writeRecord(tStr, "DistanceTraveled", ((int32)(tmpBuf[7])*256 + (int32)(tmpBuf[8])), "km")
	}
	if tmpBuf[6] == PID_RUNTIME_SINCE_ENGINE_START {
		writeRecord(tStr, "RunTimeEngineStarted", ((int32)(tmpBuf[7])*256 + (int32)(tmpBuf[8])), "second")
	}
	if tmpBuf[6] == PID_CONTROL_MODULE_VOLT {
		writeRecord(tStr, "ContorlModuleVolt", (float64)((int32)(tmpBuf[7])*256+(int32)(tmpBuf[8]))/1000.0, "V")
	}
	if tmpBuf[6] == PID_ABS_BAROMETRIC_PRESSURE {
		writeRecord(tStr, "AbsoluteBarometricPressure", ((int32)(tmpBuf[7])), "kPa")
	}
	if tmpBuf[6] == PID_SHORT_TERM_FUEL_TRIM {
		writeRecord(tStr, "ShortTermFuelTrim", (float64)(tmpBuf[7]) / 1.28  - 100.0 , "%")
	}
	if tmpBuf[6] == PID_LONG_TERM_FUEL_TRIM {
		writeRecord(tStr, "LongTermFuelTrim", (float64)(tmpBuf[7]) / 1.28  - 100.0 , "%")
	}
	if tmpBuf[6] == PID_TIMING_ADVANCE {
		writeRecord(tStr, "TimingAdvance", (float64)(tmpBuf[7]) / 2.0  - 64.0 , "%")
	}
	if tmpBuf[6] == PID_INTAKE_MANIFOLD_ABS_PRESSURE {
		writeRecord(tStr, "IntakeManifoldAbsPressure", (int32)(tmpBuf[7]) , "kPa")
	}
}

func main() {
	var err error
	// シリアルポートの初期化
	options := serial.OpenOptions{
		PortName:              "/dev/ttyUSB0",
		BaudRate:              9600,
		DataBits:              8,
		StopBits:              1,
		InterCharacterTimeout: 1,
		MinimumReadSize:       1,
	}
	port, err := serial.Open(options)
	if err != nil {
		panic(err.Error())
	}
	defer port.Close()
	go func() {
		tmpStr := ""
		tmpBuf := make([]byte, 12)
		bufPtr := 0

		//mode := AT_CMD_MODE
		mode := DATA_MODE

		for {
			buf := make([]byte, 100)
			//fmt.Print("Recving..(", mode, ")")
			n, err := port.Read(buf)
			if err != nil {
				fmt.Println(err.Error())
				break
			}
			//fmt.Printf("%d byte received.\n", n)
			// ECUからのデータを処理するリングバッファー
			for i := 0; i < n && mode == DATA_MODE; i++ {
				ptr := (bufPtr + i) % 12
				//fmt.Printf("tmpBuf[%d]=%d\n", ptr, buf[i])
				tmpBuf[ptr] = buf[i]
				if ptr == (12 - 1) {
					//fmt.Println("**Received**", hex.EncodeToString((tmpBuf)))
					//src := hex.EncodeToString(tmpBuf[0:3])
					//dataLength := hex.EncodeToString([]byte{tmpBuf[4]})
					//dataMode := hex.EncodeToString([]byte{tmpBuf[5]})
					//pid := hex.EncodeToString([]byte{tmpBuf[6]})
					//data := hex.EncodeToString(tmpBuf[7:12])
					//fmt.Println(hex.EncodeToString(tmpBuf))
					//fmt.Println("src=", src)
					//fmt.Println("dataLength=", dataLength)
					//fmt.Println("dataMode=", dataMode)
					//fmt.Println("pid=", pid)
					//fmt.Println("data=", data)
					parseDataFromECU(tmpBuf)
				}
			}
			bufPtr = (bufPtr + n) % 12

			// ATコマンドのレスポンスを解釈
			for i := 0; i < n && mode == AT_CMD_MODE; i++ {
				//fmt.Println(buf[i])
				if buf[i] == 0x0D {
					continue
				}
				if buf[i] == 0x0A {
					fmt.Println("tmpStr=[", tmpStr, "]", len(tmpStr))
					if tmpStr == "INIT OK" {
						mode = DATA_MODE
						fmt.Println("Change to DataMode.")
					}
					tmpStr = ""
					continue
				}
				tmpStr += string(buf[i])
			}
		}
		fmt.Println("Reader closed")
	}()
	// CAN通信モジュールを初期化する (初回のみ)
	//initCanBus(port)
	for {
		// ECUへデータ取得リクエストを送信
		makeRequest(port, CAN_ID_PID, PID_ENGINE_LOAD)
		makeRequest(port, CAN_ID_PID, PID_ENGINE_RPM)
		makeRequest(port, CAN_ID_PID, PID_COOLANT_TEMP)
		makeRequest(port, CAN_ID_PID, PID_VEHICLE_SPEED)
		makeRequest(port, CAN_ID_PID, PID_AXEL_PERCENT)
		//makeRequest(port, CAN_ID_PID, PID_REMAIN_FUEL)
		makeRequest(port, CAN_ID_PID, PID_OUTSIDE_TEMP)
		makeRequest(port, CAN_ID_PID, PID_DISTANCE_TRAVELED)
		makeRequest(port, CAN_ID_PID, PID_RUNTIME_SINCE_ENGINE_START)
		makeRequest(port, CAN_ID_PID, PID_INTAKE_AIR_TEMP)
		//makeRequest(port, CAN_ID_PID, PID_ENGINE_OIL_TEMP)
		makeRequest(port, CAN_ID_PID, PID_CONTROL_MODULE_VOLT)
		makeRequest(port, CAN_ID_PID, PID_ABS_BAROMETRIC_PRESSURE)
		makeRequest(port, CAN_ID_PID, PID_SHORT_TERM_FUEL_TRIM)
		makeRequest(port, CAN_ID_PID, PID_LONG_TERM_FUEL_TRIM)
		makeRequest(port, CAN_ID_PID, PID_TIMING_ADVANCE)
		makeRequest(port, CAN_ID_PID, PID_INTAKE_MANIFOLD_ABS_PRESSURE)
		// データ取得周期の設定
		time.Sleep(time.Second * 1)
	}
	select {}
}

取得データの転送部分

  • Fluentd x fluent-plugin-bigqueryプラグインベースで構築

  • fluent.confの動作

    • ①Tailプラグインでobd_recorder.goが吐き出したファイルをInputとして扱う
      • ポジションファイルを使うことで、トラブった時にResumeできるようにする
    • ②BigQueryInsertプラグインを用いてテーブル自動作成
    • ③ファイル内容の変更(追記部分)のストリーミングインサートを実施

※本来であれば、ちゃんと分割テーブル設計やスキーマ設計をすべきですが、今回はさくっと。

fluent.conf
<source>
  @type tail
  format json
  path /home/pi/records/*
  pos_file /home/pi/records/pos
  tag result.*
</source>

<match result.**>
  @type bigquery_insert
  auth_method json_key
  json_key /home/pi/gcp-serviceaccount.json
  project <GCP_PROJECT_ID>
  dataset taguro_car
  auto_create_table true
  table metrics_%Y%m%d
  <buffer time>
    flush_interval 10
    total_limit_size 10g
    flush_thread_count 4
    timekey 1d
  </buffer>
  <inject>
    time_key timestamp
    time_type string
    time_format %Y-%m-%dT%H:%M:%S%:z
    utc true
  </inject>
  schema [
    { "name": "timestamp", "type": "TIMESTAMP" },
    { "name": "dataType", "type": "STRING" },
    { "name": "value", "type": "FLOAT"},
    { "name": "unit", "type": "STRING"}
  ]
</match>

取得データの可視化部分

  • ①Redashサーバーの構築
    • GCEを利用。CloudShell上で下記コマンドを叩くと、Redash8のホスティングされたインスタンスが立ち上がる
$ gcloud compute images create "redash-8-0-0" --source-uri gs://redash-images/redash.8.0.0-b32245-1.tar.gz
$ gcloud compute instances create redash --image redash-8-0-0

参考: https://redash.io/help/open-source/setup#-Google-Compute-Engine

  • ②Redashサーバーのファイアウォール設定
    • HTTPトラフィックを許可(この時、IP制限を付与できるとベター)

スクリーンショット 2021-12-01 11.42.59.png

  • ③初期設定の実施
    • 立ち上がったGCEのマシンのグローバルIPでアクセスすると、Redashの初期設定ができる

スクリーンショット 2021-12-01 11.44.03.png

  • ④GCP側でサービスアカウントの発行
    • 注意点: 権限はBigQureyアクセスに絞っておく。

スクリーンショット_2021-12-01_11_58_55.png

  • ⑤データソースの設定&サービスアカウント設定
    • GCPのProjectID及びサービスアカウントのアップロードを行う

スクリーンショット 2021-12-01 11.52.13.png

  • ⑥クエリーやダッシュボードの作成

スクリーンショット_2021-12-01_11_55_52.png

スクリーンショット 2021-12-01 11.55.31.png

可視化したいデータの分だけQueryを作り、それをまとめたDashboardを作る。
(BQへ投げるクエリーが全スキャンになってしまってえいるので、どこかで治す)

デモンストレーション

高速道路を走行。OBD端子経由で取得したデータをクラウド側で確認する事ができた。
(一秒に一回はデータとりすぎ感。BIツールが重い、、、)

スクリーンショット 2021-11-20 22.37.43.png

スクリーンショット 2021-11-20 21.41.02.png

今回のまとめ

  • OBD2端子からデータを引っこ抜くことができた

  • 引っこ抜いたデータをBQに転送する事ができた

  • BQ上のデータをRedashを用いて可視化する事ができた

今後の展望 (残課題)

  • 多分タイヤの回転数から走行距離を算出しているせいか、微妙に走行距離が合わない

    • 係数1.2ぐらいかけるとちょうどいいかも。
  • TOYOTA RAIZEでは動作確認できたが、他メーカーの車で試してみる

    • おそらくECUのアドレスとか変わってくる気がする
  • 分割テーブル設計やスキーマ設計ちゃんとやる。クエリーもちゃんとする。

    • BigQueryへのクエリーのスキャン量の適正化等
  • データ点数が多くなりすぎてBIツールが重い問題

    • データ取得頻度の変更(今は一秒に一回)
    • クラウド側で10分平均等、随時集計をする
  • 燃費情報とか、ガソリン消費量や残量等をOBD取得データから可視化する

    • ガソリン切れそうだったらSlackで自動通知とか

余談

  • ABEJAではIoTとかBigDataとかAIとかWeb大好きって人を探しております。ぜひお気軽にご連絡ください!

外部参考資料

48
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
48
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?