Forked from kbendick/flink_iceberg_diff_1.14_to_1.15.txt
Created
May 3, 2022 15:50
-
-
Save rdblue/74ff65d715c97518de0c5c4d912ae84b to your computer and use it in GitHub Desktop.
Proposed Iceberg Support for Flink 1.15 Diff from Iceberg Flink 1.14
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/./flink/v1.14/build.gradle b/./flink/v1.15/build.gradle | |
index a0e01c8a4..46572696f 100644 | |
--- a/./flink/v1.14/build.gradle | |
+++ b/./flink/v1.15/build.gradle | |
@@ -17,8 +17,8 @@ | |
* under the License. | |
*/ | |
-String flinkVersion = '1.14.0' | |
-String flinkMajorVersion = '1.14' | |
+String flinkVersion = '1.15.0' | |
+String flinkMajorVersion = '1.15' | |
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") | |
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { | |
@@ -32,11 +32,12 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { | |
implementation project(':iceberg-orc') | |
implementation project(':iceberg-parquet') | |
implementation project(':iceberg-hive-metastore') | |
- | |
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}" | |
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}:tests" | |
- compileOnly "org.apache.flink:flink-table-api-java-bridge_${scalaVersion}:${flinkVersion}" | |
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}" | |
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}:tests" | |
+ compileOnly "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}" | |
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}" | |
+ compileOnly "org.apache.flink:flink-connector-base:${flinkVersion}" | |
+ compileOnly "org.apache.flink:flink-connector-files:${flinkVersion}" | |
compileOnly "org.apache.hadoop:hadoop-hdfs" | |
compileOnly "org.apache.hadoop:hadoop-common" | |
compileOnly("org.apache.hadoop:hadoop-minicluster") { | |
@@ -65,7 +66,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { | |
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") { | |
exclude group: 'junit' | |
} | |
- testImplementation("org.apache.flink:flink-test-utils_${scalaVersion}:${flinkVersion}") { | |
+ testImplementation("org.apache.flink:flink-test-utils:${flinkVersion}") { | |
exclude group: "org.apache.curator", module: 'curator-test' | |
exclude group: 'junit' | |
} | |
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java | |
index f2eecb9b6..855e9e73c 100644 | |
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java | |
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java | |
@@ -63,7 +63,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, | |
.map(UniqueConstraint::getColumns) | |
.orElseGet(ImmutableList::of); | |
- return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream) | |
+ return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream) | |
.tableLoader(tableLoader) | |
.tableSchema(tableSchema) | |
.equalityFieldColumns(equalityColumns) | |
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java | |
index dd8f6454e..a0bc04c03 100644 | |
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java | |
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java | |
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.table.api.TableSchema; | |
import org.apache.flink.table.connector.ChangelogMode; | |
+import org.apache.flink.table.connector.ProviderContext; | |
import org.apache.flink.table.connector.source.DataStreamScanProvider; | |
import org.apache.flink.table.connector.source.DynamicTableSource; | |
import org.apache.flink.table.connector.source.ScanTableSource; | |
@@ -159,7 +160,8 @@ public class IcebergTableSource | |
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { | |
return new DataStreamScanProvider() { | |
@Override | |
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) { | |
+ public DataStream<RowData> produceDataStream( | |
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) { | |
return createDataStream(execEnv); | |
} | |
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java | |
index 68b706e2d..ea36fb7ea 100644 | |
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java | |
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java | |
@@ -90,6 +90,9 @@ public class TestChangeLogTable extends ChangeLogTableTestBase { | |
sql("USE CATALOG %s", CATALOG_NAME); | |
sql("CREATE DATABASE %s", DATABASE_NAME); | |
sql("USE %s", DATABASE_NAME); | |
+ // Set the table.exec.sink.upsert-materialize=NONE, so that downstream operators will receive the | |
+ // records with the same order as the source operator, bypassing Flink's inferred shuffle. | |
+ getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE"); | |
} | |
@After | |
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java | |
index 272f7a716..26bff05a6 100644 | |
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java | |
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java | |
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.CoreOptions; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.table.api.EnvironmentSettings; | |
import org.apache.flink.table.api.TableEnvironment; | |
-import org.apache.flink.table.api.ValidationException; | |
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; | |
import org.apache.flink.table.catalog.Catalog; | |
import org.apache.flink.table.catalog.ObjectPath; | |
@@ -273,7 +272,7 @@ public class TestIcebergConnector extends FlinkTestBase { | |
testCreateConnectorTable(); | |
// Ensure that the table was created under the specific database. | |
AssertHelpers.assertThrows("Table should already exists", | |
- ValidationException.class, | |
+ org.apache.flink.table.api.TableException.class, | |
"Could not execute CreateTable in path", | |
() -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME)); | |
} finally { | |
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java | |
index b0041c3bc..bf38550c8 100644 | |
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java | |
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java | |
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.apache.flink.table.api.TableSchema; | |
import org.apache.flink.table.connector.ChangelogMode; | |
+import org.apache.flink.table.connector.ProviderContext; | |
import org.apache.flink.table.connector.source.DataStreamScanProvider; | |
import org.apache.flink.table.connector.source.DynamicTableSource; | |
import org.apache.flink.table.connector.source.ScanTableSource; | |
@@ -134,7 +135,8 @@ public class BoundedTableFactory implements DynamicTableSourceFactory { | |
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { | |
return new DataStreamScanProvider() { | |
@Override | |
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) { | |
+ public DataStream<RowData> produceDataStream( | |
+ ProviderContext providerContext, StreamExecutionEnvironment env) { | |
boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled(); | |
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment