This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// imports and license left out for clarity | |
public class OptimizedStreams { | |
public static void main(String[] args) { | |
final Properties properties = new Properties(); | |
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); | |
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092 "); | |
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); | |
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.kstream.KStream; | |
import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Configure Kafka Streams Application | |
// The broker address is taken from the first command-line argument when one is
// supplied; otherwise it falls back to "localhost:9092".
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092"; | |
// Property bag that collects the Kafka Streams settings set below.
final Properties streamsConfiguration = new Properties(); | |
// Give the Streams application a unique name. The name must be unique | |
// in the Kafka cluster against which the application is run. | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example"); | |
// Where to find Kafka broker(s). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> { | |
System.out.println("Image path: " + value); | |
imagePath = value; | |
TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port); | |
System.out.println("Image = " + imagePath); | |
InputStream jpegStream; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start Kafka Streams Application to process new incoming images from the Input Topic | |
// The topology comes from builder.build() and the settings from streamsConfiguration;
// both are defined outside this excerpt.
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); | |
// NOTE(review): no close() call or shutdown hook is visible in this excerpt —
// confirm one is registered further down.
streams.start(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | |
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport; | |
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
// Resolve the bundled HDF5 model on the classpath to a file-system path string,
// because the importer below is handed a path rather than a stream.
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
// simpleMlp is already a String, so it is printed directly (no toString() call needed).
System.out.println(simpleMlp);
// Import the model stored at that path; the result is used for inference later on.
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Configure Kafka Streams Application | |
Properties streamsConfiguration = new Properties(); | |
// Application id used by this integration test.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test"); | |
// Broker address is obtained from CLUSTER (declared outside this excerpt;
// presumably the embedded test cluster — confirm against its declaration).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | |
// Specify default (de)serializers for record keys and for record values | |
// Both keys and values default to the String serde.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KStream<String, String> inputEvents = builder.stream(inputTopic); | |
inputEvents.foreach((key, value) -> { | |
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values): | |
String[] valuesAsArray = value.split(","); | |
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1])); | |
// Model inference in real time: | |
output = model.output(input); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build the streams client from the topology (builder.build()) and the
// configuration assembled earlier. TestKafkaStreams is a project class not shown
// in this excerpt; it is assigned to a KafkaStreams reference here.
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration); | |
// cleanUp() is called before start(). NOTE(review): in the stock KafkaStreams API
// this removes the application's local state directory so the run starts from a
// clean slate — confirm TestKafkaStreams does not override that behavior.
streams.cleanUp(); | |
streams.start(); |