Files
OpenUxAS/infrastructure/run_example.py
M. Anthony Aiello 375c977ee0 Improve run_example.py
Recent updates to linters revealed a number of potential code-quality
issues.

Most were related to the use of f-strings in loggers, which is
discouraged because the string interpolation is greedy (happens even if
that logging level is disabled) and because it makes the job of log
aggregators more difficult (unlikely to matter to us).

There was an instance of use of a "global" that was not intended to be
such. This was spotted by moving the core script-like functionality into
a function, which avoided adding names to the global namespace.

The script is also renamed to conform to python module-naming
conventions.

All warnings and errors are resolved, except the overly broad catching
of Exception, which is explicitly silenced.

Add a workflow for running the HelloWorld example in CI as a simple
sanity check on run-example functionality.
2022-03-24 11:23:03 -04:00

642 lines
18 KiB
Python
Executable File

"""Script to run OpenUxAS examples."""
from __future__ import annotations
import argparse
import logging
import os
import pathlib
import shutil
import subprocess
import sys
import time
import traceback
from typing import TYPE_CHECKING
import yaml
from uxas.paths import (
OPENUXAS_ROOT,
EXAMPLES_DIR,
AMASE_DIR,
UXAS_BIN,
UXAS_ADA_BIN,
SBX_DIR,
)
from uxas.util.logging import (
add_logging_group,
activate_logger,
get_logging_level,
)
if TYPE_CHECKING:
from argparse import Namespace
from typing import Any, Dict, List, Optional, Tuple
# Relativized root
REL_OPENUXAS_ROOT = os.path.relpath(OPENUXAS_ROOT)
# Anod command
ANOD_CMD = os.path.join(REL_OPENUXAS_ROOT, "anod")
# Allow the environment to specify how long we should wait after starting an
# instance of OpenAMASE; default to 0 seconds.
AMASE_DELAY = os.environ.get("AMASE_DELAY", 0)
DESCRIPTION = """
Run OpenUxAS, OpenAMASE or both, using the configuration in the specified
example.
This script is a frontend for both OpenUxAS and OpenAMASE that simplifies
running examples contained in the `examples` directory of the OpenUxAS
repository. You run an example by providing the path to its directory, under
`examples`. For example, run:
run-example 02_Example_WaterwaySearch
to run the Waterways example from a single terminal session.
To get a list of available examples, run:
run-example --list
"""
# Hard-code the expected name of the YAML file the defines an example, for now.
CONFIG_FILE = "config.yaml"
# The key for specifying OpenAMASE configuration.
AMASE_YAML_KEY = "amase"
# The key for specifying an OpenAMASE scenario file.
SCENARIO_YAML_KEY = "scenario"
# The key for specying the delay after OpenAMASE opens.
DELAY_YAML_KEY = "delay"
# The key for specifying a single OpenUxAS configuration.
UXAS_YAML_KEY = "uxas"
# The key for specifying multiple OpenUxAS configurations.
UXASES_YAML_KEY = "uxases"
# The key for specifying an OpenUxAS configuration file.
CONFIG_YAML_KEY = "config"
# The key for specifying an OpenUxAS run directory.
RUNDIR_YAML_KEY = "rundir"
# The key for specifying the name of the OpenUxAS binary.
BIN_YAML_KEY = "bin"
# The default prefix for an OpenUxAS run directory.
RUN_DIR = "RUNDIR"
def read_yaml(yaml_filename: str) -> Dict[str, Any]:
"""Read and parse a YAML file, returning the content as a yaml object."""
with open(yaml_filename, encoding="utf-8") as yaml_file:
loaded_yaml = yaml.safe_load(yaml_file.read())
return loaded_yaml
def resolve_examples_dir(args: Namespace) -> str:
"""Resolve the absolute path to the examples directory."""
if args.examples_dir:
return args.examples_dir
else:
return os.path.join(args.uxas_dir, "examples")
MISSING_AMASE = """\
Before you can run examples that use OpenAMASE, you need to build it. You
should:
%s build amase
"""
UNBUILT_SPECIFIED_AMASE = """\
The OpenAMASE path `%s exists, but hasn't been built. You should:
cd "%s/OpenAMASE" && ant
You may need to put ant on your path first, like this:
eval "$( %s printenv ant )"
"""
UNBUILT_LOCAL_AMASE = """\
There is an OpenAMASE in %s, but it hasn't been built. If you
want to use this version of OpenAMASE, you should:
cd "%s/OpenAMASE" && ant
You may need to put ant on your path first, like this:
eval "$( %s printenv ant )"
Trying the anod-built OpenAMASE as a fall back.
"""
def check_amase_dir(path: str) -> bool:
"""Test to make sure a path has the OpenUxAS build."""
return os.path.exists(os.path.join(path, "OpenAMASE", "build"))
def resolve_amase_dir(args: Namespace) -> str:
"""
Resolve the absolute path to the OpenAMASE source directory.
1. if we've been given an absolute path in the arguments, check and use
that.
2. see if there's a local OpenAMASE, check and use that.
3. try to use the anod-built OpenAMASE.
If the OpenAMASE directory exists but doesn't appear to be built,
immediately exit.
"""
if args.amase_dir:
if check_amase_dir(args.amase_dir):
return args.amase_dir
else:
logging.critical(
UNBUILT_SPECIFIED_AMASE, args.amase_dir, args.amase_dir, ANOD_CMD
)
sys.exit(1)
if os.path.exists(AMASE_DIR):
if check_amase_dir(AMASE_DIR):
return AMASE_DIR
else:
logging.warning(UNBUILT_LOCAL_AMASE, AMASE_DIR, AMASE_DIR, ANOD_CMD)
anod_amase_dir = os.path.join(SBX_DIR, "x86_64-linux", "amase", "src")
if os.path.exists(anod_amase_dir) and check_amase_dir(anod_amase_dir):
return anod_amase_dir
else:
logging.critical(MISSING_AMASE, ANOD_CMD)
sys.exit(1)
def list_examples(examples_dir: str) -> None:
"""List all of the examples that have a configuration file."""
full_paths = list()
for (dirpath, _, filenames) in os.walk(examples_dir):
if CONFIG_FILE in filenames:
full_paths += [dirpath]
short_paths = [os.path.relpath(path, examples_dir) for path in full_paths]
short_paths.sort()
for path in short_paths:
print(f" {path}")
def check_amase(
loaded_yaml: Dict[str, Any], example_dir: str, args: Namespace
) -> Tuple[Optional[str], int]:
"""
Check the OpenAMASE configuration in the YAML and return the scenario file.
If any errors are encountered, report them and immediately exit.
"""
if AMASE_YAML_KEY not in loaded_yaml.keys():
return (None, 0)
if SCENARIO_YAML_KEY not in loaded_yaml[AMASE_YAML_KEY].keys():
logging.critical("OpenAMASE configuration must specify a scenario file.")
sys.exit(1)
scenario_file = loaded_yaml[AMASE_YAML_KEY][SCENARIO_YAML_KEY]
if not os.path.exists(os.path.join(example_dir, scenario_file)):
logging.critical("Specified scenario file '%s' does not exist.", scenario_file)
sys.exit(1)
if args.amase_delay is not None:
amase_delay = args.amase_delay
elif DELAY_YAML_KEY in loaded_yaml[AMASE_YAML_KEY].keys():
amase_delay = int(loaded_yaml[AMASE_YAML_KEY][DELAY_YAML_KEY])
else:
amase_delay = AMASE_DELAY
return (scenario_file, amase_delay)
def run_amase(scenario_file: str, example_dir: str, amase_dir: str) -> subprocess.Popen:
"""Run the OpenAMASE part of the example."""
amase_cmd = [
"java",
"-Xmx2048m",
f"-splash:{os.path.join('data', 'amase_splash.png')}",
"-classpath",
f"{os.path.join('dist', '*')}:{os.path.join('lib', '*')}",
"avtas.app.Application",
"--config",
os.path.join("config", "amase"),
"--scenario",
os.path.join(example_dir, scenario_file),
]
logging.info(
"Running OpenAMASE in\n %s\n" " with scenario '%s'.",
amase_dir,
scenario_file,
)
logging.debug(
"Run: cd %s; %s",
os.path.join(amase_dir, "OpenAMASE"),
" ".join(amase_cmd),
)
return subprocess.Popen(amase_cmd, cwd=os.path.join(amase_dir, "OpenAMASE"))
def check_uxas(loaded_yaml: Dict[str, Any], example_dir: str) -> List[Dict[str, str]]:
"""
Check the OpenUxAS configuration in the YAML and return a list of configs.
Calls `check_one_uxas` and thus may immediately exit.
"""
uxas_configs = list()
if UXASES_YAML_KEY in loaded_yaml.keys():
for record in loaded_yaml[UXASES_YAML_KEY]:
uxas_configs += [check_one_uxas(record, example_dir)]
elif UXAS_YAML_KEY in loaded_yaml.keys():
uxas_configs += [check_one_uxas(loaded_yaml[UXAS_YAML_KEY], example_dir)]
return uxas_configs
def find_uxas_bin(bin_name: str) -> Optional[str]:
"""
Attempt to find the path the given binary.
Look for the binary in this order:
1. on the user's path
2. in the expected locally-built location
3. in the expected release anod directory
Note that items 2 and 3 are language specific.
"""
if shutil.which(bin_name) is not None:
return shutil.which(bin_name)
if bin_name == "uxas":
local_bin_path = UXAS_BIN
anod_bin_path = os.path.join(
SBX_DIR, "x86_64-linux", "uxas-release", "install", "bin", "uxas"
)
elif bin_name == "uxas-ada":
local_bin_path = UXAS_ADA_BIN
anod_bin_path = os.path.join(
SBX_DIR, "x86_64-linux", "uxas-ada-release", "install", "bin", "uxas-ada"
)
else:
# We don't know how to handle this language
return None
if os.path.exists(local_bin_path):
return local_bin_path
elif os.path.exists(anod_bin_path):
return anod_bin_path
else:
return None
MISSING_BIN = """\
The command `%s` cannot be found on your path. Either specify the absolute
path to the desired OpenUXAS binary in the config file (*not recommended*),
manually add the desired OpenUxAS binary to your path, perfom a local build of
the binary (e.g., for C++, `make -j all`) or use anod to build the desired
version of OpenUxAS with, e.g., for C++:
%s build %s
"""
def check_one_uxas(record: Dict[str, str], example_dir: str) -> Dict[str, str]:
"""
Check one OpenUxAS configuration from the YAML and return the config.
If any errors are found, print a message and immediately exit.
"""
if CONFIG_YAML_KEY not in record.keys():
logging.critical("OpenUxAS configuration must specify a config file.")
sys.exit(1)
config_file = record[CONFIG_YAML_KEY]
if not os.path.exists(os.path.join(example_dir, config_file)):
logging.critical("Specified config file '%s' does not exist.", config_file)
sys.exit(1)
run_dir_name = RUN_DIR
if RUNDIR_YAML_KEY in record.keys():
run_dir_name = record[RUNDIR_YAML_KEY]
if BIN_YAML_KEY in record.keys():
bin_name = record[BIN_YAML_KEY]
else:
bin_name = "uxas"
uxas_binary = find_uxas_bin(bin_name)
if uxas_binary is None:
logging.critical(MISSING_BIN, bin_name, ANOD_CMD, bin_name)
sys.exit(1)
return {
"config_file": config_file,
"run_dir_name": run_dir_name,
"bin": uxas_binary,
}
def run_uxas(
uxas_configs: List[Dict[str, str]], example_dir: str, popen: bool
) -> List[subprocess.Popen]:
"""Run an OpenUxAS instance for each configuration."""
pids = []
for config in uxas_configs:
pid = run_one_uxas(config, example_dir, popen)
if pid is not None:
pids.append(pid)
if popen:
return pids
else:
return []
def run_one_uxas(
uxas_config: Dict[str, str], example_dir: str, popen: bool
) -> Optional[subprocess.Popen]:
"""Run one OpenUxAS instance."""
config_file = uxas_config["config_file"]
run_dir = uxas_config["run_dir_name"]
uxas_bin = uxas_config["bin"]
uxas_cmd = [uxas_bin, "-cfgPath", os.path.join(example_dir, config_file)]
run_dir = os.path.join(example_dir, run_dir)
pathlib.Path(run_dir).mkdir(parents=True, exist_ok=True)
if popen:
logging.info(
"Running OpenUxAS binary\n"
" %s\n"
" in a separate process with configuration\n"
" %s\n"
" Data and logfiles are in:\n"
" %s",
uxas_bin,
config_file,
run_dir,
)
logging.debug("Run: cd %s; %s", run_dir, " ".join(uxas_cmd))
return subprocess.Popen(uxas_cmd, cwd=run_dir)
else:
logging.info(
"Running OpenUxAS binary\n"
" %s\n"
" with configuration\n"
" %s\n"
" Data and logfiles are in:\n"
" %s",
uxas_bin,
config_file,
run_dir,
)
logging.debug("Run: cd %s; %s", run_dir, " ".join(uxas_cmd))
subprocess.run(uxas_cmd, cwd=run_dir, check=True)
return None
def killall_uxases(popen: bool, pids: List[subprocess.Popen]) -> None:
"""Gracefully quit any instances of OpenUxAS that we started."""
# We only worry about killing off UxAS instances if we Popened them
if popen and len(pids) != 0:
logging.info("Shutting down forked OpenUxAS processes.")
for pid in pids:
pid.terminate()
# Seems as though we may need to wait a moment before
# polling.
time.sleep(0.1)
if pid.poll() is None:
pid.kill()
# Here especially I've seen "Unable to kill ..." when
# the process was actually killed, so we wait a moment.
time.sleep(0.1)
if pid.poll() is None:
logging.error(" Unable to kill PID %s.", pid.pid)
# From
# https://stackoverflow.com/questions/34352405/python-argparse-help-like-option
class _ListAction(argparse.Action):
def __init__(
self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help=None, # (needed for superclass) pylint: disable=redefined-builtin
):
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
def __call__(self, parser, namespace, values, option_string=None):
# namespace contains arguments parsed thus far
examples_dir = resolve_examples_dir(namespace)
print(f"In '{examples_dir}' the following examples are available:\n")
list_examples(examples_dir)
# Note that this terminates the program.
parser.exit()
class _CompleteOptionsAction(argparse.Action):
def __init__(
self,
option_strings,
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help=None, # (needed for superclass) pylint: disable=redefined-builtin
):
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
nargs=0,
help=help,
)
def __call__(self, parser, namespace, values, option_string=None):
# namespace contains arguments parsed thus far
list_examples(resolve_examples_dir(namespace))
# Note that this terminates the program.
parser.exit()
def run_example_main() -> int:
"""Run main processing of run-example."""
argument_parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=DESCRIPTION,
)
argument_parser.add_argument(
"--delay",
dest="amase_delay",
type=int,
help="number of seconds to wait after starting OpenAMASE"
"before starting instances of OpenUxAS",
)
argument_parser.add_argument(
"--amase-dir",
help="absolute path to the OpenAMASE repository containing build outputs",
)
argument_parser.add_argument(
"--uxas-bin",
help="absolute path to the OpenUxAS binary",
)
argument_parser.add_argument(
"--uxas-dir",
default=OPENUXAS_ROOT,
help="absolute path to the OpenUxAS repository containing build outputs",
)
argument_parser.add_argument(
"--examples-dir",
help="absolute path to the root of the examples",
)
argument_parser.add_argument(
"-l",
"--list",
dest="list_examples",
default=False,
action=_ListAction,
help="list known examples. Subsequent arguments are ignored, so put "
"after `--examples-dir` if it is used.",
)
# This is only for driving autocomplete
argument_parser.add_argument(
"--complete-options",
default=False,
action=_CompleteOptionsAction,
help=argparse.SUPPRESS,
)
argument_parser.add_argument(
"example",
help="the example directory",
)
add_logging_group(argument_parser)
args = argument_parser.parse_args()
activate_logger(args, get_logging_level(args))
# For KeyboardInterrupt handling
popen = False
pids = list()
try:
examples_dir = resolve_examples_dir(args)
# Allow the user to specify a complete absolute or relative path to
# the example, rather than relying on the example search path. The
# advantage to the user here is that they can leverage autocomplete
if os.path.isabs(args.example):
example_dir = args.example
else:
example_dir = os.path.join(EXAMPLES_DIR, args.example)
logging.info("Running example in\n %s", example_dir)
if not os.path.exists(example_dir):
example_dir = os.path.join(examples_dir, args.example)
if not os.path.exists(example_dir):
logging.critical(
"Example '%s' does not exist in\n %s\n"
" Use the `--list` option for a list of available examples.",
args.example,
examples_dir,
)
return 1
yaml_filename = os.path.join(example_dir, CONFIG_FILE)
if not os.path.exists(yaml_filename):
logging.critical(
"Example '%s' is not property configured.\n"
" There is no '%s' in the example directory.\n"
" Use the `--list` option for a list of available examples.",
args.example,
CONFIG_FILE,
)
return 1
loaded_yaml = read_yaml(yaml_filename)
amase_dir = resolve_amase_dir(args)
(scenario_file, amase_delay) = check_amase(loaded_yaml, example_dir, args)
uxas_configs = check_uxas(loaded_yaml, example_dir)
if scenario_file:
amase_pid = run_amase(scenario_file, example_dir, amase_dir)
if amase_delay > 0:
print(
f"Waiting for {amase_delay} seconds while AMASE starts; "
"press 'Play' as soon as it does."
)
time.sleep(amase_delay)
popen = scenario_file is not None
pids = run_uxas(uxas_configs, example_dir, popen)
if scenario_file:
# Wait for the user to close AMASE
amase_pid.wait()
killall_uxases(popen, pids)
return 0
except KeyboardInterrupt:
print(" ")
killall_uxases(popen, pids)
return 2
except Exception as exception: # pylint: disable=broad-except
logging.critical("Got an exception %s", exception)
traceback.print_exc()
argument_parser.print_usage()
return 3
# Script processing.
if __name__ == "__main__":
sys.exit(run_example_main())