"""Shell, file-handling and system helpers shared by the test scripts."""
from __future__ import print_function

import __main__
import argparse
import fileinput
import multiprocessing
import os
import re
import resource
import signal
import stat
import sys
import time

from pybin import settings
from subprocess import Popen, PIPE, STDOUT

# `basestring` only exists on Python 2; fall back to `str` on Python 3 so the
# "one name or many names" helpers below work on both interpreters.
try:
    _string_types = basestring
except NameError:
    _string_types = str

################################################################################
#               shell helpers
################################################################################

# helper functions to run terminal commands
def sh(cmd, print2stdout = True, input = None):
    """Run `cmd` through the shell.

    cmd          -- command line, passed to the shell as-is (no quoting is added)
    print2stdout -- when true the child writes straight to our stdout, otherwise
                    its output (stderr merged into stdout) is captured
    input        -- optional path; if it names an existing file it is
                    redirected into the command's stdin

    Returns a `(returncode, output)` tuple. `output` is None when nothing was
    captured. When `settings.dry_run` is set the command is only printed and
    `(0, None)` is returned.
    """
    # add input redirection if needed
    if input and os.path.isfile(input):
        cmd += " < %s" % input

    # if this is a dry_run, only print the commands that would be ran
    if settings.dry_run:
        print("cmd: %s" % cmd)
        return 0, None

    # otherwise create a pipe and run the desired command
    proc = Popen(cmd, stdout=None if print2stdout else PIPE, stderr=STDOUT, shell=True)
    # stderr is merged into stdout, so the second element carries nothing
    out, _ = proc.communicate()
    return proc.returncode, out

def is_ascii(fname):
    """Return True if the `file` utility describes `fname` as ASCII text.

    Returns False when `fname` is not a regular file, when `file` fails, or
    when its output cannot be parsed. Always True during a dry run.
    """
    if settings.dry_run:
        print("is_ascii: %s" % fname)
        return True

    if not os.path.isfile(fname):
        return False

    code, out = sh("file %s" % fname, print2stdout = False)
    if code != 0:
        return False

    # Popen hands back bytes on Python 3; the pattern below is text
    if isinstance(out, bytes):
        out = out.decode("utf-8", "replace")

    match = re.search(".*: (.*)", out)
    if not match:
        return False

    return match.group(1).startswith("ASCII text")

# Remove 1 or more files silently
def rm( files ):
    """Remove one path (a string) or several (an iterable), discarding output."""
    if isinstance( files, _string_types ):
        files = [files]

    for name in files:
        sh("rm -f %s > /dev/null 2>&1" % name )

# Create 1 or more directory
def mkdir( files ):
    """Create the parent directory of one path (a string) or several (an iterable).

    Note that it is `os.path.dirname` of each path that gets created, not the
    path itself.
    """
    if isinstance( files, _string_types ):
        files = [files]

    for name in files:
        sh("mkdir -p %s" % os.path.dirname(name) )

def chdir( dest = __main__.__file__ ):
    """Change the working directory to the directory containing `dest`.

    The default, the main script's path, is captured when this module is
    imported.
    """
    abspath = os.path.abspath(dest)
    dname = os.path.dirname(abspath)
    os.chdir(dname)

# diff two files
def diff( lhs, rhs ):
    """Diff `lhs` against `rhs` and return `(returncode, output)` from `sh`.

    The output is captured rather than printed.
    """
    # diff the output of the files
    # NOTE: the unchanged-group-format fragment ends in a backslash without a
    # newline; the shell then reads `\-` as a plain `-`, so the next option is
    # still parsed correctly. The command string is kept as it was.
    diff_cmd = ("diff --text "
#               "--ignore-all-space "
#               "--ignore-blank-lines "
                "--old-group-format='\t\tmissing lines :\n"
                "%%<' \\\n"
                "--new-group-format='\t\tnew lines :\n"
                "%%>' \\\n"
                "--unchanged-group-format='%%=' \\"
                "--changed-group-format='\t\texpected :\n"
                "%%<"
                "\t\tgot :\n"
                "%%>\n' \\\n"
                "--new-line-format='\t\t%%dn\t%%L' \\\n"
                "--old-line-format='\t\t%%dn\t%%L' \\\n"
                "--unchanged-line-format='' \\\n"
                "%s %s")

    # fetch return code and error from the diff command
    return sh(diff_cmd % (lhs, rhs), False)

# call make
def make(target, flags = '', redirects = '', error_file = None, silent = False):
    """Invoke `settings.make` on `target` and return `(returncode, output)`.

    target     -- make target to build
    flags      -- extra flags appended after the configured ones
    redirects  -- shell redirections appended at the end of the command
    error_file -- when given, passed to make as `test="<error_file>"`
    silent     -- when true, `-s` is passed to make
    """
    test_param = """test="%s" """ % (error_file) if error_file else ''
    cmd = ' '.join([
        settings.make,
        '-s' if silent else '',
        test_param,
        settings.arch.flags,
        settings.debug.flags,
        settings.install.flags,
        flags,
        target,
        redirects
    ])
    return sh(cmd)

def which(program):
    """Locate an executable, similar to the shell's `which`.

    If `program` contains a directory component it is checked directly;
    otherwise each entry of PATH is searched. Returns the path of the first
    executable file found, or None.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _ = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        # fall back to the platform default search path if PATH is unset
        for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None

def run(exe, output, input):
    """Run `exe` under `timeout`, writing stdout and stderr to `output`.

    `input` is forwarded to `sh` for stdin redirection. The time limit is
    `settings.timeout.single`. Returns the return code.
    """
    ret, _ = sh("timeout %d %s > %s 2>&1" % (settings.timeout.single, exe, output), input = input)
    return ret

################################################################################
#               file handling
################################################################################

# move a file
def mv(source, dest):
    """Move `source` to `dest` with `mv`; return the return code."""
    ret, _ = sh("mv %s %s" % (source, dest))
    return ret

# cat one file into the other
def cat(source, dest):
    """Overwrite `dest` with the contents of `source`; return the return code."""
    ret, _ = sh("cat %s > %s" % (source, dest))
    return ret

# helper function to replace patterns in a file
def file_replace(fname, pat, s_after):
    """Replace every occurrence of `pat` with `s_after` in `fname`, in place.

    The original content is kept in a `.bak` backup. During a dry run the
    replacement is only printed.
    """
    if settings.dry_run:
        print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
        return

    stream = fileinput.FileInput(fname, inplace=True, backup='.bak')
    try:
        # with inplace=True, print() writes into the file being rewritten
        for line in stream:
            print(line.replace(pat, s_after), end='')
    finally:
        # always close, otherwise stdout stays redirected into the file
        stream.close()

# helper function to check if a files contains only a specific string
def fileContainsOnly(file, text) :
    """Return True if the content of `file` equals `text`, ignoring
    leading and trailing whitespace on both sides."""
    with open(file) as f:
        content = f.read().strip()

    return content == text.strip()

# check whether or not a file is executable
def fileIsExecutable(file) :
    """Return True if the owner-execute bit is set on `file`.

    Any error while inspecting the file is printed and reported as False.
    """
    try :
        fileinfo = os.stat(file)
        return bool(fileinfo.st_mode & stat.S_IXUSR)
    except Exception as inst:
        # best effort: report the problem and treat the file as not executable
        print(type(inst))    # the exception instance
        print(inst.args)     # arguments stored in .args
        print(inst)
        return False

# transform path to canonical form
def canonicalPath(path):
    """Return the normalized `path` joined onto the main script's directory."""
    abspath = os.path.abspath(__main__.__file__)
    dname = os.path.dirname(abspath)
    return os.path.join(dname, os.path.normpath(path) )

# compare path even if form is different
def pathCmp(lhs, rhs):
    """Return True if both paths have the same canonical form."""
    return canonicalPath( lhs ) == canonicalPath( rhs )

# walk all files in a path
def pathWalk( op ):
    """Call `op(path)` for every entry (files and directories) found while
    walking `settings.SRCDIR`.

    Within one directory, sub-directories are reported before files.
    """
    # os.walk is available on both Python 2 and 3 (os.path.walk is 2 only)
    for dirname, subdirs, files in os.walk(settings.SRCDIR):
        for name in subdirs + files:
            op( os.path.join(dirname, name) )

