This is a series of gists documenting testing done with the numeric.mapping
option of the Kafka Connect JDBC source connector, covering the following databases:
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
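For context, numeric.mapping controls how the JDBC source connector represents NUMERIC/DECIMAL columns (its valid values are none, best_fit, and precision_only). The configuration below is a minimal illustrative sketch rather than one taken from the gists themselves; the connection details, table name, and topic prefix are placeholders.

{
  "name": "jdbc_source_postgres_example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/demo",
    "connection.user": "connect_user",
    "connection.password": "********",
    "table.whitelist": "test_table",
    "mode": "bulk",
    "topic.prefix": "postgres-",
    "numeric.mapping": "best_fit"
  }
}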
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Illustrative completion (the original fragment ends mid-statement); assumes a 'log' field is in scope
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
            log.error("Error processing record {}", record, thrownException);
        }
    });
    return factory;
}
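The factory above calls a consumerFactory() method that is not part of the fragment. The bean below is a minimal sketch of what it might look like, assuming Avro-encoded values deserialized into GenericRecord via Confluent's KafkaAvroDeserializer; the broker address, group id, and Schema Registry URL are placeholders, and imports are omitted to match the style of the fragment.

@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put("schema.registry.url", "http://localhost:8081");              // placeholder
    return new DefaultKafkaConsumerFactory<>(props);
}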
/*
 * Available context bindings:
 *   COLUMNS     List<DataColumn>
 *   ROWS        Iterable<DataRow>
 *   OUT         { append() }
 *   FORMATTER   { format(row, col); formatValue(Object, col) }
 *   TRANSPOSED  Boolean
 * plus ALL_COLUMNS, TABLE, DIALECT
 *
 * where:
plugins {
    id 'java'
    id 'application'
}

repositories {
    jcenter()
}

dependencies {
    // The original fragment is truncated here; a Kafka Streams project would typically
    // declare something along these lines (artifact versions are illustrative only):
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    implementation 'org.slf4j:slf4j-simple:1.7.26'
}
StreamsBuilder builder = new StreamsBuilder();

builder
    .stream("inJlinkTopic", Consumed.with(Serdes.String(), Serdes.String()))
    .peek((key, value) -> log.info("Received message: {}", value))
    .filter((key, value) -> "PASS".equals(value))
    .to("outJlinkTopic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
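The snippet references a config object and a log field that are not part of the fragment. The lines below are a hypothetical sketch of the missing configuration, assuming a local broker; the application id and bootstrap servers are placeholders.

// Hypothetical configuration for the topology above (names are illustrative):
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-filter-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");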
// imports and license left out for clarity
public class OptimizedStreams {

    public static void main(String[] args) {
        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
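The class is truncated at this point in the fragment. One detail worth noting about topology optimization: in recent Kafka versions the optimizer is only engaged when the properties are also handed to StreamsBuilder#build(Properties), so the rest of the method would be expected to end roughly as sketched below ('builder' is the assumed StreamsBuilder defining the topology, which the fragment does not show).

        // Assumed continuation, not part of the original gist:
        final Topology topology = builder.build(properties);  // pass the props so optimizations are applied
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();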
Kafka 0.11.0.0 (Confluent Platform 3.3.0) added support for manipulating the offsets of a consumer group via the kafka-consumer-groups command-line tool.

To see where a group currently is, describe it:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

Note the values under CURRENT-OFFSET and LOG-END-OFFSET: CURRENT-OFFSET is the offset the consumer group is currently at in each partition, and LOG-END-OFFSET is the latest offset in that partition, so the difference between the two is the group's lag.
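The offset-manipulation side works through the --reset-offsets option. As an illustration (the topic name is a placeholder, and the group must have no active consumers for the reset to succeed), --dry-run previews the change and --execute applies it:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> \
  --topic <topic> --reset-offsets --to-earliest --dry-run

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> \
  --topic <topic> --reset-offsets --to-earliest --execute

Other reset strategies such as --to-latest, --to-offset, --shift-by, and --to-datetime are also available.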
dataByAirportStream.join(regressionsByAirPortTable,
        (k, v) -> k,
        DataRegression::new)
    .mapValues(Predictor::predict)
Here are my attempts to script an IntelliJ-based IDE.

The IDE Scripting Console is backed by the JSR-223 (javax.script.*) API, so Groovy, Clojure, JavaScript, and other scripting languages may be used. Open the IDE Scripting Console, type a statement, and hit Ctrl-Enter to execute the current line or selection. A .profile.<language extension> file in the same directory, if present, will be executed along with it.
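For readers unfamiliar with JSR-223, the standalone class below is a generic illustration of the javax.script API; it is not IntelliJ-specific, and the engine name and script are arbitrary examples.

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class Jsr223Demo {
    public static void main(String[] args) throws ScriptException {
        // Look up an engine by name; "JavaScript" (Nashorn) ships with JDK 8-14,
        // while "groovy" works if Groovy is on the classpath.
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
        engine.put("greeting", "hello from JSR-223");  // bind a variable into the script context
        Object result = engine.eval("greeting.toUpperCase()");
        System.out.println(result);                    // prints: HELLO FROM JSR-223
    }
}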