-
-
Save andresaristizabal/43fdc519199a99384c6b3603ae65c9c9 to your computer and use it in GitHub Desktop.
Wasm definition for indexing record values to an Elasticsearch server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const {
  SimpleTransform,
  PolicyError,
} = require("@vectorizedio/wasm-api");
const { Client } = require('@elastic/elasticsearch');

/*
 * Elasticsearch client used by the transform.
 * The node URL can be overridden with the ELASTICSEARCH_URL environment
 * variable; when it is unset (or empty) the local default is used.
 */
const client = new Client({
  node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
});
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe(["produce"]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);
/**
 * Auxiliary function: tells whether a record carries a given header.
 *
 * @param {string} key - Header key to look for.
 * @param {string} value - Header value that must accompany the key.
 * @param {object} record - Record whose `headers` list is inspected; each
 *   header is expected to expose `headerKey` and `value` objects with an
 *   `equals` method (presumably Buffers - confirm against the wasm API).
 * @returns {boolean} `true` when at least one header matches both key and
 *   value; `false` otherwise, including when the record has no headers.
 */
const containHeader = (key, value, record) => {
  // A record without a header list simply does not contain the header.
  const headers = (record && record.headers) || [];
  // Encode the expected pair once instead of once per inspected header.
  const expectedKey = Buffer.from(key);
  const expectedValue = Buffer.from(value);
  return headers.some(
    (header) =>
      header.headerKey.equals(expectedKey) &&
      header.value.equals(expectedValue)
  );
};
/*
 * Transform function.
 *
 * For every record of the batch that carries the header save=elastic, the
 * record value (as a string) is sent to the "result_wasm" Elasticsearch
 * index. The returned promise resolves to a Map with a single entry whose
 * value is the unmodified record batch and whose key is:
 *   - "elasticIndex"     when every index request succeeded;
 *   - "elasticIndexFail" when any index request failed.
 * (The Map keys are presumably destination topic names - confirm against
 * the wasm API.)
 */
transform.processRecord((recordBatch) => {
  console.log("Applying wasm function");
  // Collect only the requests that were actually issued.
  const indexRequests = [];
  for (const record of recordBatch.records) {
    if (containHeader("save", "elastic", record)) {
      console.log("Index record value to Elastic Search");
      indexRequests.push(
        client.index({
          index: "result_wasm",
          // NOTE(review): mapping types are deprecated in recent
          // Elasticsearch versions - confirm against the target cluster.
          type: "redpanda_wasm",
          body: {
            record_message: record.value.toString(),
          },
        })
      );
    }
  }
  return Promise.all(indexRequests)
    .then(() => new Map([["elasticIndex", recordBatch]]))
    .catch((e) => {
      // The failure is handled here, by the coprocessor promise, and not by
      // the coprocessor error policy. Log the error itself so the cause of
      // the failure is not lost.
      console.log(
        "fail to publish record batch to Elastic Search, record batch info",
        recordBatch.header,
        "error:",
        e
      );
      return new Map([["elasticIndexFail", recordBatch]]);
    });
});
exports.default = transform;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment