1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RPiにつないだコントローラの入力をLAN経由で流したい(VRPN編)

Posted at

在宅勤務だと、仕事とプライベートの境目が曖昧になりがちなため、休日の自主研究の内容は Qiita にアウトプットすることで区切りをつけようというライフハック。

やりたいこと

LAN 上にある Raspberry Pi につないだコントローラの値を、別のPC上でリアルタイムに受け取りたい。

やること

LAN 上に VRPN のサーバを立てる。
VRPN は、サーバが何らかの方法で得たペリフェラルの値を、接続したクライアントに流すというプロトコル。
VRPN は、地味に様々な VR 関連のデバイスに対応しているため、他のデバイスの都合で実装を集約したいときには便利。

ただし、無線LANに繋がった Raspberry Pi 上で直接 VRPN サーバを立てると、無線LAN上でリアルタイム通信を TCP でやることになり、ハゲそうな気持ちになるので、やめたい。
無線区間はユニキャストの UDP で行きたい。(無線LAN上でブロードキャストの UDP を投げると、それはそれでハゲそうな気持ちになります)

VRPN Server には、JsonNet というデバイスが実装されており、JSON over UDP にてコントローラの値を投げつけると、それを反映してくれる。(名称が同一の全く別のプロジェクトがあるため、検索しづらい)

ということで、RaspberryPi 上から JSON を UDP で投げることに。
VRPN ではなく OSC 使いたくなったときも、このくらい論理的に分割されていれば、載せ替えは簡単なはず……。

Raspberry Pi 側実装

コントローラとしては、GPIO につないだボタンや、USB につないだゲームパッドを想定。

下の python3 のサンプルは、GPIO17に刺したボタンをVRPNのJsonNetのButtonの0番目にしつつ、/dev/input/js0のAXISの0番1番(メインのスティックのX軸Y軸)をVRPNのJsonNetのAnalogの0番1番にする例。

/dev/input/js0 からくる event の timer の精度が10msだったので、10ms単位で最新の情報を常に UDP で送り続ける実装です。JsonNet で JSON を1つの UDP パケットに複数 pack できるのか確認できていないため、センサごとに UDP を1つ送っていますが、今時の無線APなら、300pps くらいは余裕で通してほしいところ。

なお、つい、実装しやすさから python3 を選んでしまいましたが、リアルタイム性が要求されるので、GCの心配のないC++で書く案件だという自覚はあります。

# -*- coding: utf-8 -*-
from pprint import pprint
from struct import unpack
from threading import Thread, Event
from queue import Queue, Empty
import signal
import json
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep


JS = 1
GPI = 2

jsonnet_host = "192.168.xxx.xxx"
jsonnet_port = 7777

device_path = '/dev/input/js0'
send_interval = 0.01 # 100Hz
button_targets = [(GPI, 17)]
analog_targets = [(JS, 0), (JS, 1)]

timer_event = Event()
def handle_timer_intr(_1, _2):
  timer_event.set()


# JoyStick
JS_BUTTON = 1
JS_AXIS = 2
JS_INIT = 0x80

def handle_device_thread(path, q):
  with open(path, 'rb') as f:
    while True:
      raw = f.read(8)
      # https://www.kernel.org/doc/Documentation/input/joystick-api.txt
      buf = unpack('LhBB', raw)

      event = {}

      event["time"] = buf[0];
      event["value"] = buf[1];
      event["type"] = buf[2]
      event["number"] = buf[3]

      if (event["type"] & JS_INIT) == JS_INIT:
        event["type"] -= JS_INIT
        event["init"] = True
      else:
        event["init"] = False

      q.put(event)

js_queue = Queue(32)

js_button_states = {}
js_analog_states = {}
for t in button_targets:
  if t[0] == JS:
    js_button_states[t[1]] = False
for t in analog_targets:
  if t[0] == JS:
    js_analog_states[t[1]] = 0.0

def update_js():
  try:
    while True:
      event = js_queue.get_nowait()
      pprint(event)
      if event["type"] == JS_BUTTON:
        if event["number"] in js_button_states:
          js_button_states[event["number"]] = (event["value"] != 0)
      elif event["type"] == JS_AXIS:
        if event["number"] in js_analog_states:
          js_analog_states[event["number"]] = event["value"] / 32768.0
  except Empty:
    pass

# GPIO
import RPi.GPIO as GPIO

gpi_queue = Queue()

gpi_button_states = {}

def handle_gpio(gpio_pin):
  state = GPIO.input(gpio_pin)
  event = dict(
    number=gpio_pin,
    value=state
  )
  gpi_queue.put(event)

GPIO.setmode(GPIO.BCM)

for t in button_targets:
  if t[0] == GPI:
    gpi_button_states[t[1]] = False
    GPIO.setup(t[1], GPIO.IN)
    GPIO.add_event_detect(t[1], GPIO.BOTH, callback=handle_gpio, bouncetime=100)

def update_gpi():
  try:
    while True:
      event = gpi_queue.get_nowait()
      pprint(event)
      if event["number"] in gpi_button_states:
          gpi_button_states[event["number"]] = (event["value"] != 0)
  except Empty:
    pass


# Common
def get_button_state(t):
  if t[0] == JS:
    return js_button_states[t[1]]
  elif t[0] == GPI:
    return gpi_button_states[t[1]]
  return False

def get_analog_state(t):
  if t[0] == JS:
    return js_analog_states[t[1]]
  return 0.0

def update():
  update_js()
  update_gpi()

sock = socket(AF_INET, SOCK_DGRAM)

def send(host, port):
  for i, t in enumerate(button_targets):
    packet = json.dumps(dict(
      type=2, button=i, state=get_button_state(t)
    ))
    #print(packet)
    sock.sendto(packet.encode(), (host, port))
  
  for i, t in enumerate(analog_targets):
    packet = json.dumps(dict(
      type=3, num=i, data=get_analog_state(t)
    ))
    #print(packet)
    sock.sendto(packet.encode(), (host, port))


signal.signal(signal.SIGALRM, handle_timer_intr)
signal.setitimer(signal.ITIMER_REAL, send_interval, send_interval)
  
thread = Thread(target=handle_device_thread, args=(device_path, js_queue))
thread.setDaemon(True)
thread.start()

try:
  while True:
    timer_event.wait()
    timer_event.clear()
    update()
    send(jsonnet_host, jsonnet_port)
    #sleep(1)
except KeyboardInterrupt:
  sock.close()
  GPIO.cleanup()

VRPN Server 側の動作確認

VRPN Server のビルド

$ clone https://github.com/vrpn/vrpn
$ cd vrpn
$ git submodule update -i
$ mkdir build
$ cd build
$ ccmake ..
  (コンパイルオプションの中で VRPN_USE_JSONNET と VRPN_USE_LOCAL_JSONCPP を ON にする)
$ make

VRPN Server の動作確認

Server 側

$ cd server_src
$ vi vrpn.cfg
  vrpn_Tracker_JsonNet Jsonnet  7777 の行をコメントアウト
$ ./vrpn_server -v

Client 側

別窓で。

$ cd client_src
$ ./vrpn_print_devices -notracker -nodial -notext Jsonnet@localhost
(中略)
#########################################
Button Jsonnet@localhost, number 0 was just pressed
##########################################
##########################################
Button Jsonnet@localhost, number 0 was just released
##########################################

備考

JsonNet の JSON フォーマット

/* トラッカー */
{
  'type': 1,
  'id': sensor number,
  'quat': quaternion,
  'pos': position
}
/* ボタン */ 
{
  'type': 2,
  'button': button number,
  'state': the boolean state
}
/* アナログ */
{
  'type': 3,
  'num': channel number,
  'data': the analog value
}
/* テキストメッセージ */
{
  'type': 4,
  'data': the text value
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?