source: tests/pybin/tools.py@ d3c1c6a

ADT arm-eh ast-experimental cleanup-dtors enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since d3c1c6a was 5b993e0, checked in by tdelisle <tdelisle@…>, 7 years ago

Updated test scripts to use python3

  • Property mode set to 100644
File size: 8.0 KB
Line 
1import __main__
2import argparse
3import fileinput
4import multiprocessing
5import os
6import re
7import resource
8import signal
9import stat
10import sys
11import time
12import types
13
14from pybin import settings
15from subprocess import Popen, PIPE, STDOUT
16
17################################################################################
18# shell helpers
19################################################################################
20
21# helper functions to run terminal commands
def sh(cmd, print2stdout = True, input = None):
    """Run `cmd` through the shell and report how it went.

    cmd          -- shell command line to execute
    print2stdout -- when true the command writes directly to this process's
                    stdout; when false its output is captured through a pipe
    input        -- optional path; when it names an existing file, that file
                    is redirected into the command's stdin

    Returns a (return code, output) pair.  The output is the raw data read
    from the pipe when it was captured and None otherwise.  In dry-run mode
    the command is only printed and (0, None) is returned.
    """
    # add input redirection if needed
    if input and os.path.isfile(input):
        cmd += " < %s" % input

    # dry runs only show the command that would have been executed
    if settings.dry_run:
        print("cmd: %s" % cmd)
        return 0, None

    # real run: spawn the shell with stderr folded into stdout
    stdout_target = None if print2stdout else PIPE
    child = Popen(cmd, stdout=stdout_target, stderr=STDOUT, shell=True)
    captured, _ = child.communicate()
    return child.returncode, captured
37
def is_ascii(fname):
    """Tell whether the `file` utility classifies `fname` as ASCII text.

    fname -- path of the file to inspect

    Returns True when the description printed by `file` starts with
    "ASCII text".  Returns False when `fname` is not a regular file, when
    `file` fails, or when its output cannot be parsed.  In dry-run mode the
    request is only printed and True is returned.
    """
    if settings.dry_run:
        print("is_ascii: %s" % fname)
        return True

    if not os.path.isfile(fname):
        return False

    code, out = sh("file %s" % fname, print2stdout = False)
    if code != 0 or out is None:
        return False

    # The captured output comes straight from a pipe, i.e. as bytes under
    # python3; applying a str pattern to bytes raises TypeError, so decode
    # before matching.
    if isinstance(out, bytes):
        out = out.decode("utf-8", errors="replace")

    # `file` prints "<name>: <description>"; keep the description only
    match = re.search(".*: (.*)", out)
    if not match:
        return False

    return match.group(1).startswith("ASCII text")
56
57# Remove 1 or more files silently
def rm( files ):
    """Delete one or more files, discarding any output of `rm`.

    files -- a single path given as a string, or an iterable of paths
    """
    targets = [ files ] if isinstance(files, str) else files
    for target in targets:
        sh("rm -f %s > /dev/null 2>&1" % target)
62
63# Create the parent directory of 1 or more paths
def mkdir( files ):
    """Create the directory that contains each of the given paths.

    files -- a single path given as a string, or an iterable of paths

    Only the parent directory of each normalized path is created (with
    `mkdir -p`), not the path itself.
    """
    paths = [ files ] if isinstance(files, str) else files
    for path in paths:
        parent = os.path.dirname(os.path.normpath(path))
        sh("mkdir -p {}".format(parent))
70
71
def chdir( dest = __main__.__file__ ):
    """Change the working directory to the directory containing `dest`.

    dest -- a path; defaults to the file of the main script, captured when
            this function is defined
    """
    os.chdir(os.path.dirname(os.path.abspath(dest)))
76
77# diff two files
def diff( lhs, rhs ):
    """Compare two files with the `diff` utility and return the outcome.

    lhs -- first path substituted into the command (labelled "expected" /
           "missing lines" by the group formats below)
    rhs -- second path substituted into the command (labelled "got" /
           "new lines" by the group formats below)

    Returns whatever sh() returns for the command, with the command's output
    captured rather than printed.
    """
    # diff the output of the files
    diff_cmd = ("diff --text "
#               "--ignore-all-space "
#               "--ignore-blank-lines "
                "--old-group-format='\t\tmissing lines :\n"
                "%%<' \\\n"
                "--new-group-format='\t\tnew lines :\n"
                "%%>' \\\n"
                # NOTE(review): unlike the neighbouring fragments, the next
                # one has no "\n" after its backslash, so the backslash ends
                # up directly in front of the following option -- confirm
                # this is intended
                "--unchanged-group-format='%%=' \\"
                "--changed-group-format='\t\texpected :\n"
                "%%<"
                "\t\tgot :\n"
                "%%>\n' \\\n"
                "--new-line-format='\t\t%%dn\t%%L' \\\n"
                "--old-line-format='\t\t%%dn\t%%L' \\\n"
                "--unchanged-line-format='' \\\n"
                "%s %s")

    # fetch return code and error from the diff command
    # (the doubled %% above collapse to single % during this substitution)
    return sh(diff_cmd % (lhs, rhs), False)
99
100# call make
def make(target, flags = '', redirects = '', error_file = None, silent = False):
    """Invoke the configured make program on `target`.

    target     -- make target to build
    flags      -- extra text placed on the command line before the target
    redirects  -- text appended after the target
    error_file -- when set, passed to make as test="<error_file>"
    silent     -- when true, '-s' is passed to make

    The architecture, debug and install flags come from the settings module.
    Returns whatever sh() returns for the assembled command.
    """
    if error_file:
        test_param = """test="%s" """ % (error_file)
    else:
        test_param = ''

    silent_flag = '-s' if silent else ''

    # unused slots stay in the list as empty strings, exactly as joined
    pieces = [
        settings.make,
        silent_flag,
        test_param,
        settings.arch.flags,
        settings.debug.flags,
        settings.install.flags,
        flags,
        target,
        redirects,
    ]
    return sh(' '.join(pieces))
115
def which(program):
    """Locate an executable, in the spirit of the shell's `which`.

    program -- a bare command name, or a path containing a directory part

    When `program` contains a directory part it is returned unchanged if it
    is an executable file, and PATH is never consulted.  Otherwise every
    directory of the PATH environment variable is tried in order and the
    first executable match is returned.  Returns None when nothing is found.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    directory_part, _ = os.path.split(program)
    if directory_part:
        return program if is_exe(program) else None

    # PATH may be missing from the environment altogether; fall back to the
    # platform default search path instead of failing with a KeyError
    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if is_exe(candidate):
            return candidate

    return None
132
def run(exe, output, input):
    """Run `exe` under `timeout`, sending its stdout and stderr to `output`.

    exe    -- command to execute
    output -- path receiving both stdout and stderr of the command
    input  -- forwarded to sh() as its `input` argument

    The time limit is settings.timeout.single.  Returns the return code
    reported by sh().
    """
    command = "timeout %d %s > %s 2>&1" % (settings.timeout.single, exe, output)
    status, _ = sh(command, input = input)
    return status
136
137################################################################################
138# file handling
139################################################################################
140# move a file
def mv(source, dest):
    """Move `source` to `dest` with the `mv` utility; return its return code."""
    status, _ = sh("mv %s %s" % (source, dest))
    return status
144
145# cat one file into the other
def cat(source, dest):
    """Write the contents of `source` into `dest` via `cat`; return the return code."""
    status, _ = sh("cat %s > %s" % (source, dest))
    return status
149
150# helper function to replace patterns in a file
def file_replace(fname, pat, s_after):
    """Replace every occurrence of `pat` with `s_after` inside file `fname`.

    fname   -- path of the file to rewrite in place
    pat     -- literal substring to look for (str.replace, not a regex)
    s_after -- replacement text

    The original content is kept next to the file with a '.bak' suffix.  In
    dry-run mode the substitution is only printed and nothing is modified.
    """
    if settings.dry_run:
        print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
        return

    # With inplace=True, print() writes into the file being rewritten.  The
    # context manager closes the FileInput -- and thereby ends that
    # redirection -- even if the loop raises.
    with fileinput.FileInput(fname, inplace=True, backup='.bak') as lines:
        for line in lines:
            print(line.replace(pat, s_after), end='')
160
161# helper function to check if a file contains only a specific string
def fileContainsOnly(file, text) :
    """Tell whether the content of `file` equals `text`.

    file -- path of the file to read
    text -- expected content

    Leading and trailing whitespace is ignored on both sides.
    """
    with open(file) as handle:
        contents = handle.read()
    return contents.strip() == text.strip()
168
169# check whether or not a file is executable
def fileIsExecutable(file) :
    """Tell whether the owner-execute permission bit of `file` is set.

    file -- path handed to os.stat()

    Any exception raised while querying the file is printed (its type, its
    arguments and the exception itself) and results in False.
    """
    try :
        mode = os.stat(file).st_mode
    except Exception as inst:
        print(type(inst))    # the exception instance
        print(inst.args)     # arguments stored in .args
        print(inst)
        return False

    return bool(mode & stat.S_IXUSR)
179
180# transform path to canonical form
def canonicalPath(path):
    """Anchor the normalized `path` at the directory of the main script.

    path -- path to convert

    The result is os.path.join(<directory of __main__.__file__>, normalized
    path), so an absolute `path` comes back simply normalized.
    """
    script_dir = os.path.dirname(os.path.abspath(__main__.__file__))
    return os.path.join(script_dir, os.path.normpath(path))
185
186# compare path even if form is different
def pathCmp(lhs, rhs):
    """Tell whether two paths have the same canonical form (see canonicalPath)."""
    left = canonicalPath( lhs )
    right = canonicalPath( rhs )
    return left == right
189
190# walk all files in a path
def pathWalk( op ):
    """Call `op` on every file found below settings.SRCDIR.

    op -- callable receiving the path of each file, built by joining the
          directory being visited with the file name

    Directories themselves are not passed to `op`; only the file names that
    os.walk() reports for each directory are.
    """
    for dirname, _, names in os.walk(settings.SRCDIR):
        for name in names:
            op( os.path.join(dirname, name) )
203
204################################################################################
205# system
206################################################################################
207# count number of jobs to create
def jobCount( options, tests ):
    """Work out how many jobs to run in parallel.

    options -- object with a `jobs` attribute; when that attribute is falsy
               it is filled in by this function
    tests   -- collection of tests to run; the job count never exceeds its
               length

    When the user gave no job count, the MAKEFLAGS environment variable is
    searched for a `--jobserver-auth=R,W` / `--jobserver-fds=R,W` option.  If
    one is found, the tokens currently readable from descriptor R are
    counted and then written back to descriptor W; otherwise the number of
    CPUs is used.

    Returns (job count, force), where force is True when the count came from
    the user or when MAKEFLAGS was set.  Exits the process with status 1
    when the resulting job count is not positive.
    """
    # check if the user already passed in a number of jobs for multi-threading
    if options.jobs:
        force = True
    else:
        make_flags = os.environ.get('MAKEFLAGS')
        force = bool(make_flags)
        # raw string: "\s" is not a valid escape sequence in a plain literal
        make_jobs_fds = re.search(r"--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
        if make_jobs_fds:
            read_fd = int(make_jobs_fds.group(2))
            write_fd = int(make_jobs_fds.group(3))
            # count the tokens, then hand them straight back
            tokens = os.read(read_fd, 1024)
            options.jobs = len(tokens)
            os.write(write_fd, tokens)
        else:
            options.jobs = multiprocessing.cpu_count()

    # make sure we have a valid number of jobs that corresponds to user input
    if options.jobs <= 0:
        print('ERROR: Invalid number of jobs', file=sys.stderr)
        sys.exit(1)

    return min( options.jobs, len(tests) ), force
229
230# setup a proper processor pool with correct signal handling
def setupPool(jobs):
    """Create a multiprocessing pool of `jobs` workers.

    jobs -- number of worker processes

    SIGINT is set to be ignored while the pool is being created and the
    previous handler is reinstated afterwards, including when creating the
    pool fails.

    Returns the new multiprocessing.Pool.
    """
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        pool = multiprocessing.Pool(jobs)
    finally:
        # never leave this process deaf to Ctrl-C, even if Pool() raised
        signal.signal(signal.SIGINT, original_sigint_handler)

    return pool
237
238# handle signals in scope
class SignalHandling:
    """Context manager giving SIGINT its default action inside the block.

    On entry SIGINT is set to SIG_DFL; on exit it is set to SIG_IGN,
    regardless of what the disposition was before the block.
    """

    def __enter__(self):
        # enable signal handling
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    def __exit__(self, type, value, traceback):
        # disable signal handling
        signal.signal(signal.SIGINT, signal.SIG_IGN)
247
248
# enable core dumps for all the test children
try:
    resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
except (ValueError, OSError):
    # The hard limit is capped and this process may not raise it: settle for
    # the largest core size that is allowed rather than failing the import.
    _core_hard_limit = resource.getrlimit(resource.RLIMIT_CORE)[1]
    resource.setrlimit(resource.RLIMIT_CORE, (_core_hard_limit, _core_hard_limit))
251
252################################################################################
253# misc
254################################################################################
255
256# check if argument is yes or no
def yes_no(string):
    """Convert the literal "yes" / "no" into a boolean.

    string -- text to convert

    Returns True for "yes" and False for "no".
    Raises argparse.ArgumentTypeError for any other value.
    """
    if string == "yes" :
        return True
    if string == "no" :
        return False
    raise argparse.ArgumentTypeError("expected 'yes' or 'no', got %r" % (string,))
264
def fancy_print(text):
    """Print `text`, piped through the `column` utility when it is available.

    text -- string to display

    When `column` cannot be located the text is printed as is.
    """
    column = which('column')
    if not column:
        print(text)
        return

    # feed the text to `column` on stdin, discarding its error output
    cmd = "%s 2> /dev/null" % column
    proc = Popen(cmd, stdin=PIPE, stderr=None, shell=True)
    proc.communicate(input=(text + "\n").encode("UTF-8"))
273
274
def coreInfo(path):
    """Run gdb in batch mode on the core file located next to an executable.

    path -- path of the executable

    The core file is expected to be named "core" in the directory of `path`,
    and the gdb commands are read from pybin/print-core.gdb under
    settings.SRCDIR.

    Returns a (code, text) pair: (1, "ERR ...") when the gdb script, the
    executable or the core file is missing, otherwise whatever sh() returns
    for the gdb invocation with its output captured.
    """
    gdb_script = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
    if not os.path.isfile(gdb_script):
        return 1, "ERR Printing format for core dumps not found"

    core = os.path.join(os.path.dirname(path), "core" )
    if not os.path.isfile(path):
        return 1, "ERR Executable path is wrong"

    if not os.path.isfile(core):
        return 1, "ERR No core dump"

    gdb_cmd = "gdb -n %s %s -batch -x %s" % (path, core, gdb_script)
    return sh(gdb_cmd, print2stdout=False)
289
class Timed:
    """Context manager recording how long its block took.

    `start` is set to time.time() on entry; on exit `end` is set to
    time.time() and `duration` to the difference, in seconds.
    """

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        finished = time.time()
        self.end = finished
        self.duration = finished - self.start
Note: See TracBrowser for help on using the repository browser.