This is an exploratory article examining various ways to ingest real-time data into Snowflake. The premise is to gauge the level of difficulty and effort required to load change data capture (CDC) data into Snowflake.
- Using CockroachDB CDC with Apache Pulsar
- Using CockroachDB CDC with Azure Event Hubs
- SaaS Galore: Integrating CockroachDB with Confluent Kafka, FiveTran and Snowflake
- Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
- CockroachDB CDC using Minio as cloud storage sink
- CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink
I work with financial services clients, and it's common to encounter a need to stream changes in the operational datastore into a data warehouse or a data lake. A former colleague recently reached out for advice on the fastest and most efficient way to load trade data into Snowflake. I've come up with at least three methods, which I will explore in a follow-up series of articles, but I've decided to first look at Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data must be loaded into Snowflake; we're merely exploring the possibilities here and will discuss the pros and cons of each approach in later articles.
CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I'm using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.
- Deploy a CockroachDB cluster with enterprise changefeeds
- Deploy Redpanda Connect
- Deploy Snowflake
- Verify
- Conclusion
Start an instance of CockroachDB or use the managed service
To enable CDC, we need to execute the following commands:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
I am using CockroachDB Serverless, so the above steps are not necessary. You can confirm whether changefeeds are indeed enabled using the following command:
SHOW CLUSTER SETTING kv.rangefeed.enabled;
If the value is false, change it to true.
Generate sample data
CREATE TABLE office_dogs (
id INT PRIMARY KEY,
name STRING);
INSERT INTO office_dogs VALUES
(1, 'Petee'),
(2, 'Carl');
UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
We've populated the table and updated a record; let's add more data to make it interesting:
INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);
SELECT * FROM office_dogs LIMIT 5;
id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34
SELECT COUNT(*) FROM office_dogs;
I'm running Redpanda Connect from a Docker Compose file:
docker compose -f compose-redpanda.yaml up -d
The contents of the file are:
services:
  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem
I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the [documentation](https://docs.redpanda.com/redpanda-connect/guides/getting_started/) provided by Redpanda.
The most basic configuration looks like so:
input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}
Since I'm using CockroachDB input, mine looks like this:
input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://<user>:<password>@<cockroachdb-cluster>:<port>/<database>?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}
Leave the pipeline and output as default.
For reference, I'm including the repo with my source code where you can look up the values I used.
If you have been following along, you may have noticed that I haven't started a changefeed job in CockroachDB. The cockroachdb_changefeed input subscribes to the table directly, which you can observe by examining the logs with the command docker logs redpanda-connect --follow. Per the connect.yaml file, the output is sent to stdout:
{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{\"after\": {\"id\": 9999, \"name\": \"c85a6b38154f7e3085d467d567141d45\"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{\"after\": {\"id\": 10000, \"name\": \"aae9e0849fff8f47e0371a4c06fb255b\"}}","table":"office_dogs"}
The next step is to configure Snowflake. We are not going to look at the available processors today.
I'm using a Snowflake trial account. You get a generous credit which should be sufficient to complete this tutorial.
We need to create a database and a table where we will output the changefeed data.
CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);
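The snowflake_put output configured later in this article also references a virtual warehouse. If your account doesn't have one handy, here is a minimal sketch; COMPUTE_WH is a hypothetical name, and Snowflake trial accounts typically include a default warehouse already:

-- Hypothetical warehouse; skip this if your account already has one.
CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH
  WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;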
We also need to create a user with key-pair authentication, as we're going to be using the Snowpipe service. First, generate an encrypted private key:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
We must use an encrypted key, as Redpanda Connect doesn't support an unencrypted one.
Generate a public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Lastly, generate a PEM file from the private key:
openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem
In Snowflake, alter the user to use the key pair generated in the previous step.
ALTER USER username SET rsa_public_key='MIIB...';
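To double-check that the key was registered, describe the user and look for the RSA_PUBLIC_KEY_FP property (username is the same placeholder as above):

DESC USER username;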
We can now populate the connect.yaml file with the required information for the snowflake_put output. This output type is for commercial use and requires a license, but since we're using it for demo purposes, we are able to proceed.
output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: <snowflake-account>
    user: <user>
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: <database>
    warehouse: <warehouse>
    schema: <schema>
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1
If we restart the compose environment and tail the logs, we can see the following:
level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input
Let's look at the implicit table stage and observe whether anything has changed.
list @%office_dogs
| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |
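Optionally, you can peek at the staged files before loading them. This assumes a named JSON file format; json_peek is simply a name I picked for this example:

CREATE OR REPLACE FILE FORMAT json_peek TYPE = JSON STRIP_OUTER_ARRAY = TRUE;
SELECT $1 FROM @%office_dogs (FILE_FORMAT => 'json_peek') LIMIT 3;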
The files are ready to be copied into a table. Because the archive processor wraps each batch in a JSON array, the pipe below uses STRIP_OUTER_ARRAY = TRUE so that each element becomes its own row. Let's create a pipe:
CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe AUTO_INGEST = FALSE AS COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS FROM (SELECT * FROM @%office_dogs) FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);
The last remaining step is to refresh the pipe.
ALTER PIPE cockroach_pipe REFRESH;
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT |
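Before querying the target table, you can optionally confirm the loads with the COPY_HISTORY table function, assuming the refresh happened within the last hour:

SELECT file_name, row_count, status
  FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'OFFICE_DOGS',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())));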
Let's query the table in Snowflake:
SELECT * FROM OFFICE_DOGS LIMIT 5;
| { |
| "primary_key": "[5241]", |
| "row": "{\"after\": {\"id\": 5241, \"name\": \"5e0360a0d10d849afbbfa319a50bccf2\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5242]", |
| "row": "{\"after\": {\"id\": 5242, \"name\": \"62be250249afe74bfbc5dd356e7b0ad9\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5243]", |
| "row": "{\"after\": {\"id\": 5243, \"name\": \"7f286800a8a03e74938d09fdba52f869\"}}", |
| "table": "office_dogs" |
| } |
| { |
| "primary_key": "[5244]", |
| "row": "{\"after\": {\"id\": 5244, \"name\": \"16a330b8f09bcd314f9760ffe26d0ae2\"}}", |
| "table": "office_dogs" |
| }
We expect 10,000 rows (the 2 original rows plus the 9,998 generated earlier):
SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+
| COUNT(*) |
|----------|
| 10000 |
+----------+
The data is in JSON format; let's create a view to flatten the data out.
CREATE VIEW v_office_dogs AS
SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;
Query the view
SELECT * FROM v_office_dogs WHERE id < 6;
+----+----------------------------------+
| ID | NAME |
|----+----------------------------------|
| 1 | Petee H |
| 2 | Carl |
| 3 | 6e19280ae649efffa7a58584c7f46032 |
| 4 | 5e4e897f008bb752c8edfa64a3aed356 |
| 5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+
Let's make things a bit more interesting and delete data in CockroachDB.
DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;
In Snowflake, let's refresh the pipe as of a few minutes ago:
ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';
Notice there are a couple of files:
+------------------------------------------------+--------+
| File | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT |
+------------------------------------------------+--------+
I must caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.
Let's look at the row count:
+----------+
| COUNT(*) |
|----------|
| 10002 |
+----------+
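Two extra rows appeared rather than two rows disappearing. Assuming these are the delete events, you can inspect them; CockroachDB changefeeds represent a delete as a record whose after field is a JSON null:

-- Delete events arrive as {"after": null}; IS_NULL_VALUE detects the JSON null.
SELECT record FROM OFFICE_DOGS
 WHERE IS_NULL_VALUE(PARSE_JSON(record:row):after);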
The deletes didn't propagate to Snowflake the way you might expect: the delete events were captured, but the table no longer mirrors the current state in CockroachDB. We need to incorporate additional logic to achieve this, which will be a task for another time.
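As a rough sketch of where that logic might go (not necessarily the approach we'll settle on), a view can filter out those tombstone records and collapse the exact duplicates a manual REFRESH can introduce. Truly mirroring the source state would also require ordering events per key, which we'll leave for the follow-up article; v_office_dogs_filtered is a name I made up for this sketch:

CREATE OR REPLACE VIEW v_office_dogs_filtered AS
SELECT DISTINCT
       PARSE_JSON(record:row):after:id::INTEGER  AS id,
       PARSE_JSON(record:row):after:name::STRING AS name
  FROM OFFICE_DOGS
 WHERE NOT IS_NULL_VALUE(PARSE_JSON(record:row):after);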
Lastly, I would like to note that running Redpanda Connect via Docker Compose is optional. You can instead run the Docker container directly by executing the following command:
docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run
Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We've only just begun to delve into this topic, and future articles will build upon the foundations laid today.