前回、こちらの記事でFRDM-IMX95を用いてYOLOを使った物体認識デモを紹介しました。その記事の中では1フレームの処理を1ループの中で全部シーケンシャルに実施し、12~15fps程度のフレームレートになっていました。
今回は、最適化を行い、フレームレートの大幅な向上を図ってみます。
上記の記事でYOLOv8mが動いている前提での説明します。
(作業時間:10分) *i.MX95カメラ+AI – YOLOv8mの例が完了している前提
というのが常套手段かと思います。
既存のデモでは、簡単に使えるハードウェアアクセラレータについてはすでに組み込んでいます。
3860x2160といった高い解像度の入力を、プレビュー表示に十分な1280x720へのリサイズをするところにimxvideoconvert_g2d=GPU 2Dを使っています。
今回は、まだ実施していないスレッド分割をやってみます。
まずはパイプライン構成の概念から説明します。
カメラ入力+AIでの画像認識では、大きく分けて3つのパートに分かれます。
これらが全部終わって、ようやく画像認識結果がディスプレイに表示されます。
仮に
かかったという例を考えます。トータル60msかかって、結果1000/60=16.6fpsとなるわけですね。
赤くかこったポストプロセスの箱が、ディスプレイに結果表示が行われるところです。
これを、画像入力スレッド、NPU推論スレッド、ポストプロセススレッドの3本のスレッドにわけると以下のようにできます。
一番時間のかかるスレッドのことをクリティカルパスと言ったりしますが、この場合NPU推論処理がクリティカルパスになりますが、1つのループにかかる時間がクリティカルパスの処理時間に短縮できます。最初のディスプレイ出力結果は上の例よりも遅くなる(=レイテンシが増える)ものの、2回目以降のディスプレイ出力は早くなり、結果的にフレームレートが高くなる、という結果が得られます。
これはCPUのパイプライン処理、周波数を上げる手法と全く同じものです。
フレームレートを高めることを至上目的とする場合、原理的にはAPI単独で一番時間がかかるものを特定してそのAPI単体のスレッドを作成、それ以外のスレッドは、先に特定したクリティカルパスのスレッドを超えないようにAPI、コードをまとめていく、というやり方になるかとは思います。ただその結果スレッドが増えることでのオーバーヘッドが大きくなってしまったり、スレッドの本数分だけレイテンシが追加されることになりますので、現実的なバランス、メンテナンスのしやすさを考慮してスレッド分割するのがいいですね。
上の例で示したような3つのスレッドに分ける実装をしてみました。
#!/usr/bin/env python3
import os
import time
import threading
import queue
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
# ============================================================
# 設定
# ============================================================
MODEL_PATH = "/usr/bin/tensorflow-lite-2.19.0/examples/yolov8m_int8_neutron.tflite"
LABEL_PATH = "/usr/bin/tensorflow-lite-2.19.0/examples/labels_yolov8.txt" # COCO 80 classes
CONF_TH = 0.60
IOU_TH = 0.45
MAX_DET = 50
MIN_BOX_W = 4
MIN_BOX_H = 4
GST_PIPELINE = (
"libcamerasrc "
"camera-name=/base/soc/bus@42000000/i2c@42540000/os08a20_mipi@36 ! "
"video/x-raw,format=YUY2,framerate=30/1,width=3840,height=2160 ! "
"imxvideoconvert_g2d rotation=4 ! "
"video/x-raw,width=1280,height=720,format=BGRA ! "
"appsink drop=true max-buffers=1"
)
# libcamera / TFLite 関連環境変数
os.environ["LIBCAMERA_IPA_MODULE_PATH"] = "/usr/lib/libcamera/ipa-nxp-neo-uguzzi/"
os.environ["LIBCAMERA_PIPELINES_MATCH_LIST"] = "nxp/neo,imx8-isi,uvc"
# Neutron 使用時は XNNPACK を無効化
os.environ["TFLITE_DISABLE_XNNPACK"] = "1"
DELEGATE_LIB = "/usr/lib/libneutron_delegate.so"
# ============================================================
# ユーティリティ
# ============================================================
def load_labels(path: str) -> list[str]:
with open(path, "r", encoding="utf-8") as f:
return [line.strip() for line in f]
def bbox_iou(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
"""1 つの box と複数 box 群との IoU を計算。"""
x1 = np.maximum(box[0], boxes[:, 0])
y1 = np.maximum(box[1], boxes[:, 1])
x2 = np.minimum(box[2], boxes[:, 2])
y2 = np.minimum(box[3], boxes[:, 3])
inter_w = np.maximum(0, x2 - x1)
inter_h = np.maximum(0, y2 - y1)
inter = inter_w * inter_h
area1 = (box[2] - box[0]) * (box[3] - box[1])
area2 = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
union = area1 + area2 - inter + 1e-6
return inter / union
def nms(boxes: np.ndarray, scores: np.ndarray, iou_th: float) -> list[int]:
"""単純な NMS 実装。"""
idxs = np.argsort(scores)[::-1]
keep: list[int] = []
while len(idxs) > 0:
i = idxs[0]
keep.append(i)
if len(idxs) == 1:
break
ious = bbox_iou(boxes[i], boxes[idxs[1:]])
idxs = idxs[1:][ious < iou_th]
return keep
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-x))
# ============================================================
# スレッド: 1. カメラキャプチャ (+ 前処理 + 入力テンソル作成)
# ============================================================
def capture_worker(
cap: cv2.VideoCapture,
frame_queue: "queue.Queue[tuple]",
stop_event: threading.Event,
in_h: int,
in_w: int,
input_dtype: np.dtype,
in_scale: float,
in_zero: float,
) -> None:
"""
cap.read() でフレームを取得し、前処理+入力テンソル作成まで行って frame_queue に流すスレッド。
キューには以下のタプルを入れる:
(frame_bgr, input_data, scale, left, top, w0, h0)
"""
canvas = np.full((in_h, in_w, 3), 114, dtype=np.uint8)
try:
while not stop_event.is_set():
ret, frame_bgra = cap.read()
if not ret:
print("Failed to read frame")
stop_event.set()
break
frame_bgr = cv2.cvtColor(frame_bgra, cv2.COLOR_BGRA2BGR)
h0, w0 = frame_bgr.shape[:2]
# 前処理: letterbox
scale = min(in_w / w0, in_h / h0)
nw, nh = int(w0 * scale), int(h0 * scale)
resized = cv2.resize(frame_bgr, (nw, nh))
canvas[:] = 114
top = (in_h - nh) // 2
left = (in_w - nw) // 2
canvas[top : top + nh, left : left + nw] = resized
img_rgb = canvas # 必要ならここで BGR→RGB に変更
# 入力テンソル作成
if input_dtype == np.uint8:
input_data = np.empty((1, in_h, in_w, 3), dtype=np.uint8)
input_data[0, ...] = img_rgb
else:
# int8 モデル
img_f = img_rgb.astype(np.float32) / 255.0
q = (img_f / in_scale + in_zero).astype(np.int8)
input_data = q[np.newaxis, ...] # (1, H, W, 3)
# 古いフレームを捨てて最新 1 枚だけキープ
try:
while True:
frame_queue.get_nowait()
except queue.Empty:
pass
try:
frame_queue.put(
(frame_bgr, input_data, scale, left, top, w0, h0),
timeout=0.01,
)
except queue.Full:
# ここに来ることはほぼないが、一応無視
pass
finally:
cap.release()
# ============================================================
# スレッド: 2. 推論 (interpreter.invoke のみ)
# ============================================================
def inference_worker(
frame_queue: "queue.Queue[tuple]",
result_queue: "queue.Queue[tuple]",
interpreter: tflite.Interpreter,
input_index: int,
output_index: int,
out_scale: float,
out_zero: float,
out_dtype: np.dtype,
stop_event: threading.Event,
) -> None:
"""
capture_worker から前処理済み入力テンソルを受け取り、
interpreter.invoke() と出力のデ量子化まで行うスレッド。
後処理用に out(=N×C), スケール情報などを result_queue に渡す。
"""
while not stop_event.is_set():
try:
frame_bgr, input_data, scale, left, top, w0, h0 = frame_queue.get(timeout=0.1)
except queue.Empty:
continue
# 推論
interpreter.set_tensor(input_index, input_data)
interpreter.invoke()
out_raw = interpreter.get_tensor(output_index)[0] # 期待形状: (84, 2100) 等
# 逆量子化
if np.issubdtype(out_dtype, np.integer):
out_f = (out_raw.astype(np.float32) - out_zero) * out_scale
else:
out_f = out_raw.astype(np.float32)
# 形状正規化: (84, N) または (N, 84) -> (N, 84)
if out_f.ndim == 3 and out_f.shape[0] == 1:
out_f = out_f[0]
if out_f.ndim == 2 and out_f.shape[0] in (84, 85):
out = out_f.reshape(out_f.shape[0], -1).T
elif out_f.ndim == 2 and out_f.shape[1] in (84, 85):
out = out_f
else:
out = out_f.reshape(-1, out_f.shape[-1])
# 後段用キューに結果を渡す(最新のみ保持)
try:
while True:
result_queue.get_nowait()
except queue.Empty:
pass
try:
result_queue.put(
(frame_bgr, out, scale, left, top, w0, h0),
timeout=0.01,
)
except queue.Full:
# ここに来ることはほぼないが、一応無視
pass
# ============================================================
# スレッド: 3. 後処理(NMS / 描画 / 表示)
# ============================================================
def postprocess_worker(
result_queue: "queue.Queue[tuple]",
labels: list[str],
in_h: int,
in_w: int,
stop_event: threading.Event,
) -> None:
"""
推論結果(out)を受け取り、後処理(sigmoid/閾値/NMS/描画)と表示を行うスレッド。
"""
fps = 0.0
prev_time = time.time()
window_name = "i.MX95 CPU (YOLOv8 via TFLite + OpenCV)"
while not stop_event.is_set():
try:
frame_bgr, out, scale, left, top, w0, h0 = result_queue.get(timeout=0.1)
except queue.Empty:
continue
# FPS 計算(表示ループベース)
now = time.time()
dt = now - prev_time
if dt > 0:
fps = 1.0 / dt
prev_time = now
num_det, num_ch = out.shape
boxes: list | np.ndarray = []
scores: list | np.ndarray = []
classes: list | np.ndarray = []
# YOLOv8 風: [cx, cy, w, h, cls_logits x 80]
if num_ch == 84:
cx = out[:, 0]
cy = out[:, 1]
w = out[:, 2]
h = out[:, 3]
cls_logits = out[:, 4:] # (N, 80)
cls_probs = sigmoid(cls_logits)
cls_id = np.argmax(cls_probs, axis=1)
conf = cls_probs[np.arange(num_det), cls_id]
# 信頼度しきい値
mask = conf >= CONF_TH
if np.any(mask):
cx = cx[mask]
cy = cy[mask]
w = w[mask]
h = h[mask]
conf = conf[mask]
cls_id = cls_id[mask]
# 0〜1 正規化座標 -> 入力解像度
x1 = (cx - w / 2.0) * in_w
y1 = (cy - h / 2.0) * in_h
x2 = (cx + w / 2.0) * in_w
y2 = (cy + h / 2.0) * in_h
# letterbox を元画像座標に補正
x1 = (x1 - left) / scale
y1 = (y1 - top) / scale
x2 = (x2 - left) / scale
y2 = (y2 - top) / scale
# ボックスの最小サイズでフィルタ
bw = x2 - x1
bh = y2 - y1
size_mask = (bw >= MIN_BOX_W) & (bh >= MIN_BOX_H)
if np.any(size_mask):
x1 = x1[size_mask]
y1 = y1[size_mask]
x2 = x2[size_mask]
y2 = y2[size_mask]
conf = conf[size_mask]
cls_id = cls_id[size_mask]
boxes = np.stack([x1, y1, x2, y2], axis=1).astype(np.float32)
scores = conf.astype(np.float32)
classes = cls_id.astype(np.int32)
# NMS & 描画
if isinstance(boxes, np.ndarray):
has_det = boxes.shape[0] > 0
else:
has_det = len(boxes) > 0
if has_det:
boxes = np.asarray(boxes, dtype=np.float32)
scores = np.asarray(scores, dtype=np.float32)
classes = np.asarray(classes, dtype=np.int32)
keep = nms(boxes, scores, IOU_TH)
keep = sorted(keep, key=lambda i: scores[i], reverse=True)[:MAX_DET]
for idx in keep:
x1, y1, x2, y2 = boxes[idx]
cls_id = int(classes[idx])
conf = float(scores[idx])
label = labels[cls_id] if 0 <= cls_id < len(labels) else f"id{cls_id}"
x1 = int(max(0, min(w0 - 1, x1)))
y1 = int(max(0, min(h0 - 1, y1)))
x2 = int(max(0, min(w0 - 1, x2)))
y2 = int(max(0, min(h0 - 1, y2)))
cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(
frame_bgr,
f"{label}:{conf:.2f}",
(x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2,
)
cv2.putText(
frame_bgr,
f"FPS: {fps:.1f}",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(0, 255, 255),
2,
)
cv2.imshow(window_name, frame_bgr)
if (cv2.waitKey(1) & 0xFF) == ord("q"):
stop_event.set()
break
cv2.destroyAllWindows()
# ============================================================
# main
# ============================================================
def main() -> None:
# TFLite Interpreter 準備
delegate = tflite.load_delegate(DELEGATE_LIB)
interpreter = tflite.Interpreter(
model_path=MODEL_PATH,
experimental_delegates=[delegate],
)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print("Input details :", input_details)
print("Output details:", output_details)
input_index = input_details[0]["index"]
input_shape = input_details[0]["shape"] # [1, H, W, 3]
_, in_h, in_w, _ = input_shape
input_dtype = input_details[0]["dtype"]
output_index = output_details[0]["index"]
out_qparams = output_details[0]["quantization_parameters"]
out_scale = out_qparams["scales"][0] if len(out_qparams["scales"]) > 0 else 1.0
out_zero = out_qparams["zero_points"][0] if len(out_qparams["zero_points"]) > 0 else 0
out_dtype = output_details[0]["dtype"]
labels = load_labels(LABEL_PATH)
# 入力量子化パラメータ(int8 モデル用)
in_qparams = input_details[0]["quantization_parameters"]
in_scale = in_qparams["scales"][0] if len(in_qparams["scales"]) > 0 else 1.0
in_zero = in_qparams["zero_points"][0] if len(in_qparams["zero_points"]) > 0 else 0
# カメラ初期化
cap = cv2.VideoCapture(GST_PIPELINE, cv2.CAP_GSTREAMER)
if not cap.isOpened():
raise RuntimeError("Failed to open camera")
frame_queue: "queue.Queue[tuple]" = queue.Queue(maxsize=1)
result_queue: "queue.Queue[tuple]" = queue.Queue(maxsize=1)
stop_event = threading.Event()
# スレッド起動
t_capture = threading.Thread(
target=capture_worker,
args=(cap, frame_queue, stop_event, in_h, in_w, input_dtype, in_scale, in_zero),
daemon=True,
)
t_infer = threading.Thread(
target=inference_worker,
args=(
frame_queue,
result_queue,
interpreter,
input_index,
output_index,
out_scale,
out_zero,
out_dtype,
stop_event,
),
daemon=True,
)
t_post = threading.Thread(
target=postprocess_worker,
args=(result_queue, labels, in_h, in_w, stop_event),
daemon=True,
)
print("Press 'q' to quit")
t_capture.start()
t_infer.start()
t_post.start()
try:
while not stop_event.is_set():
time.sleep(0.1)
except KeyboardInterrupt:
stop_event.set()
t_capture.join()
t_infer.join()
t_post.join()
if __name__ == "__main__":
main()
この結果、同じモデルを使って、28~30fpsにまでフレームレートを上げることができました。カメラCMOSセンサからの入力が30fpsですので、狙うべきところまで達成できましたね(動画は割愛)。
各スレッドの処理にかかる時間を測定してみたところ、カメラキャプチャスレッドが33~36ms、NPUの推論スレッドが28~30ms、ポストプロセススレッドが26~30msと、偶然にもいいバランスで調整することができました。
今回はパフォーマンス向上の例としてマルチスレッドによるパイプライン化を試してみました。
レイテンシを許容できる前提とはなりますが、効果的なチューニング方法ではないかと思います。
※この記事には、AI生成コードをベースに投稿者が内容を確認した内容が含まれます。
=========================
本投稿の「Comment」欄にコメントをいただいても、現在返信に対応しておりません。
お手数をおかけしますが、お問い合わせの際には「NXPへの技術質問 - 問い合わせ方法 (日本語ブログ)」をご参照ください。
(既に弊社NXP代理店、もしくはNXPとお付き合いのある方は、直接担当者へご質問いただいてもかまいません。)
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.