Skip to content

Instantly share code, notes, and snippets.

@makasim
Last active June 3, 2026 15:56
Show Gist options
  • Select an option

  • Save makasim/8abd0504e71e8be74b3d851281117cae to your computer and use it in GitHub Desktop.

Select an option

Save makasim/8abd0504e71e8be74b3d851281117cae to your computer and use it in GitHub Desktop.
package mdx
import (
"flag"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/metrics"
)
var (
	// mdxInstanceEntryTTL is how long a tracked VictoriaMetrics instance may go
	// without sending any series before it is evicted from the instance list.
	// A zero value disables eviction entirely (see InitGlobalFilter).
	mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "1h", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list. "+
		"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely. "+
		"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")

	// keepMetricsWithLabelName and keepMetricsWithLabelValue together name a
	// label pair. Series carrying exactly that pair are forwarded regardless of
	// instance tracking. Both must be set, or neither; setting only one is fatal
	// in InitGlobalFilter.
	keepMetricsWithLabelName = flag.String("mdx.keepMetricsWithLabel.name", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
		"See also -mdx.keepMetricsWithLabel.value.")
	keepMetricsWithLabelValue = flag.String("mdx.keepMetricsWithLabel.value", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
		"See also -mdx.keepMetricsWithLabel.name")
)
// Filter decides which series are forwarded to MDX-enabled remote storage.
// It tracks VictoriaMetrics instances, each identified by its (job, instance)
// label pair and discovered from its vm_app_version series. It keeps only the
// series that belong to a tracked instance or that carry the label pair
// configured via -mdx.keepMetricsWithLabel.name/value.
type Filter struct {
	// vmInstance maps the quoted "job":"instance" key of every tracked instance
	// to the Unix time, in seconds, when that instance was last seen.
	// The map is guarded by mu; the timestamps are updated atomically.
	mu         sync.RWMutex
	vmInstance map[string]*atomic.Int64

	// filterByLabel is true when both -mdx.keepMetricsWithLabel flags are set.
	filterByLabel bool

	// stopCh is closed by MustStop to end the cleanup goroutine tracked by wg.
	stopCh chan struct{}
	wg     sync.WaitGroup
}

// GlobalFilter is the process-wide Filter. It is nil until InitGlobalFilter
// is called.
var GlobalFilter *Filter
// InitGlobalFilter builds GlobalFilter from the -mdx.* flags and registers the
// vmagent_mdx_tracked_vm_instances gauge. When -mdx.instanceEntryTTL is
// non-zero, it also starts the background goroutine that evicts stale
// instances; stop it with GlobalFilter.MustStop.
//
// It terminates the process via logger.Fatalf if exactly one of
// -mdx.keepMetricsWithLabel.name and -mdx.keepMetricsWithLabel.value is set.
func InitGlobalFilter() {
	GlobalFilter = &Filter{
		stopCh:     make(chan struct{}),
		vmInstance: make(map[string]*atomic.Int64),
	}

	hasName := len(*keepMetricsWithLabelName) > 0
	hasValue := len(*keepMetricsWithLabelValue) > 0
	switch {
	case hasName && hasValue:
		GlobalFilter.filterByLabel = true
	case hasName != hasValue:
		logger.Fatalf("Both -mdx.keepMetricsWithLabel.name and -mdx.keepMetricsWithLabel.value must be set if one of them is set.")
	}

	_ = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
		GlobalFilter.mu.RLock()
		defer GlobalFilter.mu.RUnlock()
		return float64(len(GlobalFilter.vmInstance))
	})

	if mdxInstanceEntryTTL.Milliseconds() == 0 {
		// Zero TTL: no cleanup goroutine, so tracked instances are never evicted.
		return
	}
	GlobalFilter.wg.Go(GlobalFilter.cleanStale)
}
// cleanStale runs until filter.stopCh is closed. Every 5 seconds it evicts the
// instances whose last-seen timestamp is at least -mdx.instanceEntryTTL old.
//
// Stale entries are removed by rebuilding the map rather than deleting them in
// place, because Go maps do not shrink after deletes. Rebuilding lets the memory
// of instances that went offline be released. The map is scanned under the read
// lock first, so the write lock, which blocks Filter, is taken only when there
// is actually something to evict.
func (filter *Filter) cleanStale() {
	// Timestamps have one-second resolution. Round the TTL up so that a
	// sub-second TTL does not truncate to zero and evict every instance on each tick.
	ttl := mdxInstanceEntryTTL.Duration()
	ttlSec := int64((ttl + time.Second - 1) / time.Second)
	isStale := func(currTs int64, lastSeen *atomic.Int64) bool {
		return currTs-lastSeen.Load() >= ttlSec
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-filter.stopCh:
			return
		case <-ticker.C:
		}
		currTs := time.Now().Unix()

		hasStale := false
		filter.mu.RLock()
		for _, lastSeen := range filter.vmInstance {
			if isStale(currTs, lastSeen) {
				hasStale = true
				break
			}
		}
		filter.mu.RUnlock()
		if !hasStale {
			continue
		}

		filter.mu.Lock()
		dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
		for k, lastSeen := range filter.vmInstance {
			if !isStale(currTs, lastSeen) {
				dst[k] = lastSeen
			}
		}
		// Entries may have been refreshed between the two scans, so swap the map
		// only if something is really being removed.
		if len(dst) != len(filter.vmInstance) {
			filter.vmInstance = dst
		}
		filter.mu.Unlock()
	}
}
// MustStop signals the background cleanup goroutine to exit and waits for it.
// On a nil Filter, for example when InitGlobalFilter was never called, it does
// nothing. It must be called at most once, since a second call would close
// stopCh again and panic.
func (filter *Filter) MustStop() {
	if filter != nil {
		close(filter.stopCh)
		filter.wg.Wait()
	}
}
// Filter appends to resTss the series from tss that should be forwarded to the
// MDX-enabled remote storage, and returns the resulting slice.
//
// Each series' labels are scanned in order, and the first rule that applies
// decides the outcome:
//   - If filterByLabel is set and the series carries exactly the
//     -mdx.keepMetricsWithLabel.name/value pair, it is forwarded.
//   - Once non-empty "job" and "instance" labels have both been seen, the
//     quoted "job":"instance" key is looked up once. If that instance is
//     tracked, its last-seen timestamp is refreshed and the series is forwarded.
//   - If the series is vm_app_version and has non-empty job and instance
//     labels, that instance becomes tracked and the series is forwarded.
//
// All other series are dropped. This includes series of a not-yet-tracked
// instance that appear before its vm_app_version series.
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
	currTs := time.Now().Unix()
	// identicalKey is a scratch buffer reused across series, so that lookups of
	// already-tracked instances do not allocate.
	var identicalKey []byte
nextTss:
	for _, ts := range tss {
		var hasVersionLabel, triedJobInstance bool
		var job, instance string
		for _, label := range ts.Labels {
			if filter.filterByLabel && label.Name == *keepMetricsWithLabelName && label.Value == *keepMetricsWithLabelValue {
				resTss = append(resTss, ts)
				continue nextTss
			}
			if label.Name == "__name__" && label.Value == "vm_app_version" {
				hasVersionLabel = true
			}
			// Only the first non-empty "instance" and "job" labels are used.
			if instance == "" && label.Name == "instance" {
				if label.Value == "" {
					continue
				}
				instance = label.Value
			}
			if job == "" && label.Name == "job" {
				if label.Value == "" {
					continue
				}
				job = label.Value
			}
			if job == "" || instance == "" {
				continue
			}

			if !triedJobInstance {
				triedJobInstance = true
				// Quoting both parts keeps the key unambiguous even when job or
				// instance contains ':'.
				identicalKey = identicalKey[:0]
				identicalKey = strconv.AppendQuote(identicalKey, job)
				identicalKey = append(identicalKey, ':')
				identicalKey = strconv.AppendQuote(identicalKey, instance)
				filter.mu.RLock()
				lastSeen, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]
				filter.mu.RUnlock()
				if found {
					lastSeen.Store(currTs)
					resTss = append(resTss, ts)
					continue nextTss
				}
			}

			if hasVersionLabel {
				// identicalKey still holds this series' key. It was built by the
				// lookup above, and job/instance never change once they are set.
				filter.mu.Lock()
				if lastSeen, ok := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]; ok {
					// Registered concurrently by another call: refresh the timestamp
					// instead of replacing the pointer other callers may be using.
					lastSeen.Store(currTs)
				} else {
					lastSeen := &atomic.Int64{}
					lastSeen.Store(currTs)
					filter.vmInstance[string(identicalKey)] = lastSeen
				}
				filter.mu.Unlock()
				// Forward the vm_app_version series itself, since it comes from the
				// instance just recognized as VictoriaMetrics. Previously it was dropped.
				resTss = append(resTss, ts)
				continue nextTss
			}
		}
	}
	return resTss
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment