Created March 23, 2012 18:39
-
-
Save mushkevych/2173634 to your computer and use it in GitHub Desktop.
CSV-to-HBase importer, based on Surus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.reinvent.synergy.data.csvimport; | |
import com.reinvent.synergy.data.model.Constants; | |
import com.reinvent.synergy.data.model.UserLog; | |
import com.reinvent.synergy.data.system.PoolManager; | |
import com.reinvent.synergy.data.system.TableContext; | |
import com.reinvent.synergy.data.system.TimePeriodHelper; | |
import org.apache.log4j.Logger; | |
import org.apache.log4j.PropertyConfigurator; | |
import org.supercsv.io.CsvMapReader; | |
import org.supercsv.io.ICsvMapReader; | |
import org.supercsv.prefs.CsvPreference; | |
import java.io.FileReader; | |
import java.io.IOException; | |
import java.util.Map; | |
import java.util.concurrent.*; | |
/** | |
* @author Bohdan Mushkevych | |
* date: 22 Mar 2012 | |
* Description: Method opens CSV file and streams it to workers for processing | |
*/ | |
public class CsvImportServer<T> extends Thread { | |
public static final String PROPERTY_LOG4J = "log4j.configuration"; | |
public static final int NUMBER_OF_CONCURRENT_THREADS = 5; | |
protected PoolManager<T> poolManager; | |
protected Logger logger; | |
protected String filePath; | |
protected String tableName; | |
protected String familyName; | |
protected Class<T> clazzDataModel; | |
protected BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>(NUMBER_OF_CONCURRENT_THREADS * 11); | |
protected ExecutorService threadPool = new ThreadPoolExecutor(NUMBER_OF_CONCURRENT_THREADS, | |
NUMBER_OF_CONCURRENT_THREADS, | |
0L, | |
TimeUnit.MILLISECONDS, | |
blockingQueue); | |
static { | |
PropertyConfigurator.configure(System.getProperty(PROPERTY_LOG4J)); | |
} | |
public CsvImportServer(String filePath, String familyName, Class<T> clazzDataModel, String tableName) { | |
this.filePath = filePath; | |
this.familyName = familyName; | |
this.clazzDataModel = clazzDataModel; | |
this.tableName = tableName; | |
this.poolManager = TableContext.getPoolManager(tableName); | |
logger = Logger.getLogger(this.clazzDataModel.getSimpleName()); | |
logger.info(String.format("Started Synergy CVS Importer from %s for %s:%s", this.filePath, this.tableName, this.familyName)); | |
} | |
/** | |
* Algorithm: | |
* - Open CSV file and read CSV maps into thread-pool queue. | |
* - Queue must not be overloaded, as it causes OOM exceptions and degrades performance | |
*/ | |
public void run() { | |
Map<String, String> map; | |
ICsvMapReader reader = null; | |
try { | |
reader = new CsvMapReader(new FileReader(this.filePath), CsvPreference.STANDARD_PREFERENCE); | |
String[] header = reader.getCSVHeader(true); | |
map = reader.read(header); | |
while (map != null) { | |
try { | |
while (blockingQueue.remainingCapacity() < NUMBER_OF_CONCURRENT_THREADS) { | |
Thread.yield(); | |
} | |
threadPool.submit(new CsvImportWorker(map, familyName, poolManager)); | |
map = reader.read(header); | |
} catch (Exception e) { | |
logger.error("Exception at worker level", e); | |
} | |
} | |
logger.info(String.format("Imported %d events %n", reader.getLineNumber())); | |
shutdownAndFlush(); | |
} catch (IOException e) { | |
logger.error("Server side exception.", e); | |
} catch (Exception e) { | |
logger.error("Unexpected server side exception.", e); | |
} finally { | |
try { | |
if (reader != null) { | |
reader.close(); | |
} | |
} catch (IOException e) { | |
logger.error("Exception on closing CSV reader.", e); | |
} | |
} | |
} | |
/** | |
* Since we use both thread-pooling and resource-pooling, we must make sure they are properly disposed | |
* Thread-pooling is given 5 minutes for normal shut-down, while resource-pooling is default to 30 secs | |
* @throws InterruptedException the | |
*/ | |
void shutdownAndFlush() throws InterruptedException { | |
threadPool.shutdown(); // Disable new tasks from being submitted | |
try { | |
// Wait a while for existing tasks to terminate | |
if (!threadPool.awaitTermination(300, TimeUnit.SECONDS)) { | |
threadPool.shutdownNow(); // Cancel currently executing tasks | |
// Wait a while for tasks to respond to being cancelled | |
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { | |
logger.error("Thread Pool did not terminate"); | |
} | |
} | |
} catch (InterruptedException ie) { | |
// (Re-)Cancel if current thread also interrupted | |
threadPool.shutdownNow(); | |
logger.error("Interrupted Exception on Thread Pool closing. Re-shutting down."); | |
} | |
poolManager.flushTable(); | |
logger.info("HBase flush requested. Waiting to safely persist datum"); | |
while (poolManager.isFlushRequested()) { | |
Thread.yield(); | |
poolManager.flushTable(); | |
} | |
logger.info("HBase flush completed. Exiting"); | |
} | |
public static void main(String args[]) throws Exception { | |
if (args.length != 2) { | |
System.out.println("Wrong number of parameters. Should be csv_file_path column_name"); | |
return; | |
} | |
CsvImportServer<UserLog> server = new CsvImportServer<UserLog>(args[0], args[1], UserLog.class, Constants.TABLE_USER_LOG); | |
server.start(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.reinvent.synergy.data.csvimport; | |
import com.reinvent.synergy.data.model.Constants; | |
import com.reinvent.synergy.data.rest.RequestHandler; | |
import com.reinvent.synergy.data.rest.UserLogPostHandler; | |
import com.reinvent.synergy.data.system.PoolManager; | |
import org.apache.hadoop.hbase.client.HTable; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.apache.log4j.Logger; | |
import java.util.Map; | |
/** | |
* @author Bohdan Mushkevych | |
* date 21 Mar 2012 | |
* Description: receives CSV mapping from Server and converts it into HBase Put | |
*/ | |
public class CsvImportWorker implements Runnable { | |
protected static Logger logger = Logger.getLogger(CsvImportWorker.class.getSimpleName()); | |
protected PoolManager poolManager; | |
protected Map<String, String> map; | |
public CsvImportWorker(Map<String, String> map, String strColumnFamily, PoolManager poolManager) { | |
this.map = map; | |
this.poolManager = poolManager; | |
} | |
public void run() { | |
HTable hTable = null; | |
try { | |
hTable = poolManager.getTable(); | |
if (map != null) { | |
Integer userId = Integer.valueOf(map.get(RequestHandler.PARAMETER_USER_ID)); | |
String timestamp = map.get(RequestHandler.PARAMETER_CREATED_AT); | |
Put putA = new Put(Bytes.toBytes(userId)); | |
// *** | |
// SOMETHING VERY USEFUL HERE | |
// *** | |
hTable.put(putA); | |
} | |
} catch (Exception e) { | |
logger.error("Unexpected error during processing"); | |
} finally { | |
if (hTable != null) { | |
poolManager.putTable(hTable); | |
} | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.