summaryrefslogtreecommitdiff
path: root/bin
diff options
authorAdrian Lane <adrian.lane@canonical.com>2020-05-16 22:30:23 -0700
committerAdrian Lane <adrian.lane@canonical.com>2020-05-16 22:30:23 -0700
commitd9c88169c9c49c0466d0600d6991a7c5d80a27ae (patch)
tree124d19f63c8fefe74f12030814d5b3d9e0c74023 /bin
parentb3edeb2ae96ad5a1747fd08dad8d6c59ec4350cd (diff)
Major refactor for: helper fn and __init__ cleanup, docstrings added and removed ipmi_locate() and its helper.
Diffstat (limited to 'bin')
-rwxr-xr-xbin/ipmi_test.py306
1 files changed, 133 insertions, 173 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py
index 685c381..f8f029d 100755
--- a/bin/ipmi_test.py
+++ b/bin/ipmi_test.py
@@ -21,7 +21,6 @@ Tests IPMI subsystem on SUT.
"""
import re
-import os
import shutil
import sys
import argparse
@@ -34,133 +33,132 @@ from subprocess import (
SubprocessError)
-class IpmiTest(object):
+class FreeIpmiTest:
def __init__(self):
- # paths to kernel_module binaries
- self._path_lsmod = self._get_path('lsmod')
- self._path_modprobe = self._get_path('modprobe')
- # kernel modules to load/verify
+ def get_path(binary):
+ """Get absolute path of FreeIPMI/nix binary,
+ warn upon failure.
+ """
+ path_full = shutil.which(binary)
+ if path_full:
+ return path_full
+ else:
+ logging.info(
+ '* Unable to stat absolute path for %s!'
+ % binary)
+ return binary
+
+ # paths to load_kernel_module() binaries
+ self._path_lsmod = get_path('lsmod')
+ self._path_modprobe = get_path('modprobe')
+ # ipmi kernel modules to load/verify
self._kernel_modules = (
'ipmi_si',
'ipmi_devintf',
'ipmi_powernv',
'ipmi_ssif',
'ipmi_msghandler')
- # paths to freeipmi tools
- self._path_ipmi_chassis = self._get_path('ipmi-chassis')
- self._path_ipmi_config = self._get_path('ipmi-config')
- self._path_bmc_info = self._get_path('bmc-info')
- self._path_ipmi_locate = self._get_path('ipmi-locate')
- # function subprocess commands
- self._cmd_kernel_mods = [
- 'sudo', self._path_lsmod]
+ # method subprocess commands (FreeIPMI)
self._cmd_ipmi_chassis = [
- 'sudo', self._path_ipmi_chassis, '--get-status']
+ get_path('ipmi-chassis'), '--get-status']
self._cmd_ipmi_channel = [
- 'sudo', self._path_ipmi_config, '--checkout',
+ get_path('ipmi-config'), '--checkout',
'--lan-channel-number']
- self._cmd_bmc_info = [
- 'sudo', self._path_bmc_info]
+ self._cmd_get_bmc_info = [
+ get_path('bmc-info')]
self._cmd_ipmi_locate = [
- 'sudo', self._path_ipmi_locate]
+ get_path('ipmi-locate')]
# min. ipmi version to pass
self._ipmi_ver = 2.0
# subprocess call timeout (s)
self._subproc_timeout = 10
# raised subproc exceptions to handle
- self._sub_proc_excs = (
+ # (decoupled from self._process_exc())
+ self._sub_process_excs = (
TimeoutExpired,
SubprocessError,
OSError,
- TypeError)
+ TypeError,
+ FileNotFoundError)
- # get absolute path
- def _get_path(self, binary):
- try:
- path_full = shutil.which(binary)
- return path_full
- except (self._sub_proc_excs[-2:-1]):
- logging.info('* Unable to stat path via shutil lib!')
- logging.info('* Using relative paths...')
- return binary
-
- # subprocess stdin/stderr handling
def _subproc_logging(self, cmd):
+ """Subprocess stdin/stderr handling."""
process = Popen(
cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
output, error = process.communicate(timeout=self._subproc_timeout)
logging.debug('## Debug Output: ##')
- if (len(output) > 0):
- # padding
+ if output:
logging.debug(' [Stdout]\n')
- logging.debug(f'{output}\n')
- if (len(error) > 0):
- # padding
+ logging.debug('%s\n' % output)
+ if error:
logging.debug(' [Stderr]\n')
- logging.debug(f'{error}\n')
+ logging.debug('%s\n' % error)
logging.debug('## End Debug Output ##\n')
+ # ignore stderr
return output
- # post-process exception handling
- def _proc_exc(self, exc, test_method):
- if (type(exc) == TimeoutExpired):
+ def _process_exc(self, exc, test_method):
+ """Allows for bundling of exception handling for all
+ methods within this class.
+ """
+ if type(exc) == TimeoutExpired:
logging.info(
- f'* Timeout calling {test_method}!'
- f' ({self._subproc_timeout}s)\n')
- elif (type(exc) == TypeError):
+ '* Timeout calling %s! (%ss)\n' %
+ (test_method, self._subproc_timeout))
+ elif type(exc) == FileNotFoundError:
logging.info(
- f'* Error calling {test_method}!'
- ' Check your paths!\n')
+ '* Error calling %s! Check cmds/paths.\n' % test_method)
else:
- logging.info(f'* Error calling {test_method}!\n')
+ logging.info('* Error calling %s!\n' % test_method)
- # kernel_mods() helper function to call modprobe
def _modprobe_hlpr(self, module):
+ """load_kernel_mods() helper function to call modprobe."""
try:
check_call(
[self._path_modprobe, module],
stderr=PIPE, timeout=self._subproc_timeout)
- except self._sub_proc_excs:
- logging.info(f'* Unable to load module {module}!')
+ except self._sub_process_excs:
+ logging.info('* Unable to load module %s!' % module)
logging.info(' **********************************************')
- logging.info(f' Warning: proceeding, but in-band IPMI may fail')
+ logging.info(' Warning: proceeding, but in-band IPMI may fail')
logging.info(' **********************************************')
else:
- logging.info(f'- Successfully loaded module {module}')
+ logging.info('- Successfully loaded module %s' % module)
- # check (and load) kernel modules
- def kernel_mods(self):
+ def load_kernel_mods(self):
+ """Check (and load) kernel modules."""
logging.info('-----------------------')
logging.info('Verifying kernel modules:')
try:
- output = self._subproc_logging(self._cmd_kernel_mods)
+ output = self._subproc_logging(self._path_lsmod)
for module in self._kernel_modules:
if module in output:
- logging.info(f'- {module} already loaded')
+ logging.info('- %s already loaded' % module)
else:
self._modprobe_hlpr(module)
logging.info('')
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.kernel_mods.__qualname__)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.load_kernel_mods.__qualname__)
+ return False
- # get ipmi chassis data
- # pass if called w/o error
- def impi_chassis(self):
+ def get_impi_chassis(self):
+ """Get ipmi chassis data, pass if called w/o error."""
logging.info('-----------------------')
logging.info('Fetching chassis status:')
try:
self._subproc_logging(self._cmd_ipmi_chassis)
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.impi_chassis.__qualname__)
- return 1
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_impi_chassis.__qualname__)
+ return False
else:
logging.info('- Fetched chassis status!\n')
- return 0
+ return True
- # get power status via ipmi chassis data
- # pass if called w/o error & system power field present
- def pwr_status(self):
+ def get_pwr_status(self):
+ """Get power status via ipmi chassis data,
+ pass if called w/o error & system power field present.
+ """
logging.info('-----------------------')
logging.info('Fetching power status:')
regex = re.compile('^System Power')
@@ -169,60 +167,59 @@ class IpmiTest(object):
for line in output.rstrip().split('\n'):
if re.search(regex, line):
logging.info('- Fetched power status!\n')
- return 0
- else:
- logging.info('* Unable to retrieve power status via IPMI.\n')
- return 1
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.pwr_status.__qualname__)
- return 1
+ return True
+ logging.info('* Unable to retrieve power status via IPMI.\n')
+ return False
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_pwr_status.__qualname__)
+ return False
- # call bmc-info
- # pass if called w/o error
- def bmc_info(self):
+ def get_bmc_info(self):
+ """Call bmc-info, pass if called w/o error."""
logging.info('-----------------------')
logging.info('Fetching BMC information:')
try:
- self._subproc_logging(self._cmd_bmc_info)
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.bmc_info.__qualname__)
- return 1
+ self._subproc_logging(self._cmd_get_bmc_info)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_bmc_info.__qualname__)
+ return False
else:
logging.info('- Fetched BMC information!\n')
- return 0
+ return True
- # ipmi driver discovery loop
def _ipmi_version_hlpr(self):
+ """Ipmi version discovery loop."""
regex = re.compile('^IPMI Version')
- output = self._subproc_logging(self._cmd_bmc_info)
+ output = self._subproc_logging(self._cmd_get_bmc_info)
for line in output.rstrip().split('\n'):
if re.search(regex, line):
version = (line.split(':'))[1].strip()
- return version
+ return float(version)
- # fetch ipmi version via bmc-info sdout
- # pass if ipmi version >= self._ipmi_ver
- def ipmi_version(self):
+ def chk_ipmi_version(self):
+ """Fetch ipmi version via bmc-info sdout,
+ pass if ipmi version >= self._ipmi_ver.
+ """
logging.info('-----------------------')
logging.info('Validating IPMI version:')
try:
version = self._ipmi_version_hlpr()
- logging.info(f'- IPMI version: {version}')
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.ipmi_version.__qualname__)
- return 1
+ logging.info('- IPMI version: %.1f' % version)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.chk_ipmi_version.__qualname__)
+ return False
else:
- if (float(version) < float(self._ipmi_ver)):
- logging.info(f'* IPMI version below {self._ipmi_ver}!\n')
- return 1
+ if version < float(self._ipmi_ver):
+ logging.info('* IPMI version below %d!\n' % self._ipmi_ver)
+ return False
else:
- logging.info(f' IPMI version compliant!\n')
- return 0
+ logging.info(' IPMI version compliant!\n')
+ return True
- # ipmi_channel discovery loop
def _ipmi_channel_hlpr(self, i, regex, channel):
+ """get_ipmi_channel discovery loop."""
cmd = self._cmd_ipmi_channel
- if (len(cmd) > 4):
+ if len(cmd) > 3:
cmd.pop(-1)
cmd.append(str(i))
output = self._subproc_logging(cmd)
@@ -231,9 +228,10 @@ class IpmiTest(object):
channel.append(i)
return channel
- # get ipmi channel(s) in use
- # pass if user data returns after calling ipmi-config
- def ipmi_channel(self):
+ def get_ipmi_channel(self):
+ """Get ipmi channel(s) in use,
+ pass if user data returns after calling ipmi-config.
+ """
logging.info('-----------------------')
logging.info('Fetching IPMI channels:')
# support multiple channels
@@ -243,68 +241,26 @@ class IpmiTest(object):
for i in range(16):
try:
self._ipmi_channel_hlpr(i, regex, channel)
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.ipmi_channel.__qualname__)
- return 1
- else:
- if (len(channel) > 0):
- logging.info(f'- Found {len(channel)} channel(s)!')
- logging.info(f' IPMI channel(s): {channel}\n')
- return 0
- else:
- logging.info('* Unable to fetch IPMI channel!\n')
- return 1
-
- # ipmi driver discovery loop
- def _ipmi_locate_hlpr(self, ipmi_drivers):
- regex = re.compile('driver:')
- output = self._subproc_logging(self._cmd_ipmi_locate)
- for line in output.rstrip().split('\n'):
- if re.search(regex, line):
- driver = (line.split(':'))[1].lstrip()
- ipmi_drivers.append(driver)
+ except self._sub_process_excs as exc:
+ self._process_exc(exc, self.get_ipmi_channel.__qualname__)
+ return False
+ if len(channel) > 0:
+ logging.info('- Found %d channel(s)!' % len(channel))
+ logging.info(' IPMI channel(s): %s\n' % channel)
+ return True
else:
- return ipmi_drivers
+ logging.info('* Unable to fetch IPMI channel!\n')
+ return False
- # call ipmi-locate
- # pass if drivers are loaded
- # -- update 4-29-20 --
- # ipmi-locate is throwing segfault when run on some focal
- # systems during regression testing. bladernr/Jeff has
- # filed a bug w/ FreeIPMI to address.
- # we've opted to leave this test informational for data
- # while this is being investigated.
- def ipmi_locate(self):
- logging.info('-----------------------')
- logging.info('Locating IPMI drivers:')
- # support multiple driver types
- ipmi_drivers = []
- try:
- self._ipmi_locate_hlpr(ipmi_drivers)
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.ipmi_locate.__qualname__)
- # see above comments
- return 0
- else:
- if (len(ipmi_drivers) > 0):
- logging.info(f'- Found {len(ipmi_drivers)} IPMI driver(s)!')
- logging.info(f' IPMI driver(s) loaded: {ipmi_drivers}\n')
- return 0
- else:
- logging.info('* Unable to locate IPMI driver(s)!\n')
- # see above comments
- return 0
-
- # initialize kernel modules, run ipmi tests
def run_test(self):
+ """Initialize kernel modules, run ipmi tests."""
# load/val kernel modules
- self.kernel_mods()
- results = [self.impi_chassis(),
- self.pwr_status(),
- self.bmc_info(),
- self.ipmi_version(),
- self.ipmi_channel(),
- self.ipmi_locate()]
+ self.load_kernel_mods()
+ results = [self.get_impi_chassis(),
+ self.get_pwr_status(),
+ self.get_bmc_info(),
+ self.chk_ipmi_version(),
+ self.get_ipmi_channel()]
return results
@@ -317,31 +273,36 @@ def main():
help='suppress output')
args = parser.parse_args()
# init logging
- if ((not args.quiet) or args.debug):
+ if not args.quiet or args.debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
- if (not args.quiet):
+ if not args.quiet:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
if args.debug:
console_handler.setLevel(logging.DEBUG)
- # instantiate IpmiTest as ipmi_test
- ipmi_test = IpmiTest()
- # run tests, return in [results]
- results = ipmi_test.run_test()
+ print('## Running IPMI Tests ##')
+ # instantiate FreeIpmiTest as f_ipmi_test
+ f_ipmi_test = FreeIpmiTest()
+ results = f_ipmi_test.run_test()
results_dict = {'Chassis': results[0],
'Power': results[1],
'BMC': results[2],
- 'IPMI Version': results[3],
- 'Channel': results[4],
- 'IPMI Locate': results[5]}
+ 'Version': results[3],
+ 'Channel': results[4]}
# tally results
- if (sum(results) > 0):
+ if sum(results) < len(results):
+ # transpose readable values into results_dict
+ for test, result in results_dict.items():
+ if result:
+ results_dict[test] = 'Pass'
+ else:
+ results_dict[test] = 'Fail'
print('-----------------------')
print('## IPMI tests failed! ##')
- print(f'## {results_dict} ##')
+ print('## %r ##' % results_dict)
return 1
else:
print('-----------------------')
@@ -349,6 +310,5 @@ def main():
return 0
-# call main()
if __name__ == '__main__':
sys.exit(main())