Skip to content

Instantly share code, notes, and snippets.

@jbusecke
Created September 14, 2021 19:54
Show Gist options
  • Save jbusecke/9a2be41abef1528adf20765631f43db6 to your computer and use it in GitHub Desktop.
Save jbusecke/9a2be41abef1528adf20765631f43db6 to your computer and use it in GitHub Desktop.
Recipe pickle hack for Charles: replaces `xr.open_dataset` in `open_input` with direct unpickling of the cached file.
@contextmanager
def open_input(
    input_key: InputKey,
    file_pattern: FilePattern,
    input_cache: Optional[CacheFSSpecTarget],
    cache_inputs: bool,
    copy_input_to_local_file: bool,
    xarray_open_kwargs: dict,
    delete_input_encoding: bool,
    process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
) -> xr.Dataset:
    """Yield the dataset for one input of ``file_pattern`` as a context manager.

    HACK variant: instead of ``xr.open_dataset``, the (locally cached) file is
    unpickled directly, so ``xarray_open_kwargs`` is accepted for interface
    compatibility but IGNORED here.

    Parameters
    ----------
    input_key : key identifying which input of the pattern to open.
    file_pattern : pattern mapping keys to file locations.
    input_cache : optional cache target; required semantics depend on ``cache_inputs``.
    cache_inputs : if True, read through ``input_cache``.
    copy_input_to_local_file : copy the input to a local temp file before opening.
    xarray_open_kwargs : unused in this pickle-hack variant (kept for interface).
    delete_input_encoding : if True, clear ``.encoding`` on every variable.
    process_input : optional post-processing callback ``(ds, fname) -> ds``.

    Raises
    ------
    ValueError : if the pattern is opendap but caching or local copying is requested
        (both are incompatible with opendap access).
    """
    fname = file_pattern[input_key]
    logger.info(f"Opening input with Xarray {input_key!s}: '{fname}'")
    if file_pattern.is_opendap:
        if input_cache:
            raise ValueError("Can't cache opendap inputs")
        if copy_input_to_local_file:
            raise ValueError("Can't copy opendap inputs to local file")
    cache = input_cache if cache_inputs else None
    with file_opener(
        fname,
        cache=cache,
        copy_to_local=copy_input_to_local_file,
        bypass_open=file_pattern.is_opendap,
        secrets=file_pattern.query_string_secrets,
        **file_pattern.fsspec_open_kwargs,
    ) as f:
        # Force single-threaded execution so no dask scheduler is used while
        # the dataset is being opened/processed.
        with dask.config.set(scheduler="single-threaded"):  # make sure we don't use a scheduler
            logger.debug(f"about to enter xr.open_dataset context on {f}")
            # FIX: the original `pickle.load(open(f.name, 'rb'))` leaked the
            # raw file handle; close it deterministically with a context manager.
            # SECURITY NOTE: pickle.load executes arbitrary code on load —
            # only ever use this on files from a trusted source.
            with open(f.name, "rb") as pickled_file:
                ds = pickle.load(pickled_file)
            # The unpickled object is assumed to be an xr.Dataset, which
            # supports the context-manager protocol — TODO confirm upstream.
            with ds:
                logger.debug("successfully opened dataset")
                ds = fix_scalar_attr_encoding(ds)
                if delete_input_encoding:
                    for var in ds.variables:
                        ds[var].encoding = {}
                if process_input is not None:
                    ds = process_input(ds, str(fname))
                logger.debug(f"{ds}")
                yield ds
# Original version (kept for reference): uses xr.open_dataset with
# xarray_open_kwargs instead of unpickling the cached file.
# @contextmanager
# def open_input(
# input_key: InputKey,
# file_pattern: FilePattern,
# input_cache: Optional[CacheFSSpecTarget],
# cache_inputs: bool,
# copy_input_to_local_file: bool,
# xarray_open_kwargs: dict,
# delete_input_encoding: bool,
# process_input: Optional[Callable[[xr.Dataset, str], xr.Dataset]],
# ) -> xr.Dataset:
# fname = file_pattern[input_key]
# logger.info(f"Opening input with Xarray {input_key!s}: '{fname}'")
# if file_pattern.is_opendap:
# if input_cache:
# raise ValueError("Can't cache opendap inputs")
# if copy_input_to_local_file:
# raise ValueError("Can't copy opendap inputs to local file")
# cache = input_cache if cache_inputs else None
# with file_opener(
# fname,
# cache=cache,
# copy_to_local=copy_input_to_local_file,
# bypass_open=file_pattern.is_opendap,
# secrets=file_pattern.query_string_secrets,
# **file_pattern.fsspec_open_kwargs,
# ) as f:
# with dask.config.set(scheduler="single-threaded"): # make sure we don't use a scheduler
# kw = xarray_open_kwargs.copy()
# if "engine" not in kw:
# kw["engine"] = "h5netcdf"
# logger.debug(f"about to enter xr.open_dataset context on {f}")
# with xr.open_dataset(f, **kw) as ds:
# logger.debug("successfully opened dataset")
# ds = fix_scalar_attr_encoding(ds)
# if delete_input_encoding:
# for var in ds.variables:
# ds[var].encoding = {}
# if process_input is not None:
# ds = process_input(ds, str(fname))
# logger.debug(f"{ds}")
# yield ds
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment