Skip to content

Instantly share code, notes, and snippets.

@mushkevych
Created March 23, 2012 18:39
Show Gist options
  • Save mushkevych/2173634 to your computer and use it in GitHub Desktop.
Save mushkevych/2173634 to your computer and use it in GitHub Desktop.
CSV -> HBase importer, based on Surus
package com.reinvent.synergy.data.csvimport;
import com.reinvent.synergy.data.model.Constants;
import com.reinvent.synergy.data.model.UserLog;
import com.reinvent.synergy.data.system.PoolManager;
import com.reinvent.synergy.data.system.TableContext;
import com.reinvent.synergy.data.system.TimePeriodHelper;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.supercsv.io.CsvMapReader;
import org.supercsv.io.ICsvMapReader;
import org.supercsv.prefs.CsvPreference;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Reads a CSV file and streams each record to a bounded pool of {@link CsvImportWorker}s,
 * which write the records into an HBase table obtained through a {@link PoolManager}.
 *
 * @author Bohdan Mushkevych
 * date: 22 Mar 2012
 *
 * @param <T> data-model type of the table served by the {@link PoolManager}
 */
public class CsvImportServer<T> extends Thread {
    public static final String PROPERTY_LOG4J = "log4j.configuration";
    public static final int NUMBER_OF_CONCURRENT_THREADS = 5;
    /** Pause between checks of the work queue while it lacks head-room. */
    private static final long QUEUE_POLL_INTERVAL_MS = 1L;

    protected PoolManager<T> poolManager;
    protected Logger logger;
    protected String filePath;
    protected String tableName;
    protected String familyName;
    protected Class<T> clazzDataModel;
    // Bounded, so the CSV file cannot be buffered into memory far ahead of the workers.
    protected BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(NUMBER_OF_CONCURRENT_THREADS * 11);
    protected ExecutorService threadPool = new ThreadPoolExecutor(NUMBER_OF_CONCURRENT_THREADS,
            NUMBER_OF_CONCURRENT_THREADS,
            0L,
            TimeUnit.MILLISECONDS,
            blockingQueue);

    static {
        // Configure log4j explicitly only when a configuration file was supplied;
        // with the property unset there is nothing to load.
        String log4jConfiguration = System.getProperty(PROPERTY_LOG4J);
        if (log4jConfiguration != null) {
            PropertyConfigurator.configure(log4jConfiguration);
        }
    }

    /**
     * @param filePath       path of the CSV file to import; its first line must be the header
     * @param familyName     HBase column family handed to every worker
     * @param clazzDataModel data-model class; its simple name is used as the logger name
     * @param tableName      HBase table whose {@link PoolManager} is obtained from {@link TableContext}
     */
    public CsvImportServer(String filePath, String familyName, Class<T> clazzDataModel, String tableName) {
        this.filePath = filePath;
        this.familyName = familyName;
        this.clazzDataModel = clazzDataModel;
        this.tableName = tableName;
        this.poolManager = TableContext.getPoolManager(tableName);
        logger = Logger.getLogger(this.clazzDataModel.getSimpleName());
        logger.info(String.format("Started Synergy CSV Importer from %s for %s:%s", this.filePath, this.tableName, this.familyName));
    }

    /**
     * Imports the CSV file.
     * Algorithm:
     * - Open the CSV file and submit each record (a map keyed by header name) to the thread pool.
     * - Keep head-room in the pool's queue: an overloaded queue causes OOM errors and degrades performance.
     * - Whether the import succeeds or fails, drain the thread pool and flush the HBase table,
     *   so that no non-daemon pool threads are left running.
     */
    @Override
    public void run() {
        ICsvMapReader reader = null;
        try {
            reader = new CsvMapReader(new FileReader(this.filePath), CsvPreference.STANDARD_PREFERENCE);
            String[] header = reader.getCSVHeader(true);
            if (header == null) {
                // NOTE(review): an empty file may yield no header; nothing to import then.
                logger.warn(String.format("No CSV header found in %s", this.filePath));
                return;
            }
            long submitted = 0;
            Map<String, String> map;
            while ((map = readNextRecord(reader, header)) != null) {
                awaitQueueCapacity();
                // execute(), not submit(): the Future would be discarded anyway.
                threadPool.execute(new CsvImportWorker(map, familyName, poolManager));
                submitted++;
            }
            // Count submitted records; reader.getLineNumber() would also count the header line.
            logger.info(String.format("Imported %d events", submitted));
        } catch (IOException e) {
            logger.error("Server side exception.", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("CSV import interrupted.", e);
        } catch (Exception e) {
            logger.error("Unexpected server side exception.", e);
        } finally {
            closeReader(reader);
            // Always release the pool; otherwise its core threads keep the JVM alive after a failure.
            try {
                shutdownAndFlush();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Interrupted while shutting down the importer.", e);
            } catch (RuntimeException e) {
                logger.error("Unexpected server side exception.", e);
            }
        }
    }

    /**
     * Reads the next CSV record, logging and skipping records the parser rejects.
     *
     * @return the next record, or {@code null} at end of file
     * @throws IOException if the underlying file cannot be read (fatal for the import)
     */
    private Map<String, String> readNextRecord(ICsvMapReader reader, String[] header) throws IOException {
        while (true) {
            int lineBefore = reader.getLineNumber();
            try {
                return reader.read(header);
            } catch (RuntimeException e) {
                // If the reader did not advance, retrying would loop forever on the same input.
                if (reader.getLineNumber() == lineBefore) {
                    throw e;
                }
                logger.error(String.format("Skipping malformed CSV record at line %d", reader.getLineNumber()), e);
            }
        }
    }

    /** Waits until the work queue has room for at least NUMBER_OF_CONCURRENT_THREADS more tasks. */
    private void awaitQueueCapacity() throws InterruptedException {
        while (blockingQueue.remainingCapacity() < NUMBER_OF_CONCURRENT_THREADS) {
            // Sleep rather than Thread.yield(): yielding spins a CPU core while the workers drain the queue.
            TimeUnit.MILLISECONDS.sleep(QUEUE_POLL_INTERVAL_MS);
        }
    }

    /** Closes the CSV reader if it was opened, logging (not propagating) close failures. */
    private void closeReader(ICsvMapReader reader) {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            logger.error("Exception on closing CSV reader.", e);
        }
    }

