Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save layandreas/ac955778ad97b55112301e5efaf07e60 to your computer and use it in GitHub Desktop.
#!/usr/bin/env zsh
# This script will set up a docker compose environment with Kafka, Minio, and
# Kafka Connect. We publish messages to a Kafka topic and then use a
# Kafka Connect S3 sink connector to write those messages to a Minio bucket

# Fail fast: exit on errors, unset variables, and failed pipeline stages.
set -euo pipefail

# Setup folder structure (-p makes reruns idempotent)
mkdir -p kafka-connect-example
cd kafka-connect-example || exit 1
mkdir -p kafka-connect-plugins

# Download and unzip the Confluent S3 sink connector plugin
# (the original comment said "JDBC" — the URL below is the S3 connector).
# -f: fail on HTTP errors instead of saving the error page as the zip;
# -L: follow redirects.
curl -fLO https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/10.6.7/confluentinc-kafka-connect-s3-10.6.7.zip
# Extract into a directory named after the actual plugin (s3, not jdbc);
# Kafka Connect scans the plugin path recursively, so the name is for humans.
unzip -o confluentinc-kafka-connect-s3-10.6.7.zip -d ./kafka-connect-plugins/confluentinc-kafka-connect-s3
# Start containers using docker compose.
# -f: fail on HTTP errors so a 404 page is never written into the compose file;
# -sS: quiet progress but still print errors; -L: follow redirects.
# NOTE(review): the scraped URL host "gist.githubusercontent.com" looks like a
# mangled mirror of gist.githubusercontent.com — restored here.
curl -fsSL -o docker-compose.yml https://gist.githubusercontent.com/layandreas/ffb57ec5f102ed0d285c1e8e40838e1a/raw/be4dd51730c931085c58f9c00c761840d188326b/docker-compose.yml
docker compose up -d

# Create minio target bucket to write our messages to.
# --ignore-existing makes the bucket creation idempotent across reruns.
docker exec minio mc alias set local http://localhost:9000 minioadmin minioadmin \
  && docker exec minio mc mb local/my-bucket --ignore-existing
# Create Kafka topic. This is where we will send our messages to.
# BUG FIX: the producer and the S3 sink connector both use "my-topic",
# but the original created "kafka_message", which nothing ever used —
# create the topic the rest of the script actually talks to.
docker exec kafka kafka-topics --create \
  --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
# Publish a batch of JSON test messages to the Kafka topic.
# The count lives in one variable so the loop and the final status
# line cannot drift apart (the original sent 5 but claimed "100").
num_messages=5
for (( i = 1; i <= num_messages; i++ )); do
  echo "Sending message $i..."
  # Feed one JSON record per invocation to the console producer via stdin.
  docker exec -i kafka kafka-console-producer --broker-list kafka:9092 --topic my-topic <<EOF
{"id":$i,"name":"Alice","email":"[email protected]"}
EOF
done
echo "✅ Done sending $num_messages messages."
# Register the S3 sink connector with the Kafka Connect REST API.
# The connector drains "my-topic" into the Minio bucket, flushing a new
# JSON object to s3://my-bucket every 3 records.
echo "Creating S3 sink connector..."
# Feed the JSON config straight to curl via a heredoc (no intermediate cat);
# the quoted 'EOF' delimiter keeps the payload literal — no shell expansion.
curl -X POST -H "Content-Type: application/json" --data @- http://localhost:8083/connectors <<'EOF'
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "my-topic",
"s3.bucket.name": "my-bucket",
"s3.region": "us-east-1",
"store.url": "http://minio:9000",
"flush.size": "3",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"schema.compatibility": "NONE",
"aws.access.key.id": "minioadmin",
"aws.secret.access.key": "minioadmin",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}
EOF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment