This gist contains code samples for the blog post "How we compress Pub/Sub messages and more, saving a load of money".
Last active
October 28, 2022 09:00
-
-
Save lawrencejones/25fda1c6c4d945bef0570eed1e80c9a2 to your computer and use it in GitHub Desktop.
How we compress Pub/Sub messages and more, saving a load of money
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var ( | |
exportPubsubWriteCompressionRatio = promauto.NewHistogram( | |
prometheus.HistogramOpts{ | |
Name: "elastic_toolbox_export_pubsub_write_compression_ratio", | |
Help: "Distribution of compression ratio", | |
Buckets: prometheus.LinearBuckets(0.1, 0.1, 10), // 0.0 -> 1.0 | |
}, | |
) | |
exportPubsubWriteCompressDurationSeconds = promauto.NewHistogram( | |
prometheus.HistogramOpts{ | |
Name: "elastic_toolbox_export_pubsub_write_compress_duration_seconds", | |
Help: "Distribution of time taken to compress hits", | |
Buckets: prometheus.ExponentialBuckets(0.0625, 2, 8), // 0.0625 -> 16s | |
}, | |
) | |
) | |
// compress applies gzip compression to the incoming data, and instruments | |
// compression efficiency. | |
func (f *pubsubExportTarget) compress(data []byte) ([]byte, error) { | |
defer prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { | |
exportPubsubWriteCompressDurationSeconds.Observe(v) | |
})).ObserveDuration() | |
var buffer bytes.Buffer | |
zw := gzip.NewWriter(&buffer) | |
if _, err := zw.Write(data); err != nil { | |
return nil, err | |
} | |
if err := zw.Close(); err != nil { | |
return nil, err | |
} | |
compressed := buffer.Bytes() | |
exportPubsubWriteCompressionRatio.Observe( | |
float64(len(compressed)) / float64(len(data))) | |
return compressed, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publish takes a message and publishes it to the Pub/Sub topic. If | |
// compression is enabled, the message payload is compressed, and the | |
// message is marked with a compress=true attribute. | |
func (f *pubsubExportTarget) Publish(ctx context.Context, msg Message) error { | |
data, _ := json.Marshal(msg) | |
if f.opt.Compress { | |
data, _ = f.compress(data) | |
} | |
// enqueue marks a message as available to be sent, passing it | |
// to the Pub/Sub client | |
f.enqueue(ctx, &pubsub.Message{ | |
Data: data, | |
Attributes: map[string]string{ | |
"compress": fmt.Sprintf("%v", f.opt.Compress), | |
}, | |
}) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment