@dbist
Last active March 31, 2022 21:27

SaaS Galore: Integrating CockroachDB with Confluent Kafka, Fivetran and Snowflake


This tutorial demonstrates how CockroachDB Change Data Capture (CDC) can be integrated with a rich third-party ecosystem of products to build sophisticated data pipelines.


Motivation

The problem this tutorial tries to solve is the lack of a native Fivetran connector for CockroachDB. My customer has built their analytics pipeline on Fivetran. Given there is no native integration, their next best option was to set up a Postgres connector:

fivetran_pg

CockroachDB is PostgreSQL wire compatible, but it is not safe to assume the two behave identically.

Let's attempt to configure the connector:

fivetran_pg2

Eventually you will hit a snag with the error java.lang.NumberFormatException: For input string: "CCL".

The problem stems from the way Fivetran parses the output of the SELECT VERSION() query. In CockroachDB, the output is CockroachDB CCL v21.2.5 (x86_64-unknown-linux-gnu, built 2022/02/07 21:01:07, go1.16.6), while in PostgreSQL it is PostgreSQL 14.2 on x86_64-apple-darwin21.3.0, compiled by Apple clang version 13.0.0 (clang-1300.0.29.30), 64-bit, or some variation of that.

There is an open thread with Fivetran support requesting a CockroachDB connector. In the meantime, I had a problem to solve, and my first instinct was to leverage CockroachDB Change Data Capture. We quickly looked at the available Fivetran connectors and, lo and behold, both Kafka and S3 connectors are available. I had a potential solution: configure CockroachDB CDC to output to Kafka, then have Fivetran pick up the topics and deliver them to Snowflake. Let's see how it all went down.
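To make the version-string mismatch concrete, here is a minimal Python sketch of the kind of parsing that could produce such an error. Fivetran's actual parser is not public to me, so naive_major_version is a hypothetical stand-in that assumes the second token of the string is a numeric version. The two version strings are the ones quoted above.

```python
# Hypothetical parser, NOT Fivetran's code: it assumes the second
# whitespace-separated token looks like "<major>.<minor>".
def naive_major_version(version_string: str) -> int:
    second_token = version_string.split()[1]
    return int(second_token.split(".")[0])


POSTGRES = (
    "PostgreSQL 14.2 on x86_64-apple-darwin21.3.0, compiled by Apple clang "
    "version 13.0.0 (clang-1300.0.29.30), 64-bit"
)
COCKROACH = (
    "CockroachDB CCL v21.2.5 (x86_64-unknown-linux-gnu, "
    "built 2022/02/07 21:01:07, go1.16.6)"
)


def try_parse(version_string: str):
    """Return the major version, or a short error description on failure."""
    try:
        return naive_major_version(version_string)
    except ValueError as err:
        return f"parse error: {err}"


print(try_parse(POSTGRES))   # the second token is "14.2", so this parses
print(try_parse(COCKROACH))  # the second token is "CCL", so int() raises
```

The Python ValueError here plays the same role as the Java NumberFormatException in the connector: the token "CCL" sits where a number is expected.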

All of the products discussed in this tutorial are available as SaaS, and I'm going to attempt to use free-tier or trial services throughout. One note: CockroachDB CDC is not available in the free tier today, so you have to use a dedicated instance. I'm using CockroachDB Dedicated. Enterprise CDC in the free tier is something we're considering. Stay tuned.

Disclaimer: I have only surface-level familiarity with the tools discussed and don't claim expert knowledge of any of these products. This tutorial is intended to demonstrate the art of the possible. For best practices, refer to the product documentation.

High Level Steps

  • Create and configure a Confluent Cloud Kafka cluster
  • Create and configure a Cockroach Cloud Dedicated cluster
  • Create and configure a Snowflake instance
  • Create and configure a Fivetran connector for Confluent Kafka
  • Create and configure a Fivetran destination for Snowflake
  • Verify

Step by step instructions

Create and configure a Confluent Cloud Kafka cluster

Create an account with Confluent Cloud, then create a cluster.

Create cluster: Basic

kafka_create_cluster

Region/zones: Google Cloud, Region: us-east4, Availability: Single zone

kafka_create1

Click Continue.

Cluster name: cluster_0, then click Launch cluster.

Install Confluent CLI

curl -sL --http1.1 https://cnfl.io/cli | sh -s -- latest
export PATH="$(pwd)/bin:$PATH"

Login with Confluent CLI

confluent login --save

List the available environments and set the active environment context

confluent environment list
confluent environment use <envid>

List the current Kafka clusters and set the active cluster context

confluent kafka cluster list
confluent kafka cluster use <clusterid>

Create an API key and secret, then store them so the CLI can interact with the cluster seamlessly

confluent api-key create --resource <clusterid>
confluent api-key store --resource <clusterid> --force

Get the Kafka endpoint; we will need it when we set up CockroachDB CDC

confluent kafka cluster describe <clusterid>

I am going to use the built-in TPC-C workload for this tutorial and will proactively create a few topics in Kafka named after the CockroachDB tables. Alternatively, CockroachDB CDC can create the topics itself, but the Kafka cluster must have the auto.create.topics.enable setting set to true for this to work.

I will create a single changefeed tracking multiple tables: customer, district and history.

confluent kafka topic create customer --partitions 6
confluent kafka topic create district --partitions 6
confluent kafka topic create history --partitions 6

After creating the topics, we can start a Kafka consumer to verify data flows correctly.

confluent api-key use <API KEY> --resource <clusterid>
confluent kafka topic consume district

A single consumer is enough to verify the flow. If you're curious, you can open a few other terminal windows and consume the other topics as well.

confluent kafka topic consume customer
confluent kafka topic consume history

We're done with the Confluent Cloud part. Let's switch to CockroachDB for the changefeed configuration.

Create and configure a Cockroach Cloud Dedicated cluster

Given a Dedicated cluster, we need to connect to it and enable rangefeeds, which changefeeds require:

SET CLUSTER SETTING kv.rangefeed.enabled = true;

At this point, we can start writing data and have CDC push it to Kafka. I am going to use the TPC-C workload that comes built in with the Cockroach binary.

cockroach workload fixtures import tpcc --warehouses=10 'postgresql://user@cluster.cockroachlabs.cloud:26257/tpcc?sslmode=verify-full&sslrootcert=/path/certs/cluster-ca.crt'
cockroach workload run tpcc --warehouses=10 --ramp=3m --duration=1h 'postgresql://user@cluster.cockroachlabs.cloud:26257/tpcc?sslmode=verify-full&sslrootcert=/path/certs/cluster-ca.crt'

At this point we have a Kafka consumer terminal waiting for incoming messages on the district topic. I have a tpcc workload terminal window open generating workload on CockroachDB. I also have another CockroachDB SQL client window open to set up a changefeed.

I'm using the following SQL to create a changefeed on the three tables:

use tpcc;
CREATE CHANGEFEED FOR TABLE customer, district, history INTO "kafka://confluentcluster.confluent.cloud:9092?tls_enabled=true&sasl_enabled=true&sasl_user=<APIKEY>&sasl_password=<url-encoded secret>&sasl_mechanism=PLAIN" WITH updated, key_in_value, format = json;

Here, sasl_password is the URL-encoded secret from Confluent Cloud. I used https://www.urlencoder.org/ to URL-encode the secret.
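If you would rather not paste a secret into a web page, the encoding can be done locally. The following is a minimal sketch using Python's standard library. The helper name build_kafka_sink_uri, the key EXAMPLEKEY and the secret abc/DEF+ghi= are made-up placeholders for illustration; the query parameters are the ones used in the changefeed statement above.

```python
from urllib.parse import quote


def build_kafka_sink_uri(bootstrap_server: str, api_key: str, api_secret: str) -> str:
    """Assemble a changefeed sink URI with percent-encoded credentials."""
    # safe="" makes quote() escape "/" too, which it leaves alone by default.
    return (
        f"kafka://{bootstrap_server}"
        "?tls_enabled=true&sasl_enabled=true"
        f"&sasl_user={quote(api_key, safe='')}"
        f"&sasl_password={quote(api_secret, safe='')}"
        "&sasl_mechanism=PLAIN"
    )


# Placeholder values only; substitute your own endpoint, key and secret.
uri = build_kafka_sink_uri(
    "confluentcluster.confluent.cloud:9092",
    "EXAMPLEKEY",
    "abc/DEF+ghi=",
)
print(uri)
```

Characters such as /, + and = are common in generated secrets and would otherwise be misread as part of the URI, which is why the secret has to be encoded before it goes into the sasl_password parameter.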

At this point we have a working changefeed, and your Confluent consumer should start showing signs of life.

Here are my terminal windows, from top to bottom: Kafka consumer, CockroachDB SQL client with the job ID of the changefeed and an active TPC-C workload window generating load on the bottom.

terminal
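For readers who want to inspect the messages programmatically, here is a small Python sketch that decodes one changefeed message value. It assumes my understanding of the JSON envelope produced by the updated and key_in_value options: an after object holding the row, a key array holding the primary key, and an updated string of the form <wall-clock nanoseconds>.<logical counter>. The sample message and its column values are made up for illustration and were not captured from the cluster.

```python
import json
from datetime import datetime, timedelta, timezone

# Made-up sample value for the district topic (illustrative only).
SAMPLE_MESSAGE = (
    '{"after": {"d_id": 1, "d_w_id": 0, "d_name": "example"}, '
    '"key": [0, 1], '
    '"updated": "1646690400000000000.0000000000"}'
)


def parse_updated(hlc: str) -> datetime:
    """Convert '<wall-clock nanoseconds>.<logical counter>' to a UTC datetime."""
    wall_nanos = int(hlc.split(".")[0])
    seconds, nanos = divmod(wall_nanos, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=nanos // 1000
    )


def summarize(raw: str) -> dict:
    """Pull the key, the row and the commit time out of one message value."""
    msg = json.loads(raw)
    return {
        "key": msg.get("key"),
        "row": msg.get("after"),  # expected to be null when the row was deleted
        "updated_at": parse_updated(msg["updated"]),
    }


event = summarize(SAMPLE_MESSAGE)
print(event["key"], event["updated_at"].isoformat(), event["row"])
```

The same function could be fed lines captured from the confluent kafka topic consume output, provided each line holds exactly one JSON value.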

We can now set up Snowflake as our destination warehouse.

Create and configure a Snowflake instance

Create a Snowflake account, navigate to databases and create a destination database.

snowflake_databases

I called mine fivetran.

We're in the final stretch. We can now switch to Fivetran and connect the dots.

Create and configure a Fivetran connector for Confluent Kafka

Create an account with Fivetran.

  • Add Connector: Confluent Cloud
  • Destination schema: confluent_cloud
  • Consumer Group: cockroachdb
  • Servers: yourcluster.confluent.cloud:9092
  • Message Type: json
  • Sync Type: Packed
  • Security Protocol: SASL
  • API Key: your original Confluent Cloud API key
  • API Secret: your original Confluent Cloud secret
  • Save & Test
  • Select your data
  • Start initial Sync

fivetran_kafka

fivtran_success

Create and configure a Fivetran destination for Snowflake

  • Add Destination
  • Snowflake
  • Host: youraccount.snowflakecomputing.com
  • Port: 443
  • User: your Snowflake user
  • Database: fivetran (the destination database created in Snowflake above)
  • Auth: PASSWORD

fivetran_snowflake_success

Verify

After Fivetran is done syncing, you can navigate to Snowflake and view your data under Databases / Fivetran / confluent_cloud / Tables / District / Data Preview.

fivetran_sync

snowflake_preview

Hope you found this useful!
