Created
April 15, 2015 13:41
-
-
Save bigsquirrel/748b50daf15d5b9cb640 to your computer and use it in GitHub Desktop.
big data hw1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ivanchou; | |
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Hashtable;
/** | |
* Created by ivanchou on 4/10/15. | |
*/ | |
public class Hw1Grp2 { | |
private static final String TABLE_NAME = "Result"; | |
private static final String COLUMN_FAMILY = "res"; | |
private static final String COLUMN_COUNT = "count"; | |
private static final String COLUMN_AVG = "avg"; | |
private static final String COLUMN_MAX = "max"; | |
private static final String USAGE = "usage: java Hw1Grp2 R=[FilePath] groupby:R[X] res:[count | avg(R[X]) | max]"; | |
private String filePath; | |
private int groupByColumn; | |
private boolean countFlag; | |
private boolean maxFlag; | |
private boolean avgFlag; | |
private int avgColumn; | |
private Hashtable<String, DataStructure> hashTable; | |
private BufferedReader bufferedReader; | |
private FileSystem fileSystem; | |
private void handleArgs(String[] args) throws UsageNotCorrectException{ | |
int flag = 0; | |
if (args.length != 3) { | |
throw new UsageNotCorrectException(); | |
} | |
for (int i = 0; i < args.length; i++) { | |
if (args[i].contains("R=")) { | |
flag++; | |
filePath = args[i].split("\\=")[1]; | |
} | |
if (args[i].contains("groupby")) { | |
flag++; | |
groupByColumn = Character.getNumericValue((args[i].charAt(args[i].length() - 1))); | |
} | |
if (args[i].contains("res")) { | |
flag++; | |
String tmp = args[i].split(":")[1]; | |
String[] resType = tmp.split(","); | |
for (int j = 0; j < resType.length; j++) { | |
if (resType[j].equals("count")) { | |
countFlag = true; | |
} else if (resType[j].equals("max")) { | |
maxFlag = true; | |
} else if (resType[j].contains("avg")) { | |
avgFlag = true; | |
if (resType[j].length() == 7) { | |
avgColumn = Character.getNumericValue(resType[j].charAt(5)); | |
} else { | |
throw new UsageNotCorrectException(); | |
} | |
} | |
} | |
} | |
} | |
if (flag != 3) { | |
throw new UsageNotCorrectException(); | |
} | |
} | |
private void readFileFromHadoop() throws IOException { | |
Configuration conf = new Configuration(); | |
fileSystem = FileSystem.get(URI.create(filePath), conf); | |
Path path = new Path(filePath); | |
FSDataInputStream is = fileSystem.open(path); | |
bufferedReader = new BufferedReader(new InputStreamReader(is)); | |
} | |
private void handleFile() throws IOException, NumberFormatException { | |
String str; | |
DataStructure ds; | |
hashTable = new Hashtable<String, DataStructure>(); | |
while ((str = bufferedReader.readLine()) != null) { | |
String[] strs = str.split("\\|"); | |
int tmp = Integer.valueOf(strs[avgColumn]); | |
if (hashTable.get(strs[groupByColumn]) != null) { | |
ds = hashTable.get(strs[groupByColumn]); | |
ds.count++; | |
ds.sum += tmp; | |
ds.avg = (float)ds.sum / ds.count; | |
if (ds.max < tmp) { | |
ds.max = tmp; | |
} | |
hashTable.put(strs[groupByColumn], ds); | |
} else { | |
ds = new DataStructure(); | |
ds.avg = ds.max = ds.sum = tmp; | |
ds.count++; | |
hashTable.put(strs[groupByColumn], ds); | |
} | |
} | |
bufferedReader.close(); | |
fileSystem.close(); | |
} | |
private void writeToHBase() throws IOException { | |
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); | |
HColumnDescriptor cf = new HColumnDescriptor(COLUMN_FAMILY); | |
htd.addFamily(cf); | |
Configuration conf = HBaseConfiguration.create(); | |
HBaseAdmin hAdmin = new HBaseAdmin(conf); | |
if (hAdmin.tableExists(TABLE_NAME)) { | |
hAdmin.disableTable(TABLE_NAME); | |
hAdmin.deleteTable(TABLE_NAME); | |
} | |
hAdmin.createTable(htd); | |
hAdmin.close(); | |
HTable hTable = new HTable(conf, TABLE_NAME); | |
Enumeration<String> enumKey = hashTable.keys(); | |
while (enumKey.hasMoreElements()) { | |
String key = enumKey.nextElement(); | |
DataStructure val = hashTable.get(key); | |
Put put = new Put(key.getBytes()); | |
if (countFlag) { | |
put.add(COLUMN_FAMILY.getBytes(), COLUMN_COUNT.getBytes(), String.valueOf(val.count).getBytes()); | |
} | |
if (maxFlag) { | |
put.add(COLUMN_FAMILY.getBytes(), COLUMN_MAX.getBytes(), String.valueOf(val.max).getBytes()); | |
} | |
if (avgFlag) { | |
put.add(COLUMN_FAMILY.getBytes(), (COLUMN_AVG + "(R" + avgColumn + ")").getBytes(), String.valueOf(val.avg).getBytes()); | |
} | |
hTable.put(put); | |
} | |
hTable.close(); | |
} | |
public static void main(String[] args) { | |
Hw1Grp2 hw1Grp2 = new Hw1Grp2(); | |
try { | |
// 处理输入 | |
hw1Grp2.handleArgs(args); | |
// Hadoop 读入文件 | |
hw1Grp2.readFileFromHadoop(); | |
// 分析文件 | |
hw1Grp2.handleFile(); | |
// 写入 HBase | |
hw1Grp2.writeToHBase(); | |
} catch (NumberFormatException e) { | |
System.out.println("The avg column can't be string!"); | |
} catch (UsageNotCorrectException e) { | |
System.out.println(USAGE); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
public class DataStructure { | |
public long count; | |
public float avg; | |
public long max; | |
public long sum; | |
public DataStructure() { | |
avg = count = max = sum = 0; | |
} | |
} | |
public class UsageNotCorrectException extends Exception { | |
public UsageNotCorrectException() { | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment