import __main__
import argparse
import contextlib
import datetime
import fileinput
import multiprocessing
import os
import re
import resource
import signal
import stat
import subprocess
import sys
import tempfile
import time
import types

from pybin import settings

################################################################################
#               shell helpers
################################################################################

# helper functions to run terminal commands
def sh(*cmd, timeout = False, output = None, input = None, error = subprocess.STDOUT):
    """Run a command given as separate argv elements (no shell involved).

    output, input and error may each be None, a subprocess constant / file
    descriptor (int), or a file name that gets opened for the duration of
    the call.  When timeout is true, settings.timeout.single is used as the
    time limit.

    Returns a (returncode, stdout) pair.  stdout is the decoded text captured
    by subprocess (only populated when output is subprocess.PIPE), or None.
    On timeout the pair is (124, "None").  In dry-run mode the command is
    only printed and (0, None) is returned.
    """
    cmd = list(cmd)

    # if this is a dry_run, only print the commands that would be ran
    if settings.dry_run :
        cmd = "{} cmd: {}".format(os.getcwd(), ' '.join(cmd))
        if output and not isinstance(output, int):
            cmd += " > "
            cmd += output

        if error and not isinstance(error, int):
            cmd += " 2> "
            cmd += error

        if input and not isinstance(input, int) and os.path.isfile(input):
            cmd += " < "
            cmd += input

        print(cmd)
        return 0, None

    with contextlib.ExitStack() as onexit:
        # add input redirection if needed
        input = openfd(input, 'r', onexit, True)

        # add output redirection if needed
        output = openfd(output, 'w', onexit, False)

        # add error redirection if needed
        error = openfd(error, 'w', onexit, False)

        # run the desired command
        try:
            proc = subprocess.run(
                cmd,
                stdin =input,
                stdout=output,
                stderr=error,
                timeout=settings.timeout.single if timeout else None
            )
            return proc.returncode, proc.stdout.decode("utf-8") if proc.stdout else None
        except subprocess.TimeoutExpired:
            return 124, str(None)

def is_ascii(fname):
    """Return True if the `file` utility describes fname as starting with "ASCII text"."""
    if settings.dry_run:
        print("is_ascii: %s" % fname)
        return True

    if not os.path.isfile(fname):
        return False

    # program and argument must be separate argv elements: sh() does not go
    # through a shell, so a single "file <name>" string would not be found
    code, out = sh("file", fname, output=subprocess.PIPE)
    if code != 0 or not out:
        return False

    match = re.search(r".*: (.*)", out)
    if not match:
        return False

    return match.group(1).startswith("ASCII text")

def is_exe(fname):
    """Return True if fname is an existing regular file with execute permission."""
    return os.path.isfile(fname) and os.access(fname, os.X_OK)

def openfd(file, mode, exitstack, checkfile):
    """Turn a redirection target into something subprocess accepts.

    Falsy values and ints (descriptors / subprocess constants) are returned
    unchanged.  A file name is opened with the given mode and registered on
    exitstack so it is closed when the stack unwinds.  With checkfile set, a
    name that is not an existing file yields None instead.
    """
    if not file:
        return file

    if isinstance(file, int):
        return file

    if checkfile and not os.path.isfile(file):
        return None

    return exitstack.enter_context(open(file, mode))

# Remove 1 or more files silently
def rm( files ):
    """Run `rm -f` on a single path or on each path of an iterable."""
    if isinstance(files, str ): files = [ files ]
    for file in files:
        sh( 'rm', '-f', file, output=subprocess.DEVNULL, error=subprocess.DEVNULL )

# Create 1 or more directory
def mkdir( files ):
    """Run `mkdir -p` on the parent directory of each given path.

    Note that the directory created is the dirname of the (normalized) path,
    not the path itself.
    """
    if isinstance(files, str ): files = [ files ]
    for file in files:
        p = os.path.normpath( file )
        d = os.path.dirname ( p )
        sh( 'mkdir', '-p', d, output=subprocess.DEVNULL, error=subprocess.DEVNULL )

def chdir( dest = __main__.__file__ ):
    """Change the working directory to the directory containing dest."""
    abspath = os.path.abspath(dest)
    dname = os.path.dirname(abspath)
    os.chdir(dname)

# diff two files
def diff( lhs, rhs ):
    """Diff lhs against rhs and return sh()'s (returncode, captured output) pair."""
    # fetch return code and error from the diff command
    return sh(
        '''diff''',
        '''--text''',
        '''--old-group-format=\t\tmissing lines :\n%<''',
        '''--new-line-format=\t\t%dn\t%L''',
        '''--new-group-format=\t\tnew lines : \n%>''',
        '''--old-line-format=\t\t%dn\t%L''',
        '''--unchanged-group-format=%=''',
        '''--changed-group-format=\t\texpected :\n%<\t\tgot :\n%>''',
        '''--unchanged-line-format=''',
        lhs,
        rhs,
        output=subprocess.PIPE
    )

# call make
def make(target, *, flags = '', output = None, error = None, error_file = None, silent = False):
    """Invoke the configured make command on target.

    The command line is assembled from settings.make, the optional -s and
    test="..." parameters, the arch/debug/install flags from settings, the
    extra flags and the target; empty entries are dropped.  Returns sh()'s
    (returncode, output) pair.
    """
    test_param = """test="%s" """ % (error_file) if error_file else None
    cmd = [
        *settings.make,
        '-s' if silent else None,
        test_param,
        settings.arch.flags,
        settings.debug.flags,
        settings.install.flags,
        flags,
        target
    ]
    cmd = [s for s in cmd if s]
    return sh(*cmd, output=output, error=error)

def which(program):
    """Locate an executable.

    If program contains a directory component it is returned when it is
    executable; otherwise each PATH entry is searched.  Returns the path
    found or None.
    """
    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None

@contextlib.contextmanager
def tempdir():
    """Context manager: work inside a fresh temporary directory, then restore the cwd."""
    cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as temp:
        os.chdir(temp)
        try:
            yield temp
        finally:
            os.chdir(cwd)

def killgroup():
    """Send SIGINT to the whole process group of the current process.

    The KeyboardInterrupt this raises in the caller itself is swallowed; any
    other exception is reported on stderr and terminates with exit code 2.
    """
    try:
        os.killpg(os.getpgrp(), signal.SIGINT)
    except KeyboardInterrupt:
        pass # expected
    except Exception as exc:
        print("Unexpected exception", file=sys.stderr)
        print(exc, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(2)

################################################################################
#               file handling
################################################################################

# move a file
def mv(source, dest):
    """Run `mv source dest` and return its exit code."""
    ret, _ = sh("mv", source, dest)
    return ret

# cat one file into the other
def cat(source, dest):
    """Run `cat source` with its output redirected to dest; return the exit code."""
    ret, _ = sh("cat", source, output=dest)
    return ret

# helper function to replace patterns in a file
def file_replace(fname, pat, s_after):
    """Replace every occurrence of pat by s_after in fname, in place.

    A copy of the original content is kept next to the file with a '.bak'
    suffix.  In dry-run mode the replacement is only printed.
    """
    if settings.dry_run:
        print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
        return

    # the with-block guarantees the file is closed (and stdout restored)
    # even if an error occurs halfway through the rewrite
    with fileinput.FileInput(fname, inplace=True, backup='.bak') as file:
        for line in file:
            # inplace mode redirects print() into the file being rewritten
            print(line.replace(pat, s_after), end='')

# helper function to check if a files contains only a specific string
def file_contains_only(file, text) :
    """Return True if the content of file equals text, ignoring surrounding whitespace."""
    with open(file) as f:
        ff = f.read().strip()
        result = ff == text.strip()

        return result

# transform path to canonical form
def canonical_path(path):
    """Return the normalized path joined onto the directory of the main script."""
    abspath = os.path.abspath(__main__.__file__)
    dname = os.path.dirname(abspath)
    return os.path.join(dname, os.path.normpath(path) )

# compare path even if form is different
def path_cmp(lhs, rhs):
    """Return True if both paths have the same canonical form."""
    return canonical_path( lhs ) == canonical_path( rhs )

# walk all files in a path
def path_walk( op ):
    """Call op(path) for every file found under settings.SRCDIR."""
    dname = settings.SRCDIR
    for dirname, _, names in os.walk(dname):
        for name in names:
            path = os.path.join(dirname, name)
            op( path )

################################################################################
#               system
################################################################################

# count number of jobs to create
def job_count( options, tests ):
    """Decide how many parallel jobs to use.

    When options.jobs is not set, the count is taken from the make jobserver
    advertised in MAKEFLAGS if there is one, otherwise from the CPU count;
    options.jobs is updated accordingly.  Exits with status 1 if the
    resulting count is not positive.

    Returns (jobs, force): jobs is capped by len(tests), and force is True
    when the user gave a job count or MAKEFLAGS is set.
    """
    # check if the user already passed in a number of jobs for multi-threading
    if not options.jobs:
        make_flags = os.environ.get('MAKEFLAGS')
        force = bool(make_flags)
        make_jobs_fds = re.search(r"--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
        if make_jobs_fds :
            # read the available tokens to count them, then hand them back
            tokens = os.read(int(make_jobs_fds.group(2)), 1024)
            options.jobs = len(tokens)
            os.write(int(make_jobs_fds.group(3)), tokens)
        else :
            options.jobs = multiprocessing.cpu_count()
    else :
        force = True

    # make sure we have a valid number of jobs that corresponds to user input
    if options.jobs <= 0 :
        print('ERROR: Invalid number of jobs', file=sys.stderr)
        sys.exit(1)

    return min( options.jobs, len(tests) ), force

# enable core dumps for all the test children
resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))

################################################################################
#               misc
################################################################################

def pretty_now():
    """Return the current local time as 'YYYY-mm-dd_HH:MM:SS' (the raw timestamp is echoed on stderr)."""
    ts = time.time()
    print(ts, file=sys.stderr)
    return datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H:%M:%S')

# check if arguments is yes or no
def yes_no(string):
    """argparse type converter: map "yes"/"no" to True/False.

    Raises argparse.ArgumentTypeError for any other value.
    """
    if string == "yes" :
        return True
    if string == "no" :
        return False

    msg = "invalid value %r, expected 'yes' or 'no'" % (string,)
    raise argparse.ArgumentTypeError(msg)

def fancy_print(text):
    """Print text through the `column` utility when available, plainly otherwise."""
    column = which('column')
    if column:
        subprocess.run(column, input=bytes(text + "\n", "UTF-8"))
    else:
        print(text)

def core_info(path):
    """Run gdb in batch mode on the core dump found in the current directory.

    path is the executable that produced the core.  Returns a
    (code, text) pair: (1, "ERR ...") when the executable, the gdb script or
    the core file is missing, otherwise sh()'s result for the gdb run.
    """
    if not os.path.isfile(path):
        return 1, "ERR Executable path is wrong"

    cmd = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
    if not os.path.isfile(cmd):
        return 1, "ERR Printing format for core dumps not found"

    core = os.path.join(os.getcwd(), "core" )

    if not os.path.isfile(core):
        return 1, "ERR No core dump"

    return sh('gdb', '-n', path, core, '-batch', '-x', cmd, output=subprocess.PIPE)

def core_archive(dst, name, exe):
    """Move the core file of the current directory and exe into dst.

    The core is stored as dst/core and the executable as dst/name.  Returns
    a human readable message describing what was archived.
    """
    # Get the files to copy
    core = os.path.join(os.getcwd(), "core" )

    # Uncomment if we want timestamps on coredumps
    # dst  = os.path.join(dst, "%s_%s" % (name, pretty_now()))

    # make a directory for this test
    # (mkdir creates the parent of the path it is given, i.e. dst itself)
    mkdir(os.path.join(dst, "dir"))

    # moves the files
    mv( core, os.path.join(dst, "core" ) )
    mv( exe , os.path.join(dst, name   ) )

    # return explanatory test
    return "Archiving %s (executable and core) to %s" % (os.path.relpath(exe, settings.BUILDDIR), os.path.relpath(dst, settings.original_path))

class Timed:
    """Context manager measuring wall-clock time.

    After the with-block, start, end and duration (in seconds) are available
    as attributes.
    """
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.duration = self.end - self.start

def timed(src, timeout):
    """Yield the items of src, allowing timeout seconds overall.

    iter(src) must provide a next(timeout) method, such as the iterators
    returned by multiprocessing.Pool.imap; each call is given the time left
    before the deadline (never less than 0).  Whatever that method raises
    when time runs out propagates to the caller.
    """
    expire = time.time() + timeout
    i = iter(src)
    while True:
        try:
            item = i.next(max(expire - time.time(), 0))
        except StopIteration:
            # source exhausted: end the generator normally instead of letting
            # StopIteration escape (it would be turned into a RuntimeError)
            return
        yield item