The P2P transport enables federated learning in cloud environments without a local SyftBox installation. It uses the Google Drive API for file-based message passing between participants.

SyftP2PClient

Lightweight client for P2P sync mode using Google Drive API.

Class Definition

from syft_flwr.client import SyftP2PClient

class SyftP2PClient(SyftFlwrClient):
    """Client for P2P (Google Drive) sync mode using Google Drive API.
    
    This client is used when FL jobs are submitted via syft_client's
    Google Drive-based sync system.
    
    Key differences from syft_core:
    - Uses Google Drive API directly (via GDriveFileIO) instead of filesystem paths
    - No RPC/crypto/event - Google Drive handles transport and access control
    - Email comes from environment variable or explicit parameter
    """
Source: src/syft_flwr/client/syft_p2p_client.py:8

Constructor

email
str
required
The user’s email address for Google Drive access
Example:
from syft_flwr.client import SyftP2PClient

client = SyftP2PClient(email="user@example.com")
print(client.email)  # user@example.com

Class Methods

from_env

@classmethod
def from_env(cls) -> SyftP2PClient
Create a client from environment variables set by the job runner.
Environment Variables:
  • SYFTBOX_EMAIL: The data owner’s email (set by job_runner.py)
return
SyftP2PClient
Initialized client instance
Raises:
  • ValueError: If SYFTBOX_EMAIL is not set
Example:
import os
os.environ["SYFTBOX_EMAIL"] = "user@example.com"

from syft_flwr.client import SyftP2PClient

client = SyftP2PClient.from_env()
print(client.email)  # user@example.com
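
The lookup described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `read_email_from_env` is a hypothetical helper, and only the `SYFTBOX_EMAIL` variable name and the `ValueError` come from this page. Treating an empty value as unset is an assumption.

```python
import os


def read_email_from_env(var_name: str = "SYFTBOX_EMAIL") -> str:
    """Hypothetical helper: return the email stored in var_name.

    Raises ValueError when the variable is missing. An empty string is
    also treated as missing, which is an assumption of this sketch.
    """
    email = os.environ.get(var_name)
    if not email:
        raise ValueError(f"{var_name} is not set")
    return email
```

Failing early like this surfaces a misconfigured job runner before any Google Drive call is attempted.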

Properties

email

@property
def email(self) -> str
return
str
User’s email address

syftbox_folder

@property
def syftbox_folder(self) -> Path
return
Path
Logical path to SyftBox folder in Google Drive (“SyftBox”)

config_path

@property
def config_path(self) -> Path
return
Path
Logical path to config directory (“SyftBox/.config”)
In P2P mode, there is no actual config file - this returns a logical path for compatibility.

my_datasite

@property
def my_datasite(self) -> Path
return
Path
Logical path to user’s datasite in Google Drive (e.g., “SyftBox/user@example.com”)

datasites

@property
def datasites(self) -> Path
return
Path
Logical path to datasites root (“SyftBox”)

Methods

app_data

def app_data(
    self,
    app_name: Optional[str] = None,
    datasite: Optional[str] = None,
) -> Path
Get the app data directory path in Google Drive.
app_name
str
default:"None"
Name of the application (e.g., “flwr/my_fl_app”)
datasite
str
default:"self.email"
Email address of the datasite owner
return
Path
Logical path to app data directory in Google Drive
Example:
client = SyftP2PClient(email="user@example.com")

# Get app data for current user
app_path = client.app_data("flwr/diabetes")
print(app_path)
# SyftBox/user@example.com/app_data/flwr/diabetes

# Get app data for another datasite
other_path = client.app_data("flwr/diabetes", "other@example.com")
print(other_path)
# SyftBox/other@example.com/app_data/flwr/diabetes
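
The logical paths shown in the example can be reproduced with plain path joins. The sketch below is only an illustration of that layout: `logical_app_data` is a hypothetical function, not part of syft_flwr, and the behavior when `app_name` is omitted (returning the bare `app_data` directory) is an assumption.

```python
from pathlib import PurePosixPath
from typing import Optional

# Logical root folder in Google Drive, as documented for syftbox_folder.
SYFTBOX_ROOT = PurePosixPath("SyftBox")


def logical_app_data(
    email: str,
    app_name: Optional[str] = None,
    datasite: Optional[str] = None,
) -> PurePosixPath:
    """Hypothetical stand-in that mirrors the documented path layout."""
    owner = datasite or email  # default to the client's own datasite
    base = SYFTBOX_ROOT / owner / "app_data"
    # Assumption: without an app name, the app_data directory itself is returned.
    return base / app_name if app_name else base


print(logical_app_data("user@example.com", "flwr/diabetes"))
# SyftBox/user@example.com/app_data/flwr/diabetes
```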

get_client

def get_client(self) -> SyftP2PClient
Returns self: this object is the client, and there is no underlying native client.
return
SyftP2PClient
Returns self for API compatibility

P2PFileRpc

File-based RPC adapter using Google Drive API for message passing.

Class Definition

from syft_flwr.rpc import P2PFileRpc

class P2PFileRpc(SyftFlwrRpc):
    """P2P File-based RPC adapter using Google Drive API via syft-client.
    
    Instead of using filesystem paths, this adapter uses GDriveFileIO to:
    - Write .request files to the shared outbox folder via Google Drive API
    - Poll for .response files in the inbox folder via Google Drive API
    - Uses in-memory tracking for pending futures
    """
Source: src/syft_flwr/rpc/p2p_file_rpc.py:12

Directory Structure

Messages are organized in Google Drive using inbox/outbox folders:
SyftBox/
├── syft_outbox_inbox_sender_to_recipient/
│   └── {app_name}/
│       └── rpc/
│           └── {endpoint}/
│               └── {future_id}.request
└── syft_outbox_inbox_recipient_to_sender/
    └── {app_name}/
        └── rpc/
            └── {endpoint}/
                └── {future_id}.response
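
As a rough sketch of how a message path in this layout could be assembled: `rpc_file_path` is a hypothetical helper, and it assumes that `sender` and `recipient` in the folder names above are placeholders for the participants' email addresses. The actual naming used by P2PFileRpc may differ.

```python
from pathlib import PurePosixPath


def rpc_file_path(
    sender: str,
    recipient: str,
    app_name: str,
    endpoint: str,
    future_id: str,
    suffix: str,
) -> PurePosixPath:
    """Hypothetical helper: build the logical path of a .request/.response file."""
    # Assumption: the folder name embeds both email addresses.
    folder = f"syft_outbox_inbox_{sender}_to_{recipient}"
    # Strip slashes so "/messages" and "messages" map to the same folder
    # and a leading "/" does not reset the joined path.
    endpoint_dir = endpoint.strip("/")
    return (
        PurePosixPath("SyftBox")
        / folder
        / app_name
        / "rpc"
        / endpoint_dir
        / f"{future_id}{suffix}"
    )


request_path = rpc_file_path(
    "sender@example.com", "recipient@example.com",
    "flwr/my_app", "/messages", "1234", ".request",
)
```

Under the same assumption, the response travels in the opposite direction, so its path swaps `sender` and `recipient` and uses the `.response` suffix.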

Constructor

sender_email
str
required
Email address of the sender
app_name
str
required
Name of the FL application

Methods

send

def send(
    self,
    to_email: str,
    app_name: str,
    endpoint: str,
    body: bytes,
    encrypt: bool = False,
) -> str
Send a message by writing a .request file to Google Drive.
to_email
str
required
Recipient’s email address
app_name
str
required
Name of the FL application
endpoint
str
required
RPC endpoint (e.g., “messages”)
body
bytes
required
Message body as bytes
encrypt
bool
default:false
Ignored (encryption not yet supported in P2P mode)
return
str
Future ID (UUID) for tracking the response
Encryption is not yet supported in P2P mode. Setting encrypt=True will log a warning and send unencrypted.
Example:
from syft_flwr.rpc import P2PFileRpc

rpc = P2PFileRpc(
    sender_email="sender@example.com",
    app_name="flwr/my_app"
)

future_id = rpc.send(
    to_email="recipient@example.com",
    app_name="flwr/my_app",
    endpoint="messages",
    body=b"serialized_message",
    encrypt=False
)

print(f"Sent message, future_id: {future_id}")

get_response

def get_response(self, future_id: str) -> Optional[bytes]
Poll for a .response file in the inbox via Google Drive API.
future_id
str
required
The future ID returned by send()
return
Optional[bytes]
Response body as bytes, or None if not available yet
Example:
import time

future_id = rpc.send(
    to_email="recipient@example.com",
    app_name="flwr/my_app",
    endpoint="messages",
    body=b"request_data"
)

# Poll for response
while True:
    response = rpc.get_response(future_id)
    if response is not None:
        print(f"Got response: {len(response)} bytes")
        break
    time.sleep(2)  # Wait before polling again
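
The loop above never gives up if the recipient does not answer. A bounded variant might look like the sketch below. `wait_for_response` is a hypothetical helper, not part of syft_flwr; it accepts any zero-argument callable, so it does not depend on P2PFileRpc internals.

```python
import time
from typing import Callable, Optional


def wait_for_response(
    poll: Callable[[], Optional[bytes]],
    timeout: float = 60.0,
    interval: float = 2.0,
) -> Optional[bytes]:
    """Call poll() until it returns bytes or the timeout elapses.

    Returns the response bytes, or None if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while True:
        response = poll()
        if response is not None:
            return response
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)
```

With the objects from the example above, it could be called as `wait_for_response(lambda: rpc.get_response(future_id), timeout=120)`.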

delete_future

def delete_future(self, future_id: str) -> None
Clear in-memory tracking for a future ID.
future_id
str
required
The future ID to remove from tracking
Unlike SyftRpc, this only clears in-memory tracking. Response files are owned by the data owner and cannot be deleted by the data scientist.
Example:
response = rpc.get_response(future_id)
if response is not None:
    process_response(response)
    rpc.delete_future(future_id)  # Clean up tracking

P2PFileEvents

Polling-based event handler for processing incoming messages from Google Drive.

Class Definition

from syft_flwr.events import P2PFileEvents

class P2PFileEvents(SyftFlwrEvents):
    """P2P File-based polling events using Google Drive API via syft-client.
    
    This adapter:
    - Polls inbox folders for incoming .request files (from other participants) via Google Drive API
    - Calls the registered handler with the message bytes
    - Writes the response to the outbox folder (back to sender) via Google Drive API
    """
Source: src/syft_flwr/events/p2p_file_events.py:18

Constructor

app_name
str
required
Name of the FL application
client_email
str
required
Email address of the client
poll_interval
float
Polling interval in seconds
max_processed_requests
int
default:10000
Maximum number of processed request IDs to track (LRU eviction)

Properties

client_email

@property
def client_email(self) -> str
return
str
Email address of the current client

app_dir

@property
def app_dir(self) -> Path
return
Path
Logical path to app data directory in Google Drive

is_running

@property
def is_running(self) -> bool
return
bool
Whether the polling loop is currently running

Methods

on_request

def on_request(
    self,
    endpoint: str,
    handler: Callable[[bytes], Optional[Union[str, bytes]]],
    auto_decrypt: bool = True,
    encrypt_reply: bool = False,
) -> None
Register a handler for incoming messages at an endpoint.
endpoint
str
required
The endpoint path (e.g., “/messages”)
handler
Callable[[bytes], Optional[Union[str, bytes]]]
required
Function that receives message bytes and returns response
auto_decrypt
bool
default:true
Ignored in P2P mode (Google Drive handles access control)
encrypt_reply
bool
default:false
Ignored in P2P mode
The auto_decrypt and encrypt_reply parameters are ignored in P2P mode because access is controlled by Google Drive rather than by X3DH encryption.
Example:
from syft_flwr.events import P2PFileEvents

events = P2PFileEvents(
    app_name="flwr/my_app",
    client_email="user@example.com",
    poll_interval=2.0
)

def handle_message(body: bytes) -> bytes:
    """Process incoming FL message."""
    print(f"Received: {len(body)} bytes")
    # Process message...
    return b"response_data"

events.on_request(
    endpoint="/messages",
    handler=handle_message
)

print("Starting polling loop...")
events.run_forever()

run_forever

def run_forever(self) -> None
Start the polling loop and block until stopped. Continuously polls Google Drive for new .request files.
Polling Behavior:
  • Checks all inbox folders for messages from known senders
  • Processes each endpoint registered via on_request()
  • Tracks processed requests to avoid reprocessing (LRU cache)
  • Sleeps for poll_interval seconds between polls
Example:
try:
    events.run_forever()
except KeyboardInterrupt:
    print("Shutting down...")
    events.stop()
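
The bounded tracking of processed requests mentioned under Polling Behavior can be illustrated with an `OrderedDict`. This is a sketch of the general LRU technique under stated assumptions, not the code used by P2PFileEvents; the `ProcessedRequests` class and its methods are hypothetical names.

```python
from collections import OrderedDict


class ProcessedRequests:
    """Hypothetical bounded set of request IDs with LRU eviction."""

    def __init__(self, max_size: int = 10000) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_size = max_size

    def add(self, request_id: str) -> None:
        """Record an ID as processed, evicting the oldest entries if full."""
        self._seen[request_id] = None
        self._seen.move_to_end(request_id)  # mark as most recently used
        while len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # drop the least recently used ID

    def __contains__(self, request_id: str) -> bool:
        return request_id in self._seen


seen = ProcessedRequests(max_size=2)
for request_id in ("a", "b", "c"):
    seen.add(request_id)
print("a" in seen, "c" in seen)  # False True
```

Capping the size keeps memory flat in long-running jobs. The trade-off is that an ID evicted from the cache could be processed again if its .request file is still present.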

stop

def stop(self) -> None
Signal the polling loop to stop gracefully.
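
One common way to implement a graceful stop signal is a `threading.Event` that the loop checks between polls. The sketch below illustrates that pattern only. `StoppablePoller` is a hypothetical stand-in, and whether P2PFileEvents works this way internally is an assumption.

```python
import threading
from typing import Callable


class StoppablePoller:
    """Hypothetical polling loop that can be stopped from another thread."""

    def __init__(self, poll_once: Callable[[], None], poll_interval: float = 2.0) -> None:
        self._poll_once = poll_once
        self._poll_interval = poll_interval
        self._stop_event = threading.Event()
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running

    def run_forever(self) -> None:
        """Poll repeatedly until stop() is called."""
        self._running = True
        try:
            while not self._stop_event.is_set():
                self._poll_once()
                # wait() returns early when stop() sets the event,
                # so shutdown does not have to sit out a full interval.
                self._stop_event.wait(self._poll_interval)
        finally:
            self._running = False

    def stop(self) -> None:
        """Ask the loop to exit after the current poll."""
        self._stop_event.set()
```

Using `Event.wait` instead of `time.sleep` lets `stop()` interrupt the pause between polls.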

Usage Example

Complete example of using P2P transport for federated learning:
import os
from syft_flwr.client import create_client
from syft_flwr.rpc import create_rpc
from syft_flwr.events import create_events_watcher

# Set environment for P2P mode
os.environ["SYFTBOX_EMAIL"] = "dataowner@example.com"

# Create client (the P2P transport is selected explicitly here)
client = create_client(
    transport="p2p",
    email="dataowner@example.com"
)

# Create RPC adapter (returns P2PFileRpc)
rpc = create_rpc(client=client, app_name="flwr/diabetes")

# Send a message
future_id = rpc.send(
    to_email="aggregator@example.com",
    app_name="flwr/diabetes",
    endpoint="messages",
    body=b"training_update"
)

# Create events watcher (returns P2PFileEvents)
events = create_events_watcher(
    app_name="flwr/diabetes",
    client=client,
    poll_interval=2.0
)

def handle_training_request(body: bytes) -> bytes:
    """Handle incoming training requests from aggregator."""
    # Train local model...
    return b"local_model_weights"

events.on_request("/messages", handle_training_request)
events.run_forever()

Limitations

Current Limitations:
  • End-to-end encryption not yet implemented (relies on Google Drive access control)
  • Polling-based (higher latency than watchdog-based SyftBox)
  • Requires Google Drive API credentials
  • Response files cannot be deleted by the data scientist (they are owned by the data owner who wrote them)
