Programming exercise: calculate the "poor ratio" for devices. A device is considered "poor" when its ping count is lower than the average ping count for its device type, and the poor ratio is the fraction of devices of each type that are poor. For example, if the average ping count for an iPhone is 50, then any iPhone reporting fewer than 50 pings would be considered poor.
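As an illustrative worked example of the ratio itself (hypothetical numbers, not taken from the gist): if 2 of 5 android devices report fewer pings than the android average, the android poor ratio is 2/5 = 0.4.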
package io.angelortega

import org.apache.spark.rdd.RDD

object MapReduceSolution extends LocalSparkContext {

  def main(args: Array[String]): Unit = {
    val inputRdd: RDD[String] = sc.textFile(args.head)
    val deviceTypePoorRatios = calcPoorRatios(inputRdd)
    printOutput(deviceTypePoorRatios)
    sc.stop()
  }
  /**
   * O(N), where N = sum over stages i (for i from 0 until numStages) of
   * partitionSize_i * parallelExecutionTime_i.
   **/
  def calcPoorRatios(inputRdd: RDD[String]): RDD[(String, Double)] = {
    /**
     * Read the device input and key each record by (ip, deviceType); this is useful for reducing shuffling,
     * because the key gives us more control over how records are emitted to block storage and partitions.
     * A key of (ip, deviceType) can still see many collisions if one ip is far more frequent than another,
     * and the result could be block spillage since a single default hash function assigns the keys.
     *
     * Skew is reduced when ips and deviceTypes are distributed evenly across block storage:
     * - Removing ip data skew can be achieved by salting our keys.
     * - Salting helps distribute the devices evenly across partitions; this works well here because the
     *   reduce-by-key phase merges values before the shuffle stage (see the saltKey sketch just below
     *   this comment).
     * - An alternative to salting is repartitioning, either by
     *   a. calling repartition to adjust numPartitions while keeping block size within the 64 MB to 256 MB range,
     *   b. creating our own HashPartitioner that distributes the keys evenly, or
     *   c. reducing the number of partitions, accepting more collisions and increasing block size to
     *      prevent block spillage.
     * - Another alternative is to stream the output to a sink (Kafka would serve well, because we can
     *   adjust the partition count per key; a good example is Twitter increasing partitions for
     *   high-traffic users whose continuous log must be delivered to many followers. The same applies
     *   to an ip that pings more frequently).
     **/
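    // Illustrative sketch of the salting idea described above (hypothetical helper, not used in this job):
    // appending a bounded random suffix to a hot (ip, deviceType) key spreads its records across
    // partitions; after the salted partial aggregation, the suffix is stripped and the partials merged.
    def saltKey(key: (String, String), numSalts: Int = 8): ((String, String), Int) =
      (key, scala.util.Random.nextInt(numSalts))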
    val DELIMITER = ','
    val devicePerformanceLog = inputRdd
      .map(_.split(DELIMITER))
      .flatMap {
        // Keep only well-formed (ip, deviceType, score) rows; silently drop malformed records.
        case arr if arr.length == 3 && scala.util.Try(arr(2).toInt).isSuccess =>
          Some((arr(0), arr(1), arr(2).toInt))
        case _ => None
      }
      .map { case (id, deviceType, score) =>
        ((id, deviceType), (score, 1)) // K = (ip, deviceType), V = (score, ping count of 1)
      }
    /**
     * O(numWorkerThreads / numCores): with numCores = 1 (a single-core machine) we lose the
     * parallelization and fall back to O(N) linear time plus execution delay.
     *
     * In the best case, numCores >> numWorkerThreads and the ratio becomes a constant. This assumes our
     * executor environment carries this trait. For example, with 4 threads and 8 cores,
     * O(4/8) = O(1/2) = O(C), a constant rate, i.e. O(1).
     * This is what we mean by horizontal scalability: we get O(1) time, but our memory footprint is now
     * O(blockSize * numPartitions), which can grow at a linear rate to take advantage of parallelization
     * for better time complexity.
     *
     * In that best-case scenario (numCores >> numWorkerThreads) the time complexity is O(1), because each
     * map phase runs in constant time under one parallel operation. To achieve O(1) we need many cores
     * and enough RAM to keep the data resident, so we stay at roughly 100-700 ns memory latency and avoid
     * network latency.
     *
     * However, this is the ideal world. Overall we are limited by the longest execution cycle of a single
     * task; that is the bottleneck, so a pending task may wait, staged for computation, until the
     * execution cycle of its stage finishes.
     *
     * The time complexity obeys Amdahl's law (see the amdahlSpeedup sketch just below this comment):
     * https://en.wikipedia.org/wiki/Amdahl%27s_law
     * http://15418.courses.cs.cmu.edu/spring2016content/lectures/19_heterogeneity/images/slide_006.jpg
     **/
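    // Illustrative sketch of Amdahl's law referenced above (hypothetical helper, not used in this job):
    // with a parallelizable fraction p of the work and s parallel units, the maximum speedup is
    // 1 / ((1 - p) + p / s), which is what bounds the ideal O(1) reasoning in the comment.
    def amdahlSpeedup(p: Double, s: Double): Double = 1.0 / ((1.0 - p) + p / s)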
    // The resulting keyed RDD, shown as ((ip, deviceType), (score, 1)) pairs:
    // ((1.1.1.1,android),(20,1))
    // ((1.1.1.1,android),(100,1))
    // ((2.2.2.2,iphone),(10,1))
    // ((2.2.2.2,iphone),(20,1))
    // ((3.3.3.3,android),(10,1))
    // ((3.3.3.3,android),(40,1))
    // ((3.3.3.3,android),(10,1))
    // ((4.4.4.4,iphone),(10,1))
    // O(N), where N is (partition size * parallelExecutionTime).
    // Aggregate per (ip, deviceType) key: the sequential function sums score and ping count within a
    // partition, and the combiner merges those partial (totalScore, totalPings) pairs across partitions.
    val (totalScore, totalPings) = (0, 0)
    val deviceAccumulatedMetrics = devicePerformanceLog.aggregateByKey((totalScore, totalPings))(
      (u, device) => {
        val (scoreAccumulator, devicePingAccumulator) = u
        val (deviceScore, devicePingCount) = device
        (scoreAccumulator + deviceScore, devicePingAccumulator + devicePingCount)
      },
      (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
    )
    // Accumulated metrics per device, as ((ip, deviceType), (totalScore, totalPings)):
    // ((2.2.2.2,iphone),(30,2))
    // ((3.3.3.3,android),(60,3))
    // ((1.1.1.1,android),(120,2))
    // ((4.4.4.4,iphone),(10,1))
    // Derive the average performance score per device and flag whether it is poor.
    // NOTE: this uses integer division and a fixed threshold of 50 as the "poor" cutoff.
    val devicePerformanceMetrics = deviceAccumulatedMetrics.map { case (deviceKey, (totalScore, totalPings)) =>
      val avgPerfScore = totalScore / totalPings
      val isPoor = avgPerfScore <= 50
      (deviceKey, (totalScore, totalPings, avgPerfScore, isPoor))
    }
    // Re-key by device type and emit (isPoor flag as 0/1, 1) per device so the ratio can be reduced per type.
    val poorRatio = devicePerformanceMetrics.map {
      case ((id, deviceType), (totalScore, totalPings, avgPerfScore, isPoor)) =>
        (deviceType, ((if (isPoor) 1 else 0), 1))
    }
    val isPoorAggByDevice = poorRatio.reduceByKey((u, v) => (u._1 + v._1, u._2 + v._2))
    val results = isPoorAggByDevice.map {
      case (deviceType, (isPoorTally, totalDeviceType)) => (deviceType, isPoorTally.toDouble / totalDeviceType)
    }
    results
    /**
     * O(N), where N = sum over stages i (for i from 0 until numStages) of
     * partitionSize_i * parallelExecutionTime_i.
     */
  }

  def printOutput(outputRdd: RDD[(String, Double)]): Unit = {
    outputRdd.collect().foreach { case (deviceType, poorRatio) =>
      println(s"$deviceType,$poorRatio")
    }
  }
}
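A minimal usage sketch (illustrative only; it assumes a spark-shell session with a SparkContext named sc in scope and the compiled object on the classpath, and that the LocalSparkContext trait referenced by the object initializes without conflicting with that session; none of this is part of the original gist):

import io.angelortega.MapReduceSolution
// Sample rows mirror the inline comments above, in ip,deviceType,score form.
val sample = sc.parallelize(Seq(
  "1.1.1.1,android,20",
  "1.1.1.1,android,100",
  "2.2.2.2,iphone,10"
))
MapReduceSolution.printOutput(MapReduceSolution.calcPoorRatios(sample))

This prints one deviceType,poorRatio line per device type, matching the printOutput format above.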