#! /usr/bin/env python
"""Parallel runner for C++ unit-test source files.

Walks a directory tree looking for files whose name ends with a given suffix,
compiles each one with the configured compiler, runs the resulting binary with
a time limit, and prints a coloured pass/fail line per test.  Compiler and test
output are appended to a ``<source>.log`` file next to each source file.  The
process exit code is 0 when every test passed and 1 otherwise.
"""

import errno
import getopt
import os
try:
    import Queue  # Python 2 module name.
except ImportError:
    import queue as Queue  # Python 3 renamed the module.
import re
import signal
import subprocess
import sys
import time
import threading

try:
    from shlex import quote as _ShellQuote  # Python 3.3+
except ImportError:
    from pipes import quote as _ShellQuote  # Python 2

# Polling interval (seconds) used while waiting for a child process.
TIME_SLICE = 0.1
# How long (seconds) a worker blocks on the job queue before retrying.
WAIT_TIMEOUT = 0.1
# Grace period (seconds) between SIGTERM and SIGKILL when stopping a test.
TERMINATE_TIMEOUT = 5
# Per-attempt timeout (seconds) when joining a worker thread.
JOIN_TIMEOUT = 1


class Default(object):
    """Default values for every command-line configurable parameter."""

    COMPILER = 'g++'
    COMPILE_ARGS = '-g -Wall -Werror -std=c++11 -pedantic -pthread -lstdc++'
    PROJ_PATH = os.getcwd()
    TEST_PATH = os.getcwd()
    TIME_LIMIT = 30
    WORKER_NUM = 4
    SUFFIX = '_test.cpp'


class Params(object):
    """Process-wide run parameters, stored as class attributes."""

    compiler = Default.COMPILER
    compile_args = Default.COMPILE_ARGS
    proj_path = Default.PROJ_PATH
    test_path = Default.TEST_PATH
    time_limit = Default.TIME_LIMIT
    worker_num = Default.WORKER_NUM
    suffix = Default.SUFFIX

    # Status codes returned as the first element of Parse()'s result.
    PARSE_HELP = 0
    PARSE_ERROR = -1
    PARSE_ACCEPT = 1

    @staticmethod
    def Parse(argv):
        """Parse command-line options into the Params class attributes.

        Options are applied in the order given, so options seen before a
        ``-h`` or before an invalid value have already been stored.

        Args:
            argv: list of argument strings, without the program name.

        Returns:
            A ``(status, message)`` tuple.  ``status`` is PARSE_ACCEPT with an
            empty message, PARSE_HELP with the usage text, or PARSE_ERROR with
            a description of the problem.
        """
        try:
            optlist, unused_args = getopt.getopt(argv, 'c:a:hw:p:P:s:t:')
        except getopt.GetoptError as err:
            # Unknown option or missing option argument: report it instead of
            # letting the exception escape as a traceback.
            return (Params.PARSE_ERROR, 'Error: %s' % err)

        for (opt, arg) in optlist:
            if opt == '-c':
                Params.compiler = arg
            elif opt == '-a':
                Params.compile_args = arg
            elif opt == '-h':
                return (Params.PARSE_HELP, Params.HelpString())
            elif opt == '-p':
                Params.proj_path = arg
            elif opt == '-P':
                Params.test_path = arg
            elif opt == '-s':
                Params.suffix = arg
            elif opt in ('-t', '-w'):
                try:
                    value = int(arg)
                except ValueError:
                    return (Params.PARSE_ERROR,
                            'Error: Option %s expects an integer, got %r'
                            % (opt, arg))
                if opt == '-t':
                    Params.time_limit = value
                elif value < 1:
                    # With no workers nothing would run, yet the run would
                    # still be reported as successful.
                    return (Params.PARSE_ERROR,
                            'Error: Number of workers must be at least 1, '
                            'got %d' % value)
                else:
                    Params.worker_num = value
            else:
                return (Params.PARSE_ERROR,
                        'Error: Unknown argument %r' % opt)
        return (Params.PARSE_ACCEPT, '')

    @staticmethod
    def HelpString():
        """Return the multi-line usage text, built from the Default values."""
        ret = '\n'.join([
            'USAGE:',
            ' test.py [-c <compiler>] [-a <arguments>] [-p <proj_path>]' +
            ' [-P <test_path>] [-s <suffix>] [-t <time_limit>]' +
            ' [-w <worker_num>] [-h]',
            '',
            'ARGUMENTS:',
            ' -c <compiler> Specifies the compiler, default=%s'
            % Default.COMPILER,
            ' -a <arguments> Specifies the compile arguments, default=%s'
            % Default.COMPILE_ARGS,
            ' -p <proj_path> Specifies the project pathname, default=%s'
            % Default.PROJ_PATH,
            ' -P <test_path> Specifies the unittest pathname, default=%s'
            % Default.TEST_PATH,
            ' -s <suffix> Specifies the suffix string for the testing' +
            ' file, default=%s' % Default.SUFFIX,
            ' -t <time_limit> Specifies the time limit for each testing, ' +
            'default=%d' % Default.TIME_LIMIT,
            ' -w <worker_num> Specifies the number of workers, default=%d'
            % Default.WORKER_NUM,
        ])
        return ret

    @staticmethod
    def PrintParams():
        """Print the effective parameters to standard output."""
        print('Compiler & its arguments: %r' % GetCompileCmd())
        print('Time limit per test: %r' % Params.time_limit)
        print('Number of workers: %r' % Params.worker_num)
        print('Project path: %r' % Params.proj_path)
        print('Unittest files path: %r (with suffix=%r)'
              % (Params.test_path, Params.suffix))


def GetCompileCmd():
    """Return the shell command prefix used to compile every test.

    The project path is shell-quoted because the command is later run through
    the shell; the compiler and its arguments are inserted verbatim so that
    the shell splits them into separate words.
    """
    return ' '.join([Params.compiler,
                     '-I %s' % _ShellQuote(Params.proj_path),
                     Params.compile_args])


class Testing(object):
    """One test job: compile a single source file, run it, judge the result."""

    def __init__(self, path, filename):
        """Record the source, executable and log paths for one test file.

        Args:
            path: directory containing the test source file.
            filename: name of the test source file inside ``path``.
        """
        self._name = filename
        self._source_pathname = os.path.join(path, filename)
        self._exec_pathname = os.path.join(path, self._name + '.bin')
        self._log_pathname = os.path.join(path, self._name + '.log')
        self._proc = None

    def Test(self):
        """Compile and run the test.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, reason)`` where
            reason is 'Compile error', 'Timeout' or 'Test failure.'.
        """
        self._InitLogfile()
        if not self._Compile():
            return (False, 'Compile error')
        self._Run()
        if self._WaitTimeout(Params.time_limit) is None:
            self.Stop()
            return (False, 'Timeout')
        return (True, '') if self._IsSuccessful() else (False, 'Test failure.')

    def _InitLogfile(self):
        """Create the log file, or append a separator line if it exists."""
        already_exists = os.path.isfile(self._log_pathname)
        with open(self._log_pathname, 'a') as f:
            if already_exists:
                f.write(('=' * 72) + '\n')

    def _Compile(self):
        """Compile the source file; return True when the compiler exits 0."""
        # Every path is shell-quoted so that spaces or shell metacharacters in
        # directory or file names cannot break (or inject into) the command.
        command = '%s -o %s %s >>%s 2>&1' % (
            GetCompileCmd(),
            _ShellQuote(self._exec_pathname),
            _ShellQuote(self._source_pathname),
            _ShellQuote(self._log_pathname))
        try:
            retcode = subprocess.call(command, shell=True)
        except (OSError, ValueError):
            # Failing to launch the compiler counts as a compile error.
            return False
        return (retcode == 0)

    def _Run(self):
        """Start the compiled test binary, appending its output to the log."""
        command = '%s >>%s 2>&1' % (_ShellQuote(self._exec_pathname),
                                    _ShellQuote(self._log_pathname))
        # os.setsid puts the shell and the test binary in their own process
        # group so that Stop() can signal both of them with os.killpg.
        self._proc = subprocess.Popen(command, shell=True,
                                      preexec_fn=os.setsid)

    def Stop(self):
        """Terminate the running test, escalating to SIGKILL if needed."""
        if self._proc is None:
            return
        self._SignalGroup(signal.SIGTERM)
        if self._WaitTimeout(TERMINATE_TIMEOUT) is None:
            self._SignalGroup(signal.SIGKILL)
            # Bounded wait so the killed child gets reaped.
            self._WaitTimeout(TERMINATE_TIMEOUT)

    def _SignalGroup(self, signum):
        """Send ``signum`` to the test's process group, if it still exists."""
        try:
            os.killpg(self._proc.pid, signum)
        except OSError as err:
            # ESRCH: the group exited between our poll and the kill.
            if err.errno != errno.ESRCH:
                raise

    def _IsSuccessful(self):
        """Return True when the finished test process exited with status 0."""
        return (self._proc.returncode == 0)

    def _WaitTimeout(self, timeout):
        """Poll the process until it exits or about ``timeout`` seconds pass.

        Returns:
            The process return code, or None if it is still running.
        """
        time_sum = 0
        while self._proc.poll() is None and time_sum < timeout:
            time.sleep(TIME_SLICE)
            time_sum += TIME_SLICE
        return self._proc.poll()

    @property
    def name(self):
        """File name of the test source."""
        return self._name

    @property
    def log_filename(self):
        """Path of the log file receiving compiler and test output."""
        return self._log_pathname


class StopWorkerNotify(object):
    """Sentinel job: a worker that dequeues one of these exits its loop."""
    pass


def Worker(job_queue):
    """Thread body: run queued Testing jobs until a StopWorkerNotify arrives.

    Args:
        job_queue: queue of Testing instances and StopWorkerNotify sentinels.
    """
    while True:
        try:
            job = job_queue.get(True, WAIT_TIMEOUT)
        except Queue.Empty:
            continue
        if isinstance(job, StopWorkerNotify):
            break
        try:
            succ, reason = job.Test()
        except Exception as err:
            # Thread boundary: an unexpected error in one job must be reported
            # as a failure rather than silently killing this worker.
            succ, reason = False, 'Internal error: %s' % err
        if succ:
            Log.Pass(job.name)
        else:
            Log.Fail(job.name, reason)


class Log(object):
    """Thread-safe result printer that also tracks the overall verdict."""

    _all_pass = True
    _print_lock = threading.Lock()

    @staticmethod
    def Pass(test_name):
        """Print a green 'Passed' line for ``test_name``."""
        with Log._print_lock:
            print('\033[32m%r >>> Passed\033[39m' % test_name)

    @staticmethod
    def Fail(test_name, reason):
        """Print a red 'Failed' line and mark the whole run as failed."""
        with Log._print_lock:
            Log._all_pass = False
            print('\033[31m%r >>> Failed for %r \033[39m'
                  % (test_name, reason))

    @staticmethod
    def IsAllPass():
        """Return True when no failure has been logged so far."""
        return Log._all_pass


def ParseArgv():
    """Parse sys.argv and print help or the error message when applicable.

    Returns:
        ``(keep_going, exit_code)``; ``keep_going`` is False when the program
        should exit immediately with ``exit_code``.
    """
    result, result_str = Params.Parse(sys.argv[1:])
    if result == Params.PARSE_HELP:
        print(result_str)
        return (False, 0)
    elif result == Params.PARSE_ERROR:
        sys.stderr.write(result_str + '\n')
        return (False, 1)
    return (True, 0)


def CreateAndStartWorkers(job_queue):
    """Start Params.worker_num daemon worker threads and return them."""
    workers = []
    for unused_index in range(Params.worker_num):
        worker = threading.Thread(target=Worker, args=(job_queue,))
        worker.daemon = True
        worker.start()
        workers.append(worker)
    return workers


def PutsJobs(job_queue):
    """Enqueue a Testing job for every matching file under Params.test_path."""
    for (path, unused_dirnames, filenames) in os.walk(Params.test_path):
        for filename in filenames:
            if filename.endswith(Params.suffix):
                job_queue.put(Testing(path, filename))


def JoinWorkers(workers, job_queue):
    """Ask every worker to stop, then wait until all of them have exited.

    Args:
        workers: list of worker threads returned by CreateAndStartWorkers.
        job_queue: the queue those workers consume.
    """
    # One sentinel per worker; they queue up behind the real jobs.
    for unused_worker in workers:
        job_queue.put(StopWorkerNotify())
    # Join with a timeout in a loop rather than blocking indefinitely on a
    # single thread.
    while workers:
        alive_workers = []
        for worker in workers:
            worker.join(timeout=JOIN_TIMEOUT)
            if worker.is_alive():
                alive_workers.append(worker)
        workers = alive_workers


def RunTestings():
    """Run every discovered test; return 0 if all passed, otherwise 1."""
    job_queue = Queue.Queue()
    workers = CreateAndStartWorkers(job_queue)
    PutsJobs(job_queue)
    JoinWorkers(workers, job_queue)
    return (0 if Log.IsAllPass() else 1)


def SignalTermHandler(signal, frame):
    """SIGTERM handler that does nothing, so the signal is ignored."""
    pass


def SignalIntHandler(signal, frame):
    """SIGINT handler that does nothing, so the signal is ignored."""
    pass


def HandleSignals():
    """Install the no-op handlers for SIGTERM and SIGINT."""
    signal.signal(signal.SIGTERM, SignalTermHandler)
    signal.signal(signal.SIGINT, SignalIntHandler)


def Main():
    """Program entry point; returns the process exit code."""
    HandleSignals()
    keep_going, exit_code = ParseArgv()
    if not keep_going:
        return exit_code
    Params.PrintParams()
    exit_code = RunTestings()
    return exit_code


if __name__ == '__main__':
    sys.exit(Main())