Last active
June 3, 2026 15:56
-
-
Save makasim/8abd0504e71e8be74b3d851281117cae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package mdx | |
| import ( | |
| "flag" | |
| "strconv" | |
| "sync" | |
| "sync/atomic" | |
| "time" | |
| "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" | |
| "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" | |
| "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" | |
| "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" | |
| "github.com/VictoriaMetrics/metrics" | |
| ) | |
| var ( | |
| mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "1h", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+ | |
| "It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+ | |
| "It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange") | |
| keepMetricsWithLabelName = flag.String("mdx.keepMetricsWithLabel.name", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+ | |
| "See also -mdx.keepMetricsWithLabel.value.") | |
| keepMetricsWithLabelValue = flag.String("mdx.keepMetricsWithLabel.value", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+ | |
| "See also -mdx.keepMetricsWithLabel.name") | |
| ) | |
// Filter keeps track of the VictoriaMetrics instances seen in earlier data and
// uses that list to drop series which do not belong to any of them.
type Filter struct {
	// filterByLabel is true when both keep-label flags are set; matching series
	// are then kept regardless of the tracked instance list.
	filterByLabel bool

	// stopCh is closed by MustStop to make cleanStale return.
	stopCh chan struct{}
	// wg tracks the cleanStale goroutine so MustStop can wait for it.
	wg sync.WaitGroup

	// mu guards vmInstance.
	mu sync.RWMutex
	// vmInstance maps a quoted `"job":"instance"` key to the unix timestamp
	// (in seconds) at which the instance was last seen.
	vmInstance map[string]*atomic.Int64
}
| var GlobalFilter *Filter | |
| func InitGlobalFilter() { | |
| GlobalFilter = &Filter{ | |
| vmInstance: make(map[string]*atomic.Int64), | |
| stopCh: make(chan struct{}), | |
| } | |
| if len(*keepMetricsWithLabelName) > 0 && len(*keepMetricsWithLabelValue) > 0 { | |
| GlobalFilter.filterByLabel = true | |
| } else if len(*keepMetricsWithLabelName) > 0 || len(*keepMetricsWithLabelValue) > 0 { | |
| logger.Fatalf("Both -mdx.keepMetricsWithLabel.name and -mdx.keepMetricsWithLabel.value must be set if one of them is set.") | |
| } | |
| _ = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 { | |
| GlobalFilter.mu.RLock() | |
| n := len(GlobalFilter.vmInstance) | |
| GlobalFilter.mu.RUnlock() | |
| return float64(n) | |
| }) | |
| if mdxInstanceEntryTTL.Milliseconds() != 0 { | |
| GlobalFilter.wg.Go(GlobalFilter.cleanStale) | |
| } | |
| } | |
| func (filter *Filter) cleanStale() { | |
| ttlSec := int64(mdxInstanceEntryTTL.Duration().Seconds()) | |
| ticker := time.NewTicker(5 * time.Second) | |
| defer ticker.Stop() | |
| for { | |
| select { | |
| case <-ticker.C: | |
| filter.mu.Lock() | |
| currTs := time.Now().Unix() | |
| dst := make(map[string]*atomic.Int64, len(filter.vmInstance)) | |
| for k, v := range filter.vmInstance { | |
| if currTs-v.Load() < ttlSec { | |
| dst[k] = v | |
| } | |
| } | |
| if len(dst) != len(filter.vmInstance) { | |
| filter.vmInstance = dst | |
| } | |
| filter.mu.Unlock() | |
| case <-filter.stopCh: | |
| return | |
| } | |
| } | |
| } | |
| func (filter *Filter) MustStop() { | |
| if filter == nil { | |
| return | |
| } | |
| close(filter.stopCh) | |
| filter.wg.Wait() | |
| } | |
| func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries { | |
| currTs := time.Now().Unix() | |
| var identicalKey []byte | |
| nextTss: | |
| for _, ts := range tss { | |
| var hasVersionLabel, triedJobInstance bool | |
| var job, instance string | |
| for _, label := range ts.Labels { | |
| if filter.filterByLabel && label.Name == *keepMetricsWithLabelName && label.Value == *keepMetricsWithLabelValue { | |
| resTss = append(resTss, ts) | |
| continue nextTss | |
| } | |
| if label.Name == "__name__" && label.Value == "vm_app_version" { | |
| hasVersionLabel = true | |
| } | |
| if instance == "" && label.Name == "instance" { | |
| if label.Value == "" { | |
| continue | |
| } | |
| instance = label.Value | |
| } | |
| if job == "" && label.Name == "job" { | |
| if label.Value == "" { | |
| continue | |
| } | |
| job = label.Value | |
| } | |
| if !triedJobInstance && job != "" && instance != "" { | |
| identicalKey = identicalKey[:0] | |
| identicalKey = strconv.AppendQuote(identicalKey, job) | |
| identicalKey = append(identicalKey, ':') | |
| identicalKey = strconv.AppendQuote(identicalKey, instance) | |
| filter.mu.RLock() | |
| ptr, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)] | |
| filter.mu.RUnlock() | |
| if found { | |
| ptr.Store(currTs) | |
| resTss = append(resTss, ts) | |
| continue nextTss | |
| } | |
| triedJobInstance = true | |
| } | |
| if hasVersionLabel && job != "" && instance != "" { | |
| identicalKey = identicalKey[:0] | |
| identicalKey = strconv.AppendQuote(identicalKey, job) | |
| identicalKey = append(identicalKey, ':') | |
| identicalKey = strconv.AppendQuote(identicalKey, instance) | |
| v := &atomic.Int64{} | |
| v.Store(currTs) | |
| filter.mu.Lock() | |
| filter.vmInstance[string(identicalKey)] = v | |
| filter.mu.Unlock() | |
| continue nextTss | |
| } | |
| } | |
| } | |
| return resTss | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment