@sap1ens
Created March 21, 2025 23:35
Flink / Iceberg connecting to Confluent Tableflow via REST Catalog
package com.example.demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

import java.util.Map;

public class IcebergDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Iceberg REST catalog backed by Tableflow; replace the <...> placeholders with your own values.
        CatalogLoader catalogLoader = CatalogLoader.rest("confluent", new Configuration(), Map.of(
                "uri", "<Tableflow endpoint>",
                "credential", "<api_key>:<secret>",
                "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
                // S3 requests are signed remotely via the catalog instead of with local AWS credentials.
                "s3.remote-signing-enabled", "true"
        ));

        // The namespace is the cluster ID; each table is addressed by its topic name.
        Namespace namespace = Namespace.of("<cluster_id>");

        // Sanity check: list the tables visible in the namespace.
        Catalog catalog = catalogLoader.loadCatalog();
        System.out.println("Available tables: " + catalog.listTables(namespace));

        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(namespace, "<topic_name>"));

        // Read the table as an unbounded stream of RowData records.
        DataStream<RowData> source = IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .streaming(true)
                .buildStream(env);

        source.print();

        env.execute("Iceberg Demo");
    }
}
<!-- Maven dependencies -->
<dependencies>
    <!-- Your standard Flink dependencies ... -->

    <!-- Then what's needed for Iceberg -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-1.19</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws-bundle</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.18.0</version>
    </dependency>
</dependencies>

sap1ens commented Mar 21, 2025

Note: Iceberg appears to have a bug that causes table names containing dots to be parsed incorrectly.
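
To show why a dot inside a table name is ambiguous, here is a minimal plain-Java sketch. It does not use or reproduce Iceberg's code: `DottedNameDemo`, `naiveParse`, and `tableNameSeenByParser` are hypothetical names, and the values `lkc-demo` and `orders.v1` are made up. The only assumption is that an identifier gets flattened into a single dotted string and split on `.` somewhere along the way.

```java
import java.util.Arrays;
import java.util.List;

public class DottedNameDemo {

    /** Hypothetical parser: splits a flattened identifier on every dot. */
    public static List<String> naiveParse(String identifier) {
        return Arrays.asList(identifier.split("\\."));
    }

    /** Returns the table name that a dot-splitting parser would end up with. */
    public static String tableNameSeenByParser(String namespace, String table) {
        List<String> parts = naiveParse(namespace + "." + table);
        // The last segment is taken as the table name, everything before it as the namespace.
        return parts.get(parts.size() - 1);
    }

    public static void main(String[] args) {
        // Intended: namespace "lkc-demo", table "orders.v1" (placeholder values).
        System.out.println(naiveParse("lkc-demo.orders.v1"));
        // prints [lkc-demo, orders, v1]
        System.out.println(tableNameSeenByParser("lkc-demo", "orders.v1"));
        // prints v1
    }
}
```

In this sketch the table `orders.v1` is misread as table `v1` inside a two-level namespace. The gist already passes the namespace and table name separately through `TableIdentifier.of`, so whether this is the actual cause of the behavior described in the note is not confirmed here.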
