1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python aiortc に +tkinter で受信映像を表示 (ちゃんとfps出る)

Last updated at Posted at 2024-12-16

Python で WebRTC しよう…!

python aiortc を使うときに、tkinter で雑に viewer を実装するとfpsが出なかったので、ちゃんとめに実装しました
(python で webrtc ってややナンセンスそう? とかはさておき)

こんな感じ

  • aiortc の example を拡張しました
  • tkinter 周りは StreamDiffusion の example を参考にしました
  • コード等は github gist で公開しています
  • ローカル <-> サーバー通信も多少は動作確認しています
  • ⇧動画では 60fps になってますが、手元の同一PC内通信は 240fps までは出ることを確認しました

環境

  • windows + WSL / windows powershell
  • python <= 3.12 (大事 for pyav==12.3.0)
    • pip install aiortc opencv-python
    • pip install pytk or sudo apt install python3.10-tk など (場合による?)

WSL の場合

$ python offer.py
$ python answer.py

Powershell の場合

  • signaling の IO 周りでエラーが出るので少し⇧とはコマンドを変える
  • offer のあと 5秒くらい待ってから answer を実行すると良さそう
% python .\offer.py --signaling tcp-socket --signaling-port 1234
% python .\answer.py --signaling tcp-socket --signaling-port 1234

コード

offer.py
import asyncio
import logging
import math
import cv2
import numpy
from aiortc import (
    RTCIceCandidate,
    RTCPeerConnection,
    RTCSessionDescription,
    VideoStreamTrack,
)
from aiortc.rtcrtpsender import RTCRtpSender
from aiortc.contrib.signaling import BYE, create_signaling
from av import VideoFrame


import aiortc.mediastreams
# Target frame rate for the generated test video.
FPS = 240
# Monkey patch: aiortc's VideO_PTIME default caps sending at ~30 fps; shrink
# the per-frame wait so the track can actually be paced at FPS.
aiortc.mediastreams.VIDEO_PTIME = 1 / FPS  # monkey patch


class FlagVideoStreamTrack(VideoStreamTrack):
    """Synthetic video track: one animation period of a waving tricolor flag,
    precomputed as FPS frames and served cyclically from recv()."""

    def __init__(self):
        super().__init__()  # required: initializes base MediaStreamTrack state
        self.counter = 0
        height, width = 480, 640

        # Build the flag as three vertical bars (BGR order: blue, white, red).
        bars = [
            self._create_rectangle(width=213, height=480, color=(255, 0, 0)),
            self._create_rectangle(width=214, height=480, color=(255, 255, 255)),
            self._create_rectangle(width=213, height=480, color=(0, 0, 255)),
        ]
        data_bgr = numpy.hstack(bars)

        # Scale to half size and center inside the output frame.
        affine = numpy.float32([[0.5, 0, width / 4], [0, 0.5, height / 4]])
        data_bgr = cv2.warpAffine(data_bgr, affine, (width, height))

        # Coordinate grids for the wave remap.
        omega = 2 * math.pi / height
        grid_x = numpy.tile(numpy.arange(width, dtype=numpy.float32), (height, 1))
        grid_y = numpy.tile(
            numpy.arange(height, dtype=numpy.float32), (width, 1)
        ).transpose()

        self.frames = []
        for k in range(FPS):
            # Phase advances through one full period over FPS frames.
            phase = 2 * k * math.pi / FPS
            map_x = grid_x + 10 * numpy.cos(omega * grid_x + phase)
            map_y = grid_y + 10 * numpy.sin(omega * grid_x + phase)
            frame_bgr = cv2.remap(data_bgr, map_x, map_y, cv2.INTER_LINEAR)

            # Stamp the frame index in the center so dropped frames are visible.
            text = str(k)
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 2.0
            color = (0, 0, 0)  # black
            thickness = 2
            (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
            origin = ((width - text_w) // 2, (height + text_h) // 2)
            cv2.putText(frame_bgr, text, origin, font, font_scale, color, thickness)

            self.frames.append(VideoFrame.from_ndarray(frame_bgr, format="bgr24"))

    async def recv(self):
        """Return the next precomputed frame with a fresh pts/time_base."""
        pts, time_base = await self.next_timestamp()
        frame = self.frames[self.counter % FPS]
        frame.pts = pts
        frame.time_base = time_base
        self.counter += 1
        return frame

    def _create_rectangle(self, width, height, color):
        """Return a solid (height, width, 3) uint8 BGR image filled with color."""
        rect = numpy.zeros((height, width, 3), numpy.uint8)
        rect[:, :] = color
        return rect


async def print_stats_periodically(pc):
    """Fetch and print the peer connection's stats report every 5 seconds, forever."""
    interval_s = 5
    while True:
        print(await pc.getStats())
        await asyncio.sleep(interval_s)


async def run(pc, signaling):
    """Offer-side main loop: attach the video track, send the SDP offer, then
    consume signaling messages until BYE."""

    def force_codec(pc, sender, forced_codec):
        # Restrict the transceiver's codec preferences to the single requested
        # mimeType (e.g. "video/VP8") so negotiation cannot pick another codec.
        kind = forced_codec.split("/")[0]
        codecs = RTCRtpSender.getCapabilities(kind).codecs
        transceiver = next(t for t in pc.getTransceivers() if t.sender == sender)
        transceiver.setCodecPreferences(
            [codec for codec in codecs if codec.mimeType == forced_codec]
        )

    # Add the outgoing video track (must happen before createOffer below).
    video_sender = pc.addTrack(FlagVideoStreamTrack())
    force_codec(pc, video_sender, "video/VP8")

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print("Connection state is %s" % pc.connectionState)
        if pc.connectionState == "connected":
            print("Connection established")
            # NOTE(review): the task handle is not stored; asyncio may garbage
            # collect a task held only weakly — consider keeping a reference.
            asyncio.create_task(print_stats_periodically(pc))

    # Open the signaling channel.
    await signaling.connect()

    # Create the offer and send it to the peer.
    await pc.setLocalDescription(await pc.createOffer())
    await signaling.send(pc.localDescription)

    while True:
        obj = await signaling.receive()
        if isinstance(obj, RTCSessionDescription):
            # Answer received from the peer.
            await pc.setRemoteDescription(obj)
        elif isinstance(obj, RTCIceCandidate):
            await pc.addIceCandidate(obj)
        elif obj is BYE:
            print("Exiting")
            break


if __name__ == "__main__":
    import argparse
    from aiortc.contrib.signaling import add_signaling_arguments

    parser = argparse.ArgumentParser(description="Offer side")
    parser.add_argument("--verbose", "-v", action="count")
    # Adds the shared --signaling / --signaling-host / --signaling-port options.
    add_signaling_arguments(parser)
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    signaling = create_signaling(args)
    pc = RTCPeerConnection()

    # NOTE(review): asyncio.get_event_loop() is deprecated for this use since
    # Python 3.10; asyncio.new_event_loop() would be the modern equivalent.
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(
            run(
                pc=pc,
                signaling=signaling,
            )
        )
    except KeyboardInterrupt:
        pass
    finally:
        # Always tear down signaling and the peer connection.
        loop.run_until_complete(signaling.close())
        loop.run_until_complete(pc.close())
answer.py
import argparse
import asyncio
import logging
import threading
import queue
import time
from PIL import Image, ImageTk
import cv2
import tkinter as tk


import json

from aiortc import (
    RTCIceCandidate,
    RTCPeerConnection,
    RTCSessionDescription,
)
from aiortc.contrib.signaling import BYE, create_signaling


class MediaConsumer:
    """Consumes frames from aiortc video tracks and hands the newest one to the
    UI thread through a thread-safe queue.

    The asyncio side produces PIL images in _display_frames(); the Tk side
    polls empty() / get_nowait() from a different thread.
    """

    def __init__(self) -> None:
        # track -> consuming asyncio.Task (None until start() is called)
        self.__tracks = {}
        self.frame_queue = queue.Queue()

    def addTrack(self, track):
        """Register a track; consumption begins on the next start() call."""
        if track not in self.__tracks:
            self.__tracks[track] = None

    async def start(self) -> None:
        """Spawn a frame-consuming task for every registered, unstarted track."""
        for track, task in self.__tracks.items():
            if task is None:
                self.__tracks[track] = asyncio.ensure_future(self._display_frames(track))

    async def stop(self) -> None:
        """Cancel all consuming tasks and forget the tracks."""
        for task in self.__tracks.values():
            if task is not None:
                task.cancel()
        self.__tracks = {}

    async def _display_frames(self, track):
        """Receive frames from `track` forever, converting each to a PIL image
        and keeping only the newest one in the queue."""
        while True:
            try:
                frame = await track.recv()
            except Exception as e:
                # Track ended or the connection dropped: stop consuming.
                print("Frame receiving stopped:", e)
                break

            img = frame.to_ndarray(format="bgr24")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(img)
            # Keep only the latest frame: drain whatever is still queued.
            # Bugfix: the Tk thread may steal an item between an empty() check
            # and get_nowait(), so rely on the queue.Empty exception instead of
            # letting it escape and silently kill this task.
            while True:
                try:
                    self.frame_queue.get_nowait()
                except queue.Empty:
                    break
            self.frame_queue.put(pil_img)

    def empty(self):
        """True if no frame is waiting (may race with the producer thread)."""
        return self.frame_queue.empty()

    def get_nowait(self):
        """Pop the waiting frame; raises queue.Empty if none is available."""
        return self.frame_queue.get_nowait()


async def print_stats_periodically(pc):
    """Fetch and print the peer connection's stats report every 5 seconds, forever."""
    interval_s = 5
    while True:
        print(await pc.getStats())
        await asyncio.sleep(interval_s)


async def run(pc, recorder, signaling):
    """Answer-side main loop: receive the offer over signaling, start the
    recorder on incoming video, and reply with an answer."""

    @pc.on("track")
    def on_track(track):
        print("Receiving %s" % track.kind)
        if track.kind == "video":
            recorder.addTrack(track)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print("Connection state is %s" % pc.connectionState)
        if pc.connectionState == "connected":
            print("Connection established")
            # NOTE(review): the task handle is not stored; asyncio may garbage
            # collect a task held only weakly — consider keeping a reference.
            asyncio.create_task(print_stats_periodically(pc))

    # Open the signaling channel.
    await signaling.connect()

    while True:
        try:
            # signaling._reader._limit = 6400  # useless
            obj = await signaling.receive()
        except json.decoder.JSONDecodeError as e:
            # Copy-paste signaling can truncate a long SDP at the terminal's
            # line-buffer limit, producing invalid JSON.
            print(e)
            print("****************************************************")
            print("*** This may be due to a small N_TTY_BUF_SIZE.   ***")
            print("*** A simple solution is to set a short OFFER.   ***")
            print("****************************************************")
            raise

        if isinstance(obj, RTCSessionDescription):
            await pc.setRemoteDescription(obj)
            await recorder.start()

            if obj.type == "offer":
                # Send the answer back.
                await pc.setLocalDescription(await pc.createAnswer())
                await signaling.send(pc.localDescription)

        elif isinstance(obj, RTCIceCandidate):
            await pc.addIceCandidate(obj)

        elif obj is BYE:
            print("Exiting")
            break


def run_tk_window(recorder):
    """Tk viewer on the main thread: polls `recorder` for the latest frame and
    shows it together with a measured-FPS readout.

    Blocks in root.mainloop() until the window is closed.
    """
    root = tk.Tk()
    root.title("Viewer")
    label = tk.Label(root)
    label.pack()

    fps_label = tk.Label(root, text="FPS: --")
    fps_label.pack()

    frame_count = 0
    last_time = time.time()

    def update_label():
        nonlocal frame_count, last_time
        # Bugfix: the producer drains the queue before putting a fresh frame,
        # so an item seen by empty() can vanish before get_nowait(); catch
        # queue.Empty rather than crashing the Tk callback chain.
        try:
            pil_img = recorder.get_nowait()
        except queue.Empty:
            pil_img = None

        if pil_img is not None:
            tk_img = ImageTk.PhotoImage(pil_img)
            label.config(image=tk_img)
            label.image = tk_img  # keep a reference so Tk doesn't GC the image

            # FPS measured over ~1 second windows.
            frame_count += 1
            now = time.time()
            elapsed = now - last_time
            if elapsed >= 1.0:
                fps = frame_count / elapsed
                fps_label.config(text=f"FPS: {fps:.2f}")
                frame_count = 0
                last_time = now

        label.after(1, update_label)  # ~1 ms poll keeps display latency low

    update_label()
    root.mainloop()


if __name__ == "__main__":
    from aiortc.contrib.signaling import add_signaling_arguments

    parser = argparse.ArgumentParser(description="Answer side")
    parser.add_argument("--verbose", "-v", action="count")
    # Adds the shared --signaling / --signaling-host / --signaling-port options.
    add_signaling_arguments(parser)
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    # signaling & pc
    signaling = create_signaling(args)
    pc = RTCPeerConnection()

    recorder = MediaConsumer()

    loop = asyncio.get_event_loop()

    def asyncio_thread():
        # Runs the asyncio side in a background thread so Tk owns the main thread.
        try:
            loop.run_until_complete(
                run(
                    pc=pc,
                    recorder=recorder,
                    signaling=signaling,
                )
            )
        except KeyboardInterrupt:
            pass
        finally:
            # NOTE(review): if loop.stop() fires first, these run_until_complete
            # calls restart a stopped loop — it works, but is fragile.
            loop.run_until_complete(recorder.stop())
            loop.run_until_complete(signaling.close())
            loop.run_until_complete(pc.close())

    t = threading.Thread(target=asyncio_thread)
    t.start()

    # Tk display (main thread); blocks until the window is closed.
    run_tk_window(recorder)

    # Stop the event loop once the Tk window closes.
    loop.call_soon_threadsafe(loop.stop)
    t.join()

以上!

  • 余談ですが、ちょっと頑張ると javascript (three.js 等) <-> python の webrtc もできます (いつか書く)
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?