Created
August 26, 2014 19:28
-
-
Save hernanliendo/9a4d9c592a3feb6060cf to your computer and use it in GitHub Desktop.
Hackademy - BigQueryClient
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ar.com.zupcat.lib.bean.audit; | |
import ar.com.zupcat.lib.bean.IClosure; | |
import ar.com.zupcat.lib.bean.RetryingExecutor; | |
import ar.com.zupcat.lib.bean.enums.ErrorType; | |
import ar.com.zupcat.lib.exception.GAEException; | |
import ar.com.zupcat.lib.util.RandomUtils; | |
import com.google.api.client.util.Data; | |
import com.google.api.services.bigquery.Bigquery; | |
import com.google.api.services.bigquery.model.*; | |
import java.io.IOException; | |
import java.util.*; | |
public final class BigQueryClient { | |
private final String projectNumberId; | |
private final Bigquery bigquery; | |
protected BigQueryClient(final String _projectNumberId, final Bigquery _bigquery) { | |
this.projectNumberId = _projectNumberId; | |
this.bigquery = _bigquery; | |
} | |
public void createDatasetAndTable(final String datasetName, final String tableName, final LinkedHashMap<String, String> fields) { | |
try { | |
final Dataset dataset = new Dataset(); | |
final DatasetReference datasetRef = new DatasetReference(); | |
datasetRef.setProjectId(projectNumberId); | |
datasetRef.setDatasetId(datasetName); | |
dataset.setDatasetReference(datasetRef); | |
bigquery.datasets().insert(projectNumberId, dataset).execute(); | |
final TableSchema schema = new TableSchema(); | |
final List<TableFieldSchema> tableFieldSchema = new ArrayList<>(); | |
for (final Map.Entry<String, String> entry : fields.entrySet()) { | |
final TableFieldSchema schemaEntry = new TableFieldSchema(); | |
schemaEntry.setName(entry.getKey()); | |
schemaEntry.setType(entry.getValue()); | |
tableFieldSchema.add(schemaEntry); | |
} | |
schema.setFields(tableFieldSchema); | |
final Table table = new Table(); | |
table.setSchema(schema); | |
final TableReference tableRef = new TableReference(); | |
tableRef.setDatasetId(datasetName); | |
tableRef.setProjectId(projectNumberId); | |
tableRef.setTableId(tableName); | |
table.setTableReference(tableRef); | |
bigquery.tables().insert(projectNumberId, datasetName, table).execute(); | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
private void saveRows(final List<TableRow> rows, final List<Map<String, Object>> result) { | |
if (rows != null) { | |
for (final TableRow row : rows) { | |
int i = 0; | |
final Map<String, Object> map = new HashMap<>(); | |
result.add(map); | |
for (final TableCell cell : row.getF()) { | |
map.put("" + i, Data.isNull(cell.getV()) ? "null" : cell.getV().toString()); | |
i++; | |
} | |
} | |
} | |
} | |
public List<Map<String, Object>> executeQuery(final String query) { | |
final List<Map<String, Object>> result = new ArrayList<>(); | |
try { | |
final QueryRequest queryRequest = new QueryRequest().setQuery(query); | |
final QueryResponse queryResponse = bigquery.jobs().query(projectNumberId, queryRequest).execute(); | |
if (queryResponse.getJobComplete()) { | |
saveRows(queryResponse.getRows(), result); | |
if (null == queryResponse.getPageToken()) { | |
return result; | |
} | |
} | |
// This loop polls until results are present, then loops over result pages. | |
String pageToken = null; | |
while (true) { | |
final GetQueryResultsResponse queryResults = bigquery.jobs() | |
.getQueryResults(projectNumberId, queryResponse.getJobReference() | |
.getJobId()) | |
.setPageToken(pageToken).execute(); | |
if (queryResults.getJobComplete()) { | |
saveRows(queryResults.getRows(), result); | |
pageToken = queryResults.getPageToken(); | |
if (null == pageToken || queryResults.getRows() == null) { | |
return result; | |
} | |
} | |
} | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
public TableDataInsertAllResponse insertRow(final String datasetName, final String tableName, final Map<String, Object> rowValues) { | |
final List<Map<String, Object>> items = new ArrayList<>(); | |
items.add(rowValues); | |
return insertRows(datasetName, tableName, items); | |
} | |
public TableDataInsertAllResponse insertRows(final String datasetName, final String tableName, final List<Map<String, Object>> rowValues) { | |
try { | |
final List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<>(rowValues.size()); | |
long randomValue = RandomUtils.getInstance().getRandomLong() - 10000; | |
for (final Map<String, Object> rowValue : rowValues) { | |
final TableRow row = new TableRow(); | |
for (final Map.Entry<String, Object> entry : rowValue.entrySet()) { | |
row.set(entry.getKey(), entry.getValue()); | |
} | |
final TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows(); | |
rows.setInsertId("" + randomValue); | |
rows.setJson(row); | |
rowList.add(rows); | |
randomValue++; | |
} | |
final TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList); | |
final TableDataInsertAllResponse[] responses = new TableDataInsertAllResponse[1]; | |
responses[0] = null; | |
final RetryingExecutor retryingExecutor = new RetryingExecutor(5, 1000, new IClosure() { | |
@Override | |
public void execute(final Object params) throws Exception { | |
responses[0] = bigquery.tabledata().insertAll(projectNumberId, datasetName, tableName, content).execute(); | |
} | |
}, null); | |
retryingExecutor.startExecution(); | |
return responses[0]; | |
} catch (final Exception _exception) { | |
throw new GAEException(ErrorType.PROGRAMMING, _exception); | |
} | |
} | |
public List<String> getAllTableNames() { | |
final List<String> result = new ArrayList<>(); | |
try { | |
final Bigquery.Datasets.List datasetRequest = bigquery.datasets().list(projectNumberId); | |
final DatasetList datasetList = datasetRequest.execute(); | |
for (final DatasetList.Datasets dataset : datasetList.getDatasets()) { | |
result.add(dataset.getDatasetReference().getDatasetId()); | |
} | |
} catch (final IOException _ioException) { | |
throw new GAEException(ErrorType.PROGRAMMING, _ioException); | |
} | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment