Connectors are pyinfra’s abstraction for executing commands on remote hosts. While pyinfra includes connectors for SSH, Docker, and local execution, you can create custom connectors for other connection methods, such as custom protocols, cloud APIs, or container orchestrators.
Documentation Index
Fetch the complete documentation index at: https://mintlify.com/pyinfra-dev/pyinfra/llms.txt
Use this file to discover all available pages before exploring further.
Understanding Connectors
Connectors in pyinfra:
- Establish and manage connections to remote hosts
- Execute shell commands and return output
- Transfer files to and from remote hosts
- Can define custom host data and configuration
- Are loaded via Python entry points
The BaseConnector Class
All connectors inherit from BaseConnector, defined in src/pyinfra/connectors/base.py:71:
from abc import ABC, abstractmethod
from io import IOBase
from typing import Iterator, Optional, Tuple, Type, Union

from pyinfra.api.arguments import ConnectorArguments
from pyinfra.api.command import StringCommand
from pyinfra.connectors.base import BaseConnector, DataMeta
class BaseConnector(ABC):
    """Abstract base class shared by all pyinfra connectors.

    Subclasses must implement ``make_names_data``, ``run_shell_command``,
    ``put_file`` and ``get_file``. ``connect`` and ``disconnect`` are optional
    hooks that do nothing unless overridden.
    """

    state: State  # Current deployment state
    host: Host  # Current host being managed

    handles_execution = False  # Set to True if connector executes commands

    data_cls: Type = ConnectorData  # TypedDict for connector data
    data_meta: dict[str, DataMeta] = {}  # Metadata for connector data

    def __init__(self, state: State, host: Host):
        """Store ``state`` and ``host`` and derive this connector's data.

        ``self.data`` is built by ``host_to_connector_data`` from ``data_cls``,
        ``data_meta`` and ``host.data``.
        """
        self.state = state
        self.host = host
        self.data = host_to_connector_data(
            self.data_cls, self.data_meta, host.data
        )

    @staticmethod
    @abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """Generate inventory targets from a connection string."""

    def connect(self) -> None:
        """Establish connection to the remote host."""

    def disconnect(self) -> None:
        """Close connection to the remote host."""

    @abstractmethod
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool,
        print_input: bool,
        **arguments: ConnectorArguments,
    ) -> Tuple[bool, CommandOutput]:
        """Execute a command and return success status and output."""

    @abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: ConnectorArguments,
    ) -> bool:
        """Upload a file to the remote host."""

    @abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: ConnectorArguments,
    ) -> bool:
        """Download a file from the remote host."""
Creating a Basic Connector
Here’s a simple example based on the local connector pattern from src/pyinfra/connectors/local.py:26:
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import (
CommandOutput,
make_unix_command_for_host,
run_local_process,
)
from pyinfra.api.exceptions import ConnectError, InventoryError
from typing_extensions import TypedDict, Unpack, override
from typing import Tuple
class CustomConnectorData(TypedDict):
    """Host data keys that configure the custom connector."""

    # Free-form string parameter.
    custom_param: str
    # Integer timeout value.
    custom_timeout: int
