はじめに
スマホからPCにリアルタイムで映像を送って、Pythonで処理したいと思いました。Pythonのaiortcというライブラリを使うことで実現できることが分かったので、その方法を紹介します。
今回は、スマホからPCに映像を送信し、Python(opencv)で二値化処理を行い、その結果を再びスマホに送信して表示するという一連の流れを作ってみます。
実装
aiortcとは
aiortcはPythonでWebRTCを扱うためのライブラリです。WebRTCは音声・映像・データをリアルタイムにやり取りするための技術です。
これを使えばスマホのカメラ映像をリアルタイムでPythonに送ることができます。
インストール方法はこちらです。aiohttpもインストールします(サーバーを立てるために必要)。
pip install aiortc aiohttp
流れ
今回は、スマホのブラウザ(Webアプリ)から映像を送受信します。
私のスマホはiphoneですが、iOSのネイティブアプリを作るにはMacが必要です。そこで今回はWebアプリにしました。
HTTPSじゃないとカメラが使えない
aiohttpでWebサーバーを立てると、デフォルトではHTTPになります。しかし、HTTPSでないと、スマホのカメラにアクセスできません(少なくともiosでは)。
HTTPSにするには自己署名証明書を作成する必要があります。以下のコマンドで自己署名証明書を作成できます(openssl はGit Bashなどで使用可能)
openssl req -x509 -newkey rsa:2048 -nodes -keyout key.pem -out cert.pem -days 365
このコマンドを実行すると、いくつか質問されますが適当に答えて大丈夫です。Common Name(CN)はlocalhostにしておきました。
実行後、key.pem(秘密鍵)とcert.pem(証明書)が作成されます。これを後で使います。
自己署名証明書だとブラウザから警告が出ますが、ローカルで個人利用する分には問題ないです。
コード
メインのファイルは以下の2つです。
- index.html
- スマホからアクセスするページ
- Pythonサーバーとの通信の確立と設定
- 映像の送受信
- 処理した映像を受け取って表示
- server.py
- サーバーの作成
- 映像の処理
さらに、key.pem(秘密鍵)とcert.pem(証明書)が必要です。
今回は、映像に対して二値化処理を行います。
コードはこちらです。
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>WebRTC送受信テスト</title>
<style>
video {
width: 100%;
max-width: 480px;
border: 2px solid #444;
}
</style>
</head>
<body>
<h2>📷 スマホのカメラ映像</h2>
<video id="localVideo" autoplay muted playsinline></video>
<h2>🌀 Pythonから再送信された映像</h2>
<video id="remoteVideo" autoplay playsinline></video>
<script>
// WebRTC接続
const pc = new RTCPeerConnection();
const localVideo = document.getElementById("localVideo");
const remoteVideo = document.getElementById("remoteVideo");
// 映像受信時の処理
pc.ontrack = (event) => {
remoteVideo.srcObject = event.streams[0];
};
async function start() {
// 設定
const constraints = {
video: { facingMode: { exact: "environment" } }, // "environment"は背面カメラ(自撮りじゃない方)
audio: false,
};
// カメラ映像取得(HTTPSである必要がある)
const stream = await navigator.mediaDevices.getUserMedia(
constraints
);
if (!stream) {
alert("カメラ映像の取得に失敗しました");
return;
}
localVideo.srcObject = stream;
// カメラ映像を送信
for (const track of stream.getTracks()) {
pc.addTrack(track, stream);
}
// SDP offer作成
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
// サーバーにSDP送信
const res = await fetch("/offer", {
method: "POST",
body: JSON.stringify(pc.localDescription),
headers: { "Content-Type": "application/json" },
});
const answer = await res.json();
await pc.setRemoteDescription(answer);
}
start();
</script>
</body>
</html>
import asyncio
import ssl
import cv2
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
# 映像をPCで表示
def show_video(img):
cv2.imshow("WebRTC Video", img)
if cv2.waitKey(1) & 0xFF == ord("q"):
asyncio.get_event_loop().stop()
# 二値化
def binarize_image(img):
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 大津の二値化
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
return binary
# 受け取った映像ストリームを処理するクラス
class VideoTransformTrack(VideoStreamTrack):
kind = "video"
def __init__(self, track):
super().__init__()
self.track = track
async def recv(self):
frame = await self.track.recv()
img = frame.to_ndarray(format="bgr24")
bin_img = binarize_image(img)
# show_video(bin_img) # 映像を表示
img_bgr = cv2.cvtColor(bin_img, cv2.COLOR_GRAY2BGR) # RGBに戻す
# フレームとして再構成して返す
from av import VideoFrame
new_frame = VideoFrame.from_ndarray(img_bgr, format="bgr24")
new_frame.pts = frame.pts
new_frame.time_base = frame.time_base
return new_frame
# グローバル変数(1接続対応)
pcs = set()
async def offer(request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("track")
def on_track(track):
if track.kind == "video":
local_video = VideoTransformTrack(track)
pc.addTrack(local_video)
await pc.setRemoteDescription(offer)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.json_response({
"sdp": pc.localDescription.sdp,
"type": pc.localDescription.type
})
async def on_shutdown(app):
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
async def index(request):
content = open("index.html", "r", encoding="utf-8").read()
return web.Response(content_type="text/html", text=content)
def main():
app = web.Application()
app.on_shutdown.append(on_shutdown)
app.router.add_post("/offer", offer)
app.router.add_get("/", index) # index.htmlを提供
# SSL設定(HTTPSにするために必要)
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain('cert.pem', keyfile='key.pem')
web.run_app(app, port=8080, ssl_context=ctx)
if __name__ == "__main__":
main()
実行
1. server.pyを実行
まず、server.pyを実行します。以下のような出力が出ると思います。
======== Running on https://0.0.0.0:8080 ========
(Press CTRL+C to quit)
2. スマホでページにアクセス
次に、スマホでhttps://<PCのローカルIP>:8080/
に接続しましょう。PCのIPアドレスはipconfig
コマンドで確認できます(Windowsの場合)。
自己署名証明書を使っているため、アクセスすると警告が出ますが、無視してアクセスしましょう。
Chromeの場合だと、以下の手順でアクセスできます:
- 下のほうにある[詳細設定]をクリック
- [localhostにアクセスする(安全ではありません)]をクリック
3. ちゃんと映像が処理されているか確認
ページにアクセスすると、カメラ使用の許可を求められるので許可しましょう。
上に元の映像が表示されます。下の映像はPCで処理されて戻ってきた映像です。今回は二値化処理をしているので、下の映像が白黒になっていたら成功です。
遅延もそんなになくていい感じです。
終わりに
今回はスマホからPCにリアルタイムで映像を送受信しました。
これを使えば、ARアプリや手書き図形認識アプリなどが作れそうで夢が広がります。