-
-
Save parzonka/9ab676de54d4030af61daf25f19f2aba to your computer and use it in GitHub Desktop.
Hazelcast map-reduce example - counting distribution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobCompletableFuture;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("serial") | |
public class DistributionTest { | |
private static final int HAZELCAST_INSTANCES = 2; | |
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { | |
HazelcastInstance inst = Hazelcast.newHazelcastInstance(); | |
for (int i = 1; i < HAZELCAST_INSTANCES; i++) { | |
Hazelcast.newHazelcastInstance(); | |
} | |
while (!inst.getPartitionService().isClusterSafe()) { | |
System.out.println("Waiting until cluster with " + HAZELCAST_INSTANCES + " instances has stabilized..."); | |
Thread.sleep(1000); | |
} | |
IMap<String, ParametersValues> map = inst.getMap("parameters"); | |
Map<String, ParametersValues> data = new HashMap<>(); | |
Random random = new Random(); | |
if (map.isEmpty()) { | |
System.out.println("Preparing data..."); | |
for (int i = 0; i < 5000; i++) { | |
ParametersValues values = new ParametersValues(); | |
for (int j = 0; j < 100; j++) { | |
String paramName = "param" + j; | |
values.setParamValue(paramName, new ParameterValue(random.nextInt(10))); | |
} | |
map.put("number" + i, values); | |
} | |
map.putAll(data); | |
} | |
while (!inst.getPartitionService().isClusterSafe()) { | |
Thread.sleep(1000); | |
} | |
System.out.println("Data is ready"); | |
JobTracker jobTracker = inst.getJobTracker("default"); | |
KeyValueSource<String, ParametersValues> source = KeyValueSource.fromMap(map); | |
long begin = System.currentTimeMillis(); | |
System.out.println("Starting map reduce"); | |
Job<String, ParametersValues> job = jobTracker.newJob(source); | |
JobCompletableFuture<Map<String, Distribution>> distrF = job.mapper(filterMapper()) | |
.combiner(distrCombinerFactory()).reducer(distrReducerFactory()).submit(); | |
System.out.println("job is submitted"); | |
Map<String, Distribution> distr = distrF.get(180, TimeUnit.SECONDS); | |
System.out.println("Distributions are " + distr); | |
System.out.println("it took " + (System.currentTimeMillis() - begin)); | |
inst.shutdown(); | |
} | |
public static CombinerFactory<String, ParameterValue, Distribution> distrCombinerFactory() { | |
return new CombinerFactory<String, ParameterValue, Distribution>() { | |
public Combiner<ParameterValue, Distribution> newCombiner(String key) { | |
return new Combiner<ParameterValue, Distribution>() { | |
private Distribution distr; | |
public void beginCombine() { | |
distr = new Distribution(); | |
} | |
public void combine(ParameterValue value) { | |
for (Integer v : value.values) { | |
distr.count(v); | |
} | |
} | |
public Distribution finalizeChunk() { | |
Distribution result = distr; | |
beginCombine(); | |
return result; | |
} | |
}; | |
} | |
}; | |
} | |
public static ReducerFactory<String, Distribution, Distribution> distrReducerFactory() { | |
return new ReducerFactory<String, Distribution, Distribution>() { | |
public Reducer<Distribution, Distribution> newReducer(String key) { | |
return new Reducer<Distribution, Distribution>() { | |
private Distribution distr = new Distribution(); | |
public void reduce(Distribution other) { | |
distr.merge(other); | |
} | |
public Distribution finalizeReduce() { | |
return distr; | |
} | |
}; | |
} | |
}; | |
} | |
public static class ParametersValues implements Serializable { | |
private Map<String, ParameterValue> values = new HashMap<>(); | |
public void setParamValue(String paramName, ParameterValue value) { | |
values.put(paramName, value); | |
} | |
} | |
public static class ParameterValue implements Serializable { | |
private final List<Integer> values; | |
public ParameterValue(List<Integer> values) { | |
this.values = values; | |
} | |
public ParameterValue(Integer singleVal) { | |
this.values = Arrays.asList(singleVal); | |
} | |
} | |
public static class Distribution implements Serializable { | |
private final Map<Integer, Integer> distr = new HashMap<>(); | |
public void count(Integer value) { | |
if (!distr.containsKey(value)) { | |
distr.put(value, 0); | |
} | |
distr.put(value, distr.get(value) + 1); | |
} | |
public String toString() { | |
return "Distribution [distr=" + distr + "]"; | |
} | |
public void merge(Distribution other) { | |
for (Integer otherKey : other.distr.keySet()) { | |
int thisCount = distr.containsKey(otherKey) ? distr.get(otherKey) : 0; | |
int otherCount = other.distr.get(otherKey); | |
distr.put(otherKey, thisCount + otherCount); | |
} | |
} | |
} | |
public static Mapper<String, ParametersValues, String, ParameterValue> filterMapper() { | |
return new CustomMapper(); | |
} | |
public static class CustomMapper implements Mapper<String, ParametersValues, String, ParameterValue>, Serializable { | |
public void map(String key, ParametersValues values, Context<String, ParameterValue> context) { | |
for (String paramName : values.values.keySet()) { | |
ParameterValue parameterValue = values.values.get(paramName); | |
context.emit(paramName, parameterValue); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment