Hazelcast map-reduce example - counting distribution
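This example runs a map-reduce job (the com.hazelcast.mapreduce API from Hazelcast 3.x) over an IMap of 500,000 entries, each holding 100 randomly generated parameter values. The mapper emits one (parameterName, value) pair per parameter, the combiner pre-aggregates values into a per-chunk Distribution on each member, and the reducer merges those distributions into one final value distribution per parameter.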
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobCompletableFuture;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("serial")
public class DistributionTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        // No addresses configured: the client connects to localhost:5701 by default.
        ClientNetworkConfig clientNetworkConfig = new ClientNetworkConfig();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setNetworkConfig(clientNetworkConfig);
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);

        IMap<String, ParametersValues> map = client.getMap("parameters");
        Map<String, ParametersValues> data = new HashMap<>();
        Random random = new Random();
        if (map.isEmpty()) {
            System.out.println("Preparing data...");
            // 500,000 entries, each with 100 parameters holding a random value in [0, 10)
            for (int i = 0; i < 500000; i++) {
                ParametersValues values = new ParametersValues();
                for (int j = 0; j < 100; j++) {
                    String paramName = "param" + j;
                    values.setParamValue(paramName, new ParameterValue(random.nextInt(10)));
                }
                data.put("number" + i, values);
                // flush in batches instead of one remote put per entry
                if (data.size() == 10000) {
                    map.putAll(data);
                    data.clear();
                }
            }
            map.putAll(data);
        }
        System.out.println("Data is ready");

        JobTracker jobTracker = client.getJobTracker("default");
        KeyValueSource<String, ParametersValues> source = KeyValueSource.fromMap(map);

        long begin = System.currentTimeMillis();
        System.out.println("Starting map reduce");
        Job<String, ParametersValues> job = jobTracker.newJob(source);
        JobCompletableFuture<Map<String, Distribution>> distrF = job
                .mapper(filterMapper())
                .combiner(distrCombinerFactory())
                .reducer(distrReducerFactory())
                .submit();
        System.out.println("job is submitted");
        Map<String, Distribution> distr = distrF.get(180, TimeUnit.SECONDS);
        System.out.println("Distributions are " + distr);
        System.out.println("it took " + (System.currentTimeMillis() - begin) + " ms");
        client.shutdown();
    }
    public static CombinerFactory<String, ParameterValue, Distribution> distrCombinerFactory() {
        // The combiner pre-aggregates emitted values into a Distribution on the member
        // that produced them, so only small Distribution objects travel to the reducer
        // instead of every single ParameterValue.
        return new CombinerFactory<String, ParameterValue, Distribution>() {
            public Combiner<ParameterValue, Distribution> newCombiner(String key) {
                return new Combiner<ParameterValue, Distribution>() {

                    private Distribution distr;

                    public void beginCombine() {
                        distr = new Distribution();
                    }

                    public void combine(ParameterValue value) {
                        for (Integer v : value.values) {
                            distr.count(v);
                        }
                    }

                    public Distribution finalizeChunk() {
                        // hand the current chunk over and reset for the next one
                        Distribution result = distr;
                        beginCombine();
                        return result;
                    }
                };
            }
        };
    }
    public static ReducerFactory<String, Distribution, Distribution> distrReducerFactory() {
        // The reducer merges the per-chunk distributions of one parameter into a single result.
        return new ReducerFactory<String, Distribution, Distribution>() {
            public Reducer<Distribution, Distribution> newReducer(String key) {
                return new Reducer<Distribution, Distribution>() {

                    private Distribution distr = new Distribution();

                    public void reduce(Distribution other) {
                        distr.merge(other);
                    }

                    public Distribution finalizeReduce() {
                        return distr;
                    }
                };
            }
        };
    }
    public static class ParametersValues implements Serializable {

        private Map<String, ParameterValue> values = new HashMap<>();

        public void setParamValue(String paramName, ParameterValue value) {
            values.put(paramName, value);
        }
    }

    public static class ParameterValue implements Serializable {

        private final List<Integer> values;

        public ParameterValue(List<Integer> values) {
            this.values = values;
        }

        public ParameterValue(Integer singleVal) {
            this.values = Arrays.asList(singleVal);
        }
    }
    public static class Distribution implements Serializable {

        private final Map<Integer, Integer> distr = new HashMap<>();

        public void count(Integer value) {
            Integer current = distr.get(value);
            distr.put(value, current == null ? 1 : current + 1);
        }

        @Override
        public String toString() {
            return "Distribution [distr=" + distr + "]";
        }

        public void merge(Distribution other) {
            for (Map.Entry<Integer, Integer> entry : other.distr.entrySet()) {
                Integer current = distr.get(entry.getKey());
                int thisCount = current == null ? 0 : current;
                distr.put(entry.getKey(), thisCount + entry.getValue());
            }
        }
    }
    public static Mapper<String, ParametersValues, String, ParameterValue> filterMapper() {
        return new CustomMapper();
    }

    public static class CustomMapper implements Mapper<String, ParametersValues, String, ParameterValue>, Serializable {

        // Emits one (parameterName, value) pair per parameter of every entry,
        // so the framework can group values by parameter name.
        public void map(String key, ParametersValues values, Context<String, ParameterValue> context) {
            for (Map.Entry<String, ParameterValue> entry : values.values.entrySet()) {
                context.emit(entry.getKey(), entry.getValue());
            }
        }
    }
}
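The client above needs a running Hazelcast member to connect to (localhost:5701 by default). Below is a minimal sketch for starting one embedded member, assuming a Hazelcast 3.x jar on the classpath; the class name MemberStarter is illustrative and not part of the gist.

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class MemberStarter {
    public static void main(String[] args) {
        // Start a single embedded member with default settings;
        // the DistributionTest client will find it on localhost:5701.
        Config config = new Config();
        HazelcastInstance member = Hazelcast.newHazelcastInstance(config);
        System.out.println("Member started: " + member.getCluster().getLocalMember());
    }
}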