知人の手伝いでラズパイを使って美術展示を制御するプログラムを書きました。
初めてオブジェクト指向なプログラム(多分)も書いたので備忘録がてら掲載します。
PythonとCV2で動画を等速等倍の音声付で再生する制御も取り組みましたのでご参考になれば幸いです。
展示概要
自分がメインのプロジェクトではないので一応詳細は伏せますが、展示概要は
鑑賞者が個室に入ると部屋が光に包まれる。天井には常識を超越した映像が投影され、観客は困惑と興奮の中で超感覚体験をする。
みたいな感じです。
もっと凄いイイ感じの展示なんですけど僕の日本語力が及ばず魅力が伝えられなくて申し訳ないです。
僕は現代アートなるものは難しくて避けがちなのですが、本展示製作者の一員である友人曰く鑑賞者が宇宙猫になれるのを目指したとのことです。
いいですね。
そういうのは好きです。
システム概要
制御のコアはラズパイが担います。
GPIOでドアの開閉とリセット操作を検知します。
センサー自体は磁石で通電と遮断が変わる簡単なものです。
映像出力はHDMI端子からプロジェクターに流し込みます。
ライト関係の制御はwebhock
を通してIFTTT
からSwitchbot
を操作します。
二つライトがあり、一つは赤外線リモコンで操作するものなのでhub mini
を介して赤外線で操作します。
もう一つはSwitchbot製のものなので、ライトをWi-Fiに接続し直接操作します。
プログラム
プログラムはPythonで書きました。
Cやら何やら触ってきて改めて思いますが、本当にPythonは楽ですね。
バックグラウンドで勝手にイイ感じにしてくれて、ライブラリも豊富で、人気なのも納得です。
import pigpio # import Raspberry Pi's pin library
import time # import library for sleep function
import numpy as np # import library for blank screen function
import os # import library for environment varieties
import cv2 # import library for display movie function
import simpleaudio # import library for audio function
import requests # import library for send request to webhock
import subprocess # import library for hide mouse cursore
from concurrent.futures import ThreadPoolExecutor # import library for multi-thread
RESET_PIN = 3 # set read pin for reset
DOOR_PIN = 4 # set read pin of sensing a door
pi = pigpio.pi() # initialize pigpio
pi.set_mode(DOOR_PIN, pigpio.INPUT) # set pigpio pin mode as input (read level of the pin)
pi.set_mode(RESET_PIN, pigpio.INPUT)
os.environ['DISPLAY'] = ':0' # select output display
unclutter = subprocess.Popen(['unclutter', '-idle', '0.1', '-root']) # hide mouse cursore
# Playback related functions
class Playback:
height = 720 # set height of blank screen
width = 1280 # set width of blank screen
black = np.zeros((height, width, 3)) # generate color info of (0, 0, 0) on each pixel
status = True # a flag for the exhibition is running
def stop(self): # play -> blank
self.is_blank = True # a flag for screen status
def play(self): # blank -> play
self.is_blank = False
def halt(self): # stop all playback: terminate window then back to the desktop
self.status = False # = the main program received a stop signal
def audio(self): # audio related functions
wav = simpleaudio.WaveObject.from_wave_file('./audio.wav') # load WAV file
self.start = time.perf_counter() # time when the audio started playing
return wav.play() # return playing instance
def main(self): # main function of playback
cv2.namedWindow('Video', cv2.WINDOW_NORMAL) # open window for video
cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) # make the window full screen
while True: # repeat during the exhibition
self.is_blank = True # darkness was upon the face of the deep
cv2.imshow('Video', self.black) # display blank screen
while self.is_blank and self.status: # continue blank screen while the flags are true
cv2.waitKey(1) # 1 blank frame
# break loop
executer_audio = ThreadPoolExecutor()
play_obj = executer_audio.submit(self.audio) # start playing audio
####
## in python, cv2 cannot play movie with audio
## play audio by executing it in parallel threads
####
print('VIDEO START')
cap = cv2.VideoCapture(r'./video.mp4') # load movie
count = 0 # frame founter
while cap.isOpened(): # repeat until the movie ended
ret, frame = cap.read() # load current frame
cf = (time.perf_counter() - self.start) * 24 # calculate the appropriate frame based on time
if count < cf: # if calculated frame is larger than the actual frame
count += 1 # increase the count
continue # break current loop: skip current frame
####
## actual playback speed is slower than the original due to raspi's hardware restriction
## to sync video and audio, skip some frames if the current frame is behind then calculate
####
if ret and not self.is_blank and self.status: # if playback should be continue
cv2.imshow('Video', frame) # display the currect frame
cv2.waitKey(1) # 1 frame
else: # else playback should be ended
break # break loop: end playback -> blank screen
count += 1 # increase the count
print('VIDEO END')
if play_obj.result().is_playing(): # if audio is still playing
play_obj.result().stop() # stop playing audio
executer_audio.shutdown() # shutdown sub thread
cap.release() # free loaded movie
if not self.status: # if main program received stop signal
break # end playback: return to desktop
cv2.destroyAllWindows() # close all window
class Roop:
global DOOR_PIN
status = True # a flag of exhibition is runnning
aurora_status = False
strip_status = False
is_in = False # a flag of whether people are inside or not
def off(self):
if self.aurora_status:
self.kick(0, 0, True)
if self.strip_status:
self.kick(1, 0, True)
def reset(self):
self.is_in = False
if not self.pause:
self.off()
self.end()
def halt(self):
self.status = False
self.off()
self.end()
def wait_in(self): # a function of wait going IN
if pi.read(DOOR_PIN): # if the door is closed
while pi.read(DOOR_PIN): # wait until door is opened
time.sleep(0.1)
self.pause = False
self.is_in = True
print('GOING IN')
else: # else the door is opened
while not pi.read(DOOR_PIN): # wait until door is closed
time.sleep(0.1)
self.wait_in() # closed, then wait until open
def wait_out_exac(self): # a function of wait going OUT
if self.pause: # if paused: reset button is pushed
return
if pi.read(DOOR_PIN): # if the door is closed
while pi.read(DOOR_PIN) and not self.pause: # wait until the door is opened
time.sleep(0.1)
print('GOING OUT')
self.pb.stop()
self.is_in = False
self.reset()
else: # else the door is opened
while not pi.read(DOOR_PIN) and not self.pause: # wait until the door is closed
time.sleep(0.1)
self.wait_out_exac() # closed, then wait until open
def wait_out(self):
executer_wait = ThreadPoolExecutor()
executer_wait.submit(self.wait_out_exac)
return executer_wait
def end(self):
self.pause = True
if self.is_in:
self.wo.shutdown()
def kick(self, d: int, m: int, f = False):
if self.pause and not f:
return
if d:
if m:
triger = 'strip_on'
self.strip_status = True
else:
triger = 'strip_off'
self.strip_status = False
else:
if m:
triger = 'aurora_on'
self.aurora_status = True
else:
triger = 'aurora_off'
self.aurora_status = False
url = "https://maker.ifttt.com/trigger/" + triger + "/json/with/key/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
requests.get(url)
print(triger)
def sleep(self, s: int):
for i in range(s):
if self.pause:
return
time.sleep(1)
def main(self, pb):
self.pb = pb
while True:
# 点灯や再生云々の展示系スクリプト
self.wait_in()
self.wo = self.wait_out()
self.kick(1, 1)
self.kick(0, 1)
self.sleep(4) #2
if not self.pause:
pb.play() # duration: 95s
self.sleep(3)
self.kick(0, 0) # turn off aurora AFTER START of the video
self.sleep(7)
self.kick(1, 0) # turn off strip AFTER START of the video
self.sleep(67)
self.kick(0, 1) # turn on aurora BEFORE END of the video
self.sleep(11)
self.kick(0, 0) # turn off aurora AFTER END of the video
print('THE END')
if not self.status:
break
while self.is_in and not self.pause:
time.sleep(0.5)
self.end()
class Reset:
global RESET_PIN
status = True
def halt(self):
self.status = False
def main(self, pb, rp):
while True:
if pi.read(RESET_PIN):
while pi.read(RESET_PIN) and self.status:
time.sleep(1)
else:
while not pi.read(RESET_PIN) and self.status:
time.sleep(1)
print('RESET')
pb.stop()
rp.reset()
if not self.status:
break
try:
pb = Playback()
rp = Roop()
rst = Reset()
executer_pb = ThreadPoolExecutor()
executer_pb.submit(pb.main)
executer_rst = ThreadPoolExecutor()
executer_rst.submit(rst.main, pb, rp)
rp.main(pb)
except KeyboardInterrupt:
print()
rst.halt()
pb.halt()
rp.halt()
executer_pb.shutdown()
executer_rst.shutdown()
unclutter.kill()
コメントを書きましたが途中で力尽きました。
大したことは書いてないのでご容赦ください。
再生関連のクラス、展示スクリプト関連のクラス、リセット関連のクラスでまとめてあります。
可読性が高く、改修も容易だったのでオブジェクト指向すげぇ!ってなりながら書いてました。
ただ、これをオブジェクト指向と呼ぶかは怪しいところです。
Tips
主に工夫した点は以下の通りです。
音声の再生
CV2
で映像を再生していますが、音声は再生できません。
クリックだけで再生できるvlc
等々のありがたさを感じるところですが、音声も再生したいとのことなので力業で再生させます。
Pythonにはサブプロセスやマルチスレッドといった機能があるのでメインスレッドで映像を再生しながら並列で音楽を再生するようにします。
今回はプロセスは分けず別スレッドで音声を再生する実装にしました。
音声と映像の同期
正確には映像の等倍再生です。
CV2
では各フレームを連続して書き出しているだけなので等倍再生が保証されていません。
マシンスペックとフレームレート次第で、早送りになったりスロー再生になったりします。
例えば1フレームの書き出しに4msかかる場合、30fpsの動画を再生すると本来1秒の映像が1.2秒になります。
結果、背後で音声を再生していると、音声と映像がずれます。
等速で再生していないので当然ながら音声が先走っていったり、遅れたりします。
今回の場合は遅れました。どんどん遅くなって全く映像と音声が合わない状況でした。
余談ですが、エンコードによって時間のかかる処理が変わります。
H.264
とmpeg2
を比べると、圧縮率の高いH.264
は読み込みは速いですが書き出しに時間がかかります。
mpeg2
は逆で、書き出しは速いですが読み込みに時間がかかります。
sdカードと低めのスペックでボトルネックがたくさんあるラズパイならではですが、試行錯誤して結局どれもそんな変わんねーってなりました。
一応、フレームレートの改善に効果があるのは解像度を低くすることでした。
一フレームにかかる時間の例が以下の通りです。
format | read | draw | total |
---|---|---|---|
H.264 | 1ms | 2ms | 3ms |
mpeg2 | 2ms | 1ms | 3ms |
結局、映像側の工夫では目標のフレームレートに到達することはできず、映像と音声のずれを解消できませんでした。
また、仮に画質を下げるなどしてずれを解消できたとしても、ラズパイの機嫌次第でずれることが考えられます。
根本解決として同期させるための制御を組み込むことにしました。
制御としては考え方は簡単です。
音声再生開始からの経過時間を測定し、経過時間×フレームレートで経過フレームの理論値を算出します。
現在書き出そうとしているフレームが計算したフレームより遅い場合は、そのフレームをスキップすることで音声に追いつくようにします。
イメージとしては適度に早送りしつつ音声と動画を合わせる感じです。
等倍再生だけのサンプルコードを記します。
音声と映像のパスを渡すだけで等倍で再生できます。
システムの性能次第で速い側もしくは遅い側を削除しても問題ないと思います。
import time
import cv2
import simpleaudio
from concurrent.futures import ThreadPoolExecutor
class Playback:
def audio(self, audio): # audio related functions
wav = simpleaudio.WaveObject.from_wave_file(audio) # load WAV file
self.start = time.perf_counter() # time when the audio started playing
return wav.play() # return playing instance
def __init__(self, audio, video): # main function of playback
cv2.namedWindow('Video', cv2.WINDOW_NORMAL) # open window for video
cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) # make the window full screen
executer_audio = ThreadPoolExecutor()
play_obj = executer_audio.submit(self.audio, audio) # start playing audio
print('VIDEO START')
cap = cv2.VideoCapture(video) # load movie
fps = cap.get(cv2.CAP_PROP_FPS) # get fps
spf = 1 / fps # second per frame
count = 0 # frame founter
while cap.isOpened(): # repeat until the movie ended
ret, frame = cap.read() # load current frame
fst = time.perf_counter()
# faster side, if the machine is fast, you don't need this section
cf = (fst - self.start) * fps # calculate the appropriate frame based on time
if count < cf: # if calculated frame is larger than the actual frame
count += 1 # increase the count
continue # break current loop: skip current frame
if ret and self.status: # if playback should be continue
cv2.imshow('Video', frame) # display the currect frame
# slower side, if your machine is slow, you should delete this while section then simply put "cv2.waitKey(1)"
while fst - time.perf_counter() + spf > 0: # if elapsed time is faster than frame second
cv2.waitKey(1) # shortest wait
else: # else playback should be ended
break # break loop: end playback
count += 1 # increase the count
print('VIDEO END')
if play_obj.result().is_playing(): # if audio is still playing
play_obj.result().stop() # stop playing audio
executer_audio.shutdown() # shutdown sub thread
cap.release() # free loaded movie
cv2.destroyAllWindows() # close all window
pb = Playback(audio_path, video_path)
#pb.stop()
無事にリップシンクすることができました。
マウスカーソル
マウスカーソルがずっと表示されていると展示の体験を低下させるのでマウスを非表示にするようにしました。
0.1秒無操作で非表示にするコマンドをサブプロセスで常駐させています。
unclutter -idle 0.1 -root
まとめ
自分はラズパイで航空無線を傍受したり、ウェブサーバーにしたりと実用的なことにしか使ってきませんでしたが、今回展示にかかわったことでラズパイのメディアコアとしての可能性の高さに気づきました。
実行できる言語の自由度が高く、インターフェースも豊富でそこそこ高性能なのでアートの展示などにも非常に使える機体だと思います。
今後も何かといろいろなものを作って遊んでいきたいと思いました。
最後まで読んで下さりありがとうございます。