The State class is the central coordinator for pyinfra deployments. It manages the entire deployment lifecycle, tracks operations across all hosts, coordinates parallel execution, and maintains the current execution stage.
Every pyinfra deployment has exactly one State instance that orchestrates all operations, hosts, and connectors.
# Based on src/pyinfra/api/state.py:127-135
class StateHostMeta:
    """Per-host operation counters and the set of operation hashes seen."""

    ops: int = 0  # Total operations
    ops_change: int = 0  # Operations that will change
    ops_no_change: int = 0  # Operations with no changes
    op_hashes: set[str]  # Set of operation hashes

    def __init__(self) -> None:
        # Create the set per instance so two hosts never share op hashes
        # (a class-level set would be shared by every instance).
        self.op_hashes = set()
Accessing host metadata:
# Look up the operation counters recorded for one host.
meta = state.get_meta_for_host(host)
print(f"Total ops: {meta.ops}")
print(f"Will change: {meta.ops_change}")
print(f"No change: {meta.ops_no_change}")
# Based on src/pyinfra/api/state.py:370-380
def activate_host(self, host: "Host"):
    """
    Flag a host as active.

    Adds ``host`` to both ``self.activated_hosts`` and ``self.active_hosts``.
    """
    logger.debug("Activating host: %s", host)

    # Add to *both* activated and active:
    # - activated tracks all hosts ever activated
    # - active tracks currently active (non-failed) hosts
    self.activated_hosts.add(host)
    self.active_hosts.add(host)
# Based on src/pyinfra/api/state.py:382-424
def fail_hosts(self, hosts_to_fail, activated_count=None):
    """
    Flag a set of hosts as failed, error if exceeding FAIL_PERCENT.

    Args:
        hosts_to_fail: iterable of hosts to mark as failed (set, list, tuple
            or generator). ``None`` or an empty iterable is a no-op.
        activated_count: host count to compute the failure percentage
            against; falls back to ``len(self.activated_hosts)`` when falsy.

    Raises:
        PyinfraError: if no active hosts remain, or if the percentage of
            failed hosts is above ``self.config.FAIL_PERCENT``.
    """
    # Materialise once: the hosts are iterated several times below (a
    # generator would be exhausted after the first pass), and the original
    # ``set -= list`` raised TypeError for non-set inputs.
    hosts_to_fail = list(hosts_to_fail or ())
    if not hosts_to_fail:
        return

    activated_count = activated_count or len(self.activated_hosts)

    logger.debug(
        "Failing hosts: %s",
        ", ".join(host.name for host in hosts_to_fail),
    )

    self.failed_hosts.update(hosts_to_fail)
    self.active_hosts.difference_update(hosts_to_fail)

    # Check we're not above the fail percent
    active_hosts = self.active_hosts

    # No hosts left!
    if not active_hosts:
        raise PyinfraError("No hosts remaining!")

    # Skip the percentage check when there is no host count to divide by.
    if self.config.FAIL_PERCENT is not None and activated_count:
        percent_failed = (1 - len(active_hosts) / activated_count) * 100
        if percent_failed > self.config.FAIL_PERCENT:
            raise PyinfraError(
                f"Over {self.config.FAIL_PERCENT}% of hosts failed ({int(round(percent_failed))}%)"
            )
# Based on src/pyinfra/api/state.py:426-435
def is_host_in_limit(self, host: "Host"):
    """
    Returns a boolean indicating if the host is within the current state limit.

    When ``self.limit_hosts`` is a collection of hosts (list, tuple, set or
    frozenset), only those hosts are in the limit. Any other value means no
    limit is active, and every host is considered in it.
    """
    limit_hosts = self.limit_hosts

    # Accept any standard collection, not just list: otherwise a set or
    # tuple assigned as a limit is silently treated as "no limit at all".
    if not isinstance(limit_hosts, (list, tuple, set, frozenset)):
        return True
    return host in limit_hosts
Usage:
# Limit deployment to specific hosts
web1 = inventory.get_host("web1.example.com")
state.limit_hosts = [web1]

# Check if host is in limit
for host in inventory:
    if state.is_host_in_limit(host):
        # Run operations on this host
        pass
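State builds a directed acyclic graph (DAG) of operations to determine execution order. Each host contributes edges that follow the order in which its operations were called, and operations that become ready at the same time are sorted by their position in the deploy code: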
# Based on src/pyinfra/api/state.py:310-344
def get_op_order(self):
    """
    Return every operation hash in one globally consistent execution order.

    Each host contributes a chain of edges that follows the order in which
    its operations were called (``host.op_hash_order``). Operations that
    become ready at the same time are sorted by
    ``self.op_meta[op_hash].op_order``.

    Raises:
        PyinfraError: if the per-host orders conflict (a cycle in the DAG).
            The underlying ``CycleError`` is chained as ``__cause__``.
    """
    ts: TopologicalSorter = TopologicalSorter()

    # Build DAG from each host's operation order: every op depends on the
    # op that preceded it on the same host.
    for host in self.inventory:
        op_hashes = host.op_hash_order
        for i, op_hash in enumerate(op_hashes):
            if not i:
                ts.add(op_hash)  # First op has no dependencies
            else:
                ts.add(op_hash, op_hashes[i - 1])  # Depends on previous

    final_op_order = []

    try:
        ts.prepare()
    except CycleError as e:
        # Chain the CycleError so its details survive in the traceback.
        raise PyinfraError(
            "Cycle detected in operation ordering DAG.\n"
            f" Error: {e}\n\n"
            " This can happen when using loops in operation code"
        ) from e

    while ts.is_active():
        # Sort operations that can run in parallel by line number
        node_group = sorted(
            ts.get_ready(),
            key=lambda op_hash: self.op_meta[op_hash].op_order,
        )
        ts.done(*node_group)
        final_op_order.extend(node_group)

    return final_op_order
from pyinfra.operations import apt, server

# These operations create a DAG. Its edges come only from the order in which
# operations are called for each host, not from what the operations do, so
# calling them in this order is what guarantees nginx is installed before the
# service is managed.
apt.update()  # Op 1: no predecessor
apt.packages(  # Op 2: after Op 1 (called after it)
    packages=["nginx"],
)
server.service(  # Op 3: after Op 2 (called after it)
    service="nginx",
    running=True,
)

# State builds this DAG:
# Op 1 (apt.update)
#   ↓
# Op 2 (apt.packages)
#   ↓
# Op 3 (server.service)
# Two separate pools, both sized by config.PARALLEL
# (excerpt from the State's setup code; ``self`` is the State).
self.pool = Pool(config.PARALLEL)  # For operations
self.fact_pool = Pool(config.PARALLEL)  # For facts
Example parallel execution:
import gevent

from pyinfra.context import ctx_host


def run_on_host(host):
    with ctx_host.use(host):
        # Run operations on this host
        pass


# Execute on all hosts in parallel
greenlets = [
    state.pool.spawn(run_on_host, host)
    for host in state.inventory
]

# Wait for all to complete
gevent.joinall(greenlets)

# joinall() does not re-raise exceptions from the greenlets by default.
# get() re-raises a failure instead of letting it pass silently.
for greenlet in greenlets:
    greenlet.get()
# Increment warning count
state.increment_warning_counter()

# Get warnings for current stage
# (named warning_count to avoid shadowing the stdlib ``warnings`` module)
warning_count = state.get_warning_counter()
print(f"Warnings in {state.current_stage}: {warning_count}")
# Get operation metadata
op_meta = state.get_op_meta(op_hash)

# Get operation data for a host
op_data = state.get_op_data_for_host(host, op_hash)

# Set operation data for a host
state.set_op_data_for_host(host, op_hash, op_data)
# Enable change detection without execution
state.check_for_changes = True
state.set_stage(StateStage.Prepare)

# Operations are discovered but not executed
# ... call operations ...

# Check what would change
for host in inventory:
    meta = state.get_meta_for_host(host)
    print(f"{host.name}: {meta.ops_change} operations will change")