
echo '{"name":"s3-sink-json-1", "config":{"connector.class":"io.confluent.connect.s3.S3SinkConnector", "topics":"mysql.login", "flush.size":1, "format.class":"io.confluent.connect.s3.format.json.JsonFormat","s3.bucket.name":"gwen-s3-test","storage.class":"io.confluent.connect.s3.storage.S3Storage","schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator","partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"
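With flush.size set to 1, the sink rolls a new S3 object for every record. A sketch of the resulting object keys, assuming the connector's usual defaults (a "topics" directory prefix and 10-digit zero-padded start offsets — these defaults are an assumption, not stated in the gist):

```python
# Sketch of the object keys a DefaultPartitioner-style layout produces.
# Assumptions (not from the gist): topics.dir defaults to "topics" and the
# starting offset is zero-padded to 10 digits, per the connector's defaults.

def s3_object_key(topic, partition, start_offset, fmt="json", topics_dir="topics"):
    """Build a key of the form <topics.dir>/<topic>/partition=<p>/<topic>+<p>+<offset>.<fmt>."""
    return (f"{topics_dir}/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:010d}.{fmt}")

# flush.size=1 means each record starts (and ends) its own file:
print(s3_object_key("mysql.login", 0, 0))
# topics/mysql.login/partition=0/mysql.login+0+0000000000.json
```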
@gwenshap
gwenshap / gist:32be0fc68b8a422a3f4917e512d31386
Created March 24, 2017 20:17
Validate connector with JSON tool
curl -X PUT -d "{\"connector.class\":\"JdbcSourceConnector\",\"connection.url\":\"jdbc:mysql://127.0.0.1:3306/test?user=root\",\"mode\":\"timestamp\", \"table.whitelist\":\"login\", \"validate.non.null\":false, \"timestamp.column.name\":\"login_time\", \"topic.prefix\":\"mysql.\", \"transforms\":\"InsertSource\", \"transforms.InsertSource.type\":\"org.apache.kafka.connect.transforms.InsertField\$Value\", \"transforms.InsertSource.static.field\":\"data_source\", \"transforms.InsertSource.static.value\":\"MyLocalMySQL\"}" localhost:8083/connector-plugins/JdbcSourceConnector/config/validate --header "Content-Type:application/json" | python -m json.tool
@gwenshap
gwenshap / gist:c4a14021b8f612f136a7f2a8527c0805
Created March 24, 2017 20:16
Validate connector with simple transformation
echo '{"name":"mysql-login-connector", "connector.class":"JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root", "mode":"timestamp", "table.whitelist":"login", "validate.non.null":false, "timestamp.column.name":"login_time", "topic.prefix":"mysql.", "transforms":"InsertSource", "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field":"data_source", "transforms.InsertSource.static.value":"MyLocalMySQL"}' | curl -X PUT -d @- localhost:8083/connector-plugins/JdbcSourceConnector/config/validate --header "Content-Type:application/json"
@gwenshap
gwenshap / gist:e21c10979103bee512867b68486562f0
Created March 24, 2017 20:15
Create connector with a simple transformation
echo '{"name":"mysql-login-connector", "config":{"connector.class":"JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root", "mode":"timestamp", "table.whitelist":"login", "validate.non.null":false, "timestamp.column.name":"login_time", "topic.prefix":"mysql.", "transforms":"InsertSource", "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field":"data_source", "transforms.InsertSource.static.value":"MyLocalMySQL"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json"
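The hand-written payloads above namespace every transform property under "transforms.&lt;alias&gt;.". Building them programmatically avoids the quoting mistakes that are easy to make in shell. A sketch (the smt_config helper is hypothetical, not part of any Connect client library):

```python
import json

# Hypothetical helper (not a real Connect API): builds the namespaced
# "transforms.<alias>.*" keys that the curl payloads above spell out by hand.
def smt_config(alias, transform_type, **params):
    cfg = {"transforms": alias,
           f"transforms.{alias}.type": transform_type}
    for key, value in params.items():
        # Python identifiers can't contain dots, so underscores stand in.
        cfg[f"transforms.{alias}.{key.replace('_', '.')}"] = value
    return cfg

config = {
    "connector.class": "JdbcSourceConnector",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/test?user=root",
    "mode": "timestamp",
    "table.whitelist": "login",
    "timestamp.column.name": "login_time",
    "topic.prefix": "mysql.",
}
config.update(smt_config("InsertSource",
                         "org.apache.kafka.connect.transforms.InsertField$Value",
                         static_field="data_source",
                         static_value="MyLocalMySQL"))

payload = json.dumps({"name": "mysql-login-connector", "config": config})
# POST this payload to http://localhost:8083/connectors, as the curl command above does.
print(payload)
```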
Producer: Hey, bootstrap server, I want to tell a Kafka joke, who do I tell it to?
Broker 5: Broker 1 is our Kafka jokes leader, talk to them.
Producer: Hey, Broker 1, here's a great Kafka joke, make sure your friends all hear it too!
Broker 2: Hey Broker 1, do you have any new Kafka jokes for me?
Broker 1: Here's the latest!
Broker 3: .... (for 10 seconds)
Broker 1: OK, no problem, everyone got it. Great joke! Thanks!
Broker 3: Oh! Good morning! Hey Broker 1, any new jokes?
Broker 1: Sure! Here's the latest!
Consumer: Hey, bootstrap server, I want the latest Kafka jokes, who do I talk to?
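The exchange above mirrors Kafka's real produce path: the client asks any bootstrap broker for metadata, sends to the partition leader, and with acks=all the leader only acknowledges once the in-sync followers have replicated the record. A toy sketch of that flow (all names are illustrative, not a real client API):

```python
# Toy model of the metadata-then-produce flow from the joke above.
# Everything here is illustrative; it is not the real Kafka protocol.

class Broker:
    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.log = []

    def replicate(self, record):
        # A follower fetching from the leader, as Broker 2 does above.
        self.log.append(record)
        return True  # follower ack

class Cluster:
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers

    def find_leader(self, topic):
        # Any bootstrap broker can answer a metadata request ("talk to Broker 1").
        return self.leader

    def produce(self, topic, record, acks="all"):
        self.leader.log.append(record)
        if acks == "all":
            # The leader waits until every in-sync follower has the record --
            # which is why a slow Broker 3 delays the "Great joke!" ack.
            return all(f.replicate(record) for f in self.followers)
        return True

cluster = Cluster(Broker(1), [Broker(2), Broker(3)])
leader = cluster.find_leader("kafka-jokes")
assert cluster.produce("kafka-jokes", "a great Kafka joke", acks="all")
```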

Keybase proof

I hereby claim:

  • I am gwenshap on github.
  • I am gwenshap (https://keybase.io/gwenshap) on keybase.
  • I have a public key whose fingerprint is DBF5 7259 F7B0 DF66 D3F6 FB20 7385 5CE2 D0A6 7B5A

To claim this, I am signing this object:

@Test
public void testDrainTwiceWithSnappy() throws Exception {
    // Relies on fields of the surrounding test class: time, metrics,
    // metricTags, cluster, tp1, key, value.
    long now = time.milliseconds();
    RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.SNAPPY, 10L, 100L, false, metrics, time, metricTags);
    int appends = 56;
    for (int i = 0; i < appends; i++) {
        accum.append(tp1, key, value, null);
        assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
    }
    accum.append(tp1, key, value, null);
public enum ApiVersion {
    KAFKA_082("0.8.2.X"),
    KAFKA_083("0.8.3.X");

    private final String version;

    ApiVersion(String version) {
        this.version = version;
    }

    public boolean onOrAfter(ApiVersion other) {
        // Enum constants compare by declaration order, so later versions sort higher.
        return compareTo(other) >= 0;
    }
}
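The onOrAfter check works only because Java enum constants compare by declaration order. The same idea in a Python sketch (an illustrative port, not Kafka code):

```python
from enum import IntEnum

# Illustrative port of the Java enum above: explicit IntEnum values stand in
# for Java's ordinal-based compareTo, so later versions compare greater.
class ApiVersion(IntEnum):
    KAFKA_082 = 0
    KAFKA_083 = 1

    def on_or_after(self, other):
        return self >= other

print(ApiVersion.KAFKA_083.on_or_after(ApiVersion.KAFKA_082))  # True
print(ApiVersion.KAFKA_082.on_or_after(ApiVersion.KAFKA_083))  # False
```

The catch in both languages is the same: the comparison is only meaningful if versions are declared in release order.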
@gwenshap
gwenshap / gist:d95b36e0bced53cab5bb
Last active August 29, 2015 14:04
Changes to EmbeddedZookeeper
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index d883bde..3021a8c 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -19,6 +19,7 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxn
+import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils