Created
December 6, 2014 02:51
-
-
Save mravi/a340200bfef66192bf52 to your computer and use it in GitHub Desktop.
Phoenix MR Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.example.bean; | |
import java.util.Arrays; | |
public final class StockBean { | |
private String stockName; | |
private Integer year; | |
private double[] recordings; | |
private double average; | |
public String getStockName() { | |
return stockName; | |
} | |
public void setStockName(String stockName) { | |
this.stockName = stockName; | |
} | |
public Integer getYear() { | |
return year; | |
} | |
public void setYear(Integer year) { | |
this.year = year; | |
} | |
public double[] getRecordings() { | |
return recordings; | |
} | |
public void setRecordings(double[] recordings) { | |
this.recordings = recordings; | |
} | |
public double getAverage() { | |
return average; | |
} | |
public void setAverage(double average) { | |
this.average = average; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
long temp; | |
temp = Double.doubleToLongBits(average); | |
result = prime * result + (int)(temp ^ (temp >>> 32)); | |
result = prime * result + Arrays.hashCode(recordings); | |
result = prime * result + ((stockName == null) ? 0 : stockName.hashCode()); | |
result = prime * result + ((year == null) ? 0 : year.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) return true; | |
if (obj == null) return false; | |
if (getClass() != obj.getClass()) return false; | |
StockBean other = (StockBean)obj; | |
if (Double.doubleToLongBits(average) != Double.doubleToLongBits(other.average)) return false; | |
if (!Arrays.equals(recordings, other.recordings)) return false; | |
if (stockName == null) { | |
if (other.stockName != null) return false; | |
} else if (!stockName.equals(other.stockName)) return false; | |
if (year == null) { | |
if (other.year != null) return false; | |
} else if (!year.equals(other.year)) return false; | |
return true; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.example.job; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.HConstants; | |
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.phoenix.mapreduce.PhoenixOutputFormat; | |
import org.apache.phoenix.mapreduce.bean.StockBean; | |
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; | |
import org.apache.phoenix.mapreduce.example.StockWritable; | |
public class StockComputationJob { | |
protected final static String zkUrl = "sandbox.hortonworks.com:2181"; | |
public static class StockMapper extends Mapper<NullWritable, StockWritable, NullWritable , StockWritable> { | |
@Override | |
protected void map(NullWritable key, StockWritable stockWritable, Context context) | |
throws IOException, InterruptedException { | |
final StockBean bean = stockWritable.getStockBean(); | |
double[] recordings = bean.getRecordings(); | |
if(recordings.length == 0) { | |
return; | |
} | |
double sum = 0.0; | |
for(double recording: recordings) { | |
sum += recording; | |
} | |
double avg = sum / recordings.length; | |
bean.setAverage(avg); | |
stockWritable.setStockBean(bean); | |
context.write(NullWritable.get(), stockWritable); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final Configuration conf = new Configuration(); | |
HBaseConfiguration.addHbaseResources(conf); | |
conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl); | |
final Job job = Job.getInstance(conf, "stock-stats-job"); | |
final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCKS "; | |
PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCKS", selectQuery); | |
PhoenixMapReduceUtil.setOutput(job, "STOCKS", "STOCK_NAME,RECORDING_YEAR,RECORDINGS_AVG"); | |
job.setMapperClass(StockMapper.class); | |
job.setOutputFormatClass(PhoenixOutputFormat.class); | |
job.setNumReduceTasks(0); | |
job.setOutputKeyClass(NullWritable.class); | |
job.setOutputValueClass(StockWritable.class); | |
TableMapReduceUtil.addDependencyJars(job); | |
job.waitForCompletion(true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.phoenix.example.writable; | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.sql.Array; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.mapreduce.lib.db.DBWritable; | |
import org.apache.phoenix.mapreduce.example.StockBean; | |
import com.google.common.base.Preconditions; | |
public class StockWritable implements DBWritable,Writable { | |
private StockBean stockBean; | |
@Override | |
public void readFields(DataInput input) throws IOException { | |
} | |
@Override | |
public void write(DataOutput output) throws IOException { | |
} | |
@Override | |
public void readFields(ResultSet rs) throws SQLException { | |
final String stockName = rs.getString("STOCK_NAME"); | |
final int year = rs.getInt("RECORDING_YEAR"); | |
final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER"); | |
final double[] recordings = (double[])recordingsArray.getArray(); | |
stockBean = new StockBean(); | |
stockBean.setStockName(stockName); | |
stockBean.setYear(year); | |
stockBean.setRecordings(recordings); | |
} | |
@Override | |
public void write(PreparedStatement pstmt) throws SQLException { | |
Preconditions.checkNotNull(stockBean); | |
pstmt.setString(1, stockBean.getStockName()); | |
pstmt.setInt(2, stockBean.getYear()); | |
pstmt.setDouble(3, stockBean.getAverage()); | |
} | |
public final StockBean getStockBean() { | |
return stockBean; | |
} | |
public final void setStockBean(StockBean stockBean) { | |
this.stockBean = stockBean; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment