Iceberg: Write records to an Iceberg table
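A minimal end-to-end example that uses the Iceberg Java API directly, with no query engine: it loads a Postgres-backed JDBC catalog, creates a table, writes generic records to a Parquet data file, and commits that file to the table as an append.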
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
// Load a catalog
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());
properties.put(CatalogProperties.URI, "jdbc:postgresql://postgres:5432/demo_catalog");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", "admin");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/home/iceberg/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());

JdbcCatalog catalog = new JdbcCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);
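// The JDBC catalog stores each table's current metadata pointer in the
// relational database; the metadata and data files themselves are written
// under the warehouse location through the configured FileIO (HadoopFileIO here).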
// Create a table with a schema
Namespace nyc = Namespace.of("nyc");
TableIdentifier name = TableIdentifier.of(nyc, "test3");
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()));
catalog.createTable(name, schema);
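// The same call can take a PartitionSpec to create a partitioned table
// instead; a sketch (this example keeps the table unpartitioned):
//
//   catalog.createTable(name, schema,
//       PartitionSpec.builderFor(schema).identity("level").build());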
// Create records by copying a template record bound to the schema
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(record.copy(ImmutableMap.of("level", "debug")));
builder.add(record.copy(ImmutableMap.of("level", "info")));
builder.add(record.copy(ImmutableMap.of("level", "error")));
builder.add(record.copy(ImmutableMap.of("level", "fatal")));
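// Fields can also be set one at a time instead of copying with a map;
// a sketch building one more record the mutable way:
//
//   GenericRecord warn = GenericRecord.create(schema);
//   warn.setField("level", "warn");
//   builder.add(warn);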
// Write the records, then convert the data writer to a data file
File testFile = new File("/home/iceberg/warehouse/" + UUID.randomUUID().toString());
OutputFile file = Files.localOutput(testFile);
DataWriter<Record> dataWriter =
    Parquet.writeData(file)
        .schema(schema)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .withSpec(PartitionSpec.unpartitioned())
        .build();
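// overwrite() lets the writer replace a file if one already exists at this
// path; withSpec() records the partition spec in the resulting DataFile
// (unpartitioned here, matching the table).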
try {
  for (Record rec : builder.build()) {
    dataWriter.write(rec);
  }
} finally {
  dataWriter.close();
}

// toDataFile() is only valid once the writer is closed; the resulting
// DataFile carries the file's location, format, and metrics
DataFile dataFile = dataWriter.toDataFile();
// Load the table, append the data file, and commit it
Table tbl = catalog.loadTable(name);
tbl.newAppend().appendFile(dataFile).commit();
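// Optional: read the records back to verify the commit. This is a sketch
// using Iceberg's generic reader; it additionally requires
//   import org.apache.iceberg.data.IcebergGenerics;
//   import org.apache.iceberg.io.CloseableIterable;
try (CloseableIterable<Record> rows = IcebergGenerics.read(tbl).build()) {
  for (Record row : rows) {
    System.out.println(row);
  }
}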