Last active
December 2, 2023 19:52
-
-
Save cmaggiulli/37b052682d864e7faf4f4dfbe9459b50 to your computer and use it in GitHub Desktop.
A Groovy script that dumps all Google BigQuery data to JSON files on the filesystem local to the process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Export every table of every dataset in a BigQuery project to Google Cloud
 * Storage as newline-delimited JSON, then download each exported object to a
 * local directory. Failures on individual tables are logged and skipped.
 *
 * @param credentialsPath The path to the JSON key file for Google Cloud authentication.
 * @param gcsBucket The Google Cloud Storage bucket name.
 * @param projectId The Google BigQuery project id.
 * @param destinationPath The local directory for the resulting JSON files.
 *
 * @author Chris Maggiulli ([email protected])
 */
def exportAndDownloadFromBigQueryToGCS(credentialsPath, gcsBucket, projectId, destinationPath) {
    // Track runtime locally. The original read a script-level `startTime`
    // declared with `def` outside this method, which is NOT visible here and
    // would raise MissingPropertyException at the final println.
    def startTime = System.currentTimeMillis()
    println("Script started at: " + new Date())

    // Load credentials from the JSON key file; the stream is closed automatically.
    GoogleCredentials credentials
    File credentialsFile = new File(credentialsPath)
    try (FileInputStream serviceAccountStream = new FileInputStream(credentialsFile)) {
        credentials = ServiceAccountCredentials.fromStream(serviceAccountStream)
    }

    // Instantiate BigQuery client.
    BigQuery bigquery = BigQueryOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()

    // Instantiate Storage client.
    Storage storage = StorageOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()

    def format = "NEWLINE_DELIMITED_JSON"

    // Iterate through all datasets in the project.
    for (Dataset dataset : bigquery.listDatasets().iterateAll()) {
        println("Processing Dataset: " + dataset.getDatasetId().getDataset())
        // Iterate through tables in the dataset.
        def tables = dataset.list()
        for (Table table : tables.iterateAll()) {
            try {
                def tableName = table.getTableId().getTable()
                println("Processing Table: " + tableName)

                // Export the table to GCS. `def` keeps these variables out of
                // the script binding (the original leaked job/completedJob/etc.).
                def gcsUrl = "gs://" + gcsBucket + "/" + tableName + ".json"
                def job = table.extract(format, gcsUrl)
                try {
                    def completedJob = job.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
                            RetryOption.totalTimeout(Duration.ofMinutes(3)))
                    // Check export status; skip the download if the export failed.
                    // Guard the null case — the original dereferenced completedJob
                    // in the failure branch and would NPE when the job vanished.
                    if (completedJob != null && completedJob.getStatus().getError() == null) {
                        println("Table " + tableName + " successfully exported to GCS")
                    } else {
                        def error = (completedJob == null) ? "job no longer exists" : completedJob.getStatus().getError()
                        println("Table " + tableName + " failed to export to GCS with error: " + error)
                        continue // Skip the download if export fails
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can observe the interruption.
                    Thread.currentThread().interrupt()
                    println("Error waiting for job completion: " + e)
                    continue // Skip the download if export fails
                }

                // Download the exported object from GCS.
                def objectName = tableName + ".json"
                println("Downloading " + objectName + " from GCS to local directory")
                def blob = storage.get(BlobId.of(gcsBucket, objectName))
                if (blob == null) {
                    println("Object " + objectName + " not found in bucket " + gcsBucket)
                    continue
                }
                // Paths.get(dir, name) inserts the separator (the original
                // concatenated strings and could produce a malformed path).
                // Blob.downloadTo creates/overwrites the file itself, so the
                // original Files.createFile — which throws
                // FileAlreadyExistsException on re-runs — is dropped.
                def newFilePath = Paths.get(destinationPath, objectName)
                blob.downloadTo(newFilePath)
                println("Finished downloading " + objectName)
            } catch (e) {
                // Best-effort: log the failure and continue with the next table.
                println("Failed to process table " + table.getTableId().getTable() + ": " + e)
            }
        }
    }
    println("Script completed at: " + new Date())
    println("Script runtime: " + (System.currentTimeMillis() - startTime) + " milliseconds")
}
// Example invocation. The values below are placeholders — substitute a real
// service-account key path, bucket, project id, and output directory.
def credentialsPath = './fake-credentials.json'
def gcsBucket = 'fake-bucket-name'
def projectId = 'fakeprojectid-131113'
def destinationPath = './'
def startTime = System.currentTimeMillis()
exportAndDownloadFromBigQueryToGCS(credentialsPath, gcsBucket, projectId, destinationPath)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Jenkins declarative pipeline that runs the BigQuery export/download routine.
pipeline {
    agent any
    environment {
        // Placeholder values — override per environment (e.g. via credentials
        // bindings or job parameters) rather than hard-coding real secrets.
        CREDENTIALS_PATH = './fake-credentials.json'
        GCS_BUCKET = 'fake-bucket-name'
        PROJECT_ID = 'fakeprojectid-131113'
        DESTINATION_PATH = './'
    }
    stages {
        stage('Execute Script') {
            steps {
                script {
                    def startTime = System.currentTimeMillis()
                    echo "Script started at: ${currentBuild.startTimeInMillis}"
                    // Calling the externalized function
                    exportAndDownloadFromBigQueryToGCS(CREDENTIALS_PATH, GCS_BUCKET, PROJECT_ID, DESTINATION_PATH)
                    // Bug fix: the original echoed currentBuild.startTimeInMillis
                    // here too, reporting the START time as the completion time.
                    echo "Script completed at: ${System.currentTimeMillis()}"
                    echo "Script runtime: ${System.currentTimeMillis() - startTime} milliseconds"
                }
            }
        }
    }
}
/**
 * Jenkinsfile variant of the BigQuery export/download routine: exports every
 * table of every dataset to GCS as newline-delimited JSON, then downloads each
 * exported object to a local directory. Per-table failures are logged and skipped.
 *
 * @param credentialsPath path to the JSON service-account key file
 * @param gcsBucket       target Google Cloud Storage bucket name
 * @param projectId       BigQuery project id
 * @param destinationPath local directory for the downloaded JSON files
 */
def exportAndDownloadFromBigQueryToGCS(credentialsPath, gcsBucket, projectId, destinationPath) {
    // Load credentials from the JSON key file; the stream is closed automatically.
    GoogleCredentials credentials
    File credentialsFile = new File(credentialsPath)
    try (FileInputStream serviceAccountStream = new FileInputStream(credentialsFile)) {
        credentials = ServiceAccountCredentials.fromStream(serviceAccountStream)
    }
    // Instantiate the BigQuery client.
    BigQuery bigquery = BigQueryOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    // Instantiate the Storage client.
    Storage storage = StorageOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    def format = "NEWLINE_DELIMITED_JSON"
    for (Dataset dataset : bigquery.listDatasets().iterateAll()) {
        echo "Processing Dataset: ${dataset.getDatasetId().getDataset()}"
        def tables = dataset.list()
        for (Table table : tables.iterateAll()) {
            try {
                def tableName = table.getTableId().getTable()
                echo "Processing Table: ${tableName}"
                def gcsUrl = "gs://${gcsBucket}/${tableName}.json"
                // `def` keeps these variables out of the script binding (the
                // original leaked job/completedJob/objectName/blob into it).
                def job = table.extract(format, gcsUrl)
                try {
                    def completedJob = job.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
                            RetryOption.totalTimeout(Duration.ofMinutes(3)))
                    // Guard the null case — the original dereferenced completedJob
                    // in the failure branch and would NPE when the job vanished.
                    if (completedJob != null && completedJob.getStatus().getError() == null) {
                        echo "Table ${tableName} successfully exported to GCS"
                    } else {
                        def error = (completedJob == null) ? "job no longer exists" : completedJob.getStatus().getError()
                        echo "Table ${tableName} failed to export to GCS with error: ${error}"
                        continue
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pipeline can observe aborts.
                    Thread.currentThread().interrupt()
                    echo "Error waiting for job completion: ${e}"
                    continue
                }
                // Plain String (not GString) so Java APIs receive the expected type.
                def objectName = tableName + ".json"
                echo "Downloading ${objectName} from GCS to local directory"
                def blob = storage.get(BlobId.of(gcsBucket, objectName))
                if (blob == null) {
                    echo "Object ${objectName} not found in bucket ${gcsBucket}"
                    continue
                }
                // Paths.get(dir, name) inserts the separator, avoiding the "//"
                // the original produced when destinationPath ends with a slash.
                // Blob.downloadTo creates/overwrites the file itself, so the
                // original Files.createFile — which throws
                // FileAlreadyExistsException on re-runs — is dropped.
                def newFilePath = Paths.get(destinationPath, objectName)
                blob.downloadTo(newFilePath)
                echo "Finished downloading ${objectName}"
            } catch (e) {
                // Best-effort: log the failure and continue with the next table.
                echo "Failed to process table ${table.getTableId().getTable()}: ${e}"
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment