Skip to content

Instantly share code, notes, and snippets.

@michimau
Last active May 31, 2022 20:02
Show Gist options
  • Save michimau/c7cbf104651d1d1ddbff48c77435dd2b to your computer and use it in GitHub Desktop.
Save michimau/c7cbf104651d1d1ddbff48c77435dd2b to your computer and use it in GitHub Desktop.
gis data ingestion into citus postgresql cluster for nomad job usage
#!/bin/bash
DBHOST=$NOMAD_META_db_host
DBNAME=$NOMAD_META_db_name
DBPORT=$NOMAD_META_db_port
DBUSER=$NOMAD_META_db_user
DBPASSWORD=$NOMAD_META_db_password
DBSCHEMA=$NOMAD_META_db_schema
FILEURL=$NOMAD_META_fileurl
STARTTIME=$(date +%s)
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "CREATE TABLE IF NOT EXISTS public.nomad_job_imports_log (nomad_alloc_id varchar NULL, starttime varchar NULL, endtime varchar NULL, payloadfile varchar NULL, schemaname varchar NULL, filename varchar NULL, layername varchar NULL);" > /dev/null
wget $NOMAD_META_fileurl -P /tmp
FILEPATH=/tmp/`basename $NOMAD_META_fileurl`
FILEURLEXT=${FILEPATH#*.}
FILEURLBASE=${NOMAD_META_fileurl::-4}
if [ "$FILEURLEXT" == "shp" ]; then
wget $FILEURLBASE.cpg -P /tmp
wget $FILEURLBASE.dbf -P /tmp
wget $FILEURLBASE.prj -P /tmp
wget $FILEURLBASE.shx -P /tmp
wget $FILEURLBASE.sbx -P /tmp
wget $FILEURLBASE.sbn -P /tmp
wget $FILEURLBASE.sbn -P /tmp
wget $FILEURLBASE.sbx -P /tmp
fi
CHECK_DISK_FREE_SPACE=false
#DBSCHEMA=a_multi_test
#FILEPATH=/dataset/RiverBasinDistrict/RiverBasinDistrict.shp
SHARDSNUMBER=32
REPLICATIONFACTOR=4
PG_USE_COPY=YES
#display_usage() {
# echo "Usage example: importsqlite.sh SCHEMANAME FILEPATHNAME"
# echo -e "\nUsage: $0 [arguments] \n"
#}
#
#if [ $# -le 1 ]
# then
# display_usage
# exit 1
#fi
function import_raster {
#echo inside import_raster
LAYER_LOWERCASE=$(echo $LAYER | tr '[:upper:]' '[:lower:]' | tr "-" "_")
gdal_translate -of GTIFF $FILEPATH -oo TABLE=$LAYER $LAYER.tif -co COMPRESS=DEFLATE
#raster2pgsql -I -C -M -F -t 100x100 $LAYER.tif $DBSCHEMA.$LAYER_LOWERCASE > $LAYER.sql
#PGPASSWORD=$DBPASSWORD psql -v ON_ERROR_STOP=ON -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -f $LAYER.sql
#rm $LAYER.tif $LAYER.sql
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "INSERT INTO public.nomad_job_imports_log values ('$NOMAD_ALLOC_ID', '$STARTTIME', null, '$NOMAD_META_PAYLOADFILE', '$DBSCHEMA', '$FILEPATH', '$LAYER_LOWERCASE');"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE IF EXISTS $DBSCHEMA.o_2_$LAYER_LOWERCASE;"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE IF EXISTS $DBSCHEMA.o_4_$LAYER_LOWERCASE;"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE IF EXISTS $DBSCHEMA.o_8_$LAYER_LOWERCASE;"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE IF EXISTS $DBSCHEMA.o_16_$LAYER_LOWERCASE;"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE IF EXISTS $DBSCHEMA.o_32_$LAYER_LOWERCASE;"
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE $DBSCHEMA.$LAYER;"
raster2pgsql -I -C -l 2,4,8,16,32 -M -F -t auto $LAYER.tif $DBSCHEMA.$LAYER_LOWERCASE | PGPASSWORD=$DBPASSWORD psql -v ON_ERROR_STOP=ON -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT ST_CreateOverview('$DBSCHEMA.$LAYER_LOWERCASE'::regclass, 'rast', 2, 'Lanczos');"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT ST_CreateOverview('$DBSCHEMA.$LAYER_LOWERCASE'::regclass, 'rast', 4, 'Lanczos');"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT ST_CreateOverview('$DBSCHEMA.$LAYER_LOWERCASE'::regclass, 'rast', 8, 'Lanczos');"
#echo DISTRIBUTING TABLE \"$DBSCHEMA\".$LAYER_LOWERCASE
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SET citus.shard_replication_factor = $REPLICATIONFACTOR; SET citus.shard_count=$SHARDSNUMBER; SELECT create_distributed_table('$DBSCHEMA.$LAYER_LOWERCASE', 'rid');" > /dev/null
# echo TRUNCATING LOCAL TABLE $DBSCHEMA.$LAYER_LOWERCASE
# PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT truncate_local_data_after_distributing_table('$DBSCHEMA.$LAYER_LOWERCASE');" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "UPDATE public.nomad_job_imports_log SET endtime = '$ENDTIME' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID' AND layername = '$LAYER_LOWERCASE';" > /dev/null
}
function distribute_table () {
echo
echo DISTRIBUTING TABLE $DBSCHEMA.$LAYER_LOWERCASE
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SET citus.shard_replication_factor = $REPLICATIONFACTOR; SET citus.shard_count = $SHARDSNUMBER; SELECT create_distributed_table('$DBSCHEMA.$LAYER_LOWERCASE', '$1');"
#echo REBALANCING TABLE $DBSCHEMA.$LAYER_LOWERCASE
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST $DBNAME -c "SELECT rebalance_table_shards('$DBSCHEMA.$LAYER_LOWERCASE', rebalance_strategy := 'by_disk_size');"
#echo
#echo TRUNCATING LOCAL TABLE $DBSCHEMA.$LAYER_LOWERCASE
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT truncate_local_data_after_distributing_table('$DBSCHEMA.$LAYER_LOWERCASE');"
echo
if [ "$1" = "ogc_fid" ]; then
echo CREATING INDEX: $LAYERNAME_wkb_geometry_geom_idx ON TABLE $DBSCHEMA.$LAYER_LOWERCASE
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "CREATE INDEX $LAYER_LOWERCASE_wkb_geometry_geom_idx ON \"$DBSCHEMA\".$LAYER_LOWERCASE USING gist (wkb_geometry)"
fi
#psql -U $DBUSER -p $DBPORT -h $DBHOST $DBN-d $DBNAME -c "VACUUM ANALYZE $DBSCHEMA.$LAYER_LOWERCASE;"
echo DONE WITH CREATING INDEX AND VACUUMING
}
echo PROCESSING $FILEPATH
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "CREATE SCHEMA IF NOT EXISTS \"$DBSCHEMA\"" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT citus_set_default_rebalance_strategy('by_disk_size');" > /dev/null
#GIS LAYERS PROCESSING
LAYERS=$(ogrinfo -oo LIST_ALL_TABLES=YES $FILEPATH | grep ": " | awk '{print $2}' | tr ' ' '\n' | sed "/Open/d")
declare -a exclusionarray=('geom_cols_ref_sys' 'spatial_ref_sys' 'spatialindex' 'geometry_columns*' 'idx_*_geometry*' 'spatialite_history' 'sql_statements_log' 'sqlite_sequence' 'vector_layers*' 'views_geometry_columns*' 'virts_geometry_columns*')
if [ ! -z "$LAYERS" ]
then
#echo GIS LAYERS: $LAYERS
for LAYER in $LAYERS; do
LAYER_LOWERCASE=$(echo $LAYER | tr '[:upper:]' '[:lower:]' | tr "-" "_")
found_match="false"
for pattern in "${exclusionarray[@]}"; do
if [[ "$LAYER_LOWERCASE" == $pattern ]]; then
found_match="true"
break
fi
done
if [[ "$found_match" == true ]]; then
#echo find match on $LAYER_LOWERCASE
continue
else
EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep 't' | xargs)
if [ -z "$EXISTS" ]
then
echo OGR2OGR-ING LAYER $LAYER FROM $FILEPATH
echo
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "DROP TABLE $DBSCHEMA.$LAYER;"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "INSERT INTO public.nomad_job_imports_log values ('$NOMAD_ALLOC_ID', '$STARTTIME', null, '$NOMAD_META_PAYLOADFILE', '$DBSCHEMA', '$FILEPATH', '$LAYER');"
ogr2ogr -progress -f PostgreSQL PG:"dbname='$DBNAME' host='$DBHOST' port='$DBPORT' user='$DBUSER' password='$DBPASSWORD'" -gt 65536 -makevalid -dim XY -lco GEOMETRY_NAME=wkb_geometry -lco FID=ogc_fid -lco PRECISION=NO -lco PG_USE_COPY=YES -lco SCHEMA=$DBSCHEMA -nlt PROMOTE_TO_MULTI $FILEPATH $LAYER
#-overwrite
distribute_table "ogc_fid"
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "UPDATE public.nomad_job_imports_log SET endtime = '$ENDTIME' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID' AND layername = '$LAYER';" > /dev/null
fi
fi
done
fi
#RASTER LAYERS PROCESSING
echo gdalinfo $FILEPATH
LAYERS=$(gdalinfo $FILEPATH | grep : | grep DATASET | awk -F':' ' { print $NF } ')
echo RASTER LAYERS $LAYERS
if [ ! -z "$LAYERS" ]
then
for LAYER in $LAYERS; do
LAYER_LOWERCASE=$(echo $LAYER | tr '[:upper:]' '[:lower:]' | tr "-" "_")
#echo LAYER_LOWERCASE=$LAYER_LOWERCASE
EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep ' t' | xargs)
#echo EXISTS=$EXISTS
#[[ $EXISTS =~ ^(t)$ ]] && continue
if [ -z "$EXISTS" ]
then
echo GDAIL-INGO $DBSCHEMA.$LAYER
echo raster from gdpg/sqlite
import_raster
fi
distribute_table "rid"
done
fi
#PURE RASTER IMPORT
PURERASTER=$(ogrinfo $FILEPATH | grep PROJCRS)
echo PURERASTER=$PURERASTER
if [ -z "$PURERASTER" ]
then
echo "inside pureraster"
LAYER=`basename $FILEPATH | sed 's/\(.*\)\..*/\1/'`
echo LAYER=$LAYER
LAYER_LOWERCASE=$(echo $LAYER | tr '[:upper:]' '[:lower:]' | tr "-" "_")
echo LAYER_LOWERCASE=$LAYER_LOWERCASE
EXISTS=$(PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname='$DBSCHEMA' AND tablename='$LAYER_LOWERCASE') as check" | grep ' t' | xargs)
echo EXISTS=$EXISTS
if [ -z "$EXISTS" ]
then
echo GDAl-ING $DBSCHEMA.$LAYER
import_raster
fi
distribute_table "rid"
fi
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT Populate_Geometry_Columns('\"$DBSCHEMA\".\"$LAYER_LOWERCASE\"'::regclass);" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "GRANT USAGE ON SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT run_command_on_workers(\$cmd\$ GRANT USAGE ON SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap \$cmd\$)" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "GRANT SELECT ON ALL TABLES IN SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap" > /dev/null
PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "SELECT run_command_on_workers(\$cmd\$ GRANT SELECT ON ALL TABLES IN SCHEMA \"$DBSCHEMA\" TO gr_gis_sdi_ldap \$cmd\$)" > /dev/null
ENDTIME=$(date +%s)
#PGPASSWORD=$DBPASSWORD psql -q -U $DBUSER -p $DBPORT -h $DBHOST -d $DBNAME -c "UPDATE public.nomad_job_imports_log SET endtime = '$ENDTIME' WHERE nomad_alloc_id = '$NOMAD_ALLOC_ID';" > /dev/null
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment