|
/** |
|
* Anomaly Detection Module |
|
* |
|
* Handles all OpenSearch Anomaly Detection plugin operations including: |
|
* - Detector configuration and creation |
|
* - Feature engineering and aggregation setup |
|
* - Model training and validation |
|
* - Real-time and historical analysis |
|
* - Result interpretation and alerting |
|
* - Detector lifecycle management |
|
* |
|
* This module abstracts the complexity of the OpenSearch AD plugin and provides |
|
* high-level methods for common anomaly detection workflows. |
|
* |
|
* OpenSearch Anomaly Detection Documentation: |
|
* - Overview: https://opensearch.org/docs/latest/monitoring-plugins/ad/ |
|
* - API Reference: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/ |
|
* - Settings: https://opensearch.org/docs/latest/monitoring-plugins/ad/settings/ |
|
* - Troubleshooting: https://opensearch.org/docs/latest/monitoring-plugins/ad/troubleshoot/ |
|
*/ |
|
|
|
/** |
|
* Anomaly Detection Manager class |
|
* |
|
* Provides high-level interface for OpenSearch anomaly detection operations |
|
*/ |
|
const ProgressTracker = require('./progress-tracker') |
|
|
|
/**
 * Anomaly Detection Manager
 *
 * Provides a high-level interface for OpenSearch Anomaly Detection plugin
 * operations: detector creation (basic, multi-metric, high-cardinality,
 * advanced), lifecycle management (start/stop/delete), historical analysis,
 * and result interpretation.
 *
 * OpenSearch Anomaly Detection Documentation:
 * - Overview: https://opensearch.org/docs/latest/monitoring-plugins/ad/
 * - API Reference: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/
 */
class AnomalyDetectionManager {
  /**
   * Initialize the anomaly detection manager
   *
   * @param {OpenSearchClient} client - Initialized OpenSearch client instance
   * @param {Object} defaultConfig - Default configuration overrides (shallow-merged over built-in defaults)
   */
  constructor(client, defaultConfig = {}) {
    this.client = client
    this.indexName = client.indexName

    // Default configuration for anomaly detectors.
    // These can be overridden when creating specific detectors.
    this.defaultConfig = {
      // Detector naming and description
      namePrefix: this.indexName || 'anomaly-detector',
      description: 'Automated anomaly detection for system metrics',

      // Time series configuration
      timeField: 'timestamp',
      intervalMinutes: 5, // Detection interval
      windowDelayMinutes: 1, // Processing delay to account for late data

      // Feature configuration
      maxFeatures: 5, // Maximum features per detector (OpenSearch limit)
      aggregationWindow: '1m', // Aggregation window for features

      // Model training
      trainingDataPeriod: 7, // Days of data needed for training
      minTrainingData: 256, // Minimum data points for training

      // Detection sensitivity
      anomalyGradeThreshold: 0.7, // Minimum grade to consider anomalous
      confidenceThreshold: 0.8, // Minimum confidence for alerts

      // Performance settings
      maxCategoryFields: 2, // Maximum category fields for high cardinality detection
      shingleSize: 8, // Time series shingle size (OpenSearch default)

      ...defaultConfig
    }

    // Track created detectors for management, keyed by detector ID
    this.activeDetectors = new Map()

    console.log('🔍 Anomaly Detection Manager initialized')
    console.log(`   Default interval: ${this.defaultConfig.intervalMinutes} minutes`)
    console.log(`   Training period: ${this.defaultConfig.trainingDataPeriod} days`)
  }

