Created
October 8, 2018 18:34
-
-
Save tulios/cfba2d2c3b8112bcfb98dcf12789cfde to your computer and use it in GitHub Desktop.
Unit test for KafkaJS LZ4 compression
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const createProducer = require('../../producer') | |
const createConsumer = require('../index') | |
const { Types, Codecs } = require('../../protocol/message/compression') | |
const LZ4 = require('kafkajs-lz4') | |
Codecs[Types.LZ4] = new LZ4().codec | |
const { | |
secureRandom, | |
createCluster, | |
createTopic, | |
newLogger, | |
createModPartitioner, | |
waitForMessages, | |
testIfKafka011, | |
waitForConsumerToJoinGroup, | |
} = require('testHelpers') | |
describe('Consumer', () => { | |
let topicName, groupId, cluster, producer, consumer | |
beforeEach(async () => { | |
topicName = `test-topic-${secureRandom()}` | |
groupId = `consumer-group-id-${secureRandom()}` | |
await createTopic({ topic: topicName }) | |
}) | |
afterEach(async () => { | |
await consumer.disconnect() | |
await producer.disconnect() | |
}) | |
testIfKafka011('consume LZ4 messages with 0.11 format', async () => { | |
cluster = createCluster({ allowExperimentalV011: true }) | |
producer = createProducer({ | |
cluster, | |
createPartitioner: createModPartitioner, | |
logger: newLogger(), | |
}) | |
consumer = createConsumer({ | |
cluster, | |
groupId, | |
maxWaitTimeInMs: 100, | |
logger: newLogger(), | |
}) | |
await consumer.connect() | |
await producer.connect() | |
await consumer.subscribe({ topic: topicName, fromBeginning: true }) | |
const messagesConsumed = [] | |
consumer.run({ eachMessage: async event => messagesConsumed.push(event) }) | |
await waitForConsumerToJoinGroup(consumer) | |
const key1 = secureRandom() | |
const message1 = { | |
key: `key-${key1}`, | |
value: `value-${key1}`, | |
headers: { [`header-${key1}`]: `header-value-${key1}` }, | |
} | |
const key2 = secureRandom() | |
const message2 = { | |
key: `key-${key2}`, | |
value: `value-${key2}`, | |
headers: { [`header-${key2}`]: `header-value-${key2}` }, | |
} | |
await producer.send({ | |
acks: 1, | |
topic: topicName, | |
compression: Types.LZ4, | |
messages: [message1, message2], | |
}) | |
await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ | |
{ | |
topic: topicName, | |
partition: 0, | |
message: expect.objectContaining({ | |
key: Buffer.from(message1.key), | |
value: Buffer.from(message1.value), | |
headers: { | |
[`header-${key1}`]: Buffer.from(message1.headers[`header-${key1}`]), | |
}, | |
magicByte: 2, | |
offset: '0', | |
}), | |
}, | |
{ | |
topic: topicName, | |
partition: 0, | |
message: expect.objectContaining({ | |
key: Buffer.from(message2.key), | |
value: Buffer.from(message2.value), | |
headers: { | |
[`header-${key2}`]: Buffer.from(message2.headers[`header-${key2}`]), | |
}, | |
magicByte: 2, | |
offset: '1', | |
}), | |
}, | |
]) | |
}) | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment