Created
February 10, 2024 12:22
-
-
Save srikanthccv/882027fe4229cef8fa403f28fe8babd0 to your computer and use it in GitHub Desktop.
Script to compare delta and cumulative
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"time" | |
"github.com/ClickHouse/clickhouse-go/v2" | |
) | |
const tracesPxx = `SELECT | |
serviceName, | |
toStartOfInterval(timestamp, toIntervalSecond(60)) AS ts, | |
quantile(%.2f)(durationNano) / 1000000. AS value | |
FROM signoz_traces.distributed_signoz_index_v2 | |
WHERE ((timestamp >= '1706788020000000000') AND (timestamp <= '1706789820000000000')) | |
GROUP BY | |
serviceName, | |
ts | |
ORDER BY | |
serviceName ASC, | |
ts ASC | |
` | |
const metricsPxxCumulative = `SELECT | |
service_name, | |
ts, | |
histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.2f) AS value | |
FROM | |
( | |
SELECT | |
service_name, | |
le, | |
ts, | |
sum(rate_value) AS value | |
FROM | |
( | |
SELECT | |
service_name, | |
le, | |
ts, | |
If((value - lagInFrame(value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (value - lagInFrame(value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) AS rate_value | |
FROM | |
( | |
SELECT | |
fingerprint, | |
service_name, | |
le, | |
toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), toIntervalSecond(60)) AS ts, | |
max(value) AS value | |
FROM signoz_metrics.distributed_samples_v2 | |
INNER JOIN | |
( | |
SELECT | |
JSONExtractString(labels, 'service_name') AS service_name, | |
JSONExtractString(labels, 'le') AS le, | |
fingerprint | |
FROM signoz_metrics.time_series_v2 | |
WHERE (metric_name = 'signoz_latency_bucket') AND (temporality IN ['Cumulative', 'Unspecified']) AND (JSONExtractString(labels, 'deployment_environment') = 'prod') | |
) AS filtered_time_series USING (fingerprint) | |
WHERE (metric_name = 'signoz_latency_bucket') AND (timestamp_ms >= 1706788020000) AND (timestamp_ms < 1706789820000) | |
GROUP BY | |
fingerprint, | |
service_name, | |
le, | |
ts | |
ORDER BY | |
fingerprint ASC, | |
service_name ASC, | |
le ASC, | |
ts ASC | |
) | |
WINDOW rate_window AS (PARTITION BY fingerprint, service_name, le ORDER BY fingerprint ASC, service_name ASC, le ASC, ts ASC) | |
) | |
WHERE isNaN(rate_value) = 0 | |
GROUP BY | |
GROUPING SETS ( | |
(service_name, le, ts), | |
(service_name, le)) | |
ORDER BY | |
service_name ASC, | |
le ASC, | |
ts ASC | |
) | |
GROUP BY | |
service_name, | |
ts | |
ORDER BY | |
service_name ASC, | |
ts ASC | |
` | |
const metricsPxxDelta = `SELECT | |
service_name, | |
ts, | |
histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.2f) AS value | |
FROM | |
( | |
SELECT | |
service_name, | |
le, | |
toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), toIntervalSecond(60)) AS ts, | |
sum(value)/60 AS value | |
FROM signoz_metrics.distributed_samples_v2 | |
INNER JOIN | |
( | |
SELECT DISTINCT | |
JSONExtractString(labels, 'service_name') AS service_name, | |
JSONExtractString(labels, 'le') AS le, | |
fingerprint | |
FROM signoz_metrics.time_series_v2 | |
WHERE (metric_name = 'signoz_latency_bucket') AND (temporality IN ['Delta']) AND (JSONExtractString(labels, 'deployment_environment') = 'prod') | |
) AS filtered_time_series USING (fingerprint) | |
WHERE (metric_name = 'signoz_latency_bucket') AND (timestamp_ms >= 1706788020000) AND (timestamp_ms < 1706789820000) | |
GROUP BY | |
service_name, | |
le, | |
ts | |
ORDER BY | |
service_name ASC, | |
le, | |
ts ASC | |
) | |
GROUP BY | |
service_name, | |
ts | |
ORDER BY | |
service_name ASC, | |
ts ASC | |
` | |
func main() { | |
conn, err := clickhouse.Open(&clickhouse.Options{Addr: []string{"localhost:9000"}}) | |
if err != nil { | |
panic(err) | |
} | |
tracesLatencyByServiceName := make(map[string]map[time.Time]float64) | |
metricsCumulativeLatencyByServiceName := make(map[string]map[time.Time]float64) | |
metricsDeltaLatencyByServiceName := make(map[string]map[time.Time]float64) | |
for _, quantile := range []float64{0.99} { | |
// traces | |
query := fmt.Sprintf(tracesPxx, quantile) | |
rows, err := conn.Query(context.Background(), query) | |
if err != nil { | |
panic(err) | |
} | |
for rows.Next() { | |
var ( | |
serviceName string | |
ts time.Time | |
value float64 | |
) | |
if err := rows.Scan(&serviceName, &ts, &value); err != nil { | |
panic(err) | |
} | |
if _, ok := tracesLatencyByServiceName[serviceName]; !ok { | |
tracesLatencyByServiceName[serviceName] = make(map[time.Time]float64) | |
} | |
tracesLatencyByServiceName[serviceName][ts] = value | |
} | |
rows.Close() | |
if err := rows.Err(); err != nil { | |
panic(err) | |
} | |
// cumulative | |
metricsCumulativeQuery := fmt.Sprintf(metricsPxxCumulative, quantile) | |
rows, err = conn.Query(context.Background(), metricsCumulativeQuery) | |
if err != nil { | |
panic(err) | |
} | |
for rows.Next() { | |
var ( | |
serviceName string | |
ts time.Time | |
value float64 | |
) | |
if err := rows.Scan(&serviceName, &ts, &value); err != nil { | |
panic(err) | |
} | |
if _, ok := metricsCumulativeLatencyByServiceName[serviceName]; !ok { | |
metricsCumulativeLatencyByServiceName[serviceName] = make(map[time.Time]float64) | |
} | |
metricsCumulativeLatencyByServiceName[serviceName][ts] = value | |
} | |
rows.Close() | |
if err := rows.Err(); err != nil { | |
panic(err) | |
} | |
// delta | |
metricsDeltaQuery := fmt.Sprintf(metricsPxxDelta, quantile) | |
rows, err = conn.Query(context.Background(), metricsDeltaQuery) | |
if err != nil { | |
panic(err) | |
} | |
for rows.Next() { | |
var ( | |
serviceName string | |
ts time.Time | |
value float64 | |
) | |
if err := rows.Scan(&serviceName, &ts, &value); err != nil { | |
panic(err) | |
} | |
if _, ok := metricsDeltaLatencyByServiceName[serviceName]; !ok { | |
metricsDeltaLatencyByServiceName[serviceName] = make(map[time.Time]float64) | |
} | |
metricsDeltaLatencyByServiceName[serviceName][ts] = value | |
} | |
rows.Close() | |
if err := rows.Err(); err != nil { | |
panic(err) | |
} | |
} | |
for serviceName := range metricsCumulativeLatencyByServiceName { | |
for ts, value := range metricsCumulativeLatencyByServiceName[serviceName] { | |
deltaValue := metricsDeltaLatencyByServiceName[serviceName][ts] | |
tracesValue := tracesLatencyByServiceName[serviceName][ts] | |
fmt.Printf("%s || %v || %f || %f || %f\n", serviceName, ts, value, deltaValue, tracesValue) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment