@HiroNakamura
Created October 19, 2024 23:40
BigQuery and Java
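This gist collects three standalone Java snippets for Google BigQuery: a threaded CSV uploader that reads from Cloud Storage, a caller for a SQL function, and a small read/delete/insert routine for a REDES table. All of them obtain clients through getDefaultInstance(), so they rely on Application Default Credentials. As a hedged alternative, the sketch below shows how a client could be built from an explicit service-account key file; the key path and project ID are placeholders, not values taken from the gist.

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;

import java.io.FileInputStream;

public class BigQueryClientFactory {

    // Builds a BigQuery client from an explicit service-account key.
    // keyPath and projectId are placeholder values supplied by the caller.
    public static BigQuery fromKeyFile(String keyPath, String projectId) throws Exception {
        GoogleCredentials credentials;
        try (FileInputStream keyStream = new FileInputStream(keyPath)) {
            credentials = GoogleCredentials.fromStream(keyStream);
        }
        return BigQueryOptions.newBuilder()
                .setProjectId(projectId)
                .setCredentials(credentials)
                .build()
                .getService();
    }
}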
import com.google.cloud.bigquery.*;
import com.google.cloud.storage.*;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BigQueryFileUploader {

    private static final String BUCKET_NAME = "tu_bucket";
    private static final String DATASET_NAME = "tu_dataset";
    private static final String TABLE_NAME = "TABLA_FILE";
    private static final int THREAD_COUNT = 4; // Number of worker threads to use.

    public static void main(String[] args) throws Exception {
        String fileName = "tu_archivo.csv";
        List<String[]> fileContent = readFileFromGCS(fileName);
        // Load the data into BigQuery using multiple threads.
        uploadToBigQuery(fileContent);
    }

    /**
     * Reads the CSV file from the GCS bucket.
     */
    private static List<String[]> readFileFromGCS(String fileName) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Blob blob = storage.get(BlobId.of(BUCKET_NAME, fileName));
        List<String[]> fileContent = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(blob.reader())))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] values = line.split(","); // Split values on commas.
                fileContent.add(values);
            }
        }
        return fileContent;
    }

    /**
     * Loads the data into BigQuery using threads.
     */
    private static void uploadToBigQuery(List<String[]> fileContent) throws InterruptedException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        int batchSize = fileContent.size() / THREAD_COUNT; // Split the work into one batch per thread.
        for (int i = 0; i < THREAD_COUNT; i++) {
            int start = i * batchSize;
            int end = (i == THREAD_COUNT - 1) ? fileContent.size() : (i + 1) * batchSize;
            List<String[]> batch = fileContent.subList(start, end);
            executorService.submit(() -> {
                try {
                    insertBatchIntoBigQuery(batch, bigquery);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
    }

    /**
     * Inserts a batch of records into BigQuery via the streaming insert API.
     */
    private static void insertBatchIntoBigQuery(List<String[]> batch, BigQuery bigquery) {
        TableId tableId = TableId.of(DATASET_NAME, TABLE_NAME);
        List<InsertAllRequest.RowToInsert> rows = new ArrayList<>();
        for (String[] record : batch) {
            // RowToInsert.of expects a row id plus a map of column name -> value.
            Map<String, Object> content = new HashMap<>();
            content.put("campo1", record[0]);
            content.put("campo2", record[1]);
            rows.add(InsertAllRequest.RowToInsert.of(record[0], content)); // Assumes the first value is the key (row id).
        }
        InsertAllRequest insertRequest = InsertAllRequest.newBuilder(tableId).setRows(rows).build();
        InsertAllResponse response = bigquery.insertAll(insertRequest);
        if (response.hasErrors()) {
            System.out.println("Errors while inserting into BigQuery: " + response.getInsertErrors());
        } else {
            System.out.println("Batch inserted into BigQuery successfully.");
        }
    }
}
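BigQueryFileUploader streams rows a batch at a time through insertAll. For larger CSV files, a BigQuery load job can pull the file straight from the GCS bucket without reading it through the client; below is a minimal sketch under that assumption, reusing the same bucket, dataset, and table names. Schema auto-detection here is an assumption, not part of the original code.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

public class BigQueryCsvLoadJob {

    public static void main(String[] args) throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("tu_dataset", "TABLA_FILE");
        // Load the CSV directly from GCS; BigQuery parallelizes the load itself.
        LoadJobConfiguration loadConfig = LoadJobConfiguration.newBuilder(
                        tableId, "gs://tu_bucket/tu_archivo.csv")
                .setFormatOptions(FormatOptions.csv())
                .setAutodetect(true) // Assumption: let BigQuery infer the schema.
                .build();
        Job job = bigquery.create(JobInfo.of(loadConfig));
        job = job.waitFor();
        if (job.getStatus().getError() != null) {
            System.out.println("Load job failed: " + job.getStatus().getError());
        } else {
            System.out.println("Load job completed.");
        }
    }
}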
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

import java.util.UUID;

public class BigQueryFunctionInvoker {

    public static void main(String[] args) {
        // BigQuery client instance.
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        // SQL query that invokes the function.
        String query = "SELECT myproject.mydataset.get_count() AS total_count";
        // Query job configuration.
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
        // Create a unique JobId for the query.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        long totalCount = 0L;
        // Run the query.
        Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
        // Wait for the job to finish.
        try {
            queryJob = queryJob.waitFor();
            if (queryJob == null) {
                throw new RuntimeException("Job not found.");
            } else if (queryJob.getStatus().getError() != null) {
                throw new RuntimeException(queryJob.getStatus().getError().toString());
            }
            // Fetch and process the query results (a plain loop so the local variable can be updated).
            TableResult result = queryJob.getQueryResults();
            for (FieldValueList row : result.iterateAll()) {
                totalCount = row.get("total_count").getLongValue();
                System.out.println("Total count: " + totalCount);
            }
            System.out.println("The function was invoked successfully.");
        } catch (InterruptedException e) {
            System.out.println("Error: " + e.getMessage());
            Thread.currentThread().interrupt();
        }
        System.out.println("Total: " + totalCount);
    }
}
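BigQueryFunctionInvoker assumes a SQL function myproject.mydataset.get_count() already exists. As a hedged illustration, one way to create such a function from Java is to run a CREATE FUNCTION DDL statement through the same client; the source table myproject.mydataset.some_table in the body is a hypothetical placeholder, not something referenced by the gist.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class CreateGetCountFunction {

    public static void main(String[] args) throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        // DDL that (re)creates the SQL UDF invoked by BigQueryFunctionInvoker.
        // `myproject.mydataset.some_table` is a placeholder for the table the count should come from.
        String ddl = "CREATE OR REPLACE FUNCTION `myproject.mydataset.get_count`() AS ("
                + " (SELECT COUNT(*) FROM `myproject.mydataset.some_table`)"
                + ")";
        bigquery.query(QueryJobConfiguration.newBuilder(ddl).build());
        System.out.println("Function get_count() created or replaced.");
    }
}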
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MainRedes {

    public static void main(String[] args) {
        String nombre = "SECC02";
        String fchReg = "2024-09-22";
        try {
            // Fetch the list of Redes from BigQuery.
            List<Redes> listaRedes = getListaRedes(nombre);
            // If the list has data, start processing.
            if (!listaRedes.isEmpty()) {
                // Thread pool for concurrent processing.
                ExecutorService executor = Executors.newFixedThreadPool(4);
                // Process each item in the Redes list.
                for (Redes mr_red : listaRedes) {
                    executor.submit(() -> {
                        try {
                            // Get the number of subnets from BigQuery.
                            int subRedes = getSubRedes(mr_red.getHostname());
                            mr_red.setSubRedes(subRedes);
                            // Delete the existing records for fchReg (note: this runs once per item).
                            deleteRed(fchReg);
                            // Insert the new network record.
                            insertRed(mr_red);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                }
                // Shut down the pool and wait for all tasks to finish.
                executor.shutdown();
                executor.awaitTermination(1, TimeUnit.HOURS);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Fetch the list of Redes from BigQuery.
    public static List<Redes> getListaRedes(String nombre) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        String query = String.format(
                "SELECT hostname, ip, dns, subredes FROM `tu_proyecto.tu_dataset.REDES` WHERE hostname = '%s'", nombre);
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
        TableResult result = bigQuery.query(queryConfig);
        List<Redes> listaRedes = new ArrayList<>();
        for (FieldValueList row : result.iterateAll()) {
            Redes red = new Redes(
                    row.get("hostname").getStringValue(),
                    row.get("ip").getStringValue(),
                    row.get("dns").getStringValue(),
                    row.get("subredes").getLongValue());
            listaRedes.add(red);
        }
        return listaRedes;
    }

    // Get the number of subnets from BigQuery.
    public static int getSubRedes(String hostname) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        String query = String.format(
                "SELECT COUNT(sub_redes) AS subredes FROM `tu_proyecto.tu_dataset.VPN` WHERE hostname = '%s'", hostname);
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();
        TableResult result = bigQuery.query(queryConfig);
        int subRedes = 0;
        for (FieldValueList row : result.iterateAll()) {
            subRedes = (int) row.get("subredes").getLongValue();
        }
        return subRedes;
    }

    // Delete REDES records by registration date.
    public static void deleteRed(String fchReg) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        String deleteQuery = String.format(
                "DELETE FROM `tu_proyecto.tu_dataset.REDES` WHERE fchreg = '%s'", fchReg);
        QueryJobConfiguration deleteConfig = QueryJobConfiguration.newBuilder(deleteQuery).build();
        bigQuery.query(deleteConfig);
        System.out.println("Records deleted for date: " + fchReg);
    }

    // Insert records into REDES.
    public static void insertRed(Redes red) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        String insertQuery = String.format(
                "INSERT INTO `tu_proyecto.tu_dataset.REDES` (hostname, ip, dns, subredes) VALUES ('%s', '%s', '%s', %d)",
                red.getHostname(), red.getIp(), red.getDns(), red.getSubRedes());
        QueryJobConfiguration insertConfig = QueryJobConfiguration.newBuilder(insertQuery).build();
        bigQuery.query(insertConfig);
        System.out.println("Record inserted: " + red);
    }
}

// Model class representing a network record.
class Redes {
    private String hostname;
    private String ip;
    private String dns;
    private long subRedes;

    public Redes(String hostname, String ip, String dns, long subRedes) {
        this.hostname = hostname;
        this.ip = ip;
        this.dns = dns;
        this.subRedes = subRedes;
    }

    public String getHostname() {
        return hostname;
    }

    public String getIp() {
        return ip;
    }

    public String getDns() {
        return dns;
    }

    public long getSubRedes() {
        return subRedes;
    }

    public void setSubRedes(long subRedes) {
        this.subRedes = subRedes;
    }

    @Override
    public String toString() {
        return String.format("Hostname: %s, IP: %s, DNS: %s, SubRedes: %d", hostname, ip, dns, subRedes);
    }
}
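MainRedes builds its SQL with String.format, which works for fixed inputs but concatenates values straight into the statement. As a hedged sketch, the same hostname lookup can bind the value as a named query parameter instead; the table name is kept from the gist, only the parameter handling changes.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;

public class RedesParameterizedQuery {

    public static void main(String[] args) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        // Same lookup as getListaRedes, but with the hostname bound as a named parameter.
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(
                        "SELECT hostname, ip, dns, subredes FROM `tu_proyecto.tu_dataset.REDES` "
                                + "WHERE hostname = @hostname")
                .addNamedParameter("hostname", QueryParameterValue.string("SECC02"))
                .build();
        TableResult result = bigQuery.query(queryConfig);
        for (FieldValueList row : result.iterateAll()) {
            System.out.println(row.get("hostname").getStringValue() + " -> "
                    + row.get("subredes").getLongValue() + " subnets");
        }
    }
}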