1
0

ラズパイだってアートになれる

Last updated at Posted at 2023-01-24

知人の手伝いでラズパイを使って美術展示を制御するプログラムを書きました。
初めてオブジェクト指向なプログラム(多分)も書いたので備忘録がてら掲載します。
PythonとCV2で動画を等速等倍の音声付で再生する制御も取り組みましたのでご参考になれば幸いです。

展示概要

自分がメインのプロジェクトではないので一応詳細は伏せますが、展示概要は

鑑賞者が個室に入ると部屋が光に包まれる。天井には常識を超越した映像が投影され、観客は困惑と興奮の中で超感覚体験をする。

みたいな感じです。

もっと凄いイイ感じの展示なんですけど僕の日本語力が及ばず魅力が伝えられなくて申し訳ないです。
僕は現代アートなるものは難しくて避けがちなのですが、本展示製作者の一員である友人曰く鑑賞者が宇宙猫になれるのを目指したとのことです。

f3072d14ccf044ef823aeb8538ae3ced.png

いいですね。
そういうのは好きです。

システム概要

システムの概要は以下の通りです。
システム図

制御のコアはラズパイが担います。
GPIOでドアの開閉とリセット操作を検知します。
センサー自体は磁石で通電と遮断が変わる簡単なものです。

映像出力はHDMI端子からプロジェクターに流し込みます。

ライト関係の制御はwebhockを通してIFTTTからSwitchbotを操作します。
二つライトがあり、一つは赤外線リモコンで操作するものなのでhub miniを介して赤外線で操作します。
もう一つはSwitchbot製のものなので、ライトをWi-Fiに接続し直接操作します。

プログラム

プログラムはPythonで書きました。
Cやら何やら触ってきて改めて思いますが、本当にPythonは楽ですね。
バックグラウンドで勝手にイイ感じにしてくれて、ライブラリも豊富で、人気なのも納得です。

import pigpio           # import Raspberry Pi's pin library
import time             # import library for sleep function
import numpy as np      # import library for blank screen function
import os               # import library for environment varieties
import cv2              # import library for display movie function
import simpleaudio      # import library for audio function
import requests         # import library for send request to webhock
import subprocess       # import library for hide mouse cursore
from concurrent.futures import ThreadPoolExecutor   # import library for multi-thread

RESET_PIN = 3           # set read pin for reset
DOOR_PIN = 4            # set read pin of sensing a door

pi = pigpio.pi()        # initialize pigpio

pi.set_mode(DOOR_PIN, pigpio.INPUT)     # set pigpio pin mode as input (read level of the pin)
pi.set_mode(RESET_PIN, pigpio.INPUT)

os.environ['DISPLAY'] = ':0'            # select output display
unclutter = subprocess.Popen(['unclutter', '-idle', '0.1', '-root'])    # hide mouse cursore

# Playback related functions
class Playback:
    height = 720    # set height of blank screen
    width = 1280    # set width of blank screen
    black = np.zeros((height, width, 3))    # generate color info of (0, 0, 0) on each pixel
    status = True   # a flag for the exhibition is running
    def stop(self):             # play -> blank
        self.is_blank = True    # a flag for screen status
    def play(self):             # blank -> play
        self.is_blank = False
    def halt(self):             # stop all playback: terminate window then back to the desktop
        self.status = False     # = the main program received a stop signal
    def audio(self):            # audio related functions
        wav = simpleaudio.WaveObject.from_wave_file('./audio.wav') # load WAV file
        self.start = time.perf_counter()                        # time when the audio started playing
        return wav.play()                                       # return playing instance
    def main(self):             # main function of playback
        cv2.namedWindow('Video', cv2.WINDOW_NORMAL)             # open window for video
        cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)  # make the window full screen
        while True:             # repeat during the exhibition
            self.is_blank = True                                # darkness was upon the face of the deep
            cv2.imshow('Video', self.black)                     # display blank screen
            while self.is_blank and self.status:                # continue blank screen while the flags are true
                cv2.waitKey(1)                                  # 1 blank frame
            # break loop

            executer_audio = ThreadPoolExecutor()
            play_obj = executer_audio.submit(self.audio)        # start playing audio
            ####
            ## in python, cv2 cannot play movie with audio
            ## play audio by executing it in parallel threads
            ####

            print('VIDEO START')

            cap = cv2.VideoCapture(r'./video.mp4')              # load movie
            count = 0                                           # frame founter
            while cap.isOpened():                               # repeat until the movie ended
                ret, frame = cap.read()                         # load current frame
                cf = (time.perf_counter() - self.start) * 24    # calculate the appropriate frame based on time
                if count < cf:      # if calculated frame is larger than the actual frame
                    count += 1      # increase the count
                    continue        # break current loop: skip current frame
                ####
                ## actual playback speed is slower than the original due to raspi's hardware restriction
                ## to sync video and audio, skip some frames if the current frame is behind then calculate
                ####
                if ret and not self.is_blank and self.status:   # if playback should be continue
                    cv2.imshow('Video', frame)                  # display the currect frame
                    cv2.waitKey(1)                              # 1 frame
                else:                                           # else playback should be ended
                    break                                       # break loop: end playback -> blank screen
                count += 1                                      # increase the count

            print('VIDEO END')

            if play_obj.result().is_playing():  # if audio is still playing
                play_obj.result().stop()        # stop playing audio
            executer_audio.shutdown()           # shutdown sub thread
            cap.release()                       # free loaded movie
            if not self.status:                 # if main program received stop signal
                break                           # end playback: return to desktop
        cv2.destroyAllWindows()                 # close all window

class Roop:
    global DOOR_PIN
    status = True                   # a flag of exhibition is runnning
    aurora_status = False
    strip_status = False
    is_in = False                   # a flag of whether people are inside or not
    def off(self):
        if self.aurora_status:
            self.kick(0, 0, True)
        if self.strip_status:
            self.kick(1, 0, True)
    def reset(self):
        self.is_in = False
        if not self.pause:
            self.off()
            self.end()
    def halt(self):
        self.status = False
        self.off()
        self.end()
    def wait_in(self):                      # a function of wait going IN
        if pi.read(DOOR_PIN):               # if the door is closed
            while pi.read(DOOR_PIN):        # wait until door is opened
                time.sleep(0.1)
            self.pause = False
            self.is_in = True
            print('GOING IN')
        else:                               # else the door is opened
            while not pi.read(DOOR_PIN):    # wait until door is closed
                time.sleep(0.1)
            self.wait_in()                  # closed, then wait until open
    def wait_out_exac(self):        # a function of wait going OUT
        if self.pause:              # if paused: reset button is pushed
            return
        if pi.read(DOOR_PIN):       # if the door is closed
            while pi.read(DOOR_PIN) and not self.pause:     # wait until the door is opened
                time.sleep(0.1)
            print('GOING OUT')
            self.pb.stop()
            self.is_in = False
            self.reset()
        else:                       # else the door is opened
            while not pi.read(DOOR_PIN) and not self.pause: # wait until the door is closed
                time.sleep(0.1)
            self.wait_out_exac()    # closed, then wait until open
    def wait_out(self):
        executer_wait = ThreadPoolExecutor()
        executer_wait.submit(self.wait_out_exac)
        return executer_wait
    def end(self):
        self.pause = True
        if self.is_in:
            self.wo.shutdown()
    def kick(self, d: int, m: int, f = False):
        if self.pause and not f:
            return
        if d:
            if m:
                triger = 'strip_on'
                self.strip_status = True
            else:
                triger = 'strip_off'
                self.strip_status = False
        else:
            if m:
                triger = 'aurora_on'
                self.aurora_status = True
            else:
                triger = 'aurora_off'
                self.aurora_status = False
        url = "https://maker.ifttt.com/trigger/" + triger + "/json/with/key/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
        requests.get(url)
        print(triger)
    def sleep(self, s: int):
        for i in range(s):
            if self.pause:
                return
            time.sleep(1)
    def main(self, pb):
        self.pb = pb
        while True:
            # 点灯や再生云々の展示系スクリプト
            self.wait_in()
            self.wo = self.wait_out()
            self.kick(1, 1)
            self.kick(0, 1)
            self.sleep(4) #2
            if not self.pause:
                pb.play()       # duration: 95s
            self.sleep(3)
            self.kick(0, 0)     # turn off aurora AFTER START of the video
            self.sleep(7)
            self.kick(1, 0)     # turn off strip AFTER START of the video
            self.sleep(67)
            self.kick(0, 1)     # turn on aurora BEFORE END of the video
            self.sleep(11)
            self.kick(0, 0)     # turn off aurora AFTER END of the video
            print('THE END')
            if not self.status:
                break
            while self.is_in and not self.pause:
                time.sleep(0.5)
            self.end()

class Reset:
    global RESET_PIN
    status = True
    def halt(self):
        self.status = False
    def main(self, pb, rp):
        while True:
            if pi.read(RESET_PIN):
                while pi.read(RESET_PIN) and self.status:
                    time.sleep(1)
            else:
                while not pi.read(RESET_PIN) and self.status:
                    time.sleep(1)
            print('RESET')
            pb.stop()
            rp.reset()
            if not self.status:
                break

try:
    pb = Playback()
    rp = Roop()
    rst = Reset()
    executer_pb = ThreadPoolExecutor()
    executer_pb.submit(pb.main)
    executer_rst = ThreadPoolExecutor()
    executer_rst.submit(rst.main, pb, rp)
    rp.main(pb)
except KeyboardInterrupt:
    print()
    rst.halt()
    pb.halt()
    rp.halt()
    executer_pb.shutdown()
    executer_rst.shutdown()
    unclutter.kill()

コメントを書きましたが途中で力尽きました。
大したことは書いてないのでご容赦ください。

再生関連のクラス、展示スクリプト関連のクラス、リセット関連のクラスでまとめてあります。
可読性が高く、改修も容易だったのでオブジェクト指向すげぇ!ってなりながら書いてました。
ただ、これをオブジェクト指向と呼ぶかは怪しいところです。

Tips

主に工夫した点は以下の通りです。

音声の再生

CV2で映像を再生していますが、音声は再生できません。
クリックだけで再生できるvlc等々のありがたさを感じるところですが、音声も再生したいとのことなので力業で再生させます。
Pythonにはサブプロセスやマルチスレッドといった機能があるのでメインスレッドで映像を再生しながら並列で音楽を再生するようにします。
今回はプロセスは分けず別スレッドで音声を再生する実装にしました。

音声と映像の同期

正確には映像の等倍再生です。
CV2では各フレームを連続して書き出しているだけなので等倍再生が保証されていません。
マシンスペックとフレームレート次第で、早送りになったりスロー再生になったりします。
例えば1フレームの書き出しに4msかかる場合、30fpsの動画を再生すると本来1秒の映像が1.2秒になります。
結果、背後で音声を再生していると、音声と映像がずれます。
等速で再生していないので当然ながら音声が先走っていったり、遅れたりします。
今回の場合は遅れました。どんどん遅くなって全く映像と音声が合わない状況でした。

余談ですが、エンコードによって時間のかかる処理が変わります。
H.264mpeg2を比べると、圧縮率の高いH.264は読み込みは速いですが書き出しに時間がかかります。
mpeg2は逆で、書き出しは速いですが読み込みに時間がかかります。
sdカードと低めのスペックでボトルネックがたくさんあるラズパイならではですが、試行錯誤して結局どれもそんな変わんねーってなりました。
一応、フレームレートの改善に効果があるのは解像度を低くすることでした。

一フレームにかかる時間の例が以下の通りです。