  /**
   * Create a basic single-metric anomaly detector
   *
   * Ideal for detecting anomalies in a single metric like CPU usage,
   * response time, or error rate.
   *
   * @param {Object} config - Detector configuration
   * @param {string} config.name - Detector name
   * @param {string} config.metricField - Field containing the metric values
   * @param {string} config.category - Optional category filter (term filter on the `category` field)
   * @param {string} config.aggregationType - Aggregation type (avg, max, min, sum)
   * @returns {Object} Created detector information
   * @throws Propagates any error from the underlying client call
   */
  async createBasicDetector(config) {
    const {
      name = `${this.defaultConfig.namePrefix}-basic`,
      metricField = 'metric_value',
      category = null,
      aggregationType = 'avg',
      description = `Basic anomaly detection for ${metricField}`
    } = config

    console.log(`🔧 Creating basic detector: ${name}`)
    console.log(`   Metric: ${metricField} (${aggregationType})`)
    console.log(`   Category filter: ${category || 'none'}`)

    // Build feature definition.
    // Features define what metrics to analyze and how to aggregate them.
    // Documentation: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#create-detector
    const featureName = `${metricField}_${aggregationType}`
    const featureConfig = {
      feature_name: featureName,
      feature_enabled: true,
      aggregation_query: {
        [featureName]: {
          [aggregationType]: {
            field: metricField
          }
        }
      }
    }

    // Build filter query if category is specified; otherwise analyze all docs
    let filterQuery = { match_all: {} }
    if (category) {
      filterQuery = {
        bool: {
          filter: [
            { term: { category: category } }
          ]
        }
      }
    }

    // Create detector configuration
    const detectorConfig = {
      name: name,
      description: description,
      time_field: this.defaultConfig.timeField,
      indices: [this.indexName],
      feature_attributes: [featureConfig],
      filter_query: filterQuery,
      detection_interval: {
        period: {
          interval: this.defaultConfig.intervalMinutes,
          unit: 'Minutes'
        }
      },
      window_delay: {
        period: {
          interval: this.defaultConfig.windowDelayMinutes,
          unit: 'Minutes'
        }
      }
    }

    try {
      const detector = await this.client.createAnomalyDetector(detectorConfig)

      // Track the detector locally for later management
      this.activeDetectors.set(detector._id, {
        id: detector._id,
        name: name,
        type: 'basic',
        config: detectorConfig,
        created: new Date(),
        status: 'created'
      })

      console.log(`✅ Basic detector created successfully: ${detector._id}`)
      return detector

    } catch (error) {
      console.error(`❌ Failed to create basic detector: ${error.message}`)
      throw error
    }
  }

  /**
   * Create a multi-metric detector for comprehensive monitoring
   *
   * Analyzes multiple related metrics simultaneously to detect
   * complex anomalies that span multiple dimensions.
   *
   * @param {Object} config - Multi-metric detector configuration
   * @param {string} config.name - Detector name
   * @param {Array} config.metrics - Array of metric configurations
   *   ({ field, aggregations = ['avg'], category = null, enabled = true })
   * @param {Object} config.filters - Additional field filters (array value => terms, scalar => term)
   * @returns {Object} Created detector information
   * @throws {Error} If no metric configurations are provided
   */
  async createMultiMetricDetector(config) {
    const {
      name = `${this.defaultConfig.namePrefix}-multi-metric`,
      metrics = [],
      filters = {},
      description = 'Multi-metric anomaly detection for system monitoring'
    } = config

    console.log(`🔧 Creating multi-metric detector: ${name}`)
    console.log(`   Metrics: ${metrics.length}`)

    if (metrics.length === 0) {
      throw new Error('At least one metric configuration is required')
    }

    if (metrics.length > this.defaultConfig.maxFeatures) {
      console.warn(`⚠️ Warning: ${metrics.length} metrics exceeds recommended limit of ${this.defaultConfig.maxFeatures}`)
    }

    // Build feature configurations for each metric
    const featureAttributes = []

    for (const metric of metrics) {
      const {
        field,
        aggregations = ['avg'],
        category = null,
        enabled = true
      } = metric

      // Create features for each aggregation type.
      for (const aggType of aggregations) {
        // BUGFIX: was `const featureName` followed by `featureName += ...`,
        // which threw "Assignment to constant variable" whenever a category
        // was specified. Declared with `let` so the suffix can be appended.
        let featureName = `${field}_${aggType}`
        if (category) {
          featureName += `_${category}`
        }

        const featureConfig = {
          feature_name: featureName,
          feature_enabled: enabled,
          aggregation_query: {
            [featureName]: {
              [aggType]: {
                field: field
              }
            }
          }
        }

        featureAttributes.push(featureConfig)
        console.log(`   + Feature: ${featureName}`)
      }
    }

    // Build comprehensive filter query
    const mustFilters = []

    // Add time range filter for recent data
    // NOTE(review): this pins the detector's filter to `now-1h`; confirm this
    // is intended, since the plugin already windows data per interval.
    mustFilters.push({
      range: {
        [this.defaultConfig.timeField]: {
          gte: 'now-1h'
        }
      }
    })

    // Add custom filters: arrays become `terms` clauses, scalars become `term`
    Object.entries(filters).forEach(([field, value]) => {
      if (Array.isArray(value)) {
        mustFilters.push({ terms: { [field]: value } })
      } else {
        mustFilters.push({ term: { [field]: value } })
      }
    })

    const filterQuery = {
      bool: {
        filter: mustFilters
      }
    }

    // Create detector configuration
    const detectorConfig = {
      name: name,
      description: description,
      time_field: this.defaultConfig.timeField,
      indices: [this.indexName],
      feature_attributes: featureAttributes,
      filter_query: filterQuery,
      detection_interval: {
        period: {
          interval: this.defaultConfig.intervalMinutes,
          unit: 'Minutes'
        }
      },
      window_delay: {
        period: {
          interval: this.defaultConfig.windowDelayMinutes,
          unit: 'Minutes'
        }
      }
    }

    try {
      const detector = await this.client.createAnomalyDetector(detectorConfig)

      // Track the detector locally for later management
      this.activeDetectors.set(detector._id, {
        id: detector._id,
        name: name,
        type: 'multi-metric',
        config: detectorConfig,
        metrics: metrics,
        created: new Date(),
        status: 'created'
      })

      console.log(`✅ Multi-metric detector created successfully: ${detector._id}`)
      return detector

    } catch (error) {
      console.error(`❌ Failed to create multi-metric detector: ${error.message}`)
      throw error
    }
  }

  /**
   * Create a high-cardinality detector for entity-based anomaly detection
   *
   * Detects anomalies across multiple entities (servers, users, etc.)
   * using category fields to separate different entity behaviors.
   *
   * Documentation: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#create-detector
   *
   * @param {Object} config - High-cardinality detector configuration
   * @param {Array} config.categoryFields - Fields that define entities
   * @param {string} config.metricField - Field to analyze
   * @param {number} config.maxEntities - Maximum entities to track
   *   (NOTE: recorded for local tracking only; not currently sent to the plugin)
   * @returns {Object} Created detector information
   * @throws {Error} If more category fields are given than `maxCategoryFields` allows
   */
  async createHighCardinalityDetector(config) {
    const {
      name = `${this.defaultConfig.namePrefix}-high-cardinality`,
      categoryFields = ['source'],
      metricField = 'metric_value',
      aggregationType = 'avg',
      maxEntities = 1000,
      description = 'High-cardinality entity-based anomaly detection'
    } = config

    console.log(`🔧 Creating high-cardinality detector: ${name}`)
    console.log(`   Category fields: ${categoryFields.join(', ')}`)
    console.log(`   Metric: ${metricField}`)
    console.log(`   Max entities: ${maxEntities}`)

    if (categoryFields.length > this.defaultConfig.maxCategoryFields) {
      throw new Error(`Too many category fields. Maximum allowed: ${this.defaultConfig.maxCategoryFields}`)
    }

    // Feature configuration for high-cardinality detection
    const featureName = `${metricField}_${aggregationType}_hc`
    const featureConfig = {
      feature_name: featureName,
      feature_enabled: true,
      aggregation_query: {
        [featureName]: {
          [aggregationType]: {
            field: metricField
          }
        }
      }
    }

    // High-cardinality specific configuration
    const detectorConfig = {
      name: name,
      description: description,
      time_field: this.defaultConfig.timeField,
      indices: [this.indexName],
      feature_attributes: [featureConfig],

      // Category fields enable high-cardinality detection.
      // Each unique combination of category field values creates a separate model.
      category_field: categoryFields,

      filter_query: { match_all: {} },

      detection_interval: {
        period: {
          interval: this.defaultConfig.intervalMinutes,
          unit: 'Minutes'
        }
      },

      window_delay: {
        period: {
          interval: this.defaultConfig.windowDelayMinutes,
          unit: 'Minutes'
        }
      },

      // High-cardinality specific settings: custom result index for this detector
      result_index: `opensearch-ad-plugin-result-${name.toLowerCase()}`,

      // Adjust shingle size for high-cardinality (optional)
      shingle_size: this.defaultConfig.shingleSize
    }

    try {
      const detector = await this.client.createAnomalyDetector(detectorConfig)

      // Track the detector locally for later management
      this.activeDetectors.set(detector._id, {
        id: detector._id,
        name: name,
        type: 'high-cardinality',
        config: detectorConfig,
        categoryFields: categoryFields,
        maxEntities: maxEntities,
        created: new Date(),
        status: 'created'
      })

      console.log(`✅ High-cardinality detector created successfully: ${detector._id}`)
      console.log(`   Result index: ${detectorConfig.result_index}`)

      return detector

    } catch (error) {
      console.error(`❌ Failed to create high-cardinality detector: ${error.message}`)
      throw error
    }
  }

  /**
   * Create a detector with custom aggregations and advanced features
   *
   * Provides full control over detector configuration for advanced use cases
   * including custom aggregations, complex filters, and specialized features.
   *
   * @param {Object} config - Advanced detector configuration
   * @param {Array} config.features - Feature configs ({ name, enabled = true, aggregation })
   * @param {Array} config.filters - Raw query clauses placed into a bool filter
   * @param {Object} config.customAggregations - Extra aggregations attached as `custom_aggregations`
   * @param {Object} config.modelSettings - Overrides spread into the detector config
   * @returns {Object} Created detector information
   * @throws {Error} If no features are provided or a feature lacks an aggregation
   */
  async createAdvancedDetector(config) {
    const {
      name = `${this.defaultConfig.namePrefix}-advanced`,
      features = [],
      filters = [],
      customAggregations = {},
      modelSettings = {},
      description = 'Advanced anomaly detector with custom configuration'
    } = config

    console.log(`🔧 Creating advanced detector: ${name}`)
    console.log(`   Features: ${features.length}`)
    console.log(`   Custom aggregations: ${Object.keys(customAggregations).length}`)

    if (features.length === 0) {
      throw new Error('At least one feature configuration is required')
    }

    // Build advanced feature configurations
    const featureAttributes = []

    for (const feature of features) {
      const {
        name: featureName,
        enabled = true,
        aggregation
      } = feature

      if (!aggregation) {
        throw new Error(`Feature ${featureName} must have an aggregation configuration`)
      }

      const featureConfig = {
        feature_name: featureName,
        feature_enabled: enabled,
        aggregation_query: aggregation
      }

      featureAttributes.push(featureConfig)
      console.log(`   + Advanced feature: ${featureName}`)
    }

    // Build complex filter query (caller-supplied clauses go in verbatim)
    let filterQuery = { match_all: {} }

    if (filters.length > 0) {
      filterQuery = {
        bool: {
          filter: filters
        }
      }
    }

    // Build detector configuration with advanced settings
    const detectorConfig = {
      name: name,
      description: description,
      time_field: this.defaultConfig.timeField,
      indices: [this.indexName],
      feature_attributes: featureAttributes,
      filter_query: filterQuery,

      detection_interval: {
        period: {
          interval: modelSettings.intervalMinutes || this.defaultConfig.intervalMinutes,
          unit: 'Minutes'
        }
      },

      window_delay: {
        period: {
          interval: modelSettings.windowDelayMinutes || this.defaultConfig.windowDelayMinutes,
          unit: 'Minutes'
        }
      },

      // Apply custom model settings (may override any of the keys above)
      ...modelSettings
    }

    // Add custom aggregations to the configuration
    if (Object.keys(customAggregations).length > 0) {
      detectorConfig.custom_aggregations = customAggregations
    }

    try {
      const detector = await this.client.createAnomalyDetector(detectorConfig)

      // Track the detector locally for later management
      this.activeDetectors.set(detector._id, {
        id: detector._id,
        name: name,
        type: 'advanced',
        config: detectorConfig,
        features: features,
        created: new Date(),
        status: 'created'
      })

      console.log(`✅ Advanced detector created successfully: ${detector._id}`)
      return detector

    } catch (error) {
      console.error(`❌ Failed to create advanced detector: ${error.message}`)
      throw error
    }
  }

  /**
   * Start a detector and begin anomaly detection
   *
   * @param {string} detectorId - ID of the detector to start
   * @param {Object} options - Start options
   * @param {boolean} options.waitForInitialization - Poll until the detector reaches RUNNING
   * @param {number} options.initializationTimeoutMs - Timeout for initialization polling
   * @returns {Object} Start response from the client
   */
  async startDetector(detectorId, options = {}) {
    const {
      waitForInitialization = true,
      initializationTimeoutMs = 60000
    } = options

    console.log(`▶️ Starting detector: ${detectorId}`)

    try {
      const response = await this.client.startAnomalyDetector(detectorId)

      // Update locally-tracked detector status
      if (this.activeDetectors.has(detectorId)) {
        this.activeDetectors.get(detectorId).status = 'starting'
      }

      if (waitForInitialization) {
        console.log('⏳ Waiting for detector initialization...')
        await this.waitForDetectorInitialization(detectorId, initializationTimeoutMs)
      }

      console.log(`✅ Detector ${detectorId} started successfully`)
      return response

    } catch (error) {
      console.error(`❌ Failed to start detector ${detectorId}: ${error.message}`)

      // Update locally-tracked detector status
      if (this.activeDetectors.has(detectorId)) {
        this.activeDetectors.get(detectorId).status = 'error'
      }

      throw error
    }
  }

