Overview

Logstash processes incoming syslog messages from Palo Alto Networks firewalls, parses the logs, enriches them with threat intelligence, and forwards them to Elasticsearch. SafeNetworking uses multiple pipelines to handle different log types.

Pipeline Architecture

Logstash pipelines are configured in /etc/logstash/pipelines.yml:
- pipeline.id: threat
  path.config: "/etc/logstash/pipelines/threat.conf"
  
- pipeline.id: iot
  path.config: "/etc/logstash/pipelines/iot.conf"
  
- pipeline.id: gtp
  path.config: "/etc/logstash/pipelines/gtp.conf"
  
- pipeline.id: customer
  path.config: "/etc/logstash/pipelines/customer.conf"
Each pipeline runs independently and processes specific log types:
  • threat - PAN-OS threat, traffic, config, and system logs
  • iot - IoT threat detection from edge routers
  • gtp - GTP event code processing (optional)
  • customer - Custom pipeline for user-specific processing

Threat Pipeline Configuration

The primary pipeline for PAN-OS logs is defined in /etc/logstash/pipelines/threat.conf.

Input Section

Receives syslog messages from PAN-OS firewalls via UDP:
input {
    # Input for PAN-OS device logs
    udp {
        host => "0.0.0.0"
        port => "5514"
        type => "udp"
        tags => [ "PAN-OS_syslog" ]
    }
}
Configuration:
  • Protocol: UDP syslog
  • Port: 5514
  • Bind Address: 0.0.0.0 (listens on all interfaces)
  • Tag: PAN-OS_syslog (used for filtering)
Configure your PAN-OS firewalls to send syslog to this server on port 5514. See the PAN-OS documentation for syslog server configuration.
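Once Logstash is running, a quick way to confirm the listener is up on the SafeNetworking host (assuming iproute2 is installed):
# Verify Logstash is bound to UDP 5514
sudo ss -lun | grep 5514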

Filter Section - Threat Logs

Parses and enriches threat logs from PAN-OS:
filter {
    if "PAN-OS_syslog" in [tags] {
        if ([message] =~ /THREAT/) {
            csv {
                source => "message"
                columns => [ 
                    "Domain", "ReceiveTime", "SerialNumber", "Type", "Threat_ContentType", "Config Version",
                    "GeneratedTime", "SourceIP", "DestinationIP", "NATSourceIP", "NATDestinationIP", "RuleName", 
                    "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone",
                    "InboundInterface", "OutboundInterface", "LogAction", "Time Logged", "SessionID",
                    "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags",
                    "Protocol", "Action", "URL_Filename", "ThreatID", "Category", "Severity", "Direction",
                    "SequenceNumber", "ActionFlags", "SourceLocation", "DestinationLocation", "cpadding", 
                    "ContentType", "PCAP_ID", "FileDigest", "Cloud", "URLIndex", "UserAgent", "FileType",
                    "X-Forwarded-For", "Referer", "Sender", "Subject", "Recipient", "ReportID",
                    "DeviceGroupHierarchyLevel1", "DeviceGroupHierarchyLevel2", "DeviceGroupHierarchyLevel3",
                    "DeviceGroupHierarchyLevel4", "VirtualSystemName", "DeviceName", "File_URL", "SourceVMUUID",
                    "DestinationVMUUID", "HTTPMethod", "TunnelID_IMSI", "MonitorTag_IMEI", "ParentSessionID", 
                    "ParentStartTime", "TunnelType", "ThreatCategory", "ContentVersion", "FUTURE_USE",
                    "SCTPAssociationID", "PayloadProtocolID", "HTTPHeaders" 
                ]
            }
        }
    }
}
This CSV parser maps PAN-OS threat log fields to Logstash event fields based on the official PAN-OS syslog field descriptions.
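The output section shown later routes on type-specific tags such as PAN-OS_threat, which the full threat.conf is expected to add after parsing each log type (that step is not shown above). A minimal sketch of it for the THREAT branch, assuming GeneratedTime uses the yyyy/MM/dd HH:mm:ss format:
# Sketch: normalize the event timestamp and tag the branch for output routing
date {
    match => [ "GeneratedTime", "yyyy/MM/dd HH:mm:ss" ]
}
mutate {
    add_tag => [ "PAN-OS_threat" ]
}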

DNS Threat Detection

SafeNetworking has specialized logic to detect and parse DNS-related threats:

DNS EDL (External Dynamic List)

if ([ThreatID] == "Suspicious Domain(12000000)") {
    if ([URL_Filename] =~ "^Suspicious DNS Query") {
        grok {
            match => {"URL_Filename" => "^Suspicious DNS Query \\(%{HOSTNAME:[SFN][domain_name]}\\)"}
            add_field => {"[SFN][processed]" => 0}
            add_field => {"[SFN][threat_name]" => "EDL"}
            add_field => {"[SFN][sig_num]" => "12000000"}
            add_tag => [ "SFN-DNS" ]
            add_tag => [ "SFN-EDL" ]
        }
    }
}

DNS Cloud Security

if ([ThreatCategory] =~ "^dns-cloud" or [ThreatCategory] =~ "^dns-security") {
    grok {
        match => {"URL_Filename" => "^Suspicious DNS Query \\(%{DATA:[SFN][domain_name]}\\)"}
        match => {"ThreatID" =>"^%{NUMBER:[SFN][sig_num]}\\(%{NUMBER}\\)"}
        add_field => {"[SFN][processed]" => 0}
        add_field => {"[SFN][threat_name]" => "CLOUD"}
        add_tag => [ "SFN-CLOUD" ]
        add_tag => [ "SFN-DNS" ]
    }
}

DNS Content-Based Threats

if ([ThreatID] =~ "^Suspicious DNS Query") {
    grok {
        match => {"ThreatID" => "^Suspicious DNS Query \\(%{DATA:[SFN][threat_name]}:?.%{HOSTNAME:[SFN][domain_name]}\\)\\(%{NUMBER:[SFN][sig_num]}\\)"}
        add_field => {"[SFN][processed]" => 0}
        add_tag => [ "SFN-DNS" ]
        add_tag => [ "SFN-CONTENT" ]
    }
}
These patterns extract:
  • Domain name - The malicious or suspicious domain
  • Threat name - Threat classification (EDL, CLOUD, or specific threat)
  • Signature number - AutoFocus signature ID
  • Tags - For routing to SafeNetworking processors
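For example, an EDL hit whose URL_Filename is "Suspicious DNS Query (evil.example.com)" (a hypothetical domain) would yield roughly this SFN sub-document:
"SFN": {
    "domain_name": "evil.example.com",
    "threat_name": "EDL",
    "sig_num": "12000000",
    "processed": 0
}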

URL Filtering

Parses URL category threats:
if ([Threat_ContentType] =~ "url") {
    mutate {
        add_field => {"[SFN][url_name]" => "%{URL_Filename}"}
        add_field => {"[SFN][url_category]" => "%{URLCategory}"}
        add_field => {"[SFN][processed]" => 1}
        add_tag => [ "SFN-CONTENT" ]
        add_tag => ["SFN-URL"]
        remove_field => [ "URL_Filename","URLCategory" ]
    }
}

GeoIP Enrichment

Adds geographic information for source and destination IPs:
# Geolocate source IP (if not RFC1918)
if [SourceIP] and [SourceIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
       source => "SourceIP"
       target => "SourceIPGeo"
    }
    
    # Remove invalid 0,0 coordinates
    if ([SourceIPGeo][location] and [SourceIPGeo][location] =~ "0,0") {
        mutate {
            replace => { "[SourceIPGeo][location]" => "" }
        }
    }
}

# Geolocate destination IP (if not RFC1918)
if [DestinationIP] and [DestinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
    geoip {
        source => "DestinationIP"
        target => "DestinationIPGeo"
    }
    
    # Remove invalid 0,0 coordinates
    if ([DestinationIPGeo][location] and [DestinationIPGeo][location] =~ "0,0") {
        mutate {
            replace => { "[DestinationIPGeo][location]" => "" }
        }
    }
}
This adds geographic data including:
  • Country, city, region
  • Latitude/longitude
  • Timezone
  • Postal code
GeoIP lookups are skipped for private (RFC1918) IP addresses to save processing time.

Flow Fingerprinting

Creates a unique hash for each network flow:
if [SourceIP] and [DestinationIP] {
    fingerprint {
        concatenate_sources => true
        method => "SHA1"
        key => "logstash"
        source => [ "SourceIP", "SourcePort", "DestinationIP", "DestinationPort", "Protocol" ]
    }
}
This 5-tuple hash (written to the default fingerprint field, since no target is set) enables efficient flow-based queries and aggregations.

Output Section

Routes parsed logs to appropriate Elasticsearch indices:
output {
    if "PAN-OS_threat" in [tags] {
        elasticsearch {
            hosts    => [ 'elasticsearch' ]
            index => "threat-%{+YYYY.MM}"
        }
    }
    else if "PAN-OS_traffic" in [tags] {
        elasticsearch {
            hosts    => [ 'elasticsearch' ]
            index => "traffic-%{+YYYY.MM}"
        }
    }
    else if "PAN-OS_system" in [tags] {
        elasticsearch {
            hosts    => [ 'elasticsearch' ]
            index => "system-%{+YYYY.MM}"
        }
    }
    else if "PAN-OS_config" in [tags] {
        elasticsearch {
            hosts    => [ 'elasticsearch' ]
            index => "config-%{+YYYY.MM}"
        }
    }
    else {
        file {
            path => "/var/log/logstash/failed_threat_events-%{+YYYY.MM}.log"
        }
    }
}
Index Patterns:
  • Threat logs → threat-YYYY.MM (monthly indices)
  • Traffic logs → traffic-YYYY.MM
  • System logs → system-YYYY.MM
  • Config logs → config-YYYY.MM
  • Failed events → /var/log/logstash/failed_threat_events-YYYY.MM.log
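Once events are flowing, you can confirm the monthly indices are being created (assuming Elasticsearch is reachable on localhost:9200; add credentials if security is enabled):
# List the SafeNetworking indices and their document counts
curl -XGET 'localhost:9200/_cat/indices/threat-*,traffic-*,system-*,config-*?v'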

IoT Pipeline Configuration

Processes IoT threat detection logs from edge routers (/etc/logstash/pipelines/iot.conf).

Input Configuration

input {
    # Input for Edge Routers alerting to known malicious IoT IPs
    udp {
        host => "0.0.0.0"
        port => "5510"
        tags => "IOT_External"
    }
}
Configuration:
  • Port: 5510
  • Tag: IOT_External

Filter Configuration

filter {
    grok {
        match => { "message" => "<%{NONNEGINT:syslog_pri}>%{NONNEGINT} %{TIMESTAMP_ISO8601:syslog_timestamp} %{SYSLOGHOST:received_from} - - - - %{DATA:flex_card} %{DATA:log_prefix}: %{DATA:router_type}: %{DATA} %{GREEDYDATA:log_app} %{GREEDYDATA} -> %{GREEDYDATA} %{DATA:protocol} %{IP:SourceIP} %{IP:DestinationIP}  %{GREEDYDATA}" }
        add_tag => [ "SFN-IOT" ]
    }
    
    # Look up malware info from IoT database
    elasticsearch {
        hosts => ["elasticsearch"]
        index => ["sfn-iot-details"]
        enable_sort => "false"
        query => "ip:%{[DestinationIP]}"
        fields => { "filetype" => "[SFN][file_type]" }
        fields => { "tag_name" => "[SFN][tag_name]" }
        fields => { "public_tag_name" => "[SFN][public_tag_name]" }
        fields => { "tag_description" => "[SFN][tag_description]" }
        fields => { "tag_group_name" => "[SFN][tag_group_name]" }
        fields => { "tag_class" => "[SFN][tag_class]" }
    }
}
The IoT pipeline:
  1. Parses edge router syslog format
  2. Extracts source/destination IPs
  3. Looks up destination IP in IoT threat database
  4. Enriches event with malware classification
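If IoT events arrive unenriched, confirm that the lookup index actually has an entry for the destination IP in question (a sketch using a documentation address):
# Query the IoT threat database for a specific IP
curl -XGET 'localhost:9200/sfn-iot-details/_search?q=ip:203.0.113.10&pretty'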

Output Configuration

output {
    if "SFN-IOT" in [tags] {
        elasticsearch {
            hosts    => [ 'elasticsearch' ]
            user     => 'elastic'
            password => 'changeme'
            index => "iot-%{+YYYY.MM}"
        }
    }
    else {
        file {
            path => "/var/log/logstash/failed_iot_events-%{+YYYY.MM}.log"
        }
    }
}
Update the user and password fields if you’re using Elasticsearch authentication.
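Rather than hard-coding credentials, Logstash can substitute them from environment variables or the Logstash keystore. A sketch of the same output using that mechanism (ES_USER and ES_PWD are arbitrary key names chosen for this example):
elasticsearch {
    hosts    => [ 'elasticsearch' ]
    user     => "${ES_USER}"       # resolved from the keystore or environment at startup
    password => "${ES_PWD}"
    index    => "iot-%{+YYYY.MM}"
}
Keys can be created with the logstash-keystore utility (for example, /usr/share/logstash/bin/logstash-keystore add ES_USER).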

Logstash System Configuration

JVM Memory Settings

Configured in /etc/logstash/config/jvm.options:
# Heap size (initial and maximum should match)
-Xms1g
-Xmx1g
Default: 1GB heap size
Adjust heap size based on log volume. For high-throughput environments, increase to 2-4GB.
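For example, a busier collector might run with a 4 GB heap; keep the initial and maximum values equal and leave enough RAM for the OS and Elasticsearch if they share the host:
# Example heap settings for a high-volume environment
-Xms4g
-Xmx4g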

Service Management

# Enable and start Logstash
sudo systemctl daemon-reload
sudo systemctl enable logstash.service
sudo systemctl restart logstash.service

# Check status
sudo systemctl status logstash.service

# Send a test datagram to UDP 5514 (note: this succeeds even when nothing is
# listening, because UDP is connectionless; use ss -lun for a definitive check)
(echo >/dev/udp/localhost/5514) >/dev/null 2>&1 && echo "Datagram sent" || echo "Send failed"

Adding Custom Pipelines

To add a custom pipeline for specialized processing:

Step 1: Create Pipeline Configuration

Create a new configuration file:
sudo nano /etc/logstash/pipelines/custom.conf
Define input, filter, and output sections:
input {
    udp {
        host => "0.0.0.0"
        port => "5515"
        tags => [ "custom" ]
    }
}

filter {
    # Add custom filters here
    if "custom" in [tags] {
        # Your processing logic
    }
}

output {
    if "custom" in [tags] {
        elasticsearch {
            hosts => [ 'elasticsearch' ]
            index => "custom-%{+YYYY.MM}"
        }
    }
}

Step 2: Register Pipeline

Add the pipeline to /etc/logstash/pipelines.yml:
- pipeline.id: custom
  path.config: "/etc/logstash/pipelines/custom.conf"

Step 3: Restart Logstash

Restart the service to load the new pipeline:
sudo systemctl restart logstash.service

Step 4: Verify Pipeline

Check logs to ensure the pipeline loaded:
sudo tail -f /var/log/logstash/logstash-plain.log | grep custom
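The node API gives a more direct confirmation, and if firewalld is running the new listener port must be opened as well (a sketch assuming the defaults used above):
# Confirm the pipeline is registered (the node API listens on port 9600 by default)
curl -XGET 'localhost:9600/_node/pipelines/custom?pretty'

# Open the new UDP port if firewalld is in use
sudo firewall-cmd --permanent --add-port=5515/udp
sudo firewall-cmd --reload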

Optional Enrichments

The threat pipeline includes commented-out enrichment options:

DNS Resolution

Resolve malicious domains to IPs (use with caution):
# Copy domain to resolved_ip field
mutate {
    copy => { "[SFN][domain_name]" => "[SFN][resolved_ip]"}
}

# Resolve domain
dns {
    timeout => 1
    resolve => [ "[SFN][resolved_ip]" ]
    action => "replace"
}
Security Risk: Only enable this if the DNS lookups from your logging server will not themselves be flagged as threats by an upstream firewall; otherwise every lookup of a malicious domain generates a new threat log and you create a recursive logging loop.

Customer Database Enrichment

Lookup customer information from Elasticsearch:
elasticsearch {
    hosts => ["elasticsearch"]
    index => ["customer-db"]
    enable_sort => "false"
    query => "imsi:%{[TunnelID_IMSI]}"
    fields => { "name" => "[CUSTOMER][name]" }
    fields => { "address" => "[CUSTOMER][address]" }
    fields => { "city" => "[CUSTOMER][city]" }
    fields => { "country" => "[CUSTOMER][country]" }
    fields => { "zip" => "[CUSTOMER][zip]" }
    fields => { "phone" => "[CUSTOMER][phone]" }
    fields => { "email" => "[CUSTOMER][email]" }
    fields => { "acct_number" => "[CUSTOMER][acct_number]" }
    fields => { "imsi" => "[CUSTOMER][imsi]" }
    fields => { "imei" => "[CUSTOMER][imei]" }
    fields => { "ip_addr" => "[CUSTOMER][ip_addr]" }
}
This enriches events with customer details for service provider deployments.

Troubleshooting

Logstash Not Starting

Check logs:
sudo journalctl -u logstash.service -f
sudo tail -f /var/log/logstash/logstash-plain.log
Common issues:
  • JVM heap too large for available RAM
  • Syntax errors in pipeline configuration
  • Port already in use

No Events in Elasticsearch

Verify Logstash is receiving data:
# Send test syslog message
logger -n localhost -P 5514 "Test message"

# Check Logstash metrics
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
Check firewall:
sudo firewall-cmd --list-all
sudo firewall-cmd --permanent --add-port=5514/udp
sudo firewall-cmd --reload

Pipeline Errors

Check pipeline status:
curl -XGET 'localhost:9600/_node/pipelines?pretty'
Validate configuration:
sudo /usr/share/logstash/bin/logstash --config.test_and_exit \
  -f /etc/logstash/pipelines/threat.conf

High Memory Usage

Solutions:
  • Tune the JVM heap in /etc/logstash/config/jvm.options so it fits within available RAM
  • Reduce batch size in pipeline configuration
  • Split pipelines for better resource distribution

Performance Tuning

Optimize Logstash for high-throughput environments:
# In /etc/logstash/logstash.yml
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
  • workers: Number of parallel processing threads (default: CPU cores)
  • batch.size: Events to process per batch (default: 125)
  • batch.delay: Max wait time in milliseconds for batch to fill (default: 50)
Start with defaults and increase pipeline.workers if CPU is underutilized during peak load.
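If only one pipeline is the bottleneck, the same settings can be applied per pipeline in /etc/logstash/pipelines.yml instead of globally, for example:
- pipeline.id: threat
  path.config: "/etc/logstash/pipelines/threat.conf"
  pipeline.workers: 4
  pipeline.batch.size: 250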

Source References

Pipeline configurations:
  • install/logstash/pipelines.yml - Pipeline registration
  • install/logstash/threat.conf - Main threat processing pipeline
  • install/logstash/iot.conf - IoT threat detection pipeline
  • install/logstash/config/jvm.options - JVM memory settings
  • install/setup.sh:93-107 - Automated installation script
