Last active
November 12, 2020 14:01
-
-
Save AcidLeroy/fc45b8a44fe41f97e3a6bc0b0543f679 to your computer and use it in GitHub Desktop.
Server side event throughput client test for RIG.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const uuid = require('uuid'); | |
var kafka = require('kafka-node') | |
const kafkaHost = process.env.KAFKA_HOST || "localhost:9092"; | |
HighLevelProducer = kafka.HighLevelProducer, | |
client = new kafka.KafkaClient({ | |
kafkaHost: kafkaHost | |
}); | |
console.log(`Publishing to kafka host "${kafkaHost}"`); | |
let payload = Buffer.alloc(1e5, 'z'); | |
let producer = new HighLevelProducer(client); | |
let cloudEvent = { | |
"specversion": "0.2", | |
"type": "test-topic", | |
"source": "rig", | |
"id": uuid.v4(), | |
"data": payload.toString() | |
}; | |
let payloads = [ | |
{topic: 'test-topic', messages: [JSON.stringify(cloudEvent)]} | |
]; | |
producer.on('ready', function() { | |
console.log("READY"); | |
function send() { | |
producer.send(payloads, function (err, data) { | |
// console.log("Sent"); | |
if (err) { | |
console.log('Received an error sending data to Kafka: ', err); | |
return; | |
} | |
send(); | |
}); | |
} | |
send() | |
}) | |
producer.on('error', function(err) { | |
console.log("We got an error: ", err); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const kafka = require('kafka-node'); | |
const Consumer = kafka.Consumer; | |
const Offset = kafka.Offset; | |
const kafkaHost = process.env.KAFKA_HOST || "localhost:9092" | |
let client = new kafka.KafkaClient({ | |
kafkaHost: kafkaHost | |
}); | |
var offset = new Offset(client); | |
console.log(`Subscribing to kafka at "${kafkaHost}"`); | |
const kafkaTopic='test-topic'; | |
let consumer = new Consumer( | |
client, | |
[ | |
{ | |
topic: kafkaTopic | |
} | |
], | |
{ | |
autoCommit: true | |
} | |
); | |
let start = null; | |
let totalData = 0; | |
consumer.on('message', function (message) { | |
//console.log('got a message'); | |
if (start === null) { | |
start = Date.now(); | |
// console.log("message = ", message); | |
} | |
//console.log(`last message length = ${message.value.length}`) | |
totalData += message.value.length; | |
let now = Date.now(); | |
const totalMB = totalData/1e6; | |
const diffSeconds = (now-start)/1e3; | |
console.log(`Throughput = ${totalMB/diffSeconds} MB/s `) | |
}); | |
consumer.on('error', function (err) { | |
console.log('got an error: ', err); | |
}); | |
consumer.on('offsetOutOfRange', function (topic) { | |
topic.maxNum = 2; | |
offset.fetch([topic], function (err, offsets) { | |
if (err) { | |
return console.error(err); | |
} | |
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); | |
consumer.setOffset(topic.topic, topic.partition, min); | |
}); | |
}); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const EventSource = require('eventsource'); | |
const fetch = require('node-fetch'); | |
const host = process.env.RIG_HOST || "http://localhost:4000" | |
const endpoint=`${host}/_rig/v1/connection/sse` | |
const kafkaTopic = "test-topic" | |
var source = new EventSource(endpoint) | |
source.onopen = (e) => console.log("open", e) | |
source.onmessage = (e) => console.log("message", e) | |
source.onerror = (e) => console.log("error", e) | |
let count = 0; | |
let totalBytes = 0; | |
let timeNow = 0; | |
source.addEventListener(kafkaTopic, e => { | |
console.log('Received data') | |
if (timeNow === 0) { | |
timeNow = Date.now(); | |
} | |
const diff = Date.now() - timeNow; | |
console.log(`last message length = ${e.data.length}`) | |
totalBytes += e.data.length; | |
const sizeMB = totalBytes/1e6; | |
console.log(`Throughput = ${sizeMB/(diff/1e3)} MB/s`); | |
count++; | |
}) | |
source.addEventListener("error", function (e) { | |
if (e.readyState == EventSource.CLOSED) { | |
console.log("Connection was closed.") | |
} else { | |
console.log("Connection error:", e) | |
} | |
}, false); | |
source.addEventListener("rig.connection.create", async function (e) { | |
let cloudEvent = JSON.parse(e.data) | |
let payload = cloudEvent.data | |
let connectionToken = payload["connection_token"] | |
console.log('cloudEvent ', cloudEvent); | |
console.log('connection Token = ', connectionToken); | |
let res = await createSubscription(connectionToken) | |
let text = await res.text() | |
console.log('DONE===> ', text) | |
}, false); | |
function createSubscription(connectionToken) { | |
const eventType = kafkaTopic; | |
const token = connectionToken; | |
const route=`${host}/_rig/v1/connection/sse/${token}/subscriptions`; | |
console.log('route = ', route); | |
return fetch(route, { | |
method: "PUT", | |
headers: { "Content-Type": "application/json" }, | |
body: JSON.stringify({ | |
"subscriptions": [{ | |
"eventType": eventType | |
}] | |
}) | |
}) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const WebSocket = require('websocket').w3cwebsocket; | |
const fetch = require('node-fetch'); | |
const host = process.env.RIG_HOST || "https://localhost:4000" | |
const wsHost = process.env.WS_RIG_HOST || "wss://localhost:4000" | |
const kafkaTopic = 'test-topic'; | |
let count = 0; | |
let totalBytes = 0; | |
let timeNow = 0; | |
function addEvent(e) { | |
//console.log('Received data') | |
if (timeNow === 0) { | |
timeNow = Date.now(); | |
} | |
const diff = Date.now() - timeNow; | |
console.log(`last message length = ${e.data.length}`) | |
totalBytes += e.data.length; | |
const sizeMB = totalBytes/1e6; | |
console.log(`Throughput = ${sizeMB/(diff/1e3)} MB/s`); | |
count++; | |
} | |
const baseUrl = `${host}/_rig/v1` | |
const wsUrl = `${wsHost}/_rig/v1` | |
function createSubscription(connectionToken) { | |
const eventType = kafkaTopic; | |
return fetch(`${baseUrl}/connection/ws/${connectionToken}/subscriptions`, { | |
method: "PUT", | |
mode: "cors", | |
headers: { | |
"Content-Type": "application/json; charset=utf-8" | |
}, | |
body: JSON.stringify({ "subscriptions": [{ "eventType": eventType }] }) | |
}) | |
.then(json => { | |
console.log(`Subscription created to kafkaTopic "${kafkaTopic}"`) | |
return json | |
}) | |
.catch(err => { | |
console.log("Failed to create subscription:", err) | |
}) | |
} | |
const source = new WebSocket(`${wsUrl}/connection/ws`) | |
source.onopen = (e) => console.log("Connection opened") | |
source.onmessage = (e) => { | |
const cloudEvent = JSON.parse(e.data) | |
if (cloudEvent.type === 'rig.connection.create') { | |
payload = cloudEvent.data | |
connectionToken = payload["connection_token"] | |
createSubscription(connectionToken) | |
} else if (cloudEvent.type === kafkaTopic) { | |
addEvent(cloudEvent) | |
} else if (cloudEvent.type === 'rig.subscriptions_set') { | |
} | |
} | |
source.onerror = (e) => console.log("error", e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment