Created
October 2, 2018 20:05
-
-
Save dwelch2344/fabec47e0da62e8bc9578b15d7e845c9 to your computer and use it in GitHub Desktop.
kafka avro nodejs example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const registryUrl = 'http://schema-registry:8081' | |
const avro = require('avsc') | |
const registry = require('avro-schema-registry')(registryUrl) | |
const kafka = require('kafka-node') | |
const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' }) | |
const topic = { | |
topic: 'actor7', | |
offset: 3695 | |
} | |
const opts = { | |
// groupId: 'dummy-' + new Date().getTime(), | |
groupId: 'dummy', | |
fromOffset: true | |
} | |
// don't make this go up linter | |
;(async () => { | |
const results = [] | |
// ensure we connect | |
await client.once('connect', msg => { | |
console.log('connect', { msg }) | |
}) | |
// wire up our stuff | |
const consumer = new kafka.Consumer(client, [topic], opts) | |
consumer.on('error', err => { | |
console.warn(err) | |
}) | |
consumer.on('message', async __message => { | |
const key = await registry.decode(new Buffer(__message.key)) | |
const value = await registry.decode(new Buffer(__message.value)) | |
results.push({ __message, key, value }) | |
}) | |
console.log('starting Timer') | |
setTimeout(() => { | |
console.log(results) | |
}, 5 * 1000) | |
})() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const registryUrl = 'http://schema-registry:8081' | |
const avro = require('avsc') | |
const registry = require('avro-schema-registry')(registryUrl) | |
const kafka = require('kafka-node') | |
const request = require('request') | |
const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' }) | |
const topic = { | |
topic: 'actors7' | |
} | |
const opts = { | |
// Configuration for when to consider a message as acknowledged, default 1 | |
requireAcks: 1, | |
// The amount of time in milliseconds to wait for all acks before considered, default 100ms | |
ackTimeoutMs: 100, | |
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 | |
partitionerType: 3 | |
} | |
const customPartitioners = undefined | |
// const customPartitioners = [{...}] | |
// don't make this go up linter | |
;(async () => { | |
const schemas = { | |
key: await fetchSchema(registryUrl, `${topic.topic}-key`, 1), | |
value: await fetchSchema(registryUrl, `${topic.topic}-value`, 1) | |
} | |
// ensure we connect | |
await client.once('connect', msg => { | |
console.log('connect', { msg }) | |
}) | |
console.log('connected') | |
const producer = new kafka.Producer(client, opts, customPartitioners) | |
console.log('Producer created', producer) | |
producer.on('error', err => { | |
console.warn('Producer error', err) | |
}) | |
// producer.on('ready', async () => { | |
console.log('producer ready') | |
const payloads = [] | |
for (let id = 0; id < 10; id++) { | |
const k = { | |
id: id.toString(), | |
timestamp: `2017-05-26 14:${id}:57.62` | |
} | |
const v = { | |
id, | |
firstName: 'First' + id, | |
lastName: 'Last' + id, | |
lastUpdate: k.timestamp | |
} | |
const key = await registry.encodeKey(topic.topic, schemas.key, k) | |
const value = await registry.encodeMessage(topic.topic, schemas.value, v) | |
// const value = await registry.encodeMessage(topic.topic, schemas.value, v) | |
payloads.push({ | |
topic: topic.topic, | |
messages: [new kafka.KeyedMessage(key, value)] | |
}) | |
} | |
producer.send(payloads, function(err, data) { | |
if (err) { | |
console.warn('errored', err) | |
} else { | |
console.log(data) | |
} | |
}) | |
})() | |
function fetchSchema(registryUrl, topic, version) { | |
return new Promise((resolve, reject) => { | |
request( | |
`${registryUrl}/subjects/${topic}/versions/${version}/schema`, | |
(err, res, body) => { | |
if (res.statusCode !== 200) { | |
const error = JSON.parse(data) | |
return reject( | |
new Error( | |
`Schema registry error: ${error.error_code} - ${error.message}` | |
) | |
) | |
} | |
resolve(JSON.parse(body)) | |
} | |
) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"avro-schema-registry": "^1.1.0", | |
"avsc": "^5.4.3", | |
"kafka-node": "3.0.1", | |
"request": "2.88.0" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment