Last active
February 9, 2021 00:20
-
-
Save PeterCorless/7e86210bae8231e2ecfa059245e97191 to your computer and use it in GitHub Desktop.
Reading CDC with Java and Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (Cluster cluster = Cluster.builder().addContactPoint(source).build(); | |
| Session session = cluster.connect()) { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Create the set of tables we want to watch. | |
| Set<TableName> tables = Collections.singleton(new TableName(keyspace, table)); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Consumer callback: prints every change it receives and then returns | |
| // an already-completed future. | |
| RawChangeConsumer changeConsumer = rawChange -> { | |
| printChange(rawChange); | |
| return CompletableFuture.completedFuture(null); | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // A single consumer instance that prints each change it receives; the | |
| // provider below hands out this same instance on every call. | |
| RawChangeConsumer sharedChangeConsumer = rawChange -> { | |
| printChange(rawChange); | |
| return CompletableFuture.completedFuture(null); | |
| }; | |
| // Provider that ignores threadId and always returns the shared consumer. | |
| RawChangeConsumerProvider changeConsumerProvider = threadId -> sharedChangeConsumer; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Provider that creates a fresh printing consumer each time it is called, | |
| // so no consumer instance is shared between calls. | |
| RawChangeConsumerProvider changeConsumerProvider = threadId -> rawChange -> { | |
| // Print the change, then return an already-completed future. | |
| printChange(rawChange); | |
| return CompletableFuture.completedFuture(null); | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Build a CDCConsumer instance. We start it in a single-thread | |
| // configuration: workersCount(1). | |
| CDCConsumer consumer = CDCConsumerBuilder.builder(session, changeConsumerProvider, tables).workersCount(1).build(); | |
| consumer.start(); | |
| try { | |
| // Consume changes for 10 seconds. | |
| Thread.sleep(10000); | |
| } finally { | |
| // Always stop the started consumer, even when the sleep above is | |
| // interrupted and throws InterruptedException. | |
| consumer.stop(); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private static void printChange(RawChange change) { | |
| ChangeId changeId = change.getId(); | |
| StreamId streamId = changeId.getStreamId(); | |
| ChangeTime changeTime = changeId.getChangeTime(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Get the operation type, for example: ROW_UPDATE, POST_IMAGE. | |
| RawChange.OperationType operationType = change.getOperationType(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ChangeSchema changeSchema = change.getSchema(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| List<ChangeSchema.ColumnDefinition> nonCdcColumnDefinitions = changeSchema.getNonCdcColumnDefinitions(); | |
| for (ChangeSchema.ColumnDefinition columnDefinition : nonCdcColumnDefinitions) { | |
| String columnName = columnDefinition.getColumnName(); | |
| // We can get information if this column was a part of primary key | |
| // in the base table. Note that in CDC log table different columns | |
| // are part of a primary key (cdc$stream_id, cdc$time, batch_seq_no). | |
| ChangeSchema.ColumnType baseTableColumnType = columnDefinition.getBaseTableColumnType(); | |
| // Get the information about the data type (as present in CDC log). | |
| ChangeSchema.DataType logDataType = columnDefinition.getCdcLogDataType(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Cell cell = change.getCell(columnName); | |
| Object cellValue = cell.getAsObject(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Integer intCellValue = cell.getInt(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| git clone https://github.com/scylladb/scylla-cdc-java.git | |
| cd scylla-cdc-java | |
| mvn clean install | |
| cd scylla-cdc-printer | |
| ./scylla-cdc-printer -k KEYSPACE -t TABLE -s SOURCE |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| go get github.com/scylladb/scylla-cdc-go |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Create a cluster configuration for the source address. | |
| cluster := gocql.NewCluster(source) | |
| // Select hosts token-aware, falling back to DC-aware round-robin for "local-dc". | |
| fallbackPolicy := gocql.DCAwareRoundRobinPolicy("local-dc") | |
| cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallbackPolicy) | |
| // Open the session; abort the program if it cannot be created. | |
| session, err := cluster.CreateSession() | |
| if err != nil { | |
| log.Fatal(err) | |
| } | |
| // Close the session when the enclosing function returns. | |
| defer session.Close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Logger handed to the reader: writes to stderr with date, microsecond | |
| // timestamps and the short file name of the call site. | |
| readerLogger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) | |
| // Reader configuration: the session to use, the factory that creates | |
| // change consumers, and the "keyspace.table" name to watch. | |
| cfg := &scyllacdc.ReaderConfig{ | |
| Session: session, | |
| ChangeConsumerFactory: changeConsumerFactory, | |
| TableNames: []string{keyspace + "." + table}, | |
| Logger: readerLogger, | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // printerConsumer is the callback invoked with a change: it receives a | |
| // context, the table name and the change itself. This skeleton does | |
| // nothing with them and returns nil. | |
| func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error { | |
| // Your logic goes here | |
| return nil | |
| } | |
| // changeConsumerFactory is built from printerConsumer via | |
| // scyllacdc.MakeChangeConsumerFactoryFromFunc, so the plain function can be | |
| // used where a change consumer factory is expected. | |
| var changeConsumerFactory = scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // myConsumer is a custom change consumer; add whatever per-consumer state | |
| // you need as fields. | |
| type myConsumer struct { /* ... */ } | |
| // Consume receives one change. This skeleton ignores it and returns nil. | |
| func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error { | |
| // Your logic goes here | |
| return nil | |
| } | |
| // End is the consumer's second method; this skeleton has nothing to | |
| // release and returns nil. | |
| func (mc *myConsumer) End() error { | |
| return nil | |
| } | |
| // myFactory creates myConsumer values. | |
| type myFactory struct { /* ... */ } | |
| // CreateChangeConsumer returns a new myConsumer and a nil error. The return | |
| // type is qualified with the scyllacdc package name because this code lives | |
| // outside that package. | |
| func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) { | |
| return &myConsumer{ | |
| /* ... */ | |
| }, nil | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Create the reader from the configuration; abort on failure. | |
| reader, err := scyllacdc.NewReader(context.Background(), cfg) | |
| if err != nil { | |
| log.Fatal(err) | |
| } | |
| // Schedule reader.Stop() to be called once 10 seconds have elapsed. | |
| runFor := 10 * time.Second | |
| time.AfterFunc(runFor, func() { | |
| reader.Stop() | |
| }) | |
| // Run the reader and abort if it returns an error. | |
| err = reader.Run(context.Background()) | |
| if err != nil { | |
| log.Fatal(err) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fmt.Printf("[%s %s]:\n", c.StreamID, c.Time.String()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Print the name of every column except those whose name starts with "cdc$". | |
| for _, col := range c.Columns() { | |
| if strings.HasPrefix(col.Name, "cdc$") { | |
| continue | |
| } | |
| fmt.Println(col.Name) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Read the raw values of columns "pk" and "ck". The second result of | |
| // GetValue is discarded here. | |
| // NOTE(review): presumably it reports whether the column is present - confirm. | |
| pkRaw, _ := changeRow.GetValue("pk") | |
| ckRaw, _ := changeRow.GetValue("ck") | |
| // Fetch the change of column "v"; the code below reads its IsDeleted and Value fields. | |
| v := changeRow.GetAtomicChange("v") | |
| // Unchecked type assertions: these panic if the dynamic type is not *int | |
| // (including when the raw value is a nil interface). | |
| pk := pkRaw.(*int) | |
| ck := ckRaw.(*int) | |
| // Print the operation and the key columns; nullableIntToStr formats the *int values. | |
| fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(), | |
| nullableIntToStr(pk), nullableIntToStr(ck)) | |
| // Report what happened to column "v": deleted, set to a value, or unchanged. | |
| if v.IsDeleted { | |
| fmt.Printf(" Column v was set to null/deleted\n") | |
| } else { | |
| // Only assert the type when the column was not deleted; a nil *int is | |
| // reported as "not changed". | |
| vInt := v.Value.(*int) | |
| if vInt != nil { | |
| fmt.Printf(" Column v was set to %d\n", *vInt) | |
| } else { | |
| fmt.Print(" Column v was not changed\n") | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Print the rows of the change in three groups, in this order: | |
| // pre-image rows, delta rows, post-image rows. | |
| for _, preRow := range c.PreImage { | |
| fmt.Printf(" %s\n", preRow) | |
| } | |
| for _, deltaRow := range c.Delta { | |
| fmt.Printf(" %s\n", deltaRow) | |
| } | |
| for _, postRow := range c.PostImage { | |
| fmt.Printf(" %s\n", postRow) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| git clone https://github.com/scylladb/scylla-cdc-go | |
| cd scylla-cdc-go | |
| go run ./examples/printer/ -keyspace KEYSPACE -table TABLE -source SOURCE |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment