Skip to main content

Overview

The CommunicationManager class enables bidirectional serial communication between the Trash Classification AI System and external devices such as robotics controllers, a VEX Brain, or an Arduino. It lets you send classification results and commands to the device and receive status updates from it.

Complete CommunicationManager Class

serial_com.py
import json
import time
import serial
import logging as log
from typing import Dict, Any, Optional
from threading import Thread, Event

log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class CommunicationManager:
    """Bidirectional, newline-delimited JSON messaging over a serial port.

    Outgoing messages are written by send_message(). Incoming bytes are
    collected by a background daemon thread, split on ``message_end`` and
    passed, one decoded JSON value at a time, to _process_message().
    """

    def __init__(self, port: str = 'COM7', baudrate: int = 115200):
        """Store the connection settings; the port is opened by connect().

        Args:
            port: Serial device name, e.g. ``COM7`` or ``/dev/ttyUSB0``.
            baudrate: Line speed handed to ``serial.Serial``.
        """
        self.port = port
        self.baudrate = baudrate
        self.message_end = b'\n'

        self.serial_port = None
        self.is_connected = False
        self._read_thread = None

        # Bytes received so far that do not yet form a complete message
        self.buffer = bytearray()
        self._stop_event = Event()

    def connect(self) -> bool:
        """Open the serial port and start the background read thread.

        Returns:
            True if the connection is (or already was) established, False if
            the port could not be opened. Failures are logged, never raised.
        """
        if self.is_connected:
            return True

        # A handle may be left over from a connection the read thread reported
        # as lost; it must be closed before the same port can be reopened.
        self._release_port()

        try:
            self.serial_port = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=10,
                write_timeout=10
            )
            self.is_connected = True
            # Never start a session with a partial frame from a previous one
            self.buffer = bytearray()

            # Start read loop in separate thread
            self._stop_event.clear()
            self._read_thread = Thread(target=self._read_loop)
            self._read_thread.daemon = True
            self._read_thread.start()
            return True

        except Exception as e:
            # Callers rely on a boolean result, so every failure is reported
            # through the log and the return value.
            log.error(f'Error connecting to serial port: {str(e)}')
            self.is_connected = False
            self._release_port()
            return False

    def send_message(self, message_type: str, data: dict) -> bool:
        """Send one JSON message, terminated by ``message_end``, to the device.

        Args:
            message_type: Type of message ('command', 'status',
                'classification', etc).
            data: Dictionary with message data; must be JSON-serializable.

        Returns:
            True once the bytes were written, False if there is no connection
            or if serializing/writing failed (the error is logged).
        """
        if not self.is_connected or not self.serial_port:
            log.error("Error: Serial port not initialized")
            return False

        try:
            message = {
                'type': message_type,
                'data': data,
            }
            encoded_message = json.dumps(message).encode() + self.message_end
            log.info(f'Sending message: {encoded_message}')
            self.serial_port.write(encoded_message)
            return True
        except Exception as e:
            log.error(f"Error sending message: {e}")
            return False

    def _read_loop(self):
        """Thread loop for reading messages from connected device.

        Runs until close() sets the stop event or the port fails. On a port
        failure the error is logged, ``is_connected`` is cleared and the loop
        ends, so callers can detect the loss and call connect() again.
        """
        while not self._stop_event.is_set():
            try:
                port = self.serial_port
                pending = port.in_waiting if port else 0
                # Drain everything that is waiting in one call; reading a
                # single byte per 10 ms poll limits throughput to ~100 bytes/s.
                chunk = port.read(pending) if pending else b''
            except Exception as e:
                # close() may shut the port while this thread is still
                # polling; that is an orderly stop, not a lost connection.
                if not self._stop_event.is_set():
                    log.error(f'Error reading serial port: {e}')
                    self.is_connected = False
                break

            if not chunk:
                # Idle: wait briefly, but wake immediately when asked to stop
                self._stop_event.wait(0.01)
                continue

            try:
                self._consume(chunk)
            except Exception as e:
                # Thread boundary: a bad message must never kill the reader
                log.error(f'Error handling incoming data: {e}')

    def _consume(self, chunk: bytes) -> None:
        """Append received bytes to the buffer and dispatch every complete frame."""
        self.buffer.extend(chunk)
        terminator_len = len(self.message_end)
        while True:
            end = self.buffer.find(self.message_end)
            if end < 0:
                return  # Remaining bytes are an incomplete frame; keep them
            frame = bytes(self.buffer[:end])
            # Remove the frame before decoding so that a corrupt frame cannot
            # poison the messages that follow it.
            del self.buffer[:end + terminator_len]
            self._handle_frame(frame)

    def _handle_frame(self, frame: bytes) -> None:
        """Decode one frame as JSON and pass it to _process_message()."""
        try:
            text = frame.decode()
        except UnicodeDecodeError:
            log.error(f'Error decoding message: {frame!r}')
            return

        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            log.error(f'Error decoding message: {text}')
            return

        self._process_message(payload)

    def _process_message(self, message: dict):
        """Process incoming message from device.

        Handles the 'check_service', 'safety_service' and 'scan_service'
        message types by logging their contents; other types are ignored.
        Malformed messages are logged and never raise.
        """
        if not isinstance(message, dict):
            # Valid JSON that is not an object (e.g. a list or a number)
            log.error(f'Error processing message: expected a JSON object, '
                      f'got {type(message).__name__}')
            return

        try:
            msg_type = message.get('type', '').lower()
            data = message.get('data', {})

            if msg_type == 'check_service':
                state = data.get('state')
                log.info(f'{msg_type} status:\nstate: {state}')

            elif msg_type == 'safety_service':
                state = data.get('state')
                time_taken = data.get('time')
                log.info(f"{msg_type} status: \nstate: {state}, \ntime: {time_taken}s")
                if state == 'error':
                    log.error(f"Safety Service Error: {data.get('error_msg', 'Unknown error')}")

            elif msg_type == 'scan_service':
                state = data.get('state')
                if state == 'in_progress':
                    log.info(f"Scan Data - Object Detected: {data['object']}, "
                             f"Angle: {data['center_angle']}°, "
                             f"Distance: {data['distance']}mm, ")

                elif state == 'complete':
                    objects = data.get('objects', [])
                    log.info(f"Scan Complete - Objects detected: {len(objects)}")
                    for obj in objects:
                        log.info(f"Object at {obj['center_angle']}°, "
                                 f"width: {obj['width']}mm, "
                                 f"distance: {obj['distance']}mm, "
                                 f"size: {obj['max_size']}mm")
        except Exception as e:
            # Missing keys or unexpected value types in a device message
            log.error(f'Error processing message: {e}')

    def _release_port(self) -> bool:
        """Close the current port handle if it is open.

        Returns:
            True if an open port was closed (or a close was attempted).
        """
        port = self.serial_port
        if not (port and port.is_open):
            return False
        try:
            port.close()
        except Exception as e:
            log.error(f'Error closing serial port: {e}')
        return True

    def close(self):
        """Close serial connection.

        Signals the read thread to stop, waits up to one second for it, then
        closes the port. ``is_connected`` is always cleared, even when the
        port was never opened or is already closed.
        """
        self._stop_event.set()
        if self._read_thread:
            self._read_thread.join(timeout=1.0)

        port_was_open = self._release_port()
        self.is_connected = False
        if port_was_open:
            log.info('Serial connection closed')

Connection Setup

1

Initialize Communication Manager

Create an instance with your serial port configuration:
# Creating the manager only stores the settings; the port itself is not
# opened until connect() is called.
serial_manager = CommunicationManager(
    port='COM7',      # Windows: COM1, COM2, etc.
                      # Linux/Mac: /dev/ttyUSB0, /dev/ttyACM0, etc.
    baudrate=115200   # Must match device baudrate
)
Finding Your Port:
  • Windows: Check Device Manager → Ports (COM & LPT)
  • Linux: ls /dev/tty* or dmesg | grep tty
  • Mac: ls /dev/cu.*
2

Establish Connection

Connect to the serial device:
# connect() does not raise when the port cannot be opened: it logs the
# error and returns False, so the return value must be checked.
if serial_manager.connect():
    log.info("Connected to device")
else:
    log.error("Failed to connect")
The connection starts a background thread that continuously listens for incoming messages.
3

Send Messages

Send commands or data to the connected device:
# Written to the port as a single line of JSON followed by a newline:
# {"type": "scan_service", "data": {"command": "start"}}
serial_manager.send_message(
    message_type='scan_service',
    data={'command': 'start'}
)
4

Receive Messages

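Messages are received and processed automatically in the background thread. The _process_message() method handles the check_service, safety_service, and scan_service message types by logging their contents; messages of any other type are ignored.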
5

Close Connection

Always close the connection when done:
# Signals the read thread to stop, waits up to 1 s for it, then closes the port
serial_manager.close()

Message Format

All messages use JSON format with newline termination:

Outgoing Message Structure

{
  "type": "message_type",
  "data": {
    "key1": "value1",
    "key2": "value2"
  }
}

Message Type Examples

Query device status:
# send_message() returns True once the bytes are written; it does not wait
# for the reply, which is picked up later by the background read thread.
serial_manager.send_message(
    message_type='check_service',
    data={'query': 'status'}
)
Expected Response:
{
  "type": "check_service",
  "data": {
    "state": "ready"
  }
}

Complete Working Example

example_usage.py
import time
import logging as log
from serial_com import CommunicationManager

log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

def main():
    """Connect to the device, send the start-up requests, then idle until Ctrl+C."""
    # Adjust the port for your system
    manager = CommunicationManager(port='/dev/ttyUSB0', baudrate=115200)

    # Requests sent right after connecting, each followed by a pause in seconds:
    # first the device status query, then the safety check.
    startup_sequence = (
        ('check_service', {'query': 'status'}, 1),
        ('safety_service', {'action': 'check'}, 5),
    )

    try:
        if not manager.connect():
            log.error("Failed to connect to device")
            return

        log.info("Connected to VEX Brain")

        for service, payload, pause in startup_sequence:
            manager.send_message(service, payload)
            time.sleep(pause)

        # Begin scanning; no pause is needed before entering the idle loop
        manager.send_message('scan_service', {'mode': 'continuous'})

        # Replies are handled on the read thread; the main thread only has to
        # stay alive until the user interrupts it.
        log.info("Listening for messages... Press Ctrl+C to exit")
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        log.info("Program terminated by user")
    finally:
        # Runs on every exit path, including the early return above
        manager.close()

if __name__ == "__main__":
    main()

Threading and Event Handling

The CommunicationManager uses threading for non-blocking communication:

Background Read Thread

# Abridged sketch of the loop that runs on the background read thread.
# The complete implementation in serial_com.py also buffers the bytes that
# precede the terminator, clears consumed data from the buffer, and handles
# read and decoding errors; those parts are left out here for brevity.
def _read_loop(self):
    # Keep polling until close() sets the stop event
    while not self._stop_event.is_set():
        if self.serial_port and self.serial_port.in_waiting:
            # Read one byte (pyserial's read() defaults to size=1)
            char = self.serial_port.read()
            # The terminator byte marks the end of a message
            if char == self.message_end:
                # Decode the buffered bytes as JSON and dispatch the message
                message = self.buffer.decode()
                data = json.loads(message)
                self._process_message(data)
        time.sleep(0.01)  # Prevent busy-waiting

Event-Driven Architecture

The reader runs in a daemon thread, so it never keeps the interpreter alive after the main program exits. For a graceful shutdown, close() sets the stop Event, waits up to one second for the thread to finish, and then closes the serial port.

Integration with TrashClassificator

Combine serial communication with video processing:
integrated_system.py
import cv2
from trash_classificator.processor import TrashClassificator
from serial_com import CommunicationManager

class IntegratedTrashSystem:
    """Classify frames from a video source and forward the results over serial."""

    def __init__(self, video_source, serial_port):
        """Open the video source, create the classifier and connect the serial link.

        Args:
            video_source: Value passed to ``cv2.VideoCapture`` (the example
                below uses camera index 0).
            serial_port: Serial device name passed to CommunicationManager.
        """
        self.cap = cv2.VideoCapture(video_source)
        self.classifier = TrashClassificator()
        self.serial = CommunicationManager(port=serial_port)

        # Connect to serial device. A failed connection is logged by
        # connect(); send_message() then logs an error and returns False
        # for each message while video processing continues.
        self.serial.connect()

    def run(self):
        """Process frames until the stream ends or 'q' is pressed.

        The capture device, the display windows and the serial connection
        are released on every exit path, including an exception raised while
        processing a frame.
        """
        try:
            while self.cap.isOpened():
                success, frame = self.cap.read()
                if not success:
                    break

                # Process frame
                # NOTE(review): process_log is presumably a dict with the
                # 'detected_objects' and 'classifications' keys used below -
                # confirm against TrashClassificator.
                image, process_log = self.classifier.frame_processing(frame)

                # Send classification results via serial. Each entry is
                # JSON-encoded by send_message(), so it must be serializable.
                if process_log.get('detected_objects', 0) > 0:
                    for classification in process_log.get('classifications', []):
                        self.serial.send_message(
                            'classification',
                            classification
                        )

                cv2.imshow("Frame", image)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Without this, an error inside the loop would leave the camera
            # and the serial port open.
            self.cap.release()
            cv2.destroyAllWindows()
            self.serial.close()

if __name__ == "__main__":
    system = IntegratedTrashSystem(0, '/dev/ttyUSB0')
    system.run()

Best Practices

Connection Reliability: Always implement connection retry logic for production applications:
max_retries = 3
for attempt in range(1, max_retries + 1):
    if serial_manager.connect():
        break
    if attempt < max_retries:
        time.sleep(2)  # Back off before the next attempt, not after the last one
else:
    # The loop ended without a break, i.e. every attempt failed
    log.error(f"Could not connect after {max_retries} attempts")
Error Handling: Monitor connection status and handle disconnections:
# connect() returns True immediately while is_connected is set, so it only
# reopens the port when the manager is marked as disconnected.
if not serial_manager.is_connected:
    log.warning("Connection lost, attempting to reconnect...")
    serial_manager.connect()
Baudrate Matching: Ensure both devices use the same baudrate. Common rates: 9600, 115200, 230400

Troubleshooting

Permission Denied (Linux/Mac)

# Quick workaround: give all users read/write access to the device node
sudo chmod 666 /dev/ttyUSB0
# Or add the current user to the dialout group
sudo usermod -a -G dialout $USER

Port Already in Use

Close other applications using the port or use:
# connect() catches the serial exception itself, logs the details, and
# returns False, so a try/except around the call would never be triggered.
# Check the return value instead.
if not serial_manager.connect():
    log.error("Could not open the port; it may be in use by another application")

No Data Received

  • Verify baudrate matches
  • Check physical connections
  • Test with serial monitor tool (PuTTY, screen, minicom)

Next Steps

Video Stream

Process video streams for classification

Custom Integration

Build custom applications with the classifier

Build docs developers (and LLMs) love