From 291b6052be2d34ded7c4b97980d2ed861b189db5 Mon Sep 17 00:00:00 2001 From: Nicolas Setton Date: Mon, 11 Sep 2017 15:47:46 -0400 Subject: [PATCH] Support several server instances To do this, we can no longer rely on in-memory sharing of data: instead, use the filesystem to store the output and status of the running processes. --- compile_server/app/checker.py | 24 +++---- compile_server/app/process_handling.py | 87 +++++++++++++++++++++----- compile_server/app/static/editors.js | 17 +++-- 3 files changed, 90 insertions(+), 38 deletions(-) diff --git a/compile_server/app/checker.py b/compile_server/app/checker.py index 61f3d9e..2a8ea7f 100644 --- a/compile_server/app/checker.py +++ b/compile_server/app/checker.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import glob import os import codecs import distutils.spawn @@ -16,9 +17,6 @@ from compile_server.app import process_handling gnatprove_found = False -RUNNING_PROCESSES = {} -# The currently running processes (SeparateProcess) objects, indexed by ID - def check_gnatprove(): """Check that gnatprove is found on the PATH""" @@ -36,15 +34,11 @@ def check_output(request): received_json = json.loads(request.body) identifier = received_json['identifier'] - if identifier not in RUNNING_PROCESSES: - # Pretend the process has finished - return Response({'output_lines': [], - 'status': 0, - 'completed': True, - 'message': "completed"}) + p = process_handling.ProcessReader( + os.path.join(tempfile.gettempdir(), identifier)) - p = RUNNING_PROCESSES[identifier] - lines = p.read_lines() + print received_json['already_read'] + lines = p.read_lines(received_json['already_read']) # Remove some noise from the gnatprove output lines = [l.strip() for l in lines if not l.startswith("Summary logged")] @@ -58,8 +52,6 @@ def check_output(request): 'message': "running"}) else: - # The program has finished - del(RUNNING_PROCESSES[identifier]) return Response({'output_lines': lines, 'status': returncode, 'completed': True, @@ -91,8 +83,9 @@ def check_program(request): identifier = os.path.basename(tempd) # Copy the original resources in a sandbox directory - target = os.path.join(tempd, os.path.basename(e.original_dir)) - shutil.copytree(e.original_dir, target) + target = tempd + for g in glob.glob(os.path.join(e.original_dir, '*')): + shutil.copy(g, target) # Overwrite with the user-contributed files for file in received_json['files']: @@ -105,7 +98,6 @@ def check_program(request): try: p = process_handling.SeparateProcess([command], target) - RUNNING_PROCESSES[identifier] = p message = "running gnatprove" except subprocess.CalledProcessError, exception: diff --git a/compile_server/app/process_handling.py b/compile_server/app/process_handling.py index 28ef6f8..e525bc6 100644 --- a/compile_server/app/process_handling.py +++ b/compile_server/app/process_handling.py @@ -1,3 +1,15 @@ +# This package contains two classes that can be used to launch processes +# and process their output. +# +# SeparateProcess is used to launch a series of processes: for each instance, +# a task is created to monitor the output of the processes. +# +# ProcessReader is used to read the current status of a process. +# +# The code expects that the client will make regular calls to +# ProcessReader.poll() until the processes are completed. + +import os import shutil import sys import subprocess @@ -17,6 +29,10 @@ class SeparateProcess(object): self.cmd_lines = cmd_lines self.q = Queue() self.working_dir = cwd + self.output_file = os.path.join(self.working_dir, 'output.txt') + self.status_file = os.path.join(self.working_dir, 'status.txt') + with open(self.status_file, 'wb') as f: + f.write("") self.p = None t = Thread(target=self._enqueue_output) t.daemon = True @@ -35,32 +51,69 @@ class SeparateProcess(object): bufsize=1, close_fds=True) + # Write the output line by line in the output file for line in iter(self.p.stdout.readline, b''): - self.q.put(line) + with open(self.output_file, 'ab') as f: + f.write(line) + # Write the return code + self.p.poll() + returncode = self.p.returncode + with open(self.status_file, 'wb') as f: + f.write(str(returncode)) + + # Cleanup self.p.stdout.close() - # When all the processes are complete, - shutil.rmtree(self.working_dir) + # If the process returned nonzero, do not run the next process + if returncode != 0: + break + + +class ProcessReader(object): + + def __init__(self, working_dir): + self.working_dir = working_dir + self.output_file = os.path.join(self.working_dir, 'output.txt') + self.status_file = os.path.join(self.working_dir, 'status.txt') def poll(self): """ Check whether the process is still running. return None if the process is still running, otherwise return the status code. """ - if not self.p: - return None - self.p.poll() - return self.p.returncode + # Custom debug codes, to debug application + if not os.path.isdir(self.working_dir): + return 101 - def read_lines(self): + if not os.path.isfile(self.status_file): + return None + + with open(self.status_file) as f: + status_text = f.read().strip() + + if not status_text: + return None + else: + # When all the processes are completed, remove the working dir + shutil.rmtree(self.working_dir) + return int(status_text) + + def read_lines(self, already_read=0): """Read all the available lines from the process. - Return an empty list if there""" - lines = [] - while True: - try: - # Read the queue - line = self.q.get_nowait() - lines.append(line) - except Empty: - return lines + already_read indicates the number of lines that have already been + read by the process. + Return an empty list if there is nothing to read. + """ + + # Custom debug codes, to debug application + if not os.path.isdir(self.working_dir): + return 103 + + if not os.path.isfile(self.output_file): + return [] + + with open(self.output_file, "rb") as f: + lines = f.readlines() + + return lines[already_read:] diff --git a/compile_server/app/static/editors.js b/compile_server/app/static/editors.js index a7f70b8..a4c4bb2 100644 --- a/compile_server/app/static/editors.js +++ b/compile_server/app/static/editors.js @@ -13,7 +13,12 @@ function output_error(output_area, message){ // TODO: make use of message function process_check_output(editors, output_area, output, status, completed, message){ // Process the lines + + read_lines = 0 + output.forEach(function (l){ + read_lines++ + // Look for lines that contain an error message var match_found = l.match(/^([a-zA-Z._-]+):(\d+):(\d+):(.+)$/) var klass = match_found?"output_msg":"output_line" @@ -65,10 +70,12 @@ function process_check_output(editors, output_area, output, status, completed, m div.appendTo(output_area) } } + + return read_lines } -function get_output_from_identifier(editors, output_area, identifier) { - data = {"identifier": identifier} +function get_output_from_identifier(editors, output_area, identifier, already_read) { + data = {"identifier": identifier, "already_read": already_read} $.ajax({ url: "/check_output/", data: JSON.stringify(data), @@ -77,14 +84,14 @@ function get_output_from_identifier(editors, output_area, identifier) { contentType: 'application/json; charset=UTF-8', }) .done(function( json ) { - process_check_output( + read_lines = process_check_output( editors, output_area, json.output_lines, json.status, json.completed, json.message ) if (!json.completed) { // We have not finished processing the output: call this again setTimeout(function(){ - get_output_from_identifier(editors, output_area, identifier) + get_output_from_identifier(editors, output_area, identifier, already_read + read_lines) }, 250) } }) @@ -124,7 +131,7 @@ function query_check_result(example_name, editors, output_area) { if (json.identifier == ""){ output_error(output_area, json.message) } else { - get_output_from_identifier(editors, output_area, json.identifier) + get_output_from_identifier(editors, output_area, json.identifier, 0) } }) .fail(function( xhr, status, errorThrown ) {