2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ImageFlux Live StreamingでRaspberry Pi 5からRTMP to HLS配信

2
Last updated at Posted at 2026-06-22

概要

背景

さくらインターネットのライブ配信PaaS「ImageFlux Live Streaming」が、2026年3月、RTMP to HLS配信に対応した(記事)。
RTMPはライブ配信のIngestにおいて最大シェアを誇るプロトコルである。これにより、Webブラウザからだけでなく、多種多様な機材・アプリケーションからライブ配信を行うことが可能になった。
そこで今回は、Raspberry Pi 5からライブ配信を行う仕組みを組んでみた。

実装内容

今回はハードウェア、ソフトウェア両方を扱うため、分けて解説する。

ハードウェア

Raspberry Pi 5で実現する内容は以下の通り。
image.png
Raspberry Pi 5に接続したカメラおよびマイクでライブ配信をする。

  • 待機状態でタクトスイッチを押下するとRTMP to HLSチャンネル作成・配信開始し、緑色LEDを消灯&赤色LEDを点灯。
  • 配信状態でタクトスイッチを押下すると配信終了・配信していたチャンネルを削除し、赤色LEDを消灯&緑色LEDを点灯。

という至ってシンプルな仕様である。

ソフトウェア

配信が開始されたら、AppRun共用型を用いてImageFlux Live StreamingのイベントWebhook通知を受け取る。APIで視聴用URL(.m3u8のパス)を取得し、シンプル通知経由でメール送付する。
これで、配信側ではスイッチを押すだけ、視聴者側ではメールで通知されたURLにアクセスするだけで、配信が成立する。
image.png
これは任意だが、録画はオブジェクトストレージに保存し、ウェブアクセラレータ経由で配信できるようにもした。

構築

ハードウェア調達

筆者はスイッチサイエンスにてRaspberry Pi 5のコンプリートキットおよびカメラを調達した。マイクは市販されている廉価なUSBマイクを用いた。
ほか、ブレッドボード、ジャンプワイヤー、LED2種、抵抗2個、タクトスイッチが必要である。
回路は至って単純で、
1.GPIO17番-抵抗(330Ω)-LED(緑)-GND
2.GPIO27番-抵抗(330Ω)-LED(赤)-GND
3.GPIO22番-タクトスイッチ-GND
という構成。
IMG_0040.jpg
慣れていればものの5分で結線可能である。

クラウド基盤構築

続けてクラウド基盤を構築する。
シンプル通知の実行には「作成・削除」権限を持つサービスプリンシパルおよびサービスプリンシパルキーが必要であるため発行(参考)、ImageFlux Live StreamingのAPIトークンも発行した(参考)。
コンテナレジストリにAppRun共用型で動作させるコンテナイメージを格納後、Terraformで基盤を構築する。AppRun共用型の最小スケールを0に設定することで、イベントWebhook通知が走っている場合のみ起動し、コストを最小限に抑えるように設計している。

resource "sakura_simple_notification_destination" "notification_target_for_imageflux_live_streaming_archive_notifier" {
  name        = "通知先メールアドレス"
  description = "ImageFlux Live Streamingの通知先メールアドレス"
  type        = "email"
  value       = var.email_address
}

resource "sakura_simple_notification_group" "notification_group_for_imageflux_live_streaming_archive_notifier" {
  name         = "ImageFlux Live Streaming通知グループ"
  description  = "ImageFlux Live Streaming配信開始・アーカイブ保存完了の通知グループ"
  destinations = [sakura_simple_notification_destination.notification_target_for_imageflux_live_streaming_archive_notifier.id]
}
//1変数で秘密鍵の値を保持できないため、分割
locals {
  service_principal_private_key_pem_content     = file(var.service_principal_private_key_pem_path)
  service_principal_private_key_pem_b64         = base64encode(local.service_principal_private_key_pem_content)
  service_principal_private_key_b64_chunk_count = ceil(length(local.service_principal_private_key_pem_b64) / var.service_principal_private_key_b64_chunk_size)
  service_principal_private_key_b64_chunk_envs = [for i in range(local.service_principal_private_key_b64_chunk_count) : {
    key   = format("SERVICE_PRINCIPAL_PRIVATE_KEY_PEM_B64_%03d", i)
    value = substr(local.service_principal_private_key_pem_b64, i * var.service_principal_private_key_b64_chunk_size, var.service_principal_private_key_b64_chunk_size)
  }]
}

resource "sakura_apprun_shared" "imageflux_live_streaming_notifier" {
  name = "ImageFlux Live Streaming通知機能"

  max_scale       = 3
  min_scale       = 0
  port            = 8080
  timeout_seconds = 60

  components = [{
    name       = "ImageFlux Live Streaming通知コンテナ"
    max_cpu    = "0.5"
    max_memory = "1Gi"
    deploy_source = {
      container_registry = {
        image = var.container_registry_image
      }
    }
    env = concat([
      {
        key   = "SERVICE_PRINCIPAL_KEY_KID"
        value = var.service_principal_key_kid
      },
      {
        key   = "SERVICE_PRINCIPAL_RESOURCE_ID"
        value = var.service_principal_resource_id
      },
      {
        key   = "SERVICE_PRINCIPAL_PRIVATE_KEY_PEM_B64_CHUNK_COUNT"
        value = tostring(local.service_principal_private_key_b64_chunk_count)
      },
      {
        key   = "NOTIFICATION_GROUP_ID"
        value = sakura_simple_notification_group.notification_group_for_imageflux_live_streaming_archive_notifier.id
      },
      {
        key   = "IMAGEFLUX_ACCESS_TOKEN"
        value = var.imageflux_access_token
      }
    ], local.service_principal_private_key_b64_chunk_envs)
  }]
  traffics = [{
    version_index = 0
    percent       = 100
  }]
}

オブジェクトストレージとさくらのウェブアクセラレータは既存のものを使用したため、Terraformには記載していない。
ソースコード全体はリポジトリを参照されたし。

Terraform init
Terraform apply

を叩けば、すぐ構築されるはずだ。Raspberry Pi 5上に載せるソースコードに設定するAppRun共用型の公開URLも、ここで取得できる。
ほどなくシンプル通知の本登録リンクがメールアドレスに届くので、その本登録も済ませておこう。

Raspberry Pi 5へのサービス登録

いよいよ、Raspberry Pi 5に、ライブ配信機能を載せる。
ソースコードは以下の通り。コンプリートキットで購入したmicroSDにはOSがインストール済みであり、PythonやFFmpeg等の実行環境も整っている。そのため、冒頭の設定値だけ設定すればよい。
なお、アーカイブ保存先IDはImageFlux Live StreamingのAPIで作成・取得できる。

from gpiozero import Button, LED
import requests
import signal
import subprocess
import sys
import time

# ========================================================
# 利用者設定項目
# ========================================================
ACCESS_TOKEN = "ImageFlux Live Streamingのアクセストークン"
ARCHIVE_DESTINATION_ID = "ImageFlux Live Streamingのアーカイブ保存先ID"
EVENT_WEBHOOK_URL = "イベントWebhook通知先URL"

API_URL = "https://live-api.imageflux.jp/"
# APIのリクエストタイムアウト設定 (接続タイムアウト, 読み込みタイムアウト)
REQUEST_TIMEOUT = (5, 15)
# プロセス終了待機タイムアウト (秒)
PROCESS_WAIT_TIMEOUT = 5
# ALSAの入力デバイス名。既定デバイスを使う場合は "default"。
# 必要に応じて "hw:1,0" や "plughw:1,0" に変更可能。
AUDIO_DEVICE = "default"
# 音声設定
AUDIO_SAMPLE_RATE = 48000
AUDIO_CHANNELS = 1
AUDIO_BITRATE = "160k"

# ========================================================
# ハードウェア設定 (gpiozeroを使用)
# ========================================================
# bounce_time=0.1 でチャタリングを抑制
btn = Button(22, pull_up=True, bounce_time=0.1)
led_g = LED(17)
led_r = LED(27)

# ========================================================
# 状態管理変数
# ========================================================
is_streaming = False
current_channel_id = None
rpicam_proc = None
ffmpeg_proc = None

# 配信開始
def start_stream():
    global is_streaming, current_channel_id, rpicam_proc, ffmpeg_proc

    print("配信チャンネルを作成中...")
    headers = {
        "Content-Type": "application/json",
        "X-Sora-Target": "ImageFlux_20250901.CreateRTMPChannel",
        "Authorization": f"Bearer {ACCESS_TOKEN}"
    }
    payload = {
        "hls": [{
            "durationSeconds": 2,
            "startTimeOffset": 2,
            "video": {
                "width": 1280,
                "height": 720,
                "fps": 30,
                "bps": 4000000,
                "codec": "h264_high"
            },
            "audio": {
                "bps": 160000
            },
            "archive": {
                "archive_destination_id": ARCHIVE_DESTINATION_ID
            }
        },{
            "durationSeconds": 2,
            "startTimeOffset": 2,
            "video": {
                "width": 720,
                "height": 480,
                "fps": 30,
                "bps": 2000000,
                "codec": "h264_high"
            },
            "audio": {
                "bps": 160000
            },
            "archive": {
                "archive_destination_id": ARCHIVE_DESTINATION_ID
            }
        }],
        "event_webhook_url": EVENT_WEBHOOK_URL
    }

    try:
        # API要求
        res = requests.post(API_URL, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
        res.raise_for_status()  # 200番台以外は例外として処理
    except requests.RequestException as e:
        print(f"チャンネル作成エラー: {e}")
        return

    try:
        data = res.json()
    except ValueError as e:
        print(f"API応答JSON解析エラー: {e}")
        return

    current_channel_id = data.get("channel_id")
    ingest_url = data.get("ingest_url")
    playlist_url = data.get("playlist_url")

    # 必要な値がない場合、終了(チャンネルIDは削除時、 ingestURLは配信開始時に必要)
    if not isinstance(current_channel_id, str) or not current_channel_id.strip():
        print("不完全なチャンネル作成API応答: channel_id がありません。")
        return
    if not isinstance(ingest_url, str) or not ingest_url.strip():
        print("不完全なチャンネル作成API応答: ingest_url がありません。")
        return

    current_channel_id = current_channel_id.strip()
    ingest_url = ingest_url.strip()

    print(f"チャンネル作成完了 チャンネルID: {current_channel_id} Ingest URL: {ingest_url}")

    # 映像処理コマンドの定義
    # タイムスタンプをFFmpegのものに合わせるため、--libav-format を mpegts としている。
    rpicam_cmd = [
        "rpicam-vid", "-t", "0", "--width", "1280", "--height", "720",
        "--framerate", "30", "--codec", "h264", "--profile", "high",
        "--inline", "--nopreview", "--libav-format", "mpegts", "-o", "-"
    ]
    # ffmpeg側は映像を mpegts で受け取り、USBマイク(ALSA)入力を合成してRTMPへ送出
    ffmpeg_cmd = [
        "ffmpeg",
        "-thread_queue_size", "1024", "-f", "mpegts", "-i", "-",
        "-thread_queue_size", "1024", "-f", "alsa", "-ac", str(AUDIO_CHANNELS), "-ar", str(AUDIO_SAMPLE_RATE), "-i", AUDIO_DEVICE,
        "-map", "0:v:0", "-map", "1:a:0",
        "-c:v", "copy",
        "-c:a", "aac", "-b:a", AUDIO_BITRATE,
        "-af", "aresample=async=1:first_pts=0",
        "-f", "flv", ingest_url
    ]

    print("カメラとFFmpegを起動します...")
    try:
        # rpicam-vid の標準出力を ffmpeg の標準入力にパイプで繋ぐ
        rpicam_proc = subprocess.Popen(rpicam_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, stdin=rpicam_proc.stdout, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # 親プロセスのパイプFDを閉じ、子プロセス終了を妨げないようにする
        if rpicam_proc.stdout:
            rpicam_proc.stdout.close()
    except Exception as e:
        # カメラまたはFFmpegの起動に失敗した場合、エラーメッセージを表示してクリーンアップする
        print(f"配信プロセス起動エラー: {e}")
        stop_process(ffmpeg_proc, "FFmpeg")
        stop_process(rpicam_proc, "カメラ")
        ffmpeg_proc = None
        rpicam_proc = None
        delete_channel(current_channel_id)
        return

    # 起動直後に落ちていないか確認してから配信状態へ遷移(カメラが他で使用中だと起動直後に落ちたりするため)
    time.sleep(1)
    if rpicam_proc.poll() is not None or ffmpeg_proc.poll() is not None:
        print("配信プロセスが正常起動しませんでした。チャンネルを破棄します。")
        stop_process(ffmpeg_proc, "FFmpeg")
        stop_process(rpicam_proc, "カメラ")
        ffmpeg_proc = None
        rpicam_proc = None
        delete_channel(current_channel_id)
        return

    # 配信開始が確認できた後にLEDを更新 (緑OFF, 赤ON)
    led_g.off()
    led_r.on()

    is_streaming = True

# 配信停止・チャンネル削除・LED更新をまとめて行う
def stop_stream():
    global is_streaming, current_channel_id, rpicam_proc, ffmpeg_proc

    print("配信を停止しています...")
    # プロセスを安全に終了させる
    stop_process(ffmpeg_proc, "FFmpeg")
    stop_process(rpicam_proc, "カメラ")
    ffmpeg_proc = None
    rpicam_proc = None

    # チャンネル削除APIの呼び出し
    if current_channel_id:
        print("チャンネルを削除中...")
        delete_channel(current_channel_id)
        current_channel_id = None

    # LED状態を更新 (赤OFF, 緑ON)
    led_r.off()
    led_g.on()
    is_streaming = False
    print("待機状態に戻りました。")

# APIでチャンネルを削除する。
def delete_channel(channel_id):
    headers = {
        "Content-Type": "application/json",
        "X-Sora-Target": "ImageFlux_20180501.DeleteChannel",
        "Authorization": f"Bearer {ACCESS_TOKEN}"
    }
    try:
        res = requests.post(
            API_URL,
            headers=headers,
            json={"channel_id": channel_id},
            timeout=REQUEST_TIMEOUT,
        )
        # 200番台以外は例外として処理
        res.raise_for_status()
    except requests.RequestException as e:
        print(f"チャンネル削除エラー: {e}")

# ボタン押下時、配信開始/停止を切り替える
def toggle_stream():
    if is_streaming:
        stop_stream()
    else:
        start_stream()

# プロセスを安全に停止する。終了しない場合は強制終了する。
def stop_process(proc, name):
    if not proc:
        return
    if proc.poll() is not None:
        return

    proc.terminate()
    try:
        proc.wait(timeout=PROCESS_WAIT_TIMEOUT)
    except subprocess.TimeoutExpired:
        print(f"{name} が終了しないため強制終了します。")
        proc.kill()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            print(f"{name} の強制終了待機もタイムアウトしました。")

# プログラム終了時(Ctrl+Cやシャットダウン時)、安全に終了する
def cleanup(signum, frame):
    print("\nImageFlux Live Streaming RTMP to HLS Machineを終了します...")
    # 配信中の場合は停止処理を行う
    if is_streaming:
        stop_stream()
    # LEDをすべて消す
    led_g.off()
    led_r.off()
    sys.exit(0)

# 配信中のプロセス異常終了を監視し、自動で待機状態へ戻す。
def monitor_stream_health():
    global is_streaming

    if not is_streaming:
        return

    # カメラまたはFFmpegのいずれかが終了している場合、配信を停止する
    if (rpicam_proc and rpicam_proc.poll() is not None) or (ffmpeg_proc and ffmpeg_proc.poll() is not None):
        rpicam_code = rpicam_proc.poll() if rpicam_proc else None
        ffmpeg_code = ffmpeg_proc.poll() if ffmpeg_proc else None
        print(f"配信プロセスが停止しました (rpicam-vid={rpicam_code}, ffmpeg={ffmpeg_code})")
        stop_stream()

# ========================================================
# メイン処理
# ========================================================
if __name__ == "__main__":
    # 強制終了シグナルを捕捉して cleanup関数を呼ぶ
    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    print("=== ImageFlux Live Streaming RTMP to HLS Machine起動 ===")
    
    # 待機状態(緑ON)でスタート
    led_g.on()
    led_r.off()

    # ボタンが押されたら toggle_stream 関数を実行するよう紐付け
    btn.when_pressed = toggle_stream

    # メインループ(イベント待ち + プロセス監視)
    try:
        while True:
            monitor_stream_health()
            time.sleep(1)
    except Exception:
        cleanup(None, None)

サービス登録して再起動すれば、先の写真のようにすぐ緑色LEDが点灯する。

# サービス情報を記載
sudo nano /etc/systemd/system/stream_app.service
sudo systemctl daemon-reload
sudo systemctl enable stream_app.service
sudo reboot
[Unit]
Description=ImageFlux Live Streaming Service
# ネットワーク(Wi-Fi等)が完全に繋がってから起動させるための設定
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
# 実行するユーザー
User=
# スクリプトがあるディレクトリ
WorkingDirectory=
# 実行するコマンド(Pythonの絶対パス + スクリプトの絶対パス)
ExecStart=/usr/bin/python3 /...
# 万が一プログラムがクラッシュしても、10秒後に自動で再起動して復旧させる
Restart=always
RestartSec=10

[Install]
# OSの起動プロセスに組み込むための設定
WantedBy=multi-user.target

配信

緑色LEDが点灯している状態でタクトスイッチを押せば、ほどなく配信が開始され、赤色LEDが点灯する。
IMG_0042.jpg
続けて、

配信が開始されました: チャンネルID:[チャンネルID] HLSプレイリストURL:[.m3u8のURL]

という内容のメールが届けば正解だ。URLからライブ配信を視聴できる。
image.png
再度タクトスイッチを押せば、配信が終了し、録画先のパスがメール通知される。

アーカイブが作成されました: チャンネルID:[チャンネルID] アーカイブ保存先URI:[URI] ファイルパス:[バケットより後ろのパス] ファイルサイズ:[size] ファイル形式:"m3u8"

これで完成だ。

まとめ

今回は、RTMP to HLS配信で、Raspberry Pi 5からImageFlux Live Streamingに向けてIngestする仕組みを構築してみた。
ボタンを押して配信するシンプルな仕組みだが、これを拡張して遠隔で配信開始・終了するようにしたり、センサーを搭載して人や物が近くにある時だけ配信・監視できるようにすることも可能である。
IoTライブ配信の片鱗を、感じていただけたろうか。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?