diff options
26 files changed, 741 insertions, 789 deletions
diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 57cb8060..9567a8eb 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.51.0.dev0 +current_version = 0.52.0.dev0 files = manage.py parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+))?((?P<release>\.?[a-z]+)(?P<N>\d+))? serialize = diff --git a/bin/cpuid.py b/bin/cpuid.py index e9f2b96c..d0cdea91 100755 --- a/bin/cpuid.py +++ b/bin/cpuid.py @@ -78,7 +78,8 @@ is_64bit = ctypes.sizeof(ctypes.c_voidp) == 8 CPUIDS = { "Amber Lake": ['0x806e9'], "AMD EPYC": ['0x800f12'], - "AMD Opteron 6100": ['0x100f91'], + "AMD Lisbon": ['0x100f81'], + "AMD Magny-Cours": ['0x100f91'], "AMD ROME": ['0x830f10'], "Broadwell": ['0x4067', '0x306d4', '0x5066', '0x406f'], "Canon Lake": ['0x6066'], diff --git a/bin/disk_read_performance_test b/bin/disk_read_performance_test index fec27c2c..8d6d6a05 100755 --- a/bin/disk_read_performance_test +++ b/bin/disk_read_performance_test @@ -49,7 +49,7 @@ for disk in $@; do "ide" ) MIN_BUF_READ=40;; "mmc" ) MIN_BUF_READ=$DEFAULT_BUF_READ;; "nvme" ) MIN_BUF_READ=200;; - "mdadm" ) MIN_BUF_READ=500;; + "mdadm" ) MIN_BUF_READ=80;; * ) MIN_BUF_READ=$DEFAULT_BUF_READ;; esac echo "INFO: $disk_type: Using $MIN_BUF_READ MB/sec as the minimum throughput speed" diff --git a/bin/dkms_info b/bin/dkms_info.py index 9b36a282..9addc2d9 100755 --- a/bin/dkms_info +++ b/bin/dkms_info.py @@ -4,6 +4,7 @@ # Written by: # Shawn Wang <shawn.wang@canonical.com> # Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3, @@ -30,6 +31,7 @@ supported output format: - dumps: json output (fully information) """ +import argparse import fnmatch import functools import email.parser @@ -39,14 +41,7 @@ import logging import os import subprocess import sys -import unittest -from guacamole import Command - -try: - from unittest import mock -except ImportError: - from plainbox.vendor import mock _logger = logging.getLogger(None) @@ -158,99 +153,6 @@ def match_patterns(patterns): return matched -class SystemInfoTests(unittest.TestCase): - - """Tests for System Information Parsing and Collection.""" - - _proc_modules = """\ -xt_REDIRECT 16384 3 - Live 0x0000000000000000 -nf_nat_redirect 16384 1 xt_REDIRECT, Live 0x0000000000000000 -xt_hl 16384 3 - Live 0x0000000000000000 -hid_generic 16384 0 - Live 0x0000000000000000 -usbhid 53248 0 - Live 0x0000000000000000 -hid 110592 2 hid_generic,usbhid, Live 0x0000000000000000 -overlay 45056 1 - Live 0x0000000000000000 -""" - _modalias = """\ -usb:v1D6Bp0003d0319dc09dsc00dp03ic09isc00ip00in00 -""" - - def setUp(self): - """Common setup code.""" - get_system_module_list.cache_clear() - get_system_modaliases.cache_clear() - - @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) - def test_get_module_list__calls_and_parses_lsmod(self): - """Ensure that get_module_list() parses lsmod output.""" - # NOTE: Return value was loaded from my system running kernel 4.0. - # The first few and last rows to be precise. - modules = get_system_module_list() - self.assertEqual(modules, [ - 'xt_REDIRECT', 'nf_nat_redirect', 'xt_hl', 'hid_generic', - 'usbhid', 'hid', 'overlay']) - - @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) - def test_get_module_list_is_cached(self): - """Ensure that get_module_list() cache works.""" - modules1 = get_system_module_list() - modules2 = get_system_module_list() - self.assertIn('xt_REDIRECT', modules1) - self.assertIn('overlay', modules2) - self.assertEqual(modules1, modules2) - - @mock.patch('os.walk') - @mock.patch('io.open', mock.mock_open(read_data=_modalias)) - def test_get_system_modalias(self, mock_os_walk): - """test_get_system_modalias.""" - mock_os_walk.return_value = [ - ("/sys/devices/pci0000:00/0000:00:14.0/usb2/2-0:1.0/modalias", - ["driver", "subsystem"], - ["modalias", "uevent"]), - ] - - """fetch hw_modaliases from machine and check modalis types.""" - modaliases = get_system_modaliases() - self.assertEqual(len(modaliases), 1) - self.assertIn("usb", modaliases) - - @mock.patch('os.uname') - @mock.patch('os.walk') - def test_get_installed_dkms_modules(self, mock_os_walk, mock_os_uname): - """test_get_installed_dkms_modules.""" - mock_os_walk.return_value = [ - ("/var/lib/dkms/hello/0.1", - ["3.19.0-15-generic", "build", "source"], - []), - ] - o = mock.Mock() - o.release = "3.19.0-15-generic" - mock_os_uname.return_value = o - self.assertEqual([['hello', '0.1']], - get_installed_dkms_modules()) - - @mock.patch('__main__.get_system_modaliases') - def test_match_patterns(self, mock_get_system_modaliases): - """Test of match_patterns.""" - mock_get_system_modaliases.return_value = { - "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", - "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], - "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", - "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], - } - pkg_modalieses = ["pci:v00008086d00008C26sv*sd*bc*sc*i*", - "usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", - "oemalias:test"] - matched_modalieses = match_patterns(tuple(pkg_modalieses)) - # match_patterns - self.assertIn("pci:v00008086d00008C26sv*sd*bc*sc*i*", - matched_modalieses) - self.assertIn("oemalias:test", - matched_modalieses) - self.assertNotIn("usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", - matched_modalieses) - - class DkmsPackage(object): """ @@ -295,9 +197,9 @@ class DkmsPackage(object): if not fn.endswith(".list"): continue with io.open(os.path.join(dpkg_info_root, fn), 'rt', - encoding='UTF-8') as stream: + encoding='UTF-8') as stream: if path in stream.read(): - return fn[:-len(".list")] + return fn[:-len(".list")] return None def _list_modules(self): @@ -350,6 +252,7 @@ class DkmsPackage(object): install_mods[m] = match_patterns(tuple(aliases)) return install_mods + def _headers_to_dist(pkg_str): """ Convert rft822 headers string to dict. @@ -366,6 +269,7 @@ def _headers_to_dist(pkg_str): target[key.lower()] = header[key] return target + class DebianPackageHandler(object): """Use rtf822(email) to handle the package information from file_object.""" @@ -396,7 +300,6 @@ class DebianPackageHandler(object): for pkg_str in self._file_object.read().split('\n\n'): yield pkg_str - def _get_device_pkgs(self): """ Only device packages have debian package header 'Modaliases'. @@ -409,23 +312,21 @@ class DebianPackageHandler(object): """ _logger.info("Looking for packages providing modaliases") - _modalias_pkgs = {} - target_str = "" result = {} for pkg_str in self._gen_all_pkg_strs(): for pkg in self.extra_pkgs: - if pkg.pkg_name is None: - continue - pstr = "Package: {}".format(pkg.pkg_name) - if pstr in pkg_str: - _logger.info("Gathering information of package, {}".format( - pkg.pkg_name)) - pkg.pkg = _headers_to_dist(pkg_str) - break + if pkg.pkg_name is None: + continue + pstr = "Package: {}".format(pkg.pkg_name) + if pstr in pkg_str: + _logger.info("Gathering information of package, {}".format( + pkg.pkg_name)) + pkg.pkg = _headers_to_dist(pkg_str) + break else: - if "Modaliases:" in pkg_str: + if "Modaliases:" in pkg_str: pkg = _headers_to_dist(pkg_str) (modalias_header, pattern_str) = \ @@ -434,15 +335,17 @@ class DebianPackageHandler(object): patterns.sort() pkg['match_patterns'] = match_patterns(tuple(patterns)) - with io.open("/var/lib/dpkg/info/{}.list".format(pkg['package']), - 'rt', encoding='UTF-8') as stream: + dpkgf = "/var/lib/dpkg/info/{}.list".format(pkg['package']) + with io.open(dpkgf, 'rt', encoding='UTF-8') as stream: if "/dkms.conf" in stream.read(): - pkg['unused_dkms'] = True + pkg['unused_dkms'] = True result[pkg['package']] = pkg return result def to_json(self): - return json.dumps({ "dkms": self.extra_pkgs, "non-dkms": self.pkgs }, default=lambda o: o.__dict__, sort_keys=True, indent=4) + return json.dumps( + {"dkms": self.extra_pkgs, "non-dkms": self.pkgs}, + default=lambda o: o.__dict__, sort_keys=True, indent=4) def to_outline(self): result = "" @@ -467,48 +370,7 @@ class DebianPackageHandler(object): return result -class DebianPackageHandlerTest(unittest.TestCase): - - """Test of DebianPackageHandler.""" - - _var_lib_dpkg_status = """\ -Package: foo -Status: install ok installed -Modaliases: hwe(pci:v000099DDd00000036sv*sd*bc*sc*i*) - -Package: foo1 -Status: install ok installed -Modaliases: hwe(pci:v0000AADDd00000036sv*sd*bc*sc*i*) - -Package: foo2 -Status: install ok installed - -Package: foo3 -Status: install ok installed - -Package: bar -Status: install ok installed - -""" - - - @mock.patch('io.open', mock.mock_open(read_data=_var_lib_dpkg_status)) - @mock.patch('__main__.get_system_modaliases') - def test_get_pkgs(self, mock_get_system_modaliases): - """Test of test_get_pkgs.""" - mock_get_system_modaliases.return_value = { - "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", - "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], - "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", - "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], - } - - self.pkg_handler = DebianPackageHandler( - file_object=io.StringIO(self._var_lib_dpkg_status)) - self.assertEqual(len(self.pkg_handler.pkgs), 2) - - -class DeviceInfo(Command): +class DeviceInfo(): """ Implementation of the dkms-info command. @@ -529,37 +391,35 @@ class DeviceInfo(Command): - dumps: json output (fully information) """ - def register_arguments(self, parser): - """Register command line arguments for dkms-info.""" - + def main(self): + """Invoke dkms-info.""" + parser = argparse.ArgumentParser() parser.add_argument( '--format', default="onelines", choices=["summary", "json"], help=("Choose output format type: " "summary (one line per packages) " "or json (json format, fully information)")) - parser.add_argument( '--output', default=None, help=("Output filename to store the output date")) + args = parser.parse_args() - def invoked(self, ctx): - """Invoke dkms-info.""" logging.basicConfig( level=logging.INFO, format='[%(relativeCreated)06dms] %(message)s') _logger.info("Started") - dkms_pkgs = [] + dkms_pkgs = [] for (dkms_name, dkms_ver) in get_installed_dkms_modules(): dkms_pkg = DkmsPackage(dkms_name, dkms_ver) dkms_pkgs.append(dkms_pkg) output = sys.stdout - if ctx.args.output is not None: - output = open(ctx.args.output, 'wt', encoding='UTF-8') + if args.output is not None: + output = open(args.output, 'wt', encoding='UTF-8') pkg_handler = DebianPackageHandler(extra_pkgs=dkms_pkgs) - if ctx.args.format == "summary": + if args.format == "summary": output.write(pkg_handler.to_outline()) else: output.write(pkg_handler.to_json()) @@ -567,8 +427,4 @@ class DeviceInfo(Command): if __name__ == '__main__': - if '--test' in sys.argv: - sys.argv.remove('--test') - unittest.main() - else: - DeviceInfo().main() + DeviceInfo().main() diff --git a/bin/dmi-sysfs-resource b/bin/dmi-sysfs-resource deleted file mode 100755 index 67410307..00000000 --- a/bin/dmi-sysfs-resource +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. -# All rights reserved. -# -# Written by: -# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> - -"""Collect information about all sysfs attributes related to DMI.""" - -import os - -import guacamole - - -class dmi_sysfs_resource(guacamole.Command): - - """ - Collect information about all sysfs attributes related to DMI. - - This program reads all the readable files in /sys/class/dmi/id/ and - presents them a single RFC822 record. - - @EPILOG@ - - Unreadable files (typically due to permissions) are silently skipped. - Please run this program as root if you wish to access various serial - numbers. - """ - - def invoked(self, ctx): - sysfs_root = '/sys/class/dmi/id/' - if not os.path.isdir(sysfs_root): - return - for dmi_attr in sorted(os.listdir(sysfs_root)): - dmi_filename = os.path.join(sysfs_root, dmi_attr) - if not os.path.isfile(dmi_filename): - continue - if not os.access(dmi_filename, os.R_OK): - continue - with open(dmi_filename, 'rt', encoding='utf-8') as stream: - dmi_data = stream.read().strip() - print("{}: {}".format(dmi_attr, dmi_data)) - - -if __name__ == "__main__": - dmi_sysfs_resource().main() diff --git a/bin/dmi_sysfs_resource.py b/bin/dmi_sysfs_resource.py new file mode 100755 index 00000000..a098976c --- /dev/null +++ b/bin/dmi_sysfs_resource.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# Copyright 2015-2020 Canonical Ltd. +# All rights reserved. +# +# Written by: +# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> + +"""Collect information about all sysfs attributes related to DMI.""" + +import os + +""" +Collect information about all sysfs attributes related to DMI. + +This program reads all the readable files in /sys/class/dmi/id/ and +presents them a single RFC822 record. + +@EPILOG@ + +Unreadable files (typically due to permissions) are silently skipped. +Please run this program as root if you wish to access various serial +numbers. +""" + + +def main(): + sysfs_root = '/sys/class/dmi/id/' + if not os.path.isdir(sysfs_root): + return + for dmi_attr in sorted(os.listdir(sysfs_root)): + dmi_filename = os.path.join(sysfs_root, dmi_attr) + if not os.path.isfile(dmi_filename): + continue + if not os.access(dmi_filename, os.R_OK): + continue + with open(dmi_filename, 'rt', encoding='utf-8') as stream: + dmi_data = stream.read().strip() + print("{}: {}".format(dmi_attr, dmi_data)) + + +if __name__ == "__main__": + main() diff --git a/bin/module_loaded_test b/bin/module_loaded_test.py index 028d70b8..b23fc631 100755 --- a/bin/module_loaded_test +++ b/bin/module_loaded_test.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. +# Copyright 2015-2020 Canonical Ltd. # All rights reserved. # # Written by: @@ -10,31 +10,32 @@ Script to test if a module is loaded in the running kernel. If not the module will be loaded. This can be disabled using a flag. """ +import argparse import subprocess -from guacamole import Command - -class ModuleLoadedTest(Command): +class ModuleLoadedTest(): """Check if a kernel module is loaded. If not load it.""" - def register_arguments(self, parser): + def main(self): + parser = argparse.ArgumentParser() parser.add_argument("module", help="Specify the module to test for.") - parser.add_argument("-n", "--no-load", action="store_true", help=( "Don't try and load the module just test.")) + args = parser.parse_args() - def invoked(self, context): - if self.is_module_loaded(context.args.module): + if self.is_module_loaded(args.module): return # module not loaded - if context.args.no_load: - return 1 + if args.no_load: + raise SystemExit("ERROR: module {} not loaded in running" + " kernel".format(args.module)) # attempt a load of the module - if not self.load_module(context.args.module): - return 1 + if not self.load_module(args.module): + raise SystemExit( + "ERROR: attempt to load module {} failed".format(args.module)) def is_module_loaded(self, module): with open('/proc/modules', 'r') as modules: diff --git a/bin/recovery_info b/bin/recovery_info deleted file mode 100755 index b2feedb0..00000000 --- a/bin/recovery_info +++ /dev/null @@ -1,399 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. -# Written by: -# Shawn Wang <shawn.wang@canonical.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, -# as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Show the recovery partition information for the preinstalled OS.""" - -import os -import re -import subprocess -import sys -import tempfile -import unittest -import xml.dom.minidom as minidom - -from guacamole import Command - -try: - from unittest import mock -except ImportError: - from plainbox.vendor import mock - -RECOVERY_PACKAGES = ["dell-recovery", "ubuntu-recovery"] - - -def get_recovery_package(): - """ - Test with RECOVERY_PACKAGES. - - to check recovery application is installed or not - - :return: - string of package_version or None - """ - for pkg in RECOVERY_PACKAGES: - output = subprocess.check_output(["apt-cache", "policy", pkg], - universal_newlines=True) - for line in output.split("\n"): - if line.startswith(" Installed:"): - ver = line.split(": ")[1] - return "{}_{}".format(pkg, ver.strip()) - return None - - -RECOVERY_LABELS = {"HP_TOOLS": "HP", - "PQSERVICE": "UBUNTU", - "BACKUP": "TEST", - "INSTALL": "DELL", - "OS": "DELL", - "RECOVERY": "DELL"} - - -_escape_pattern = re.compile(r'\\x([0-9a-fA-F][0-9a-fA-F])') - - -def lsblk_unescape(label): - """Un-escape text escaping done by lsblk(8).""" - return _escape_pattern.sub( - lambda match: chr(int(match.group(1), 16)), label) - - -def get_recovery_partition(): - """ - Get the type and location of the recovery partition. - - :return: - (recovery_type, recovery_partition) or None - - Use lsblk(8) to inspect available block devices looking - for a partition with FAT or NTFS and a well-known label. - """ - cmd = ['lsblk', '-o', 'TYPE,FSTYPE,NAME,LABEL', '--raw'] - for line in subprocess.check_output(cmd).splitlines()[1:]: - type, fstype, name, label = line.split(b' ', 3) - # Skip everything but partitions - if type != b'part': - continue - # Skip everything but FAT and NTFS - if fstype != b'vfat' and fstype != b'ntfs': - continue - label = lsblk_unescape(label.decode('utf-8')) - recovery_type = RECOVERY_LABELS.get(label) - # Skip unknown labels - if recovery_type is None: - continue - recovery_partition = '/dev/{}'.format(name.decode('utf-8')) - return (recovery_type, recovery_partition) - - -class FunctionTests(unittest.TestCase): - - """Tests for several functions.""" - - @mock.patch('subprocess.check_output') - def test_get_recovery_package(self, mock_subprocess_check_output): - """Smoke test for get_recovery_package().""" - mock_subprocess_check_output.return_value = """\ -dell-recovery: - Installed: 1.11 - Candidate: 1.11 - Version table: - 1.11 - 500 https://archive/cesg-mirror/ test/public amd64 Packages -""" - self.assertEqual(get_recovery_package(), - "dell-recovery_1.11") - - @mock.patch('subprocess.check_output') - def test_get_recovery_partition(self, mock_subprocess_check_output): - """Smoke test for get_recovery_partition().""" - mock_subprocess_check_output.return_value = ( - b'TYPE FSTYPE NAME LABEL\n' - b'disk linux_raid_member sda fx:2x250GB\n' - b'raid1 bcache md127 \n' - b'disk ext4 bcache0 Ultra\n' - b'disk linux_raid_member sdb fx:2x250GB\n' - b'raid1 bcache md127 \n' - b'disk ext4 bcache0 Ultra\n' - b'disk sdc \n' - b'part btrfs sdc1 vol1\n' - b'disk sdd \n' - b'part ntfs sdd1 Windows\x208.1\n' - b'part sdd2 \n' - b'part ext4 sdd5 Utopic\n' - b'part swap sdd6 \n' - b'disk bcache sde \n' - b'disk ext4 bcache0 Ultra\n' - b'disk sdf \n' - b'part ntfs sda3 RECOVERY\n') - self.assertEqual(get_recovery_partition(), ("DELL", "/dev/sda3")) - - def test_lsblk_unescape(self): - """Smoke tests for lsblk_unescape().""" - self.assertEqual(lsblk_unescape('Windows\\x208.1'), 'Windows 8.1') - self.assertEqual(lsblk_unescape('Windows XP'), 'Windows XP') - - -class MountedPartition(object): - - """ - Mount Manager to mount partition on tempdir. - - e.g. - with MountedPartition("/dev/sda1") as tmp: - print("This is the mount point: {}".format(tmp)) - do_stuff() - """ - - def __init__(self, part): - """ - Prepare the mntdir point. - - :param part: string of the partition device file, like /dev/sda2 - """ - self.part = part - self.mntdir = tempfile.mkdtemp() - - def __enter__(self): - """ - __enter__ method for python's with statement. - - Mount the partition device to the mntdir. - """ - cmd = ["mount", self.part, self.mntdir] - subprocess.check_output(cmd, universal_newlines=True) - return self.mntdir - - def __exit__(self, type, value, traceback): - """ - __exit__ method for python's with statement. - - Unmount and remove the mntdir. - """ - subprocess.check_output(["umount", self.mntdir], - universal_newlines=True) - os.rmdir(self.mntdir) - - -class MountedPartitionTests(unittest.TestCase): - - """Unittest of MountedPartition.""" - - @mock.patch('subprocess.check_output') - def test_with_of_MountedPartition(self, mock_subprocess_check_output): - """Test mount point.""" - test_dir = "" - with MountedPartition("/dev/test") as tmp: - test_dir = tmp - self.assertTrue(os.path.exists(test_dir)) - mock_subprocess_check_output.assert_has_calls( - [mock.call(['mount', '/dev/test', test_dir], - universal_newlines=True)]) - self.assertFalse(os.path.exists(test_dir)) - mock_subprocess_check_output.assert_has_calls( - [mock.call(['umount', test_dir], - universal_newlines=True)]) - - -class RecoveryVersion(Command): - - """ - print the version of recovery image. - - @EPILOG@ - - This commands prints information such as: - - image_version: xxx - bto_version: REV_xxx.iso (dell only) - """ - - def invoked(self, ctx): - """ - Guacamole method called when the command is invoked. - - /etc/buildstamp is a image information file, - it created by the oem image builder. - - oilpalm Fri, 20 Jun 2014 04:02:07 +0000 - somerville-trusty-amd64-20140620-0 - - If /etc/buildstamp exist, print out the second line (image iso name). - - For Dell-recovery partition, /etc/buildstamp shows base image info. - If recovery_partition/bto.xml, - print out the bto_version (read from xml file). - """ - if os.path.isfile("/etc/buildstamp"): - with open('/etc/buildstamp', 'rt', encoding='UTF-8') as stream: - data = stream.readlines() - print("image_version: {}".format(data[1].strip())) - - with MountedPartition(ctx.recovery_partition) as mntdir: - fname = "{}/bto.xml".format(mntdir) - if os.path.isfile(fname): - o = minidom.parse("{}/bto.xml".format(mntdir)) - bto_platform = o.getElementsByTagName("platform") - bto_revision = o.getElementsByTagName("revision") - if bto_platform and bto_revision: - bto_platform = bto_platform[0].firstChild.data - bto_revision = bto_revision[0].firstChild.data - bto_version = bto_platform + " " + bto_revision - else: - bto_iso = o.getElementsByTagName("iso") - bto_version = bto_iso[0].firstChild.data - print("bto_version: {}".format(bto_version)) - - -class RecoveryFile(Command): - - """ - display a single file from the recovery partition - - This command can be used to ``cat`` any file from the recovery partition - """ - - def register_arguments(self, parser): - """ - Guacamole method used by the argparse ingredient. - - :param parser: - Argument parser (from :mod:`argparse`) specific to this command. - """ - parser.add_argument('file', help='name of the file to display') - - def invoked(self, ctx): - """ - Guacamole method used by the command ingredient. - - :param ctx: - The guacamole context object. Context provides access to all - features of guacamole. The argparse ingredient adds the ``args`` - attribute to it. That attribute contains the result of parsing - command line arguments. - :returns: - The return code of the command. Guacamole translates ``None`` to a - successful exit status (return code zero). - """ - with MountedPartition(ctx.recovery_partition) as mnt: - return subprocess.call([ - 'cat', '--', os.path.join(mnt, ctx.args.file)]) - - -class RecoveryCheckType(Command): - - """ - test if the recovery partition is of the given type. - - This command can be used for scripted tests, to see if the recovery - partition on the current system is of a concrete type or not (e.g. - DELL-specific) - - @EPILOG@ - - The exit code is 0 if the recovery partition type matches and 1 otherwise. - """ - - def register_arguments(self, parser): - """ - Guacamole method used by the argparse ingredient. - - :param parser: - Argument parser (from :mod:`argparse`) specific to this command. - """ - parser.add_argument( - 'type', help="expected type of the recovery partition") - - def invoked(self, ctx): - """ - Guacamole method used by the command ingredient. - - :param ctx: - The guacamole context object. Context provides access to all - features of guacamole. The argparse ingredient adds the ``args`` - attribute to it. That attribute contains the result of parsing - command line arguments. - :returns: - The return code of the command. Guacamole translates ``None`` to a - successful exit status (return code zero). - """ - if ctx.recovery_type != ctx.args.type: - return 1 - - -class RecoveryInfo(Command): - - """ - Inspect the recovery partition. - - This command can be used to inspect the recovery partition. It has several - sub-commands that do various tasks. If the system has no recovery - partition, the command exits with the error code 1. - """ - - sub_commands = ( - ('version', RecoveryVersion), - ('file', RecoveryFile), - ('checktype', RecoveryCheckType), - ) - - def invoked(self, ctx): - """ - Guacamole method used by the command ingredient. - - :param ctx: - The guacamole context object. Context provides access to all - features of guacamole. The argparse ingredient adds the ``args`` - attribute to it. That attribute contains the result of parsing - command line arguments. - :returns: - The return code of the command. Guacamole translates ``None`` to a - successful exit status (return code zero). - """ - partition = get_recovery_partition() - - if partition is None: - print("Recovery partition not found", file=sys.stderr) - return 1 - (recovery_type, recovery_partition) = partition - ctx.recovery_partition = recovery_partition - ctx.recovery_type = recovery_type - - -class RecoveryInfoTests(unittest.TestCase): - - """Tests for RecoveryInfo.""" - - @mock.patch('__main__.get_recovery_package') - @mock.patch('__main__.get_recovery_partition') - def test_smoke(self, mock_get_recovery_partition, - mock_get_recovery_package): - """Smoke tests for running recovery_info.""" - mock_get_recovery_partition.return_value = ("DELL", "/dev/sda3") - mock_get_recovery_package.return_value = "dell-recovery_1.11" - self.assertEqual(RecoveryInfo().main(argv=[], exit=False), 0) - self.assertEqual( - RecoveryInfo().main(argv=["checktype", "HP"], exit=False), 1) - self.assertEqual( - RecoveryInfo().main(argv=["checktype", "DELL"], exit=False), 0) - - -if __name__ == '__main__': - if '--test' in sys.argv: - sys.argv.remove('--test') - unittest.main() - else: - RecoveryInfo().main() diff --git a/bin/recovery_info.py b/bin/recovery_info.py new file mode 100755 index 00000000..5ce1ba91 --- /dev/null +++ b/bin/recovery_info.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# Copyright 2015-2020 Canonical Ltd. +# Written by: +# Shawn Wang <shawn.wang@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Show the recovery partition information for the preinstalled OS.""" + +import os +import re +import subprocess +import sys +import tempfile + +import xml.dom.minidom as minidom + + +RECOVERY_PACKAGES = ["dell-recovery", "ubuntu-recovery"] + + +def get_recovery_package(): + """ + Test with RECOVERY_PACKAGES. + + to check recovery application is installed or not + + :return: + string of package_version or None + """ + for pkg in RECOVERY_PACKAGES: + output = subprocess.check_output(["apt-cache", "policy", pkg], + universal_newlines=True) + for line in output.split("\n"): + if line.startswith(" Installed:"): + ver = line.split(": ")[1] + return "{}_{}".format(pkg, ver.strip()) + return None + + +RECOVERY_LABELS = {"HP_TOOLS": "HP", + "PQSERVICE": "UBUNTU", + "BACKUP": "TEST", + "INSTALL": "DELL", + "OS": "DELL", + "RECOVERY": "DELL"} + + +_escape_pattern = re.compile(r'\\x([0-9a-fA-F][0-9a-fA-F])') + + +def lsblk_unescape(label): + """Un-escape text escaping done by lsblk(8).""" + return _escape_pattern.sub( + lambda match: chr(int(match.group(1), 16)), label) + + +def get_recovery_partition(): + """ + Get the type and location of the recovery partition. + + :return: + (recovery_type, recovery_partition) or None + + Use lsblk(8) to inspect available block devices looking + for a partition with FAT or NTFS and a well-known label. + """ + cmd = ['lsblk', '-o', 'TYPE,FSTYPE,NAME,LABEL', '--raw'] + for line in subprocess.check_output(cmd).splitlines()[1:]: + type, fstype, name, label = line.split(b' ', 3) + # Skip everything but partitions + if type != b'part': + continue + # Skip everything but FAT and NTFS + if fstype != b'vfat' and fstype != b'ntfs': + continue + label = lsblk_unescape(label.decode('utf-8')) + recovery_type = RECOVERY_LABELS.get(label) + # Skip unknown labels + if recovery_type is None: + continue + recovery_partition = '/dev/{}'.format(name.decode('utf-8')) + return (recovery_type, recovery_partition) + + +class MountedPartition(object): + + """ + Mount Manager to mount partition on tempdir. + + e.g. + with MountedPartition("/dev/sda1") as tmp: + print("This is the mount point: {}".format(tmp)) + do_stuff() + """ + + def __init__(self, part): + """ + Prepare the mntdir point. + + :param part: string of the partition device file, like /dev/sda2 + """ + self.part = part + self.mntdir = tempfile.mkdtemp() + + def __enter__(self): + """ + __enter__ method for python's with statement. + + Mount the partition device to the mntdir. + """ + cmd = ["mount", self.part, self.mntdir] + subprocess.check_output(cmd, universal_newlines=True) + return self.mntdir + + def __exit__(self, type, value, traceback): + """ + __exit__ method for python's with statement. + + Unmount and remove the mntdir. + """ + subprocess.check_output(["umount", self.mntdir], + universal_newlines=True) + os.rmdir(self.mntdir) + + +class RecoveryInfo(): + + """ + Inspect the recovery partition. + + This command can be used to inspect the recovery partition. It has several + sub-commands that do various tasks. If the system has no recovery + partition, the command exits with the error code 1. + """ + + def main(self): + partition = get_recovery_partition() + if len(sys.argv) == 1: + # no subcommand == detect test + if partition is None: + raise SystemExit("FAIL: Recovery partition not found") + else: + print("Found recovery partiion") + return + + (recovery_type, recovery_partition) = partition + + subcommand = sys.argv[1] + sub_commands = ('version', 'file', 'checktype') + if subcommand not in sub_commands: + raise SystemExit("ERROR: unexpected subcommand") + + if subcommand == "checktype": + if len(sys.argv) != 3: + raise SystemExit( + "ERROR: recovery_info.py checktype EXPECTED_TYPE") + expected_type = sys.argv[2] + if recovery_type != expected_type: + raise SystemExit("FAIL: expected {}, found {}".format( + expected_type, recovery_type)) + + if subcommand == "file": + if len(sys.argv) != 3: + raise SystemExit( + "ERROR: recovery_info.py file FILE") + file = sys.argv[2] + with MountedPartition(recovery_partition) as mnt: + return subprocess.call([ + 'cat', '--', os.path.join(mnt, file)]) + + if subcommand == "version": + if os.path.isfile("/etc/buildstamp"): + with open('/etc/buildstamp', 'rt', encoding='UTF-8') as stream: + data = stream.readlines() + print("image_version: {}".format(data[1].strip())) + + with MountedPartition(recovery_partition) as mntdir: + fname = "{}/bto.xml".format(mntdir) + if os.path.isfile(fname): + o = minidom.parse("{}/bto.xml".format(mntdir)) + bto_platform = o.getElementsByTagName("platform") + bto_revision = o.getElementsByTagName("revision") + if bto_platform and bto_revision: + bto_platform = bto_platform[0].firstChild.data + bto_revision = bto_revision[0].firstChild.data + bto_version = bto_platform + " " + bto_revision + else: + bto_iso = o.getElementsByTagName("iso") + bto_version = bto_iso[0].firstChild.data + print("bto_version: {}".format(bto_version)) + + +if __name__ == '__main__': + RecoveryInfo().main() diff --git a/bin/snap_tests.py b/bin/snap_tests.py index f548db89..fe80288e 100755 --- a/bin/snap_tests.py +++ b/bin/snap_tests.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2015-2019 Canonical Ltd. +# Copyright 2015-2020 Canonical Ltd. # All rights reserved. # # Written by: @@ -15,7 +15,16 @@ from checkbox_support.snap_utils.snapd import Snapd # - the snap must not be installed at the start of the nested test plan # - the snap must be strictly confined (no classic or devmode flags) # - there must be different revisions on the stable & edge channels -TEST_SNAP = os.getenv('TEST_SNAP', 'test-snapd-tools') +try: + TEST_SNAP = os.environ['TEST_SNAP'] +except KeyError: + runtime = os.getenv('CHECKBOX_RUNTIME', '/snap/checkbox/current') + if 'checkbox18' in runtime: + TEST_SNAP = 'test-snapd-tools-core18' + elif 'checkbox20' in runtime: + TEST_SNAP = 'test-snapd-tools-core20' + else: + TEST_SNAP = 'test-snapd-tools' SNAPD_TASK_TIMEOUT = int(os.getenv('SNAPD_TASK_TIMEOUT', 30)) SNAPD_POLL_INTERVAL = int(os.getenv('SNAPD_POLL_INTERVAL', 1)) @@ -60,7 +69,7 @@ class SnapInstall(): parser.add_argument('channel', help='channel to install from') args = parser.parse_args(sys.argv[2:]) print('Install {}...'.format(TEST_SNAP)) - s = Snapd(SNAPD_TASK_TIMEOUT, SNAPD_POLL_INTERVAL) + s = Snapd(SNAPD_TASK_TIMEOUT, SNAPD_POLL_INTERVAL, verbose=True) s.install(TEST_SNAP, args.channel) print('Confirm in snap list...') data = s.list() diff --git a/bin/storage_test.py b/bin/storage_test.py index 4f363bc9..1481a0dd 100755 --- a/bin/storage_test.py +++ b/bin/storage_test.py @@ -30,7 +30,7 @@ def find_largest_partition(device): out = sp.check_output(cmd, shell=True) blk_devs = [BlkDev(*p.strip().split()) for p in out.decode(sys.stdout.encoding).splitlines()] - blk_devs[:] = [bd for bd in blk_devs if bd.type == 'part'] + blk_devs[:] = [bd for bd in blk_devs if bd.type in ('part', 'md')] if not blk_devs: raise SystemExit( 'ERROR: No partitions found on device {}'.format(device)) diff --git a/bin/tpm-sysfs-resource b/bin/tpm-sysfs-resource deleted file mode 100755 index 34f81b57..00000000 --- a/bin/tpm-sysfs-resource +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. -# All rights reserved. -# -# Written by: -# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> - - -"""Collect information about all sysfs attributes related to TPM.""" - -import os - -import guacamole - - -class tpm_sysfs_resource(guacamole.Command): - - """ - Collect information about all sysfs attributes related to TPM. - - This program traverses all the TPM device nodes found in /sys/class/tpm/. - Each present device is subsequently inspected by reading all readable files - in /sys/class/tpm/*/device/* and presenting the data present there as - subsequent RFC822 records. There is one record per TPM chip. In order to - differentiate each chip, each record contains the field x-sysfs-device-name - that stores the full sysfs directory name of the chip. - - @EPILOG@ - - Unreadable files (typically due to permissions) are silently skipped. - """ - - def invoked(self, ctx): - # This is found on 4.2 kernels - sysfs_root_tpm = '/sys/class/tpm/' - # This is found on 3.19 kernels - sysfs_root_misc = '/sys/class/misc/' - if os.path.isdir(sysfs_root_tpm): - sysfs_root = sysfs_root_tpm - elif os.path.isdir(sysfs_root_misc): - sysfs_root = sysfs_root_misc - else: - return - for tpm_id in sorted(os.listdir(sysfs_root)): - if sysfs_root == sysfs_root_misc and not tpm_id.startswith('tpm'): - continue - print("x-sysfs-device-name: {}".format(tpm_id)) - tpm_dirname = os.path.join(sysfs_root, tpm_id, 'device') - for tpm_attr in sorted(os.listdir(tpm_dirname)): - tpm_filename = os.path.join(tpm_dirname, tpm_attr) - if not os.path.isfile(tpm_filename): - continue - if not os.access(tpm_filename, os.R_OK): - continue - with open(tpm_filename, 'rt', encoding='utf-8') as stream: - tpm_data = stream.read() - tpm_data = tpm_data.rstrip() - if '\n' in tpm_data: - print("{}:".format(tpm_attr)) - for tpm_data_chunk in tpm_data.splitlines(): - print(" {}".format(tpm_data_chunk)) - else: - print("{}: {}".format(tpm_attr, tpm_data)) - print() - - -if __name__ == "__main__": - tpm_sysfs_resource().main() diff --git a/bin/tpm_sysfs_resoource.py b/bin/tpm_sysfs_resoource.py new file mode 100755 index 00000000..2f8c256a --- /dev/null +++ b/bin/tpm_sysfs_resoource.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# Copyright 2015-2020 Canonical Ltd. +# All rights reserved. +# +# Written by: +# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> + +""" +Collect information about all sysfs attributes related to TPM. + +This program traverses all the TPM device nodes found in /sys/class/tpm/. +Each present device is subsequently inspected by reading all readable files +in /sys/class/tpm/*/device/* and presenting the data present there as +subsequent RFC822 records. There is one record per TPM chip. In order to +differentiate each chip, each record contains the field x-sysfs-device-name +that stores the full sysfs directory name of the chip. + +@EPILOG@ + +Unreadable files (typically due to permissions) are silently skipped. +""" + +import os + + +def main(): + # This is found on 4.2 kernels + sysfs_root_tpm = '/sys/class/tpm/' + # This is found on 3.19 kernels + sysfs_root_misc = '/sys/class/misc/' + if os.path.isdir(sysfs_root_tpm): + sysfs_root = sysfs_root_tpm + elif os.path.isdir(sysfs_root_misc): + sysfs_root = sysfs_root_misc + else: + return + for tpm_id in sorted(os.listdir(sysfs_root)): + if sysfs_root == sysfs_root_misc and not tpm_id.startswith('tpm'): + continue + print("x-sysfs-device-name: {}".format(tpm_id)) + tpm_dirname = os.path.join(sysfs_root, tpm_id, 'device') + for tpm_attr in sorted(os.listdir(tpm_dirname)): + tpm_filename = os.path.join(tpm_dirname, tpm_attr) + if not os.path.isfile(tpm_filename): + continue + if not os.access(tpm_filename, os.R_OK): + continue + with open(tpm_filename, 'rt', encoding='utf-8') as stream: + tpm_data = stream.read() + tpm_data = tpm_data.rstrip() + if '\n' in tpm_data: + print("{}:".format(tpm_attr)) + for tpm_data_chunk in tpm_data.splitlines(): + print(" {}".format(tpm_data_chunk)) + else: + print("{}: {}".format(tpm_attr, tpm_data)) + print() + + +if __name__ == "__main__": + main() diff --git a/bin/wifi_master_mode b/bin/wifi_master_mode.py index 5953ea34..3888caa6 100755 --- a/bin/wifi_master_mode +++ b/bin/wifi_master_mode.py @@ -1,32 +1,32 @@ #!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. +# Copyright 2015-2020 Canonical Ltd. # All rights reserved. # # Written by: # Jonathan Cave <jonathan.cave@canonical.com> # Po-Hsu Lin <po-hsu.lin@canonical.com> +import argparse import logging import os import sys import subprocess import tempfile -from guacamole import Command - -class WifiMasterMode(Command): +class WifiMasterMode(): """Make system to act as an 802.11 Wi-Fi Access Point.""" - def register_arguments(self, parser): + def main(self): + parser = argparse.ArgumentParser() parser.add_argument('--protocol', default='g', choices=['a', 'b', 'g', 'ad'], help="802.11 protocol, possible value: a/b/g/ad") parser.add_argument('--auto', action='store_true', help="Run in the automated mode") + args = parser.parse_args() - def invoked(self, ctx): data_dir = "" try: data_dir = os.environ['PLAINBOX_PROVIDER_DATA'] @@ -50,12 +50,12 @@ class WifiMasterMode(Command): with tempfile.NamedTemporaryFile(mode='w+t') as conf_file_out: with open(conf_in, "r") as conf_file_in: data_in = conf_file_in.read() - data_out = data_in.replace("$PROTOCOL", ctx.args.protocol) + data_out = data_in.replace("$PROTOCOL", args.protocol) data_out = data_out.replace("$WIFI-DEV-NAME", wifi_dev) conf_file_out.write(data_out) conf_file_out.flush() - if ctx.args.auto: + if args.auto: child = subprocess.Popen(['hostapd', '-d', conf_file_out.name], stdout=subprocess.PIPE, universal_newlines=True) @@ -87,5 +87,6 @@ class WifiMasterMode(Command): finally: child.terminate() + if __name__ == "__main__": WifiMasterMode().main() diff --git a/bin/wwan_tests b/bin/wwan_tests.py index 17389e57..c19c547e 100755 --- a/bin/wwan_tests +++ b/bin/wwan_tests.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 -# Copyright 2015 Canonical Ltd. +# Copyright 2015-2020 Canonical Ltd. # All rights reserved. # # Written by: # Jonathan Cave <jonathan.cave@canonical.com> # Po-Hsu Lin <po-hsu.lin@canonical.com> +import argparse import logging import os import subprocess @@ -13,7 +14,6 @@ import sys import time import dbus -from guacamole import Command TEST_IP = "8.8.8.8" @@ -56,7 +56,8 @@ class MMDbus(): if (excp.get_dbus_name() == "org.freedesktop.DBus.Error.ServiceUnknown"): logging.error(excp.get_dbus_message()) - logging.error("Note: wwan_tests requires ModemManager >=1.0") + logging.error( + "Note: wwan_tests.py requires ModemManager >=1.0") else: logging.error(excp.get_dbus_message()) return @@ -258,9 +259,10 @@ def _ping_test(if_name): return ret_code -class ThreeGppConnection(Command): +class ThreeGppConnection(): - def register_arguments(self, parser): + def invoked(self, ctx): + parser = argparse.ArgumentParser() parser.add_argument('wwan_control_if', type=str, help='The control interface for the device') parser.add_argument('wwan_net_if', type=str, @@ -269,14 +271,13 @@ class ThreeGppConnection(Command): help='The APN for data connection') parser.add_argument('wwan_setup_time', type=int, default=30, help='delay before ping test') - - def invoked(self, ctx): + args = parser.parse_args(sys.argv[2:]) ret_code = 1 try: - _create_3gpp_connection(ctx.args.wwan_control_if, ctx.args.apn) + _create_3gpp_connection(ctx.args.wwan_control_if, args.apn) _wwan_radio_on() - time.sleep(ctx.args.wwan_setup_time) - ret_code = _ping_test(ctx.args.wwan_net_if) + time.sleep(args.wwan_setup_time) + ret_code = _ping_test(args.wwan_net_if) except: pass _destroy_3gpp_connection() @@ -284,20 +285,28 @@ class ThreeGppConnection(Command): return ret_code -class CountModems(Command): +class CountModems(): - def invoked(self, ctx): - if ctx.args.use_cli: + def invoked(self): + parser = argparse.ArgumentParser() + parser.add_argument('--use-cli', action='store_true', + help="Use mmcli for all calls rather than dbus") + args = parser.parse_args(sys.argv[2:]) + if args.use_cli: mm = MMCLI() else: mm = MMDbus() print(len(mm.get_modem_ids())) -class Resources(Command): +class Resources(): - def invoked(self, ctx): - if ctx.args.use_cli: + def invoked(self): + parser = argparse.ArgumentParser() + parser.add_argument('--use-cli', action='store_true', + help="Use mmcli for all calls rather than dbus") + args = parser.parse_args(sys.argv[2:]) + if args.use_cli: mm = MMCLI() else: mm = MMDbus() @@ -332,55 +341,60 @@ class Resources(Command): print() -class SimPresent(Command): +class SimPresent(): - def register_arguments(self, parser): + def invoked(self): + parser = argparse.ArgumentParser() parser.add_argument('hw_id', type=str, help='The hardware ID of the modem whose attached' 'SIM we want to query') - - def invoked(self, ctx): - if ctx.args.use_cli: + parser.add_argument('--use-cli', action='store_true', + help="Use mmcli for all calls rather than dbus") + args = parser.parse_args(sys.argv[2:]) + if args.use_cli: mm = MMCLI() else: mm = MMDbus() - mm_id = mm.equipment_id_to_mm_id(ctx.args.hw_id) + mm_id = mm.equipment_id_to_mm_id(args.hw_id) if not mm.sim_present(mm_id): return 1 -class SimInfo(Command): +class SimInfo(): - def register_arguments(self, parser): + def invoked(self): + parser = argparse.ArgumentParser() parser.add_argument('hw_id', type=str, help='The hardware ID of the modem whose attached' 'SIM we want to query') - - def invoked(self, ctx): - if ctx.args.use_cli: + parser.add_argument('--use-cli', action='store_true', + help="Use mmcli for all calls rather than dbus") + args = parser.parse_args(sys.argv[2:]) + if args.use_cli: mm = MMCLI() else: mm = MMDbus() - mm_id = mm.equipment_id_to_mm_id(ctx.args.hw_id) + mm_id = mm.equipment_id_to_mm_id(args.hw_id) print("Operator: {}".format(mm.get_sim_operatorname(mm_id))) print("IMSI: {}".format(mm.get_sim_imsi(mm_id))) print("MCC/MNC: {}".format(mm.get_sim_operatoridentifier(mm_id))) print("ICCID: {}".format(mm.get_sim_simidentifier(mm_id))) -class WWANTests(Command): - - sub_commands = ( - ('count', CountModems), - ('resources', Resources), - ('3gpp-connection', ThreeGppConnection), - ('sim-present', SimPresent), - ('sim-info', SimInfo) - ) - - def register_arguments(self, parser): - parser.add_argument('--use-cli', action='store_true', - help="Use mmcli for all calls rather than dbus") +class WWANTests(): + + def main(self): + sub_commands = { + 'count': CountModems, + 'resources': Resources, + '3gpp-connection': ThreeGppConnection, + 'sim-present': SimPresent, + 'sim-info': SimInfo + } + parser = argparse.ArgumentParser() + parser.add_argument('subcommand', type=str, choices=sub_commands) + args = parser.parse_args(sys.argv[1:2]) + sub_commands[args.subcommand]().invoked() if __name__ == "__main__": @@ -5,7 +5,7 @@ from plainbox.provider_manager import N_ setup( name='plainbox-provider-checkbox', namespace='com.canonical.certification', - version="0.51.0.dev0", + version="0.52.0.dev0", description=N_("Checkbox provider"), gettext_domain='plainbox-provider-checkbox', strict=False, deprecated=False, diff --git a/requirements/container-tests-provider-checkbox b/requirements/container-tests-provider-checkbox index 311b4034..7f8a0ba9 100755 --- a/requirements/container-tests-provider-checkbox +++ b/requirements/container-tests-provider-checkbox @@ -9,7 +9,12 @@ git clone git://git.launchpad.net/checkbox-ng $TMPDIR/checkbox-ng echo "I: running 'develop' on checkbox-ng" ( cd $TMPDIR/checkbox-ng/ && python3 setup.py develop --quiet | sed -e 's/^/I (develop output) /' ) +git clone git://git.launchpad.net/checkbox-support $TMPDIR/checkbox-support +echo "I: running 'develop' on checkbox-support" +( cd $TMPDIR/checkbox-support/ && python3 setup.py develop --quiet | sed -e 's/^/I (develop output) /' ) + git clone git://git.launchpad.net/plainbox-provider-resource $TMPDIR/plainbox-provider-resource python3 $TMPDIR/plainbox-provider-resource/manage.py develop --force ./manage.py validate +./manage.py test diff --git a/tests/test_dkms_info.py b/tests/test_dkms_info.py new file mode 100644 index 00000000..30f3ae48 --- /dev/null +++ b/tests/test_dkms_info.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +# encoding: utf-8 +# Copyright 2015 Canonical Ltd. +# Written by: +# Shawn Wang <shawn.wang@canonical.com> +# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import io +import unittest +from unittest import mock + +import dkms_info + + +class SystemInfoTests(unittest.TestCase): + + """Tests for System Information Parsing and Collection.""" + + _proc_modules = """\ +xt_REDIRECT 16384 3 - Live 0x0000000000000000 +nf_nat_redirect 16384 1 xt_REDIRECT, Live 0x0000000000000000 +xt_hl 16384 3 - Live 0x0000000000000000 +hid_generic 16384 0 - Live 0x0000000000000000 +usbhid 53248 0 - Live 0x0000000000000000 +hid 110592 2 hid_generic,usbhid, Live 0x0000000000000000 +overlay 45056 1 - Live 0x0000000000000000 +""" + _modalias = """\ +usb:v1D6Bp0003d0319dc09dsc00dp03ic09isc00ip00in00 +""" + + def setUp(self): + """Common setup code.""" + dkms_info.get_system_module_list.cache_clear() + dkms_info.get_system_modaliases.cache_clear() + + @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) + def test_get_module_list__calls_and_parses_lsmod(self): + """Ensure that get_module_list() parses lsmod output.""" + # NOTE: Return value was loaded from my system running kernel 4.0. + # The first few and last rows to be precise. + modules = dkms_info.get_system_module_list() + self.assertEqual(modules, [ + 'xt_REDIRECT', 'nf_nat_redirect', 'xt_hl', 'hid_generic', + 'usbhid', 'hid', 'overlay']) + + @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) + def test_get_module_list_is_cached(self): + """Ensure that get_module_list() cache works.""" + modules1 = dkms_info.get_system_module_list() + modules2 = dkms_info.get_system_module_list() + self.assertIn('xt_REDIRECT', modules1) + self.assertIn('overlay', modules2) + self.assertEqual(modules1, modules2) + + @mock.patch('os.walk') + @mock.patch('io.open', mock.mock_open(read_data=_modalias)) + def test_get_system_modalias(self, mock_os_walk): + """test_get_system_modalias.""" + mock_os_walk.return_value = [ + ("/sys/devices/pci0000:00/0000:00:14.0/usb2/2-0:1.0/modalias", + ["driver", "subsystem"], + ["modalias", "uevent"]), + ] + + """fetch hw_modaliases from machine and check modalis types.""" + modaliases = dkms_info.get_system_modaliases() + self.assertEqual(len(modaliases), 1) + self.assertIn("usb", modaliases) + + @mock.patch('os.uname') + @mock.patch('os.walk') + def test_get_installed_dkms_modules(self, mock_os_walk, mock_os_uname): + """test_get_installed_dkms_modules.""" + mock_os_walk.return_value = [ + ("/var/lib/dkms/hello/0.1", + ["3.19.0-15-generic", "build", "source"], + []), + ] + o = mock.Mock() + o.release = "3.19.0-15-generic" + mock_os_uname.return_value = o + self.assertEqual([['hello', '0.1']], + dkms_info.get_installed_dkms_modules()) + + @mock.patch('dkms_info.get_system_modaliases') + def test_match_patterns(self, mock_get_system_modaliases): + """Test of match_patterns.""" + mock_get_system_modaliases.return_value = { + "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", + "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], + "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", + "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], + } + pkg_modalieses = ["pci:v00008086d00008C26sv*sd*bc*sc*i*", + "usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", + "oemalias:test"] + matched_modalieses = dkms_info.match_patterns(tuple(pkg_modalieses)) + # match_patterns + self.assertIn("pci:v00008086d00008C26sv*sd*bc*sc*i*", + matched_modalieses) + self.assertIn("oemalias:test", + matched_modalieses) + self.assertNotIn("usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", + matched_modalieses) + + +class DebianPackageHandlerTest(unittest.TestCase): + + """Test of DebianPackageHandler.""" + + _var_lib_dpkg_status = """\ +Package: foo +Status: install ok installed +Modaliases: hwe(pci:v000099DDd00000036sv*sd*bc*sc*i*) + +Package: foo1 +Status: install ok installed +Modaliases: hwe(pci:v0000AADDd00000036sv*sd*bc*sc*i*) + +Package: foo2 +Status: install ok installed + +Package: foo3 +Status: install ok installed + +Package: bar +Status: install ok installed + +""" + + @mock.patch('io.open', mock.mock_open(read_data=_var_lib_dpkg_status)) + @mock.patch('dkms_info.get_system_modaliases') + def test_get_pkgs(self, mock_get_system_modaliases): + """Test of test_get_pkgs.""" + mock_get_system_modaliases.return_value = { + "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", + "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], + "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", + "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], + } + + self.pkg_handler = dkms_info.DebianPackageHandler( + file_object=io.StringIO(self._var_lib_dpkg_status)) + self.assertEqual(len(self.pkg_handler.pkgs), 2) diff --git a/tests/test_recovery_info.py b/tests/test_recovery_info.py new file mode 100644 index 00000000..19658712 --- /dev/null +++ b/tests/test_recovery_info.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright 2015-2020 Canonical Ltd. +# Written by: +# Shawn Wang <shawn.wang@canonical.com> +# Jonathan Cave <jonathan.cave@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest +from unittest import mock +from unittest.mock import patch +import sys + +import recovery_info + + +class FunctionTests(unittest.TestCase): + + """Tests for several functions.""" + + @mock.patch('subprocess.check_output') + def test_get_recovery_package(self, mock_subprocess_check_output): + """Smoke test for get_recovery_package().""" + mock_subprocess_check_output.return_value = """\ +dell-recovery: + Installed: 1.11 + Candidate: 1.11 + Version table: + 1.11 + 500 https://archive/cesg-mirror/ test/public amd64 Packages +""" + self.assertEqual(recovery_info.get_recovery_package(), + "dell-recovery_1.11") + + @mock.patch('subprocess.check_output') + def test_get_recovery_partition(self, mock_subprocess_check_output): + """Smoke test for get_recovery_partition().""" + mock_subprocess_check_output.return_value = ( + b'TYPE FSTYPE NAME LABEL\n' + b'disk linux_raid_member sda fx:2x250GB\n' + b'raid1 bcache md127 \n' + b'disk ext4 bcache0 Ultra\n' + b'disk linux_raid_member sdb fx:2x250GB\n' + b'raid1 bcache md127 \n' + b'disk ext4 bcache0 Ultra\n' + b'disk sdc \n' + b'part btrfs sdc1 vol1\n' + b'disk sdd \n' + b'part ntfs sdd1 Windows\x208.1\n' + b'part sdd2 \n' + b'part ext4 sdd5 Utopic\n' + b'part swap sdd6 \n' + b'disk bcache sde \n' + b'disk ext4 bcache0 Ultra\n' + b'disk sdf \n' + b'part ntfs sda3 RECOVERY\n') + self.assertEqual(recovery_info.get_recovery_partition(), + ("DELL", "/dev/sda3")) + + def test_lsblk_unescape(self): + """Smoke tests for lsblk_unescape().""" + self.assertEqual(recovery_info.lsblk_unescape( + 'Windows\\x208.1'), 'Windows 8.1') + self.assertEqual(recovery_info.lsblk_unescape( + 'Windows XP'), 'Windows XP') + + +class MountedPartitionTests(unittest.TestCase): + + """Unittest of MountedPartition.""" + + @mock.patch('subprocess.check_output') + def test_with_of_MountedPartition(self, mock_subprocess_check_output): + """Test mount point.""" + test_dir = "" + with recovery_info.MountedPartition("/dev/test") as tmp: + test_dir = tmp + self.assertTrue(os.path.exists(test_dir)) + mock_subprocess_check_output.assert_has_calls( + [mock.call(['mount', '/dev/test', test_dir], + universal_newlines=True)]) + self.assertFalse(os.path.exists(test_dir)) + mock_subprocess_check_output.assert_has_calls( + [mock.call(['umount', test_dir], + universal_newlines=True)]) + + +class RecoveryInfoTests(unittest.TestCase): + + """Tests for RecoveryInfo.""" + + @mock.patch('recovery_info.get_recovery_package') + @mock.patch('recovery_info.get_recovery_partition') + def test_smoke(self, mock_get_recovery_partition, + mock_get_recovery_package): + """Smoke tests for running recovery_info.""" + mock_get_recovery_partition.return_value = ("DELL", "/dev/sda3") + mock_get_recovery_package.return_value = "dell-recovery_1.11" + + testargs = ["recovery_info.py"] + with patch.object(sys, 'argv', testargs): + self.assertIsNone(recovery_info.RecoveryInfo().main()) + + testargs = ["recovery_info.py", "checktype", "HP"] + with patch.object(sys, 'argv', testargs): + with self.assertRaises(SystemExit): + recovery_info.RecoveryInfo().main() + + testargs = ["recovery_info.py", "checktype", "DELL"] + with patch.object(sys, 'argv', testargs): + self.assertIsNone(recovery_info.RecoveryInfo().main()) diff --git a/units/info/jobs.pxu b/units/info/jobs.pxu index a513deb7..e1d56aea 100644 --- a/units/info/jobs.pxu +++ b/units/info/jobs.pxu @@ -218,7 +218,7 @@ plugin: attachment category_id: com.canonical.plainbox::info requires: package.name == 'dkms' -command: dkms_info --format json +command: dkms_info.py --format json _description: Attaches json dumps of installed dkms package information. _summary: Attaches json dumps of installed dkms package information. @@ -361,7 +361,7 @@ user: root estimated_duration: 0.2 _summary: Check existence of recovery partition _description: Check existence of recovery partition -command: recovery_info +command: recovery_info.py plugin: shell category_id: com.canonical.plainbox::info @@ -372,7 +372,7 @@ user: root estimated_duration: 0.2 _summary: Check the recovery type is dell or not _description: Check the recovery type is dell or not -command: recovery_info checktype DELL +command: recovery_info.py checktype DELL plugin: attachment category_id: com.canonical.plainbox::info @@ -389,7 +389,7 @@ _description: Example: image_version: somerville-trusty-amd64-20140620-0 bto_version: A00_dell-bto-trusty-houston-15-A11-iso-20141203-0.iso -command: recovery_info version +command: recovery_info.py version plugin: attachment category_id: com.canonical.plainbox::info @@ -404,7 +404,7 @@ _description: The information include: - fish packages - dell recovery stage 2 boot log -command: recovery_info file bto.xml +command: recovery_info.py file bto.xml plugin: attachment category_id: com.canonical.plainbox::info diff --git a/units/info/packaging.pxu b/units/info/packaging.pxu index 67f18c2b..61e26d23 100644 --- a/units/info/packaging.pxu +++ b/units/info/packaging.pxu @@ -3,11 +3,6 @@ unit: packaging meta-data os-id: debian Depends: python3-debian -# The dkms_info script requires python3-guacamole package -unit: packaging meta-data -os-id: debian -Depends: python3-guacamole (>= 0.9) - # This is for lsblk attachment and disk/detect unit: packaging meta-data os-id: debian diff --git a/units/submission/jobs.pxu b/units/submission/jobs.pxu index b433fd38..e0829b63 100644 --- a/units/submission/jobs.pxu +++ b/units/submission/jobs.pxu @@ -2,7 +2,7 @@ id: dkms_info_json plugin: attachment category_id: com.canonical.plainbox::info command: - dkms_info --format json | python3 -m plainbox dev parse dkms-info | \ + dkms_info.py --format json | python3 -m plainbox dev parse dkms-info | \ jq '.dkms_info' _description: Attaches json dumps of installed dkms package information. _summary: Attaches json dumps of installed dkms package information. @@ -62,7 +62,7 @@ _description: The information include: - fish packages - dell recovery stage 2 boot log -command: recovery_info file bto.xml | python3 -m plainbox dev parse bto +command: recovery_info.py file bto.xml | python3 -m plainbox dev parse bto id: recovery_info_attachment_json plugin: attachment @@ -79,7 +79,7 @@ _description: Example: image_version: somerville-trusty-amd64-20140620-0 bto_version: A00_dell-bto-trusty-houston-15-A11-iso-20141203-0.iso -command: recovery_info version | python3 -m plainbox dev parse recovery-info +command: recovery_info.py version | python3 -m plainbox dev parse recovery-info id: system_info_json plugin: attachment diff --git a/units/tpm/sysfs.pxu b/units/tpm/sysfs.pxu index fdbf3902..5db5fe74 100644 --- a/units/tpm/sysfs.pxu +++ b/units/tpm/sysfs.pxu @@ -11,8 +11,8 @@ plugin: resource _summary: Count the number of visible TPM chips in sysfs _description: This job just counts the number of visible TPM chips in as reported by - tpm-sysfs-resource tool. The only resource attribute is 'count' -command: echo "count: $(tpm-sysfs-resource | grep -F 'x-sysfs-device-name' | wc -l)" + tpm_sysfs_resource.py tool. The only resource attribute is 'count' +command: echo "count: $(tpm_sysfs_resource.py | grep -F 'x-sysfs-device-name' | wc -l)" estimated_duration: 2s flags: preserve-locale @@ -24,7 +24,7 @@ _summary: Collect TPM information from sysfs _description: This job collects all the available TPM information from /sys/class/tpm/*/device/*. -command: tpm-sysfs-resource +command: tpm_sysfs_resource.py estimated_duration: 2s flags: preserve-locale # Tie this resource with the has_tpm_chip manifest entry. This way it will @@ -43,7 +43,7 @@ _description: This job collects all the available TPM information from /sys/class/tpm/*/device/*. Distinct files present there are converted to attributes of resource records. -command: tpm-sysfs-resource +command: tpm_sysfs_resource.py estimated_duration: 2s flags: preserve-locale # See note about manifest on the sysfs_tpm job above. @@ -60,7 +60,7 @@ _description: This job collects all the available TPM information from /sys/class/tpm/*/device/*. Distinct files present there are converted to attributes of resource records. -command: tpm-sysfs-resource +command: tpm_sysfs_resource.py estimated_duration: 2s flags: preserve-locale # See note about manifest on the sysfs_tpm job above. @@ -76,6 +76,6 @@ _description: This job collects all the available DMI information from /sys/class/dmi/id/*. The main purpose of including this job is to allow the provider to include vendor-specific quirks by looking at the sysfs_dmi.bios_vendor attribute. -command: dmi-sysfs-resource +command: dmi_sysfs_resource.py estimated_duration: 1s flags: preserve-locale diff --git a/units/wireless/wifi-ap.pxu b/units/wireless/wifi-ap.pxu index 335d5051..493a2ca4 100644 --- a/units/wireless/wifi-ap.pxu +++ b/units/wireless/wifi-ap.pxu @@ -585,7 +585,6 @@ plugin: user-interact-verify requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID command: @@ -624,7 +623,6 @@ plugin: shell requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID WIFI_AP_SETUPTIME command: @@ -673,7 +671,6 @@ plugin: user-interact-verify requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH $OPEN_BG_SSID command: @@ -712,7 +709,6 @@ plugin: shell requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID WIFI_AP_SETUPTIME command: @@ -761,7 +757,6 @@ plugin: user-interact-verify requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID command: @@ -800,7 +795,6 @@ plugin: shell requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID WIFI_AP_SETUPTIME command: @@ -849,7 +843,6 @@ plugin: user-interact-verify requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID command: @@ -888,7 +881,6 @@ plugin: shell requires: wifi_interface_mode.{interface}_AP == 'supported' snap.name == 'wifi-ap' -depends: wireless/wireless_connection_open_bg_nm_{interface} estimated_duration: 120.0 environ: LD_LIBRARY_PATH OPEN_BG_SSID WIFI_AP_SETUPTIME command: diff --git a/units/wwan/jobs.pxu b/units/wwan/jobs.pxu index d18cdc31..b0313856 100644 --- a/units/wwan/jobs.pxu +++ b/units/wwan/jobs.pxu @@ -15,7 +15,7 @@ _purpose: plugin: shell user: root command: - COUNT=$(wwan_tests count) + COUNT=$(wwan_tests.py count) if [ $COUNT -eq 0 ]; then exit 1 fi @@ -38,7 +38,7 @@ _description: plugin: shell command: BEGIN_CONNECTION_TEST_TS=`date '+%Y-%m-%d %H:%M:%S'` - wwan_tests 3gpp-connection $WWAN_CONTROL_IF $WWAN_NET_IF $WWAN_APN ${{WWAN_SETUPTIME:-30}} + wwan_tests.py 3gpp-connection $WWAN_CONTROL_IF $WWAN_NET_IF $WWAN_APN ${{WWAN_SETUPTIME:-30}} RETVAL=$? if [ $RETVAL -ne 0 ]; then echo "==== Service units logs ====" @@ -64,7 +64,7 @@ _summary: Check if a SIM card is present in a slot connected to the modem _description: Check if a SIM card is present in a slot connected to the modem plugin: shell -command: wwan_tests sim-present {hw_id} +command: wwan_tests.py sim-present {hw_id} environ: LD_LIBRARY_PATH user: root estimated_duration: 10.0 @@ -90,7 +90,7 @@ _steps: 1. Start the test to automatically retrieve information from the SIM card _verification: Check the output, if as expected then mark the test as passed. -command: wwan_tests sim-info {hw_id} +command: wwan_tests.py sim-info {hw_id} environ: LD_LIBRARY_PATH user: root estimated_duration: 5s diff --git a/units/wwan/resource.pxu b/units/wwan/resource.pxu index a0e82ff8..0197f15e 100644 --- a/units/wwan/resource.pxu +++ b/units/wwan/resource.pxu @@ -11,7 +11,7 @@ category_id: wwan plugin: resource _summary: Gather device info about WWAN modems _description: Gather device info about WWAN modems -command: wwan_tests --use-cli resources +command: wwan_tests.py --use-cli resources user: root estimated_duration: 3s flags: preserve-locale |
