.env.example
# OpenSearch Cluster Configuration
# Copy this file to .env and update with your actual values
# OpenSearch cluster endpoint (required)
# For NetApp Instaclustr: Use your cluster endpoint URL
# Format: https://your-cluster-id.instaclustr.io:9200
OPENSEARCH_NODE=https://localhost:9200
# Authentication credentials (required)
# Use your cluster username and password
OPENSEARCH_USERNAME=admin
OPENSEARCH_PASSWORD=admin
# Optional: Custom index name for demo data
# Default: opensearch-sample-data-logs (recommended for demo consistency)
OPENSEARCH_INDEX=opensearch-sample-data-logs
# Optional: Automatically cleanup detectors after demo
# Values: true/false
# Default: false
CLEANUP_DETECTOR=false
# Optional: Data generation settings
# Hours of historical data to generate
DATA_HOURS=2
# Anomaly probability (0.0 = none, 1.0 = all anomalies)
ANOMALY_RATE=0.05

🔍 OpenSearch Anomaly Detection CLI

A professional command-line interface for demonstrating OpenSearch's anomaly detection capabilities. Generate realistic time-series data, create ML-based anomaly detectors, and showcase production-ready error handling with real-time progress tracking.

🎯 Purpose

This demo showcases OpenSearch's ability to handle AI workloads by demonstrating:

  • High-volume data ingestion with progress tracking
  • ML-based anomaly detection using the Random Cut Forest algorithm
  • Production-ready error handling with network resilience
  • Real-time monitoring capabilities for system metrics

Perfect for technical evaluations, conference demonstrations, and educational workshops.

🚀 Quick Start

Prerequisites

  • Node.js 14.0.0 or higher
  • Access to an OpenSearch cluster (local or cloud)
  • Network connectivity to your OpenSearch instance

Installation

  1. Clone and install dependencies:

    git clone <repository-url>
    cd opensearch-anomaly-detection
    npm install
  2. Configure environment:

    cp .env.example .env
    # Edit .env with your OpenSearch cluster details
  3. Set your OpenSearch credentials in .env:

    OPENSEARCH_NODE=https://your-cluster:9200
    OPENSEARCH_USERNAME=your-username
    OPENSEARCH_PASSWORD=your-password
    OPENSEARCH_INDEX=opensearch-sample-data-logs

🎮 Usage

Generate Historical Training Data

Create 2 weeks of realistic system metrics data:

# Generate comprehensive training dataset
node index.js generate --hours=336 --interval=5

# Quick test dataset (2 hours)
node index.js generate --hours=2 --interval=1

Run Anomaly Detection Demos

Basic Single-Metric Detection:

node index.js basic

Advanced Multi-Metric Detection:

node index.js advanced --hours=4 --rate=0.08

Chaos Engineering Demo:

node index.js chaos --duration=30 --severity=severe

Data Generation Only:

node index.js generate --hours=24 --servers=20

Detector Management:

# Check detector health
node index.js health --all

# Compare multiple detectors
node index.js compare detector1 detector2 detector3

# Cleanup resources
node index.js cleanup

📊 Command Reference

Global Options

Option           Description                  Default
---------------- ---------------------------- ---------------------------
--index <name>   Target index name            opensearch-sample-data-logs
--cleanup        Delete detectors after demo  false
--verbose        Enable detailed logging      false

Data Generation Options

Option            Description                    Default                               Example
----------------- ------------------------------ ------------------------------------- -----------------------------------
--hours <n>       Hours of historical data       2                                     --hours=336 (2 weeks)
--interval <min>  Data point interval (minutes)  5                                     --interval=1
--rate <n>        Anomaly probability (0.0-1.0)  0.05                                  --rate=0.08
--servers <n>     Number of servers to simulate  10                                    --servers=50
--metrics <list>  Comma-separated metrics        cpu_usage,memory_usage,response_time  --metrics=cpu_usage,network_traffic

Chaos Options

Option              Description                              Default  Example
------------------- ---------------------------------------- -------- --------------------
--duration <min>    Chaos scenario duration (minutes)        30       --duration=45
--severity <level>  Chaos intensity: mild, moderate, severe  severe   --severity=moderate

Detection Options

Option                      Description                    Default
--------------------------- ------------------------------ -------
--detection-interval <min>  Detection frequency (minutes)  5
--high-cardinality          Enable entity-based detection  false
--historical                Run historical analysis        false
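
For example, to analyze existing data with a ten-minute detection cycle and a historical pass (options taken from the detect command defined in index.js):

node index.js detect --metric=cpu_usage --historical --detection-interval=10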

🏗️ Architecture

Module Structure

├── index.js               # CLI entry point with Commander.js
├── os-client.js           # OpenSearch operations & error handling
├── data-generator.js      # Realistic time-series data generation
├── anomaly-detection.js   # High-level anomaly detection management
├── progress-tracker.js    # Reusable progress tracking utility
└── .env.example           # Environment configuration template

Key Features

🟡 Intelligent Progress Tracking:

  • Progress bars for percentage-based operations (bulk insert, ML training)
  • Text status updates for binary state operations (detector startup)
  • Real-time polling with network timeout handling
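
progress-tracker.js is not reproduced in this gist, but anomaly-detection.js calls tracker.trackProgress(checkFn, { operationName, pollIntervalMs, timeoutMs }) and expects checkFn to resolve to an object of the form { percentage?, status?, completed, error? }. A minimal sketch consistent with that usage (the internals here are assumptions, not the shipped implementation):

// progress-tracker.js (sketch)
class ProgressTracker {
  async trackProgress (checkFn, { operationName = 'Operation', pollIntervalMs = 5000, timeoutMs = 300000 } = {}) {
    const startTime = Date.now()
    while (Date.now() - startTime < timeoutMs) {
      const state = await checkFn()
      if (state.error) throw new Error(`${operationName} failed: ${state.error}`)
      // Percentage bar for progress-based operations, text status otherwise
      const label = state.percentage !== undefined
        ? `${state.percentage}% (${state.status})`
        : state.status
      process.stdout.write(`\r${operationName}: ${label}   `)
      if (state.completed) {
        process.stdout.write('\n')
        return state
      }
      await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
    }
    throw new Error(`${operationName} timed out after ${timeoutMs}ms`)
  }
}
module.exports = ProgressTracker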

🔍 Multiple Detector Types:

  • Basic: Single-metric anomaly detection
  • Multi-metric: Analyze multiple related metrics simultaneously
  • High-cardinality: Entity-based detection across multiple servers/users
  • Advanced: Custom aggregations and complex feature engineering

📈 Realistic Data Generation:

  • Multiple metric types: CPU, memory, network, response time, disk I/O
  • Seasonal patterns: Business hours, weekday/weekend cycles
  • Correlated anomalies: System failures affecting multiple metrics
  • Configurable anomaly types: spikes, drops, drifts, outliers
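
Each generated document follows the shape produced by data-generator.js (the field values below are illustrative):

{
  "timestamp": "2025-08-26T14:35:00.000Z",
  "source": "server-01",
  "category": "cpu",
  "metric_value": 112.483,
  "is_anomaly": false,
  "anomaly_type": null,
  "hour_of_day": 14,
  "day_of_week": 2,
  "baseline": 100.012,
  "seasonal_component": 12.33
}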

🛠️ Configuration

Environment Variables

Create a .env file with your cluster details:

# Required: OpenSearch connection
OPENSEARCH_NODE=https://your-cluster:9200
OPENSEARCH_USERNAME=your-username  
OPENSEARCH_PASSWORD=your-password

# Optional: Demo settings
OPENSEARCH_INDEX=opensearch-sample-data-logs
CLEANUP_DETECTOR=false
DATA_HOURS=2
ANOMALY_RATE=0.05

Example Configurations

Conference Demo (Fast):

DATA_HOURS=2
ANOMALY_RATE=0.08
CLEANUP_DETECTOR=true

Production Evaluation (Comprehensive):

DATA_HOURS=336
ANOMALY_RATE=0.02
CLEANUP_DETECTOR=false

🚨 Error Handling

The application includes production-ready error handling that demonstrates OpenSearch's robustness:

Network Resilience:

  • Automatic retries with exponential backoff
  • Clear timeout detection and reporting
  • Graceful degradation for non-critical operations
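
os-client.js is not included in this gist, but the retry behavior described above can be sketched as a small wrapper (names and delays are illustrative assumptions):

// Retry an async operation with exponential backoff (sketch)
async function withRetries (operation, { maxAttempts = 3, baseDelayMs = 1000 } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      if (attempt === maxAttempts) throw error
      const delay = baseDelayMs * 2 ** (attempt - 1) // 1s, 2s, 4s, ...
      console.warn(`Attempt ${attempt} failed (${error.message}), retrying in ${delay}ms`)
      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }
}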

Data Quality Validation:

  • Incomplete ingestion detection with impact analysis
  • Training data sufficiency warnings
  • Rust-like error messaging with actionable recommendations

📖 OpenSearch Integration

Anomaly Detection Plugin Features

  • Random Cut Forest (RCF) algorithm for unsupervised learning
  • Real-time detection with configurable intervals
  • Historical analysis for batch processing of past data
  • High-cardinality support for entity-based monitoring
  • Integration with OpenSearch Alerting for notifications
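
Under the hood, the detectors this CLI creates map onto the plugin's REST API. A minimal equivalent request, with the body shape taken from the detector configs built in anomaly-detection.js:

POST _plugins/_anomaly_detection/detectors
{
  "name": "cpu-usage-detector",
  "description": "Basic anomaly detection for cpu_usage",
  "time_field": "timestamp",
  "indices": ["opensearch-sample-data-logs"],
  "feature_attributes": [{
    "feature_name": "cpu_usage_avg",
    "feature_enabled": true,
    "aggregation_query": { "cpu_usage_avg": { "avg": { "field": "cpu_usage" } } }
  }],
  "detection_interval": { "period": { "interval": 5, "unit": "Minutes" } },
  "window_delay": { "period": { "interval": 1, "unit": "Minutes" } }
}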

API Documentation References

  • Overview: https://opensearch.org/docs/latest/monitoring-plugins/ad/
  • API Reference: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/
  • Settings: https://opensearch.org/docs/latest/monitoring-plugins/ad/settings/
  • Troubleshooting: https://opensearch.org/docs/latest/monitoring-plugins/ad/troubleshoot/

🔧 Troubleshooting

Common Issues

Connection Errors:

❌ OpenSearch Connection Error
   ├─ Cause: Failed to connect after 3 attempts
   └─ Recommendation: Check OpenSearch cluster status and network connectivity

Solution: Verify OPENSEARCH_NODE URL and network access.

Authentication Failures:

❌ Error: authentication failed

Solution: Check OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD in .env.

Incomplete Data Ingestion:

❌ Incomplete Data Ingestion Error
   ├─ Cause: Only 1,247 of 2,880 expected documents inserted (43% completion)
   └─ Impact: Insufficient training data for reliable anomaly detection

Solution: Check cluster resources or reduce data volume.

Performance Optimization

For Large Datasets:

  • Increase batch size: Modify batchSize in data generation
  • Adjust detection intervals: Use longer intervals for high-volume data
  • Monitor cluster resources: Ensure adequate CPU/memory for ML operations

For Conference Demos:

  • Use smaller datasets (--hours=2)
  • Higher anomaly rates (--rate=0.08) for visible results
  • Enable cleanup (--cleanup) for quick iterations
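
Putting those together:

node index.js basic --hours=2 --rate=0.08 --cleanup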

🤝 Contributing

This demo code is designed for:

  • Students: Learning OpenSearch anomaly detection concepts
  • Developers: Implementing anomaly detection in production systems
  • Evaluators: Assessing OpenSearch capabilities for AI workloads

Feel free to extend the detector types, add new metrics, or enhance the progress tracking for your specific use cases.

📝 License

MIT License - See LICENSE file for details.


Ready to detect anomalies? Start with node index.js basic and explore OpenSearch's powerful ML capabilities!

/**
* Anomaly Detection Module
*
* Handles all OpenSearch Anomaly Detection plugin operations including:
* - Detector configuration and creation
* - Feature engineering and aggregation setup
* - Model training and validation
* - Real-time and historical analysis
* - Result interpretation and alerting
* - Detector lifecycle management
*
* This module abstracts the complexity of the OpenSearch AD plugin and provides
* high-level methods for common anomaly detection workflows.
*
* OpenSearch Anomaly Detection Documentation:
* - Overview: https://opensearch.org/docs/latest/monitoring-plugins/ad/
* - API Reference: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/
* - Settings: https://opensearch.org/docs/latest/monitoring-plugins/ad/settings/
* - Troubleshooting: https://opensearch.org/docs/latest/monitoring-plugins/ad/troubleshoot/
*/
/**
* Anomaly Detection Manager class
*
* Provides high-level interface for OpenSearch anomaly detection operations
*/
const ProgressTracker = require('./progress-tracker')
class AnomalyDetectionManager {
/**
* Initialize the anomaly detection manager
*
* @param {OpenSearchClient} client - Initialized OpenSearch client instance
* @param {Object} defaultConfig - Default configuration options
*/
constructor(client, defaultConfig = {}) {
this.client = client
this.indexName = client.indexName
// Default configuration for anomaly detectors
// These can be overridden when creating specific detectors
this.defaultConfig = {
// Detector naming and description
namePrefix: this.indexName || 'anomaly-detector',
description: 'Automated anomaly detection for system metrics',
// Time series configuration
timeField: 'timestamp',
intervalMinutes: 5, // Detection interval
windowDelayMinutes: 1, // Processing delay to account for late data
// Feature configuration
maxFeatures: 5, // Maximum features per detector (OpenSearch limit)
aggregationWindow: '1m', // Aggregation window for features
// Model training
trainingDataPeriod: 7, // Days of data needed for training
minTrainingData: 256, // Minimum data points for training
// Detection sensitivity
anomalyGradeThreshold: 0.7, // Minimum grade to consider anomalous
confidenceThreshold: 0.8, // Minimum confidence for alerts
// Performance settings
maxCategoryFields: 2, // Maximum category fields for high cardinality detection
shingleSize: 8, // Time series shingle size (OpenSearch default)
...defaultConfig
}
// Track created detectors for management
this.activeDetectors = new Map()
console.log('🔍 Anomaly Detection Manager initialized')
console.log(` Default interval: ${this.defaultConfig.intervalMinutes} minutes`)
console.log(` Training period: ${this.defaultConfig.trainingDataPeriod} days`)
}
/**
* Create a basic single-metric anomaly detector
*
* Ideal for detecting anomalies in a single metric like CPU usage,
* response time, or error rate.
*
* @param {Object} config - Detector configuration
* @param {string} config.name - Detector name
* @param {string} config.metricField - Field containing the metric values
* @param {string} config.category - Optional category filter
* @param {string} config.aggregationType - Aggregation type (avg, max, min, sum)
* @returns {Object} Created detector information
*/
async createBasicDetector(config) {
const {
name = `${this.defaultConfig.namePrefix}-basic`,
metricField = 'metric_value',
category = null,
aggregationType = 'avg',
description = `Basic anomaly detection for ${metricField}`
} = config
console.log(`🔧 Creating basic detector: ${name}`)
console.log(` Metric: ${metricField} (${aggregationType})`)
console.log(` Category filter: ${category || 'none'}`)
// Build feature definition
// Features define what metrics to analyze and how to aggregate them
// Documentation: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#create-detector
const featureName = `${metricField}_${aggregationType}`
const featureConfig = {
feature_name: featureName,
feature_enabled: true,
aggregation_query: {
[featureName]: {
[aggregationType]: {
field: metricField
}
}
}
}
// Build filter query if category is specified
let filterQuery = { match_all: {} }
if (category) {
filterQuery = {
bool: {
filter: [
{ term: { category: category } }
]
}
}
}
// Create detector configuration
const detectorConfig = {
name: name,
description: description,
time_field: this.defaultConfig.timeField,
indices: [this.indexName],
feature_attributes: [featureConfig],
filter_query: filterQuery,
detection_interval: {
period: {
interval: this.defaultConfig.intervalMinutes,
unit: 'Minutes'
}
},
window_delay: {
period: {
interval: this.defaultConfig.windowDelayMinutes,
unit: 'Minutes'
}
}
}
try {
const detector = await this.client.createAnomalyDetector(detectorConfig)
// Track the detector
this.activeDetectors.set(detector._id, {
id: detector._id,
name: name,
type: 'basic',
config: detectorConfig,
created: new Date(),
status: 'created'
})
console.log(`✅ Basic detector created successfully: ${detector._id}`)
return detector
} catch (error) {
console.error(`❌ Failed to create basic detector: ${error.message}`)
throw error
}
}
/**
* Create a multi-metric detector for comprehensive monitoring
*
* Analyzes multiple related metrics simultaneously to detect
* complex anomalies that span multiple dimensions.
*
* @param {Object} config - Multi-metric detector configuration
* @param {string} config.name - Detector name
* @param {Array} config.metrics - Array of metric configurations
* @param {Object} config.filters - Additional filters
* @returns {Object} Created detector information
*/
async createMultiMetricDetector(config) {
const {
name = `${this.defaultConfig.namePrefix}-multi-metric`,
metrics = [],
filters = {},
description = 'Multi-metric anomaly detection for system monitoring'
} = config
console.log(`🔧 Creating multi-metric detector: ${name}`)
console.log(` Metrics: ${metrics.length}`)
if (metrics.length === 0) {
throw new Error('At least one metric configuration is required')
}
if (metrics.length > this.defaultConfig.maxFeatures) {
console.warn(`⚠️ Warning: ${metrics.length} metrics exceeds recommended limit of ${this.defaultConfig.maxFeatures}`)
}
// Build feature configurations for each metric
const featureAttributes = []
for (const metric of metrics) {
const {
field,
aggregations = ['avg'],
category = null,
enabled = true
} = metric
// Create features for each aggregation type
for (const aggType of aggregations) {
let featureName = `${field}_${aggType}` // let, not const: a category suffix may be appended below
if (category) {
featureName += `_${category}`
}
const featureConfig = {
feature_name: featureName,
feature_enabled: enabled,
aggregation_query: {
[featureName]: {
[aggType]: {
field: field
}
}
}
}
featureAttributes.push(featureConfig)
console.log(` + Feature: ${featureName}`)
}
}
// Build comprehensive filter query
const mustFilters = []
// Add time range filter for recent data
mustFilters.push({
range: {
[this.defaultConfig.timeField]: {
gte: 'now-1h'
}
}
})
// Add custom filters
Object.entries(filters).forEach(([field, value]) => {
if (Array.isArray(value)) {
mustFilters.push({ terms: { [field]: value } })
} else {
mustFilters.push({ term: { [field]: value } })
}
})
const filterQuery = {
bool: {
filter: mustFilters
}
}
// Create detector configuration
const detectorConfig = {
name: name,
description: description,
time_field: this.defaultConfig.timeField,
indices: [this.indexName],
feature_attributes: featureAttributes,
filter_query: filterQuery,
detection_interval: {
period: {
interval: this.defaultConfig.intervalMinutes,
unit: 'Minutes'
}
},
window_delay: {
period: {
interval: this.defaultConfig.windowDelayMinutes,
unit: 'Minutes'
}
}
}
try {
const detector = await this.client.createAnomalyDetector(detectorConfig)
// Track the detector
this.activeDetectors.set(detector._id, {
id: detector._id,
name: name,
type: 'multi-metric',
config: detectorConfig,
metrics: metrics,
created: new Date(),
status: 'created'
})
console.log(`✅ Multi-metric detector created successfully: ${detector._id}`)
return detector
} catch (error) {
console.error(`❌ Failed to create multi-metric detector: ${error.message}`)
throw error
}
}
/**
* Create a high-cardinality detector for entity-based anomaly detection
*
* Detects anomalies across multiple entities (servers, users, etc.)
* using category fields to separate different entity behaviors.
*
* Documentation: https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#create-detector
*
* @param {Object} config - High-cardinality detector configuration
* @param {Array} config.categoryFields - Fields that define entities
* @param {string} config.metricField - Field to analyze
* @param {number} config.maxEntities - Maximum entities to track
* @returns {Object} Created detector information
*/
async createHighCardinalityDetector(config) {
const {
name = `${this.defaultConfig.namePrefix}-high-cardinality`,
categoryFields = ['source'],
metricField = 'metric_value',
aggregationType = 'avg',
maxEntities = 1000,
description = 'High-cardinality entity-based anomaly detection'
} = config
console.log(`🔧 Creating high-cardinality detector: ${name}`)
console.log(` Category fields: ${categoryFields.join(', ')}`)
console.log(` Metric: ${metricField}`)
console.log(` Max entities: ${maxEntities}`)
if (categoryFields.length > this.defaultConfig.maxCategoryFields) {
throw new Error(`Too many category fields. Maximum allowed: ${this.defaultConfig.maxCategoryFields}`)
}
// Feature configuration for high-cardinality detection
const featureName = `${metricField}_${aggregationType}_hc`
const featureConfig = {
feature_name: featureName,
feature_enabled: true,
aggregation_query: {
[featureName]: {
[aggregationType]: {
field: metricField
}
}
}
}
// High-cardinality specific configuration
const detectorConfig = {
name: name,
description: description,
time_field: this.defaultConfig.timeField,
indices: [this.indexName],
feature_attributes: [featureConfig],
// Category fields enable high-cardinality detection
// Each unique combination of category field values creates a separate model
category_field: categoryFields,
filter_query: { match_all: {} },
detection_interval: {
period: {
interval: this.defaultConfig.intervalMinutes,
unit: 'Minutes'
}
},
window_delay: {
period: {
interval: this.defaultConfig.windowDelayMinutes,
unit: 'Minutes'
}
},
// High-cardinality specific settings
result_index: `opensearch-ad-plugin-result-${name.toLowerCase()}`,
// Adjust shingle size for high-cardinality (optional)
shingle_size: this.defaultConfig.shingleSize
}
try {
const detector = await this.client.createAnomalyDetector(detectorConfig)
// Track the detector
this.activeDetectors.set(detector._id, {
id: detector._id,
name: name,
type: 'high-cardinality',
config: detectorConfig,
categoryFields: categoryFields,
maxEntities: maxEntities,
created: new Date(),
status: 'created'
})
console.log(`✅ High-cardinality detector created successfully: ${detector._id}`)
console.log(` Result index: ${detectorConfig.result_index}`)
return detector
} catch (error) {
console.error(`❌ Failed to create high-cardinality detector: ${error.message}`)
throw error
}
}
/**
* Create a detector with custom aggregations and advanced features
*
* Provides full control over detector configuration for advanced use cases
* including custom aggregations, complex filters, and specialized features.
*
* @param {Object} config - Advanced detector configuration
* @returns {Object} Created detector information
*/
async createAdvancedDetector(config) {
const {
name = `${this.defaultConfig.namePrefix}-advanced`,
features = [],
filters = [],
customAggregations = {},
modelSettings = {},
description = 'Advanced anomaly detector with custom configuration'
} = config
console.log(`🔧 Creating advanced detector: ${name}`)
console.log(` Features: ${features.length}`)
console.log(` Custom aggregations: ${Object.keys(customAggregations).length}`)
if (features.length === 0) {
throw new Error('At least one feature configuration is required')
}
// Build advanced feature configurations
const featureAttributes = []
for (const feature of features) {
const {
name: featureName,
enabled = true,
aggregation
} = feature
if (!aggregation) {
throw new Error(`Feature ${featureName} must have an aggregation configuration`)
}
const featureConfig = {
feature_name: featureName,
feature_enabled: enabled,
aggregation_query: aggregation
}
featureAttributes.push(featureConfig)
console.log(` + Advanced feature: ${featureName}`)
}
// Build complex filter query
let filterQuery = { match_all: {} }
if (filters.length > 0) {
filterQuery = {
bool: {
filter: filters
}
}
}
// Build detector configuration with advanced settings
const detectorConfig = {
name: name,
description: description,
time_field: this.defaultConfig.timeField,
indices: [this.indexName],
feature_attributes: featureAttributes,
filter_query: filterQuery,
detection_interval: {
period: {
interval: modelSettings.intervalMinutes || this.defaultConfig.intervalMinutes,
unit: 'Minutes'
}
},
window_delay: {
period: {
interval: modelSettings.windowDelayMinutes || this.defaultConfig.windowDelayMinutes,
unit: 'Minutes'
}
},
// Apply custom model settings
...modelSettings
}
// Add custom aggregations to the configuration
if (Object.keys(customAggregations).length > 0) {
detectorConfig.custom_aggregations = customAggregations
}
try {
const detector = await this.client.createAnomalyDetector(detectorConfig)
// Track the detector
this.activeDetectors.set(detector._id, {
id: detector._id,
name: name,
type: 'advanced',
config: detectorConfig,
features: features,
created: new Date(),
status: 'created'
})
console.log(`✅ Advanced detector created successfully: ${detector._id}`)
return detector
} catch (error) {
console.error(`❌ Failed to create advanced detector: ${error.message}`)
throw error
}
}
/**
* Start a detector and begin anomaly detection
*
* @param {string} detectorId - ID of the detector to start
* @param {Object} options - Start options
* @returns {Object} Start response
*/
async startDetector(detectorId, options = {}) {
const {
waitForInitialization = true,
initializationTimeoutMs = 60000
} = options
console.log(`▶️ Starting detector: ${detectorId}`)
try {
const response = await this.client.startAnomalyDetector(detectorId)
// Update detector status
if (this.activeDetectors.has(detectorId)) {
this.activeDetectors.get(detectorId).status = 'starting'
}
if (waitForInitialization) {
console.log('⏳ Waiting for detector initialization...')
await this.waitForDetectorInitialization(detectorId, initializationTimeoutMs)
}
console.log(`✅ Detector ${detectorId} started successfully`)
return response
} catch (error) {
console.error(`❌ Failed to start detector ${detectorId}: ${error.message}`)
// Update detector status
if (this.activeDetectors.has(detectorId)) {
this.activeDetectors.get(detectorId).status = 'error'
}
throw error
}
}
/**
* Wait for detector to complete initialization
*
* @param {string} detectorId - Detector ID
* @param {number} timeoutMs - Timeout in milliseconds
*/
async waitForDetectorInitialization(detectorId, timeoutMs = 60000) {
const startTime = Date.now()
const pollInterval = 5000 // Check every 5 seconds
while (Date.now() - startTime < timeoutMs) {
try {
const profile = await this.client.getDetectorProfile(detectorId)
if (profile.state === 'RUNNING' || profile.state === 'INIT') {
console.log(`🔄 Detector state: ${profile.state}`)
if (profile.state === 'RUNNING') {
// Update detector status
if (this.activeDetectors.has(detectorId)) {
this.activeDetectors.get(detectorId).status = 'running'
}
return profile
}
}
if (profile.error) {
throw new Error(`Detector initialization failed: ${profile.error}`)
}
} catch (error) {
console.warn(`⚠️ Could not get detector profile: ${error.message}`)
}
// Wait before next check
await new Promise(resolve => setTimeout(resolve, pollInterval))
}
throw new Error(`Detector initialization timeout after ${timeoutMs}ms`)
}
/**
* Stop a detector
*
* @param {string} detectorId - ID of the detector to stop
* @returns {Object} Stop response
*/
async stopDetector(detectorId) {
console.log(`⏹️ Stopping detector: ${detectorId}`)
try {
const response = await this.client.stopAnomalyDetector(detectorId)
// Update detector status
if (this.activeDetectors.has(detectorId)) {
this.activeDetectors.get(detectorId).status = 'stopped'
}
console.log(`✅ Detector ${detectorId} stopped successfully`)
return response
} catch (error) {
console.error(`❌ Failed to stop detector ${detectorId}: ${error.message}`)
throw error
}
}
/**
* Run historical analysis on past data
*
* @param {string} detectorId - Detector ID
* @param {Object} options - Analysis options
* @param {Date} options.startTime - Analysis start time
* @param {Date} options.endTime - Analysis end time
* @param {boolean} options.waitForCompletion - Wait for analysis to complete
* @returns {Object} Analysis response
*/
async runHistoricalAnalysis(detectorId, options = {}) {
const {
startTime = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // Default: 7 days ago
endTime = new Date(),
waitForCompletion = false
} = options
console.log(`📈 Running historical analysis for detector: ${detectorId}`)
console.log(` Time range: ${startTime.toISOString()} to ${endTime.toISOString()}`)
try {
const response = await this.client.runHistoricalAnalysis(detectorId, {
startTime: startTime.getTime(),
endTime: endTime.getTime()
})
if (waitForCompletion) {
console.log('⏳ Waiting for historical analysis to complete...')
await this.waitForHistoricalAnalysisCompletion(detectorId)
}
return response
} catch (error) {
console.error(`❌ Failed to run historical analysis: ${error.message}`)
throw error
}
}
/**
* Wait for historical analysis to complete with progress tracking
*
* @param {string} detectorId - Detector ID
* @param {number} timeoutMs - Timeout in milliseconds
*/
async waitForHistoricalAnalysisCompletion(detectorId, timeoutMs = 300000) {
const tracker = new ProgressTracker()
const checkProgress = async () => {
try {
const profile = await this.client.getDetectorProfile(detectorId)
if (profile.ad_task && profile.ad_task.task_progress !== undefined) {
return {
percentage: Math.round(profile.ad_task.task_progress),
status: profile.ad_task.state || 'Processing',
completed: profile.ad_task.task_progress >= 100,
error: profile.ad_task.error
}
}
// Fallback for when no task progress is available
return {
status: profile.state || 'Running',
completed: profile.state === 'COMPLETED' || profile.state === 'FINISHED'
}
} catch (error) {
return { error: error.message }
}
}
return await tracker.trackProgress(checkProgress, {
operationName: 'Historical Analysis',
pollIntervalMs: 5000,
timeoutMs: timeoutMs
})
}
/**
* Get and interpret anomaly detection results
*
* @param {string} detectorId - Detector ID
* @param {Object} options - Query options
* @returns {Object} Interpreted results
*/
async getAnomalyResults(detectorId, options = {}) {
const {
startTime = new Date(Date.now() - 24 * 60 * 60 * 1000),
endTime = new Date(),
includeDetails = true
} = options
console.log(`📊 Getting anomaly results for detector: ${detectorId}`)
try {
const rawResults = await this.client.getDetectorResults(detectorId, {
startTime: startTime.getTime(),
endTime: endTime.getTime()
})
// Interpret and enrich the results
const interpretedResults = this.interpretAnomalyResults(rawResults, includeDetails)
console.log(`📋 Found ${interpretedResults.anomalies.length} anomalous periods`)
console.log(` Severity breakdown: High: ${interpretedResults.summary.highSeverity}, Medium: ${interpretedResults.summary.mediumSeverity}, Low: ${interpretedResults.summary.lowSeverity}`)
return interpretedResults
} catch (error) {
console.error(`❌ Failed to get anomaly results: ${error.message}`)
throw error
}
}
/**
* Interpret raw anomaly detection results
*
* @param {Object} rawResults - Raw results from OpenSearch
* @param {boolean} includeDetails - Include detailed analysis
* @returns {Object} Interpreted results with severity levels and insights
*/
interpretAnomalyResults(rawResults, includeDetails = true) {
const anomalies = []
const summary = {
total: 0,
highSeverity: 0,
mediumSeverity: 0,
lowSeverity: 0,
timeRange: { start: null, end: null }
}
if (!rawResults.anomaly_result || rawResults.anomaly_result.length === 0) {
return { anomalies, summary, insights: ['No anomalies detected in the specified time range'] }
}
// Process each anomaly result
rawResults.anomaly_result.forEach(result => {
const anomalyGrade = result.anomaly_grade || 0
const confidence = result.confidence || 0
const timestamp = new Date(result.data_start_time)
// Determine severity based on grade and confidence
let severity = 'low'
if (anomalyGrade >= 0.8 && confidence >= 0.8) {
severity = 'high'
summary.highSeverity++
} else if (anomalyGrade >= 0.5 && confidence >= 0.6) {
severity = 'medium'
summary.mediumSeverity++
} else {
summary.lowSeverity++
}
const anomaly = {
timestamp: timestamp.toISOString(),
anomalyGrade: parseFloat(anomalyGrade.toFixed(3)),
confidence: parseFloat(confidence.toFixed(3)),
severity: severity,
features: result.feature_data || []
}
// Add detailed analysis if requested
if (includeDetails) {
anomaly.analysis = this.analyzeAnomalyFeatures(result.feature_data || [])
}
anomalies.push(anomaly)
summary.total++
// Update time range
if (!summary.timeRange.start || timestamp < new Date(summary.timeRange.start)) {
summary.timeRange.start = timestamp.toISOString()
}
if (!summary.timeRange.end || timestamp > new Date(summary.timeRange.end)) {
summary.timeRange.end = timestamp.toISOString()
}
})
// Sort anomalies by severity and timestamp
anomalies.sort((a, b) => {
const severityOrder = { high: 3, medium: 2, low: 1 }
if (severityOrder[a.severity] !== severityOrder[b.severity]) {
return severityOrder[b.severity] - severityOrder[a.severity]
}
return new Date(b.timestamp) - new Date(a.timestamp)
})
// Generate insights
const insights = this.generateAnomalyInsights(anomalies, summary)
return {
anomalies,
summary,
insights
}
}
/**
* Analyze individual anomaly features to provide insights
*
* @param {Array} featureData - Feature data from anomaly result
* @returns {Object} Feature analysis
*/
analyzeAnomalyFeatures(featureData) {
const analysis = {
affectedFeatures: [],
primaryDriver: null,
correlations: []
}
if (!featureData || featureData.length === 0) {
return analysis
}
// Analyze each feature
featureData.forEach(feature => {
const featureName = feature.feature_name
const value = feature.data
if (value !== null && value !== undefined) {
analysis.affectedFeatures.push({
name: featureName,
value: parseFloat(value.toFixed(3)),
deviation: this.calculateFeatureDeviation(feature)
})
}
})
// Identify primary driver (feature with highest deviation)
if (analysis.affectedFeatures.length > 0) {
analysis.primaryDriver = analysis.affectedFeatures.reduce((max, feature) =>
feature.deviation > (max.deviation || 0) ? feature : max
)
}
return analysis
}
/**
* Calculate feature deviation (simplified heuristic)
*
* @param {Object} feature - Feature data
* @returns {number} Deviation score
*/
calculateFeatureDeviation(feature) {
// This is a simplified calculation
// In practice, you would compare against historical baselines
const value = feature.data || 0
return Math.abs(value) // Simplified deviation metric
}
/**
* Generate insights from anomaly patterns
*
* @param {Array} anomalies - Processed anomalies
* @param {Object} summary - Summary statistics
* @returns {Array} Array of insights
*/
generateAnomalyInsights(anomalies, summary) {
const insights = []
// Overall severity assessment
if (summary.highSeverity > 0) {
insights.push(`🚨 ${summary.highSeverity} high-severity anomalies detected - immediate attention recommended`)
}
if (summary.mediumSeverity > 0) {
insights.push(`⚠️ ${summary.mediumSeverity} medium-severity anomalies detected - monitoring recommended`)
}
if (summary.lowSeverity > 0) {
insights.push(`ℹ️ ${summary.lowSeverity} low-severity anomalies detected - normal variation likely`)
}
// Temporal patterns
let hoursSpan = 0 // Declared here so the anomaly-rate check below can reuse it
if (anomalies.length > 0) {
const timestamps = anomalies.map(a => new Date(a.timestamp))
const timeSpan = timestamps.reduce((max, t) => t > max ? t : max) - timestamps.reduce((min, t) => t < min ? t : min)
hoursSpan = timeSpan / (1000 * 60 * 60)
if (hoursSpan < 1) {
insights.push('🕐 Anomalies clustered within 1 hour - possible sudden system event')
} else if (hoursSpan < 24) {
insights.push('📅 Anomalies span less than 24 hours - temporary issue likely')
} else {
insights.push('📈 Anomalies span multiple days - systematic issue possible')
}
}
// Feature analysis insights
const featureAnalysis = this.analyzeFeaturePatterns(anomalies)
if (featureAnalysis.dominantFeature) {
insights.push(`🎯 Primary anomaly driver: ${featureAnalysis.dominantFeature}`)
}
if (featureAnalysis.multipleFeatures) {
insights.push('🔗 Multiple features affected - complex system interaction detected')
}
// Confidence patterns
const highConfidenceCount = anomalies.filter(a => a.confidence >= 0.8).length
const lowConfidenceCount = anomalies.filter(a => a.confidence < 0.5).length
if (highConfidenceCount > summary.total * 0.7) {
insights.push('✅ High confidence in anomaly detection - results reliable')
} else if (lowConfidenceCount > summary.total * 0.5) {
insights.push('❓ Many low-confidence detections - consider adjusting detector sensitivity')
}
// Rate of anomalies
if (summary.total > 10 && hoursSpan) {
const anomalyRate = summary.total / hoursSpan
if (anomalyRate > 1) {
insights.push(`⚡ High anomaly rate: ${anomalyRate.toFixed(1)} per hour - system instability possible`)
}
}
return insights
}
/**
* Analyze patterns in anomaly features
*
* @param {Array} anomalies - Array of anomalies with feature data
* @returns {Object} Feature pattern analysis
*/
analyzeFeaturePatterns(anomalies) {
const featureCounts = {}
let totalFeatures = 0
// Count feature occurrences across anomalies
anomalies.forEach(anomaly => {
if (anomaly.features && anomaly.features.length > 0) {
totalFeatures += anomaly.features.length
anomaly.features.forEach(feature => {
const featureName = feature.feature_name || feature.name
featureCounts[featureName] = (featureCounts[featureName] || 0) + 1
})
}
})
// Find dominant feature
let dominantFeature = null
let maxCount = 0
Object.entries(featureCounts).forEach(([feature, count]) => {
if (count > maxCount) {
maxCount = count
dominantFeature = feature
}
})
return {
dominantFeature: dominantFeature,
multipleFeatures: Object.keys(featureCounts).length > 1,
featureCounts: featureCounts,
averageFeaturesPerAnomaly: totalFeatures / Math.max(anomalies.length, 1)
}
}
/**
* Get detector status and statistics
*
* @param {string} detectorId - Detector ID
* @returns {Object} Detector status information
*/
async getDetectorStatus(detectorId) {
console.log(`📊 Getting status for detector: ${detectorId}`)
try {
const profile = await this.client.getDetectorProfile(detectorId)
const localInfo = this.activeDetectors.get(detectorId)
const status = {
id: detectorId,
name: localInfo?.name || 'Unknown',
type: localInfo?.type || 'Unknown',
state: profile.state || 'Unknown',
created: localInfo?.created || null,
lastUpdate: new Date().toISOString(),
// Model information
modelInfo: {
initProgress: profile.init_progress || null,
modelSizeInBytes: profile.model?.model_size_in_bytes || null,
modelId: profile.model?.model_id || null
},
// Performance metrics
metrics: {
totalSamples: profile.total_size_in_bytes || 0,
anomalyRate: profile.anomaly_grade?.anomaly_grade || null,
lastAnomalyTime: profile.last_update_time || null
},
// Error information
error: profile.error || null,
// Raw profile for debugging
rawProfile: profile
}
console.log(`📋 Detector ${detectorId} status: ${status.state}`)
return status
} catch (error) {
console.error(`❌ Failed to get detector status: ${error.message}`)
throw error
}
}
/**
* Clean up and delete a detector
*
* @param {string} detectorId - Detector ID to delete
* @param {Object} options - Deletion options
* @returns {Object} Deletion response
*/
async deleteDetector(detectorId, options = {}) {
const { stopFirst = true, force = false } = options
console.log(`🗑️ Deleting detector: ${detectorId}`)
try {
// Stop detector first if requested
if (stopFirst) {
try {
await this.stopDetector(detectorId)
} catch (error) {
if (!force) {
throw error
}
console.warn(`⚠️ Could not stop detector, continuing with deletion: ${error.message}`)
}
}
// Delete the detector
const response = await this.client.deleteAnomalyDetector(detectorId)
// Remove from local tracking
this.activeDetectors.delete(detectorId)
console.log(`✅ Detector ${detectorId} deleted successfully`)
return response
} catch (error) {
console.error(`❌ Failed to delete detector: ${error.message}`)
throw error
}
}
/**
* List all managed detectors
*
* @returns {Array} Array of detector information
*/
listManagedDetectors() {
return Array.from(this.activeDetectors.values()).map(detector => ({
id: detector.id,
name: detector.name,
type: detector.type,
status: detector.status,
created: detector.created
}))
}
/**
* Export detector configuration for backup or replication
*
* @param {string} detectorId - Detector ID
* @returns {Object} Exportable detector configuration
*/
exportDetectorConfig(detectorId) {
const detector = this.activeDetectors.get(detectorId)
if (!detector) {
throw new Error(`Detector ${detectorId} not found in managed detectors`)
}
return {
config: detector.config,
metadata: {
name: detector.name,
type: detector.type,
created: detector.created,
exportedAt: new Date().toISOString()
}
}
}
}
module.exports = AnomalyDetectionManager
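// Typical workflow with this manager, assuming an initialized OpenSearchClient
// from os-client.js (not shown in this gist); a usage sketch, not part of the module:
//
//   const manager = new AnomalyDetectionManager(client)
//   const detector = await manager.createBasicDetector({ metricField: 'cpu_usage' })
//   await manager.startDetector(detector._id, { waitForInitialization: true })
//   const results = await manager.getAnomalyResults(detector._id)
//   console.log(results.insights.join('\n'))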
/**
* Application Constants
*
* Centralized constants organized by data type to prevent repetition
* and improve maintainability across the OpenSearch anomaly detection application.
*/
// Field names used in documents and queries
const FIELD_NAMES = {
TIMESTAMP: 'timestamp',
METRIC_VALUE: 'metric_value',
CATEGORY: 'category',
SOURCE: 'source',
IS_ANOMALY: 'is_anomaly',
ANOMALY_SCORE: 'anomaly_score',
METADATA: 'metadata',
REGION: 'region',
ENVIRONMENT: 'environment',
SERVER_TYPE: 'server_type',
DATACENTER: 'datacenter'
}
// OpenSearch detector states
const DETECTOR_STATES = {
RUNNING: 'RUNNING',
DISABLED: 'DISABLED',
INIT: 'INIT',
STOPPED: 'STOPPED',
ERROR: 'ERROR'
}
// Time units for OpenSearch APIs
const TIME_UNITS = {
MINUTES: 'Minutes',
HOURS: 'Hours',
DAYS: 'Days',
SECONDS: 'Seconds'
}
// Metric categories for system monitoring
const METRIC_CATEGORIES = {
CPU_USAGE: 'cpu_usage',
MEMORY_USAGE: 'memory_usage',
NETWORK_TRAFFIC: 'network_traffic',
RESPONSE_TIME: 'response_time',
DISK_IOPS: 'disk_iops',
DB_CONNECTIONS: 'db_connections'
}
// Aggregation types for detector features
const AGGREGATION_TYPES = {
AVG: 'avg',
MAX: 'max',
MIN: 'min',
SUM: 'sum',
COUNT: 'count'
}
// Anomaly severity levels
const SEVERITY_LEVELS = {
HIGH: 'high',
MEDIUM: 'medium',
LOW: 'low'
}
// Anomaly types for data generation
const ANOMALY_TYPES = {
SPIKE: 'spike',
DROP: 'drop',
DRIFT: 'drift',
OUTLIER: 'outlier',
PLATEAU: 'plateau'
}
// Environment types
const ENVIRONMENTS = {
PRODUCTION: 'production',
STAGING: 'staging',
DEVELOPMENT: 'development'
}
// Default configuration values
const DEFAULTS = {
// Connection settings
OPENSEARCH_NODE: 'https://localhost:9200',
USERNAME: 'admin',
PASSWORD: 'admin',
// Detection intervals
DETECTION_INTERVAL_MINUTES: 5,
WINDOW_DELAY_MINUTES: 1,
DATA_INTERVAL_MINUTES: 1,
// Data generation
DATA_HOURS: 2,
ANOMALY_PROBABILITY: 0.05,
BATCH_SIZE: 50,
SERVER_COUNT: 10,
// Thresholds
ANOMALY_GRADE_THRESHOLD: 0.7,
CONFIDENCE_THRESHOLD: 0.8,
// Limits
MAX_FEATURES: 5,
MAX_CATEGORY_FIELDS: 2,
SHINGLE_SIZE: 8
}
module.exports = {
FIELD_NAMES,
DETECTOR_STATES,
TIME_UNITS,
METRIC_CATEGORIES,
AGGREGATION_TYPES,
SEVERITY_LEVELS,
ANOMALY_TYPES,
ENVIRONMENTS,
DEFAULTS
}
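// Example usage of these constants from another module (a sketch; it assumes this
// file is saved as constants.js, and note the other modules in this gist still
// inline their literals):
//
//   const { DEFAULTS, TIME_UNITS } = require('./constants')
//   const detectionInterval = {
//     period: { interval: DEFAULTS.DETECTION_INTERVAL_MINUTES, unit: TIME_UNITS.MINUTES }
//   }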
/**
* Data Generator Module
*
* Generates realistic test data for OpenSearch anomaly detection testing.
* Creates various patterns including normal behavior, seasonal patterns,
* trends, and anomalous events for comprehensive detector validation.
*
* This module works in conjunction with the AnomalyDetectionManager to
* provide synthetic data for testing anomaly detection capabilities.
*/
class DataGenerator {
/**
* Initialize the data generator
*
* @param {Object} config - Generator configuration
*/
constructor(config = {}) {
this.config = {
// Base data generation settings
baselineValue: 100,
noiseLevel: 0.1,
trendStrength: 0.02,
seasonalStrength: 0.3,
// Anomaly settings
anomalyProbability: 0.05,
anomalyMagnitude: 3.0,
anomalyDuration: 5, // Data points
// Data source simulation
sources: ['server-01', 'server-02', 'server-03', 'api-gateway', 'database'],
categories: ['cpu', 'memory', 'network', 'disk', 'response_time'],
// Time series settings
intervalMinutes: 1,
startTime: new Date(),
...config
}
// Internal state for pattern generation
this.patternState = {
trend: 0,
lastValues: new Map(),
anomalyCountdown: new Map(),
seasonalPhase: 0
}
console.log('📊 Data Generator initialized')
console.log(` Sources: ${this.config.sources.length}`)
console.log(` Categories: ${this.config.categories.length}`)
console.log(` Anomaly probability: ${(this.config.anomalyProbability * 100).toFixed(1)}%`)
}
/**
* Generate a single data point with realistic patterns
*
* @param {Object} options - Generation options
* @returns {Object} Generated data point
*/
generateDataPoint(options = {}) {
const {
timestamp = new Date(),
source = this.randomChoice(this.config.sources),
category = this.randomChoice(this.config.categories),
includeAnomaly = null // null = random, true/false = forced
} = options
const key = `${source}_${category}`
const lastValue = this.patternState.lastValues.get(key) || this.config.baselineValue
// Calculate base value with trend
let baseValue = this.config.baselineValue + this.patternState.trend
// Add seasonal component
const hourOfDay = timestamp.getHours()
const dayOfWeek = timestamp.getDay()
const seasonal = this.calculateSeasonalComponent(hourOfDay, dayOfWeek, category)
// Add noise and smooth transitions
const noise = (Math.random() - 0.5) * this.config.noiseLevel * baseValue
const smoothing = 0.7 // Smooth transitions between values
let value = (smoothing * lastValue) + ((1 - smoothing) * (baseValue + seasonal + noise))
// Check for anomalies
let isAnomaly = false
let anomalyType = null
if (this.patternState.anomalyCountdown.has(key) && this.patternState.anomalyCountdown.get(key) > 0) {
// Continue existing anomaly
isAnomaly = true
anomalyType = 'continuing'
this.patternState.anomalyCountdown.set(key, this.patternState.anomalyCountdown.get(key) - 1)
// Apply anomaly effect
value = this.applyAnomalyEffect(value, category, 'continuing')
} else if (includeAnomaly === true || (includeAnomaly !== false && Math.random() < this.config.anomalyProbability)) {
// Start new anomaly
isAnomaly = true
anomalyType = 'spike'
this.patternState.anomalyCountdown.set(key, this.config.anomalyDuration)
// Apply anomaly effect
value = this.applyAnomalyEffect(value, category, 'spike')
}
// Ensure positive values for certain categories
if (['cpu', 'memory', 'response_time'].includes(category)) {
value = Math.max(0, value)
}
// Update state
this.patternState.lastValues.set(key, value)
this.patternState.trend += (Math.random() - 0.5) * this.config.trendStrength
this.patternState.seasonalPhase += 1
return {
timestamp: timestamp.toISOString(),
source: source,
category: category,
metric_value: parseFloat(value.toFixed(3)),
is_anomaly: isAnomaly,
anomaly_type: anomalyType,
// Additional contextual fields
hour_of_day: hourOfDay,
day_of_week: dayOfWeek,
baseline: parseFloat(baseValue.toFixed(3)),
seasonal_component: parseFloat(seasonal.toFixed(3))
}
}
/**
* Calculate seasonal patterns based on time and category
*
* @param {number} hour - Hour of day (0-23)
* @param {number} dayOfWeek - Day of week (0-6)
* @param {string} category - Metric category
* @returns {number} Seasonal adjustment
*/
calculateSeasonalComponent(hour, dayOfWeek, category) {
const baseValue = this.config.baselineValue
let seasonal = 0
// Different patterns for different categories
switch (category) {
case 'cpu':
// Higher during business hours, lower at night
seasonal = Math.sin((hour - 6) / 24 * 2 * Math.PI) * this.config.seasonalStrength * baseValue
// Weekend reduction
if (dayOfWeek === 0 || dayOfWeek === 6) {
seasonal *= 0.7
}
break
case 'response_time':
// Inverse pattern - higher load = higher response time
seasonal = Math.sin((hour - 10) / 24 * 2 * Math.PI + Math.PI) * this.config.seasonalStrength * baseValue * 0.5
break
case 'network':
// Peaks during data sync hours
if (hour === 2 || hour === 14) {
seasonal = this.config.seasonalStrength * baseValue
}
break
case 'memory':
// Gradual buildup during the day, cleanup at night
seasonal = (hour / 24 - 0.5) * this.config.seasonalStrength * baseValue
break
default:
// Generic pattern
seasonal = Math.sin(hour / 24 * 2 * Math.PI) * this.config.seasonalStrength * baseValue * 0.3
}
return seasonal
}
/**
* Apply anomaly effects to a value
*
* @param {number} baseValue - Base value to modify
* @param {string} category - Metric category
* @param {string} type - Anomaly type
* @returns {number} Modified value
*/
applyAnomalyEffect(baseValue, category, type) {
let multiplier = this.config.anomalyMagnitude
// Category-specific anomaly patterns
switch (category) {
case 'cpu':
case 'memory':
// Usually spike upward
multiplier = 1 + (Math.random() * this.config.anomalyMagnitude)
break
case 'response_time':
// Can spike very high
multiplier = 1 + (Math.random() * this.config.anomalyMagnitude * 2)
break
case 'network':
// Can go both ways
multiplier = Math.random() > 0.5 ?
1 + (Math.random() * this.config.anomalyMagnitude) :
Math.max(0.1, 1 - (Math.random() * 0.8))
break
default:
// Random direction
multiplier = Math.random() > 0.5 ?
1 + (Math.random() * this.config.anomalyMagnitude) :
Math.max(0.1, 1 - (Math.random() * this.config.anomalyMagnitude * 0.5))
}
return baseValue * multiplier
}
/**
* Generate a time series of data points
*
* @param {Object} options - Generation options
* @returns {Array} Array of data points
*/
generateTimeSeries(options = {}) {
const {
duration = 24, // hours
source = null,
category = null,
includeAnomalies = true,
anomalyCount = null // Specific number of anomalies to inject
} = options
const points = []
const totalPoints = Math.floor(duration * 60 / this.config.intervalMinutes)
const startTime = new Date(this.config.startTime)
console.log(`📈 Generating time series: ${totalPoints} points over ${duration} hours`)
// Calculate anomaly injection points if specific count requested
let anomalyPoints = new Set()
if (anomalyCount && includeAnomalies) {
while (anomalyPoints.size < Math.min(anomalyCount, Math.floor(totalPoints * 0.1))) {
anomalyPoints.add(Math.floor(Math.random() * totalPoints))
}
console.log(` Injecting ${anomalyPoints.size} targeted anomalies`)
}
for (let i = 0; i < totalPoints; i++) {
const timestamp = new Date(startTime.getTime() + i * this.config.intervalMinutes * 60 * 1000)
const forceAnomaly = anomalyPoints.has(i)
const point = this.generateDataPoint({
timestamp,
source: source,
category: category,
includeAnomaly: includeAnomalies ? (anomalyCount ? forceAnomaly : null) : false // null lets generateDataPoint roll random anomalies
})
points.push(point)
}
const anomaliesGenerated = points.filter(p => p.is_anomaly).length
console.log(`✅ Generated ${points.length} data points with ${anomaliesGenerated} anomalies`)
return points
}
/**
* Generate multi-source, multi-category dataset
*
* @param {Object} options - Generation options
* @returns {Array} Array of data points from multiple sources
*/
generateMultiSourceDataset(options = {}) {
const {
duration = 24,
sources = this.config.sources,
categories = this.config.categories,
correlateAnomalies = true, // Create correlated anomalies across sources
anomalyEvents = 3 // Number of system-wide anomaly events
} = options
const dataset = []
const totalPoints = Math.floor(duration * 60 / this.config.intervalMinutes)
console.log(`🏗️ Generating multi-source dataset`)
console.log(` Sources: ${sources.length}, Categories: ${categories.length}`)
console.log(` Total combinations: ${sources.length * categories.length}`)
// Generate anomaly event times for correlation
// Renamed to avoid redeclaring the destructured anomalyEvents count above
const plannedEvents = []
if (correlateAnomalies && anomalyEvents > 0) {
for (let i = 0; i < anomalyEvents; i++) {
plannedEvents.push({
startPoint: Math.floor(Math.random() * (totalPoints - 10)),
duration: 3 + Math.floor(Math.random() * 7), // 3-10 points
affectedSources: this.sampleArray(sources, Math.ceil(sources.length * 0.6)),
affectedCategories: this.sampleArray(categories, Math.ceil(categories.length * 0.4))
})
}
console.log(` ${plannedEvents.length} correlated anomaly events planned`)
}
// Generate data for each source-category combination
for (const source of sources) {
for (const category of categories) {
console.log(` Generating ${source}/${category}...`)
for (let i = 0; i < totalPoints; i++) {
const timestamp = new Date(this.config.startTime.getTime() + i * this.config.intervalMinutes * 60 * 1000)
// Check if this point should be anomalous due to system event
let forceAnomaly = false
for (const event of plannedEvents) {
if (i >= event.startPoint && i < event.startPoint + event.duration &&
event.affectedSources.includes(source) && event.affectedCategories.includes(category)) {
forceAnomaly = true
break
}
}
const point = this.generateDataPoint({
timestamp,
source,
category,
includeAnomaly: forceAnomaly ? true : null
})
// Add event correlation info
if (forceAnomaly) {
point.correlated_event = true
}
dataset.push(point)
}
}
}
// Shuffle the dataset to simulate real-world data arrival patterns
this.shuffleArray(dataset)
const totalAnomalies = dataset.filter(p => p.is_anomaly).length
const correlatedAnomalies = dataset.filter(p => p.correlated_event).length
console.log(`✅ Generated ${dataset.length} total data points`)
console.log(` Anomalies: ${totalAnomalies} (${correlatedAnomalies} correlated)`)
return dataset
}
/**
* Generate data with specific anomaly patterns for testing
*
* @param {string} pattern - Pattern type ('spike', 'dip', 'trend', 'seasonal')
* @param {Object} options - Pattern-specific options
* @returns {Array} Array of data points with specific pattern
*/
generatePatternData(pattern, options = {}) {
const {
duration = 6, // hours
intensity = 1.5,
source = 'test-source',
category = 'test-metric'
} = options
console.log(`🎯 Generating pattern data: ${pattern}`)
// Temporarily adjust config for pattern generation
const originalConfig = { ...this.config }
switch (pattern) {
case 'spike':
this.config.anomalyProbability = 0.3
this.config.anomalyMagnitude = intensity * 2
this.config.anomalyDuration = 2
break
case 'dip':
this.config.anomalyProbability = 0.2
this.config.anomalyMagnitude = -intensity
this.config.anomalyDuration = 4
break
case 'trend':
this.config.trendStrength = intensity * 0.1
this.config.anomalyProbability = 0.05
break
case 'seasonal':
this.config.seasonalStrength = intensity
this.config.anomalyProbability = 0.1
break
default:
console.warn(`Unknown pattern: ${pattern}, using default`)
}
const data = this.generateTimeSeries({
duration,
source,
category,
includeAnomalies: true
})
// Restore original config
this.config = originalConfig
return data
}
/**
* Export dataset in various formats
*
* @param {Array} dataset - Dataset to export
* @param {string} format - Export format ('json', 'csv', 'bulk')
* @returns {string} Formatted data
*/
exportDataset(dataset, format = 'json') {
console.log(`📤 Exporting ${dataset.length} data points as ${format}`)
switch (format) {
case 'json':
return JSON.stringify(dataset, null, 2)
case 'csv':
if (dataset.length === 0) return ''
const headers = Object.keys(dataset[0]).join(',')
const rows = dataset.map(point =>
Object.values(point).map(value =>
typeof value === 'string' ? `"${value}"` : value
).join(',')
)
return [headers, ...rows].join('\n')
case 'bulk':
// OpenSearch bulk format
return dataset.map(point =>
JSON.stringify({ index: {} }) + '\n' + JSON.stringify(point)
).join('\n') + '\n'
default:
throw new Error(`Unsupported export format: ${format}`)
}
}
// Utility methods
randomChoice(array) {
return array[Math.floor(Math.random() * array.length)]
}
sampleArray(array, count) {
const shuffled = [...array]
this.shuffleArray(shuffled) // Fisher-Yates; sort(() => Math.random() - 0.5) is biased
return shuffled.slice(0, count)
}
shuffleArray(array) {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1))
;[array[i], array[j]] = [array[j], array[i]]
}
}
/**
* Reset internal state for fresh data generation
*/
reset() {
this.patternState = {
trend: 0,
lastValues: new Map(),
anomalyCountdown: new Map(),
seasonalPhase: 0
}
console.log('🔄 Data generator state reset')
}
/**
* Get current generator statistics
*
* @returns {Object} Generator statistics
*/
getStats() {
return {
activeAnomalies: Array.from(this.patternState.anomalyCountdown.entries())
.filter(([_, count]) => count > 0).length,
trackedSeries: this.patternState.lastValues.size,
currentTrend: this.patternState.trend,
config: { ...this.config }
}
}
}
// Export factory function for easy configuration
function createDataGenerator(config = {}) {
return new DataGenerator(config)
}
module.exports = { DataGenerator, createDataGenerator }
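// Example: generate six hours of CPU data for one server and export it in
// OpenSearch bulk format (a usage sketch based on the API above):
//
//   const { createDataGenerator } = require('./data-generator')
//   const generator = createDataGenerator({ intervalMinutes: 5, anomalyProbability: 0.08 })
//   const points = generator.generateTimeSeries({ duration: 6, source: 'server-01', category: 'cpu' })
//   const bulkBody = generator.exportDataset(points, 'bulk')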
#!/usr/bin/env node
/**
* OpenSearch Anomaly Detection CLI
*
* Entry point for the OpenSearch anomaly detection application.
* Uses Commander.js for robust CLI argument processing and provides
* a professional command-line interface for anomaly detection workflows.
*
* Usage:
* opensearch-anomaly [command] [options]
*
* Commands:
* basic 🎯 Run basic anomaly detection demo
* advanced 🚀 Run advanced multi-metric detection
* generate 📊 Generate data only (no detection)
* detect 🔍 Run detection on existing data
* compare 📈 Compare multiple detectors
* health 🏥 Check detector health status
*
* Global Options:
* --node <url> 🔌 OpenSearch cluster endpoint
* --username <user> 👤 Authentication username
* --password <pass> 🔐 Authentication password
* --index <name> 📋 Target index name
* --cleanup 🧹 Delete detectors after demo
* --verbose 📝 Enable verbose logging
*/
const { Command } = require('commander')
const chalk = require('chalk')
const OpenSearchClient = require('./os-client')
const { DataGenerator } = require('./data-generator') // The module exports { DataGenerator, createDataGenerator }
const AnomalyDetectionManager = require('./anomaly-detection')
const program = new Command()
// ASCII Art Banner
const banner = `
${chalk.cyan('╔══════════════════════════════════════════════════════════════╗')}
${chalk.cyan('║')} ${chalk.bold.white('🔍 OpenSearch Anomaly Detection CLI')} ${chalk.cyan('║')}
${chalk.cyan('║')} ${chalk.gray('Advanced anomaly detection for time-series data')} ${chalk.cyan('║')}
${chalk.cyan('╚══════════════════════════════════════════════════════════════╝')}
`
/**
* Configure the main CLI program
*/
function setupCLI() {
program
.command('chaos')
.description('💥 Generate chaotic system failures with correlated anomalies for dramatic demo')
.option('-d, --duration <minutes>', '⏰ Duration of chaos scenario in minutes', '30')
.option('-s, --severity <level>', '🔥 Chaos severity: mild, moderate, severe', 'severe')
.option('--interval <minutes>', '⏱️ Data interval in minutes', '1')
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'chaos')
await runChaosDemo(config)
})
program
.name('opensearch-anomaly')
.description('🔍 OpenSearch Anomaly Detection CLI - Advanced anomaly detection for time-series data')
.version('1.0.0')
.addHelpText('before', banner)
// Global options available to all commands
program
.option('-n, --node <url>', '🔌 OpenSearch cluster endpoint', process.env.OPENSEARCH_NODE || 'https://localhost:9200')
.option('-u, --username <user>', '👤 Authentication username', process.env.OPENSEARCH_USERNAME || 'admin')
.option('-p, --password <pass>', '🔐 Authentication password', process.env.OPENSEARCH_PASSWORD || 'admin')
.option('-i, --index <name>', '📋 Target index name', process.env.OPENSEARCH_INDEX || 'system-metrics-anomaly-test')
.option('-c, --cleanup', '🧹 Delete detectors after demo', process.env.CLEANUP_DETECTOR === 'true')
.option('-v, --verbose', '📝 Enable verbose logging', false)
// Basic anomaly detection command
program
.command('basic')
.description('🎯 Run basic single-metric anomaly detection demo')
.option('-h, --hours <number>', '⏰ Hours of historical data to generate', '2')
.option('-r, --rate <number>', '📊 Anomaly probability (0.0-1.0)', '0.05')
.option('-m, --metric <type>', '📈 Primary metric type', 'cpu_usage')
.option('--interval <minutes>', '⏱️ Data interval in minutes', '1')
.option('--detection-interval <minutes>', '🔍 Detection interval in minutes', '5')
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'basic')
await runBasicDemo(config)
})
// Advanced multi-metric detection command
program
.command('advanced')
.description('🚀 Run advanced multi-metric anomaly detection')
.option('-h, --hours <number>', '⏰ Hours of historical data to generate', '4')
.option('-r, --rate <number>', '📊 Anomaly probability (0.0-1.0)', '0.03')
.option('-m, --metrics <list>', '📈 Comma-separated metric types', 'cpu_usage,memory_usage,response_time')
.option('--interval <minutes>', '⏱️ Data interval in minutes', '1')
.option('--high-cardinality', '🎛️ Enable high-cardinality detection', false)
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'advanced')
await runAdvancedDemo(config)
})
// Data generation only command
program
.command('generate')
.description('📊 Generate time-series data without running detection')
.option('-h, --hours <number>', '⏰ Hours of historical data to generate', '24')
.option('-r, --rate <number>', '📊 Anomaly probability (0.0-1.0)', '0.02')
.option('-m, --metrics <list>', '📈 Comma-separated metric types', 'cpu_usage,memory_usage,network_traffic,response_time')
.option('--interval <minutes>', '⏱️ Data interval in minutes', '5')
.option('--servers <number>', '🖥️ Number of servers to simulate', '10')
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'generate')
await runDataGeneration(config)
})
// Detection on existing data command
program
.command('detect')
.description('🔍 Run anomaly detection on existing data')
.option('-d, --detector-name <name>', '🏷️ Custom detector name')
.option('-m, --metric <type>', '📈 Metric type to analyze', 'cpu_usage')
.option('--historical', '📈 Run historical analysis', false)
.option('--detection-interval <minutes>', '🔍 Detection interval in minutes', '5')
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'detect')
await runDetectionOnly(config)
})
// Compare detectors command
program
.command('compare <detector-ids...>')
.description('📈 Compare performance of multiple detectors')
.option('--time-range <hours>', '⏰ Time range for comparison in hours', '24')
.action(async (detectorIds, options) => {
const config = buildConfig(program.opts(), options, 'compare')
config.detectorIds = detectorIds
await runComparison(config)
})
// Health check command
program
.command('health [detector-id]')
.description('🏥 Check detector health status')
.option('--all', '📊 Check all managed detectors', false)
.action(async (detectorId, options) => {
const config = buildConfig(program.opts(), options, 'health')
config.detectorId = detectorId
await runHealthCheck(config)
})
// Cleanup command
program
.command('cleanup')
.description('🧹 Clean up detectors and resources')
.option('--all', '💥 Delete all detectors (use with caution)', false)
.option('--inactive-only', '⏸️ Delete only inactive detectors', true)
.action(async (options) => {
const config = buildConfig(program.opts(), options, 'cleanup')
await runCleanup(config)
})
}
/**
* Build comprehensive configuration from CLI options
*/
function buildConfig(globalOpts, commandOpts, mode) {
const config = {
// OpenSearch connection settings
opensearch: {
node: globalOpts.node,
auth: {
username: globalOpts.username,
password: globalOpts.password
},
ssl: {
rejectUnauthorized: false
}
},
// Index configuration
indexName: globalOpts.index,
// Data generation settings
dataGeneration: {
dataHours: parseInt(commandOpts.hours || '2'),
intervalMinutes: parseInt(commandOpts.interval || '1'),
categories: commandOpts.metrics ? commandOpts.metrics.split(',') : ['cpu_usage', 'memory_usage', 'response_time'],
anomalyProbability: parseFloat(commandOpts.rate || '0.05'),
batchSize: 50,
serverCount: parseInt(commandOpts.servers || '10')
},
// Anomaly detection settings
detection: {
detectionInterval: parseInt(commandOpts.detectionInterval || '5'),
windowDelay: 1,
cleanup: globalOpts.cleanup,
detectorName: commandOpts.detectorName,
primaryMetric: commandOpts.metric || 'cpu_usage',
highCardinality: commandOpts.highCardinality || false,
historical: commandOpts.historical || false
},
// Runtime settings
mode: mode,
verbose: globalOpts.verbose,
timeRange: parseInt(commandOpts.timeRange || '24'),
// Command-specific options
...commandOpts
  }
  // Echo the effective configuration when verbose logging is enabled
  if (config.verbose) {
    console.log(chalk.gray('⚙️ Configuration:'))
    console.log(chalk.gray(`   Mode: ${config.mode}`))
    console.log(chalk.gray(`   OpenSearch: ${config.opensearch.node}`))
    console.log(chalk.gray(`   Index: ${config.indexName}`))
    console.log(chalk.gray(`   Data Hours: ${config.dataGeneration.dataHours}`))
    console.log(chalk.gray(`   Anomaly Rate: ${config.dataGeneration.anomalyProbability}\n`))
  }
  return config
}
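// Shape of the merged config (illustrative sketch; the values shown are
// the defaults the `basic` command would produce):
//
//   {
//     opensearch: { node: 'https://localhost:9200', auth: { ... }, ssl: { ... } },
//     indexName: 'system-metrics-anomaly-test',
//     dataGeneration: { dataHours: 2, intervalMinutes: 1, anomalyProbability: 0.05, ... },
//     detection: { detectionInterval: 5, windowDelay: 1, primaryMetric: 'cpu_usage', ... },
//     mode: 'basic',
//     verbose: false
//   }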
/**
* 🎯 Basic anomaly detection demo
*/
async function runBasicDemo(config) {
console.log(chalk.cyan('🚀 Starting Basic OpenSearch Anomaly Detection Demo...\n'))
const client = new OpenSearchClient(config.opensearch)
const generator = new DataGenerator()
const anomalyManager = new AnomalyDetectionManager(client, {
namePrefix: config.indexName,
intervalMinutes: config.detection.detectionInterval,
windowDelayMinutes: config.detection.windowDelay
})
try {
console.log(chalk.blue('🔌 Connecting to OpenSearch...'))
await client.initialize(config.indexName)
console.log(chalk.green('📊 Generating time series data with embedded anomalies...'))
const startDate = new Date(Date.now() - config.dataGeneration.dataHours * 60 * 60 * 1000)
const data = generator.generateTimeSeriesData({
startDate: startDate,
endDate: new Date(),
intervalMinutes: config.dataGeneration.intervalMinutes,
categories: config.dataGeneration.categories,
anomalyProbability: config.dataGeneration.anomalyProbability
})
console.log(chalk.yellow(`💾 Inserting ${data.length} documents in batches...`))
await client.bulkInsert(data, config.dataGeneration.batchSize)
const stats = await client.getIndexStats()
console.log(chalk.magenta('\n📈 Index Statistics:'), JSON.stringify(stats, null, 2))
console.log(chalk.gray('\n⏳ Waiting for data to be indexed...'))
await new Promise(resolve => setTimeout(resolve, 5000))
console.log(chalk.cyan('\n🔍 Creating basic anomaly detector...'))
const detector = await anomalyManager.createBasicDetector({
name: `${config.indexName}-${config.detection.primaryMetric}-detector`,
metricField: 'metric_value',
category: config.detection.primaryMetric,
aggregationType: 'avg'
})
const detectorId = detector._id
console.log(chalk.green(`✅ Anomaly detector created with ID: ${detectorId}`))
console.log(chalk.blue('\n▶️ Starting anomaly detector...'))
await anomalyManager.startDetector(detectorId, { waitForInitialization: true })
console.log(chalk.gray('\n⏳ Waiting for detector to process data (30 seconds)...'))
await new Promise(resolve => setTimeout(resolve, 30000))
console.log(chalk.cyan('\n📋 Getting anomaly detection results...'))
const results = await anomalyManager.getAnomalyResults(detectorId, {
startTime: startDate,
endTime: new Date()
})
displayDetectionResults(results)
const health = await anomalyManager.getDetectorHealth(detectorId)
console.log(chalk.blue('\n🏥 Detector Health Check:'))
console.log(` Status: ${health.isHealthy ? chalk.green('✅ Healthy') : chalk.red('❌ Issues')}`)
console.log(` State: ${health.status}`)
if (health.issues.length > 0) {
console.log(chalk.yellow(` ⚠️ Issues: ${health.issues.join(', ')}`))
}
if (health.recommendations.length > 0) {
console.log(chalk.blue(` 💡 Recommendations: ${health.recommendations.join(', ')}`))
}
await compareWithManualAnomalies(client, config)
if (config.detection.cleanup) {
console.log(chalk.red('\n🧹 Cleaning up detector...'))
await anomalyManager.deleteDetector(detectorId)
} else {
console.log(chalk.gray(`\n💡 To cleanup later, run: ${chalk.cyan('opensearch-anomaly cleanup')} or delete detector ${detectorId}`))
}
console.log(chalk.green('\n✅ Basic demo completed successfully! 🎉'))
} catch (error) {
console.error(chalk.red('❌ Basic demo error:'), error)
handleCommonErrors(error)
}
}
/**
* 🚀 Advanced multi-metric anomaly detection demo
*/
async function runAdvancedDemo(config) {
console.log(chalk.cyan('🚀 Advanced Multi-Metric Anomaly Detection Demo...\n'))
const client = new OpenSearchClient(config.opensearch)
const generator = new DataGenerator()
const anomalyManager = new AnomalyDetectionManager(client, {
namePrefix: config.indexName,
intervalMinutes: 1,
maxFeatures: config.detection.highCardinality ? 2 : 4
})
try {
await client.initialize(config.indexName)
// Generate more comprehensive data
console.log(chalk.green('📊 Generating comprehensive multi-metric data...'))
const startDate = new Date(Date.now() - config.dataGeneration.dataHours * 60 * 60 * 1000)
const data = generator.generateTimeSeriesData({
startDate: startDate,
endDate: new Date(),
intervalMinutes: config.dataGeneration.intervalMinutes,
categories: config.dataGeneration.categories,
anomalyProbability: config.dataGeneration.anomalyProbability,
serverCount: config.dataGeneration.serverCount
})
await client.bulkInsert(data, config.dataGeneration.batchSize)
// Create appropriate detector type
console.log(chalk.cyan('\n🔍 Creating advanced anomaly detector...'))
let detector
if (config.detection.highCardinality) {
console.log(chalk.blue('🎛️ Using high-cardinality detector for entity-based analysis'))
detector = await anomalyManager.createHighCardinalityDetector({
name: `${config.indexName}-hc-detector`,
categoryFields: ['source'],
metricField: 'metric_value',
maxEntities: 100
})
} else {
console.log(chalk.blue('📈 Using multi-metric detector'))
detector = await anomalyManager.createMultiMetricDetector({
name: `${config.indexName}-multi-metric-detector`,
metrics: config.dataGeneration.categories.slice(0, 3).map(category => ({
field: 'metric_value',
aggregations: ['avg'],
category: category
})),
filters: {
environment: 'production'
}
})
}
const detectorId = detector._id
console.log(chalk.green(`✅ Advanced detector created: ${detectorId}`))
// Start detector and run historical analysis
await anomalyManager.startDetector(detectorId, { waitForInitialization: true })
console.log(chalk.blue('\n📈 Running historical analysis...'))
await anomalyManager.runHistoricalAnalysis(detectorId, {
startTime: new Date(Date.now() - 12 * 60 * 60 * 1000),
endTime: new Date(),
waitForCompletion: false
})
// Wait for results
console.log(chalk.gray('\n⏳ Waiting for historical analysis (60 seconds)...'))
await new Promise(resolve => setTimeout(resolve, 60000))
const results = await anomalyManager.getAnomalyResults(detectorId)
console.log(chalk.cyan('\n📊 Historical analysis results:'))
displayDetectionResults(results)
// Check detector health
const health = await anomalyManager.getDetectorHealth(detectorId)
console.log(chalk.blue(`\n🏥 Detector Health: ${health.isHealthy ? chalk.green('✅ Healthy') : chalk.red('❌ Issues')}`))
if (config.detection.cleanup) {
await anomalyManager.deleteDetector(detectorId)
}
console.log(chalk.green('\n✅ Advanced demo completed successfully! 🎉'))
} catch (error) {
console.error(chalk.red('❌ Advanced demo error:'), error)
handleCommonErrors(error)
}
}
/**
* Generate data only (no anomaly detection)
*/
async function runDataGeneration(config) {
console.log('📊 Data Generation Mode...\n')
const client = new OpenSearchClient(config.opensearch)
const generator = new DataGenerator()
try {
await client.initialize(config.indexName)
const startDate = new Date(Date.now() - config.dataGeneration.dataHours * 60 * 60 * 1000)
const data = generator.generateTimeSeriesData({
startDate: startDate,
endDate: new Date(),
intervalMinutes: config.dataGeneration.intervalMinutes,
categories: config.dataGeneration.categories,
anomalyProbability: config.dataGeneration.anomalyProbability
})
console.log(`💾 Inserting ${data.length} documents...`)
await client.bulkInsert(data, config.dataGeneration.batchSize)
const stats = await client.getIndexStats()
console.log('\n📈 Final Statistics:', JSON.stringify(stats, null, 2))
console.log('\n✅ Data generation completed!')
} catch (error) {
console.error('❌ Data generation error:', error)
}
}
/**
* Run detection only on existing data
*/
async function runDetectionOnly(config) {
console.log('🔍 Detection Only Mode...\n')
const client = new OpenSearchClient(config.opensearch)
try {
    // initialize() verifies the connection and records the target index
    // name, which getIndexStats() and the detector defaults rely on
    await client.initialize(config.indexName)
// Check if index exists and has data
const stats = await client.getIndexStats()
if (stats.total_documents === 0) {
console.log('❌ No data found in index. Run data generation first.')
return
}
console.log(`📊 Found ${stats.total_documents} documents in index`)
// Create detector for existing data
const detectorConfig = {
name: `${config.indexName}-existing-data-detector`,
description: 'Detector for existing data analysis',
feature_attributes: [
{
feature_name: 'metric_analysis',
feature_enabled: true,
aggregation_query: {
metric_analysis: { avg: { field: 'metric_value' } }
}
}
]
}
const detector = await client.createAnomalyDetector(detectorConfig)
const detectorId = detector._id
console.log(`✅ Detector created: ${detectorId}`)
await client.startAnomalyDetector(detectorId)
// Run historical analysis on existing data
await client.runHistoricalAnalysis(detectorId, {
startTime: new Date(Date.now() - 24 * 60 * 60 * 1000).getTime(),
endTime: new Date().getTime()
})
console.log('\n⏳ Processing existing data...')
await new Promise(resolve => setTimeout(resolve, 30000))
const results = await client.getDetectorResults(detectorId)
displayDetectionResults(results)
if (config.detection.cleanup) {
await client.deleteAnomalyDetector(detectorId)
}
console.log('\n✅ Detection analysis completed!')
} catch (error) {
console.error('❌ Detection error:', error)
}
}
/**
* Display anomaly detection results in a readable format
*/
function displayDetectionResults(results) {
if (results.anomaly_result && results.anomaly_result.length > 0) {
console.log(`🚨 Found ${results.anomaly_result.length} anomaly detection results:`)
results.anomaly_result.forEach((result, index) => {
console.log(`\n${index + 1}. Detection Result:`)
console.log(` Time: ${new Date(result.data_start_time).toISOString()}`)
console.log(` Anomaly Grade: ${result.anomaly_grade || 'N/A'}`)
console.log(` Confidence: ${result.confidence || 'N/A'}`)
if (result.feature_data && result.feature_data.length > 0) {
console.log(' Features:')
result.feature_data.forEach(feature => {
console.log(` - ${feature.feature_name}: ${feature.data || 'N/A'}`)
})
}
})
} else {
console.log('ℹ️ No anomaly results found yet. The detector may need more time to train.')
console.log(' Try running the detector for a longer period or with more data.')
}
}
/**
* Compare ML-detected anomalies with manually labeled ones
*/
async function compareWithManualAnomalies(client, config) {
console.log('\n🏷️ Comparing with manually labeled anomalies...')
try {
const manualAnomalies = await client.searchAnomalies({
category: 'cpu_usage',
minAnomalyScore: 0.7,
startTime: new Date(Date.now() - config.dataGeneration.dataHours * 60 * 60 * 1000)
})
console.log(`Found ${manualAnomalies.length} manually labeled anomalies:`)
manualAnomalies.slice(0, 5).forEach((anomaly, index) => {
console.log(`${index + 1}. ${anomaly.timestamp}: ${anomaly.metric_value.toFixed(2)} (manual score: ${anomaly.anomaly_score.toFixed(2)})`)
})
} catch (error) {
console.log('Could not retrieve manual anomalies for comparison')
}
}
/**
* Handle common OpenSearch and anomaly detection errors
*/
function handleCommonErrors(error) {
  // Guard against non-Error throws that lack a message property
  const message = (error && error.message) || ''
  if (message.includes('anomaly_detection')) {
    console.error('\n💡 Tip: Make sure OpenSearch has the Anomaly Detection plugin installed and enabled.')
    console.error('   For OpenSearch Docker: ensure the AD plugin is installed and security is properly configured.')
  } else if (message.includes('Connection')) {
    console.error('\n💡 Tip: Check your OpenSearch connection settings and ensure the cluster is running.')
  } else if (message.includes('authentication') || message.includes('401')) {
    console.error('\n💡 Tip: Verify your username and password are correct.')
  } else if (message.includes('permission') || message.includes('403')) {
    console.error('\n💡 Tip: Ensure your user has permissions for anomaly detection operations.')
  }
}
/**
* Main entry point
*/
async function main() {
  // Register all commands and global options, then let Commander.js parse
  // argv and route to the matching command action. Commander also handles
  // --help output and unknown-command errors.
  setupCLI()
  await program.parseAsync(process.argv)
}
// Run main function if this file is executed directly
if (require.main === module) {
main().catch(error => {
console.error('❌ Fatal error:', error)
process.exit(1)
})
}
module.exports = {
  main,
  runBasicDemo,
  runAdvancedDemo,
  runDataGeneration,
  runDetectionOnly
}
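// Example invocations (illustrative; flags correspond to the commands
// registered in setupCLI() above):
//
//   node index.js basic --hours=2 --rate=0.05
//   node index.js advanced --hours=4 --high-cardinality
//   node index.js generate --hours=24 --servers=20 --interval=5
//   node index.js detect --metric=cpu_usage --historical
//   node index.js cleanup --inactive-only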
/**
* OpenSearch Client Module
*
* Handles all OpenSearch operations including:
* - Connection management and initialization
* - Index creation and management with proper mappings
* - Document CRUD operations (Create, Read, Update, Delete)
* - Search and aggregation queries
* - Anomaly Detection plugin integration
*
* OpenSearch Documentation References:
* - Client API: https://opensearch.org/docs/latest/clients/javascript/
* - Index Mapping: https://opensearch.org/docs/latest/opensearch/mappings/
* - Bulk API: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/
* - Search API: https://opensearch.org/docs/latest/api-reference/search/
* - Anomaly Detection: https://opensearch.org/docs/latest/monitoring-plugins/ad/
*/
const { Client } = require('@opensearch-project/opensearch')
class OpenSearchClient {
/**
* Initialize OpenSearch client with configuration
* @param {Object} config - OpenSearch connection configuration
* @param {string} config.node - OpenSearch cluster endpoint
* @param {Object} config.auth - Authentication credentials
* @param {Object} config.ssl - SSL configuration
*/
constructor(config) {
this.config = config
this.client = new Client({
node: config.node,
auth: config.auth,
ssl: config.ssl || { rejectUnauthorized: false }
})
this.indexName = null
console.log(`🔌 OpenSearch client configured for: ${config.node}`)
}
/**
* Initialize connection and create index if needed
* @param {string} indexName - Name of the index to work with
*/
async initialize(indexName) {
this.indexName = indexName
try {
// Test connection
await this.connect()
// Check if index exists, create if it doesn't
const indexExists = await this.client.indices.exists({
index: this.indexName
})
if (!indexExists.body) {
await this.createIndex()
console.log(`✅ Index "${this.indexName}" created successfully`)
} else {
console.log(`ℹ️ Index "${this.indexName}" already exists`)
}
} catch (error) {
console.error('❌ Failed to initialize OpenSearch client:', error)
throw error
}
}
/**
* Test OpenSearch connection with retry logic for network resilience
*/
async connect() {
const maxRetries = 3
const baseDelay = 1000 // 1 second
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
console.log(attempt === 1 ?
`🔌 Connecting to OpenSearch...` :
`🔄 Connection retry ${attempt}/${maxRetries}...`)
const response = await this.client.info()
console.log(`✅ Connected to OpenSearch cluster: ${response.body.cluster_name} (v${response.body.version.number})`)
return response.body
} catch (error) {
if (attempt === maxRetries) {
console.error(`\n❌ OpenSearch Connection Error`)
console.error(` ├─ Cause: Failed to connect after ${maxRetries} attempts`)
console.error(` ├─ Last error: ${error.message}`)
console.error(` ├─ Impact: Cannot proceed with anomaly detection demo`)
console.error(` └─ Recommendation: Check OpenSearch cluster status and network connectivity`)
throw new Error(`OpenSearch connection failed after ${maxRetries} retries: ${error.message}`)
}
const delay = baseDelay * Math.pow(2, attempt - 1) // Exponential backoff
console.warn(`⚠️ Connection attempt ${attempt} failed: ${error.message}`)
console.log(`⏳ Retrying in ${delay/1000} seconds...`)
await new Promise(resolve => setTimeout(resolve, delay))
}
}
}
/**
* Create index with optimized mapping for time-series anomaly data
*
* Mapping Documentation: https://opensearch.org/docs/latest/opensearch/mappings/
* Field Types: https://opensearch.org/docs/latest/opensearch/supported-field-types/
*/
async createIndex() {
// Define comprehensive mapping optimized for OpenSearch logs sample data structure
// This mapping follows the OpenSearch logs sample dataset conventions
// References:
// - OpenSearch Field Types: https://opensearch.org/docs/latest/field-types/supported-field-types/
// - ECS Field Reference: https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html
const mapping = {
settings: {
// Index settings optimized for log data
// https://opensearch.org/docs/latest/opensearch/index-settings/
number_of_shards: 1,
number_of_replicas: 0,
refresh_interval: '5s',
      // Enable best compression for log data
      codec: 'best_compression',
// Analysis settings for log messages
analysis: {
analyzer: {
log_analyzer: {
type: 'custom',
tokenizer: 'standard',
filter: ['lowercase', 'stop']
}
}
}
},
mappings: {
// Dynamic mapping settings
// https://opensearch.org/docs/latest/opensearch/mappings/#dynamic-mapping
dynamic: 'strict',
properties: {
// Primary timestamp fields (OpenSearch logs standard)
'@timestamp': {
type: 'date',
format: 'strict_date_optional_time||epoch_millis'
},
timestamp: {
type: 'date',
format: 'strict_date_optional_time||epoch_millis'
},
// Core log fields matching OpenSearch sample logs structure
log_level: {
type: 'keyword',
index: true
},
message: {
type: 'text',
analyzer: 'log_analyzer',
fields: {
keyword: {
type: 'keyword',
ignore_above: 512
}
}
},
host: {
type: 'keyword',
index: true
},
service: {
type: 'keyword',
index: true
},
// Agent information (common in OpenSearch logs)
agent: {
type: 'object',
properties: {
type: {
type: 'keyword'
},
version: {
type: 'keyword'
}
}
},
        // Custom fields for our metrics and anomaly data
        fields: {
          type: 'object',
          properties: {
            metric_type: { type: 'keyword', index: true },
            metric_value: { type: 'double', doc_values: true },
            is_anomaly: { type: 'boolean' },
            anomaly_score: {
              type: 'double',
              meta: { min: 0.0, max: 1.0 }
            },
            anomaly_type: { type: 'keyword' }
          }
        },
        // Top-level metric fields queried elsewhere in this module:
        // searchAnomalies() filters on is_anomaly, anomaly_score, and
        // category, and getIndexStats() aggregates on category and source.
        // Because dynamic is set to 'strict', these must be mapped explicitly.
        category: { type: 'keyword' },
        source: { type: 'keyword' },
        environment: { type: 'keyword' },
        metric_value: { type: 'double' },
        is_anomaly: { type: 'boolean' },
        anomaly_score: { type: 'double' },
        metadata: { type: 'object', dynamic: true },
// Cloud and infrastructure fields (ECS standard)
cloud: {
type: 'object',
properties: {
region: {
type: 'keyword'
},
provider: {
type: 'keyword'
}
}
},
// Container information
container: {
type: 'object',
properties: {
name: {
type: 'keyword'
}
}
},
// Tags for categorization
tags: {
type: 'keyword'
},
// Error information for anomalous logs
error: {
type: 'object',
properties: {
type: {
type: 'keyword'
},
message: {
type: 'text',
fields: {
keyword: {
type: 'keyword',
ignore_above: 256
}
}
}
}
}
}
}
}
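    // Illustrative document shape accepted by this mapping (values are
    // examples only; with dynamic: 'strict', any unmapped field would be
    // rejected at index time):
    //
    //   {
    //     '@timestamp': '2024-01-01T00:00:00Z',
    //     timestamp: '2024-01-01T00:00:00Z',
    //     log_level: 'INFO',
    //     message: 'cpu_usage sample from server-01',
    //     host: 'server-01',
    //     service: 'metrics-demo',
    //     category: 'cpu_usage',
    //     source: 'server-01',
    //     metric_value: 42.5,
    //     is_anomaly: false,
    //     anomaly_score: 0.12,
    //     tags: ['demo']
    //   }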
try {
const response = await this.client.indices.create({
index: this.indexName,
body: mapping
})
console.log(`📋 Index mapping created following OpenSearch logs sample structure`)
return response.body
} catch (error) {
console.error('❌ Failed to create index:', error)
throw error
}
}
/**
* Insert documents in bulk for efficiency with progress tracking
*
* Bulk API Documentation: https://opensearch.org/docs/latest/api-reference/document-apis/bulk/
*
* @param {Array} documents - Array of documents to insert
* @param {number} batchSize - Number of documents per batch
*/
async bulkInsert(documents, batchSize = 100) {
console.log(`💾 Starting bulk insert of ${documents.length} documents in batches of ${batchSize}`)
let insertedCount = 0
let errorCount = 0
const totalBatches = Math.ceil(documents.length / batchSize)
const expectedDocuments = documents.length
// 🟡 Polling-style progress tracking with batch-level granularity
console.log(`📊 Processing ${totalBatches} batches...`)
for (let i = 0; i < documents.length; i += batchSize) {
const batch = documents.slice(i, i + batchSize)
const batchNumber = Math.floor(i / batchSize) + 1
// Show progress bar for current batch
const progressPercent = Math.round((batchNumber / totalBatches) * 100)
const progressBar = this.createProgressBar(progressPercent, 30)
process.stdout.write(`\r📊 ${progressBar} ${progressPercent}% (${batchNumber}/${totalBatches} batches)`)
const body = batch.flatMap(doc => [
{ index: { _index: this.indexName } },
doc
])
try {
const response = await this.executeWithTimeout(
() => this.client.bulk({ body }),
30000, // 30 second timeout for bulk operations
`Bulk insert batch ${batchNumber}`
)
if (response.body.errors) {
const errors = response.body.items.filter(item => item.index.error)
console.log(`\n⚠️ Batch ${batchNumber}: ${errors.length} errors out of ${batch.length} documents`)
errorCount += errors.length
errors.slice(0, 3).forEach(error => {
console.warn(` Error: ${error.index.error.reason}`)
})
} else {
insertedCount += batch.length
console.log(`\n✅ Batch ${batchNumber}: ${batch.length} documents inserted`)
}
} catch (error) {
console.error(`\n❌ Bulk Insert Network Error`)
console.error(` ├─ Batch: ${batchNumber}/${totalBatches}`)
console.error(` ├─ Cause: ${error.message}`)
console.error(` ├─ Impact: ${batch.length} documents not inserted, may affect anomaly detection training data`)
console.error(` └─ Recommendation: Check network stability and cluster performance`)
errorCount += batch.length
}
// Small delay between batches to avoid overwhelming the cluster
if (i + batchSize < documents.length) {
await new Promise(resolve => setTimeout(resolve, 100))
}
}
// Clear progress line and show final summary
process.stdout.write('\r' + ' '.repeat(80) + '\r')
console.log(`📊 Bulk insert completed: ${insertedCount} inserted, ${errorCount} errors`)
// Check for incomplete data ingestion that could affect anomaly detection
await this.validateDataIngestionForAnomalyDetection(insertedCount, expectedDocuments, errorCount)
await this.refreshIndex()
return { insertedCount, errorCount }
}
/**
* Validate data ingestion completeness for anomaly detection reliability
* Surfaces critical issues that could compromise ML model training
*
* @param {number} insertedCount - Successfully inserted documents
* @param {number} expectedDocuments - Total expected documents
* @param {number} errorCount - Failed document insertions
*/
async validateDataIngestionForAnomalyDetection(insertedCount, expectedDocuments, errorCount) {
const completionRate = insertedCount / expectedDocuments
const minimumThreshold = 0.85 // 85% minimum for reliable anomaly detection
if (completionRate < minimumThreshold) {
const missingDocuments = expectedDocuments - insertedCount
console.error(`\n❌ Incomplete Data Ingestion Error`)
console.error(` ├─ Cause: Only ${insertedCount} of ${expectedDocuments} expected documents inserted (${(completionRate * 100).toFixed(1)}% completion)`)
console.error(` ├─ Missing: ${missingDocuments} documents (${errorCount} errors, ${missingDocuments - errorCount} unprocessed)`)
console.error(` ├─ Impact: Insufficient training data for reliable anomaly detection (minimum ${(minimumThreshold * 100)}% required)`)
console.error(` └─ Recommendation: Check cluster resources, increase batch timeout, or reduce data volume`)
throw new Error(`Data ingestion incomplete: ${(completionRate * 100).toFixed(1)}% success rate below ${(minimumThreshold * 100)}% threshold`)
}
if (completionRate < 0.95) {
console.warn(`\n⚠️ Data Ingestion Warning`)
console.warn(` ├─ Status: ${insertedCount} of ${expectedDocuments} documents inserted (${(completionRate * 100).toFixed(1)}% completion)`)
console.warn(` ├─ Impact: Anomaly detection may have reduced accuracy with incomplete training data`)
console.warn(` └─ Recommendation: Monitor anomaly detection performance for reliability`)
}
}
/**
* Create a simple progress bar for terminal display
*
* @param {number} percent - Progress percentage (0-100)
* @param {number} width - Width of the progress bar
* @returns {string} Progress bar string
*/
createProgressBar(percent, width = 30) {
const filled = Math.round(width * percent / 100)
const empty = width - filled
const bar = '█'.repeat(filled) + '░'.repeat(empty)
return `[${bar}]`
}
/**
* Execute operation with timeout and descriptive error handling
*
* @param {Function} operation - Async operation to execute
* @param {number} timeoutMs - Timeout in milliseconds
* @param {string} operationName - Human-readable operation name for errors
* @returns {Promise} Operation result
*/
async executeWithTimeout(operation, timeoutMs, operationName) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject(new Error(`Network timeout after ${timeoutMs/1000}s during: ${operationName}`))
}, timeoutMs)
})
return Promise.race([operation(), timeoutPromise])
}
/**
* Refresh index to make documents immediately searchable with error handling
* https://opensearch.org/docs/latest/api-reference/index-apis/refresh/
*/
async refreshIndex() {
try {
console.log(`🔄 Refreshing index for immediate searchability...`)
await this.executeWithTimeout(
() => this.client.indices.refresh({ index: this.indexName }),
10000, // 10 second timeout for refresh
'Index refresh operation'
)
console.log(`✅ Index refreshed successfully`)
} catch (error) {
console.error(`\n❌ Index Refresh Network Error`)
console.error(` ├─ Cause: ${error.message}`)
console.error(` ├─ Impact: Documents may not be immediately searchable for anomaly detection`)
console.error(` └─ Recommendation: Wait for automatic refresh or retry manually`)
// Don't throw - this is a non-critical operation
console.warn(`⚠️ Continuing despite refresh failure - data should be available shortly`)
}
}
/**
* Get comprehensive index statistics
* https://opensearch.org/docs/latest/api-reference/stats-api/
*/
async getIndexStats() {
try {
// Get document count
const totalDocs = await this.client.count({ index: this.indexName })
// Count anomalies
const anomalies = await this.client.count({
index: this.indexName,
body: {
query: { term: { is_anomaly: true } }
}
})
// Get category breakdown using aggregations
// https://opensearch.org/docs/latest/opensearch/aggregations/
const categories = await this.client.search({
index: this.indexName,
body: {
size: 0, // Don't return documents, just aggregations
aggs: {
categories: {
terms: {
field: 'category',
size: 20 // Get top 20 categories
}
},
sources: {
terms: {
field: 'source',
size: 10 // Get top 10 sources
}
},
            // Time range of the data: separate min/max aggregations on the
            // timestamp field
            min_timestamp: { min: { field: 'timestamp' } },
            max_timestamp: { max: { field: 'timestamp' } }
}
}
})
const stats = {
total_documents: totalDocs.body.count,
total_anomalies: anomalies.body.count,
anomaly_percentage: totalDocs.body.count > 0
? ((anomalies.body.count / totalDocs.body.count) * 100).toFixed(2)
: 0,
categories: categories.body.aggregations.categories.buckets,
sources: categories.body.aggregations.sources.buckets,
        time_range: {
          min: categories.body.aggregations.min_timestamp.value_as_string,
          max: categories.body.aggregations.max_timestamp.value_as_string
        }
}
return stats
} catch (error) {
console.error('❌ Error getting index stats:', error)
throw error
}
}
/**
* Search for anomalies with flexible filtering
* https://opensearch.org/docs/latest/opensearch/query-dsl/
*
* @param {Object} options - Search options
* @param {string} options.category - Filter by metric category
* @param {Date} options.startTime - Start time for range filter
* @param {Date} options.endTime - End time for range filter
* @param {number} options.minAnomalyScore - Minimum anomaly score threshold
* @param {number} options.size - Maximum number of results
*/
async searchAnomalies(options = {}) {
const {
category = null,
startTime = null,
endTime = null,
minAnomalyScore = 0.7,
size = 100
} = options
// Build query using bool query for multiple conditions
// https://opensearch.org/docs/latest/opensearch/query-dsl/bool/
const query = {
bool: {
must: [
// Must be marked as anomaly
{ term: { is_anomaly: true } },
// Must meet minimum anomaly score
{ range: { anomaly_score: { gte: minAnomalyScore } } }
]
}
}
// Add optional filters
if (category) {
query.bool.must.push({ term: { category } })
}
if (startTime || endTime) {
const dateRange = {}
if (startTime) dateRange.gte = startTime
if (endTime) dateRange.lte = endTime
query.bool.must.push({ range: { timestamp: dateRange } })
}
try {
const response = await this.client.search({
index: this.indexName,
body: {
query,
// Sort by timestamp descending (newest first)
sort: [
{ timestamp: { order: 'desc' } },
{ anomaly_score: { order: 'desc' } }
],
size: size,
// Include only relevant fields in response
_source: [
'timestamp', 'metric_value', 'category', 'source',
'is_anomaly', 'anomaly_score', 'metadata'
]
}
})
return response.body.hits.hits.map(hit => hit._source)
} catch (error) {
console.error('❌ Error searching anomalies:', error)
throw error
}
}
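  // Usage sketch (illustrative): fetch high-confidence CPU anomalies from
  // the last hour.
  //
  //   const anomalies = await client.searchAnomalies({
  //     category: 'cpu_usage',
  //     minAnomalyScore: 0.8,
  //     startTime: new Date(Date.now() - 60 * 60 * 1000),
  //     size: 25
  //   })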
/**
* Perform advanced search with custom query
* https://opensearch.org/docs/latest/opensearch/query-dsl/
*
* @param {Object} queryBody - Complete OpenSearch query body
*/
async search(queryBody) {
try {
const response = await this.client.search({
index: this.indexName,
body: queryBody
})
return response.body
} catch (error) {
console.error('❌ Search error:', error)
throw error
}
}
// ========================================
// ANOMALY DETECTION PLUGIN INTEGRATION
// ========================================
/**
* Create anomaly detector using OpenSearch AD plugin
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/
*
* @param {Object} detectorConfig - Detector configuration
* @param {string} detectorConfig.name - Detector name
* @param {string} detectorConfig.description - Detector description
* @param {string} detectorConfig.time_field - Timestamp field name
* @param {Array} detectorConfig.indices - Target indices
* @param {Array} detectorConfig.feature_attributes - Feature definitions
* @param {Object} detectorConfig.filter_query - Data filter query
* @param {Object} detectorConfig.detection_interval - Detection frequency
* @param {Object} detectorConfig.window_delay - Processing delay
*/
async createAnomalyDetector(detectorConfig) {
// Set default configuration values
const defaultConfig = {
name: `${this.indexName}-anomaly-detector`,
description: `Anomaly detector for ${this.indexName} metrics`,
time_field: 'timestamp',
indices: [this.indexName],
detection_interval: {
period: {
interval: 5,
unit: 'Minutes'
}
},
window_delay: {
period: {
interval: 1,
unit: 'Minutes'
}
}
}
// Merge with provided configuration
const config = { ...defaultConfig, ...detectorConfig }
try {
const response = await this.client.transport.request({
method: 'POST',
path: '/_plugins/_anomaly_detection/detectors',
body: config
})
console.log(`🔍 Anomaly detector created with ID: ${response.body._id}`)
console.log(` Name: ${config.name}`)
console.log(` Features: ${config.feature_attributes?.length || 0}`)
console.log(` Detection Interval: ${config.detection_interval.period.interval} ${config.detection_interval.period.unit}`)
return response.body
} catch (error) {
console.error('❌ Error creating anomaly detector:', error)
// Provide helpful error context
if (error.message.includes('Invalid field')) {
console.error('💡 Check that all field names in feature_attributes exist in your index mapping')
} else if (error.message.includes('permission')) {
console.error('💡 Ensure your user has anomaly detection permissions')
}
throw error
}
}
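  // Usage sketch (illustrative): a single-feature detector mirroring the
  // configuration used by runDetectionOnly() in index.js. Unspecified
  // settings (time_field, indices, intervals) fall back to the defaults
  // defined above.
  //
  //   const detector = await client.createAnomalyDetector({
  //     name: 'cpu-usage-detector',
  //     feature_attributes: [{
  //       feature_name: 'avg_metric_value',
  //       feature_enabled: true,
  //       aggregation_query: {
  //         avg_metric_value: { avg: { field: 'metric_value' } }
  //       }
  //     }]
  //   })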
/**
* Start anomaly detector
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#start-detector
*
* @param {string} detectorId - Detector ID to start
*/
async startAnomalyDetector(detectorId) {
try {
const response = await this.client.transport.request({
method: 'POST',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}/_start`
})
console.log(`▶️ Detector ${detectorId} started successfully`)
return response.body
} catch (error) {
console.error('❌ Error starting detector:', error)
if (error.message.includes('already')) {
console.log('ℹ️ Detector is already running')
}
throw error
}
}
/**
* Stop anomaly detector
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#stop-detector
*
* @param {string} detectorId - Detector ID to stop
*/
async stopAnomalyDetector(detectorId) {
try {
const response = await this.client.transport.request({
method: 'POST',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}/_stop`
})
console.log(`⏹️ Detector ${detectorId} stopped successfully`)
return response.body
} catch (error) {
console.error('❌ Error stopping detector:', error)
throw error
}
}
/**
* Get anomaly detection results
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#get-detector-results
*
* @param {string} detectorId - Detector ID
* @param {Object} options - Query options
* @param {number} options.startTime - Start time (epoch milliseconds)
* @param {number} options.endTime - End time (epoch milliseconds)
* @param {number} options.size - Maximum results to return
*/
async getDetectorResults(detectorId, options = {}) {
const {
startTime = new Date(Date.now() - 24 * 60 * 60 * 1000).getTime(),
endTime = new Date().getTime(),
size = 100
} = options
try {
const response = await this.client.transport.request({
method: 'POST',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}/results`,
body: {
period_start: startTime,
period_end: endTime,
size: size
}
})
console.log(`📊 Retrieved ${response.body.anomaly_result?.length || 0} detection results`)
return response.body
} catch (error) {
console.error('❌ Error getting detector results:', error)
throw error
}
}
/**
* List all anomaly detectors
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#search-detector
*/
async listAnomalyDetectors() {
try {
const response = await this.client.transport.request({
method: 'GET',
path: '/_plugins/_anomaly_detection/detectors/_search',
body: {
query: { match_all: {} },
size: 100
}
})
const detectors = response.body.hits.hits.map(hit => ({
id: hit._id,
...hit._source
}))
console.log(`📋 Found ${detectors.length} anomaly detectors`)
return detectors
} catch (error) {
console.error('❌ Error listing detectors:', error)
throw error
}
}
/**
* Delete anomaly detector
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#delete-detector
*
* @param {string} detectorId - Detector ID to delete
*/
async deleteAnomalyDetector(detectorId) {
try {
// Stop detector first if it's running
try {
await this.stopAnomalyDetector(detectorId)
// Wait for detector to fully stop
await new Promise(resolve => setTimeout(resolve, 2000))
} catch (stopError) {
// Detector might already be stopped, continue with deletion
console.log('ℹ️ Detector may already be stopped')
}
const response = await this.client.transport.request({
method: 'DELETE',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}`
})
console.log(`🗑️ Detector ${detectorId} deleted successfully`)
return response.body
} catch (error) {
console.error('❌ Error deleting detector:', error)
throw error
}
}
/**
* Get detector profile and status information
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#profile-detector
*
* @param {string} detectorId - Detector ID
*/
async getDetectorProfile(detectorId) {
try {
const response = await this.client.transport.request({
method: 'GET',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}/_profile`
})
return response.body
} catch (error) {
console.error('❌ Error getting detector profile:', error)
throw error
}
}
/**
* Preview detector configuration before creation
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#preview-detector
*
* @param {Object} detectorConfig - Detector configuration to preview
*/
async previewDetector(detectorConfig) {
try {
const response = await this.client.transport.request({
method: 'POST',
path: '/_plugins/_anomaly_detection/detectors/_preview',
body: detectorConfig
})
return response.body
} catch (error) {
console.error('❌ Error previewing detector:', error)
throw error
}
}
/**
* Run historical analysis on past data
* https://opensearch.org/docs/latest/monitoring-plugins/ad/api/#run-detector
*
* @param {string} detectorId - Detector ID
* @param {Object} options - Analysis options
* @param {number} options.startTime - Start time (epoch milliseconds)
* @param {number} options.endTime - End time (epoch milliseconds)
*/
async runHistoricalAnalysis(detectorId, options = {}) {
const {
startTime = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).getTime(),
endTime = new Date().getTime()
} = options
try {
const response = await this.client.transport.request({
method: 'POST',
path: `/_plugins/_anomaly_detection/detectors/${detectorId}/_run`,
body: {
period_start: startTime,
period_end: endTime
}
})
console.log(`📈 Historical analysis started for detector ${detectorId}`)
console.log(` Time range: ${new Date(startTime).toISOString()} to ${new Date(endTime).toISOString()}`)
return response.body
} catch (error) {
console.error('❌ Error running historical analysis:', error)
throw error
}
}
/**
* Delete index and all its data
* https://opensearch.org/docs/latest/api-reference/index-apis/delete-index/
*/
async deleteIndex() {
try {
const response = await this.client.indices.delete({
index: this.indexName
})
console.log(`🗑️ Index ${this.indexName} deleted successfully`)
return response.body
} catch (error) {
console.error('❌ Error deleting index:', error)
throw error
}
}
}
module.exports = OpenSearchClient
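// Usage sketch (illustrative; endpoint and credentials are placeholders):
//
//   const OpenSearchClient = require('./os-client')
//   const client = new OpenSearchClient({
//     node: 'https://localhost:9200',
//     auth: { username: 'admin', password: 'admin' },
//     ssl: { rejectUnauthorized: false }
//   })
//   await client.initialize('system-metrics-anomaly-test')
//   await client.bulkInsert(documents, 100)
//   console.log(await client.getIndexStats())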
{
"name": "opensearch-anomaly-detection",
"version": "1.0.0",
"description": "Node.js application for generating time-series data with anomalies and testing OpenSearch anomaly detection capabilities",
"main": "index.js",
"bin": {
"opensearch-anomaly": "./index.js"
},
"scripts": {
"start": "node index.js",
"basic": "node index.js basic",
"advanced": "node index.js advanced",
"generate": "node index.js generate",
"detect": "node index.js detect",
"help": "node index.js help",
"test": "node test/test-runner.js",
"lint": "eslint *.js",
"format": "prettier --write *.js"
},
"keywords": [
"opensearch",
"anomaly-detection",
"time-series",
"monitoring",
"elasticsearch",
"machine-learning",
"data-generation"
],
"author": "OpenSearch Anomaly Detection Demo",
"license": "MIT",
"dependencies": {
"@opensearch-project/opensearch": "^2.3.0",
"commander": "^9.4.1",
"chalk": "^4.1.2",
"dotenv": "^16.0.3"
},
"devDependencies": {
"eslint": "^8.0.0",
"prettier": "^2.8.0"
},
"engines": {
"node": ">=14.0.0"
},
"repository": {
"type": "git",
"url": "https://github.com/your-org/opensearch-anomaly-detection.git"
},
"bugs": {
"url": "https://github.com/your-org/opensearch-anomaly-detection/issues"
},
"homepage": "https://github.com/your-org/opensearch-anomaly-detection#readme"
}
/**
* Progress Tracker Utility
*
* Simple utility for tracking and displaying progress of OpenSearch operations
* that report percentage completion through their APIs.
*/
class ProgressTracker {
constructor() {
this.isActive = false
this.currentOperation = null
}
/**
* Track progress for an operation that reports percentage completion
*
* @param {Function} progressCheckFn - Function that returns progress info
* @param {Object} options - Tracking options
* @param {string} options.operationName - Display name for the operation
* @param {number} options.pollIntervalMs - How often to check progress (default 5000ms)
* @param {number} options.timeoutMs - Maximum time to wait (default 300000ms)
*/
async trackProgress(progressCheckFn, options = {}) {
const {
operationName = 'Operation',
pollIntervalMs = 5000,
timeoutMs = 300000 // 5 minutes default
} = options
this.isActive = true
this.currentOperation = operationName
console.log(`🔄 ${operationName} started - tracking progress...`)
const startTime = Date.now()
while (this.isActive && (Date.now() - startTime) < timeoutMs) {
try {
const progressInfo = await progressCheckFn()
if (progressInfo.percentage !== undefined) {
// 💚 We have percentage data - show progress bar
const progressBar = this.createProgressBar(progressInfo.percentage, 30)
const elapsed = Math.round((Date.now() - startTime) / 1000)
process.stdout.write(`\r🔄 ${progressBar} ${progressInfo.percentage}% - ${operationName} (${elapsed}s)`)
if (progressInfo.percentage >= 100) {
process.stdout.write('\n')
console.log(`✅ ${operationName} completed successfully`)
this.isActive = false
return progressInfo
}
} else if (progressInfo.status) {
// 🟡 We have status but no percentage - show text updates
const elapsed = Math.round((Date.now() - startTime) / 1000)
process.stdout.write(`\r🔄 ${operationName}: ${progressInfo.status} (${elapsed}s)`)
} else {
// 🟥 No progress info available - show basic status
const elapsed = Math.round((Date.now() - startTime) / 1000)
process.stdout.write(`\r🔄 ${operationName} in progress... (${elapsed}s)`)
}
if (progressInfo.completed) {
process.stdout.write('\n')
console.log(`✅ ${operationName} completed`)
this.isActive = false
return progressInfo
}
if (progressInfo.error) {
process.stdout.write('\n')
console.error(`❌ ${operationName} failed: ${progressInfo.error}`)
this.isActive = false
throw new Error(`${operationName} failed: ${progressInfo.error}`)
}
// Wait before next poll
await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
} catch (error) {
process.stdout.write('\n')
console.error(`❌ Error tracking ${operationName} progress: ${error.message}`)
this.isActive = false
throw error
}
}
    // Timeout reached without completion; return a marker so callers can
    // distinguish a timeout from a completed operation
    process.stdout.write('\n')
    console.warn(`⚠️ ${operationName} progress tracking timed out after ${timeoutMs/1000}s`)
    this.isActive = false
    return { timedOut: true }
  }
/**
* Create a simple progress bar for terminal display
*
* @param {number} percent - Progress percentage (0-100)
* @param {number} width - Width of the progress bar
* @returns {string} Progress bar string
*/
createProgressBar(percent, width = 30) {
const filled = Math.round(width * percent / 100)
const empty = width - filled
const bar = '█'.repeat(filled) + '░'.repeat(empty)
return `[${bar}]`
}
/**
* Stop progress tracking
*/
stop() {
this.isActive = false
if (this.currentOperation) {
process.stdout.write('\n')
console.log(`⏹️ ${this.currentOperation} progress tracking stopped`)
}
}
}
module.exports = ProgressTracker
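// Usage sketch (illustrative): poll a detector profile until it reports a
// running state. Assumes this file is saved as progress-tracker.js and that
// `client` is an OpenSearchClient instance; the profile fields used below
// are assumptions for the sake of the example.
//
//   const ProgressTracker = require('./progress-tracker')
//   const tracker = new ProgressTracker()
//   await tracker.trackProgress(async () => {
//     const profile = await client.getDetectorProfile(detectorId)
//     return {
//       status: profile.state,
//       completed: profile.state === 'RUNNING'
//     }
//   }, { operationName: 'Detector initialization', pollIntervalMs: 5000 })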