Skip to content

Instantly share code, notes, and snippets.

@andersy005
Last active July 18, 2020 05:06
Show Gist options
  • Select an option

  • Save andersy005/5fb0954efd9e5fd6ea29d95c73ec6e76 to your computer and use it in GitHub Desktop.

Select an option

Save andersy005/5fb0954efd9e5fd6ea29d95c73ec6e76 to your computer and use it in GitHub Desktop.
import os.path
import warnings
from glob import glob
from io import BytesIO
from numbers import Number
from pathlib import Path
from xarray.backends.common import AbstractDataStore, ArrayWriter
from xarray.core.utils import close_on_error, is_grib_path, is_remote_uri
from xarray.core import indexing
from xarray import backends, conventions, coding
def _get_default_engine_remote_uri():
    """Pick the backend name used for remote (OPeNDAP) URIs.

    Returns
    -------
    str
        ``"netcdf4"`` when ``netCDF4`` can be imported, otherwise
        ``"pydap"`` when ``pydap`` can be imported.

    Raises
    ------
    ValueError
        If neither ``netCDF4`` nor ``pydap`` can be imported.
    """
    try:
        import netCDF4  # noqa: F401
    except ImportError:  # pragma: no cover
        # netCDF4 is missing, so pydap is the only remaining candidate.
        try:
            import pydap  # noqa: F401
        except ImportError:
            raise ValueError(
                "netCDF4 or pydap is required for accessing "
                "remote datasets via OPeNDAP"
            )
        return "pydap"
    return "netcdf4"
def _get_default_engine_grib():
    """Tell the caller which engine to request explicitly for GRIB files.

    This function never returns an engine: it always raises, and the
    message lists the ``engine=`` values matching whichever GRIB readers
    (PyNIO, cfgrib) are importable.

    Raises
    ------
    ValueError
        Always. The message names the usable engines, or states that
        PyNIO or cfgrib is required when neither can be imported.
    """
    hints = []

    try:
        import Nio  # noqa: F401
    except ImportError:  # pragma: no cover
        pass
    else:
        hints.append("set engine='pynio' to access GRIB files with PyNIO")

    try:
        import cfgrib  # noqa: F401
    except ImportError:  # pragma: no cover
        pass
    else:
        hints.append("set engine='cfgrib' to access GRIB files with cfgrib")

    if not hints:
        raise ValueError("PyNIO or cfgrib is required for accessing GRIB files")
    raise ValueError(" or\n".join(hints))
def _get_default_engine_gz():
    """Return the backend name used for ``.gz`` paths.

    Returns
    -------
    str
        ``"scipy"``.

    Raises
    ------
    ValueError
        If ``scipy`` cannot be imported.
    """
    try:
        import scipy  # noqa: F401
    except ImportError:  # pragma: no cover
        raise ValueError("scipy is required for accessing .gz files")
    return "scipy"
def _get_default_engine_netcdf():
    """Pick the backend name used for ordinary netCDF paths.

    Returns
    -------
    str
        ``"netcdf4"`` when ``netCDF4`` can be imported, otherwise
        ``"scipy"`` when ``scipy.io.netcdf`` can be imported.

    Raises
    ------
    ValueError
        If neither ``netCDF4`` nor ``scipy.io.netcdf`` can be imported.
    """
    try:
        import netCDF4  # noqa: F401
    except ImportError:  # pragma: no cover
        # Fall back to scipy's netCDF reader when netCDF4 is missing.
        try:
            import scipy.io.netcdf  # noqa: F401
        except ImportError:
            raise ValueError(
                "cannot read or write netCDF files without "
                "netCDF4-python or scipy installed"
            )
        return "scipy"
    return "netcdf4"
