
Overview

SafeNetworking enriches firewall threat events with malware intelligence from AutoFocus. The processing pipeline implements intelligent caching, parallel processing, and adaptive rate limiting to maximize throughput while staying within API quotas.

Event Lifecycle

Complete Event Journey

1. Event Creation: Logstash receives syslog from the PAN firewall, parses fields, and indexes events to the Elasticsearch threat-* index with SFN.processed=0
2. Event Discovery: a DNS background thread queries for unprocessed events every DNS_POOL_TIME seconds (default: 5)
3. Event Classification: events are categorized as primary (cached domain) or secondary (requires an AutoFocus lookup)
4. Parallel Enrichment: a multi-process pool (up to 16 workers) enriches events with domain intelligence and tag assessment
5. Event Update: enriched data is written back to the event document with SFN.processed=1 or 55
6. Visualization: updated events are available in Kibana dashboards with threat context, confidence scores, and malware classification
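The lifecycle above hinges on the SFN.processed marker. A minimal sketch of the state values (the names below are illustrative; the pipeline itself only stores the integers):

```python
from enum import IntEnum

class ProcessedState(IntEnum):
    """Illustrative names for the SFN.processed marker values."""
    UNPROCESSED = 0   # indexed by Logstash, awaiting enrichment
    ENRICHED = 1      # tag and confidence written back
    NO_TAGS = 55      # lookup completed but no usable tags found

def needs_enrichment(doc: dict) -> bool:
    """An event is picked up only while its marker is still 0."""
    return doc.get("SFN", {}).get("processed") == ProcessedState.UNPROCESSED
```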

DNS Event Processing

The DNS runner queries for events tagged as DNS threats that haven’t been enriched:
def unprocessedEventSearch():
    qSize = app.config["DNS_EVENT_QUERY_SIZE"]  # Default: 1000
    
    eventSearch = Search(index="threat-*") \
        .query("match", tags="DNS") \
        .query("match", **{"SFN.processed": 0}) \
        .sort({"@timestamp": {"order" : "desc"}})
    eventSearch = eventSearch[:qSize]
    searchResponse = eventSearch.execute()
From project/dns/runner.py:13-39
Query Strategy: Events are processed newest-first to prioritize recent threats. The batch size balances throughput with memory usage.

Event Classification

Events are classified based on cache availability:
for hit in searchResponse.hits:
    domainName = hit['SFN']['domain_name']
    
    # Check if domain is cached
    domainSearch = Search(index="sfn-domain-details") \
        .query("match", name=domainName)
    
    if domainSearch.execute():
        priDocIds.append(entry)  # Primary: use cache
    else:
        secDocIds.append(entry)  # Secondary: needs lookup
From project/dns/runner.py:44-66
Primary Events: Domain intelligence cached in sfn-domain-details
  • Faster processing (no API call)
  • Cache validated for age (DNS_DOMAIN_INFO_MAX_AGE)
  • If expired, treated as secondary
Secondary Events: Domain not cached or cache expired
  • Requires AutoFocus API lookup (10 points)
  • Tag metadata queries (2 points per tag)
  • Results cached for future events
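The primary/secondary split reduces to a freshness check against DNS_DOMAIN_INFO_MAX_AGE. A sketch of that decision (the cache-timestamp field name and ISO format are assumptions, not taken from the source):

```python
from datetime import datetime, timedelta

DNS_DOMAIN_INFO_MAX_AGE = 30  # days; assumed default, configurable

def is_primary(domain_doc, now):
    """Primary if a cached domain document exists and is still fresh;
    otherwise the event is secondary and needs an AutoFocus lookup."""
    if domain_doc is None:
        return False  # no cache entry at all
    cached_at = datetime.fromisoformat(domain_doc["doc_updated"])  # assumed field
    return now - cached_at <= timedelta(days=DNS_DOMAIN_INFO_MAX_AGE)
```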

Multi-Process Enrichment

Both primary and secondary events are processed in parallel:
def processDNS():
    priDocIds, secDocIds = unprocessedEventSearch()
    
    # Cap at 16 for AutoFocus minute point limits
    multiProcNum = min(app.config['DNS_POOL_COUNT'], 16)
    
    # Process primary events (cached domains)
    with Pool(multiProcNum) as pool:
        results = pool.map(searchDomain, priDocIds)
    
    # Process secondary events (AutoFocus lookups)
    with Pool(multiProcNum) as pool:
        results = pool.map(searchDomain, secDocIds)
From project/dns/runner.py:85-107
Rate Limiting: AutoFocus allows a maximum of 16 API calls per minute. The system enforces DNS_POOL_COUNT + URL_POOL_COUNT <= 16 to prevent quota violations.
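The worker-count rule can be expressed as a small guard (a sketch; the real values come from .panrc configuration):

```python
AF_MINUTE_POINT_CAP = 16  # AutoFocus allows at most 16 API calls/minute

def worker_count(dns_pool_count, url_pool_count=0):
    """Clamp the DNS pool so the combined pools never exceed the cap."""
    if dns_pool_count + url_pool_count > AF_MINUTE_POINT_CAP:
        raise ValueError("DNS_POOL_COUNT + URL_POOL_COUNT must be <= 16")
    return min(dns_pool_count, AF_MINUTE_POINT_CAP)
```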

Domain Search Workflow

The searchDomain() function orchestrates event enrichment:
def searchDomain(event):
    eventDomainName = event['domain_name']
    
    # Get or create domain document
    domainDoc = getDomainDoc(eventDomainName)
    
    # Assess tags to find best match
    if domainDoc.tags is None:
        eventTag = {
            'tag_name': 'No tags found for domain',
            'confidence_level': 0
        }
        processedValue = 55
    else:
        eventTag = assessTags(domainDoc.tags)
        processedValue = 1
    
    # Update event with enrichment
    eventDoc = DNSEventDoc.get(id=eventID, index=eventIndex)
    eventDoc.SFN.tag_name = eventTag['tag_name']
    eventDoc.SFN.confidence_level = eventTag['confidence_level']
    eventDoc.SFN.processed = processedValue
    eventDoc.save()
From project/dns/runner.py:131-206

AutoFocus Integration

Domain Lookup API Call

When a domain is not cached or its cache entry has expired, SafeNetworking queries AutoFocus:
def getDomainInfo(threatDomain):
    searchData = {
        "apiKey": app.config['AUTOFOCUS_API_KEY'],
        "query": {
            "operator": "all",
            "children": [{
                "field": "alias.domain",
                "operator": "contains",
                "value": threatDomain
            }]
        },
        "size": 100,
        "from": 0,
        "sort": {"create_date": {"order": "desc"}},
        "scope": "global",
        "artifactSource": "af"
    }
    
    # Initial query returns a "cookie" (10 API points)
    queryResponse = requests.post(
        url=app.config["AUTOFOCUS_SEARCH_URL"],
        headers={"Content-Type": "application/json"},
        data=json.dumps(searchData)
    )
From project/dns/dnsutils.py:343-384
API Point Costs:
  • Domain search: 10 points
  • Results polling: 2 points per check
  • Tag metadata: 2 points per tag

Asynchronous Result Retrieval

AutoFocus searches are asynchronous. SafeNetworking polls for completion:
cookie = queryData['af_cookie']
cookieURL = resultURL + cookie

# Poll until timeout or completion threshold
for timer in range(lookupTimeout):  # Default: 2 minutes
    time.sleep(61)  # Wait just over a minute between polls
    cookieResults = requests.post(url=cookieURL, data=resultData)
    domainData = cookieResults.json()
    
    # Check if enough results returned
    if domainData['af_complete_percentage'] >= maxPercentage:  # Default: 20%
        break
From project/dns/dnsutils.py:418-438
Performance Optimization: SafeNetworking doesn’t wait for 100% completion. If 20% of results are returned within 2 minutes, it proceeds with available data. This balances accuracy with throughput.
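The poll-until-threshold pattern can be isolated into a reusable helper (a sketch; the parameter names are illustrative, and the fetch and sleep functions are injectable so the wait logic can be tested without real API calls):

```python
import time

def poll_for_results(fetch, max_checks=2, min_percentage=20, wait=61,
                     sleep=time.sleep):
    """Poll fetch() up to max_checks times, stopping early once the
    reported af_complete_percentage reaches min_percentage."""
    data = {}
    for _ in range(max_checks):
        sleep(wait)
        data = fetch()
        if data.get("af_complete_percentage", 0) >= min_percentage:
            break
    return data
```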

Tag Extraction from Samples

Each malware sample contains multiple tags. SafeNetworking extracts and caches them:
def processTagList(tagObj):
    tagList = list()
    sample = tagObj['_source']
    
    if 'tag' in sample:
        for tagName in sample['tag']:
            tagData = processTag(tagName)  # 2 API points if not cached
            tagList.append(tagData)
    
    return tagList
From project/dns/dnsutils.py:131-150
The processTag() function checks the sfn-tag-details cache before querying AutoFocus.

Rate Limit Handling

AutoFocus returns quota info with each response. SafeNetworking monitors and reacts:
if 'message' in queryData:
    if "Daily Bucket Exceeded" in queryData['message']:
        checkAfPoints(queryData['bucket_info'])
        # Retry after throttling/sleeping
    elif "Minute Bucket Exceeded" in queryData['message']:
        checkAfPoints(queryData['bucket_info'])
        # Retry after throttling/sleeping
From project/dns/dnsutils.py:388-412
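A generic retry wrapper around that quota check might look like this (a sketch, not the project's implementation; sleep is injectable so the throttling can be tested, and the message substring matches the excerpt above):

```python
import time

def with_quota_retry(call, max_retries=3, wait=60, sleep=time.sleep):
    """Retry call() while AutoFocus reports an exceeded bucket."""
    for attempt in range(max_retries + 1):
        data = call()
        if "Bucket Exceeded" not in data.get("message", ""):
            return data  # quota OK, pass the response through
        if attempt == max_retries:
            break
        sleep(wait)  # throttle, then retry
    return data
```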

AutoFocus Point Management

Adaptive Throttling

The system adjusts processing speed based on remaining quota:
def checkAfPoints(bucketInfo):
    pointsRemaining = bucketInfo['daily_points_remaining']
    
    # Critical: Stop all processing
    if pointsRemaining <= app.config['AF_POINT_NOEXEC']:  # Default: 500
        while resetFlag:
            app.logger.info(f"Sleeping for {afNoExecTime} seconds")
            time.sleep(app.config['AF_NOEXEC_CKTIME'])  # Default: 3600
            # Check if quota reset
            newBucketInfo = getTagInfo("WildFireTest")
            if newBucketInfo['daily_points_remaining'] > noExecPoints:
                resetFlag = False
    
    # Warning: Slow to single-threaded
    elif pointsRemaining < app.config['AF_POINTS_LOW']:  # Default: 5000
        app.config['AF_POINTS_MODE'] = True
    
    # Normal: Multi-threaded processing
    else:
        app.config['AF_POINTS_MODE'] = False
From project/dns/dnsutils.py:59-105

Processing Modes:

| Mode | Quota Remaining | Behavior |
| --- | --- | --- |
| Normal | > 5000 points | Multi-threaded (up to 16 workers) |
| Slow | 500-5000 points | Single-threaded (1 event at a time) |
| Halted | < 500 points | Sleep 1 hour, recheck quota |
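The three modes map onto a simple threshold function (a sketch using the default AF_POINT_NOEXEC=500 and AF_POINTS_LOW=5000, with the same comparisons as the excerpt above):

```python
AF_POINT_NOEXEC = 500   # default: halt at or below this
AF_POINTS_LOW = 5000    # default: single-threaded below this

def processing_mode(daily_points_remaining):
    """Select a processing mode from the remaining daily quota."""
    if daily_points_remaining <= AF_POINT_NOEXEC:
        return "halted"   # sleep and recheck hourly
    if daily_points_remaining < AF_POINTS_LOW:
        return "slow"     # single-threaded
    return "normal"       # multi-threaded, up to 16 workers
```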

Point Tracking

A background thread updates the quota every 10 minutes:
def updateAfStats():
    # Query for current point totals (2 API points)
    returnData = getTagInfo("WildFireTest")
    afInfo = returnData['bucket_info']
    
    # Update af-details document
    afDoc.daily_points_remaining = afInfo['daily_points_remaining']
    afDoc.minute_points_remaining = afInfo['minute_points_remaining']
    afDoc.save()
From project/dns/dnsutils.py:15-56

Tag Assessment and Confidence Scoring

Tag Prioritization Algorithm

When a domain has multiple threat tags, SafeNetworking selects the most significant:
def assessTags(tagsObj):
    # Priority order: Campaign > Actor > Malware Family
    
    for entry in tagsObj:
        for tag in entry[2]:  # tag list
            tagClass = tag[2]
            
            # Highest priority: Campaign attribution
            if tagClass == "campaign":
                return {
                    "tag_name": tagName,
                    "tag_class": tagClass,
                    "confidence_level": 90
                }
            
            # Medium priority: Threat actor
            elif tagClass == "actor":
                return {
                    "tag_name": tagName,
                    "tag_class": tagClass,
                    "confidence_level": 90
                }
            
            # Lower priority: Malware family (scored by age)
            elif tagClass == "malware_family":
                confidence = calculateConfidence(sampleDate)
                return {
                    "tag_name": tagName,
                    "tag_class": tagClass,
                    "confidence_level": confidence
                }
From project/dns/dnsutils.py:243-340
Tag Class Priority:
  1. Campaign (e.g., “APT28”, “Sofacy”) - Highest confidence (90%)
  2. Actor (e.g., “Lazarus Group”) - High confidence (90%)
  3. Malware Family (e.g., “Gootkit”, “Mirai”) - Age-based scoring
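The priority rule reduces to: return the first campaign or actor tag at confidence 90, otherwise fall back to the first malware-family tag with age-based scoring. A simplified, self-contained sketch (flattened tag tuples; the real structure nests samples and tag lists, and the age scoring here is a crude stand-in for the calculateConfidence logic covered in the next section):

```python
def assess_tags(tags, score_age=lambda days: 90 if days < 15 else 5):
    """tags: iterable of (tag_name, tag_class, sample_age_days) tuples.
    score_age is a stand-in for the age-based confidence scoring."""
    fallback = None
    for tag_name, tag_class, age_days in tags:
        if tag_class in ("campaign", "actor"):
            # Attribution tags win immediately at a fixed 90% confidence
            return {"tag_name": tag_name, "tag_class": tag_class,
                    "confidence_level": 90}
        if tag_class == "malware_family" and fallback is None:
            fallback = {"tag_name": tag_name, "tag_class": tag_class,
                        "confidence_level": score_age(age_days)}
    return fallback
```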

Confidence Scoring for Malware Families

Malware family confidence degrades over time:
def calculateConfidence(sampleDate):
    dateDiff = datetime.now() - datetime.strptime(sampleDate, "%Y-%m-%dT%H:%M:%S")
    ageInDays = dateDiff.days
    
    # Default confidence levels
    # {'15':90, '25':80, '40':70, '50':60, '60':50}
    tagConfLevels = literal_eval(app.config['CONFIDENCE_LEVELS'])
    
    for days in tagConfLevels:
        if ageInDays < int(days):
            return tagConfLevels[days]
    
    # Older than 60 days: very low confidence
    return 5
From project/dns/dnsutils.py:297-318

Default Confidence Levels:

| Sample Age | Confidence | Rationale |
| --- | --- | --- |
| 0-15 days | 90% | Very recent threat activity |
| 16-25 days | 80% | Recent threat activity |
| 26-40 days | 70% | Moderately recent activity |
| 41-50 days | 60% | Aging threat intelligence |
| 51-60 days | 50% | Old threat intelligence |
| 60+ days | 5% | Potentially stale intelligence |
Confidence levels are configurable via the CONFIDENCE_LEVELS parameter in .panrc.
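With the default thresholds the scoring loop looks like this (a sketch using integer keys for clarity; the excerpt above parses string keys from the config, and the same strict less-than comparison means a sample exactly 15 days old falls into the next bracket):

```python
DEFAULT_CONFIDENCE_LEVELS = {15: 90, 25: 80, 40: 70, 50: 60, 60: 50}

def calculate_confidence(age_in_days, levels=DEFAULT_CONFIDENCE_LEVELS):
    """Return the confidence for the first threshold the age falls under."""
    for days, confidence in levels.items():  # insertion order: ascending
        if age_in_days < days:
            return confidence
    return 5  # older than the last threshold: nearly stale
```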

Special Processing States

Events without usable threat intelligence are marked differently:
# No tags found for domain
if domainDoc.tags is None:
    eventTag = {
        'tag_name': 'No tags found for domain',
        'confidence_level': 0
    }
    processedValue = 55  # Special state for no tags

# Low priority tags (not campaign/actor/malware_family)
else:
    eventTag = {
        'tag_name': 'Low Priority Tags',
        'confidence_level': 0
    }
    processedValue = 1
From project/dns/runner.py:160-174 and project/dns/dnsutils.py:327-336

IoT Event Processing

External Database Synchronization

Unlike DNS processing, IoT processing pulls from an external honeypot database:
def processIoT():
    # Calculate time since last update
    latestTime = round(getLatestTime('sfn-iot-details'))  # in minutes
    
    # Query external API for new IPs
    hpQuery = f"{app.config['IOT_DB_URL']}/query_sfn_ip?gap={latestTime}"
    queryResponse = requests.get(url=hpQuery)
    updateDict = literal_eval(queryResponse.text)
From project/iot/runner.py:136-156
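The gap parameter is simply the age of the newest cached IoT record, in minutes. A sketch of that calculation (the real getLatestTime reads the timestamp from the newest sfn-iot-details document; the ISO format here is an assumption):

```python
from datetime import datetime

def minutes_since(last_seen_iso, now):
    """Minutes elapsed since the newest cached record, rounded."""
    last_seen = datetime.fromisoformat(last_seen_iso)
    return round((now - last_seen).total_seconds() / 60)
```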

Data Normalization

IoT threat data is normalized to match SafeNetworking schema:
def normalizeIoTData(updateDict):
    for item in updateDict['data']:
        familyInfo = literal_eval(item['familyinfo'])
        
        # Normalize family names (e.g., "mirai" -> "Unit42.ELFMirai")
        tag_name, public_tag_name = __normalizeFamilyInfo(familyInfo)
        
        # Get tag metadata from AutoFocus
        tagObject = processTag(tag_name)
        
        normalizedData = {
            'id': item['id'],
            'ip': item['ip'],
            'tag_name': tag_name,
            'public_tag_name': public_tag_name,
            'tag_class': tagObject[2],
            'tag_group_name': tagObject[3],
            'description': tagObject[4]
        }
From project/iot/runner.py:65-94
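The family-name normalization can be sketched as a lookup table. Only the mirai mapping is documented above; the public tag name and the pass-through behavior for unknown families are hypothetical illustrations:

```python
# Only the "mirai" -> "Unit42.ELFMirai" mapping comes from the text;
# the public name and any other entries are hypothetical placeholders.
FAMILY_MAP = {
    "mirai": ("Unit42.ELFMirai", "Mirai"),
}

def normalize_family(raw_name):
    """Map a honeypot family name to (tag_name, public_tag_name)."""
    key = raw_name.strip().lower()
    if key not in FAMILY_MAP:
        # Unknown families pass through unchanged for both names
        return raw_name, raw_name
    return FAMILY_MAP[key]
```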

Cache Update

Normalized IoT data is upserted to sfn-iot-details:
def updateLocalIoTDB(updateDict):
    for item in updateDict['data']:
        try:
            iotDoc = IoTDetailsDoc.get(id=item['id'])
        except NotFoundError:
            # Create new document if not exists
            iotDoc = IoTDetailsDoc(meta={'id': item['id']})
        
        iotDoc.ip = item['ip']
        iotDoc.tag_name = item['tag_name']
        iotDoc.tag_class = item['tag_class']
        iotDoc.save()
From project/iot/runner.py:97-133

Error Handling

Processing Failures

Individual event failures don’t stop batch processing:
try:
    results = searchDomain(event)
    return f"{eventID} save: SUCCESS"
except TransportError as te:
    app.logger.error(f"Transport Error: {te.info}")
    return f"{eventID} save: FAIL"
except Exception as e:
    app.logger.error(f"Unable to work with event doc {eventID} - {e}")
    return f"{eventID} save: FAIL"
From project/dns/runner.py:196-206
Failed events remain in the processed=0 state and are retried on the next cycle.

AutoFocus API Errors

API failures are logged but don’t crash the system:
if 'af_cookie' not in queryData:
    app.logger.error(f"Unable to retrieve domain info from AutoFocus")
    # Return empty result set
    return [('2000-01-01T00:00:00', 'NA', 
            [('No Samples Returned for Domain', ...)])]
From project/dns/dnsutils.py:453-456

Elasticsearch Connection Issues

try:
    eventSearch = Search(index="threat-*").execute()
except ConnectionTimeout:
    app.logger.error(f"Connection timeout to Elasticsearch")
except Exception as e:
    app.logger.debug(f"Error connecting to Elasticsearch: {e}")
From project/dns/runner.py:70-73

Performance Optimization

Cache Strategy

Domain cache (sfn-domain-details):
  • Cache Duration: 30 days (configurable)
  • Cache Key: Domain name
  • Cache Value: Full tag list with sample dates
  • Invalidation: Lazy (check on read)

Tag cache (sfn-tag-details):
  • Cache Duration: 120 days (configurable)
  • Cache Key: Tag name
  • Cache Value: Tag metadata and classification
  • Invalidation: Lazy (check on read)

IoT cache (sfn-iot-details):
  • Cache Duration: Indefinite
  • Cache Key: Honeypot record ID
  • Cache Value: IP and malware family
  • Invalidation: Upsert on update

Batch Processing

# Process up to 1000 events per cycle
app.config['DNS_EVENT_QUERY_SIZE'] = 1000

# Run cycle every 5 seconds
app.config['DNS_POOL_TIME'] = 5

# Theoretical maximum: 12,000 events/minute
# Actual: Limited by AutoFocus quota and cache hit rate

Parallel Execution

# 16 workers processing simultaneously
multiProcNum = 16

with Pool(multiProcNum) as pool:
    # Each worker processes events independently
    results = pool.map(searchDomain, priDocIds)
Tuning Recommendations:
  • High cache hit rate (>80%): Increase DNS_POOL_COUNT to 16
  • Low AutoFocus quota: Decrease DNS_POOL_COUNT to 4-8
  • High event volume: Increase DNS_EVENT_QUERY_SIZE to 2000+
  • Low-latency Elasticsearch: Decrease DNS_POOL_TIME to 3 seconds

Monitoring and Observability

Key Metrics to Track

  1. Processing Rate: Events enriched per minute
  2. Cache Hit Rate: Percentage of primary vs secondary events
  3. AutoFocus Quota: daily_points_remaining in af-details
  4. Error Rate: Failed enrichments per cycle
  5. Processing Lag: Time delta between event creation and enrichment
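Two of these metrics fall straight out of the classification counts and event timestamps. A minimal sketch (function and parameter names are illustrative):

```python
from datetime import datetime

def cache_hit_rate(primary_count, secondary_count):
    """Fraction of events served from the local domain cache."""
    total = primary_count + secondary_count
    return primary_count / total if total else 0.0

def processing_lag_seconds(created_iso, enriched_iso):
    """Delta between event creation and enrichment, in seconds."""
    created = datetime.fromisoformat(created_iso)
    enriched = datetime.fromisoformat(enriched_iso)
    return (enriched - created).total_seconds()
```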

Log Analysis

All processing activities are logged to log/sfn.log:
[DEBUG]    : 2026-03-04 10:23:45 : dns.runner:processDNS:[85] : Gathering 1000 THREAT events
[DEBUG]    : 2026-03-04 10:23:46 : dns.runner:processDNS:[95] : Running pri-keys on 16 processes
[INFO]     : 2026-03-04 10:23:47 : dns.dnsutils:getDomainDoc:[492] : No local cache for malicious.example.com
[DEBUG]    : 2026-03-04 10:23:48 : dns.dnsutils:getDomainInfo:[380] : Gathering domain info (10 API-points)

Next Steps

Architecture

Review system architecture and component interaction

Data Model

Explore Elasticsearch schemas and field definitions