  /**
   * Wait for detector to complete initialization
   *
   * Polls the detector profile every 5 seconds until the state is RUNNING,
   * the profile reports an error, or the timeout elapses.
   *
   * @param {string} detectorId - Detector ID
   * @param {number} timeoutMs - Timeout in milliseconds
   * @returns {Object} The detector profile once RUNNING
   * @throws {Error} On initialization failure or timeout
   */
  async waitForDetectorInitialization(detectorId, timeoutMs = 60000) {
    const startTime = Date.now()
    const pollInterval = 5000 // Check every 5 seconds

    while (Date.now() - startTime < timeoutMs) {
      try {
        const profile = await this.client.getDetectorProfile(detectorId)

        if (profile.state === 'RUNNING' || profile.state === 'INIT') {
          console.log(`🔄 Detector state: ${profile.state}`)

          if (profile.state === 'RUNNING') {
            // Update locally-tracked detector status
            if (this.activeDetectors.has(detectorId)) {
              this.activeDetectors.get(detectorId).status = 'running'
            }
            return profile
          }
        }

        if (profile.error) {
          throw new Error(`Detector initialization failed: ${profile.error}`)
        }

      } catch (error) {
        // Transient profile-fetch failures are tolerated; keep polling until timeout
        console.warn(`⚠️ Could not get detector profile: ${error.message}`)
      }

      // Wait before next check
      await new Promise(resolve => setTimeout(resolve, pollInterval))
    }

    throw new Error(`Detector initialization timeout after ${timeoutMs}ms`)
  }

  /**
   * Stop a detector
   *
   * @param {string} detectorId - ID of the detector to stop
   * @returns {Object} Stop response from the client
   */
  async stopDetector(detectorId) {
    console.log(`⏹️ Stopping detector: ${detectorId}`)

    try {
      const response = await this.client.stopAnomalyDetector(detectorId)

      // Update locally-tracked detector status
      if (this.activeDetectors.has(detectorId)) {
        this.activeDetectors.get(detectorId).status = 'stopped'
      }

      console.log(`✅ Detector ${detectorId} stopped successfully`)
      return response

    } catch (error) {
      console.error(`❌ Failed to stop detector ${detectorId}: ${error.message}`)
      throw error
    }
  }

  /**
   * Run historical analysis on past data
   *
   * @param {string} detectorId - Detector ID
   * @param {Object} options - Analysis options
   * @param {Date} options.startTime - Analysis start time (default: 7 days ago)
   * @param {Date} options.endTime - Analysis end time (default: now)
   * @param {boolean} options.waitForCompletion - Wait for analysis to complete
   * @returns {Object} Analysis response from the client
   */
  async runHistoricalAnalysis(detectorId, options = {}) {
    const {
      startTime = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // Default: 7 days ago
      endTime = new Date(),
      waitForCompletion = false
    } = options

    console.log(`📈 Running historical analysis for detector: ${detectorId}`)
    console.log(`   Time range: ${startTime.toISOString()} to ${endTime.toISOString()}`)

    try {
      const response = await this.client.runHistoricalAnalysis(detectorId, {
        startTime: startTime.getTime(),
        endTime: endTime.getTime()
      })

      if (waitForCompletion) {
        console.log('⏳ Waiting for historical analysis to complete...')
        await this.waitForHistoricalAnalysisCompletion(detectorId)
      }

      return response

    } catch (error) {
      console.error(`❌ Failed to run historical analysis: ${error.message}`)
      throw error
    }
  }

  /**
   * Wait for historical analysis to complete with progress tracking
   *
   * Uses ProgressTracker to poll the detector profile's ad_task progress.
   *
   * @param {string} detectorId - Detector ID
   * @param {number} timeoutMs - Timeout in milliseconds
   * @returns {*} Result of ProgressTracker.trackProgress
   */
  async waitForHistoricalAnalysisCompletion(detectorId, timeoutMs = 300000) {
    const tracker = new ProgressTracker()

    const checkProgress = async () => {
      try {
        const profile = await this.client.getDetectorProfile(detectorId)

        // Preferred path: the AD task exposes an explicit percentage
        if (profile.ad_task && profile.ad_task.task_progress !== undefined) {
          return {
            percentage: Math.round(profile.ad_task.task_progress),
            status: profile.ad_task.state || 'Processing',
            completed: profile.ad_task.task_progress >= 100,
            error: profile.ad_task.error
          }
        }

        // Fallback for when no task progress is available
        return {
          status: profile.state || 'Running',
          completed: profile.state === 'COMPLETED' || profile.state === 'FINISHED'
        }

      } catch (error) {
        return { error: error.message }
      }
    }

    return await tracker.trackProgress(checkProgress, {
      operationName: 'Historical Analysis',
      pollIntervalMs: 5000,
      timeoutMs: timeoutMs
    })
  }

