LoginSignup
6
4

More than 1 year has passed since last update.

pythonで長時間の動画を音声付きで再生する

Last updated at Posted at 2022-05-15

Opencvで動画を再生したい場合、Opencvだけだと音を再生することができない。

音声付きで動画を再生するには、音声を別スレッドで再生する必要がある。

以下の記事で方法が紹介されているが、一度音声をメモリに展開しているため長時間の動画は読み込みの時間がかかり、場合によっては再生できない場合がある。

そこでメモリに展開せずに逐次読み込む方法を記載した。
seekを使って任意の時刻から再生可能。

BackgroundAudioPlay.py

import queue
import sys
import threading
import sounddevice as sd
import soundfile as sf

class BGAudio():
    # 長時間の音声ファイルをバックグラウンドで再生する
    def __init__(self, path, device=2, buffersize=5, blocksize=2048):
        
        # デバイスの番号を確認
        print(sd.query_devices())
        self.filename = path
        self.device = device
        self.buffersize = buffersize
        self.blocksize = blocksize

        self.stop_flag = False
        self.q = queue.Queue(maxsize=self.buffersize)
        self.event = threading.Event()

    def callback(self, outdata, frames, time, status):
        assert frames == self.blocksize
        if status.output_underflow:
            print('Output underflow: increase blocksize?', file=sys.stderr)
            raise sd.CallbackAbort
        assert not status
        try:
            data = self.q.get_nowait()
        except queue.Empty as e:
            if self.stop_flag==False:
                print('Buffer is empty: increase buffersize?', file=sys.stderr)
            raise sd.CallbackAbort from e
        if len(data) < len(outdata):
            outdata[:len(data)] = data
            outdata[len(data):].fill(0)
            raise sd.CallbackStop
        else:
            outdata[:] = data
    
    def play(self, sec):
        # 停止のためのフラグとキューの初期化
        self.sec = sec
        self.stop_flag = False
        while not self.q.empty():
            self.q.get()
        # 別スレッドで再生する
        self.audio_thread = threading.Thread(target=self._stream)
        self.audio_thread.start()
    def stop(self):
        self.stop_flag = True

    def _stream(self):
        try:
            with sf.SoundFile(self.filename) as f:
                f.seek(int(self.sec * f.samplerate))
                for _ in range(self.buffersize):
                    data = f.read(self.blocksize)
                    if not len(data):
                        break
                    self.q.put_nowait(data)  # Pre-fill queue
                stream = sd.OutputStream(
                    samplerate=f.samplerate, blocksize=self.blocksize,
                    device=self.device, channels=f.channels,
                    callback=self.callback, finished_callback=self.event.set)
                with stream:
                    timeout = self.blocksize * self.buffersize / f.samplerate
                    while len(data):
                        data = f.read(self.blocksize)
                        self.q.put(data, timeout=timeout)
                        if self.stop_flag:
                            break
                    self.event.wait()  # Wait until playback is finished
        except KeyboardInterrupt:
            print('\nInterrupted by user')
        except Exception as e:
            print("error", e)

main.py
import cv2
import numpy as np
from BackgroundAudioPlay import BGAudio

if __name__ == '__main__':
    # 動画の音声をffmpeg等でwavファイルに抽出しておく
    bga = BGAudio("test.wav")
    cap = cv2.VideoCapture("test.mp4")

    # 環境によって、音を再生する前に一回opencvを描画してからじゃないと落ちる場合がある。(要検証)
    cv2.imshow("Video", np.zeros((100, 100, 3)))
    bga.play(0.0)
    while(cap.isOpened()):
        ret, frame = cap.read()
        if not ret:
            break
        cv2.imshow("Video", frame)        
        if cv2.waitKey(100) & 0xFF == ord('q'): 
            break
    bga.stop()
    print("finish")    

(追記)
再度検索したら逐次的に再生する方法が書かれた記事もありました。

この記事ではPyAudioを使って再生しているので、sounddeviceを使う場合ということで、、、

6
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4