def _get_engine_from_magic_number(filename_or_obj):
    """Choose a backend by inspecting the first eight bytes of the input.

    Parameters
    ----------
    filename_or_obj : bytes or file-like
        Either the raw bytes, or an object providing ``tell``, ``read``
        and ``seek``. A file-like object must be positioned at offset 0;
        it is rewound to offset 0 after the header is read.

    Returns
    -------
    str
        ``"scipy"`` when the header starts with ``b"CDF"``, or
        ``"h5netcdf"`` when it starts with the HDF5 signature.

    Raises
    ------
    ValueError
        If a file-like object is not positioned at offset 0, if an HDF5
        header is found in a ``bytes`` input, or if the header matches
        neither signature.
    """
    given_bytes = isinstance(filename_or_obj, bytes)

    # check byte header to determine file type
    if given_bytes:
        header = filename_or_obj[:8]
    else:
        if filename_or_obj.tell() != 0:
            raise ValueError(
                "file-like object read/write pointer not at zero "
                "please close and reopen, or use a context "
                "manager"
            )
        header = filename_or_obj.read(8)
        filename_or_obj.seek(0)

    if header.startswith(b"CDF"):
        return "scipy"

    if header.startswith(b"\211HDF\r\n\032\n"):
        if given_bytes:
            raise ValueError(
                "can't open netCDF4/HDF5 as bytes "
                "try passing a path or file-like object"
            )
        return "h5netcdf"

    # Unknown signature: truncate long byte strings so the message stays short.
    shown = filename_or_obj
    if given_bytes and len(shown) > 80:
        shown = shown[:80] + b"..."
    raise ValueError(
        "{} is not a valid netCDF file "
        "did you mean to pass a string for a path instead?".format(shown)
    )
def _get_default_engine(path, allow_remote=False):
    """Choose a default backend name for a string path.

    The checks run in this order: remote URI (only when ``allow_remote``
    is true), GRIB path, ``.gz`` suffix, and finally plain netCDF.

    Parameters
    ----------
    path : str
        Path or URI of the dataset.
    allow_remote : bool, optional
        Whether ``path`` may be treated as a remote URI.

    Returns
    -------
    str
        The engine name produced by the matching helper. Those helpers
        raise ``ValueError`` when no suitable library can be imported.
    """
    if allow_remote and is_remote_uri(path):
        return _get_default_engine_remote_uri()
    if is_grib_path(path):
        return _get_default_engine_grib()
    if path.endswith(".gz"):
        return _get_default_engine_gz()
    return _get_default_engine_netcdf()
def _normalize_path(path):
    """Return ``path`` unchanged if it is a remote URI, else make it absolute.

    Local paths have ``~`` expanded before being converted to an
    absolute path.
    """
    if is_remote_uri(path):
        return path
    expanded = os.path.expanduser(path)
    return os.path.abspath(expanded)
def _validate_dataset_names(dataset):
    """Check that every key of ``dataset.variables`` is a string or None.

    Raises
    ------
    ValueError
        If a key is an empty string.
    TypeError
        If a key is neither a string nor None.
    """

    def check_name(name):
        # None is always acceptable; only strings need further checks.
        if name is None:
            return
        if not isinstance(name, str):
            raise TypeError(
                "DataArray.name or Dataset key must be either a "
                "string or None for serialization to netCDF files"
            )
        if not name:
            raise ValueError(
                "Invalid name for DataArray or Dataset key: "
                "string must be length 1 or greater for "
                "serialization to netCDF files"
            )

    for key in dataset.variables:
        check_name(key)
def _validate_attrs(dataset):
    """Check that attributes can be serialized to netCDF.

    Every attribute name must be a non-empty string, and every value must
    be a number, a string, an ndarray or a list/tuple. Both
    ``dataset.attrs`` and the ``attrs`` of every entry in
    ``dataset.variables`` are checked.

    Raises
    ------
    ValueError
        If an attribute name is an empty string.
    TypeError
        If an attribute name is not a string, or a value is not one of
        the accepted types.
    """
    # ``np`` is not imported at module level in this file; bind it here so
    # the type check below does not fail with NameError.
    import numpy as np

    valid_types = (str, Number, np.ndarray, np.number, list, tuple)

    def check_attr(name, value):
        if isinstance(name, str):
            if not name:
                raise ValueError(
                    "Invalid name for attr: string must be "
                    "length 1 or greater for serialization to "
                    "netCDF files"
                )
        else:
            raise TypeError(
                "Invalid name for attr: {} must be a string for "
                "serialization to netCDF files".format(name)
            )

        if not isinstance(value, valid_types):
            raise TypeError(
                "Invalid value for attr: {} must be a number, "
                "a string, an ndarray or a list/tuple of "
                "numbers/strings for serialization to netCDF "
                "files".format(value)
            )

    # Check attrs on the dataset itself
    for k, v in dataset.attrs.items():
        check_attr(k, v)

    # Check attrs on each variable within the dataset
    for variable in dataset.variables.values():
        for k, v in variable.attrs.items():
            check_attr(k, v)