################################################################################
#               system
################################################################################

# count number of jobs to create
def jobCount( options, tests ):
    """Decide how many jobs to run.

    If `options.jobs` is not set it is filled in, either from the make
    jobserver descriptors found in MAKEFLAGS or from the CPU count.
    Exits with status 1 if the resulting count is not positive.

    Returns `(jobs, force)`: `jobs` is capped at `len(tests)`, and `force` is
    True when the count came from the user or MAKEFLAGS was non-empty.
    """
    # check if the user already passed in a number of jobs for multi-threading
    if not options.jobs:
        make_flags = os.environ.get('MAKEFLAGS')
        force = bool(make_flags)
        make_jobs_fds = re.search(r"--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
        if make_jobs_fds :
            # read what is available on the first descriptor to count it,
            # then write the same bytes back to the second descriptor
            tokens = os.read(int(make_jobs_fds.group(2)), 1024)
            options.jobs = len(tokens)
            os.write(int(make_jobs_fds.group(3)), tokens)
        else :
            options.jobs = multiprocessing.cpu_count()
    else :
        force = True

    # make sure we have a valid number of jobs that corresponds to user input
    if options.jobs <= 0 :
        print('ERROR: Invalid number of jobs', file=sys.stderr)
        sys.exit(1)

    return min( options.jobs, len(tests) ), force

# setup a proper processor pool with correct signal handling
def setupPool(jobs):
    """Create a `multiprocessing.Pool` of `jobs` workers.

    SIGINT is set to ignored while the pool is created and the previous
    handler is put back afterwards, even if pool creation fails.
    """
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        pool = multiprocessing.Pool(jobs)
    finally:
        signal.signal(signal.SIGINT, original_sigint_handler)

    return pool

# handle signals in scope
class SignalHandling():
    """Context manager that sets SIGINT to its default action inside the
    `with` body and to ignored on exit."""

    def __enter__(self):
        # enable signal handling
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    def __exit__(self, type, value, traceback):
        # disable signal handling
        signal.signal(signal.SIGINT, signal.SIG_IGN)

# enable core dumps for all the test children
resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))

################################################################################
#               misc
################################################################################

# check if arguments is yes or no
def yes_no(string):
    """argparse `type=` callback: map "yes" to True and "no" to False.

    Raises argparse.ArgumentTypeError for any other value.
    """
    if string == "yes" :
        return True
    if string == "no" :
        return False
    raise argparse.ArgumentTypeError("invalid value %r, expected 'yes' or 'no'" % (string,))

def fancy_print(text):
    """Print `text` through the `column` utility when it is installed,
    otherwise print it unchanged."""
    column = which('column')
    if column:
        cmd = "%s 2> /dev/null" % column
        proc = Popen(cmd, stdin=PIPE, stderr=None, shell=True)
        data = text + "\n"
        # the pipe is binary: text must be encoded before it is sent
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        proc.communicate(input=data)
    else:
        print(text)

def coreInfo(path):
    """Run gdb in batch mode on the `core` file next to the executable `path`.

    Uses the gdb script `pybin/print-core.gdb` under `settings.SRCDIR`.
    Returns `(returncode, output)`; when the script, the executable or the
    core file is missing, returns `(1, "ERR ...")` without running gdb.
    """
    cmd = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
    if not os.path.isfile(cmd):
        return 1, "ERR Printing format for core dumps not found"

    dname = os.path.dirname(path)
    core = os.path.join(dname, "core" )
    if not os.path.isfile(path):
        return 1, "ERR Executable path is wrong"

    if not os.path.isfile(core):
        return 1, "ERR No core dump"

    return sh("gdb -n %s %s -batch -x %s" % (path, core, cmd), print2stdout=False)

class Timed:
    """Context manager that records wall-clock time of its `with` body.

    After exit, `start`, `end` and `duration` (seconds) are set on the object.
    """

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.duration = self.end - self.start