概要
M5BALAでライントレース、今回は iPhone のカメラで線の位置を認識し、線の位置によって M5BALA の進行方向を制御します。プログラムは iPhone 上の Pythonista による実装です。
- iPhone と M5Stack は BLE で接続
- iPhoneのカメラのビデオデータをフレームごとに取得
- ビデオデータの輝度情報から線を認識
- 線の位置のと画面の中央位置との差によってM5BALAの動く方向を制御
- M5Stack側は 「iPhoneを使ってM5BALAを操作する(Pythonista-BLE編)」 のものをそのまま使用
環境
- Pythonista3 v3.2
- iPhone6 iOS 12.1
- M5BALA+M5Stack FIRE
-
Arduino 1.8.7 以下のライブラリを使用(Arduino IDEの「ライブラリを管理」からインストール可)
- M5Stack 0.2.4 - https://github.com/m5stack/M5Stack
- MPU6050_tockn 1.4.0 - https://github.com/tockn/MPU6050_tockn
- NeoPixelBus 2.3.4 - https://github.com/Makuna/NeoPixelBus
実行例
iPhoneを載せ、前方を照らしながら進む M5BALA。走らせる前、iPhoneの載せた状態でバランスがを取るのが一番難しいところです。微妙な差で、後ろのめり、前のめりになってしまいます。
M5BALAと遊ぼう、iPhoneのカメラでライントレースの3回目。今回が本来やりたかったPythonistaでカメラのビデオデータを処理する方式。M5StackとBLEで接続して制御する部分は同じです。コースはくびれを入れたくて少し大きくしました。
— 稲澤祐一 (@inasawa) 2018年11月30日
実装はこちら。 https://t.co/6vFv8rLZPH#M5Stack #M5BALA pic.twitter.com/fasAjQIgcv
Pythonista3プログラム
iPhoneのカメラのビデオデータをフレームごとに取得し、輝度情報から線を認識します。線の部分の認識方法は以下の通り。
Y軸のどの部分を取り出すのが最適かはあまり試していません。今回はY軸の中央部分を輝度データを左端→右端の配列となるようにデータを取り出しています。値が落ち込んでいる部分が黒線。
今回のプログラムでは、線の左側が中央に位置するように、M5BALAの動きを制御しています。前方向の進ませ方および回転の度合いについては今回は単純な計算で済ませていますが、PID制御とか、まだまだ改善の余地がありそうです。
# coding: utf-8
from objc_util import *
from ctypes import c_void_p, cast
import ui
import cb
import struct
import time
import numpy as np
import matplotlib.pyplot as plt
M5BALA_SERVICE_UUID = 'c44205a6-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_CHARACTERISTIC_UUID = 'c442090c-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_NAME = 'M5BALA'
DEBUG_MODE = False
# iPhone6 ではビデオのフレームレートが 30fps なので
# 制御用の処理は 6 フレームに1回(5fps)とする
FRAME_INTERVAL = 6 # 30fps / 6 = 5fps
TORCH_MODE = True
TURN_MAX = 150
MOVE_OFFSET = 30
LINE_DETECTION_THRESHOLD = -10
frame_counter = 0
last_fps_time = time.time()
fps_counter = 0
main_view = None
AVCaptureSession = ObjCClass('AVCaptureSession')
AVCaptureDevice = ObjCClass('AVCaptureDevice')
AVCaptureDeviceInput = ObjCClass('AVCaptureDeviceInput')
AVCaptureVideoDataOutput = ObjCClass('AVCaptureVideoDataOutput')
AVCaptureVideoPreviewLayer = ObjCClass('AVCaptureVideoPreviewLayer')
dispatch_get_current_queue = c.dispatch_get_current_queue
dispatch_get_current_queue.restype = c_void_p
CMSampleBufferGetImageBuffer = c.CMSampleBufferGetImageBuffer
CMSampleBufferGetImageBuffer.argtypes = [c_void_p]
CMSampleBufferGetImageBuffer.restype = c_void_p
CVPixelBufferLockBaseAddress = c.CVPixelBufferLockBaseAddress
CVPixelBufferLockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferLockBaseAddress.restype = None
CVPixelBufferGetBaseAddressOfPlane = c.CVPixelBufferGetBaseAddressOfPlane
CVPixelBufferGetBaseAddressOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBaseAddressOfPlane.restype = c_void_p
CVPixelBufferGetBytesPerRowOfPlane = c.CVPixelBufferGetBytesPerRowOfPlane
CVPixelBufferGetBytesPerRowOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBytesPerRowOfPlane.restype = c_int
CVPixelBufferGetWidthOfPlane = c.CVPixelBufferGetWidthOfPlane
CVPixelBufferGetWidthOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetWidthOfPlane.restype = c_int
CVPixelBufferGetHeightOfPlane = c.CVPixelBufferGetHeightOfPlane
CVPixelBufferGetHeightOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetHeightOfPlane.restype = c_int
CVPixelBufferUnlockBaseAddress = c.CVPixelBufferUnlockBaseAddress
CVPixelBufferUnlockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferUnlockBaseAddress.restype = None
class MyCentralManagerDelegate (object):
def __init__(self):
self.peripheral = None
self.data_characteristics = None
def did_discover_peripheral(self, p):
if p.name and M5BALA_NAME in p.name and not self.peripheral:
# Keep a reference to the peripheral, so it doesn't get garbage-collected:
print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
self.peripheral = p
cb.connect_peripheral(self.peripheral)
label_status.text = 'Detected'
def did_connect_peripheral(self, p):
print('*** Connected: %s' % p.name)
print('Discovering services...')
label_status.text = 'Connected'
p.discover_services()
def did_fail_to_connect_peripheral(self, p, error):
print('Failed to connect')
label_status.text = 'Failed'
def did_disconnect_peripheral(self, p, error):
print('Disconnected, error: %s' % (error,))
self.peripheral = None
self.data_characteristics = None
label_status.text = 'Scanning'
cb.scan_for_peripherals()
def did_discover_services(self, p, error):
for s in p.services:
print(s.uuid)
if M5BALA_SERVICE_UUID in s.uuid:
print('M5BALA found')
p.discover_characteristics(s)
def did_discover_characteristics(self, s, error):
if M5BALA_SERVICE_UUID in s.uuid:
for c in s.characteristics:
if M5BALA_CHARACTERISTIC_UUID in c.uuid:
self.data_characteristics = c
def send_action(self, move, turn):
if self.peripheral and self.data_characteristics:
data = struct.pack('hh', move, turn)
self.peripheral.write_characteristic_value(self.data_characteristics, data, True)
def captureOutput_didOutputSampleBuffer_fromConnection_(_self, _cmd, _output, _sample_buffer, _conn):
global frame_counter, move, turn
global fps_counter, last_fps_time
fps_counter += 1
now = time.time()
if int(now) > int(last_fps_time):
label_fps.text = '{:5.2f} fps'.format((fps_counter) / (now - last_fps_time))
last_fps_time = now
fps_counter = 0
if frame_counter == 0:
_imageBuffer = CMSampleBufferGetImageBuffer(_sample_buffer)
CVPixelBufferLockBaseAddress(_imageBuffer, 0)
# ビデオデータの形式が YCbCr なので線の認識には輝度データのみを使用する
base_address = CVPixelBufferGetBaseAddressOfPlane(_imageBuffer, 0)
bytes_per_row = CVPixelBufferGetBytesPerRowOfPlane(_imageBuffer, 0)
width = CVPixelBufferGetWidthOfPlane(_imageBuffer, 0)
height = CVPixelBufferGetHeightOfPlane(_imageBuffer, 0)
if DEBUG_MODE:
print(base_address, bytes_per_row, width, height)
# ビデオデータを配列として扱う
img = np.ctypeslib.as_array(cast(base_address, POINTER(c_ubyte)), shape=((height, width)))
# X軸が短辺、Y軸が長辺
# X軸が右端→左端となっているので、左端→右端の配列となるようにデータを取り出す
# Y軸=0 は画面の上部
#imx = img[::-1, 0].astype(np.float32)
# Y軸=-1 は画面の下部
#imx = img[::-1, -1].astype(np.float32)
# 今回はY軸の中央部分を使用
imx = img[::-1, img.shape[1] // 2].astype(np.float32)
# 輝度データの差分を取得する
imd = np.diff(imx)
if DEBUG_MODE:
plt.figure()
plt.imshow(img)
plt.gray()
plt.show()
plt.figure()
plt.plot(imx)
plt.show()
plt.figure()
plt.plot(imd)
plt.show()
plt.close()
# 差分データの最小値、最大値のインデックスを取得
# 最小値:白から黒になる部分(線の左端)
# 最大値:黒から白になる部分(線の右端)
min_i = np.argmin(imd)
max_i = np.argmax(imd)
# move 前方向の移動量は固定値を設定
move = MOVE_OFFSET
# turn 線の左端部分と中央の差によって回転の移動量を計算する
turn = TURN_MAX
imd_xh = imd.shape[0] / 2
# 輝度の差分の最小値が閾値より小さい時に線を認識したとして回転の計算をする
# 閾値以上の場合は線を見失ったものとして最大の回転を行う
if imd[min_i] < LINE_DETECTION_THRESHOLD:
turn = int((min_i - imd_xh) / imd_xh * TURN_MAX)
else:
turn = TURN_MAX
label_action.text = 'min:{} max:{} move:{} turn:{}'.format(imd[min_i], imd[max_i], move, turn)
if DEBUG_MODE:
print(min_i, max_i, imd[min_i], imd[max_i], min_i - imd_xh, move, turn)
if cb_delegate.peripheral and cb_delegate.data_characteristics:
cb_delegate.send_action(move, turn)
CVPixelBufferUnlockBaseAddress(_imageBuffer, 0)
frame_counter = (frame_counter + 1) % FRAME_INTERVAL
sampleBufferDelegate = create_objc_class(
'sampleBufferDelegate',
methods=[captureOutput_didOutputSampleBuffer_fromConnection_],
protocols=['AVCaptureVideoDataOutputSampleBufferDelegate'])
@on_main_thread
def main():
global cb_delegate, main_view, label_status, label_action, label_fps
delegate = sampleBufferDelegate.new()
frame_w = 375
frame_h = 550
main_view = ui.View(frame=(0, 0, frame_w, frame_h))
main_view.name = 'M5BALA Line Trace'
session = AVCaptureSession.alloc().init()
device = AVCaptureDevice.defaultDeviceWithMediaType_('vide')
_input = AVCaptureDeviceInput.deviceInputWithDevice_error_(device, None)
if _input:
session.addInput_(_input)
else:
print('Failed to create input')
return
output = AVCaptureVideoDataOutput.alloc().init()
queue = ObjCInstance(dispatch_get_current_queue())
output.setSampleBufferDelegate_queue_(delegate, queue)
output.alwaysDiscardsLateVideoFrames = True
session.addOutput_(output)
# 用途がライントレース用の線の認識なので 640 x 480 の画像とする
session.sessionPreset = 'AVCaptureSessionPreset640x480'
prev_layer = AVCaptureVideoPreviewLayer.layerWithSession_(session)
prev_layer.frame = ObjCInstance(main_view).bounds()
prev_layer.setVideoGravity_('AVLayerVideoGravityResizeAspectFill')
ObjCInstance(main_view).layer().addSublayer_(prev_layer)
# M5BALA制御用の M5Stack との BLE 接続状況の表示
label_status = ui.Label(frame=(0, 0, frame_w, 30), flex='W', name='status')
label_status.background_color = (0, 0, 0, 0.5)
label_status.text_color = 'white'
label_status.text = 'Scanning'
label_status.alignment = ui.ALIGN_CENTER
# M5Stack に送信する move/turn の値表示
label_action = ui.Label(frame=(0, 30, frame_w, 30), flex='W', name='action')
label_action.background_color = (0, 0, 0, 0.5)
label_action.text_color = 'white'
label_action.text = ''
label_action.alignment = ui.ALIGN_CENTER
# ビデオデータの実 FPS 表示
label_fps = ui.Label(frame=(0, 60, frame_w, 30), flex='W', name='fps')
label_fps.background_color = (0, 0, 0, 0.5)
label_fps.text_color = 'white'
label_fps.text = ''
label_fps.alignment = ui.ALIGN_CENTER
main_view.add_subview(label_status)
main_view.add_subview(label_action)
main_view.add_subview(label_fps)
cb_delegate = MyCentralManagerDelegate()
print('Scanning for peripherals...')
cb.set_central_delegate(cb_delegate)
cb.scan_for_peripherals()
session.startRunning()
# iPhoneのフラッシュを照明として利用
if TORCH_MODE and device.hasTorch():
device.lockForConfiguration_(None)
device.setTorchMode_(1)
device.unlockForConfiguration()
main_view.present('sheet')
main_view.wait_modal()
session.stopRunning()
delegate.release()
session.release()
output.release()
cb.reset()
if __name__ == '__main__':
main()
おまけ
ライントレースの線の判定ロジックを修正して、ケーブルトレース。(ほぼ)ケーブルに触れることなくたどっています。これならコースをより自由に作れる、と思ったけど急なカーブはやはり苦手です。#M5Stack #M5BALA pic.twitter.com/iT2zKmugJG
— 稲澤祐一 (@inasawa) 2018年12月1日