Created
March 21, 2025 23:35
-
-
Save sap1ens/e484965a0c2cecb0fca67b63755a8d48 to your computer and use it in GitHub Desktop.
Flink / Iceberg connecting to Confluent Tableflow via REST Catalog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.table.data.RowData; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.iceberg.catalog.Catalog; | |
import org.apache.iceberg.catalog.Namespace; | |
import org.apache.iceberg.catalog.TableIdentifier; | |
import org.apache.iceberg.flink.CatalogLoader; | |
import org.apache.iceberg.flink.TableLoader; | |
import org.apache.iceberg.flink.source.IcebergSource; | |
import java.util.Map; | |
public class IcebergDemo { | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
CatalogLoader catalogLoader = CatalogLoader.rest("confluent", new Configuration(), Map.of( | |
"uri", "<Tableflow endpoint>", | |
"credential", "<api_key>:<secret>", | |
"io-impl", "org.apache.iceberg.aws.s3.S3FileIO", | |
"s3.remote-signing-enabled", "true" | |
)); | |
Namespace namespace = Namespace.of("<cluster_id>"); | |
Catalog catalog = catalogLoader.loadCatalog(); | |
System.out.println("Available tables: " + catalog.listTables(namespace)); | |
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(namespace, "<topic_name>")); | |
DataStream<RowData> source = IcebergSource.forRowData() | |
.tableLoader(tableLoader) | |
.streaming(true) | |
.buildStream(env); | |
source.print(); | |
env.execute("Iceberg Demo"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Maven dependencies -->
<dependencies>
  <!-- Your standard Flink dependencies ... -->
  <!-- Then what's needed for Iceberg -->
  <!-- NOTE: the artifact id encodes the Flink minor version (1.19); it must
       match the Flink version used by the rest of the build. -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.19</artifactId>
    <version>1.8.1</version>
  </dependency>
  <!-- Bundles the AWS SDK classes needed by S3FileIO / remote signing;
       keep its version in lockstep with iceberg-flink-runtime. -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-aws-bundle</artifactId>
    <version>1.8.1</version>
  </dependency>
  <!-- Required because Iceberg's Flink catalog APIs take a Hadoop
       Configuration. NOTE(review): 2.8.5 is quite old — confirm it is the
       version intended for this Flink/Iceberg combination before upgrading. -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.19.1</version>
  </dependency>
  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.18.0</version>
  </dependency>
</dependencies>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: there appears to be a bug in Iceberg that causes table identifiers containing dots to be parsed incorrectly (each dot is treated as a namespace separator), so topic names with dots may fail to resolve.