run_async lets you submit an inference task and return to the caller immediately. The session queues the work, a background worker thread runs it on the NPU, and a dedicated callback thread fires your function when the result is ready. This decouples your main thread from NPU execution time.

How it works

When you call run_async, the session tries to place the task on its internal queue. If there is room, run_async returns None immediately and your callback is invoked later. If the queue is full, run_async blocks until capacity becomes available, waiting up to submit_timeout_ms, and raises RuntimeError if the queue is still full when that timeout elapses. The maximum callback queue depth is 8 (MAX_CALLBACK_QUEUE_SIZE), and the task queue depth is controlled by max_queue_size (default 3); together they form a two-stage pipeline between submission and result delivery.
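
A minimal sketch of the fire-and-forget pattern follows. It assumes a model with a single input named "input" and that a session can be built without explicit provider options; the timing print is only there to show that run_async returns before the callback fires.
import time
import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession

sess = InferenceSession("model.rknn")   # assumption: default provider options are acceptable

def on_result(results, user_data, err):
    # Runs on the dedicated callback thread once the NPU worker finishes the task.
    print(f"callback fired {time.monotonic() - user_data:.3f} s after submission")

frame = np.random.rand(1, 3, 224, 224).astype(np.float32)
ret = sess.run_async(None, {"input": frame}, callback=on_result, user_data=time.monotonic())
print(ret)        # None -- run_async returned before inference completed

time.sleep(1.0)   # crude wait so the process does not exit before the callback fires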

Callback signature

from typing import Any, List, Optional

def callback(
    results: List[Any],       # list of numpy arrays, one per output
    user_data: Any,           # value you passed as user_data=
    err: Optional[str],       # empty string on success, error message on failure
) -> None:
    ...
results is None when inference fails; check err for the reason.

Basic example

import threading
import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(
        sequential_callbacks=True,   # callbacks fire in submission order
        submit_timeout_ms=5000,      # raise after 5 s if queue stays full
    ),
)

results_received = []
lock = threading.Lock()
all_done = threading.Event()
TOTAL = 10
completed = 0

def on_result(results, user_data, err):
    global completed
    frame_id = user_data
    with lock:
        if err:
            print(f"Frame {frame_id} failed: {err}")
        else:
            results_received.append((frame_id, results[0]))
        completed += 1                 # count failures too, so the wait cannot hang
        if completed == TOTAL:
            all_done.set()

for i in range(TOTAL):
    frame = np.random.rand(1, 3, 224, 224).astype(np.float32)
    sess.run_async(None, {"input": frame}, callback=on_result, user_data=i)

all_done.wait()
print(f"Received {len(results_received)} results")

Queue behavior and back-pressure

run_async implements a blocking retry loop internally. When both the task queue (max_queue_size) and the callback queue (hard limit of 8) are full, it waits on a condition variable until a slot opens, then retries. If submit_timeout_ms elapses before a slot opens, it raises RuntimeError.
opts = make_provider_options(
    max_queue_size=3,          # up to 3 tasks waiting for the NPU
    submit_timeout_ms=10000,   # wait up to 10 s before raising (default)
)
Do not call run_async in a tight loop without any pacing. If your producer is faster than the NPU, the queue fills, every call blocks for the full submit_timeout_ms, and then raises. Use sequential_callbacks=True and let the callback signal the next submission (for example, by releasing a semaphore, as sketched below) to keep the queue from saturating.
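
One way to implement that pacing is a bounded semaphore sized to the task queue, acquired before each submission and released from the callback. This is an illustrative sketch rather than part of the library; the semaphore and the frame loop are made up for the example.
import threading
import numpy as np
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

sess = InferenceSession(
    "model.rknn",
    provider_options=make_provider_options(sequential_callbacks=True),
)

pace = threading.BoundedSemaphore(3)       # match max_queue_size

def on_result(results, user_data, err):
    try:
        if err:
            print(f"Frame {user_data} failed: {err}")
        # ... consume results here ...
    finally:
        pace.release()                     # free a slot for the next submission

for i in range(100):
    frame = np.random.rand(1, 3, 224, 224).astype(np.float32)
    pace.acquire()                         # block here, not inside run_async
    sess.run_async(None, {"input": frame}, callback=on_result, user_data=i)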

Callback ordering

Set sequential_callbacks=True (the default) to guarantee that callbacks are invoked in the exact order tasks were submitted, regardless of which NPU core finished first. The callback thread holds completed results in a map keyed by task ID and waits for the next expected ID before firing. Set sequential_callbacks=False to fire callbacks as soon as results arrive, which can improve tail latency when core execution times vary.
# In-order delivery (default)
opts = make_provider_options(sequential_callbacks=True)

# Out-of-order delivery for lowest per-result latency
opts = make_provider_options(sequential_callbacks=False)
run_async does not support ztu_modelrt_dispatch_batch=True in run_options. Attempting to set that flag raises a RuntimeError. Use run with ztu_modelrt_dispatch_batch=True for batch dispatch.

Selecting output names

Like run, run_async accepts an output_names list to filter which outputs are delivered to the callback. Pass None to receive all outputs.
sess.run_async(
    ["output_logits"],       # only deliver this output
    {"input": frame},
    callback=on_result,
    user_data=frame_id,
)

Capacity limits summary

Limit                   Source                                  Default
Task queue max          max_queue_size option                   3
Callback queue max      MAX_CALLBACK_QUEUE_SIZE (hard-coded)    8
Submit block timeout    submit_timeout_ms option                10000 ms
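
As a reference point, a session with all of the configurable knobs spelled out might look like the sketch below; the values shown are simply the defaults from the table.
from ztu_somemodelruntime_ez_rknn_async import InferenceSession, make_provider_options

# MAX_CALLBACK_QUEUE_SIZE (8) is hard-coded and cannot be changed here.
opts = make_provider_options(
    max_queue_size=3,            # tasks allowed to wait for the NPU
    submit_timeout_ms=10000,     # block up to 10 s on a full queue, then raise
    sequential_callbacks=True,   # deliver callbacks in submission order
)
sess = InferenceSession("model.rknn", provider_options=opts)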
