Test 1)
Forcing the seed broker to fail with ECONNRESET.
The consumer example in the repo was changed to:
// Client configured with a single seed broker at 127.0.0.1:9094.
const kafka = new Kafka({
  brokers: ['127.0.0.1:9094'],
  // other configs
});
Test 1)
Forcing the seed broker to fail with ECONNRESET.
The consumer example in the repo was changed to:
// Client configured with a single seed broker at 127.0.0.1:9094.
const kafka = new Kafka({
  brokers: ['127.0.0.1:9094'],
  // other configs
});
const createProducer = require('../../producer')
const createConsumer = require('../index')
const { Types, Codecs } = require('../../protocol/message/compression')
const LZ4 = require('kafkajs-lz4')

// Store the codec exposed by a new kafkajs-lz4 instance in the shared Codecs
// table under the Types.LZ4 key.
Codecs[Types.LZ4] = new LZ4().codec
const { | |
secureRandom, | |
createCluster, |
const fs = require('fs')
const ip = require('ip')
const cluster = require('cluster')

const { Kafka, logLevel } = require('../index')

// Names of process-level error events and OS signals used by this example.
// NOTE(review): presumably these are subscribed to further down for graceful
// shutdown; the handlers are not visible in this snippet.
const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// Host address: the HOST_IP environment variable when set (and non-empty),
// otherwise whatever ip.address() reports.
const host = process.env.HOST_IP || ip.address()
const { promisify } = require('util')
const snappy = require('snappy')

// Promise-returning wrappers around the callback-style snappy functions.
const snappyCompress = promisify(snappy.compress)
const snappyDecompress = promisify(snappy.uncompress)

// Eight-byte marker: 130 followed by the ASCII bytes for "SNAPPY" and a 0.
const XERIAL_HEADER = Buffer.from([130, 83, 78, 65, 80, 80, 89, 0])
// NOTE(review): presumably the width of a chunk-size field and the offset at
// which the first one starts in a xerial-framed buffer; confirm against the
// code that reads them.
const SIZE_BYTES = 4
const SIZE_OFFSET = 16
$ phobos start
[2016-08-13T17:29:59:218+0200Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured",
:env=>"development"}
______ _           _
| ___ \ |         | |
| |_/ / |__   ___ | |__   ___  ___
|  __/| '_ \ / _ \| '_ \ / _ \/ __|
| |   | | | | (_) | |_) | (_) \__ \
\_|   |_| |_|\___/|_.__/ \___/|___/
# Example handler that mixes in PhobosDBCheckpoint::Handler and acknowledges
# every message it consumes.
class MyHandler
  include PhobosDBCheckpoint::Handler

  # Parses the payload as JSON and acks the event by its 'id' with the
  # current time.
  #
  # payload  - message body; must be a JSON document (passed to JSON.parse)
  # metadata - not used by this example
  def consume(payload, metadata)
    my_event = JSON.parse(payload)
    # <-- your logic (which possibly skips messages) here
    ack(my_event['id'], Time.now)
  end
end
class MyHandler | |
include Phobos::Handler | |
def self.start(kafka_client) | |
# setup handler | |
end | |
def self.stop | |
# teardown | |
end |
# Skeleton handler showing the arguments passed to #consume.
class MyHandler
  include Phobos::Handler

  def consume(payload, metadata)
    # payload  - This is the content of your Kafka message, Phobos does not attempt to
    #            parse this content, it is delivered raw to you
    # metadata - A hash with useful information about this event, it contains: key,
    #            partition, offset, retry_count, topic, group_id, and listener_id
  end
end
# Gemfile
# Pick the debugger gem according to the platform group Bundler is running on.
platforms :ruby do
  gem 'byebug'
end

platforms :jruby do
  gem 'pry'
end
# ...
Gem::Specification.new do |spec|
  # ...
  # Choose the PostgreSQL driver by runtime: when RUBY_PLATFORM matches /java/
  # the gem is marked as a 'java' platform gem and depends on the JDBC adapter;
  # otherwise it depends on 'pg'.
  if RUBY_PLATFORM =~ /java/
    spec.platform = 'java'
    spec.add_dependency 'activerecord-jdbcpostgresql-adapter'
  else
    spec.add_dependency 'pg'
  end
end