Overview
This guide shows how to integrate the TrashClassificator into your own applications. Learn to use custom video sources, process results programmatically, save detection data, and create advanced integration patterns.

Basic Integration Pattern
Minimal Integration
The simplest way to integrate the classifier:

basic_integration.py
import cv2
from trash_classificator.processor import TrashClassificator

# Create a classifier with default settings.
classifier = TrashClassificator()

# Run detection on one still image.
source_image = cv2.imread('trash_image.jpg')
annotated_image, results = classifier.frame_processing(source_image)

# Inspect the structured results.
print(f"Detected {results['detected_objects']} objects")
for classification in results['classifications']:
    print(f"Class: {classification['class']}, Confidence: {classification['confidence']}")

# Persist the annotated frame to disk.
cv2.imwrite('output.jpg', annotated_image)
Custom Video Sources
Multiple Camera Sources
Process multiple camera feeds simultaneously:

multi_camera.py
import cv2
import threading
from trash_classificator.processor import TrashClassificator
class MultiCameraSystem:
    """Run trash detection on several cameras at once, one thread per feed.

    NOTE(review): cv2.imshow/waitKey are invoked from worker threads here;
    OpenCV GUI calls are not guaranteed thread-safe on every platform —
    confirm on the target OS, or move display to the main thread.
    """

    def __init__(self, camera_ids):
        """Open one capture device and one classifier per camera id."""
        self.cameras = [cv2.VideoCapture(cam_id) for cam_id in camera_ids]
        self.classifiers = [TrashClassificator() for _ in camera_ids]
        self.results = {}
        # Fix: self.results is written by multiple threads concurrently;
        # guard it with a lock instead of relying on CPython dict internals.
        self._results_lock = threading.Lock()

    def process_camera(self, camera_index):
        """Process single camera in separate thread."""
        cap = self.cameras[camera_index]
        classifier = self.classifiers[camera_index]
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break
            image, results = classifier.frame_processing(frame)
            with self._results_lock:
                self.results[f'camera_{camera_index}'] = results
            cv2.imshow(f'Camera {camera_index}', image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    def run(self):
        """Start processing all cameras and block until every thread exits."""
        threads = []
        for i in range(len(self.cameras)):
            thread = threading.Thread(target=self.process_camera, args=(i,))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        # Cleanup: release every capture device and close all windows.
        for cap in self.cameras:
            cap.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    # Run detection on cameras 0 and 1 concurrently.
    multi_cam = MultiCameraSystem([0, 1])
    multi_cam.run()
IP Camera / RTSP Streams
ip_camera.py
import cv2
from trash_classificator.processor import TrashClassificator
class IPCameraProcessor:
    """Stream an IP camera (RTSP) through the trash classifier."""

    @staticmethod
    def _build_authenticated_url(rtsp_url, username=None, password=None):
        """Return *rtsp_url* with credentials embedded, if both were given.

        Result format: rtsp://username:password@ip:port/path
        """
        if username and password:
            scheme, _, rest = rtsp_url.partition('//')
            return f"{scheme}//{username}:{password}@{rest}"
        return rtsp_url

    def __init__(self, rtsp_url, username=None, password=None):
        """Open the stream; credentials are optional.

        Bug fix: the final URL is stored on self so run() can reopen the
        stream — the original reconnect referenced the constructor-local
        `rtsp_url`, which raised NameError inside run().
        """
        self.rtsp_url = self._build_authenticated_url(rtsp_url, username, password)
        self.cap = cv2.VideoCapture(self.rtsp_url)
        self.classifier = TrashClassificator()
        # Set buffer size to reduce latency
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    def run(self):
        """Read, classify and display frames; reconnect on read failure."""
        while self.cap.isOpened():
            success, frame = self.cap.read()
            if not success:
                print("Reconnecting...")
                self.cap.release()
                self.cap = cv2.VideoCapture(self.rtsp_url)
                continue
            image, results = self.classifier.frame_processing(frame)
            cv2.imshow('IP Camera', image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        self.cap.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    # Replace the URL and credentials with your camera's values.
    ip_processor = IPCameraProcessor(
        rtsp_url='rtsp://192.168.1.100:554/stream1',
        username='admin',
        password='password'
    )
    ip_processor.run()
Image Directory Batch Processing
batch_processing.py
import cv2
import os
from pathlib import Path
from trash_classificator.processor import TrashClassificator
class BatchImageProcessor:
    """Run the classifier over every image in a directory."""

    def __init__(self, input_dir, output_dir):
        """Prepare paths and the classifier.

        Parameters:
            input_dir: directory containing source images.
            output_dir: directory for annotated copies (created if missing).
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path doesn't raise FileNotFoundError.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.classifier = TrashClassificator()

    def process_directory(self):
        """Process all images in the input directory.

        Returns a list of {'filename', 'results'} dicts, one per image that
        could actually be decoded.
        """
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
        # Match extensions case-insensitively (e.g. photo.JPG) and sort for
        # a deterministic processing order.
        image_files = sorted(
            p for p in self.input_dir.iterdir()
            if p.suffix.lower() in image_extensions
        )
        results_summary = []
        for img_path in image_files:
            print(f"Processing {img_path.name}...")
            image = cv2.imread(str(img_path))
            if image is None:
                # cv2.imread returns None for unreadable/corrupt files;
                # skip them instead of crashing inside frame_processing.
                print(f"Skipping unreadable file {img_path.name}")
                continue
            annotated, results = self.classifier.frame_processing(image)
            # Save annotated image
            output_path = self.output_dir / f"annotated_{img_path.name}"
            cv2.imwrite(str(output_path), annotated)
            # Store results
            results_summary.append({
                'filename': img_path.name,
                'results': results
            })
        return results_summary
if __name__ == "__main__":
    batch = BatchImageProcessor(
        input_dir='./images/trash',
        output_dir='./images/output'
    )
    summary = batch.process_directory()
    # Summarise detections across the whole run.
    total_objects = sum(entry['results']['detected_objects'] for entry in summary)
    print(f"\nProcessed {len(summary)} images")
    print(f"Total objects detected: {total_objects}")
Processing Results Programmatically
Extracting Detection Data
result_extraction.py
import cv2
from trash_classificator.processor import TrashClassificator
class ResultExtractor:
    """Turn raw classifier output into structured detection records."""

    def __init__(self):
        self.classifier = TrashClassificator()

    def extract_detections(self, image):
        """Return a list of detection dicts (class, confidence, bbox,
        center, area) for one image."""
        _, results = self.classifier.frame_processing(image)
        detections = []
        for entry in results.get('classifications', []):
            box = entry.get('bbox', [])  # [x1, y1, x2, y2]
            detections.append({
                'class': entry['class'],
                'confidence': entry['confidence'],
                'bbox': box,
                'center': self._calculate_center(box),
                'area': self._calculate_area(box),
            })
        return detections

    def _calculate_center(self, bbox):
        """Return the integer (x, y) midpoint of bbox, or None if malformed."""
        if len(bbox) != 4:
            return None
        x1, y1, x2, y2 = bbox
        return ((x1 + x2) // 2, (y1 + y2) // 2)

    def _calculate_area(self, bbox):
        """Return the bbox area in pixels, or 0 if malformed."""
        if len(bbox) != 4:
            return 0
        x1, y1, x2, y2 = bbox
        return (x2 - x1) * (y2 - y1)
if __name__ == "__main__":
    extractor = ResultExtractor()
    sample = cv2.imread('trash.jpg')
    # Report each detection with its position and confidence.
    for det in extractor.extract_detections(sample):
        print(f"Found {det['class']} at {det['center']} "
              f"with {det['confidence']:.2%} confidence")
Real-Time Analytics
analytics.py
import cv2
import time
from collections import defaultdict, deque
from trash_classificator.processor import TrashClassificator
class TrashAnalytics:
    """Collect rolling detection statistics while streaming a video source."""

    def __init__(self, video_source, window_size=30):
        self.cap = cv2.VideoCapture(video_source)
        self.classifier = TrashClassificator()
        # Analytics state: per-class totals plus a rolling detection window.
        self.class_counts = defaultdict(int)
        self.detection_history = deque(maxlen=window_size)
        self.start_time = time.time()
        self.frame_count = 0

    def update_analytics(self, results):
        """Fold one frame's results into the running statistics."""
        self.frame_count += 1
        self.detection_history.append(results['detected_objects'])
        for entry in results.get('classifications', []):
            self.class_counts[entry['class']] += 1

    def get_statistics(self):
        """Return a snapshot dict: fps, frame total, rolling average,
        class distribution and most common class."""
        elapsed = time.time() - self.start_time
        history = self.detection_history
        counts = self.class_counts
        return {
            'fps': self.frame_count / elapsed if elapsed > 0 else 0,
            'total_frames': self.frame_count,
            'avg_detections': sum(history) / len(history) if history else 0,
            'class_distribution': dict(counts),
            'most_common_class': max(counts, key=counts.get) if counts else None,
        }

    def run(self):
        """Stream frames, overlay live stats, and print a summary on exit."""
        while self.cap.isOpened():
            success, frame = self.cap.read()
            if not success:
                break
            image, results = self.classifier.frame_processing(frame)
            self.update_analytics(results)
            # Overlay the current statistics on the frame.
            stats = self.get_statistics()
            cv2.putText(image, f"FPS: {stats['fps']:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(image, f"Avg Detections: {stats['avg_detections']:.1f}", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.imshow('Analytics', image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        # Print final statistics
        final_stats = self.get_statistics()
        print("\n=== Final Statistics ===")
        print(f"Total Frames: {final_stats['total_frames']}")
        print(f"Average FPS: {final_stats['fps']:.2f}")
        print(f"Average Detections: {final_stats['avg_detections']:.2f}")
        print(f"\nClass Distribution:")
        for cls, count in final_stats['class_distribution'].items():
            print(f" {cls}: {count}")
        self.cap.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    # Source 0 is the default webcam.
    analytics = TrashAnalytics(video_source=0)
    analytics.run()
Saving Detection Results
JSON Export
json_export.py
import cv2
import json
import datetime
from pathlib import Path
from trash_classificator.processor import TrashClassificator
class ResultExporter:
    """Classify one image and export JSON results plus the annotated frame."""

    def __init__(self, output_dir='results'):
        self.output_dir = Path(output_dir)
        # parents=True so nested output paths work too.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.classifier = TrashClassificator()

    def process_and_export(self, image_path):
        """Process image and export results to JSON.

        Returns (json_path, annotated_image_path).
        Raises FileNotFoundError if the image cannot be read.
        """
        image = cv2.imread(str(image_path))
        if image is None:
            # cv2.imread silently returns None for missing/corrupt files;
            # fail loudly here instead of crashing later with a cryptic error.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        annotated, results = self.classifier.frame_processing(image)
        stem = Path(image_path).stem  # computed once, used for both outputs
        # Prepare export data
        export_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'image_path': str(image_path),
            'image_size': {
                'width': image.shape[1],
                'height': image.shape[0]
            },
            'detections': results
        }
        # Save JSON
        json_path = self.output_dir / f"{stem}_results.json"
        with open(json_path, 'w') as f:
            json.dump(export_data, f, indent=2)
        # Save annotated image
        img_path = self.output_dir / f"{stem}_annotated.jpg"
        cv2.imwrite(str(img_path), annotated)
        return json_path, img_path
if __name__ == "__main__":
    exporter = ResultExporter()
    # Process one image and report where the artefacts were written.
    json_path, img_path = exporter.process_and_export('trash.jpg')
    print(f"Results saved to {json_path}")
    print(f"Annotated image saved to {img_path}")
CSV Logging
csv_logging.py
import cv2
import csv
import datetime
from pathlib import Path
from trash_classificator.processor import TrashClassificator
class CSVLogger:
    """Append per-detection rows to a CSV file as frames are processed."""

    def __init__(self, csv_path='detections.csv'):
        self.csv_path = Path(csv_path)
        self.classifier = TrashClassificator()
        # Write the header row once, when the file doesn't exist yet.
        if not self.csv_path.exists():
            with open(self.csv_path, 'w', newline='') as f:
                header = ['timestamp', 'class', 'confidence', 'x1', 'y1', 'x2', 'y2', 'source']
                csv.writer(f).writerow(header)

    def log_detections(self, image, source='unknown'):
        """Classify `image`, append one CSV row per detection, and return
        the (annotated, results) pair unchanged."""
        annotated, results = self.classifier.frame_processing(image)
        stamp = datetime.datetime.now().isoformat()
        with open(self.csv_path, 'a', newline='') as f:
            writer = csv.writer(f)
            for entry in results.get('classifications', []):
                bbox = entry.get('bbox', [0, 0, 0, 0])
                writer.writerow([stamp, entry['class'], entry['confidence'], *bbox, source])
        return annotated, results
if __name__ == "__main__":
    logger = CSVLogger()
    cap = cv2.VideoCapture(0)
    # Stream the webcam, logging every detection until 'q' is pressed.
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        annotated, results = logger.log_detections(frame, source='webcam')
        cv2.imshow('Logging', annotated)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
Advanced Integration Patterns
Combined System: Video + Serial + Analytics
advanced_integration.py
import cv2
import json
import time
from collections import defaultdict
from trash_classificator.processor import TrashClassificator
from serial_com import CommunicationManager
class AdvancedTrashSystem:
    """Classify frames from a video source, optionally forward detections
    over serial, and keep per-class session statistics."""

    def __init__(self, video_source, serial_port=None):
        """Open the capture device and (optionally) the serial link.

        Parameters:
            video_source: index or path accepted by cv2.VideoCapture.
            serial_port: optional serial device path; when given, detections
                are forwarded through CommunicationManager.
        """
        # Video processing
        self.cap = cv2.VideoCapture(video_source)
        # Fix: use the named capture properties instead of the magic
        # indices 3 / 4 (CAP_PROP_FRAME_WIDTH / CAP_PROP_FRAME_HEIGHT).
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        self.classifier = TrashClassificator()
        # Serial communication (optional)
        self.serial = None
        if serial_port:
            self.serial = CommunicationManager(port=serial_port)
            self.serial.connect()
        # Analytics
        self.stats = defaultdict(int)
        self.last_detection_time = {}
        self.detection_cooldown = 2.0  # seconds between serial reports per class

    def should_report_detection(self, class_name):
        """Return True (and record the time) when `class_name` was last
        reported more than `detection_cooldown` seconds ago; else False.
        Avoids spamming duplicate detections over serial."""
        current_time = time.time()
        last_time = self.last_detection_time.get(class_name, 0)
        if current_time - last_time > self.detection_cooldown:
            self.last_detection_time[class_name] = current_time
            return True
        return False

    def process_frame(self, frame):
        """Classify one frame, update stats, and forward fresh detections."""
        annotated, results = self.classifier.frame_processing(frame)
        # Update statistics
        for classification in results.get('classifications', []):
            class_name = classification['class']
            self.stats[class_name] += 1
            # Send via serial if available and not a duplicate within cooldown
            if self.serial and self.should_report_detection(class_name):
                self.serial.send_message(
                    message_type='classification',
                    data=classification
                )
        return annotated, results

    def draw_stats(self, image):
        """Draw the per-class counters as a text overlay on `image`."""
        y_offset = 30
        for class_name, count in self.stats.items():
            text = f"{class_name}: {count}"
            cv2.putText(image, text, (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            y_offset += 30
        return image

    def run(self):
        """Main processing loop; cleanup is guaranteed by the finally block."""
        try:
            while self.cap.isOpened():
                success, frame = self.cap.read()
                if not success:
                    break
                # Process frame
                annotated, results = self.process_frame(frame)
                # Draw statistics
                annotated = self.draw_stats(annotated)
                # Display
                cv2.imshow('Advanced Trash System', annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Cleanup
            self.cap.release()
            cv2.destroyAllWindows()
            if self.serial:
                self.serial.close()
            # Print final statistics
            print("\n=== Session Statistics ===")
            for class_name, count in self.stats.items():
                print(f"{class_name}: {count}")
if __name__ == "__main__":
    # Omit serial_port to run without a serial link.
    system = AdvancedTrashSystem(
        video_source=0,
        serial_port='/dev/ttyUSB0'  # Optional
    )
    system.run()
Best Practices
Resource Management: Always release video captures and close connections in a
finally block or context manager to prevent resource leaks.

Performance Optimization: For real-time applications, consider:
- Processing every Nth frame instead of all frames
- Reducing input resolution
- Using GPU acceleration if available
- Running detection in a separate thread
Error Handling: Implement robust error handling for production systems:
try:
image, results = classifier.frame_processing(frame)
except Exception as e:
log.error(f"Processing failed: {e}")
continue # Skip this frame
Next Steps
Video Stream
Basic video stream processing example
Serial Communication
Send results to external devices
API Reference
Detailed API documentation
Configuration
Configure classifier settings