[FRDM-IMX95] i.MX 95 AIネットワークカメラの実現 (日本語ブログ)

cancel
Showing results for 
Show  only  | Search instead for 
Did you mean: 

[FRDM-IMX95] i.MX 95 AIネットワークカメラの実現 (日本語ブログ)

shigenobukatagi
NXP Employee
NXP Employee
0 0 72

はじめに

FRDM-IMX95を使ってネットワークカメラ、Yoloでの物体認識と最適化をそれぞれやってみました。

今回はこれらを組み合わせ、「AIネットワークカメラ」を実現してみます。

(作業時間:10分) *i.MX95ネットワークカメラカメラ+AIの最適化が完了している前提

AI_network_camera.png

 

Ubuntu PCのセットアップ

こちらの記事と同じことをして頂きます。
インストールは終わってる前提で、コマンドだけ貼っておきます。

gst-launch-1.0 \
udpsrc port=5000 caps="application/x-rtp,media=(string)video,encoding-name=(string)H265,payload=(int)96" ! \
rtpjitterbuffer latency=10 ! \
rtph265depay ! \
h265parse ! \
avdec_h265 ! \
videoconvert ! \
autovideosink sync=false

※Ubuntu PCのIPアドレスも忘れずに控えておいてください。

 

FRDM-IMX95のセットアップ

こちらも同様にi.MX95ネットワークカメラYOLOv8mの物体認識をご準備ください。

 

Pythonコード

こちらがコードです。YOLOの最適化のコードで、出力先をGstreamerのUDPSINKにしているだけ、です。UBUNTU_IPには、Ubuntu PCのIPアドレスをセットしてください。

#!/usr/bin/env python3
import os
import time
import threading
import queue

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite

# ============================================================
# 設定
# ============================================================
MODEL_PATH = "/usr/bin/tensorflow-lite-2.19.0/examples/yolov8m_int8_neutron.tflite"
LABEL_PATH = "/usr/bin/tensorflow-lite-2.19.0/examples/labels_yolov8.txt"  # COCO 80 classes

CONF_TH = 0.60
IOU_TH = 0.45
MAX_DET = 50
MIN_BOX_W = 4
MIN_BOX_H = 4

# カメラ入力(i.MX95 + OS08A20)
GST_PIPELINE = (
    "libcamerasrc "
    "camera-name=/base/soc/bus@42000000/i2c@42540000/os08a20_mipi@36 ! "
    "video/x-raw,format=YUY2,framerate=30/1,width=3840,height=2160 ! "
    "imxvideoconvert_g2d rotation=4 ! "
    "video/x-raw,width=1280,height=720,format=BGRA ! "
    "appsink drop=true max-buffers=1"
)

# ネットワーク送信先(Ubuntu PC)
UBUNTU_IP = "192.168.0.10"   # ★Ubuntu 側の IP アドレスに変更
UBUNTU_PORT = 5000

# H.265 + RTP + UDP 送信用 GStreamer パイプライン
# v4l2h265enc の名前は BSP によって異なる場合あり(必要に応じて gst-inspect で確認)
GST_NET_SINK = (
    "appsrc ! "
    "videoconvert ! "
    "video/x-raw,format=NV12,width=1280,height=720,framerate=30/1 ! "
    "v4l2h265enc ! "
    "rtph265pay config-interval=-1 pt=96 ! "
    f"udpsink host={UBUNTU_IP} port={UBUNTU_PORT} sync=false"
)

# libcamera / TFLite 関連環境変数
os.environ["LIBCAMERA_IPA_MODULE_PATH"] = "/usr/lib/libcamera/ipa-nxp-neo-uguzzi/"
os.environ["LIBCAMERA_PIPELINES_MATCH_LIST"] = "nxp/neo,imx8-isi,uvc"
# Neutron 使用時は XNNPACK を無効化
os.environ["TFLITE_DISABLE_XNNPACK"] = "1"

DELEGATE_LIB = "/usr/lib/libneutron_delegate.so"

# ============================================================
# ユーティリティ
# ============================================================
def load_labels(path: str) -> list[str]:
    with open(path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f]

def bbox_iou(box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
    """1 つの box と複数 box 群との IoU を計算。"""
    x1 = np.maximum(box[0], boxes[:, 0])
    y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2])
    y2 = np.minimum(box[3], boxes[:, 3])

    inter_w = np.maximum(0, x2 - x1)
    inter_h = np.maximum(0, y2 - y1)
    inter = inter_w * inter_h

    area1 = (box[2] - box[0]) * (box[3] - box[1])
    area2 = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    union = area1 + area2 - inter + 1e-6

    return inter / union

def nms(boxes: np.ndarray, scores: np.ndarray, iou_th: float) -> list[int]:
    """単純な NMS 実装。"""
    idxs = np.argsort(scores)[::-1]
    keep: list[int] = []

    while len(idxs) > 0:
        i = idxs[0]
        keep.append(i)
        if len(idxs) == 1:
            break
        ious = bbox_iou(boxes[i], boxes[idxs[1:]])
        idxs = idxs[1:][ious < iou_th]

    return keep

def sigmoid(x: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-x))

# ============================================================
# スレッド: 1. カメラキャプチャ (+ 前処理 + 入力テンソル作成)
# ============================================================
def capture_worker(
    cap: cv2.VideoCapture,
    frame_queue: queue.Queue,
    stop_event: threading.Event,
    in_h: int,
    in_w: int,
    input_dtype: np.dtype,
    in_scale: float,
    in_zero: float,
) -> None:
    """
    cap.read() でフレームを取得し、前処理+入力テンソル作成まで行って frame_queue に流すスレッド。
    キューには以下のタプルを入れる:
      (frame_bgr, input_data, scale, left, top, w0, h0)
    """
    canvas = np.full((in_h, in_w, 3), 114, dtype=np.uint8)

    try:
        while not stop_event.is_set():
            ret, frame_bgra = cap.read()
            if not ret:
                print("Failed to read frame")
                stop_event.set()
                break

            frame_bgr = cv2.cvtColor(frame_bgra, cv2.COLOR_BGRA2BGR)
            h0, w0 = frame_bgr.shape[:2]

            # 前処理: letterbox
            scale = min(in_w / w0, in_h / h0)
            nw, nh = int(w0 * scale), int(h0 * scale)

            resized = cv2.resize(frame_bgr, (nw, nh))
            canvas[:] = 114
            top = (in_h - nh) // 2
            left = (in_w - nw) // 2
            canvas[top : top + nh, left : left + nw] = resized

            img_rgb = canvas  # 必要ならここで BGR→RGB に変更

            # 入力テンソル作成
            if input_dtype == np.uint8:
                input_data = np.empty((1, in_h, in_w, 3), dtype=np.uint8)
                input_data[0, ...] = img_rgb
            else:
                # int8 モデル
                img_f = img_rgb.astype(np.float32) / 255.0
                q = (img_f / in_scale + in_zero).astype(np.int8)
                input_data = q[np.newaxis, ...]  # (1, H, W, 3)

            # 古いフレームを捨てて最新 1 枚だけキープ
            try:
                while True:
                    frame_queue.get_nowait()
            except queue.Empty:
                pass

            try:
                frame_queue.put(
                    (frame_bgr, input_data, scale, left, top, w0, h0),
                    timeout=0.01,
                )
            except queue.Full:
                pass
    finally:
        cap.release()

# ============================================================
# スレッド: 2. 推論 (interpreter.invoke のみ)
# ============================================================
def inference_worker(
    frame_queue: queue.Queue,
    result_queue: queue.Queue,
    interpreter: tflite.Interpreter,
    input_index: int,
    output_index: int,
    out_scale: float,
    out_zero: float,
    out_dtype: np.dtype,
    stop_event: threading.Event,
) -> None:
    """
    capture_worker から前処理済み入力テンソルを受け取り、
    interpreter.invoke() と出力のデ量子化まで行うスレッド。
    後処理用に out(=N×C), スケール情報などを result_queue に渡す。
    """
    while not stop_event.is_set():
        try:
            frame_bgr, input_data, scale, left, top, w0, h0 = frame_queue.get(timeout=0.1)
        except queue.Empty:
            continue

        # 推論
        interpreter.set_tensor(input_index, input_data)
        interpreter.invoke()

        out_raw = interpreter.get_tensor(output_index)[0]  # 期待形状: (84, 2100) 等

        # 逆量子化
        if np.issubdtype(out_dtype, np.integer):
            out_f = (out_raw.astype(np.float32) - out_zero) * out_scale
        else:
            out_f = out_raw.astype(np.float32)

        # 形状正規化: (84, N) または (N, 84) -> (N, 84)
        if out_f.ndim == 3 and out_f.shape[0] == 1:
            out_f = out_f[0]

        if out_f.ndim == 2 and out_f.shape[0] in (84, 85):
            out = out_f.reshape(out_f.shape[0], -1).T
        elif out_f.ndim == 2 and out_f.shape[1] in (84, 85):
            out = out_f
        else:
            out = out_f.reshape(-1, out_f.shape[-1])

        # 後段用キューに結果を渡す(最新のみ保持)
        try:
            while True:
                result_queue.get_nowait()
        except queue.Empty:
            pass

        try:
            result_queue.put(
                (frame_bgr, out, scale, left, top, w0, h0),
                timeout=0.01,
            )
        except queue.Full:
            pass

# ============================================================
# スレッド: 3. 後処理(NMS / 描画 / ネットワーク送信)
# ============================================================
def postprocess_worker(
    result_queue: queue.Queue,
    labels: list[str],
    in_h: int,
    in_w: int,
    stop_event: threading.Event,
    net_writer: cv2.VideoWriter,
) -> None:
    """
    推論結果(out)を受け取り、後処理(sigmoid/閾値/NMS/描画)と
    H.265 + RTP + UDP での送信を行うスレッド。
    """
    fps = 0.0
    prev_time = time.time()

    while not stop_event.is_set():
        try:
            frame_bgr, out, scale, left, top, w0, h0 = result_queue.get(timeout=0.1)
        except queue.Empty:
            continue

        # FPS 計算(送信ループベース)
        now = time.time()
        dt = now - prev_time
        if dt > 0:
            fps = 1.0 / dt
        prev_time = now

        num_det, num_ch = out.shape

        boxes: list | np.ndarray = []
        scores: list | np.ndarray = []
        classes: list | np.ndarray = []

        # YOLOv8 風: [cx, cy, w, h, cls_logits x 80]
        if num_ch == 84:
            cx = out[:, 0]
            cy = out[:, 1]
            w = out[:, 2]
            h = out[:, 3]

            cls_logits = out[:, 4:]  # (N, 80)
            cls_probs = sigmoid(cls_logits)
            cls_id = np.argmax(cls_probs, axis=1)
            conf = cls_probs[np.arange(num_det), cls_id]

            # 信頼度しきい値
            mask = conf >= CONF_TH
            if np.any(mask):
                cx = cx[mask]
                cy = cy[mask]
                w = w[mask]
                h = h[mask]
                conf = conf[mask]
                cls_id = cls_id[mask]

                # 0〜1 正規化座標 -> 入力解像度
                x1 = (cx - w / 2.0) * in_w
                y1 = (cy - h / 2.0) * in_h
                x2 = (cx + w / 2.0) * in_w
                y2 = (cy + h / 2.0) * in_h

                # letterbox を元画像座標に補正
                x1 = (x1 - left) / scale
                y1 = (y1 - top) / scale
                x2 = (x2 - left) / scale
                y2 = (y2 - top) / scale

                # ボックスの最小サイズでフィルタ
                bw = x2 - x1
                bh = y2 - y1
                size_mask = (bw >= MIN_BOX_W) & (bh >= MIN_BOX_H)

                if np.any(size_mask):
                    x1 = x1[size_mask]
                    y1 = y1[size_mask]
                    x2 = x2[size_mask]
                    y2 = y2[size_mask]
                    conf = conf[size_mask]
                    cls_id = cls_id[size_mask]

                    boxes = np.stack([x1, y1, x2, y2], axis=1).astype(np.float32)
                    scores = conf.astype(np.float32)
                    classes = cls_id.astype(np.int32)

        # NMS & 描画
        if isinstance(boxes, np.ndarray):
            has_det = boxes.shape[0] > 0
        else:
            has_det = len(boxes) > 0

        if has_det:
            boxes = np.asarray(boxes, dtype=np.float32)
            scores = np.asarray(scores, dtype=np.float32)
            classes = np.asarray(classes, dtype=np.int32)

            keep = nms(boxes, scores, IOU_TH)
            keep = sorted(keep, key=lambda i: scores[i], reverse=True)[:MAX_DET]

            for idx in keep:
                x1, y1, x2, y2 = boxes[idx]
                cls_id = int(classes[idx])
                conf = float(scores[idx])

                label = labels[cls_id] if 0 <= cls_id < len(labels) else f"id{cls_id}"

                x1 = int(max(0, min(w0 - 1, x1)))
                y1 = int(max(0, min(h0 - 1, y1)))
                x2 = int(max(0, min(w0 - 1, x2)))
                y2 = int(max(0, min(h0 - 1, y2)))

                cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(
                    frame_bgr,
                    f"{label}:{conf:.2f}",
                    (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (0, 255, 0),
                    2,
                )

        cv2.putText(
            frame_bgr,
            f"FPS: {fps:.1f}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (0, 255, 255),
            2,
        )

        # ネットワーク送信(H.265 + RTP + UDP)
        if net_writer is not None and net_writer.isOpened():
            net_writer.write(frame_bgr)

        # ローカル表示が不要ならコメントアウトのままで良い
        # cv2.imshow("YOLOv8 i.MX95", frame_bgr)
        # if (cv2.waitKey(1) & 0xFF) == ord("q"):
        #     stop_event.set()
        #     break

    # cv2.destroyAllWindows()

# ============================================================
# main
# ============================================================
def main() -> None:
    # TFLite Interpreter 準備
    delegate = tflite.load_delegate(DELEGATE_LIB)
    interpreter = tflite.Interpreter(
        model_path=MODEL_PATH,
        experimental_delegates=[delegate],
    )
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    print("Input details :", input_details)
    print("Output details:", output_details)

    input_index = input_details[0]["index"]
    input_shape = input_details[0]["shape"]  # [1, H, W, 3]
    _, in_h, in_w, _ = input_shape
    input_dtype = input_details[0]["dtype"]

    output_index = output_details[0]["index"]
    out_qparams = output_details[0]["quantization_parameters"]
    out_scale = out_qparams["scales"][0] if len(out_qparams["scales"]) > 0 else 1.0
    out_zero = out_qparams["zero_points"][0] if len(out_qparams["zero_points"]) > 0 else 0
    out_dtype = output_details[0]["dtype"]

    labels = load_labels(LABEL_PATH)

    # 入力量子化パラメータ(int8 モデル用)
    in_qparams = input_details[0]["quantization_parameters"]
    in_scale = in_qparams["scales"][0] if len(in_qparams["scales"]) > 0 else 1.0
    in_zero = in_qparams["zero_points"][0] if len(in_qparams["zero_points"]) > 0 else 0

    # カメラ初期化
    cap = cv2.VideoCapture(GST_PIPELINE, cv2.CAP_GSTREAMER)
    if not cap.isOpened():
        raise RuntimeError("Failed to open camera")

    # ネットワーク送信用 VideoWriter 作成
    net_writer = cv2.VideoWriter(
        GST_NET_SINK,
        cv2.CAP_GSTREAMER,
        0,
        30,
        (1280, 720),
        True,
    )
    if not net_writer.isOpened():
        print("Failed to open network VideoWriter (H.265 UDP)")
        net_writer = None

    frame_queue = queue.Queue(maxsize=1)
    result_queue = queue.Queue(maxsize=1)
    stop_event = threading.Event()

    # スレッド起動
    t_capture = threading.Thread(
        target=capture_worker,
        args=(cap, frame_queue, stop_event, in_h, in_w, input_dtype, in_scale, in_zero),
        daemon=True,
    )
    t_infer = threading.Thread(
        target=inference_worker,
        args=(
            frame_queue,
            result_queue,
            interpreter,
            input_index,
            output_index,
            out_scale,
            out_zero,
            out_dtype,
            stop_event,
        ),
        daemon=True,
    )
    t_post = threading.Thread(
        target=postprocess_worker,
        args=(result_queue, labels, in_h, in_w, stop_event, net_writer),
        daemon=True,
    )

    print("Running... (Ctrl+C to quit)")

    t_capture.start()
    t_infer.start()
    t_post.start()

    try:
        while not stop_event.is_set():
            time.sleep(0.1)
    except KeyboardInterrupt:
        stop_event.set()

    t_capture.join()
    t_infer.join()
    t_post.join()

    if net_writer is not None:
        net_writer.release()

if __name__ == "__main__":
    main()

(view in My Videos)

 

わりに

i.MX 95の得意なマルチメディア+エッジAIを組み合わせて、「AIネットワークカメラ」を実現してみました。
参考にして頂ければ幸いです。

※この記事には、AI生成コードをベースに投稿者が内容を確認した内容が含まれます。

 

=========================

本投稿の「Comment」欄にコメントをいただいても、現在返信に対応しておりません。
お手数をおかけしますが、お問い合わせの際には「NXPへの技術質問 問い合わせ方法 (日本語ブログ)」をご参照ください。
(既に弊社NXP代理店、もしくはNXPとお付き合いのある方は、直接担当者へご質問いただいてもかまいません。)