Skip to content

Instantly share code, notes, and snippets.

@jpallari
Created May 7, 2020 14:10
Show Gist options
  • Save jpallari/6a92be829365bbdd3bad467c17894972 to your computer and use it in GitHub Desktop.
Save jpallari/6a92be829365bbdd3bad467c17894972 to your computer and use it in GitHub Desktop.
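# Example Kubernetes manifests for a KSQL server: a Service, a Deployment,
# and a ConfigMap holding the queries.sql file that the server runs.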
---
# Service that exposes the KSQL server pods on port 8088.
# No `type` is set, so the Kubernetes default Service type applies.
apiVersion: v1
kind: Service
metadata:
  name: demo-ksql-server
  labels:
    app: demo-ksql-server
spec:
  ports:
    # No targetPort is given, so traffic is forwarded to the same port
    # number (8088) on the selected pods.
    - port: 8088
  # Selects every pod carrying the label app=demo-ksql-server.
  selector:
    app: demo-ksql-server
---
# Deployment that runs one replica of the Confluent KSQL server. The server
# is pointed at a queries file that is mounted from a ConfigMap volume.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo-ksql-server
  labels:
    app: demo-ksql-server
spec:
  replicas: 1
  # Must match the pod template labels below.
  selector:
    matchLabels:
      app: demo-ksql-server
  template:
    metadata:
      labels:
        app: demo-ksql-server
      annotations:
        # Annotation values must be strings, hence the quotes.
        prometheus.io/scrape: "true"
        # NOTE(review): the only container ports declared in this manifest
        # are 8088 and 5555; nothing here is shown listening on 5556.
        # Presumably a Prometheus JMX exporter sidecar was expected on this
        # port - confirm, then add the sidecar or correct the port.
        prometheus.io/port: "5556"
    spec:
      containers:
        - name: ksql-server
          image: confluentinc/cp-ksql-server:5.4.1
          imagePullPolicy: IfNotPresent
          ports:
            - name: server
              containerPort: 8088
            # Same port number as KSQL_JMX_PORT below.
            - name: jmx
              containerPort: 5555
          resources:
            limits:
              cpu: "1"
              memory: 2Gi
            requests:
              cpu: 100m
              memory: 512Mi
          volumeMounts:
            # Each key of the ConfigMap appears as a file in this directory.
            - name: ksql-queries
              mountPath: /etc/ksql/queries
          env:
            - name: KSQL_BOOTSTRAP_SERVERS
              value: PLAINTEXT://kafka-cp-kafka-headless.default.svc.cluster.local:9092
            - name: KSQL_KSQL_SERVICE_ID
              value: ksql
            - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
              value: http://kafka-cp-schema-registry.default.svc.cluster.local:8081
            # JVM flag: max heap as a percentage of the memory available to
            # the JVM.
            - name: KSQL_OPTS
              value: "-XX:MaxRAMPercentage=75.0"
            # Path inside the volume mounted at /etc/ksql/queries above.
            - name: KSQL_KSQL_QUERIES_FILE
              value: /etc/ksql/queries/queries.sql
            # Env var values must be strings, so the port number is quoted.
            - name: KSQL_JMX_PORT
              value: "5555"
      volumes:
        - name: ksql-queries
          configMap:
            name: demo-ksql-server-ksql-queries-configmap
---
# ConfigMap holding the KSQL statements for the demo KSQL server.
# The Deployment mounts it at /etc/ksql/queries, so the `queries.sql` key
# becomes /etc/ksql/queries/queries.sql, the path set in
# KSQL_KSQL_QUERIES_FILE.
apiVersion: v1
kind: ConfigMap
metadata:
  name: demo-ksql-server-ksql-queries-configmap
  labels:
    app: demo-ksql-server
data:
  # Literal block scalar (`|-`): newlines are kept and the trailing newline
  # is stripped. Everything below is file content, so the `--` lines are SQL
  # comments inside the data, not YAML comments.
  queries.sql: |-
    -- From http://docs.confluent.io/current/ksql/docs/tutorials/basics-docker.html#create-a-stream-and-table
    -- Create a stream pageviews_original from the Kafka topic pageviews, specifying the value_format of DELIMITED
    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED', partitions=1, replicas=1);
    -- Create a table users_original from the Kafka topic users, specifying the value_format of JSON
    CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH (kafka_topic='users', value_format='JSON', key = 'userid', partitions=1, replicas=1);
    -- Create a persistent query by using the CREATE STREAM keywords to precede the SELECT statement
    CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid;
    -- Create a new persistent query where a condition limits the streams content, using WHERE
    CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
    -- Create a new persistent query where another condition is met, using LIKE
    CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', partitions=1, replicas=1) AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    -- Create a new persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than one
    CREATE TABLE pageviews_regions WITH (VALUE_FORMAT='avro') AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_enriched WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment