Files
snapd/tests/lib/tools/log-parser
Sergio Cazzolato 146c387942 tests: new spread log parser (#10552)
* New log analyzer and tests

This log analyzer can be used to take actions based on spread test
results, such as re-executing only the failed tests

* Making log analyzer executable

* Fix error on new line

* Rename log-analyzer as log-parser

* Update the wording and how the log is indexed to retrieve the details

* strip lines to avoid error getting results

* fix shellcheck
2021-08-27 09:20:00 -03:00

549 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
This tool reads a spread log and creates a file with all the data.
The output file includes the most important information extracted
from the log to be analyzed.
"""
import argparse
import json
import os
import re
import sys
# Info types: third token of an info line (see LogReader._match_info)
ERROR_TYPE = 'Error'
DEBUG_TYPE = 'Debug'
WARN_TYPE = 'WARNING:'

# Result types: third token of a result line (see LogReader._match_result)
FAILED_TYPE = 'Failed'
ABORTED_TYPE = 'Aborted'
SUCCESSFUL_TYPE = 'Successful'

# Printable names: used as command line choices and as the `type` tag
# stored on the parsed log items
ALL = 'all'
NONE = 'none'
ACTION = 'action'
OPERATION = 'operation'
INFO = 'info'
ERROR = 'error'
ERROR_DEBUG = 'error-debug'
FAILED = 'failed'
ABORTED = 'aborted'
SUCCESSFUL = 'successful'
RESULT = 'result'

# Tokens identifying the line where the spread run starts
# (see LogReader._match_start)
START = 'Found'
SPREAD_FILE = 'spread.yaml'

# Verbs which identify each kind of timestamped log line
EXEC_VERBS = ['Preparing', 'Executing', 'Restoring']
INFO_TYPES = [ERROR_TYPE, DEBUG_TYPE, WARN_TYPE]
OPERATIONS = [
    'Rebooting',
    'Discarding',
    'Allocating',
    'Waiting',
    'Allocated',
    'Connecting',
    'Connected',
    'Sending',
]
RESULTS = [SUCCESSFUL_TYPE, ABORTED_TYPE, FAILED_TYPE]
class Action:
    """
    Action represents the main spread tasks actions
    The actions can be: Preparing, Executing and Restoring

    Every instance keeps the verb, the task, the date and time tokens and
    the original log line it was created from.
    """

    def __init__(self, verb, task, date, time, source_line):
        self.type = ACTION
        self.source_line = source_line
        self.date = date
        self.time = time
        self.verb = verb
        self.task = task

    def __repr__(self):
        # The printable form of an action is the untouched log line
        return self.source_line

    def __dict__(self):
        # NOTE: defined as a method on purpose; callers in this file invoke
        # item.__dict__() to get the serializable form of the item
        exported = {'type': 'action'}
        for field in ('date', 'time', 'verb', 'task'):
            exported[field] = getattr(self, field)
        return exported
class Result:
    """
    Result represents the results for a spread run
    The results can be: Successful, failed and aborted

    A result optionally carries a detail object with the log lines which
    follow the result line.
    """

    def __init__(self, result_type, level, number, date, time,
                 detail, source_line):
        self.type = RESULT
        self.source_line = source_line
        self.date = date
        self.time = time
        self.result_type = result_type
        self.level = level
        self.number = number
        self.detail = detail

    def __repr__(self):
        # Without detail the printable form is just the original log line
        if not self.detail:
            return self.source_line
        return '{}{}'.format(self.source_line, str(self.detail))

    def __dict__(self):
        # NOTE: defined as a method on purpose; callers in this file invoke
        # item.__dict__() to get the serializable form of the item
        exported_detail = self.detail.__dict__() if self.detail else None
        exported = {}
        for field in ('type', 'date', 'time', 'result_type', 'level',
                      'number'):
            exported[field] = getattr(self, field)
        exported['detail'] = exported_detail
        return exported
class Info:
    """
    Info represents the extra tasks information which is included in the
    spread log. The info can be: Error, Debug and Warning

    An info optionally carries a detail object with the log lines which
    follow the info line.
    """

    def __init__(self, info_type, verb, task, extra, date, time,
                 detail, source_line):
        self.type = INFO
        self.source_line = source_line
        self.date = date
        self.time = time
        self.info_type = info_type
        self.verb = verb
        self.task = task
        self.extra = extra
        self.detail = detail

    def __repr__(self):
        # Without detail the printable form is just the original log line
        if not self.detail:
            return self.source_line
        return '{}{}'.format(self.source_line, self.detail)

    def __dict__(self):
        # NOTE: defined as a method on purpose; callers in this file invoke
        # item.__dict__() to get the serializable form of the item
        exported_detail = self.detail.__dict__() if self.detail else None
        exported = {}
        for field in ('type', 'date', 'time', 'info_type', 'verb', 'task',
                      'extra'):
            exported[field] = getattr(self, field)
        exported['detail'] = exported_detail
        return exported
class Detail:
    """
    Detail represents the extra lines which are displayed after the info

    lines_limit is the number of trailing lines to keep when the detail is
    printed or exported; a negative limit, or a limit bigger than the number
    of lines, means that all the lines are kept.
    """

    def __init__(self, lines_limit, lines):
        self.lines_limit = lines_limit
        self.lines = lines

    def _get_lines(self):
        """ Return the lines of the detail after applying the limit """
        if self.lines_limit < 0 or self.lines_limit > len(self.lines):
            return self.lines

        # Keep lines_limit+1 lines: according to the original author the
        # last line is a '.' which must not count as a line of the log
        # details
        return self.lines[-self.lines_limit - 1:]

    def __repr__(self):
        return ''.join(self._get_lines())

    def __dict__(self):
        # NOTE: defined as a method on purpose; callers in this file invoke
        # item.__dict__() to get the serializable form of the item.
        # The limit is applied through _get_lines so the exported lines are
        # always the same ones that __repr__ prints (a raw slice here used
        # to drop leading lines for negative limits other than -1).
        return {'lines': list(self._get_lines())}
class Operation:
    """
    Operation represents other actions that the spread running can do while
    executing tests like: Rebooting, Discarding, Allocating, Waiting,
    Allocated, Connecting, Connected, Sending

    Every instance keeps the verb, the remaining text of the line (extra),
    the date and time tokens and the original log line.
    """

    def __init__(self, verb, task, extra, date, time, source_line):
        self.type = OPERATION
        self.source_line = source_line
        self.date = date
        self.time = time
        self.verb = verb
        self.task = task
        self.extra = extra

    def __repr__(self):
        # The printable form of an operation is the untouched log line
        return self.source_line

    def __dict__(self):
        # NOTE: defined as a method on purpose; callers in this file invoke
        # item.__dict__() to get the serializable form of the item
        exported = {}
        for field in ('type', 'date', 'time', 'verb', 'task', 'extra'):
            exported[field] = getattr(self, field)
        return exported
class LogReader:
    """
    LogReader manages the spread log, it allows to read, export and print

    The log is read from filepath and parsed line by line. The parsed items
    (Action, Info, Operation and Result objects, plus the raw setup lines
    when store_setup is set) are accumulated in full_log.
    """

    def __init__(self, filepath, output_type, lines_limit, store_setup):
        self.filepath = filepath
        self.output_type = output_type
        self.lines_limit = lines_limit
        self.store_setup = store_setup
        self.lines = []
        # Index of the next line of self.lines to be read
        self.iter = 0
        self.full_log = []

    def __repr__(self):
        return str(self.__dict__())

    def __dict__(self):
        # NOTE: defined as a method on purpose, following the convention
        # used by the item classes of this file
        return {'full_log': self.full_log}

    @staticmethod
    def _item_type(item):
        """
        Return the type tag of a log item, or None for the raw setup lines
        which are stored as plain strings and have no type
        """
        return getattr(item, 'type', None)

    def _print_items(self, keep):
        """ Print in a single call all the log items accepted by keep """
        print(''.join(str(item) for item in self.full_log if keep(item)))

    def print_log(self, details, results):
        """
        Print the parsed log filtered by details (all, none, error,
        error-debug or an item type) and then by results (all, none,
        failed, aborted, successful)
        """
        if not self.full_log:
            return

        def is_info(item, info_types):
            return self._item_type(item) == INFO and \
                item.info_type in info_types

        # Print the details
        if details == ALL:
            self._print_items(lambda item: True)
        elif details == ERROR:
            self._print_items(lambda item: is_info(item, (ERROR_TYPE,)))
        elif details == ERROR_DEBUG:
            self._print_items(
                lambda item: is_info(item, (ERROR_TYPE, DEBUG_TYPE)))
        elif details != NONE:
            self._print_items(
                lambda item: self._item_type(item) == details)

        # Print the results
        if results == ALL:
            self._print_items(
                lambda item: self._item_type(item) == RESULT)
        elif results != NONE:
            # Any value other than failed/aborted selects the successful
            # results
            wanted = {FAILED: FAILED_TYPE, ABORTED: ABORTED_TYPE}.get(
                results, SUCCESSFUL_TYPE)
            self._print_items(
                lambda item: self._item_type(item) == RESULT and
                item.result_type == wanted)

    def export_log(self, filepath):
        """ Save the parsed log as json in filepath """
        prepared_log = [
            item if isinstance(item, str) else item.__dict__()
            for item in self.full_log
        ]
        with open(filepath, 'w') as json_file:
            json.dump(prepared_log, json_file)

    def _next_line(self):
        """ Return the current line and move the iterator forward """
        self.iter = self.iter + 1
        return self.lines[self.iter - 1]

    def check_log_exists(self):
        return os.path.exists(self.filepath)

    def read_spread_log(self):
        """ Read the log file and fill full_log with the parsed items """
        with open(self.filepath) as log_file:
            self.lines = log_file.readlines()

        # Find the start of the log, the log file could include
        # initial lines which are not part of the spread log itself
        self.iter = 0
        if self.store_setup:
            while self.iter < len(self.lines):
                line = self._next_line()
                if self._match_start(line):
                    break
                self.full_log.append(line)

            if self.iter >= len(self.lines):
                # Start not found, the log is either empty, corrupted or cut
                # NOTE(review): the raw lines already stored as setup are
                # kept in full_log and the file is parsed again from the
                # beginning - confirm this is the intended behavior
                self.iter = 0

        # Each kind of line is recognized by a matcher and converted by
        # its builder; the order is: task execution (preparing, executing,
        # restoring), info (error, debug, warning), operation (rebooting,
        # discarding, allocating, ...) and result (successful, aborted,
        # failed)
        parsers = (
            (self._match_task, self._get_action),
            (self._match_info, self._get_info),
            (self._match_operation, self._get_operation),
            (self._match_result, self._get_result),
        )

        # Then iterate line by line analyzing the log, lines which are not
        # recognized are skipped
        while self.iter < len(self.lines):
            line = self._next_line()
            for matches, build in parsers:
                if matches(line):
                    item = build(line)
                    if item:
                        self.full_log.append(item)
                    break

    def _match_date(self, date):
        return re.findall(r'\d{4}-\d{2}-\d{2}', date)

    def _match_time(self, time):
        return re.findall(r'\d{2}:\d{2}:\d{2}', time)

    def _match_verb(self, line, verbs, min_parts):
        """
        Check the line has at least min_parts space separated tokens, starts
        with a date and a time, and its third token is one of verbs
        """
        parts = line.strip().split(' ')
        return len(parts) >= min_parts and \
            parts[2] in verbs and \
            bool(self._match_date(parts[0])) and \
            bool(self._match_time(parts[1]))

    def _match_info(self, line):
        return self._match_verb(line, INFO_TYPES, 4)

    def _match_task(self, line):
        return self._match_verb(line, EXEC_VERBS, 3)

    def _match_start(self, line):
        # 4 tokens are required because the 4th one is inspected below
        # (3 were required before, failing with IndexError on short lines)
        if not self._match_verb(line, [START], 4):
            return False
        return SPREAD_FILE in line.strip().split(' ')[3]

    def _match_operation(self, line):
        return self._match_verb(line, OPERATIONS, 3)

    def _match_result(self, line):
        return self._match_verb(line, RESULTS, 3)

    def _get_detail(self, other_limit=None):
        """
        This function is used to get the piece of log which is after the
        info lines (error, debug, warning). The detail could also include
        a limit of lines to tail the log and show the last lines.
        It returns a Detail object included all the lines.

        When other_limit is None the lines limit of the reader is used.
        """
        initial_iter = self.iter

        # Move forward until the next recognized line, without consuming
        # it, so that line is parsed as its own item even when it is the
        # last line of the log
        while self.iter < len(self.lines):
            line = self.lines[self.iter]
            if self._match_task(line) or self._match_info(line) or \
                    self._match_operation(line) or \
                    self._match_result(line):
                break
            self.iter = self.iter + 1

        if other_limit is None:
            other_limit = self.lines_limit

        return Detail(other_limit, self.lines[initial_iter:self.iter])

    def _get_info(self, line):
        """
        Get the Info object for the error, debug and warning lines including
        the details for this
        """
        parts = line.strip().split(' ')
        if len(parts) < 3:
            return None

        date = parts[0]
        time = parts[1]
        info_type = parts[2]
        verb = None
        task = None
        extra = None
        if info_type == WARN_TYPE:
            # Remove the trailing colon from the warning type
            info_type = info_type.split(':')[0]
            extra = ' '.join(parts[3:])
        elif info_type in (ERROR_TYPE, DEBUG_TYPE):
            # The verb and the task could be missing when the line is cut
            verb = parts[3] if len(parts) > 3 else None
            task = parts[4] if len(parts) > 4 else None
        else:
            print('log-parser: detail type not recognized: {}'.format(info_type))

        detail = self._get_detail()
        return Info(info_type, verb, task, extra, date, time, detail, line)

    def _get_result(self, line):
        """
        Get the Result object including the details for the result

        It returns None when the line does not have the 4 tokens needed
        (date, time, result type and level).
        """
        parts = line.strip().split(' ')
        if len(parts) < 4:
            return None

        date = parts[0]
        time = parts[1]
        result_type = parts[2]
        level = parts[3].split(':')[0]
        number = parts[-1]
        if result_type == FAILED_TYPE:
            # For failed results all the detail lines are kept
            detail = self._get_detail(other_limit=-1)
        else:
            detail = None

        return Result(result_type, level, number.strip(), date, time, detail,
                      line)

    def _get_action(self, line):
        """
        Get the Action object for lines preparing, executing and restoring

        It returns None when the line does not have the 4 tokens needed
        (date, time, verb and task).
        """
        parts = line.strip().split(' ')
        if len(parts) < 4:
            return None

        date = parts[0]
        time = parts[1]
        verb = parts[2]
        task = parts[3]
        return Action(verb, task.split('...')[0], date, time, line)

    def _get_operation(self, line):
        """ Get the Operation object for lines rebooting, allocating, etc """
        parts = line.strip().split(' ')
        if len(parts) < 3:
            return None

        date = parts[0]
        time = parts[1]
        verb = parts[2]
        task = None
        extra = ' '.join(parts[3:])
        return Operation(verb, task, extra, date, time, line)
def _make_parser():
    # type: () -> argparse.ArgumentParser
    """
    Build the command line parser of the tool: the options to control the
    output and the positional path of the log to be analyzed
    """
    description = """
Parse the spread log and generates a file with a standarized output. It also
allows to filter the output by type and define the number of lines to show
for the error/debug/warning output.
"""

    # Each entry holds the names of the argument and its settings
    arguments = [
        (("-c", "--cut"), {
            "type": int,
            "default": 1000,
            "help": "maximun number of lines for logs on errors "
                    "and debug sections",
        }),
        (("-f", "--format"), {
            "type": str,
            "default": "json",
            "choices": ['json'],
            "help": "format for the output",
        }),
        (("-pd", "--print-details"), {
            "type": str,
            "default": NONE,
            "choices": [ALL, ERROR, ERROR_DEBUG, OPERATION, ACTION, INFO,
                        NONE],
            "help": "Filter which info to print",
        }),
        (("-pr", "--print-results"), {
            "type": str,
            "default": NONE,
            "choices": [ALL, FAILED, ABORTED, SUCCESSFUL, NONE],
            "help": "Filter which results to print",
        }),
        (("-o", "--output"), {
            "default": "spread-results.json",
            "type": str,
            "help": "output file to save the result",
        }),
        (("--store-setup",), {
            "action": "store_true",
            "help": "will save all the text before the spread run "
                    "is started",
        }),
        (("logpath",), {
            "metavar": "PATH",
            "help": "path to the log to be analyzed",
        }),
    ]

    parser = argparse.ArgumentParser(description=description)
    for names, settings in arguments:
        parser.add_argument(*names, **settings)
    return parser
def main():
    # type: () -> None
    """
    Entry point of the tool: parse the command line, read the spread log,
    export it to the output file and print the requested information
    """
    cli = _make_parser()
    options = cli.parse_args()
    if not options.logpath:
        cli.print_usage()
        cli.exit(0)

    log_reader = LogReader(
        options.logpath, options.format, options.cut, options.store_setup)
    if not log_reader.check_log_exists():
        print("log-parser: log not found")
        sys.exit(1)

    log_reader.read_spread_log()

    # The export is skipped when an empty output path is provided
    if options.output:
        log_reader.export_log(options.output)

    log_reader.print_log(options.print_details, options.print_results)
# Run the tool only when the file is executed as a script
if __name__ == "__main__":
    main()