Overview
The Task Manager module provides a comprehensive framework for managing and executing code analysis tasks. It handles task initialization, environment setup, agent execution, and result management.
Core Components
TaskManager
Main class for task management operations.
Static Methods
get_work_dir
Retrieve the current working directory from application configuration.
@staticmethod
def get_work_dir() -> str
Returns: Current session’s working directory path
get_task_prompt
Get the standard task prompt template.
@staticmethod
def get_task_prompt() -> str
Returns: Task description template with placeholders for:
{task_description}: Main task description
{repo_path}: Repository absolute path
{input_data}: Input file paths and descriptions
{output_dir_path}: Output directory path
load_config
Load task configuration from YAML file.
@staticmethod
def load_config(config_path: str) -> dict
Parameters:
config_path: Path to YAML configuration file
Returns: Configuration dictionary
Example:
from src.core.git_task import TaskManager
config = TaskManager.load_config("tasks/my_task.yaml")
initialize_tasks
Initialize task environment and create task information.
@staticmethod
def initialize_tasks(args, root_path: str = 'coding') -> dict
Parameters:
args: Arguments object containing config_data with task configuration
root_path: Root path for task workspace creation (default: 'coding')
Returns: Task information dictionary with structure:
{
'repo': {...}, # Repository information
'task_description': str, # Task description
'task_prompt': str, # Formatted prompt
'input_data': [...], # Input data paths
'parameters': {...}, # Additional parameters
'root_path': str, # Root directory
'work_task_path': str, # Task workspace path
'task_id': str # Unique task identifier
}
prepare_task_description
Generate complete task description with all placeholders replaced.
@staticmethod
def prepare_task_description(
task_info: dict,
target_output_path: str,
target_input_data: Union[list, str],
target_repo_path: str
) -> str
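Parameters:
task_info: Task information dictionary
target_output_path: Path where output files should be saved
target_input_data: Input data for the task, given as a list or a single string
target_repo_path: Path to target repository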
Returns: Formatted task description ready for agent execution
AgentRunner
Execute code analysis agents to complete tasks.
Static Methods
run_agent
Execute a code analysis agent for a specific task.
@staticmethod
def run_agent(
task_info: dict,
retry_times: int = 2,
work_dir: Optional[str] = None
) -> str
Parameters:
task_info: Complete task information dictionary
retry_times: Number of retry attempts if the task fails (default: 2)
work_dir: Custom working directory (auto-generated if not provided)
Returns: Agent’s response/answer after task completion
Example:
from src.core.git_task import AgentRunner, TaskManager
# Load and initialize task
config = TaskManager.load_config("config.yaml")
args = type('Args', (), {'config_data': config, 'retry': 2})()
task_info = TaskManager.initialize_tasks(args)
# Run agent
answer = AgentRunner.run_agent(
task_info,
retry_times=2,
work_dir="/tmp/custom_workspace"
)
run_sequential
Execute all tasks sequentially (non-parallel mode).
@staticmethod
def run_sequential(args)
Parameters:
args: Arguments object with task configuration
Example:
import argparse
from src.core.git_task import AgentRunner, TaskManager
parser = argparse.ArgumentParser()
args = parser.parse_args()
args.config_data = TaskManager.load_config("config.yaml")
AgentRunner.run_sequential(args)
Utility Classes
PathManager
Handle path creation and management operations.
Methods
class PathManager:
@staticmethod
def generate_task_id() -> str:
"""Generate unique task ID with timestamp"""
@staticmethod
def create_unique_path(base_path: str) -> str:
"""Create unique non-existing path"""
@staticmethod
def create_unique_dir(base_path: str, prefix: str) -> str:
"""Create unique directory with prefix"""
@staticmethod
def get_dir_size(path: str) -> int:
"""Get directory size in bytes"""
@staticmethod
def check_code_files(
repo_path: str,
extensions: list = [".py", ".ipynb"]
) -> bool:
"""Check if repo contains code files with extensions"""
Example:
from src.core.git_task import PathManager
# Generate unique task ID
task_id = PathManager.generate_task_id()
# Returns: 'gitbench_0304_1425'
# Create unique workspace
workspace = PathManager.create_unique_path('workspaces')
# Returns: 'workspaces/gitbench_0304_1425'
# Check for Python and JavaScript files
has_code = PathManager.check_code_files('/path/to/repo', ['.py', '.js'])
DataProcessor
Handle data operations like copying and extraction.
Methods
class DataProcessor:
@staticmethod
def copy_dataset(data_path: str, target_path: str) -> str:
"""Copy or link dataset to target location"""
@staticmethod
def unzip_data(data_path: str):
"""Extract zip files in dataset directory"""
@staticmethod
def setup_task_environment(
task_info: dict,
work_dir: str
) -> Tuple[str, list, str]:
"""Setup complete task environment
Returns: (output_path, input_data, repo_path)
"""
Example:
from src.core.git_task import DataProcessor
# Setup task environment
task_info = {
'repo': {'type': 'local', 'path': '/source/repo'},
'input_data': [
{'path': '/data/input.csv', 'description': 'Training data'}
]
}
output_path, input_data, repo_path = DataProcessor.setup_task_environment(
task_info,
'/workspace'
)
BaseCodeExplorer
Base class providing virtual environment management for agents.
class BaseCodeExplorer:
def __init__(
self,
work_dir: str,
use_venv: bool = False,
task_id: Optional[str] = None,
is_cleanup_venv: bool = True
)
Parameters:
work_dir: Working directory for agent operations
use_venv: Whether to use a virtual environment (default: False)
task_id: Unique task identifier (auto-generated if not provided)
is_cleanup_venv: Whether to clean up the virtual environment after completion (default: True)
Methods
_load_venv_context
Load or create virtual environment.
def _load_venv_context(
venv_dir: Optional[str] = None,
is_clear_venv: Optional[bool] = None,
base_venv_path: Optional[str] = None
)
Parameters:
venv_dir: Directory for virtual environment (defaults to ./.venvs)
base_venv_path: Path to base environment to copy from (faster than fresh install)
cleanup_venv
Clean up virtual environment after task completion.
Configuration File Format
Example YAML task configuration, as loaded by TaskManager.load_config:
# Repository configuration
repo:
type: local # or 'github'
path: /path/to/repository
# url: https://github.com/user/repo.git # for github type
# Task description
task_description: |
Analyze the codebase and identify the main entry points
and core functionality modules.
# Optional: Custom task prompt template
task_prompt: |
{task_description}
Repository: {repo_path}
Input files: {input_data}
Output directory: {output_dir_path}
# Input data (optional)
input_data:
- path: /data/requirements.txt
description: Project dependencies
- path: /data/test_cases.json
description: Test cases to run
# Additional parameters
parameters:
max_depth: 5
include_tests: true
Complete Usage Example
import asyncio
from src.core.git_task import TaskManager, AgentRunner, PathManager
from src.core.agent_code_explore import CodeExplorer
# 1. Load configuration
config = TaskManager.load_config("tasks/analyze_repo.yaml")
# 2. Initialize task
class Args:
config_data = config
retry = 2
root_path = 'coding'
args = Args()
task_info = TaskManager.initialize_tasks(args)
# 3. Run agent
answer = AgentRunner.run_agent(
task_info,
retry_times=2
)
print(f"Task completed: {answer}")
Advanced Example: Custom Agent Execution
import asyncio
from src.core.agent_code_explore import CodeExplorer
from src.core.git_task import DataProcessor, TaskManager
# Setup environment
task_info = {
'repo': {'type': 'local', 'path': '/source/repo'},
'task_description': 'Analyze code quality',
'input_data': []
}
work_dir = '/tmp/workspace'
output_path, input_data, repo_path = DataProcessor.setup_task_environment(
task_info,
work_dir
)
# Create explorer with custom settings
explorer = CodeExplorer(
repo_path=repo_path,
work_dir=work_dir,
remote_repo_path=None,
task_type="custom_analysis",
use_venv=True,
is_cleanup_venv=False # Keep environment for next run
)
# Generate task prompt
task = TaskManager.prepare_task_description(
task_info,
output_path,
input_data,
repo_path
)
# Execute analysis
answer = asyncio.run(explorer.a_code_analysis(task, max_turns=20))
See Also