import os
import time

import gi
import numpy as np

# PyGObject requires namespace versions to be pinned BEFORE the first import
# from gi.repository; pinning afterwards is too late (the namespace is already
# loaded) and triggers a PyGIWarning or a ValueError on version mismatch.
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
gi.require_version("Gtk", "3.0")

from gi.repository import GLib, GObject, Gst, GstBase, Gtk  # noqa: E402

# Set the debug threshold in the environment before GStreamer is initialised.
os.environ["GST_DEBUG"] = "*:2"
Gst.init(None)
# Build the launch description by chaining the element specs with " ! " and
# hand it straight to the parser: a V4L2 source on /dev/video0, a caps filter
# fixing YUY2 1920x1080 at 30/1, a leaky queue, and an appsink named "appsink"
# that emits signals.
pipeline = Gst.parse_launch(
    " ! ".join(
        (
            "v4l2src device=/dev/video0 io-mode=mmap",
            "video/x-raw,format=YUY2,width=1920,height=1080,framerate=30/1",
            "queue max-size-buffers=20 leaky=downstream",
            "appsink name=appsink sync=false emit-signals=true max-buffers=10 drop=true",
        )
    )
)
# Timestamp of the previous callback invocation; on_new_sample() uses it to
# print the interval between consecutive samples.
tic = time.perf_counter()


def on_new_sample(sink, preroll):
    """Pull the pending sample from *sink* and wrap its bytes in a NumPy array.

    Prints the time elapsed since the previous invocation (in milliseconds)
    and updates the module-level ``tic`` timestamp.

    Args:
        sink: Object emitting the signal; the sample is fetched from it via
            the "pull-preroll" or "pull-sample" action signal.
        preroll: User data passed at connect time. When truthy the preroll
            sample is pulled, otherwise the regular sample.

    Returns:
        ``Gst.FlowReturn.OK`` after the buffer has been processed, or
        ``Gst.FlowReturn.EOS`` when the sink had no sample to hand out.

    Raises:
        RuntimeError: If the buffer cannot be mapped for reading.
    """
    global tic
    toc = time.perf_counter()
    print("new sample", f"{(toc - tic)*1000}ms")
    tic = toc

    sample = sink.emit("pull-preroll" if preroll else "pull-sample")
    if sample is None:
        # The pull actions return no sample once the sink is at EOS or has
        # been stopped; dereferencing it would raise AttributeError.
        return Gst.FlowReturn.EOS

    gstbuffer = sample.get_buffer()
    success, map_info = gstbuffer.map(Gst.MapFlags.READ)
    if not success:
        raise RuntimeError("Could not map buffer data!")
    try:
        # Flat uint8 array over the mapped bytes. It is only built here (no
        # further processing yet); copy it before using it after the unmap.
        numpy_frame = np.frombuffer(map_info.data, dtype=np.uint8)  # noqa: F841
    finally:
        # Always release the mapping, even if wrapping the data fails.
        gstbuffer.unmap(map_info)
    return Gst.FlowReturn.OK
def _on_bus_message(bus, message, loop):
    """Quit *loop* when the pipeline posts an ERROR or EOS message."""
    if message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(f"pipeline error: {err.message} ({debug})")
        loop.quit()
    elif message.type == Gst.MessageType.EOS:
        loop.quit()


# Route every new sample of the appsink to on_new_sample (preroll=False).
appsink = pipeline.get_by_name("appsink")
appsink.connect("new-sample", on_new_sample, False)

loop = GLib.MainLoop()

# Watch the bus so that a pipeline error or end-of-stream stops the main loop
# instead of leaving it blocked forever.
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", _on_bus_message, loop)

try:
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        raise RuntimeError("Unable to set the pipeline to the PLAYING state")
    loop.run()
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop this script; fall through to cleanup.
    pass
finally:
    # Return the pipeline to NULL so its elements release their resources.
    pipeline.set_state(Gst.State.NULL)
    bus.remove_signal_watch()