Last active
August 29, 2015 13:57
-
-
Save codyaray/9897217 to your computer and use it in GitHub Desktop.
Storm trident function to parse metrics from JSON
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import backtype.storm.tuple.Values; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Converts the first tuple from a byte array into a string. | |
*/ | |
public class BinaryToString extends BaseFunction { | |
private static final long serialVersionUID = -8686873770270590062L; | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
String field = new String(tuple.getBinary(0)); | |
collector.emit(new Values(field)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.utils; | |
import java.math.RoundingMode; | |
import backtype.storm.tuple.Values; | |
import com.google.common.math.DoubleMath; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Maps a timestamp into a bucket of size {@code interval}. | |
* | |
* Assumes the first tuple value is a long timestamp. | |
* Outputs the {@code bucketStart} and {@code bucketEnd}. | |
*/ | |
public class Bucket extends BaseFunction { | |
private static final long serialVersionUID = 1042081321412192768L; | |
private final long interval; | |
public Bucket(long interval) { | |
this.interval = interval; | |
} | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
long timestamp = tuple.getLong(0); | |
long bucketStart = DoubleMath.roundToLong( | |
Math.floor(timestamp / interval), RoundingMode.UNNECESSARY) * interval; | |
long bucketEnd = bucketStart + interval; | |
collector.emit(new Values(String.valueOf(bucketStart), String.valueOf(bucketEnd))); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 BrightTag, Inc. All rights reserved. | |
package com.brighttag.storm.json; | |
import backtype.storm.tuple.Values; | |
import org.codehaus.jettison.json.JSONArray; | |
import org.codehaus.jettison.json.JSONException; | |
import org.codehaus.jettison.json.JSONObject; | |
import storm.trident.operation.BaseFunction; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.tuple.TridentTuple; | |
/** | |
* Parses the first tuple as a JSON string into a metric tuple | |
* with the form (timestamp, name, value). | |
* | |
* The input JSON must have the form | |
* <pre>{@code | |
* { | |
* "name": [string], | |
* "timestamp": [long], | |
* "value": [long] | |
* } | |
* }</pre> | |
*/ | |
public class MetricJsonParser extends BaseFunction { | |
private static final long serialVersionUID = 7592816813615529588L; | |
@Override | |
public void execute(TridentTuple tuple, TridentCollector collector) { | |
try { | |
JSONArray array = new JSONArray(tuple.getString(0)); | |
for (int i = 0; i < array.length(); i++) { | |
JSONObject object = array.getJSONObject(i); | |
String name = object.getString("name"); | |
long timestamp = object.getLong("timestamp"); | |
long value = object.getLong("value"); | |
collector.emit(new Values(timestamp, name, value)); | |
} | |
} catch (JSONException e) { | |
System.out.println("Problem with incoming JSON: " + e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment