Tour of Snowflake ingestion using CockroachDB and Redpanda Connect


This is an exploratory article on various ways to ingest real-time data into Snowflake. The goal is to gauge the difficulty and effort required to ingest change data capture (CDC) data into Snowflake.


Previous articles on Snowflake

Previous articles on CockroachDB CDC


Motivation

I work with financial services clients, and it's common to encounter a need to stream changes from an operational datastore into a data warehouse or a data lake. A former colleague recently reached out for advice on the fastest and most efficient way to load trade data into Snowflake. I've come up with at least three methods, which I will explore in a follow-up series of articles. However, I've decided to first explore Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data should be loaded into Snowflake; here we merely explore the possibilities, and we'll discuss the pros and cons in later articles.

CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I'm using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.

High Level Steps

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Redpanda Connect
  • Deploy Snowflake
  • Verify
  • Conclusion

Step-by-step instructions

Deploy a CockroachDB cluster with enterprise changefeeds

Start an instance of CockroachDB or use the managed service.

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

SET CLUSTER SETTING kv.rangefeed.enabled = true;

I am using CockroachDB Serverless, so the above steps are not necessary. You can confirm whether changefeeds are enabled using the following command:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, change it to true.
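
That is, rerun the relevant cluster setting from the steps above:

SET CLUSTER SETTING kv.rangefeed.enabled = true;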

Generate sample data

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;

We've populated the table and updated a record. Let's add more data to make it interesting:

INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);

SELECT * FROM office_dogs LIMIT 5;

id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34

SELECT COUNT(*) FROM office_dogs; -- returns 10000

Deploy Redpanda Connect

I'm running Redpanda Connect from a Docker Compose file:

docker compose -f compose-redpanda.yaml up -d

The contents of the file are:

services:

  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem

I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the [documentation](https://docs.redpanda.com/redpanda-connect/guides/getting_started/) provided by Redpanda.

The most basic configuration looks like so:

input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}

Since I'm using the CockroachDB input, mine looks like this:

input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://<user>:<password>@<cockroachdb-cluster>:<port>/<database>?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}

Leave the pipeline and output as default.

For reference, I'm including the repo with my source code, where you can look up the values.

If you have been following along, you may have noticed that I haven't started a changefeed job in CockroachDB. The cockroachdb_changefeed input subscribes to the table directly, which you can observe by examining the logs with docker logs redpanda-connect --follow. As connect.yaml specifies, the output is sent to stdout:

{"primary_key":"[9998]","row":"{\"after\": {\"id\": 9998, \"name\": \"0794a9d1c99e8e47ee4515be6e0d736f\"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{\"after\": {\"id\": 9999, \"name\": \"c85a6b38154f7e3085d467d567141d45\"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{\"after\": {\"id\": 10000, \"name\": \"aae9e0849fff8f47e0371a4c06fb255b\"}}","table":"office_dogs"}

The next step is to configure Snowflake. We are not going to look at the available processors today.

Deploy Snowflake

I'm using a Snowflake trial account. You get a generous credit, which should be sufficient to complete this tutorial.

We need to create a database and a table where we will output the changefeed data.

CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);

We also need to create a user with key-pair authentication, as we're going to be using the Snowpipe service. First, generate an encrypted private key:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

We must use an encrypted key, as Redpanda Connect doesn't support unencrypted keys.

Next, generate a public key:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 

Lastly, generate a PEM file from the private key:

openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem

In Snowflake, alter the user to use the key pair generated in the previous step.

ALTER USER username SET rsa_public_key='MIIB...';
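
To confirm the key was registered, describe the user and check that the RSA_PUBLIC_KEY_FP property shows a fingerprint (username again stands in for your Snowflake user):

DESC USER username;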

We can now populate the connect.yaml file with the required information for the snowflake_put output. This output is a commercial component and requires a license, but since we're using it for demo purposes, we can proceed.

output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: <snowflake-account>
    user: <user>
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: <database>
    warehouse: <warehouse>
    schema: <schema>
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1

If we restart the compose environment and tail the logs, we can see the following:

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input

Let's look at the implicit table stage and see whether anything has changed.

list @%office_dogs
| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |

The files are ready to be copied into a table. Let's create a pipe:

CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe AUTO_INGEST = FALSE AS
  COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS
  FROM (SELECT * FROM @%office_dogs)
  FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);
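
Before refreshing, you can sanity-check the pipe with SYSTEM$PIPE_STATUS, which returns its execution state and pending file count:

SELECT SYSTEM$PIPE_STATUS('FROM_COCKROACH.PUBLIC.COCKROACH_PIPE');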

The last remaining step is to refresh the pipe.

ALTER PIPE cockroach_pipe REFRESH;
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT   |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT   |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT   |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT   |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT   |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT   |

Let's query the table in Snowflake:

SELECT * FROM OFFICE_DOGS LIMIT 5;
| {                                                                                       |
|   "primary_key": "[5241]",                                                              |
|   "row": "{\"after\": {\"id\": 5241, \"name\": \"5e0360a0d10d849afbbfa319a50bccf2\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5242]",                                                              |
|   "row": "{\"after\": {\"id\": 5242, \"name\": \"62be250249afe74bfbc5dd356e7b0ad9\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5243]",                                                              |
|   "row": "{\"after\": {\"id\": 5243, \"name\": \"7f286800a8a03e74938d09fdba52f869\"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5244]",                                                              |
|   "row": "{\"after\": {\"id\": 5244, \"name\": \"16a330b8f09bcd314f9760ffe26d0ae2\"}}", |
|   "table": "office_dogs"                                                                |
| }

We expect 10,000 rows:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+                                                                    
| COUNT(*) |
|----------|
|    10000 |
+----------+

The data is in JSON format; let's create a view to flatten it out.

CREATE VIEW v_office_dogs AS
    SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
           PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;

Query the view:

SELECT * FROM v_office_dogs WHERE id < 6;
+----+----------------------------------+                                       
| ID | NAME                             |
|----+----------------------------------|
|  1 | Petee H                          |
|  2 | Carl                             |
|  3 | 6e19280ae649efffa7a58584c7f46032 |
|  4 | 5e4e897f008bb752c8edfa64a3aed356 |
|  5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+

Verify

Let's make things a bit more interesting and delete data in CockroachDB.

DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;
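
As a quick sanity check on the CockroachDB side, the two deletes should leave 9,998 of the original 10,000 rows:

SELECT COUNT(*) FROM office_dogs;
-- expected: 9998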

In Snowflake, let's refresh the pipe as of a few minutes ago:

ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';

Notice there are a couple of new files:

+------------------------------------------------+--------+                     
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT   |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT   |
+------------------------------------------------+--------+

I must caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.
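
As a stopgap, you can deduplicate at query time. The sketch below (v_office_dogs_dedup is a hypothetical name) keeps one copy of each distinct event, which guards against re-ingested files but is no substitute for exactly-once delivery:

CREATE OR REPLACE VIEW v_office_dogs_dedup AS
SELECT record
FROM OFFICE_DOGS
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY record:primary_key::STRING, record:row::STRING
    ORDER BY record:primary_key::STRING) = 1;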

Let's look at the row count again:

SELECT COUNT(*) FROM OFFICE_DOGS;

+----------+                                                                    
| COUNT(*) |
|----------|
|    10002 |
+----------+

The deletes didn't reach Snowflake the way you might expect: the changefeed emitted the two delete events (hence the 10,002 rows), but nothing removed the corresponding records on the Snowflake side. We need to incorporate additional logic to achieve this; that will be a task for another time.
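
As a rough illustration of what that logic could look like, here is a sketch of a view (v_office_dogs_current is a hypothetical name) that filters deletes out at query time. It assumes the changefeed's default wrapped envelope, where a delete event arrives with "after": null:

CREATE OR REPLACE VIEW v_office_dogs_current AS
WITH deletes AS (
    -- delete events carry "after": null, so the id path resolves to NULL
    SELECT record:primary_key::STRING AS pk
    FROM OFFICE_DOGS
    WHERE PARSE_JSON(record:row):after:id IS NULL
)
SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
       PARSE_JSON(record:row):after:name::STRING AS name
FROM OFFICE_DOGS
WHERE PARSE_JSON(record:row):after:id IS NOT NULL
  AND record:primary_key::STRING NOT IN (SELECT pk FROM deletes);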

Lastly, I would like to note that running Redpanda Connect via Docker Compose is optional. You can instead run the Docker container directly by executing the following command:

docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run


Conclusion

Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We've only just begun to delve into this topic, and future articles will build upon the foundations laid today.
