@nsivabalan
Last active March 6, 2020 01:22
Simple index using Spark joins
// HoodieSparkJoinIndex.java
/**
 * Find <HoodieKey, HoodieRecordLocation> for all incoming HoodieKeys.
 */
@VisibleForTesting
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeysLocal(JavaSparkContext jsc,
    List<Tuple2<String, String>> partitionToFileIndexInfo,
    JavaPairRDD<String, String> partitionRecordKeyPairRDD, HoodieTable hoodieTable) {
  // Step 1: Create JavaPairRDD<Tuple2<PartitionPath, RecordKey>, Optional<HoodieRecordLocation>> from the input,
  // with the location set to Optional.empty() since nothing has been tagged yet.
  JavaPairRDD<Tuple2<String, String>, Optional<HoodieRecordLocation>> partitionRecordPairs =
      partitionRecordKeyPairRDD.mapToPair(
          (PairFunction<Tuple2<String, String>, Tuple2<String, String>, Optional<HoodieRecordLocation>>)
              partitionRecordKeyPair -> new Tuple2<>(partitionRecordKeyPair, Optional.empty()));
  // Step 2: Create JavaRDD<Tuple2<Partition, FileId>> from the partitions to be touched.
  JavaRDD<Tuple2<String, String>> partitionFileIdTupleRDD = jsc.parallelize(partitionToFileIndexInfo);
  // Step 3: For each (partition, fileId) tuple, fetch an RDD of triplets (Tuple2<PartitionPath, RecordKey>, HoodieRecordLocation)
  // by scanning the latest data file of that file group.
  JavaRDD<Tuple2<Tuple2<String, String>, Optional<HoodieRecordLocation>>> partitionFileIdLocationEntries =
      partitionFileIdTupleRDD.flatMap(partitionFileIdTuple -> {
        HoodieDataFile latestDataFile = getLatestDataFile(hoodieTable, Pair.of(partitionFileIdTuple._1, partitionFileIdTuple._2));
        List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> resultList =
            ParquetUtils.filterParquetRows(hoodieTable.getHadoopConf(), new Path(latestDataFile.getPath()),
                null, HoodieAvroUtils.getRecordKeyPartitionPathFileIdSchema(), latestDataFile.getCommitTime());
        List<Tuple2<Tuple2<String, String>, Optional<HoodieRecordLocation>>> toReturn = new ArrayList<>();
        resultList.forEach(entry -> toReturn.add(new Tuple2<>(
            new Tuple2<>(entry.getLeft().getLeft(), entry.getLeft().getRight()), entry.getRight())));
        return toReturn.iterator();
      });
  // Step 4: Convert the result from step 3 into a pair RDD of (Tuple2<PartitionPath, RecordKey>, HoodieRecordLocation).
  JavaPairRDD<Tuple2<String, String>, Optional<HoodieRecordLocation>> partitionFileIdRecLocationEntryPairRDD =
      partitionFileIdLocationEntries.mapToPair(
          (PairFunction<Tuple2<Tuple2<String, String>, Optional<HoodieRecordLocation>>, Tuple2<String, String>, Optional<HoodieRecordLocation>>)
              entry -> new Tuple2<>(entry._1, entry._2));
  // Step 5: Join the output of step 1 and step 4 to find the location for every (partition, recordKey) pair.
  JavaPairRDD<Tuple2<String, String>, Tuple2<Optional<HoodieRecordLocation>, Optional<HoodieRecordLocation>>> taggedRecords =
      partitionRecordPairs.join(partitionFileIdRecLocationEntryPairRDD);
  // Convert the result of step 5 to JavaPairRDD<HoodieKey, HoodieRecordLocation>.
  JavaPairRDD<HoodieKey, HoodieRecordLocation> result = taggedRecords.mapToPair(
      (PairFunction<Tuple2<Tuple2<String, String>, Tuple2<Optional<HoodieRecordLocation>, Optional<HoodieRecordLocation>>>, HoodieKey, HoodieRecordLocation>)
          entry -> new Tuple2<>(new HoodieKey(entry._1._2, entry._1._1), entry._2._2.get()));
  return result;
}
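
At its core, the lookup above is an ordinary RDD join keyed on (partitionPath, recordKey): the incoming keys on one side, the (key, location) entries read out of the existing parquet files on the other. The following self-contained sketch (hypothetical class name, local master, and plain strings standing in for HoodieRecordLocation) illustrates just that join, outside of Hudi:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JoinIndexSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("join-index-sketch");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Incoming (partitionPath, recordKey) pairs to be tagged; the value is just a placeholder (step 1 above).
      JavaPairRDD<Tuple2<String, String>, String> incoming = jsc.parallelizePairs(Arrays.asList(
          new Tuple2<Tuple2<String, String>, String>(new Tuple2<>("2020/03/05", "key1"), "untagged"),
          new Tuple2<Tuple2<String, String>, String>(new Tuple2<>("2020/03/05", "key2"), "untagged")));
      // (partitionPath, recordKey) -> location entries read from existing files (steps 2-4 above);
      // a plain string stands in for HoodieRecordLocation here.
      JavaPairRDD<Tuple2<String, String>, String> existing = jsc.parallelizePairs(Arrays.asList(
          new Tuple2<Tuple2<String, String>, String>(new Tuple2<>("2020/03/05", "key1"), "commit_001/file-abc")));
      // Step 5: the join keeps only keys that already exist in some file, pairing them with their location.
      incoming.join(existing).collect().forEach(e ->
          System.out.println(e._1._2 + " in " + e._1._1 + " -> " + e._2._2));
    }
  }
}

In the real method above, the placeholder on the incoming side is Optional.empty() and the right side carries the HoodieRecordLocation, so entry._2._2 yields the tagged location after the join.
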
// ParquetUtils.java
/**
 * Read the row key list matching the given filter from the given parquet file. If the filter is empty, all row
 * keys in the file are returned.
 *
 * @param configuration configuration used to build the fs object
 * @param filePath the parquet file path
 * @param filter record keys filter
 * @param readSchema schema of the columns to be read
 * @param baseInstantTime instant time to stamp on the returned record locations
 * @return list of ((partitionPath, recordKey), HoodieRecordLocation) pairs matching the filter
 */
public static List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> filterParquetRows(Configuration configuration, Path filePath, Set<String> filter,
    Schema readSchema, String baseInstantTime) {
  Option<RecordKeysFilterFunction> filterFunction = Option.empty();
  if (filter != null && !filter.isEmpty()) {
    filterFunction = Option.of(new RecordKeysFilterFunction(filter));
  }
  List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> rows = new ArrayList<>();
  Configuration conf = new Configuration(configuration);
  conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
  AvroReadSupport.setAvroReadSchema(conf, readSchema);
  AvroReadSupport.setRequestedProjection(conf, readSchema);
  try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) {
    Object obj = reader.read();
    while (obj != null) {
      if (obj instanceof GenericRecord) {
        String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) {
          String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
          String fileIdName = ((GenericRecord) obj).get(HoodieRecord.FILENAME_METADATA_FIELD).toString();
          rows.add(Pair.of(Pair.of(partitionPath, recordKey), Optional.of(new HoodieRecordLocation(baseInstantTime, fileIdName))));
        }
      }
      obj = reader.read();
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e);
  }
  return rows;
}
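
For reference, a caller of filterParquetRows could look roughly like the snippet below; the parquet path and instant time are made up for illustration, and the read schema is the same helper used in the index code above.

// Hypothetical caller; the file path and instant time below are illustrative only.
public static void printRecordLocations(Configuration hadoopConf) {
  Path parquetFile = new Path("/tmp/hoodie/2020/03/05/some-file_1-0-1_20200305000000.parquet");
  Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathFileIdSchema();
  // A null filter returns every (partitionPath, recordKey) in the file, each tagged with a
  // HoodieRecordLocation carrying the supplied instant time and the file name stored in the row.
  List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> entries =
      ParquetUtils.filterParquetRows(hadoopConf, parquetFile, null, readSchema, "20200305000000");
  entries.forEach(e ->
      System.out.println(e.getLeft().getRight() + " in " + e.getLeft().getLeft() + " -> " + e.getRight().get()));
}
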