from __future__ import print_function

import __main__
import argparse
import fileinput
import multiprocessing
import os
import re
import signal
import stat
import sys
from subprocess import Popen, PIPE, STDOUT

from pybin import settings

# `basestring` only exists on Python 2; fall back to `str` on Python 3 so the
# "one path or many" helpers below work on both interpreters
try:
    _string_types = basestring
except NameError:
    _string_types = str

################################################################################
#               shell helpers
################################################################################

# helper function to run terminal commands
#   cmd          : command line, handed to the shell unchanged
#   print2stdout : when true the child writes straight to our stdout and the
#                  returned output is None; otherwise stdout is captured
#                  (stderr is merged into stdout in both cases)
#   input        : optional file redirected to the command's stdin; ignored
#                  unless it names an existing file
# returns (return code, captured output); in dry-run mode the command is only
# printed and (0, None) is returned
def sh(cmd, print2stdout = True, input = None):
    # add input redirection if needed
    if input and os.path.isfile(input):
        cmd += " < %s" % input

    # if this is a dry_run, only print the command that would be run
    if settings.dry_run:
        print("cmd: %s" % cmd)
        return 0, None

    # otherwise create a pipe and run the desired command
    proc = Popen(cmd, stdout=None if print2stdout else PIPE, stderr=STDOUT, shell=True)
    out, _ = proc.communicate()
    return proc.returncode, out

# check whether the `file` utility describes fname as "ASCII text"
# returns True unconditionally in dry-run mode, False if fname is not a regular
# file, if `file` fails, or if its output cannot be parsed
def is_ascii(fname):
    if settings.dry_run:
        print("is_ascii: %s" % fname)
        return True

    if not os.path.isfile(fname):
        return False

    code, out = sh("file %s" % fname, print2stdout = False)
    if code != 0:
        return False

    # the pipe yields bytes on Python 3 while the pattern below is text
    if isinstance(out, bytes) and not isinstance(out, str):
        out = out.decode('utf-8', 'replace')

    match = re.search(".*: (.*)", out)
    if not match:
        return False

    return match.group(1).startswith("ASCII text")

# Remove 1 or more files silently
# accepts either a single path string or an iterable of paths
def rm( files ):
    if isinstance( files, _string_types ):
        files = [ files ]

    for path in files:
        sh("rm -f %s > /dev/null 2>&1" % path )

# Create the parent directory of 1 or more paths
# accepts either a single path string or an iterable of paths; note that it is
# os.path.dirname(path) that gets created, not path itself
def mkdir( files ):
    if isinstance( files, _string_types ):
        files = [ files ]

    for path in files:
        sh("mkdir -p %s" % os.path.dirname(path) )

# change the working directory to the directory containing dest
# (by default, the directory of the main script)
def chdir( dest = __main__.__file__ ):
    abspath = os.path.abspath(dest)
    dname = os.path.dirname(abspath)
    os.chdir(dname)

# diff two files
# returns (return code, captured output) of the diff command
def diff( lhs, rhs ):
    # diff the output of the files
    # NOTE: the continuation after --unchanged-group-format has no newline, so
    # the shell sees `\-`, which it reads as a plain `-`
    diff_cmd = ("diff --ignore-all-space --text "
                "--ignore-blank-lines "
                "--old-group-format='\t\tmissing lines :\n"
                "%%<' \\\n"
                "--new-group-format='\t\tnew lines :\n"
                "%%>' \\\n"
                "--unchanged-group-format='%%=' \\"
                "--changed-group-format='\t\texpected :\n"
                "%%<"
                "\t\tgot :\n"
                "%%>\n' \\\n"
                "--new-line-format='\t\t%%dn\t%%L' \\\n"
                "--old-line-format='\t\t%%dn\t%%L' \\\n"
                "--unchanged-line-format='' \\\n"
                "%s %s")

    # fetch return code and error from the diff command
    return sh(diff_cmd % (lhs, rhs), False)

# call make
#   target     : make target to build
#   flags      : extra flags appended after the configured ones
#   redirects  : shell redirections appended at the end of the command
#   error_file : when set, passed to make as test="<error_file>"
#   silent     : when true, pass -s to make
# returns the (return code, output) pair produced by sh
def make(target, flags = '', redirects = '', error_file = None, silent = False):
    test_param = """test="%s" """ % (error_file) if error_file else ''
    cmd = ' '.join([
        settings.make,
        '-s' if silent else '',
        test_param,
        settings.arch.flags,
        settings.debug.flags,
        settings.install.flags,
        flags,
        target,
        redirects
    ])
    return sh(cmd)

# locate an executable
# if program contains a directory component it is checked as given, otherwise
# every entry of PATH is searched; returns the path found or None
def which(program):
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None

################################################################################
#               file handling
################################################################################

# helper function to replace patterns in a file
# every occurrence of pat is replaced by s_after, in place; the previous content
# is kept in a '.bak' backup; in dry-run mode the replacement is only printed
def file_replace(fname, pat, s_after):
    if settings.dry_run:
        print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
        return

    stream = fileinput.FileInput(fname, inplace=True, backup='.bak')
    try:
        # with inplace=True, print writes into the file being rewritten
        for line in stream:
            print(line.replace(pat, s_after), end='')
    finally:
        # always close, otherwise stdout stays redirected into the file
        stream.close()

# helper function to check if a file contains only a specific string
# leading and trailing whitespace is ignored on both sides of the comparison
def fileContainsOnly(file, text):
    with open(file) as f:
        content = f.read().strip()

    return content == text.strip()

# check whether or not a file is executable by its owner
# any failure while querying the file is printed and reported as False
def fileIsExecutable(file):
    try:
        fileinfo = os.stat(file)
        return bool(fileinfo.st_mode & stat.S_IXUSR)
    except Exception as inst:
        print(type(inst))    # the exception instance
        print(inst.args)     # arguments stored in .args
        print(inst)
        return False

# transform path to canonical form
# the path is normalized and joined onto the directory of the main script
def canonicalPath(path):
    abspath = os.path.abspath(__main__.__file__)
    dname = os.path.dirname(abspath)
    return os.path.join(dname, os.path.normpath(path) )

# compare path even if form is different
def pathCmp(lhs, rhs):
    return canonicalPath( lhs ) == canonicalPath( rhs )

# walk all files in a path
# calls op(path) for every file and directory found under settings.SRCDIR
def pathWalk( op ):
    # Start the walk
    # os.walk is used because os.path.walk no longer exists on Python 3
    for dirname, subdirs, files in os.walk(settings.SRCDIR):
        for name in subdirs + files:
            op( os.path.join(dirname, name) )

################################################################################
#               system
################################################################################

# count number of jobs to create
# returns (number of jobs, force) where the number of jobs is capped by the
# number of tests and force is true when the count came from the user or when
# MAKEFLAGS is set; options.jobs is updated in place; exits with status 1 if
# the resulting job count is not positive
def jobCount( options, tests ):
    # check if the user already passed in a number of jobs for multi-threading
    if not options.jobs:
        make_flags = os.environ.get('MAKEFLAGS')
        force = bool(make_flags)
        make_jobs_fds = re.search(r"--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
        if make_jobs_fds:
            # read whatever is available on the first descriptor, count it and
            # write it back on the second one
            # NOTE(review): presumably one byte per job token of the make
            # jobserver - confirm against the make documentation
            tokens = os.read(int(make_jobs_fds.group(2)), 1024)
            options.jobs = len(tokens)
            os.write(int(make_jobs_fds.group(3)), tokens)
        else:
            options.jobs = multiprocessing.cpu_count()
    else:
        force = True

    # make sure we have a valid number of jobs that corresponds to user input
    if options.jobs <= 0:
        print('ERROR: Invalid number of jobs', file=sys.stderr)
        sys.exit(1)

    return min( options.jobs, len(tests) ), force

# setup a proper processor pool with correct signal handling
# SIGINT is ignored while the pool is created and the previous handler is
# restored afterwards (presumably so that the workers start out ignoring it)
def setupPool(jobs):
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = multiprocessing.Pool(jobs)
    signal.signal(signal.SIGINT, original_sigint_handler)

    return pool

# handle signals in scope
# inside the `with` body SIGINT has its default behaviour, on exit it is ignored
class SignalHandling():
    def __enter__(self):
        # enable signal handling
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    def __exit__(self, type, value, traceback):
        # disable signal handling
        signal.signal(signal.SIGINT, signal.SIG_IGN)
################################################################################
#               misc
################################################################################

# check if argument is yes or no
# returns True for "yes" and False for "no"; any other value raises
# argparse.ArgumentTypeError
def yes_no(string):
    if string == "yes":
        return True
    if string == "no":
        return False
    raise argparse.ArgumentTypeError("expected 'yes' or 'no', got '%s'" % string)

# print text through the `column` utility when it is available, plainly otherwise
# the command line being run is printed before the formatted text
def fancy_print(text):
    column = which('column')
    if column:
        cmd = "%s 2> /dev/null" % column
        print(cmd)
        proc = Popen(cmd, stdin=PIPE, stderr=None, shell=True)
        # the pipe is binary: encode text that is not already a byte string
        data = text if isinstance(text, bytes) else text.encode('utf-8')
        proc.communicate(input=data)
    else:
        print(text)