  /**
   * Get and interpret anomaly detection results
   *
   * @param {string} detectorId - Detector ID
   * @param {Object} options - Query options
   * @param {Date} options.startTime - Result window start (default: 24h ago)
   * @param {Date} options.endTime - Result window end (default: now)
   * @param {boolean} options.includeDetails - Include per-anomaly feature analysis
   * @returns {Object} Interpreted results ({ anomalies, summary, insights })
   */
  async getAnomalyResults(detectorId, options = {}) {
    const {
      startTime = new Date(Date.now() - 24 * 60 * 60 * 1000),
      endTime = new Date(),
      includeDetails = true
    } = options

    console.log(`📊 Getting anomaly results for detector: ${detectorId}`)

    try {
      const rawResults = await this.client.getDetectorResults(detectorId, {
        startTime: startTime.getTime(),
        endTime: endTime.getTime()
      })

      // Interpret and enrich the results
      const interpretedResults = this.interpretAnomalyResults(rawResults, includeDetails)

      console.log(`📋 Found ${interpretedResults.anomalies.length} anomalous periods`)
      console.log(`   Severity breakdown: High: ${interpretedResults.summary.highSeverity}, Medium: ${interpretedResults.summary.mediumSeverity}, Low: ${interpretedResults.summary.lowSeverity}`)

      return interpretedResults

    } catch (error) {
      console.error(`❌ Failed to get anomaly results: ${error.message}`)
      throw error
    }
  }

  /**
   * Interpret raw anomaly detection results
   *
   * Classifies each result into high/medium/low severity based on
   * anomaly grade and confidence, sorts by severity then recency,
   * and generates human-readable insights.
   *
   * @param {Object} rawResults - Raw results from OpenSearch (expects `anomaly_result` array)
   * @param {boolean} includeDetails - Include detailed per-anomaly feature analysis
   * @returns {Object} { anomalies, summary, insights }
   */
  interpretAnomalyResults(rawResults, includeDetails = true) {
    const anomalies = []
    const summary = {
      total: 0,
      highSeverity: 0,
      mediumSeverity: 0,
      lowSeverity: 0,
      timeRange: { start: null, end: null }
    }

    if (!rawResults.anomaly_result || rawResults.anomaly_result.length === 0) {
      return { anomalies, summary, insights: ['No anomalies detected in the specified time range'] }
    }

    // Process each anomaly result
    rawResults.anomaly_result.forEach(result => {
      const anomalyGrade = result.anomaly_grade || 0
      const confidence = result.confidence || 0
      const timestamp = new Date(result.data_start_time)

      // Determine severity based on grade and confidence
      let severity = 'low'
      if (anomalyGrade >= 0.8 && confidence >= 0.8) {
        severity = 'high'
        summary.highSeverity++
      } else if (anomalyGrade >= 0.5 && confidence >= 0.6) {
        severity = 'medium'
        summary.mediumSeverity++
      } else {
        summary.lowSeverity++
      }

      const anomaly = {
        timestamp: timestamp.toISOString(),
        anomalyGrade: parseFloat(anomalyGrade.toFixed(3)),
        confidence: parseFloat(confidence.toFixed(3)),
        severity: severity,
        features: result.feature_data || []
      }

      // Add detailed analysis if requested
      if (includeDetails) {
        anomaly.analysis = this.analyzeAnomalyFeatures(result.feature_data || [])
      }

      anomalies.push(anomaly)
      summary.total++

      // Track the overall time range spanned by the results
      if (!summary.timeRange.start || timestamp < new Date(summary.timeRange.start)) {
        summary.timeRange.start = timestamp.toISOString()
      }
      if (!summary.timeRange.end || timestamp > new Date(summary.timeRange.end)) {
        summary.timeRange.end = timestamp.toISOString()
      }
    })

    // Sort anomalies by severity (high first), then by timestamp (newest first)
    anomalies.sort((a, b) => {
      const severityOrder = { high: 3, medium: 2, low: 1 }
      if (severityOrder[a.severity] !== severityOrder[b.severity]) {
        return severityOrder[b.severity] - severityOrder[a.severity]
      }
      return new Date(b.timestamp) - new Date(a.timestamp)
    })

    // Generate insights
    const insights = this.generateAnomalyInsights(anomalies, summary)

    return {
      anomalies,
      summary,
      insights
    }
  }

  /**
   * Analyze individual anomaly features to provide insights
   *
   * @param {Array} featureData - Feature data from anomaly result
   * @returns {Object} { affectedFeatures, primaryDriver, correlations }
   */
  analyzeAnomalyFeatures(featureData) {
    const analysis = {
      affectedFeatures: [],
      primaryDriver: null,
      correlations: []
    }

    if (!featureData || featureData.length === 0) {
      return analysis
    }

    // Collect each feature that carries a value
    featureData.forEach(feature => {
      const featureName = feature.feature_name
      const value = feature.data

      if (value !== null && value !== undefined) {
        analysis.affectedFeatures.push({
          name: featureName,
          value: parseFloat(value.toFixed(3)),
          deviation: this.calculateFeatureDeviation(feature)
        })
      }
    })

    // Identify primary driver (feature with highest deviation)
    if (analysis.affectedFeatures.length > 0) {
      analysis.primaryDriver = analysis.affectedFeatures.reduce((max, feature) =>
        feature.deviation > (max.deviation || 0) ? feature : max
      )
    }

    return analysis
  }

  /**
   * Calculate feature deviation (simplified heuristic)
   *
   * This is a simplified calculation; in practice you would compare
   * against historical baselines.
   *
   * @param {Object} feature - Feature data
   * @returns {number} Deviation score (absolute value of the feature data)
   */
  calculateFeatureDeviation(feature) {
    const value = feature.data || 0
    return Math.abs(value) // Simplified deviation metric
  }

  /**
   * Generate insights from anomaly patterns
   *
   * @param {Array} anomalies - Processed anomalies
   * @param {Object} summary - Summary statistics
   * @returns {Array} Array of human-readable insight strings
   */
  generateAnomalyInsights(anomalies, summary) {
    const insights = []

    // Overall severity assessment
    if (summary.highSeverity > 0) {
      insights.push(`🚨 ${summary.highSeverity} high-severity anomalies detected - immediate attention recommended`)
    }

    if (summary.mediumSeverity > 0) {
      insights.push(`⚠️ ${summary.mediumSeverity} medium-severity anomalies detected - monitoring recommended`)
    }

    if (summary.lowSeverity > 0) {
      insights.push(`ℹ️ ${summary.lowSeverity} low-severity anomalies detected - normal variation likely`)
    }

    // Temporal patterns.
    // BUGFIX: hoursSpan was previously a `const` declared inside this if-block
    // but read again below for the anomaly-rate insight, which threw a
    // ReferenceError whenever summary.total > 10. It is now function-scoped.
    let hoursSpan = null
    if (anomalies.length > 0) {
      const timestamps = anomalies.map(a => new Date(a.timestamp))
      const timeSpan = timestamps.reduce((max, t) => t > max ? t : max) - timestamps.reduce((min, t) => t < min ? t : min)
      hoursSpan = timeSpan / (1000 * 60 * 60)

      if (hoursSpan < 1) {
        insights.push('🕐 Anomalies clustered within 1 hour - possible sudden system event')
      } else if (hoursSpan < 24) {
        insights.push('📅 Anomalies span less than 24 hours - temporary issue likely')
      } else {
        insights.push('📈 Anomalies span multiple days - systematic issue possible')
      }
    }

    // Feature analysis insights
    const featureAnalysis = this.analyzeFeaturePatterns(anomalies)
    if (featureAnalysis.dominantFeature) {
      insights.push(`🎯 Primary anomaly driver: ${featureAnalysis.dominantFeature}`)
    }

    if (featureAnalysis.multipleFeatures) {
      insights.push('🔗 Multiple features affected - complex system interaction detected')
    }

    // Confidence patterns
    const highConfidenceCount = anomalies.filter(a => a.confidence >= 0.8).length
    const lowConfidenceCount = anomalies.filter(a => a.confidence < 0.5).length

    if (highConfidenceCount > summary.total * 0.7) {
      insights.push('✅ High confidence in anomaly detection - results reliable')
    } else if (lowConfidenceCount > summary.total * 0.5) {
      insights.push('❓ Many low-confidence detections - consider adjusting detector sensitivity')
    }

    // Rate of anomalies (hoursSpan is truthy only when > 0, avoiding division by zero)
    if (summary.total > 10 && hoursSpan) {
      const anomalyRate = summary.total / hoursSpan
      if (anomalyRate > 1) {
        insights.push(`⚡ High anomaly rate: ${anomalyRate.toFixed(1)} per hour - system instability possible`)
      }
    }

    return insights
  }

  /**
   * Analyze patterns in anomaly features
   *
   * @param {Array} anomalies - Array of anomalies with feature data
   * @returns {Object} { dominantFeature, multipleFeatures, featureCounts, averageFeaturesPerAnomaly }
   */
  analyzeFeaturePatterns(anomalies) {
    const featureCounts = {}
    let totalFeatures = 0

    // Count feature occurrences across anomalies
    anomalies.forEach(anomaly => {
      if (anomaly.features && anomaly.features.length > 0) {
        totalFeatures += anomaly.features.length
        anomaly.features.forEach(feature => {
          const featureName = feature.feature_name || feature.name
          featureCounts[featureName] = (featureCounts[featureName] || 0) + 1
        })
      }
    })

    // Find the feature that appears in the most anomalies
    let dominantFeature = null
    let maxCount = 0
    Object.entries(featureCounts).forEach(([feature, count]) => {
      if (count > maxCount) {
        maxCount = count
        dominantFeature = feature
      }
    })

    return {
      dominantFeature: dominantFeature,
      multipleFeatures: Object.keys(featureCounts).length > 1,
      featureCounts: featureCounts,
      averageFeaturesPerAnomaly: totalFeatures / Math.max(anomalies.length, 1)
    }
  }

  /**
   * Get detector status and statistics
   *
   * Combines the remote detector profile with locally-tracked metadata.
   *
   * @param {string} detectorId - Detector ID
   * @returns {Object} Detector status information
   */
  async getDetectorStatus(detectorId) {
    console.log(`📊 Getting status for detector: ${detectorId}`)

    try {
      const profile = await this.client.getDetectorProfile(detectorId)
      const localInfo = this.activeDetectors.get(detectorId)

      const status = {
        id: detectorId,
        name: localInfo?.name || 'Unknown',
        type: localInfo?.type || 'Unknown',
        state: profile.state || 'Unknown',
        created: localInfo?.created || null,
        lastUpdate: new Date().toISOString(),

        // Model information
        modelInfo: {
          initProgress: profile.init_progress || null,
          modelSizeInBytes: profile.model?.model_size_in_bytes || null,
          modelId: profile.model?.model_id || null
        },

        // Performance metrics
        metrics: {
          totalSamples: profile.total_size_in_bytes || 0,
          anomalyRate: profile.anomaly_grade?.anomaly_grade || null,
          lastAnomalyTime: profile.last_update_time || null
        },

        // Error information
        error: profile.error || null,

        // Raw profile for debugging
        rawProfile: profile
      }

      console.log(`📋 Detector ${detectorId} status: ${status.state}`)
      return status

    } catch (error) {
      console.error(`❌ Failed to get detector status: ${error.message}`)
      throw error
    }
  }

  /**
   * Clean up and delete a detector
   *
   * @param {string} detectorId - Detector ID to delete
   * @param {Object} options - Deletion options
   * @param {boolean} options.stopFirst - Stop the detector before deleting (default true)
   * @param {boolean} options.force - Continue with deletion even if stopping fails
   * @returns {Object} Deletion response from the client
   */
  async deleteDetector(detectorId, options = {}) {
    const { stopFirst = true, force = false } = options

    console.log(`🗑️ Deleting detector: ${detectorId}`)

    try {
      // Stop detector first if requested
      if (stopFirst) {
        try {
          await this.stopDetector(detectorId)
        } catch (error) {
          if (!force) {
            throw error
          }
          console.warn(`⚠️ Could not stop detector, continuing with deletion: ${error.message}`)
        }
      }

      // Delete the detector
      const response = await this.client.deleteAnomalyDetector(detectorId)

      // Remove from local tracking
      this.activeDetectors.delete(detectorId)

      console.log(`✅ Detector ${detectorId} deleted successfully`)
      return response

    } catch (error) {
      console.error(`❌ Failed to delete detector: ${error.message}`)
      throw error
    }
  }

  /**
   * List all managed detectors
   *
   * @returns {Array} Array of detector summaries ({ id, name, type, status, created })
   */
  listManagedDetectors() {
    return Array.from(this.activeDetectors.values()).map(detector => ({
      id: detector.id,
      name: detector.name,
      type: detector.type,
      status: detector.status,
      created: detector.created
    }))
  }

  /**
   * Export detector configuration for backup or replication
   *
   * @param {string} detectorId - Detector ID
   * @returns {Object} Exportable detector configuration with metadata
   * @throws {Error} If the detector is not locally tracked
   */
  exportDetectorConfig(detectorId) {
    const detector = this.activeDetectors.get(detectorId)
    if (!detector) {
      throw new Error(`Detector ${detectorId} not found in managed detectors`)
    }

    return {
      config: detector.config,
      metadata: {
        name: detector.name,
        type: detector.type,
        created: detector.created,
        exportedAt: new Date().toISOString()
      }
    }
  }
}
|
|
|
// Export the manager class for use by other modules (CommonJS)
module.exports = AnomalyDetectionManager