source: tests/pybin/tools.py@ 5bf1f3e

ADT arm-eh ast-experimental cleanup-dtors enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 5bf1f3e was 5bf1f3e, checked in by tdelisle <tdelisle@…>, 7 years ago

Code review of test.py and pybin

  • Property mode set to 100644
File size: 7.4 KB
Line 
1import __main__
2import argparse
3import fileinput
4import multiprocessing
5import os
6import re
7import resource
8import signal
9import stat
10import sys
11import time
12import types
13
14from pybin import settings
15from subprocess import Popen, PIPE, STDOUT
16
17################################################################################
18# shell helpers
19################################################################################
20
21# helper functions to run terminal commands
22def sh(cmd, print2stdout = True, input = None):
23 # add input redirection if needed
24 if input and os.path.isfile(input):
25 cmd += " < %s" % input
26
27 # if this is a dry_run, only print the commands that would be ran
28 if settings.dry_run :
29 print("cmd: %s" % cmd)
30 return 0, None
31
32 # otherwise create a pipe and run the desired command
33 else :
34 proc = Popen(cmd, stdout=None if print2stdout else PIPE, stderr=STDOUT, shell=True)
35 out, err = proc.communicate()
36 return proc.returncode, out
37
38def is_ascii(fname):
39 if settings.dry_run:
40 print("is_ascii: %s" % fname)
41 return True
42
43 if not os.path.isfile(fname):
44 return False
45
46 code, out = sh("file %s" % fname, print2stdout = False)
47 if code != 0:
48 return False
49
50 match = re.search(".*: (.*)", out)
51
52 if not match:
53 return False
54
55 return match.group(1).startswith("ASCII text")
56
57def is_exe(fname):
58 return os.path.isfile(fname) and os.access(fname, os.X_OK)
59
60# Remove 1 or more files silently
61def rm( files ):
62 if isinstance(files, str ): files = [ files ]
63 for file in files:
64 sh("rm -f %s > /dev/null 2>&1" % file )
65
66# Create 1 or more directory
67def mkdir( files ):
68 if isinstance(files, str ): files = [ files ]
69 for file in files:
70 p = os.path.normpath( file )
71 d = os.path.dirname ( p )
72 sh( "mkdir -p {}".format(d) )
73
74
75def chdir( dest = __main__.__file__ ):
76 abspath = os.path.abspath(dest)
77 dname = os.path.dirname(abspath)
78 os.chdir(dname)
79
80# diff two files
81def diff( lhs, rhs ):
82 # diff the output of the files
83 diff_cmd = ("diff --text "
84 "--old-group-format='\t\tmissing lines :\n"
85 "%%<' \\\n"
86 "--new-group-format='\t\tnew lines :\n"
87 "%%>' \\\n"
88 "--unchanged-group-format='%%=' \\"
89 "--changed-group-format='\t\texpected :\n"
90 "%%<"
91 "\t\tgot :\n"
92 "%%>\n' \\\n"
93 "--new-line-format='\t\t%%dn\t%%L' \\\n"
94 "--old-line-format='\t\t%%dn\t%%L' \\\n"
95 "--unchanged-line-format='' \\\n"
96 "%s %s")
97
98 # fetch return code and error from the diff command
99 return sh(diff_cmd % (lhs, rhs), False)
100
101# call make
102def make(target, flags = '', redirects = '', error_file = None, silent = False):
103 test_param = """test="%s" """ % (error_file) if error_file else ''
104 cmd = ' '.join([
105 settings.make,
106 '-s' if silent else '',
107 test_param,
108 settings.arch.flags,
109 settings.debug.flags,
110 settings.install.flags,
111 flags,
112 target,
113 redirects
114 ])
115 return sh(cmd)
116
117def which(program):
118 fpath, fname = os.path.split(program)
119 if fpath:
120 if is_exe(program):
121 return program
122 else:
123 for path in os.environ["PATH"].split(os.pathsep):
124 exe_file = os.path.join(path, program)
125 if is_exe(exe_file):
126 return exe_file
127
128 return None
129
130def run(exe, output, input):
131 ret, _ = sh("timeout %d %s > %s 2>&1" % (settings.timeout.single, exe, output), input = input)
132 return ret
133
134################################################################################
135# file handling
136################################################################################
137# move a file
138def mv(source, dest):
139 ret, _ = sh("mv %s %s" % (source, dest))
140 return ret
141
142# cat one file into the other
143def cat(source, dest):
144 ret, _ = sh("cat %s > %s" % (source, dest))
145 return ret
146
147# helper function to replace patterns in a file
148def file_replace(fname, pat, s_after):
149 if settings.dry_run:
150 print("replacing '%s' with '%s' in %s" % (pat, s_after, fname))
151 return
152
153 file = fileinput.FileInput(fname, inplace=True, backup='.bak')
154 for line in file:
155 print(line.replace(pat, s_after), end='')
156 file.close()
157
158# helper function to check if a files contains only a specific string
159def file_contains_only(file, text) :
160 with open(file) as f:
161 ff = f.read().strip()
162 result = ff == text.strip()
163
164 return result
165
166# transform path to canonical form
167def canonical_path(path):
168 abspath = os.path.abspath(__main__.__file__)
169 dname = os.path.dirname(abspath)
170 return os.path.join(dname, os.path.normpath(path) )
171
172# compare path even if form is different
173def path_cmp(lhs, rhs):
174 return canonical_path( lhs ) == canonical_path( rhs )
175
176# walk all files in a path
177def path_walk( op ):
178 dname = settings.SRCDIR
179 for dirname, _, names in os.walk(dname):
180 for name in names:
181 path = os.path.join(dirname, name)
182 op( path )
183
184################################################################################
185# system
186################################################################################
187# count number of jobs to create
188def job_count( options, tests ):
189 # check if the user already passed in a number of jobs for multi-threading
190 if not options.jobs:
191 make_flags = os.environ.get('MAKEFLAGS')
192 force = bool(make_flags)
193 make_jobs_fds = re.search("--jobserver-(auth|fds)=\s*([0-9]+),([0-9]+)", make_flags) if make_flags else None
194 if make_jobs_fds :
195 tokens = os.read(int(make_jobs_fds.group(2)), 1024)
196 options.jobs = len(tokens)
197 os.write(int(make_jobs_fds.group(3)), tokens)
198 else :
199 options.jobs = multiprocessing.cpu_count()
200 else :
201 force = True
202
203 # make sure we have a valid number of jobs that corresponds to user input
204 if options.jobs <= 0 :
205 print('ERROR: Invalid number of jobs', file=sys.stderr)
206 sys.exit(1)
207
208 return min( options.jobs, len(tests) ), force
209
210# setup a proper processor pool with correct signal handling
211def setup_pool(jobs):
212 original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
213 pool = multiprocessing.Pool(jobs)
214 signal.signal(signal.SIGINT, original_sigint_handler)
215
216 return pool
217
218# handle signals in scope
219class SignalHandling():
220 def __enter__(self):
221 # enable signal handling
222 signal.signal(signal.SIGINT, signal.SIG_DFL)
223
224 def __exit__(self, type, value, traceback):
225 # disable signal handling
226 signal.signal(signal.SIGINT, signal.SIG_IGN)
227
228
229# enable core dumps for all the test children
230resource.setrlimit(resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
231
232################################################################################
233# misc
234################################################################################
235
236# check if arguments is yes or no
237def yes_no(string):
238 if string == "yes" :
239 return True
240 if string == "no" :
241 return False
242 raise argparse.ArgumentTypeError(msg)
243
244def fancy_print(text):
245 column = which('column')
246 if column:
247 cmd = "%s 2> /dev/null" % column
248 proc = Popen(cmd, stdin=PIPE, stderr=None, shell=True)
249 proc.communicate(input=bytes(text + "\n", "UTF-8"))
250 else:
251 print(text)
252
253
254def core_info(path):
255 cmd = os.path.join(settings.SRCDIR, "pybin/print-core.gdb")
256 if not os.path.isfile(cmd):
257 return 1, "ERR Printing format for core dumps not found"
258
259 dname = os.path.dirname(path)
260 core = os.path.join(dname, "core" )
261 if not os.path.isfile(path):
262 return 1, "ERR Executable path is wrong"
263
264 if not os.path.isfile(core):
265 return 1, "ERR No core dump"
266
267 return sh("gdb -n %s %s -batch -x %s" % (path, core, cmd), print2stdout=False)
268
269class Timed:
270 def __enter__(self):
271 self.start = time.time()
272 return self
273
274 def __exit__(self, *args):
275 self.end = time.time()
276 self.duration = self.end - self.start
Note: See TracBrowser for help on using the repository browser.