    /**
     * Disposes of the thread pool and then flushes the HBase table.
     * The pool gets 300 s to finish queued work; after that, running tasks are cancelled and
     * the pool gets another 60 s. Table flushing is then repeated until the {@link PoolManager}
     * no longer reports a pending flush.
     *
     * @throws InterruptedException declared for backward compatibility; an interrupt received while
     *         waiting for the pool is handled here, and the thread's interrupt flag is restored on return
     */
    void shutdownAndFlush() throws InterruptedException {
        boolean interrupted = false;
        threadPool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!threadPool.awaitTermination(300, TimeUnit.SECONDS)) {
                threadPool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    logger.error("Thread Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            threadPool.shutdownNow();
            interrupted = true;
            logger.error("Interrupted Exception on Thread Pool closing. Re-shutting down.");
        }
        // Flush with the interrupt flag clear, so already-written data still gets persisted.
        poolManager.flushTable();
        logger.info("HBase flush requested. Waiting to safely persist datum");
        while (poolManager.isFlushRequested()) {
            // NOTE(review): busy-wait; relies on PoolManager eventually clearing the flush request.
            Thread.yield();
            poolManager.flushTable();
        }
        logger.info("HBase flush completed. Exiting");
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt for the caller
        }
    }

    /**
     * Entry point: {@code CsvImportServer csv_file_path column_name}.
     * Imports the file into the user-log table on a background thread.
     */
    public static void main(String args[]) throws Exception {
        if (args.length != 2) {
            System.out.println("Wrong number of parameters. Should be csv_file_path column_name");
            return;
        }
        CsvImportServer<UserLog> server = new CsvImportServer<UserLog>(args[0], args[1], UserLog.class, Constants.TABLE_USER_LOG);
        server.start();
    }
}
package com.reinvent.synergy.data.csvimport;
import com.reinvent.synergy.data.model.Constants;
import com.reinvent.synergy.data.rest.RequestHandler;
import com.reinvent.synergy.data.rest.UserLogPostHandler;
import com.reinvent.synergy.data.system.PoolManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.util.Map;
/**
 * @author Bohdan Mushkevych
 * date 21 Mar 2012
 * Description: receives one CSV record (a map keyed by header name) from the server and
 * converts it into an HBase Put written through a pooled table.
 */
public class CsvImportWorker implements Runnable {
    protected static Logger logger = Logger.getLogger(CsvImportWorker.class.getSimpleName());
    protected PoolManager poolManager;
    protected Map<String, String> map;
    /** Column family supplied by the server; kept for the Put-building step in {@link #run()}. */
    protected String columnFamily;

    /**
     * @param map             one CSV record keyed by header name; a {@code null} map makes {@link #run()} a no-op
     * @param strColumnFamily HBase column family to write into
     * @param poolManager     source of pooled {@link HTable} handles
     */
    public CsvImportWorker(Map<String, String> map, String strColumnFamily, PoolManager poolManager) {
        this.map = map;
        this.columnFamily = strColumnFamily;
        this.poolManager = poolManager;
    }

    /**
     * Builds a Put keyed by the record's user id and writes it to a table borrowed from the pool.
     * Any failure (for example a missing or non-numeric user id) is logged with its cause;
     * the table is always returned to the pool.
     */
    @Override
    public void run() {
        if (map == null) {
            return; // nothing to import, so do not borrow a table from the pool
        }
        HTable hTable = null;
        try {
            hTable = poolManager.getTable();
            Integer userId = Integer.valueOf(map.get(RequestHandler.PARAMETER_USER_ID));
            String timestamp = map.get(RequestHandler.PARAMETER_CREATED_AT);
            Put putA = new Put(Bytes.toBytes(userId));
            // ***
            // SOMETHING VERY USEFUL HERE
            // ***
            hTable.put(putA);
        } catch (Exception e) {
            // Pass the exception so the cause and stack trace are not lost.
            logger.error("Unexpected error during processing", e);
        } finally {
            if (hTable != null) {
                poolManager.putTable(hTable);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment