source: tests/pybin/tools.py @ 5b993e0

ADTarm-ehast-experimentalcleanup-dtorsenumforall-pointer-decayjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 5b993e0 was 5b993e0, checked in by tdelisle <tdelisle@…>, 5 years ago

Updated test scripts to use python3

  • Property mode set to 100644
File size: 8.0 KB
Line 
1import __main__
2import argparse
3import fileinput
4import multiprocessing
5import os
6import re
7import resource
8import signal
9import stat
10import sys
11import time
12import types
13
14from pybin import settings
15from subprocess import Popen, PIPE, STDOUT
16
17################################################################################
18#               shell helpers
19################################################################################
20
21# helper functions to run terminal commands
22def sh(cmd, print2stdout = True, input = None):
23        # add input redirection if needed
24        if input and os.path.isfile(input):
25                cmd += " < %s" % input
26
27        # if this is a dry_run, only print the commands that would be ran
28        if settings.dry_run :
29                print("cmd: %s" % cmd)
30                return 0, None
31
32        # otherwise create a pipe and run the desired command
33        else :
34                proc = Popen(cmd, stdout=None if print2stdout else PIPE, stderr=STDOUT, shell=True)
35                out, err = proc.communicate()
36                return proc.returncode, out
37
38def is_ascii(fname):
39        if settings.dry_run:
40                print("is_ascii: %s" % fname)
41                return True
42
43        if not os.path.isfile(fname):
44                return False
45
46        code, out = sh("file %s" % fname, print2stdout = False)
47        if code != 0:
48                return False
49
50        match = re.search(".*: (.*)", out)
51
52        if not match:
53                return False
54
55        return match.group(1).startswith("ASCII text")
56
57# Remove 1 or more files silently
58def rm( files ):
59        if isinstance(files, str ): files = [ files ]
60        for file in files:
61                sh("rm -f %s > /dev/null 2>&1" % file )
62
63# Create 1 or more directory
64def mkdir( files ):
65        if isinstance(files, str ): files = [ files ]
66        for file in files:
67                p = os.path.normpath( file )
68                d = os.path.dirname ( p )
69                sh( "mkdir -p {}".format(d) )
70
71
72def chdir( dest = __main__.__file__ ):
73        abspath = os.path.abspath(dest)
74        dname = os.path.dirname(abspath)
75        os.chdir(dname)
76
77# diff two files
78def diff( lhs, rhs ):
79        # diff the output of the files
80        diff_cmd = ("diff --text "
81#                               "--ignore-all-space "
82#                               "--ignore-blank-lines "
83                                "--old-group-format='\t\tmissing lines :\n"
84                                "%%<' \\\n"
85                                "--new-group-format='\t\tnew lines :\n"
86                                "%%>' \\\n"
87                                "--unchanged-group-format='%%=' \\"
88                                "--changed-group-format='\t\texpected :\n"
89                                "%%<"
90                                "\t\tgot :\n"
91                                "%%>\n' \\\n"
92                                "--new-line-format='\t\t%%dn\t%%L' \\\n"
93                                "--old-line-format='\t\t%%dn\t%%L' \\\n"
94                                "--unchanged-line-format='' \\\n"
95                                "%s %s")
96
97        # fetch return code and error from the diff command
98        return sh(diff_cmd % (lhs, rhs), False)
99
100# call make
101def make(target, flags = '', redirects = '', error_file = None, silent = False):
102        test_param = """test="%s" """ % (error_file) if error_file else ''
103        cmd = ' '.join([
104                settings.make,
105                '-s' if silent else '',
106                test_param,
107                settings.arch.flags,
108                settings.debug.flags,
109                settings.install.flags,
110                flags,
111                target,
112                redirects
113        ])
114        return sh(cmd)
115
116def which(program):
117    import os
118    def is_exe(fpath):
119        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
120
121    fpath, fname = os.path.split(program)
122    if fpath:
123        if is_exe(program):
124            return program
125    else:
126        for path in os.environ["PATH"].split(os.pathsep):
127            exe_file = os.path.join(path, program)
128            if is_exe(exe_file):
129                return exe_file
130
131    return None
132
133def run(exe, output, input):
134        ret, _ = sh("timeout %d %s > %s 2>&1" % (settings.timeout.single, exe, output), input = input)
135        return ret
136
137################################################################################
138#               file handling
139################################################################################
140# move a file
141def mv(source, dest):
142        ret, _ = sh("mv %s %s" % (source, dest))
143        return ret
144
145# cat one file into the other
146def cat(source, dest):
147        ret, _ = sh("cat %s > %s" % (source, dest))
148        return ret
149
150# helper function to replace patterns in a file
151def file_replace(fname, pat, s_after):
152        if settings.dry_run:
153                print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
154                return
155
156        file = fileinput.FileInput(fname, inplace=True, backup='.bak')
157        for line in file:
158                print(line.replace(pat, s_after), end='')
159        file.close()
160
161# helper function to check if a files contains only a specific string
162def fileContainsOnly(file, text) :
163        with open(file) as f:
164                ff = f.read().strip()
165                result = ff == text.strip()
166
167                return result;
168
169# check whether or not a file is executable
170def fileIsExecutable(file) :
171        try :
172                fileinfo = os.stat(file)
173                return bool(fileinfo.st_mode & stat.S_IXUSR)
174        except Exception as inst:
175                print(type(inst))    # the exception instance
176                print(inst.args)     # arguments stored in .args
177                print(inst)
178                return False
179
180# transform path to canonical form
181def canonicalPath(path):
182        abspath = os.path.abspath(__main__.__file__)
183        dname = os.path.dirname(abspath)
184        return os.path.join(dname, os.path.normpath(path) )
185
186# compare path even if form is different
187def pathCmp(lhs, rhs):
188        return canonicalPath( lhs ) == canonicalPath( rhs )
189
190# walk all files in a path
191def pathWalk( op ):
192        def step(_, dirname, names):
193                for name in names:
194                        path = os.path.join(dirname, name)
195                        op( path )
196
197        # Start the walk
198        dname = settings.SRCDIR
199        for dirname, _, names in os.walk(dname):
200                for name in names:
201                        path = os.path.join(dirname, name)
202                        op( path )
203
204################################################################################
205#               system
206################################################################################
207# count number of jobs to create
208def jobCount( options, tests ):
209        # check if the user already passed in a number of jobs for multi-threading
210        if not options.jobs:
211                make_flags = os.environ.get('MAKEFLAGS')
212                force = bool(make_flags)
213                make_jobs_fds = re.search("--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
214                if make_jobs_fds :
215                        tokens = os.read(int(make_jobs_fds.group(2)), 1024)
216                        options.jobs = len(tokens)
217                        os.write(int(make_jobs_fds.group(3)), tokens)
218                else :
219                        options.jobs = multiprocessing.cpu_count()
220        else :
221                force = True
222
223        # make sure we have a valid number of jobs that corresponds to user input
224        if options.jobs <= 0 :
225                print('ERROR: Invalid number of jobs', file=sys.stderr)
226                sys.exit(1)
227
228        return min( options.jobs, len(tests) ), force
229
230# setup a proper processor pool with correct signal handling
231def setupPool(jobs):
232        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
233        pool = multiprocessing.Pool(jobs)
234        signal.signal(signal.SIGINT, original_sigint_handler)
235
236        return pool
237
238# handle signals in scope
239class SignalHandling():
240        def __enter__(self):
241                # enable signal handling
242                signal.signal(signal.SIGINT, signal.SIG_DFL)
243
244        def __exit__(self, type, value, traceback):
245                # disable signal handling
246                signal.signal(signal.SIGINT, signal.SIG_IGN)
247
248
249# enable core dumps for all the test children
250resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
251
252################################################################################
253#               misc
254################################################################################
255
256# check if arguments is yes or no
257def yes_no(string):
258        if string == "yes" :
259                return True
260        if string == "no" :
261                return False
262        raise argparse.ArgumentTypeError(msg)
263        return False
264
265def fancy_print(text):
266        column = which('column')
267        if column:
268                cmd = "%s 2> /dev/null" % column
269                proc = Popen(cmd, stdin=PIPE, stderr=None, shell=True)
270                proc.communicate(input=bytes(text + "\n", "UTF-8"))
271        else:
272                print(text)
273
274
275def coreInfo(path):
276        cmd   = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
277        if not os.path.isfile(cmd):
278                return 1, "ERR Printing format for core dumps not found"
279
280        dname = os.path.dirname(path)
281        core  = os.path.join(dname, "core" )
282        if not os.path.isfile(path):
283                return 1, "ERR Executable path is wrong"
284
285        if not os.path.isfile(core):
286                return 1, "ERR No core dump"
287
288        return sh("gdb -n %s %s -batch -x %s" % (path, core, cmd), print2stdout=False)
289
290class Timed:
291    def __enter__(self):
292        self.start = time.time()
293        return self
294
295    def __exit__(self, *args):
296        self.end = time.time()
297        self.duration = self.end - self.start
Note: See TracBrowser for help on using the repository browser.