Created
July 7, 2014 18:16
-
-
Save codyaray/75533044fc8c0a12fa67 to your computer and use it in GitHub Desktop.
Time-Series Event Aggregation with Storm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import backtype.storm.tuple.Values; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Converts the first tuple from a byte array into a string. | |
*/ | |
public class BinaryToString extends BaseFunction { | |
private static final long serialVersionUID = -8686873770270590062L; | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
String field = new String(tuple.getBinary(0)); | |
collector.emit(new Values(field)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import java.math.RoundingMode; | |
import backtype.storm.tuple.Values; | |
import com.google.common.math.DoubleMath; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Maps a timestamp into a bucket of size {@code interval}. | |
* | |
* Assumes the first tuple value is a long timestamp. | |
* Outputs the {@code bucketStart} and {@code bucketEnd}. | |
*/ | |
public class Bucket extends BaseFunction { | |
private static final long serialVersionUID = 1042081321412192768L; | |
private final long interval; | |
public Bucket(long interval) { | |
this.interval = interval; | |
} | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
long timestamp = tuple.getLong(0); | |
long bucketStart = DoubleMath.roundToLong( | |
Math.floor(timestamp / interval), RoundingMode.UNNECESSARY) * interval; | |
long bucketEnd = bucketStart + interval; | |
collector.emit(new Values(String.valueOf(bucketStart), String.valueOf(bucketEnd))); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo; | |
import java.util.Arrays; | |
import java.util.List; | |
import com.google.common.base.Objects; | |
/** | |
* Represents a key in MongoDB. Provides a helper for | |
* parsing a Trident key into a Mongo key. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
*/ | |
class MongoKey { | |
final String collection; | |
final long timestamp; | |
final String siteId; | |
final String fieldName; | |
public MongoKey(String collection, long timestamp, String siteId, String fieldName) { | |
this.collection = collection; | |
this.timestamp = timestamp; | |
this.siteId = siteId; | |
this.fieldName = fieldName; | |
} | |
public static MongoKey fromTrident(List<Object> key) { | |
String collection = key.get(0).toString(); | |
long timestamp = Long.parseLong(key.get(1).toString()); | |
String siteId = key.get(2).toString(); | |
String fieldName = key.get(3).toString(); | |
return new MongoKey(collection, timestamp, siteId, fieldName); | |
} | |
@Override | |
public String toString() { | |
return Objects.toStringHelper(this) | |
.add("collection", collection) | |
.add("timestamp", timestamp) | |
.add("siteId", siteId) | |
.add("fieldName", fieldName) | |
.toString(); | |
} | |
Object[] significantAttributes() { | |
return new Object[]{ collection, timestamp, siteId, fieldName }; | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hashCode(significantAttributes()); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (obj == null) { | |
return false; | |
} else if (obj == this) { | |
return true; | |
} else if (!getClass().isAssignableFrom(obj.getClass())) { | |
return false; | |
} | |
return Arrays.equals(significantAttributes(), getClass().cast(obj).significantAttributes()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo.serializer; | |
import com.google.common.reflect.TypeToken; | |
import com.mongodb.BasicDBObject; | |
import com.mongodb.DBObject; | |
/** | |
* Serialize a key-value pair to MongoDB without any transactional metadata. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of the value | |
*/ | |
public class MongoNonTransactionalSerializer<T> implements MongoSerializer<T> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public DBObject serialize(String fieldName, T val) { | |
return new BasicDBObject(fieldName, val); | |
} | |
@Override | |
public T deserialize(DBObject obj, String fieldName) { | |
return MongoSerializerHelper.parse(obj.get(fieldName), getTypeParameter()); | |
} | |
@SuppressWarnings("serial") | |
private Class<? super T> getTypeParameter() { | |
return new TypeToken<T>(getClass()) { }.getRawType(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo.serializer; | |
import com.google.common.reflect.TypeToken; | |
import com.mongodb.DBObject; | |
import storm.trident.state.OpaqueValue; | |
import storm.trident.state.TransactionalValue; | |
/** | |
* Serialize a key-value pair to MongoDB with opaque transactional metadata. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of the value | |
*/ | |
public class MongoOpaqueSerializer<T> implements MongoSerializer<OpaqueValue<T>> { | |
private static final long serialVersionUID = 1L; | |
private static final String PREV_PREFIX = "prev-"; | |
private final MongoTransactionalSerializer<T> base; | |
public MongoOpaqueSerializer(MongoTransactionalSerializer<T> base) { | |
this.base = base; | |
} | |
@Override | |
public DBObject serialize(String fieldName, OpaqueValue<T> val) { | |
TransactionalValue<T> value = new TransactionalValue<T>(val.getCurrTxid(), val.getCurr()); | |
DBObject obj = base.serialize(fieldName, value); | |
obj.put(PREV_PREFIX + fieldName, val.getPrev()); | |
return obj; | |
} | |
@Override | |
public OpaqueValue<T> deserialize(DBObject obj, String fieldName) { | |
TransactionalValue<T> val = base.deserialize(obj, fieldName); | |
T prev = MongoSerializerHelper.parse(obj.get(PREV_PREFIX + fieldName), getTypeParameter()); | |
return new OpaqueValue<T>(val.getTxid(), val.getVal(), prev); | |
} | |
@SuppressWarnings("serial") | |
private Class<? super T> getTypeParameter() { | |
return new TypeToken<T>(getClass()) { }.getRawType(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo.serializer; | |
import java.io.Serializable; | |
import com.mongodb.DBObject; | |
/** | |
* Serialize a key-value pair to MongoDB. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of the value | |
*/ | |
public interface MongoSerializer<T> extends Serializable { | |
/** | |
* Returns a mongo {@link DBObject} for the key-value pair. | |
* | |
* @param key the key | |
* @param val the value | |
* @return the mongo {@link DBObject} | |
*/ | |
DBObject serialize(String key, T val); | |
/** | |
* Returns the value corresponding to {@code key} in {@code obj}. | |
* | |
* @param obj the mongo {@link DBObject} | |
* @param key the key | |
* @return the value | |
*/ | |
T deserialize(DBObject obj, String key); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo.serializer; | |
import javax.annotation.Nullable; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Helpers for the {@link MongoSerializer}s. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
*/ | |
class MongoSerializerHelper { | |
private static final Logger log = LoggerFactory.getLogger(MongoSerializerHelper.class); | |
/** | |
* Parse {@code object} as {@code type}. Requires {@code type} to have | |
* a constructor with a single String argument. | |
* | |
* @return {@code object} as a {@code type} or {@code null} if {@code object} is {@code null} | |
* or does not have a constructor which accepts a single String argument. | |
*/ | |
static @Nullable <T> T parse(@Nullable Object object, Class<? super T> type) { | |
if (object == null) { | |
return null; | |
} | |
try { | |
@SuppressWarnings("unchecked") | |
T instance = (T) type.getConstructor(String.class).newInstance(object.toString()); | |
return instance; | |
} catch (Exception e) { | |
log.warn("Can't parse Object {} as type {}: {}", new Object[]{ object, type, e }); | |
} | |
return null; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Map; | |
import javax.annotation.Nullable; | |
import com.google.common.collect.Lists; | |
import com.google.common.collect.Maps; | |
import com.mongodb.BasicDBObject; | |
import com.mongodb.DB; | |
import com.mongodb.DBCollection; | |
import com.mongodb.DBObject; | |
import com.brighttag.storm.mongo.serializer.MongoSerializer; | |
import storm.trident.state.map.IBackingMap; | |
/** | |
* A one-off {@link MongoState} which seeks to reproduce the existing Mongo schema | |
* written by statscollector and read by stathub. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
*/ | |
public class MongoState<T> implements IBackingMap<T> { | |
private final DB db; | |
private final MongoSerializer<T> serializer; | |
public MongoState(DB db, MongoSerializer<T> serializer) { | |
this.db = db; | |
this.serializer = serializer; | |
} | |
/** | |
* Fetch multiple metrics from MongoDB. | |
* | |
* @param keys list of metric key fields in this order: | |
* (collection_name [string], start_date [long], site_id [string], field_name [string]) | |
* @return list of values corresponding to the {@code key} with the same index in {@code keys} | |
* or {@code null} for each key without a value | |
*/ | |
@Override | |
public List<T> multiGet(List<List<Object>> keys) { | |
List<T> values = Lists.newArrayList(Collections.<T>nCopies(keys.size(), null)); | |
Map<MongoKey, Map<String, Integer>> siteBuckets = new IdentityGenerator().toSiteBuckets(keys); | |
for (Map.Entry<MongoKey, Map<String, Integer>> siteBucketFields : siteBuckets.entrySet()) { | |
MongoKey key = siteBucketFields.getKey(); | |
DBCollection collection = db.getCollection(key.collection); | |
DBObject obj = collection.findOne(createQuery(key.timestamp, key.siteId)); | |
for (Map.Entry<String, Integer> field : siteBucketFields.getValue().entrySet()) { | |
if (obj != null) { | |
values.set(field.getValue(), serializer.deserialize(obj, field.getKey())); | |
} | |
} | |
} | |
return values; | |
} | |
/** | |
* Store multiple metrics into MongoDB. This save is idempotent; any data for | |
* {@code keys} that already exists is overwritten with the new {@code vals}. | |
* | |
* @param keys list of metric key fields in this order: | |
* (collection_name [string], start_date [long], site_id [string], field_name [string]) | |
*/ | |
@Override | |
public void multiPut(List<List<Object>> keys, List<T> vals) { | |
Map<MongoKey, Map<String, T>> siteBuckets = new ValueFromListGenerator<T>(vals).toSiteBuckets(keys); | |
for (Map.Entry<MongoKey, Map<String, T>> entry : siteBuckets.entrySet()) { | |
MongoKey key = entry.getKey(); | |
DBCollection collection = db.getCollection(key.collection); | |
collection.update(createQuery(key.timestamp, key.siteId), | |
createUpdateObject(key.timestamp, key.siteId, entry.getValue()), true, false); | |
} | |
} | |
private DBObject createUpdateObject(long timestamp, String siteId, Map<String, T> fields) { | |
DBObject upsert = createQuery(timestamp, siteId); | |
for (Map.Entry<String, T> entry : fields.entrySet()) { | |
upsert.putAll(serializer.serialize(entry.getKey(), entry.getValue())); | |
} | |
return new BasicDBObject("$set", upsert); | |
} | |
private static DBObject createQuery(long timeslot, String siteId) { | |
DBObject query = new BasicDBObject(); | |
query.put("timeStamp", timeslot); | |
query.put("siteId", siteId); | |
return query; | |
} | |
/** | |
* Nasty hack to workaround the lack of callbacks/generators/blocks in Java. | |
* We just need a way to provide a function to {@link #toSiteBuckets(List)} | |
* that accepts the current index and returns the content for that field. | |
* | |
* @param <T> the type of the field value | |
*/ | |
private abstract static class Generator<T> { | |
/** | |
* Transforms a list of metric key tuples into a map of site-buckets to fields, | |
* where each field can also have its own value. The field value is provided | |
* by the generator function {@link #getContent(int)}. | |
* | |
* @param keys list of metric key fields in this order: | |
* (collection_name [string], start_date [long], site_id [string], field_name [string]) | |
* @return the map of site-buckets to fields | |
*/ | |
public Map<MongoKey, Map<String, T>> toSiteBuckets(List<List<Object>> keys) { | |
// (collection, start_date, site_id) => {field_name => field_value} | |
Map<MongoKey, Map<String, T>> siteBuckets = Maps.newHashMap(); | |
for (int i = 0; i < keys.size(); i++) { | |
MongoKey key = MongoKey.fromTrident(keys.get(i)); | |
MongoKey siteBucket = new MongoKey(key.collection, key.timestamp, key.siteId, null); | |
Map<String, T> fields = siteBuckets.containsKey(siteBucket) ? | |
siteBuckets.get(siteBucket) : Maps.<String, T>newHashMap(); | |
fields.put(key.fieldName, getContent(i)); | |
siteBuckets.put(siteBucket, fields); | |
} | |
return siteBuckets; | |
} | |
/** | |
* The callback/generator function/block itself. | |
*/ | |
protected abstract T getContent(int i); | |
} | |
/** | |
* Generator that returns the content at index {@code i} from a list. | |
* | |
* @param <T> the type of the field value | |
*/ | |
private static class ValueFromListGenerator<T> extends Generator<T> { | |
private final List<T> values; | |
ValueFromListGenerator(@Nullable List<T> values) { | |
this.values = values; | |
} | |
@Override | |
protected T getContent(int i) { | |
return values.get(i); | |
} | |
} | |
/** | |
* Generator that returns the index itself. | |
*/ | |
private static class IdentityGenerator extends Generator<Integer> { | |
@Override | |
protected Integer getContent(int i) { | |
return i; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo; | |
import java.net.UnknownHostException; | |
import java.util.Map; | |
import backtype.storm.task.IMetricsContext; | |
import backtype.storm.tuple.Values; | |
import com.google.common.base.Objects; | |
import com.google.common.base.Throwables; | |
import com.google.common.collect.ImmutableMap; | |
import com.mongodb.Mongo; | |
import com.mongodb.MongoException; | |
import com.brighttag.storm.mongo.serializer.MongoNonTransactionalSerializer; | |
import com.brighttag.storm.mongo.serializer.MongoOpaqueSerializer; | |
import com.brighttag.storm.mongo.serializer.MongoSerializer; | |
import com.brighttag.storm.mongo.serializer.MongoTransactionalSerializer; | |
import storm.trident.state.OpaqueValue; | |
import storm.trident.state.State; | |
import storm.trident.state.StateFactory; | |
import storm.trident.state.StateType; | |
import storm.trident.state.map.CachedMap; | |
import storm.trident.state.map.IBackingMap; | |
import storm.trident.state.map.MapState; | |
import storm.trident.state.map.NonTransactionalMap; | |
import storm.trident.state.map.OpaqueMap; | |
import storm.trident.state.map.SnapshottableMap; | |
import storm.trident.state.map.TransactionalMap; | |
/** | |
* Factory for creating {@link MongoState}s. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of value (wrapped with transaction metadata, if appropriate). | |
*/ | |
@SuppressWarnings({ "rawtypes", "unchecked", "serial" }) | |
public class MongoStateFactory<T> implements StateFactory { | |
private static final long serialVersionUID = 80291612465975321L; | |
private final StateType stateType; | |
private final String host; | |
private final String dbName; | |
private final MongoStateOptions<T> options; | |
private final MongoSerializer<T> serializer; | |
private static final Map<StateType, MongoSerializer> DEFAULT_SERIALIZERS; | |
static { | |
MongoNonTransactionalSerializer<Integer> nonTransactional = new MongoNonTransactionalSerializer<Integer>() {}; | |
MongoTransactionalSerializer<Integer> transactional = new MongoTransactionalSerializer<Integer>(nonTransactional) {}; | |
MongoOpaqueSerializer<Integer> opaque = new MongoOpaqueSerializer<Integer>(transactional) {}; | |
DEFAULT_SERIALIZERS = ImmutableMap.<StateType, MongoSerializer>of( | |
StateType.NON_TRANSACTIONAL, nonTransactional, | |
StateType.TRANSACTIONAL, transactional, | |
StateType.OPAQUE, opaque); | |
} | |
/** | |
* Return an opaque {@link MongoStateFactory}. | |
*/ | |
public static <T> StateFactory opaque(String host, String dbName) { | |
return opaque(host, dbName, new MongoStateOptions<OpaqueValue<T>>()); | |
} | |
/** | |
* Return an opaque {@link MongoStateFactory} with the given {@code options}. | |
*/ | |
public static <T> StateFactory opaque(String host, String dbName, MongoStateOptions<OpaqueValue<T>> options) { | |
return new MongoStateFactory<OpaqueValue<T>>(StateType.OPAQUE, host, dbName, options); | |
} | |
public MongoStateFactory(StateType stateType, String host, String dbName, MongoStateOptions<T> options) { | |
this.stateType = stateType; | |
this.host = host; | |
this.dbName = dbName; | |
this.options = options; | |
this.serializer = Objects.firstNonNull(options.serializer, DEFAULT_SERIALIZERS.get(stateType)); | |
} | |
@Override | |
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { | |
try { | |
MongoState<T> state = new MongoState<T>(new Mongo(host).getDB(dbName), serializer); | |
CachedMap<T> cachedMap = new CachedMap<T>(state, options.localCacheSize); | |
MapState<T> mapState = buildMapState(cachedMap); | |
return new SnapshottableMap<T>(mapState, new Values(options.globalKey)); | |
} catch (UnknownHostException e) { | |
throw Throwables.propagate(e); | |
} catch (MongoException e) { | |
throw Throwables.propagate(e); | |
} | |
} | |
private MapState<T> buildMapState(IBackingMap cachedMap) { | |
switch (stateType) { | |
case NON_TRANSACTIONAL: | |
return NonTransactionalMap.build(cachedMap); | |
case OPAQUE: | |
return OpaqueMap.build(cachedMap); | |
case TRANSACTIONAL: | |
return TransactionalMap.build(cachedMap); | |
default: | |
throw new RuntimeException("Unknown state type: " + stateType); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo; | |
import java.io.Serializable; | |
import com.brighttag.storm.mongo.serializer.MongoSerializer; | |
/** | |
* Various options for configuring the {@link MongoState}. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of value (wrapped with transaction metadata, if appropriate). | |
*/ | |
public class MongoStateOptions<T> implements Serializable { | |
private static final long serialVersionUID = 3936747562105453714L; | |
public int localCacheSize = 10000; | |
public String globalKey = "$__GLOBAL_KEY__$"; | |
public MongoSerializer<T> serializer; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.mongo.serializer; | |
import com.mongodb.DBObject; | |
import storm.trident.state.TransactionalValue; | |
/** | |
* Serialize a key-value pair to MongoDB with transactional metadata. | |
* | |
* @author codyaray | |
* @since 3/19/2014 | |
* @param <T> the type of the value | |
*/ | |
public class MongoTransactionalSerializer<T> implements MongoSerializer<TransactionalValue<T>> { | |
private static final long serialVersionUID = 1L; | |
private static final String TXID_PREFIX = "txid-"; | |
private final MongoNonTransactionalSerializer<T> base; | |
public MongoTransactionalSerializer(MongoNonTransactionalSerializer<T> base) { | |
this.base = base; | |
} | |
@Override | |
public DBObject serialize(String fieldName, TransactionalValue<T> val) { | |
DBObject obj = base.serialize(fieldName, val.getVal()); | |
obj.put(TXID_PREFIX + fieldName, val.getTxid()); | |
return obj; | |
} | |
@Override | |
public TransactionalValue<T> deserialize(DBObject obj, String fieldName) { | |
T val = base.deserialize(obj, fieldName); | |
Object txidObj = obj.get(TXID_PREFIX + fieldName); | |
long txid = (txidObj == null) ? 0 : Long.parseLong(txidObj.toString()); | |
return new TransactionalValue<T>(txid, val); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.shard; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import com.google.common.collect.Lists; | |
import com.brighttag.storm.mongo.MongoStateFactory; | |
import backtype.storm.task.IMetricsContext; | |
import storm.trident.state.State; | |
import storm.trident.state.StateFactory; | |
import storm.trident.state.map.MapState; | |
import storm.trident.state.map.ReadOnlyMapState; | |
/** | |
* Randomly shards batches between delegate {@link State state}s. | |
* | |
* @author codyaray | |
* @since 4/23/2014 | |
*/ | |
public class RandomShardState implements MapState, State { | |
private final Random rand = new Random(); | |
private final List<State> delegates; | |
// Set and unset in beginCommit/commit | |
private State shard; | |
private RandomShardState(List<State> delegates) { | |
this.delegates = delegates; | |
} | |
@Override | |
public void beginCommit(Long txid) { | |
shard = delegates.get(rand.nextInt(delegates.size())); | |
shard.beginCommit(txid); | |
} | |
@Override | |
public void commit(Long txid) { | |
shard.commit(txid); | |
shard = null; | |
} | |
@Override | |
public List multiGet(List keys) { | |
return ((ReadOnlyMapState) shard).multiGet(keys); | |
} | |
@Override | |
public List multiUpdate(List keys, List vals) { | |
return ((MapState) shard).multiUpdate(keys, vals); | |
} | |
@Override | |
public void multiPut(List keys, List vals) { | |
((MapState) shard).multiPut(keys, vals); | |
} | |
/** | |
* Factory to create {@link RandomShardState}s. | |
*/ | |
public static class Factory implements StateFactory { | |
private static final long serialVersionUID = -6289401502963920055L; | |
private final List<StateFactory> delegates; | |
public Factory(List<StateFactory> delegates) { | |
this.delegates = delegates; | |
} | |
@Override | |
@SuppressWarnings({ "rawtypes" }) | |
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { | |
List<State> states = Lists.newArrayListWithCapacity(delegates.size()); | |
for (StateFactory factory : delegates) { | |
states.add(factory.makeState(conf, metrics, partitionIndex, numPartitions)); | |
} | |
return new RandomShardState(states); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import static com.google.common.base.Preconditions.checkArgument; | |
/** | |
* Computes the parallelism for a particular topology and machine configuration. | |
* | |
* @author codyaray | |
* @since 4/21/2014 | |
*/ | |
public class StormParallelism { | |
private final int numberHosts; | |
private final int numberWorkersPerHost; | |
private final int numberWorkers; | |
private final int numberKafkaPartitions; | |
private final int numberKafkaPartitionsPerSpout; | |
private final int spoutParallelism; | |
private final int numberExecutorsPerCore; | |
private final int numberCoresPerHost; | |
private final int numberExecutorsPerHost; | |
private final int numberExecutorsPerWorker; | |
private final int transformParallelism; | |
private final int persistenceParallelism; | |
private StormParallelism(Builder builder) { | |
// Number of workers should be a multiple of number of machines | |
this.numberHosts = builder.numberHosts; | |
this.numberWorkersPerHost = builder.numberWorkersPerHost; | |
this.numberWorkers = numberHosts * numberWorkersPerHost; | |
// Number of partitions should be a multiple of spout parallelism | |
this.numberKafkaPartitions = builder.numberKafkaPartitions; | |
this.numberKafkaPartitionsPerSpout = builder.numberKafkaPartitionsPerSpout; | |
this.spoutParallelism = numberKafkaPartitions / numberKafkaPartitionsPerSpout; | |
// Parallelism should be a multiple of number of workers | |
this.numberExecutorsPerCore = builder.numberExecutorsPerCore; | |
this.numberCoresPerHost = builder.numberCoresPerHost; | |
this.numberExecutorsPerHost = numberExecutorsPerCore * numberCoresPerHost; | |
this.numberExecutorsPerWorker = numberExecutorsPerHost / numberWorkersPerHost; | |
this.transformParallelism = numberExecutorsPerWorker * numberWorkers; | |
// Reduce parallelism in persistence for best cache efficiency and lowest bulk-request overhead | |
this.persistenceParallelism = numberWorkers; | |
} | |
public int getNumberWorkers() { | |
return numberWorkers; | |
} | |
public int forSpoutLayer() { | |
return spoutParallelism; | |
} | |
public int forTransformLayer() { | |
return transformParallelism; | |
} | |
public int forPersistenceLayer() { | |
return persistenceParallelism; | |
} | |
public static Builder builder() { | |
return new Builder(); | |
} | |
/** | |
* Builder of {@link StormParallelism} configurations. | |
*/ | |
public static class Builder { | |
private int numberHosts; | |
private int numberWorkersPerHost = 1; | |
private int numberKafkaPartitions; | |
private int numberKafkaPartitionsPerSpout = 1; | |
private int numberCoresPerHost; | |
private int numberExecutorsPerCore = 1; | |
public Builder numberHosts(int numberHosts) { | |
this.numberHosts = numberHosts; | |
return this; | |
} | |
public Builder numberWorkersPerHost(int numberWorkersPerHost) { | |
this.numberWorkersPerHost = numberWorkersPerHost; | |
return this; | |
} | |
public Builder numberKafkaPartitions(int numberKafkaPartitions) { | |
this.numberKafkaPartitions = numberKafkaPartitions; | |
return this; | |
} | |
public Builder numberKafkaPartitionsPerSpout(int numberKafkaPartitionsPerSpout) { | |
this.numberKafkaPartitionsPerSpout = numberKafkaPartitionsPerSpout; | |
return this; | |
} | |
public Builder numberCoresPerHost(int numberCoresPerHost) { | |
this.numberCoresPerHost = numberCoresPerHost; | |
return this; | |
} | |
public Builder numberExecutorsPerCore(int numberExecutorsPerCore) { | |
this.numberExecutorsPerCore = numberExecutorsPerCore; | |
return this; | |
} | |
public StormParallelism build() { | |
checkArgument(numberHosts > 0); | |
checkArgument(numberKafkaPartitions > 0); | |
checkArgument(numberCoresPerHost > 0); | |
return new StormParallelism(this); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import javax.annotation.Nullable; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.generated.AlreadyAliveException; | |
import backtype.storm.generated.InvalidTopologyException; | |
import backtype.storm.generated.StormTopology; | |
import backtype.storm.utils.Utils; | |
/** | |
* Helpers to run topologies in Storm. | |
* | |
* @author codyaray | |
* @since 3/26/2014 | |
*/ | |
public class StormRunner { | |
private static final int MILLIS_IN_SEC = 1000; | |
private StormRunner() { /* No instances */ } | |
/** | |
* Runs a N-worker topology in a cluster or locally if {@code topologyName} is {@code null}. | |
* | |
* @param topology the storm topology | |
* @param numWorkers the number of workers for this topology or {@code maxTaxParallelism} in local mode | |
* @param topologyName the topology name or {@code null} to run in local mode | |
*/ | |
public static void runTopology(StormTopology topology, int numWorkers, @Nullable String topologyName) | |
throws AlreadyAliveException, InvalidTopologyException { | |
Config conf = new Config(); | |
if (topologyName != null) { | |
conf.setNumWorkers(numWorkers); | |
} else { | |
conf.setMaxTaskParallelism(numWorkers); | |
} | |
runTopology(topology, conf, topologyName); | |
} | |
/** | |
* Runs a topology in a cluster or locally if {@code topologyName} is {@code null}. | |
* | |
* @param topology the storm topology | |
* @param topologyName the topology name or {@code null} to run in local mode | |
* @param conf the topology configuration | |
*/ | |
public static void runTopology(StormTopology topology, Config conf, @Nullable String topologyName) | |
throws AlreadyAliveException, InvalidTopologyException { | |
if (topologyName != null) { | |
StormSubmitter.submitTopology(topologyName, conf, topology); | |
} else { | |
runTopologyLocally(topology, "stormseries", conf, 10000); | |
} | |
} | |
/** | |
* Runs a topology in local mode. | |
* | |
* @param topology the storm topology | |
* @param topologyName the topology name | |
* @param conf the topology configuration | |
* @param runtimeInSeconds how long to run the topology before killing it | |
*/ | |
public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, long runtimeInSeconds) { | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology(topologyName, conf, topology); | |
Utils.sleep(runtimeInSeconds * MILLIS_IN_SEC); | |
cluster.killTopology(topologyName); | |
cluster.shutdown(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import backtype.storm.tuple.Values; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Formats a given string from the incoming fields. | |
*/ | |
public class StringFormatter extends BaseFunction { | |
private static final long serialVersionUID = 9223269171323302719L; | |
private final String format; | |
public StringFormatter(String format) { | |
this.format = format; | |
} | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
Object[] formatArgs = new String[tuple.size()]; | |
for (int i=0; i<tuple.size(); i++) { | |
formatArgs[i] = tuple.getString(i); | |
} | |
collector.emit(new Values(String.format(format, formatArgs))); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.apps; | |
import java.net.UnknownHostException; | |
import java.util.List; | |
import java.util.Map; | |
import backtype.storm.generated.AlreadyAliveException; | |
import backtype.storm.generated.InvalidTopologyException; | |
import backtype.storm.generated.StormTopology; | |
import backtype.storm.tuple.Fields; | |
import com.google.common.base.Splitter; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.ImmutableMap; | |
import com.google.common.collect.Lists; | |
import storm.kafka.ZkHosts; | |
import storm.kafka.trident.OpaqueTridentKafkaSpout; | |
import storm.kafka.trident.TridentKafkaConfig; | |
import storm.trident.Stream; | |
import storm.trident.TridentTopology; | |
import storm.trident.operation.builtin.Sum; | |
import storm.trident.state.StateFactory; | |
import com.brighttag.storm.shard.RandomShardState; | |
import com.brighttag.storm.json.TagRequestMetricJsonParser; | |
import com.brighttag.storm.mongo.MongoStateFactory; | |
import com.brighttag.storm.utils.BinaryToString; | |
import com.brighttag.storm.utils.Bucket; | |
import com.brighttag.storm.utils.StormParallelism; | |
import com.brighttag.storm.utils.StormRunner; | |
import com.brighttag.storm.utils.StringFormatter; | |
/** | |
* Storm topology which ingests metrics as {@link TagRequestMetrics} JSON from Kafka, | |
* aggregates them into buckets, and stores the aggregated results in MongoDB. | |
* | |
* This is a transition topology designed to pre-aggregate data for reducing write load on Mongo. | |
* It's designed to be a turn-key replacement for current statscollectors, writing data | |
* in the same format (but with additional txid-* and prev-* fields). | |
* | |
* @author codyaray | |
* @since 3/18/2014 | |
*/ | |
public class TagRequestMongoBucketizer { | |
private static final Map<String, Long> BUCKETS = ImmutableMap.of("30s", 30000L, "30m", 1800000L); | |
private static final int NUMBER_KAFKA_PARTITIONS = 8; | |
private static final int NUMBER_STORM_HOSTS = 3; | |
private static final int NUMBER_STORM_CORES_PER_HOST = 2; | |
private static final int NUMBER_STORM_EXECUTORS_PER_CORE = 2; | |
/** | |
* Builds the Storm topology. | |
*/ | |
private static StormTopology buildTopology(StormParallelism parallelism, String zookeepers, | |
String kafkaTopic, List<String> mongoHosts, String mongoDatabase, String mongoCollectionPrefix) | |
throws UnknownHostException { | |
// Input Spout | |
TridentKafkaConfig config = new TridentKafkaConfig(new ZkHosts(zookeepers), kafkaTopic); | |
config.fetchSizeBytes = 5 * (1024*1024); // 5 MB | |
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(config); | |
// Output State | |
List<StateFactory> factories = Lists.newArrayListWithCapacity(mongoHosts.size()); | |
for (String host : mongoHosts) { | |
factories.add(MongoStateFactory.opaque(host, mongoDatabase)); | |
} | |
StateFactory stateFactory = new RandomShardState.Factory(factories); | |
// Aggregation Topology | |
TridentTopology topology = new TridentTopology(); | |
Stream stream = topology.newStream("spout2", spout) | |
.parallelismHint(parallelism.forSpoutLayer()).shuffle().name("Transform") | |
.each(new Fields("bytes"), new BinaryToString(), new Fields("string")) | |
.each(new Fields("string"), new TagRequestMetricJsonParser(), new Fields("timestamp", "siteId", "field", "value")); | |
for (Map.Entry<String, Long> entry : BUCKETS.entrySet()) { | |
stream.each(new Fields("timestamp"), new Bucket(entry.getValue()), new Fields("bucketStart", "bucketEnd")) | |
.each(new StringFormatter(mongoCollectionPrefix + entry.getKey()), new Fields("collection")) | |
.parallelismHint(parallelism.forTransformLayer()) | |
.groupBy(new Fields("collection", "bucketStart", "siteId", "field")) | |
.name("Aggregator-" + entry.getKey()) | |
.persistentAggregate(stateFactory, new Fields("value"), new Sum(), new Fields("count")) | |
.parallelismHint(parallelism.forPersistenceLayer()); | |
} | |
return topology.build(); | |
} | |
private static final Splitter HOST_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); | |
private static List<String> split(String str) { | |
return ImmutableList.copyOf(HOST_SPLITTER.split(str)); | |
} | |
/** | |
* Runs the topology locally or in a cluster if a [cluster-topology-name] is given. | |
*/ | |
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, UnknownHostException { | |
if (args.length < 5) { | |
System.out.println("Usage: " + TagRequestMongoBucketizer.class.getSimpleName() + " <zoo-hosts> <kafka-topic> <mongo-host> <mongo-database> <mongo-collection-prefix> [cluster-topology-name]"); | |
return; | |
} | |
StormParallelism parallelism = StormParallelism.builder() | |
.numberKafkaPartitions(NUMBER_KAFKA_PARTITIONS) | |
.numberHosts(NUMBER_STORM_HOSTS) | |
.numberCoresPerHost(NUMBER_STORM_CORES_PER_HOST) | |
.numberExecutorsPerCore(NUMBER_STORM_EXECUTORS_PER_CORE) | |
.build(); | |
StormTopology stormTopology = buildTopology(parallelism, args[0], args[1], split(args[2]), args[3], args[4]); | |
StormRunner.runTopology(stormTopology, parallelism.getNumberWorkers(), args.length < 6 ? null : args[5]); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.json; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import backtype.storm.tuple.Values; | |
import com.onetag.metrics.TagRequestMetrics; | |
import com.onetag.metrics.json.JSONTagRequestMetricsMarshaller; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Parses the first tuple as a TagRequestMetric JSON string and emits key-value tuple | |
* as per the current tagserve/statscollector logic. | |
*/ | |
public class TagRequestMetricJsonParser extends BaseFunction { | |
private static final long serialVersionUID = 7592816813615529588L; | |
private static final Logger log = LoggerFactory.getLogger(TagRequestMetricJsonParser.class); | |
private static final String TAG_PREFIX = "tag"; | |
private static final String PAGE_PREFIX = "page"; | |
private static JSONTagRequestMetricsMarshaller MARSHALLER = new JSONTagRequestMetricsMarshaller(); | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
try { | |
TagRequestMetrics metrics = MARSHALLER.unmarshal(tuple.getString(0)); | |
String siteId = metrics.getSiteId(); | |
long timestamp = metrics.getRequestInterval().getStartMillis(); | |
if (!metrics.isSecondaryRequest()) { | |
collector.emit(new Values(timestamp, siteId, "siteCount", 1)); | |
collector.emit(new Values(timestamp, siteId, metrics.getSource().lowerCamelName(), 1)); | |
for (Long pageId : metrics.getPageIds()) { | |
collector.emit(new Values(timestamp, siteId, PAGE_PREFIX + pageId, 1)); | |
} | |
} | |
for (Long tagId : metrics.getTagFires().keySet()) { | |
collector.emit(new Values(timestamp, siteId, TAG_PREFIX + tagId, 1)); | |
} | |
} catch (Exception e) { | |
log.info("Problem parsing JSON {}: {}", tuple.getString(0), e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment