Source code for geoips.commandline.geoips_run
# # # This source code is subject to the license referenced at
# # # https://github.com/NRLMMD-GEOIPS.
"""GeoIPS CLI "run" command.
Runs the appropriate script based on the args provided.
"""
from geoips.commandline.args import add_args
from geoips.commandline.run_procflow import main
from geoips.commandline.geoips_command import GeoipsCommand, GeoipsExecutableCommand
from geoips.utils.context_managers import import_optional_dependencies
data_fusion_installed = False
with import_optional_dependencies(loglevel="info"):
"""Attempt to import data_fusion_args from Data Fusion."""
# NOTE: loglevel is set to 'info' here so we don't get the constant output of
# 'Failed to import data_fusion.commandline at /path/to/geoips/geoips/commandline/geoips_run.py:19. If you need it, install it.' # NOQA
# every time we use the CLI. If a user needs the CLI, I'm assuming they'll know to
# install it.
from data_fusion.commandline.args import add_args as data_fusion_add_args
data_fusion_installed = True
[docs]class GeoipsRunConfigBased(GeoipsExecutableCommand):
"""Run Command for executing the config based process-workflow (procflow)."""
name = "config_based"
command_classes = []
[docs] def add_arguments(self):
"""Add arguments to the run-subparser for the 'run config_based' Command."""
add_args(parser=self.parser, legacy=self.legacy)
def __call__(self, args):
"""Run the provided GeoIPS command.
Parameters
----------
args: Namespace()
- The argument namespace to parse through
"""
if args.procflow is None and self.legacy:
err_str = (
"Deprecated, Legacy 'run_procflow' call was used and --procflow "
"flag wasn't specified. Please either specify which procflow is "
"being executed via '--procflow <procflow_name>' or use the "
"supported procflow call 'geoips run <procflow_name>'"
)
self.parser.error(err_str)
elif args.procflow is None:
# If None, set to 'config_based'. We don't want users to have to specify
# what procflow will be used as it is specified in
# 'geoips run config_based'.
args.procflow = "config_based"
main(ARGS=args)
[docs]class GeoipsRunDataFusion(GeoipsExecutableCommand):
"""Run Command for executing the data fusion process-workflow (procflow)."""
name = "data_fusion"
command_classes = []
[docs] def add_arguments(self):
"""Add arguments to the run-subparser for the 'run data_fusion' Command."""
if data_fusion_installed:
data_fusion_add_args(parser=self.parser, legacy=self.legacy)
def __call__(self, args):
"""Run the provided GeoIPS command.
Parameters
----------
args: Namespace()
- The argument namespace to parse through
"""
if data_fusion_installed:
if args.procflow is None and self.legacy:
err_str = (
"Deprecated, Legacy 'data_fusion_procflow' call was used and "
"--procflow flag wasn't specified. Please either specify which "
"procflow is being executed via '--procflow <procflow_name>' or "
"use the supported procflow call 'geoips run <procflow_name>'"
)
self.parser.error(err_str)
elif args.procflow is None:
# If None, set to 'data_fusion'. We don't want users to have to specify
# what procflow will be used as it is specified in
# 'geoips run data_fusion'.
args.procflow = "data_fusion"
main(ARGS=args)
else:
print(
"Data fusion is not installed. If you want to run this type of "
"functionality, please clone and install "
"https://github.com/NRLMMD-GEOIPS/data_fusion.git"
)
[docs]class GeoipsRunSingleSource(GeoipsExecutableCommand):
"""Run Command for executing the single source process-workflow (procflow)."""
name = "single_source"
command_classes = []
[docs] def add_arguments(self):
"""Add arguments to the run-subparser for the 'run single_source' Command."""
add_args(parser=self.parser, legacy=self.legacy)
def __call__(self, args):
"""Run the provided GeoIPS command.
Parameters
----------
args: Namespace()
- The argument namespace to parse through
"""
if args.procflow is None and self.legacy:
err_str = (
"Deprecated, Legacy 'run_procflow' call was used and --procflow "
"flag wasn't specified. Please either specify which procflow is "
"being executed via '--procflow <procflow_name>' or use the "
"supported procflow call 'geoips run <procflow_name>'"
)
self.parser.error(err_str)
elif args.procflow is None:
# If None, set to 'single_source'. We don't want users to have to specify
# what procflow will be used as it is specified in
# 'geoips run single_source'.
args.procflow = "single_source"
main(ARGS=args)
[docs]class GeoipsRun(GeoipsCommand):
"""Run Command for running process-workflows (procflows)."""
name = "run"
command_classes = [
GeoipsRunSingleSource,
GeoipsRunDataFusion,
GeoipsRunConfigBased,
]