Add a soft limit to the number of running processes

This commit is contained in:
Nicolas Setton
2018-07-20 00:34:08 -04:00
parent def6e0f21e
commit 8cb4282fd0
5 changed files with 74 additions and 3 deletions

View File

@@ -8,3 +8,4 @@ pytz==2017.2
PyYAML==3.12
Sphinx==1.7.5
ipython==5.7.0
psutil==5.4.6

View File

@@ -12,7 +12,7 @@ import tempfile
from rest_framework.response import Response
from rest_framework.decorators import api_view
from compile_server.app.models import Resource, Example
from compile_server.app.models import Resource, Example, ProgramRun
from compile_server.app import process_handling
from compile_server.app.views import CrossDomainResponse
@@ -23,6 +23,8 @@ ALLOWED_EXTRA_ARGS = {'spark-flow': "--mode=flow",
# We maintain a list of extra arguments that can be passed to the command
# line. For security we don't want the user to pass arguments as-is.
PROCESSES_LIMIT = 300 # The limit of processes that can be running
def check_gnatprove():
"""Check that gnatprove is found on the PATH"""
@@ -34,6 +36,16 @@ def check_gnatprove():
return gnatprove_found
def resources_available():
"""Return whether we have enough resources on the machine"""
if len(ProgramRun.objects.all()) > PROCESSES_LIMIT:
# We're over the limit: first attempt a cleanup
process_handling.cleanup_old_processes()
return len(ProgramRun.objects.all()) <= PROCESSES_LIMIT
else:
return True
@api_view(['POST'])
def check_output(request):
"""Check the output of a running process."""
@@ -146,6 +158,12 @@ def check_program(request):
return CrossDomainResponse(
{'identifier': '', 'message': "example not found"})
# Check whether we have too many processes running
if not resources_available():
return CrossDomainResponse(
{'identifier': '',
'message': "the machine is busy processing too many requests"})
tempd = prep_example_directory(e, received_json)
main = get_main(received_json)
@@ -196,6 +214,12 @@ def run_program(request):
return CrossDomainResponse(
{'identifier': '', 'message': "main not specified"})
# Check whether we have too many processes running
if not resources_available():
return CrossDomainResponse(
{'identifier': '',
'message': "the machine is busy processing too many requests"})
doctor_main_gpr(tempd, main)
# Run the command(s) to check the program
@@ -212,6 +236,8 @@ def run_program(request):
try:
p = process_handling.SeparateProcess(commands, tempd)
stored_run = ProgramRun(working_dir=p.working_dir)
stored_run.save()
message = "running gnatprove"
except subprocess.CalledProcessError, exception:

View File

@@ -48,3 +48,10 @@ class Example(models.Model):
# An example is a contains a set of resources
resources = models.ManyToManyField(Resource)
class ProgramRun(models.Model):
"""Represents programs currently being run"""
working_dir = models.TextField()
timestamp = models.DateTimeField(auto_now_add=True)

View File

@@ -14,8 +14,10 @@ import shutil
import sys
import subprocess
import time
import psutil
from threading import Thread
from Queue import Queue, Empty
from compile_server.app.models import ProgramRun
TIMEOUT_SECONDS = 30
@@ -34,6 +36,7 @@ class SeparateProcess(object):
self.cmd_lines = cmd_lines
self.q = Queue()
self.working_dir = cwd
self.interrupted = False # Whether we interrupted forcefully
self.output_file = os.path.join(self.working_dir, 'output.txt')
self.status_file = os.path.join(self.working_dir, 'status.txt')
with open(self.status_file, 'wb') as f:
@@ -55,10 +58,21 @@ class SeparateProcess(object):
while self.processes_running:
if time.time() - self.time > TIMEOUT_SECONDS:
self.interrupted = True
with open(self.output_file, 'ab') as f:
f.write("<interrupted after timeout>")
# The current process took too long, kill it
self.p.kill()
with open(self.status_file, 'wb') as f:
f.write("-1")
# The current process took too long, kill it, first with
# sigabort, then with sigkill
try:
self.p.kill()
time.sleep(0.01)
os.kill(self.p.pid, 9)
except OSError:
pass
return
time.sleep(1.0)
def _enqueue_output(self):
@@ -77,6 +91,8 @@ class SeparateProcess(object):
# Write the output line by line in the output file
for line in iter(self.p.stdout.readline, b''):
if self.interrupted:
return
with open(self.output_file, 'ab') as f:
f.write(line)
@@ -91,6 +107,7 @@ class SeparateProcess(object):
if returncode != 0:
break
# Write the last return code in the status file
with open(self.status_file, 'wb') as f:
f.write(str(returncode))
@@ -146,3 +163,14 @@ class ProcessReader(object):
lines = f.readlines()
return lines[already_read:]
def cleanup_old_processes():
"""Cleanup the list of running processes"""
for a in ProgramRun.objects.all():
print a.timestamp, a.working_dir
# Remove from the database all the processes where the working dir does
# no longer exist
if not os.path.exists(a.working_dir):
print "deleting because dir has been cleared", a.working_dir
a.delete()

View File

@@ -5,3 +5,12 @@ lxc exec safecontainer -- killall -u unprivileged --older-than 30s -signal KILL
# Delete all old directories
lxc exec safecontainer -- find /tmp/ -mindepth 1 -type d -mmin +1 -exec rm -rf {} \;
(if [ -d /webapps/compile_server/bin/ ] ; then
export PATH=/webapps/compile_server/bin/:$PATH
cd /webapps/compile_server/bin/cloudchecker
else
cd ..
fi
./manage.py clear_sessions
)