This gist contains code samples for the blog post "How we compress Pub/Sub messages and more, saving a load of money".
-
-
Save pyadav/92f7eafcdea2ff633ca335e84e339ec7 to your computer and use it in GitHub Desktop.
How we compress Pub/Sub messages and more, saving a load of money
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var ( | |
exportPubsubWriteCompressionRatio = promauto.NewHistogram( | |
prometheus.HistogramOpts{ | |
Name: "elastic_toolbox_export_pubsub_write_compression_ratio", | |
Help: "Distribution of compression ratio", | |
Buckets: prometheus.LinearBuckets(0.1, 0.1, 10), // 0.0 -> 1.0 | |
}, | |
) | |
exportPubsubWriteCompressDurationSeconds = promauto.NewHistogram( | |
prometheus.HistogramOpts{ | |
Name: "elastic_toolbox_export_pubsub_write_compress_duration_seconds", | |
Help: "Distribution of time taken to compress hits", | |
Buckets: prometheus.ExponentialBuckets(0.0625, 2, 8), // 0.0625 -> 16s | |
}, | |
) | |
) | |
// compress applies gzip compression to the incoming data, and instruments | |
// compression efficiency. | |
func (f *pubsubExportTarget) compress(data []byte) ([]byte, error) { | |
defer prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { | |
exportPubsubWriteCompressDurationSeconds.Observe(v) | |
})).ObserveDuration() | |
var buffer bytes.Buffer | |
zw := gzip.NewWriter(&buffer) | |
if _, err := zw.Write(data); err != nil { | |
return nil, err | |
} | |
if err := zw.Close(); err != nil { | |
return nil, err | |
} | |
compressed := buffer.Bytes() | |
exportPubsubWriteCompressionRatio.Observe( | |
float64(len(compressed)) / float64(len(data))) | |
return compressed, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publish takes a message and publishes it to the Pub/Sub topic. If | |
// compression is enabled, the message payload is compressed, and the | |
// message is marked with a compress=true attribute. | |
func (f *pubsubExportTarget) Publish(ctx context.Context, msg Message) error { | |
data, _ := json.Marshal(msg) | |
if f.opt.Compress { | |
data, _ = f.compress(data) | |
} | |
// enqueue marks a message as available to be sent, passing it | |
// to the Pub/Sub client | |
f.enqueue(ctx, &pubsub.Message{ | |
Data: data, | |
Attributes: map[string]string{ | |
"compress": fmt.Sprintf("%v", f.opt.Compress), | |
}, | |
}) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment