Created
January 9, 2020 14:34
-
-
Save llimacruz/6e8aac2f8fa36723a82c885d4e2e1b6d to your computer and use it in GitHub Desktop.
Streams - branch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Console consumer: joins the 'node-consumer' group on localhost:9092 and prints
// every message received on the topic named by the first command-line argument.
const kafka = require('kafka-node');

const ConsumerGroup = kafka.ConsumerGroup;

const topic = process.argv[2];
console.log('topic name:', topic);

if (!topic) {
  // Without a topic there is nothing to subscribe to; fail fast with a usage hint.
  console.error('Usage: node <script> <topic>');
  process.exit(1);
}

const consumer = new ConsumerGroup(
  { kafkaHost: 'localhost:9092', groupId: 'node-consumer' },
  topic
);

// 1-based sequence number printed in front of each received message.
let count = 1;

consumer.on('message', message => {
  console.log(count++, 'Recebi a mensagem: ', message.value);
});

// An emitter with no 'error' listener throws when 'error' is emitted;
// log the failure instead of letting it take the process down.
consumer.on('error', error => {
  console.error('consumer error:', error);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Producer setup: connects to the broker at 127.0.0.1:9092 and, once the
// producer reports 'ready', sends the batch of generated payment messages.
const { random } = require('lodash');
const kafka = require('kafka-node');
const ObjectID = require("bson-objectid");

const HighLevelProducer = kafka.HighLevelProducer;
const client = new kafka.KafkaClient({ kafkaHost: '127.0.0.1:9092' });
const producer = new HighLevelProducer(client);

producer.on('ready', () => {
  console.log('Iniciado!');
  // Defined further down; safe to reference here because this callback
  // only runs after the whole script has been evaluated.
  processManyMessages();
});

// An emitter with no 'error' listener throws when 'error' is emitted;
// log the failure instead of letting it take the process down.
producer.on('error', (err) => {
  console.error('producer error:', err);
});

// Value pools from which each generated message picks one entry at random.
const deliveryEngines = ['xewards', 'xdh', 'coupon'];
const experiences = ['itau', 'bb', 'bradesco', 'marisa'];
/**
 * Sends N generated payment messages to the 'payments' topic, where N is the
 * first command-line argument (process.argv[2]).
 *
 * Each message is a JSON string holding a newly created ObjectID as `id` plus a
 * randomly picked `experience` and `deliveryEngine`. One `producer.send` call is
 * issued per message; its result or error is logged from the send callback.
 *
 * When the argument is missing or not a finite number, a usage hint is printed
 * and nothing is sent.
 */
const processManyMessages = () => {
  // Convert once, using the same numeric coercion the `<` comparison applied.
  const total = Number(process.argv[2]);
  if (!Number.isFinite(total)) {
    console.error('Usage: node <script> <number-of-messages>');
    return;
  }

  for (let i = 0; i < total; i++) {
    const payment = {
      id: ObjectID(),
      // Upper bounds follow the pool sizes so the pools can grow without edits here.
      experience: experiences[random(0, experiences.length - 1)],
      deliveryEngine: deliveryEngines[random(0, deliveryEngines.length - 1)],
    };
    const payloads = [{ topic: 'payments', messages: [JSON.stringify(payment)] }];

    producer.send(payloads, (err, data) => {
      if (err) {
        console.log('err', err);
        return;
      }
      console.log(data);
    });
  }
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { KafkaStreams } = require("kafka-streams");

// Settings placed under the "noptions" key of the KafkaStreams config.
// NOTE(review): these look like native client (librdkafka) options — confirm
// against the kafka-streams documentation before tuning any of them.
const nativeClientOptions = {
  "metadata.broker.list": "localhost:9092",
  "group.id": "kafka-streams-test-native1",
  "client.id": "kafka-streams-test-name-native",
  "event_cb": true,
  "compression.codec": "snappy",
  "api.version.request": true,
  "socket.keepalive.enable": true,
  "socket.blocking.max.ms": 100,
  "enable.auto.commit": false,
  "auto.commit.interval.ms": 100,
  "heartbeat.interval.ms": 250,
  "retry.backoff.ms": 250,
  "fetch.min.bytes": 100,
  "fetch.message.max.bytes": 2 * 1024 * 1024,
  "queued.min.messages": 100,
  "fetch.error.backoff.ms": 100,
  "queued.max.messages.kbytes": 50,
  "fetch.wait.max.ms": 1000,
  "queue.buffering.max.ms": 1000,
  "batch.num.messages": 10000
};

// Settings placed under the "tconf" key.
const topicOptions = {
  "auto.offset.reset": "earliest",
  "request.required.acks": 1
};

// Settings placed under the "batchOptions" key.
const batchOptions = {
  "batchSize": 10,
  "commitEveryNBatch": 1,
  "concurrency": 1,
  "commitSync": false,
  "noBatchCommits": false
};

// Full configuration object handed to the KafkaStreams constructor.
const config = {
  "noptions": nativeClientOptions,
  "tconf": topicOptions,
  "batchOptions": batchOptions
};

const kafkaStreams = new KafkaStreams(config);

// Log anything the KafkaStreams instance reports through its "error" event.
const reportStreamError = (error) => {
  console.error('Deu ruim:', error);
};
kafkaStreams.on("error", reportStreamError);
/**
 * Branch predicate: accepts messages whose JSON payload has
 * `deliveryEngine === 'coupon'`.
 *
 * @param {{ value: * }} message - Message whose `value` is passed to JSON.parse.
 * @returns {boolean} true for coupon payloads; false otherwise, including when
 *   the value is not valid JSON or parses to `null`.
 */
const filterBranchCoupon = message => {
  let payload;
  try {
    payload = JSON.parse(message.value);
  } catch (err) {
    // An unparseable payload matches no branch; a predicate should answer
    // "no" rather than throw inside the stream.
    return false;
  }
  return payload !== null && payload.deliveryEngine === 'coupon';
};
/**
 * Branch predicate: accepts messages whose JSON payload has
 * `deliveryEngine === 'xdh'`.
 *
 * @param {{ value: * }} message - Message whose `value` is passed to JSON.parse.
 * @returns {boolean} true for xdh payloads; false otherwise, including when
 *   the value is not valid JSON or parses to `null`.
 */
const filterBranchXDH = message => {
  let payload;
  try {
    payload = JSON.parse(message.value);
  } catch (err) {
    // An unparseable payload matches no branch; a predicate should answer
    // "no" rather than throw inside the stream.
    return false;
  }
  return payload !== null && payload.deliveryEngine === 'xdh';
};
/**
 * Branch predicate for the first 'xewards' branch.
 *
 * Accepts a payload with `deliveryEngine === 'xewards'` when either
 *  - `experience === 'itau'`, or
 *  - the char code of the last character of `id` is odd.
 *
 * @param {{ value: * }} message - Message whose `value` is passed to JSON.parse.
 * @returns {boolean} true when the message belongs to this branch; false
 *   otherwise, including when the value is not valid JSON, parses to `null`,
 *   or carries no string `id` to test.
 */
const filterBranchX1 = message => {
  let payload;
  try {
    payload = JSON.parse(message.value);
  } catch (err) {
    // An unparseable payload matches no branch; do not throw inside the stream.
    return false;
  }

  if (payload === null || payload.deliveryEngine !== 'xewards') {
    return false;
  }
  if (payload.experience === 'itau') {
    return true;
  }
  if (typeof payload.id !== 'string') {
    // No usable id: the parity rule cannot be evaluated.
    return false;
  }

  // For an empty id charCodeAt yields NaN, which fails the comparison.
  const lastCharCode = payload.id.charCodeAt(payload.id.length - 1);
  return lastCharCode % 2 === 1;
};
/**
 * Branch predicate for the second 'xewards' branch.
 *
 * Accepts a payload with `deliveryEngine === 'xewards'` when `experience` is
 * not 'itau' and the char code of the last character of `id` is even.
 *
 * @param {{ value: * }} message - Message whose `value` is passed to JSON.parse.
 * @returns {boolean} true when the message belongs to this branch; false
 *   otherwise, including when the value is not valid JSON, parses to `null`,
 *   or carries no string `id` to test.
 */
const filterBranchX2 = message => {
  let payload;
  try {
    payload = JSON.parse(message.value);
  } catch (err) {
    // An unparseable payload matches no branch; do not throw inside the stream.
    return false;
  }

  if (payload === null || payload.deliveryEngine !== 'xewards') {
    return false;
  }
  if (payload.experience === 'itau') {
    return false;
  }
  if (typeof payload.id !== 'string') {
    // No usable id: the parity rule cannot be evaluated.
    return false;
  }

  // For an empty id charCodeAt yields NaN, which fails the comparison.
  const lastCharCode = payload.id.charCodeAt(payload.id.length - 1);
  return lastCharCode % 2 === 0;
};
// Input side of the topology: one stream over the "payments" topic, split into
// one sub-stream per predicate. The order of the predicates fixes the order of
// the destructured branches below.
const kafkaTopicName = "payments";
const consumerEntrada = kafkaStreams.getKStream(kafkaTopicName);

const branchPredicates = [
  filterBranchCoupon,
  filterBranchXDH,
  filterBranchX1,
  filterBranchX2
];

const [branchCoupon, branchXDH, branchXewards1, branchXewards2] =
  consumerEntrada.branch(branchPredicates);
// Shared tail applied to every branch: run the two kafka-streams mapping steps,
// log each message under the given label, then forward it to the output topic.
// The `1` and "buffer" arguments are passed to `.to` unchanged.
// NOTE(review): presumably partition count and produce type — confirm against
// the kafka-streams `to()` documentation.
const forwardBranch = (branch, label, outputTopic) => branch
  .mapJSONConvenience()
  .mapWrapKafkaValue()
  .tap((msg) => console.log(label, JSON.stringify(msg)))
  .to(outputTopic, 1, "buffer");

// One output pipeline per branch; the returned values are awaited together at startup.
const producerPromiseCoupon = forwardBranch(branchCoupon, "Coupon", "coupon");
const producerPromiseXDH = forwardBranch(branchXDH, "XDH ", "xdh");
const producerPromiseXewards1 = forwardBranch(branchXewards1, "X1 ", "xewards1");
const producerPromiseXewards2 = forwardBranch(branchXewards2, "X2 ", "xewards2");
// Start the input stream and wait for it together with the four output
// pipelines; report a single success or failure line for the whole startup.
const startupTasks = [
  producerPromiseCoupon,
  producerPromiseXDH,
  producerPromiseXewards1,
  producerPromiseXewards2,
  consumerEntrada.start(),
];

const onStreamStarted = () => {
  console.log("Stream started, as kafka consumer and producers are ready.");
};

const onStreamStartFailed = (error) => {
  console.log("Streaming operation failed to start: ", error);
};

Promise.all(startupTasks).then(onStreamStarted, onStreamStartFailed);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment