/* eslint-disable no-restricted-syntax */
|
const os = require('os'); |
|
const ping = require('./ping'); |
|
const docker = require('./docker'); |
|
|
|
/**
 * Polls the Kafka port on localhost until a ping succeeds or the time budget
 * runs out.
 *
 * Failed attempts are retried after a short pause instead of immediately, so
 * a port that refuses connections right away does not turn this into a hot
 * loop while the broker is still booting.
 *
 * @param {number} millis - Maximum time to keep trying, in milliseconds.
 * @returns {Promise<void>} Resolves as soon as one ping succeeds.
 * @throws {Error} When no ping succeeded within `millis`. The error's `cause`
 *   property holds the last ping failure, if there was one.
 */
async function waitUntilKafkaStart(millis) {
  const retryDelayMillis = 250;
  const endTime = Date.now() + millis;
  let lastError;

  while (Date.now() < endTime) {
    try {
      // eslint-disable-next-line no-await-in-loop
      await ping('localhost', '9092');
      return;
    } catch (error) {
      // Expected while the broker is still starting; remember it so the
      // timeout error can explain why the last attempt failed.
      lastError = error;
    }

    // Never sleep past the deadline.
    const remainingMillis = Math.max(0, endTime - Date.now());
    // eslint-disable-next-line no-await-in-loop
    await sleep(Math.min(retryDelayMillis, remainingMillis));
  }

  const timeoutError = new Error('timed out while waiting for kafka to start. please try again. if problem persists, try adding more time to the timeout.');
  if (lastError !== undefined) {
    // Assigned manually (not via the constructor option) so it also works on
    // runtimes that predate `new Error(message, { cause })`.
    timeoutError.cause = lastError;
  }
  throw timeoutError;
}
|
/**
 * Returns a promise that settles after a delay.
 *
 * @param {number} millis - Delay in milliseconds, passed straight to setTimeout.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function sleep(millis) {
  return new Promise((done) => {
    setTimeout(done, millis);
  });
}
|
|
|
/**
 * Finds the first non-internal IPv4 address reported by
 * `os.networkInterfaces()`, scanning interfaces in the order the OS module
 * lists them.
 *
 * @returns {string} The address of the first external IPv4 interface entry.
 * @throws {Error} When no external IPv4 address exists on this machine.
 */
function getHostIp() {
  // Node 18.0–18.3 reported `family` as the number 4 instead of 'IPv4';
  // accept both so the lookup works on every runtime.
  const isExternalIpv4 = (iface) => (
    (iface.family === 'IPv4' || iface.family === 4) && !iface.internal
  );

  for (const addresses of Object.values(os.networkInterfaces())) {
    const match = (addresses || []).find(isExternalIpv4);
    if (match) {
      return match.address;
    }
  }

  throw new Error('no ip address found on this machine!');
}
|
|
|
|
|
/**
 * Controls one Kafka Docker container (image `spotify/kafka`) intended for
 * tests. The container publishes ports 2181 and 9092 on the host.
 *
 * Every call into the `docker` helper is awaited. Whether those helpers are
 * synchronous or promise-returning is not visible from this file; awaiting
 * works for both and keeps the steps strictly ordered.
 */
class KafkaDocker {
  /**
   * @param {object} [options] - Overrides for the defaults below.
   * @param {{log: Function}} [options.logger=console] - Receives progress messages.
   * @param {string} [options.containerName='kafka-for-testing'] - Name used to
   *   create and look up the container.
   */
  constructor(options) {
    this.options = {
      logger: console,
      containerName: 'kafka-for-testing',
      ...options,
    };
  }

  /**
   * Looks up the container by its configured name.
   *
   * @returns {*} Whatever `docker.findContainer` returns (callers await it
   *   and treat a falsy result as "no such container").
   */
  container() {
    return docker.findContainer(this.options.containerName);
  }

  /**
   * Removes any existing container with the configured name, creates and
   * starts a fresh one, then waits until Kafka answers on localhost:9092.
   *
   * @returns {Promise<void>}
   * @throws {Error} If removal, creation or the start-up wait fails.
   */
  async start() {
    // Must finish before creating: both steps use the same container name.
    await this.deleteContainer();

    this.options.logger.log('starting kafka container...');
    await this.createAndStartContainer();

    await waitUntilKafkaStart(30 * 1000);
    await sleep(5000); // sleep a little for brokers to get up and running
    this.options.logger.log('kafka started at localhost:9092');
  }

  /**
   * Stops the container if it exists; does nothing when it cannot be found.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    const container = await this.container();

    if (!container) {
      return;
    }

    await container.stop();
  }

  /**
   * Creates the container, advertising this host's IP to Kafka clients, and
   * starts it.
   *
   * @returns {Promise<void>}
   * @throws {Error} If no host IP can be determined or docker fails.
   */
  async createAndStartContainer() {
    const ip = getHostIp();

    const created = await docker.createContainer({
      name: this.options.containerName,
      ports: [
        '2181:2181',
        '9092:9092',
      ],
      env: [
        `ADVERTISED_HOST=${ip}`,
        'ADVERTISED_PORT=9092',
        'AUTO_CREATE_TOPICS=true',
      ],
      image: 'spotify/kafka',
    });

    await created.start();
  }

  /**
   * Runs `kafka-topics.sh --create` inside the container for a topic with one
   * partition and replication factor 1, then logs the command output.
   *
   * @param {string} topic - Name of the topic to create.
   * @returns {Promise<void>}
   */
  async createTopic(topic) {
    const container = await this.container();
    const output = await container.exec([
      '/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh',
      '--create',
      '--zookeeper',
      'localhost:2181',
      '--replication-factor',
      1,
      '--partitions',
      1,
      '--topic',
      topic,
    ]);
    this.options.logger.log(output);
  }

  /**
   * Force-removes the container if one with the configured name exists.
   *
   * @returns {Promise<void>}
   */
  async deleteContainer() {
    // Await the lookup: without it a promise-returning lookup is always
    // truthy and `.remove` would be called on the promise itself.
    const container = await this.container();

    if (container) {
      this.options.logger.log('deleting existing container...');
      await container.remove(true);
      this.options.logger.log('container deleted');
    }
  }
}
|
|
|
// The module exports one ready-made instance built with the default options
// (console logger, container name 'kafka-for-testing').
const kafkaDocker = new KafkaDocker();

module.exports = kafkaDocker;