Overview
The CommunicationManager class enables bidirectional serial communication between the Trash Classification AI System and external devices (like robotics systems, VEX Brain, Arduino, etc.). This allows you to send classification results and commands, and receive status updates.
Complete CommunicationManager Class
import json
import time
import serial
import logging as log
from typing import Dict, Any, Optional
from threading import Thread, Event
log.basicConfig( level = log. INFO , format = " %(asctime)s - %(levelname)s - %(message)s " )
class CommunicationManager :
def __init__ ( self , port : str = 'COM7' , baudrate : int = 115200 ):
self .port = port
self .baudrate = baudrate
self .message_end = b ' \n '
self .serial_port = None
self .is_connected = False
self ._read_thread = None
self .buffer = bytearray ()
self ._stop_event = Event()
def connect ( self ) -> bool :
"""Establish serial connection"""
if self .is_connected:
return True
try :
self .serial_port = serial.Serial(
port = self .port,
baudrate = self .baudrate,
timeout = 10 ,
write_timeout = 10
)
self .is_connected = True
# Start read loop in separate thread
self ._stop_event.clear()
self ._read_thread = Thread( target = self ._read_loop)
self ._read_thread.daemon = True
self ._read_thread.start()
return True
except Exception as e:
log.error( f 'Error connecting to serial port: { str (e) } ' )
return False
def send_message ( self , message_type : str , data : dict ) -> bool :
"""
Send JSON message to robot
message_type: Type of message ('command', 'status', 'classification', etc)
data: Dictionary with message data
"""
if not self .is_connected or not self .serial_port:
log.error( "Error: Serial port not initialized" )
return False
try :
message = {
'type' : message_type,
'data' : data,
}
encoded_message = json.dumps(message).encode() + self .message_end
log.info( f 'Sending message: { encoded_message } ' )
self .serial_port.write(encoded_message)
return True
except Exception as e:
log.error( f "Error sending message: { e } " )
return False
def _read_loop ( self ):
"""Thread loop for reading messages from connected device"""
while not self ._stop_event.is_set():
if self .serial_port and self .serial_port.in_waiting:
try :
char = self .serial_port.read()
if char == self .message_end:
message = self .buffer.decode()
self .buffer = bytearray ()
try :
data = json.loads(message)
self ._process_message(data)
except json.JSONDecodeError:
log.error( f 'Error decoding message: { message } ' )
else :
self .buffer.extend(char)
except Exception as e:
log.error( f 'Error reading serial port: { e } ' )
time.sleep( 0.01 )
def _process_message ( self , message : dict ):
"""Process incoming message from device"""
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
if msg_type == 'check_service' :
state = data.get( 'state' )
log.info( f ' { msg_type } status: \n state: { state } ' )
elif msg_type == 'safety_service' :
state = data.get( 'state' )
time_taken = data.get( 'time' )
log.info( f " { msg_type } status: \n state: { state } , \n time: { time_taken } s" )
if state == 'error' :
log.error( f "Safety Service Error: { data.get( 'error_msg' , 'Unknown error' ) } " )
elif msg_type == 'scan_service' :
state = data.get( 'state' )
if state == 'in_progress' :
log.info( f "Scan Data - Object Detected: { data[ 'object' ] } , "
f "Angle: { data[ 'center_angle' ] } °, "
f "Distance: { data[ 'distance' ] } mm, " )
elif state == 'complete' :
objects = data.get( 'objects' , [])
log.info( f "Scan Complete - Objects detected: { len (objects) } " )
for obj in objects:
log.info( f "Object at { obj[ 'center_angle' ] } °, "
f "width: { obj[ 'width' ] } mm, "
f "distance: { obj[ 'distance' ] } mm, "
f "size: { obj[ 'max_size' ] } mm" )
except Exception as e:
log.error( f 'Error processing message: { e } ' )
def close ( self ):
"""Close serial connection"""
self ._stop_event.set()
if self ._read_thread:
self ._read_thread.join( timeout = 1.0 )
if self .serial_port and self .serial_port.is_open:
self .serial_port.close()
self .is_connected = False
log.info( 'Serial connection closed' )
Connection Setup
Initialize Communication Manager
Create an instance with your serial port configuration: serial_manager = CommunicationManager(
port = 'COM7' , # Windows: COM1, COM2, etc.
# Linux/Mac: /dev/ttyUSB0, /dev/ttyACM0, etc.
baudrate = 115200 # Must match device baudrate
)
Finding Your Port :
Windows: Check Device Manager → Ports (COM & LPT)
Linux: ls /dev/tty* or dmesg | grep tty
Mac: ls /dev/cu.*
Establish Connection
Connect to the serial device: if serial_manager.connect():
log.info( "Connected to device" )
else :
log.error( "Failed to connect" )
The connection starts a background thread that continuously listens for incoming messages.
Send Messages
Send commands or data to the connected device: serial_manager.send_message(
message_type = 'scan_service' ,
data = { 'command' : 'start' }
)
Receive Messages
Messages are automatically received and processed in the background thread. The _process_message() method handles different message types.
Close Connection
Always close the connection when done:
All messages use JSON format with newline termination:
Outgoing Message Structure
{
"type" : "message_type" ,
"data" : {
"key1" : "value1" ,
"key2" : "value2"
}
}
Message Type Examples
Check Service
Safety Service
Scan Service
Classification
Query device status: serial_manager.send_message(
message_type = 'check_service' ,
data = { 'query' : 'status' }
)
Expected Response: {
"type" : "check_service" ,
"data" : {
"state" : "ready"
}
}
Execute safety procedures: serial_manager.send_message(
message_type = 'safety_service' ,
data = { 'action' : 'emergency_stop' }
)
Expected Response: {
"type" : "safety_service" ,
"data" : {
"state" : "stopped" ,
"time" : 1.23
}
}
Request object scanning: serial_manager.send_message(
message_type = 'scan_service' ,
data = { 'mode' : 'continuous' }
)
In-Progress Response: {
"type" : "scan_service" ,
"data" : {
"state" : "in_progress" ,
"object" : "detected" ,
"center_angle" : 45.5 ,
"distance" : 250
}
}
Complete Response: {
"type" : "scan_service" ,
"data" : {
"state" : "complete" ,
"objects" : [
{
"center_angle" : 45.5 ,
"width" : 80 ,
"distance" : 250 ,
"max_size" : 120
}
]
}
}
Send trash classification results: serial_manager.send_message(
message_type = 'classification' ,
data = {
'class' : 'plastic' ,
'confidence' : 0.92 ,
'bbox' : [ 100 , 150 , 200 , 300 ]
}
)
Complete Working Example
import time
import logging as log
from serial_com import CommunicationManager
log.basicConfig( level = log. INFO , format = " %(asctime)s - %(levelname)s - %(message)s " )
def main ():
# Initialize serial manager
serial_manager = CommunicationManager(
port = '/dev/ttyUSB0' , # Adjust for your system
baudrate = 115200
)
try :
# Connect to device
if serial_manager.connect():
log.info( "Connected to VEX Brain" )
# Check device status
serial_manager.send_message( 'check_service' , { 'query' : 'status' })
time.sleep( 1 )
# Start safety check
serial_manager.send_message( 'safety_service' , { 'action' : 'check' })
time.sleep( 5 )
# Begin scanning
serial_manager.send_message( 'scan_service' , { 'mode' : 'continuous' })
# Keep program running to receive messages
log.info( "Listening for messages... Press Ctrl+C to exit" )
while True :
time.sleep( 1 )
else :
log.error( "Failed to connect to device" )
except KeyboardInterrupt :
log.info( "Program terminated by user" )
finally :
serial_manager.close()
if __name__ == "__main__" :
main()
Threading and Event Handling
The CommunicationManager uses threading for non-blocking communication:
Background Read Thread
# Read thread continuously monitors incoming data
def _read_loop ( self ):
while not self ._stop_event.is_set():
if self .serial_port and self .serial_port.in_waiting:
# Read and process data
char = self .serial_port.read()
# Buffer until newline
if char == self .message_end:
# Decode and process complete message
message = self .buffer.decode()
data = json.loads(message)
self ._process_message(data)
time.sleep( 0.01 ) # Prevent busy-waiting
Event-Driven Architecture
The system uses a daemon thread for reading, which automatically terminates when the main program exits. The Event object ensures graceful shutdown.
Integration with TrashClassificator
Combine serial communication with video processing:
import cv2
from trash_classificator.processor import TrashClassificator
from serial_com import CommunicationManager
class IntegratedTrashSystem :
def __init__ ( self , video_source , serial_port ):
self .cap = cv2.VideoCapture(video_source)
self .classifier = TrashClassificator()
self .serial = CommunicationManager( port = serial_port)
# Connect to serial device
self .serial.connect()
def run ( self ):
while self .cap.isOpened():
success, frame = self .cap.read()
if not success:
break
# Process frame
image, process_log = self .classifier.frame_processing(frame)
# Send classification results via serial
if process_log.get( 'detected_objects' , 0 ) > 0 :
for classification in process_log.get( 'classifications' , []):
self .serial.send_message(
'classification' ,
classification
)
cv2.imshow( "Frame" , image)
if cv2.waitKey( 1 ) & 0x FF == ord ( 'q' ):
break
self .cap.release()
cv2.destroyAllWindows()
self .serial.close()
if __name__ == "__main__" :
system = IntegratedTrashSystem( 0 , '/dev/ttyUSB0' )
system.run()
Best Practices
Connection Reliability : Always implement connection retry logic for production applications:max_retries = 3
for attempt in range (max_retries):
if serial_manager.connect():
break
time.sleep( 2 )
Error Handling : Monitor connection status and handle disconnections:if not serial_manager.is_connected:
log.warning( "Connection lost, attempting to reconnect..." )
serial_manager.connect()
Baudrate Matching : Ensure both devices use the same baudrate. Common rates: 9600, 115200, 230400
Troubleshooting
Permission Denied (Linux/Mac)
sudo chmod 666 /dev/ttyUSB0
# Or add user to dialout group
sudo usermod -a -G dialout $USER
Port Already in Use
Close other applications using the port or use:
try :
serial_manager.connect()
except serial.SerialException as e:
log.error( f "Port busy: { e } " )
No Data Received
Verify baudrate matches
Check physical connections
Test with serial monitor tool (PuTTY, screen, minicom)
Next Steps
Video Stream Process video streams for classification
Custom Integration Build custom applications with the classifier