def _protect_dataset_variables_inplace(dataset, cache):
    """Wrap the data of each non-index variable in ``dataset``, in place.

    Each variable whose name is not one of its own dimensions has its
    ``_data`` wrapped in ``indexing.CopyOnWriteArray`` and, when ``cache``
    is truthy, additionally in ``indexing.MemoryCachedArray``. The result
    is assigned back to ``variable.data``.
    """
    for name, variable in dataset.variables.items():
        if name in variable.dims:
            # no need to protect IndexVariable objects
            continue
        wrapped = indexing.CopyOnWriteArray(variable._data)
        if cache:
            wrapped = indexing.MemoryCachedArray(wrapped)
        variable.data = wrapped
def _finalize_store(write, store):
    """Close ``store`` after dropping the local reference to ``write``.

    Parameters
    ----------
    write : object
        Result of the write step; only the local reference is deleted.
    store : object
        Object providing ``close()``.
    """
    # Drop our reference to the write result before closing, so that
    # writing is done first.
    del write
    store.close()
def maybe_decode_store(
    store,
    mask_and_scale,
    decode_times,
    concat_characters,
    decode_coords,
    drop_variables,
    use_cftime,
    decode_timedelta,
    cache,
    chunks,
    filename_or_obj,
    group,
    decode_cf,
    engine,
    lock=False,
):
    """Decode ``store`` into a dataset and optionally chunk it with dask.

    The store is decoded with ``conventions.decode_cf`` and its variables
    are wrapped by ``_protect_dataset_variables_inplace``. When ``chunks``
    is not None the dataset is re-chunked under a token computed from the
    source and all decoding options.

    Parameters
    ----------
    store
        Data store handed to ``conventions.decode_cf``.
    mask_and_scale, decode_times, concat_characters, decode_coords, \
drop_variables, use_cftime, decode_timedelta
        Forwarded to ``conventions.decode_cf``; also part of the token.
    cache : bool
        Forwarded to ``_protect_dataset_variables_inplace``.
    chunks : int, dict or None
        When not None, passed to ``Dataset.chunk``.
    filename_or_obj, group, decode_cf, engine
        Only used to build the dask token.
    lock : optional
        Accepted but not used by this function.

    Returns
    -------
    Dataset
        The decoded dataset, chunked when ``chunks`` is not None.
    """
    ds = conventions.decode_cf(
        store,
        mask_and_scale=mask_and_scale,
        decode_times=decode_times,
        concat_characters=concat_characters,
        decode_coords=decode_coords,
        drop_variables=drop_variables,
        use_cftime=use_cftime,
        decode_timedelta=decode_timedelta,
    )
    _protect_dataset_variables_inplace(ds, cache)

    if chunks is None:
        return ds

    from dask.base import tokenize

    # if passed an actual file path, augment the token with
    # the file modification time
    is_local_path = isinstance(filename_or_obj, str) and not is_remote_uri(
        filename_or_obj
    )
    mtime = os.path.getmtime(filename_or_obj) if is_local_path else None

    token = tokenize(
        filename_or_obj,
        mtime,
        group,
        decode_cf,
        mask_and_scale,
        decode_times,
        concat_characters,
        decode_coords,
        engine,
        chunks,
        drop_variables,
        use_cftime,
        decode_timedelta,
    )
    chunked = ds.chunk(chunks, name_prefix="open_dataset-%s" % token, token=token)
    # Carry the file object reference over from the decoded dataset.
    chunked._file_obj = ds._file_obj
    return chunked
