Skip to content

Instantly share code, notes, and snippets.

@astromechza
Created June 10, 2024 07:41
Show Gist options
  • Save astromechza/39c70f5a5844a5c410b6bf3507095491 to your computer and use it in GitHub Desktop.
Save astromechza/39c70f5a5844a5c410b6bf3507095491 to your computer and use it in GitHub Desktop.
blog score-compose kafka provisioner
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1 +1,6 @@
-{}
+- uri: template://custom-provisioners/kafka-topic
+ type: kafka-topic
+ outputs: |
+ host: unknown
+ port: "9092"
+ name: unknown
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1,6 +1,22 @@
- uri: template://custom-provisioners/kafka-topic
type: kafka-topic
+ state: |
+ topic: {{ dig "topic" (print "topic-" (randAlphaNum 6)) .State | quote }}
+ shared: |
+ shared_kafka_instance_name: {{ dig "shared_kafka_instance_name" (print "kafka-" (randAlphaNum 6)) .Shared | quote }}
+ services: |
+ {{ .Shared.shared_kafka_instance_name }}:
+ image: bitnami/kafka:latest
+ restart: always
+ environment:
+ KAFKA_CFG_NODE_ID: "0"
+ KAFKA_CFG_PROCESS_ROLES: controller,broker
+ KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
+ KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+ KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
+ KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
outputs: |
- host: unknown
+ host: {{ .Shared.shared_kafka_instance_name }}
port: "9092"
- name: unknown
+ name: {{ .State.topic }}
+ num_partitions: 3
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -16,6 +16,11 @@
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+ healthcheck:
+ test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server=localhost:9092"]
+ interval: 2s
+ timeout: 2s
+ retries: 10
outputs: |
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,17 @@
interval: 2s
timeout: 2s
retries: 10
+ {{ .State.topic }}-init:
+ image: bitnami/kafka:latest
+ entrypoint: ["/bin/sh"]
+ command: ["-c", "kafka-topics.sh --topic={{.State.topic}} --bootstrap-server=localhost:9092 --describe || kafka-topics.sh --topic={{.State.topic}} --bootstrap-server=localhost:9092 --create --partitions=3"]
+ network_mode: "service:{{ .Shared.shared_kafka_instance_name }}"
+ labels:
+ dev.score.compose.labels.is-init-container: "true"
+ depends_on:
+ {{ .Shared.shared_kafka_instance_name }}:
+ condition: service_healthy
+ restart: true
outputs: |
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,10 @@
interval: 2s
timeout: 2s
retries: 10
+ volumes:
+ - type: volume
+ source: {{ .Shared.shared_kafka_instance_name }}-data
+ target: /bitnami/kafka
{{ .State.topic }}-init:
image: bitnami/kafka:latest
entrypoint: ["/bin/sh"]
@@ -32,6 +36,9 @@
{{ .Shared.shared_kafka_instance_name }}:
condition: service_healthy
restart: true
+ volumes: |
+ {{ .Shared.shared_kafka_instance_name }}-data:
+ driver: local
outputs: |
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,12 @@
interval: 2s
timeout: 2s
retries: 10
+ {{ $publishPort := (dig "annotations" "compose.score.dev/publish-port" "0" .Metadata | atoi) }}
+ {{ if ne $publishPort 0 }}
+ ports:
+ - target: 9092
+ published: {{ $publishPort }}
+ {{ end }}
volumes:
# Build stage: compile the sample app into a single Linux/amd64 binary.
FROM golang:1.22-alpine AS builder
WORKDIR /app
# CGO is disabled so the resulting binary can run on the "static" distroless
# base image used by the final stage.
ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GOWORK=off
# Copy only the module files first so the dependency download below is not
# invalidated by unrelated source changes.
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
    go mod download
# Mount the build context over the working directory instead of copying it,
# and reuse the Go module and build caches between builds.
RUN --mount=target=. \
    --mount=type=cache,target=/go/pkg/mod \
    --mount=type=cache,target=/root/.cache/go-build \
    go build -o /sample .

# Runtime stage: ship only the compiled binary.
FROM gcr.io/distroless/static AS final
# Use an absolute destination so the binary location always matches the
# ENTRYPOINT below, regardless of the base image's working directory.
COPY --from=builder /sample /sample
ENTRYPOINT ["/sample"]
package main
import (
	"context"
	"fmt"
	"log"
	"log/slog"
	"net"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)
// main reads the Kafka connection settings from the environment, dials the
// leader of partition 0 of the configured topic, and then writes a "ping"
// message every 5 seconds until the process is terminated.
//
// Required environment variables: KAFKA_HOST, KAFKA_PORT and KAFKA_TOPIC.
// The process exits with a fatal log message if any of them is empty or
// unset, or if the initial dial fails (the dial is bounded by a 5s timeout).
func main() {
	host, port, topicName := os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC")
	switch {
	case host == "":
		log.Fatal("KAFKA_HOST is empty or not set")
	case port == "":
		log.Fatal("KAFKA_PORT is empty or not set")
	case topicName == "":
		log.Fatal("KAFKA_TOPIC is empty or not set")
	}
	log.Printf("connecting to %s:%s/%s and sending 'ping' every 5s", host, port, topicName)

	// JoinHostPort brackets IPv6 literal hosts, which a plain "%s:%s" format
	// would turn into an ambiguous, undialable address.
	addr := net.JoinHostPort(host, port)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	conn, err := kafka.DialLeader(ctx, "tcp", addr, topicName, 0)
	if err != nil {
		log.Fatalf("failed to dial to kafka: %v", err)
	}
	defer conn.Close()

	for {
		// Bound each write so a stalled broker cannot block the loop forever.
		if err := conn.SetWriteDeadline(time.Now().Add(time.Second * 5)); err != nil {
			log.Printf("error: failed to set write deadline: %v", err)
		}
		// A failed write is logged and retried on the next tick rather than
		// terminating the process.
		if n, err := conn.Write([]byte("ping")); err != nil {
			log.Printf("error: failed to ping: %v", err)
		} else {
			slog.Info("successfully sent message", slog.Int("#bytes", n))
		}
		time.Sleep(time.Second * 5)
	}
}
# Score workload specification for the sample app.
apiVersion: score.dev/v1b1
metadata:
  name: sample-app
containers:
  main:
    # NOTE(review): "." looks like a placeholder for a locally built image
    # that is supplied when the compose file is generated; confirm.
    image: .
    variables:
      # Connection details are resolved from the outputs of the "bus"
      # resource declared below.
      KAFKA_HOST: ${resources.bus.host}
      KAFKA_PORT: ${resources.bus.port}
      KAFKA_TOPIC: ${resources.bus.name}
resources:
  bus:
    # Must match the resource type handled by the kafka-topic provisioner.
    type: kafka-topic
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment