The server orchestration layer enables Flower ServerApps to coordinate federated learning across multiple data owners using SyftBox communication.

syftbox_flwr_server

Run a Flower ServerApp with SyftBox integration for coordinating FL training.

Function Signature

def syftbox_flwr_server(
    server_app: ServerApp,
    context: Context,
    datasites: list[str],
    app_name: str,
    project_dir: Optional[Path] = None,
) -> Context
Source: src/syft_flwr/fl_orchestrator/flower_server.py:15

Parameters

server_app
flwr.server.ServerApp
required
The Flower ServerApp instance containing the aggregation strategy
context
flwr.common.Context
required
Flower context with run configuration and state
datasites
list[str]
required
List of data owner email addresses to coordinate (e.g., ["do1@example.com", "do2@example.com"])
app_name
str
required
Name of the FL application (e.g., "diabetes_prediction")
project_dir
Optional[Path]
default:"None"
Path to the FL project directory (reads transport config from pyproject.toml)
return
flwr.common.Context
Updated context after training completion

Workflow

  1. Setup: Create SyftGrid instance with datasites and transport
  2. Initialize: Set random run ID for the FL session
  3. Train: Execute Flower server with SyftGrid backend
  4. Cleanup: Send stop signals to all clients
  5. Return: Updated context with final state

Example Usage

from pathlib import Path
from flwr.server import ServerApp, ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg
from flwr.common import Context
from syft_flwr.fl_orchestrator import syftbox_flwr_server

# Define datasites (data owners)
datasites = [
    "dataowner1@example.com",
    "dataowner2@example.com",
    "dataowner3@example.com",
]

# Create server app with FedAvg strategy
strategy = FedAvg(
    fraction_fit=1.0,  # Use all clients for training
    fraction_evaluate=1.0,  # Use all clients for evaluation
    min_available_clients=3,  # Wait for all clients
)

config = ServerConfig(num_rounds=5)

def server_fn(context: Context) -> ServerAppComponents:
    # Flower expects server_fn to return ServerAppComponents, not a bare tuple
    return ServerAppComponents(strategy=strategy, config=config)

app = ServerApp(server_fn=server_fn)

# Run aggregation server
if __name__ == "__main__":
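    # Minimal context for illustration; recent Flower releases may also
    # require run_id, node_id, and node_config
    context = Context(state={}, run_config={})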
    
    updated_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project")
    )
    
    print(f"Training complete! Final context: {updated_context}")

SyftGrid Integration

The server uses SyftGrid as the communication backend:
from syft_flwr.fl_orchestrator.syft_grid import SyftGrid

# Automatically created by syftbox_flwr_server
syft_grid = SyftGrid(
    app_name="flwr/diabetes_prediction",
    datasites=datasites,
    client=client  # Auto-detected SyftFlwrClient
)

# Set run ID
run_id = 12345
syft_grid.set_run(run_id)

# SyftGrid handles all message routing
See SyftGrid for detailed API reference.

Stop Signal Behavior

After training completes (success or failure), the server sends stop signals to all clients:
# Sent to each datasite
{
    "metadata": {
        "message_type": "SYSTEM",
        "group_id": "final",
        "dst_node_id": <client_node_id>
    },
    "content": {
        "config": {
            "action": "stop",
            "reason": "Server stopped"
        }
    }
}
Clients will gracefully shut down upon receiving this signal.
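As a rough illustration of the message shape above, the sketch below builds a stop-signal dictionary and checks whether an incoming dictionary matches it. Both `build_stop_signal` and `is_stop_signal` are hypothetical helpers written for this page, not part of syft_flwr, and they operate on plain dictionaries rather than Flower `Message` objects.

```python
from typing import Any, Dict


def build_stop_signal(dst_node_id: int, reason: str = "Server stopped") -> Dict[str, Any]:
    """Hypothetical helper: build a dict with the stop-signal shape shown above."""
    return {
        "metadata": {
            "message_type": "SYSTEM",
            "group_id": "final",
            "dst_node_id": dst_node_id,
        },
        "content": {"config": {"action": "stop", "reason": reason}},
    }


def is_stop_signal(message: Dict[str, Any]) -> bool:
    """Hypothetical helper: True if the dict looks like a stop signal."""
    metadata = message.get("metadata", {})
    config = message.get("content", {}).get("config", {})
    return metadata.get("message_type") == "SYSTEM" and config.get("action") == "stop"


signal = build_stop_signal(dst_node_id=7)
print(is_stop_signal(signal))
```

A client loop could call a check like this on each received message and exit once it returns True.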

Error Handling

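If training raises an exception, the server logs the error and traceback, falls back to the original context, and still sends stop signals from the finally block: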
try:
    updated_context = run_server(
        syft_grid,
        context=context,
        loaded_server_app=server_app,
        server_app_dir=""
    )
    logger.info(f"Server completed with context: {updated_context}")
except Exception as e:
    logger.error(f"Server encountered an error: {str(e)}")
    logger.error(f"Traceback: {traceback.format_exc()}")
    updated_context = context  # Return original context
finally:
    syft_grid.send_stop_signal(group_id="final", reason="Server stopped")
    logger.info("Sending stop signals to the clients")

Transport Support

SyftBox Mode:
  • Full RPC/crypto stack with optional encryption
  • Real-time message delivery via filesystem watching
  • Futures database for response tracking
P2P Mode:
  • File-based messaging via Google Drive API
  • Polling-based message retrieval
  • No encryption (access control via Drive permissions)

Environment Variables

SYFTBOX_EMAIL
str
Email address for P2P mode (required when using P2P transport)
SYFT_FLWR_ENCRYPTION_ENABLED
str
default:"true"
Enable/disable encryption in SyftBox mode ("true" or "false")
SYFT_FLWR_MSG_TIMEOUT
float
default:"120.0"
Maximum time (seconds) to wait for client responses
SYFT_FLWR_POLL_INTERVAL
float
default:"3.0"
Polling interval (seconds) for checking client responses
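The variables above can be read into a typed settings object, as in the sketch below. The `TransportSettings` class and `load_settings` function are hypothetical names used for illustration only; how syft_flwr itself parses these variables is not shown on this page, so the case-insensitive "true" check is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TransportSettings:
    """Hypothetical container for the documented environment variables."""
    email: Optional[str]
    encryption_enabled: bool
    msg_timeout: float
    poll_interval: float


def load_settings(env: Mapping[str, str] = os.environ) -> TransportSettings:
    """Read the documented variables, applying the documented defaults."""
    # Assumption: any value other than "true" (case-insensitive) disables encryption.
    encryption = env.get("SYFT_FLWR_ENCRYPTION_ENABLED", "true").strip().lower() == "true"
    return TransportSettings(
        email=env.get("SYFTBOX_EMAIL"),
        encryption_enabled=encryption,
        msg_timeout=float(env.get("SYFT_FLWR_MSG_TIMEOUT", "120.0")),
        poll_interval=float(env.get("SYFT_FLWR_POLL_INTERVAL", "3.0")),
    )


# Example with an explicit mapping instead of the real process environment
settings = load_settings({"SYFT_FLWR_MSG_TIMEOUT": "30", "SYFT_FLWR_ENCRYPTION_ENABLED": "false"})
print(settings)
```

Passing a mapping rather than reading `os.environ` directly keeps the parsing easy to test without modifying the process environment.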

Complete Example

Full working example with custom strategy and metrics:
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np

from flwr.server import ServerApp, ServerConfig
from flwr.server.client_proxy import ClientProxy  # used in the type hints below
from flwr.server.strategy import FedAvg
from flwr.common import (
    Context,
    Parameters,
    FitRes,
    EvaluateRes,
    Scalar,
    parameters_to_ndarrays,
    ndarrays_to_parameters
)
from syft_flwr.fl_orchestrator import syftbox_flwr_server

class DiabetesFedAvg(FedAvg):
    """Custom FedAvg with metrics aggregation for diabetes prediction."""
    
    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[BaseException]
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate model updates and log statistics."""
        
        # Call parent aggregation
        parameters_aggregated, metrics_aggregated = super().aggregate_fit(
            server_round, results, failures
        )
        
        # Log round statistics
        if results:
            total_examples = sum([r.num_examples for _, r in results])
            print(f"\n=== Round {server_round} ===")
            print(f"Clients participated: {len(results)}")
            print(f"Total training examples: {total_examples}")
            print(f"Failures: {len(failures)}")
            
            metrics_aggregated["num_clients"] = len(results)
            metrics_aggregated["total_examples"] = total_examples
        
        return parameters_aggregated, metrics_aggregated
    
    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[BaseException]
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation results."""
        
        if not results:
            return None, {}
        
        # Weighted average of losses
        total_examples = sum([r.num_examples for _, r in results])
        weighted_losses = [
            r.loss * r.num_examples for _, r in results
        ]
        avg_loss = sum(weighted_losses) / total_examples
        
        # Aggregate accuracy metrics
        accuracies = [
            r.metrics.get("accuracy", 0.0) * r.num_examples
            for _, r in results
        ]
        avg_accuracy = sum(accuracies) / total_examples
        
        print(f"\n=== Evaluation Round {server_round} ===")
        print(f"Average loss: {avg_loss:.4f}")
        print(f"Average accuracy: {avg_accuracy:.4f}")
        
        metrics = {
            "accuracy": avg_accuracy,
            "num_clients_evaluated": len(results)
        }
        
        return avg_loss, metrics

# Configuration
datasites = [
    "hospital1@example.com",
    "hospital2@example.com",
    "hospital3@example.com"
]

strategy = DiabetesFedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=3
)

config = ServerConfig(num_rounds=10)

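# Flower expects server_fn to return ServerAppComponents, not a bare tuple
from flwr.server import ServerAppComponents

def server_fn(context: Context) -> ServerAppComponents:
    return ServerAppComponents(strategy=strategy, config=config)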

app = ServerApp(server_fn=server_fn)

if __name__ == "__main__":
    print("Starting Diabetes Prediction FL Server")
    print(f"Datasites: {datasites}")
    
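    # Minimal context for illustration; recent Flower releases may also
    # require run_id, node_id, and node_config
    context = Context(state={}, run_config={})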
    
    final_context = syftbox_flwr_server(
        server_app=app,
        context=context,
        datasites=datasites,
        app_name="diabetes_prediction",
        project_dir=Path("./diabetes_fl_project")
    )
    
    print("\n=== Training Complete ===")
    print(f"Final context state: {final_context.state}")
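
The example-weighted averaging used in aggregate_evaluate above can be checked in isolation. The sketch below uses a hypothetical `weighted_average` helper (not part of Flower or syft_flwr) and adds a guard for the case where no client reports any examples, which the strategy above does not handle.

```python
from typing import List, Tuple


def weighted_average(values_and_counts: List[Tuple[float, int]]) -> float:
    """Average values weighted by the number of examples behind each one."""
    total = sum(count for _, count in values_and_counts)
    if total == 0:
        # Avoid a ZeroDivisionError when every client reports zero examples
        raise ValueError("no examples reported")
    return sum(value * count for value, count in values_and_counts) / total


# Two clients: loss 0.5 on 10 examples, loss 1.0 on 30 examples
print(weighted_average([(0.5, 10), (1.0, 30)]))  # 0.875
```

Weighting by example count means a client with more data has proportionally more influence on the reported loss and accuracy.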

Run ID Assignment

The server assigns a random run ID for each FL session:
from random import randint

run_id = randint(0, 1000)
syft_grid.set_run(run_id)
Because randint(0, 1000) yields only 1,001 possible values, run IDs can collide across sessions. In production deployments, consider a more robust run ID scheme (e.g., timestamp-based, UUID, or database-assigned IDs).
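Two possible alternatives are sketched below using only the standard library. `random_run_id` and `timestamped_run_id` are hypothetical helpers, and the sketch assumes that `set_run` accepts any non-negative integer that fits in 64 bits; this page only shows it being called with small integers, so verify that before adopting either scheme.

```python
import secrets
import time


def random_run_id(bits: int = 63) -> int:
    """Hypothetical helper: a cryptographically random non-negative integer."""
    return secrets.randbits(bits)


def timestamped_run_id(random_bits: int = 20) -> int:
    """Hypothetical helper: millisecond timestamp in the high bits, random low bits.

    IDs created later sort after earlier ones (at millisecond granularity),
    and the random suffix reduces collisions within the same millisecond.
    """
    millis = time.time_ns() // 1_000_000
    return (millis << random_bits) | secrets.randbits(random_bits)


run_id = timestamped_run_id()
print(run_id)
# syft_grid.set_run(run_id)  # assumption: set_run accepts a large integer
```

The timestamp-prefixed variant makes run IDs roughly sortable by start time, which can help when inspecting logs from several sessions.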
