Documentation Index
Fetch the complete documentation index at: https://mintlify.com/utmstack/UTMStack/llms.txt
Use this file to discover all available pages before exploring further.
Monitor the health and activity of data sources to ensure continuous data collection.
Status Monitoring
Data sources have two key status indicators:
- Status - Boolean indicating if the source is actively configured
- Timestamp - Last time data was received from the source
Health Indicators
Active Sources
A data source is considered healthy when:
status is true
timestamp is recent (typically within the last 15 minutes)
eventsCount is incrementing
Inactive Sources
A data source may be inactive if:
status is false (manually disabled)
timestamp is old (no recent data)
eventsCount is zero or not incrementing
Monitoring Examples
Check All Source Status
# The URL is quoted so the shell does not treat "?" as a glob character.
curl -X GET "https://your-utmstack-instance.com/api/utm-data-input-statuses?size=100" \
  -H "Authorization: Bearer eyJhbGciOiJIUzUxMiJ9..."
Python Monitoring Script
import requests
from datetime import datetime, timedelta
def check_data_source_health(api_url, token):
    """
    Check health of all data sources and report issues.

    Fetches the data source statuses from the API, then flags every source
    that is disabled or whose last-seen timestamp is older than 15 minutes.

    Args:
        api_url: Base API URL, e.g. "https://your-utmstack-instance.com/api".
        token: Bearer token sent in the Authorization header.

    Returns:
        A list of dicts with the keys "source", "issue" and "severity"
        ("warning" or "critical"). Entries for stale sources also carry
        "last_seen" (the raw timestamp string from the API).

    Raises:
        requests.HTTPError: If the API responds with an error status.
    """
    # Imported locally so the snippet works with the imports shown above it.
    from datetime import timezone

    headers = {"Authorization": f"Bearer {token}"}

    # Get all sources
    response = requests.get(
        f"{api_url}/utm-data-input-statuses",
        headers=headers,
        params={"size": 1000},
        timeout=30,
    )
    # Fail loudly on 4xx/5xx instead of iterating over an error payload.
    response.raise_for_status()
    sources = response.json()

    # Use a timezone-aware "now": the parsed timestamps carry a UTC offset,
    # and Python refuses to compare naive and aware datetimes.
    now = datetime.now(timezone.utc)
    inactive_threshold = now - timedelta(minutes=15)

    # Check each source
    issues = []
    for source in sources:
        if not source["status"]:
            issues.append({
                "source": source["dataTypeName"],
                "issue": "Source is disabled",
                "severity": "warning",
            })
            continue

        # Check timestamp ("Z" is rewritten because fromisoformat() only
        # accepts it from Python 3.11 onwards).
        last_seen = datetime.fromisoformat(
            source["timestamp"].replace("Z", "+00:00")
        )
        if last_seen.tzinfo is None:
            # NOTE(review): timestamps without an offset are assumed to be
            # UTC -- confirm against the API.
            last_seen = last_seen.replace(tzinfo=timezone.utc)

        if last_seen < inactive_threshold:
            minutes_ago = int((now - last_seen).total_seconds() / 60)
            issues.append({
                "source": source["dataTypeName"],
                "issue": f"No data received for {minutes_ago} minutes",
                # More than an hour of silence is escalated.
                "severity": "critical" if minutes_ago > 60 else "warning",
                "last_seen": source["timestamp"],
            })
    return issues
# Usage: point these at your UTMStack instance and supply a valid API token.
token = "YOUR_TOKEN"
api_url = "https://your-utmstack-instance.com/api"

# Print one line per detected problem: "[SEVERITY] <source>: <issue>".
issues = check_data_source_health(api_url, token)
for issue in issues:
    print(f"[{issue['severity'].upper()}] {issue['source']}: {issue['issue']}")
JavaScript Monitoring
/**
 * Check the health of all data sources and report issues.
 *
 * Flags sources that are disabled, whose timestamp cannot be parsed, or
 * whose last-seen timestamp is older than 15 minutes.
 *
 * @param {string} apiUrl Base API URL, e.g. "https://your-utmstack-instance.com/api".
 * @param {string} token Bearer token sent in the Authorization header.
 * @returns {Promise<Array<{source: string, issue: string, severity: string, lastSeen?: *}>>}
 *   One entry per problematic source.
 * @throws {Error} If the API responds with a non-2xx status.
 */
async function checkDataSourceHealth(apiUrl, token) {
  const headers = { Authorization: `Bearer ${token}` };

  // Get all sources
  const response = await fetch(
    `${apiUrl}/utm-data-input-statuses?size=1000`,
    { headers }
  );
  // fetch() only rejects on network failure; HTTP errors must be checked
  // explicitly, otherwise an error payload would be iterated below.
  if (!response.ok) {
    throw new Error(`Failed to fetch data source statuses: HTTP ${response.status}`);
  }
  const sources = await response.json();

  // Check each source
  const now = Date.now();
  const inactiveThreshold = now - 15 * 60 * 1000;
  const issues = [];

  for (const source of sources) {
    if (!source.status) {
      issues.push({
        source: source.dataTypeName,
        issue: 'Source is disabled',
        severity: 'warning'
      });
      continue;
    }

    const lastSeenMs = new Date(source.timestamp).getTime();

    // An Invalid Date compares as NaN, which would make every "<" check
    // false and silently report the source as healthy.
    if (Number.isNaN(lastSeenMs)) {
      issues.push({
        source: source.dataTypeName,
        issue: 'Missing or invalid timestamp',
        severity: 'warning',
        lastSeen: source.timestamp
      });
      continue;
    }

    if (lastSeenMs < inactiveThreshold) {
      const minutesAgo = Math.floor((now - lastSeenMs) / 60000);
      issues.push({
        source: source.dataTypeName,
        issue: `No data received for ${minutesAgo} minutes`,
        // More than an hour of silence is escalated.
        severity: minutesAgo > 60 ? 'critical' : 'warning',
        lastSeen: source.timestamp
      });
    }
  }

  return issues;
}
// Usage: point these at your UTMStack instance and supply a valid API token.
const apiUrl = 'https://your-utmstack-instance.com/api';
const token = 'YOUR_TOKEN';

// Log one line per detected problem: "[SEVERITY] <source>: <issue>".
const issues = await checkDataSourceHealth(apiUrl, token);
for (const issue of issues) {
  console.log(`[${issue.severity.toUpperCase()}] ${issue.source}: ${issue.issue}`);
}
Automated Alerts
Send Notifications for Inactive Sources
import requests
from datetime import datetime, timedelta
def alert_on_inactive_sources(api_url, token, webhook_url, threshold_minutes=30):
    """
    Check for inactive sources and send alerts via webhook.

    Enabled sources whose last-seen timestamp is older than the threshold
    are collected and, if there are any, posted to the webhook as JSON.
    Disabled sources are skipped.

    Args:
        api_url: Base API URL, e.g. "https://your-utmstack-instance.com/api".
        token: Bearer token sent in the Authorization header.
        webhook_url: URL that receives the alert payload via HTTP POST.
        threshold_minutes: Minutes of silence after which a source counts as
            inactive. Defaults to 30.

    Returns:
        A list of dicts with the keys "name", "type" and "lastSeen" for each
        inactive source; an empty list when every source is reporting.

    Raises:
        requests.HTTPError: If the API or the webhook responds with an error
            status.
    """
    # Imported locally so the snippet works with the imports shown above it.
    from datetime import timezone

    headers = {"Authorization": f"Bearer {token}"}

    # Get all sources
    response = requests.get(
        f"{api_url}/utm-data-input-statuses",
        headers=headers,
        params={"size": 1000},
        timeout=30,
    )
    # Fail loudly on 4xx/5xx instead of iterating over an error payload.
    response.raise_for_status()
    sources = response.json()

    # Use a timezone-aware "now": the parsed timestamps carry a UTC offset,
    # and Python refuses to compare naive and aware datetimes.
    now = datetime.now(timezone.utc)
    threshold = now - timedelta(minutes=threshold_minutes)

    # Find inactive sources
    inactive = []
    for source in sources:
        if not source["status"]:
            continue

        # "Z" is rewritten because fromisoformat() only accepts it from
        # Python 3.11 onwards.
        last_seen = datetime.fromisoformat(
            source["timestamp"].replace("Z", "+00:00")
        )
        if last_seen.tzinfo is None:
            # NOTE(review): timestamps without an offset are assumed to be
            # UTC -- confirm against the API.
            last_seen = last_seen.replace(tzinfo=timezone.utc)

        if last_seen < threshold:
            inactive.append({
                "name": source["dataTypeName"],
                "type": source["dataType"],
                "lastSeen": source["timestamp"],
            })

    # Send alert if inactive sources found
    if inactive:
        alert_message = {
            "title": "Data Sources Inactive",
            "message": f"{len(inactive)} data source(s) have not reported data in over {threshold_minutes} minutes",
            "sources": inactive,
            # Timezone-aware ISO 8601 timestamp (explicit +00:00 offset).
            "timestamp": now.isoformat(),
        }
        webhook_response = requests.post(webhook_url, json=alert_message, timeout=30)
        # An alert that was rejected by the webhook must not go unnoticed.
        webhook_response.raise_for_status()

    return inactive
Metrics Collection
Collect Source Metrics Over Time
import requests
import time
from datetime import datetime
def collect_source_metrics(api_url, token, interval_seconds=60, max_snapshots=1000):
    """
    Continuously collect data source metrics.

    Polls the data source statuses in an endless loop, builds a summary
    snapshot on every pass and prints the active/total source count. This
    function never returns; stop it by interrupting the process.

    Args:
        api_url: Base API URL, e.g. "https://your-utmstack-instance.com/api".
        token: Bearer token sent in the Authorization header.
        interval_seconds: Pause between two polls. Defaults to 60.
        max_snapshots: Maximum number of snapshots kept in memory; the oldest
            are dropped first. Defaults to 1000.
    """
    # Imported locally so the snippet works with the imports shown above it.
    from collections import deque
    from datetime import timezone

    headers = {"Authorization": f"Bearer {token}"}
    # Bounded buffer: the loop never ends, so a plain list would grow forever.
    metrics_log = deque(maxlen=max_snapshots)

    while True:
        try:
            response = requests.get(
                f"{api_url}/utm-data-input-statuses",
                headers=headers,
                params={"size": 1000},
                timeout=30,
            )
            response.raise_for_status()
            sources = response.json()
        except (requests.RequestException, ValueError) as exc:
            # A transient network, HTTP or JSON error should not stop a
            # long-running collector; report it and retry on the next cycle.
            print(f"Failed to collect metrics: {exc}")
            time.sleep(interval_seconds)
            continue

        # Record metrics
        snapshot = {
            # Timezone-aware ISO 8601 timestamp (explicit +00:00 offset).
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_sources": len(sources),
            "active_sources": sum(1 for s in sources if s["status"]),
            "total_events": sum(s.get("eventsCount", 0) for s in sources),
            "sources": [
                {
                    "name": s["dataTypeName"],
                    "events": s.get("eventsCount", 0),
                    "active": s["status"],
                }
                for s in sources
            ],
        }
        metrics_log.append(snapshot)
        print(f"Collected metrics: {snapshot['active_sources']}/{snapshot['total_sources']} sources active")

        # Save or process metrics
        # ... save to database, send to monitoring system, etc.
        time.sleep(interval_seconds)
# Run continuously
# collect_source_metrics("https://your-utmstack-instance.com/api", "YOUR_TOKEN")
Best Practices
- Regular Polling: Check source status every 5-15 minutes
- Threshold Alerts: Alert when sources are inactive for >30 minutes
- Trend Analysis: Track event counts over time to detect anomalies
- Dashboard Integration: Display source health on monitoring dashboards
- Automated Recovery: Attempt to restart or reconfigure failed sources