class CustomConnector(BaseConnector):
    """
    Custom connector for executing commands via a custom protocol.

    Connection format: @custom:hostname
    """

    handles_execution = True  # This connector can execute commands

    data_cls = CustomConnectorData
    data_meta = {
        "custom_param": DataMeta("Custom parameter description", default="default_value"),
        "custom_timeout": DataMeta("Custom timeout in seconds", default=30),
    }

    # Set by connect(); stays None while no connection is open, so
    # disconnect() is safe to call at any time.
    _connection = None

    @override
    @staticmethod
    def make_names_data(name: str):
        """Parse the connection string and yield ``(hostname, host_data, groups)``.

        Raises:
            InventoryError: if ``name`` does not start with ``@custom:`` or
                carries no hostname after the prefix.
        """
        # NOTE(review): the "@custom:" prefix is this example's own convention;
        # confirm what pyinfra actually passes as `name` for connector targets.
        prefix = "@custom:"
        # Strip only the leading prefix: str.replace would also remove any
        # later occurrence of "@custom:" inside the hostname.
        hostname = name[len(prefix):] if name.startswith(prefix) else ""
        if not hostname:
            raise InventoryError(
                f"Invalid custom connector name: {name}. "
                "Expected format: @custom:hostname"
            )

        host_data = {"custom_param": "value_from_name"}
        groups = ["custom"]
        yield hostname, host_data, groups

    @override
    def connect(self) -> None:
        """Establish connection to remote host.

        Raises:
            ConnectError: if the underlying connection attempt fails; the
                original exception is chained as the cause.
        """
        try:
            # Example: connect to custom service
            self._connection = self._establish_connection(
                host=self.host.name,
                timeout=self.data["custom_timeout"],
            )
        except Exception as e:
            raise ConnectError(f"Failed to connect: {e}") from e

    @override
    def disconnect(self) -> None:
        """Close the connection if one is open."""
        if self._connection:
            self._connection.close()
            self._connection = None

    @override
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> Tuple[bool, CommandOutput]:
        """Execute a command on the remote host.

        Returns:
            ``(success, output)`` where ``success`` is True when the remote
            exit code is 0.
        """
        # Build the actual command
        unix_command = make_unix_command_for_host(
            self.state, self.host, command, **arguments
        )
        actual_command = unix_command.get_raw_value()

        # Execute via your custom protocol
        exit_code, stdout, stderr = self._execute_remote_command(actual_command)

        # splitlines() yields [] for empty output and no trailing empty
        # entry for output that ends with a newline.
        # NOTE(review): confirm CommandOutput accepts these keyword arguments
        # in the pyinfra version you target.
        output = CommandOutput(
            stdout_lines=stdout.splitlines(),
            stderr_lines=stderr.splitlines(),
        )
        return exit_code == 0, output

    @override
    def put_file(
        self,
        filename_or_io,
        remote_filename: str,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Upload a file to the remote host.

        Returns:
            True on success, False if the upload raised (the error is logged).
        """
        from pyinfra.api.util import get_file_io

        # Read file content; the upload helper expects bytes.
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        if isinstance(content, str):
            content = content.encode()

        # Upload via your custom protocol
        try:
            self._upload_file(content, remote_filename)
        except Exception as e:
            logger.error(f"Failed to upload file: {e}")
            return False
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Download a file from the remote host.

        ``filename_or_io`` is either a local path (opened in binary mode) or
        an object with a ``write`` method.

        Returns:
            True on success, False if the download or the local write raised
            (the error is logged).
        """
        try:
            content = self._download_file(remote_filename)
            # Write to local file or IO object
            if isinstance(filename_or_io, str):
                with open(filename_or_io, "wb") as f:
                    f.write(content)
            else:
                filename_or_io.write(content)
        except Exception as e:
            logger.error(f"Failed to download file: {e}")
            return False
        return True

    # Helper methods for your custom protocol. They raise until implemented
    # so an unfinished connector fails loudly instead of reporting success.

    def _establish_connection(self, host: str, timeout: int):
        """Implement your connection logic."""
        # Example: return custom_protocol.connect(host, timeout)
        raise NotImplementedError("_establish_connection is not implemented")

    def _execute_remote_command(self, command: str) -> Tuple[int, str, str]:
        """Execute command and return (exit_code, stdout, stderr)."""
        # Example: return self._connection.exec(command)
        raise NotImplementedError("_execute_remote_command is not implemented")

    def _upload_file(self, content: bytes, remote_path: str):
        """Upload file content to remote path."""
        # Example: self._connection.put(content, remote_path)
        raise NotImplementedError("_upload_file is not implemented")

    def _download_file(self, remote_path: str) -> bytes:
        """Download file from remote path."""
        # Example: return self._connection.get(remote_path)
        raise NotImplementedError("_download_file is not implemented")
Complete Example: REST API Connector
Here’s a more complete example of a connector that manages resources via a REST API:
import requests
from pyinfra import logger
from pyinfra.connectors.base import BaseConnector, DataMeta
from pyinfra.connectors.util import CommandOutput
from pyinfra.api.exceptions import ConnectError
from typing_extensions import TypedDict, Unpack, override
class ApiConnectorData(TypedDict):
    """Host data keys that configure the REST API connector."""

    # String settings.
    api_url: str
    api_key: str
    # Integer setting.
    api_timeout: int
class ApiConnector(BaseConnector):
    """
    Execute commands on remote hosts via a REST API.

    Connection format: @api:resource_id

    Example:
        pyinfra @api:server-123 deploy.py
    """

    handles_execution = True

    data_cls = ApiConnectorData
    data_meta = {
        "api_url": DataMeta("API base URL", default="https://api.example.com"),
        "api_key": DataMeta("API authentication key"),
        "api_timeout": DataMeta("API request timeout", default=60),
    }

    def _resource_url(self, suffix: str = "") -> str:
        """Return the API URL of this host's resource followed by ``suffix``."""
        return f"{self.data['api_url']}/resources/{self.host.name}{suffix}"

    def _auth_headers(self) -> dict:
        """Return the bearer-token header sent with every request."""
        return {"Authorization": f"Bearer {self.data['api_key']}"}

    @override
    @staticmethod
    def make_names_data(name: str):
        """Parse @api:resource_id format.

        Yields:
            ``(hostname, host_data, groups)`` for the resource.

        Raises:
            InventoryError: if ``name`` does not start with ``@api:`` or
                carries no resource id after the prefix.
        """
        from pyinfra.api.exceptions import InventoryError

        prefix = "@api:"
        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence of "@api:" inside the resource id.
        resource_id = name[len(prefix):] if name.startswith(prefix) else ""
        if not resource_id:
            raise InventoryError(
                f"Invalid API connector name: {name}. "
                "Expected format: @api:resource_id"
            )

        # Yield (hostname, host_data, groups)
        yield resource_id, {}, ["api"]

    @override
    def connect(self) -> None:
        """Verify API connectivity and resource existence.

        On success the resource's JSON description is stored in
        ``self.host.host_data["api_resource"]``.

        Raises:
            ConnectError: if the request fails or returns an error status;
                the original exception is chained as the cause.
        """
        try:
            response = requests.get(
                self._resource_url(),
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            self.host.host_data["api_resource"] = response.json()
        except requests.RequestException as e:
            raise ConnectError(
                f"Failed to connect to API resource {self.host.name}: {e}"
            ) from e
        logger.info(f"Connected to API resource: {self.host.name}")

    @override
    def run_shell_command(
        self,
        command: StringCommand,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack[ConnectorArguments],
    ) -> Tuple[bool, CommandOutput]:
        """Execute command via API.

        Returns:
            ``(success, output)``. ``success`` is True when the reported exit
            code is one of ``_success_exit_codes`` (default ``[0]``). A failed
            request returns False with the error text as the only stderr line.
        """
        # NOTE: the raw command is sent as-is, so arguments such as _sudo are
        # not applied here; wrap it with make_unix_command_for_host if the
        # API should honour them.
        command_str = command.get_raw_value()

        if print_input:
            logger.info(f">>> {command_str}")

        try:
            response = requests.post(
                self._resource_url("/exec"),
                json={
                    "command": command_str,
                    "timeout": arguments.get("_timeout", 300),
                },
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            result = response.json()
        except requests.RequestException as e:
            logger.error(f"Command execution failed: {e}")
            return False, CommandOutput(stdout_lines=[], stderr_lines=[str(e)])

        exit_code = result.get("exit_code", 0)
        stdout = result.get("stdout", "")
        stderr = result.get("stderr", "")

        if print_output:
            if stdout:
                logger.info(stdout)
            if stderr:
                logger.error(stderr)

        # Any of the allowed exit codes counts as success, not just the first.
        success_exit_codes = arguments.get("_success_exit_codes") or [0]
        success = exit_code in success_exit_codes

        # splitlines() yields [] for empty output and no trailing empty
        # entry for output that ends with a newline.
        # NOTE(review): confirm CommandOutput accepts these keyword arguments
        # in the pyinfra version you target.
        output = CommandOutput(
            stdout_lines=stdout.splitlines(),
            stderr_lines=stderr.splitlines(),
        )
        return success, output

    @override
    def put_file(
        self,
        filename_or_io,
        remote_filename: str,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Upload file via API.

        The content is sent base64-encoded in a JSON body.

        Returns:
            True on success, False if the request failed (the error is logged).
        """
        import base64

        from pyinfra.api.util import get_file_io

        # Read and encode file content
        with get_file_io(filename_or_io) as file_io:
            content = file_io.read()
        if isinstance(content, str):
            content = content.encode()
        content_b64 = base64.b64encode(content).decode()

        try:
            response = requests.post(
                self._resource_url("/files"),
                json={
                    "path": remote_filename,
                    "content": content_b64,
                    "encoding": "base64",
                },
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error(f"File upload failed: {e}")
            return False

        if print_output:
            logger.info(f"Uploaded file to {remote_filename}")
        return True

    @override
    def get_file(
        self,
        remote_filename: str,
        filename_or_io,
        remote_temp_filename=None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments,
    ) -> bool:
        """Download file via API.

        ``filename_or_io`` is either a local path (opened in binary mode) or
        an object with a ``write`` method.

        Returns:
            True on success, False if the request, the base64 decoding or the
            local write failed (the error is logged).
        """
        import base64
        from urllib.parse import quote

        # Percent-encode the path so characters such as spaces, "?" or "#"
        # cannot alter the request URL; "/" is left untouched.
        url = self._resource_url(f"/files/{quote(remote_filename)}")

        try:
            response = requests.get(
                url,
                headers=self._auth_headers(),
                timeout=self.data["api_timeout"],
            )
            response.raise_for_status()
            content = base64.b64decode(response.json().get("content", ""))

            # Write to file or IO object
            if isinstance(filename_or_io, str):
                with open(filename_or_io, "wb") as f:
                    f.write(content)
            else:
                filename_or_io.write(content)
        except (requests.RequestException, ValueError, OSError) as e:
            # ValueError covers malformed base64 / JSON; OSError covers a
            # failed local write. Both must map to the bool return contract.
            logger.error(f"File download failed: {e}")
            return False

        if print_output:
            logger.info(f"Downloaded file from {remote_filename}")
        return True
Registering Your Connector
Connectors are loaded via Python entry points. Add this to your setup.py or pyproject.toml:
setup.py
from setuptools import setup
# "name = module:attribute" specs registered in the "pyinfra.connectors"
# entry point group.
CONNECTOR_ENTRY_POINTS = [
    "custom = pyinfra_custom.connector:CustomConnector",
    "api = pyinfra_custom.connector:ApiConnector",
]

setup(
    name="pyinfra-custom-connector",
    version="0.1.0",
    packages=["pyinfra_custom"],
    entry_points={"pyinfra.connectors": CONNECTOR_ENTRY_POINTS},
)
pyproject.toml
[project.entry-points."pyinfra.connectors"]
custom = "pyinfra_custom.connector:CustomConnector"
api = "pyinfra_custom.connector:ApiConnector"
Helper Utilities
pyinfra provides utilities for common connector tasks:
Command Building
from pyinfra.connectors.util import make_unix_command_for_host
# Build the command with sudo, su_user, env, etc. applied; here it is
# requested with sudo as "root" and an explicit PATH.
command_options = {
    "_sudo": True,
    "_sudo_user": "root",
    "_env": {"PATH": "/usr/local/bin:/usr/bin"},
}
unix_command = make_unix_command_for_host(state, host, command, **command_options)
Running Local Processes
from pyinfra.connectors.util import run_local_process
# Run a command locally with no stdin, a timeout of 30, and its output
# printed with a "[local] " prefix.
local_command = "ls -la /tmp"
exit_code, output = run_local_process(
    local_command,
    stdin=None,
    timeout=30,
    print_output=True,
    print_prefix="[local] ",
)
Sudo Retry Logic
from pyinfra.connectors.util import execute_command_with_sudo_retry
def execute():
    """Run the command once and return ``(exit_code, output)``."""
    # Your execution logic
    return exit_code, output


# `execute` is handed over as a callable together with the host and its
# connector arguments.
exit_code, output = execute_command_with_sudo_retry(
    host,
    arguments,  # ConnectorArguments with _sudo, _sudo_password, etc.
    execute,
)
Connector Data Types
Define typed connector data for better validation:
from typing_extensions import TypedDict
from pyinfra.connectors.base import DataMeta
class SshConnectorData(TypedDict):
    """Typed host data keys for an SSH connector."""

    ssh_hostname: str
    ssh_port: int
    ssh_user: str
    ssh_password: str
    ssh_key: str
# One DataMeta per SshConnectorData key: a description plus, for ssh_port
# and ssh_user, a default value.
data_meta = dict(
    ssh_hostname=DataMeta("SSH hostname"),
    ssh_port=DataMeta("SSH port", default=22),
    ssh_user=DataMeta("SSH user", default="root"),
    ssh_password=DataMeta("SSH password"),
    ssh_key=DataMeta("SSH key filename"),
)
Testing Your Connector
Create a test script:
# test_custom_connector.py
from pyinfra import host
from pyinfra.operations import files, python, server

# Test basic command execution
result = server.shell(
    name="Test custom connector",
    commands=["echo 'Hello from custom connector'"],
)


def report_result():
    """Print the shell operation's outcome once it has actually executed."""
    print(f"Command succeeded: {result.did_succeed()}")
    print(f"Output: {result.stdout}")


# Operations are only queued while this file is loaded, so the result is not
# available yet at this point. Reading it from a python.call callback defers
# the access until the shell operation has run.
python.call(
    name="Report custom connector result",
    function=report_result,
)

# Test file operations
files.put(
    name="Upload test file",
    src="test.txt",
    dest="/tmp/test.txt",
)
pyinfra @custom:myhost test_custom_connector.py
Best Practices
- Error Handling: Always raise ConnectError for connection failures
- Logging: Use pyinfra’s logger for consistent output
- Type Safety: Use TypedDict for connector data and type hints throughout
- Resource Cleanup: Implement disconnect() to clean up connections
- Timeout Handling: Respect timeout arguments for all operations
- Sudo Support: Use make_unix_command_for_host to handle sudo/su properly
- Test Thoroughly: Test with various commands, file operations, and error conditions
- Documentation: Document connector usage and configuration options
- Connection Pooling: Consider implementing connection reuse for performance
- Error Messages: Provide clear, actionable error messages
Next Steps
- Learn about Writing Operations that use your connector
- Explore Writing Facts to query host state
- See Performance Tuning for optimization strategies
- Check API Reference for complete API documentation
