Use this file to discover all available pages before exploring further.
Context managers provide automatic resource creation and cleanup, ensuring resources are properly deleted even if errors occur. This is one of the most powerful features of the OpenShift Python Wrapper.
from ocp_resources.namespace import Namespace
from ocp_resources.resource import get_client

client = get_client()

# Entering the block creates the namespace; leaving the block deletes it.
with Namespace(client=client, name="test-namespace") as ns:
    print(f"Namespace {ns.name} exists: {ns.exists}")  # True

    # Do work with the namespace
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)

# The namespace has been deleted automatically by this point.
print(f"Namespace exists: {ns.exists}")  # False
from ocp_resources.pod import Pod

try:
    with Pod(
        client=client,
        name="test-pod",
        namespace="default",
        containers=[{"name": "nginx", "image": "nginx:latest"}],
    ) as pod:
        # Wait for pod to be ready
        pod.wait_for_condition(
            condition=Pod.Condition.READY,
            status=Pod.Condition.Status.TRUE,
            timeout=300,
        )

        # Simulate an error
        raise ValueError("Something went wrong")
except ValueError as err:
    print(f"Error occurred: {err}")
    # Pod is still cleaned up automatically!
from ocp_resources.namespace import Namespace

# Set teardown=False to keep resource after context exits
with Namespace(
    client=client,
    name="persistent-namespace",
    teardown=False,  # Won't be deleted
) as ns:
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
    print(f"Namespace created: {ns.name}")

# Namespace still exists after context
print(f"Still exists: {ns.exists}")  # True

# Manual cleanup if needed
ns.clean_up()
By default, `__enter__` calls `deploy(wait=False)`. You can change this by passing `wait_for_resource=True`:
from ocp_resources.pod import Pod

# Create pod with wait_for_resource=True
with Pod(
    client=client,
    namespace="default",
    name="nginx-pod",
    containers=[{"name": "nginx", "image": "nginx:latest"}],
    wait_for_resource=True,  # Waits for pod to exist before proceeding
) as pod:
    # Pod is guaranteed to exist here
    print(f"Pod status: {pod.status}")
The wrapper supports environment variables for controlling cleanup:
# Skip teardown of specific resources
export SKIP_RESOURCE_TEARDOWN="{Pod: {my-pod: my-namespace}}"

# Skip teardown of all resources of a kind
export SKIP_RESOURCE_TEARDOWN="{Namespace: {}}"

# Skip teardown of multiple resources
export SKIP_RESOURCE_TEARDOWN="{Namespace: {test-ns:}, Pod: {test-pod: default}}"
from ocp_resources.pod import Pod

# This pod won't be deleted if SKIP_RESOURCE_TEARDOWN is set appropriately
with Pod(
    client=client,
    namespace="my-namespace",
    name="my-pod",
    containers=[{"name": "nginx", "image": "nginx"}],
) as pod:
    # Do work
    pass

# Cleanup skipped due to environment variable
from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.pod import Pod

try:
    with Pod(
        client=client,
        namespace="default",
        name="test-pod",
        containers=[{"name": "nginx", "image": "nginx"}],
    ) as pod:
        # Work with pod
        print(f"Pod created: {pod.name}")
except ResourceTeardownError as err:
    print(f"Failed to clean up resource: {err}")
    # Handle cleanup failure
You can create custom resource classes with specialized cleanup:
import yaml

from ocp_resources.secret import Secret


class SecretWithBackup(Secret):
    """Secret that backs up data before deletion."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Back up the secret to a local YAML file, then run the parent cleanup.

        The backup is written to ``<name>-backup.yaml`` in the current working
        directory, and only when the secret still exists.

        Args:
            exc_type: Exception type raised inside the ``with`` body, if any.
            exc_val: Exception instance raised inside the ``with`` body, if any.
            exc_tb: Traceback of that exception, if any.
        """
        try:
            if self.exists:
                # Back up secret data
                # NOTE: the file holds the secret's contents; keep it somewhere
                # appropriately protected.
                backup_path = f"{self.name}-backup.yaml"
                secret_data = self.instance.to_dict()
                with open(backup_path, "w", encoding="utf-8") as backup_file:
                    yaml.dump(secret_data, backup_file)
                print(f"Backed up secret to {backup_path}")
        finally:
            # Call parent cleanup even when the backup step fails, so a broken
            # backup never skips the parent teardown.
            super().__exit__(exc_type, exc_val, exc_tb)


# Usage
with SecretWithBackup(
    client=client,
    namespace="default",
    name="important-secret",
    string_data={"key": "value"},
) as secret:
    # Work with secret
    pass

# Secret is backed up before deletion
Context managers create resources sequentially. For parallel creation, deploy the resources from a thread pool with `teardown=False` and clean them up manually:
from concurrent.futures import ThreadPoolExecutor

from ocp_resources.pod import Pod
from ocp_resources.resource import get_client


def create_pod(client, namespace, name):
    """Create a pod and return it; the caller is responsible for cleanup.

    Args:
        client: Cluster client passed through to ``Pod``.
        namespace: Namespace in which to create the pod.
        name: Name of the pod.

    Returns:
        The ``Pod`` object after ``deploy()`` has been called on it.
    """
    pod = Pod(
        client=client,
        namespace=namespace,
        name=name,
        containers=[{"name": "nginx", "image": "nginx"}],
        teardown=False,  # Manual cleanup
    )
    pod.deploy()
    return pod


client = get_client()
pod_names = [f"pod-{i}" for i in range(10)]

pods = []
try:
    # Create pods in parallel. Leaving the executor block waits for every
    # submitted task, so all futures are finished afterwards.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(create_pod, client, "default", name)
            for name in pod_names
        ]

    # Collect every pod that was created before surfacing a failure, so one
    # failed creation does not leave the other pods without cleanup.
    failures = []
    for future in futures:
        error = future.exception()
        if error is None:
            pods.append(future.result())
        else:
            failures.append(error)
    if failures:
        raise failures[0]

    # Work with pods
    for pod in pods:
        print(f"Pod {pod.name} exists: {pod.exists}")
finally:
    # Clean up all pods
    for pod in pods:
        pod.clean_up(wait=False)