System Overview
The system is organized as a pipeline of independent modules connected by Qt signals and shared SQLite state. Packet capture and analysis run in a background thread; the GUI reacts to emitted signals without polling.
┌─────────────────────────────────────────────────────────────────┐
│                         Sniffer Thread                          │
│                                                                 │
│  Network Interface                                              │
│          │                                                      │
│          ▼                                                      │
│  AsyncSniffer (Scapy) ──► procesar_paquete()                    │
│                                 │                               │
│           ┌─────────────────────┼──────────────────────┐        │
│           ▼                     ▼                      ▼        │
│  detectar_syn_flood      detectar_ddos             detectar_    │
│  detectar_escaneo_       detectar_exploit          udp_flood    │
│  puertos                 detectar_sql_                          │
│           │              injection                     │        │
│           └─────────────────────┬──────────────────────┘        │
│                                 ▼                               │
│                          guardar_ataque()                       │
│                                 │                               │
│           ┌─────────────────────┼──────────────────────┐        │
│           ▼                     ▼                      ▼        │
│  clasificar_ataque_ml  SQLite (intrusiones.db)      CSV log     │
│  (≥70% → ML label,     ataques / bloqueos           dataset     │
│   <70% → Heurística)   tables                                   │
│           │                                                     │
└───────────┼─────────────────────────────────────────────────────┘
            │  Qt Signals (thread-safe)
            │  nuevo_evento / nuevo_bloqueo / nuevo_trafico
            ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Main (UI) Thread                         │
│                                                                 │
│  IDSInterface (PyQt5)                                           │
│  ├── Live Traffic Panel   (nuevo_trafico)                       │
│  ├── Events Table         (nuevo_evento)                        │
│  ├── IPS Block Table      (nuevo_bloqueo)                       │
│  └── Matplotlib Charts    (QTimer refresh)                      │
└─────────────────────────────────────────────────────────────────┘
            │
            ▼
respuesta_activa.py  ── PowerShell ── Windows Firewall
telegram_alert.py    ── HTTP POST  ── Telegram Bot API
abuseipdb_module.py  ── (mock mode)   IP reputation
Module Breakdown
ids.py — Core Engine
The central module. It owns the AsyncSniffer, all six heuristic detectors, the ML inference call, and the SQLite writes. It also defines ComunicadorIDS, the Qt signal bus shared across the entire application.
Key responsibilities:
- Start and stop AsyncSniffer without blocking the UI thread
- Run each captured packet through all six detectors sequentially
- Throttle repeated alerts from the same IP (TIEMPO_ENTRE_ALERTAS = 2 seconds)
- Coordinate ML classification and IPS block decision in guardar_ataque()
- Emit Qt signals so the GUI stays updated without polling
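The alert throttling in the list above can be sketched as follows. This is a minimal illustration, not the code in ids.py: the helper name `debe_alertar`, the `_ultima_alerta` dictionary, and keying the quiet period on the source IP alone are assumptions; only the constant `TIEMPO_ENTRE_ALERTAS` comes from the text.

```python
import time

TIEMPO_ENTRE_ALERTAS = 2.0  # seconds, value taken from the list above

# Hypothetical state: time of the last emitted alert per source IP.
_ultima_alerta = {}


def debe_alertar(ip, ahora=None):
    """Return True if no alert for `ip` was emitted in the last TIEMPO_ENTRE_ALERTAS seconds."""
    ahora = time.monotonic() if ahora is None else ahora
    anterior = _ultima_alerta.get(ip)
    if anterior is not None and ahora - anterior < TIEMPO_ENTRE_ALERTAS:
        return False  # still inside the quiet period: suppress the alert
    _ultima_alerta[ip] = ahora  # only emitted alerts reset the timer
    return True
```

A detector would call `debe_alertar(ip)` before reporting and skip the report when it returns False. The `ahora` parameter exists only so the behavior can be checked without sleeping.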
Detection thresholds (tuned to avoid false positives on normal traffic):
| Threshold | Value | Rationale |
|---|---|---|
| THRESHOLD_SYN_FLOOD | 10 packets / 500 ms | Pure SYN packets are rare in normal browsing |
| THRESHOLD_DDOS | 500 packets / 1 s | High enough to not trigger on YouTube/downloads |
| PORT_SCAN_THRESHOLD | 10 unique ports | Touching 10 distinct ports in one session is anomalous |
| THRESHOLD_UDP_FLOOD | 500 packets / 1 s | Accommodates heavy DNS/streaming traffic |
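A "packets per window" threshold such as the SYN flood row can be implemented with a sliding window of timestamps per source IP. The sketch below is one possible shape, assuming a deque-based window and a `>=` comparison; `registrar_syn`, `VENTANA_SYN`, and `_syn_por_ip` are hypothetical names, and the real detector may count differently.

```python
from collections import defaultdict, deque

THRESHOLD_SYN_FLOOD = 10  # packets, from the table above
VENTANA_SYN = 0.5         # seconds (500 ms), from the table above

# Hypothetical state: timestamps of recent SYN packets per source IP.
_syn_por_ip = defaultdict(deque)


def registrar_syn(ip, ahora):
    """Record one SYN from `ip` at time `ahora` (seconds).

    Returns True when the number of SYNs inside the window reaches the threshold.
    """
    marcas = _syn_por_ip[ip]
    marcas.append(ahora)
    # Discard timestamps that have fallen out of the window.
    while marcas and ahora - marcas[0] > VENTANA_SYN:
        marcas.popleft()
    return len(marcas) >= THRESHOLD_SYN_FLOOD
```

Ten SYNs spaced 10 ms apart trip the detector on the tenth packet, while one SYN per second never accumulates more than a single entry in the window.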
Exploit detector — monitored ports:
# ids.py — detectar_exploit()
PUERTOS_EXPLOIT = {135, 139, 445, 3389, 5900, 21, 22, 23, 69}
# 135/139/445: SMB/RPC (EternalBlue, WannaCry)
# 3389: RDP (BlueKeep)
# 5900: VNC
# 21/22/23: FTP, SSH, Telnet
# 69: TFTP
Only packets whose TCP flags are SYN ('S') or SYN-ACK ('SA') are flagged; established ACK/PSH traffic is ignored.
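Reduced to plain values, the check described above is a set membership test on the destination port and the flag string. The function name `es_intento_exploit` and the `FLAGS_SOSPECHOSOS` set are illustrative, and the sketch assumes the flags arrive as a string such as 'S' or 'SA' (for example from `str(pkt[TCP].flags)` in Scapy).

```python
PUERTOS_EXPLOIT = {135, 139, 445, 3389, 5900, 21, 22, 23, 69}
FLAGS_SOSPECHOSOS = {"S", "SA"}  # SYN and SYN-ACK, per the text above


def es_intento_exploit(puerto_destino, flags):
    """Return True for a connection attempt (SYN or SYN-ACK) to a monitored port."""
    return puerto_destino in PUERTOS_EXPLOIT and flags in FLAGS_SOSPECHOSOS
```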
SQL injection detector — regex pattern:
# ids.py — detectar_sql_injection()
import re

sql_pattern = re.compile(
    r"(?i)(\b(select|union|insert|update|delete|drop|alter|create|exec|execute|cast|declare|grant|revoke)\b"
    r".*?(--|#|;|/\*|\*/|@@|char\(|nchar\(|varchar\(|nvarchar\()|"
    r"('(\s*)or(\s*)\d+=\d+)|"           # OR 1=1 auth bypass
    r"(\bunion\b.*\bselect\b)|"          # UNION SELECT exfiltration
    r"(\bexec\b(\s|\+)+(s|x)p\w+)|"      # EXEC xp_* stored procedures
    r"(;?\s*--)|"                        # SQL comment injection
    r"(\bwaitfor\b\s+delay\b)|"          # Time-blind injection
    r"(sleep\(\d+\))"                    # SLEEP(n) time-based injection
    r")"
)
The detector reads the raw TCP payload (scapy.Raw.load) and applies an exclusion list of benign HTTP parameters (order=desc, limit=, search=, etc.) before matching.
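The "exclude benign parameters, then match" flow can be sketched as below. Everything here is an assumption made for illustration: `PATRON_SQLI` is a shortened stand-in rather than the project's full pattern, `PARAMETROS_BENIGNOS` only contains the examples named above plus `order=asc`, and stripping the benign fragments (instead of skipping the payload entirely) is one of several ways the exclusion could work.

```python
import re

# Simplified stand-in pattern (NOT the project's full regex).
PATRON_SQLI = re.compile(
    r"(?i)('\s*or\s*\d+=\d+)|(\bunion\b.*\bselect\b)|(\bwaitfor\b\s+delay\b)|(sleep\(\d+\))"
)

# Hypothetical exclusion list built from the examples in the text.
PARAMETROS_BENIGNOS = ("order=desc", "order=asc", "limit=", "search=")


def es_sql_injection(payload: bytes) -> bool:
    """Decode a raw TCP payload, drop benign parameter fragments, then apply the pattern."""
    texto = payload.decode("utf-8", errors="ignore").lower()
    for benigno in PARAMETROS_BENIGNOS:
        texto = texto.replace(benigno, "")
    return PATRON_SQLI.search(texto) is not None
```

Because only the parameter name is removed, a malicious value such as `search=' or 1=1` is still matched, while `order=desc&limit=10` is not.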
CEREBRO.py — ML Training Pipeline
A standalone training script that produces the .pkl artifacts consumed by ids.py at startup. It does not run at inference time.
Pipeline stages:
# CEREBRO.py
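from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.preprocessing import StandardScaler

pipeline = ImbPipeline([
    ('smote', SMOTE(random_state=42)),  # 1. Oversample minority classes
    ('scaler', StandardScaler()),       # 2. Normalize to mean=0, std=1
    ('clf', ensemble_model),            # 3. Train the voting ensemble (defined elsewhere in CEREBRO.py)
])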
Ensemble composition:
| Model | Role | Key parameters |
|---|---|---|
| RandomForestClassifier | Majority vote anchor, robust to overfitting | n_estimators=100, class_weight='balanced' |
| MLPClassifier | Captures non-linear feature interactions | hidden_layer_sizes=(64, 64), early_stopping=True |
| XGBClassifier | High-precision gradient boosting on tabular data | eval_metric='mlogloss', use_label_encoder=False |
All three estimators vote with soft voting — their class probability outputs are averaged, not their discrete predictions.
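The difference between averaging probabilities and counting discrete votes is easiest to see with numbers. The probabilities below are invented for the example and are not outputs of the trained models; `soft_vote` and `hard_vote` are hypothetical helpers written in plain Python.

```python
def soft_vote(probabilidades):
    """Average per-class probabilities across estimators.

    Returns (index of the winning class, averaged probabilities).
    """
    n = len(probabilidades)
    promedio = [sum(columna) / n for columna in zip(*probabilidades)]
    return promedio.index(max(promedio)), promedio


def hard_vote(probabilidades):
    """Let each estimator cast one vote for its top class; return the most voted class index."""
    votos = [p.index(max(p)) for p in probabilidades]
    return max(set(votos), key=votos.count)


# Invented outputs for classes [normal, attack]
probs = [
    [0.55, 0.45],  # estimator 1 leans "normal"
    [0.60, 0.40],  # estimator 2 leans "normal"
    [0.05, 0.95],  # estimator 3 is very sure it is an attack
]
```

Here two of three estimators prefer "normal", so a hard vote picks class 0. The averaged probabilities are 0.40 and 0.60, so the soft vote picks class 1: one highly confident estimator can outweigh two lukewarm ones.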
Feature vector (6 features):
# CEREBRO.py
features = ['src_ip_int', 'dst_ip_int', 'dst_port', 'protocol_encoded', 'flag_encoded', 'hour']
IPs are converted to 32-bit integers via socket.inet_aton + struct.unpack. Protocol and flag strings are LabelEncoder-encoded. The hour feature captures time-of-day patterns.
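A minimal version of the IP conversion, assuming network byte order and an unsigned 32-bit format (`"!I"`); the text names the two functions but not the format string, and `ip_a_entero` is a hypothetical helper name.

```python
import socket
import struct


def ip_a_entero(ip: str) -> int:
    """Convert a dotted-quad IPv4 address to an unsigned 32-bit integer."""
    # inet_aton yields 4 packed bytes; "!I" reads them as a big-endian unsigned int.
    return struct.unpack("!I", socket.inet_aton(ip))[0]
```

For example, `ip_a_entero("192.168.1.10")` evaluates to 192·2²⁴ + 168·2¹⁶ + 1·2⁸ + 10 = 3232235786.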
Artifacts produced:
| File | Contents |
|---|---|
| modelo_ensamble_optimizado.pkl | Full ImbPipeline (SMOTE + scaler + ensemble) |
| features_seleccionadas.pkl | Ordered list of feature names for input validation |
| flag_encoder.pkl | Fitted LabelEncoder for TCP flag strings |
| protocol_encoder.pkl | Fitted LabelEncoder for protocol strings |
| tipo_ataque_encoder.pkl | Fitted LabelEncoder for attack class labels |
The training dataset (Dataset/escanerpuertos.csv) contains 20,000 synthetic records: 60% normal interactive traffic and 40% attack traffic spread across six attack types. The trained model achieves 91.90% accuracy and is evaluated with macro F1-score to account for class imbalance.
interfasc.py — PyQt5 SOC Dashboard
The main window (IDSInterface) is a QWidget with a QTabWidget containing multiple panels. A background DataProcessor thread (QThread) consumes events from a deque and emits them in batches to avoid overwhelming the UI event loop.
Performance constants:
# interfasc.py
MAX_EVENTOS_TABLA = 1000 # Max visible rows (prevents scroll lag)
MAX_EVENTOS_MEMORIA = 10000 # Global deque limit (prevents memory leak)
MAX_TRAFICO_LINEAS = 500 # Lines in the live traffic panel
UPDATE_BATCH_SIZE = 50 # Events processed per DataProcessor cycle
Attack color map (for the events table):
# interfasc.py
ATTACK_STYLE = {
    "Inyección SQL": {"color": "#ff5370"},  # Red
    "PORT scanner":  {"color": "#bb86fc"},  # Violet
    "DDOS":          {"color": "#00eaff"},  # Cyan
    "SYN FLOOD":     {"color": "#82b1ff"},  # Light blue
    "UDP Flood":     {"color": "#ffa000"},  # Orange
}
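Unknown attack types get a color from the tab20 colormap, indexed by abs(hash(label)) % cmap.N. The index is stable within a single run; because Python salts str hashes per process unless PYTHONHASHSEED is set, the same label can map to a different color after a restart.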
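If a label should keep the same color across restarts, one option is to replace the built-in `hash` with a checksum that does not depend on the process. The sketch below swaps in `zlib.crc32`; this is an alternative technique, not what interfasc.py does, and `indice_color` is a hypothetical helper. The default of 20 matches the number of colors in tab20.

```python
import zlib


def indice_color(label: str, n_colores: int = 20) -> int:
    """Map a label to a palette index that is identical in every process."""
    return zlib.crc32(label.encode("utf-8")) % n_colores
```

The result can be passed to the colormap in place of the hash-based index, for example `cmap(indice_color(label, cmap.N))`.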
IPS panel features:
- 7-column table: Time, Blocked IP, Attack Type, Severity, Action Applied, Status, Time Remaining
- Live countdown via QTimer: rows automatically transition to “Expirado” at zero
- Manual unblock marks the row blue (“Desbloqueado”) without removing it, preserving audit history
- “Bloqueo simulado” state shown when PowerShell command fails (non-admin execution)
respuesta_activa.py — Windows Firewall Rule Management
Adds and removes inbound block rules using PowerShell’s New-NetFirewallRule / Remove-NetFirewallRule cmdlets. Each command runs through subprocess.run with capture_output=True so stderr is captured for diagnostics rather than silently discarded.
# respuesta_activa.py — bloquear_ip()
comando = (
    f"New-NetFirewallRule -DisplayName '{nombre_regla}' "
    f"-Direction Inbound -Action Block -RemoteAddress {ip} "
    f"-Description 'Bloqueo automático IDS UNIPAZ'"
)
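Because the IP is interpolated into a PowerShell command line, it is worth validating it first. The sketch below is an assumption about how that could be done, not the project's code: `construir_comando_bloqueo` is a hypothetical helper that only builds the argument list and never touches the firewall.

```python
import ipaddress


def construir_comando_bloqueo(ip: str, nombre_regla: str) -> list:
    """Validate `ip` and return an argv list suitable for subprocess.run."""
    # ip_address() raises ValueError for anything that is not a literal IPv4/IPv6 address.
    ip_valida = str(ipaddress.ip_address(ip))
    # Inside a single-quoted PowerShell string, a quote is escaped by doubling it.
    nombre_seguro = nombre_regla.replace("'", "''")
    comando = (
        f"New-NetFirewallRule -DisplayName '{nombre_seguro}' "
        f"-Direction Inbound -Action Block -RemoteAddress {ip_valida} "
        f"-Description 'Bloqueo automático IDS UNIPAZ'"
    )
    return ["powershell", "-NoProfile", "-Command", comando]
```

On Windows the result could be executed with `subprocess.run(argv, capture_output=True, text=True)`, after which `returncode` and `stderr` show whether the rule was created.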
Automatic unblock is scheduled by spawning a daemon thread that sleeps for the block duration (default: 60 seconds) and then calls Remove-NetFirewallRule. All block and unblock actions are also written to logs_bloqueos.txt.
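The scheduling described above can be sketched with a daemon thread that sleeps and then invokes a callback. `programar_desbloqueo` and its `desbloquear` parameter are hypothetical; in the real module the callback would be whatever function runs Remove-NetFirewallRule.

```python
import threading
import time


def programar_desbloqueo(ip, desbloquear, duracion=60.0):
    """Start a daemon thread that waits `duracion` seconds, then calls `desbloquear(ip)`."""
    def _esperar_y_desbloquear():
        time.sleep(duracion)
        desbloquear(ip)

    hilo = threading.Thread(target=_esperar_y_desbloquear, daemon=True)
    hilo.start()
    return hilo
```

A daemon thread does not keep the process alive, so pending unblocks are dropped if the application exits before the timer ends. Rules created by a run that was closed early may therefore need manual cleanup.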
Run the application as Administrator to enable real firewall blocking. Without elevated privileges, New-NetFirewallRule will fail and the system will automatically fall back to “Bloqueo simulado”, which is still recorded in SQLite and shown in the IPS panel.
telegram_alert.py — Async Telegram Notifications
Sends a plain-text HTTP POST to https://api.telegram.org/bot{TOKEN}/sendMessage for every confirmed attack. Supports multiple recipients by iterating over a list of chat_id values.
The call is always dispatched from ids.py via a daemon thread:
# ids.py
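from threading import Thread
from telegram_alert import enviar_alerta

def _enviar_alerta_async(mensaje: str):
    # Fire-and-forget: the daemon thread keeps the HTTP call off the capture loop.
    Thread(target=enviar_alerta, args=(mensaje,), daemon=True).start()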
This ensures that Telegram’s 1–5 second HTTP latency never stalls the sniffer’s packet capture loop.
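The request itself can be sketched with the standard library. The form-encoded body, the function names `construir_peticion` and `enviar_a_todos`, and the use of `urllib` are assumptions; the project may use a different HTTP client. The URL shape and the `chat_id` / `text` fields follow the Telegram Bot API `sendMessage` method.

```python
import urllib.parse
import urllib.request


def construir_peticion(token: str, chat_id: str, mensaje: str) -> urllib.request.Request:
    """Build (but do not send) a sendMessage POST request for one recipient."""
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    cuerpo = urllib.parse.urlencode({"chat_id": chat_id, "text": mensaje}).encode("utf-8")
    return urllib.request.Request(url, data=cuerpo, method="POST")


def enviar_a_todos(token: str, chat_ids, mensaje: str, timeout: float = 5.0) -> int:
    """Send the message to every chat_id; return how many requests succeeded."""
    enviados = 0
    for chat_id in chat_ids:
        try:
            with urllib.request.urlopen(construir_peticion(token, chat_id, mensaje), timeout=timeout):
                enviados += 1
        except OSError:  # URLError and socket timeouts are OSError subclasses
            continue     # one unreachable recipient should not block the others
    return enviados
```

Setting an explicit `timeout` bounds how long each daemon thread can linger when the API is unreachable.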
abuseipdb_module.py — IP Reputation (Mock Mode)
Provides a GestorAbuseIPDB class used by the GUI to look up IP reputation scores. The current implementation is a mock that returns a simulated score of 15/100 and country “COLOMBIA (Simulado)” for any IP. It is designed to be swapped for a real AbuseIPDB API client without changing the interface expected by interfasc.py.
generar_dataset.py / guardar_dataset.py — Training Data
generar_dataset.py synthesizes the escanerpuertos.csv training file (20,000 records, 60% normal / 40% attack traffic across six classes). guardar_dataset.py appends each live-detected event — along with both its heuristic and ML classification — to a running CSV file, building a feedback corpus for future retraining:
# guardar_dataset.py — guardar_evento_en_dataset()
writer.writerow([
    time.ctime(),    # Timestamp
    ip_src,          # Source IP
    ip_dst,          # Destination IP
    puerto,          # Destination port
    protocolo,       # Protocol
    flag,            # TCP flag
    tipo_ataque,     # Heuristic classification
    tipo_ataque_ml,  # ML classification
])
Qt Signal Bus Architecture
ComunicadorIDS is a QObject subclass instantiated once at module load time in ids.py and imported by interfasc.py. It acts as the sole communication channel between the sniffer thread and the UI thread.
# ids.py
from PyQt5.QtCore import QObject, pyqtSignal

class ComunicadorIDS(QObject):
    # Carries: [timestamp, ip_src, ip_dst, port, protocol, flag, attack_type]
    nuevo_evento = pyqtSignal(list)
    # Carries: [ip, action, duration, attack_type, severity]
    nuevo_bloqueo = pyqtSignal(list)
    # Carries: packet.summary() string for the live traffic panel
    nuevo_trafico = pyqtSignal(str)

comunicador = ComunicadorIDS()  # single shared instance, created at import time
Qt’s signal-slot mechanism automatically detects that the emitting thread (sniffer) differs from the receiving thread (UI) and queues the delivery via the event loop — making cross-thread updates safe without requiring explicit locks in the GUI code.
interfasc.py also uses a DataProcessor (QThread) as a secondary buffer. It receives events via comunicador.nuevo_evento, batches them in groups of up to UPDATE_BATCH_SIZE = 50, and re-emits them to the main window. This prevents the UI from being called on every single packet during a flood attack.
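Stripped of the Qt plumbing, the batching step is a bounded deque drained in fixed-size chunks. The sketch below uses the two constants from interfasc.py; `tomar_lote` and the module-level `cola` are hypothetical names, and the real DataProcessor may structure its loop differently.

```python
from collections import deque

MAX_EVENTOS_MEMORIA = 10000  # from interfasc.py
UPDATE_BATCH_SIZE = 50       # from interfasc.py

# Bounded buffer: once full, appending silently discards the oldest event.
cola = deque(maxlen=MAX_EVENTOS_MEMORIA)


def tomar_lote(cola, tamano=UPDATE_BATCH_SIZE):
    """Remove and return up to `tamano` events from the left of the queue."""
    lote = []
    while cola and len(lote) < tamano:
        lote.append(cola.popleft())
    return lote
```

With 120 queued events, three consecutive calls return 50, 50 and 20 items, so the UI receives three updates instead of 120. In CPython, `deque.append` and `deque.popleft` are thread-safe, which is why a producer and a consumer thread can share the buffer without an explicit lock.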
Decision Flow
For every packet that matches a heuristic detector, the following logic runs inside guardar_ataque():
Packet captured
      │
      ▼
Heuristic detector fires
      │
      ▼
Is IPS mode active AND ML loaded?
      ├── No  → label = "{attack_type} (Heurística)"
      │
      └── Yes → Run clasificar_ataque_ml()
                      │
                      ▼
                confidence ≥ 70%?
                      ├── Yes → label = "{ML_label} (ML: {confidence}%)"
                      │
                      └── No  → label = "{attack_type} (Heurística)"
      │
      ▼
Is attack type critical?
(exploit / sql / flood / ddos / escaneo)
      ├── No  → No block
      │
      └── Yes →
            Label came from ML?
              ├── Yes, confidence ≥ 70% → Block
              └── No (Heurística)       → Block
                      │
                      ▼
            respuesta_activa.bloquear_ip()
                      │
            ┌─────────┴──────────┐
            ▼                    ▼
         Success           Fail (no admin)
      estado='ACTIVO'     estado='SIMULADO'
                      │
                      ▼
            SQLite INSERT → bloqueos table
            comunicador.nuevo_bloqueo.emit()
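The labeling and blocking decisions in the flow above can be written as a pure function. This sketch follows the diagram literally, so the block decision depends only on whether the attack type is critical. Matching critical types by substring, checking the final (possibly ML-provided) type, and the names `decidir`, `UMBRAL_CONFIANZA` and `TIPOS_CRITICOS` are all assumptions rather than the logic in guardar_ataque().

```python
UMBRAL_CONFIANZA = 70.0  # percent, from the flow above
TIPOS_CRITICOS = ("exploit", "sql", "flood", "ddos", "escaneo")


def decidir(tipo_heuristico, ips_activo, ml_cargado, prediccion_ml=None):
    """Return (label, bloquear).

    `prediccion_ml` is an optional (ml_label, confidence_percent) tuple.
    """
    etiqueta = f"{tipo_heuristico} (Heurística)"
    tipo_final = tipo_heuristico
    if ips_activo and ml_cargado and prediccion_ml is not None:
        etiqueta_ml, confianza = prediccion_ml
        if confianza >= UMBRAL_CONFIANZA:
            # Confident ML prediction overrides the heuristic label.
            etiqueta = f"{etiqueta_ml} (ML: {confianza:.1f}%)"
            tipo_final = etiqueta_ml
    # Assumed criticality test: keyword match on the lowercased type.
    bloquear = any(clave in tipo_final.lower() for clave in TIPOS_CRITICOS)
    return etiqueta, bloquear
```

For instance, a heuristic "Escaneo de Puertos" hit with an ML prediction at 55% keeps the heuristic label and is still blocked, because both branches under "Label came from ML?" lead to a block.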
Data Persistence
All events are written to intrusiones.db (SQLite) in the project directory. The schema is created idempotently at startup:
ataques table — one row per detected attack event:
CREATE TABLE IF NOT EXISTS ataques (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp   TEXT,
    tipo_ataque TEXT,     -- e.g. "SYN Flood (ML: 97.3%)" or "Escaneo de Puertos (Heurística)"
    ip_src      TEXT,
    protocolo   TEXT,
    puerto      INTEGER
);
bloqueos table — one row per IPS block action:
CREATE TABLE IF NOT EXISTS bloqueos (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp   TEXT,
    ip_src      TEXT,
    tipo_ataque TEXT,
    duracion    INTEGER,  -- Block duration in seconds (default: 60)
    estado      TEXT      -- 'ACTIVO', 'SIMULADO', or 'EXPIRADO'
);
All INSERT statements use parameterized placeholders (?) to prevent second-order SQL injection against the IDS’s own database.
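A parameterized insert into the ataques table might look like the sketch below. It runs against an in-memory database so it can be tried safely; the helper name `registrar_ataque` and the timestamp format are assumptions.

```python
import sqlite3
import time


def registrar_ataque(conn, tipo_ataque, ip_src, protocolo, puerto):
    """Insert one attack row using ? placeholders (values are never spliced into the SQL)."""
    conn.execute(
        "INSERT INTO ataques (timestamp, tipo_ataque, ip_src, protocolo, puerto) "
        "VALUES (?, ?, ?, ?, ?)",
        (time.strftime("%Y-%m-%d %H:%M:%S"), tipo_ataque, ip_src, protocolo, puerto),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")  # throwaway database for the example
conn.execute(
    "CREATE TABLE IF NOT EXISTS ataques ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, tipo_ataque TEXT, "
    "ip_src TEXT, protocolo TEXT, puerto INTEGER)"
)
# A hostile-looking label is stored verbatim as data instead of being executed.
registrar_ataque(conn, "x'); DROP TABLE ataques;--", "203.0.113.7", "TCP", 445)
```

This matters for an IDS in particular because labels and payload fragments originate from attacker-controlled traffic.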