BLE
Pythonista
Pythonista3
M5stack
M5Bala

M5BALAでライントレース(iPhone Camera-Pythonista-BLE編)


概要

M5BALAでライントレース、今回は iPhone のカメラで線の位置を認識し、線の位置によって M5BALA の進行方向を制御します。プログラムは iPhone 上の Pythonista による実装です。


  • iPhone と M5Stack は BLE で接続

  • iPhoneのカメラのビデオデータをフレームごとに取得

  • ビデオデータの輝度情報から線を認識

  • 線の位置のと画面の中央位置との差によってM5BALAの動く方向を制御

  • M5Stack側は 「iPhoneを使ってM5BALAを操作する(Pythonista-BLE編)」 のものをそのまま使用


環境


実行例

iPhoneを載せ、前方を照らしながら進む M5BALA。走らせる前、iPhoneの載せた状態でバランスがを取るのが一番難しいところです。微妙な差で、後ろのめり、前のめりになってしまいます。

IMG_0214.jpeg


Pythonista3プログラム

iPhoneのカメラのビデオデータをフレームごとに取得し、輝度情報から線を認識します。線の部分の認識方法は以下の通り。


  • ビデオデータはX軸が短辺、Y軸が長辺。X軸が右端→左端、Y軸=0は画面の上部、Y軸=最大値は画面の下部。

    IMG_2015.jpeg


  • Y軸のどの部分を取り出すのが最適かはあまり試していません。今回はY軸の中央部分を輝度データを左端→右端の配列となるようにデータを取り出しています。値が落ち込んでいる部分が黒線。

    IMG_2016.jpeg


  • 差分を求めると、最小値部分が白から黒へ変化した位置、最大値部分が黒から白へ変化した位置。

    IMG_2017.jpeg


  • 今回のプログラムでは、線の左側が中央に位置するように、M5BALAの動きを制御しています。前方向の進ませ方および回転の度合いについては今回は単純な計算で済ませていますが、PID制御とか、まだまだ改善の余地がありそうです。



M5BALA_iPhone_Video_BLE.py

# coding: utf-8

from objc_util import *
from ctypes import c_void_p, cast
import ui
import cb
import struct
import time
import numpy as np
import matplotlib.pyplot as plt

M5BALA_SERVICE_UUID = 'c44205a6-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_CHARACTERISTIC_UUID = 'c442090c-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_NAME = 'M5BALA'

DEBUG_MODE = False

# iPhone6 ではビデオのフレームレートが 30fps なので
# 制御用の処理は 6 フレームに1回(5fps)とする
FRAME_INTERVAL = 6 # 30fps / 6 = 5fps

TORCH_MODE = True
TURN_MAX = 150
MOVE_OFFSET = 30
LINE_DETECTION_THRESHOLD = -10

frame_counter = 0
last_fps_time = time.time()
fps_counter = 0

main_view = None

AVCaptureSession = ObjCClass('AVCaptureSession')
AVCaptureDevice = ObjCClass('AVCaptureDevice')
AVCaptureDeviceInput = ObjCClass('AVCaptureDeviceInput')
AVCaptureVideoDataOutput = ObjCClass('AVCaptureVideoDataOutput')
AVCaptureVideoPreviewLayer = ObjCClass('AVCaptureVideoPreviewLayer')

dispatch_get_current_queue = c.dispatch_get_current_queue
dispatch_get_current_queue.restype = c_void_p

CMSampleBufferGetImageBuffer = c.CMSampleBufferGetImageBuffer
CMSampleBufferGetImageBuffer.argtypes = [c_void_p]
CMSampleBufferGetImageBuffer.restype = c_void_p

CVPixelBufferLockBaseAddress = c.CVPixelBufferLockBaseAddress
CVPixelBufferLockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferLockBaseAddress.restype = None

CVPixelBufferGetBaseAddressOfPlane = c.CVPixelBufferGetBaseAddressOfPlane
CVPixelBufferGetBaseAddressOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBaseAddressOfPlane.restype = c_void_p

CVPixelBufferGetBytesPerRowOfPlane = c.CVPixelBufferGetBytesPerRowOfPlane
CVPixelBufferGetBytesPerRowOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBytesPerRowOfPlane.restype = c_int

CVPixelBufferGetWidthOfPlane = c.CVPixelBufferGetWidthOfPlane
CVPixelBufferGetWidthOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetWidthOfPlane.restype = c_int

CVPixelBufferGetHeightOfPlane = c.CVPixelBufferGetHeightOfPlane
CVPixelBufferGetHeightOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetHeightOfPlane.restype = c_int

CVPixelBufferUnlockBaseAddress = c.CVPixelBufferUnlockBaseAddress
CVPixelBufferUnlockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferUnlockBaseAddress.restype = None

class MyCentralManagerDelegate (object):
def __init__(self):
self.peripheral = None
self.data_characteristics = None

def did_discover_peripheral(self, p):
if p.name and M5BALA_NAME in p.name and not self.peripheral:
# Keep a reference to the peripheral, so it doesn't get garbage-collected:
print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
self.peripheral = p
cb.connect_peripheral(self.peripheral)
label_status.text = 'Detected'

def did_connect_peripheral(self, p):
print('*** Connected: %s' % p.name)
print('Discovering services...')
label_status.text = 'Connected'
p.discover_services()

def did_fail_to_connect_peripheral(self, p, error):
print('Failed to connect')
label_status.text = 'Failed'

def did_disconnect_peripheral(self, p, error):
print('Disconnected, error: %s' % (error,))
self.peripheral = None
self.data_characteristics = None
label_status.text = 'Scanning'
cb.scan_for_peripherals()

def did_discover_services(self, p, error):
for s in p.services:
print(s.uuid)
if M5BALA_SERVICE_UUID in s.uuid:
print('M5BALA found')
p.discover_characteristics(s)

def did_discover_characteristics(self, s, error):
if M5BALA_SERVICE_UUID in s.uuid:
for c in s.characteristics:
if M5BALA_CHARACTERISTIC_UUID in c.uuid:
self.data_characteristics = c

def send_action(self, move, turn):
if self.peripheral and self.data_characteristics:
data = struct.pack('hh', move, turn)
self.peripheral.write_characteristic_value(self.data_characteristics, data, True)

def captureOutput_didOutputSampleBuffer_fromConnection_(_self, _cmd, _output, _sample_buffer, _conn):
global frame_counter, move, turn
global fps_counter, last_fps_time

fps_counter += 1
now = time.time()
if int(now) > int(last_fps_time):
label_fps.text = '{:5.2f} fps'.format((fps_counter) / (now - last_fps_time))
last_fps_time = now
fps_counter = 0

if frame_counter == 0:
_imageBuffer = CMSampleBufferGetImageBuffer(_sample_buffer)
CVPixelBufferLockBaseAddress(_imageBuffer, 0)

# ビデオデータの形式が YCbCr なので線の認識には輝度データのみを使用する
base_address = CVPixelBufferGetBaseAddressOfPlane(_imageBuffer, 0)
bytes_per_row = CVPixelBufferGetBytesPerRowOfPlane(_imageBuffer, 0)
width = CVPixelBufferGetWidthOfPlane(_imageBuffer, 0)
height = CVPixelBufferGetHeightOfPlane(_imageBuffer, 0)
if DEBUG_MODE:
print(base_address, bytes_per_row, width, height)

# ビデオデータを配列として扱う
img = np.ctypeslib.as_array(cast(base_address, POINTER(c_ubyte)), shape=((height, width)))

# X軸が短辺、Y軸が長辺
# X軸が右端→左端となっているので、左端→右端の配列となるようにデータを取り出す
# Y軸=0 は画面の上部
#imx = img[::-1, 0].astype(np.float32)
# Y軸=-1 は画面の下部
#imx = img[::-1, -1].astype(np.float32)
# 今回はY軸の中央部分を使用
imx = img[::-1, img.shape[1] // 2].astype(np.float32)
# 輝度データの差分を取得する
imd = np.diff(imx)

if DEBUG_MODE:
plt.figure()
plt.imshow(img)
plt.gray()
plt.show()
plt.figure()
plt.plot(imx)
plt.show()
plt.figure()
plt.plot(imd)
plt.show()
plt.close()

# 差分データの最小値、最大値のインデックスを取得
# 最小値:白から黒になる部分(線の左端)
# 最大値:黒から白になる部分(線の右端)
min_i = np.argmin(imd)
max_i = np.argmax(imd)

# move 前方向の移動量は固定値を設定
move = MOVE_OFFSET
# turn 線の左端部分と中央の差によって回転の移動量を計算する
turn = TURN_MAX
imd_xh = imd.shape[0] / 2
# 輝度の差分の最小値が閾値より小さい時に線を認識したとして回転の計算をする
# 閾値以上の場合は線を見失ったものとして最大の回転を行う
if imd[min_i] < LINE_DETECTION_THRESHOLD:
turn = int((min_i - imd_xh) / imd_xh * TURN_MAX)
else:
turn = TURN_MAX

label_action.text = 'min:{} max:{} move:{} turn:{}'.format(imd[min_i], imd[max_i], move, turn)
if DEBUG_MODE:
print(min_i, max_i, imd[min_i], imd[max_i], min_i - imd_xh, move, turn)

if cb_delegate.peripheral and cb_delegate.data_characteristics:
cb_delegate.send_action(move, turn)

CVPixelBufferUnlockBaseAddress(_imageBuffer, 0)

frame_counter = (frame_counter + 1) % FRAME_INTERVAL

sampleBufferDelegate = create_objc_class(
'sampleBufferDelegate',
methods=[captureOutput_didOutputSampleBuffer_fromConnection_],
protocols=['AVCaptureVideoDataOutputSampleBufferDelegate'])

@on_main_thread
def main():
global cb_delegate, main_view, label_status, label_action, label_fps

delegate = sampleBufferDelegate.new()

frame_w = 375
frame_h = 550
main_view = ui.View(frame=(0, 0, frame_w, frame_h))
main_view.name = 'M5BALA Line Trace'
session = AVCaptureSession.alloc().init()
device = AVCaptureDevice.defaultDeviceWithMediaType_('vide')
_input = AVCaptureDeviceInput.deviceInputWithDevice_error_(device, None)
if _input:
session.addInput_(_input)
else:
print('Failed to create input')
return

output = AVCaptureVideoDataOutput.alloc().init()
queue = ObjCInstance(dispatch_get_current_queue())
output.setSampleBufferDelegate_queue_(delegate, queue)
output.alwaysDiscardsLateVideoFrames = True

session.addOutput_(output)
# 用途がライントレース用の線の認識なので 640 x 480 の画像とする
session.sessionPreset = 'AVCaptureSessionPreset640x480'

prev_layer = AVCaptureVideoPreviewLayer.layerWithSession_(session)
prev_layer.frame = ObjCInstance(main_view).bounds()
prev_layer.setVideoGravity_('AVLayerVideoGravityResizeAspectFill')
ObjCInstance(main_view).layer().addSublayer_(prev_layer)

# M5BALA制御用の M5Stack との BLE 接続状況の表示
label_status = ui.Label(frame=(0, 0, frame_w, 30), flex='W', name='status')
label_status.background_color = (0, 0, 0, 0.5)
label_status.text_color = 'white'
label_status.text = 'Scanning'
label_status.alignment = ui.ALIGN_CENTER

# M5Stack に送信する move/turn の値表示
label_action = ui.Label(frame=(0, 30, frame_w, 30), flex='W', name='action')
label_action.background_color = (0, 0, 0, 0.5)
label_action.text_color = 'white'
label_action.text = ''
label_action.alignment = ui.ALIGN_CENTER

# ビデオデータの実 FPS 表示
label_fps = ui.Label(frame=(0, 60, frame_w, 30), flex='W', name='fps')
label_fps.background_color = (0, 0, 0, 0.5)
label_fps.text_color = 'white'
label_fps.text = ''
label_fps.alignment = ui.ALIGN_CENTER

main_view.add_subview(label_status)
main_view.add_subview(label_action)
main_view.add_subview(label_fps)

cb_delegate = MyCentralManagerDelegate()
print('Scanning for peripherals...')
cb.set_central_delegate(cb_delegate)
cb.scan_for_peripherals()

session.startRunning()

# iPhoneのフラッシュを照明として利用
if TORCH_MODE and device.hasTorch():
device.lockForConfiguration_(None)
device.setTorchMode_(1)
device.unlockForConfiguration()

main_view.present('sheet')

main_view.wait_modal()

session.stopRunning()
delegate.release()
session.release()
output.release()
cb.reset()

if __name__ == '__main__':
main()



おまけ