Hi, here is the code. I cannot share the model, but you can try it with any quantized TFLite model.
import multiprocessing
from multiprocessing import Queue, Process
import numpy as np
from threading import Thread
from random import random, randint
import tflite_runtime.interpreter as tflite
import time
import cv2
import os
import sys
import psutil
class ClassificationModel(object):
    """Thin wrapper around a TFLite interpreter for single-image classification.

    Handles both float32 models and quantized (uint8/int8) models: for float
    models the image is scaled to [0, 1] float32; for quantized models the
    image is mapped through the input tensor's quantization (scale, zero_point)
    and cast to the tensor's dtype. The original code always fed float32,
    which raises a type error on quantized models.
    """

    def __init__(self, path, mask_path=None):
        # mask_path is kept for interface compatibility; it is unused here.
        self.interpreter = tflite.Interpreter(model_path=path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        # NHWC shape of the first (and assumed only) input tensor.
        self.input_shape = self.input_details[0]['shape']

    def predict(self, img, resize=True):
        """Run inference on one HxWxC image and return the squeezed output.

        img: image array (channel order as the model expects — BGR straight
            from cv2 here; confirm against the model's training pipeline).
        resize: when True, resize to the model's expected spatial dims.
        """
        in_detail = self.input_details[0]
        if resize:
            # input_shape is NHWC, so cv2.resize wants (width, height).
            img = cv2.resize(img, (self.input_shape[2], self.input_shape[1]))
        dtype = in_detail['dtype']
        if dtype == np.float32:
            # Float model: normalize pixels to [0, 1].
            img = (img / 255.0).astype(np.float32)
        else:
            # Quantized model: quantize the normalized pixels with the
            # model's input scale/zero-point instead of feeding floats.
            scale, zero_point = in_detail['quantization']
            if scale:  # scale == 0 means the tensor carries no quantization
                img = (img / 255.0) / scale + zero_point
            img = np.round(img).astype(dtype)
        img = np.expand_dims(img, 0)  # add batch dimension -> (1, H, W, C)
        self.interpreter.set_tensor(in_detail['index'], img)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(self.output_details[0]['index'])
        return np.squeeze(output)
def my_thread_1():
    """Threaded task: announce start, run the full CPU-load simulation
    (spawns one busy process per core for a few seconds), then announce
    completion. Intended as the target of a threading.Thread."""
    print("Start threaded task 1")
    simulate_cpu_load()
    print("Task 1 completed")
def worker():
    # Deliberate busy-spin: saturates one CPU core until the parent
    # process calls terminate() on us (see simulate_cpu_load).
    while True:
        pass
def simulate_cpu_load(duration=3):
    """Saturate every CPU core with a busy-loop child process, then stop.

    Spawns one `worker` process per core, lets them spin for `duration`
    seconds, then terminates them.

    duration: seconds to sustain the load (default 3, matching the
        previously hard-coded value, so existing callers are unchanged).
    """
    num_cores = multiprocessing.cpu_count()
    processes = []
    for _ in range(num_cores):
        p = multiprocessing.Process(target=worker)
        p.start()
        processes.append(p)
    time.sleep(duration)
    for p in processes:
        p.terminate()
    # join() after terminate() so the children are actually reaped;
    # without it they linger as zombies until the parent exits.
    for p in processes:
        p.join()
if __name__ == '__main__':
    # Fix: the original string literal was unterminated ('mymodel.tflite).
    classifier_1 = ClassificationModel('mymodel.tflite')

    # Probe the first few device indices until a camera opens.
    cap = cv2.VideoCapture()
    for i in range(5):
        cap.open(i)
        if cap.isOpened():
            break
    if not cap.isOpened():
        print("Could not open camera")
        exit()

    try:
        while True:
            # get image
            ret, img = cap.read()
            if not ret:
                # Failed grab: skip this iteration instead of passing
                # None into predict() and crashing.
                continue
            # predict
            p1 = classifier_1.predict(img)
            print(p1)
            # threaded task: ~10% of frames kick off a CPU-load thread
            if random() < 0.1:
                t = Thread(target=my_thread_1)
                t.start()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the camera so the device isn't left locked on exit.
        cap.release()