Last active
September 17, 2019 10:49
-
-
Save schoeller/9cff501493d4c66b267647a23ecde471 to your computer and use it in GitHub Desktop.
Simulating dam break
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Based on https://github.com/Hydrata/ChennaiFloodModel and adapted to Anuga examples for further automisation | |
# Anuga imports | |
from math import sin, pi, exp | |
from osgeo import gdal, osr | |
import csv | |
import anuga, os, sys, numpy, urllib | |
import anuga.utilities.quantity_setting_functions as qs | |
import anuga.utilities.log as log | |
from anuga.shallow_water.forcing import Rainfall | |
def run_dambreak(sim_id): | |
project_root = os.path.abspath(os.path.dirname(__file__)) | |
if not os.path.exists(project_root): | |
os.makedirs(project_root) | |
print "project_root = " + project_root | |
inputs_dir = "%s/inputs/" % project_root | |
if not os.path.exists(inputs_dir): | |
os.makedirs(inputs_dir) | |
print "inputs_dir = " + inputs_dir | |
working_dir = "%s/working/%s/" % (project_root, sim_id) | |
if not os.path.exists(working_dir): | |
os.makedirs(working_dir) | |
print "working_dir = " + working_dir | |
outputs_dir = "%s/outputs/%s" % (project_root, sim_id) | |
if not os.path.exists(outputs_dir): | |
os.makedirs(outputs_dir) | |
print "outputs_dir = " + outputs_dir | |
# get data | |
if os.path.isfile(inputs_dir + "N11E008.tif"): | |
print "omitting download..." | |
else: | |
print "downloading data..." | |
urllib.urlretrieve( | |
"http://geoweb.hft-stuttgart.de/SRTM/africa/N11E008.tif", | |
inputs_dir + "N11E008.tif" | |
) | |
# translating data | |
if os.path.isfile(inputs_dir + "N11E008.asc"): | |
print "omitting translation..." | |
else: | |
print "translating to ASC..." | |
options_list = [ | |
"-of AAIGrid" | |
] | |
options_string = " ".join(options_list) | |
gdal.Translate(inputs_dir + "N11E008.asc", | |
inputs_dir + "N11E008.tif", | |
options=options_string) | |
# correcting projection | |
ds=gdal.Open(inputs_dir + "N11E008.asc") | |
prj=ds.GetProjection() | |
#print prj | |
srs=osr.SpatialReference(wkt=prj) | |
if srs.IsProjected: | |
zone = str(srs.GetAttrValue("projcs"))[-3:-1] | |
proj = {"Projection": "UTM", "Zone": zone, "Datum": "WGS84", "Zunits": "NO", "Units": "METERS", "Spheroid": "WGS84", "Xshift": "500000", "Yshift": "10000000", "Parameters":""} | |
f = csv.writer(open(inputs_dir + "N11E008.prj", "w"), delimiter='\t') | |
for key, val in proj.items(): | |
f.writerow([key, val]) | |
#print srs.GetAttrValue("geogcs") | |
print os.listdir(inputs_dir) | |
# configure logging TODO: get this working! | |
log_location = project_root + "/" + sim_id + ".log" | |
open(log_location, "a").close() | |
log.console_logging_level = log.INFO | |
log.log_logging_level = log.DEBUG | |
log.log_filename = log_location | |
print "# log.log_filename is: " + log.log_filename | |
print "# log_location is: " + log_location | |
log.debug("A message at DEBUG level") | |
log.info("Another message, INFO level") | |
print "# runtime parameters" | |
cache = False | |
verbose = True | |
print "# converting DEM to PTS" | |
print inputs_dir + "N11E008.asc" | |
#anuga.asc2dem(inputs_dir + "N11E008.asc", use_cache=False, verbose=True) | |
anuga.dem2pts(inputs_dir + "N11E008.dem", use_cache=False, verbose=True) | |
print "# starting" | |
bounding_polygon_01 = anuga.read_polygon(inputs_dir + "tiga.csv") | |
A = anuga.polygon_area(bounding_polygon_01) / 1000000.0 | |
log.info("Area of bounding polygon = %.2f km^2" % A) | |
boundary_tags_01 = { | |
"inland": [], | |
"ocean": [] | |
} | |
print "# Create domain:" | |
print "# mesh_filename = " + working_dir + "mesh_01.msh" | |
domain = anuga.create_domain_from_regions(bounding_polygon=bounding_polygon_01, | |
boundary_tags=boundary_tags_01, | |
mesh_filename=working_dir + "mesh_01.msh", | |
maximum_triangle_area=100000, | |
verbose=True) | |
domain.set_name(sim_id) | |
domain.set_datadir(outputs_dir) | |
poly_fun_pairs = [ | |
[ | |
"Extent", | |
inputs_dir + "N11E008.tif" | |
] | |
] | |
print "# create topography_function" | |
print "input raster = " + inputs_dir + "N11E008.tif" | |
topography_function = qs.composite_quantity_setting_function( | |
poly_fun_pairs, | |
domain, | |
nan_treatment="exception", | |
) | |
print topography_function | |
print "# set_quantity elevation" | |
domain.set_quantity("elevation", topography_function) # Use function for elevation | |
domain.set_quantity("friction", 0.03) # Constant friction | |
domain.set_quantity("stage", 1) # Constant initial stage | |
print "# all quantities set" | |
print "# Setup boundary conditions" | |
Br = anuga.Reflective_boundary(domain) # Solid reflective wall | |
Bt = anuga.Transmissive_boundary(domain) # Continue all values on boundary | |
Bd = anuga.Dirichlet_boundary([-20, 0., 0.]) # Constant boundary values | |
Bi = anuga.Dirichlet_boundary([10.0, 0, 0]) # Inflow | |
Bw = anuga.Time_boundary( | |
domain=domain, # Time dependent boundary | |
function=lambda t: [(10 * sin(t * 2 * pi) - 0.3) * exp(-t), 0.0, 0.0] | |
) | |
print "# Associate boundary tags with boundary objects" | |
domain.set_boundary({"inland": Br, "ocean": Bd}) | |
print domain.get_boundary_tags() | |
catchmentrainfall = Rainfall( | |
domain=domain, | |
rate=0.2 | |
) | |
# # Note need path to File in String. | |
# # Else assumed in same directory | |
domain.forcing_terms.append(catchmentrainfall) | |
print "#Forcing write conditions for output" | |
domain.store_centroids = False | |
print "# Evolve system through time" | |
counter_timestep = 0 | |
for t in domain.evolve(yieldstep=300, finaltime=6000): | |
counter_timestep += 1 | |
print counter_timestep | |
print domain.timestepping_statistics() | |
asc_out_momentum = outputs_dir + "/" + sim_id + "_momentum.asc" | |
asc_out_depth = outputs_dir + "/" + sim_id + "_depth.asc" | |
anuga.sww2dem(outputs_dir + "/" + sim_id + ".sww", | |
asc_out_momentum, | |
quantity="momentum", | |
number_of_decimal_places=3, | |
cellsize=30, | |
reduction=max, | |
verbose=True) | |
anuga.sww2dem(outputs_dir + "/" + sim_id + ".sww", | |
asc_out_depth, | |
quantity="depth", | |
number_of_decimal_places=3, | |
cellsize=30, | |
reduction=max, | |
verbose=True) | |
outputs =[asc_out_depth, asc_out_momentum] | |
for output in outputs: | |
print "# Properly close the datasets to flush the disk" | |
dst_filename = None | |
src_ds = None | |
print "Done. Nice work." | |
if __name__ == "__main__": | |
# TODO: parse argv for local development | |
run_dambreak("2") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment