Simple index using Spark joins
// HoodieSparkJoinIndex.java
/**
 * Find <HoodieKey, HoodieRecordLocation> for all incoming HoodieKeys.
 */
@VisibleForTesting
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeysLocal(JavaSparkContext jsc,
    List<Tuple2<String, String>> partitionToFileIndexInfo,
    JavaPairRDD<String, String> partitionRecordKeyPairRDD, HoodieTable hoodieTable) {
  // Step 1: Create JavaPairRDD<Tuple2<PartitionPath, RecordKey>, Optional<HoodieRecordLocation>> from the input, with an empty Optional<HoodieRecordLocation>.
  JavaPairRDD<Tuple2<String, String>, Optional<HoodieRecordLocation>> partitionRecordPairs =
      partitionRecordKeyPairRDD.mapToPair((PairFunction<Tuple2<String, String>, Tuple2<String, String>, Optional<HoodieRecordLocation>>) partitionRecordKeyPair
          -> new Tuple2<>(partitionRecordKeyPair, Optional.absent()));
  // Step 2: Create JavaRDD<Tuple2<Partition, FileId>> from the partitions to be touched.
  JavaRDD<Tuple2<String, String>> partitionFileIdTupleRDD = jsc.parallelize(partitionToFileIndexInfo);
  // Step 3: For each (partition, fileId) tuple, fetch an RDD of triplets (Tuple2<PartitionPath, RecordKey>, HoodieRecordLocation).
  JavaRDD<Tuple2<Tuple2<String, String>, java.util.Optional<HoodieRecordLocation>>> partitionFileIdLocationEntries = partitionFileIdTupleRDD.flatMap(partitionFileIdTuple -> {
    HoodieDataFile latestDataFile = getLatestDataFile(hoodieTable, Pair.of(partitionFileIdTuple._1, partitionFileIdTuple._2));
    List<Pair<Pair<String, String>, java.util.Optional<HoodieRecordLocation>>> resultList = ParquetUtils.filterParquetRows(hoodieTable.getHadoopConf(), new Path(partitionFileIdTuple._2),
        null, HoodieAvroUtils.getRecordKeyPartitionPathFileIdSchema(), latestDataFile.getCommitTime());
    List<Tuple2<Tuple2<String, String>, java.util.Optional<HoodieRecordLocation>>> toReturn = new ArrayList<>();
    resultList.forEach(entry -> toReturn.add(new Tuple2<>(new Tuple2<>(entry.getLeft().getLeft(), entry.getLeft().getRight()),
        entry.getRight())));
    return toReturn.iterator();
  });
  // Step 4: Convert the result from Step 3 into pairs of (Tuple2<PartitionPath, RecordKey>, HoodieRecordLocation).
  JavaPairRDD<Tuple2<String, String>, java.util.Optional<HoodieRecordLocation>> partitionFileIdRecLocationEntryPairRDD = partitionFileIdLocationEntries.mapToPair(
      (PairFunction<Tuple2<Tuple2<String, String>, java.util.Optional<HoodieRecordLocation>>, Tuple2<String, String>, java.util.Optional<HoodieRecordLocation>>)
          w -> new Tuple2<>(w._1, w._2));
  // Step 5: Join the output of Step 1 and Step 4 to find the location for every (partition, recordKey) pair.
  JavaPairRDD<Tuple2<String, String>, Tuple2<Optional<HoodieRecordLocation>, java.util.Optional<HoodieRecordLocation>>> taggedRecords = partitionRecordPairs.join(partitionFileIdRecLocationEntryPairRDD);
  // Step 6: Convert the result from Step 5 into JavaPairRDD<HoodieKey, HoodieRecordLocation>.
  JavaPairRDD<HoodieKey, HoodieRecordLocation> result = taggedRecords.mapToPair(
      (PairFunction<Tuple2<Tuple2<String, String>, Tuple2<Optional<HoodieRecordLocation>, java.util.Optional<HoodieRecordLocation>>>, HoodieKey, HoodieRecordLocation>)
          entry -> new Tuple2<>(new HoodieKey(entry._1._2, entry._1._1), entry._2._2.get()));
  return result;
}
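The method above is, at its core, a plain Spark join keyed on the (partitionPath, recordKey) tuple. Below is a minimal, self-contained sketch of that pattern with plain Strings standing in for HoodieKey and HoodieRecordLocation; the class name, sample keys, and location values are illustrative placeholders, not part of the Hudi codebase.

// JoinIndexSketch.java (illustrative only)
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinIndexSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("join-index-sketch").setMaster("local[*]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Incoming (partitionPath, recordKey) pairs whose location we want to tag (Step 1 above).
      List<Tuple2<Tuple2<String, String>, String>> incomingData = Arrays.asList(
          new Tuple2<>(new Tuple2<>("2020/03/06", "key1"), "unknown"),
          new Tuple2<>(new Tuple2<>("2020/03/06", "key2"), "unknown"));
      // (partitionPath, recordKey) -> location entries read from existing files (Steps 2-4 above).
      List<Tuple2<Tuple2<String, String>, String>> existingData = Arrays.asList(
          new Tuple2<>(new Tuple2<>("2020/03/06", "key1"), "fileId-001@commit-001"));
      JavaPairRDD<Tuple2<String, String>, String> incoming = jsc.parallelizePairs(incomingData);
      JavaPairRDD<Tuple2<String, String>, String> existing = jsc.parallelizePairs(existingData);
      // Inner join on the composite key: only keys already present in some file survive (Step 5 above).
      JavaPairRDD<Tuple2<String, String>, Tuple2<String, String>> tagged = incoming.join(existing);
      tagged.collect().forEach(entry -> System.out.println(entry._1 + " -> " + entry._2._2));
    }
  }
}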
// ParquetUtils.java
/**
 * Read the row keys matching the given filter from the given parquet file. If the filter is null or
 * empty, all row keys in the file are returned.
 *
 * @param configuration   configuration used to build the fs object
 * @param filePath        the parquet file path
 * @param filter          record keys filter
 * @param readSchema      schema of columns to be read
 * @param baseInstantTime instant time stamped on each returned HoodieRecordLocation
 * @return list of ((partitionPath, recordKey), HoodieRecordLocation) entries for the matching row keys
 */
public static List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> filterParquetRows(Configuration configuration, Path filePath, Set<String> filter,
    Schema readSchema, String baseInstantTime) {
  Option<RecordKeysFilterFunction> filterFunction = Option.empty();
  if (filter != null && !filter.isEmpty()) {
    filterFunction = Option.of(new RecordKeysFilterFunction(filter));
  }
  List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> rows = new ArrayList<>();
  Configuration conf = new Configuration(configuration);
  conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
  AvroReadSupport.setAvroReadSchema(conf, readSchema);
  AvroReadSupport.setRequestedProjection(conf, readSchema);
  try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) {
    Object obj = reader.read();
    while (obj != null) {
      if (obj instanceof GenericRecord) {
        String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) {
          String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
          String fileIdName = ((GenericRecord) obj).get(HoodieRecord.FILENAME_METADATA_FIELD).toString();
          rows.add(Pair.of(Pair.of(partitionPath, recordKey), Optional.of(new HoodieRecordLocation(baseInstantTime, fileIdName))));
        }
      }
      obj = reader.read();
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e);
  }
  return rows;
}
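Assuming the utility above compiles as part of the project, a call site might look like the following sketch. The Hadoop configuration, file path, and instant time are placeholder values, and the necessary imports (Hadoop Configuration and Path, plus the Hudi classes already referenced above) are assumed. Passing a null filter returns every row key in the file, which is how the index lookup above uses it.

// Illustrative call to filterParquetRows; the path and instant time are placeholders.
Configuration hadoopConf = new Configuration();
Path baseFilePath = new Path("/tmp/hoodie/2020/03/06/some-data-file.parquet");
List<Pair<Pair<String, String>, Optional<HoodieRecordLocation>>> keyLocations =
    ParquetUtils.filterParquetRows(hadoopConf, baseFilePath, null,
        HoodieAvroUtils.getRecordKeyPartitionPathFileIdSchema(), "20200306012200");
// Each entry is ((partitionPath, recordKey), Optional<HoodieRecordLocation>).
keyLocations.forEach(entry -> System.out.println(entry.getLeft() + " -> " + entry.getRight()));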