You've already forked code_examples_server
mirror of
https://github.com/AdaCore/code_examples_server.git
synced 2026-02-12 12:45:18 -08:00
198 lines
6.5 KiB
Python
198 lines
6.5 KiB
Python
# This package contains two classes that can be used to launch processes
|
|
# and process their output.
|
|
#
|
|
# SeparateProcess is used to launch a series of processes: for each instance,
|
|
# a task is created to monitor the output of the processes.
|
|
#
|
|
# ProcessReader is used to read the current status of a process.
|
|
#
|
|
# The code expects that the client will make regular calls to
|
|
# ProcessReader.poll() until the processes are completed.
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import subprocess
|
|
import time
|
|
import psutil
|
|
from threading import Thread
|
|
from Queue import Queue, Empty
|
|
from compile_server.app.models import ProgramRun
|
|
from safe_run import INTERRUPT_STRING
|
|
|
|
|
|
TIMEOUT_SECONDS = 30
|
|
# Number of seconds to allow to a process
|
|
|
|
MAX_SESSION_AGE = 60
|
|
# Number of seconds after which to remove a session from memory
|
|
|
|
|
|
class SeparateProcess(object):
|
|
|
|
def __init__(self, cmd_lines, cwd):
|
|
"""Launch the given command lines in sequence in the background.
|
|
cmd_lines is a list of lists representing the command lines
|
|
to launch.
|
|
cwd is a directory in which the command line is run; this directory
|
|
is erased when the processes are finished.
|
|
"""
|
|
self.cmd_lines = cmd_lines
|
|
self.q = Queue()
|
|
self.working_dir = cwd
|
|
self.interrupted = False # Whether we interrupted forcefully
|
|
self.output_file = os.path.join(self.working_dir, 'output.txt')
|
|
self.status_file = os.path.join(self.working_dir, 'status.txt')
|
|
with open(self.status_file, 'wb') as f:
|
|
f.write("")
|
|
self.p = None # the current running process
|
|
self.time = time.time() # the start time of the running process
|
|
self.processes_running = True
|
|
|
|
t = Thread(target=self._enqueue_output)
|
|
t.daemon = True
|
|
t.start()
|
|
|
|
t2 = Thread(target=self._monitor_timeout)
|
|
t2.daemon = True
|
|
t2.start()
|
|
|
|
def _monitor_timeout(self):
|
|
"""Monitor the running process, interrupting it if it takes too long"""
|
|
|
|
while self.processes_running:
|
|
if time.time() - self.time > TIMEOUT_SECONDS:
|
|
self.interrupted = True
|
|
with open(self.output_file, 'ab') as f:
|
|
f.write("<interrupted after timeout>")
|
|
with open(self.status_file, 'wb') as f:
|
|
f.write("-1")
|
|
# The current process took too long, kill it, first with
|
|
# sigabort, then with sigkill
|
|
try:
|
|
self.p.kill()
|
|
time.sleep(0.01)
|
|
os.kill(self.p.pid, 9)
|
|
except OSError:
|
|
pass
|
|
return
|
|
|
|
time.sleep(1.0)
|
|
|
|
def _enqueue_output(self):
|
|
"""The function that reads the output from the process"""
|
|
|
|
# Launch each process in sequence, in the same task
|
|
for cmd in self.cmd_lines:
|
|
self.time = time.time()
|
|
self.p = subprocess.Popen(
|
|
cmd,
|
|
cwd=self.working_dir,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
bufsize=1,
|
|
close_fds=True)
|
|
|
|
# Write the output line by line in the output file
|
|
for line in iter(self.p.stdout.readline, b''):
|
|
if self.interrupted:
|
|
return
|
|
with open(self.output_file, 'ab') as f:
|
|
f.write(line)
|
|
|
|
# Write the return code
|
|
self.p.wait()
|
|
returncode = self.p.returncode
|
|
|
|
# Cleanup
|
|
self.p.stdout.close()
|
|
|
|
# If the process returned nonzero, do not run the next process
|
|
if returncode != 0:
|
|
break
|
|
|
|
# Write the last return code in the status file
|
|
with open(self.status_file, 'wb') as f:
|
|
f.write(str(returncode))
|
|
|
|
# When we have finished running processes, the monitor task should
|
|
# stop
|
|
self.processes_running = False
|
|
|
|
|
|
class ProcessReader(object):
|
|
|
|
def __init__(self, working_dir):
|
|
self.working_dir = working_dir
|
|
self.output_file = os.path.join(self.working_dir, 'output.txt')
|
|
self.status_file = os.path.join(self.working_dir, 'status.txt')
|
|
self.interrupt_detected = False
|
|
|
|
def poll(self):
|
|
""" Check whether the process is still running.
|
|
return None if the process is still running, otherwise return
|
|
the status code.
|
|
"""
|
|
# Custom debug codes, to debug application
|
|
if not os.path.isdir(self.working_dir):
|
|
return 101
|
|
|
|
if not os.path.isfile(self.status_file):
|
|
return None
|
|
|
|
with open(self.status_file) as f:
|
|
status_text = f.read().strip()
|
|
|
|
if not status_text:
|
|
return None
|
|
else:
|
|
# When all the processes are completed, remove the working dir
|
|
shutil.rmtree(self.working_dir)
|
|
if self.interrupt_detected:
|
|
return 1
|
|
return int(status_text)
|
|
|
|
def read_lines(self, already_read=0):
|
|
"""Read all the available lines from the process.
|
|
already_read indicates the number of lines that have already been
|
|
read by the process.
|
|
Return an empty list if there is nothing to read.
|
|
"""
|
|
|
|
# Custom debug codes, to debug application
|
|
if not os.path.isdir(self.working_dir):
|
|
return 103
|
|
|
|
if not os.path.isfile(self.output_file):
|
|
return []
|
|
|
|
lines = []
|
|
with open(self.output_file, "rb") as f:
|
|
for x in f.readlines():
|
|
# Doctor: remove the mentions of working dir from output
|
|
lines.append(x.replace(self.working_dir, '.'))
|
|
|
|
if x.strip() == INTERRUPT_STRING:
|
|
self.interrupt_detected = True
|
|
|
|
return lines[already_read:]
|
|
|
|
|
|
def cleanup_old_processes():
|
|
"""Cleanup the list of running processes"""
|
|
for a in ProgramRun.objects.all():
|
|
# Uncomment this to print the running sessions
|
|
# print "running:", a.timestamp, a.working_dir
|
|
|
|
# Remove from the database all the processes where the working dir does
|
|
# no longer exist
|
|
now = time.time()
|
|
if os.path.exists(a.working_dir):
|
|
if now - os.path.getctime(a.working_dir) > MAX_SESSION_AGE:
|
|
print "deleting dir because it is too old:", a.working_dir
|
|
shutil.rmtree(a.working_dir)
|
|
a.delete()
|
|
else:
|
|
print "deleting because dir has been cleared:", a.working_dir
|
|
a.delete()
|