1.Introduction
As usual, this is a rather silly implementation. Using Python and three Coral Edge TPU Accelerators, inference is parallelized to speed up object detection. Video capture and the work of each TPU all run asynchronously in parallel via multiprocessing. While it runs, the white LED on each of the three TPUs flashes in sync, looking delightfully mecha. Note that the camera's capture rate is far too slow, so I suspect roughly 80 FPS of performance is being thrown away. With a USB camera capable of about 150 FPS, two TPUs are plenty.
This is a rough implementation that pays no particular attention to the ordering of frames and results. I don't think any explanation is really needed, so please just watch the result video. The video was recorded at 60 FPS, so the actual runtime is about three times faster than what the video below shows. It was a hassle, so I didn't use a hardware encoder while recording.
I implemented Multi-TPU (3 Edge TPUs) with MobileNetV2-SSD + Python. It is faster than it has any need to be. Since the camera's capture performance tops out at 150 FPS, the pipeline is now faster than that practical limit. I'd like to switch @Nextremer_nb_o's DeeplabV3 implementation over to Multi-TPU 😆https://t.co/XRf8Qw7mQL
— PINTO0309 (@PINTO03091) August 11, 2019
**<Bonus: DeeplabV3 Semantic Segmentation, Coral Edge TPU x3 boosted, 320x240>** **https://github.com/NobuoTsukamoto/edge_tpu.git**
DeeplabV3, Edge TPU x3 boosted, 320x240, about 45 FPS https://t.co/geoikJ1Jc4
— PINTO0309 (@PINTO03091) August 12, 2019
2.Environment
- Ubuntu 16.04 x86_64, USB 3.0
- Coral Edge TPU Accelerator x3
- Python 3.5.3
- mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite
- edgetpu runtime 2.11.1
- OpenCV 4.1.1-openvino
- USB Camera (PlayStation Eye, 320x240, 150 FPS)
- Self-powered USB 3.0 hub
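Before running anything, it is worth confirming that the runtime is installed and that all three accelerators enumerate. A quick check along these lines should work (a minimal sketch; as far as I know `GetRuntimeVersion` is available in this generation of the edgetpu Python API):

```python
from edgetpu.basic import edgetpu_utils

# Print the installed Edge TPU runtime version.
print(edgetpu_utils.GetRuntimeVersion())

# Each attached accelerator that has no model bound yet shows up as a device path.
paths = edgetpu_utils.ListEdgeTpuPaths(edgetpu_utils.EDGE_TPU_STATE_UNASSIGNED)
print(len(paths), "unassigned Edge TPU(s):", paths)
```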
3.Implementation
The Python logic automatically detects idle TPUs and executes processing in parallel, assigning a free TPU to each separate process. The implementation was mostly finished half a year ago, when the Edge TPU first went on sale, but I got so absorbed in building TensorFlow that I left it sitting ever since. I tailored it slightly to fit the latest, version-bumped API. It is a very simple implementation, so it can easily be adapted to other models.
The complete set of resources can be downloaded from the GitHub repository linked from the giant smiley icon at the top of this page.
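The mechanism that lets each process grab its own accelerator is `edgetpu_utils.ListEdgeTpuPaths(EDGE_TPU_STATE_UNASSIGNED)`, which returns only the device paths that no engine has claimed yet. Isolated from the full listing, the idea looks like this (a minimal sketch; `open_free_tpu` is just an illustrative name, not part of the API):

```python
from edgetpu.basic import edgetpu_utils
from edgetpu.detection.engine import DetectionEngine

def open_free_tpu(model_path):
    # Ask the runtime for TPUs that no DetectionEngine has claimed yet,
    # then bind the model to the first one that opens successfully.
    for device in edgetpu_utils.ListEdgeTpuPaths(edgetpu_utils.EDGE_TPU_STATE_UNASSIGNED):
        try:
            return DetectionEngine(model_path, device)  # claims this device for this process
        except Exception:
            continue  # lost the race to another process; try the next device
    raise RuntimeError("No free Edge TPU available")
```

The full script follows.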
```python
import argparse
import platform
import sys
import numpy as np
import cv2
import time
from PIL import Image
from time import sleep
import multiprocessing as mp
from edgetpu.detection.engine import DetectionEngine
from edgetpu.basic import edgetpu_utils
lastresults = None
processes = []
frameBuffer = None
results = None
fps = ""
detectfps = ""
framecount = 0
detectframecount = 0
time1 = 0
time2 = 0
box_color = (255, 128, 0)
box_thickness = 1
label_background_color = (125, 175, 75)
label_text_color = (255, 255, 255)
percentage = 0.0
# Function to read labels from text files.
def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret
def camThread(label, results, frameBuffer, camera_width, camera_height, vidfps, usbcamno):
    global fps
    global detectfps
    global framecount
    global detectframecount
    global time1
    global time2
    global lastresults
    global cam
    global window_name

    cam = cv2.VideoCapture(usbcamno)
    cam.set(cv2.CAP_PROP_FPS, vidfps)
    cam.set(cv2.CAP_PROP_FRAME_WIDTH, camera_width)
    cam.set(cv2.CAP_PROP_FRAME_HEIGHT, camera_height)
    window_name = "USB Camera"
    cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)

    while True:
        t1 = time.perf_counter()

        ret, color_image = cam.read()
        if not ret:
            continue

        # Keep the buffer fresh: drop the oldest frame when the queue is full.
        if frameBuffer.full():
            frameBuffer.get()
        frames = color_image
        frameBuffer.put(color_image.copy())

        # Draw the newest detection results if available; otherwise reuse the last ones.
        res = None
        if not results.empty():
            res = results.get(False)
            detectframecount += 1
            imdraw = overlay_on_image(frames, res, label, camera_width, camera_height)
            lastresults = res
        else:
            imdraw = overlay_on_image(frames, lastresults, label, camera_width, camera_height)

        cv2.imshow(window_name, imdraw)

        if cv2.waitKey(1)&0xFF == ord('q'):
            break

        # FPS calculation: time1 accumulates instantaneous FPS, time2 elapsed seconds,
        # averaged over every 15 displayed frames.
        framecount += 1
        if framecount >= 15:
            fps = "(Playback) {:.1f} FPS".format(time1/15)
            detectfps = "(Detection) {:.1f} FPS".format(detectframecount/time2)
            framecount = 0
            detectframecount = 0
            time1 = 0
            time2 = 0
        t2 = time.perf_counter()
        elapsedTime = t2-t1
        time1 += 1/elapsedTime
        time2 += elapsedTime
def inferencer(results, frameBuffer, model, camera_width, camera_height):
    engine = None

    # Acquire the list of TPUs that have no model assigned yet.
    devices = edgetpu_utils.ListEdgeTpuPaths(edgetpu_utils.EDGE_TPU_STATE_UNASSIGNED)
    devopen = False
    for device in devices:
        try:
            # Opening the engine claims this TPU; other processes no longer see it as free.
            engine = DetectionEngine(model, device)
            devopen = True
            break
        except Exception:
            continue

    if not devopen:
        print("TPU Devices open Error!!!")
        sys.exit(1)

    print("Loaded Graphs!!! ")

    while True:
        if frameBuffer.empty():
            continue

        # Run inference.
        color_image = frameBuffer.get()
        prepimg = color_image[:, :, ::-1].copy()  # BGR (OpenCV) -> RGB for PIL
        prepimg = Image.fromarray(prepimg)
        tinf = time.perf_counter()
        ans = engine.DetectWithImage(prepimg, threshold=0.5, keep_aspect_ratio=True, relative_coord=False, top_k=10)
        print(time.perf_counter() - tinf, "sec")
        results.put(ans)
def overlay_on_image(frames, object_infos, label, camera_width, camera_height):
    color_image = frames

    if object_infos is None:
        return color_image
    img_cp = color_image.copy()

    for obj in object_infos:
        box = obj.bounding_box.flatten().tolist()
        box_left = int(box[0])
        box_top = int(box[1])
        box_right = int(box[2])
        box_bottom = int(box[3])
        cv2.rectangle(img_cp, (box_left, box_top), (box_right, box_bottom), box_color, box_thickness)

        percentage = int(obj.score * 100)
        label_text = label[obj.label_id] + " (" + str(percentage) + "%)"

        label_size = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0]
        label_left = box_left
        label_top = box_top - label_size[1]
        if (label_top < 1):
            label_top = 1
        label_right = label_left + label_size[0]
        label_bottom = label_top + label_size[1]
        cv2.rectangle(img_cp, (label_left - 1, label_top - 1), (label_right + 1, label_bottom + 1), label_background_color, -1)
        cv2.putText(img_cp, label_text, (label_left, label_bottom), cv2.FONT_HERSHEY_SIMPLEX, 0.5, label_text_color, 1)

    cv2.putText(img_cp, fps, (camera_width-170,15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (38,0,255), 1, cv2.LINE_AA)
    cv2.putText(img_cp, detectfps, (camera_width-170,30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (38,0,255), 1, cv2.LINE_AA)

    return img_cp
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite", help="Path of the detection model.")
    parser.add_argument("--label", default="coco_labels.txt", help="Path of the labels file.")
    parser.add_argument("--usbcamno", type=int, default=0, help="USB Camera number.")
    args = parser.parse_args()

    model = args.model
    label = ReadLabelFile(args.label)
    usbcamno = args.usbcamno
    camera_width = 320
    camera_height = 240
    vidfps = 150

    try:
        mp.set_start_method('forkserver')
        frameBuffer = mp.Queue(10)
        results = mp.Queue()

        # Start streaming
        p = mp.Process(target=camThread,
                       args=(label, results, frameBuffer, camera_width, camera_height, vidfps, usbcamno),
                       daemon=True)
        p.start()
        processes.append(p)

        # Launch one inferencer process per unassigned Edge TPU.
        devices = edgetpu_utils.ListEdgeTpuPaths(edgetpu_utils.EDGE_TPU_STATE_UNASSIGNED)
        for devnum in range(len(devices)):
            p = mp.Process(target=inferencer,
                           args=(results, frameBuffer, model, camera_width, camera_height),
                           daemon=True)
            p.start()
            processes.append(p)

        while True:
            sleep(1)

    finally:
        for p in processes:
            p.terminate()
```
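To run it, point the script at the model and label files; the defaults match the files listed above, so usually only the camera number needs changing. (The script name below is just whatever you saved the listing as.)

```bash
$ python3 mobilenet_ssd_multi_tpu.py \
    --model mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite \
    --label coco_labels.txt \
    --usbcamno 0
```

Press q in the preview window to quit; the `finally` block then terminates the camera and inferencer processes.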