Last active
December 21, 2015 06:09
-
-
Save haosdent/6262185 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.haosdent.test;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class Test { | |
ListeningExecutorService concurrentor; | |
Configuration conf = new Configuration(); | |
List<ListenableFuture<Object>> futureList = new LinkedList<ListenableFuture<Object>>(); | |
FileSystem fs; | |
byte[] buffer; | |
int threadNum; | |
String path; | |
int size; | |
int bufferSize; | |
int count; | |
String syncType; | |
long[] totalTimes; | |
long[] syncTimes; | |
long ioStart; | |
long ioEnd; | |
AtomicLong atomCount = new AtomicLong(0); | |
public Test(int threadNum, String path, int size, int bufferSize, | |
String syncType) { | |
this.threadNum = threadNum; | |
this.path = path; | |
this.size = size; | |
this.bufferSize = bufferSize; | |
this.syncType = syncType; | |
this.count = size / bufferSize; | |
this.totalTimes = new long[threadNum]; | |
this.syncTimes = new long[threadNum * count]; | |
concurrentor = MoreExecutors.listeningDecorator(Executors | |
.newFixedThreadPool(threadNum)); | |
buffer = new byte[bufferSize]; | |
for (int i = 0; i < bufferSize; i++) { | |
buffer[i] = (byte) (i % 50); | |
} | |
try { | |
// conf.set("fs.defaultFS", "hdfs://10.232.98.30:7000"); | |
fs = FileSystem.get(conf); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
public void doIO() { | |
this.ioStart = System.currentTimeMillis(); | |
for (int i = 0; i < threadNum; i++) { | |
final int index = i; | |
ListenableFuture<Object> future = concurrentor | |
.submit(new Callable<Object>() { | |
@Override | |
public Object call() { | |
try { | |
Path path = new Path("/user/haosong.hhs/test/" + syncType + "_" | |
+ bufferSize + "_" + index); | |
FSDataOutputStream out = fs.create(path, true, bufferSize); | |
long writeStart = System.currentTimeMillis(); | |
System.out.println("Start: " + index + ", " + writeStart); | |
for (int j = 0; j < count; j++) { | |
out.write(buffer); | |
long syncStart = System.nanoTime(); | |
if ("hsync".equals(syncType)) { | |
out.hsync(); | |
} else if ("hflush".equals(syncType)) { | |
out.hflush(); | |
} | |
atomCount.addAndGet(1); | |
long syncEnd = System.nanoTime(); | |
System.out.println("Hsync Time: " + (index * count + j) + "," | |
+ (syncEnd - syncStart)); | |
syncTimes[index * count + j] = syncEnd - syncStart; | |
} | |
long writeEnd = System.currentTimeMillis(); | |
totalTimes[index] = writeEnd - writeStart; | |
out.close(); | |
System.out.println("End: " + index + ", " + writeEnd); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
return null; | |
} | |
}); | |
futureList.add(future); | |
} | |
for (ListenableFuture<Object> future : futureList) { | |
try { | |
future.get(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
this.ioEnd =System.currentTimeMillis(); | |
} | |
public void stat() { | |
System.out.println("============ Test Result =============="); | |
System.out.println("Thread Num: " + threadNum); | |
System.out.println("Buffer Size: " + bufferSize); | |
System.out.println("File Size: " + size); | |
System.out.println("Sync Type: " + syncType); | |
int totalSpeed = 0; | |
for (int i = 0; i < threadNum; i++) { | |
if (totalTimes[i] != 0) { | |
totalSpeed += size / totalTimes[i]; | |
} | |
} | |
int averSpeed = totalSpeed / threadNum; | |
System.out.println("Average Speed: " + averSpeed + " KB/s"); | |
long totalSyncTime = 0; | |
for (int i = 0; i < syncTimes.length; i++) { | |
if(i % count != 0){ | |
totalSyncTime += syncTimes[i]; | |
} | |
} | |
long averSyncTime = totalSyncTime / (syncTimes.length - threadNum); | |
System.out.println("Average Sync Time: " + averSyncTime); | |
double averSyncCount = atomCount.get() / (double) (this.ioEnd - this.ioStart); | |
System.out.println("Sync Count per second: " + (averSyncCount * 1000)); | |
long size = this.size; | |
long throughput = size * this.threadNum / (this.ioEnd - this.ioStart); | |
System.out.println("Throughput: " + throughput + " KB/s"); | |
} | |
public static void main(String[] args) { | |
// args = new String[] { "1", "hsync", "10000", "100", "hsync" }; | |
int threadNum = Integer.parseInt(args[0]); | |
String path = args[1]; | |
int size = Integer.parseInt(args[2]); | |
int bufferSize = Integer.parseInt(args[3]); | |
String syncType = args[4]; | |
Test test = new Test(threadNum, path, size, bufferSize, syncType); | |
test.doIO(); | |
test.stat(); | |
System.exit(0); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment