Created
May 15, 2014 15:31
-
-
Save codyaray/d58c1aaf688f27b72fdd to your computer and use it in GitHub Desktop.
Best practice to randomly shard data between multiple TridentStates
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create one state factory per Mongo host via MongoStateFactory.opaque; all of
// them target the same database name.
List<StateFactory> shardFactories = Lists.newArrayListWithCapacity(mongoHosts.size());
for (String mongoHost : mongoHosts) {
  StateFactory perHostFactory = MongoStateFactory.opaque(mongoHost, mongoDatabase);
  shardFactories.add(perHostFactory);
}
// Hand the per-host factories to the sharding factory, which is what the
// topology should be given as its single StateFactory.
StateFactory stateFactory = new RandomShardState.Factory(shardFactories);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.shard; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import backtype.storm.task.IMetricsContext; | |
import storm.trident.state.State; | |
import storm.trident.state.StateFactory; | |
import storm.trident.state.map.MapState; | |
import storm.trident.state.map.ReadOnlyMapState; | |
import com.google.common.collect.Lists; | |
/** | |
* Randomly shards batches between delegate {@link State state}s. | |
* | |
* @author codyaray | |
* @since 4/23/2014 | |
*/ | |
public class RandomShardState implements MapState, State { | |
private final Random rand = new Random(); | |
private final List<State> delegates; | |
// Set and unset in beginCommit/commit | |
private State shard; | |
private RandomShardState(List<State> delegates) { | |
this.delegates = delegates; | |
} | |
@Override | |
public void beginCommit(Long txid) { | |
shard = delegates.get(rand.nextInt(delegates.size())); | |
shard.beginCommit(txid); | |
} | |
@Override | |
public void commit(Long txid) { | |
shard.commit(txid); | |
shard = null; | |
} | |
@Override | |
public List multiGet(List keys) { | |
return ((ReadOnlyMapState) shard).multiGet(keys); | |
} | |
@Override | |
public List multiUpdate(List keys, List vals) { | |
return ((MapState) shard).multiUpdate(keys, vals); | |
} | |
@Override | |
public void multiPut(List keys, List vals) { | |
((MapState) shard).multiPut(keys, vals); | |
} | |
/** | |
* Factory to create {@link RandomShardState}s. | |
*/ | |
public static class Factory implements StateFactory { | |
private static final long serialVersionUID = -6289401502963920055L; | |
private final List<StateFactory> delegates; | |
public Factory(List<StateFactory> delegates) { | |
this.delegates = delegates; | |
} | |
@Override | |
@SuppressWarnings({ "rawtypes" }) | |
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { | |
List<State> states = Lists.newArrayListWithCapacity(delegates.size()); | |
for (StateFactory factory : delegates) { | |
states.add(factory.makeState(conf, metrics, partitionIndex, numPartitions)); | |
} | |
return new RandomShardState(states); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment