Last active
May 31, 2022 20:02
-
-
Save michimau/c7cbf104651d1d1ddbff48c77435dd2b to your computer and use it in GitHub Desktop.
GIS data ingestion into a Citus PostgreSQL cluster, for use as a Nomad job
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#
# Imports a GIS dataset into a PostgreSQL database. All inputs are read from
# NOMAD_META_* environment variables:
#   NOMAD_META_db_host / db_name / db_port / db_user / db_password
#   NOMAD_META_db_schema   target schema
#   NOMAD_META_fileurl     URL of the dataset to download into /tmp

DBHOST=$NOMAD_META_db_host
DBNAME=$NOMAD_META_db_name
DBPORT=$NOMAD_META_db_port
DBUSER=$NOMAD_META_db_user
DBPASSWORD=$NOMAD_META_db_password
DBSCHEMA=$NOMAD_META_db_schema
FILEURL=$NOMAD_META_fileurl
# Epoch seconds; written to the import log for every layer of this run.
STARTTIME=$(date +%s)

# Make sure the import log table exists before any layer is processed.
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "CREATE TABLE IF NOT EXISTS public.nomad_job_imports_log (nomad_alloc_id varchar NULL, starttime varchar NULL, endtime varchar NULL, payloadfile varchar NULL, schemaname varchar NULL, filename varchar NULL, layername varchar NULL);" > /dev/null

# Download the dataset and derive its local path.
wget "$FILEURL" -P /tmp
FILEPATH=/tmp/$(basename "$FILEURL")
# Text after the LAST dot, so names such as "a.b.shp" still yield "shp".
FILEURLEXT=${FILEPATH##*.}
# URL without its final extension, whatever the extension's length.
FILEURLBASE=${FILEURL%.*}

if [[ "$FILEURLEXT" == "shp" ]]; then
  # A shapefile is split over several sidecar files; fetch each one once.
  # Some of them may not exist on the server, so failures are not fatal.
  for sidecar in cpg dbf prj shx sbn sbx; do
    wget "$FILEURLBASE.$sidecar" -P /tmp
  done
fi

# Not referenced anywhere else in this script.
CHECK_DISK_FREE_SPACE=false
#DBSCHEMA=a_multi_test
#FILEPATH=/dataset/RiverBasinDistrict/RiverBasinDistrict.shp
# Citus settings applied when a table is distributed.
SHARDSNUMBER=32
REPLICATIONFACTOR=4
# Plain (non-exported) shell variable; not referenced elsewhere in this script.
PG_USE_COPY=YES
#display_usage() {
# echo "Usage example: importsqlite.sh SCHEMANAME FILEPATHNAME"
# echo -e "\nUsage: $0 [arguments] \n"
#}
#
#if [ $# -le 1 ]
# then
# display_usage
# exit 1
#fi
#######################################
# Convert one raster layer to GeoTIFF and load it into PostGIS.
# Globals read:
#   LAYER, FILEPATH, DBSCHEMA, DBUSER, DBPORT, DBHOST, DBNAME, DBPASSWORD,
#   NOMAD_ALLOC_ID, NOMAD_META_PAYLOADFILE, STARTTIME
# Globals written:
#   LAYER_LOWERCASE (lower-cased LAYER with "-" replaced by "_")
# Arguments: none
# Side effects:
#   writes "$LAYER.tif" to the current directory (it is not removed);
#   inserts a row into public.nomad_job_imports_log and stamps its endtime.
#######################################
function import_raster {
  local -a pg_conn=(-U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME")
  local factor endtime

  #echo inside import_raster
  LAYER_LOWERCASE=$(echo "$LAYER" | tr '[:upper:]' '[:lower:]' | tr "-" "_")
  gdal_translate -of GTIFF "$FILEPATH" -oo "TABLE=$LAYER" "$LAYER.tif" -co COMPRESS=DEFLATE
  #raster2pgsql -I -C -M -F -t 100x100 $LAYER.tif $DBSCHEMA.$LAYER_LOWERCASE > $LAYER.sql
  #PGPASSWORD=$DBPASSWORD psql -v ON_ERROR_STOP=ON -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -f $LAYER.sql
  #rm $LAYER.tif $LAYER.sql

  # Open the log entry for this layer; endtime is filled in at the end.
  PGPASSWORD=$DBPASSWORD psql -q "${pg_conn[@]}" -c "INSERT INTO public.nomad_job_imports_log values ('$NOMAD_ALLOC_ID', '$STARTTIME', null, '$NOMAD_META_PAYLOADFILE', '$DBSCHEMA', '$FILEPATH', '$LAYER_LOWERCASE');"

  # Remove overview tables left over from a previous import of this layer.
  for factor in 2 4 8 16 32; do
    PGPASSWORD=$DBPASSWORD psql -q "${pg_conn[@]}" -c "DROP TABLE IF EXISTS $DBSCHEMA.o_${factor}_$LAYER_LOWERCASE;"
  done
  #PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE $DBSCHEMA.$LAYER;"

  raster2pgsql -I -C -l 2,4,8,16,32 -M -F -t auto "$LAYER.tif" "$DBSCHEMA.$LAYER_LOWERCASE" \
    | PGPASSWORD=$DBPASSWORD psql -v ON_ERROR_STOP=ON "${pg_conn[@]}"

  # NOTE(review): raster2pgsql is already asked for overviews 2..32 via -l;
  # these calls may therefore be redundant or fail - confirm.
  for factor in 2 4 8; do
    PGPASSWORD=$DBPASSWORD psql -q "${pg_conn[@]}" -c "SELECT ST_CreateOverview('$DBSCHEMA.$LAYER_LOWERCASE'::regclass, 'rast', $factor, 'Lanczos');"
  done

  #echo DISTRIBUTING TABLE \"$DBSCHEMA\".$LAYER_LOWERCASE
  #PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SET citus.shard_replication_factor = $REPLICATIONFACTOR; SET citus.shard_count=$SHARDSNUMBER; SELECT create_distributed_table('$DBSCHEMA.$LAYER_LOWERCASE', 'rid');" > /dev/null
  # echo TRUNCATING LOCAL TABLE $DBSCHEMA.$LAYER_LOWERCASE
  # PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT truncate_local_data_after_distributing_table('$DBSCHEMA.$LAYER_LOWERCASE');" > /dev/null

  # Take the end timestamp here: the script-level ENDTIME is only assigned
  # after all imports have run, so it is still empty at this point.
  endtime=$(date +%s)
  PGPASSWORD=$DBPASSWORD psql -q "${pg_conn[@]}" -c "UPDATE public.nomad_job_imports_log SET endtime = '$endtime' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID' AND layername = '$LAYER_LOWERCASE';" > /dev/null
}
#######################################
# Distribute $DBSCHEMA.$LAYER_LOWERCASE across the Citus cluster and, for
# vector tables, make sure a GiST index on wkb_geometry exists.
# Globals read:
#   DBSCHEMA, LAYER_LOWERCASE, REPLICATIONFACTOR, SHARDSNUMBER,
#   DBUSER, DBPORT, DBHOST, DBNAME, DBPASSWORD
# Arguments:
#   $1 - distribution column; the geometry index is created only when this
#        is "ogc_fid"
# Outputs: progress messages on stdout
#######################################
function distribute_table () {
  local dist_column=$1
  # Braces are required: "$LAYER_LOWERCASE_wkb..." would expand a different,
  # non-existent variable and yield an empty index name.
  local index_name="${LAYER_LOWERCASE}_wkb_geometry_geom_idx"
  local -a pg_conn=(-q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME")

  echo
  echo "DISTRIBUTING TABLE $DBSCHEMA.$LAYER_LOWERCASE"
  PGPASSWORD=$DBPASSWORD psql "${pg_conn[@]}" -c "SET citus.shard_replication_factor = $REPLICATIONFACTOR; SET citus.shard_count = $SHARDSNUMBER; SELECT create_distributed_table('$DBSCHEMA.$LAYER_LOWERCASE', '$dist_column');"
  #echo REBALANCING TABLE $DBSCHEMA.$LAYER_LOWERCASE
  #PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST $DBNAME -c "SELECT rebalance_table_shards('$DBSCHEMA.$LAYER_LOWERCASE', rebalance_strategy := 'by_disk_size');"
  #echo
  #echo TRUNCATING LOCAL TABLE $DBSCHEMA.$LAYER_LOWERCASE
  #PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT truncate_local_data_after_distributing_table('$DBSCHEMA.$LAYER_LOWERCASE');"
  echo
  if [[ "$dist_column" == "ogc_fid" ]]; then
    echo "CREATING INDEX: $index_name ON TABLE $DBSCHEMA.$LAYER_LOWERCASE"
    # IF NOT EXISTS: the loader (ogr2ogr) may already have created an index
    # with this very name, in which case this statement is a no-op.
    PGPASSWORD=$DBPASSWORD psql "${pg_conn[@]}" -c "CREATE INDEX IF NOT EXISTS $index_name ON \"$DBSCHEMA\".$LAYER_LOWERCASE USING gist (wkb_geometry)"
  fi
  #psql -U $DBUSER -p $DBPORT -h $DBHOST $DBN-d $DBNAME -c "VACUUM ANALYZE $DBSCHEMA.$LAYER_LOWERCASE;"
  echo DONE WITH CREATING INDEX AND VACUUMING
}
echo "PROCESSING $FILEPATH"
# Create the target schema if needed and set the default Citus rebalance
# strategy. Output is discarded; errors still reach stderr.
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "CREATE SCHEMA IF NOT EXISTS \"$DBSCHEMA\"" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT citus_set_default_rebalance_strategy('by_disk_size');" > /dev/null
#GIS LAYERS PROCESSING
# Second field of every "N: name ..." line printed by ogrinfo. The line
# "INFO: Open of ..." also matches ": "; it is dropped by comparing the field
# to exactly "Open", so layers whose name merely contains "Open" are kept.
LAYERS=$(ogrinfo -oo LIST_ALL_TABLES=YES "$FILEPATH" | awk '/: / && $2 != "Open" { print $2 }')
# Glob patterns (matched against the lower-cased name) of tables to skip.
declare -a exclusionarray=('geom_cols_ref_sys' 'spatial_ref_sys' 'spatialindex' 'geometry_columns*' 'idx_*_geometry*' 'spatialite_history' 'sql_statements_log' 'sqlite_sequence' 'vector_layers*' 'views_geometry_columns*' 'virts_geometry_columns*')
if [[ -n "$LAYERS" ]]; then
  #echo GIS LAYERS: $LAYERS
  # $LAYERS is left unquoted on purpose: one word per layer name.
  for LAYER in $LAYERS; do
    LAYER_LOWERCASE=$(echo "$LAYER" | tr '[:upper:]' '[:lower:]' | tr "-" "_")
    found_match="false"
    for pattern in "${exclusionarray[@]}"; do
      # Right-hand side unquoted so it is treated as a glob pattern.
      if [[ "$LAYER_LOWERCASE" == $pattern ]]; then
        found_match="true"
        break
      fi
    done
    if [[ "$found_match" == true ]]; then
      #echo find match on $LAYER_LOWERCASE
      continue
    fi

    # Non-empty only when the table already exists (psql prints a "t" row).
    EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep 't' | xargs)
    if [[ -z "$EXISTS" ]]; then
      echo "OGR2OGR-ING LAYER $LAYER FROM $FILEPATH"
      echo
      #PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE $DBSCHEMA.$LAYER;"
      PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "INSERT INTO public.nomad_job_imports_log values ('$NOMAD_ALLOC_ID', '$STARTTIME', null, '$NOMAD_META_PAYLOADFILE', '$DBSCHEMA', '$FILEPATH', '$LAYER');"
      # NOTE(review): the password is passed on the command line here and is
      # therefore visible in the process list.
      ogr2ogr -progress -f PostgreSQL PG:"dbname='$DBNAME' host='$DBHOST' port='$DBPORT' user='$DBUSER' password='$DBPASSWORD'" -gt 65536 -makevalid -dim XY -lco GEOMETRY_NAME=wkb_geometry -lco FID=ogc_fid -lco PRECISION=NO -lco PG_USE_COPY=YES -lco "SCHEMA=$DBSCHEMA" -nlt PROMOTE_TO_MULTI "$FILEPATH" "$LAYER"
      #-overwrite
      distribute_table "ogc_fid"
      # Stamp the end time now; ENDTIME is otherwise only assigned at the very
      # end of the script and would be stored as an empty string.
      ENDTIME=$(date +%s)
      PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "UPDATE public.nomad_job_imports_log SET endtime = '$ENDTIME' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID' AND layername = '$LAYER';" > /dev/null
    fi
  done
fi
#RASTER LAYERS PROCESSING
echo "gdalinfo $FILEPATH"
# Last ":"-separated field of every gdalinfo line that contains "DATASET".
LAYERS=$(gdalinfo "$FILEPATH" | grep : | grep DATASET | awk -F':' ' { print $NF } ')
# $LAYERS intentionally unquoted so the names collapse onto one line.
echo RASTER LAYERS $LAYERS
if [[ -n "$LAYERS" ]]; then
  # Unquoted on purpose: one word per raster layer.
  for LAYER in $LAYERS; do
    LAYER_LOWERCASE=$(echo "$LAYER" | tr '[:upper:]' '[:lower:]' | tr "-" "_")
    #echo LAYER_LOWERCASE=$LAYER_LOWERCASE
    # Non-empty only when the table already exists (psql prints a " t" row).
    EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep ' t' | xargs)
    #echo EXISTS=$EXISTS
    #[[ $EXISTS =~ ^(t)$ ]] && continue
    if [[ -z "$EXISTS" ]]; then
      echo "GDAIL-INGO $DBSCHEMA.$LAYER"
      echo raster from gdpg/sqlite
      import_raster
    fi
    # NOTE(review): runs even when the table already existed, unlike the
    # vector loop, which only distributes freshly imported tables - confirm.
    distribute_table "rid"
  done
fi
#PURE RASTER IMPORT
PURERASTER=$(ogrinfo "$FILEPATH" | grep PROJCRS)
echo PURERASTER=$PURERASTER
# NOTE(review): this branch runs whenever ogrinfo prints no line containing
# "PROJCRS" - confirm that this reliably identifies raster-only files.
if [[ -z "$PURERASTER" ]]; then
  echo "inside pureraster"
  # Layer name = file name without directory and without its last extension.
  LAYER=$(basename "$FILEPATH")
  LAYER=${LAYER%.*}
  echo "LAYER=$LAYER"
  LAYER_LOWERCASE=$(echo "$LAYER" | tr '[:upper:]' '[:lower:]' | tr "-" "_")
  echo "LAYER_LOWERCASE=$LAYER_LOWERCASE"
  # Non-empty only when the table already exists (psql prints a " t" row).
  EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep ' t' | xargs)
  echo "EXISTS=$EXISTS"
  if [[ -z "$EXISTS" ]]; then
    echo "GDAl-ING $DBSCHEMA.$LAYER"
    import_raster
  fi
  distribute_table "rid"
fi
# NOTE(review): LAYER_LOWERCASE holds only the most recently processed layer
# at this point, so only that table is passed to Populate_Geometry_Columns.
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT Populate_Geometry_Columns('\"$DBSCHEMA\".\"$LAYER_LOWERCASE\"'::regclass);" > /dev/null

# Grant the gr_gis_sdi_ldap role access to the schema and its tables, first on
# the node we are connected to and then on the workers via
# run_command_on_workers.
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "GRANT USAGE ON SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT run_command_on_workers(\$cmd\$ GRANT USAGE ON SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap \$cmd\$)" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "GRANT SELECT ON ALL TABLES IN SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U "$DBUSER" -p "$DBPORT" -h "$DBHOST" -d "$DBNAME" -c "SELECT run_command_on_workers(\$cmd\$ GRANT SELECT ON ALL TABLES IN SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap \$cmd\$)" > /dev/null

# End-of-run timestamp; the statement that would store it is disabled below.
ENDTIME=$(date +%s)
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "UPDATE public.nomad_job_imports_log SET endtime = '$ENDTIME' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID';" > /dev/null
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment