{
  "type": "record",
  "name": "myrecord",
  "namespace": "loginStream",
  "fields": [
    {
      "name": "userName",
      "type": "string"
    },
    {

public static void registerSchema(String topic, String schemaPath, String schemaUrl) throws IOException, RestClientException {
    // subject convention is "<topic-name>-value"
    String subject = topic + "-value";
    String schema;
    FileInputStream inputStream = new FileInputStream(schemaPath);
    try {
        schema = IOUtils.toString(inputStream);

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{ "schema": "{ \"type\": \"record\", \"name\": \"Persone\", \"namespace\": \"com.ippontech.kafkatutorialse\", \"fields\": [ { \"name\": \"firstName\", \"type\": \"string\" }, { \"name\": \"lastName\", \"type\": \"string\" }, { \"name\": \"birthDate\", \"type\": \"long\" } ]}" }' \
  http://localhost:8081/subjects/persons-avro3-value/versions

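The request above embeds the Avro schema as an escaped JSON string inside a `schema` field and posts it to a subject named `<topic>-value`. The following is a minimal sketch of how that subject name and request body could be built by hand, using only the JDK. The class `SchemaRegistryPayloadSketch` and its methods are hypothetical helpers introduced for illustration; they are not part of any Schema Registry client library, and only the most common escape sequences are handled.

```java
// Sketch only: hypothetical helper, not part of any Confluent client library.
final class SchemaRegistryPayloadSketch {

    /** Builds the subject name using the "<topic>-key" / "<topic>-value" convention. */
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /**
     * Wraps an Avro schema (itself a JSON document) as the string value of a
     * "schema" field. Quotes, backslashes, and common whitespace escapes are
     * handled; other control characters are out of scope for this sketch.
     */
    static String toRequestBody(String avroSchemaJson) {
        StringBuilder escaped = new StringBuilder();
        for (char c : avroSchemaJson.toCharArray()) {
            switch (c) {
                case '\\': escaped.append("\\\\"); break;
                case '"':  escaped.append("\\\""); break;
                case '\n': escaped.append("\\n"); break;
                case '\r': escaped.append("\\r"); break;
                case '\t': escaped.append("\\t"); break;
                default:   escaped.append(c);
            }
        }
        return "{\"schema\": \"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        String schema = "{\"type\": \"string\"}";
        // Subject for the value schema of topic "persons-avro3"
        System.out.println(subjectFor("persons-avro3", false));
        // Request body with the schema embedded as an escaped string
        System.out.println(toRequestBody(schema));
    }
}
```

In a real project a JSON library would normally do the escaping; the manual loop is only meant to show why the `curl` payload contains `\"` sequences.
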
Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
boolean isKeySerde = false;
genericAvroSerde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"),
    isKeySerde);

counts.mapValues((windowed, counter) ->
{
    try {
        return buildRecord(new LoginAttackCount(
            windowed.key(),
            counter,
            windowed.window().start(),
            windowed.window().end()));
    } catch (IOException e) {
        throw new RuntimeException(e);

private static GenericRecord buildRecord(LoginAttackCount loginAttackCount) throws IOException {
    // avro schema avsc file path.
    String schemaPath = "src/main/java/loginStream/login-attack-count.avsc";
    String schemaString;
    try (FileInputStream inputStream = new FileInputStream(schemaPath)) {
        schemaString = org.apache.commons.io.IOUtils.toString(inputStream);
    } catch (Exception e) {
        throw new RuntimeException(e);

| Feature | Kafka | RabbitMQ |
|---|---|---|
| Distributed consumers | Yes (consumer groups) | Yes (competing consumers) |
| Distributed producers | Yes | Yes |
| Distributed queues | Yes (topic partitions) | No |
| Clustering | Yes (also uses ZooKeeper) | No |
| Replicated queues | Yes (by default) | Yes (by queue configuration) |
| Message persistence | Yes (by retention period) | No (persistence guarantees aren't strong) |

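To make the "consumer groups" and "topic partitions" rows more concrete, here is a simplified sketch of how partitions might be spread across the consumers of one group, so that each partition is read by exactly one group member. The class `ConsumerGroupSketch` and the plain round-robin rule are assumptions made for illustration; Kafka's real assignors are configurable and more involved.

```java
// Sketch only: a simplified round-robin assignment, not Kafka's actual assignor code.
final class ConsumerGroupSketch {

    /**
     * Returns an array where index = partition number and value = index of the
     * consumer (within the group) that owns that partition.
     */
    static int[] assignRoundRobin(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        int[] owner = new int[partitionCount];
        for (int partition = 0; partition < partitionCount; partition++) {
            owner[partition] = partition % consumerCount;
        }
        return owner;
    }

    public static void main(String[] args) {
        // 6 partitions shared by 4 consumers: consumers 0 and 1 get two partitions each
        System.out.println(java.util.Arrays.toString(assignRoundRobin(6, 4)));
        // prints [0, 1, 2, 3, 0, 1]
    }
}
```

With more consumers than partitions, the extra consumers receive nothing under this rule, which is why the partition count bounds the useful parallelism of a group.
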
PCollection<Long> ticks = p
    // Produce 1 "tick" per second
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
    // Window the ticks into 1-second fixed windows
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
    // Use an arbitrary per-window combiner to reduce to 1 element per window
    .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();

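Fixed windowing, as used in the pipeline above, places each element in the window that contains its timestamp. The sketch below shows the underlying arithmetic with plain JDK code, assuming windows aligned to the epoch with no offset. `FixedWindowSketch` is a hypothetical name and this is not Beam's implementation.

```java
// Sketch only: epoch-aligned fixed-window arithmetic, not Beam's FixedWindows code.
final class FixedWindowSketch {

    /** Inclusive start of the fixed window that contains timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the fixed window that contains timestampMillis. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // An element stamped at 61.5 s falls into the window [60 s, 120 s)
        System.out.println(windowStart(61_500L, oneMinute)); // prints 60000
        System.out.println(windowEnd(61_500L, oneMinute));   // prints 120000
    }
}
```

With one tick per second, a 1-second window holds a single tick, while a 60-second window would hold sixty before the combiner reduces them to one element.
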
p.apply("BQ-write", BigQueryIO.write()
    .to(tableSpec)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRowWithEvent> out) {
    TableRow convertedRow = new TableRow();
    insertLong(element.get("server_time"), "server_time", convertedRow);
    insertFloat(element.get("screen_dpi"), "screen_dpi", convertedRow);
    // more transformation to come