#!/usr/bin/env python3

# Copyright (C) 2018 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Run shellcheck over the shell snippets embedded in spread YAML files.

Every section listed in SECTIONS of each ``task.yaml`` / ``spread.yaml`` found
under the given paths is piped to ``shellcheck``; for ``spread.yaml`` files
the per-suite sections are checked as well.
"""

import logging
import os
import subprocess
import argparse
import itertools
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from typing import Dict

import yaml


# default shell for shellcheck
SHELLCHECK_SHELL = os.getenv('SHELLCHECK_SHELL', 'bash')
# set to non-empty to ignore all errors
NO_FAIL = os.getenv('NO_FAIL')
# set to non empty to enable 'set -x'
# NOTE(review): within this script D is only used to switch on debug logging
D = os.getenv('D')
# set to non-empty to enable verbose logging
V = os.getenv('V')
# set to a number to use these many threads
N = int(os.getenv('N') or cpu_count())
# file with list of files that can fail validation
CAN_FAIL = os.getenv('CAN_FAIL')

# names of sections
SECTIONS = ['prepare', 'prepare-each', 'restore', 'restore-each',
            'debug', 'debug-each', 'execute', 'repack']

# The C-accelerated loader only exists when PyYAML was built against libyaml;
# fall back to the pure Python safe loader otherwise.
_YAML_LOADER = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)

# seconds a single shellcheck invocation is allowed to run
_SHELLCHECK_TIMEOUT = 30


def parse_arguments():
    """Parse the command line and return the argparse namespace.

    The defaults of --shell and --max-procs come from the SHELLCHECK_SHELL
    and N environment variables respectively.
    """
    parser = argparse.ArgumentParser(description='spread shellcheck helper')
    parser.add_argument('-s', '--shell', default=SHELLCHECK_SHELL,
                        help='shell')
    parser.add_argument('-n', '--no-errors', action='store_true',
                        default=False, help='ignore all errors ')
    parser.add_argument('-v', '--verbose', action='store_true',
                        default=False, help='verbose logging')
    parser.add_argument('--can-fail', default=None,
                        help=('file with list of files that are can fail '
                              'validation'))
    parser.add_argument('-P', '--max-procs', default=N, type=int, metavar='N',
                        help='run these many shellchecks in parallel (default: %(default)s)')
    parser.add_argument('paths', nargs='+', help='paths to check')
    return parser.parse_args()


class ShellcheckRunError(Exception):
    """A single shellcheck invocation reported problems.

    Despite its name, the ``stderr`` attribute carries the raw bytes that
    checksection() collected from shellcheck's standard output.
    """

    def __init__(self, stderr):
        super().__init__()
        self.stderr = stderr


class ShellcheckError(Exception):
    """Collected shellcheck failures of one file, keyed by section name."""

    def __init__(self, path):
        super().__init__()
        self.sectionerrors = {}
        self.path = path

    def addfailure(self, section, error):
        """Record the shellcheck output `error` for `section`."""
        self.sectionerrors[section] = error

    def __len__(self):
        # also makes an instance without recorded failures falsy
        return len(self.sectionerrors)


class ShellcheckFailures(Exception):
    """Set of paths of the files that failed validation."""

    def __init__(self, failures=None):
        super().__init__()
        self.failures = set()
        if failures:
            self.failures = set(failures)

    def merge(self, otherfailures):
        """Add the paths held by another ShellcheckFailures instance."""
        self.failures = self.failures.union(otherfailures.failures)

    def __len__(self):
        # also makes an instance without failures falsy
        return len(self.failures)

    def intersection(self, other):
        """Return the failed paths that are also present in `other`."""
        return self.failures.intersection(other)

    def difference(self, other):
        """Return the failed paths that are not present in `other`."""
        return self.failures.difference(other)

    def __iter__(self):
        return iter(self.failures)


def _render_script(data, env: Dict[str, str]) -> str:
    """Build the script text fed to shellcheck for one section."""
    # spread shell snippets are executed under 'set -e' shell, make sure
    # shellcheck knows about that
    lines = ['set -e']

    for key, value in env.items():
        value = str(value)
        # Unpack the special "$(HOST: ...) syntax and tell shellcheck not to
        # worry about the use of echo to print variable value.
        if value.startswith("$(HOST:") and value.endswith(")"):
            lines.append("# shellcheck disable=SC2116")
            value = "$({})".format(value[len("$(HOST:"):-1])

        # XXX: poor man's shell key=value assignment with values in double
        # quotes so that one value can refer to another value.
        #
        # NOTE(review): '\"' is the very same string as '"', so this
        # replacement currently changes nothing. It is deliberately left
        # as-is: escaping for real would turn quotes inside $(...) into
        # literal characters and change what shellcheck reports.
        if '"' in value:
            value = value.replace('"', '\"')
        # converts
        # FOO: "$(HOST: echo $foo)"     -> FOO="$(echo $foo)"
        # FOO: "$(HOST: echo \"$foo\")" -> FOO="$(echo \"$foo\")"
        # FOO: "foo"                    -> FOO="foo"
        lines.append("{}=\"{}\"".format(key, value))
        lines.append("export {}".format(key))

    # a section that is present but empty is loaded from YAML as None
    lines.append('' if data is None else data)
    return '\n'.join(lines)


def checksection(data, env: Dict[str, str]):
    """Run shellcheck on one section's script.

    The script is prefixed with 'set -e' and with an assignment and export of
    every variable in `env`, then piped to shellcheck using the shell dialect
    named by the module-level SHELLCHECK_SHELL.

    Raises:
        ShellcheckRunError: shellcheck exited with a non-zero status or could
            not be started; carries the collected output as bytes.
        subprocess.TimeoutExpired: shellcheck did not finish in time.
    """
    script = _render_script(data, env)
    try:
        # An argument list keeps the shell name from being interpreted by an
        # intermediate shell, and run() kills and reaps the child on timeout.
        proc = subprocess.run(
            ['shellcheck', '-s', SHELLCHECK_SHELL, '-x', '-'],
            input=script.encode('utf-8'),
            stdout=subprocess.PIPE,
            timeout=_SHELLCHECK_TIMEOUT)
    except OSError as err:
        # e.g. shellcheck is not installed; keep reporting it as a failure of
        # the section, but with a message that explains what went wrong
        message = 'cannot run shellcheck: {}'.format(err)
        raise ShellcheckRunError(message.encode('utf-8')) from err
    if proc.returncode != 0:
        raise ShellcheckRunError(proc.stdout)


def checkfile(path, executor):
    """Check every known section of the spread YAML file at `path`.

    The file's top level sections are checked in the calling thread. When the
    file name ends with 'spread.yaml', the sections of each entry under
    'suites' are submitted to `executor` and waited for.

    Raises:
        ShellcheckError: at least one section failed; holds the shellcheck
            output of each failing section.
    """
    logging.debug("checking file %s", path)
    with open(path, encoding='utf-8') as inf:
        # an empty document loads as None
        data = yaml.load(inf, Loader=_YAML_LOADER) or {}

    errors = ShellcheckError(path)
    # TODO: handle stacking of environment from other places that influence it:
    # spread.yaml -> global env + backend env + suite env -> task.yaml (task
    # env + variant env).
    env = {}
    for key, value in (data.get("environment") or {}).items():
        if "/" in key:
            # TODO: re-check with each variant's value set.
            key = key.split('/', 1)[0]
        env[key] = value

    for section in SECTIONS:
        if section not in data:
            continue
        try:
            logging.debug("%s: checking section %s", path, section)
            checksection(data[section], env)
        except ShellcheckRunError as serr:
            errors.addfailure(section, serr.stderr.decode('utf-8'))

    if path.endswith('spread.yaml') and 'suites' in data:
        # check suites
        pending = []
        for suite, suitedata in (data['suites'] or {}).items():
            if not isinstance(suitedata, dict):
                # nothing to check in an empty suite entry
                continue
            for section in SECTIONS:
                if section not in suitedata:
                    continue
                logging.debug("%s (suite %s): checking section %s",
                              path, suite, section)
                future = executor.submit(checksection, suitedata[section], env)
                pending.append((suite, section, future))
        for suite, section, future in pending:
            try:
                future.result()
            except ShellcheckRunError as serr:
                errors.addfailure('suites/' + suite + '/' + section,
                                  serr.stderr.decode('utf-8'))

    if errors:
        raise errors


def findfiles(locations):
    """Yield the files to check for each of `locations`.

    Directories are walked recursively and contribute their 'spread.yaml' and
    'task.yaml' files; any other location is yielded unchanged.
    """
    for loc in locations:
        if not os.path.isdir(loc):
            yield loc
            continue
        for root, _, files in os.walk(loc, topdown=True):
            for name in files:
                if name in ('spread.yaml', 'task.yaml'):
                    yield os.path.join(root, name)


def check1path(path, executor):
    """Check a single file, returning its ShellcheckError or None."""
    try:
        checkfile(path, executor)
    except ShellcheckError as err:
        return err
    return None


def checkpaths(locs, executor):
    """Check all files found under `locs`, logging every failure.

    Raises:
        ShellcheckFailures: one or more files failed; holds their paths.
    """
    # setup iterator
    locations = findfiles(locs)
    failed = []
    for serr in executor.map(check1path, locations,
                             itertools.repeat(executor)):
        if serr is None:
            continue
        logging.error(('shellcheck failed for file %s in sections: '
                       '%s; error log follows'),
                      serr.path, ', '.join(serr.sectionerrors))
        for section, error in serr.sectionerrors.items():
            logging.error("%s: section '%s':\n%s", serr.path, section, error)
        failed.append(serr.path)

    if failed:
        raise ShellcheckFailures(failures=failed)


def loadfilelist(flistpath):
    """Return the set of entries listed in the file at `flistpath`.

    One entry per line, surrounding whitespace is stripped. Blank lines and
    lines starting with '#' are ignored.
    """
    flist = set()
    with open(flistpath, encoding='utf-8') as inf:
        for line in inf:
            entry = line.strip()
            # an empty entry can never match a failed path and would later be
            # reported as a whitelisted file that validated successfully
            if not entry or entry.startswith('#'):
                continue
            flist.add(entry)
    return flist


def main(opts):
    """Check the requested paths and exit non-zero on relevant failures.

    When a can-fail list is given it is consulted only if at least one file
    failed: failures outside of the list, and listed files that did not fail,
    both end the program with exit status 1. Without the list any failure
    ends the program with status 1, unless errors are being ignored.
    """
    paths = opts.paths or ['.']
    failures = ShellcheckFailures()
    with ThreadPoolExecutor(max_workers=opts.max_procs) as executor:
        try:
            checkpaths(paths, executor)
        except ShellcheckFailures as sf:
            failures.merge(sf)

    if not failures:
        return

    if opts.can_fail:
        can_fail = loadfilelist(opts.can_fail)

        unexpected = failures.difference(can_fail)
        if unexpected:
            logging.error(('validation failed for the following '
                           'non-whitelisted files:\n%s'),
                          '\n'.join([' - ' + f for f in sorted(unexpected)]))
            raise SystemExit(1)

        did_not_fail = can_fail - failures.intersection(can_fail)
        if did_not_fail:
            logging.error(('the following files are whitelisted '
                           'but validated successfully:\n%s'),
                          '\n'.join([' - ' + f for f in sorted(did_not_fail)]))
            raise SystemExit(1)

        # no unexpected failures
        return

    logging.error('validation failed for the following files:\n%s',
                  '\n'.join([' - ' + f for f in sorted(failures)]))

    if NO_FAIL or opts.no_errors:
        logging.warning("ignoring errors")
    else:
        raise SystemExit(1)


if __name__ == '__main__':
    opts = parse_arguments()
    if opts.verbose or D or V:
        lvl = logging.DEBUG
    else:
        lvl = logging.INFO
    logging.basicConfig(level=lvl)

    # checksection() reads the module-level setting, make it follow -s/--shell
    # (which itself defaults to the SHELLCHECK_SHELL environment variable)
    SHELLCHECK_SHELL = opts.shell

    # settings from the environment take precedence over the command line
    if CAN_FAIL:
        opts.can_fail = CAN_FAIL

    if NO_FAIL:
        opts.no_errors = True

    if opts.max_procs == 1:
        # TODO: temporary workaround for a deadlock when running with a single
        # worker
        # checkfile() runs on a worker thread and blocks on futures that it
        # submitted to the same executor; with one worker those futures can
        # never start.
        opts.max_procs += 1
        logging.warning('workers count bumped to 2 to workaround a deadlock')

    main(opts)