/**
 * Created by atr on 19.09.17.
 */
import java.io.{BufferedWriter, File, FileWriter}

import com.ibm.crail.CrailFS
import com.ibm.crail.conf.CrailConfiguration
import org.apache.spark.SparkContext

import scala.util.Random

object Test {
  val conf = new CrailConfiguration
  val fs = CrailFS.newInstance(conf)
  /* Crail locations of Spark shuffle and broadcast data */
  val sdir = "/spark/shuffle/"
  val bdir = "/spark/broadcast/"
  /* recursively enumerate every data file under `file`, returning (path, capacity) pairs */
  def enumAll(file: String): Seq[(String, Long)] = {
    val f1 = fs.lookup(file)
    val f2 = f1.get()
    if (f2.getType.isDataFile) {
      /* we got a data file, hence we return a single-element sequence */
      Seq((f2.getPath, f2.getCapacity))
    } else if (f2.getType.isDirectory || f2.getType.isMultiFile) {
      /* we enumerate all entries and recurse into each of them */
      val itr = if (f2.getType.isDirectory) {
        f2.asDirectory().listEntries()
      } else {
        f2.asMultiFile().listEntries()
      }
      var result = Seq[(String, Long)]()
      while (itr.hasNext) {
        result ++= enumAll(itr.next())
      }
      result
    } else {
      throw new Exception("Illegal file type: " + file + " it is " + f2.getType)
    }
  }
  /* stop the enumeration at the level of a multi-file: sum up the sizes of its
     entries and return one aggregate (path, size) pair per multi-file */
  def enumPerTask(file: String): Seq[(String, Long)] = {
    val f1 = fs.lookup(file)
    val f2 = f1.get()
    if (f2.getType.isMultiFile) {
      /* we got a multi-file, hence we return its aggregate size */
      val mx = f2.asMultiFile()
      val itr = mx.listEntries()
      var totalSize = 0L
      while (itr.hasNext) {
        totalSize += fs.lookup(itr.next()).get().getCapacity
      }
      println(" returning here with " + f2.getPath)
      Seq(("MULTIFILE : " + f2.getPath, totalSize))
    } else if (f2.getType.isDirectory) {
      /* we enumerate all entries and recurse into each of them */
      val itr = f2.asDirectory().listEntries()
      var result = Seq[(String, Long)]()
      while (itr.hasNext) {
        result ++= enumPerTask(itr.next())
      }
      result
    } else {
      throw new Exception("Illegal file type: " + file + " it is " + f2.getType)
    }
  }
  /* distributed variant: sort the (path, size) pairs as an RDD and attach a percentile */
  def executeRDD(dir: String, spark: SparkContext): Seq[(String, Long, Int)] = {
    val _items = enumPerTask(dir)
    val rdd1 = spark.parallelize(_items)
    val total = _items.size
    // sort by size
    val rdd2 = rdd1.sortBy(_._2)
    // broadcast the total count so the executors can compute percentiles
    val totalBC = spark.broadcast(total)
    // attach a rank to every (path, size) pair
    val rddZip = rdd2.zipWithIndex()
    // calculate the percentile from the rank
    val rdd3 = rddZip.map(k => {
      /* we have to build a (String, Long, Int) tuple */
      (k._1._1, k._1._2, ((k._2 + 1) * 100 / totalBC.value).toInt)
    })
    rdd3.collect().toSeq
  }
  /* driver-side variant: enumerate all data files, sort locally, and attach a percentile */
  def execute(dir: String): Seq[(String, Long, Int)] = {
    val _items = enumAll(dir)
    val total = _items.size
    println("Got all directories, now sorting " + total + " elements")
    val items = _items.sortBy(_._2)
    println("Got the full list with size: " + total)
    var current = 0
    var itemsWithPercentile = Seq[(String, Long, Int)]()
    while (current < total) {
      val kv = items(current)
      // percentile rank of this entry in the size-sorted list
      itemsWithPercentile ++= Seq((kv._1, kv._2, (current + 1) * 100 / total))
      current += 1
      println("processed " + current)
    }
    // itemsWithPercentile.foreach(k => {
    //   println(k._2 + " bytes " + k._3 + " % file name : " + k._1)
    // })
    itemsWithPercentile
  }
  /* micro-benchmark: how long does it take to sort `size` synthetic (path, size) entries? */
  def sizeTime(size: Int): Unit = {
    var soFar = 0
    val str = "/spark/shuffle/shuffle_6/part_166/1-2-1347095298"
    var result = Seq[(String, Long)]()
    val rand = new Random()
    while (soFar < size) {
      result ++= Seq((str + soFar, rand.nextLong()))
      soFar += 1
    }
    println("Data is generated now")
    val start = System.nanoTime()
    val sorted = result.sortBy(_._2)
    println("Sorted " + size + " entries in " + (System.nanoTime() - start) / 1000 + " usecs")
  }
  /* write one "<bytes> bytes <percentile> %" line per entry to a local file */
  def writeFile(fileName: String, data: Seq[(String, Long, Int)]): Unit = {
    val file = new File(fileName)
    val bw = new BufferedWriter(new FileWriter(file))
    for (lat <- 0 until data.size) {
      bw.write(data(lat)._2 + " bytes " + data(lat)._3 + " % \n")
    }
    bw.close()
  }
}
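
/*
 * Usage sketch (not part of the original gist): e.g. from a spark-shell that has
 * the Crail client jars on its classpath, where `sc` is the shell's SparkContext
 * and the output path below is a made-up placeholder.
 *
 *   val local = Test.execute(Test.sdir)        // driver-side: one entry per data file
 *   val dist  = Test.executeRDD(Test.sdir, sc) // distributed: one aggregate entry per multi-file
 *   Test.writeFile("/tmp/size-percentiles.txt", dist)
 */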