Created
October 19, 2024 23:40
-
-
Save HiroNakamura/2327853061ada56f775f994d0e4ab07a to your computer and use it in GitHub Desktop.
BigQuery y Java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.cloud.bigquery.*; | |
import com.google.cloud.storage.*; | |
import java.io.BufferedReader; | |
import java.io.InputStreamReader; | |
import java.nio.channels.Channels; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
public class BigQueryFileUploader { | |
private static final String BUCKET_NAME = "tu_bucket"; | |
private static final String DATASET_NAME = "tu_dataset"; | |
private static final String TABLE_NAME = "TABLA_FILE"; | |
private static final int THREAD_COUNT = 4; // Número de threads que deseas usar. | |
public static void main(String[] args) throws Exception { | |
String fileName = "tu_archivo.csv"; | |
List<String[]> fileContent = readFileFromGCS(fileName); | |
// Cargar los datos en BigQuery usando múltiples hilos. | |
uploadToBigQuery(fileContent); | |
} | |
/** | |
* Lee el archivo CSV desde el bucket de GCS. | |
*/ | |
private static List<String[]> readFileFromGCS(String fileName) throws Exception { | |
Storage storage = StorageOptions.getDefaultInstance().getService(); | |
Blob blob = storage.get(BlobId.of(BUCKET_NAME, fileName)); | |
List<String[]> fileContent = new ArrayList<>(); | |
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Channels.newInputStream(blob.reader())))) { | |
String line; | |
while ((line = reader.readLine()) != null) { | |
String[] values = line.split(","); // Separar valores por coma | |
fileContent.add(values); | |
} | |
} | |
return fileContent; | |
} | |
/** | |
* Carga los datos en BigQuery usando hilos (Threads). | |
*/ | |
private static void uploadToBigQuery(List<String[]> fileContent) throws InterruptedException { | |
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); | |
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); | |
int batchSize = fileContent.size() / THREAD_COUNT; // Dividir el trabajo en lotes para cada thread. | |
for (int i = 0; i < THREAD_COUNT; i++) { | |
int start = i * batchSize; | |
int end = (i == THREAD_COUNT - 1) ? fileContent.size() : (i + 1) * batchSize; | |
List<String[]> batch = fileContent.subList(start, end); | |
executorService.submit(() -> { | |
try { | |
insertBatchIntoBigQuery(batch, bigquery); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
} | |
executorService.shutdown(); | |
executorService.awaitTermination(1, TimeUnit.HOURS); | |
} | |
/** | |
* Inserta un lote de registros en BigQuery. | |
*/ | |
private static void insertBatchIntoBigQuery(List<String[]> batch, BigQuery bigquery) throws InterruptedException { | |
TableId tableId = TableId.of(DATASET_NAME, TABLE_NAME); | |
List<InsertAllRequest.RowToInsert> rows = new ArrayList<>(); | |
for (String[] record : batch) { | |
rows.add(InsertAllRequest.RowToInsert.of( | |
record[0], // Asume que el primer valor es la clave | |
"campo1", record[0], | |
"campo2", record[1] | |
)); | |
} | |
InsertAllRequest insertRequest = InsertAllRequest.newBuilder(tableId).setRows(rows).build(); | |
InsertAllResponse response = bigquery.insertAll(insertRequest); | |
if (response.hasErrors()) { | |
System.out.println("Errores durante la inserción en BigQuery: " + response.getInsertErrors()); | |
} else { | |
System.out.println("Batch insertado con éxito en BigQuery."); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;
public class BigQueryFunctionInvoker { | |
public static void main(String[] args) { | |
// Instancia del cliente de BigQuery | |
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); | |
// Consulta SQL para invocar la función | |
String query = "SELECT myproject.mydataset.get_count() AS total_count"; | |
// Configuración del trabajo de consulta | |
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build(); | |
// Crear un JobId único para la consulta | |
JobId jobId = JobId.of(UUID.randomUUID().toString()); | |
long totalCount = 0L; | |
// Ejecutar la consulta | |
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); | |
// Esperar a que el Job termine | |
try { | |
queryJob = queryJob.waitFor(); | |
if (queryJob == null) { | |
throw new RuntimeException("Job no encontrado."); | |
} else if (queryJob.getStatus().getError() != null) { | |
throw new RuntimeException(queryJob.getStatus().getError().toString()); | |
} | |
// Obtener y manejar los resultados de la consulta | |
TableResult result = queryJob.getQueryResults(); | |
result.iterateAll().forEach(row -> { | |
totalCount = row.get("total_count").getLongValue(); | |
System.out.println("Total count: " + totalCount); | |
}); | |
System.out.println("La función fue invocada con éxito."); | |
} catch (InterruptedException e) { | |
System.out.println("Error: " + e.getMessage()); | |
Thread.currentThread().interrupt(); | |
} | |
System.out.println("Total: "+totalCount); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.cloud.bigquery.BigQuery; | |
import com.google.cloud.bigquery.BigQueryOptions; | |
import com.google.cloud.bigquery.QueryJobConfiguration; | |
import com.google.cloud.bigquery.TableResult; | |
import com.google.cloud.bigquery.FieldValueList; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
public class MainRedes { | |
public static void main(String[] args) { | |
String nombre = "SECC02"; | |
String fchReg = "2024-09-22"; | |
try { | |
// Obtener la lista de Redes de BigQuery | |
List<Redes> listaRedes = getListaRedes(nombre); | |
// Si hay datos en la lista, iniciar el bucle | |
if (listaRedes.size() > 0) { | |
// Crear un pool de hilos para la concurrencia | |
ExecutorService executor = Executors.newFixedThreadPool(4); | |
// Procesar cada item de la lista de Redes | |
for (Redes mr_red : listaRedes) { | |
executor.submit(() -> { | |
try { | |
// Obtener el número de subredes desde BigQuery | |
int subRedes = getSubRedes(mr_red.getHostname()); | |
// Ajustar el valor de subRedes | |
if (subRedes == 0) { | |
mr_red.setSubRedes(0); | |
} else { | |
mr_red.setSubRedes(subRedes); | |
} | |
// Eliminar los registros existentes con fchReg | |
deleteRed(fchReg); | |
// Insertar el nuevo registro de red | |
insertRed(mr_red); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
} | |
// Cerrar el pool de hilos y esperar a que terminen todos los trabajos | |
executor.shutdown(); | |
executor.awaitTermination(1, TimeUnit.HOURS); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
// Obtener la lista de Redes desde BigQuery | |
public static List<Redes> getListaRedes(String nombre) throws Exception { | |
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); | |
String query = String.format("SELECT hostname, ip, dns, subredes FROM `tu_proyecto.tu_dataset.REDES` WHERE hostname = '%s'", nombre); | |
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build(); | |
TableResult result = bigQuery.query(queryConfig); | |
List<Redes> listaRedes = new ArrayList<>(); | |
for (FieldValueList row : result.iterateAll()) { | |
Redes red = new Redes(row.get("hostname").getStringValue(), | |
row.get("ip").getStringValue(), | |
row.get("dns").getStringValue(), | |
row.get("subredes").getLongValue()); | |
listaRedes.add(red); | |
} | |
return listaRedes; | |
} | |
// Obtener el número de subRedes desde BigQuery | |
public static int getSubRedes(String hostname) throws Exception { | |
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); | |
String query = String.format("SELECT COUNT(sub_redes) as subredes FROM `tu_proyecto.tu_dataset.VPN` WHERE hostname = '%s'", hostname); | |
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build(); | |
TableResult result = bigQuery.query(queryConfig); | |
int subRedes = 0; | |
for (FieldValueList row : result.iterateAll()) { | |
subRedes = (int) row.get("subredes").getLongValue(); | |
} | |
return subRedes; | |
} | |
// Eliminar registros de REDES por fecha de registro | |
public static void deleteRed(String fchReg) throws Exception { | |
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); | |
String deleteQuery = String.format("DELETE FROM `tu_proyecto.tu_dataset.REDES` WHERE fchreg = '%s'", fchReg); | |
QueryJobConfiguration deleteConfig = QueryJobConfiguration.newBuilder(deleteQuery).build(); | |
bigQuery.query(deleteConfig); | |
System.out.println("Registros eliminados para la fecha: " + fchReg); | |
} | |
// Insertar registros en REDES | |
public static void insertRed(Redes red) throws Exception { | |
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); | |
String insertQuery = String.format("INSERT INTO `tu_proyecto.tu_dataset.REDES` (hostname, ip, dns, subredes) " + | |
"VALUES ('%s', '%s', '%s', %d)", | |
red.getHostname(), red.getIp(), red.getDns(), red.getSubRedes()); | |
QueryJobConfiguration insertConfig = QueryJobConfiguration.newBuilder(insertQuery).build(); | |
bigQuery.query(insertConfig); | |
System.out.println("Registro insertado: " + red); | |
} | |
} | |
/**
 * Model of a network entry: host name, IP address, DNS name and number of sub-networks.
 * Only the sub-network count can be changed after construction.
 */
class Redes {

    private final String hostname;
    private final String ip;
    private final String dns;
    private long subRedes;

    /**
     * Creates a network entry.
     *
     * @param hostname host name of the network
     * @param ip IP address, as text
     * @param dns DNS name
     * @param subRedes initial number of sub-networks
     */
    public Redes(String hostname, String ip, String dns, long subRedes) {
        this.hostname = hostname;
        this.ip = ip;
        this.dns = dns;
        this.subRedes = subRedes;
    }

    /** @return the host name */
    public String getHostname() {
        return this.hostname;
    }

    /** @return the IP address */
    public String getIp() {
        return this.ip;
    }

    /** @return the DNS name */
    public String getDns() {
        return this.dns;
    }

    /** @return the current number of sub-networks */
    public long getSubRedes() {
        return this.subRedes;
    }

    /**
     * Replaces the number of sub-networks.
     *
     * @param subRedes new number of sub-networks
     */
    public void setSubRedes(long subRedes) {
        this.subRedes = subRedes;
    }

    /** @return a one-line description listing all four fields */
    @Override
    public String toString() {
        final String template = "Hostname: %s, IP: %s, DNS: %s, SubRedes: %d";
        return String.format(template, this.hostname, this.ip, this.dns, this.subRedes);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment