Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/nokia/moler/llms.txt

Use this file to discover all available pages before exploring further.

Overview

ConnectionObserver is the abstract base class for every object that watches a connection for data. Both Command and Event extend this class. An observer is started, runs in the background parsing incoming data, and eventually completes (with a result or exception) or is cancelled.
moler.connection_observer.ConnectionObserver
ConnectionObserver is an abstract class. You must subclass it and implement data_received(data, recv_time) to use it.

Constructor

ConnectionObserver(connection=None, runner=None)
connection
AbstractMolerConnection
default:"None"
The connection from which data will be received. If None, a connection must be assigned before calling start(). The observer subscribes to incoming data on this connection.
runner
ConnectionObserverRunner
default:"None"
The runner that manages background execution. If None, the runner is resolved in order from: the connection’s runner, then the global default runner.

Lifecycle methods

start

observer.start(timeout=None, *args, **kwargs) -> ConnectionObserver
Start background execution of the observer. The observer’s timeout clock begins from this call, even before the underlying concurrency machinery (thread or coroutine) has obtained control.
timeout
float
default:"None"
Timeout in seconds. If provided, overrides the observer’s current timeout attribute. Must be a positive value.
Returns self, allowing chained calls. Raises:
  • WrongUsage — if the observer is already done.
  • NoConnectionProvided — if no connection is set.
  • ConnectionObserverTimeout — if timeout is not a positive value.

await_done

observer.await_done(timeout=None) -> Any
Block until the observer finishes, then return its result. This is a synchronous blocking call.
Do not call await_done() from inside an async def function — it will block the event loop. Instead use await observer or await asyncio.wait_for(observer, timeout=10).
timeout
float
default:"None"
Maximum seconds to wait. If None, uses the observer’s configured timeout.
Returns the observer’s result value. Raises:
  • ConnectionObserverNotStarted — if start() has not been called.
  • Any exception passed to set_exception().

cancel

observer.cancel() -> bool
Cancel the observer. Sets the observer to done state without a result. Returns True if the cancel was applied, False if the observer was already done or already cancelled.

State query methods

done

observer.done() -> bool
Returns True when the observer has finished — either with a result, an exception, or via cancellation.

running

observer.running() -> bool
Returns True if the observer is currently executing (started but not yet done).

cancelled

observer.cancelled() -> bool
Returns True if the observer was cancelled.

Result methods

result

observer.result() -> Any
Retrieve the final result of the observer. Raises:
  • The stored exception, if set_exception() was called.
  • NoResultSinceCancelCalled — if the observer was cancelled.
  • ResultNotAvailableYet — if the observer is not yet done.

set_result

observer.set_result(result) -> None
Store the final result and mark the observer as done. Call this from within data_received() once parsing is complete.
result
Any
required
The value to store as the final result.
Raises ResultAlreadySet if the observer is already done.

set_exception

observer.set_exception(exception) -> None
Record a failure and mark the observer as done. Calling result() after this will re-raise the stored exception.
exception
Exception
required
The exception representing the failure.

exception

Access the stored exception directly via the _exception attribute (not a public method), or let result() raise it.

Callback methods

add_done_callback

ConnectionObserver does not expose add_done_callback as a named public method. Completion notification is handled by the runner and through subclassing on_timeout().

Abstract method to implement

data_received

observer.data_received(data, recv_time) -> None
This abstract method is the entry point for all incoming data. The connection calls this method each time new data arrives. Parse the data here and call set_result() when done.
data
list[str]
Strings received from the device.
recv_time
datetime.datetime
Timestamp of when the data was read from the connection.

Override for custom behavior

on_timeout

observer.on_timeout() -> None
Called by the runner when the observer exceeds its configured timeout. The default implementation logs the timeout. Override this method to perform custom cleanup or logging.

on_inactivity

observer.on_inactivity() -> None
Called when no data is received within life_status.inactivity_timeout seconds. The default implementation does nothing. Override to handle connection silence.

Other public methods

extend_timeout

observer.extend_timeout(timedelta) -> None
Extend the remaining timeout by timedelta seconds.
timedelta
float
required
Seconds to add to the current timeout.

is_command

observer.is_command() -> bool
Returns False for base ConnectionObserver. Overridden to return True by Command.

get_long_desc / get_short_desc

observer.get_long_desc() -> str
observer.get_short_desc() -> str
Return human-readable descriptions of the observer. Both return "Observer '<module>.<classname>(id:...)'" by default. Override in subclasses to provide more informative descriptions.

get_unraised_exceptions

ConnectionObserver.get_unraised_exceptions(remove=True) -> list
Static method. Returns any exceptions that were set on observers but never retrieved via result(). Useful in test teardown.
remove
boolean
default:"True"
If True, clears the exceptions from the internal list after returning them.

Key attributes

AttributeTypeDescription
connectionAbstractMolerConnectionConnection being observed.
runnerConnectionObserverRunnerManages background execution.
timeoutfloatTimeout in seconds (settable property).
start_timefloatMonotonic time at which start() was called.
terminating_timeoutfloatAdditional time allowed after the main timeout for graceful shutdown.
life_statusConnectionObserverLifeStatusInternal lifecycle state container.
loggerlogging.LoggerLogger scoped to moler.connection.<name>.
device_loggerlogging.LoggerLogger scoped to moler.<name>.

Usage example

import re
from moler.connection_observer import ConnectionObserver

class WaitForBanner(ConnectionObserver):
    """Waits for a login banner on the connection."""

    def data_received(self, data, recv_time):
        for line in data:
            if re.search(r'Welcome to', line):
                self.set_result({'banner': line.strip()})
                return  # done — stop processing

    def on_timeout(self):
        # Custom timeout handling
        self._log_timeout()
        # Could also call set_exception() here for a custom error
Call observer.start() then observer.await_done() for simple synchronous use. For concurrent observation of multiple things, call start() on each observer first, then await_done() on each in turn.

Build docs developers (and LLMs) love