This is a series of gists documenting testing done with the numeric.mapping option in Kafka Connect.
- Oracle
- MS SQL Server
- Postgres
- MySQL - n/a because of #563
—@rmoff January 9, 2019
| @Bean | |
| public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> | |
| kafkaListenerContainerFactory() { | |
| ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory | |
| = new ConcurrentKafkaListenerContainerFactory<>(); | |
| factory.setConsumerFactory(consumerFactory()); | |
| factory.getContainerProperties().setErrorHandler(new ErrorHandler() { |
| /* | |
| * Available context bindings: | |
| * COLUMNS List<DataColumn> | |
| * ROWS Iterable<DataRow> | |
| * OUT { append() } | |
| * FORMATTER { format(row, col); formatValue(Object, col) } | |
| * TRANSPOSED Boolean | |
| * plus ALL_COLUMNS, TABLE, DIALECT | |
| * | |
| * where: |
| plugins { | |
| id 'java' | |
| id 'application' | |
| } | |
| repositories { | |
| jcenter() | |
| } | |
| dependencies { |
| StreamsBuilder builder = new StreamsBuilder(); | |
| builder | |
| .stream("inJlinkTopic", Consumed.with(Serdes.String(), Serdes.String())) | |
| .peek((key, value) -> log.info("Received message: {}", value)) | |
| .filter((key, value) -> "PASS".equals(value)) | |
| .to("outJlinkTopic", Produced.with(Serdes.String(), Serdes.String())); | |
| KafkaStreams streams = new KafkaStreams(builder.build(), config); | |
| streams.start(); |
| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * |
This is a series of gists documenting testing done with the numeric.mapping option in Kafka Connect.
—@rmoff January 9, 2019
| // imports and license left out for clarity | |
| public class OptimizedStreams { | |
| public static void main(String[] args) { | |
| final Properties properties = new Properties(); | |
| properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); | |
| properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092 "); | |
| properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); | |
| properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
Kafka 0.11.0.0 (Confluent 3.3.0) added support to manipulate offsets for a consumer group via cli kafka-consumer-groups command.
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describeNote the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset where this consumer group is currently at in each of the partitions.
| dataByAirportStream.join(regressionsByAirPortTable, | |
| (k, v) -> k, | |
| DataRegression::new) | |
| .mapValues(Predictor::predict) |
Here are my attempts to script an IntelliJ-based IDE.
IDE Scripting Console is backed by JSR-223 (javax.script.*) API.
Groovy, Clojure, JavaScript and other scripting languages may be used.
Open IDE Scripting Console, type a statement, hit Ctrl-Enter to execute the current line or selection.
.profile.language-extension file in the same directory will be executed along with it if present.