This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val metrics = client.clusterMetrics() | |
val clusterQueueInfo = client.clusterQueue() | |
val queueWithCapacity = client.freeQueues(metrics, clusterQueueInfo) | |
val selectedYarnApps = client.appsBy(user, status) | |
if (hasApps(selectedYarnApps)) { | |
val now = System.currentTimeMillis() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait YarnClient { | |
def clusterMetrics(): ClusterMetrics | |
def clusterQueue(): ClusterScheduler | |
def freeQueues(metrics: ClusterMetrics, clusterQueueInfo: ClusterScheduler): List[Queue] | |
def appsBy(user: String, status: String): YarnApps | |
def moveApp(app: YarnApp, queues: List[Queue]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19/01/27 13:35:57 INFO StageAccumulatorListener: Stage accumulator values:map at SparkMonitoringAdvance.scala:44 | |
19/01/27 13:35:57 INFO StageAccumulatorListener: 8:Some(internal.metrics.memoryBytesSpilled):Some(0) | |
19/01/27 13:35:57 INFO StageAccumulatorListener: 2:Some(internal.metrics.executorDeserializeCpuTime):Some(468750000) | |
19/01/27 13:35:57 INFO StageAccumulatorListener: 20:Some(internal.metrics.shuffle.write.writeTime):Some(3082492535) | |
19/01/27 13:35:57 INFO StageAccumulatorListener: 5:Some(internal.metrics.resultSize):Some(396352) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class StageAccumulatorListener extends SparkListener { | |
val log = LoggerFactory.getLogger(this.getClass.getName) | |
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { | |
log.info(s"Stage accumulator values:${event.stageInfo.name}") | |
event.stageInfo.accumulables.foreach { case (id, accInfo) => | |
log.info(s"$id:${accInfo.name}:${accInfo.value}") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private def monitorRecordsProcessed(pointsRecordCounter: LongAccumulator): Unit = { | |
val startTime = System.currentTimeMillis() | |
val counter = pointsRecordCounter | |
log.info("Records monitoring started") | |
while (true) { | |
val timeInSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime) | |
val recordsCount = counter.sum | |
val tp = recordsCount.toFloat / timeInSec | |
log.info(s"Records processed ${recordsCount} in ${timeInSec} sec , throughput ${tp} / sec") | |
Thread.sleep(TimeUnit.SECONDS.toMillis(1)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private def checkSparkContext(): Unit = { | |
val startTime = System.currentTimeMillis() | |
log.info("Context creation monitor thread started") | |
while (SparkContextBuilder.isContextNotCreated()) { | |
Thread.sleep(TimeUnit.SECONDS.toMillis(1)) | |
val total = System.currentTimeMillis() - startTime | |
log.info("Waiting for spark context from {} seconds", TimeUnit.MILLISECONDS.toSeconds(total)) | |
} | |
log.info("Spark context creation took {} seconds", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val total = randomNumbers.mapPartitions(data => { | |
val by2 = data.map(x => { | |
log_.info("Processing {}", x) | |
x * 2 | |
}) | |
by2 | |
}).sum() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var x layouttest | |
var y compactyouttest | |
fmt.Printf("Int alignment %v \n", unsafe.Alignof(10)) | |
fmt.Printf("Int8 aligment %v \n", unsafe.Alignof(int8(10))) | |
fmt.Printf("Int16 aligment %v \n", unsafe.Alignof(int16(10))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Create Alias for int type | |
type RichInt int | |
func main() { | |
var ri RichInt | |
ri = 100 | |
fmt.Println("Value of rich int", ri) | |
fmt.Println("Convert to Int", int(ri)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var value int | |
var f float64 | |
var b bool | |
var by byte | |
var name string | |
var x rune | |
//Variable are declared and initialized by compiler to ZERO VALUE of its type | |
//https://golang.org/ref/spec#The_zero_value |