# # # This source code is subject to the license referenced at
# # # https://github.com/NRLMMD-GEOIPS.
"""Readers interface module."""
import collections
from datetime import datetime
from os.path import basename
import numpy as np
from xarray import concat, Dataset
from geoips.errors import NoValidFilesError
from geoips.interfaces.base import BaseModuleInterface
from geoips.plugins.modules.readers.utils.hrit_reader import HritError


class ReadersInterface(BaseModuleInterface):
"""Interface for ingesting a specific data type.
    Provides the specification for ingesting a specific data type and storing
    it in the GeoIPS xarray-based internal format.
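
    Examples
    --------
    A minimal usage sketch (the plugin name 'abi_netcdf' and the input file
    path are illustrative; substitute any installed reader plugin and real
    data files)::

        from geoips.interfaces import readers

        fnames = ["/path/to/datafile.nc"]  # hypothetical input files
        reader = readers.get_plugin("abi_netcdf")
        xdict = reader(fnames, metadata_only=True)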
"""
name = "readers"
required_args = {"standard": ["fnames"]}
required_kwargs = {
"standard": ["metadata_only", "chans", "area_def", "self_register"]
}

    def read_data_to_xarray_dict(
self,
fnames,
read_single_time_func,
metadata_only=False,
chans=None,
area_def=None,
self_register=False,
):
"""Read in data potentially from multiple scan times into an xarray dict.
This function does not require that you provide multiple scan times, but allows
for that in the case those are provided.
Call this with information specific to your reader to generate a dictionary of
xarray datasets created from the data provided in 'fnames'.

        Parameters
        ----------
fnames : list
* List of strings, full paths to files
read_single_time_func : python function
* Function which can be used to read a single scan time of files from a
reader plugin.
* Most likely named 'call_single_time'.
metadata_only : bool, default=False
* Return before actually reading data if True
chans : list of str, default=None
* List of desired channels (skip unneeded variables as needed).
* Include all channels if None.
area_def : pyresample.AreaDefinition, default=None
* Specify region to read
* Read all data if None.
self_register : str or bool, default=False
            * Register all data to the specified dataset id (as specified in the
return dictionary keys).
* Read multiple resolutions of data if False.

        Returns
        -------
dict of xarray.Datasets
* dictionary of xarray.Dataset objects with required Variables and
Attributes.
* Dictionary keys can be any descriptive dataset ids.

        See Also
        --------
:ref:`xarray_standards`
Additional information regarding required attributes and variables
for GeoIPS-formatted xarray Datasets.
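
        Examples
        --------
        A minimal sketch of how a reader plugin might delegate to this method
        ('call_single_time' is a hypothetical single-scan-time reader function
        defined elsewhere in the plugin)::

            from geoips.interfaces import readers

            def call(fnames, metadata_only=False, chans=None, area_def=None,
                     self_register=False):
                return readers.read_data_to_xarray_dict(
                    fnames,
                    call_single_time,
                    metadata_only=metadata_only,
                    chans=chans,
                    area_def=area_def,
                    self_register=self_register,
                )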
"""
        # Sort fnames; filenames typically encode time and channel, so this
        # usually orders the files by scan time.
        fnames = sorted(fnames)
        # We only need the start time from each of these files. Reading the
        # metadata of every file is not the most efficient route, but it works
        # for now. A future improvement would be a new argument accepting a
        # function that computes the start datetime of all provided files.
all_file_metadata = []
updated_fnames = []
for x in fnames:
try:
all_file_metadata.append(
read_single_time_func([x], metadata_only=True, chans=chans)[
"METADATA"
]
)
updated_fnames += [x]
            except NoValidFilesError:
                # The current file is not valid for this reader; skip it.
                continue
except (ValueError, HritError) as e:
                # ValueError is raised for inconsistent metadata or, in this
                # case, when no file matches the selected channels.
if isinstance(e, ValueError):
st = None
et = None
else:
"""
This occurs from the seviri_hrit reader in 'get_top_level_metadata'
Parse out the start and end datetimes, as this file still could be
Relevant, but is missing 'block_2'. If the set of files all are
missing block_2, or the projection of block_2 is not GEOS, it will
cause an HritError in the for loop before, which doesn't have a try
except statement.
Error Format:
f"Unknown projection encountered: {projection}.\n"
f"start_datetime={st.isoformat()}\n"
f"end_datetime={et.isoformat()}"
"""
emsg = str(e).split("\n")
# Recreate the datetime objects from the isoformat strings provided
st = datetime.fromisoformat(emsg[1].split("=")[1])
et = datetime.fromisoformat(emsg[2].split("=")[1])
                # Add st and et for the file, regardless of whether they are
                # None or valid datetimes.
all_file_metadata.append(
Dataset(attrs=dict(start_datetime=st, end_datetime=et))
)
updated_fnames += [x]
self.start_times = [md.attrs["start_datetime"] for md in all_file_metadata]
self.end_times = [md.attrs["end_datetime"] for md in all_file_metadata]
        # Get unique start and end times by building a set from all found
        # times. Some of these times may be None if the reader call function
        # raised a ValueError, which occurs when none of the selected channels
        # were found in the provided file, meaning we don't need that file.
        # Remove None from each set, leaving the unique times of the files
        # whose datasets contain the correct channels. Store these on the
        # class so they can be used downstream to read data from the correct
        # time steps.
        self.unique_stimes = list(set(self.start_times).difference({None}))
        self.unique_etimes = list(set(self.end_times).difference({None}))
metadata_by_scan_time = []
for stime in self.unique_stimes:
            # Build a boolean mask of the files matching the current start time
same_scan_time_files = [dt == stime for dt in self.start_times]
# Now get the metadata of all of the files which match that time
metadata_by_scan_time.append(
read_single_time_func(
list(np.array(updated_fnames)[same_scan_time_files]),
metadata_only=True,
chans=chans,
)["METADATA"]
)
all_metadata = self.concatenate_metadata(metadata_by_scan_time)
if metadata_only:
return all_metadata
dict_xarrays = self.call_files_and_get_top_level_metadata(
updated_fnames,
all_metadata,
read_single_time_func,
metadata_only,
chans,
area_def,
self_register,
)
return dict_xarrays

    def concatenate_metadata(self, all_metadata):
"""Merge together metadata sourced from a list of files into one dictionary.
Where the structure of the merged metadata is a nested dictionary of metadata.
Ie. (xarray_obj has no data and is merely just a container for metadata):
{"METADATA": xobj.source_file_attributes: {fname: xobj, ..., "fnamex": xobj}}

        Parameters
        ----------
all_metadata: list of xarray.Datasets
            - The incoming metadata from any number of files

        Returns
        -------
md: dict of xarray Datasets
- All metadata merged into a dictionary of xarray Datasets
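
        Examples
        --------
        A minimal sketch (the file paths, datetimes, and metadata-only
        Datasets are illustrative stand-ins for real reader output)::

            from datetime import datetime
            from xarray import Dataset

            md1 = Dataset(attrs={
                "source_file_names": ["/path/a.nc"],
                "start_datetime": datetime(2023, 1, 1, 0, 0),
                "end_datetime": datetime(2023, 1, 1, 0, 10),
            })
            md2 = Dataset(attrs={
                "source_file_names": ["/path/b.nc"],
                "start_datetime": datetime(2023, 1, 1, 0, 10),
                "end_datetime": datetime(2023, 1, 1, 0, 20),
            })
            merged = readers.concatenate_metadata([md1, md2])
            merged["METADATA"].attrs["source_file_names"]  # ["a.nc", "b.nc"]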
"""
md = {"METADATA": Dataset()}
        for md_idx, file_md in enumerate(all_metadata):
            # Set the required attributes of the top-level metadata when the
            # loop starts
            if md_idx == 0:
                md["METADATA"].attrs = file_md.attrs
                md["METADATA"].attrs["source_file_names"] = []
                md["METADATA"].attrs["source_file_attributes"] = {}
                md["METADATA"].attrs["source_file_datetimes"] = []
                md["METADATA"].attrs["end_datetime"] = all_metadata[-1].end_datetime
            # Add to the optional attributes of the top-level metadata for
            # each xarray object provided
            md["METADATA"].attrs["source_file_names"] += [
                basename(x) for x in file_md.attrs["source_file_names"]
            ]
            md["METADATA"].attrs["source_file_datetimes"].append(
                [
                    file_md.start_datetime,
                    file_md.end_datetime,
                ],
            )
            for x in file_md.attrs["source_file_names"]:
                md["METADATA"].attrs["source_file_attributes"][basename(x)] = (
                    file_md
                )
return md

    def call_files_and_get_top_level_metadata(
self,
fnames,
all_metadata,
call_single_file_func,
metadata_only=False,
chans=None,
area_def=None,
self_register=False,
):
"""
Read in data from a list of filenames.

        Parameters
        ----------
fnames : list
* List of strings, full paths to files
all_metadata : dict
* Dictionary of metadata from all files in 'fnames'
        call_single_file_func : python function
* Function which can be used to read a single scan time of files from a
reader plugin.
* Most likely named 'call_single_time'.
metadata_only : bool, default=False
* Return before actually reading data if True
chans : list of str, default=None
* List of desired channels (skip unneeded variables as needed).
* Include all channels if None.
area_def : pyresample.AreaDefinition, default=None
* Specify region to read
* Read all data if None.
self_register : str or bool, default=False
            * Register all data to the specified dataset id (as specified in the
return dictionary keys).
* Read multiple resolutions of data if False.

        Returns
        -------
dict of xarray.Datasets
* dictionary of xarray.Dataset objects with required Variables and
Attributes.
* Dictionary keys can be any descriptive dataset ids.

        See Also
        --------
:ref:`xarray_standards`
Additional information regarding required attributes and variables
for GeoIPS-formatted xarray Datasets.
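
        Examples
        --------
        A toy sketch of the time-stacking this method performs for each
        dataset (the variable name 'bt' and the data are illustrative only)::

            import numpy as np
            from xarray import Dataset, concat

            scan1 = Dataset({"bt": (("y", "x"), np.zeros((2, 2)))})
            scan2 = Dataset({"bt": (("y", "x"), np.ones((2, 2)))})
            stacked = concat([scan1, scan2], dim="time")
            # stacked["bt"] now has dims ("time", "y", "x")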
"""
ingested_xarrays = collections.defaultdict(list)
for time in self.unique_stimes:
scan_time_files = [dt == time for dt in self.start_times]
# Call the associated reader for a series of files associated with the same
# scan time
data_dict = call_single_file_func(
list(np.array(fnames)[scan_time_files]),
metadata_only=metadata_only,
chans=chans,
area_def=area_def,
self_register=self_register,
)
            for dname, dset in data_dict.items():
ingested_xarrays[dname].append(dset)
        if len(self.unique_stimes) == 1:
            # No need to stack when only one scan time was read. This early
            # return is likely temporary, to maintain backwards compatibility.
            return data_dict
# Now that we've ingested all scan times, stack along time dimension
metadata = all_metadata["METADATA"]
dict_xarrays = {}
for dname, list_xarrays in ingested_xarrays.items():
if dname == "METADATA":
continue
merged_dset = concat(list_xarrays, dim="time")
merged_dset.attrs["start_datetime"] = min(self.unique_stimes)
merged_dset.attrs["end_datetime"] = max(self.unique_etimes)
merged_dset = merged_dset.assign_coords({"time": self.unique_stimes})
dict_xarrays[dname] = merged_dset
# Override source_file_* attributes with what's set in all_metadata.
dict_xarrays[dname].attrs["source_file_names"] = metadata.attrs[
"source_file_names"
]
dict_xarrays[dname].attrs["source_file_attributes"] = metadata.attrs[
"source_file_attributes"
]
dict_xarrays[dname].attrs["source_file_datetimes"] = metadata.attrs[
"source_file_datetimes"
]
metadata.attrs["start_datetime"] = min(self.unique_stimes)
metadata.attrs["end_datetime"] = max(self.unique_etimes)
dict_xarrays["METADATA"] = metadata
return dict_xarrays


readers = ReadersInterface()