Skip to content

Instantly share code, notes, and snippets.

View TristinDavis's full-sized avatar

Tristin Davis TristinDavis

  • Thrivent Financial
  • Arkansas, USA
  • 09:12 (UTC -05:00)
View GitHub Profile
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
/*
* Available context bindings:
* COLUMNS List<DataColumn>
* ROWS Iterable<DataRow>
* OUT { append() }
* FORMATTER { format(row, col); formatValue(Object, col) }
* TRANSPOSED Boolean
* plus ALL_COLUMNS, TABLE, DIALECT
*
* where:
plugins {
id 'java'
id 'application'
}
repositories {
jcenter()
}
dependencies {
StreamsBuilder builder = new StreamsBuilder();
builder
.stream("inJlinkTopic", Consumed.with(Serdes.String(), Serdes.String()))
.peek((key, value) -> log.info("Received message: {}", value))
.filter((key, value) -> "PASS".equals(value))
.to("outJlinkTopic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@confluentgist
confluentgist / 00_numeric.mapping_README.md
Last active August 30, 2022 11:44 — forked from rmoff/00_numeric.mapping_README.md
Kafka Connect JDBC Connector - numeric.mapping
// imports and license left out for clarity
public class OptimizedStreams {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092 ");
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
@marwei
marwei / how_to_reset_kafka_consumer_group_offset.md
Created November 9, 2017 23:39
How to Reset Kafka Consumer Group Offset

Kafka 0.11.0.0 (Confluent 3.3.0) added support to manipulate offsets for a consumer group via cli kafka-consumer-groups command.

  1. List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset where this consumer group is currently at in each of the partitions.

  1. Reset the consumer offset for a topic (preview)
@bbejeck
bbejeck / mappingValues.java
Last active July 25, 2019 23:54
Mapping the Values with Prediction
dataByAirportStream.join(regressionsByAirPortTable,
(k, v) -> k,
DataRegression::new)
.mapValues(Predictor::predict)

Here are my attempts to script an IntelliJ-based IDE.

IDE Scripting Console is backed by JSR-223 (javax.script.*) API.

Groovy, Clojure, JavaScript and other scripting languages may be used.

Open IDE Scripting Console, type a statement, hit Ctrl-Enter to execute the current line or selection.

.profile.language-extension file in the same directory will be executed along with it if present.