Skip to content

Instantly share code, notes, and snippets.

@haosdent
Last active December 21, 2015 06:09
Show Gist options
  • Save haosdent/6262185 to your computer and use it in GitHub Desktop.
Save haosdent/6262185 to your computer and use it in GitHub Desktop.
package me.haosdent.test;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class Test {
ListeningExecutorService concurrentor;
Configuration conf = new Configuration();
List<ListenableFuture<Object>> futureList = new LinkedList<ListenableFuture<Object>>();
FileSystem fs;
byte[] buffer;
int threadNum;
String path;
int size;
int bufferSize;
int count;
String syncType;
long[] totalTimes;
long[] syncTimes;
long ioStart;
long ioEnd;
AtomicLong atomCount = new AtomicLong(0);
public Test(int threadNum, String path, int size, int bufferSize,
String syncType) {
this.threadNum = threadNum;
this.path = path;
this.size = size;
this.bufferSize = bufferSize;
this.syncType = syncType;
this.count = size / bufferSize;
this.totalTimes = new long[threadNum];
this.syncTimes = new long[threadNum * count];
concurrentor = MoreExecutors.listeningDecorator(Executors
.newFixedThreadPool(threadNum));
buffer = new byte[bufferSize];
for (int i = 0; i < bufferSize; i++) {
buffer[i] = (byte) (i % 50);
}
try {
// conf.set("fs.defaultFS", "hdfs://10.232.98.30:7000");
fs = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public void doIO() {
this.ioStart = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
final int index = i;
ListenableFuture<Object> future = concurrentor
.submit(new Callable<Object>() {
@Override
public Object call() {
try {
Path path = new Path("/user/haosong.hhs/test/" + syncType + "_"
+ bufferSize + "_" + index);
FSDataOutputStream out = fs.create(path, true, bufferSize);
long writeStart = System.currentTimeMillis();
System.out.println("Start: " + index + ", " + writeStart);
for (int j = 0; j < count; j++) {
out.write(buffer);
long syncStart = System.nanoTime();
if ("hsync".equals(syncType)) {
out.hsync();
} else if ("hflush".equals(syncType)) {
out.hflush();
}
atomCount.addAndGet(1);
long syncEnd = System.nanoTime();
System.out.println("Hsync Time: " + (index * count + j) + ","
+ (syncEnd - syncStart));
syncTimes[index * count + j] = syncEnd - syncStart;
}
long writeEnd = System.currentTimeMillis();
totalTimes[index] = writeEnd - writeStart;
out.close();
System.out.println("End: " + index + ", " + writeEnd);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
});
futureList.add(future);
}
for (ListenableFuture<Object> future : futureList) {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
this.ioEnd =System.currentTimeMillis();
}
public void stat() {
System.out.println("============ Test Result ==============");
System.out.println("Thread Num: " + threadNum);
System.out.println("Buffer Size: " + bufferSize);
System.out.println("File Size: " + size);
System.out.println("Sync Type: " + syncType);
int totalSpeed = 0;
for (int i = 0; i < threadNum; i++) {
if (totalTimes[i] != 0) {
totalSpeed += size / totalTimes[i];
}
}
int averSpeed = totalSpeed / threadNum;
System.out.println("Average Speed: " + averSpeed + " KB/s");
long totalSyncTime = 0;
for (int i = 0; i < syncTimes.length; i++) {
if(i % count != 0){
totalSyncTime += syncTimes[i];
}
}
long averSyncTime = totalSyncTime / (syncTimes.length - threadNum);
System.out.println("Average Sync Time: " + averSyncTime);
double averSyncCount = atomCount.get() / (double) (this.ioEnd - this.ioStart);
System.out.println("Sync Count per second: " + (averSyncCount * 1000));
long size = this.size;
long throughput = size * this.threadNum / (this.ioEnd - this.ioStart);
System.out.println("Throughput: " + throughput + " KB/s");
}
public static void main(String[] args) {
// args = new String[] { "1", "hsync", "10000", "100", "hsync" };
int threadNum = Integer.parseInt(args[0]);
String path = args[1];
int size = Integer.parseInt(args[2]);
int bufferSize = Integer.parseInt(args[3]);
String syncType = args[4];
Test test = new Test(threadNum, path, size, bufferSize, syncType);
test.doIO();
test.stat();
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment