Load one or more CSVs with a header row into BigQuery, using each file name as the table name. A script to quickly get data into BigQuery that you can then transform later.
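For example, assuming the script is saved as load_csvs_to_bq.sh and there are two exports, customers.csv and Order Items.csv, in the current directory (the file, project, and dataset names here are hypothetical):

  ./load_csvs_to_bq.sh --project my-sandbox-project-123 --dataset raw_imports --overrides structures:json customers.csv "Order Items.csv"

This would create tables raw_imports.customers and raw_imports.order_items, with every column loaded as string except structures, which is typed json. Adding -t/--test prints the derived table names and schemas without submitting any load jobs.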
#!/usr/bin/env bash
set -e

this_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

# Default values
location="australia-southeast1"
schema_overrides=()
dupe_col_suffix="__dupe"
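# Check for a valid gcloud credential by attempting to mint an access token;
# fall back to an interactive login if none exists.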
require_gcloud_auth() {
  if gcloud auth print-access-token &>/dev/null; then
    echo " ✅ Authenticated with gcloud"
  else
    echo "No valid authentication found. Running gcloud auth login..."
    gcloud auth login
  fi
}
usage() {
  echo "====================="
  echo "LOAD CSVS TO BIGQUERY"
  echo "====================="
  echo
  echo "This script loads CSV files in their raw state to BigQuery tables, with all columns defined as string type by default."
  echo "It assumes the first row of each file is a header row. All generated table and column names are lowercased, spaces"
  echo "are replaced with '_', and any remaining characters that are not alphanumeric or '_' are removed. camelCase names"
  echo "are crudely converted to snake_case."
  echo
  echo "The schema can be overridden per field if you want a specific type. For example, to override a column named"
  echo "'structures' to be of type 'json', use --overrides structures:json. This option supports a comma-separated"
  echo "list of overrides (without spaces)."
  echo
  echo "Usage, one of below:"
  echo "  $0 -p <project_id> -d <dataset_name> [-o <column:type>] <csv_file1> [<csv_file2> ...]"
  echo "  $0 --project <project_id> --dataset <dataset_name> [--overrides <column:type>] <csv_file1> [<csv_file2> ...]"
  echo
  echo "Examples:"
  echo "  $0 -p my-sandbox-project-123 -d my_test_dataset *.csv"
  echo "  $0 --project my-sandbox-project-123 --dataset my_test_dataset --overrides structures:json csv1.csv csv2.csv csv3.csv"
  exit 1
}
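# Normalise a name for BigQuery: crude camelCase -> snake_case, lowercase, spaces and
# hyphens to '_', and strip anything that is not alphanumeric, '_' or ','. Commas are
# preserved so an entire CSV header row can be cleansed in one pass. Reads from stdin
# when piped, otherwise from $1.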
cleanse_name_allow_comma() {
  local input
  if [ -t 0 ]; then
    input="$1"
  else
    input=$(cat)
  fi
  if [[ "$input" =~ [a-z] ]]; then
    # Crude camel case to snake case ... will be lower cased next step
    input="$(echo "$input" | sed 's/\([a-z]\)\([A-Z]\)/\1_\2/g')"
  fi
  echo "$input" | tr '[:upper:]' '[:lower:]' | tr ' ' '_' | tr '-' '_' | tr -cd '[:alnum:]_,'
}
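# As above, but also strips commas; used for single identifiers such as table names.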
cleanse_name() {
  local input
  if [ -t 0 ]; then
    input="$1"
  else
    input=$(cat)
  fi
  echo "$input" | cleanse_name_allow_comma | tr -d ','
}
get_table_name() {
  local file="$1"
  cleanse_name "$(basename "${file%.csv}")"
}

to_qualified_table_name() {
  echo "${bq_project}.${dataset_name}.${1}"
}
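# Build a BigQuery schema string (col:type,col:type,...) from the CSV header row.
# Every column defaults to string; --overrides can retype individual columns.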
get_schema() {
  local header=$(head -n 1 "$1" | tr -d "\r" | cleanse_name_allow_comma)
  local col_names=()
  local unique_col_names=()
  IFS=',' read -ra col_names <<< "$header"
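  # Single-pass de-dupe: a repeated column name gets the dupe suffix so the generated
  # schema stays unique. Note a name appearing three or more times will still collide.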
  for col_name in "${col_names[@]}"; do
    if [[ " ${unique_col_names[@]} " =~ " ${col_name} " ]]; then
      col_name="${col_name}${dupe_col_suffix}"
    fi
    unique_col_names+=("$col_name")
  done
  IFS=','
  local joined_col_names="${unique_col_names[*]}"
  unset IFS
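  # Turn "a,b,c" into "a:string,b:string,c:string", then apply any user-supplied
  # overrides by rewriting the matching "column:string" entry to "column:<type>".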
  local schema=$(echo "${joined_col_names}" | sed 's/,/:string,/g' | awk '{print $0 ":string"}')
  for override in "${schema_overrides[@]}"; do
    column=$(echo "$override" | cut -d':' -f1)
    type=$(echo "$override" | cut -d':' -f2)
    schema=$(echo "$schema" | sed "s/${column}:string/${column}:${type}/")
  done
  echo "$schema"
}
# Function to handle script termination
cleanup() {
  echo "Terminating background processes..."
  pkill -P $$
  wait
  echo "Cleanup complete."
}

# Trap SIGINT (Ctrl + C) and call cleanup
trap cleanup SIGINT
test_mode=false
csv_files=()

# Parse arguments
while [[ "$#" -gt 0 ]]; do
  case $1 in
    -t|--test) test_mode=true ;;
    -p|--project) bq_project="$2"; shift ;;
    -d|--dataset) dataset_name="$2"; shift ;;
    -o|--override|--overrides)
      IFS=',' read -ra overrides <<< "$2"
      for override in "${overrides[@]}"; do
        schema_overrides+=("$override")
      done
      shift
      ;;
    *) csv_files+=("$1") ;;
  esac
  shift
done
# Validate that project and dataset arguments are provided
if [ -z "$bq_project" ] || [ -z "$dataset_name" ]; then
  usage
fi

# Validate that there is at least one CSV file
if [ ${#csv_files[@]} -eq 0 ]; then
  usage
fi
echo "Processing CSV files in parallel:" | |
for file in "${csv_files[@]}"; do | |
file_size=$(stat -c%s "$file" | numfmt --to=iec) | |
table_name=$(get_table_name $file) | |
schema=$(get_schema $file) | |
echo " File: ${file}" | |
echo " Size: ${file_size}" | |
echo " Table: $(to_qualified_table_name $table_name)" | |
echo " Schema: ${schema}" | |
dupe_count=$(echo "$schema" | grep -o "${dupe_col_suffix}" | wc -l) | |
if [ $dupe_count -gt 0 ]; then | |
echo " ⚠️ ${dupe_count} duplicate columns detected and '${dupe_col_suffix}' suffix added." | |
fi | |
echo | |
done | |
if [ "$test_mode" = true ]; then | |
echo "Test mode enabled. No files will be loaded to BigQuery." | |
exit 0 | |
fi | |
require_gcloud_auth | |
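# Create the target dataset if it doesn't already exist (whole-word match to avoid
# false positives on substring dataset names).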
if bq --project_id="${bq_project}" ls --location="${location}" --datasets | grep -qw "${dataset_name}"; then
  echo " ✅ Dataset ${bq_project}.${dataset_name} exists."
else
  echo "Creating dataset ${dataset_name} ..."
  bq --project_id="${bq_project}" mk --location="${location}" "${dataset_name}"
fi
echo

# Iterate again and do work. Separate iteration stops parallel processes polluting our output above.
for file in "${csv_files[@]}"; do
  table_name=$(get_table_name "$file")
  bq load \
    --source_format=CSV \
    --skip_leading_rows=1 \
    --schema="$(get_schema "$file")" \
    --project_id="${bq_project}" \
    "${bq_project}:${dataset_name}.${table_name}" \
    "${file}" &
done
# Wait for all background processes to finish
echo
echo "Waiting for all BigQuery load jobs to finish. This can take some time ..."
wait
echo
echo " ✅ All CSV files successfully loaded to BigQuery dataset ${bq_project}.${dataset_name}"
Probably starting to get a bit large for a bash script vs python, but does the job.