source: tests/pybin/tools.py @ 093a5d7

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 093a5d7 was f806b61, checked in by tdelisle <tdelisle@…>, 6 years ago

Tests are now run in temporary directory

  • Property mode set to 100644
File size: 7.8 KB
Line 
1import __main__
2import argparse
3import contextlib
4import fileinput
5import multiprocessing
6import os
7import re
8import resource
9import signal
10import stat
11import subprocess
12import sys
13import tempfile
14import time
15import types
16
17from pybin import settings
18
19################################################################################
20#               shell helpers
21################################################################################
22
23# helper functions to run terminal commands
24def sh(*cmd, timeout = False, output = None, input = None, error = subprocess.STDOUT):
25        cmd = list(cmd)
26
27        # if this is a dry_run, only print the commands that would be ran
28        if settings.dry_run :
29                cmd = "{} cmd: {}".format(os.getcwd(), ' '.join(cmd))
30                if output and not isinstance(output, int):
31                        cmd += " > "
32                        cmd += output
33
34                if error and not isinstance(error, int):
35                        cmd += " 2> "
36                        cmd += error
37
38                if input and not isinstance(input, int) and os.path.isfile(input):
39                        cmd += " < "
40                        cmd += input
41
42                print(cmd)
43                return 0, None
44
45        with contextlib.ExitStack() as onexit:
46                # add input redirection if needed
47                input = openfd(input, 'r', onexit, True)
48
49                # add output redirection if needed
50                output = openfd(output, 'w', onexit, False)
51
52                # add error redirection if needed
53                error = openfd(error, 'w', onexit, False)
54
55                # run the desired command
56                try:
57                        proc = subprocess.run(
58                                cmd,
59                                stdin =input,
60                                stdout=output,
61                                stderr=error,
62                                timeout=settings.timeout.single if timeout else None
63                        )
64                        return proc.returncode, proc.stdout.decode("utf-8") if proc.stdout else None
65                except subprocess.TimeoutExpired:
66                        return 124, str(None)
67
68def is_ascii(fname):
69        if settings.dry_run:
70                print("is_ascii: %s" % fname)
71                return True
72
73        if not os.path.isfile(fname):
74                return False
75
76        code, out = sh("file %s" % fname, output=subprocess.PIPE)
77        if code != 0:
78                return False
79
80        match = re.search(".*: (.*)", out)
81
82        if not match:
83                return False
84
85        return match.group(1).startswith("ASCII text")
86
87def is_exe(fname):
88        return os.path.isfile(fname) and os.access(fname, os.X_OK)
89
90def openfd(file, mode, exitstack, checkfile):
91        if not file:
92                return file
93
94        if isinstance(file, int):
95                return file
96
97        if checkfile and not os.path.isfile(file):
98                return None
99
100        file = open(file, mode)
101        exitstack.push(file)
102        return file
103
104# Remove 1 or more files silently
105def rm( files ):
106        if isinstance(files, str ): files = [ files ]
107        for file in files:
108                sh( 'rm', '-f', file, output=subprocess.DEVNULL, error=subprocess.DEVNULL )
109
110# Create 1 or more directory
111def mkdir( files ):
112        if isinstance(files, str ): files = [ files ]
113        for file in files:
114                p = os.path.normpath( file )
115                d = os.path.dirname ( p )
116                sh( 'mkdir', '-p', d, output=subprocess.DEVNULL, error=subprocess.DEVNULL )
117
118
119def chdir( dest = __main__.__file__ ):
120        abspath = os.path.abspath(dest)
121        dname = os.path.dirname(abspath)
122        os.chdir(dname)
123
124# diff two files
125def diff( lhs, rhs ):
126        # fetch return code and error from the diff command
127        return sh(
128                '''diff''',
129                '''--text''',
130                '''--old-group-format=\t\tmissing lines :\n%<''',
131                '''--new-line-format=\t\t%dn\t%L''',
132                '''--new-group-format=\t\tnew lines : \n%>''',
133                '''--old-line-format=\t\t%dn\t%L''',
134                '''--unchanged-group-format=%=''',
135                '''--changed-group-format=\t\texpected :\n%<\t\tgot :\n%>''',
136                '''--unchanged-line-format=''',
137                lhs,
138                rhs,
139                output=subprocess.PIPE
140        )
141
142# call make
143def make(target, *, flags = '', output = None, error = None, error_file = None, silent = False):
144        test_param = """test="%s" """ % (error_file) if error_file else None
145        cmd = [
146                *settings.make,
147                '-s' if silent else None,
148                test_param,
149                settings.arch.flags,
150                settings.debug.flags,
151                settings.install.flags,
152                flags,
153                target
154        ]
155        cmd = [s for s in cmd if s]
156        return sh(*cmd, output=output, error=error)
157
158def which(program):
159    fpath, fname = os.path.split(program)
160    if fpath:
161        if is_exe(program):
162            return program
163    else:
164        for path in os.environ["PATH"].split(os.pathsep):
165            exe_file = os.path.join(path, program)
166            if is_exe(exe_file):
167                return exe_file
168
169    return None
170
171@contextlib.contextmanager
172def tempdir():
173        cwd = os.getcwd()
174        with tempfile.TemporaryDirectory() as temp:
175                os.chdir(temp)
176                try:
177                        yield temp
178                finally:
179                        os.chdir(cwd)
180
181################################################################################
182#               file handling
183################################################################################
184# move a file
185def mv(source, dest):
186        ret, _ = sh("mv", source, dest)
187        return ret
188
189# cat one file into the other
190def cat(source, dest):
191        ret, _ = sh("cat", source, output=dest)
192        return ret
193
194# helper function to replace patterns in a file
195def file_replace(fname, pat, s_after):
196        if settings.dry_run:
197                print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
198                return
199
200        file = fileinput.FileInput(fname, inplace=True, backup='.bak')
201        for line in file:
202                print(line.replace(pat, s_after), end='')
203        file.close()
204
205# helper function to check if a files contains only a specific string
206def file_contains_only(file, text) :
207        with open(file) as f:
208                ff = f.read().strip()
209                result = ff == text.strip()
210
211                return result
212
213# transform path to canonical form
214def canonical_path(path):
215        abspath = os.path.abspath(__main__.__file__)
216        dname = os.path.dirname(abspath)
217        return os.path.join(dname, os.path.normpath(path) )
218
219# compare path even if form is different
220def path_cmp(lhs, rhs):
221        return canonical_path( lhs ) == canonical_path( rhs )
222
223# walk all files in a path
224def path_walk( op ):
225        dname = settings.SRCDIR
226        for dirname, _, names in os.walk(dname):
227                for name in names:
228                        path = os.path.join(dirname, name)
229                        op( path )
230
231################################################################################
232#               system
233################################################################################
234# count number of jobs to create
235def job_count( options, tests ):
236        # check if the user already passed in a number of jobs for multi-threading
237        if not options.jobs:
238                make_flags = os.environ.get('MAKEFLAGS')
239                force = bool(make_flags)
240                make_jobs_fds = re.search("--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
241                if make_jobs_fds :
242                        tokens = os.read(int(make_jobs_fds.group(2)), 1024)
243                        options.jobs = len(tokens)
244                        os.write(int(make_jobs_fds.group(3)), tokens)
245                else :
246                        options.jobs = multiprocessing.cpu_count()
247        else :
248                force = True
249
250        # make sure we have a valid number of jobs that corresponds to user input
251        if options.jobs <= 0 :
252                print('ERROR: Invalid number of jobs', file=sys.stderr)
253                sys.exit(1)
254
255        return min( options.jobs, len(tests) ), force
256
257# enable core dumps for all the test children
258resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
259
260################################################################################
261#               misc
262################################################################################
263
264# check if arguments is yes or no
265def yes_no(string):
266        if string == "yes" :
267                return True
268        if string == "no" :
269                return False
270        raise argparse.ArgumentTypeError(msg)
271
272def fancy_print(text):
273        column = which('column')
274        if column:
275                subprocess.run(column, input=bytes(text + "\n", "UTF-8"))
276        else:
277                print(text)
278
279
280def core_info(path):
281        if not os.path.isfile(path):
282                return 1, "ERR Executable path is wrong"
283
284        cmd   = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
285        if not os.path.isfile(cmd):
286                return 1, "ERR Printing format for core dumps not found"
287
288        core  = os.path.join(os.getcwd(), "core" )
289
290        if not os.path.isfile(core):
291                return 1, "ERR No core dump"
292
293        return sh('gdb', '-n', path, core, '-batch', '-x', cmd, output=subprocess.PIPE)
294
295class Timed:
296    def __enter__(self):
297        self.start = time.time()
298        return self
299
300    def __exit__(self, *args):
301        self.end = time.time()
302        self.duration = self.end - self.start
Note: See TracBrowser for help on using the repository browser.