2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pythonでrtmpクライアントを作り生成映像・音声をリアルタイム配信

Posted at

動画配信をプログラムで生成したい。それもリアルタイムで。
そう考えたとき、一番簡単なのかhlsで配信を行う方法。
この場合、一端rtmpでサーバに打ち上げる必要がある。
打ち上げにはffmpegを利用するが、コイツがなかなか思ったように動かないし、copilotも正解を教えてくれないし、世の中にサンプルもなかったので、とりあえず動作できるようにしたメモ。
説明は詳細はかかないでコードみて。という自分の備忘録的記事です。

python歴は限りなく短いのでpython的なコードでなく古来の手続き型な書き方になっているのはご容赦いただきたく。classでかっこよく実装しようとしたけど無理だった。

○サンプルコードを動かすために必要なライブラリ

導入不要なもの以外はpipでインストールがあらかじめ必要です。ここら辺いじる人ならすでに入っている物ばかりだと思います。導入不要とかいてあるものは、インストール不要で最初から入っているライブラリだと思っていますが、クリーンインストールはずいぶん前のことなので覚えてません。間違ってるかも。

・opencv (mjpeeg作成用)
・numpy(映像送信および音声変換用)
・sounddevice(音声取り込み用)
・threading(マルチスレッド動作用)
・Queue(スレッド間データ通信用)
・subprocess (ffmpeg起動用/導入不要)
・time(FPS管理用/導入不要)
・socket(ffmpegのサーバポート待ち受け用/導入不要)

○その他必要なもの

 RTMP-HLSサーバ
 ffmpeg-pathをソース内に記載する必要があります。

rtmp_ffmpeg_sample.py
import cv2
import numpy
import subprocess
import time
import sounddevice as sd
import socket
import threading
from queue import Queue
import subprocess
ffmpeg_path="C:\\work\\ffmpeg\\bin\\ffmpeg.exe" ##変更必要
rtmp_url = "rtmp://localhost/live/stream" #変更必要
ffmpeg_image_port = 12346 #それぞれのポート。内部利用なので変更不要
ffmpeg_sound_port = 12347
frame_rate = 15  #入力FPS。画像生成される頻度
output_frame_rate=15    #出力フレームレート。 frame_rateの倍数にする
pixel_width=480    #画像の幅と高さ。16の倍数にする
pixel_height=720
sample_rate = 44100 #オーディオタイプ/送られるデータと合わせる
channel=2   #オーディオチャンネル。ステレオ

duration =  1/frame_rate    # フレームの時間を計算。単位は秒
samples = int(sample_rate * duration *channel)    #16bitなので2倍
count=0                 #画像に焼き付けるカウンターの番号
connect_flag=0          #sock接続状況のフラグ。0 none 1 video 2 audio

##開始時ダミーイメージデータの作成
canvas = numpy.zeros((pixel_height, pixel_width, 3), dtype=numpy.uint8)
ret, latest_image_data = cv2.imencode('.jpg', canvas)
#latest_image_data = image_data
#変数アクセスのスレッド間ロック用
lock = threading.Lock()
#スレッド間のデータ共有(音声データ)用
queue = Queue()

##ffmpegのサブプロセスを作成
def create_ffmpeg_subprocess():
    command = [
        ffmpeg_path,
        '-y',  # 出力ファイルが存在する場合は上書き
##video formats
        '-analyzeduration', '0',  #入力ストリームを解析する際のデータ量/μ秒
        '-f', 'mjpeg',  # 入力フォーマット
        '-r', str(frame_rate),  # 入力フレームレート
        '-i', 'tcp://localhost:'+str(ffmpeg_image_port),  # 入力(TCP/IP)
##audio formats
        '-f', 's16le',  # 入力フォーマット(オーディオ)
        '-ar', str(sample_rate),  # オーディオのサンプルレート
        '-ac', str(channel),  # オーディオのチャンネル数
        '-i', 'tcp://localhost:'+str(ffmpeg_sound_port),   
##output formats
        '-c:v', 'libx264',  # 出力ビデオコーデック
        '-c:a', 'aac',
        '-b:a', '128k',
        '-r', str(output_frame_rate),  # 出力フレームレート
        '-f','flv',
#        'c:\\\\work\\test.flv'        #ファイルに出しテストするなら
        rtmp_url  # 出力(RTMP URL)
    ]
    return subprocess.Popen(command, stdin=subprocess.PIPE,shell=True)

##ウェイト用デコレータの設定。二重関数にして引数入れる
def fpstimer(framerate):
    def decorator(func):
        def wrapper(*args, **kwargs):
            start_time=time.time()
            result = func(*args, **kwargs)
            sleep_time = ((1/framerate) - (time.time() - start_time))
            if sleep_time > 0:
                time.sleep(sleep_time)
            return result
        return wrapper
    return decorator

##ポートの接続をする関数。connection_typeはデバッグ表示用
def establish_connection(port, connection_type):
    server_address = ('localhost', port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(server_address)
    sock.listen(1)
    print(f"Waiting for a {connection_type} connection...")
    try:
        conn, client_address = sock.accept()
        print(f"Accepted {connection_type} connection from ", client_address)
        return conn, sock
    except Exception as e:
        print(f"Error establishing {connection_type} connection: {e}")

##fpstimerで一定間隔ウェイトをいれつつ画像を生成しlatest_image_dataに保存
@fpstimer(frame_rate)
def generate_image():
    global count
    global lock
    global latest_image_data
    count += 1
    canvas = numpy.zeros((pixel_height, pixel_width, 3), dtype=numpy.uint8)
    cv2.putText(canvas, str(count), (50, 50),cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    ret, image_data = cv2.imencode('.jpg', canvas)
    if ret:
        with lock:
            latest_image_data = image_data
           
##映像を作って待つを無限ループする画像作成スレッド。非連動
def generate_image_data():
    while True:
        generate_image()

##イメージの送信。fpstimerで送信したらウェイトが入る構造にしてある
@fpstimer(frame_rate)
def send_image(conn):
    global latest_image_data
    global lock
    global connect_flag
    if connect_flag >0:
        with lock:
            conn.sendall(latest_image_data)
#            print ("send image")

def send_image_ffmpeg():
    global connect_flag
    global lock
    conn, sock = establish_connection(ffmpeg_image_port, "image")
    with lock:
        connect_flag=1
    while True:
        send_image(conn)
##終わった後はそのまま止まるので、行儀悪い

def send_sound_ffmpeg():
    global connect_flag
    global lock
    qdata=b''
    conn,sock =establish_connection(ffmpeg_sound_port, "sound")
    with lock:
        connect_flag=211111
    while True:
        while queue.qsize() > 0:
            qdata = queue.get()
            conn.sendall(qdata)
        time.sleep(0.002)   #1/60 の半分以下が目安でウェイトしないと全力で動いちゃう
##終わった後はそのまま止まるので、行儀悪い

##音声は一定間隔でコールバックで取り込みqueueへ。コードで生成するならここ
def sound_callback(indata, frames, time, status):
    # 音声データをバイト列に変換
    global connect_flag
    global lock
    with lock:
        if connect_flag==2:
            audio_bytes = (indata * 32767).astype(numpy.int16).tobytes()
            queue.put(audio_bytes)

def generate_sound_data():
    with sd.InputStream(callback=sound_callback, channels=channel, samplerate=sample_rate, blocksize=samples) as stream:
            while True:  # ストリームが開始され続けるように無限ループを追加
                time.sleep(1)  # CPU使用率を抑えるためのウェイト
   
####main部分
# スレッドを作成
t1 = threading.Thread(target=generate_image_data)   #イメージデータ生成
t2 = threading.Thread(target=generate_sound_data)     #音声データ生成
t3 = threading.Thread(target=send_sound_ffmpeg)     #ffmpegが接続する音声TCP
t4 = threading.Thread(target=send_image_ffmpeg)     #ffmpegが接続する映像TCP
# スレッドリストに追加
threads = []
threads.append(t1)
threads.append(t2)
threads.append(t3)
threads.append(t4)
# スレッドを開始
for t in threads:
    t.start()

# ffmpegをサブプロセスとして起動
proc=create_ffmpeg_subprocess()
pass
##修了処理なし

○キモは何?

ffmpegへの設定値が、一定の組み合わせでしか動かない。映像のみなら動くこともあったりするのがまた厄介で。ffmpegには、音声/動画を送る必要がある。ffmpegをsubprocessで起動し、なんらかの方法でffmpegへデータを送る必要がある。そこらへんでいろいろクセがあって動作させるのには難航した。ひっかかりそうなところ(or引っかかったところ)は以下に箇条書きしておきます。

・音声と映像は別々に送らないとなぜか映像部分に音声が割り込んだりしてしまう。標準入力は1本しかない。世の中にあるサンプルコードは音声をあきらめているケースが多くてあてにならず。結果としてはtcpポートを複数あけて通信するのが正。
また、
・映像の形式はmjpegが無難。rawvideoを送っても動きそうだけどヘッダがないのでなぜかズレることがあった。
・音声は無圧縮でいけるけどある程度タイミングを想定していれていかないとバッファが不足してffmpegがおっこちる。
・オプション周りはいろいろ設定すると動かなくなるケースなどがありましたが、一番わからなかったのは、-sオプションを指定するとエラーが出ることでした(映像のみのエンコードなら動くのが謎)
・Sockで映像と音声をおくると、映像を受信して一定フレーム数受け取り、フォーマットを確定してから音声につなぎに行くのでその分ずれる。なので、データの用意はグローバル変数を用意して、Sockの接続が始まった後にデータを流すようにしないと派手に音声と映像がずれた。

○サンプルコードがやること

・オーディオデバイスから音声をサンプリングする
・映像は、cvを使ってカウントアップするjpegをつくる
・あわせてrtmpで送りつけて動画配信として再生する

○再生の確認

nginxで構築するのが簡単。構築済みはWindowsならダウンロードするだけで使えるものもある。c:\に展開し、localhost/live/ にストリームキーstreamで送ってやればブラウザでテストができる。

Windows+Nginx+RTMP+HLS+Dash
https://github.com/ustoopia/Live-stream-server-portable-Windows-Nginx-RTMP-HLS-Dash

英語ムズカシイという方は以下のnoteが参考になるかも。30分とされてるけど5分もかかりません。

nginxを使ってRTMPSとHLS再生を1コンテナで実現する
https://note.com/educator/n/n57210c570efc

○サンプルコードの問題点
・きちんと使う場合、エラー処理は、各自入れる必要がある
・スレッドで動いてるが終了処理がちゃんと入ってないのでハングアップする。
・sockが切断されたらエラーを補足していないのてローズして終了しない
・しばらく再生すると止まる(リトライで再開はする)
・CTRL+Cでプログラムが止まらない
・映像、音声が1分ほど遅れます。これはコードを停止しても1分くらい再生が続いているので、FFMPEGではなくHLS/NGINX側のキャッシュの問題です。今回はプログラムの作成記事で、サーバー構築はテスト用なので、サーバーの設定について、解決のための調査がやりきれていません。

もし必要であれば、ソースは小さいので、まるごとcopilotにぶんなげて「このコードはエラー処理が行われていないので正常に停止できません。また、while Trueで無限ループしており、正常に終了処理が行えないので、これらをきちんと整えたコードをください」といえばそれなりに整ったコードがでてくることでしょう。ただし、リトライはffmpegが切断した時点で配信も中断され、行うことはできないので、特に考慮する必要はありません。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?