GNU Linux-libre 4.19.268-gnu1
[releases.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
6
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8 """
9
10 import re
11 import os
12 import sys
13 import argparse
14 import importlib
15 import json
16 import subprocess
17 import time
18 import traceback
19 from collections import OrderedDict
20 from string import Template
21
22 from tdc_config import *
23 from tdc_helper import *
24
25 import TdcPlugin
26
27
28 class PluginMgrTestFail(Exception):
29     def __init__(self, stage, output, message):
30         self.stage = stage
31         self.output = output
32         self.message = message
33
34 class PluginMgr:
35     def __init__(self, argparser):
36         super().__init__()
37         self.plugins = {}
38         self.plugin_instances = []
39         self.args = []
40         self.argparser = argparser
41
42         # TODO, put plugins in order
43         plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
44         for dirpath, dirnames, filenames in os.walk(plugindir):
45             for fn in filenames:
46                 if (fn.endswith('.py') and
47                     not fn == '__init__.py' and
48                     not fn.startswith('#') and
49                     not fn.startswith('.#')):
50                     mn = fn[0:-3]
51                     foo = importlib.import_module('plugins.' + mn)
52                     self.plugins[mn] = foo
53                     self.plugin_instances.append(foo.SubPlugin())
54
55     def call_pre_suite(self, testcount, testidlist):
56         for pgn_inst in self.plugin_instances:
57             pgn_inst.pre_suite(testcount, testidlist)
58
59     def call_post_suite(self, index):
60         for pgn_inst in reversed(self.plugin_instances):
61             pgn_inst.post_suite(index)
62
63     def call_pre_case(self, test_ordinal, testid):
64         for pgn_inst in self.plugin_instances:
65             try:
66                 pgn_inst.pre_case(test_ordinal, testid)
67             except Exception as ee:
68                 print('exception {} in call to pre_case for {} plugin'.
69                       format(ee, pgn_inst.__class__))
70                 print('test_ordinal is {}'.format(test_ordinal))
71                 print('testid is {}'.format(testid))
72                 raise
73
74     def call_post_case(self):
75         for pgn_inst in reversed(self.plugin_instances):
76             pgn_inst.post_case()
77
78     def call_pre_execute(self):
79         for pgn_inst in self.plugin_instances:
80             pgn_inst.pre_execute()
81
82     def call_post_execute(self):
83         for pgn_inst in reversed(self.plugin_instances):
84             pgn_inst.post_execute()
85
86     def call_add_args(self, parser):
87         for pgn_inst in self.plugin_instances:
88             parser = pgn_inst.add_args(parser)
89         return parser
90
91     def call_check_args(self, args, remaining):
92         for pgn_inst in self.plugin_instances:
93             pgn_inst.check_args(args, remaining)
94
95     def call_adjust_command(self, stage, command):
96         for pgn_inst in self.plugin_instances:
97             command = pgn_inst.adjust_command(stage, command)
98         return command
99
100     @staticmethod
101     def _make_argparser(args):
102         self.argparser = argparse.ArgumentParser(
103             description='Linux TC unit tests')
104
105
106 def replace_keywords(cmd):
107     """
108     For a given executable command, substitute any known
109     variables contained within NAMES with the correct values
110     """
111     tcmd = Template(cmd)
112     subcmd = tcmd.safe_substitute(NAMES)
113     return subcmd
114
115
116 def exec_cmd(args, pm, stage, command):
117     """
118     Perform any required modifications on an executable command, then run
119     it in a subprocess and return the results.
120     """
121     if len(command.strip()) == 0:
122         return None, None
123     if '$' in command:
124         command = replace_keywords(command)
125
126     command = pm.call_adjust_command(stage, command)
127     if args.verbose > 0:
128         print('command "{}"'.format(command))
129     proc = subprocess.Popen(command,
130         shell=True,
131         stdout=subprocess.PIPE,
132         stderr=subprocess.PIPE,
133         env=ENVIR)
134     (rawout, serr) = proc.communicate()
135
136     if proc.returncode != 0 and len(serr) > 0:
137         foutput = serr.decode("utf-8", errors="ignore")
138     else:
139         foutput = rawout.decode("utf-8", errors="ignore")
140
141     proc.stdout.close()
142     proc.stderr.close()
143     return proc, foutput
144
145
146 def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
147     """
148     Execute the setup/teardown commands for a test case.
149     Optionally terminate test execution if the command fails.
150     """
151     if args.verbose > 0:
152         print('{}'.format(prefix))
153     for cmdinfo in cmdlist:
154         if isinstance(cmdinfo, list):
155             exit_codes = cmdinfo[1:]
156             cmd = cmdinfo[0]
157         else:
158             exit_codes = [0]
159             cmd = cmdinfo
160
161         if not cmd:
162             continue
163
164         (proc, foutput) = exec_cmd(args, pm, stage, cmd)
165
166         if proc and (proc.returncode not in exit_codes):
167             print('', file=sys.stderr)
168             print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
169                   file=sys.stderr)
170             print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
171                   file=sys.stderr)
172             print("returncode {}; expected {}".format(proc.returncode,
173                                                       exit_codes))
174             print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
175             print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
176             print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
177             raise PluginMgrTestFail(
178                 stage, output,
179                 '"{}" did not complete successfully'.format(prefix))
180
181 def run_one_test(pm, args, index, tidx):
182     global NAMES
183     result = True
184     tresult = ""
185     tap = ""
186     if args.verbose > 0:
187         print("\t====================\n=====> ", end="")
188     print("Test " + tidx["id"] + ": " + tidx["name"])
189
190     # populate NAMES with TESTID for this test
191     NAMES['TESTID'] = tidx['id']
192
193     pm.call_pre_case(index, tidx['id'])
194     prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
195
196     if (args.verbose > 0):
197         print('-----> execute stage')
198     pm.call_pre_execute()
199     (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
200     if p:
201         exit_code = p.returncode
202     else:
203         exit_code = None
204
205     pm.call_post_execute()
206
207     if (exit_code is None or exit_code != int(tidx["expExitCode"])):
208         result = False
209         print("exit: {!r}".format(exit_code))
210         print("exit: {}".format(int(tidx["expExitCode"])))
211         #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
212         print(procout)
213     else:
214         if args.verbose > 0:
215             print('-----> verify stage')
216         match_pattern = re.compile(
217             str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
218         (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
219         if procout:
220             match_index = re.findall(match_pattern, procout)
221             if len(match_index) != int(tidx["matchCount"]):
222                 result = False
223         elif int(tidx["matchCount"]) != 0:
224             result = False
225
226     if not result:
227         tresult += 'not '
228     tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
229     tap += tresult
230
231     if result == False:
232         if procout:
233             tap += procout
234         else:
235             tap += 'No output!\n'
236
237     prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
238     pm.call_post_case()
239
240     index += 1
241
242     # remove TESTID from NAMES
243     del(NAMES['TESTID'])
244     return tap
245
246 def test_runner(pm, args, filtered_tests):
247     """
248     Driver function for the unit tests.
249
250     Prints information about the tests being run, executes the setup and
251     teardown commands and the command under test itself. Also determines
252     success/failure based on the information in the test case and generates
253     TAP output accordingly.
254     """
255     testlist = filtered_tests
256     tcount = len(testlist)
257     index = 1
258     tap = ''
259     badtest = None
260     stage = None
261     emergency_exit = False
262     emergency_exit_message = ''
263
264     if args.notap:
265         if args.verbose:
266             tap = 'notap requested:  omitting test plan\n'
267     else:
268         tap = str(index) + ".." + str(tcount) + "\n"
269     try:
270         pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
271     except Exception as ee:
272         ex_type, ex, ex_tb = sys.exc_info()
273         print('Exception {} {} (caught in pre_suite).'.
274               format(ex_type, ex))
275         # when the extra print statements are uncommented,
276         # the traceback does not appear between them
277         # (it appears way earlier in the tdc.py output)
278         # so don't bother ...
279         # print('--------------------(')
280         # print('traceback')
281         traceback.print_tb(ex_tb)
282         # print('--------------------)')
283         emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
284         emergency_exit = True
285         stage = 'pre-SUITE'
286
287     if emergency_exit:
288         pm.call_post_suite(index)
289         return emergency_exit_message
290     if args.verbose > 1:
291         print('give test rig 2 seconds to stabilize')
292     time.sleep(2)
293     for tidx in testlist:
294         if "flower" in tidx["category"] and args.device == None:
295             if args.verbose > 1:
296                 print('Not executing test {} {} because DEV2 not defined'.
297                       format(tidx['id'], tidx['name']))
298             continue
299         try:
300             badtest = tidx  # in case it goes bad
301             tap += run_one_test(pm, args, index, tidx)
302         except PluginMgrTestFail as pmtf:
303             ex_type, ex, ex_tb = sys.exc_info()
304             stage = pmtf.stage
305             message = pmtf.message
306             output = pmtf.output
307             print(message)
308             print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
309                   format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
310             print('---------------')
311             print('traceback')
312             traceback.print_tb(ex_tb)
313             print('---------------')
314             if stage == 'teardown':
315                 print('accumulated output for this test:')
316                 if pmtf.output:
317                     print(pmtf.output)
318             print('---------------')
319             break
320         index += 1
321
322     # if we failed in setup or teardown,
323     # fill in the remaining tests with ok-skipped
324     count = index
325     if not args.notap:
326         tap += 'about to flush the tap output if tests need to be skipped\n'
327         if tcount + 1 != index:
328             for tidx in testlist[index - 1:]:
329                 msg = 'skipped - previous {} failed'.format(stage)
330                 tap += 'ok {} - {} # {} {} {}\n'.format(
331                     count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
332                 count += 1
333
334         tap += 'done flushing skipped test tap output\n'
335
336     if args.pause:
337         print('Want to pause\nPress enter to continue ...')
338         if input(sys.stdin):
339             print('got something on stdin')
340
341     pm.call_post_suite(index)
342
343     return tap
344
345 def has_blank_ids(idlist):
346     """
347     Search the list for empty ID fields and return true/false accordingly.
348     """
349     return not(all(k for k in idlist))
350
351
352 def load_from_file(filename):
353     """
354     Open the JSON file containing the test cases and return them
355     as list of ordered dictionary objects.
356     """
357     try:
358         with open(filename) as test_data:
359             testlist = json.load(test_data, object_pairs_hook=OrderedDict)
360     except json.JSONDecodeError as jde:
361         print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
362         testlist = list()
363     else:
364         idlist = get_id_list(testlist)
365         if (has_blank_ids(idlist)):
366             for k in testlist:
367                 k['filename'] = filename
368     return testlist
369
370
371 def args_parse():
372     """
373     Create the argument parser.
374     """
375     parser = argparse.ArgumentParser(description='Linux TC unit tests')
376     return parser
377
378
379 def set_args(parser):
380     """
381     Set the command line arguments for tdc.
382     """
383     parser.add_argument(
384         '-p', '--path', type=str,
385         help='The full path to the tc executable to use')
386     sg = parser.add_argument_group(
387         'selection', 'select which test cases: ' +
388         'files plus directories; filtered by categories plus testids')
389     ag = parser.add_argument_group(
390         'action', 'select action to perform on selected test cases')
391
392     sg.add_argument(
393         '-D', '--directory', nargs='+', metavar='DIR',
394         help='Collect tests from the specified directory(ies) ' +
395         '(default [tc-tests])')
396     sg.add_argument(
397         '-f', '--file', nargs='+', metavar='FILE',
398         help='Run tests from the specified file(s)')
399     sg.add_argument(
400         '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
401         help='Run tests only from the specified category/ies, ' +
402         'or if no category/ies is/are specified, list known categories.')
403     sg.add_argument(
404         '-e', '--execute', nargs='+', metavar='ID',
405         help='Execute the specified test cases with specified IDs')
406     ag.add_argument(
407         '-l', '--list', action='store_true',
408         help='List all test cases, or those only within the specified category')
409     ag.add_argument(
410         '-s', '--show', action='store_true', dest='showID',
411         help='Display the selected test cases')
412     ag.add_argument(
413         '-i', '--id', action='store_true', dest='gen_id',
414         help='Generate ID numbers for new test cases')
415     parser.add_argument(
416         '-v', '--verbose', action='count', default=0,
417         help='Show the commands that are being run')
418     parser.add_argument(
419         '-N', '--notap', action='store_true',
420         help='Suppress tap results for command under test')
421     parser.add_argument('-d', '--device',
422                         help='Execute the test case in flower category')
423     parser.add_argument(
424         '-P', '--pause', action='store_true',
425         help='Pause execution just before post-suite stage')
426     return parser
427
428
429 def check_default_settings(args, remaining, pm):
430     """
431     Process any arguments overriding the default settings,
432     and ensure the settings are correct.
433     """
434     # Allow for overriding specific settings
435     global NAMES
436
437     if args.path != None:
438         NAMES['TC'] = args.path
439     if args.device != None:
440         NAMES['DEV2'] = args.device
441     if not os.path.isfile(NAMES['TC']):
442         print("The specified tc path " + NAMES['TC'] + " does not exist.")
443         exit(1)
444
445     pm.call_check_args(args, remaining)
446
447
448 def get_id_list(alltests):
449     """
450     Generate a list of all IDs in the test cases.
451     """
452     return [x["id"] for x in alltests]
453
454
455 def check_case_id(alltests):
456     """
457     Check for duplicate test case IDs.
458     """
459     idl = get_id_list(alltests)
460     return [x for x in idl if idl.count(x) > 1]
461
462
463 def does_id_exist(alltests, newid):
464     """
465     Check if a given ID already exists in the list of test cases.
466     """
467     idl = get_id_list(alltests)
468     return (any(newid == x for x in idl))
469
470
471 def generate_case_ids(alltests):
472     """
473     If a test case has a blank ID field, generate a random hex ID for it
474     and then write the test cases back to disk.
475     """
476     import random
477     for c in alltests:
478         if (c["id"] == ""):
479             while True:
480                 newid = str('{:04x}'.format(random.randrange(16**4)))
481                 if (does_id_exist(alltests, newid)):
482                     continue
483                 else:
484                     c['id'] = newid
485                     break
486
487     ufilename = []
488     for c in alltests:
489         if ('filename' in c):
490             ufilename.append(c['filename'])
491     ufilename = get_unique_item(ufilename)
492     for f in ufilename:
493         testlist = []
494         for t in alltests:
495             if 'filename' in t:
496                 if t['filename'] == f:
497                     del t['filename']
498                     testlist.append(t)
499         outfile = open(f, "w")
500         json.dump(testlist, outfile, indent=4)
501         outfile.write("\n")
502         outfile.close()
503
504 def filter_tests_by_id(args, testlist):
505     '''
506     Remove tests from testlist that are not in the named id list.
507     If id list is empty, return empty list.
508     '''
509     newlist = list()
510     if testlist and args.execute:
511         target_ids = args.execute
512
513         if isinstance(target_ids, list) and (len(target_ids) > 0):
514             newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
515     return newlist
516
517 def filter_tests_by_category(args, testlist):
518     '''
519     Remove tests from testlist that are not in a named category.
520     '''
521     answer = list()
522     if args.category and testlist:
523         test_ids = list()
524         for catg in set(args.category):
525             if catg == '+c':
526                 continue
527             print('considering category {}'.format(catg))
528             for tc in testlist:
529                 if catg in tc['category'] and tc['id'] not in test_ids:
530                     answer.append(tc)
531                     test_ids.append(tc['id'])
532
533     return answer
534
535 def get_test_cases(args):
536     """
537     If a test case file is specified, retrieve tests from that file.
538     Otherwise, glob for all json files in subdirectories and load from
539     each one.
540     Also, if requested, filter by category, and add tests matching
541     certain ids.
542     """
543     import fnmatch
544
545     flist = []
546     testdirs = ['tc-tests']
547
548     if args.file:
549         # at least one file was specified - remove the default directory
550         testdirs = []
551
552         for ff in args.file:
553             if not os.path.isfile(ff):
554                 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
555             else:
556                 flist.append(os.path.abspath(ff))
557
558     if args.directory:
559         testdirs = args.directory
560
561     for testdir in testdirs:
562         for root, dirnames, filenames in os.walk(testdir):
563             for filename in fnmatch.filter(filenames, '*.json'):
564                 candidate = os.path.abspath(os.path.join(root, filename))
565                 if candidate not in testdirs:
566                     flist.append(candidate)
567
568     alltestcases = list()
569     for casefile in flist:
570         alltestcases = alltestcases + (load_from_file(casefile))
571
572     allcatlist = get_test_categories(alltestcases)
573     allidlist = get_id_list(alltestcases)
574
575     testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
576     idtestcases = filter_tests_by_id(args, alltestcases)
577     cattestcases = filter_tests_by_category(args, alltestcases)
578
579     cat_ids = [x['id'] for x in cattestcases]
580     if args.execute:
581         if args.category:
582             alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
583         else:
584             alltestcases = idtestcases
585     else:
586         if cat_ids:
587             alltestcases = cattestcases
588         else:
589             # just accept the existing value of alltestcases,
590             # which has been filtered by file/directory
591             pass
592
593     return allcatlist, allidlist, testcases_by_cats, alltestcases
594
595
596 def set_operation_mode(pm, args):
597     """
598     Load the test case data and process remaining arguments to determine
599     what the script should do for this run, and call the appropriate
600     function.
601     """
602     ucat, idlist, testcases, alltests = get_test_cases(args)
603
604     if args.gen_id:
605         if (has_blank_ids(idlist)):
606             alltests = generate_case_ids(alltests)
607         else:
608             print("No empty ID fields found in test files.")
609         exit(0)
610
611     duplicate_ids = check_case_id(alltests)
612     if (len(duplicate_ids) > 0):
613         print("The following test case IDs are not unique:")
614         print(str(set(duplicate_ids)))
615         print("Please correct them before continuing.")
616         exit(1)
617
618     if args.showID:
619         for atest in alltests:
620             print_test_case(atest)
621         exit(0)
622
623     if isinstance(args.category, list) and (len(args.category) == 0):
624         print("Available categories:")
625         print_sll(ucat)
626         exit(0)
627
628     if args.list:
629         if args.list:
630             list_test_cases(alltests)
631             exit(0)
632
633     if len(alltests):
634         catresults = test_runner(pm, args, alltests)
635     else:
636         catresults = 'No tests found\n'
637     if args.notap:
638         print('Tap output suppression requested\n')
639     else:
640         print('All test results: \n\n{}'.format(catresults))
641
642 def main():
643     """
644     Start of execution; set up argument parser and get the arguments,
645     and start operations.
646     """
647     parser = args_parse()
648     parser = set_args(parser)
649     pm = PluginMgr(parser)
650     parser = pm.call_add_args(parser)
651     (args, remaining) = parser.parse_known_args()
652     args.NAMES = NAMES
653     check_default_settings(args, remaining, pm)
654     if args.verbose > 2:
655         print('args is {}'.format(args))
656
657     set_operation_mode(pm, args)
658
659     exit(0)
660
661
662 if __name__ == "__main__":
663     main()