format read draw total
H.264 1ms 2ms 3ms
mpeg2 2ms 1ms 3ms

結局、映像側の工夫では目標のフレームレートに到達することはできず、映像と音声のずれを解消できませんでした。
また、仮に画質を下げるなどしてずれを解消できたとしても、ラズパイの機嫌次第でずれることが考えられます。
根本解決として同期させるための制御を組み込むことにしました。

制御としては考え方は簡単です。
音声再生開始からの経過時間を測定し、経過時間×フレームレートで経過フレームの理論値を算出します。
現在書き出そうとしているフレームが計算したフレームより遅い場合は、そのフレームをスキップすることで音声に追いつくようにします。
イメージとしては適度に早送りしつつ音声と動画を合わせる感じです。

等倍再生だけのサンプルコードを記します。
音声と映像のパスを渡すだけで等倍で再生できます。
システムの性能次第で速い側もしくは遅い側を削除しても問題ないと思います。

等倍再生サンプルコード
import time
import cv2
import simpleaudio
from concurrent.futures import ThreadPoolExecutor

class Playback:
    def audio(self, audio):                                 # audio related functions
        wav = simpleaudio.WaveObject.from_wave_file(audio)  # load WAV file
        self.start = time.perf_counter()                    # time when the audio started playing
        return wav.play()                                   # return playing instance
    def __init__(self, audio, video):                       # main function of playback
        cv2.namedWindow('Video', cv2.WINDOW_NORMAL)         # open window for video
        cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) # make the window full screen

        executer_audio = ThreadPoolExecutor()
        play_obj = executer_audio.submit(self.audio, audio) # start playing audio

        print('VIDEO START')

        cap = cv2.VideoCapture(video)                       # load movie
        fps = cap.get(cv2.CAP_PROP_FPS)                     # get fps
        spf = 1 / fps                                       # second per frame
        count = 0                                           # frame founter
        while cap.isOpened():                               # repeat until the movie ended
            ret, frame = cap.read()                         # load current frame
            fst = time.perf_counter()

            # faster side, if the machine is fast, you don't need this section
            cf = (fst - self.start) * fps                   # calculate the appropriate frame based on time
            if count < cf:                                  # if calculated frame is larger than the actual frame
                count += 1                                  # increase the count
                continue                                    # break current loop: skip current frame

            if ret and self.status:                         # if playback should be continue
                cv2.imshow('Video', frame)                  # display the currect frame

                # slower side, if your machine is slow, you should delete this while section then simply put "cv2.waitKey(1)"
                while fst - time.perf_counter() + spf > 0:  # if elapsed time is faster than frame second
                    cv2.waitKey(1)                          # shortest wait

            else:                                           # else playback should be ended
                break                                       # break loop: end playback
            count += 1                                      # increase the count

        print('VIDEO END')

        if play_obj.result().is_playing():                  # if audio is still playing
            play_obj.result().stop()                        # stop playing audio
        executer_audio.shutdown()                           # shutdown sub thread
        cap.release()                                       # free loaded movie
        cv2.destroyAllWindows()                             # close all window

pb = Playback(audio_path, video_path)
#pb.stop()

無事にリップシンクすることができました。

マウスカーソル

マウスカーソルがずっと表示されていると展示の体験を低下させるのでマウスを非表示にするようにしました。
0.1秒無操作で非表示にするコマンドをサブプロセスで常駐させています。

unclutter -idle 0.1 -root

まとめ

自分はラズパイで航空無線を傍受したり、ウェブサーバーにしたりと実用的なことにしか使ってきませんでしたが、今回展示にかかわったことでラズパイのメディアコアとしての可能性の高さに気づきました。
実行できる言語の自由度が高く、インターフェースも豊富でそこそこ高性能なのでアートの展示などにも非常に使える機体だと思います。
今後も何かといろいろなものを作って遊んでいきたいと思いました。

最後まで読んで下さりありがとうございます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0