import __main__
import argparse
import fileinput
import multiprocessing
import os
import re
import resource
import signal
import stat
import subprocess
import sys
import time
import types

from pybin import settings
from subprocess import Popen, PIPE, STDOUT

################################################################################
#               shell helpers
################################################################################

# helper function to run terminal commands
# returns a (return code, captured output or None) pair
def sh(cmd, print2stdout=True, timeout=False, output=None, input=None):
    # add input redirection if needed
    if input and os.path.isfile(input):
        cmd += " < %s" % input

    # add output redirection if needed
    if output:
        cmd += " > %s" % output

    # if this is a dry run, only print the commands that would be run
    if settings.dry_run:
        print("cmd: %s" % cmd)
        return 0, None

    # otherwise run the desired command, capturing its output if requested
    try:
        proc = subprocess.run(
            cmd,
            stdout=None if print2stdout else PIPE,
            stderr=STDOUT,
            shell=True,
            timeout=settings.timeout.single if timeout else None
        )
        return proc.returncode, proc.stdout.decode("utf-8") if proc.stdout else None
    except subprocess.TimeoutExpired:
        # 124 is the exit status that timeout(1) reports for a timed-out command
        return 124, str(None)

# check whether file(1) reports the file as ASCII text
def is_ascii(fname):
    if settings.dry_run:
        print("is_ascii: %s" % fname)
        return True

    if not os.path.isfile(fname):
        return False

    code, out = sh("file %s" % fname, print2stdout=False)
    if code != 0:
        return False

    match = re.search(".*: (.*)", out)

    if not match:
        return False

    return match.group(1).startswith("ASCII text")

# check whether the path is an existing, executable file
def is_exe(fname):
    return os.path.isfile(fname) and os.access(fname, os.X_OK)

# remove one or more files silently
def rm(files):
    if isinstance(files, str):
        files = [files]
    for file in files:
        sh("rm -f %s > /dev/null 2>&1" % file)

# create the parent directory of one or more paths
def mkdir(files):
    if isinstance(files, str):
        files = [files]
    for file in files:
        p = os.path.normpath(file)
        d = os.path.dirname(p)
        sh("mkdir -p {}".format(d))

# change to the directory containing dest (by default, the main script)
def chdir(dest=__main__.__file__):
    abspath = os.path.abspath(dest)
    dname = os.path.dirname(abspath)
    os.chdir(dname)

# diff two files
def diff(lhs, rhs):
    # diff the output of the files
    diff_cmd = ("diff --text "
                "--old-group-format='\t\tmissing lines :\n"
                "%%<' \\\n"
                "--new-group-format='\t\tnew lines :\n"
                "%%>' \\\n"
                "--unchanged-group-format='%%=' \\\n"
                "--changed-group-format='\t\texpected :\n"
                "%%<"
                "\t\tgot :\n"
                "%%>\n' \\\n"
                "--new-line-format='\t\t%%dn\t%%L' \\\n"
                "--old-line-format='\t\t%%dn\t%%L' \\\n"
                "--unchanged-line-format='' \\\n"
                "%s %s")

    # fetch return code and error from the diff command
    return sh(diff_cmd % (lhs, rhs), False)

# call make
def make(target, flags='', redirects='', error_file=None, silent=False):
    test_param = """test="%s" """ % (error_file) if error_file else ''
    cmd = ' '.join([
        settings.make,
        '-s' if silent else '',
        test_param,
        settings.arch.flags,
        settings.debug.flags,
        settings.install.flags,
        flags,
        target,
        redirects
    ])
    return sh(cmd)

# locate an executable, either at the given path or by searching PATH
def which(program):
    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None

################################################################################
#               file handling
################################################################################

# move a file
def mv(source, dest):
    ret, _ = sh("mv %s %s" % (source, dest))
    return ret

# cat one file into the other
def cat(source, dest):
    ret, _ = sh("cat %s > %s" % (source, dest))
    return ret

# helper function to replace patterns in a file
def file_replace(fname, pat, s_after):
    if settings.dry_run:
        print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
        return

    file = fileinput.FileInput(fname, inplace=True, backup='.bak')
    for line in file:
        print(line.replace(pat, s_after), end='')
    file.close()

# helper function to check if a file contains only a specific string
def file_contains_only(file, text):
    with open(file) as f:
        ff = f.read().strip()
        result = ff == text.strip()

        return result

# transform path to canonical form
def canonical_path(path):
    abspath = os.path.abspath(__main__.__file__)
    dname = os.path.dirname(abspath)
    return os.path.join(dname, os.path.normpath(path))

# compare paths even if their forms differ
def path_cmp(lhs, rhs):
    return canonical_path(lhs) == canonical_path(rhs)

# walk all files in a path
def path_walk(op):
    dname = settings.SRCDIR
    for dirname, _, names in os.walk(dname):
        for name in names:
            path = os.path.join(dirname, name)
            op(path)

################################################################################
#               system
################################################################################

# count number of jobs to create
def job_count(options, tests):
    # check if the user already passed in a number of jobs for multi-threading
    if not options.jobs:
        make_flags = os.environ.get('MAKEFLAGS')
        force = bool(make_flags)
        make_jobs_fds = re.search(r"--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
        if make_jobs_fds:
            # read the available tokens from the make jobserver to count them,
            # then write them straight back
            tokens = os.read(int(make_jobs_fds.group(2)), 1024)
            options.jobs = len(tokens)
            os.write(int(make_jobs_fds.group(3)), tokens)
        else:
            options.jobs = multiprocessing.cpu_count()
    else:
        force = True

    # make sure we have a valid number of jobs that corresponds to user input
    if options.jobs <= 0:
        print('ERROR: Invalid number of jobs', file=sys.stderr)
        sys.exit(1)

    return min(options.jobs, len(tests)), force

# enable core dumps for all the test children
resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))

################################################################################
#               misc
################################################################################

# check if argument is yes or no
def yes_no(string):
    if string == "yes":
        return True
    if string == "no":
        return False
    raise argparse.ArgumentTypeError("expected 'yes' or 'no', got '%s'" % string)

# print text through column(1) if it is available, otherwise print it as is
def fancy_print(text):
    column = which('column')
    if column:
        subprocess.run(column, input=bytes(text + "\n", "UTF-8"))
    else:
        print(text)

# print information about the core dump located next to an executable
def core_info(path):
    cmd = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
    if not os.path.isfile(cmd):
        return 1, "ERR Printing format for core dumps not found"

    dname = os.path.dirname(path)
    core = os.path.join(dname, "core")
    if not os.path.isfile(path):
        return 1, "ERR Executable path is wrong"

    if not os.path.isfile(core):
        return 1, "ERR No core dump"

    return sh("gdb -n %s %s -batch -x %s" % (path, core, cmd), print2stdout=False)

# context manager that records the wall-clock duration of its body
class Timed:
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.duration = self.end - self.start