LoginSignup
3

More than 5 years have passed since last update.

M5BALAでライントレース(iPhone Camera-Pythonista-BLE編)

Last updated at Posted at 2018-11-30

概要

M5BALAでライントレース、今回は iPhone のカメラで線の位置を認識し、線の位置によって M5BALA の進行方向を制御します。プログラムは iPhone 上の Pythonista による実装です。

  • iPhone と M5Stack は BLE で接続
  • iPhoneのカメラのビデオデータをフレームごとに取得
  • ビデオデータの輝度情報から線を認識
  • 線の位置のと画面の中央位置との差によってM5BALAの動く方向を制御
  • M5Stack側は 「iPhoneを使ってM5BALAを操作する(Pythonista-BLE編)」 のものをそのまま使用

環境

実行例

iPhoneを載せ、前方を照らしながら進む M5BALA。走らせる前、iPhoneの載せた状態でバランスがを取るのが一番難しいところです。微妙な差で、後ろのめり、前のめりになってしまいます。
IMG_0214.jpeg

Pythonista3プログラム

iPhoneのカメラのビデオデータをフレームごとに取得し、輝度情報から線を認識します。線の部分の認識方法は以下の通り。

  • ビデオデータはX軸が短辺、Y軸が長辺。X軸が右端→左端、Y軸=0は画面の上部、Y軸=最大値は画面の下部。
    IMG_2015.jpeg

  • Y軸のどの部分を取り出すのが最適かはあまり試していません。今回はY軸の中央部分を輝度データを左端→右端の配列となるようにデータを取り出しています。値が落ち込んでいる部分が黒線。
    IMG_2016.jpeg

  • 差分を求めると、最小値部分が白から黒へ変化した位置、最大値部分が黒から白へ変化した位置。
    IMG_2017.jpeg

  • 今回のプログラムでは、線の左側が中央に位置するように、M5BALAの動きを制御しています。前方向の進ませ方および回転の度合いについては今回は単純な計算で済ませていますが、PID制御とか、まだまだ改善の余地がありそうです。

M5BALA_iPhone_Video_BLE.py
# coding: utf-8

from objc_util import *
from ctypes import c_void_p, cast
import ui
import cb
import struct
import time
import numpy as np
import matplotlib.pyplot as plt

M5BALA_SERVICE_UUID        = 'c44205a6-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_CHARACTERISTIC_UUID = 'c442090c-c87c-11e8-a8d5-f2801f1b9fd1'.upper()
M5BALA_NAME = 'M5BALA'

DEBUG_MODE = False

# iPhone6 ではビデオのフレームレートが 30fps なので
# 制御用の処理は 6 フレームに1回(5fps)とする
FRAME_INTERVAL = 6  # 30fps / 6 = 5fps

TORCH_MODE = True
TURN_MAX = 150
MOVE_OFFSET = 30
LINE_DETECTION_THRESHOLD = -10

frame_counter = 0
last_fps_time = time.time()
fps_counter = 0

main_view = None

AVCaptureSession = ObjCClass('AVCaptureSession')
AVCaptureDevice = ObjCClass('AVCaptureDevice')
AVCaptureDeviceInput = ObjCClass('AVCaptureDeviceInput')
AVCaptureVideoDataOutput = ObjCClass('AVCaptureVideoDataOutput')
AVCaptureVideoPreviewLayer = ObjCClass('AVCaptureVideoPreviewLayer')

dispatch_get_current_queue = c.dispatch_get_current_queue
dispatch_get_current_queue.restype = c_void_p

CMSampleBufferGetImageBuffer = c.CMSampleBufferGetImageBuffer
CMSampleBufferGetImageBuffer.argtypes = [c_void_p]
CMSampleBufferGetImageBuffer.restype = c_void_p

CVPixelBufferLockBaseAddress = c.CVPixelBufferLockBaseAddress
CVPixelBufferLockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferLockBaseAddress.restype = None

CVPixelBufferGetBaseAddressOfPlane = c.CVPixelBufferGetBaseAddressOfPlane
CVPixelBufferGetBaseAddressOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBaseAddressOfPlane.restype = c_void_p

CVPixelBufferGetBytesPerRowOfPlane = c.CVPixelBufferGetBytesPerRowOfPlane
CVPixelBufferGetBytesPerRowOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetBytesPerRowOfPlane.restype = c_int

CVPixelBufferGetWidthOfPlane = c.CVPixelBufferGetWidthOfPlane
CVPixelBufferGetWidthOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetWidthOfPlane.restype = c_int

CVPixelBufferGetHeightOfPlane = c.CVPixelBufferGetHeightOfPlane
CVPixelBufferGetHeightOfPlane.argtypes = [c_void_p, c_int]
CVPixelBufferGetHeightOfPlane.restype = c_int

CVPixelBufferUnlockBaseAddress = c.CVPixelBufferUnlockBaseAddress
CVPixelBufferUnlockBaseAddress.argtypes = [c_void_p, c_int]
CVPixelBufferUnlockBaseAddress.restype = None


class MyCentralManagerDelegate (object):
    def __init__(self):
        self.peripheral = None
        self.data_characteristics = None

    def did_discover_peripheral(self, p):
        if p.name and M5BALA_NAME in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
            self.peripheral = p
            cb.connect_peripheral(self.peripheral)
            label_status.text = 'Detected'

    def did_connect_peripheral(self, p):
        print('*** Connected: %s' % p.name)
        print('Discovering services...')
        label_status.text = 'Connected'
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect')
        label_status.text = 'Failed'

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected, error: %s' % (error,))
        self.peripheral = None
        self.data_characteristics = None
        label_status.text = 'Scanning'
        cb.scan_for_peripherals()

    def did_discover_services(self, p, error):
        for s in p.services:
            print(s.uuid)
            if M5BALA_SERVICE_UUID in s.uuid:
                print('M5BALA found')
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        if M5BALA_SERVICE_UUID in s.uuid:
            for c in s.characteristics:
                if M5BALA_CHARACTERISTIC_UUID in c.uuid:
                    self.data_characteristics = c

    def send_action(self, move, turn):
        if self.peripheral and self.data_characteristics:
            data = struct.pack('hh', move, turn)
            self.peripheral.write_characteristic_value(self.data_characteristics, data, True)


