Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/dallay/corvus/llms.txt

Use this file to discover all available pages before exploring further.

Custom Channel

Build a custom channel to integrate any messaging platform with Corvus.

Overview

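Every channel implements the Channel trait defined in src/channels/traits.rs. The two core methods are send(), which delivers an outgoing message to the platform, and listen(), which receives incoming messages and forwards them to Corvus. The example below also implements name() and health_check().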

Step 1: Create the Channel

Create src/channels/my_channel.rs:
use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
use anyhow::Result;
use async_trait::async_trait;
use reqwest::Client;
use tokio::sync::mpsc;

/// Channel adapter that talks to a token-authenticated HTTP messaging API.
pub struct MyChannel {
    /// Token sent as `Authorization: Bearer <token>` by `send`, `listen`
    /// and `health_check`.
    api_token: String,
    /// Base URL of the API, stored without a trailing slash so endpoint
    /// paths can be appended as `{api_url}/endpoint`.
    api_url: String,
    /// Usernames allowed to reach the agent. An empty list disables the
    /// check, so every sender is accepted.
    allowed_users: Vec<String>,
    /// HTTP client reused for every request made by this channel.
    client: Client,
}

impl MyChannel {
    /// Creates a channel for the API at `api_url`, authenticating with `api_token`.
    ///
    /// Trailing slashes on `api_url` are removed, because every request builds
    /// its URL as `{api_url}/endpoint`; without this, a configured value such as
    /// `https://api.example.com/` would yield `https://api.example.com//sendMessage`.
    pub fn new(api_token: &str, api_url: &str, allowed_users: Vec<String>) -> Self {
        Self {
            api_token: api_token.to_owned(),
            api_url: api_url.trim_end_matches('/').to_owned(),
            allowed_users,
            client: Client::new(),
        }
    }
}

#[async_trait]
impl Channel for MyChannel {
    /// Returns the identifier of this channel.
    fn name(&self) -> &str {
        "my_channel"
    }

    /// Posts `message` to the API's `sendMessage` endpoint.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or when the API answers with a
    /// 4xx/5xx status.
    async fn send(&self, message: &SendMessage) -> Result<()> {
        let url = format!("{}/sendMessage", self.api_url);

        self.client
            .post(&url)
            .header("Authorization", format!("Bearer {}", self.api_token))
            .json(&serde_json::json!({
                "recipient": message.recipient,
                "content": message.content,
            }))
            .send()
            .await?
            // A 4xx/5xx reply is still `Ok` at the transport level; turn it
            // into an error so a rejected message is not reported as sent.
            .error_for_status()?;

        Ok(())
    }

    /// Polls `getUpdates` once per second and forwards each accepted message to `tx`.
    ///
    /// Messages from senders outside a non-empty `allowed_users` list are
    /// logged and dropped. Returns `Ok(())` once the receiving side of `tx`
    /// has been closed.
    ///
    /// # Errors
    ///
    /// Fails when a poll cannot be sent, is answered with a 4xx/5xx status,
    /// or its body is not valid JSON.
    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> Result<()> {
        let url = format!("{}/getUpdates", self.api_url);
        let mut offset = 0i64;

        loop {
            let resp = self.client
                .get(&url)
                .header("Authorization", format!("Bearer {}", self.api_token))
                .query(&[("offset", offset.to_string())])
                .send()
                .await?
                // Surface auth/server failures instead of silently polling forever.
                .error_for_status()?
                .json::<serde_json::Value>()
                .await?;

            // A response without an `updates` array is treated as an empty batch.
            for update in resp["updates"].as_array().into_iter().flatten() {
                // Advance the offset before any filtering so a dropped update
                // (e.g. from an unauthorized sender) is never fetched again.
                // Updates without a numeric id leave the offset untouched.
                if let Some(update_id) = update["id"].as_i64() {
                    offset = update_id.saturating_add(1);
                }

                if let Some(msg) = update.get("message") {
                    let sender = msg["from"]["username"]
                        .as_str()
                        .unwrap_or("unknown")
                        .to_string();

                    // Check allowlist; an empty list accepts every sender.
                    if !self.allowed_users.is_empty()
                        && !self.allowed_users.contains(&sender)
                    {
                        tracing::warn!("Ignoring message from unauthorized user: {}", sender);
                        continue;
                    }

                    let channel_msg = ChannelMessage {
                        id: json_scalar_to_string(&msg["id"]),
                        sender,
                        reply_target: json_scalar_to_string(&msg["chat"]["id"]),
                        content: msg["text"].as_str().unwrap_or("").to_string(),
                        channel: "my_channel".into(),
                        timestamp: msg["timestamp"].as_u64().unwrap_or(0),
                    };

                    // The receiver is gone, so nobody is listening any more.
                    if tx.send(channel_msg).await.is_err() {
                        return Ok(());
                    }
                }
            }

            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    }

    /// Returns `true` when the API's `healthCheck` endpoint answers with a
    /// success status, and `false` on any transport error or other status.
    async fn health_check(&self) -> bool {
        let url = format!("{}/healthCheck", self.api_url);

        let outcome = self.client
            .get(&url)
            .header("Authorization", format!("Bearer {}", self.api_token))
            .send()
            .await;

        match outcome {
            Ok(response) => response.status().is_success(),
            Err(_) => false,
        }
    }
}

/// Renders a JSON value as plain text for use as an identifier.
///
/// JSON strings are returned without their surrounding quotes; every other
/// value (numbers, `null`, ...) keeps its JSON representation.
fn json_scalar_to_string(value: &serde_json::Value) -> String {
    match value.as_str() {
        Some(text) => text.to_owned(),
        None => value.to_string(),
    }
}

Step 2: Register the Channel

Declare the module and add a match arm for the new channel in src/channels/mod.rs:
pub mod my_channel;

/// Builds the channel selected by `config.channel_type`.
///
/// # Errors
///
/// Returns an error naming the requested type when no arm matches it.
pub async fn create_channel(config: &ChannelConfig) -> Result<Box<dyn Channel>> {
    let requested = config.channel_type.as_str();

    match requested {
        "my_channel" => {
            let channel = my_channel::MyChannel::new(
                &config.api_token,
                &config.api_url,
                config.allowed_users.clone(),
            );
            Ok(Box::new(channel))
        }
        // ... existing channels
        unknown => Err(anyhow::anyhow!("Unknown channel type: {}", unknown)),
    }
}

Step 3: Configure

Update ~/.corvus/config.toml:
# Settings for the custom channel.
# NOTE(review): presumably these keys populate the ChannelConfig fields read
# by create_channel — confirm against the config loader.
[channels_config.my_channel]
# Token the channel sends as "Authorization: Bearer <token>".
api_token = "your-token"
# Base URL of the messaging API; endpoint paths such as /sendMessage are appended to it.
api_url = "https://api.example.com"
# Usernames whose messages are accepted. An empty list accepts every sender.
allowed_users = ["alice", "bob"]

Step 4: Start Channel

corvus channel start
Or in daemon mode:
corvus daemon

Advanced Features

Typing Indicator

// Add this method to the existing `impl Channel for MyChannel` block.
impl Channel for MyChannel {
    /// Asks the API to show a "typing" indicator to `recipient`.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or when the API answers with a
    /// 4xx/5xx status.
    async fn start_typing(&self, recipient: &str) -> Result<()> {
        let url = format!("{}/sendChatAction", self.api_url);

        self.client
            .post(&url)
            // Authenticate the same way as the channel's other requests.
            .header("Authorization", format!("Bearer {}", self.api_token))
            .json(&serde_json::json!({
                "recipient": recipient,
                "action": "typing",
            }))
            .send()
            .await?
            // A 4xx/5xx reply is still `Ok` at the transport level.
            .error_for_status()?;

        Ok(())
    }
}

Draft Updates (Progressive Streaming)

// Add these methods to the existing `impl Channel for MyChannel` block.
impl Channel for MyChannel {
    /// Signals that this channel can edit a message after it has been sent.
    fn supports_draft_updates(&self) -> bool {
        true
    }

    /// Sends a `"..."` placeholder to `message.recipient` and returns the id
    /// of the created message so it can be edited later.
    ///
    /// Returns `Ok(None)` when the response carries no `message_id`.
    /// NOTE(review): assumes callers treat `None` as "no editable draft" —
    /// confirm against the trait's contract.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent, is answered with a 4xx/5xx
    /// status, or its body is not valid JSON.
    async fn send_draft(&self, message: &SendMessage) -> Result<Option<String>> {
        let resp = self.client
            .post(format!("{}/sendMessage", self.api_url))
            // Authenticate the same way as the channel's other requests.
            .header("Authorization", format!("Bearer {}", self.api_token))
            .json(&serde_json::json!({
                "recipient": message.recipient,
                "content": "...",
            }))
            .send()
            .await?
            .error_for_status()?
            .json::<serde_json::Value>()
            .await?;

        // `Value::to_string()` would yield "null" for a missing id and keep
        // the JSON quotes around a string id, so convert each case explicitly.
        let message_id = match &resp["message_id"] {
            serde_json::Value::Null => None,
            serde_json::Value::String(id) => Some(id.clone()),
            other => Some(other.to_string()),
        };

        Ok(message_id)
    }

    /// Replaces the content of the draft `message_id` sent to `recipient` with `text`.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or when the API answers with a
    /// 4xx/5xx status.
    async fn update_draft(
        &self,
        recipient: &str,
        message_id: &str,
        text: &str,
    ) -> Result<()> {
        self.client
            .post(format!("{}/editMessage", self.api_url))
            .header("Authorization", format!("Bearer {}", self.api_token))
            .json(&serde_json::json!({
                "recipient": recipient,
                "message_id": message_id,
                "content": text,
            }))
            .send()
            .await?
            .error_for_status()?;

        Ok(())
    }
}

Full Example

See examples/custom_channel.rs:29-119 for a complete Telegram integration.

Build docs developers (and LLMs) love