Python で WebRTC しよう…!
python aiortc を使うときに、tkinter で雑に viewer を実装するとfpsが出なかったので、ちゃんとめに実装しました
(python で webrtc ってややナンセンスそう? とかはさておき)
こんな感じ
- aiortc の example を拡張しました
- tkinter 周りは StreamDiffusion の example を参考にしました
- コード等は github gist で公開しています
- ローカル <-> サーバー通信も多少は動作確認しています
- ⇧動画では 60fps になってますが、手元の同一PC内通信は 240fps までは出ることを確認しました
環境
- windows + WSL / windows powershell
- python <= 3.12 (大事: pyav==12.3.0 のため)
- pip install aiortc opencv-python
- pip install pytk or sudo apt install python3.10-tk など (場合による?)
WSL の場合
$ python offer.py
$ python answer.py
Powershell の場合
- signaling の IO 周りでエラーが出るので少し⇧とはコマンドを変える
- offer のあと 5秒くらい待ってから answer を実行すると良さそう
% python .\offer.py --signaling tcp-socket --signaling-port 1234
% python .\answer.py --signaling tcp-socket --signaling-port 1234
コード
offer.py
import asyncio
import logging
import math
import cv2
import numpy
from aiortc import (
RTCIceCandidate,
RTCPeerConnection,
RTCSessionDescription,
VideoStreamTrack,
)
from aiortc.rtcrtpsender import RTCRtpSender
from aiortc.contrib.signaling import BYE, create_signaling
from av import VideoFrame
import aiortc.mediastreams
FPS = 240
aiortc.mediastreams.VIDEO_PTIME = 1 / FPS # monkey patch
class FlagVideoStreamTrack(VideoStreamTrack):
def __init__(self):
super().__init__() # don't forget this!
self.counter = 0
height, width = 480, 640
# generate flag
data_bgr = numpy.hstack(
[
self._create_rectangle(width=213, height=480, color=(255, 0, 0)), # blue
self._create_rectangle(width=214, height=480, color=(255, 255, 255)), # white
self._create_rectangle(width=213, height=480, color=(0, 0, 255)), # red
]
)
# shrink and center it
M = numpy.float32([[0.5, 0, width / 4], [0, 0.5, height / 4]])
data_bgr = cv2.warpAffine(data_bgr, M, (width, height))
# compute animation
omega = 2 * math.pi / height
id_x = numpy.tile(numpy.array(range(width), dtype=numpy.float32), (height, 1))
id_y = numpy.tile(
numpy.array(range(height), dtype=numpy.float32), (width, 1)
).transpose()
self.frames = []
for k in range(FPS):
phase = 2 * k * math.pi / FPS
map_x = id_x + 10 * numpy.cos(omega * id_x + phase)
map_y = id_y + 10 * numpy.sin(omega * id_x + phase)
frame_bgr = cv2.remap(data_bgr, map_x, map_y, cv2.INTER_LINEAR)
# draw frame numbers
text = str(k)
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 2.0
color = (0, 0, 0) # black
thickness = 2
text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)
text_w, text_h = text_size
text_x = (width - text_w) // 2
text_y = (height + text_h) // 2
cv2.putText(frame_bgr, text, (text_x, text_y), font, font_scale, color, thickness)
self.frames.append(
VideoFrame.from_ndarray(frame_bgr, format="bgr24")
)
async def recv(self):
pts, time_base = await self.next_timestamp()
frame = self.frames[self.counter % FPS]
frame.pts = pts
frame.time_base = time_base
self.counter += 1
return frame
def _create_rectangle(self, width, height, color):
data_bgr = numpy.zeros((height, width, 3), numpy.uint8)
data_bgr[:, :] = color
return data_bgr
async def print_stats_periodically(pc):
while True:
stats = await pc.getStats()
print(stats)
await asyncio.sleep(5)
async def run(pc, signaling):
def force_codec(pc, sender, forced_codec):
kind = forced_codec.split("/")[0]
codecs = RTCRtpSender.getCapabilities(kind).codecs
transceiver = next(t for t in pc.getTransceivers() if t.sender == sender)
transceiver.setCodecPreferences(
[codec for codec in codecs if codec.mimeType == forced_codec]
)
# 送信用映像トラック追加
video_sender = pc.addTrack(FlagVideoStreamTrack())
force_codec(pc, video_sender, "video/VP8")
@pc.on("connectionstatechange")
async def on_connectionstatechange():
print("Connection state is %s" % pc.connectionState)
if pc.connectionState == "connected":
print("Connection established")
asyncio.create_task(print_stats_periodically(pc))
# signaling の接続開始
await signaling.connect()
# Offer作成と送信
await pc.setLocalDescription(await pc.createOffer())
await signaling.send(pc.localDescription)
while True:
obj = await signaling.receive()
if isinstance(obj, RTCSessionDescription):
# 相手からAnswer受信
await pc.setRemoteDescription(obj)
elif isinstance(obj, RTCIceCandidate):
await pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
break
if __name__ == "__main__":
import argparse
from aiortc.contrib.signaling import add_signaling_arguments
parser = argparse.ArgumentParser(description="Offer side")
parser.add_argument("--verbose", "-v", action="count")
add_signaling_arguments(parser)
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
signaling = create_signaling(args)
pc = RTCPeerConnection()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
run(
pc=pc,
signaling=signaling,
)
)
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(signaling.close())
loop.run_until_complete(pc.close())
answer.py
import argparse
import asyncio
import logging
import threading
import queue
import time
from PIL import Image, ImageTk
import cv2
import tkinter as tk
import json
from aiortc import (
RTCIceCandidate,
RTCPeerConnection,
RTCSessionDescription,
)
from aiortc.contrib.signaling import BYE, create_signaling
class MediaConsumer:
def __init__(self) -> None:
self.__tracks = {}
self.frame_queue = queue.Queue()
def addTrack(self, track):
if track not in self.__tracks:
self.__tracks[track] = None
async def start(self) -> None:
for track, task in self.__tracks.items():
if task is None:
self.__tracks[track] = asyncio.ensure_future(self._display_frames(track))
async def stop(self) -> None:
for task in self.__tracks.values():
if task is not None:
task.cancel()
self.__tracks = {}
async def _display_frames(self, track):
while True:
try:
frame = await track.recv()
except Exception as e:
print("Frame receiving stopped:", e)
break
img = frame.to_ndarray(format="bgr24")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(img)
# 最新フレームのみキープ
while not self.frame_queue.empty():
self.frame_queue.get_nowait()
self.frame_queue.put(pil_img)
def empty(self):
return self.frame_queue.empty()
def get_nowait(self):
return self.frame_queue.get_nowait()
async def print_stats_periodically(pc):
while True:
stats = await pc.getStats()
print(stats)
await asyncio.sleep(5)
async def run(pc, recorder, signaling):
@pc.on("track")
def on_track(track):
print("Receiving %s" % track.kind)
if track.kind == "video":
recorder.addTrack(track)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
print("Connection state is %s" % pc.connectionState)
if pc.connectionState == "connected":
print("Connection established")
asyncio.create_task(print_stats_periodically(pc))
# signaling の接続開始
await signaling.connect()
while True:
try:
# signaling._reader._limit = 6400 # useless
obj = await signaling.receive()
except json.decoder.JSONDecodeError as e:
print(e)
print("****************************************************")
print("*** This may be due to a small N_TTY_BUF_SIZE. ***")
print("*** A simple solution is to set a short OFFER. ***")
print("****************************************************")
raise
if isinstance(obj, RTCSessionDescription):
await pc.setRemoteDescription(obj)
await recorder.start()
if obj.type == "offer":
# Answer返す
await pc.setLocalDescription(await pc.createAnswer())
await signaling.send(pc.localDescription)
elif isinstance(obj, RTCIceCandidate):
await pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
break
def run_tk_window(recorder):
root = tk.Tk()
root.title("Viewer")
label = tk.Label(root)
label.pack()
fps_label = tk.Label(root, text="FPS: --")
fps_label.pack()
frame_count = 0
last_time = time.time()
def update_label():
nonlocal frame_count, last_time
if not recorder.empty():
pil_img = recorder.get_nowait()
tk_img = ImageTk.PhotoImage(pil_img)
label.config(image=tk_img)
label.image = tk_img
# FPS計測
frame_count += 1
now = time.time()
elapsed = now - last_time
if elapsed >= 1.0:
fps = frame_count / elapsed
fps_label.config(text=f"FPS: {fps:.2f}")
frame_count = 0
last_time = now
label.after(1, update_label)
update_label()
root.mainloop()
if __name__ == "__main__":
from aiortc.contrib.signaling import add_signaling_arguments
parser = argparse.ArgumentParser(description="Answer side")
parser.add_argument("--verbose", "-v", action="count")
add_signaling_arguments(parser)
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
# signaling & pc
signaling = create_signaling(args)
pc = RTCPeerConnection()
recorder = MediaConsumer()
loop = asyncio.get_event_loop()
def asyncio_thread():
try:
loop.run_until_complete(
run(
pc=pc,
recorder=recorder,
signaling=signaling,
)
)
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(recorder.stop())
loop.run_until_complete(signaling.close())
loop.run_until_complete(pc.close())
t = threading.Thread(target=asyncio_thread)
t.start()
# Tk表示(メインスレッド)
run_tk_window(recorder)
# Tkクローズ時イベントループ停止
loop.call_soon_threadsafe(loop.stop)
t.join()
以上!
- 余談ですが、ちょっと頑張ると javascript (three.js 等) <-> python の webrtc もできます (いつか書く)