mirror of
https://github.com/AdaCore/OpenUxAS.git
synced 2026-02-12 13:07:16 -08:00
Recent updates to linters revealed a number of potential code-quality issues. Most were related to the use of f-strings in loggers, which is discouraged because the string interpolation is greedy (happens even if that logging level is disabled) and because it makes the job of log aggregators more difficult (unlikely to matter to us). There was an instance of use of a "global" that was not intended to be such. This was spotted by moving the core script-like functionality into a function, which avoided adding names to the global namespace. The script is also renamed to conform to python module-naming conventions. All warnings and errors are resolved, except the overly broad catching of Exception, which is explicitly silenced. Add a workflow for running the HelloWorld example in CI as a simple sanity check on run-example functionality.
642 lines
18 KiB
Python
Executable File
642 lines
18 KiB
Python
Executable File
"""Script to run OpenUxAS examples."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from typing import TYPE_CHECKING
|
|
import yaml
|
|
|
|
from uxas.paths import (
|
|
OPENUXAS_ROOT,
|
|
EXAMPLES_DIR,
|
|
AMASE_DIR,
|
|
UXAS_BIN,
|
|
UXAS_ADA_BIN,
|
|
SBX_DIR,
|
|
)
|
|
|
|
from uxas.util.logging import (
|
|
add_logging_group,
|
|
activate_logger,
|
|
get_logging_level,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from argparse import Namespace
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
# Relativized root
|
|
REL_OPENUXAS_ROOT = os.path.relpath(OPENUXAS_ROOT)
|
|
|
|
# Anod command
|
|
ANOD_CMD = os.path.join(REL_OPENUXAS_ROOT, "anod")
|
|
|
|
|
|
# Allow the environment to specify how long we should wait after starting an
|
|
# instance of OpenAMASE; default to 0 seconds.
|
|
AMASE_DELAY = os.environ.get("AMASE_DELAY", 0)
|
|
|
|
DESCRIPTION = """
|
|
Run OpenUxAS, OpenAMASE or both, using the configuration in the specified
|
|
example.
|
|
|
|
This script is a frontend for both OpenUxAS and OpenAMASE that simplifies
|
|
running examples contained in the `examples` directory of the OpenUxAS
|
|
repository. You run an example by providing the path to its directory, under
|
|
`examples`. For example, run:
|
|
|
|
run-example 02_Example_WaterwaySearch
|
|
|
|
to run the Waterways example from a single terminal session.
|
|
|
|
To get a list of available examples, run:
|
|
|
|
run-example --list
|
|
"""
|
|
|
|
# Hard-code the expected name of the YAML file the defines an example, for now.
|
|
CONFIG_FILE = "config.yaml"
|
|
|
|
# The key for specifying OpenAMASE configuration.
|
|
AMASE_YAML_KEY = "amase"
|
|
|
|
# The key for specifying an OpenAMASE scenario file.
|
|
SCENARIO_YAML_KEY = "scenario"
|
|
|
|
# The key for specying the delay after OpenAMASE opens.
|
|
DELAY_YAML_KEY = "delay"
|
|
|
|
# The key for specifying a single OpenUxAS configuration.
|
|
UXAS_YAML_KEY = "uxas"
|
|
|
|
# The key for specifying multiple OpenUxAS configurations.
|
|
UXASES_YAML_KEY = "uxases"
|
|
|
|
# The key for specifying an OpenUxAS configuration file.
|
|
CONFIG_YAML_KEY = "config"
|
|
|
|
# The key for specifying an OpenUxAS run directory.
|
|
RUNDIR_YAML_KEY = "rundir"
|
|
|
|
# The key for specifying the name of the OpenUxAS binary.
|
|
BIN_YAML_KEY = "bin"
|
|
|
|
# The default prefix for an OpenUxAS run directory.
|
|
RUN_DIR = "RUNDIR"
|
|
|
|
|
|
def read_yaml(yaml_filename: str) -> Dict[str, Any]:
|
|
"""Read and parse a YAML file, returning the content as a yaml object."""
|
|
with open(yaml_filename, encoding="utf-8") as yaml_file:
|
|
loaded_yaml = yaml.safe_load(yaml_file.read())
|
|
|
|
return loaded_yaml
|
|
|
|
|
|
def resolve_examples_dir(args: Namespace) -> str:
|
|
"""Resolve the absolute path to the examples directory."""
|
|
if args.examples_dir:
|
|
return args.examples_dir
|
|
else:
|
|
return os.path.join(args.uxas_dir, "examples")
|
|
|
|
|
|
MISSING_AMASE = """\
|
|
Before you can run examples that use OpenAMASE, you need to build it. You
|
|
should:
|
|
|
|
%s build amase
|
|
"""
|
|
|
|
UNBUILT_SPECIFIED_AMASE = """\
|
|
The OpenAMASE path `%s exists, but hasn't been built. You should:
|
|
|
|
cd "%s/OpenAMASE" && ant
|
|
|
|
You may need to put ant on your path first, like this:
|
|
|
|
eval "$( %s printenv ant )"
|
|
"""
|
|
|
|
|
|
UNBUILT_LOCAL_AMASE = """\
|
|
There is an OpenAMASE in %s, but it hasn't been built. If you
|
|
want to use this version of OpenAMASE, you should:
|
|
|
|
cd "%s/OpenAMASE" && ant
|
|
|
|
You may need to put ant on your path first, like this:
|
|
|
|
eval "$( %s printenv ant )"
|
|
|
|
Trying the anod-built OpenAMASE as a fall back.
|
|
"""
|
|
|
|
|
|
def check_amase_dir(path: str) -> bool:
|
|
"""Test to make sure a path has the OpenUxAS build."""
|
|
return os.path.exists(os.path.join(path, "OpenAMASE", "build"))
|
|
|
|
|
|
def resolve_amase_dir(args: Namespace) -> str:
|
|
"""
|
|
Resolve the absolute path to the OpenAMASE source directory.
|
|
|
|
1. if we've been given an absolute path in the arguments, check and use
|
|
that.
|
|
2. see if there's a local OpenAMASE, check and use that.
|
|
3. try to use the anod-built OpenAMASE.
|
|
|
|
If the OpenAMASE directory exists but doesn't appear to be built,
|
|
immediately exit.
|
|
"""
|
|
if args.amase_dir:
|
|
if check_amase_dir(args.amase_dir):
|
|
return args.amase_dir
|
|
else:
|
|
logging.critical(
|
|
UNBUILT_SPECIFIED_AMASE, args.amase_dir, args.amase_dir, ANOD_CMD
|
|
)
|
|
sys.exit(1)
|
|
|
|
if os.path.exists(AMASE_DIR):
|
|
if check_amase_dir(AMASE_DIR):
|
|
return AMASE_DIR
|
|
else:
|
|
logging.warning(UNBUILT_LOCAL_AMASE, AMASE_DIR, AMASE_DIR, ANOD_CMD)
|
|
|
|
anod_amase_dir = os.path.join(SBX_DIR, "x86_64-linux", "amase", "src")
|
|
if os.path.exists(anod_amase_dir) and check_amase_dir(anod_amase_dir):
|
|
return anod_amase_dir
|
|
else:
|
|
logging.critical(MISSING_AMASE, ANOD_CMD)
|
|
sys.exit(1)
|
|
|
|
|
|
def list_examples(examples_dir: str) -> None:
|
|
"""List all of the examples that have a configuration file."""
|
|
full_paths = list()
|
|
for (dirpath, _, filenames) in os.walk(examples_dir):
|
|
if CONFIG_FILE in filenames:
|
|
full_paths += [dirpath]
|
|
|
|
short_paths = [os.path.relpath(path, examples_dir) for path in full_paths]
|
|
short_paths.sort()
|
|
|
|
for path in short_paths:
|
|
print(f" {path}")
|
|
|
|
|
|
def check_amase(
|
|
loaded_yaml: Dict[str, Any], example_dir: str, args: Namespace
|
|
) -> Tuple[Optional[str], int]:
|
|
"""
|
|
Check the OpenAMASE configuration in the YAML and return the scenario file.
|
|
|
|
If any errors are encountered, report them and immediately exit.
|
|
"""
|
|
if AMASE_YAML_KEY not in loaded_yaml.keys():
|
|
return (None, 0)
|
|
|
|
if SCENARIO_YAML_KEY not in loaded_yaml[AMASE_YAML_KEY].keys():
|
|
logging.critical("OpenAMASE configuration must specify a scenario file.")
|
|
sys.exit(1)
|
|
|
|
scenario_file = loaded_yaml[AMASE_YAML_KEY][SCENARIO_YAML_KEY]
|
|
if not os.path.exists(os.path.join(example_dir, scenario_file)):
|
|
logging.critical("Specified scenario file '%s' does not exist.", scenario_file)
|
|
sys.exit(1)
|
|
|
|
if args.amase_delay is not None:
|
|
amase_delay = args.amase_delay
|
|
elif DELAY_YAML_KEY in loaded_yaml[AMASE_YAML_KEY].keys():
|
|
amase_delay = int(loaded_yaml[AMASE_YAML_KEY][DELAY_YAML_KEY])
|
|
else:
|
|
amase_delay = AMASE_DELAY
|
|
|
|
return (scenario_file, amase_delay)
|
|
|
|
|
|
def run_amase(scenario_file: str, example_dir: str, amase_dir: str) -> subprocess.Popen:
|
|
"""Run the OpenAMASE part of the example."""
|
|
amase_cmd = [
|
|
"java",
|
|
"-Xmx2048m",
|
|
f"-splash:{os.path.join('data', 'amase_splash.png')}",
|
|
"-classpath",
|
|
f"{os.path.join('dist', '*')}:{os.path.join('lib', '*')}",
|
|
"avtas.app.Application",
|
|
"--config",
|
|
os.path.join("config", "amase"),
|
|
"--scenario",
|
|
os.path.join(example_dir, scenario_file),
|
|
]
|
|
|
|
logging.info(
|
|
"Running OpenAMASE in\n %s\n" " with scenario '%s'.",
|
|
amase_dir,
|
|
scenario_file,
|
|
)
|
|
logging.debug(
|
|
"Run: cd %s; %s",
|
|
os.path.join(amase_dir, "OpenAMASE"),
|
|
" ".join(amase_cmd),
|
|
)
|
|
|
|
return subprocess.Popen(amase_cmd, cwd=os.path.join(amase_dir, "OpenAMASE"))
|
|
|
|
|
|
def check_uxas(loaded_yaml: Dict[str, Any], example_dir: str) -> List[Dict[str, str]]:
|
|
"""
|
|
Check the OpenUxAS configuration in the YAML and return a list of configs.
|
|
|
|
Calls `check_one_uxas` and thus may immediately exit.
|
|
"""
|
|
uxas_configs = list()
|
|
|
|
if UXASES_YAML_KEY in loaded_yaml.keys():
|
|
for record in loaded_yaml[UXASES_YAML_KEY]:
|
|
uxas_configs += [check_one_uxas(record, example_dir)]
|
|
elif UXAS_YAML_KEY in loaded_yaml.keys():
|
|
uxas_configs += [check_one_uxas(loaded_yaml[UXAS_YAML_KEY], example_dir)]
|
|
|
|
return uxas_configs
|
|
|
|
|
|
def find_uxas_bin(bin_name: str) -> Optional[str]:
|
|
"""
|
|
Attempt to find the path the given binary.
|
|
|
|
Look for the binary in this order:
|
|
1. on the user's path
|
|
2. in the expected locally-built location
|
|
3. in the expected release anod directory
|
|
|
|
Note that items 2 and 3 are language specific.
|
|
"""
|
|
if shutil.which(bin_name) is not None:
|
|
return shutil.which(bin_name)
|
|
|
|
if bin_name == "uxas":
|
|
local_bin_path = UXAS_BIN
|
|
anod_bin_path = os.path.join(
|
|
SBX_DIR, "x86_64-linux", "uxas-release", "install", "bin", "uxas"
|
|
)
|
|
elif bin_name == "uxas-ada":
|
|
local_bin_path = UXAS_ADA_BIN
|
|
anod_bin_path = os.path.join(
|
|
SBX_DIR, "x86_64-linux", "uxas-ada-release", "install", "bin", "uxas-ada"
|
|
)
|
|
else:
|
|
# We don't know how to handle this language
|
|
return None
|
|
|
|
if os.path.exists(local_bin_path):
|
|
return local_bin_path
|
|
elif os.path.exists(anod_bin_path):
|
|
return anod_bin_path
|
|
else:
|
|
return None
|
|
|
|
|
|
MISSING_BIN = """\
|
|
The command `%s` cannot be found on your path. Either specify the absolute
|
|
path to the desired OpenUXAS binary in the config file (*not recommended*),
|
|
manually add the desired OpenUxAS binary to your path, perfom a local build of
|
|
the binary (e.g., for C++, `make -j all`) or use anod to build the desired
|
|
version of OpenUxAS with, e.g., for C++:
|
|
|
|
%s build %s
|
|
"""
|
|
|
|
|
|
def check_one_uxas(record: Dict[str, str], example_dir: str) -> Dict[str, str]:
|
|
"""
|
|
Check one OpenUxAS configuration from the YAML and return the config.
|
|
|
|
If any errors are found, print a message and immediately exit.
|
|
"""
|
|
if CONFIG_YAML_KEY not in record.keys():
|
|
logging.critical("OpenUxAS configuration must specify a config file.")
|
|
sys.exit(1)
|
|
|
|
config_file = record[CONFIG_YAML_KEY]
|
|
if not os.path.exists(os.path.join(example_dir, config_file)):
|
|
logging.critical("Specified config file '%s' does not exist.", config_file)
|
|
sys.exit(1)
|
|
|
|
run_dir_name = RUN_DIR
|
|
if RUNDIR_YAML_KEY in record.keys():
|
|
run_dir_name = record[RUNDIR_YAML_KEY]
|
|
|
|
if BIN_YAML_KEY in record.keys():
|
|
bin_name = record[BIN_YAML_KEY]
|
|
else:
|
|
bin_name = "uxas"
|
|
|
|
uxas_binary = find_uxas_bin(bin_name)
|
|
if uxas_binary is None:
|
|
logging.critical(MISSING_BIN, bin_name, ANOD_CMD, bin_name)
|
|
sys.exit(1)
|
|
|
|
return {
|
|
"config_file": config_file,
|
|
"run_dir_name": run_dir_name,
|
|
"bin": uxas_binary,
|
|
}
|
|
|
|
|
|
def run_uxas(
|
|
uxas_configs: List[Dict[str, str]], example_dir: str, popen: bool
|
|
) -> List[subprocess.Popen]:
|
|
"""Run an OpenUxAS instance for each configuration."""
|
|
pids = []
|
|
for config in uxas_configs:
|
|
pid = run_one_uxas(config, example_dir, popen)
|
|
if pid is not None:
|
|
pids.append(pid)
|
|
|
|
if popen:
|
|
return pids
|
|
else:
|
|
return []
|
|
|
|
|
|
def run_one_uxas(
|
|
uxas_config: Dict[str, str], example_dir: str, popen: bool
|
|
) -> Optional[subprocess.Popen]:
|
|
"""Run one OpenUxAS instance."""
|
|
config_file = uxas_config["config_file"]
|
|
run_dir = uxas_config["run_dir_name"]
|
|
uxas_bin = uxas_config["bin"]
|
|
|
|
uxas_cmd = [uxas_bin, "-cfgPath", os.path.join(example_dir, config_file)]
|
|
|
|
run_dir = os.path.join(example_dir, run_dir)
|
|
pathlib.Path(run_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
if popen:
|
|
logging.info(
|
|
"Running OpenUxAS binary\n"
|
|
" %s\n"
|
|
" in a separate process with configuration\n"
|
|
" %s\n"
|
|
" Data and logfiles are in:\n"
|
|
" %s",
|
|
uxas_bin,
|
|
config_file,
|
|
run_dir,
|
|
)
|
|
logging.debug("Run: cd %s; %s", run_dir, " ".join(uxas_cmd))
|
|
|
|
return subprocess.Popen(uxas_cmd, cwd=run_dir)
|
|
else:
|
|
logging.info(
|
|
"Running OpenUxAS binary\n"
|
|
" %s\n"
|
|
" with configuration\n"
|
|
" %s\n"
|
|
" Data and logfiles are in:\n"
|
|
" %s",
|
|
uxas_bin,
|
|
config_file,
|
|
run_dir,
|
|
)
|
|
logging.debug("Run: cd %s; %s", run_dir, " ".join(uxas_cmd))
|
|
|
|
subprocess.run(uxas_cmd, cwd=run_dir, check=True)
|
|
return None
|
|
|
|
|
|
def killall_uxases(popen: bool, pids: List[subprocess.Popen]) -> None:
|
|
"""Gracefully quit any instances of OpenUxAS that we started."""
|
|
# We only worry about killing off UxAS instances if we Popened them
|
|
if popen and len(pids) != 0:
|
|
logging.info("Shutting down forked OpenUxAS processes.")
|
|
for pid in pids:
|
|
pid.terminate()
|
|
|
|
# Seems as though we may need to wait a moment before
|
|
# polling.
|
|
time.sleep(0.1)
|
|
if pid.poll() is None:
|
|
pid.kill()
|
|
|
|
# Here especially I've seen "Unable to kill ..." when
|
|
# the process was actually killed, so we wait a moment.
|
|
time.sleep(0.1)
|
|
if pid.poll() is None:
|
|
logging.error(" Unable to kill PID %s.", pid.pid)
|
|
|
|
|
|
# From
|
|
# https://stackoverflow.com/questions/34352405/python-argparse-help-like-option
|
|
class _ListAction(argparse.Action):
|
|
def __init__(
|
|
self,
|
|
option_strings,
|
|
dest=argparse.SUPPRESS,
|
|
default=argparse.SUPPRESS,
|
|
help=None, # (needed for superclass) pylint: disable=redefined-builtin
|
|
):
|
|
super().__init__(
|
|
option_strings=option_strings,
|
|
dest=dest,
|
|
default=default,
|
|
nargs=0,
|
|
help=help,
|
|
)
|
|
|
|
def __call__(self, parser, namespace, values, option_string=None):
|
|
# namespace contains arguments parsed thus far
|
|
examples_dir = resolve_examples_dir(namespace)
|
|
|
|
print(f"In '{examples_dir}' the following examples are available:\n")
|
|
|
|
list_examples(examples_dir)
|
|
|
|
# Note that this terminates the program.
|
|
parser.exit()
|
|
|
|
|
|
class _CompleteOptionsAction(argparse.Action):
|
|
def __init__(
|
|
self,
|
|
option_strings,
|
|
dest=argparse.SUPPRESS,
|
|
default=argparse.SUPPRESS,
|
|
help=None, # (needed for superclass) pylint: disable=redefined-builtin
|
|
):
|
|
super().__init__(
|
|
option_strings=option_strings,
|
|
dest=dest,
|
|
default=default,
|
|
nargs=0,
|
|
help=help,
|
|
)
|
|
|
|
def __call__(self, parser, namespace, values, option_string=None):
|
|
# namespace contains arguments parsed thus far
|
|
list_examples(resolve_examples_dir(namespace))
|
|
|
|
# Note that this terminates the program.
|
|
parser.exit()
|
|
|
|
|
|
def run_example_main() -> int:
|
|
"""Run main processing of run-example."""
|
|
argument_parser = argparse.ArgumentParser(
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
description=DESCRIPTION,
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"--delay",
|
|
dest="amase_delay",
|
|
type=int,
|
|
help="number of seconds to wait after starting OpenAMASE"
|
|
"before starting instances of OpenUxAS",
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"--amase-dir",
|
|
help="absolute path to the OpenAMASE repository containing build outputs",
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"--uxas-bin",
|
|
help="absolute path to the OpenUxAS binary",
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"--uxas-dir",
|
|
default=OPENUXAS_ROOT,
|
|
help="absolute path to the OpenUxAS repository containing build outputs",
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"--examples-dir",
|
|
help="absolute path to the root of the examples",
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"-l",
|
|
"--list",
|
|
dest="list_examples",
|
|
default=False,
|
|
action=_ListAction,
|
|
help="list known examples. Subsequent arguments are ignored, so put "
|
|
"after `--examples-dir` if it is used.",
|
|
)
|
|
|
|
# This is only for driving autocomplete
|
|
argument_parser.add_argument(
|
|
"--complete-options",
|
|
default=False,
|
|
action=_CompleteOptionsAction,
|
|
help=argparse.SUPPRESS,
|
|
)
|
|
|
|
argument_parser.add_argument(
|
|
"example",
|
|
help="the example directory",
|
|
)
|
|
|
|
add_logging_group(argument_parser)
|
|
|
|
args = argument_parser.parse_args()
|
|
|
|
activate_logger(args, get_logging_level(args))
|
|
|
|
# For KeyboardInterrupt handling
|
|
popen = False
|
|
pids = list()
|
|
|
|
try:
|
|
examples_dir = resolve_examples_dir(args)
|
|
|
|
# Allow the user to specify a complete absolute or relative path to
|
|
# the example, rather than relying on the example search path. The
|
|
# advantage to the user here is that they can leverage autocomplete
|
|
if os.path.isabs(args.example):
|
|
example_dir = args.example
|
|
else:
|
|
example_dir = os.path.join(EXAMPLES_DIR, args.example)
|
|
|
|
logging.info("Running example in\n %s", example_dir)
|
|
|
|
if not os.path.exists(example_dir):
|
|
example_dir = os.path.join(examples_dir, args.example)
|
|
|
|
if not os.path.exists(example_dir):
|
|
logging.critical(
|
|
"Example '%s' does not exist in\n %s\n"
|
|
" Use the `--list` option for a list of available examples.",
|
|
args.example,
|
|
examples_dir,
|
|
)
|
|
return 1
|
|
|
|
yaml_filename = os.path.join(example_dir, CONFIG_FILE)
|
|
if not os.path.exists(yaml_filename):
|
|
logging.critical(
|
|
"Example '%s' is not property configured.\n"
|
|
" There is no '%s' in the example directory.\n"
|
|
" Use the `--list` option for a list of available examples.",
|
|
args.example,
|
|
CONFIG_FILE,
|
|
)
|
|
return 1
|
|
|
|
loaded_yaml = read_yaml(yaml_filename)
|
|
|
|
amase_dir = resolve_amase_dir(args)
|
|
(scenario_file, amase_delay) = check_amase(loaded_yaml, example_dir, args)
|
|
|
|
uxas_configs = check_uxas(loaded_yaml, example_dir)
|
|
|
|
if scenario_file:
|
|
amase_pid = run_amase(scenario_file, example_dir, amase_dir)
|
|
|
|
if amase_delay > 0:
|
|
print(
|
|
f"Waiting for {amase_delay} seconds while AMASE starts; "
|
|
"press 'Play' as soon as it does."
|
|
)
|
|
time.sleep(amase_delay)
|
|
|
|
popen = scenario_file is not None
|
|
pids = run_uxas(uxas_configs, example_dir, popen)
|
|
|
|
if scenario_file:
|
|
# Wait for the user to close AMASE
|
|
amase_pid.wait()
|
|
|
|
killall_uxases(popen, pids)
|
|
return 0
|
|
|
|
except KeyboardInterrupt:
|
|
print(" ")
|
|
killall_uxases(popen, pids)
|
|
return 2
|
|
|
|
except Exception as exception: # pylint: disable=broad-except
|
|
logging.critical("Got an exception %s", exception)
|
|
traceback.print_exc()
|
|
argument_parser.print_usage()
|
|
return 3
|
|
|
|
|
|
# Script processing.
|
|
if __name__ == "__main__":
|
|
sys.exit(run_example_main())
|