diff options
author | Adrian Lane <adrian.lane@canonical.com> | 2020-04-26 22:32:24 -0700 |
---|---|---|
committer | Adrian Lane <adrian.lane@canonical.com> | 2020-04-26 22:32:24 -0700 |
commit | 83eab9d81a35e6aaca90777b796d2eb89778bdbb (patch) | |
tree | 5df8df83ed44413babae07a45509da79aa58f3e2 | |
parent | 41a078175bd4648ae90e03529eee2f4243941b0f (diff) |
Enhanced ipmi driver retrival, cleaned up output formatting & syntax, fixed multiple helper fn logic.
-rwxr-xr-x | bin/ipmi_test.py | 125 |
1 files changed, 66 insertions, 59 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py index 9eb1282..02cba07 100755 --- a/bin/ipmi_test.py +++ b/bin/ipmi_test.py @@ -74,14 +74,14 @@ class IpmiTest(object): OSError, TypeError) - # fetch absolute path via shutil lib w/ exception handling + # get absolute path def _get_path(self, binary): try: path_full = shutil.which(binary) return path_full except (self._sub_proc_excs[-2:-1]): - logging.info('Unable to stat path via shutil lib!') - logging.info('Using relative paths...') + logging.info('* Unable to stat path via shutil lib!') + logging.info('* Using relative paths...') return binary # subprocess stdin/stderr handling @@ -105,14 +105,14 @@ class IpmiTest(object): def _proc_exc(self, exc, test_method): if (type(exc) == TimeoutExpired): logging.info( - f'Timeout calling {test_method}!' + f'* Timeout calling {test_method}!' f' ({self._subproc_timeout}s)\n') elif (type(exc) == TypeError): logging.info( - f'Error calling {test_method}!' + f'* Error calling {test_method}!' ' Check your paths!\n') else: - logging.info(f'Error calling {test_method}!\n') + logging.info(f'* Error calling {test_method}!\n') # kernel_mods() helper function to call modprobe def _modprobe_hlpr(self, module): @@ -154,7 +154,7 @@ class IpmiTest(object): self._proc_exc(exc, self.impi_chassis.__qualname__) return 1 else: - logging.info('Fetched chassis status!\n') + logging.info('- Fetched chassis status!\n') return 0 # get power status via ipmi chassis data @@ -167,15 +167,57 @@ class IpmiTest(object): output = self._subproc_logging(self._cmd_ipmi_chassis) for line in output.rstrip().split('\n'): if re.search(regex, line): - logging.info('Fetched power status!\n') + logging.info('- Fetched power status!\n') return 0 else: - logging.info('Unable to retrieve power status via IPMI.\n') + logging.info('* Unable to retrieve power status via IPMI.\n') return 1 except self._sub_proc_excs as exc: self._proc_exc(exc, self.pwr_status.__qualname__) return 1 + # call bmc-info + # pass if called w/o error + def bmc_info(self): + logging.info('-----------------------') + logging.info('Fetching BMC information:') + try: + self._subproc_logging(self._cmd_bmc_info) + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.bmc_info.__qualname__) + return 1 + else: + logging.info('- Fetched BMC information!\n') + return 0 + + # ipmi driver discovery loop + def _ipmi_version_hlpr(self): + regex = re.compile('^IPMI Version') + output = self._subproc_logging(self._cmd_bmc_info) + for line in output.rstrip().split('\n'): + if re.search(regex, line): + version = (line.split(':'))[1].strip() + return version + + # fetch ipmi version via bmc-info sdout + # pass if ipmi version >= self._ipmi_ver + def ipmi_version(self): + logging.info('-----------------------') + logging.info('Validating IPMI version:') + try: + version = self._ipmi_version_hlpr() + logging.info(f'- IPMI version: {version}') + except self._sub_proc_excs as exc: + self._proc_exc(exc, self.ipmi_version.__qualname__) + return 1 + else: + if (float(version) < float(self._ipmi_ver)): + logging.info(f'* IPMI version below {self._ipmi_ver}!\n') + return 1 + else: + logging.info(f' IPMI version compliant!\n') + return 0 + # ipmi_channel discovery loop def _ipmi_channel_hlpr(self, i, channel): regex = re.compile('Section User') @@ -187,14 +229,13 @@ class IpmiTest(object): for line in output.rstrip().split('\n'): if re.search(regex, line): channel.append(i) - break - return channel + return channel # get ipmi channel(s) in use # pass if user data returns after calling ipmi-config def ipmi_channel(self): logging.info('-----------------------') - logging.info('Fetching IPMI channel:') + logging.info('Fetching IPMI channels:') # support multiple channels channel = [] # test channels 0 - 15 @@ -206,47 +247,13 @@ class IpmiTest(object): return 1 else: if (len(channel) > 0): - logging.info(f'Found {len(channel)} channel(s)!') - logging.info(f'IPMI Channel(s): {channel}\n') + logging.info(f'- Found {len(channel)} channel(s)!') + logging.info(f' IPMI channel(s): {channel}\n') return 0 else: - logging.info('Unable to fetch IPMI channel!\n') + logging.info('* Unable to fetch IPMI channel!\n') return 1 - # call bmc-info - # pass if called w/o error - def bmc_info(self): - logging.info('-----------------------') - logging.info('Fetching BMC information:') - try: - self._subproc_logging(self._cmd_bmc_info) - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.bmc_info.__qualname__) - return 1 - else: - logging.info('Fetched BMC information!\n') - return 0 - - # fetch ipmi version via bmc-info sdout - # pass if ipmi version >= self._ipmi_ver - def ipmi_version(self): - logging.info('-----------------------') - logging.info('Testing IPMI version:') - try: - output = self._subproc_logging(self._cmd_bmc_info) - # Prefer .index() over .find() for exception handling - res_index = output.index('IPMI Version') - version = output[(res_index + 24):(res_index + 27)] - logging.info(f'IPMI Version: {version}\n') - if (float(version) < float(self._ipmi_ver)): - logging.info(f'IPMI Version below {self._ipmi_ver}!\n') - return 1 - else: - return 0 - except self._sub_proc_excs as exc: - self._proc_exc(exc, self.ipmi_version.__qualname__) - return 1 - # ipmi driver discovery loop def _ipmi_locate_hlpr(self, ipmi_drivers): regex = re.compile('driver:') @@ -255,7 +262,8 @@ class IpmiTest(object): if re.search(regex, line): driver = (line.split(':'))[1].lstrip() ipmi_drivers.append(driver) - return ipmi_drivers + else: + return ipmi_drivers # call ipmi-locate # pass if drivers are loaded @@ -271,29 +279,27 @@ class IpmiTest(object): return 1 else: if (len(ipmi_drivers) > 0): - logging.info(f'Found {len(ipmi_drivers)} IPMI driver(s)!') - logging.info(f'IPMI driver(s) loaded: {ipmi_drivers}\n') + logging.info(f'- Found {len(ipmi_drivers)} IPMI driver(s)!') + logging.info(f' IPMI driver(s) loaded: {ipmi_drivers}\n') return 0 else: - logging.info('Unable to locate IPMI driver(s)!\n') + logging.info('* Unable to locate IPMI driver(s)!\n') return 1 - # initialize kernel modules and run ipmi tests + # initialize kernel modules, run ipmi tests def run_test(self): # load/val kernel modules self.kernel_mods() - # tally results results = [self.impi_chassis(), self.pwr_status(), - self.ipmi_channel(), self.bmc_info(), self.ipmi_version(), + self.ipmi_channel(), self.ipmi_locate()] return results def main(): - # init logging subsystem # instantiate argparse as parser parser = argparse.ArgumentParser() parser.add_argument('-d', '--debug', action='store_true', @@ -301,6 +307,7 @@ def main(): parser.add_argument('-q', '--quiet', action='store_true', help='suppress output') args = parser.parse_args() + # init logging if ((not args.quiet) or args.debug): logger = logging.getLogger() logger.setLevel(logging.DEBUG) @@ -313,13 +320,13 @@ def main(): # instantiate IpmiTest as ipmi_test ipmi_test = IpmiTest() - # run test >> [results] + # run tests, return in [results] results = ipmi_test.run_test() results_dict = {'Chassis': results[0], 'Power': results[1], - 'Channel': results[2], 'BMC': results[3], 'IPMI Version': results[4], + 'Channel': results[2], 'IPMI Locate': results[5]} # tally results if (sum(results) > 0): |