
InferenceSession is the primary entry point for running RKNN models with EZ RKNN Async. Its constructor signature is intentionally compatible with onnxruntime.InferenceSession, so you can migrate ORT-based code by swapping the import and adding provider options — no other changes required.
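As a rough sketch of what that migration looks like (the model file names and the layout option below are illustrative, taken from the examples on this page, not requirements of the API):

# Before: ONNX Runtime
# import onnxruntime as ort
# session = ort.InferenceSession("model.onnx")

# After: EZ RKNN Async. Only the import, the model file, and the
# provider options change; the run() calls stay the same.
import ztu_somemodelruntime_ez_rknn_async as ez

session = ez.InferenceSession(
    "model.rknn",
    provider_options=ez.make_provider_options(layout="nchw"),
)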

Constructor

InferenceSession(
    path_or_bytes,
    sess_options=None,
    providers=None,
    provider_options=None,
    **kwargs,
)
path_or_bytes
str | PathLike
required
Filesystem path to a compiled .rknn model file. Any object accepted by os.fspath() works, including pathlib.Path. Passing raw bytes is not yet supported and raises RuntimeError.
sess_options
Any
Accepted for API compatibility with ONNX Runtime. Ignored at runtime.
providers
Any
Accepted for API compatibility with ONNX Runtime. Ignored at runtime.
provider_options
RknnProviderOptions | Sequence[RknnProviderOptions]
Runtime configuration for the RKNN session. Pass a RknnProviderOptions dict or a one-element list containing one. Use make_provider_options() to build a validated dict. When None, all options fall back to their defaults.

Example

import ztu_somemodelruntime_ez_rknn_async as ez

# Minimal — all defaults
session = ez.InferenceSession("model.rknn")

# With provider options
session = ez.InferenceSession(
    "model.rknn",
    provider_options=ez.make_provider_options(
        layout="nchw",
        max_queue_size=4,
        schedule=[0, 1, 2],
    ),
)

Methods

run

run(output_names, input_feed, run_options=None) -> List[ndarray]
Runs synchronous inference and blocks until results are ready. Returns a list of float32 NumPy arrays — one per requested output.
output_names
Sequence[str] | None
required
List of output tensor names to return. Pass None to return all outputs in model order.
input_feed
dict[str, ndarray] | list[ndarray] | tuple[ndarray, ...] | ndarray
required
Model inputs. Three forms are accepted:
  • dict — maps each input name to a NumPy array. Keys must match session.input_names exactly.
  • list or tuple — arrays in the same order as session.input_names.
  • single ndarray — only valid when the model has exactly one input.
run_options
dict | RunOptions
Optional run-time flags. Supports the key "ztu_modelrt_dispatch_batch" (bool). When True, the first dimension of each input array is treated as a batch index: the session splits the batch into individual inference requests, dispatches them concurrently across the async queue, then concatenates the results. run_async does not accept this key (see below).

Examples

import numpy as np
import ztu_somemodelruntime_ez_rknn_async as ez

session = ez.InferenceSession("model.rknn")
inp = np.random.randn(1, 3, 640, 640).astype(np.float32)

# Return all outputs using a dict feed
outputs = session.run(None, {"images": inp})

# Return a subset of outputs by name
outputs = session.run(["output0"], {"images": inp})

# List feed — positional order
outputs = session.run(None, [inp])

# Single-array feed (single-input models only)
outputs = session.run(None, inp)
Batch dispatch mode:
# 4 samples dispatched concurrently; outputs have shape (4, ...)
batch_inp = np.random.randn(4, 3, 640, 640).astype(np.float32)
outputs = session.run(None, {"images": batch_inp}, run_options={"ztu_modelrt_dispatch_batch": True})
Batch dispatch mode requires the model’s input batch dimension to equal 1. The session slices the leading axis and submits each sample as a separate task.

run_async

run_async(output_names, input_feed, callback, user_data=None, run_options=None) -> None
Submits an inference request to the async queue and returns immediately. Results are delivered through callback when inference completes. The call blocks only when the async queue is full (up to submit_timeout_ms).
output_names
Sequence[str] | None
required
Output names to include in the callback result, or None for all outputs.
input_feed
dict[str, ndarray] | list[ndarray] | ndarray
required
Model inputs, in the same forms accepted by run.
callback
Callable[[Optional[List[ndarray]], Any, str], None]
required
Callable invoked when the inference task completes. Signature:
def callback(results: List[ndarray], user_data: Any, err: str) -> None: ...
  • results — list of float32 arrays, one per requested output. None on failure.
  • user_data — the value passed as user_data to run_async.
  • err — empty string on success; a non-empty error message on failure.
user_data
Any
Arbitrary value forwarded unchanged to callback as its second argument. Useful for correlating results with the original request.
run_options
dict | RunOptions
Run-time flags. The "ztu_modelrt_dispatch_batch" key is not supported by run_async and raises RuntimeError if set to True.

Example

import threading
import numpy as np
import ztu_somemodelruntime_ez_rknn_async as ez

session = ez.InferenceSession("model.rknn")
done = threading.Event()

def on_result(results, user_data, err):
    if err:
        print(f"Inference error: {err}")
    else:
        print(f"Got {len(results)} output(s), first shape: {results[0].shape}")
        print(f"Request ID: {user_data}")
    done.set()

inp = np.random.randn(1, 3, 640, 640).astype(np.float32)
session.run_async(None, {"images": inp}, callback=on_result, user_data=42)
done.wait()
Do not raise unhandled exceptions inside callback. The GIL is acquired for the callback invocation; an unhandled exception is reported via PyErr_WriteUnraisable and does not propagate to the caller.
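A defensive callback therefore catches its own exceptions. The sketch below is plain Python rather than part of the API, and handle_outputs / handle_error are hypothetical helpers standing in for your own result handling:

import traceback

def on_result(results, user_data, err):
    try:
        if err:
            handle_error(err, user_data)        # hypothetical helper
        else:
            handle_outputs(results, user_data)  # hypothetical helper
    except Exception:
        # Anything raised here would otherwise be reported via
        # PyErr_WriteUnraisable and never reach the caller, so log
        # it explicitly.
        traceback.print_exc()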

run_pipeline

run_pipeline(input_feed, depth=3, reset=False) -> Optional[List[ndarray]]
Implements a sliding-window pipeline over the async queue. Each call submits a new inference request and, once the pipeline has accumulated depth in-flight tasks, returns the oldest completed result. Returns None until the pipeline fills. This mode is designed for streaming workloads — for example, running inference on a video feed — where you want to overlap data preparation on the CPU with NPU inference.
input_feed
dict[str, ndarray] | list[ndarray] | ndarray
required
Model inputs for the current frame, in the same forms accepted by run.
depth
int
default:"3"
Pipeline depth: how many inference requests are kept in flight simultaneously. Must be greater than 0. Changing depth between calls automatically resets the pipeline.
reset
bool
default:"False"
When True, clears the pipeline queue before submitting the new request. Use this when switching input streams or after an error to avoid stale results draining out.

Example

import numpy as np
import ztu_somemodelruntime_ez_rknn_async as ez

session = ez.InferenceSession("model.rknn")
frames = [np.random.randn(1, 3, 640, 640).astype(np.float32) for _ in range(10)]

results = []
for frame in frames:
    output = session.run_pipeline({"images": frame}, depth=3)
    if output is not None:
        results.append(output)

# Results for the last few frames are still in flight at this point.
# Drain them by continuing to call run_pipeline with a dummy input
# (reset left False) and collecting outputs until every real frame's
# result has been returned, as sketched below.
The first depth calls return None while the pipeline fills. Plan your result-handling loop accordingly or use run / run_async when you need a result for every input frame.
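A hedged sketch of one way to drain the pipeline at the end of the stream above, assuming you want exactly one result per real frame (the dummy feed simply reuses the last frame; any correctly shaped array works):

# Keep submitting a dummy input until every real frame has produced an
# output; results for the dummy submissions remain in flight and can be
# discarded later by calling run_pipeline with reset=True.
dummy = frames[-1]
while len(results) < len(frames):
    output = session.run_pipeline({"images": dummy}, depth=3)
    if output is not None:
        results.append(output)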

get_inputs

get_inputs() -> List[NodeArg]
Returns a list of NodeArg objects describing each model input tensor. Each NodeArg exposes .name, .shape, and .type properties.

Example

for node in session.get_inputs():
    print(node.name, node.shape, node.type)
# images [1, 3, 640, 640] tensor(float)
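The metadata can also be used to build a correctly shaped dummy feed for a quick smoke test. The sketch below assumes the reported shapes are concrete integers and that the inputs are float tensors; adjust the dtype if your model differs:

import numpy as np

# Zero-filled feed built from the reported input shapes.
feed = {
    node.name: np.zeros(node.shape, dtype=np.float32)
    for node in session.get_inputs()
}
outputs = session.run(None, feed)
print([o.shape for o in outputs])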

get_outputs

get_outputs() -> List[NodeArg]
Returns a list of NodeArg objects describing each model output tensor.

Example

for node in session.get_outputs():
    print(node.name, node.shape, node.type)
# output0 [1, 25200, 85] tensor(float)

get_modelmeta

get_modelmeta() -> ModelMetadata
Returns a ModelMetadata object containing metadata embedded in the .rknn file at conversion time.

Example

meta = session.get_modelmeta()
print(meta.custom_metadata_map)
# {'rknn_custom_string': 'yolov5s-640'}

Properties

input_names
List[str]
Read-only list of input tensor names, in model order. Equivalent to [n.name for n in session.get_inputs()].
print(session.input_names)  # ['images']
output_names
List[str]
Read-only list of output tensor names, in model order. Equivalent to [n.name for n in session.get_outputs()].
print(session.output_names)  # ['output0', 'output1']
