概要
背景
さくらインターネットのライブ配信PaaS「ImageFlux Live Streaming」が、2026年3月、RTMP to HLS配信に対応した(記事)。
RTMPはライブ配信のIngestにおいて最大シェアを誇るプロトコルである。これにより、Webブラウザからだけでなく、多種多様な機材・アプリケーションからライブ配信を行うことが可能になった。
そこで今回は、Raspberry Pi 5からライブ配信を行う仕組みを組んでみた。
実装内容
今回はハードウェア、ソフトウェア両方を扱うため、分けて解説する。
ハードウェア
Raspberry Pi 5で実現する内容は以下の通り。

Raspberry Pi 5に接続したカメラおよびマイクでライブ配信をする。
- 待機状態でタクトスイッチを押下するとRTMP to HLSチャンネル作成・配信開始し、緑色LEDを消灯&赤色LEDを点灯。
- 配信状態でタクトスイッチを押下すると配信終了・配信していたチャンネルを削除し、赤色LEDを消灯&緑色LEDを点灯。
という至ってシンプルな仕様である。
ソフトウェア
配信が開始されたら、AppRun共用型を用いてImageFlux Live StreamingのイベントWebhook通知を受け取る。APIで視聴用URL(.m3u8のパス)を取得し、シンプル通知経由でメール送付する。
これで、配信側ではスイッチを押すだけ、視聴者側ではメールで通知されたURLにアクセスするだけで、配信が成立する。

これは任意だが、録画はオブジェクトストレージに保存し、ウェブアクセラレータ経由で配信できるようにもした。
構築
ハードウェア調達
筆者はスイッチサイエンスにてRaspberry Pi 5のコンプリートキットおよびカメラを調達した。マイクは市販されている廉価なUSBマイクを用いた。
ほか、ブレッドボード、ジャンプワイヤー、LED2種、抵抗2個、タクトスイッチが必要である。
回路は至って単純で、
1.GPIO17番-抵抗(330Ω)-LED(緑)-GND
2.GPIO27番-抵抗(330Ω)-LED(赤)-GND
3.GPIO22番-タクトスイッチ-GND
という構成。

