Last active
November 21, 2016 19:24
-
-
Save Crim/61537958df65a5e13b3844b2d5e28cde to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pardot.storm.grouping; | |
import org.apache.storm.generated.GlobalStreamId; | |
import org.apache.storm.grouping.CustomStreamGrouping; | |
import org.apache.storm.task.WorkerTopologyContext; | |
import java.io.Serializable; | |
import java.util.*; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Everybody do the Pardot Shuffle. A slightly different take/implementation on the "ShuffleGrouping" grouping. | |
*/ | |
public class ThePardotShuffle implements CustomStreamGrouping, Serializable { | |
private ArrayList<List<Integer>> choices; | |
private AtomicInteger counter; | |
private int totalSize; | |
@Override | |
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { | |
// Clone the list | |
choices = new ArrayList<List<Integer>>(targetTasks.size()); | |
for (Integer i : targetTasks) { | |
choices.add(Arrays.asList(i)); | |
} | |
counter = new AtomicInteger(0); | |
totalSize = choices.size(); | |
} | |
@Override | |
public List<Integer> chooseTasks(int taskId, List<Object> values) { | |
final int nextTask = Math.abs(counter.getAndIncrement() % totalSize); | |
return choices.get(nextTask); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pardot.storm.grouping; | |
import com.google.common.collect.Lists; | |
import org.apache.storm.grouping.ShuffleGrouping; | |
import org.apache.storm.task.WorkerTopologyContext; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.*; | |
import static org.junit.Assert.*; | |
import static org.mockito.Mockito.mock; | |
/** | |
* | |
*/ | |
public class ThePardotShuffleTest { | |
private static final Logger logger = LoggerFactory.getLogger(ThePardotShuffleTest.class); | |
/** | |
* Tests that we round robbin correctly using ThePardotShuffle implementation. | |
* | |
* Output: Total Distribution [5000, 5000, 5000, 5000, 5000, 5000] | |
*/ | |
@Test | |
public void testThePardotShuffle() { | |
// Task Id not used, so just pick a static value | |
final int inputTaskId = 100; | |
// Define our taskIds | |
final List<Integer> availableTaskIds = Lists.newArrayList(0, 1, 2, 3, 4, 5); | |
ThePardotShuffle grouper = new ThePardotShuffle(); | |
WorkerTopologyContext context = mock(WorkerTopologyContext.class); | |
grouper.prepare(context, null, availableTaskIds); | |
// Keep track of how many times we see each taskId | |
int[] taskCounts = {0, 0, 0, 0, 0, 0}; | |
for (int i = 1; i <= 30000; i++) { | |
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList()); | |
// Validate a single task id return | |
assertNotNull("Not null taskId list returned", taskIds); | |
assertEquals("Single task Id returned", 1, taskIds.size()); | |
int taskId = taskIds.get(0); | |
assertTrue("TaskId should exist", taskId >= 0 && taskId <= 5); | |
taskCounts[taskId]++; | |
} | |
logger.info("Total Distribution {}", taskCounts); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[0]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[1]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[2]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[3]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[4]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[5]); | |
} | |
/** | |
* Tests that we round robbin correctly with multiple threads using ThePardotShuffle implementation. | |
* | |
* Output: | |
* Result: [4865, 4966, 5120, 5023, 5070, 4956] | |
* Result: [4942, 4964, 5102, 5071, 4924, 4997] | |
* Result: [5123, 4889, 5034, 5005, 5134, 4815] | |
* Result: [5003, 5057, 4880, 5196, 4836, 5028] | |
* Result: [5068, 5018, 4915, 4786, 5045, 5168] | |
* Result: [4999, 5106, 4949, 4919, 4991, 5036] | |
*/ | |
@Test | |
public void testThePardotShuffleMultiThreaded() throws InterruptedException, ExecutionException { | |
// Task Id not used, so just pick a static value | |
final int inputTaskId = 100; | |
// Define our taskIds | |
final List<Integer> availableTaskIds = Lists.newArrayList(0, 1, 2, 3, 4, 5); | |
final ThePardotShuffle grouper = new ThePardotShuffle(); | |
final WorkerTopologyContext context = mock(WorkerTopologyContext.class); | |
// Call prepare with our available taskIds | |
grouper.prepare(context, null, availableTaskIds); | |
List<Callable<int[]>> threadTasks = Lists.newArrayList(); | |
for (int x=0; x<availableTaskIds.size(); x++) { | |
Callable<int[]> threadTask = new Callable<int[]>() { | |
@Override | |
public int[] call() throws Exception { | |
int[] taskCounts = {0, 0, 0, 0, 0, 0}; | |
for (int i = 1; i <= 30000; i++) { | |
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList()); | |
// Validate a single task id return | |
assertNotNull("Not null taskId list returned", taskIds); | |
assertEquals("Single task Id returned", 1, taskIds.size()); | |
int taskId = taskIds.get(0); | |
assertTrue("TaskId should exist", taskId >= 0 && taskId <= 5); | |
taskCounts[taskId]++; | |
} | |
return taskCounts; | |
} | |
}; | |
// Add to our collection. | |
threadTasks.add(threadTask); | |
} | |
ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size()); | |
List<Future<int[]>> taskResults = executor.invokeAll(threadTasks); | |
// Wait for all tasks to complete | |
for (Future taskResult: taskResults) { | |
while (!taskResult.isDone()) { | |
Thread.sleep(1000); | |
} | |
logger.info("Result: {}", taskResult.get()); | |
} | |
} | |
/** | |
* Tests that we round robbin correctly using ShuffleGrouping implementation. | |
* | |
* OUTPUT: "total Distribution [5000, 5000, 5000, 4999, 5000, 5001]" | |
*/ | |
@Test | |
public void testShuffleGrouping() { | |
// Task Id not used, so just pick a static value | |
final int inputTaskId = 100; | |
// Define our taskIds | |
final List<Integer> availableTaskIds = Lists.newArrayList(0, 1, 2, 3, 4, 5); | |
ShuffleGrouping grouper = new ShuffleGrouping(); | |
WorkerTopologyContext context = mock(WorkerTopologyContext.class); | |
grouper.prepare(context, null, availableTaskIds); | |
// Keep track of how many times we see each taskId | |
int[] taskCounts = {0, 0, 0, 0, 0, 0}; | |
for (int i = 1; i <= 30000; i++) { | |
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList()); | |
// Validate a single task id return | |
assertNotNull("Not null taskId list returned", taskIds); | |
assertEquals("Single task Id returned", 1, taskIds.size()); | |
int taskId = taskIds.get(0); | |
assertTrue("TaskId should exist", taskId >= 0 && taskId <= 5); | |
taskCounts[taskId]++; | |
} | |
logger.info("Total Distribution {}", taskCounts); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[0]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[1]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[2]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[3]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[4]); | |
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[5]); | |
} | |
/** | |
* Tests that we round robbin correctly with multiple threads using ShuffleGrouping implementation. | |
* Output: | |
* Result: [27312, 34, 2226, 252, 93, 83] | |
* Result: [26244, 55, 2918, 398, 248, 137] | |
* Result: [27190, 44, 2357, 232, 88, 89] | |
* Result: [27220, 26, 2424, 209, 63, 58] | |
* Result: [27226, 0, 2313, 178, 191, 92] | |
* Result: [27288, 0, 2285, 254, 134, 39] | |
*/ | |
@Test | |
public void testShuffleGroupMultiThreaded() throws InterruptedException, ExecutionException { | |
// Task Id not used, so just pick a static value | |
final int inputTaskId = 100; | |
// Define our taskIds | |
final List<Integer> availableTaskIds = Lists.newArrayList(0, 1, 2, 3, 4, 5); | |
final ShuffleGrouping grouper = new ShuffleGrouping(); | |
final WorkerTopologyContext context = mock(WorkerTopologyContext.class); | |
// Call prepare with our available taskIds | |
grouper.prepare(context, null, availableTaskIds); | |
List<Callable<int[]>> threadTasks = Lists.newArrayList(); | |
for (int x=0; x<availableTaskIds.size(); x++) { | |
Callable<int[]> threadTask = new Callable<int[]>() { | |
@Override | |
public int[] call() throws Exception { | |
int[] taskCounts = {0, 0, 0, 0, 0, 0}; | |
for (int i = 1; i <= 30000; i++) { | |
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList()); | |
// Validate a single task id return | |
assertNotNull("Not null taskId list returned", taskIds); | |
assertEquals("Single task Id returned", 1, taskIds.size()); | |
int taskId = taskIds.get(0); | |
assertTrue("TaskId should exist", taskId >= 0 && taskId <= 5); | |
taskCounts[taskId]++; | |
} | |
return taskCounts; | |
} | |
}; | |
// Add to our collection. | |
threadTasks.add(threadTask); | |
} | |
ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size()); | |
List<Future<int[]>> taskResults = executor.invokeAll(threadTasks); | |
// Wait for all tasks to complete | |
for (Future taskResult: taskResults) { | |
while (!taskResult.isDone()) { | |
Thread.sleep(1000); | |
} | |
logger.info("Result: {}", taskResult.get()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment