Skip to content

Instantly share code, notes, and snippets.

@andresaristizabal
Last active January 28, 2021 17:11
Show Gist options
  • Save andresaristizabal/43fdc519199a99384c6b3603ae65c9c9 to your computer and use it in GitHub Desktop.
Save andresaristizabal/43fdc519199a99384c6b3603ae65c9c9 to your computer and use it in GitHub Desktop.
wasm defintion for indexing record values to Elastic Search server
const {
SimpleTransform,
PolicyError,
} = require("@vectorizedio/wasm-api");
const {Client} = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe(["produce"]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);
/* Auxiliar transform function for records */
const containHeader = (key, value, record) =>
record.headers.some( header =>
header.headerKey.equals(Buffer.from(key)) &&
header.value.equals(Buffer.from(value))
)
/* Transform function */
transform.processRecord((recordBatch) => {
console.log("Applying wasm function")
return Promise.all(
recordBatch.records.map((record) => {
if (containHeader("save", "elastic", record)) {
console.log("Index record value to Elastic Search")
return client.index({
index: "result_wasm",
type: "redpanda_wasm",
body: {
record_message: record.value.toString()
}
})
}
})
).then(() => {
const result = new Map();
result.set("elasticIndex", recordBatch)
return result
})
.catch((e) => {
// handling error by coprocessor promise and not
// coprocessor policy error
const result = new Map();
console.log("fail to publish record batch to Elastic Search, " +
`record batch info`, recordBatch.header);
result.set("elasticIndexFail", recordBatch)
return result
})
});
exports["default"] = transform;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment