diff options
author | PMR <pmr@pmr-lander> | 2020-05-29 07:16:44 +0000 |
---|---|---|
committer | PMR <pmr@pmr-lander> | 2020-05-29 07:16:44 +0000 |
commit | 078485c68547aacdcbc49afb7cd3f3b4f0e859b7 (patch) | |
tree | 44132a9239e43612b2155cb96e27c44a5391152d | |
parent | ebf8a2479675321f1659852f68bc2934b5e93470 (diff) | |
parent | 229bed724ed30beda6a4279240921eec476699b2 (diff) |
Merge #384062 from ~alanec/plainbox-provider-checkbox:ipmi_test-py_patch
bin/ipmi_test.py: Added support for multiple IPMI driver types (at once), cleaned up ipmi_channel/_hlpr, syntax cleanup. -lp:#1874629
-rwxr-xr-x | bin/ipmi_test.py | 373 |
1 files changed, 183 insertions, 190 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py index c074f6bf..0aefea19 100755 --- a/bin/ipmi_test.py +++ b/bin/ipmi_test.py @@ -21,7 +21,6 @@ Tests IPMI subsystem on SUT. """ import re -import os import shutil import sys import argparse @@ -34,255 +33,241 @@ from subprocess import ( SubprocessError) -class IpmiTest(object): +class FreeIpmiTest: + def __init__(self): - # paths to kernel_module binaries - self.path_lsmod = self._get_path('lsmod') - self.path_modprobe = self._get_path('modprobe') - # kernel modules to load/verify - self.kernel_modules = ( + def get_path(binary): + """Get absolute path of FreeIPMI/nix binary, + warn upon failure. + """ + path_full = shutil.which(binary) + if path_full: + return path_full + else: + logging.info( + '* Unable to stat absolute path for %s!' + % binary) + return binary + + # paths to load_kernel_module() binaries + self._path_lsmod = get_path('lsmod') + self._path_modprobe = get_path('modprobe') + # ipmi kernel modules to load/verify + self._kernel_modules = ( 'ipmi_si', 'ipmi_devintf', 'ipmi_powernv', 'ipmi_ssif', 'ipmi_msghandler') - # paths to freeipmi tools - self.path_ipmi_chassis = self._get_path('ipmi-chassis') - self.path_ipmi_config = self._get_path('ipmi-config') - self.path_bmc_info = self._get_path('bmc-info') - self.path_ipmi_locate = self._get_path('ipmi-locate') - # function subprocess commands - self.cmd_kernel_mods = [ - 'sudo', self.path_lsmod] - self.cmd_ipmi_chassis = [ - 'sudo', self.path_ipmi_chassis, '--get-status'] - self.cmd_ipmi_channel = [ - 'sudo', self.path_ipmi_config, '--checkout', + # method subprocess commands (FreeIPMI) + self._cmd_ipmi_chassis = [ + get_path('ipmi-chassis'), '--get-status'] + self._cmd_ipmi_channel = [ + get_path('ipmi-config'), '--checkout', '--lan-channel-number'] - self.cmd_bmc_info = [ - 'sudo', self.path_bmc_info] - self.cmd_ipmi_locate = [ - 'sudo', self.path_ipmi_locate] + self._cmd_get_bmc_info = [ + get_path('bmc-info')] + self._cmd_ipmi_locate = [ + get_path('ipmi-locate')] # min. ipmi version to pass - self.ipmi_ver = 2.0 + self._ipmi_ver = 2.0 # subprocess call timeout (s) - self.subproc_timeout = 10 + self._subproc_timeout = 10 # raised subproc exceptions to handle - self.sub_proc_excs = ( + # (decoupled from self._process_exc()) + self._sub_process_excs = ( TimeoutExpired, SubprocessError, OSError, - TypeError) + TypeError, + FileNotFoundError) - # fetch absolute path via shutil lib w/ exception handling - def _get_path(self, binary): - try: - path_full = shutil.which(binary) - return path_full - except (self.sub_proc_excs[2:3]): - logging.info('Unable to stat path via shutil lib!') - logging.info('Using relative paths...') - return binary - - # subprocess stdin/stderr handling def _subproc_logging(self, cmd): + """Subprocess stdin/stderr handling.""" process = Popen( cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - output, error = process.communicate(timeout=self.subproc_timeout) + output, error = process.communicate(timeout=self._subproc_timeout) logging.debug('## Debug Output: ##') - if (len(output) > 0): - # padding + if output: logging.debug(' [Stdout]\n') - logging.debug(f'{output}\n') - if (len(error) > 0): - # padding + logging.debug('%s\n' % output) + if error: logging.debug(' [Stderr]\n') - logging.debug(f'{error}\n') + logging.debug('%s\n' % error) logging.debug('## End Debug Output ##\n') + # ignore stderr return output - # post-process exception handling - def _proc_exc(self, exc, subtest): - if (type(exc) == TimeoutExpired): + def _process_exc(self, exc, test_method): + """Allows for bundling of exception handling for all + methods within this class. + """ + if type(exc) is TimeoutExpired: logging.info( - f'Timeout calling {subtest}!' - f' ({self.subproc_timeout}s)\n') - elif (type(exc) == TypeError): + '* Timeout calling %s! (%ss)\n' % + (test_method, self._subproc_timeout)) + elif type(exc) is FileNotFoundError: logging.info( - f'Error calling {subtest}!' - ' Check your paths!\n') + '* Error calling %s! Check cmds/paths.\n' % test_method) else: - logging.info(f'Error calling {subtest}!\n') + logging.info('* Error calling %s!\n' % test_method) - # kernel_mods() helper function to call modprobe def _modprobe_hlpr(self, module): + """load_kernel_mods() helper function to call modprobe.""" try: check_call( - [self.path_modprobe, module], - stderr=PIPE, timeout=self.subproc_timeout) - except self.sub_proc_excs: - logging.info(f'* Unable to load module {module}!') + [self._path_modprobe, module], + stderr=PIPE, timeout=self._subproc_timeout) + except self._sub_process_excs: + logging.info('* Unable to load module %s!' % module) logging.info(' **********************************************') - logging.info(f' Warning: proceeding, but in-band IPMI may fail') + logging.info(' Warning: proceeding, but in-band IPMI may fail') logging.info(' **********************************************') else: - logging.info(f'- Successfully loaded module {module}') + logging.info('- Successfully loaded module %s' % module) - # check (and load) kernel modules - def kernel_mods(self): + def load_kernel_mods(self): + """Check (and load) kernel modules.""" logging.info('-----------------------') logging.info('Verifying kernel modules:') try: - output = self._subproc_logging(self.cmd_kernel_mods) - for module in self.kernel_modules: + output = self._subproc_logging(self._path_lsmod) + for module in self._kernel_modules: if module in output: - logging.info(f'- {module} already loaded') + logging.info('- %s already loaded' % module) else: self._modprobe_hlpr(module) logging.info('') - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'lsmod') + except self._sub_process_excs as exc: + self._process_exc(exc, self.load_kernel_mods.__qualname__) + return False - # get ipmi chassis data - # pass if called w/o error - def impi_chassis(self): + def get_impi_chassis(self): + """Get ipmi chassis data, pass if called w/o error.""" logging.info('-----------------------') logging.info('Fetching chassis status:') try: - self._subproc_logging(self.cmd_ipmi_chassis) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_chassis()') - return 1 + self._subproc_logging(self._cmd_ipmi_chassis) + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_impi_chassis.__qualname__) + return False else: - logging.info('Fetched chassis status!\n') - return 0 + logging.info('- Fetched chassis status!\n') + return True - # get power status via ipmi chassis data - # pass if called w/o error & system power field present - def pwr_status(self): + def get_pwr_status(self): + """Get power status via ipmi chassis data, + pass if called w/o error & system power field present. + """ logging.info('-----------------------') logging.info('Fetching power status:') regex = re.compile('^System Power') try: - output = self._subproc_logging(self.cmd_ipmi_chassis) + output = self._subproc_logging(self._cmd_ipmi_chassis) for line in output.rstrip().split('\n'): if re.search(regex, line): - logging.info('Fetched power status!\n') - return 0 + logging.info('- Fetched power status!\n') + return True + logging.info('* Unable to retrieve power status via IPMI.\n') + return False + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_pwr_status.__qualname__) + return False + + def get_bmc_info(self): + """Call bmc-info, pass if called w/o error.""" + logging.info('-----------------------') + logging.info('Fetching BMC information:') + try: + self._subproc_logging(self._cmd_get_bmc_info) + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_bmc_info.__qualname__) + return False + else: + logging.info('- Fetched BMC information!\n') + return True + + def _ipmi_version_hlpr(self): + """Ipmi version discovery loop.""" + regex = re.compile('^IPMI Version') + output = self._subproc_logging(self._cmd_get_bmc_info) + for line in output.rstrip().split('\n'): + if re.search(regex, line): + version = (line.split(':'))[1].strip() + return float(version) + + def chk_ipmi_version(self): + """Fetch ipmi version via bmc-info sdout, + pass if ipmi version >= self._ipmi_ver. + """ + logging.info('-----------------------') + logging.info('Validating IPMI version:') + try: + version = self._ipmi_version_hlpr() + logging.info('- IPMI version: %.1f' % version) + except self._sub_process_excs as exc: + self._process_exc(exc, self.chk_ipmi_version.__qualname__) + return False + else: + if version < float(self._ipmi_ver): + logging.info('* IPMI version below %d!\n' % self._ipmi_ver) + return False else: - logging.info('Unable to retrieve power status via IPMI.\n') - return 1 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'pwr_status()') - return 1 + logging.info(' IPMI version compliant!\n') + return True - # ipmi_channel discovery loop - def _ipmi_channel_hlpr(self, i, matches, channel): - regex = re.compile('Section User') - cmd = self.cmd_ipmi_channel - if (len(cmd) > 4): + def _ipmi_channel_hlpr(self, i, regex, channel): + """get_ipmi_channel discovery loop.""" + cmd = self._cmd_ipmi_channel + if len(cmd) > 3: cmd.pop(-1) cmd.append(str(i)) output = self._subproc_logging(cmd) for line in output.rstrip().split('\n'): if re.search(regex, line): - matches.append(1) channel.append(i) - break - return (matches, channel) + return channel - # get ipmi channel(s) in use - # pass if user data returns after calling ipmi-config - def ipmi_channel(self): + def get_ipmi_channel(self): + """Get ipmi channel(s) in use, + pass if user data returns after calling ipmi-config. + """ logging.info('-----------------------') - logging.info('Fetching IPMI channel:') - matches = [] + logging.info('Fetching IPMI channels:') # support multiple channels channel = [] + regex = re.compile('Section User') # test channels 0 - 15 for i in range(16): + # channel 12, 13 are invalid channels, skip + if i == (12 | 13): + continue try: - self._ipmi_channel_hlpr(i, matches, channel) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_channel()') - return 1 - else: - if (sum(matches) > 0): - logging.info(f'Found {sum(matches)} channel(s)!') - logging.info(f'IPMI Channel(s): {channel}\n') - return 0 - else: - logging.info('Unable to fetch IPMI channel!\n') - return 1 - - # call bmc-info - # pass if called w/o error - def bmc_info(self): - logging.info('-----------------------') - logging.info('Fetching BMC information:') - try: - self._subproc_logging(self.cmd_bmc_info) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'bmc_info()') - return 1 + self._ipmi_channel_hlpr(i, regex, channel) + except self._sub_process_excs as exc: + self._process_exc(exc, self.get_ipmi_channel.__qualname__) + return False + if channel: + logging.info('- Found %d channel(s)!' % len(channel)) + logging.info(' IPMI channel(s): %s\n' % channel) + return True else: - logging.info('Fetched BMC information!\n') - return 0 + logging.info('* Unable to fetch IPMI channel!\n') + return False - # fetch ipmi version via bmc-info sdout - # pass if ipmi version >= self.ipmi_ver - def ipmi_version(self): - logging.info('-----------------------') - logging.info('Testing IPMI version:') - try: - output = self._subproc_logging(self.cmd_bmc_info) - # Prefer .index() over .find() for exception handling - res_index = output.index('IPMI Version') - version = output[(res_index + 24):(res_index + 27)] - logging.info(f'IPMI Version: {version}\n') - if (float(version) < float(self.ipmi_ver)): - logging.info(f'IPMI Version below {self.ipmi_ver}!\n') - return 1 - else: - return 0 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_version()') - return 1 - - # call ipmi-locate - # pass if driver is loaded - def ipmi_locate(self): - logging.info('-----------------------') - logging.info('Testing ipmi-locate:') - regex = re.compile('driver:') - try: - output = self._subproc_logging(self.cmd_ipmi_locate) - if re.search(regex, output): - logging.info('Located IPMI driver!\n') - return 0 - else: - logging.info('Unable to locate IPMI driver!\n') - return 1 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_locate()') - return 1 - - # initialize kernel modules and run ipmi tests def run_test(self): + """Initialize kernel modules, run ipmi tests.""" # load/val kernel modules - self.kernel_mods() - # tally results - results = [self.impi_chassis(), - self.pwr_status(), - self.ipmi_channel(), - self.bmc_info(), - self.ipmi_version(), - self.ipmi_locate()] + self.load_kernel_mods() + results = [self.get_impi_chassis(), + self.get_pwr_status(), + self.get_bmc_info(), + self.chk_ipmi_version(), + self.get_ipmi_channel()] return results def main(): - # init logging subsystem # instantiate argparse as parser parser = argparse.ArgumentParser() parser.add_argument('-d', '--debug', action='store_true', @@ -290,35 +275,43 @@ def main(): parser.add_argument('-q', '--quiet', action='store_true', help='suppress output') args = parser.parse_args() - if ((not args.quiet) or args.debug): + # init logging + if not args.quiet or args.debug: logger = logging.getLogger() logger.setLevel(logging.DEBUG) - if (not args.quiet): + if not args.quiet: console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) if args.debug: console_handler.setLevel(logging.DEBUG) - # instantiate IpmiTest as ipmi_test - # pass to [results] for post-processing - ipmi_test = IpmiTest() - results = ipmi_test.run_test() + print('## Running IPMI Tests ##') + # instantiate FreeIpmiTest as f_ipmi_test + f_ipmi_test = FreeIpmiTest() + results = f_ipmi_test.run_test() + results_dict = {'Chassis': results[0], + 'Power': results[1], + 'BMC': results[2], + 'Version': results[3], + 'Channel': results[4]} # tally results - if (sum(results) > 0): - print ('-----------------------') - print ('## IPMI tests failed! ##') - print ( - f'## Chassis: {results[0]} Power: {results[1]} ', - f'Channel: {results[2]} BMC: {results[3]} ', - f'IPMI Version: {results[4]} IPMI Locate: {results[5]} ##') + if sum(results) < len(results): + # transpose readable values into results_dict + for test, result in results_dict.items(): + if result: + results_dict[test] = 'Pass' + else: + results_dict[test] = 'Fail' + print('-----------------------') + print('## IPMI tests failed! ##') + print('## %r ##' % results_dict) return 1 else: - print ('-----------------------') - print ('## IPMI tests passed! ##') + print('-----------------------') + print('## IPMI tests passed! ##') return 0 -# call main() if __name__ == '__main__': sys.exit(main()) |