Created
May 11, 2013 03:25
-
-
Save quintona/5558787 to your computer and use it in GitHub Desktop.
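A way to join two separate streams in Storm Trident using a multi-reducer. The code for the reducer (`OuterJoinReducer`) and its associated state (`OuterJoinState`) is below; use it via `topology.multiReduce(s1, s2, function, outputFields)`, where `function` is an `OuterJoinReducer`. Note: the reducer emits the cross product of the tuples it received from the two streams; if only one stream supplied tuples, nothing is emitted (no null padding is performed).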
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.cookbook.tfidf.functions; | |
import java.util.Map; | |
import storm.trident.operation.MultiReducer; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.operation.TridentMultiReducerContext; | |
import storm.trident.tuple.TridentTuple; | |
import backtype.storm.tuple.Values; | |
public class OuterJoinReducer implements MultiReducer<OuterJoinState> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public void prepare(Map conf, TridentMultiReducerContext context) { | |
} | |
@Override | |
public OuterJoinState init(TridentCollector collector) { | |
return new OuterJoinState(); | |
} | |
@Override | |
public void cleanup() { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public void execute(OuterJoinState state, int streamIndex, | |
TridentTuple input, TridentCollector collector) { | |
state.addValues(streamIndex, input); | |
} | |
@Override | |
public void complete(OuterJoinState state, | |
TridentCollector collector) { | |
for(Values vals : state.join()){ | |
collector.emit(vals); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.cookbook.tfidf.functions; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.List; | |
import storm.trident.tuple.TridentTuple; | |
import backtype.storm.tuple.Values; | |
public class OuterJoinState { | |
//2 columns, based on streamId. Each column contains the tuples from the given stream | |
private HashMap<Integer, List<Object[]>> bothSides = new HashMap<Integer,List<Object[]>>(); | |
public void addValues(int streamId, TridentTuple input){ | |
if(!bothSides.keySet().contains(streamId)){ | |
if(bothSides.keySet().size() >= 2) | |
throw new IllegalArgumentException("Outer join can only be performed between 2 streams"); | |
bothSides.put(streamId, new ArrayList<Object[]>()); | |
} | |
bothSides.get(streamId).add(input.toArray()); | |
} | |
//the shorter side is the LHS | |
private int getLHS(){ | |
int len = Integer.MAX_VALUE; | |
int index = 0; | |
for(int id : bothSides.keySet()){ | |
if(bothSides.get(id).size() < len){ | |
len = bothSides.get(id).size(); | |
index = id; | |
} | |
} | |
return index; | |
} | |
private int getRhs(int lhs){ | |
for(int test : bothSides.keySet()){ | |
if(test != lhs) | |
return test; | |
} | |
throw new IllegalArgumentException("Can't find RHS!"); | |
} | |
public List<Values> join(){ | |
List<Values> ret = new ArrayList<Values>(); | |
try{ | |
int lhsId = getLHS(); | |
int rhsId = getRhs(lhsId); | |
for(Object[] lhs : bothSides.get(lhsId)){ | |
for(Object[] rhs : bothSides.get(rhsId)){ | |
Values vals = new Values(lhs); | |
vals.addAll(Arrays.asList(rhs)); | |
ret.add(vals); | |
} | |
} | |
}catch(Exception e){} | |
return ret; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment