9:47 put link to prep episode
2:06 show starts! 7:52 intro 10:03 NEWS! 10:35 [blog] smarter rebalances in Kafka 12:55 Introducing Meetup Hub
package io.confluent.developer.iqrest; | |
import org.apache.kafka.common.serialization.Serde; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StoreQueryParameters; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.kstream.Consumed; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.apache.kafka.streams.state.QueryableStoreTypes; |
SET 'auto.offset.reset' = 'earliest'; | |
CREATE STREAM twitter_raw ( \ | |
CreatedAt bigint,Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, URLEntities VARCHAR) \ | |
WITH (KAFKA_TOPIC='twitter_json_01', partitions=12, VALUE_FORMAT='JSON'); | |
CREATE STREAM twitter AS \ | |
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\ | |
EXTRACTJSONFIELD(user,'$.Name') AS user_Name, \ | |
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName, \ |
import org.apache.kafka.streams.kstream.KGroupedStream | |
import org.apache.kafka.streams.kstream.KStream | |
import org.springframework.stereotype.Component | |
import org.springframework.cloud.stream.annotation.StreamListener | |
@Component | |
class CurrencyProcessing { | |
@StreamListener | |
fun processCurrency(input: KStream<String, Double>) { |
Producer | |
Setup | |
bin/kafka-topics.sh --zookeeper localhost:2181/kafka-local --create --topic test-rep-one --partitions 6 --replication-factor 1 | |
bin/kafka-topics.sh --zookeeper localhost:2181/kafka-local --create --topic test-rep-two --partitions 6 --replication-factor 3 | |
Single thread, no replication | |
bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance --print-metrics --topic test-rep-one --num-records 6000000 --throughput 100000 --record-size 100 --producer-props bootstrap.servers=kafka_host:9092 buffer.memory=67108864 batch.size=8196 | |
Single-thread, async 3x replication |
/* The MIT License (MIT) | |
* Copyright (c) 2012 Carl Eriksson | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a | |
* copy of this software and associated documentation files (the "Software"), | |
* to deal in the Software without restriction, including without limitation | |
* the rights to use, copy, modify, merge, publish, distribute, sublicense, | |
* and/or sell copies of the Software, and to permit persons to whom the Software | |
* is furnished to do so, subject to the following conditions: | |
* |
# a comma-separated list of the ccloud broker endpoints. e.g. | |
bootstrap.servers=r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094 | |
ksql.sink.replicas=3 | |
replication.factor=3 | |
security.protocol=SASL_SSL | |
sasl.mechanism=PLAIN | |
sasl.jaas.config=\ | |
org.apache.kafka.common.security.plain.PlainLoginModule required \ | |
username="<confluent cloud access key>" \ | |
password="<confluent cloud secret>"; |
package com.nyjavasig.how.hazelcast.demo; | |
import java.io.InputStream; | |
import java.io.InputStreamReader; | |
import java.io.LineNumberReader; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.regex.Pattern; | |
public class WordUtil { |