# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

import fcntl, getpass, os, re, sys, shutil, tarfile, tempfile, time, traceback
import warnings, logging, glob, resource

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils


class base_test:
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        if getpass.getuser() == 'root':
            self.configure_crash_handler()
        else:
            self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""

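    # Illustrative sketch (not part of the original module): keys are tagged
    # with their type and iterations are separated by a blank line, so a call
    # such as
    #
    #   self.write_iteration_keyval({'kernel': '3.0'}, {'throughput': 42.5})
    #
    # produces entries of the form
    #
    #   kernel{attr}=3.0
    #   throughput{perf}=42.5
    #
    # in the results keyval file (the exact layout is delegated to
    # utils.write_keyval).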

    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)

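    # Illustrative sketch (not part of the original module): each constraint
    # is a plain Python expression evaluated against the latest iteration's
    # perf keyvals, with builtins disabled. For example, after an iteration
    # that wrote {'throughput': 42.5}, a caller might pass
    #
    #   constraints = ('throughput > 40', 'throughput < 100')
    #
    # Any expression that is false, or that fails to evaluate, is recorded as
    # a failure for that iteration.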

    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)

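    # Illustrative sketch (not part of the original module): a hook is any
    # callable taking the test object, so a test or control file could do
    # something like the following (the hook name and body are hypothetical):
    #
    #   def dump_meminfo(test):
    #       shutil.copy('/proc/meminfo',
    #                   os.path.join(test.resultsdir,
    #                                'meminfo.%d' % test.iteration))
    #
    #   mytest.register_before_iteration_hook(dump_meminfo)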

    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            print "Dropping caches between iterations"
            utils.drop_caches()


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()

        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        if profile_only:
            if not self.job.profilers.present():
                self.job.record('WARN', None, None, 'No profilers have been '
                                'added but profile_only is set - nothing '
                                'will be run')
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
        else:
            self.before_run_once()
            self.run_once(*args, **dargs)
            self.after_run_once()

        for hook in self.after_iteration_hooks:
            hook(self)

        self.postprocess_iteration()
        self.analyze_perf_constraints(constraints)


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for the tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function, to cope with the profiling infrastructure. For
        other tests, you can just override the default implementation.

        @param test_length: The minimum test length in seconds. We'll run the
            run_once function for a number of times large enough to cover the
            minimum test length.

        @param iterations: The number of times to run the run_once function.
            This parameter is incompatible with test_length and will be
            ignored (with a logged message) if you specify both.

        @param profile_only: If true, run X iterations with profilers enabled.
            If false, run X iterations plus one more with profiling, if
            profilers are present. If None, default to the value of
            job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.info('Iterations parameter ignored (timed execution).')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.info('Test started. Minimum test length: %d s',
                         test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.info('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.info(
                            'Executing iteration %d, time_elapsed %d s',
                            timed_counter, time_elapsed)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.info('Test finished after %d iterations',
                         timed_counter)
            logging.info('Time elapsed: %d s', time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            logging.info('Test started. Number of iterations: %d', iterations)
            for self.iteration in xrange(1, iterations+1):
                logging.info('Executing iteration %d of %d', self.iteration,
                             iterations)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
            logging.info('Test finished after %d iterations.', iterations)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()

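    # Illustrative sketch (not part of the original module): tests normally
    # reach execute() indirectly through job.run_test(), which forwards
    # keyword arguments, e.g.
    #
    #   job.run_test('sleeptest', iterations=3)    # fixed iteration count
    #   job.run_test('sleeptest', test_length=60)  # run for at least 60 s
    #
    # ('sleeptest' here is just an example test name.)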

    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            print 'Profilers present. Profiling run started'

            try:
                self.run_once(*args, **dargs)

                # The run_once() argument takes priority over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it; will be called before any run_once()
        call, including the profiling run (where it's called before starting
        the profilers).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once (including from a profiled run, where it's
        called after stopping the profilers).
        """
        pass


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                try:
                    logging.exception('Exception escaping from test:')
                except:
                    pass # don't let logging exceptions here interfere

                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        print 'Ignoring exception during cleanup() phase:'
                        traceback.print_exc()
                        print 'Now raising the earlier %s error' % exc_info[0]
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]

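# Illustrative sketch (not part of the original module):
#
#   def f(a, b, *args, **dargs):
#       pass
#
#   _get_nonstar_args(f)   # -> ('a', 'b')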

def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs

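# Illustrative sketch (not part of the original module): for a function that
# accepts neither *args nor **dargs, positional args are dropped and only the
# recognized keywords survive:
#
#   def g(x, y=1):
#       pass
#
#   _cherry_pick_args(g, (1, 2), {'y': 3, 'z': 4})   # -> ((), {'y': 3})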

def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check that the given args are accepted by at least one of the callables.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check that the given dargs are accepted by at least one of the callables.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so:
            for param in dargs:
                if not param in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)

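# Illustrative sketch (not part of the original module):
#
#   def setup(x):
#       pass
#
#   def run(y):
#       pass
#
#   _validate_args((), {'x': 1, 'y': 2}, setup, run)   # OK: each key accepted
#   _validate_args((), {'z': 3}, setup, run)           # raises AutotestError
#   _validate_args((1,), {}, setup, run)               # raises TestError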

def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add an empty
    # __init__.py so that sub-directories are considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    print name + ": installing test url=" + url
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url = os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name, '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        # Pass already-categorized errors on up as is.
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

        # The job object here can be that of a server side job or a client
        # side job. The 'install_pkg' method won't be present for server side
        # jobs, so do the fetch only if that method is present in the job
        # object.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError, e:
                # continue as a fallback mechanism and see if the test code
                # already exists on the machine
                pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # we use the register iteration hook methods to register the passed
        # in hooks
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)
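

# Illustrative sketch (not part of the original module): the client harness
# invokes runtest() through job.run_test(); a direct call would look like
#
#   runtest(job, 'sleeptest', tag='smoke', args=(), dargs={'seconds': 1})
#
# where 'sleeptest' and 'seconds' are a hypothetical test name and parameter.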