Skip to content

Instantly share code, notes, and snippets.

View haigopi's full-sized avatar
💭
Who loves to code!

Gopi haigopi

💭
Who loves to code!
View GitHub Profile
@Slf4j
public class FluxTest {
@Test
public void test() {
Flux<String> b = Flux.just("Bob cat", "is in the ", "party").delayElements(Duration.ofSeconds(2));
Flux.first(b)
@Slf4j
public class FluxPublishTest {
@Test
public void test() {
Scheduler scheduler = Schedulers.immediate();
Flux.range(1, 5)
.map(n -> Mono.just(n).publishOn(scheduler).map(k -> process(k))
@Slf4j
public class FluxPublishTest {
@Test
public void test() {
Scheduler scheduler = Schedulers.newParallel("P");
Flux.range(1, 6)
.parallel()
@Slf4j
public class FluxPublishTest {
@Test
public void test() {
Scheduler scheduler = Schedulers.single();
Flux.range(1, 6)
.parallel()
@Slf4j
public class FluxPublishTest {
@Test
public void test() {
Scheduler scheduler = Schedulers.elastic();
Flux.range(1, 6)
.parallel()
$ ./confluent start
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
#=
ModuleCaller:
- Julia version: 1.1.0
- Author: GOPI K KANCHARLA
- Date: 2019-02-25
=#
module Bar
#=
ReadData:
- Julia version:
- Author: GOPI K KANCHARLA
- Date: 2019-02-23
=#
println("typeof(1): ", typeof(1))
println("Sys.WORD_SIZE: ",Sys.WORD_SIZE)
@haigopi
haigopi / Retry.java
Created March 5, 2019 01:30
Reactor Kafka Perfect Retry
ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> log.info("Partitions Assigned {}", AOUtilities.printPartitions(partitions)))
.addRevokeListener(partitions -> log.info("Partitions Revoked {}", partitions))
// .assignment(Collections.singleton(new TopicPartition(topic, 5))) // <-- ** EASY TEST ONLY **
.commitInterval(Duration.ZERO)
.commitBatchSize(0)// <-- interval and batch size: can dramatically improve the performance. Make sure to test the retries as a side effect.
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(options);
errorCodes:
codeDescription:
application:
NULL_EMPTY_CHECK: 101 | Application details cannot be Empty
"[application.applicationType]":