summaryrefslogtreecommitdiff
diff options
authorAdrian Lane <adrian.lane@canonical.com>2020-04-26 22:32:24 -0700
committerAdrian Lane <adrian.lane@canonical.com>2020-04-26 22:32:24 -0700
commit83eab9d81a35e6aaca90777b796d2eb89778bdbb (patch)
tree5df8df83ed44413babae07a45509da79aa58f3e2
parent41a078175bd4648ae90e03529eee2f4243941b0f (diff)
Enhanced ipmi driver retrival, cleaned up output formatting & syntax, fixed multiple helper fn logic.
-rwxr-xr-xbin/ipmi_test.py125
1 files changed, 66 insertions, 59 deletions
diff --git a/bin/ipmi_test.py b/bin/ipmi_test.py
index 9eb1282..02cba07 100755
--- a/bin/ipmi_test.py
+++ b/bin/ipmi_test.py
@@ -74,14 +74,14 @@ class IpmiTest(object):
OSError,
TypeError)
- # fetch absolute path via shutil lib w/ exception handling
+ # get absolute path
def _get_path(self, binary):
try:
path_full = shutil.which(binary)
return path_full
except (self._sub_proc_excs[-2:-1]):
- logging.info('Unable to stat path via shutil lib!')
- logging.info('Using relative paths...')
+ logging.info('* Unable to stat path via shutil lib!')
+ logging.info('* Using relative paths...')
return binary
# subprocess stdin/stderr handling
@@ -105,14 +105,14 @@ class IpmiTest(object):
def _proc_exc(self, exc, test_method):
if (type(exc) == TimeoutExpired):
logging.info(
- f'Timeout calling {test_method}!'
+ f'* Timeout calling {test_method}!'
f' ({self._subproc_timeout}s)\n')
elif (type(exc) == TypeError):
logging.info(
- f'Error calling {test_method}!'
+ f'* Error calling {test_method}!'
' Check your paths!\n')
else:
- logging.info(f'Error calling {test_method}!\n')
+ logging.info(f'* Error calling {test_method}!\n')
# kernel_mods() helper function to call modprobe
def _modprobe_hlpr(self, module):
@@ -154,7 +154,7 @@ class IpmiTest(object):
self._proc_exc(exc, self.impi_chassis.__qualname__)
return 1
else:
- logging.info('Fetched chassis status!\n')
+ logging.info('- Fetched chassis status!\n')
return 0
# get power status via ipmi chassis data
@@ -167,15 +167,57 @@ class IpmiTest(object):
output = self._subproc_logging(self._cmd_ipmi_chassis)
for line in output.rstrip().split('\n'):
if re.search(regex, line):
- logging.info('Fetched power status!\n')
+ logging.info('- Fetched power status!\n')
return 0
else:
- logging.info('Unable to retrieve power status via IPMI.\n')
+ logging.info('* Unable to retrieve power status via IPMI.\n')
return 1
except self._sub_proc_excs as exc:
self._proc_exc(exc, self.pwr_status.__qualname__)
return 1
+ # call bmc-info
+ # pass if called w/o error
+ def bmc_info(self):
+ logging.info('-----------------------')
+ logging.info('Fetching BMC information:')
+ try:
+ self._subproc_logging(self._cmd_bmc_info)
+ except self._sub_proc_excs as exc:
+ self._proc_exc(exc, self.bmc_info.__qualname__)
+ return 1
+ else:
+ logging.info('- Fetched BMC information!\n')
+ return 0
+
+ # ipmi driver discovery loop
+ def _ipmi_version_hlpr(self):
+ regex = re.compile('^IPMI Version')
+ output = self._subproc_logging(self._cmd_bmc_info)
+ for line in output.rstrip().split('\n'):
+ if re.search(regex, line):
+ version = (line.split(':'))[1].strip()
+ return version
+
+ # fetch ipmi version via bmc-info sdout
+ # pass if ipmi version >= self._ipmi_ver
+ def ipmi_version(self):
+ logging.info('-----------------------')
+ logging.info('Validating IPMI version:')
+ try:
+ version = self._ipmi_version_hlpr()
+ logging.info(f'- IPMI version: {version}')
+ except self._sub_proc_excs as exc:
+ self._proc_exc(exc, self.ipmi_version.__qualname__)
+ return 1
+ else:
+ if (float(version) < float(self._ipmi_ver)):
+ logging.info(f'* IPMI version below {self._ipmi_ver}!\n')
+ return 1
+ else:
+ logging.info(f' IPMI version compliant!\n')
+ return 0
+
# ipmi_channel discovery loop
def _ipmi_channel_hlpr(self, i, channel):
regex = re.compile('Section User')
@@ -187,14 +229,13 @@ class IpmiTest(object):
for line in output.rstrip().split('\n'):
if re.search(regex, line):
channel.append(i)
- break
- return channel
+ return channel
# get ipmi channel(s) in use
# pass if user data returns after calling ipmi-config
def ipmi_channel(self):
logging.info('-----------------------')
- logging.info('Fetching IPMI channel:')
+ logging.info('Fetching IPMI channels:')
# support multiple channels
channel = []
# test channels 0 - 15
@@ -206,47 +247,13 @@ class IpmiTest(object):
return 1
else:
if (len(channel) > 0):
- logging.info(f'Found {len(channel)} channel(s)!')
- logging.info(f'IPMI Channel(s): {channel}\n')
+ logging.info(f'- Found {len(channel)} channel(s)!')
+ logging.info(f' IPMI channel(s): {channel}\n')
return 0
else:
- logging.info('Unable to fetch IPMI channel!\n')
+ logging.info('* Unable to fetch IPMI channel!\n')
return 1
- # call bmc-info
- # pass if called w/o error
- def bmc_info(self):
- logging.info('-----------------------')
- logging.info('Fetching BMC information:')
- try:
- self._subproc_logging(self._cmd_bmc_info)
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.bmc_info.__qualname__)
- return 1
- else:
- logging.info('Fetched BMC information!\n')
- return 0
-
- # fetch ipmi version via bmc-info sdout
- # pass if ipmi version >= self._ipmi_ver
- def ipmi_version(self):
- logging.info('-----------------------')
- logging.info('Testing IPMI version:')
- try:
- output = self._subproc_logging(self._cmd_bmc_info)
- # Prefer .index() over .find() for exception handling
- res_index = output.index('IPMI Version')
- version = output[(res_index + 24):(res_index + 27)]
- logging.info(f'IPMI Version: {version}\n')
- if (float(version) < float(self._ipmi_ver)):
- logging.info(f'IPMI Version below {self._ipmi_ver}!\n')
- return 1
- else:
- return 0
- except self._sub_proc_excs as exc:
- self._proc_exc(exc, self.ipmi_version.__qualname__)
- return 1
-
# ipmi driver discovery loop
def _ipmi_locate_hlpr(self, ipmi_drivers):
regex = re.compile('driver:')
@@ -255,7 +262,8 @@ class IpmiTest(object):
if re.search(regex, line):
driver = (line.split(':'))[1].lstrip()
ipmi_drivers.append(driver)
- return ipmi_drivers
+ else:
+ return ipmi_drivers
# call ipmi-locate
# pass if drivers are loaded
@@ -271,29 +279,27 @@ class IpmiTest(object):
return 1
else:
if (len(ipmi_drivers) > 0):
- logging.info(f'Found {len(ipmi_drivers)} IPMI driver(s)!')
- logging.info(f'IPMI driver(s) loaded: {ipmi_drivers}\n')
+ logging.info(f'- Found {len(ipmi_drivers)} IPMI driver(s)!')
+ logging.info(f' IPMI driver(s) loaded: {ipmi_drivers}\n')
return 0
else:
- logging.info('Unable to locate IPMI driver(s)!\n')
+ logging.info('* Unable to locate IPMI driver(s)!\n')
return 1
- # initialize kernel modules and run ipmi tests
+ # initialize kernel modules, run ipmi tests
def run_test(self):
# load/val kernel modules
self.kernel_mods()
- # tally results
results = [self.impi_chassis(),
self.pwr_status(),
- self.ipmi_channel(),
self.bmc_info(),
self.ipmi_version(),
+ self.ipmi_channel(),
self.ipmi_locate()]
return results
def main():
- # init logging subsystem
# instantiate argparse as parser
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--debug', action='store_true',
@@ -301,6 +307,7 @@ def main():
parser.add_argument('-q', '--quiet', action='store_true',
help='suppress output')
args = parser.parse_args()
+ # init logging
if ((not args.quiet) or args.debug):
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
@@ -313,13 +320,13 @@ def main():
# instantiate IpmiTest as ipmi_test
ipmi_test = IpmiTest()
- # run test >> [results]
+ # run tests, return in [results]
results = ipmi_test.run_test()
results_dict = {'Chassis': results[0],
'Power': results[1],
- 'Channel': results[2],
'BMC': results[3],
'IPMI Version': results[4],
+ 'Channel': results[2],
'IPMI Locate': results[5]}
# tally results
if (sum(results) > 0):