
Overview

The pipeline module orchestrates the full Magpie workflow: task classification → blueprint execution → CI loops → PR creation. It’s the main entry point for running autonomous coding tasks.

Core Function

run_pipeline()

Runs the complete Magpie pipeline from task to PR.
pub async fn run_pipeline(
    platform: &dyn ChatPlatform,
    channel_id: &str,
    user: &str,
    task: &str,
    config: &PipelineConfig,
) -> Result<PipelineResult>
platform
&dyn ChatPlatform
required
Chat platform adapter (Discord, Teams, CLI) for sending progress updates
channel_id
&str
required
Channel/thread ID where the task originated
user
&str
required
User who requested the task
task
&str
required
Natural language task description (e.g. “add OAuth2 login”)
config
&PipelineConfig
required
Pipeline configuration (repo, commands, CI rounds, etc.)
Result<PipelineResult>
PipelineResult
Returns the pipeline outcome including PR URL, CI status, and agent output
Pipeline Flow:
  1. Fetch conversation history from chat platform
  2. Create sandbox (Daytona remote or local)
  3. Clone repo (if github_org is configured)
  4. Generate branch name via Tier 1 LLM
  5. Create git branch
  6. Classify task complexity (Simple/Standard/BugFix)
  7. Execute appropriate blueprint
  8. Run CI loop (lint + test) with fix retries
  9. Generate commit message via LLM
  10. Commit, push, create PR via gh
  11. Update Plane issue (if configured)
  12. Cleanup sandbox
Example:
use magpie_core::{run_pipeline, PipelineConfig};
use std::path::PathBuf;

let config = PipelineConfig {
    repo_dir: PathBuf::from("/workspace/my-repo"),
    base_branch: "main".to_string(),
    test_command: "cargo test".to_string(),
    lint_command: "cargo clippy".to_string(),
    max_ci_rounds: 2,
    ..Default::default()
};

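// CliPlatform is the CLI chat platform adapter; bring it into scope from
// wherever your build exposes it (its import path is not shown here).
let platform = CliPlatform;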
let result = run_pipeline(
    &platform,
    "cli",
    "alice",
    "fix the login bug",
    &config,
).await?;

println!("Status: {:?}", result.status);
println!("PR URL: {:?}", result.pr_url);

Configuration Types

PipelineConfig

Configuration for a pipeline run.
pub struct PipelineConfig {
    pub repo_dir: PathBuf,
    pub base_branch: String,
    pub test_command: String,
    pub lint_command: String,
    pub max_ci_rounds: u32,
    pub plane: Option<PlaneConfig>,
    pub dry_run: bool,
    pub github_org: Option<String>,
    pub trace_dir: Option<PathBuf>,
    pub daytona: Option<DaytonaConfig>,
    #[cfg(feature = "daytona")]
    pub pool: Option<Arc<WarmPool>>,
}
repo_dir
PathBuf
required
Local repository directory (used when github_org is not set)
base_branch
String
default:"main"
Base branch for PRs (e.g. “main”, “develop”)
test_command
String
default:"cargo test"
Test command for CI validation
lint_command
String
default:"cargo clippy"
Lint command for CI validation
max_ci_rounds
u32
default:"2"
Maximum CI retry attempts if tests fail
plane
Option<PlaneConfig>
Plane.so integration config for issue tracking
dry_run
bool
default:"false"
When true, agent steps are replaced with shell echo stubs
github_org
Option<String>
GitHub org that repo access is restricted to. When set, the repo name is parsed from the task message and the repo is cloned from this org
trace_dir
Option<PathBuf>
Directory for JSONL trace files. When set, all agent calls are traced
daytona
Option<DaytonaConfig>
Daytona sandbox config. When set, runs commands remotely in Daytona
pool
Option<Arc<WarmPool>>
Warm sandbox pool (feature-gated). Enables pre-built sandbox reuse
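The defaults listed above are what make the `..Default::default()` shorthand in the examples work. The sketch below mirrors a subset of the fields in a stand-in struct to show the pattern. `ConfigSketch` is a hypothetical name, not the real `PipelineConfig`, and the `repo_dir` default of `"."` is an assumption, since this page does not state one.

```rust
use std::path::PathBuf;

// Hypothetical stand-in for PipelineConfig; only a subset of fields is mirrored.
#[derive(Debug, Clone)]
struct ConfigSketch {
    repo_dir: PathBuf,
    base_branch: String,
    test_command: String,
    lint_command: String,
    max_ci_rounds: u32,
    dry_run: bool,
    github_org: Option<String>,
    trace_dir: Option<PathBuf>,
}

impl Default for ConfigSketch {
    fn default() -> Self {
        Self {
            // Assumption: the real default for repo_dir is not documented here.
            repo_dir: PathBuf::from("."),
            // The remaining values follow the defaults documented above.
            base_branch: "main".to_string(),
            test_command: "cargo test".to_string(),
            lint_command: "cargo clippy".to_string(),
            max_ci_rounds: 2,
            dry_run: false,
            github_org: None,
            trace_dir: None,
        }
    }
}
```

With a `Default` impl like this, a caller overrides only the fields that differ (for example `test_command`) and lets struct update syntax fill in the rest.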

PipelineResult

Outcome of a pipeline run.
pub struct PipelineResult {
    pub output: String,
    pub pr_url: Option<String>,
    pub plane_issue_id: Option<String>,
    pub ci_passed: bool,
    pub rounds_used: u32,
    pub status: PipelineStatus,
}
output
String
Final agent output (typically success message or error details)
pr_url
Option<String>
GitHub PR URL if successfully created
plane_issue_id
Option<String>
Plane issue ID if integration is configured
ci_passed
bool
Whether lint + test commands passed
rounds_used
u32
Number of CI rounds executed (1 = first attempt, 2+ = retries)
status
PipelineStatus
High-level outcome classification

PipelineStatus

High-level pipeline outcome.
pub enum PipelineStatus {
    Success,          // Agent completed + CI passed
    PartialSuccess,   // Agent completed + CI failed
    AgentFailed,      // Agent errored during execution
    SetupFailed,      // Git/sandbox setup failed
}
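
The variant comments above suggest a simple decision table. The sketch below shows one way such a mapping could be written; `StatusSketch` and `derive_status` are hypothetical names, and the three boolean inputs are an assumption about what the pipeline tracks internally.

```rust
// Hypothetical mirror of PipelineStatus, for illustration only.
#[derive(Debug, PartialEq, Clone, Copy)]
enum StatusSketch {
    Success,
    PartialSuccess,
    AgentFailed,
    SetupFailed,
}

// Maps three assumed flags to a status, following the variant comments:
// setup failure wins, then agent failure, then the CI result decides.
fn derive_status(setup_ok: bool, agent_ok: bool, ci_passed: bool) -> StatusSketch {
    match (setup_ok, agent_ok, ci_passed) {
        (false, _, _) => StatusSketch::SetupFailed,
        (true, false, _) => StatusSketch::AgentFailed,
        (true, true, false) => StatusSketch::PartialSuccess,
        (true, true, true) => StatusSketch::Success,
    }
}
```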

Task Classification

classify_task()

Classifies a task as Simple, Standard, or BugFix to determine which blueprint to execute.
pub async fn classify_task(
    task: &str,
    dry_run: bool,
    trace_dir: Option<&PathBuf>,
) -> TaskComplexity
task
&str
required
Task description to classify
dry_run
bool
required
When true, ambiguous tasks default to Simple without calling Claude
trace_dir
Option<&PathBuf>
Directory for trace logging
TaskComplexity
enum
  • Simple: Docs, typos, renames (single-shot agent call)
  • Standard: Features, refactors (TDD blueprint)
  • BugFix: Bug fixes (diagnostic blueprint with investigation step)
Classification Strategy:
  1. Keyword matching for clear-cut cases:
    • Simple: “fix typo”, “update readme”, “fix docs”
    • BugFix: “fix bug”, “fix crash”, “investigate”, “broken”
    • Standard: “add”, “implement”, “refactor”, “integrate”
  2. Claude classification for ambiguous tasks (Tier 1 call)
  3. In dry_run mode, ambiguous tasks default to Simple and the Claude call is skipped
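
The strategy above can be sketched as a keyword pass followed by a fallback. This is a minimal illustration, not the module's actual code: the check order (Simple, then BugFix, then Standard), the whole-word matching for Standard keywords, and the `ask_llm` callback standing in for the Claude call are all assumptions. The enum is a local mirror of `TaskComplexity`.

```rust
// Local mirror of TaskComplexity so the sketch is self-contained.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TaskComplexity {
    Simple,
    Standard,
    BugFix,
}

// Keyword pass: returns None when the task is ambiguous.
fn classify_by_keyword(task: &str) -> Option<TaskComplexity> {
    const SIMPLE: [&str; 3] = ["fix typo", "update readme", "fix docs"];
    const BUGFIX: [&str; 4] = ["fix bug", "fix crash", "investigate", "broken"];
    const STANDARD: [&str; 4] = ["add", "implement", "refactor", "integrate"];

    let lower = task.to_lowercase();
    // Assumed order: the most specific phrases are checked first.
    if SIMPLE.iter().any(|k| lower.contains(*k)) {
        return Some(TaskComplexity::Simple);
    }
    if BUGFIX.iter().any(|k| lower.contains(*k)) {
        return Some(TaskComplexity::BugFix);
    }
    // Whole-word match so that "address" does not count as "add".
    if lower.split_whitespace().any(|w| STANDARD.contains(&w)) {
        return Some(TaskComplexity::Standard);
    }
    None
}

// Full strategy: keywords, then the dry-run default, then the LLM fallback.
fn classify_sketch(
    task: &str,
    dry_run: bool,
    ask_llm: impl FnOnce(&str) -> TaskComplexity, // hypothetical stand-in for the Tier 1 call
) -> TaskComplexity {
    match classify_by_keyword(task) {
        Some(complexity) => complexity,
        None if dry_run => TaskComplexity::Simple,
        None => ask_llm(task),
    }
}
```

Under these assumptions, a task such as "make the dashboard faster" matches no keyword, so it reaches the fallback: Simple in dry_run mode, otherwise whatever the LLM callback returns.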

Blueprint Builders

build_main_blueprint()

Builds the Simple blueprint: validate → agent.
pub fn build_main_blueprint(
    trigger: &TriggerContext,
    config: &PipelineConfig,
    working_dir: &str,
) -> Result<(Blueprint, StepContext)>
Used for Simple-complexity tasks (docs, typos). Single agent call with no structured phases.

build_tdd_blueprint()

Builds the TDD blueprint: scan → plan → write-tests → verify-fail → implement → test → lint.
pub fn build_tdd_blueprint(
    trigger: &TriggerContext,
    config: &PipelineConfig,
    working_dir: &str,
) -> Result<(Blueprint, StepContext)>
Used for Standard-complexity tasks. Implements a test-driven development flow:
  1. scan-repo: Get file tree for context
  2. plan: Agent creates implementation plan
  3. write-tests: Agent writes tests ONLY (no implementation)
  4. verify-tests-fail: Run tests, expect failure (TDD red phase)
  5. implement: Agent writes implementation to make tests pass
  6. run-tests: Run tests, expect success (TDD green phase)
  7. lint-check: Run linter
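
The red and green phases differ only in which exit status counts as a pass. The sketch below illustrates that inverted expectation; `Expect` and `step_passed` are hypothetical names, and treating exit code 0 as success is the usual shell convention rather than something this page specifies.

```rust
// What a shell step expects from the command it runs (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Expect {
    Success, // e.g. run-tests, lint-check
    Failure, // e.g. verify-tests-fail (TDD red phase)
}

// A step passes when the exit code matches its expectation.
fn step_passed(expect: Expect, exit_code: i32) -> bool {
    match expect {
        Expect::Success => exit_code == 0,
        Expect::Failure => exit_code != 0,
    }
}
```

Read this way, verify-tests-fail guards against tests that pass before any implementation exists, which would mean they do not exercise the new behavior.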

build_diagnostic_blueprint()

Builds the Diagnostic blueprint: scan → investigate → plan → write-regression-test → verify-fail → fix → test → lint.
pub fn build_diagnostic_blueprint(
    trigger: &TriggerContext,
    config: &PipelineConfig,
    working_dir: &str,
) -> Result<(Blueprint, StepContext)>
Used for BugFix-complexity tasks. The key difference from the TDD blueprint is the investigate step, which forces root cause analysis before planning.
  1. scan-repo: Get file tree
  2. investigate: Agent traces root cause WITHOUT modifying files
  3. plan: Agent creates targeted fix plan based on investigation
  4. write-regression-test: Agent writes test that reproduces the bug
  5. verify-test-fails: Run test, expect failure
  6. implement-fix: Agent fixes the root cause
  7. run-tests: Run all tests, expect success
  8. lint-check: Run linter

build_fix_blueprint()

Builds a Fix blueprint for CI retry rounds.
pub fn build_fix_blueprint(
    trigger: &TriggerContext,
    config: &PipelineConfig,
    test_output: &str,
    working_dir: &str,
) -> Result<(Blueprint, StepContext)>
Used when CI fails. The agent receives the test failure output and fixes the code. The blueprint is a single agent step with the failure context injected.
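
To show where a fix round sits relative to `max_ci_rounds`, `rounds_used`, and `ci_passed`, here is a minimal sketch of a retry loop. It is not the pipeline's actual loop: `run_ci_loop`, `CiOutcome`, and both callbacks are hypothetical, and it assumes `max_ci_rounds` counts total CI rounds (first attempt included), which matches the `rounds_used` description but is not confirmed by this page.

```rust
// Hypothetical summary of the CI phase.
#[derive(Debug, PartialEq)]
struct CiOutcome {
    ci_passed: bool,
    rounds_used: u32,
}

// run_ci: runs lint + test, returning Err(output) on failure.
// run_fix: stand-in for executing the Fix blueprint with the failure output.
fn run_ci_loop(
    max_ci_rounds: u32,
    mut run_ci: impl FnMut() -> Result<(), String>,
    mut run_fix: impl FnMut(&str),
) -> CiOutcome {
    let mut rounds_used = 0;
    while rounds_used < max_ci_rounds {
        rounds_used += 1;
        match run_ci() {
            Ok(()) => return CiOutcome { ci_passed: true, rounds_used },
            Err(output) => {
                // Only attempt a fix if another CI round remains to verify it.
                if rounds_used < max_ci_rounds {
                    run_fix(&output);
                }
            }
        }
    }
    CiOutcome { ci_passed: false, rounds_used }
}
```

In this sketch a run that fails once and then passes reports `rounds_used == 2`, and a run that never passes ends with `ci_passed == false`, which would correspond to PartialSuccess.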

Helper Functions

ensure_multi_word_slug()

Ensures a branch slug has at least two hyphen-separated words.
pub fn ensure_multi_word_slug(slug: &str, task: &str) -> String
The LLM sometimes returns single-word slugs like “authentication”. This function enriches them:
  • If the task starts with a verb (“add”, “fix”), prepend it: "fix" + "pipeline" → "fix-pipeline"
  • Otherwise, fall back to slugify(task) for a natural multi-word slug
Example:
let slug = ensure_multi_word_slug("authentication", "add OAuth2 login");
assert_eq!(slug, "add-authentication");

let slug = ensure_multi_word_slug("fix-bug", "fix the login bug");
assert_eq!(slug, "fix-bug"); // Already multi-word, unchanged
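
For readers who want to see the rules end to end, here is a minimal re-implementation sketch. It is not the crate's source: `ensure_multi_word_slug_sketch` and this `slugify` are hypothetical, the verb list contains only the two verbs named above, and the behavior for an empty slug is an assumption.

```rust
// Hypothetical slugify: lowercase, split on non-alphanumerics, join with '-'.
fn slugify(text: &str) -> String {
    text.to_lowercase()
        .split(|c: char| !c.is_ascii_alphanumeric())
        .filter(|part| !part.is_empty())
        .collect::<Vec<_>>()
        .join("-")
}

fn ensure_multi_word_slug_sketch(slug: &str, task: &str) -> String {
    // Already two or more hyphen-separated words: leave unchanged.
    if slug.split('-').filter(|w| !w.is_empty()).count() >= 2 {
        return slug.to_string();
    }
    // Assumption: only the verbs named in the text are recognized.
    const VERBS: [&str; 2] = ["add", "fix"];
    let first = task.split_whitespace().next().unwrap_or("").to_lowercase();
    if !slug.is_empty() && VERBS.contains(&first.as_str()) {
        return format!("{first}-{slug}");
    }
    // Otherwise derive the slug from the task text itself.
    slugify(task)
}
```

This sketch does not guard against a one-word task producing a one-word slug in the fallback branch; how the real function handles that case is not described here.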

Real-World Example

use magpie_core::{run_pipeline, ChatPlatform, PipelineConfig, PipelineStatus};
use std::path::PathBuf;

#[async_trait::async_trait]
impl ChatPlatform for MyPlatform {
    fn name(&self) -> &str { "myplatform" }
    async fn fetch_history(&self, channel_id: &str) -> Result<String> {
        // Fetch conversation history
        Ok(self.get_messages(channel_id).await?)
    }
    async fn send_message(&self, channel_id: &str, text: &str) -> Result<()> {
        self.post_message(channel_id, text).await
    }
}

let config = PipelineConfig {
    repo_dir: PathBuf::from("/workspace/my-repo"),
    base_branch: "main".to_string(),
    test_command: "npm test".to_string(),
    lint_command: "npm run lint".to_string(),
    max_ci_rounds: 3,
    github_org: Some("my-org".to_string()),
    trace_dir: Some(PathBuf::from("/tmp/traces")),
    ..Default::default()
};

let platform = MyPlatform::new();
let result = run_pipeline(
    &platform,
    "channel-123",
    "alice",
    "add health check endpoint in my-repo",
    &config,
).await?;

match result.status {
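    PipelineStatus::Success => {
        // pr_url is an Option, so avoid unwrap() in case no PR URL was recorded
        println!("Success! PR: {}", result.pr_url.as_deref().unwrap_or("(no PR URL)"));
    }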
    PipelineStatus::PartialSuccess => {
        println!("Code written but CI failed after {} rounds", result.rounds_used);
    }
    PipelineStatus::AgentFailed => {
        println!("Agent error: {}", result.output);
    }
    PipelineStatus::SetupFailed => {
        println!("Setup error: {}", result.output);
    }
}
