Files
langkit/testsuite/python_support/expect.py
2025-03-26 14:04:20 +00:00

422 lines
14 KiB
Python

############################################################################
# #
# EXPECT.PY #
# #
# Copyright (C) 2010-2014 Ada Core Technologies, Inc. #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/> #
# #
############################################################################
"""Expect like features.
The module provides an API similar to TCL/Expect.
"""
from __future__ import annotations
import datetime
import logging
import pexpect
import re
import sys
from time import sleep
from typing import List, Match, Optional, Protocol, Tuple, Union
from e3.os.fs import which
EXPECT_TIMEOUT = -2
EXPECT_DIED = -3
logger = logging.getLogger("gnatpython.expect")
class ExpectError(Exception):
"""Expect exception."""
def __init__(self, cmd: str, msg: str):
super().__init__(self, cmd, msg)
self.cmd = cmd
self.msg = msg
def __str__(self) -> str:
return "%s: %s" % (self.cmd, self.msg)
class TermEOF(Exception):
"""A local subsitute for the pexpect.EOF exception."""
def __init__(self) -> None:
super().__init__(self, "")
class TermTIMEOUT(Exception):
"""A local subsitute for the pexpect.TIMEOUT exception."""
def __init__(self) -> None:
super().__init__(self, "")
TIMEOUT = pexpect.TIMEOUT
EOF = pexpect.EOF
Timeout = Union[int, float]
class PexpectProcess:
"""Stub for pexpect's process objects."""
pid: int
def expect(
self, pattern: Union[str, List[str]], timeout: Optional[int]
) -> None: ...
def send(self, s: str) -> int: ...
def read_nonblocking(
self, size: int = 1, timeout: Timeout = -1
) -> str: ...
def terminate(self, force: bool = False) -> bool: ...
def wait(self) -> int: ...
def sendintr(self) -> None: ...
class SpawnFunction(Protocol):
def __call__(
self, command: str, arguments: List[str], timeout: Optional[int]
) -> PexpectProcess: ...
spawn: SpawnFunction
if sys.platform == "win32":
# On Windows we need to have a small wrapper around the
# PopenSpawn class.
from pexpect.popen_spawn import PopenSpawn
import signal
class WindowsSpawn(PopenSpawn):
def __init__(self, command: str, arguments: List[str], timeout: int):
PopenSpawn.__init__(
self, [command] + arguments, encoding="cp1252", timeout=timeout
)
def terminate(self, force: bool) -> None:
# Ignore the parameter.
# The kill method doesn't work correctly on Windows,
# so we need to use terminate here.
self.proc.terminate()
def sendintr(self) -> None:
# The kill method doesn't work correctly on Windows,
# so we need to use send_signal here.
self.proc.send_signal(signal.CTRL_C_EVENT)
spawn = WindowsSpawn
else:
# On Unix hosts we can use the ordinary pexpect.
def unix_spawn(
command: str, arguments: List[str], timeout: Optional[int]
) -> PexpectProcess:
result = pexpect.spawn(
command, arguments, timeout=timeout, encoding="UTF-8"
)
result.setecho(False)
return result
spawn = unix_spawn
class ExpectProcess(object):
"""Expect Main class.
ATTRIBUTES
command_line: list of strings containg the command line used to spawn the
process.
status: The return code. None while the command is still running, and an
integer after method "close" has been called.
"""
def __init__(
self,
command_line: List[str],
save_output: bool = False,
save_input: bool = False,
):
"""Constructor.
:param command_line: list of strings representing the command line to
be spawned.
:type command_line: list[str]
:param save_output: Save all output generated during the session for
later retrieval (see method get_session_logs).
:type save_output: bool
:param save_input: Save all input generated during the session for
later retrieval (see method get_session_logs).
:type save_input: bool
"""
self.save_output = save_output
self.save_input = save_input
# Convert the command line to a list of string is needed
command_line = [str(arg) for arg in command_line]
if len(command_line) < 1:
raise ExpectError(
"__init__", "expect a non empty list as argument"
)
command_line[0] = which(command_line[0])
# Store the command line used
logger.debug("spawn %s" % " ".join(command_line))
self.command_line = command_line
# Spawn the process.
self._child: Optional[PexpectProcess] = spawn(
command_line[0], command_line[1:], timeout=None
)
self.pid = self._child.pid
# Initialize our buffer
self.buffer = ""
# If we have to save input or output keep another buffer that
# is never flushed.
self.saved_buffer = ""
# Keep the state of the process
self.process_is_dead = False
# This is where we store that last successful expect result
self.last_match: Optional[Tuple[int, str, Match[str]]] = None
# This is where the command returned status will be stored
# when the command has exited. For the moment, it is not
# available.
self.status: Optional[int] = None
def __poll(self, timeout: Timeout) -> None:
"""Poll for new output.
:param timeout: Maximum time in seconds we wait for new output.
"""
assert self._child is not None
try:
result = self._child.read_nonblocking(size=16384, timeout=timeout)
result = result.replace("\r", "")
self.buffer += result
if self.save_output:
self.saved_buffer += result
except TIMEOUT:
pass
except EOF:
self.process_is_dead = True
def flush(self) -> None:
"""Flush output buffer.
Output read before the call to flush will be completely ignored
in next expect calls.
"""
self.__poll(0)
self.buffer = ""
def sendline(self, msg: str) -> int:
"""Send a line to the process.
:param msg: String to be sent.
:return: 1 of OK and 0 otherwise.
"""
return self.send(msg + "\n")
def send(
self, msg: str, add_lf: bool = True, flush_buffer: bool = False
) -> int:
"""Send a msg to the program.
:param msg: the message to send
:return: 1 if OK, 0 otherwise.
:raise ExceptError: if the process has been closed
"""
if self._child is None:
raise ExpectError("send", "process has been closed")
if add_lf:
msg += "\n"
if flush_buffer:
self.flush()
if self.save_input:
self.saved_buffer += msg
write_status = self._child.send(msg)
if write_status <= 0:
return 0
else:
return 1
def expect(self, patterns: List[str], timeout: int) -> Optional[int]:
"""Expect an output.
:param patterns: A list of regexp.
:param timeout: Maximum time in seconds we wait for a match.
:return: If a pattern was matched then it returns it position in
the list of patterns. Otherwise the function returns EXPECT_DIED if
the process die before matching one of the pattern, or
EXPECT_TIMEOUT if none of the patterns were matched during timeout
seconds.
"""
if self._child is None:
raise ExpectError("expect", "process has been closed")
match = None
result = 0
expect_start = datetime.datetime.utcnow()
time_left: Timeout = int(timeout * 1000.0)
while match is None and time_left > 0:
# Do we have a match with the current output
for index, pattern in enumerate(patterns):
match = re.search(pattern, self.buffer)
if match is not None:
result = index
break
if match is not None:
break
else:
# We don't have a match so poll for new output
self.__poll(time_left / 1000)
if self.process_is_dead:
return EXPECT_DIED
# update time_left.
# The update is done only if current time is superior to time
# at which the function started. This test might seem a bit
# weird but on some Linux machines on VmWare we have found
# huge clock drift that the system tries to compensate. The
# consequence is that we cannot assume that the clock is
# monotonic.
current_time = datetime.datetime.utcnow()
if current_time > expect_start:
time_spent = current_time - expect_start
time_left = int(timeout * 1000.0) - (
time_spent.seconds * 1000
+ time_spent.microseconds / 1000
)
if match is not None:
self.last_match = (result, self.buffer[: match.start(0)], match)
self.buffer = self.buffer[match.end(0) :]
return result
if time_left <= 0:
return EXPECT_TIMEOUT
return None
def out(self) -> Tuple[str, str]:
"""Retrieve matched output.
:return: a tuple of strings. The first element is the output read
before the match and the second element the output matched by
the regexp. If if there is no previous match then it return
("", "")
"""
if self.last_match is None:
return ("", "")
else:
return (self.last_match[1], self.last_match[2].group(0))
def shutdown(self) -> None:
"""Ensure that if python ends all expect object are killed.
This is specially important when the python process doing the expect
is interrupted by a Ctrl-C. This method ensure that we don't let
expect processes alive.
"""
self.close()
def close(self) -> None:
"""Close an expect session.
If the underlying process is not dead yet, kill it. Set the
status attribute to the command return code.
"""
if self._child is not None:
self.interrupt()
sleep(0.05)
if not self.process_is_dead:
self._child.terminate(force=True)
self.status = self._child.wait()
self._child = None
def interrupt(self) -> None:
"""Interrupt.
This is an equivalent of sending Ctrl-C to the process
"""
if not self.process_is_dead and self._child is not None:
self._child.sendintr()
def get_session_logs(self) -> str:
"""Return the saved output and/or input.
:return: A string containing the logs of the session.
The output and/or input are only available if the constructor was
called with save_output and/or save_output respectively set to True
(this is NOT the default).
"""
return self.saved_buffer
def set_timer(self, delay: Timeout) -> None:
"""Set timer.
:param delay: A float or integer representing seconds.
When timer is set, you can check its expiration with the
has_timer_expired method.
"""
self.timer_end = datetime.datetime.utcnow() + datetime.timedelta(
seconds=delay
)
def has_timer_expired(self) -> bool:
"""Check if timer has expired.
:return: True if timer has expired, False otherwise.
"""
return self.timer_end < datetime.datetime.utcnow()
def wait(self) -> int:
"""Wait for the end of the process and return the status.
Note that the function is a direct wrapper around waitpid. While
waiting for the process end, the output of the process will not be
read so you should ensure before calling that function that not too
much output will be generated (i.e less than 32Ko), otherwise the
process might block.
:return: The process exit status also stored in self.status.
"""
if self._child is None:
raise ExpectError("expect", "process has been closed")
self.status = self._child.wait()
self._child = None
return self.status