はじめに
この記事ではWebRTCとflutter、typescriptを使ってローカル環境で動作する簡易的なライブ配信Webアプリケーションと配信視聴モバイルアプリケーションを作成します。
大学院のプロジェクトでWebRTCを活用したIoT連携のWebアプリケーションを開発しているのと、インターンシップでflutterアプリケーションを開発しているのでこれら二つを合わせて何かやってみたいと思ったことが開発の背景です。
記事の内容に誤りがないように注意していますが、万が一誤った内容や誤字脱字等ありましたら コメントや編集リクエストなどでやさしくご指摘いただけますと幸いです。
制作物の概要
概要とデモ動画
今回作成するアプリケーションは以下のような動作をするものです。
また、ソースコードはGitHub上に公開しております。
機能としてはライブ配信をしているユーザーの動画・音声を視聴者が閲覧するというプリミティブなライブ配信アプリです。
システム構成は以下のとおりです。
配信サイトはHTML/CSS/JSを用いたWebアプリケーションです(simulatorだとcameraが使えないためwebアプリでの実装です).
サーバーはTypeScriptで作成しています。
視聴者が映像を視聴するのはFlutter製のアプリケーションです。
このアプリケーションは同じマシン内で実行されることを前提としています。
mediasoupサーバと各クライアント間でWebRTC接続が行われるため、ICE経路を確立する必要があります。
今回は特にホスティング等はしていないため、その範囲が同一マシン内に限定されるということになります。
mediasoupについて
ここで、mediasoupについて説明します。
通常、WebRTCはp2pの通信を実現するための通信方式です。
p2pでライブ配信をしようとすると配信者は全視聴者とWebRTC接続をする必要があり、負荷が大きくなるという問題があります。
このmediasoup
は SFU(Selective Forwarding Unit)という機能を提供するためのライブラリです。
SFUとはp2pとは異なり、配信者の映像や音声をサーバー経由で視聴者に送る機能です。
これによって、配信者はMediasoupサーバーとだけ接続すれば良いということになり、p2p通信の際に起こり得る負荷の問題を解消することができます。
実装
事前準備
本コードは以下の環境で作成・実行しました。
$ uname -a
Darwin you22fynoMacBook-Air-2.local 24.5.0 Darwin Kernel Version 24.5.0: Tue Apr 22 19:54:33 PDT 2025; root:xnu-11417.121.6~2/RELEASE_ARM64_T8122 arm64
$ node -v
v23.11.0
$ npm -v
10.9.2
$ flutter doctor
Doctor summary (to see all details, run flutter doctor -v):
[✓] Flutter (Channel stable, 3.27.4, on macOS 15.5 24F74 darwin-arm64, locale ja-JP)
[✓] Android toolchain - develop for Android devices (Android SDK version 35.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 16.0)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2024.1)
[✓] VS Code (version 1.100.2)
作業ディレクトリを作成して、以下のコマンドを実行してください。
flutter create app
mkdir server && cd server && npm init -y
cd ..
mkdir server && cd web && npm init -y
こののような構成になっていればOKです。
.
├── app
├── server
└── web
全体像
アプリケーションの全体像をシーケンス図に示したものです。
配信者の接続とライブ配信を開始する
視聴者の接続とライブ視聴開始
新規配信者が参加した場合の視聴者への通知
mediasoupサーバーの実装
mediasoupサーバーの実装はTypescriptを使用します。
まずは、必要なパッケージをインストールします。
mediasoupのシグナリングにはソケットを利用するのでws
をインストールします。
また、Webサーバーの立ち上げのために express
も利用します。
npm install mediasoup ws express
server/main.ts
ファイルを作成して以下の内容を記入します。
import * as mediasoup from 'mediasoup';
import { Worker, Router, WebRtcTransport, Producer, Consumer, WorkerLogLevel, WorkerLogTag } from 'mediasoup/node/lib/types';
import express from 'express';
import * as http from 'http';
import * as WebSocket from 'ws';
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
const config = {
listenIp: '0.0.0.0',
listenPort: 8000,
mediasoup: {
numWorkers: 1,
worker: {
rtcMinPort: 10000,
rtcMaxPort: 10100,
logLevel: 'warn' as WorkerLogLevel,
logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp'] as WorkerLogTag[]
},
router: {
mediaCodecs: [
{
kind: 'audio' as const,
mimeType: 'audio/opus',
clockRate: 48000,
channels: 2
},
{
kind: 'video' as const,
mimeType: 'video/VP8',
clockRate: 90000,
parameters: {}
},
{
kind: 'video' as const,
mimeType: 'video/H264',
clockRate: 90000,
parameters: {
'packetization-mode': 1,
'profile-level-id': '4d0032',
'level-asymmetry-allowed': 1
}
}
]
},
webRtcTransport: {
listenIps: [
{
ip: '0.0.0.0',
announcedIp: undefined as string | undefined
}
],
maxIncomingBitrate: 1500000,
initialAvailableOutgoingBitrate: 1000000
}
}
};
interface Peer {
id: string;
ws: WebSocket;
transports: Map<string, WebRtcTransport>;
producers: Map<string, Producer>;
consumers: Map<string, Consumer>;
}
class MediasoupServer {
private workers: Worker[] = [];
private routers: Router[] = [];
private peers: Map<string, Peer> = new Map();
private nextWorkerIdx = 0;
async init() {
await this.createWorkers();
await this.startWebSocketServer();
console.log(`MediaSoup SFU Server is running on port ${config.listenPort}`);
}
private async createWorkers() {
const { numWorkers } = config.mediasoup;
for (let i = 0; i < numWorkers; i++) {
const worker = await mediasoup.createWorker({
logLevel: config.mediasoup.worker.logLevel,
logTags: config.mediasoup.worker.logTags,
rtcMinPort: config.mediasoup.worker.rtcMinPort,
rtcMaxPort: config.mediasoup.worker.rtcMaxPort
});
worker.on('died', () => {
console.error('mediasoup worker が停止しました。');
process.exit(1);
});
this.workers.push(worker);
const router = await worker.createRouter({
mediaCodecs: config.mediasoup.router.mediaCodecs
});
this.routers.push(router);
}
}
private getRouter(): Router {
const router = this.routers[this.nextWorkerIdx];
this.nextWorkerIdx = (this.nextWorkerIdx + 1) % this.routers.length;
return router;
}
private async startWebSocketServer() {
wss.on('connection', (ws: WebSocket) => {
const peerId = this.generatePeerId();
const peer: Peer = {
id: peerId,
ws,
transports: new Map(),
producers: new Map(),
consumers: new Map()
};
this.peers.set(peerId, peer);
console.log(`新しいpeerが接続: ${peerId}`);
ws.on('message', async (message: string) => {
try {
const data = JSON.parse(message);
await this.handleMessage(peer, data);
} catch (error) {
console.error(error);
ws.send(JSON.stringify({
type: 'error',
error: error instanceof Error ? error.message : String(error)
}));
}
});
ws.on('close', () => {
console.log(`peerが切断: ${peerId}`);
this.closePeer(peerId);
});
ws.on('error', (error) => {
console.error(error);
});
ws.send(JSON.stringify({
type: 'connected',
peerId,
routerRtpCapabilities: this.getRouter().rtpCapabilities
}));
setTimeout(() => {
this.notifyNewPeerAboutExistingProducers(peer);
}, 1000);
});
server.listen(config.listenPort, config.listenIp, () => {
console.log(`WebSocketサーバーが ${config.listenIp}:${config.listenPort} でlistenしています`);
});
}
private async handleMessage(peer: Peer, data: any) {
const { type, ...params } = data;
switch (type) {
case 'createTransport':
await this.createTransport(peer, params);
break;
case 'connectTransport':
await this.connectTransport(peer, params);
break;
case 'produce':
await this.produce(peer, params);
break;
case 'consume':
await this.consume(peer, params);
break;
case 'resume':
await this.resume(peer, params);
break;
case 'getProducers':
await this.getProducers(peer);
break;
default:
throw new Error(`Unknown message type: ${type}`);
}
}
private async createTransport(peer: Peer, { direction }: { direction: 'send' | 'recv' }) {
const router = this.getRouter();
const transport = await router.createWebRtcTransport({
listenIps: config.mediasoup.webRtcTransport.listenIps,
initialAvailableOutgoingBitrate: config.mediasoup.webRtcTransport.initialAvailableOutgoingBitrate,
enableUdp: true,
enableTcp: true,
preferUdp: true,
appData: { peerId: peer.id, direction }
});
peer.transports.set(transport.id, transport);
peer.ws.send(JSON.stringify({
type: 'transportCreated',
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters
}));
}
private async connectTransport(
peer: Peer,
{ transportId, dtlsParameters }: { transportId: string; dtlsParameters: any }
) {
const transport = peer.transports.get(transportId);
if (!transport) {
throw new Error(`Transport ${transportId} not found`);
}
await transport.connect({ dtlsParameters });
peer.ws.send(JSON.stringify({
type: 'transportConnected',
transportId
}));
}
private async produce(
peer: Peer,
{
transportId,
kind,
rtpParameters,
appData
}: {
transportId: string;
kind: mediasoup.types.MediaKind;
rtpParameters: any;
appData: any;
}
) {
const transport = peer.transports.get(transportId);
if (!transport) {
throw new Error(`Transport ${transportId} not found`);
}
const producer = await transport.produce({
kind,
rtpParameters,
appData: { ...appData, peerId: peer.id }
});
peer.producers.set(producer.id, producer);
peer.ws.send(JSON.stringify({
type: 'produced',
id: producer.id
}));
this.peers.forEach((otherPeer) => {
if (otherPeer.id !== peer.id) {
otherPeer.ws.send(JSON.stringify({
type: 'newProducer',
producerId: producer.id,
peerId: peer.id,
kind: producer.kind
}));
}
});
}
private async consume(
peer: Peer,
{ producerId, rtpCapabilities }: { producerId: string; rtpCapabilities: any }
) {
const router = this.getRouter();
if (!router.canConsume({ producerId, rtpCapabilities })) {
throw new Error('Cannot consume');
}
let producer: Producer | undefined;
let producerPeer: Peer | undefined;
for (const [, p] of this.peers) {
const prod = p.producers.get(producerId);
if (prod) {
producer = prod;
producerPeer = p;
break;
}
}
if (!producer || !producerPeer) {
throw new Error(`Producer ${producerId} not found`);
}
const transport = Array.from(peer.transports.values()).find(
t => (t.appData as any).direction === 'recv'
);
if (!transport) {
throw new Error('No receive transport found');
}
const consumer = await transport.consume({
producerId,
rtpCapabilities,
paused: false,
appData: { peerId: peer.id, producerPeerId: producerPeer.id }
});
peer.consumers.set(consumer.id, consumer);
peer.ws.send(JSON.stringify({
type: 'consumed',
id: consumer.id,
producerId: producer.id,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
producerPeerId: producerPeer.id
}));
}
private async resume(peer: Peer, { consumerId }: { consumerId: string }) {
const consumer = peer.consumers.get(consumerId);
if (!consumer) {
throw new Error(`Consumer ${consumerId} not found`);
}
await consumer.resume();
peer.ws.send(JSON.stringify({
type: 'resumed',
consumerId
}));
}
private async getProducers(peer: Peer) {
const producers: Array<{ producerId: string; peerId: string; kind: string }> = [];
this.peers.forEach((otherPeer) => {
if (otherPeer.id !== peer.id) {
otherPeer.producers.forEach((producer) => {
producers.push({
producerId: producer.id,
peerId: otherPeer.id,
kind: producer.kind
});
});
}
});
if (producers.length > 0) {
producers.forEach(p => {
console.log(` producer: ${p.producerId} (${p.kind}) peer: ${p.peerId}`);
});
}
peer.ws.send(JSON.stringify({
type: 'existingProducers',
producers
}));
}
private notifyNewPeerAboutExistingProducers(peer: Peer) {
this.peers.forEach((otherPeer) => {
if (otherPeer.id !== peer.id) {
otherPeer.producers.forEach((producer) => {
console.log(`peer: ${otherPeer.id} producer: ${producer.id} (${producer.kind})`);
peer.ws.send(JSON.stringify({
type: 'newProducer',
producerId: producer.id,
peerId: otherPeer.id,
kind: producer.kind
}));
});
}
});
}
private closePeer(peerId: string) {
const peer = this.peers.get(peerId);
if (!peer) return;
peer.transports.forEach(transport => transport.close());
peer.producers.forEach(producer => producer.close());
peer.consumers.forEach(consumer => consumer.close());
this.peers.delete(peerId);
this.peers.forEach((otherPeer) => {
otherPeer.ws.send(JSON.stringify({
type: 'peerClosed',
peerId
}));
});
}
private generatePeerId(): string {
return Math.random().toString(36).substring(2, 15);
}
}
async function getLocalIp(): Promise<string> {
const os = await import('os');
const interfaces = os.networkInterfaces();
for (const name of Object.keys(interfaces)) {
for (const iface of interfaces[name]!) {
if (iface.family === 'IPv4' && !iface.internal) {
return iface.address;
}
}
}
return '127.0.0.1';
}
async function main() {
try {
const localIp = await getLocalIp();
(config.mediasoup.webRtcTransport.listenIps[0] as any).announcedIp = localIp;
console.log(`IPアドレス: ${localIp}`);
const server = new MediasoupServer();
await server.init();
} catch (error) {
console.error('サーバーの起動に失敗しました:', error);
process.exit(1);
}
}
main();
配信Webの実装
次に配信者側が使用するWebアプリケーションを実装します。
まずは必要なパッケージをインストールします。
mediasoup-client
はクライアントサイド側のmediasoupライブラリです。
また、今回はwebサーバのためにvite
を使用します。
npm install --save-dev vite
npm install mediasoup-client
続いて、二つのファイルをweb/
に作成してください。
cd web
touch main.js index.html
import { Device } from 'mediasoup-client';
class MediasoupClient {
constructor() {
this.ws = null;
this.device = null;
this.sendTransport = null;
this.producers = new Map();
this.localStream = null;
this.pendingTransportType = null;
this.isDeviceLoaded = false;
this.messageQueue = [];
this.isProcessingMessages = false;
this.initializeUI();
}
initializeUI() {
this.connectBtn = document.getElementById('connectBtn');
this.disconnectBtn = document.getElementById('disconnectBtn');
this.produceBtn = document.getElementById('produceBtn');
this.stopProduceBtn = document.getElementById('stopProduceBtn');
this.statusEl = document.getElementById('status');
this.localVideo = document.getElementById('localVideo');
this.connectBtn.addEventListener('click', () => this.connect());
this.disconnectBtn.addEventListener('click', () => this.disconnect());
this.produceBtn.addEventListener('click', () => this.startProducing());
this.stopProduceBtn.addEventListener('click', () => this.stopProducing());
}
log(message, type = 'info') {
console.log(`[${type.toUpperCase()}] ${message}`);
}
updateStatus(status) {
const statusMap = {
'Connecting': '接続中',
'Connected': '接続済み',
'Disconnected': '切断中'
};
this.statusEl.textContent = statusMap[status] || status;
this.statusEl.className = 'status ' + status.toLowerCase();
}
async connect() {
try {
this.updateStatus('Connecting');
this.log('WebSocketサーバーに接続中...');
this.ws = new WebSocket('ws://localhost:8000');
this.ws.onopen = async () => {
this.log('WebSocket接続完了', 'success');
this.updateStatus('Connected');
this.connectBtn.disabled = true;
this.disconnectBtn.disabled = false;
this.produceBtn.disabled = false;
await this.getRouterCapabilities();
};
this.ws.onmessage = async (event) => {
const message = JSON.parse(event.data);
this.queueMessage(message);
};
this.ws.onerror = (error) => {
this.log(`WebSocketエラー: ${error}`, 'error');
};
this.ws.onclose = () => {
this.log('WebSocket切断', 'error');
this.updateStatus('Disconnected');
this.resetUI();
};
} catch (error) {
this.log(`接続エラー: ${error.message}`, 'error');
this.updateStatus('Disconnected');
}
}
disconnect() {
if (this.ws) {
this.ws.close();
this.ws = null;
}
this.resetUI();
}
resetUI() {
this.connectBtn.disabled = false;
this.disconnectBtn.disabled = true;
this.produceBtn.disabled = true;
this.stopProduceBtn.disabled = true;
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
this.localVideo.srcObject = null;
}
}
async getRouterCapabilities() {
}
queueMessage(message) {
this.messageQueue.push(message);
this.processMessageQueue();
}
async processMessageQueue() {
if (this.isProcessingMessages) return;
this.isProcessingMessages = true;
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
try {
await this.handleMessage(message);
} catch (error) {
this.log(`Error processing message: ${error.message}`, 'error');
}
}
this.isProcessingMessages = false;
}
async handleMessage(message) {
switch (message.type) {
case 'connected':
await this.loadDevice(message.routerRtpCapabilities);
this.log(`peer ID ${message.peerId} で接続完了`, 'success');
break;
case 'transportCreated':
await this.createSendTransport(message);
break;
case 'transportConnected':
this.log(`transport ${message.transportId} connected`, 'success');
break;
case 'produced':
this.handleProduceResponse(message);
break;
case 'peerClosed':
this.log(`peer ${message.peerId} が切断されました`, 'info');
break;
case 'error':
this.log(`エラー: ${message.error}`, 'error');
break;
default:
this.log(`不明なメッセージタイプ: ${message.type}`, 'error');
}
}
sendMessage(message) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
async loadDevice(routerRtpCapabilities) {
try {
this.device = new Device();
await this.device.load({ routerRtpCapabilities });
this.isDeviceLoaded = true;
this.log('device loaded', 'success');
await this.createTransports();
} catch (error) {
this.log(`デバイス読み込み失敗: ${error.message}`, 'error');
}
}
async createTransports() {
this.pendingTransportType = 'send';
this.sendMessage({
type: 'createTransport',
direction: 'send'
});
}
async createSendTransport(params) {
try {
this.sendTransport = this.device.createSendTransport({
id: params.id,
iceParameters: params.iceParameters,
iceCandidates: params.iceCandidates,
dtlsParameters: params.dtlsParameters
});
this.sendTransport.on('connect', async ({ dtlsParameters }, callback, errback) => {
try {
this.sendMessage({
type: 'connectTransport',
transportId: this.sendTransport.id,
dtlsParameters
});
callback();
} catch (error) {
errback(error);
}
});
this.sendTransport.on('produce', async ({ kind, rtpParameters, appData }, callback, errback) => {
try {
this.sendMessage({
type: 'produce',
transportId: this.sendTransport.id,
kind,
rtpParameters,
appData: appData || {}
});
this.produceCallback = callback;
} catch (error) {
errback(error);
}
});
this.log('送信トランスポート作成完了', 'success');
} catch (error) {
this.log(`送信トランスポート作成失敗: ${error.message}`, 'error');
}
}
async startProducing() {
try {
if (!this.isDeviceLoaded) {
this.log('デバイスがまだ読み込まれていません。お待ちください...', 'error');
return;
}
if (!this.sendTransport) {
this.log('送信トランスポートの準備ができていません。お待ちください...', 'error');
return;
}
this.log('ユーザーメディア取得中...');
this.localStream = await navigator.mediaDevices.getUserMedia({
video: {
width: { ideal: 1280 },
height: { ideal: 720 }
},
audio: true
});
this.localVideo.srcObject = this.localStream;
this.log('ユーザーメディア取得完了', 'success');
for (const track of this.localStream.getTracks()) {
const producer = await this.sendTransport.produce({
track,
codecOptions: track.kind === 'video' ? {
videoGoogleStartBitrate: 1000
} : {}
});
this.producers.set(producer.id, producer);
this.log(`${track.kind === 'video' ? '映像' : '音声'}配信開始`, 'success');
}
this.produceBtn.disabled = true;
this.stopProduceBtn.disabled = false;
} catch (error) {
this.log(`配信開始失敗: ${error.message}`, 'error');
}
}
handleProduceResponse(message) {
if (this.produceCallback) {
this.produceCallback({ id: message.id });
this.produceCallback = null;
}
}
async stopProducing() {
for (const [id, producer] of this.producers) {
producer.close();
this.sendMessage({
type: 'closeProducer',
data: { producerId: id }
});
}
this.producers.clear();
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop());
this.localStream = null;
this.localVideo.srcObject = null;
}
this.produceBtn.disabled = false;
this.stopProduceBtn.disabled = true;
this.log('配信停止', 'success');
}
}
document.addEventListener('DOMContentLoaded', () => {
window.client = new MediasoupClient();
});
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ライブ配信</title>
<style>
body {
font-family: 'Roboto', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 16px;
background-color: #fafafa;
min-height: 100vh;
}
h1 {
color: #212121;
text-align: center;
font-size: 2rem;
font-weight: 400;
margin-bottom: 32px;
margin-top: 16px;
}
.container {
background-color: white;
border-radius: 4px;
padding: 16px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
margin-bottom: 16px;
border: 1px solid #e0e0e0;
}
.video-container {
display: flex;
gap: 20px;
flex-wrap: wrap;
justify-content: center;
margin: 20px 0;
}
video {
width: 100%;
max-width: 600px;
height: auto;
aspect-ratio: 16/9;
background-color: #000;
border-radius: 4px;
}
.controls {
display: flex;
gap: 10px;
flex-wrap: wrap;
justify-content: center;
margin: 20px 0;
}
button {
padding: 8px 16px;
font-size: 14px;
font-weight: 500;
border: none;
border-radius: 4px;
cursor: pointer;
background-color: #1976d2;
color: white;
transition: background-color 0.2s;
text-transform: uppercase;
letter-spacing: 0.5px;
}
button:hover {
background-color: #1565c0;
}
button:disabled {
background-color: #e0e0e0;
color: #9e9e9e;
cursor: not-allowed;
}
.status {
padding: 12px 16px;
border-radius: 4px;
margin: 8px 0;
font-size: 14px;
font-weight: 500;
}
.status.connected {
background-color: #e8f5e8;
color: #2e7d32;
border-left: 4px solid #4caf50;
}
.status.disconnected {
background-color: #ffebee;
color: #c62828;
border-left: 4px solid #f44336;
}
.status.connecting {
background-color: #fff8e1;
color: #f57c00;
border-left: 4px solid #ff9800;
}
#logs {
background-color: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 4px;
padding: 10px;
max-height: 300px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
}
.log-entry {
margin: 2px 0;
padding: 2px;
}
.log-info {
color: #0066cc;
}
.log-error {
color: #cc0000;
}
.log-success {
color: #009900;
}
</style>
</head>
<body>
<h1>ライブ配信</h1>
<div class="container">
<h2 style="color: #333; margin-bottom: 15px; font-size: 1.5em;">接続状態</h2>
<div id="status" class="status disconnected">切断中</div>
</div>
<div class="container">
<h2 style="color: #333; margin-bottom: 20px; font-size: 1.5em;">配信操作</h2>
<div class="controls">
<button id="connectBtn">サーバーに接続</button>
<button id="disconnectBtn" disabled>切断</button>
<button id="produceBtn" disabled>配信開始</button>
<button id="stopProduceBtn" disabled>配信停止</button>
</div>
</div>
<div class="container">
<h2 style="color: #333; margin-bottom: 20px; font-size: 1.5em;">配信プレビュー</h2>
<div class="video-container">
<div style="text-align: center;">
<h3 style="color: #555; margin-bottom: 15px;">あなたの配信</h3>
<video id="localVideo" autoplay muted></video>
</div>
</div>
</div>
<script type="module" src="main.js"></script>
</body>
</html>
HTMLは完全にAIによる実装になっております。
Flutterアプリの実装
最後に視聴者向けのFlutterアプリを作成します。
mediasfu_mediasoup_clientというパッケージを使用します。これはmediasoupのflutter向けパッケージです。
また、同様にシグナリングのためにweb_socket_channelも使用します。
flutter pub add mediasfu_mediasoup_client
flutter pub add web_socket_channel
また、app/lib/
に2つのファイルを新規で作成してください。
cd lib
touch listener_page.dart signaling.dart
実装は以下のようになっています。
import 'package:app/listener_page.dart';
import 'package:flutter/material.dart';
void main() {
runApp(const App());
}
class App extends StatelessWidget {
const App({super.key});
@override
Widget build(BuildContext context) {
return MaterialApp(
navigatorKey: GlobalKey<NavigatorState>(),
home: Home(),
);
}
}
class Home extends StatelessWidget {
const Home({super.key});
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('ライブ配信アプリ'),
),
body: Center(
child: TextButton(
style: TextButton.styleFrom(
backgroundColor: Colors.green,
foregroundColor: Colors.white,
textStyle: const TextStyle(fontSize: 32),
),
onPressed: () async {
await Navigator.push(
context,
MaterialPageRoute(
builder: (context) => ListenerPage(),
),
);
},
child: Text('視聴する'),
),
),
);
}
}
import 'dart:async';
import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:mediasfu_mediasoup_client/mediasfu_mediasoup_client.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class SignalingService {
WebSocketChannel? _channel;
Device? _device;
Transport? _recvTransport;
String? _peerId;
final Map<String, Consumer> _consumers = {};
final Map<String, dynamic> _pendingTransportParams = {};
final StreamController<String> _statusController =
StreamController<String>.broadcast();
final StreamController<String> _logController =
StreamController<String>.broadcast();
final StreamController<MediaStreamTrack> _trackController =
StreamController<MediaStreamTrack>.broadcast();
Stream<String> get statusStream => _statusController.stream;
Stream<String> get logStream => _logController.stream;
Stream<MediaStreamTrack> get trackStream => _trackController.stream;
void _updateStatus(String status) {
_statusController.add(status);
}
Future<void> connect(String serverUrl) async {
try {
_updateStatus('connecting');
_channel = WebSocketChannel.connect(Uri.parse(serverUrl));
_channel!.stream.listen(
(message) {
_handleMessage(jsonDecode(message));
},
onDone: () {
_updateStatus('disconnected');
_cleanup();
},
onError: (error) {
_updateStatus('error');
_cleanup();
},
);
_updateStatus('connected');
} catch (e) {
_updateStatus('error');
rethrow;
}
}
void _handleMessage(Map<String, dynamic> message) async {
try {
final type = message['type'];
switch (type) {
case 'connected':
await _handleConnected(message);
break;
case 'transportCreated':
await _handleTransportCreated(message);
break;
case 'transportConnected':
debugPrint('Transport connected: ${message['transportId']}');
break;
case 'newProducer':
await _handleNewProducer(message);
break;
case 'existingProducers':
await _handleExistingProducers(message);
break;
case 'consumed':
await _handleConsumed(message);
break;
case 'resumed':
debugPrint('Consumer resumed: ${message['consumerId']}');
break;
case 'peerClosed':
debugPrint('Peer closed: ${message['peerId']}');
break;
case 'error':
debugPrint('Server error: ${message['error']}');
break;
default:
debugPrint('Unknown message type: $type');
}
} catch (e) {
debugPrint('Error handling message: $e');
}
}
Future<void> _handleConnected(Map<String, dynamic> message) async {
try {
_peerId = message['peerId'];
final routerRtpCapabilities = message['routerRtpCapabilities'];
_device = Device();
final rtpCapabilities = RtpCapabilities.fromMap(routerRtpCapabilities);
await _device!.load(routerRtpCapabilities: rtpCapabilities);
// Create receive transport
await _createReceiveTransport();
} catch (e) {
debugPrint('Error in handleConnected: $e');
}
}
Future<void> _createReceiveTransport() async {
try {
_sendMessage({
'type': 'createTransport',
'direction': 'recv',
});
} catch (e) {
debugPrint('Error creating receive transport: $e');
}
}
Future<void> _handleTransportCreated(Map<String, dynamic> message) async {
try {
_pendingTransportParams['id'] = message['id'];
_pendingTransportParams['iceParameters'] = message['iceParameters'];
_pendingTransportParams['iceCandidates'] = message['iceCandidates'];
_pendingTransportParams['dtlsParameters'] = message['dtlsParameters'];
void consumerCallback(Consumer consumer, dynamic appData) {
_consumers[consumer.id] = consumer;
_trackController.add(consumer.track);
}
_recvTransport = _device!.createRecvTransportFromMap(
_pendingTransportParams,
consumerCallback: consumerCallback,
);
_recvTransport!.on('connect', (Map data) {
_sendMessage({
'type': 'connectTransport',
'transportId': _recvTransport!.id,
'dtlsParameters': data['dtlsParameters'].toMap(),
});
data['callback']();
});
_sendMessage({
'type': 'getProducers',
});
} catch (e) {
debugPrint('Error handling transport created: $e');
}
}
Future<void> _handleNewProducer(Map<String, dynamic> message) async {
try {
final producerId = message['producerId'];
if (_device == null || _recvTransport == null) return;
_sendMessage({
'type': 'consume',
'producerId': producerId,
'rtpCapabilities': _device!.rtpCapabilities.toMap(),
});
} catch (e) {
debugPrint('Error handling new producer: $e');
}
}
Future<void> _handleExistingProducers(Map<String, dynamic> message) async {
try {
final producers = message['producers'] as List<dynamic>;
if (_device == null || _recvTransport == null) {
await Future.delayed(Duration(milliseconds: 500));
if (_device == null || _recvTransport == null) return;
}
for (final producer in producers) {
final producerId = producer['producerId'];
await Future.delayed(Duration(milliseconds: 100));
_sendMessage({
'type': 'consume',
'producerId': producerId,
'rtpCapabilities': _device!.rtpCapabilities.toMap(),
});
}
} catch (e) {
debugPrint('Error handling existing producers: $e');
}
}
Future<void> _handleConsumed(Map<String, dynamic> message) async {
try {
final consumerId = message['id'];
final producerId = message['producerId'];
final kind = message['kind'];
final rtpParameters = message['rtpParameters'];
final RTCRtpMediaType mediaType;
switch (kind) {
case 'audio':
mediaType = RTCRtpMediaType.RTCRtpMediaTypeAudio;
break;
case 'video':
mediaType = RTCRtpMediaType.RTCRtpMediaTypeVideo;
break;
case 'data':
mediaType = RTCRtpMediaType.RTCRtpMediaTypeData;
break;
default:
throw Exception('Unknown media type: $kind');
}
_recvTransport!.consume(
id: consumerId,
producerId: producerId,
kind: mediaType,
rtpParameters: RtpParameters.fromMap(rtpParameters),
peerId: _peerId!,
);
_sendMessage({
'type': 'resume',
'consumerId': consumerId,
});
} catch (e) {
debugPrint('Error handling consumed: $e');
}
}
void _sendMessage(Map<String, dynamic> message) {
if (_channel != null) {
_channel!.sink.add(jsonEncode(message));
}
}
void _cleanup() {
_recvTransport?.close();
_recvTransport = null;
for (final consumer in _consumers.values) {
consumer.close();
}
_consumers.clear();
_device = null;
_peerId = null;
_pendingTransportParams.clear();
}
void disconnect() {
_channel?.sink.close();
_channel = null;
_cleanup();
_updateStatus('disconnected');
}
void dispose() {
disconnect();
_statusController.close();
_logController.close();
_trackController.close();
}
}
import 'package:flutter/material.dart';
import 'signaling.dart';
import 'package:mediasfu_mediasoup_client/mediasfu_mediasoup_client.dart';
class ListenerPage extends StatefulWidget {
const ListenerPage({super.key});
@override
State<ListenerPage> createState() => _ListenerPageState();
}
class _ListenerPageState extends State<ListenerPage> {
final SignalingService _signalingService = SignalingService();
RTCVideoRenderer? _videoRenderer;
String _status = 'disconnected';
final String _serverUrl = 'ws://localhost:8000';
@override
void initState() {
super.initState();
_setupSignalingListeners();
_autoConnect();
}
void _setupSignalingListeners() {
_signalingService.statusStream.listen((status) {
setState(() {
_status = status;
});
});
_signalingService.trackStream.listen((track) async {
if (track.kind == 'video') {
try {
if (_videoRenderer != null) {
_videoRenderer!.srcObject?.dispose();
_videoRenderer!.dispose();
}
final renderer = RTCVideoRenderer();
await renderer.initialize();
final stream = await createLocalMediaStream(
'video_${DateTime.now().millisecondsSinceEpoch}');
await stream.addTrack(track);
renderer.srcObject = stream;
if (mounted) {
setState(() {
_videoRenderer = renderer;
});
}
} catch (e) {
if (mounted) {
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('映像の初期化に失敗しました: $e')),
);
}
}
}
});
}
Future<void> _autoConnect() async {
try {
await _signalingService.connect(_serverUrl);
} catch (e) {
if (mounted) {
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('接続に失敗しました: $e')),
);
}
}
}
void _autoDisconnect() {
_signalingService.disconnect();
if (_videoRenderer != null) {
_videoRenderer!.srcObject?.dispose();
_videoRenderer!.dispose();
}
if (mounted) {
setState(() {
_videoRenderer = null;
});
}
}
@override
void dispose() {
_autoDisconnect();
_signalingService.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text(
'ライブ視聴',
style: TextStyle(
fontSize: 18,
fontWeight: FontWeight.bold,
),
),
backgroundColor: _getStatusColor(),
),
body: Column(
mainAxisAlignment: MainAxisAlignment.start,
children: [
Container(
padding: EdgeInsets.all(16),
child: Text(
'ステータス: ${_getStatusText()}',
style: TextStyle(
fontSize: 18,
fontWeight: FontWeight.bold,
color: _getStatusColor(),
),
),
),
Expanded(
child: _videoRenderer == null
? Center(
child: Text(
_status == 'connected' ? '配信を待機中...' : '接続中...',
style: TextStyle(color: Colors.grey, fontSize: 18),
),
)
: RTCVideoView(
_videoRenderer!,
objectFit:
RTCVideoViewObjectFit.RTCVideoViewObjectFitContain,
),
),
],
),
);
}
String _getStatusText() {
switch (_status) {
case 'connected':
return '接続済み';
case 'connecting':
return '接続中';
case 'error':
return 'エラー';
case 'disconnected':
return '切断済み';
default:
return '不明';
}
}
Color _getStatusColor() {
switch (_status) {
case 'connected':
return Colors.green;
case 'connecting':
return Colors.orange;
case 'error':
return Colors.red;
default:
return Colors.grey;
}
}
}
さいごに
今回はローカルマシン内で動作するmediasoupを使ったSFU型のWebRTCサーバーと動画配信Webサイト、配信視聴flutterアプリを作成しました。
今後の展望としては、flutterアプリから配信をできるようにする(simulatorではカメラが使えなかったため今回は断念)ことと、実機での動作確認です。
特に実機での動作確認ではネットワーク周りを考慮する(NAT超えやそれに関するSTUNサーバー等の設定など)必要が出てくるの思うので検証してみたいと思っています。
最後まで記事を読んでくださりありがとうございました。