1. はじめに
「カムプログラムロボットをRaspberryPiZeroでリモートコントロール ①キーボード操作編」に続き第2弾です。
Skywriterを使って、ジェスチャーでカムロボット操作にチャレンジします。
当初のイメージでは、糸なしマリオネットで指先だけで簡単にカムロボットをコントロールするので、楽に作ろうと思ってましたが、意に反して難題が多く何とか完成しました。
2. 使用したパーツ
「カムプログラムロボットをRaspberryPiZeroでリモートコントロール ①キーボード操作編」から追加したパーツは、
(1) Raspberry PI
Skywriterを接続して、コントロール用に使用。
(2)Skywriter HAT
Pimoroni製。静電容量を利用した3Dジェスチャー・タッチを検出でき、指を近づけると、X/Y/Zの座標を取得したり、フリックやタップなどのジェスチャーを取得。
(3)超音波距離センサーモジュール HC-SR04
障害物があると、自動的に停止するために使用。
(4)その他
Exploer pHATは、モータとHC-SR04接続に使用。
3. 全体構成
コントローラとカムロボット間は、無線LANでSocket通信です。
カムロボットがサーバ機能、コントローラがクライアント機能で、コントローラがコマンド発効のたびに通信接続⇒メッセージ送受信通信⇒通信切断を行う仕様にしています。
3-1. カムロボット側
Raspberry Pi Zero WHにExplorer pHATをのせて、Explorer pHAt経由で2個のモータの操作と距離センサー(HC-SR04)で情報を入手します。
3-1-1. Explorer pHATの結線
| Explorer phat | モータケーブル |
|---|---|
| MOTORS 1 + | 右モータ青ケーブル |
| MOTORS 1 - | 右モータ黄ケーブル |
| MOTORS 2 + | 左モータ黄ケーブル |
| MOTORS 2 - | 左モータ青ケーブル |
| Explorer phat | HC-SR04 |
|---|---|
| INPUT1 | Echo |
| 5V | Vcc |
| GND | Gnd |
| OUTPUT1 | Trig |
| 注意点:5V---VccとOUTPUT1---Trig間に10kΩのプルアップ抵抗を接続します。 |
3-2. コントローラ側
Skywriter HATをRaspberry Piに挿すだけですので、特に問題ないかと思います。
4. プログラム内容
4-1. カムロボット側
① Robot3Server.py
コントローラからの要求を受け付け、モータに回転指示を出すサーバプログラムです。
IPアドレスはRaspberry Pi Zero WHで設定されたものに変更します。
(HOST_IP = XXX.XXX.XXX.XXX)
ここでは、192.168.1.42を設定しています。
コントローラからコマンド文字を受け取り、
・explorerhat.motor.one.forwards(XX)
・explorerhat.motor.two.forwards(XX)
で動作します。
| 受信文字 | 動作 |
|---|---|
| S | 停止 |
| F | 前進 |
| B | 後退 |
| C | 時計回り回転 |
| D | 反時計回り回転 |
| R | 右 |
| L | 左 |
② Robot3Auto_stop.py
Raspberry Pi Zero WH のOSを起動すると、Explorer pHAT関連のプログラムが起動してモーターが回転してしまいます。
ドライバーの関係でしょうが、ここは簡単にOS起動後に自動起動する「モータを停止する」プログラムで対応しました。
それでも、少し動いて停止しますが我慢してください。
自動起動の方法はいろいろありますが、ここでは/etc/rc.localに停止プログラム(Robot3Auto_stop.py)を登録しました。
また、今回作成したプログラムは ~/robot3ディレクトリーに置いているので、下記内容を最終行(exit 0)の前に挿入してください。
/usr/bin/python3 /home/pi/robot3/Robot3Server_stop.py &
停止プログラムは単にモータを停止するコマンドを送っているだけです。
・explorerhat.motor.one.forwards(0)
・explorerhat.motor.two.forwards(0)
③ Robot3Server_stop.py
障害物手前10cmになると、自動停止して1秒間後退してから停止します。
HC-SR04をExplorer pHATを直接操作するコマンドがありますが(例:explorerhat.output.one.on)、距離検出しないため、ここではGPIOを直接操作しました。
HC-SR04の操作に関しては色々な情報がありますので、ここでは割愛いたします。
尚、気温20度の場合を音速を想定していますが、問題ないと思います。
4-2. コントローラ側
④ Robot3Client.py
Skywriterでのジェスチャー毎にSocket接続して、コマンドをサーバへ送ります。
Skywriterの操作は、
・フリック--- 前進、後退、右、左
・ホイール--- 時計回り回転、反時計回り回転
・タップ----- 停止
です。
5. プログラム
5-1. カムロボット側
# _*_ coding: utf-8 _*_
import socket
import threading
from datetime import datetime
import time
import explorerhat
# IPアドレスはお使いの環境にあわせて修正してください
HOST_IP = "192.168.1.42" # サーバーのIPアドレス
PORT = 60000 # 使用するポート
CLIENTNUM = 3 # クライアントの接続上限数
DATESIZE = 1024 # 受信データバイト数
class SocketServer():
def __init__(self, host, port):
self.host = host
self.port = port
# サーバー起動
def run_server(self):
# server_socketインスタンスを生成
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.host, self.port))
server_socket.listen(CLIENTNUM)
print('[{}] run server'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
while True:
# クライアントからの接続要求受け入れ
client_socket, address = server_socket.accept()
print('[{0}] connect client -> address : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), address) )
client_socket.settimeout(60)
# クライアントごとにThread起動 send/recvのやり取りをする
t = threading.Thread(target = self.conn_client, args = (client_socket,address))
t.setDaemon(True)
t.start()
# クライアントごとにThread起動する関数
def conn_client(self, client_socket, address):
with client_socket:
while True:
# クライアントからデータ受信
rcv_data = client_socket.recv(DATESIZE)
# 停止
print(rcv_data)
if rcv_data == b'S':
explorerhat.motor.one.forwards(0)
explorerhat.motor.two.forwards(0)
# 前進
if rcv_data == b'F':
explorerhat.motor.one.forwards(75)
explorerhat.motor.two.forwards(75)
# 後退
if rcv_data == b'B':
explorerhat.motor.one.backwards(75)
explorerhat.motor.two.backwards(75)
# 時計回り回転
if rcv_data == b'C':
explorerhat.motor.one.backwards(75)
explorerhat.motor.two.forwards(75)
# 反時計回り回転
if rcv_data == b'D':
explorerhat.motor.one.forwards(75)
explorerhat.motor.two.backwards(75)
# 右
if rcv_data == b'R':
explorerhat.motor.one.forwards(0)
explorerhat.motor.two.forwards(75)
# 左
if rcv_data == b'L':
explorerhat.motor.one.forwards(75)
explorerhat.motor.two.forwards(0)
#
if rcv_data:
# データ受信したデータをそのままクライアントへ送信
client_socket.send(rcv_data)
print('[{0}] recv date : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), rcv_data.decode('utf-8')) )
else:
break
print('[{0}] disconnect client -> address : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), address) )
if __name__ == "__main__":
SocketServer(HOST_IP, PORT).run_server()
# _*_ coding: utf-8 _*_
import RPi.GPIO as GPIO
import time
import signal
import socket
from datetime import datetime
# IPアドレスは、お使いの環境に合わせて修正してください
HOST_IP = "192.168.1.42" # 接続するサーバーのIPアドレス
PORT = 60000 # 接続するサーバーのポート
DATESIZE = 1024 # 受信データバイト数
GPIO.setmode(GPIO.BCM)
TRIG = 6
ECHO = 23
C = 343 # 気温20度の時の音速(m/s)
GPIO.setup(TRIG, GPIO.OUT)
GPIO.setup(ECHO, GPIO.IN)
GPIO.output(TRIG, 0)
time.sleep(0.3)
class SocketClient():
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = None
def send_recv(self, input_data):
# sockインスタンスを生成
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# ソケットをオープンにして、サーバーに接続
sock.connect((self.host, self.port))
print('[{0}] input data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), input_data) )
# 入力データをサーバーへ送信
sock.send(input_data.encode('utf-8'))
# サーバーからのデータを受信
rcv_data = sock.recv(DATESIZE)
rcv_data = rcv_data.decode('utf-8')
print('[{0}] recv data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), rcv_data) )
# 障害物までの距離計測
def readDistance():
global signaloff
global signalon
GPIO.output(TRIG, 1)
time.sleep(0.00001) # 0.00001秒待つ
GPIO.output(TRIG, 0)
while GPIO.input(ECHO) == 0:
signaloff = time.time()
while GPIO.input(ECHO) == 1:
signalon = time.time()
t = signalon - signaloff
distance = t * C * 100 / 2
return distance
def cleanup():
print('cleanup')
GPIO.cleanup()
# 障害物までの距離判定による動作
try:
while True:
dist = readDistance()
print(dist)
if dist < 20 : # 障害物まで10cmなら停止し、1秒後退して停止
print("Near")
client = SocketClient(HOST_IP, PORT)
client.send_recv("S") # 停止
client.send_recv("B") # 後退
time.sleep(1)
client.send_recv("S") # 停止
time.sleep(0.5)
except Exception as e:
print(e)
# 終了処理
finally:
cleanup()
# _*_ coding: utf-8 _*_
import time
import explorerhat
def main():
# モータの停止
explorerhat.motor.one.forwards(0)
explorerhat.motor.two.forwards(0)
if __name__ == "__main__":
main()
# !/bin/sh -e
#
# rc.local
#
# This script is executed at the end of each multiuser runlevel.
# Make sure that the script will "exit 0" on success or any other
# value on error.
#
# In order to enable or disable this script just change the execution
# bits.
#
# By default this script does nothing.
# Print the IP address
_IP=$(hostname -I) || true
if [ "$_IP" ]; then
printf "My IP address is %s\n" "$_IP"
fi
/usr/bin/python3 /home/pi/robot3/Robot3Server_stop.py &
exit 0
5-2. コントローラ側
# _*_ coding: utf-8 _*_
import socket
import time
from datetime import datetime
import skywriter
import signal
# IPアドレスはお使いの環境に合わせて修正してください
HOST_IP = "192.168.1.42" # 接続するサーバーのIPアドレス
PORT = 60000 # 接続するサーバーのポート
DATESIZE = 1024 # 受信データバイト数
cmd_str = ""
cmd_str_before = ""
class SocketClient():
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = None
def send_recv(self, input_data):
# sockインスタンスを生成
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# ソケットをオープンにして、サーバーに接続
sock.connect((self.host, self.port))
print('[{0}] input data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), input_data) )
# 入力データをサーバーへ送信
sock.send(input_data.encode('utf-8'))
# サーバーからのデータを受信
rcv_data = sock.recv(DATESIZE)
rcv_data = rcv_data.decode('utf-8')
print('[{0}] recv data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), rcv_data) )
# Skywriterの処理
@skywriter.flick()
def flick(start,finish):
global cmd_str
global cmd_str_before
print('Got a flick!', start, finish)
if start == "south" and finish == "north": # 前進
cmd_str = "F"
if start == "north" and finish == "south": # 後退
cmd_str = "B"
if start == "west" and finish == "east": # 右
cmd_str = "R"
if start == "east" and finish == "west": # 左
cmd_str = "L"
# Skywriterの操作で、同じコマンドが連続で送られないようにする
if cmd_str != cmd_str_before :
client = SocketClient(HOST_IP, PORT)
client.send_recv(cmd_str)
cmd_str_before = cmd_str
time.sleep(1)
@skywriter.tap()
def tap(position):
global cmd_str
global cmd_str_before
print('Tap!', position)
if position == "center": # 停止
cmd_str = "S"
client = SocketClient(HOST_IP, PORT)
client.send_recv(cmd_str)
cmd_str_before = cmd_str
time.sleep(1)
@skywriter.airwheel()
def airwheel(delta):
global cmd_str
global cmd_str_before
print('Airwheel', delta)
if delta >= 0: # 時計回り回転
cmd_str = "C"
if delta < 0: # 反時計回り回転
cmd_str = "D"
#
client = SocketClient(HOST_IP, PORT)
client.send_recv(cmd_str)
cmd_str_before = cmd_str
time.sleep(1)
signal.pause()
6. 操作
カムロボット側のプログラムを起動してから、コントロール側を起動してください。
操作は指先で簡単に行えますが、最初のうちはSkywriterとの距離感をつかむのに慣れが必要かもしれません。(1~2cmほど離します)
6-1. 前後左右
6-2. 時計回り回転・反時計回り回転
指先で、回転したい方向に円を描きます。(1~2cmほど離します)

6-3. 停止
Skywriterを軽くタップします。
接触しているので、空中でのジェスチャーとはいいがたいですが...。
7. おわりに
・ジェスチャーセンサーを使ったのは初めてで、距離間やスイープ操作を習得するのに試行錯誤しました。
・障害物自動停止機能を追加したく、Explorer pHATにHC-SR04を接続しました。なかなかデータを取得できず、Trigケーブルに触れるとデータが読み取れて、離すと読み取れない状況でした。
色々情報を探したところ、Raspberry PiとHC-SR04には、Echoに10kΩプルアップ抵抗があった方が良いという情報を頼りに試しましたがうまくいかず、結局Trigに入れてうまく読み取れました。
・HC-SR04の情報を入手するのに、Explorer pHATが提供する関数を使用しましたが、全く情報が取れないので、GPIOで操作しました。
以上、色々ありましたが何とか出来上がりました。
今後、コントローラ側を代えて試す予定で、筋電センサーも試しましたがセンサー1個ではオンオフ操作のみで操作も難しいですので、複数で試したいところです。