慣れていればものの5分で結線可能である。
クラウド基盤構築
続けてクラウド基盤を構築する。
シンプル通知の実行には「作成・削除」権限を持つサービスプリンシパルおよびサービスプリンシパルキーが必要であるため発行(参考)、ImageFlux Live StreamingのAPIトークンも発行した(参考)。
コンテナレジストリにAppRun共用型で動作させるコンテナイメージを格納後、Terraformで基盤を構築する。AppRun共用型の最小スケールを0に設定することで、イベントWebhook通知が走っている場合のみ起動し、コストを最小限に抑えるように設計している。
resource "sakura_simple_notification_destination" "notification_target_for_imageflux_live_streaming_archive_notifier" {
name = "通知先メールアドレス"
description = "ImageFlux Live Streamingの通知先メールアドレス"
type = "email"
value = var.email_address
}
resource "sakura_simple_notification_group" "notification_group_for_imageflux_live_streaming_archive_notifier" {
name = "ImageFlux Live Streaming通知グループ"
description = "ImageFlux Live Streaming配信開始・アーカイブ保存完了の通知グループ"
destinations = [sakura_simple_notification_destination.notification_target_for_imageflux_live_streaming_archive_notifier.id]
}
//1変数で秘密鍵の値を保持できないため、分割
locals {
service_principal_private_key_pem_content = file(var.service_principal_private_key_pem_path)
service_principal_private_key_pem_b64 = base64encode(local.service_principal_private_key_pem_content)
service_principal_private_key_b64_chunk_count = ceil(length(local.service_principal_private_key_pem_b64) / var.service_principal_private_key_b64_chunk_size)
service_principal_private_key_b64_chunk_envs = [for i in range(local.service_principal_private_key_b64_chunk_count) : {
key = format("SERVICE_PRINCIPAL_PRIVATE_KEY_PEM_B64_%03d", i)
value = substr(local.service_principal_private_key_pem_b64, i * var.service_principal_private_key_b64_chunk_size, var.service_principal_private_key_b64_chunk_size)
}]
}
resource "sakura_apprun_shared" "imageflux_live_streaming_notifier" {
name = "ImageFlux Live Streaming通知機能"
max_scale = 3
min_scale = 0
port = 8080
timeout_seconds = 60
components = [{
name = "ImageFlux Live Streaming通知コンテナ"
max_cpu = "0.5"
max_memory = "1Gi"
deploy_source = {
container_registry = {
image = var.container_registry_image
}
}
env = concat([
{
key = "SERVICE_PRINCIPAL_KEY_KID"
value = var.service_principal_key_kid
},
{
key = "SERVICE_PRINCIPAL_RESOURCE_ID"
value = var.service_principal_resource_id
},
{
key = "SERVICE_PRINCIPAL_PRIVATE_KEY_PEM_B64_CHUNK_COUNT"
value = tostring(local.service_principal_private_key_b64_chunk_count)
},
{
key = "NOTIFICATION_GROUP_ID"
value = sakura_simple_notification_group.notification_group_for_imageflux_live_streaming_archive_notifier.id
},
{
key = "IMAGEFLUX_ACCESS_TOKEN"
value = var.imageflux_access_token
}
], local.service_principal_private_key_b64_chunk_envs)
}]
traffics = [{
version_index = 0
percent = 100
}]
}
オブジェクトストレージとさくらのウェブアクセラレータは既存のものを使用したため、Terraformには記載していない。
ソースコード全体はリポジトリを参照されたし。
Terraform init
Terraform apply
を叩けば、すぐ構築されるはずだ。Raspberry Pi 5上に載せるソースコードに設定するAppRun共用型の公開URLも、ここで取得できる。
ほどなくシンプル通知の本登録リンクがメールアドレスに届くので、その本登録も済ませておこう。
Raspberry Pi 5へのサービス登録
いよいよ、Raspberry Pi 5に、ライブ配信機能を載せる。
ソースコードは以下の通り。コンプリートキットで購入したmicroSDにはOSがインストール済みであり、PythonやFFmpeg等の実行環境も整っている。そのため、冒頭の設定値だけ設定すればよい。
なお、アーカイブ保存先IDはImageFlux Live StreamingのAPIで作成・取得できる。
from gpiozero import Button, LED
import requests
import signal
import subprocess
import sys
import time
# ========================================================
# 利用者設定項目
# ========================================================
ACCESS_TOKEN = "ImageFlux Live Streamingのアクセストークン"
ARCHIVE_DESTINATION_ID = "ImageFlux Live Streamingのアーカイブ保存先ID"
EVENT_WEBHOOK_URL = "イベントWebhook通知先URL"
API_URL = "https://live-api.imageflux.jp/"
# APIのリクエストタイムアウト設定 (接続タイムアウト, 読み込みタイムアウト)
REQUEST_TIMEOUT = (5, 15)
# プロセス終了待機タイムアウト (秒)
PROCESS_WAIT_TIMEOUT = 5
# ALSAの入力デバイス名。既定デバイスを使う場合は "default"。
# 必要に応じて "hw:1,0" や "plughw:1,0" に変更可能。
AUDIO_DEVICE = "default"
# 音声設定
AUDIO_SAMPLE_RATE = 48000
AUDIO_CHANNELS = 1
AUDIO_BITRATE = "160k"
# ========================================================
# ハードウェア設定 (gpiozeroを使用)
# ========================================================
# bounce_time=0.1 でチャタリングを抑制
btn = Button(22, pull_up=True, bounce_time=0.1)
led_g = LED(17)
led_r = LED(27)
# ========================================================
# 状態管理変数
# ========================================================
is_streaming = False
current_channel_id = None
rpicam_proc = None
ffmpeg_proc = None
# 配信開始
def start_stream():
global is_streaming, current_channel_id, rpicam_proc, ffmpeg_proc
print("配信チャンネルを作成中...")
headers = {
"Content-Type": "application/json",
"X-Sora-Target": "ImageFlux_20250901.CreateRTMPChannel",
"Authorization": f"Bearer {ACCESS_TOKEN}"
}
payload = {
"hls": [{
"durationSeconds": 2,
"startTimeOffset": 2,
"video": {
"width": 1280,
"height": 720,
"fps": 30,
"bps": 4000000,
"codec": "h264_high"
},
"audio": {
"bps": 160000
},
"archive": {
"archive_destination_id": ARCHIVE_DESTINATION_ID
}
},{
"durationSeconds": 2,
"startTimeOffset": 2,
"video": {
"width": 720,
"height": 480,
"fps": 30,
"bps": 2000000,
"codec": "h264_high"
},
"audio": {
"bps": 160000
},
"archive": {
"archive_destination_id": ARCHIVE_DESTINATION_ID
}
}],
"event_webhook_url": EVENT_WEBHOOK_URL
}
try:
# API要求
res = requests.post(API_URL, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
res.raise_for_status() # 200番台以外は例外として処理
except requests.RequestException as e:
print(f"チャンネル作成エラー: {e}")
return
try:
data = res.json()
except ValueError as e:
print(f"API応答JSON解析エラー: {e}")
return
current_channel_id = data.get("channel_id")
ingest_url = data.get("ingest_url")
playlist_url = data.get("playlist_url")
# 必要な値がない場合、終了(チャンネルIDは削除時、 ingestURLは配信開始時に必要)
if not isinstance(current_channel_id, str) or not current_channel_id.strip():
print("不完全なチャンネル作成API応答: channel_id がありません。")
return
if not isinstance(ingest_url, str) or not ingest_url.strip():
print("不完全なチャンネル作成API応答: ingest_url がありません。")
return
current_channel_id = current_channel_id.strip()
ingest_url = ingest_url.strip()
print(f"チャンネル作成完了 チャンネルID: {current_channel_id} Ingest URL: {ingest_url}")
# 映像処理コマンドの定義
# タイムスタンプをFFmpegのものに合わせるため、--libav-format を mpegts としている。
rpicam_cmd = [
"rpicam-vid", "-t", "0", "--width", "1280", "--height", "720",
"--framerate", "30", "--codec", "h264", "--profile", "high",
"--inline", "--nopreview", "--libav-format", "mpegts", "-o", "-"
]
# ffmpeg側は映像を mpegts で受け取り、USBマイク(ALSA)入力を合成してRTMPへ送出
ffmpeg_cmd = [
"ffmpeg",
"-thread_queue_size", "1024", "-f", "mpegts", "-i", "-",
"-thread_queue_size", "1024", "-f", "alsa", "-ac", str(AUDIO_CHANNELS), "-ar", str(AUDIO_SAMPLE_RATE), "-i", AUDIO_DEVICE,
"-map", "0:v:0", "-map", "1:a:0",
"-c:v", "copy",
"-c:a", "aac", "-b:a", AUDIO_BITRATE,
"-af", "aresample=async=1:first_pts=0",
"-f", "flv", ingest_url
]
print("カメラとFFmpegを起動します...")
try:
# rpicam-vid の標準出力を ffmpeg の標準入力にパイプで繋ぐ
rpicam_proc = subprocess.Popen(rpicam_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, stdin=rpicam_proc.stdout, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 親プロセスのパイプFDを閉じ、子プロセス終了を妨げないようにする
if rpicam_proc.stdout:
rpicam_proc.stdout.close()
except Exception as e:
# カメラまたはFFmpegの起動に失敗した場合、エラーメッセージを表示してクリーンアップする
print(f"配信プロセス起動エラー: {e}")
stop_process(ffmpeg_proc, "FFmpeg")
stop_process(rpicam_proc, "カメラ")
ffmpeg_proc = None
rpicam_proc = None
delete_channel(current_channel_id)
return
# 起動直後に落ちていないか確認してから配信状態へ遷移(カメラが他で使用中だと起動直後に落ちたりするため)
time.sleep(1)
if rpicam_proc.poll() is not None or ffmpeg_proc.poll() is not None:
print("配信プロセスが正常起動しませんでした。チャンネルを破棄します。")
stop_process(ffmpeg_proc, "FFmpeg")
stop_process(rpicam_proc, "カメラ")
ffmpeg_proc = None
rpicam_proc = None
delete_channel(current_channel_id)
return
# 配信開始が確認できた後にLEDを更新 (緑OFF, 赤ON)
led_g.off()
led_r.on()
is_streaming = True
# 配信停止・チャンネル削除・LED更新をまとめて行う
def stop_stream():
global is_streaming, current_channel_id, rpicam_proc, ffmpeg_proc
print("配信を停止しています...")
# プロセスを安全に終了させる
stop_process(ffmpeg_proc, "FFmpeg")
stop_process(rpicam_proc, "カメラ")
ffmpeg_proc = None
rpicam_proc = None
# チャンネル削除APIの呼び出し
if current_channel_id:
print("チャンネルを削除中...")
delete_channel(current_channel_id)
current_channel_id = None
# LED状態を更新 (赤OFF, 緑ON)
led_r.off()
led_g.on()
is_streaming = False
print("待機状態に戻りました。")
# APIでチャンネルを削除する。
def delete_channel(channel_id):
headers = {
"Content-Type": "application/json",
"X-Sora-Target": "ImageFlux_20180501.DeleteChannel",
"Authorization": f"Bearer {ACCESS_TOKEN}"
}
try:
res = requests.post(
API_URL,
headers=headers,
json={"channel_id": channel_id},
timeout=REQUEST_TIMEOUT,
)
# 200番台以外は例外として処理
res.raise_for_status()
except requests.RequestException as e:
print(f"チャンネル削除エラー: {e}")
# ボタン押下時、配信開始/停止を切り替える
def toggle_stream():
if is_streaming:
stop_stream()
else:
start_stream()
# プロセスを安全に停止する。終了しない場合は強制終了する。
def stop_process(proc, name):
if not proc:
return
if proc.poll() is not None:
return
proc.terminate()
try:
proc.wait(timeout=PROCESS_WAIT_TIMEOUT)
except subprocess.TimeoutExpired:
print(f"{name} が終了しないため強制終了します。")
proc.kill()
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
print(f"{name} の強制終了待機もタイムアウトしました。")
# プログラム終了時(Ctrl+Cやシャットダウン時)、安全に終了する
def cleanup(signum, frame):
print("\nImageFlux Live Streaming RTMP to HLS Machineを終了します...")
# 配信中の場合は停止処理を行う
if is_streaming:
stop_stream()
# LEDをすべて消す
led_g.off()
led_r.off()
sys.exit(0)
# 配信中のプロセス異常終了を監視し、自動で待機状態へ戻す。
def monitor_stream_health():
global is_streaming
if not is_streaming:
return
# カメラまたはFFmpegのいずれかが終了している場合、配信を停止する
if (rpicam_proc and rpicam_proc.poll() is not None) or (ffmpeg_proc and ffmpeg_proc.poll() is not None):
rpicam_code = rpicam_proc.poll() if rpicam_proc else None
ffmpeg_code = ffmpeg_proc.poll() if ffmpeg_proc else None
print(f"配信プロセスが停止しました (rpicam-vid={rpicam_code}, ffmpeg={ffmpeg_code})")
stop_stream()
# ========================================================
# メイン処理
# ========================================================
if __name__ == "__main__":
# 強制終了シグナルを捕捉して cleanup関数を呼ぶ
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
print("=== ImageFlux Live Streaming RTMP to HLS Machine起動 ===")
# 待機状態(緑ON)でスタート
led_g.on()
led_r.off()
# ボタンが押されたら toggle_stream 関数を実行するよう紐付け
btn.when_pressed = toggle_stream
# メインループ(イベント待ち + プロセス監視)
try:
while True:
monitor_stream_health()
time.sleep(1)
except Exception:
cleanup(None, None)
サービス登録して再起動すれば、先の写真のようにすぐ緑色LEDが点灯する。
# サービス情報を記載
sudo nano /etc/systemd/system/stream_app.service
sudo systemctl daemon-reload
sudo systemctl enable stream_app.service
sudo reboot
[Unit]
Description=ImageFlux Live Streaming Service
# ネットワーク(Wi-Fi等)が完全に繋がってから起動させるための設定
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# 実行するユーザー
User=
# スクリプトがあるディレクトリ
WorkingDirectory=
# 実行するコマンド(Pythonの絶対パス + スクリプトの絶対パス)
ExecStart=/usr/bin/python3 /...
# 万が一プログラムがクラッシュしても、10秒後に自動で再起動して復旧させる
Restart=always
RestartSec=10
[Install]
# OSの起動プロセスに組み込むための設定
WantedBy=multi-user.target
配信
緑色LEDが点灯している状態でタクトスイッチを押せば、ほどなく配信が開始され、赤色LEDが点灯する。

続けて、
配信が開始されました: チャンネルID:[チャンネルID] HLSプレイリストURL:[.m3u8のURL]
という内容のメールが届けば正解だ。URLからライブ配信を視聴できる。

再度タクトスイッチを押せば、配信が終了し、録画先のパスがメール通知される。
アーカイブが作成されました: チャンネルID:[チャンネルID] アーカイブ保存先URI:[URI] ファイルパス:[バケットより後ろのパス] ファイルサイズ:[size] ファイル形式:"m3u8"
これで完成だ。
まとめ
今回は、RTMP to HLS配信で、Raspberry Pi 5からImageFlux Live Streamingに向けてIngestする仕組みを構築してみた。
ボタンを押して配信するシンプルな仕組みだが、これを拡張して遠隔で配信開始・終了するようにしたり、センサーを搭載して人や物が近くにある時だけ配信・監視できるようにすることも可能である。
IoTライブ配信の片鱗を、感じていただけたろうか。