|
|
|
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

// import org.apache.commons.lang3.RandomUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.base.Stopwatch;

import java.io.IOException;
import java.util.Random;
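
/*
 * Micro-benchmark comparing sequential reads against seek + read ("random") reads on HDFS.
 *
 * Hedged usage sketch (the jar name below is an assumption, not part of the original source):
 *   hadoop jar hdfs-seek-read.jar HdfsSeekRead /path/to/a/large/hdfs/file
 */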
|
public class HdfsSeekRead { |
|
|
|
  // Size (in bytes) to read in every read operation of the random read
  static final int READ_SIZE = 1024 * 1024;

  // How much to skip between reads in the random read. Q: What if someone has a TCP read buffer of 32 MB?
  static final int SKIP_SIZE = 4 * 1024 * 1024;
|
|
|
  /**
   * Read some "X" MB of data sequentially.
   *
   * @param fdis input stream to read from
   * @param target number of MB to read sequentially
   * @throws IOException
   */
|
public static void readSequential(FSDataInputStream fdis, int target) |
|
throws IOException { |
|
long totalRead = 0; |
|
long totalReadMB = 0; |
|
Stopwatch stopWatch = new Stopwatch(); |
|
stopWatch.start(); |
|
    // available() never returns -1, so EOF is detected via the read() return value below
    while (totalReadMB < target) {
|
byte[] buf = new byte[4096]; |
|
|
|
/** |
|
* This would NOT go through pread path in DFSInputStream. |
|
*/ |
|
      int bytesRead = fdis.read(buf);
      if (bytesRead == -1) {
        break; // reached EOF before the target was read
      }
      totalRead += bytesRead;
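
      /*
       * For contrast, a hedged sketch of the positional-read ("pread") shape that DOES go
       * through the pread path: FSDataInputStream implements PositionedReadable, so a call
       * like the following reads at an absolute offset without moving the stream position
       * ("somePosition" is a hypothetical offset, not used by this benchmark):
       *
       *   int n = fdis.read(somePosition, buf, 0, buf.length);
       */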
|
|
|
if (totalRead > READ_SIZE) { |
|
totalReadMB += totalRead / (READ_SIZE); |
|
totalRead -= READ_SIZE; |
|
System.out.println("Read : " + totalReadMB); |
|
} |
|
} |
|
stopWatch.stop(); |
|
System.out.println("Time taken for readSequential : " |
|
+ stopWatch.elapsedMillis() + ", totalReadMB=" + totalReadMB); |
|
} |
|
|
|
  /**
   * Seek to a position, read READ_SIZE bytes of data, and
   * repeat this "X" number of times.
   *
   * Keep the positions within a single block's limit (so that the reads can be served from the same node).
   *
   * @param fdis input stream to read from
   * @param target number of times the random read has to be performed
   * @throws IOException
   * @throws InterruptedException
   */
|
public static void readRandom(FSDataInputStream fdis, int target) |
|
throws IOException, InterruptedException { |
|
|
|
long seekPos = 1; |
|
|
|
Stopwatch stopWatch = new Stopwatch(); |
|
stopWatch.start(); |
|
for (int i = 0; i < target; i++) { |
|
fdis.seek(seekPos); |
|
|
|
byte[] buffer = new byte[READ_SIZE]; |
|
|
|
      /**
       * This goes through the "pread" path, which turns out to be expensive.
       *
       * The connection gets evicted from the PeerCache after "dfs.client.socketcache.expiryMsec" (default 3000 ms).
       * 1. As part of the FS open, the client sets up a connection to the NN.
       * 2. As part of connecting to the DN, it sets up the connection once and later adds it to the PeerCache (eviction after 3000 ms).
       * 3. But it ends up setting up a BlockReader every time, even though the connection etc. is reused.
       *    - Locally it takes 250+ ms to build the BlockReader. This is expensive on the DN side as well, which impacts the following:
       *      RemoteBlockReader2.readNextPacket() --> packetReceiver.receiveNextPacket(in);
       *    - The DN is doing the work during this time with an expensive DataXceiver.
       *
       * This causes substantial delays when compared to sequential read. Hence random reads are at least 4x slower.
       * (See the configuration sketch after this method for one possible experiment.)
       */
|
fdis.readFully(buffer, 0, READ_SIZE); |
|
|
|
System.out.println("Read from seekPos:" + seekPos + ", time taken:" + stopWatch.elapsedMillis()); |
|
|
|
seekPos += SKIP_SIZE; |
|
} |
|
stopWatch.stop(); |
|
System.out.println("Time taken for readRandom : " |
|
+ stopWatch.elapsedMillis()); |
|
} |
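
  /*
   * Hedged sketch (not part of the original benchmark): the comment above notes that
   * connections are evicted from the PeerCache after "dfs.client.socketcache.expiryMsec".
   * One possible experiment is widening the client-side socket cache before opening the
   * file. These keys exist in the HDFS client configuration, though defaults can differ
   * between Hadoop versions:
   *
   *   conf.setLong("dfs.client.socketcache.expiryMsec", 60000); // default 3000 ms
   *   conf.setInt("dfs.client.socketcache.capacity", 64);       // default 16
   */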
|
|
|
  /**
   * Seek randomly anywhere in the file and read some data.
   * Data can span across different data nodes.
   *
   * NOTE: HAVE NOT ANALYZED THIS YET
   *
   * @param fdis input stream to read from
   * @param fileLen length of the file in bytes
   * @param target number of times the random read has to be performed
   * @throws IOException
   */
|
public static void readRandom(FSDataInputStream fdis, long fileLen, int target) throws IOException { |
|
Stopwatch stopWatch = new Stopwatch(); |
|
stopWatch.start(); |
|
|
|
    // NOTE: the int cast limits this to files smaller than ~2 GB
    Random random = new Random();
    for (int i = 0; i < target; i++) {
      int seekPos = random.nextInt((int) fileLen - READ_SIZE - 1);
|
fdis.seek(Math.max(0, seekPos)); |
|
|
|
byte[] buffer = new byte[READ_SIZE]; |
|
fdis.readFully(buffer, 0, READ_SIZE); |
|
} |
|
|
|
stopWatch.stop(); |
|
System.out.println("Time taken for readRandom : " |
|
+ stopWatch.elapsedMillis() + ", totalTarget : " + target); |
|
|
|
} |
|
|
|
public static void main(String[] args) throws Exception { |
|
    String identifier = "HdfsSeekRead-" + System.currentTimeMillis();
|
System.out.println("Starting with " + identifier); |
|
|
|
Configuration conf = new Configuration(); |
|
// dfs client id is read from this (sadly) |
|
conf.set("mapreduce.task.attempt.id", identifier); |
|
    if (args.length < 1) {
      System.err.println("Usage: HdfsSeekRead <hdfs-file-path>");
      System.exit(1);
    }
    Path path = new Path(args[0]);
|
System.out.println("Reading from " + path); |
|
|
|
/* |
|
try (FileSystem fs = path.getFileSystem(conf); |
|
FSDataInputStream fdis = fs.open(path)) { |
|
// SequentialRead |
|
readSequential(fdis, 10); |
|
|
|
} catch (IOException ioe) { |
|
ioe.printStackTrace(); |
|
} |
|
*/ |
|
|
|
|
|
|
|
try (FileSystem fs = path.getFileSystem(conf); |
|
FSDataInputStream fdis = fs.open(path)) { |
|
// Read random |
|
for (int i = 0; i < 1024; i++) { |
|
fdis.seek(0); |
|
readRandom(fdis, 10); |
|
} |
|
} catch (IOException ioe) { |
|
ioe.printStackTrace(); |
|
} |
|
|
|
|
|
|
|
System.out.println("Done!!"); |
|
System.out.println("Grep your hdfs logs for " + identifier); |
|
} |
|
} |