TestAppendWithPartitioning: tests identity transform partitioning (plus month and day time transforms) in Iceberg version 1.0.0
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;

import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestAppendWithPartitioning {
  static final Logger logger = LoggerFactory.getLogger(TestAppendWithPartitioning.class);

  static {
    System.setProperty("log4j2.configurationFile", "dlh-log4j2.xml");
    // TestLog4JV1.test();
    // TestLog4J2.test();
  }

  private static final String lakehouse = "/tmp/iceberg-test-2";
  private Catalog catalog;
  private Schema schema;

  public static void main(String[] args) {
    // Run "rm -rf /tmp/iceberg-test-2/bookings" before the test
    TestAppendWithPartitioning tableAppender = new TestAppendWithPartitioning();
    tableAppender.setupCatalogAndSchema();
    System.out.println("\n\n");
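    // Table 1: unpartitioned baseline, no partition transform applied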
    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
    Table table1 = tableAppender.createAndAppend(unpartitioned, "001");
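    // Table 2: identity transform partitions by the raw hotel_name value,
    // one partition per distinct hotel name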
    PartitionSpec partition1 = PartitionSpec.builderFor(tableAppender.schema)
        .identity("hotel_name")
        .build();
    Table table2 = tableAppender.createAndAppend(partition1, "002");
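    // Table 3: the month transform derives a year-month partition value from ts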
    PartitionSpec partition2 = PartitionSpec.builderFor(tableAppender.schema)
        .month("ts")
        .build();
    Table table3 = tableAppender.createAndAppend(partition2, "003");
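    // Table 4: composite spec, identity on hotel_name plus month on ts,
    // giving a two-level partition layout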
    PartitionSpec partition3 = PartitionSpec.builderFor(tableAppender.schema)
        .identity("hotel_name")
        .month("ts")
        .build();
    Table table4 = tableAppender.createAndAppend(partition3, "004");
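    // Table 5: the day transform partitions ts at daily granularity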
    PartitionSpec partition4 = PartitionSpec.builderFor(tableAppender.schema)
        .day("ts")
        .build();
    Table table5 = tableAppender.createAndAppend(partition4, "005");
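    // Scan each table back through the generics API to verify the appends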
    tableAppender.listIcebergTableContent(table1);
    tableAppender.listIcebergTableContent(table2);
    tableAppender.listIcebergTableContent(table3);
    tableAppender.listIcebergTableContent(table4);
    tableAppender.listIcebergTableContent(table5);
  }
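
  // Defines the bookings schema (field ids must be unique) and opens a
  // HadoopCatalog: a file-system-backed catalog that keeps table metadata
  // under the warehouse path, with no external metastore required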
  private void setupCatalogAndSchema() {
    this.schema = new Schema(
        required(1, "ts", Types.TimestampType.withoutZone()),
        required(2, "hotel_id", Types.LongType.get()),
        optional(3, "hotel_name", Types.StringType.get()),
        required(4, "arrival_date", Types.DateType.get()),
        required(5, "value", Types.DoubleType.get()));
    Configuration conf = new Configuration();
    conf.set(CatalogProperties.WAREHOUSE_LOCATION, lakehouse);
    String warehousePath = "file://" + lakehouse;
    this.catalog = new HadoopCatalog(conf, warehousePath);
  }
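
  // Creates the table bookings.rome_hotels_<NNN> with the given partition
  // spec, then writes one Parquet data file and appends it to the table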
  private Table createAndAppend(
      PartitionSpec partitionSpec, String sequentialFormattedNumber) {
    Table table;
    String dataDir = "bookings/rome_hotels_" + sequentialFormattedNumber;
    TableIdentifier tableIdentifier =
        TableIdentifier.parse(dataDir.replace('/', '.'));
    try {
      table = this.catalog.createTable(tableIdentifier, this.schema, partitionSpec);
      logger.info("Iceberg table '" + table.name() +
          "' created on '" + table.location() + "'");
    } catch (Exception e) {
      // Creation fails if the table survived a previous run; load it instead
      logger.error("Error creating Iceberg table: " + e.getLocalizedMessage());
      table = this.catalog.loadTable(tableIdentifier);
    }
    File parquetFile = new File(lakehouse + "/" + dataDir + "/arq_" +
        sequentialFormattedNumber + ".parquet");
    this.appendDataToIcebergTable(this.schema, table, parquetFile);
    return table;
  }
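
  // Builds a single in-memory record, writes it to a local Parquet file
  // with Iceberg's generic writer, and commits it in one transaction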
  private void appendDataToIcebergTable(
      Schema schema, Table table, File parquetFile) {
    logger.info("Appending records");
    List<GenericRecord> records = Lists.newArrayList();
    // Generate one record for the partitioning test
    GenericRecord genericRecord = GenericRecord.create(schema);
    LocalDateTime localDateTime = LocalDateTime.now(ZoneId.of("UTC"));
    genericRecord.setField("ts", localDateTime);
    genericRecord.setField("hotel_id", 1000L);
    genericRecord.setField("hotel_name", "hotel_name-" + 1000);
    genericRecord.setField("arrival_date", LocalDate.of(2023, 1, 1));
    genericRecord.setField("value", 4.13);
    records.add(genericRecord);
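    // Write the records to the local Parquet file; the appender also
    // collects the column metrics needed for the DataFile metadata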
    FileAppender<GenericRecord> fileAppender = null;
    try {
      fileAppender = Parquet.write(Files.localOutput(parquetFile))
          .schema(table.schema())
          .createWriterFunc(GenericParquetWriter::buildWriter)
          .build();
      fileAppender.addAll(records);
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      try {
        if (fileAppender != null) {
          fileAppender.close();
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
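    // Declared sort order for the data file; it is recorded as metadata
    // only, the writer above did not re-sort the rows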
    final SortOrder sortOrder = SortOrder.builderFor(schema)
        // .withOrderId(1)
        .asc("ts", NullOrder.NULLS_FIRST)
        .asc("hotel_name", NullOrder.NULLS_FIRST)
        .build();
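    // Register the Parquet file just written as an Iceberg DataFile with the
    // table's partition spec, the collected metrics, and the sort order.
    // Note: for a partitioned spec the builder presumably also needs the
    // file's partition tuple (e.g. withPartition or withPartitionPath)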
    DataFile dataFile = DataFiles.builder(table.spec())
        .withInputFile(localInput(parquetFile))
        .withMetrics(fileAppender.metrics())
        .withFormat(FileFormat.PARQUET)
        .withSortOrder(sortOrder)
        .build();
    // Each transaction commit produces a new table snapshot
    Transaction t = table.newTransaction();
    t.newAppend().appendFile(dataFile).commit();
    // commit all changes to the table
    t.commitTransaction();
  }
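
  // Reads every record of the table through a generic table scan and prints
  // it, prefixed with the table's sequential number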
  private void listIcebergTableContent(Table table) {
    IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(table);
    String prefix = table.name().split("_")[2] + ":\t";
    try (CloseableIterable<Record> result = scanBuilder.build()) {
      for (Record record : result) {
        System.out.println(prefix + "\t" + record);
      }
      System.out.println(line());
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("Error reading records from the Iceberg table");
    }
  }
  private static String line() {
    return "-".repeat(127 - 41);
  }
}