def captureOutput_didOutputSampleBuffer_fromConnection_(_self, _cmd, _output, _sample_buffer, _conn):
    global frame_counter, move, turn
    global fps_counter, last_fps_time

    fps_counter += 1
    now = time.time()
    if int(now) > int(last_fps_time):
        label_fps.text = '{:5.2f} fps'.format((fps_counter) / (now - last_fps_time))
        last_fps_time = now
        fps_counter = 0

    if frame_counter == 0:
        _imageBuffer = CMSampleBufferGetImageBuffer(_sample_buffer)
        CVPixelBufferLockBaseAddress(_imageBuffer, 0)

        # ビデオデータの形式が YCbCr なので線の認識には輝度データのみを使用する
        base_address  = CVPixelBufferGetBaseAddressOfPlane(_imageBuffer, 0)
        bytes_per_row = CVPixelBufferGetBytesPerRowOfPlane(_imageBuffer, 0)
        width         = CVPixelBufferGetWidthOfPlane(_imageBuffer, 0)
        height        = CVPixelBufferGetHeightOfPlane(_imageBuffer, 0)
        if DEBUG_MODE:
            print(base_address, bytes_per_row, width, height)

        # ビデオデータを配列として扱う
        img = np.ctypeslib.as_array(cast(base_address, POINTER(c_ubyte)), shape=((height, width)))

        # X軸が短辺、Y軸が長辺
        # X軸が右端→左端となっているので、左端→右端の配列となるようにデータを取り出す
        # Y軸=0 は画面の上部
        #imx = img[::-1, 0].astype(np.float32)
        # Y軸=-1 は画面の下部
        #imx = img[::-1, -1].astype(np.float32)
        # 今回はY軸の中央部分を使用
        imx = img[::-1, img.shape[1] // 2].astype(np.float32)
        # 輝度データの差分を取得する
        imd = np.diff(imx)

        if DEBUG_MODE:
            plt.figure()
            plt.imshow(img)
            plt.gray()
            plt.show()
            plt.figure()
            plt.plot(imx)
            plt.show()
            plt.figure()
            plt.plot(imd)
            plt.show()
            plt.close()

        # 差分データの最小値、最大値のインデックスを取得
        # 最小値:白から黒になる部分(線の左端)
        # 最大値:黒から白になる部分(線の右端)
        min_i = np.argmin(imd)
        max_i = np.argmax(imd)

        # move 前方向の移動量は固定値を設定
        move = MOVE_OFFSET
        # turn 線の左端部分と中央の差によって回転の移動量を計算する
        turn = TURN_MAX
        imd_xh = imd.shape[0] / 2
        # 輝度の差分の最小値が閾値より小さい時に線を認識したとして回転の計算をする
        # 閾値以上の場合は線を見失ったものとして最大の回転を行う
        if imd[min_i] < LINE_DETECTION_THRESHOLD:
            turn = int((min_i - imd_xh) / imd_xh * TURN_MAX)
        else:
            turn = TURN_MAX

        label_action.text = 'min:{} max:{} move:{} turn:{}'.format(imd[min_i], imd[max_i], move, turn)
        if DEBUG_MODE:
            print(min_i, max_i, imd[min_i], imd[max_i], min_i - imd_xh, move, turn)

        if cb_delegate.peripheral and cb_delegate.data_characteristics:
            cb_delegate.send_action(move, turn)

        CVPixelBufferUnlockBaseAddress(_imageBuffer, 0)

    frame_counter = (frame_counter + 1) % FRAME_INTERVAL


sampleBufferDelegate = create_objc_class(
                            'sampleBufferDelegate',
                            methods=[captureOutput_didOutputSampleBuffer_fromConnection_],
                            protocols=['AVCaptureVideoDataOutputSampleBufferDelegate'])

@on_main_thread
def main():
    global cb_delegate, main_view, label_status, label_action, label_fps

    delegate = sampleBufferDelegate.new()

    frame_w = 375
    frame_h = 550
    main_view = ui.View(frame=(0, 0, frame_w, frame_h))
    main_view.name = 'M5BALA Line Trace'
    session = AVCaptureSession.alloc().init()
    device = AVCaptureDevice.defaultDeviceWithMediaType_('vide')
    _input = AVCaptureDeviceInput.deviceInputWithDevice_error_(device, None)
    if _input:
        session.addInput_(_input)
    else:
        print('Failed to create input')
        return

    output = AVCaptureVideoDataOutput.alloc().init()
    queue = ObjCInstance(dispatch_get_current_queue())
    output.setSampleBufferDelegate_queue_(delegate, queue)
    output.alwaysDiscardsLateVideoFrames = True

    session.addOutput_(output)
    # 用途がライントレース用の線の認識なので 640 x 480 の画像とする
    session.sessionPreset = 'AVCaptureSessionPreset640x480'

    prev_layer = AVCaptureVideoPreviewLayer.layerWithSession_(session)
    prev_layer.frame = ObjCInstance(main_view).bounds()
    prev_layer.setVideoGravity_('AVLayerVideoGravityResizeAspectFill')
    ObjCInstance(main_view).layer().addSublayer_(prev_layer)

    # M5BALA制御用の M5Stack との BLE 接続状況の表示
    label_status = ui.Label(frame=(0, 0, frame_w, 30), flex='W', name='status')
    label_status.background_color = (0, 0, 0, 0.5)
    label_status.text_color = 'white'
    label_status.text = 'Scanning'
    label_status.alignment = ui.ALIGN_CENTER

    # M5Stack に送信する move/turn の値表示
    label_action = ui.Label(frame=(0, 30, frame_w, 30), flex='W', name='action')
    label_action.background_color = (0, 0, 0, 0.5)
    label_action.text_color = 'white'
    label_action.text = ''
    label_action.alignment = ui.ALIGN_CENTER

    # ビデオデータの実 FPS 表示
    label_fps = ui.Label(frame=(0, 60, frame_w, 30), flex='W', name='fps')
    label_fps.background_color = (0, 0, 0, 0.5)
    label_fps.text_color = 'white'
    label_fps.text = ''
    label_fps.alignment = ui.ALIGN_CENTER

    main_view.add_subview(label_status)
    main_view.add_subview(label_action)
    main_view.add_subview(label_fps)

    cb_delegate = MyCentralManagerDelegate()
    print('Scanning for peripherals...')
    cb.set_central_delegate(cb_delegate)
    cb.scan_for_peripherals()

    session.startRunning()

    # iPhoneのフラッシュを照明として利用
    if TORCH_MODE and device.hasTorch():
        device.lockForConfiguration_(None)
        device.setTorchMode_(1)
        device.unlockForConfiguration()

    main_view.present('sheet')

    main_view.wait_modal()

    session.stopRunning()
    delegate.release()
    session.release()
    output.release()
    cb.reset()

if __name__ == '__main__':
    main()

おまけ

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3