diff options
author | Adrian Lane <adrian.lane@canonical.com> | 2020-04-24 00:18:50 -0700 |
---|---|---|
committer | Adrian Lane <adrian.lane@canonical.com> | 2020-04-24 00:18:50 -0700 |
commit | 4ce8722888d4c35849633677fb6d986bdcf27e57 (patch) | |
tree | 7b70113afd269cb1678a0389c30e805fa001afd1 /bin | |
parent | 0894152d433a9ededf5f32b2fefdc712a0f91509 (diff) |
Added support for multiple IPMI driver types (at once), cleaned up ipmi_channel/_hlpr, syntax cleanup. -lp:#1874629
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/ipmi_test.py | 162 |
1 files changed, 88 insertions, 74 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py index c074f6bf..a5a128ca 100755 --- a/bin/ipmi_test.py +++ b/bin/ipmi_test.py @@ -37,38 +37,38 @@ from subprocess import ( class IpmiTest(object): def __init__(self): # paths to kernel_module binaries - self.path_lsmod = self._get_path('lsmod') - self.path_modprobe = self._get_path('modprobe') + self._path_lsmod = self._get_path('lsmod') + self._path_modprobe = self._get_path('modprobe') # kernel modules to load/verify - self.kernel_modules = ( + self._kernel_modules = ( 'ipmi_si', 'ipmi_devintf', 'ipmi_powernv', 'ipmi_ssif', 'ipmi_msghandler') # paths to freeipmi tools - self.path_ipmi_chassis = self._get_path('ipmi-chassis') - self.path_ipmi_config = self._get_path('ipmi-config') - self.path_bmc_info = self._get_path('bmc-info') - self.path_ipmi_locate = self._get_path('ipmi-locate') + self._path_ipmi_chassis = self._get_path('ipmi-chassis') + self._path_ipmi_config = self._get_path('ipmi-config') + self._path_bmc_info = self._get_path('bmc-info') + self._path_ipmi_locate = self._get_path('ipmi-locate') # function subprocess commands - self.cmd_kernel_mods = [ - 'sudo', self.path_lsmod] - self.cmd_ipmi_chassis = [ - 'sudo', self.path_ipmi_chassis, '--get-status'] - self.cmd_ipmi_channel = [ - 'sudo', self.path_ipmi_config, '--checkout', + self._cmd_kernel_mods = [ + 'sudo', self._path_lsmod] + self._cmd_ipmi_chassis = [ + 'sudo', self._path_ipmi_chassis, '--get-status'] + self._cmd_ipmi_channel = [ + 'sudo', self._path_ipmi_config, '--checkout', '--lan-channel-number'] - self.cmd_bmc_info = [ - 'sudo', self.path_bmc_info] - self.cmd_ipmi_locate = [ - 'sudo', self.path_ipmi_locate] + self._cmd_bmc_info = [ + 'sudo', self._path_bmc_info] + self._cmd_ipmi_locate = [ + 'sudo', self._path_ipmi_locate] # min. ipmi version to pass - self.ipmi_ver = 2.0 + self._ipmi_ver = 2.0 # subprocess call timeout (s) - self.subproc_timeout = 10 + self._subproc_timeout = 10 # raised subproc exceptions to handle - self.sub_proc_excs = ( + self._sub_proc_excs = ( TimeoutExpired, SubprocessError, OSError, @@ -79,7 +79,7 @@ class IpmiTest(object): try: path_full = shutil.which(binary) return path_full - except (self.sub_proc_excs[2:3]): + except (self._sub_proc_excs[-2:-1]): logging.info('Unable to stat path via shutil lib!') logging.info('Using relative paths...') return binary @@ -88,7 +88,7 @@ class IpmiTest(object): def _subproc_logging(self, cmd): process = Popen( cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - output, error = process.communicate(timeout=self.subproc_timeout) + output, error = process.communicate(timeout=self._subproc_timeout) logging.debug('## Debug Output: ##') if (len(output) > 0): # padding @@ -102,25 +102,25 @@ class IpmiTest(object): return output # post-process exception handling - def _proc_exc(self, exc, subtest): + def _proc_exc(self, exc, test_method): if (type(exc) == TimeoutExpired): logging.info( - f'Timeout calling {subtest}!' - f' ({self.subproc_timeout}s)\n') + f'Timeout calling {test_method}!' + f' ({self._subproc_timeout}s)\n') elif (type(exc) == TypeError): logging.info( - f'Error calling {subtest}!' + f'Error calling {test_method}!' ' Check your paths!\n') else: - logging.info(f'Error calling {subtest}!\n') + logging.info(f'Error calling {test_method}!\n') # kernel_mods() helper function to call modprobe def _modprobe_hlpr(self, module): try: check_call( - [self.path_modprobe, module], - stderr=PIPE, timeout=self.subproc_timeout) - except self.sub_proc_excs: + [self._path_modprobe, module], + stderr=PIPE, timeout=self._subproc_timeout) + except self._sub_proc_excs: logging.info(f'* Unable to load module {module}!') logging.info(' **********************************************') logging.info(f' Warning: proceeding, but in-band IPMI may fail') @@ -133,15 +133,15 @@ class IpmiTest(object): logging.info('-----------------------') logging.info('Verifying kernel modules:') try: - output = self._subproc_logging(self.cmd_kernel_mods) - for module in self.kernel_modules: + output = self._subproc_logging(self._cmd_kernel_mods) + for module in self._kernel_modules: if module in output: logging.info(f'- {module} already loaded') else: self._modprobe_hlpr(module) logging.info('') - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'lsmod') + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.kernel_mods.__qualname__) # get ipmi chassis data # pass if called w/o error @@ -149,9 +149,9 @@ class IpmiTest(object): logging.info('-----------------------') logging.info('Fetching chassis status:') try: - self._subproc_logging(self.cmd_ipmi_chassis) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_chassis()') + self._subproc_logging(self._cmd_ipmi_chassis) + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.impi_chassis.__qualname__) return 1 else: logging.info('Fetched chassis status!\n') @@ -164,7 +164,7 @@ class IpmiTest(object): logging.info('Fetching power status:') regex = re.compile('^System Power') try: - output = self._subproc_logging(self.cmd_ipmi_chassis) + output = self._subproc_logging(self._cmd_ipmi_chassis) for line in output.rstrip().split('\n'): if re.search(regex, line): logging.info('Fetched power status!\n') @@ -172,43 +172,41 @@ class IpmiTest(object): else: logging.info('Unable to retrieve power status via IPMI.\n') return 1 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'pwr_status()') + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.pwr_status.__qualname__) return 1 # ipmi_channel discovery loop - def _ipmi_channel_hlpr(self, i, matches, channel): + def _ipmi_channel_hlpr(self, i, channel): regex = re.compile('Section User') - cmd = self.cmd_ipmi_channel + cmd = self._cmd_ipmi_channel if (len(cmd) > 4): cmd.pop(-1) cmd.append(str(i)) output = self._subproc_logging(cmd) for line in output.rstrip().split('\n'): if re.search(regex, line): - matches.append(1) channel.append(i) break - return (matches, channel) + return channel # get ipmi channel(s) in use # pass if user data returns after calling ipmi-config def ipmi_channel(self): logging.info('-----------------------') logging.info('Fetching IPMI channel:') - matches = [] # support multiple channels channel = [] # test channels 0 - 15 for i in range(16): try: - self._ipmi_channel_hlpr(i, matches, channel) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_channel()') + self._ipmi_channel_hlpr(i, channel) + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.ipmi_channel.__qualname__) return 1 else: - if (sum(matches) > 0): - logging.info(f'Found {sum(matches)} channel(s)!') + if (len(channel) > 0): + logging.info(f'Found {len(channel)} channel(s)!') logging.info(f'IPMI Channel(s): {channel}\n') return 0 else: @@ -221,51 +219,64 @@ class IpmiTest(object): logging.info('-----------------------') logging.info('Fetching BMC information:') try: - self._subproc_logging(self.cmd_bmc_info) - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'bmc_info()') + self._subproc_logging(self._cmd_bmc_info) + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.bmc_info.__qualname__) return 1 else: logging.info('Fetched BMC information!\n') return 0 # fetch ipmi version via bmc-info sdout - # pass if ipmi version >= self.ipmi_ver + # pass if ipmi version >= self._ipmi_ver def ipmi_version(self): logging.info('-----------------------') logging.info('Testing IPMI version:') try: - output = self._subproc_logging(self.cmd_bmc_info) + output = self._subproc_logging(self._cmd_bmc_info) # Prefer .index() over .find() for exception handling res_index = output.index('IPMI Version') version = output[(res_index + 24):(res_index + 27)] logging.info(f'IPMI Version: {version}\n') - if (float(version) < float(self.ipmi_ver)): - logging.info(f'IPMI Version below {self.ipmi_ver}!\n') + if (float(version) < float(self._ipmi_ver)): + logging.info(f'IPMI Version below {self._ipmi_ver}!\n') return 1 else: return 0 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_version()') + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.ipmi_version.__qualname__) return 1 + # ipmi driver discovery loop + def _ipmi_locate_hlpr(self, ipmi_drivers): + regex = re.compile('driver:') + output = self._subproc_logging(self._cmd_ipmi_locate) + for line in output.rstrip().split('\n'): + if re.search(regex, line): + driver = (line.split(':'))[1].lstrip() + ipmi_drivers.append(driver) + return ipmi_drivers + # call ipmi-locate - # pass if driver is loaded + # pass if drivers are loaded def ipmi_locate(self): logging.info('-----------------------') - logging.info('Testing ipmi-locate:') - regex = re.compile('driver:') + logging.info('Locating IPMI drivers:') + # support multiple driver types + ipmi_drivers = [] try: - output = self._subproc_logging(self.cmd_ipmi_locate) - if re.search(regex, output): - logging.info('Located IPMI driver!\n') + self._ipmi_locate_hlpr(ipmi_drivers) + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.ipmi_locate.__qualname__) + return 1 + else: + if (len(ipmi_drivers) > 0): + logging.info(f'Found {len(ipmi_drivers)} IPMI driver(s)!') + logging.info(f'IPMI driver(s) loaded: {ipmi_drivers}\n') return 0 else: - logging.info('Unable to locate IPMI driver!\n') + logging.info('Unable to locate IPMI driver(s)!\n') return 1 - except self.sub_proc_excs as exc: - self._proc_exc(exc, 'ipmi_locate()') - return 1 # initialize kernel modules and run ipmi tests def run_test(self): @@ -301,17 +312,20 @@ def main(): console_handler.setLevel(logging.DEBUG) # instantiate IpmiTest as ipmi_test - # pass to [results] for post-processing ipmi_test = IpmiTest() + # run test >> [results] results = ipmi_test.run_test() + results_dict = {'Chassis': results[0], + 'Power': results[1], + 'Channel': results[2], + 'BMC': results[3], + 'IPMI Version': results[4], + 'IPMI Locate': results[5]} # tally results if (sum(results) > 0): print ('-----------------------') print ('## IPMI tests failed! ##') - print ( - f'## Chassis: {results[0]} Power: {results[1]} ', - f'Channel: {results[2]} BMC: {results[3]} ', - f'IPMI Version: {results[4]} IPMI Locate: {results[5]} ##') + print (f'## {results_dict} ##') return 1 else: print ('-----------------------') |