EZ RKNN Async exposes three distinct ways to run inference through InferenceSession. All three share the same session setup — you choose the calling convention that fits your workload. run blocks until results are ready, run_async fires a callback when inference completes without blocking the caller, and run_pipeline keeps a sliding window of in-flight frames so the NPU is never idle between calls.

The three modes at a glance

| Mode | Blocks caller? | Returns results | Best for |
| --- | --- | --- | --- |
| run | Yes | Immediately, as List[np.ndarray] | Scripts, one-shot inference, ORT drop-in |
| run_async | No | Via callback when ready | Real-time systems, event loops |
| run_pipeline | Yes (on the oldest result) | After the pipeline fills | Video/streaming, maximum throughput |

Synchronous inference with run

run submits a task internally and waits for the result before returning. The API is identical to onnxruntime.InferenceSession.run, making it a direct drop-in replacement for existing code.
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options
import numpy as np

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(),
)

# input_feed accepts a dict, list, or single array
input_data = np.random.rand(1, 3, 224, 224).astype(np.float32)
outputs = sess.run(None, {"input": input_data})
print(outputs[0].shape)
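Per the comment in the example, input_feed also accepts a positional list or a bare array. A minimal sketch of the equivalent calls, assuming a single-input model:
# Equivalent input_feed forms for a single-input model
outputs = sess.run(None, [input_data])  # positional list
outputs = sess.run(None, input_data)    # bare array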
Use run when:
  • You are porting code from onnxruntime and want minimal changes (see the sketch after this list).
  • You are writing a script or a test that runs inference once or in a simple loop.
  • Latency per call matters more than overall throughput.
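Because the signature matches onnxruntime.InferenceSession.run, a port can be as small as swapping the import and the model file. A sketch, assuming no ORT-specific session options need mapping:
# Before: import onnxruntime as ort
# Before: sess = ort.InferenceSession("model.onnx")
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

sess = InferenceSession("model.rknn", provider_options=make_provider_options())
# Existing sess.run(...) call sites keep working unchanged.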

Async inference with run_async

run_async submits a task to the NPU worker queue and returns None immediately. Your callback is invoked on a dedicated callback thread when the result is ready.
import threading
import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

done = threading.Event()

def on_result(results, user_data, err):
    if err:
        print(f"Inference error: {err}")
    else:
        print(f"Got result for frame {user_data}: shape={results[0].shape}")
    done.set()

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(sequential_callbacks=True),
)

frame_id = 42
input_data = np.random.rand(1, 3, 224, 224).astype(np.float32)

sess.run_async(None, {"input": input_data}, callback=on_result, user_data=frame_id)

done.wait()
The callback signature is:
def callback(results: List[Any], user_data: Any, err: Optional[str]) -> None:
    ...
Use run_async when:
  • Your main thread drives an event loop or UI that must not stall.
  • You want to pipeline NPU work with other CPU processing done in the callback.
  • You need fine-grained control over when results are consumed, as in the sketch below.
run_async does not support ztu_modelrt_dispatch_batch=True in run_options. Pass dispatch-batch workloads through run instead.
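A common consumption pattern (a sketch, not part of the library API) is to bridge callbacks into a queue.Queue so a consumer thread, rather than the callback thread, decides when results are processed. This reuses the sess created above:
import queue
import numpy as np

results_q = queue.Queue(maxsize=8)  # bounded: a slow consumer back-pressures the callback thread

def on_result(results, user_data, err):
    # Keep the callback thread light: hand off and return
    results_q.put((user_data, results, err))

# Producer: submit a few frames without blocking
for frame_id in range(4):
    input_data = np.random.rand(1, 3, 224, 224).astype(np.float32)
    sess.run_async(None, {"input": input_data}, callback=on_result, user_data=frame_id)

# Consumer: pick up results whenever convenient
for _ in range(4):
    frame_id, results, err = results_q.get()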

Pipeline inference with run_pipeline

run_pipeline is a pseudo-synchronous interface designed for continuous frame streams. Each call submits the current frame and returns the result of a frame submitted depth calls earlier. The NPU stays busy during the caller’s inter-frame processing because future frames are already queued.
import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(schedule=[0, 1, 2]),
)

for frame in video_stream():
    input_data = preprocess(frame)
    result = sess.run_pipeline({"input": input_data}, depth=3)
    if result is not None:
        postprocess(result)

# Drain the pipeline after the stream ends: the final depth results are still
# in flight (see the pipeline inference guide for the drain/reset API)
Use run_pipeline when:
  • You process a continuous stream of frames and want maximum NPU utilization.
  • You can tolerate depth frames of added latency in exchange for higher throughput.
  • Your workload is single-threaded (one producer, one consumer).
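As a rough worked example: at a 30 FPS input with depth=3, each result corresponds to a frame submitted three calls earlier, so the added latency is about 3 × 33 ms ≈ 100 ms, while the NPU can stay busy across all three in-flight frames.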

Detailed guides

  • Async inference: callbacks, queue limits, and ordering guarantees for run_async.
  • Pipeline inference: depth tuning, reset behavior, and frame-loop patterns for run_pipeline.
  • Multi-core NPU: distribute tasks across NPU cores with schedule or tp_mode.
  • Migrate from onnxruntime: side-by-side comparison of the ORT and EZ RKNN Async APIs.
