I hereby claim:
- I am gwenshap on github.
- I am gwenshap (https://keybase.io/gwenshap) on keybase.
- I have a public key whose fingerprint is DBF5 7259 F7B0 DF66 D3F6 FB20 7385 5CE2 D0A6 7B5A
To claim this, I am signing this object:
echo '{"name":"s3-sink-json-1", "config":{"connector.class":"io.confluent.connect.s3.S3SinkConnector", "topics":"mysql.login", "flush.size":1, "format.class":"io.confluent.connect.s3.format.json.JsonFormat","s3.bucket.name":"gwen-s3-test","storage.class":"io.confluent.connect.s3.storage.S3Storage","schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator","partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" |
curl -X PUT -d "{\"connector.class\":\"JdbcSourceConnector\",\"connection.url\":\"jdbc:mysql://127.0.0.1:3306/test?user=root\",\"mode\":\"timestamp\", \"table.whitelist\":\"login\", \"validate.non.null\":false, \"timestamp.column.name\":\"login_time\", \"topic.prefix\":\"mysql.\", \"transforms\":\"InsertSource\", \"transforms.InsertSource.type\":\"org.apache.kafka.connect.transforms.InsertField\$Value\", \"transforms.InsertSource.static.field\":\"data_source\", \"transforms.InsertSource.static.value\":\"MyLocalMySQL\"}" localhost:8083/connector-plugins/JdbcSourceConnector/config/validate --header "content-Type:application/json" | python -m json.tool |
echo '{"name":"mysql-login-connector", "connector.class":"JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root", "mode":"timestamp", "table.whitelist":"login", "validate.non.null":false, "timestamp.column.name":"login_time", "topic.prefix":"mysql.", "transforms":"InsertSource", "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field":"data_source", "transforms.InsertSource.static.value":"MyLocalMySQL"}"}' | curl -X PUT -d @- localhost:8083/connector-plugins/JdbcSourceConnector/config/validate --header "content-Type:application/json" |
echo '{"name":"mysql-login-connector", "config":{"connector.class":"JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root", "mode":"timestamp", "table.whitelist":"login", "validate.non.null":false, "timestamp.column.name":"login_time", "topic.prefix":"mysql.", "transforms":"InsertSource", "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field":"data_source", "transforms.InsertSource.static.value":"MyLocalMySQL"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" |
Producer: Hey, bootstrap server, I want to tell a Kafka joke, who do I tell it to? | |
Broker 5: Broker 1 is our Kafka jokes leader, talk to them. | |
Producer: Hey, Broker 1, here's a great Kafka joke, make sure you friends all hear it too! | |
Broker 2: Hey Broker 1, do you have any new Kafka jokes for me? | |
Broker 1: Here's the latest! | |
Broker 3: .... (for 10 seconds) | |
Broker 5: OK, no problems, everyone got it. Great joke! Thanks! | |
Broker 3: Oh! Good morning! Hey Broker 1, any new jokes? | |
Broker 1: Sure! Here's the latest! | |
Consumer: Hey, bootstrap server, I want the latest Kafka jokes, who do I talk to? |
I hereby claim:
To claim this, I am signing this object:
@Test | |
public void testDrainTwiceWithSnappy() throws Exception { | |
long now = time.milliseconds(); | |
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.SNAPPY, 10L, 100L, false, metrics, time, metricTags); | |
int appends = 56; | |
for (int i = 0; i < appends; i++) { | |
accum.append(tp1, key, value, null); | |
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); | |
} | |
accum.append(tp1, key, value, null); |
++ @Test | |
public void testDrainTwiceWithSnappy() throws Exception { | |
long now = time.milliseconds(); | |
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.SNAPPY, 10L, 100L, false, metrics, time, metricTags); | |
int appends = 56; | |
for (int i = 0; i < appends; i++) { | |
accum.append(tp1, key, value, null); | |
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); | |
} | |
accum.append(tp1, key, value, null); |
public enum ApiVersion { | |
KAFKA_082("0.8.2.X"), | |
KAFKA_083("0.8.3.X"); | |
private final String version; | |
public boolean onOrAfter(ApiVersion other) { | |
return compareTo(other) >= 0; | |
} |
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala | |
index d883bde..3021a8c 100644 | |
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala | |
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala | |
@@ -19,6 +19,7 @@ package kafka.zk | |
import org.apache.zookeeper.server.ZooKeeperServer | |
import org.apache.zookeeper.server.NIOServerCnxn | |
+import org.apache.zookeeper.server.NIOServerCnxnFactory | |
import kafka.utils.TestUtils |