Created April 21, 2022 21:00
Diff between Iceberg's Flink 1.14 source and Flink 1.15 source as of April 21st, 2022
diff --git a/./flink/v1.14/build.gradle b/./flink/v1.15/build.gradle
index a0e01c8a4..7e0b07bdc 100644
--- a/./flink/v1.14/build.gradle
+++ b/./flink/v1.15/build.gradle
@@ -17,8 +17,8 @@
 * under the License.
 */
-String flinkVersion = '1.14.0'
-String flinkMajorVersion = '1.14'
+String flinkVersion = '1.15.0'
+String flinkMajorVersion = '1.15'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,11 +32,14 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-orc')
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')
-
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}"
- compileOnly "org.apache.flink:flink-streaming-java_${scalaVersion}:${flinkVersion}:tests"
- compileOnly "org.apache.flink:flink-table-api-java-bridge_${scalaVersion}:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}:tests"
+ compileOnly "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-connector-base:${flinkVersion}"
+ compileOnly "org.apache.flink:flink-connector-files:${flinkVersion}"
+ // This should be flink-connector-base, plus some other things that are needed
+ // compileOnly "org.apache.flink:flink-connectors:${flinkVersion}"
compileOnly "org.apache.hadoop:hadoop-hdfs"
compileOnly "org.apache.hadoop:hadoop-common"
compileOnly("org.apache.hadoop:hadoop-minicluster") {
@@ -65,7 +68,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
exclude group: 'junit'
}
- testImplementation("org.apache.flink:flink-test-utils_${scalaVersion}:${flinkVersion}") {
+ testImplementation("org.apache.flink:flink-test-utils:${flinkVersion}") {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
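For context on the dependency change: Flink 1.15 publishes flink-streaming-java, flink-table-api-java-bridge, and flink-test-utils without Scala version suffixes, so only flink-table-planner keeps the _${scalaVersion} coordinate. A minimal, hypothetical smoke test (not part of the diff; the class name is made up) that compiles against the suffix-free artifacts:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ScalaFreeSmokeTest {
  public static void main(String[] args) {
    // Both classes resolve from the suffix-free 1.15 artifacts; the Java API
    // surface is unchanged, only the Maven coordinates lost the Scala suffix.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    System.out.println(tableEnv.getConfig());
  }
}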
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
index f2eecb9b6..855e9e73c 100644
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
@@ -63,7 +63,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.map(UniqueConstraint::getColumns)
.orElseGet(ImmutableList::of);
- return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
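In Flink 1.15, DataStreamSinkProvider passes a ProviderContext as the first parameter, which is why the cast now targets a two-argument lambda. A minimal sketch of the same shape, assuming only the Flink 1.15 table connector API (the class name is hypothetical, and a DiscardingSink stands in for the real FlinkSink wiring):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

public class SinkProviderSketch {
  static DataStreamSinkProvider sinkProvider() {
    return new DataStreamSinkProvider() {
      @Override
      public DataStreamSink<?> consumeDataStream(
          ProviderContext providerContext, DataStream<RowData> dataStream) {
        // The real code returns the FlinkSink builder chain shown in the diff;
        // a discarding sink keeps this sketch self-contained.
        return dataStream.addSink(new DiscardingSink<>());
      }
    };
  }
}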
diff --git a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
index dd8f6454e..b7736d570 100644
--- a/./flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
+++ b/./flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -158,8 +159,13 @@ public class IcebergTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
+
+ // UPDATED - Needs to be added, as support for the other signature is entirely removed.
+ // TODO - This should probably be ported to 1.14 as well to make future changes
+ // easier to backport.
@Override
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return createDataStream(execEnv);
}
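The source side mirrors the sink change: DataStreamScanProvider.produceDataStream gains a ProviderContext parameter in 1.15. A self-contained sketch of the new signature (class name hypothetical; a one-element stream stands in for the real createDataStream(execEnv) call):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class ScanProviderSketch {
  static DataStreamScanProvider scanProvider() {
    return new DataStreamScanProvider() {
      @Override
      public DataStream<RowData> produceDataStream(
          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // The real code delegates to createDataStream(execEnv); a one-element
        // stream keeps this sketch self-contained.
        return execEnv.fromElements((RowData) GenericRowData.of(StringData.fromString("a")));
      }

      @Override
      public boolean isBounded() {
        return true;
      }
    };
  }
}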
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..ed3735fd2 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -90,6 +90,11 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
sql("USE CATALOG %s", CATALOG_NAME);
sql("CREATE DATABASE %s", DATABASE_NAME);
sql("USE %s", DATABASE_NAME);
+ // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-upsert-materialize
+ // UPDATED - Needed to make testChangeLogOnDataKey work.
+ // TODO - Add tests with all configuration values as a follow-up - and possibly remove our own injected shuffle
+ // as Flink can now do it.
+ getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE");
}
@After
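The added line uses the string-keyed TableConfig setter available in 1.15, as the diff shows. A standalone sketch of the same call (class name hypothetical; per the linked docs, NONE disables the SinkUpsertMaterializer step, with AUTO and FORCE as the other accepted values):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertMaterializeSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
    // NONE disables the upsert materialization operator that 1.15 would
    // otherwise insert in front of upsert sinks.
    tableEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");
  }
}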
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 272f7a716..0e55a9079 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
@@ -272,8 +271,9 @@ public class TestIcebergConnector extends FlinkTestBase {
try {
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
+ // UPDATED
AssertHelpers.assertThrows("Table should already exists",
- ValidationException.class,
+ org.apache.flink.table.api.TableException.class,
"Could not execute CreateTable in path",
() -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
} finally {
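The assertion change reflects that in Flink 1.15 a duplicate CREATE TABLE surfaces as a TableException ("Could not execute CreateTable in path ...") where 1.14 threw a ValidationException. A hypothetical standalone reproduction of that behavior (class name and the datagen table are made up for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;

public class DuplicateCreateTableSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
    tableEnv.executeSql("CREATE TABLE t (id INT) WITH ('connector' = 'datagen')");
    try {
      // A second CREATE TABLE on the same path fails; per the change above,
      // 1.15 raises TableException rather than ValidationException.
      tableEnv.executeSql("CREATE TABLE t (id INT) WITH ('connector' = 'datagen')");
    } catch (TableException expected) {
      System.out.println(expected.getMessage());
    }
  }
}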
diff --git a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index b0041c3bc..d73363395 100644
--- a/./flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ b/./flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -133,8 +134,10 @@ public class BoundedTableFactory implements DynamicTableSourceFactory {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
return new DataStreamScanProvider() {
+ // UPDATED
@Override
- public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment env) {
boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
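This change mirrors the IcebergTableSource one above: only the produceDataStream signature gains a ProviderContext, while the checkpoint probe on the following line is identical in 1.14 and 1.15. A tiny, hypothetical sketch of that probe (class name made up):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointProbeSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000L);
    // BoundedTestSource keys its per-checkpoint batches off this flag.
    boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
    System.out.println("checkpointing enabled: " + checkpointEnabled);
  }
}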