/**
 * Created by atr on 19.09.17.
 */
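/*
 * Size-distribution helper for Spark data stored in Crail: it walks the
 * Crail namespace under a directory, collects (path, capacity) pairs,
 * sorts them by size, attaches a percentile rank to each entry, and can
 * write the result to a local file. There is a driver-local path
 * (execute), an RDD-based path (executeRDD), and a sort micro-benchmark
 * (sizeTime).
 */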
import java.io.{BufferedWriter, File, FileWriter}

import com.ibm.crail.CrailFS
import com.ibm.crail.conf.CrailConfiguration
import org.apache.spark.SparkContext

import scala.util.Random
object Test {
  val conf = new CrailConfiguration
  val fs = CrailFS.newInstance(conf)
  /* Crail directories where Spark shuffle and broadcast data live */
  val sdir = "/spark/shuffle/"
  val bdir = "/spark/broadcast/"
  /* recursively walk the namespace under `file` and return one
   * (path, capacity) entry per data file */
  def enumAll(file: String): Seq[(String, Long)] = {
    val f2 = fs.lookup(file).get()
    if (f2.getType.isDataFile) {
      /* we got a data file, hence we return a single-entry sequence */
      Seq((f2.getPath, f2.getCapacity))
    } else if (f2.getType.isDirectory || f2.getType.isMultiFile) {
      /* we enumerate all entries and recurse into each of them */
      val itr = if (f2.getType.isDirectory) {
        f2.asDirectory().listEntries()
      } else {
        f2.asMultiFile().listEntries()
      }
      var result = Seq[(String, Long)]()
      while (itr.hasNext) {
        result = result ++ enumAll(itr.next())
      }
      result
    } else {
      throw new Exception("Illegal file type: " + file + ", it is " + f2.getType)
    }
  }
  /* stop the recursion at multi-file granularity: return one aggregate
   * (path, size) entry per multi-file rather than one entry per data file
   * inside it */
  def enumPerTask(file: String): Seq[(String, Long)] = {
    val f2 = fs.lookup(file).get()
    if (f2.getType.isMultiFile) {
      /* we got a multi-file, hence we sum up its entries and return a
       * single aggregate entry */
      val mx = f2.asMultiFile()
      val itr = mx.listEntries()
      var totalSize = 0L
      while (itr.hasNext) {
        totalSize += fs.lookup(itr.next()).get().getCapacity
      }
      println(" returning here with " + f2.getPath)
      Seq(("MULTIFILE : " + f2.getPath, totalSize))
    } else if (f2.getType.isDirectory) {
      /* we enumerate all entries and recurse into each of them */
      val itr = f2.asDirectory().listEntries()
      var result = Seq[(String, Long)]()
      while (itr.hasNext) {
        result = result ++ enumPerTask(itr.next())
      }
      result
    } else {
      throw new Exception("Illegal file type: " + file + ", it is " + f2.getType)
    }
  }
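  /*
   * Note on the two walkers above: enumAll descends into multi-files and
   * reports every individual data file, whereas enumPerTask treats a
   * multi-file as one unit and reports only its aggregate capacity, i.e.
   * a coarser size distribution.
   */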
  /* distributed variant: sort and rank the entries as an RDD on the cluster */
  def executeRDD(dir: String, spark: SparkContext): Seq[(String, Long, Int)] = {
    val _items = enumPerTask(dir)
    val rdd1 = spark.parallelize(_items)
    val total = _items.size
    // sort by size
    val rdd2 = rdd1.sortBy(_._2)
    // broadcast the total count so every task can compute percentiles
    val totalBC = spark.broadcast(total)
    // attach each entry's rank in the sorted order
    val rddZip = rdd2.zipWithIndex()
    // calculate the percentile from the 1-based rank
    val rdd3 = rddZip.map(k => {
      /* we have to produce a (String, Long, Int) tuple */
      (k._1._1, k._1._2, ((k._2 + 1) * 100 / totalBC.value).toInt)
    })
    rdd3.collect().toSeq
  }
  /* local variant: enumerate, sort, and rank everything on the driver */
  def execute(dir: String): Seq[(String, Long, Int)] = {
    val _items = enumAll(dir)
    val total = _items.size
    println("Got all directories, now sorting " + total + " elements")
    val items = _items.sortBy(_._2)
    println(" Got the full list with size: " + total)
    var current = 0
    var itemsWithPercentile = Seq[(String, Long, Int)]()
    while (current < total) {
      val kv = items(current)
      /* the percentile of an entry is its 1-based rank over the total count */
      val entry = Seq((kv._1, kv._2, (current + 1) * 100 / total))
      itemsWithPercentile = itemsWithPercentile ++ entry
      current += 1
      println(" processed " + current)
    }
    // itemsWithPercentile.foreach(k => {
    //   println(k._2 + " bytes " + k._3 + " % file name : " + k._1)
    // })
    itemsWithPercentile
  }
  /* micro-benchmark: how long does it take to sort `size` synthetic entries? */
  def sizeTime(size: Int): Unit = {
    var soFar = 0
    /* a representative shuffle-file path, used as the key prefix */
    val str = "/spark/shuffle/shuffle_6/part_166/1-2-1347095298"
    var result = Seq[(String, Long)]()
    val rand = new Random()
    while (soFar < size) {
      result = result ++ Seq((str + soFar, rand.nextLong()))
      soFar += 1
    }
    println(" Data is generated now ")
    val start = System.nanoTime()
    result.sortBy(_._2)
    println(" Sorted : " + size + " entries in " + (System.nanoTime() - start) / 1000 + " usecs")
  }
  /* dump the (size, percentile) columns to a local file, one entry per line */
  def writeFile(fileName: String, data: Seq[(String, Long, Int)]): Unit = {
    val file = new File(fileName)
    val bw = new BufferedWriter(new FileWriter(file))
    for (entry <- data) {
      bw.write(entry._2 + " bytes " + entry._3 + " % \n")
    }
    bw.close()
  }
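  /*
   * Hypothetical entry point (not part of the original gist) sketching how
   * the pieces fit together; the output path is an assumption, and the
   * no-arg SparkContext assumes spark-submit supplies master/app settings.
   */
  def main(args: Array[String]): Unit = {
    val spark = new SparkContext()
    // rank the shuffle files by size and dump "<bytes> bytes <percentile> %" lines
    val itemsWithPercentile = executeRDD(sdir, spark)
    writeFile("/tmp/shuffle-size-percentiles.txt", itemsWithPercentile)
    spark.stop()
  }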
}