

run_pipeline is a pseudo-synchronous interface designed for continuous frame streams. On each call you submit a new frame and receive the result from a frame submitted depth calls ago. Because the NPU is processing the current frame while your CPU is consuming the previous result, the NPU is never left idle between loop iterations.

How the pipeline works

Internally, run_pipeline uses a std::queue of futures. Each call:
  1. Submits the new input via submit_task_blocking (blocks until a queue slot opens).
  2. Pushes the resulting future onto pipeline_queue_.
  3. Returns None if pipeline_queue_.size() <= depth — the pipeline is still filling.
  4. Once the queue exceeds depth, pops and resolves the oldest future, returning its outputs.

This gives you depth frames of buffered latency. After the pipeline is full, every call returns exactly one result.
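The steps above can be sketched in pure Python, with a thread pool standing in for the NPU task queue. Everything here is illustrative: the PipelineSession class and its unbounded deque are assumptions for the sketch, whereas the real implementation submits through submit_task_blocking against a bounded queue.

```python
# Minimal sketch of run_pipeline's queue-of-futures mechanism.
# PipelineSession is a hypothetical stand-in, not the library's class.
from collections import deque
from concurrent.futures import ThreadPoolExecutor

class PipelineSession:
    def __init__(self, infer_fn, workers=4):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._queue = deque()   # analogue of pipeline_queue_
        self._infer = infer_fn

    def run_pipeline(self, input_feed, depth=3, reset=False):
        if reset:
            self._queue.clear()                       # discard buffered futures
        # Steps 1-2: submit the new input, push its future.
        self._queue.append(self._pool.submit(self._infer, input_feed))
        # Step 3: pipeline still filling, no result yet.
        if len(self._queue) <= depth:
            return None
        # Step 4: pop and resolve the oldest future.
        return self._queue.popleft().result()

sess = PipelineSession(lambda x: x * 2)
results = [sess.run_pipeline(i, depth=3) for i in range(5)]
# The first 3 calls return None; afterwards results come back in submission order.
```

Note how the caller sees a synchronous function even though inference runs on worker threads: each call trades one new input for one old output once the queue holds more than depth entries.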

Basic video loop example

import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(schedule=[0, 1, 2]),
)

DEPTH = 3  # number of frames to keep in flight simultaneously

for frame in video_stream():
    input_data = preprocess(frame)
    result = sess.run_pipeline({"input": input_data}, depth=DEPTH)

    if result is None:
        # Pipeline is still filling — no result yet
        continue

    # result is a list of numpy arrays (all outputs)
    postprocess(result)

The first depth calls always return None while the pipeline fills. Your loop must handle None explicitly before attempting to process the result.

Draining the pipeline after a stream ends

When the source stream is exhausted you still have depth frames of results in the pipeline. Submit dummy inputs or keep feeding real frames until run_pipeline has returned all buffered results. An alternative pattern is to track how many results you have consumed versus submitted:

submitted = 0
received = 0

for frame in video_stream():
    result = sess.run_pipeline({"input": preprocess(frame)}, depth=DEPTH)
    submitted += 1
    if result is not None:
        postprocess(result)
        received += 1

# After the stream, we have (submitted - received) results still in the pipeline.
# Continue submitting the last frame (or a blank) to drain them.
while received < submitted:
    result = sess.run_pipeline({"input": last_frame}, depth=DEPTH)
    if result is not None:
        postprocess(result)
        received += 1
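The submitted/received bookkeeping can also be folded into a generator. stream_results below is a hypothetical helper, not part of the library; it yields exactly one result per real frame and flushes the pipeline by re-feeding the last input, which is the same drain pattern as above. FakeSession is a stub with run_pipeline's depth-delayed semantics so the sketch is self-contained.

```python
def stream_results(sess, inputs, depth=3):
    """Yield one result per input, draining the pipeline when the stream ends."""
    submitted = received = 0
    last = None
    for last in inputs:
        result = sess.run_pipeline({"input": last}, depth=depth)
        submitted += 1
        if result is not None:
            received += 1
            yield result
    # (submitted - received) results are still buffered; re-feed the last
    # input until they have all come back.
    while received < submitted:
        result = sess.run_pipeline({"input": last}, depth=depth)
        if result is not None:
            received += 1
            yield result

class FakeSession:
    """Stub reproducing run_pipeline semantics: results arrive depth calls late."""
    def __init__(self):
        self._q = []
    def run_pipeline(self, feed, depth=3, reset=False):
        if reset:
            self._q.clear()
        self._q.append(feed["input"])
        if len(self._q) <= depth:
            return None
        return self._q.pop(0)

frames = list(range(5))
results = list(stream_results(FakeSession(), frames, depth=3))
# results == frames: every real frame's output comes back, in order.
```

The drain loop submits duplicate frames whose results are simply never consumed, which is harmless as long as the session is reset or discarded before the next stream.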

Resetting the pipeline

Pass reset=True to clear all buffered futures and restart with a fresh pipeline. Use this when the input changes in a way that makes queued results invalid — for example, when switching camera resolutions or switching to a different input stream.

# Start a new stream segment with a clean pipeline
result = sess.run_pipeline({"input": first_frame_of_new_segment}, depth=DEPTH, reset=True)
# Returns None — the pipeline refills from scratch

Any futures still in the pipeline when reset=True is passed are discarded. Their results are never returned. Do not reset mid-stream unless you intentionally want to discard those frames.

Depth, latency, and throughput

depth is the number of frames that are simultaneously in flight on the NPU. There is a direct tradeoff:
Depth        Added latency  NPU utilization
1            1 frame        Low — NPU idles while CPU processes each result
3 (default)  3 frames       Good — NPU keeps processing while CPU handles results
Higher       More frames    Diminishing returns; capped by task queue size
Start with depth=3 and increase if profiling shows the NPU is still idle between frames. Setting depth much larger than max_queue_size provides no additional benefit because the submit call will block waiting for a queue slot.
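The tradeoff can be put in rough numbers. The stage times below are assumptions for illustration, not measurements from the runtime; the point is that a full pipeline is paced by the slower stage, while the buffering latency grows linearly with depth.

```python
# Illustrative stage times (assumed, not measured on real hardware).
NPU_MS = 12.0   # NPU inference time per frame
CPU_MS = 8.0    # CPU pre/post-processing time per frame

# Pipeline not overlapping: the stages run back to back.
serial_ms = NPU_MS + CPU_MS          # 20.0 ms per frame

# Pipeline full: CPU and NPU overlap, so the slower stage sets the pace.
pipelined_ms = max(NPU_MS, CPU_MS)   # 12.0 ms per frame

# Buffering cost: each result describes a frame submitted depth calls ago.
depth = 3
added_latency_ms = depth * pipelined_ms   # 36.0 ms behind the live frame
```

Raising depth further cannot push throughput below max(NPU_MS, CPU_MS) per frame; it only increases added_latency_ms, which is why depths beyond the task queue size buy nothing.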

run_pipeline vs run_async for streaming

run_pipeline gives you a simpler single-threaded loop: submission and result consumption happen in one call. Use it when your producer and consumer are the same thread.

result = sess.run_pipeline({"input": frame}, depth=3)

Method signature

def run_pipeline(
    self,
    input_feed: Any,       # dict, list, or single numpy array
    depth: int = 3,        # number of frames to buffer
    reset: bool = False,   # True clears the pipeline before submitting
) -> Any:                  # List[np.ndarray] or None
    ...
