summaryrefslogtreecommitdiff
diff options
authorPMR <pmr@pmr-lander>2020-05-29 07:16:44 +0000
committerPMR <pmr@pmr-lander>2020-05-29 07:16:44 +0000
commit078485c68547aacdcbc49afb7cd3f3b4f0e859b7 (patch)
tree44132a9239e43612b2155cb96e27c44a5391152d
parentebf8a2479675321f1659852f68bc2934b5e93470 (diff)
parent229bed724ed30beda6a4279240921eec476699b2 (diff)
Merge #384062 from ~alanec/plainbox-provider-checkbox:ipmi_test-py_patch
bin/ipmi_test.py: Added support for multiple IPMI driver types (at once), cleaned up ipmi_channel/_hlpr, syntax cleanup. -lp:#1874629
-rwxr-xr-xbin/ipmi_test.py373
1 files changed, 183 insertions, 190 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py
index c074f6bf..0aefea19 100755
--- a/bin/ipmi_test.py
+++ b/bin/ipmi_test.py
@@ -21,7 +21,6 @@ Tests IPMI subsystem on SUT.
"""
import re
-import os
import shutil
import sys
import argparse
@@ -34,255 +33,241 @@ from subprocess import (
SubprocessError)
-class IpmiTest(object):
+class FreeIpmiTest:
+
def __init__(self):
- # paths to kernel_module binaries
- self.path_lsmod = self._get_path('lsmod')
- self.path_modprobe = self._get_path('modprobe')
- # kernel modules to load/verify
- self.kernel_modules = (
+ def get_path(binary):
+ """Get absolute path of FreeIPMI/nix binary,
+ warn upon failure.
+ """
+ path_full = shutil.which(binary)
+ if path_full:
+ return path_full
+ else:
+ logging.info(
+ '* Unable to stat absolute path for %s!'
+ % binary)
+ return binary
+
+ # paths to load_kernel_module() binaries
+ self._path_lsmod = get_path('lsmod')
+ self._path_modprobe = get_path('modprobe')
+ # ipmi kernel modules to load/verify
+ self._kernel_modules = (
'ipmi_si',
'ipmi_devintf',
'ipmi_powernv',
'ipmi_ssif',
'ipmi_msghandler')
- # paths to freeipmi tools
- self.path_ipmi_chassis = self._get_path('ipmi-chassis')
- self.path_ipmi_config = self._get_path('ipmi-config')
- self.path_bmc_info = self._get_path('bmc-info')
- self.path_ipmi_locate = self._get_path('ipmi-locate')
- # function subprocess commands
- self.cmd_kernel_mods = [
- 'sudo', self.path_lsmod]
- self.cmd_ipmi_chassis = [
- 'sudo', self.path_ipmi_chassis, '--get-status']
- self.cmd_ipmi_channel = [
- 'sudo', self.path_ipmi_config, '--checkout',
+ # method subprocess commands (FreeIPMI)
+ self._cmd_ipmi_chassis = [
+ get_path('ipmi-chassis'), '--get-status']
+ self._cmd_ipmi_channel = [
+ get_path('ipmi-config'), '--checkout',
'--lan-channel-number']
- self.cmd_bmc_info = [
- 'sudo', self.path_bmc_info]
- self.cmd_ipmi_locate = [
- 'sudo', self.path_ipmi_locate]
+ self._cmd_get_bmc_info = [
+ get_path('bmc-info')]
+ self._cmd_ipmi_locate = [
+ get_path('ipmi-locate')]
# min. ipmi version to pass
- self.ipmi_ver = 2.0
+ self._ipmi_ver = 2.0
# subprocess call timeout (s)
- self.subproc_timeout = 10
+ self._subproc_timeout = 10
# raised subproc exceptions to handle
- self.sub_proc_excs = (
+ # (decoupled from self._process_exc())
+ self._sub_process_excs = (
TimeoutExpired,
SubprocessError,
OSError,
- TypeError)
+ TypeError,
+ FileNotFoundError)
- # fetch absolute path via shutil lib w/ exception handling
- def _get_path(self, binary):
- try:
- path_full = shutil.which(binary)
- return path_full
- except (self.sub_proc_excs[2:3]):
- logging.info('Unable to stat path via shutil lib!')
- logging.info('Using relative paths...')
- return binary
-
- # subprocess stdin/stderr handling
def _subproc_logging(self, cmd):
+ """Subprocess stdin/stderr handling."""
process = Popen(
cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
- output, error = process.communicate(timeout=self.subproc_timeout)
+ output, error = process.communicate(timeout=self._subproc_timeout)
logging.debug('## Debug Output: ##')
- if (len(output) > 0):
- # padding
+ if output:
logging.debug(' [Stdout]\n')
- logging.debug(f'{output}\n')
- if (len(error) > 0):
- # padding
+ logging.debug('%s\n' % output)
+ if error:
logging.debug(' [Stderr]\n')
- logging.debug(f'{error}\n')
+ logging.debug('%s\n' % error)
logging.debug('## End Debug Output ##\n')
+ # ignore stderr
return output
- # post-process exception handling
- def _proc_exc(self, exc, subtest):
- if (type(exc) == TimeoutExpired):
+ def _process_exc(self, exc, test_method):
+ """Allows for bundling of exception handling for all
+ methods within this class.
+ """
+ if type(exc) is TimeoutExpired:
logging.info(
- f'Timeout calling {subtest}!'
- f' ({self.subproc_timeout}s)\n')
- elif (type(exc) == TypeError):
+ '* Timeout calling %s! (%ss)\n' %
+ (test_method, self._subproc_timeout))
+ elif type(exc) is FileNotFoundError:
logging.info(
- f'Error calling {subtest}!'
- ' Check your paths!\n')
+ '* Error calling %s! Check cmds/paths.\n' % test_method)
else:
- logging.info(f'Error calling {subtest}!\n')
+ logging.info('* Error calling %s!\n' % test_method)
- # kernel_mods() helper function to call modprobe
def _modprobe_hlpr(self, module):
+ """load_kernel_mods() helper function to call modprobe."""
try:
check_call(
- [self.path_modprobe, module],
- stderr=PIPE, timeout=self.subproc_timeout)
- except self.sub_proc_excs:
- logging.info(f'* Unable to load module {module}!')
+ [self._path_modprobe, module],
+ stderr=PIPE, timeout=self._subproc_timeout)
+ except self._sub_process_excs:
+ logging.info('* Unable to load module %s!' % module)
logging.info(' **********************************************')
- logging.info(f' Warning: proceeding, but in-band IPMI may fail')
+ logging.info(' Warning: proceeding, but in-band IPMI may fail')
logging.info(' **********************************************')
else:
- logging.info(f'- Successfully loaded module {module}')
+ logging.info('- Successfully loaded module %s' % module)
- # check (and load) kernel modules
- def kernel_mods(self):
+ def load_kernel_mods(self):
+ """Check (and load) kernel modules."""
logging.info('-----------------------')
logging.info('Verifying kernel modules:')
try:
- output = self._subproc_logging(self.cmd_kernel_mods)
- for module in self.kernel_modules:
+ output = self._subproc_logging(self._path_lsmod)
+ for module in self._kernel_modules:
if module in output:
- logging.info(f'- {module} already loaded')
+ logging.info('- %s already loaded' % module)
else:
self._modprobe_hlpr(module)
logging.info('')
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'lsmod')
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.load_kernel_mods.__qualname__)
+ return False
- # get ipmi chassis data
- # pass if called w/o error
- def impi_chassis(self):
+ def get_impi_chassis(self):
+ """Get ipmi chassis data, pass if called w/o error."""
logging.info('-----------------------')
logging.info('Fetching chassis status:')
try:
- self._subproc_logging(self.cmd_ipmi_chassis)
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'ipmi_chassis()')
- return 1
+ self._subproc_logging(self._cmd_ipmi_chassis)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_impi_chassis.__qualname__)
+ return False
else:
- logging.info('Fetched chassis status!\n')
- return 0
+ logging.info('- Fetched chassis status!\n')
+ return True
- # get power status via ipmi chassis data
- # pass if called w/o error & system power field present
- def pwr_status(self):
+ def get_pwr_status(self):
+ """Get power status via ipmi chassis data,
+ pass if called w/o error & system power field present.
+ """
logging.info('-----------------------')
logging.info('Fetching power status:')
regex = re.compile('^System Power')
try:
- output = self._subproc_logging(self.cmd_ipmi_chassis)
+ output = self._subproc_logging(self._cmd_ipmi_chassis)
for line in output.rstrip().split('\n'):
if re.search(regex, line):
- logging.info('Fetched power status!\n')
- return 0
+ logging.info('- Fetched power status!\n')
+ return True
+ logging.info('* Unable to retrieve power status via IPMI.\n')
+ return False
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_pwr_status.__qualname__)
+ return False
+
+ def get_bmc_info(self):
+ """Call bmc-info, pass if called w/o error."""
+ logging.info('-----------------------')
+ logging.info('Fetching BMC information:')
+ try:
+ self._subproc_logging(self._cmd_get_bmc_info)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_bmc_info.__qualname__)
+ return False
+ else:
+ logging.info('- Fetched BMC information!\n')
+ return True
+
+ def _ipmi_version_hlpr(self):
+ """Ipmi version discovery loop."""
+ regex = re.compile('^IPMI Version')
+ output = self._subproc_logging(self._cmd_get_bmc_info)
+ for line in output.rstrip().split('\n'):
+ if re.search(regex, line):
+ version = (line.split(':'))[1].strip()
+ return float(version)
+
+ def chk_ipmi_version(self):
+ """Fetch ipmi version via bmc-info sdout,
+ pass if ipmi version >= self._ipmi_ver.
+ """
+ logging.info('-----------------------')
+ logging.info('Validating IPMI version:')
+ try:
+ version = self._ipmi_version_hlpr()
+ logging.info('- IPMI version: %.1f' % version)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.chk_ipmi_version.__qualname__)
+ return False
+ else:
+ if version < float(self._ipmi_ver):
+ logging.info('* IPMI version below %d!\n' % self._ipmi_ver)
+ return False
else:
- logging.info('Unable to retrieve power status via IPMI.\n')
- return 1
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'pwr_status()')
- return 1
+ logging.info(' IPMI version compliant!\n')
+ return True
- # ipmi_channel discovery loop
- def _ipmi_channel_hlpr(self, i, matches, channel):
- regex = re.compile('Section User')
- cmd = self.cmd_ipmi_channel
- if (len(cmd) > 4):
+ def _ipmi_channel_hlpr(self, i, regex, channel):
+ """get_ipmi_channel discovery loop."""
+ cmd = self._cmd_ipmi_channel
+ if len(cmd) > 3:
cmd.pop(-1)
cmd.append(str(i))
output = self._subproc_logging(cmd)
for line in output.rstrip().split('\n'):
if re.search(regex, line):
- matches.append(1)
channel.append(i)
- break
- return (matches, channel)
+ return channel
- # get ipmi channel(s) in use
- # pass if user data returns after calling ipmi-config
- def ipmi_channel(self):
+ def get_ipmi_channel(self):
+ """Get ipmi channel(s) in use,
+ pass if user data returns after calling ipmi-config.
+ """
logging.info('-----------------------')
- logging.info('Fetching IPMI channel:')
- matches = []
+ logging.info('Fetching IPMI channels:')
# support multiple channels
channel = []
+ regex = re.compile('Section User')
# test channels 0 - 15
for i in range(16):
+ # channel 12, 13 are invalid channels, skip
+ if i == (12 | 13):
+ continue
try:
- self._ipmi_channel_hlpr(i, matches, channel)
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'ipmi_channel()')
- return 1
- else:
- if (sum(matches) > 0):
- logging.info(f'Found {sum(matches)} channel(s)!')
- logging.info(f'IPMI Channel(s): {channel}\n')
- return 0
- else:
- logging.info('Unable to fetch IPMI channel!\n')
- return 1
-
- # call bmc-info
- # pass if called w/o error
- def bmc_info(self):
- logging.info('-----------------------')
- logging.info('Fetching BMC information:')
- try:
- self._subproc_logging(self.cmd_bmc_info)
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'bmc_info()')
- return 1
+ self._ipmi_channel_hlpr(i, regex, channel)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_ipmi_channel.__qualname__)
+ return False
+ if channel:
+ logging.info('- Found %d channel(s)!' % len(channel))
+ logging.info(' IPMI channel(s): %s\n' % channel)
+ return True
else:
- logging.info('Fetched BMC information!\n')
- return 0
+ logging.info('* Unable to fetch IPMI channel!\n')
+ return False
- # fetch ipmi version via bmc-info sdout
- # pass if ipmi version >= self.ipmi_ver
- def ipmi_version(self):
- logging.info('-----------------------')
- logging.info('Testing IPMI version:')
- try:
- output = self._subproc_logging(self.cmd_bmc_info)
- # Prefer .index() over .find() for exception handling
- res_index = output.index('IPMI Version')
- version = output[(res_index + 24):(res_index + 27)]
- logging.info(f'IPMI Version: {version}\n')
- if (float(version) < float(self.ipmi_ver)):
- logging.info(f'IPMI Version below {self.ipmi_ver}!\n')
- return 1
- else:
- return 0
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'ipmi_version()')
- return 1
-
- # call ipmi-locate
- # pass if driver is loaded
- def ipmi_locate(self):
- logging.info('-----------------------')
- logging.info('Testing ipmi-locate:')
- regex = re.compile('driver:')
- try:
- output = self._subproc_logging(self.cmd_ipmi_locate)
- if re.search(regex, output):
- logging.info('Located IPMI driver!\n')
- return 0
- else:
- logging.info('Unable to locate IPMI driver!\n')
- return 1
- except self.sub_proc_excs as exc:
- self._proc_exc(exc, 'ipmi_locate()')
- return 1
-
- # initialize kernel modules and run ipmi tests
def run_test(self):
+ """Initialize kernel modules, run ipmi tests."""
# load/val kernel modules
- self.kernel_mods()
- # tally results
- results = [self.impi_chassis(),
- self.pwr_status(),
- self.ipmi_channel(),
- self.bmc_info(),
- self.ipmi_version(),
- self.ipmi_locate()]
+ self.load_kernel_mods()
+ results = [self.get_impi_chassis(),
+ self.get_pwr_status(),
+ self.get_bmc_info(),
+ self.chk_ipmi_version(),
+ self.get_ipmi_channel()]
return results
def main():
- # init logging subsystem
# instantiate argparse as parser
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--debug', action='store_true',
@@ -290,35 +275,43 @@ def main():
parser.add_argument('-q', '--quiet', action='store_true',
help='suppress output')
args = parser.parse_args()
- if ((not args.quiet) or args.debug):
+ # init logging
+ if not args.quiet or args.debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
- if (not args.quiet):
+ if not args.quiet:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
if args.debug:
console_handler.setLevel(logging.DEBUG)
- # instantiate IpmiTest as ipmi_test
- # pass to [results] for post-processing
- ipmi_test = IpmiTest()
- results = ipmi_test.run_test()
+ print('## Running IPMI Tests ##')
+ # instantiate FreeIpmiTest as f_ipmi_test
+ f_ipmi_test = FreeIpmiTest()
+ results = f_ipmi_test.run_test()
+ results_dict = {'Chassis': results[0],
+ 'Power': results[1],
+ 'BMC': results[2],
+ 'Version': results[3],
+ 'Channel': results[4]}
# tally results
- if (sum(results) > 0):
- print ('-----------------------')
- print ('## IPMI tests failed! ##')
- print (
- f'## Chassis: {results[0]} Power: {results[1]} ',
- f'Channel: {results[2]} BMC: {results[3]} ',
- f'IPMI Version: {results[4]} IPMI Locate: {results[5]} ##')
+ if sum(results) < len(results):
+ # transpose readable values into results_dict
+ for test, result in results_dict.items():
+ if result:
+ results_dict[test] = 'Pass'
+ else:
+ results_dict[test] = 'Fail'
+ print('-----------------------')
+ print('## IPMI tests failed! ##')
+ print('## %r ##' % results_dict)
return 1
else:
- print ('-----------------------')
- print ('## IPMI tests passed! ##')
+ print('-----------------------')
+ print('## IPMI tests passed! ##')
return 0
-# call main()
if __name__ == '__main__':
sys.exit(main())