Skip to main content
The Server class manages multiple client connections, creating a Channel instance for each connected client. You can run it in the main thread or as a background daemon for peer-to-peer scenarios.

Creating a Server Subclass

To build a server, you need to:
1

Define a Channel subclass

Create a Channel subclass that handles client-specific logic and messages.
server.py
from repod import Channel, Server

class GameChannel(Channel):
    """Per-client channel used on the server side.

    Every instance starts out with the placeholder player name "Guest".
    """

    def __init__(self, *args, **kwargs):
        # Forward all arguments untouched to the base Channel.
        super().__init__(*args, **kwargs)
        self.player_name = "Guest"

    def Network_chat(self, data: dict) -> None:
        """Relay an incoming chat message to all clients through the server.

        The outgoing payload carries the message ``text`` and this
        channel's ``player_name`` under the ``from`` key.
        """
        outgoing = {
            "action": "chat",
            "text": data["text"],
            "from": self.player_name,
        }
        self.server.send_to_all(outgoing)
2

Create a Server subclass

Subclass Server and set the channel_class attribute to your Channel subclass.
server.py
class GameServer(Server):
    """Game server whose connections are handled by GameChannel."""

    channel_class = GameChannel

    def __init__(self, host: str = "127.0.0.1", port: int = 5071):
        """Initialise the base server and start with no tracked players."""
        super().__init__(host, port)
        # Track connected players; filled in as clients connect.
        self.players: dict = {}
3

Launch the server

Use the launch() method to start accepting connections.
server.py
if __name__ == "__main__":
    # 0.0.0.0 listens on all network interfaces.
    # launch() blocks until the server is stopped.
    GameServer(host="0.0.0.0", port=5071).launch()

Handling Connections

The server provides lifecycle hooks for managing client connections:

on_connect

Called when a new client connects. Use this to initialize player state:
class GameServer(Server):
    """Game server that registers each player as it connects."""

    channel_class = GameChannel

    def __init__(self, host: str = "127.0.0.1", port: int = 5071):
        super().__init__(host, port)
        self.players: dict = {}

    def on_connect(self, channel: GameChannel, addr: tuple[str, int]) -> None:
        """Record the new player and send every client the updated count.

        Args:
            channel: Channel created for the newly connected client.
            addr: Remote address; items 0 and 1 are logged as host and port.
        """
        print(f"New player connected from {addr[0]}:{addr[1]}")
        self.players[channel] = channel.player_name

        # Notify all clients about the new player
        announcement = {
            "action": "player_joined",
            "count": len(self.channels),
        }
        self.send_to_all(announcement)

on_disconnect

Called when a client disconnects. Use this for cleanup (define it as a method on your Server subclass):
def on_disconnect(self, channel: GameChannel) -> None:
    """Forget a departed player and announce the departure.

    Channels that have no entry in ``self.players`` are ignored: nothing
    is printed or sent for them.
    """
    if channel not in self.players:
        return

    player_name = self.players.pop(channel)
    print(f"Player {player_name} disconnected")

    # Notify remaining clients
    notice = {
        "action": "player_left",
        "name": player_name,
        "count": len(self.channels),
    }
    self.send_to_all(notice)
The server automatically removes disconnected channels from self.channels. You only need to clean up your own application state.

Broadcasting Messages

Use send_to_all() to broadcast messages to every connected client:
import time

class GameServer(Server):
    """Game server that can push state snapshots to every player."""

    channel_class = GameChannel

    def update_game_state(self, state: dict) -> None:
        """Send game state to all connected players.

        Args:
            state: Game state to broadcast under the ``state`` key.

        The message is stamped with the current ``time.time()`` value
        under the ``timestamp`` key.
        """
        self.send_to_all({
            "action": "game_state",
            "state": state,
            "timestamp": time.time(),
        })
For selective broadcasting, iterate over self.channels directly. The method below belongs on your Server subclass and assumes each channel has a team_id attribute:
def send_to_team(self, team_id: int, data: dict) -> None:
    """Deliver ``data`` only to channels whose ``team_id`` matches.

    Args:
        team_id: Team whose members should receive the message.
        data: Message passed unchanged to each matching channel's ``send``.
    """
    for peer in self.channels:
        if peer.team_id != team_id:
            continue  # not on the requested team
        peer.send(data)

Complete Example

Here’s a complete chat server implementation:
from repod import Channel, Server

class ClientChannel(Channel):
    """Per-connection channel for one chat client, living on the server."""

    nickname: str

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Default nickname; Network_nickname replaces it.
        self.nickname = "Anonymous"

    def on_close(self) -> None:
        """Hand this channel to the server's ``del_player``."""
        self.server.del_player(self)

    def Network_message(self, data: dict) -> None:
        """Pass a chat message to the server's ``broadcast``.

        The outgoing payload carries the message ``text`` and this
        channel's current ``nickname``.
        """
        outgoing = {
            "action": "message",
            "text": data["text"],
            "nickname": self.nickname,
        }
        self.server.broadcast(outgoing)

    def Network_nickname(self, data: dict) -> None:
        """Adopt the requested nickname, then have the server resend the player list."""
        self.nickname = data["nickname"]
        self.server.send_players()

class ChatServer(Server):
    """Chat server that manages connected clients."""

    channel_class = ClientChannel

    def __init__(self, host: str = "127.0.0.1", port: int = 5071):
        super().__init__(host, port)
        # Maps each connected channel to the nickname it had at connect
        # time. Read channel.nickname for the current name, because
        # clients can rename themselves after connecting.
        self.players = {}

    def on_connect(self, channel: ClientChannel, addr: tuple[str, int]) -> None:
        """Register a new client, announce it, and resend the player list."""
        self.players[channel] = channel.nickname
        print(f"Player connected: {addr}")
        self.broadcast({
            "action": "system",
            "text": f"{channel.nickname} joined the chat"
        })
        self.send_players()

    def del_player(self, player: ClientChannel) -> None:
        """Remove a client and announce its departure to the remaining players.

        Channels that are not registered in ``self.players`` are ignored.
        """
        if player not in self.players:
            return
        del self.players[player]
        # Announce the channel's live nickname: the value stored at connect
        # time goes stale once the client renames itself via Network_nickname.
        self.broadcast({
            "action": "system",
            "text": f"{player.nickname} left the chat"
        })
        self.send_players()

    def send_players(self) -> None:
        """Broadcast the current player list."""
        self.broadcast({
            "action": "players",
            "list": [p.nickname for p in self.players]
        })

    def broadcast(self, data: dict) -> None:
        """Send a message to all connected players."""
        for player in self.players:
            player.send(data)

if __name__ == "__main__":
    # Print the banner first, then start the server.
    print("Chat server running on 127.0.0.1:5071")
    print("Press Ctrl+C to stop")
    chat_server = ChatServer(host="127.0.0.1", port=5071)
    chat_server.launch()

Server Properties

  • host - The hostname or IP address the server is bound to
  • port - The port number the server listens on
  • address - A (host, port) tuple property
  • channels - List of all currently connected Channel instances
  • channel_class - The Channel subclass to instantiate for new connections
Use host="0.0.0.0" to listen on all network interfaces, making your server accessible from other machines on the network.

Manual Start/Stop

For more control over the asyncio event loop, use start() and stop() directly:
import asyncio

async def main() -> None:
    """Start the server, serve until interrupted, and always stop it."""
    server = GameServer(host="0.0.0.0", port=5071)
    await server.start()
    print("Server started")

    try:
        await server.run()  # Run forever
    finally:
        # Runs on Ctrl+C too: asyncio.run() cancels this task, so the
        # interruption arrives here as CancelledError rather than
        # KeyboardInterrupt.
        await server.stop()
        print("Server stopped")

try:
    asyncio.run(main())
except KeyboardInterrupt:
    # asyncio.run() re-raises KeyboardInterrupt after main() has been
    # cancelled and its cleanup has finished; swallow it for a quiet exit.
    pass
The launch() method handles KeyboardInterrupt gracefully and ensures proper cleanup. If you manage the event loop manually, make sure to call stop() in a finally block.

Build docs developers (and LLMs) love