@kbendick
Created April 21, 2022 21:00
Diff between Iceberg's Flink 1.14 source and Flink 1.15 source as of April 21st, 2022
diff --git a/./flink/v1.14/build.gradle b/./flink/v1.15/build.gradle
index a0e01c8a4..7e0b07bdc 100644
--- a/./flink/v1.14/build.gradle
+++ b/./flink/v1.15/build.gradle
@@ -17,8 +17,8 @@
* under the License.
*/
-String flinkVersion = '1.14.0'
-String flinkMajorVersion = '1.14'
+String flinkVersion = '1.15.0'
+String flinkMajorVersion = '1.15'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,11 +32,14 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-orc')
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')
-
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}"
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}:tests"
- compileOnly "org.apache.flink:flink-table-api-java-bridge_${scalaVersion}:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
+ compileOnly "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-connector-base:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-connector-files:${flinkVersion}"
+ // This should be connectors-base, plus some other things that are needed
+ // compileOnly "org.apache.flink:flink-connectors:${flinkVersion}"
compileOnly "org.apache.hadoop:hadoop-hdfs"
compileOnly "org.apache.hadoop:hadoop-common"
compileOnly("org.apache.hadoop:hadoop-minicluster") {
@@ -65,7 +68,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
exclude group: 'junit'
}
- testImplementation("org.apache.flink:flink-test-utils_${scalaVersion}:${flinkVersion}") {
+ testImplementation("org.apache.flink:flink-test-utils:${flinkVersion}") {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
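
For context on the renames above: as of Flink 1.15, the Java-only artifacts such as flink-streaming-java, flink-table-api-java-bridge, and flink-test-utils are published without a Scala version suffix, since they no longer depend on Scala. flink-table-planner is still Scala-dependent and keeps its _${scalaVersion} suffix, which is why that coordinate is the one left unchanged.
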
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
index f2eecb9b6..855e9e73c 100644
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
@@ -63,7 +63,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.map(UniqueConstraint::getColumns)
.orElseGet(ImmutableList::of);
- return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
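
The change above tracks Flink 1.15's DataStreamSinkProvider, whose functional method now takes a ProviderContext (used for generating stable transformation uids) alongside the DataStream. A minimal compilable sketch of a provider against the 1.15 signature; the DiscardingSink is a placeholder, not Iceberg's actual sink wiring:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    class SinkProviderSketch {
      // Placeholder provider that discards all rows; it exists only to show
      // the two-argument method Flink 1.15 expects implementors to supply.
      static final DataStreamSinkProvider PROVIDER =
          (ProviderContext providerContext, DataStream<RowData> dataStream) ->
              dataStream.addSink(new DiscardingSink<>());
    }
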
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
index dd8f6454e..b7736d570 100644
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -158,8 +159,13 @@ public class IcebergTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
+
+ // UPDATED - Needs to be added, as support for the other signature is entirely removed.
+ // TODO - This should probably be ported to 1.14 as well to make future changes
+ // easier to backport.
@Override
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return createDataStream(execEnv);
}
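
As the comment in the hunk above notes, Flink 1.15 only calls the ProviderContext variant of DataStreamScanProvider. A self-contained sketch of the new shape, with a hypothetical stand-in for Iceberg's createDataStream(execEnv):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.source.DataStreamScanProvider;
    import org.apache.flink.table.data.RowData;

    class ScanProviderSketch {
      // Hypothetical stand-in for IcebergTableSource#createDataStream(execEnv).
      static DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv) {
        throw new UnsupportedOperationException("stand-in for the real source wiring");
      }

      static final DataStreamScanProvider PROVIDER =
          new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(
                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
              return createDataStream(execEnv);
            }

            @Override
            public boolean isBounded() {
              return true; // illustrative; a real source reports its actual boundedness
            }
          };
    }
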
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..ed3735fd2 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -90,6 +90,11 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
sql("USE CATALOG %s", CATALOG_NAME);
sql("CREATE DATABASE %s", DATABASE_NAME);
sql("USE %s", DATABASE_NAME);
+ // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-upsert-materialize
+ // UPDATED - Needed to make testChangeLogOnDataKey work.
+ // TODO - Add tests with all configuration values as a follow-up - and possibly remove our own injected shuffle
+ // as Flink can now do it.
+ getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE");
}
@After
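
A typed equivalent of the string-keyed setting above, assuming the Flink 1.15 ExecutionConfigOptions constants (TableConfig accepts ConfigOption-based set calls in 1.15, as the string overload used in the test suggests):

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    class UpsertMaterializeSketch {
      // Same effect as getConfig().set("table.exec.sink.upsert-materialize", "NONE"),
      // but with compile-time checking of the option key and its enum value.
      static void disableUpsertMaterialize(TableEnvironment tEnv) {
        tEnv.getConfig().set(
            ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
            ExecutionConfigOptions.UpsertMaterialize.NONE);
      }
    }
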
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 272f7a716..0e55a9079 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
@@ -272,8 +271,9 @@ public class TestIcebergConnector extends FlinkTestBase {
try {
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
+ // UPDATED
AssertHelpers.assertThrows("Table should already exists",
- ValidationException.class,
+ org.apache.flink.table.api.TableException.class,
"Could not execute CreateTable in path",
() -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
} finally {
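
The swap above reflects Flink 1.15 surfacing the duplicate-table failure as a TableException rather than a ValidationException. For anyone reproducing the check without Iceberg's AssertHelpers, a plain JUnit 4 sketch (the Runnable is a hypothetical stand-in for the test's sql(...) call):

    import org.apache.flink.table.api.TableException;
    import org.junit.Assert;

    class TableExceptionCheckSketch {
      static void assertDuplicateCreateFails(Runnable createTable) {
        try {
          createTable.run();
          Assert.fail("Table should already exist");
        } catch (TableException e) {
          Assert.assertTrue(e.getMessage().contains("Could not execute CreateTable in path"));
        }
      }
    }
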
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index b0041c3bc..d73363395 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -133,8 +134,10 @@ public class BoundedTableFactory implements DynamicTableSourceFactory {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
+ // UPDATED
@Override
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment env) {
boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);