Hoodie MicroBenchmark
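A standalone benchmark comparing, over the same dataset: raw sequential byte reads of a Hoodie log file, block-metadata and avro-record reads through HoodieLogFileReader, and key-only versus full-record reads of the corresponding parquet base file.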
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFileReader;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
public class HoodieMicroBench {

  // Local paths to a Hoodie log file and its parquet base file; adjust for your environment.
  static final Path logPath = new Path(
      "file:///home/vinoth/hdd/cache/hoodie-bench/nyc-taxi-data-hoodie-gen/green/.1f7286f4-bad2-4db0-871e-358a777fb22d_001.log.1");
  static final Path parquetPath = new Path(
      "file:///home/vinoth/hdd/cache/hoodie-bench/nyc-taxi-data-hoodie-gen/green/1f7286f4-bad2-4db0-871e-358a777fb22d_0_001.parquet");
  /**
   * Times a full read of every avro record in the parquet file via AvroParquetReader.
   */
  public static void timeHoodieParquetRecordRead() throws Exception {
    long startMs = System.currentTimeMillis();
    ParquetReader<GenericRecord> reader = null;
    long totalRecords = 0;
    try {
      reader = AvroParquetReader.<GenericRecord>builder(parquetPath)
          .withConf(new Configuration()).build();
      GenericRecord record = reader.read();
      while (record != null) {
        totalRecords++;
        record = reader.read();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to read avro records from Parquet " + parquetPath, e);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException e) {
          // ignore
        }
      }
    }
    System.out.println(
        "ParquetRead (Full record): Read " + totalRecords + " records in "
            + (System.currentTimeMillis() - startMs) + " ms");
  }
  /**
   * Times a projected read of just the record-key column from the parquet file. The filter
   * holds a single dummy key, so this effectively measures scanning the key column alone.
   */
  public static void timeHoodieParquetKeyRead() throws Exception {
    long startMs = System.currentTimeMillis();
    Set<String> filter = new HashSet<>();
    filter.add("dummy_key");
    Set<String> rowKeys = ParquetUtils
        .filterParquetRowKeys(new Configuration(), parquetPath, filter);
    System.out.println(
        "ParquetRead (Key): Read " + rowKeys.size() + " keys in "
            + (System.currentTimeMillis() - startMs) + " ms");
  }
  /**
   * Times the two phases of reading the Hoodie log file: scanning block metadata, then
   * materializing the avro records inside each data block.
   */
  public static void timeHoodieLogFileReader() throws Exception {
    Schema schema = ParquetUtils.readAvroSchema(new Configuration(), parquetPath);
    HoodieLogFile logFile = new HoodieLogFile(logPath);

    // Phase 1: walk the log (with lazy block reading), collecting the avro data blocks.
    long startMs = System.currentTimeMillis();
    HoodieLogFileReader logFileReader = new HoodieLogFileReader(
        FSUtils.getFs(logPath.toString(), new Configuration()), logFile, schema, 1024 * 1024, true,
        false);
    long totalBlocks = 0;
    List<HoodieAvroDataBlock> dataBlocks = new ArrayList<>();
    while (logFileReader.hasNext()) {
      HoodieLogBlock logBlock = logFileReader.next();
      totalBlocks += 1;
      if (logBlock.getBlockType().equals(HoodieLogBlockType.AVRO_DATA_BLOCK)) {
        dataBlocks.add((HoodieAvroDataBlock) logBlock);
      }
    }
    System.out.println(
        "HoodieLogFileReader: Read " + totalBlocks + " block metadata in "
            + (System.currentTimeMillis() - startMs) + " ms");

    // Phase 2: deserialize the records in each data block, releasing blocks as we go.
    startMs = System.currentTimeMillis();
    long totalRecords = 0;
    Iterator<HoodieAvroDataBlock> dataBlockIterator = dataBlocks.iterator();
    while (dataBlockIterator.hasNext()) {
      HoodieAvroDataBlock dataBlock = dataBlockIterator.next();
      long blockReadStartMs = System.currentTimeMillis();
      for (IndexedRecord record : dataBlock.getRecords()) {
        totalRecords += 1;
      }
      //System.out.println("Read block in " + (System.currentTimeMillis() - blockReadStartMs));
      dataBlockIterator.remove();
    }
    logFileReader.close();
    System.out.println(
        "HoodieLogFileReader: Read " + totalRecords + " avro records in "
            + (System.currentTimeMillis() - startMs) + " ms");
  }
  /**
   * Times a raw sequential read of the log file bytes, as a baseline for the readers above.
   */
  public static void timeLogBytesRead() throws Exception {
    long startMs = System.currentTimeMillis();
    FileSystem fs = FSUtils.getFs(logPath.toString(), new Configuration());
    final int bufferSize = 1 * 1024 * 1024;
    byte[] buffer = new byte[bufferSize];
    FSDataInputStream inputStream = new FSDataInputStream(
        new BufferedFSInputStream(
            (FSInputStream) fs.open(logPath, bufferSize).getWrappedStream(),
            bufferSize));
    long totalBytes = 0;
    try {
      // Read until EOF; a short read does not necessarily mean the stream is exhausted,
      // and read() returns -1 at EOF, which must not be added to the byte count.
      int read;
      while ((read = inputStream.read(buffer)) != -1) {
        totalBytes += read;
      }
    } finally {
      inputStream.close();
    }
    System.out.println(
        "RawLogBytesRead: Read " + totalBytes + " bytes in "
            + (System.currentTimeMillis() - startMs) + " ms");
  }
  public static void main(String[] args) throws Exception {
    // Run one benchmark at a time; uncomment the others as needed.
    //timeLogBytesRead();
    timeHoodieLogFileReader();
    //timeHoodieParquetKeyRead();
    //timeHoodieParquetRecordRead();
  }
}
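To try this out, point the two static paths at a local Hoodie log file and its parquet base file (the ones above are the author's local copies of the NYC taxi dataset), then run the class directly with hoodie-common, parquet-avro, avro and the hadoop client jars (plus their transitive dependencies) on the classpath; exact artifact names may vary with the Hoodie version.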