diff options
author | Adrian Lane <adrian.lane@canonical.com> | 2020-05-16 22:30:23 -0700 |
---|---|---|
committer | Adrian Lane <adrian.lane@canonical.com> | 2020-05-16 22:30:23 -0700 |
commit | d9c88169c9c49c0466d0600d6991a7c5d80a27ae (patch) | |
tree | 124d19f63c8fefe74f12030814d5b3d9e0c74023 /bin | |
parent | b3edeb2ae96ad5a1747fd08dad8d6c59ec4350cd (diff) |
Major refactor for: helper fn and __init__ cleanup, docstrings added and removed ipmi_locate() and its helper.
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/ipmi_test.py | 306 |
1 files changed, 133 insertions, 173 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py index 685c381..f8f029d 100755 --- a/bin/ipmi_test.py +++ b/bin/ipmi_test.py @@ -21,7 +21,6 @@ Tests IPMI subsystem on SUT. """ import re -import os import shutil import sys import argparse @@ -34,133 +33,132 @@ from subprocess import ( SubprocessError) -class IpmiTest(object): +class FreeIpmiTest: def __init__(self): - # paths to kernel_module binaries - self._path_lsmod = self._get_path('lsmod') - self._path_modprobe = self._get_path('modprobe') - # kernel modules to load/verify + def get_path(binary): + """Get absolute path of FreeIPMI/nix binary, + warn upon failure. + """ + path_full = shutil.which(binary) + if path_full: + return path_full + else: + logging.info( + '* Unable to stat absolute path for %s!' + % binary) + return binary + + # paths to load_kernel_module() binaries + self._path_lsmod = get_path('lsmod') + self._path_modprobe = get_path('modprobe') + # ipmi kernel modules to load/verify self._kernel_modules = ( 'ipmi_si', 'ipmi_devintf', 'ipmi_powernv', 'ipmi_ssif', 'ipmi_msghandler') - # paths to freeipmi tools - self._path_ipmi_chassis = self._get_path('ipmi-chassis') - self._path_ipmi_config = self._get_path('ipmi-config') - self._path_bmc_info = self._get_path('bmc-info') - self._path_ipmi_locate = self._get_path('ipmi-locate') - # function subprocess commands - self._cmd_kernel_mods = [ - 'sudo', self._path_lsmod] + # method subprocess commands (FreeIPMI) self._cmd_ipmi_chassis = [ - 'sudo', self._path_ipmi_chassis, '--get-status'] + get_path('ipmi-chassis'), '--get-status'] self._cmd_ipmi_channel = [ - 'sudo', self._path_ipmi_config, '--checkout', + get_path('ipmi-config'), '--checkout', '--lan-channel-number'] - self._cmd_bmc_info = [ - 'sudo', self._path_bmc_info] + self._cmd_get_bmc_info = [ + get_path('bmc-info')] self._cmd_ipmi_locate = [ - 'sudo', self._path_ipmi_locate] + get_path('ipmi-locate')] # min. ipmi version to pass self._ipmi_ver = 2.0 # subprocess call timeout (s) self._subproc_timeout = 10 # raised subproc exceptions to handle - self._sub_proc_excs = ( + # (decoupled from self._process_exc()) + self._sub_process_excs = ( TimeoutExpired, SubprocessError, OSError, - TypeError) + TypeError, + FileNotFoundError) - # get absolute path - def _get_path(self, binary): - try: - path_full = shutil.which(binary) - return path_full - except (self._sub_proc_excs[-2:-1]): - logging.info('* Unable to stat path via shutil lib!') - logging.info('* Using relative paths...') - return binary - - # subprocess stdin/stderr handling def _subproc_logging(self, cmd): + """Subprocess stdin/stderr handling.""" process = Popen( cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) output, error = process.communicate(timeout=self._subproc_timeout) logging.debug('## Debug Output: ##') - if (len(output) > 0): - # padding + if output: logging.debug(' [Stdout]\n') - logging.debug(f'{output}\n') - if (len(error) > 0): - # padding + logging.debug('%s\n' % output) + if error: logging.debug(' [Stderr]\n') - logging.debug(f'{error}\n') + logging.debug('%s\n' % error) logging.debug('## End Debug Output ##\n') + # ignore stderr return output - # post-process exception handling - def _proc_exc(self, exc, test_method): - if (type(exc) == TimeoutExpired): + def _process_exc(self, exc, test_method): + """Allows for bundling of exception handling for all + methods within this class. + """ + if type(exc) == TimeoutExpired: logging.info( - f'* Timeout calling {test_method}!' - f' ({self._subproc_timeout}s)\n') - elif (type(exc) == TypeError): + '* Timeout calling %s! (%ss)\n' % + (test_method, self._subproc_timeout)) + elif type(exc) == FileNotFoundError: logging.info( - f'* Error calling {test_method}!' - ' Check your paths!\n') + '* Error calling %s! Check cmds/paths.\n' % test_method) else: - logging.info(f'* Error calling {test_method}!\n') + logging.info('* Error calling %s!\n' % test_method) - # kernel_mods() helper function to call modprobe def _modprobe_hlpr(self, module): + """load_kernel_mods() helper function to call modprobe.""" try: check_call( [self._path_modprobe, module], stderr=PIPE, timeout=self._subproc_timeout) - except self._sub_proc_excs: - logging.info(f'* Unable to load module {module}!') + except self._sub_process_excs: + logging.info('* Unable to load module %s!' % module) logging.info(' **********************************************') - logging.info(f' Warning: proceeding, but in-band IPMI may fail') + logging.info(' Warning: proceeding, but in-band IPMI may fail') logging.info(' **********************************************') else: - logging.info(f'- Successfully loaded module {module}') + logging.info('- Successfully loaded module %s' % module) - # check (and load) kernel modules - def kernel_mods(self): + def load_kernel_mods(self): + """Check (and load) kernel modules.""" logging.info('-----------------------') logging.info('Verifying kernel modules:') try: - output = self._subproc_logging(self._cmd_kernel_mods) + output = self._subproc_logging(self._path_lsmod) for module in self._kernel_modules: if module in output: - logging.info(f'- {module} already loaded') + logging.info('- %s already loaded' % module) else: self._modprobe_hlpr(module) logging.info('') - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.kernel_mods.__qualname__) + except self._sub_process_excs as exc: + self._process_exc(exc, self.load_kernel_mods.__qualname__) + return False - # get ipmi chassis data - # pass if called w/o error - def impi_chassis(self): + def get_impi_chassis(self): + """Get ipmi chassis data, pass if called w/o error.""" logging.info('-----------------------') logging.info('Fetching chassis status:') try: self._subproc_logging(self._cmd_ipmi_chassis) - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.impi_chassis.__qualname__) - return 1 + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_impi_chassis.__qualname__) + return False else: logging.info('- Fetched chassis status!\n') - return 0 + return True - # get power status via ipmi chassis data - # pass if called w/o error & system power field present - def pwr_status(self): + def get_pwr_status(self): + """Get power status via ipmi chassis data, + pass if called w/o error & system power field present. + """ logging.info('-----------------------') logging.info('Fetching power status:') regex = re.compile('^System Power') @@ -169,60 +167,59 @@ class IpmiTest(object): for line in output.rstrip().split('\n'): if re.search(regex, line): logging.info('- Fetched power status!\n') - return 0 - else: - logging.info('* Unable to retrieve power status via IPMI.\n') - return 1 - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.pwr_status.__qualname__) - return 1 + return True + logging.info('* Unable to retrieve power status via IPMI.\n') + return False + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_pwr_status.__qualname__) + return False - # call bmc-info - # pass if called w/o error - def bmc_info(self): + def get_bmc_info(self): + """Call bmc-info, pass if called w/o error.""" logging.info('-----------------------') logging.info('Fetching BMC information:') try: - self._subproc_logging(self._cmd_bmc_info) - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.bmc_info.__qualname__) - return 1 + self._subproc_logging(self._cmd_get_bmc_info) + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_bmc_info.__qualname__) + return False else: logging.info('- Fetched BMC information!\n') - return 0 + return True - # ipmi driver discovery loop def _ipmi_version_hlpr(self): + """Ipmi version discovery loop.""" regex = re.compile('^IPMI Version') - output = self._subproc_logging(self._cmd_bmc_info) + output = self._subproc_logging(self._cmd_get_bmc_info) for line in output.rstrip().split('\n'): if re.search(regex, line): version = (line.split(':'))[1].strip() - return version + return float(version) - # fetch ipmi version via bmc-info sdout - # pass if ipmi version >= self._ipmi_ver - def ipmi_version(self): + def chk_ipmi_version(self): + """Fetch ipmi version via bmc-info sdout, + pass if ipmi version >= self._ipmi_ver. + """ logging.info('-----------------------') logging.info('Validating IPMI version:') try: version = self._ipmi_version_hlpr() - logging.info(f'- IPMI version: {version}') - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.ipmi_version.__qualname__) - return 1 + logging.info('- IPMI version: %.1f' % version) + except self._sub_process_excs as exc: + self._process_exc(exc, self.chk_ipmi_version.__qualname__) + return False else: - if (float(version) < float(self._ipmi_ver)): - logging.info(f'* IPMI version below {self._ipmi_ver}!\n') - return 1 + if version < float(self._ipmi_ver): + logging.info('* IPMI version below %d!\n' % self._ipmi_ver) + return False else: - logging.info(f' IPMI version compliant!\n') - return 0 + logging.info(' IPMI version compliant!\n') + return True - # ipmi_channel discovery loop def _ipmi_channel_hlpr(self, i, regex, channel): + """get_ipmi_channel discovery loop.""" cmd = self._cmd_ipmi_channel - if (len(cmd) > 4): + if len(cmd) > 3: cmd.pop(-1) cmd.append(str(i)) output = self._subproc_logging(cmd) @@ -231,9 +228,10 @@ class IpmiTest(object): channel.append(i) return channel - # get ipmi channel(s) in use - # pass if user data returns after calling ipmi-config - def ipmi_channel(self): + def get_ipmi_channel(self): + """Get ipmi channel(s) in use, + pass if user data returns after calling ipmi-config. + """ logging.info('-----------------------') logging.info('Fetching IPMI channels:') # support multiple channels @@ -243,68 +241,26 @@ class IpmiTest(object): for i in range(16): try: self._ipmi_channel_hlpr(i, regex, channel) - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.ipmi_channel.__qualname__) - return 1 - else: - if (len(channel) > 0): - logging.info(f'- Found {len(channel)} channel(s)!') - logging.info(f' IPMI channel(s): {channel}\n') - return 0 - else: - logging.info('* Unable to fetch IPMI channel!\n') - return 1 - - # ipmi driver discovery loop - def _ipmi_locate_hlpr(self, ipmi_drivers): - regex = re.compile('driver:') - output = self._subproc_logging(self._cmd_ipmi_locate) - for line in output.rstrip().split('\n'): - if re.search(regex, line): - driver = (line.split(':'))[1].lstrip() - ipmi_drivers.append(driver) + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_ipmi_channel.__qualname__) + return False + if len(channel) > 0: + logging.info('- Found %d channel(s)!' % len(channel)) + logging.info(' IPMI channel(s): %s\n' % channel) + return True else: - return ipmi_drivers + logging.info('* Unable to fetch IPMI channel!\n') + return False - # call ipmi-locate - # pass if drivers are loaded - # -- update 4-29-20 -- - # ipmi-locate is throwing segfault when run on some focal - # systems during regression testing. bladernr/Jeff has - # filed a bug w/ FreeIPMI to address. - # we've opted to leave this test informational for data - # while this is being investigated. - def ipmi_locate(self): - logging.info('-----------------------') - logging.info('Locating IPMI drivers:') - # support multiple driver types - ipmi_drivers = [] - try: - self._ipmi_locate_hlpr(ipmi_drivers) - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.ipmi_locate.__qualname__) - # see above comments - return 0 - else: - if (len(ipmi_drivers) > 0): - logging.info(f'- Found {len(ipmi_drivers)} IPMI driver(s)!') - logging.info(f' IPMI driver(s) loaded: {ipmi_drivers}\n') - return 0 - else: - logging.info('* Unable to locate IPMI driver(s)!\n') - # see above comments - return 0 - - # initialize kernel modules, run ipmi tests def run_test(self): + """Initialize kernel modules, run ipmi tests.""" # load/val kernel modules - self.kernel_mods() - results = [self.impi_chassis(), - self.pwr_status(), - self.bmc_info(), - self.ipmi_version(), - self.ipmi_channel(), - self.ipmi_locate()] + self.load_kernel_mods() + results = [self.get_impi_chassis(), + self.get_pwr_status(), + self.get_bmc_info(), + self.chk_ipmi_version(), + self.get_ipmi_channel()] return results @@ -317,31 +273,36 @@ def main(): help='suppress output') args = parser.parse_args() # init logging - if ((not args.quiet) or args.debug): + if not args.quiet or args.debug: logger = logging.getLogger() logger.setLevel(logging.DEBUG) - if (not args.quiet): + if not args.quiet: console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) if args.debug: console_handler.setLevel(logging.DEBUG) - # instantiate IpmiTest as ipmi_test - ipmi_test = IpmiTest() - # run tests, return in [results] - results = ipmi_test.run_test() + print('## Running IPMI Tests ##') + # instantiate FreeIpmiTest as f_ipmi_test + f_ipmi_test = FreeIpmiTest() + results = f_ipmi_test.run_test() results_dict = {'Chassis': results[0], 'Power': results[1], 'BMC': results[2], - 'IPMI Version': results[3], - 'Channel': results[4], - 'IPMI Locate': results[5]} + 'Version': results[3], + 'Channel': results[4]} # tally results - if (sum(results) > 0): + if sum(results) < len(results): + # transpose readable values into results_dict + for test, result in results_dict.items(): + if result: + results_dict[test] = 'Pass' + else: + results_dict[test] = 'Fail' print('-----------------------') print('## IPMI tests failed! ##') - print(f'## {results_dict} ##') + print('## %r ##' % results_dict) return 1 else: print('-----------------------') @@ -349,6 +310,5 @@ def main(): return 0 -# call main() if __name__ == '__main__': sys.exit(main()) |