Playing around with KSQL and KSQL JDBC Driver

Start by initializing the environment:

$ docker-compose up broker ksql-cli

Connect to the KSQL CLI:

$ docker-compose exec ksql-cli ksql http://ksql-server:8088

Create a stream:

ksql> create stream expiring_content (id varchar, expiring_time bigint) with (kafka_topic='expiring_content', value_format='json');
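
If you prefer to stay in Java, the same DDL can in principle be issued through the JDBC driver (packaged in a later step). A minimal sketch, assuming the driver accepts DDL via the standard Statement.execute; if it does not, just create the stream in the CLI as shown above:

import com.github.mmolimar.ksql.jdbc.KsqlDriver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreateStreamExample {

    public static void main(String[] args) throws SQLException {
        DriverManager.registerDriver(new KsqlDriver());
        // Assumes the KSQL server from the docker-compose below is reachable on localhost:8088.
        try (Connection conn = DriverManager.getConnection("jdbc:ksql://localhost:8088");
             Statement stmt = conn.createStatement()) {
            // Same statement as in the CLI step above.
            stmt.execute("create stream expiring_content (id varchar, expiring_time bigint) "
                    + "with (kafka_topic='expiring_content', value_format='json')");
        }
    }
}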

Execute a simple query. Note that this is a continuous query: it blocks and prints rows as new records arrive, so it stays empty until you produce some data in the next step:

ksql> select * from expiring_content;

Run the TestProducer class (below) to produce some content. Alternatively, you can use the console producer (note that the topic and the JSON field name must match the stream definition above):

$ docker-compose exec broker /usr/bin/kafka-console-producer --broker-list broker:9092 --topic expiring_content --property "parse.key=true" --property "key.separator=:"
> le_key:{"id":"foo", "expiring_time": 1543781651404}

Follow the steps at https://github.com/mmolimar/ksql-jdbc-driver to package the ksql-jdbc-driver.

Run the KSQL query from Java by executing the TestKsqlJdbc class (below).


If you want to try it on Kafka 2.0, replace version 4.1.2-1 with 5.0.1 in the docker-compose.yaml file. Note that, at the time of writing, the ksql-jdbc-driver does not work on Kafka 2.0.

docker-compose.yaml:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:4.1.2-1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:4.1.2-1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema_registry:
    image: confluentinc/cp-schema-registry:4.1.2-1
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  connect:
    image: confluentinc/cp-kafka-connect:4.1.2-1
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
  ksql-server:
    image: confluentinc/cp-ksql-server:4.1.2-1
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema_registry:8081"  # must match the schema_registry hostname above
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
  ksql-cli:
    image: confluentinc/cp-ksql-cli:4.1.2-1
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true
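
Before running the Java clients, you can sanity-check that the KSQL server's REST listener is reachable from the host. A minimal sketch, assuming the server answers on the /info path (exposed by recent KSQL versions; adjust if yours differs):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class KsqlServerCheck {

    public static void main(String[] args) throws Exception {
        // Port 8088 is the KSQL_LISTENERS port published in the docker-compose above.
        URL url = new URL("http://localhost:8088/info");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        System.out.println("HTTP " + conn.getResponseCode());
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}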
TestKsqlJdbc.java:

import com.github.mmolimar.ksql.jdbc.KsqlDriver;
import com.github.mmolimar.ksql.jdbc.resultset.KsqlResultSet;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;

public class TestKsqlJdbc {

    public static void main(String[] args) throws SQLException {
        DriverManager.registerDriver(new KsqlDriver());
        Connection conn = DriverManager.getConnection("jdbc:ksql://localhost:8088");
        Statement stmt = conn.createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
        KsqlResultSet rs = (KsqlResultSet) stmt.executeQuery("select * from expiring_content");

        // The streaming query never terminates on its own, so release the
        // resources when the process is interrupted (Ctrl-C).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                rs.close();
                stmt.close();
                conn.close();
            } catch (SQLException e) {
                // ignore failures while shutting down
            }
        }));

        // Columns 1 and 2 are KSQL's implicit ROWTIME and ROWKEY;
        // the declared stream columns start at index 3.
        while (rs.next()) {
            String id = rs.getValue(3);
            Long expiringTime = rs.getValue(4);
            System.out.println("---");
            System.out.println(id);
            System.out.println(expiringTime);
        }
    }
}
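
If you would rather avoid the cast to the driver-specific KsqlResultSet, the plain java.sql.ResultSet accessors should work as well, assuming the driver implements getString/getLong by column index (standard JDBC behavior, but not verified here). A sketch of the read loop as a drop-in replacement for the while loop above:

// Variant of the read loop using only the standard java.sql.ResultSet interface.
java.sql.ResultSet rs = stmt.executeQuery("select * from expiring_content");
while (rs.next()) {
    String id = rs.getString(3);      // first declared column after ROWTIME/ROWKEY
    long expiringTime = rs.getLong(4);
    System.out.println(id + " expires at " + expiringTime);
}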
TestProducer.java:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class TestProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            // The JSON field name must match the column declared in the stream
            // (expiring_time), otherwise KSQL leaves that column null.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("expiring_content",
                            "{\"id\":\"" + UUID.randomUUID().toString() + "\"," +
                                    "\"expiring_time\":" + Timestamp.valueOf(LocalDateTime.now().plusDays(5)).getTime() + "}");
            producer.send(record).get(); // block until each record is acknowledged
        }
        producer.close();
    }
}
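
Note that TestProducer leaves the record key null, while the console-producer example above sends keyed records. If you want the Java producer to match, pass a key to the ProducerRecord; a drop-in replacement for the record construction above (the key "le_key" is just the illustrative value from the console example):

// Keyed variant of the record above, mirroring the console-producer example.
ProducerRecord<String, String> record =
        new ProducerRecord<>("expiring_content",
                "le_key",
                "{\"id\":\"" + UUID.randomUUID().toString() + "\"," +
                        "\"expiring_time\":" + Timestamp.valueOf(LocalDateTime.now().plusDays(5)).getTime() + "}");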