def open_dataset(
    filename_or_obj,
    group=None,
    decode_cf=True,
    mask_and_scale=None,
    decode_times=True,
    autoclose=None,
    concat_characters=True,
    decode_coords=True,
    engine=None,
    chunks=None,
    lock=None,
    cache=None,
    drop_variables=None,
    backend_kwargs=None,
    use_cftime=None,
    decode_timedelta=None,
):
    """Open and decode a dataset from a file or file-like object.

    Parameters
    ----------
    filename_or_obj : str, Path, file or xarray.backends.*DataStore
        Strings and Path objects are interpreted as a path to a netCDF file
        or an OpenDAP URL and opened with python-netCDF4, unless the filename
        ends with .gz, in which case the file is gunzipped and opened with
        scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
        objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
    group : str, optional
        Path to the netCDF4 group in the given file to open (only works for
        netCDF4 files).
    decode_cf : bool, optional
        Whether to decode these variables, assuming they were saved according
        to CF conventions.
    mask_and_scale : bool, optional
        If True, replace array values equal to `_FillValue` with NA and scale
        values according to the formula `original_values * scale_factor +
        add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are
        taken from variable attributes (if they exist).  If the `_FillValue` or
        `missing_value` attribute contains multiple values a warning will be
        issued and all array values matching one of the multiple values will
        be replaced by NA. mask_and_scale defaults to True except for the
        pseudonetcdf backend.
    decode_times : bool, optional
        If True, decode times encoded in the standard NetCDF datetime format
        into datetime objects. Otherwise, leave them encoded as numbers.
    autoclose : bool, optional
        If True, automatically close files to avoid OS Error of too many files
        being open.  However, this option doesn't work with streams, e.g.,
        BytesIO.
    concat_characters : bool, optional
        If True, concatenate along the last dimension of character arrays to
        form string arrays. Dimensions will only be concatenated over (and
        removed) if they have no corresponding variable and if they are only
        used as the last dimension of character arrays.
    decode_coords : bool, optional
        If True, decode the 'coordinates' attribute to identify coordinates in
        the resulting dataset.
    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib', \
        'pseudonetcdf'}, optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        'netcdf4'.
    chunks : int or dict, optional
        If chunks is provided, it is used to load the new dataset into dask
        arrays. ``chunks={}`` loads the dataset with dask using a single
        chunk for all arrays.
    lock : False or duck threading.Lock, optional
        Resource lock to use when reading data from disk. Only relevant when
        using dask or another form of parallelism. By default, appropriate
        locks are chosen to safely read and write files with the currently
        active dask scheduler.
    cache : bool, optional
        If True, cache data loaded from the underlying datastore in memory as
        NumPy arrays when accessed to avoid reading from the underlying data-
        store multiple times. Defaults to True unless you specify the `chunks`
        argument to use dask, in which case it defaults to False. Does not
        change the behavior of coordinates corresponding to dimensions, which
        always load their data from disk into a ``pandas.Index``.
    drop_variables: string or iterable, optional
        A variable or list of variables to exclude from being parsed from the
        dataset. This may be useful to drop variables with problems or
        inconsistent values.
    backend_kwargs: dictionary, optional
        A dictionary of keyword arguments to pass on to the backend. This
        may be useful when backend options would improve performance or
        allow user control of dataset processing.
    use_cftime: bool, optional
        Only relevant if encoded dates come from a standard calendar
        (e.g. 'gregorian', 'proleptic_gregorian', 'standard', or not
        specified).  If None (default), attempt to decode times to
        ``np.datetime64[ns]`` objects; if this is not possible, decode times to
        ``cftime.datetime`` objects. If True, always decode times to
        ``cftime.datetime`` objects, regardless of whether or not they can be
        represented using ``np.datetime64[ns]`` objects.  If False, always
        decode times to ``np.datetime64[ns]`` objects; if this is not possible
        raise an error.
    decode_timedelta : bool, optional
        If True, decode variables and coordinates with time units in
        {'days', 'hours', 'minutes', 'seconds', 'milliseconds', 'microseconds'}
        into timedelta objects. If False, leave them encoded as numbers.
        If None (default), assume the same value of decode_times.

    Returns
    -------
    dataset : Dataset
        The newly created dataset.

    Notes
    -----
    ``open_dataset`` opens the file with read-only access. When you modify
    values of a Dataset, even one linked to files on disk, only the in-memory
    copy you are manipulating in xarray is modified: the original file on disk
    is never touched.

    See Also
    --------
    open_mfdataset
    """
    engines = [
        None,
        "netcdf4",
        "scipy",
        "pydap",
        "h5netcdf",
        "pynio",
        "cfgrib",
        "pseudonetcdf",
    ]
    if engine not in engines:
        raise ValueError(
            "unrecognized engine for open_dataset: {}\n"
            "must be one of: {}".format(engine, engines)
        )

    if autoclose is not None:
        warnings.warn(
            "The autoclose argument is no longer used by "
            "xarray.open_dataset() and is now ignored; it will be removed in "
            "a future version of xarray. If necessary, you can control the "
            "maximum number of simultaneous open files with "
            "xarray.set_options(file_cache_maxsize=...).",
            FutureWarning,
            stacklevel=2,
        )

    # Fill in defaults that depend on other arguments.
    if mask_and_scale is None:
        mask_and_scale = engine != "pseudonetcdf"

    if not decode_cf:
        # Disabling CF decoding switches off every individual decoding step.
        mask_and_scale = False
        decode_times = False
        concat_characters = False
        decode_coords = False
        decode_timedelta = False

    if cache is None:
        cache = chunks is None

    if backend_kwargs is None:
        backend_kwargs = {}

    if isinstance(filename_or_obj, Path):
        filename_or_obj = str(filename_or_obj)

    if isinstance(filename_or_obj, AbstractDataStore):
        store = filename_or_obj

    elif isinstance(filename_or_obj, str):
        filename_or_obj = _normalize_path(filename_or_obj)

        if engine is None:
            engine = _get_default_engine(filename_or_obj, allow_remote=True)

        if engine == "netcdf4":
            store = backends.NetCDF4DataStore.open(
                filename_or_obj, group=group, lock=lock, **backend_kwargs
            )
        elif engine == "scipy":
            store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)
        elif engine == "pydap":
            store = backends.PydapDataStore.open(filename_or_obj, **backend_kwargs)
        elif engine == "h5netcdf":
            store = backends.H5NetCDFStore.open(
                filename_or_obj, group=group, lock=lock, **backend_kwargs
            )
        elif engine == "pynio":
            store = backends.NioDataStore(filename_or_obj, lock=lock, **backend_kwargs)
        elif engine == "pseudonetcdf":
            store = backends.PseudoNetCDFDataStore.open(
                filename_or_obj, lock=lock, **backend_kwargs
            )
        elif engine == "cfgrib":
            store = backends.CfGribDataStore(
                filename_or_obj, lock=lock, **backend_kwargs
            )

    else:
        # Bytes or file-like input: only scipy and h5netcdf can read these.
        if engine not in [None, "scipy", "h5netcdf"]:
            raise ValueError(
                "can only read bytes or file-like objects "
                "with engine='scipy' or 'h5netcdf'"
            )
        # The engine is taken from the file header, replacing any requested one.
        engine = _get_engine_from_magic_number(filename_or_obj)
        if engine == "scipy":
            store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)
        elif engine == "h5netcdf":
            store = backends.H5NetCDFStore.open(
                filename_or_obj, group=group, lock=lock, **backend_kwargs
            )

    with close_on_error(store):
        ds = maybe_decode_store(
            store,
            mask_and_scale=mask_and_scale,
            decode_times=decode_times,
            concat_characters=concat_characters,
            decode_coords=decode_coords,
            drop_variables=drop_variables,
            use_cftime=use_cftime,
            decode_timedelta=decode_timedelta,
            cache=cache,
            chunks=chunks,
            filename_or_obj=filename_or_obj,
            group=group,
            decode_cf=decode_cf,
            engine=engine,
        )

    # Ensure source filename always stored in dataset object (GH issue #2550)
    if "source" not in ds.encoding and isinstance(filename_or_obj, str):
        ds.encoding["source"] = filename_or_obj

    return ds
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment