Moler separates the command/observer API from the execution backend. The same command objects can be driven by a threaded runner or by an asyncio runner — you choose via the runner parameter or a configuration setting. This lets you integrate Moler into existing asyncio applications without rewriting commands.

Runners overview

| Runner variant | Class | Best for |
|---|---|---|
| threaded | ThreadedRunner | Standard synchronous test scripts; simplest to use |
| asyncio | AsyncioRunner | async def coroutine-based code; all work on one event loop |
| asyncio-in-thread | AsyncioInThreadRunner | Synchronous (def) code that needs asyncio under the hood; mixes threads and asyncio |

Most test automation uses the threaded runner (the default). Switch to asyncio when:
  • Your test harness already uses an asyncio event loop.
  • You want to await observers directly with asyncio.wait_for().
  • You want to avoid thread overhead for high-concurrency scenarios.

Using the asyncio runner

To use the asyncio runner, pass runner=get_runner(variant="asyncio") when creating an observer, or configure it in the YAML file.
Inside async def functions, use asyncio.wait_for() to await observers:
import asyncio
from moler.connection_factory import get_connection
from moler.runner_factory import get_runner
from network_toggle_observers import NetworkDownDetector

async def ping_observing_task(ext_io_connection, ping_ip):
    net_down_detector = NetworkDownDetector(
        ping_ip,
        connection=ext_io_connection.moler_connection,
        runner=get_runner(variant="asyncio"),
    )

    net_down_detector.start()  # start in background

    async with ext_io_connection:
        try:
            net_down_time = await asyncio.wait_for(
                net_down_detector, timeout=10
            )
            print(f'Network {ping_ip} is down at {net_down_time}')
        except asyncio.TimeoutError:
            print('Network down detector timed out')

async def main():
    tcp_connection = get_connection(name='net_1', variant='asyncio')
    await ping_observing_task(tcp_connection, '10.0.2.15')

asyncio.run(main())
The await asyncio.wait_for(net_down_detector, timeout=10) syntax works because Moler command and observer objects implement __await__ via the runner’s wait_for_iterator().
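
To illustrate the mechanism, here is a minimal sketch of an object that becomes awaitable by implementing `__await__`. `TinyObserver` is a hypothetical stand-in, not Moler's class, and it delegates to a plain asyncio future rather than to a runner's wait_for_iterator().

```python
import asyncio


class TinyObserver:
    """Hypothetical stand-in for an observer; not Moler's real class."""

    def __init__(self):
        self._future = None

    def start(self):
        # Must be called while an event loop is running, so the future
        # is bound to that loop.
        self._future = asyncio.get_running_loop().create_future()
        return self

    def set_result(self, value):
        self._future.set_result(value)

    def __await__(self):
        # Returning the future's iterator is what lets callers write
        # "await observer" or "asyncio.wait_for(observer, timeout)".
        return self._future.__await__()


async def demo():
    observer = TinyObserver().start()
    # Deliver a result shortly after we begin waiting.
    asyncio.get_running_loop().call_later(0.01, observer.set_result, "down")
    return await asyncio.wait_for(observer, timeout=1)


result = asyncio.run(demo())
print(result)
```

Because the object only has to return an iterator from `__await__`, the same observer class can be awaited regardless of which backend produced the iterator.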

AsyncioRunner internals

The AsyncioRunner (defined in moler/asyncio_runner.py) works as follows:
1. submit()

When an observer is started, the runner calls submit(). This:
  1. Gets or creates an asyncio event loop for the current thread.
  2. Calls _start_feeding() — subscribes a secure_data_received callback to the Moler connection so the observer starts receiving data immediately.
  3. Schedules a feed() coroutine via asyncio.ensure_future().
  4. Returns the asyncio Task (a Future) for use by wait_for().
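
The four steps above can be sketched with the standard library alone. `TinyConnection`, this `submit()` function, and the `received` list are hypothetical simplifications for illustration; the real runner works on Moler connections and observers.

```python
import asyncio


class TinyConnection:
    """Hypothetical connection that fans incoming data out to subscribers."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def unsubscribe(self, callback):
        self.subscribers.remove(callback)

    def inject(self, data):
        for callback in list(self.subscribers):
            callback(data)


def submit(loop, connection, received):
    """Subscribe first, then schedule the feeder; return its Task."""

    def data_receiver(data):
        received.append(data)

    # Step 2: subscribe before scheduling so early data is not missed.
    connection.subscribe(data_receiver)

    async def feed():
        # Poll until some data has arrived, then unsubscribe.
        while not received:
            await asyncio.sleep(0.005)
        connection.unsubscribe(data_receiver)
        return received[0]

    # Steps 3 and 4: schedule the coroutine and hand back the Task.
    return asyncio.ensure_future(feed(), loop=loop)


loop = asyncio.new_event_loop()  # Step 1: an event loop for this thread
connection = TinyConnection()
received = []
task = submit(loop, connection, received)
connection.inject("ping: network unreachable")  # arrives before the loop runs
first_line = loop.run_until_complete(task)
loop.close()
print(first_line)
```

Subscribing before the task is scheduled is the design point: data injected before the loop even starts running is still captured.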
2. feed() coroutine

The feed() coroutine runs inside the event loop. It polls every 5ms (await asyncio.sleep(0.005)) checking whether the observer is done or has timed out. When done, it unsubscribes the data receiver.
async def feed(self, connection_observer, subscribed_data_receiver, observer_lock):
    # Simplified excerpt: start_time is assumed to hold the moment the observer started.
    while True:
        if connection_observer.done():
            break
        run_duration = time.monotonic() - start_time
        if connection_observer.timeout and run_duration >= connection_observer.timeout:
            time_out_observer(connection_observer, ...)
            break
        await asyncio.sleep(0.005)  # poll interval; also yields control to the event loop
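
For experimentation, the polling pattern can be reproduced in a self-contained form. This is a sketch under assumptions: `TinyObserver`, its `timed_out` flag, and the `poll_interval` parameter are hypothetical, and setting the flag stands in for the real time_out_observer() call.

```python
import asyncio
import time


class TinyObserver:
    """Hypothetical observer exposing only what the polling loop needs."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.timed_out = False
        self._done = False

    def done(self):
        return self._done

    def finish(self):
        self._done = True


async def feed(observer, poll_interval=0.005):
    """Poll until the observer is done or its timeout has elapsed."""
    start_time = time.monotonic()
    while not observer.done():
        run_duration = time.monotonic() - start_time
        if observer.timeout and run_duration >= observer.timeout:
            observer.timed_out = True  # stand-in for time_out_observer()
            observer.finish()
            break
        await asyncio.sleep(poll_interval)  # yield control to other tasks
    return time.monotonic() - start_time


slow = TinyObserver(timeout=0.05)
elapsed = asyncio.run(feed(slow))
print(slow.timed_out)
```

The short sleep is what keeps the loop cooperative: other coroutines on the same event loop get to run between checks.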
3. wait_for()

wait_for() runs the event loop (via event_loop.run_forever()) until the observer’s future completes or a timeout occurs. If called from inside an already-running event loop (i.e., from async def code), it raises WrongUsage — use await observer or await asyncio.wait_for(observer, timeout) instead.
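
The guard against nested use can be sketched as follows. The `WrongUsage` class here is a local stand-in for Moler's exception, `blocking_wait()` is a hypothetical helper, and the sketch uses asyncio.run() where the runner described above uses run_forever().

```python
import asyncio


class WrongUsage(Exception):
    """Local stand-in for Moler's WrongUsage exception."""


def blocking_wait(make_coro):
    """Run a coroutine to completion, refusing to nest inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so blocking here is safe.
        return asyncio.run(make_coro())
    raise WrongUsage("called from async code; use 'await' instead")


async def answer():
    await asyncio.sleep(0)
    return 42


async def misuse():
    # Calling the blocking helper from a coroutine must be rejected.
    try:
        blocking_wait(answer)
    except WrongUsage:
        return "refused"
    return "allowed"


sync_result = blocking_wait(answer)
async_result = asyncio.run(misuse())
print(sync_result, async_result)
```

Passing a coroutine function rather than a coroutine object means nothing is created, and nothing is left un-awaited, when the call is refused.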

AsyncioInThreadRunner

AsyncioInThreadRunner is designed for synchronous code. It uses a shared AsyncioLoopThread — a single background thread running an asyncio event loop that is created once on first use:
# Internally, AsyncioInThreadRunner does:
thread4async = get_asyncio_loop_thread()  # singleton background thread
connection_observer_future = thread4async.run_async_coroutine(
    start_feeder(), timeout=0.5
)
This means submit() blocks for up to 0.5 seconds waiting for the feeder coroutine to actually start — guaranteeing that no data is lost before the observer is ready.
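
The same idea can be tried with only the standard library. In this sketch `LoopThread` and its methods are hypothetical names modelled on the excerpt above, and asyncio.run_coroutine_threadsafe() is assumed as the hand-off mechanism; Moler's AsyncioLoopThread may differ in detail.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical background thread that owns one asyncio event loop."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run_async_coroutine(self, coro, timeout):
        # Hand the coroutine to the loop thread, then block the caller
        # until it finishes or the timeout expires.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout=timeout)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def start_feeder():
    # Stand-in for subscribing to the connection and scheduling feed().
    await asyncio.sleep(0.01)
    return "feeder started"


thread4async = LoopThread()
status = thread4async.run_async_coroutine(start_feeder(), timeout=0.5)
thread4async.stop()
print(status)
```

Synchronous callers never touch the loop directly; they only submit work and wait on a thread-safe future, which is why plain def code can use it.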

Asyncio IO connections

Moler provides asyncio-native IO implementations in moler/io/asyncio/:
from moler.io.asyncio.terminal import AsyncioTerminal
from moler.threaded_moler_connection import ThreadedMolerConnection

# AsyncioTerminal spawns /bin/bash as a subprocess via asyncio pty
terminal = AsyncioTerminal(
    moler_connection=ThreadedMolerConnection()
)
# Open inside a running event loop:
async def run():
    async with terminal:
        # connection is now open and data flows
        pass
AsyncioTerminal (in moler/io/asyncio/terminal.py) uses asyncio.SubprocessProtocol and a pty (pseudo-terminal) to spawn and interact with a shell subprocess inside the event loop, without threads.
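
To show the protocol half of that design in isolation, the sketch below drives a child process through asyncio.SubprocessProtocol. It deliberately leaves out the pty and spawns a short Python one-liner instead of a shell; `CollectOutput` and `read_child_output()` are hypothetical names, and none of this is AsyncioTerminal's actual code.

```python
import asyncio
import sys


class CollectOutput(asyncio.SubprocessProtocol):
    """Gathers the child's stdout and signals when it has fully finished."""

    def __init__(self, finished):
        self.finished = finished
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self._check_finished()

    def process_exited(self):
        self.exited = True
        self._check_finished()

    def _check_finished(self):
        # Exit may be reported before the pipe closes, so wait for both.
        if self.pipe_closed and self.exited and not self.finished.done():
            self.finished.set_result(True)


async def read_child_output():
    loop = asyncio.get_running_loop()
    finished = loop.create_future()
    transport, protocol = await loop.subprocess_exec(
        lambda: CollectOutput(finished),
        sys.executable, "-c", "print('hello from child')",
        stdin=None, stderr=None,
    )
    await finished
    transport.close()
    return bytes(protocol.output).decode().rstrip()


child_output = asyncio.run(read_child_output())
print(child_output)
```

All callbacks run on the event loop, so reading the child's output needs no extra thread.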

Configuring runner variant in YAML

You can set the default connection variant per IO type in the YAML config, so connections are created as asyncio variants without any change to Python code:
CONNECTIONS:
  default_variant:
    terminal: asyncio
    tcp: asyncio

DEVICES:
  MyMachine:
    DEVICE_CLASS: moler.device.unixlocal.UnixLocal
With this config, get_connection(io_type='terminal') returns an asyncio-backed terminal connection.
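
The lookup this implies can be pictured with a plain dictionary that mirrors the YAML above. `resolve_variant()` is a hypothetical helper, not a Moler function, and the "threaded" fallback is an assumption made only for this illustration.

```python
# Mirrors the YAML above as a plain dict; YAML parsing is left out on purpose.
config = {
    "CONNECTIONS": {"default_variant": {"terminal": "asyncio", "tcp": "asyncio"}},
    "DEVICES": {"MyMachine": {"DEVICE_CLASS": "moler.device.unixlocal.UnixLocal"}},
}


def resolve_variant(config, io_type, variant=None, fallback="threaded"):
    """Return the explicit variant if given, else the configured default.

    The fallback value is an assumption for this sketch.
    """
    if variant is not None:
        return variant
    defaults = config.get("CONNECTIONS", {}).get("default_variant", {})
    return defaults.get(io_type, fallback)


print(resolve_variant(config, "terminal"))
print(resolve_variant(config, "tcp", variant="threaded"))
```

An explicit variant in code wins over the configured default in this sketch, which keeps one-off overrides possible without editing the config file.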

When to use which runner

The runner choice affects how observers are scheduled, not what they parse. All command classes work with all runners.
| Situation | Recommended runner |
|---|---|
| Simple sequential test scripts | threaded (default) |
| Test framework already uses asyncio event loop | asyncio |
| Synchronous code, high connection count, want asyncio efficiency | asyncio-in-thread |
| Inside async def and want direct await syntax | asyncio |

Thread safety in the asyncio runner

The AsyncioRunner protects against concurrent write access to observers using an observer_lock (a threading.Lock). This is important because data can arrive from a connection (potentially in a different thread or callback) while the runner’s wait_for() is also accessing the observer:
# From asyncio_runner.py — secure_data_received wraps data delivery
def secure_data_received(data, recv_time):
    try:
        if connection_observer.done() or self._in_shutdown:
            return
        with observer_lock:
            connection_observer.data_received(data, recv_time)
    except Exception as exc:
        with observer_lock:
            connection_observer.set_exception(exc)
This design means you can safely use multiple observers on the same connection from multiple threads.
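
The effect of the lock can be checked with a small experiment. Everything here is a hypothetical simplification: `TinyObserver` uses a deliberately non-atomic update, and `make_secure_receiver()` imitates the wrapper shown above without the done and shutdown checks.

```python
import threading


class TinyObserver:
    """Hypothetical observer that accumulates received chunks."""

    def __init__(self):
        self.chunks = []
        self.exception = None

    def data_received(self, data):
        # Read-modify-write on purpose: unsafe unless callers serialize access.
        current = list(self.chunks)
        current.append(data)
        self.chunks = current


def make_secure_receiver(observer, observer_lock):
    """Wrap data delivery so only one thread updates the observer at a time."""

    def secure_data_received(data):
        try:
            with observer_lock:
                observer.data_received(data)
        except Exception as exc:
            with observer_lock:
                observer.exception = exc

    return secure_data_received


observer = TinyObserver()
receiver = make_secure_receiver(observer, threading.Lock())


def producer(thread_id):
    for i in range(200):
        receiver(f"{thread_id}:{i}")


threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(observer.chunks))
```

With the lock in place every one of the 800 deliveries is kept; without it, the copy-and-replace update could drop chunks when threads interleave.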
