From 5476520cd9983b2c8d65c98ea1ba542ac922fe95 Mon Sep 17 00:00:00 2001 From: Sylvain Pineau Date: Fri, 17 Jul 2020 10:48:52 +0200 Subject: bin:testlib.py: Deleted --- bin/testlib.py | 1450 -------------------------------------------------------- 1 file changed, 1450 deletions(-) delete mode 100755 bin/testlib.py diff --git a/bin/testlib.py b/bin/testlib.py deleted file mode 100755 index 5f783f8..0000000 --- a/bin/testlib.py +++ /dev/null @@ -1,1450 +0,0 @@ -from __future__ import print_function -# -# testlib.py quality assurance test script -# Copyright (C) 2008-2016 Canonical Ltd. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Library General Public -# License as published by the Free Software Foundation; either -# version 2 of the License. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Library General Public License for more details. -# -# You should have received a copy of the GNU Library General Public -# License along with this program. If not, see -# . -# - -'''Common classes and functions for package tests.''' - -import crypt -import glob -import grp -import gzip -import os -import os.path -import platform -import pwd -import random -import re -import shutil -import socket -import string -import subprocess -import sys -import tempfile -import time -import unittest - -from stat import ST_SIZE - -# Don't make python-pexpect mandatory -try: - import pexpect -except ImportError: - pass - -import warnings -warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning) -try: - import apt_pkg - # cope with apt_pkg api changes. - if 'init_system' in dir(apt_pkg): - apt_pkg.init_system() - else: - apt_pkg.InitSystem() -except: - # On non-Debian system, fall back to simple comparison without debianisms - class apt_pkg(object): - @staticmethod - def version_compare(one, two): - list_one = one.split('.') - list_two = two.split('.') - while len(list_one) > 0 and len(list_two) > 0: - try: - if int(list_one[0]) > int(list_two[0]): - return 1 - if int(list_one[0]) < int(list_two[0]): - return -1 - except: - # ugh, non-numerics in the version, fall back to - # string comparison, which will be wrong for e.g. - # 3.2 vs 3.16rc1 - if list_one[0] > list_two[0]: - return 1 - if list_one[0] < list_two[0]: - return -1 - list_one.pop(0) - list_two.pop(0) - return 0 - - @staticmethod - def VersionCompare(one, two): - return apt_pkg.version_compare(one, two) - -bogus_nxdomain = "208.69.32.132" - -# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe_html -# This is needed so that the subprocesses that produce endless output -# actually quit when the reader goes away. -import signal -def subprocess_setup(): - # Python installs a SIGPIPE handler by default. This is usually not what - # non-Python subprocesses expect. - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - - -class TimedOutException(Exception): - def __init__(self, value="Timed Out"): - self.value = value - - def __str__(self): - return repr(self.value) - - -def _restore_backup(path): - pathbackup = path + '.autotest' - if os.path.exists(pathbackup): - shutil.move(pathbackup, path) - - -def _save_backup(path): - pathbackup = path + '.autotest' - if os.path.exists(path) and not os.path.exists(pathbackup): - shutil.copy2(path, pathbackup) - # copy2 does not copy ownership, so do it here. - # Reference: http://docs.python.org/library/shutil_html - a = os.stat(path) - os.chown(pathbackup, a[4], a[5]) - - -def config_copydir(path): - if os.path.exists(path) and not os.path.isdir(path): - raise OSError("'%s' is not a directory" % (path)) - _restore_backup(path) - - pathbackup = path + '.autotest' - if os.path.exists(path): - shutil.copytree(path, pathbackup, symlinks=True) - - -def config_replace(path, contents, append=False): - '''Replace (or append) to a config file''' - _restore_backup(path) - if os.path.exists(path): - _save_backup(path) - if append: - with open(path) as fh: - contents = fh.read() + contents - with open(path, 'w') as fh: - fh.write(contents) - - -def config_comment(path, field): - _save_backup(path) - contents = "" - with open(path) as fh: - for line in fh: - if re.search("^\s*%s\s*=" % (field), line): - line = "#" + line - contents += line - - with open(path + '.new', 'w') as new_fh: - new_fh.write(contents) - os.rename(path + '.new', path) - - -def config_set(path, field, value, spaces=True): - _save_backup(path) - contents = "" - if spaces: - setting = '%s = %s\n' % (field, value) - else: - setting = '%s=%s\n' % (field, value) - found = False - with open(path) as fh: - for line in fh: - if re.search("^\s*%s\s*=" % (field), line): - found = True - line = setting - contents += line - if not found: - contents += setting - - with open(path + '.new', 'w') as new_config: - new_config.write(contents) - os.rename(path + '.new', path) - - -def config_patch(path, patch, depth=1): - '''Patch a config file''' - _restore_backup(path) - _save_backup(path) - - handle, name = mkstemp_fill(patch) - rc = subprocess.call(['/usr/bin/patch', '-p%s' % depth, path], stdin=handle, stdout=subprocess.PIPE) - os.unlink(name) - if rc != 0: - raise Exception("Patch failed") - - -def config_restore(path): - '''Rename a replaced config file back to its initial state''' - _restore_backup(path) - - -def timeout(secs, f, *args): - def handler(signum, frame): - raise TimedOutException() - - old = signal.signal(signal.SIGALRM, handler) - result = None - signal.alarm(secs) - try: - result = f(*args) - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, old) - - return result - - -def require_nonroot(): - if os.geteuid() == 0: - print("This series of tests should be run as a regular user with sudo access, not as root.", file=sys.stderr) - sys.exit(1) - - -def require_root(): - if os.geteuid() != 0: - print("This series of tests should be run with root privileges (e.g. via sudo).", file=sys.stderr) - sys.exit(1) - - -def require_sudo(): - if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) is None: - print("This series of tests must be run under sudo.", file=sys.stderr) - sys.exit(1) - if os.environ['SUDO_USER'] == 'root': - print('Please run this test using sudo from a regular user. (You ran sudo from root.)', file=sys.stderr) - sys.exit(1) - - -def random_string(length, lower=False): - '''Return a random string, consisting of ASCII letters, with given - length.''' - - s = '' - selection = string.ascii_letters - if lower: - selection = string.ascii_lowercase - maxind = len(selection) - 1 - for l in range(length): - s += selection[random.randint(0, maxind)] - return s - - -def mkstemp_fill(contents, suffix='', prefix='testlib-', dir=None): - '''As tempfile.mkstemp does, return a (file, name) pair, but with - prefilled contents.''' - - handle, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir) - os.close(handle) - handle = open(name, "w+") - handle.write(contents) - handle.flush() - handle.seek(0) - - return handle, name - - -def create_fill(path, contents, mode=0o644): - '''Safely create a page''' - # make the temp file in the same dir as the destination file so we - # don't get invalid cross-device link errors when we rename - handle, name = mkstemp_fill(contents, dir=os.path.dirname(path)) - handle.close() - os.rename(name, path) - os.chmod(path, mode) - - -def login_exists(login): - '''Checks whether the given login exists on the system.''' - - try: - pwd.getpwnam(login) - return True - except KeyError: - return False - - -def group_exists(group): - '''Checks whether the given login exists on the system.''' - - try: - grp.getgrnam(group) - return True - except KeyError: - return False - - -def is_empty_file(path): - '''test if file is empty, returns True if so''' - with open(path) as fh: - return (len(fh.read()) == 0) - - -def recursive_rm(dirPath, contents_only=False): - '''recursively remove directory''' - names = os.listdir(dirPath) - for name in names: - path = os.path.join(dirPath, name) - if os.path.islink(path) or not os.path.isdir(path): - os.unlink(path) - else: - recursive_rm(path) - if not contents_only: - os.rmdir(dirPath) - - -def check_pidfile(exe, pidfile): - '''Checks if pid in pidfile is running''' - if not os.path.exists(pidfile): - return False - - # get the pid - try: - with open(pidfile, 'r') as fd: - pid = fd.readline().rstrip('\n') - except: - return False - - return check_pid(exe, pid) - - -def check_pid(exe, pid): - '''Checks if pid is running''' - cmdline = "/proc/%s/cmdline" % (str(pid)) - if not os.path.exists(cmdline): - return False - - # get the command line - try: - with open(cmdline, 'r') as fd: - tmp = fd.readline().split('\0') - except: - return False - - # this allows us to match absolute paths or just the executable name - if re.match('^' + exe + '$', tmp[0]) or \ - re.match('.*/' + exe + '$', tmp[0]) or \ - re.match('^' + exe + ': ', tmp[0]) or \ - re.match('^\(' + exe + '\)', tmp[0]): - return True - - return False - - -def check_port(port, proto, ver=4): - '''Check if something is listening on the specified port. - WARNING: for some reason this does not work with a bind mounted /proc - ''' - assert (port >= 1) - assert (port <= 65535) - assert (proto.lower() == "tcp" or proto.lower() == "udp") - assert (ver == 4 or ver == 6) - - fn = "/proc/net/%s" % (proto) - if ver == 6: - fn += str(ver) - - rc, report = cmd(['cat', fn]) - assert (rc == 0) - - hport = "%0.4x" % port - - if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()): - return True - return False - - -def get_arch(): - '''Get the current architecture''' - rc, report = cmd(['uname', '-m']) - assert (rc == 0) - return report.strip() - - -def get_multiarch_tuple(): - '''Get the current debian multiarch tuple''' - - # XXX dpkg-architecture on Ubuntu 12.04 requires -q be adjacent to - # the variable name - rc, report = cmd(['dpkg-architecture', '-qDEB_HOST_MULTIARCH']) - assert (rc == 0) - return report.strip() - - -def get_bits(): - '''return the default architecture number of bits (32 or 64, usually)''' - return platform.architecture()[0][0:2] - - -def get_memory(): - '''Gets total ram and swap''' - meminfo = "/proc/meminfo" - memtotal = 0 - swaptotal = 0 - if not os.path.exists(meminfo): - return (False, False) - - try: - fd = open(meminfo, 'r') - for line in fd.readlines(): - splitline = line.split() - if splitline[0] == 'MemTotal:': - memtotal = int(splitline[1]) - elif splitline[0] == 'SwapTotal:': - swaptotal = int(splitline[1]) - fd.close() - except: - return (False, False) - - return (memtotal, swaptotal) - - -def is_running_in_vm(): - '''Check if running under a VM''' - # add other virtualization environments here - for search in ['QEMU Virtual CPU']: - rc, report = cmd_pipe(['dmesg'], ['grep', search]) - if rc == 0: - return True - return False - - -def ubuntu_release(): - '''Get the Ubuntu release''' - f = "/etc/lsb-release" - try: - size = os.stat(f)[ST_SIZE] - except: - return "UNKNOWN" - - if size > 1024 * 1024: - raise IOError('Could not open "%s" (too big)' % f) - - with open("/etc/lsb-release", 'r') as fh: - lines = fh.readlines() - - pat = re.compile(r'DISTRIB_CODENAME') - for line in lines: - if pat.search(line): - return line.split('=')[1].rstrip('\n').rstrip('\r') - - return "UNKNOWN" - - -def cmd(command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=None, timeout=None, env=None): - '''Try to execute given command (array) and return its stdout, or return - a textual error if it failed.''' - - try: - sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup, env=env, universal_newlines=True) - except OSError as e: - return [127, str(e)] - - out, outerr = sp.communicate(input) - # Handle redirection of stdout - if out is None: - out = '' - # Handle redirection of stderr - if outerr is None: - outerr = '' - return [sp.returncode, out + outerr] - - -def cmd_pipe(command1, command2, input=None, stderr=subprocess.STDOUT, stdin=None): - '''Try to pipe command1 into command2.''' - try: - sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) - sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) - except OSError as e: - return [127, str(e)] - - out = sp2.communicate(input)[0] - return [sp2.returncode, out] - - -def cwd_has_enough_space(cdir, total_bytes): - '''Determine if the partition of the current working directory has 'bytes' - free.''' - rc, df_output = cmd(['df']) - if rc != 0: - result = 'df failed, got exit code %d, expected %d\n' % (rc, 0) - raise OSError(result) - if rc != 0: - return False - - kb = total_bytes / 1024 - - mounts = dict() - for line in df_output.splitlines(): - if '/' not in line: - continue - tmp = line.split() - mounts[tmp[5]] = int(tmp[3]) - - cdir = os.getcwd() - while cdir != '/': - if cdir not in mounts: - cdir = os.path.dirname(cdir) - continue - return kb < mounts[cdir] - - return kb < mounts['/'] - - -def get_md5(filename): - '''Gets the md5sum of the file specified''' - - (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename]) - expected = 0 - assert (expected == rc) - - return report.split(' ')[0] - - -def dpkg_compare_installed_version(pkg, check, version): - '''Gets the version for the installed package, and compares it to the - specified version. - ''' - (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg]) - assert (rc == 0) - assert ("Status: install ok installed" in report) - installed_version = "" - for line in report.splitlines(): - if line.startswith("Version: "): - installed_version = line.split()[1] - - assert (installed_version != "") - - (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version]) - assert (rc == 0 or rc == 1) - if rc == 0: - return True - return False - - -def _run_apt_command(pkg_list, apt_cmd='install'): - env = os.environ.copy() - env['DEBIAN_FRONTEND'] = 'noninteractive' - # debugging version, but on precise doesn't actually run dpkg - # command = ['apt-get', '-y', '--force-yes', '-o', 'Dpkg::Options::=--force-confold', '-o', 'Debug::pkgDPkgPM=true', apt_cmd] - command = ['apt-get', '-y', '--force-yes', '-o', 'Dpkg::Options::=--force-confold', apt_cmd] - command.extend(pkg_list) - rc, report = cmd(command) - return rc, report - - -# note: for the following install_* functions, you probably want the -# versions in the TestlibCase class (see below) -def install_builddeps(src_pkg): - rc, report = _run_apt_command([src_pkg], 'build-dep') - assert(rc == 0) - - -def install_package(package): - rc, report = _run_apt_command([package], 'install') - assert(rc == 0) - - -def install_packages(pkg_list): - rc, report = _run_apt_command(pkg_list, 'install') - assert(rc == 0) - - -def prepare_source(source, builder, cached_src, build_src, patch_system): - '''Download and unpack source package, installing necessary build depends, - adjusting the permissions for the 'builder' user, and returning the - directory of the unpacked source. Patch system can be one of: - - cdbs - - dpatch - - quilt - - quiltv3 - - None (not the string) - - This is normally used like this: - - def setUp(self): - ... - self.topdir = os.getcwd() - self.cached_src = os.path.join(os.getcwd(), "source") - self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp') - self.builder = testlib.TestUser() - testlib.cmd(['chgrp', self.builder.login, self.tmpdir]) - os.chmod(self.tmpdir, 0o775) - - def tearDown(self): - ... - self.builder = None - self.topdir = os.getcwd() - if os.path.exists(self.tmpdir): - testlib.recursive_rm(self.tmpdir) - - def test_suite_build(self): - ... - build_dir = testlib.prepare_source('foo', \ - self.builder, \ - self.cached_src, \ - os.path.join(self.tmpdir, \ - os.path.basename(self.cached_src)), - "quilt") - os.chdir(build_dir) - - # Example for typical build, adjust as necessary - print("") - print(" make clean") - rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean']) - - print(" configure") - rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug']) - - print(" make (will take a while)") - rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make']) - - print(" make check (will take a while)",) - rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check']) - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - self.assertEqual(expected, rc, result + report) - - def test_suite_cleanup(self): - ... - if os.path.exists(self.cached_src): - testlib.recursive_rm(self.cached_src) - - It is up to the caller to clean up cached_src and build_src (as in the - above example, often the build_src is in a tmpdir that is cleaned in - tearDown() and the cached_src is cleaned in a one time clean-up - operation (eg 'test_suite_cleanup()) which must be run after the build - suite test (obviously). - ''' - - # Make sure we have a clean slate - assert (os.path.exists(os.path.dirname(build_src))) - assert (not os.path.exists(build_src)) - - cdir = os.getcwd() - if os.path.exists(cached_src): - shutil.copytree(cached_src, build_src) - os.chdir(build_src) - else: - # Only install the build dependencies on the initial setup - install_builddeps(source) - os.makedirs(build_src) - os.chdir(build_src) - - # These are always needed - pkgs = ['build-essential', 'dpkg-dev', 'fakeroot'] - install_packages(pkgs) - - rc, report = cmd(['apt-get', 'source', source]) - assert (rc == 0) - shutil.copytree(build_src, cached_src) - print("(unpacked %s)" % os.path.basename(glob.glob('%s_*.dsc' % source)[0]), end=' ') - - unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0]) - - # Now apply the patches. Do it here so that we don't mess up our cached - # sources. - os.chdir(unpacked_dir) - assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None]) - if patch_system is not None and patch_system != "quiltv3": - if patch_system == "quilt": - os.environ.setdefault('QUILT_PATCHES', 'debian/patches') - rc, report = cmd(['quilt', 'push', '-a']) - assert (rc == 0) - elif patch_system == "cdbs": - rc, report = cmd(['./debian/rules', 'apply-patches']) - assert (rc == 0) - elif patch_system == "dpatch": - rc, report = cmd(['dpatch', 'apply-all']) - assert (rc == 0) - - cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src]) - os.chdir(cdir) - - return unpacked_dir - - -def get_changelog_version(source_dir): - '''Extract a package version from a changelog''' - package_version = "" - - changelog_file = os.path.join(source_dir, "debian/changelog") - - if os.path.exists(changelog_file): - changelog = open(changelog_file, 'r') - header = changelog.readline().split() - package_version = header[1].strip('()') - - return package_version - - -def _aa_status(): - '''Get aa-status output''' - exe = "/usr/sbin/aa-status" - assert (os.path.exists(exe)) - if os.geteuid() == 0: - return cmd([exe]) - return cmd(['sudo', exe]) - - -def is_apparmor_loaded(path): - '''Check if profile is loaded''' - rc, report = _aa_status() - if rc != 0: - return False - - for line in report.splitlines(): - if line.endswith(path): - return True - return False - - -def is_apparmor_confined(path): - '''Check if application is confined''' - rc, report = _aa_status() - if rc != 0: - return False - - for line in report.splitlines(): - if re.search('%s \(' % path, line): - return True - return False - - -def check_apparmor(path, first_ubuntu_release, is_running=True): - '''Check if path is loaded and confined for everything higher than the - first Ubuntu release specified. - - Usage: - rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True) - if rc < 0: - return self._skipped(report) - - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - self.assertEqual(expected, rc, result + report) - ''' - global manager - rc = -1 - - if manager.lsb_release["Release"] < first_ubuntu_release: - return (rc, "Skipped apparmor check") - - if not os.path.exists('/sbin/apparmor_parser'): - return (rc, "Skipped (couldn't find apparmor_parser)") - - rc = 0 - msg = "" - if not is_apparmor_loaded(path): - rc = 1 - msg = "Profile not loaded for '%s'" % path - - # this check only makes sense it the 'path' is currently executing - if is_running and rc == 0 and not is_apparmor_confined(path): - rc = 1 - msg = "'%s' is not running in enforce mode" % path - - return (rc, msg) - - -def get_gcc_version(gcc, full=True): - gcc_version = 'none' - if not gcc.startswith('/'): - gcc = '/usr/bin/%s' % (gcc) - if os.path.exists(gcc): - gcc_version = 'unknown' - lines = cmd([gcc, '-v'])[1].strip().splitlines() - version_lines = [x for x in lines if x.startswith('gcc version')] - if len(version_lines) == 1: - gcc_version = " ".join(version_lines[0].split()[2:]) - if not full: - return gcc_version.split()[0] - return gcc_version - - -def is_kdeinit_running(): - '''Test if kdeinit is running''' - # applications that use kdeinit will spawn it if it isn't running in the - # test. This is a problem because it does not exit. This is a helper to - # check for it. - rc, report = cmd(['ps', 'x']) - if 'kdeinit4 Running' not in report: - print("kdeinit not running (you may start/stop any KDE application then run this script again)", file=sys.stderr) - return False - return True - - -def get_pkgconfig_flags(libs=[]): - '''Find pkg-config flags for libraries''' - assert (len(libs) > 0) - rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs) - expected = 0 - if rc != expected: - print('Got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr) - assert(rc == expected) - return pkg_config.split() - - -def cap_to_name(cap_num): - '''given an integer, return the capability name''' - rc, output = cmd(['capsh', '--decode=%x' % cap_num]) - expected = 0 - if rc != expected: - print('capsh: got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr) - cap_name = output.strip().split('=')[1] - return cap_name - - -def enumerate_capabilities(): - i = 0 - cap_list = [] - done = False - while not done: - cap_name = cap_to_name(pow(2, i)) - if cap_name == str(i): - done = True - else: - cap_list.append(cap_name) - i += 1 - if i > 64: - done = True - return cap_list - - -class TestDaemon: - '''Helper class to manage daemons consistently''' - def __init__(self, init): - '''Setup daemon attributes''' - self.initscript = init - - def start(self): - '''Start daemon''' - rc, report = cmd([self.initscript, 'start']) - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - time.sleep(2) - if expected != rc: - return (False, result + report) - - if "fail" in report: - return (False, "Found 'fail' in report\n" + report) - - return (True, "") - - def stop(self): - '''Stop daemon''' - rc, report = cmd([self.initscript, 'stop']) - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - if expected != rc: - return (False, result + report) - - if "fail" in report: - return (False, "Found 'fail' in report\n" + report) - - return (True, "") - - def reload(self): - '''Reload daemon''' - rc, report = cmd([self.initscript, 'force-reload']) - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - if expected != rc: - return (False, result + report) - - if "fail" in report: - return (False, "Found 'fail' in report\n" + report) - - return (True, "") - - def restart(self): - '''Restart daemon''' - (res, str) = self.stop() - if not res: - return (res, str) - - (res, str) = self.start() - if not res: - return (res, str) - - return (True, "") - - def force_restart(self): - '''Restart daemon even if already stopped''' - (res, str) = self.stop() - - (res, str) = self.start() - if not res: - return (res, str) - - return (True, "") - - def status(self): - '''Check daemon status''' - rc, report = cmd([self.initscript, 'status']) - expected = 0 - result = 'Got exit code %d, expected %d\n' % (rc, expected) - if expected != rc: - return (False, result + report) - - if "fail" in report: - return (False, "Found 'fail' in report\n" + report) - - return (True, "") - - -class TestlibManager(object): - '''Singleton class used to set up per-test-run information''' - def __init__(self): - # Set glibc aborts to dump to stderr instead of the tty so test output - # is more sane. - os.environ.setdefault('LIBC_FATAL_STDERR_', '1') - - # check verbosity - self.verbosity = False - if (len(sys.argv) > 1 and '-v' in sys.argv[1:]): - self.verbosity = True - - # Load LSB release file - self.lsb_release = dict() - if os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'): - for line in subprocess.Popen(['lsb_release', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True).communicate()[0].splitlines(): - field, value = line.split(':', 1) - value = value.strip() - field = field.strip() - # Convert numerics - try: - value = float(value) - except: - pass - self.lsb_release.setdefault(field, value) - if not self.lsb_release: - self.lsb_release = { - 'Distributor ID': 'Ubuntu Core', - 'Description': 'Ubuntu Core 16', - 'Release': 16, - 'Codename': 'xenial', - } - - # FIXME: hack OEM releases into known-Ubuntu versions - if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)": - if self.lsb_release['Release'] == 1.0: - self.lsb_release['Distributor ID'] = "Ubuntu" - self.lsb_release['Release'] = 8.04 - else: - raise OSError("Unknown version of HP MIE") - - # FIXME: hack to assume a most-recent release if we're not - # running under Ubuntu. - if self.lsb_release['Distributor ID'] not in ["Ubuntu", "Linaro"]: - self.lsb_release['Release'] = 10000 - # Adjust Linaro release to pretend to be Ubuntu - if self.lsb_release['Distributor ID'] in ["Linaro"]: - self.lsb_release['Distributor ID'] = "Ubuntu" - self.lsb_release['Release'] -= 0.01 - - # Load arch - if not os.path.exists('/usr/bin/dpkg'): - machine = cmd(['uname', '-m'])[1].strip() - if machine.endswith('86'): - self.dpkg_arch = 'i386' - elif machine.endswith('_64'): - self.dpkg_arch = 'amd64' - elif machine.startswith('arm'): - self.dpkg_arch = 'armel' - else: - raise ValueError("Unknown machine type '%s'" % (machine)) - else: - self.dpkg_arch = cmd(['dpkg', '--print-architecture'])[1].strip() - - # Find kernel version - self.kernel_is_ubuntu = False - self.kernel_version_signature = None - self.kernel_version = cmd(["uname", "-r"])[1].strip() - versig = '/proc/version_signature' - if os.path.exists(versig): - self.kernel_is_ubuntu = True - self.kernel_version_signature = open(versig).read().strip() - self.kernel_version_ubuntu = self.kernel_version - elif os.path.exists('/usr/bin/dpkg'): - # this can easily be inaccurate but is only an issue for Dapper - rc, out = cmd(['dpkg', '-l', 'linux-image-%s' % (self.kernel_version)]) - if rc == 0: - self.kernel_version_signature = out.strip().split('\n').pop().split()[2] - self.kernel_version_ubuntu = self.kernel_version_signature - if self.kernel_version_signature is None: - # Attempt to fall back to something for non-Debian-based - self.kernel_version_signature = self.kernel_version - self.kernel_version_ubuntu = self.kernel_version - # Build ubuntu version without hardware suffix - try: - self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)]) - except: - pass - - # Find gcc version - self.gcc_version = get_gcc_version('gcc') - - # Find libc - self.path_libc = [x.split()[2] for x in cmd(['ldd', '/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0] - - # Report self - if self.verbosity: - kernel = self.kernel_version_ubuntu - if kernel != self.kernel_version_signature: - kernel += " (%s)" % (self.kernel_version_signature) - print("Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( - sys.argv[0], - self.lsb_release['Distributor ID'], - self.lsb_release['Release'], - kernel, - self.dpkg_arch, - os.geteuid(), os.getuid(), - os.environ.get('SUDO_USER', '')), file=sys.stdout) - sys.stdout.flush() - - # Additional heuristics - # if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']: - # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000)) - # sys.stdout.flush() - # time.sleep(0.5) - # sys.stdout.write("destroyed\n") - # time.sleep(0.5) - - def hello(self, msg): - print("Hello from %s" % (msg), file=sys.stderr) -# The central instance -manager = TestlibManager() - - -class TestlibCase(unittest.TestCase): - def __init__(self, *args): - '''This is called for each TestCase test instance, which isn't much better - than SetUp.''' - - unittest.TestCase.__init__(self, *args) - - # Attach to and duplicate dicts from manager singleton - self.manager = manager - # self.manager.hello(repr(self) + repr(*args)) - self.my_verbosity = self.manager.verbosity - self.lsb_release = self.manager.lsb_release - self.dpkg_arch = self.manager.dpkg_arch - self.kernel_version = self.manager.kernel_version - self.kernel_version_signature = self.manager.kernel_version_signature - self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu - self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu - self.gcc_version = self.manager.gcc_version - self.path_libc = self.manager.path_libc - - def version_compare(self, one, two): - if 'version_compare' in dir(apt_pkg): - return apt_pkg.version_compare(one, two) - else: - return apt_pkg.VersionCompare(one, two) - - def assertFileType(self, filename, filetype, strict=True): - '''Checks the file type of the file specified''' - - (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename]) - out = out.strip() - expected = 0 - # Absolutely no idea why this happens on Hardy - if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0: - rc = 0 - result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report) - self.assertEqual(expected, rc, result) - - if strict: - filetype = '^%s$' % (filetype) - else: - # accept if the beginning of the line matches - filetype = '^%s' % (filetype) - result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype) - self.assertNotEqual(None, re.search(filetype, out), result) - - def yank_commonname_from_cert(self, certfile): - '''Extract the commonName from a given PEM''' - rc, out = cmd(['openssl', 'asn1parse', '-in', certfile]) - if rc == 0: - ready = False - for line in out.splitlines(): - if ready: - return line.split(':')[-1] - if ':commonName' in line: - ready = True - return socket.getfqdn() - - def announce(self, text): - if self.my_verbosity: - print("(%s) " % (text), file=sys.stderr, end='') - sys.stdout.flush() - - def make_clean(self): - rc, output = self.shell_cmd(['make', 'clean']) - self.assertEqual(rc, 0, output) - - def get_makefile_compiler(self): - # Find potential compiler name - compiler = 'gcc' - if os.path.exists('Makefile'): - for line in open('Makefile'): - if line.startswith('CC') and '=' in line: - items = [x.strip() for x in line.split('=')] - if items[0] == 'CC': - compiler = items[1] - break - return compiler - - def make_target(self, target, expected=0): - '''Compile a target and report output''' - - compiler = self.get_makefile_compiler() - rc, output = self.shell_cmd(['make', target]) - self.assertEqual(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output) - self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output) - return output - - # call as return testlib.skipped() - def _skipped(self, reason=""): - '''Provide a visible way to indicate that a test was skipped''' - if reason != "": - reason = ': %s' % (reason) - self.announce("skipped%s" % (reason)) - return False - - def _testlib_shell_cmd(self, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=None): - argstr = "'" + "', '".join(args).strip() + "'" - rc, out = cmd(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env) - report = 'Command: ' + argstr + '\nOutput:\n' + out - return rc, report, out - - def shell_cmd(self, args, stdin=None): - return cmd(args, stdin=stdin) - - def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=None, msg="", bare_report=False): - '''Test a shell command matches a specific exit code''' - rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env) - result = 'Got exit code %d, expected %d\n' % (rc, expected) - self.assertEqual(expected, rc, msg + result + report) - if bare_report: - return out - else: - return report - - # make sure exit value is in a list of expected values - def assertShellExitIn(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): - '''Test a shell command matches a specific exit code''' - rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) - result = 'Got exit code %d, expected one of %s\n' % (rc, ', '.join(map(str, expected))) - self.assertIn(rc, expected, msg + result + report) - - def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): - '''Test a shell command doesn't match a specific exit code''' - rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) - result = 'Got (unwanted) exit code %d\n' % rc - self.assertNotEqual(unwanted, rc, msg + result + report) - - def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None): - '''Test a shell command contains a specific output''' - rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) - result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text) - if not invert: - self.assertTrue(text in out, msg + result + report) - else: - self.assertFalse(text in out, msg + result + report) - if expected is not None: - result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args)) - self.assertEqual(rc, expected, msg + result + report) - - def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None): - '''Test a shell command matches a specific output''' - rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) - result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args)) - if not invert: - self.assertEqual(text, out, msg + result + report) - else: - self.assertNotEqual(text, out, msg + result + report) - if expected is not None: - result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args)) - self.assertEqual(rc, expected, msg + result + report) - - def _word_find(self, report, content, invert=False): - '''Check for a specific string''' - if invert: - warning = 'Found "%s"\n' % content - self.assertTrue(content not in report, warning + report) - else: - warning = 'Could not find "%s"\n' % content - self.assertTrue(content in report, warning + report) - - def _test_sysctl_value(self, path, expected, msg=None, exists=True): - sysctl = '/proc/sys/%s' % (path) - self.assertEqual(exists, os.path.exists(sysctl), sysctl) - value = None - if exists: - with open(sysctl) as sysctl_fd: - value = int(sysctl_fd.read()) - report = "%s is not %d: %d" % (sysctl, expected, value) - if msg: - report += " (%s)" % (msg) - self.assertEqual(value, expected, report) - return value - - def set_sysctl_value(self, path, desired): - sysctl = '/proc/sys/%s' % (path) - self.assertTrue(os.path.exists(sysctl), "%s does not exist" % (sysctl)) - with open(sysctl, 'w') as sysctl_fh: - sysctl_fh.write(str(desired)) - self._test_sysctl_value(path, desired) - - def kernel_at_least(self, introduced): - return self.version_compare(self.kernel_version_ubuntu, - introduced) >= 0 - - def kernel_claims_cve_fixed(self, cve): - changelog = "/usr/share/doc/linux-image-%s/changelog.Debian_gz" % (self.kernel_version) - if os.path.exists(changelog): - for line in gzip.open(changelog): - if cve in line and "revert" not in line and "Revert" not in line: - return True - return False - - def install_builddeps(self, src_pkg): - rc, report = _run_apt_command([src_pkg], 'build-dep') - self.assertEqual(0, rc, 'Failed to install build-deps for %s\nOutput:\n%s' % (src_pkg, report)) - - def install_package(self, package): - rc, report = _run_apt_command([package], 'install') - self.assertEqual(0, rc, 'Failed to install package %s\nOutput:\n%s' % (package, report)) - - def install_packages(self, pkg_list): - rc, report = _run_apt_command(pkg_list, 'install') - self.assertEqual(0, rc, 'Failed to install packages %s\nOutput:\n%s' % (','.join(pkg_list), report)) - - -class TestGroup: - '''Create a temporary test group and remove it again in the dtor.''' - - def __init__(self, group=None, lower=False): - '''Create a new group''' - - self.group = None - if group: - if group_exists(group): - raise ValueError('group name already exists') - else: - while(True): - group = random_string(7, lower=lower) - if not group_exists(group): - break - - assert subprocess.call(['groupadd', group]) == 0 - self.group = group - g = grp.getgrnam(self.group) - self.gid = g[2] - - def __del__(self): - '''Remove the created group.''' - - if self.group: - rc, report = cmd(['groupdel', self.group]) - assert rc == 0 - - -class TestUser: - '''Create a temporary test user and remove it again in the dtor.''' - - def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None): - '''Create a new user account with a random password. - - By default, the login name is random, too, but can be explicitly - specified with 'login'. By default, a home directory is created, this - can be suppressed with 'home=False'.''' - - self.login = None - - if os.geteuid() != 0: - raise ValueError("You must be root to run this test") - - if login: - if login_exists(login): - raise ValueError('login name already exists') - else: - while(True): - login = 't' + random_string(7, lower=lower) - if not login_exists(login): - break - - self.salt = random_string(2) - self.password = random_string(8, lower=lower) - self.crypted = crypt.crypt(self.password, self.salt) - - creation = ['useradd', '-p', self.crypted] - if home: - creation += ['-m'] - if group: - creation += ['-G', group] - if uidmin: - creation += ['-K', 'UID_MIN=%d' % uidmin] - if shell: - creation += ['-s', shell] - creation += [login] - assert subprocess.call(creation) == 0 - # Set GECOS - assert subprocess.call(['usermod', '-c', 'Buddy %s' % (login), login]) == 0 - - self.login = login - p = pwd.getpwnam(self.login) - self.uid = p[2] - self.gid = p[3] - self.gecos = p[4] - self.home = p[5] - self.shell = p[6] - - def __del__(self): - '''Remove the created user account.''' - - if self.login: - # sanity check the login name so we don't accidentally wipe too much - if len(self.login) > 3 and '/' not in self.login: - subprocess.call(['rm', '-rf', '/home/' + self.login, '/var/mail/' + self.login]) - rc, report = cmd(['userdel', '-f', self.login]) - assert rc == 0 - - def add_to_group(self, group): - '''Add user to the specified group name''' - rc, report = cmd(['usermod', '-G', group, self.login]) - if rc != 0: - print(report) - assert rc == 0 - - -class AddUser: - '''Create a temporary test user and remove it again in the dtor.''' - - def __init__(self, login=None, home=True, encrypt_home=False, group=None, lower=False, shell=None): - '''Create a new user account with a random password. - - By default, the login name is random, too, but can be explicitly - specified with 'login'. By default, a home directory is created, this - can be suppressed with 'home=False'. - - This class differs from the TestUser class in that the adduser/deluser - tools are used rather than the useradd/user/del tools. The adduser - program is the only commandline program that can be used to add a new - user with an encrypted home directory. It is possible that the AddUser - class may replace the TestUser class in the future.''' - - self.login = None - - if os.geteuid() != 0: - raise ValueError("You must be root to run this test") - - if login: - if login_exists(login): - raise ValueError('login name already exists') - else: - while(True): - login = 't' + random_string(7, lower=True) - if not login_exists(login): - break - - self.password = random_string(8, lower=lower) - - creation = ['adduser', '--quiet'] - - if not home: - creation += ['--no-create-home'] - elif encrypt_home: - creation += ['--encrypt-home'] - - if shell: - creation += ['--shell', shell] - - creation += ['--gecos', 'Buddy %s' % (login), login] - - child = pexpect.spawn(creation.pop(0), creation, timeout=5) - assert child.expect('Enter new UNIX password:') == 0 - child.sendline(self.password) - assert child.expect('Retype new UNIX password:') == 0 - child.sendline(self.password) - - child.wait() - child.close() - assert child.exitstatus == 0 - assert child.signalstatus is None - - self.login = login - - if group: - assert self.add_to_group(group) == 0 - - p = pwd.getpwnam(self.login) - self.uid = p[2] - self.gid = p[3] - self.gecos = p[4] - self.home = p[5] - self.shell = p[6] - - def __del__(self): - '''Remove the created user account.''' - - if self.login: - # sanity check the login name so we don't accidentally wipe too much - rc, report = cmd(['deluser', '--remove-home', self.login]) - assert rc == 0 - - def add_to_group(self, group): - '''Add user to the specified group name''' - rc, report = cmd(['adduser', self.login, group]) - if rc != 0: - print(report) - assert rc == 0 - - -# Timeout handler using alarm() from John P. Speno's Pythonic Avocado -class TimeoutFunctionException(Exception): - """Exception to raise on a timeout""" - pass - - -class TimeoutFunction: - def __init__(self, function, timeout): - self.timeout = timeout - self.function = function - - def handle_timeout(self, signum, frame): - raise TimeoutFunctionException() - - def __call__(self, *args, **kwargs): - old = signal.signal(signal.SIGALRM, self.handle_timeout) - signal.alarm(self.timeout) - try: - result = self.function(*args, **kwargs) - finally: - signal.signal(signal.SIGALRM, old) - signal.alarm(0) - return result - - -def main(): - print("hi") - unittest.main() -- cgit v1.2.3 From 53a9dace03fea49a2de030e7cd1999155637b6ef Mon Sep 17 00:00:00 2001 From: Sylvain Pineau Date: Fri, 17 Jul 2020 10:49:33 +0200 Subject: bin:*.py: Fix all Flake8 errors --- bin/accelerometer_test.py | 29 ++-- bin/alsa_tests.py | 1 + bin/ansi_parser.py | 3 +- bin/audio_driver_info.py | 7 +- bin/audio_test.py | 257 ++++++++++++++++++----------------- bin/battery_test.py | 8 +- bin/bluetooth_test.py | 1 + bin/bmc_info.py | 10 +- bin/boot_mode_test_snappy.py | 1 - bin/brightness_test.py | 21 +-- bin/bt_connect.py | 1 + bin/bt_list_adapters.py | 2 +- bin/color_depth_info.py | 3 +- bin/cpu_offlining.py | 1 + bin/cpu_topology.py | 5 +- bin/cpuid.py | 6 +- bin/create_connection.py | 8 +- bin/dmitest.py | 32 ++--- bin/edid_cycle.py | 6 +- bin/evdev_touch_test.py | 2 +- bin/fan_reaction_test.py | 8 +- bin/frequency_governors_test.py | 222 +++++++++++++++++++----------- bin/fresh_rate_info.py | 5 +- bin/get_make_and_model.py | 12 +- bin/gpu_test.py | 24 ++-- bin/graphic_memory_info.py | 7 +- bin/graphics_driver.py | 10 +- bin/graphics_modes_info.py | 10 +- bin/graphics_stress_test.py | 46 ++++--- bin/gst_pipeline_test.py | 29 ++-- bin/hdd_parking.py | 10 +- bin/hotkey_tests.py | 1 + bin/key_test.py | 14 +- bin/keyboard_test.py | 1 + bin/lock_screen_watcher.py | 5 +- bin/lsmod_info.py | 1 + bin/manage_compiz_plugin.py | 8 +- bin/memory_compare.py | 4 +- bin/network.py | 25 ++-- bin/network_check.py | 8 +- bin/network_ntp_test.py | 12 +- bin/network_reconnect_resume_test.py | 10 +- bin/network_restart.py | 24 ++-- bin/optical_read_test.py | 29 ++-- bin/pm_log_check.py | 16 ++- bin/pm_test.py | 27 ++-- bin/process_wait.py | 9 +- bin/pulse-active-port-change.py | 29 ++-- bin/removable_storage_test.py | 49 +++---- bin/removable_storage_watcher.py | 87 +++++++----- bin/resolution_test.py | 8 +- bin/rotation_test.py | 4 +- bin/sleep_test.py | 9 +- bin/sleep_test_log_check.py | 15 +- bin/sleep_time_check.py | 14 +- bin/stress_ng_test.py | 2 +- bin/test_bt_keyboard.py | 2 +- bin/touchpad_driver_info.py | 4 +- bin/touchpad_test.py | 5 +- bin/virtualization.py | 29 +--- bin/volume_test.py | 78 ++++++----- bin/wifi_ap_wizard.py | 1 + bin/wifi_client_test_netplan.py | 8 +- bin/wifi_master_mode.py | 2 +- bin/wifi_time2reconnect.py | 11 +- bin/window_test.py | 7 +- bin/wwan_tests.py | 25 +--- bin/xrandr_cycle.py | 16 +-- 68 files changed, 764 insertions(+), 622 deletions(-) diff --git a/bin/accelerometer_test.py b/bin/accelerometer_test.py index 2d95ca8..93ae07f 100755 --- a/bin/accelerometer_test.py +++ b/bin/accelerometer_test.py @@ -23,6 +23,7 @@ The purpose of this script is to simply interact with an onboard accelerometer, and check to be sure that the x, y, z axis respond to physical movement of hardware. ''' + from argparse import ArgumentParser import gi import logging @@ -34,9 +35,10 @@ import time gi.require_version('Gdk', '3.0') gi.require_version('GLib', '2.0') gi.require_version("Gtk", "3.0") -from gi.repository import Gdk, GLib, Gtk -from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError -from checkbox_support.parsers.modinfo import ModinfoParser +from gi.repository import Gdk, GLib, Gtk # noqa: E402 +from subprocess import Popen, PIPE, check_output, STDOUT # noqa: E402 +from subprocess import CalledProcessError # noqa: E402 +from checkbox_support.parsers.modinfo import ModinfoParser # noqa: E402 handler = logging.StreamHandler() logger = logging.getLogger() @@ -86,7 +88,7 @@ class AccelerometerUI(Gtk.Window): def update_axis_icon(self, direction): """Change desired directional icon to checkmark""" - exec('self.%s_icon.set_from_stock' % (direction) \ + exec('self.%s_icon.set_from_stock' % (direction) + '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)') def update_debug_label(self, text): @@ -131,7 +133,7 @@ class AxisData(threading.Thread): self.x_test_pool = ["up", "down"] self.y_test_pool = ["left", "right"] - if self.ui == None: + if self.ui is None: self.ui.enabled = False def grab_current_readings(self): @@ -223,8 +225,8 @@ class AxisData(threading.Thread): def insert_supported_module(oem_module): """Try and insert supported module to see if we get any init errors""" try: - stream = check_output(['modinfo', oem_module], stderr=STDOUT, - universal_newlines=True) + stream = check_output( + ['modinfo', oem_module], stderr=STDOUT, universal_newlines=True) except CalledProcessError as err: print("Error accessing modinfo for %s: " % oem_module, file=sys.stderr) print(err.output, file=sys.stderr) @@ -232,7 +234,7 @@ def insert_supported_module(oem_module): parser = ModinfoParser(stream) module = os.path.basename(parser.get_field('filename')) - + insmod_output = Popen(['insmod %s' % module], stderr=PIPE, shell=True, universal_newlines=True) @@ -259,10 +261,10 @@ def check_module_status(): if "Permission denied" in error: raise PermissionException(error) - vendor_data = re.findall("Vendor:\s.*", output) + vendor_data = re.findall(r"Vendor:\s.*", output) try: manufacturer = vendor_data[0].split(":")[1].strip() - except IndexError as exception: + except IndexError: logging.error("Failed to find Manufacturing data") return @@ -275,8 +277,8 @@ def check_module_status(): oem_module = oem_driver_pool.get(vendor) break # We've found our desired module to probe. - if oem_module != None: - if insert_supported_module(oem_module) != None: + if oem_module is not None: + if insert_supported_module(oem_module) is not None: logging.error("Failed module insertion") # Check dmesg status for supported module driver_status = Popen(['dmesg'], stdout=PIPE, universal_newlines=True) @@ -347,5 +349,6 @@ def main(): # Reading is not instant. time.sleep(5) + if __name__ == '__main__': - main(); + main() diff --git a/bin/alsa_tests.py b/bin/alsa_tests.py index 16fb044..e919570 100755 --- a/bin/alsa_tests.py +++ b/bin/alsa_tests.py @@ -168,5 +168,6 @@ def main(): device = None return(actions[args.action](args.duration, device)) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/ansi_parser.py b/bin/ansi_parser.py index c7cf26c..b5cec30 100755 --- a/bin/ansi_parser.py +++ b/bin/ansi_parser.py @@ -128,7 +128,8 @@ def parse_filename(filename): def main(args): usage = "Usage: %prog [OPTIONS] [FILE...]" parser = OptionParser(usage=usage) - parser.add_option("-o", "--output", + parser.add_option( + "-o", "--output", metavar="FILE", help="File where to output the result.") (options, args) = parser.parse_args(args) diff --git a/bin/audio_driver_info.py b/bin/audio_driver_info.py index b9bfaf8..424b125 100755 --- a/bin/audio_driver_info.py +++ b/bin/audio_driver_info.py @@ -41,7 +41,6 @@ class PacmdAudioDevice(): print("Error running %s:" % cmd, file=sys.stderr) print(err.output, file=sys.stderr) return None - if not stream: print("Error: modinfo returned nothing", file=sys.stderr) return None @@ -74,7 +73,8 @@ def list_device_info(): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), file=sys.stderr) return 1 @@ -102,8 +102,9 @@ def list_device_info(): def main(): parser = ArgumentParser("List audio device and driver information") - args = parser.parse_args() + parser.parse_args() return list_device_info() + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/audio_test.py b/bin/audio_test.py index 7bd6c44..2a56df4 100755 --- a/bin/audio_test.py +++ b/bin/audio_test.py @@ -13,7 +13,7 @@ import time try: import gi gi.require_version('GLib', '2.0') - gi.require_version('Gst','1.0') + gi.require_version('Gst', '1.0') from gi.repository import GObject from gi.repository import Gst from gi.repository import GLib @@ -29,26 +29,27 @@ try: except ImportError: from collections import Callable # backward compatible -#Frequency bands for FFT +# Frequency bands for FFT BINS = 256 -#How often to take a sample and do FFT on it. +# How often to take a sample and do FFT on it. FFT_INTERVAL = 100000000 # In nanoseconds, so this is every 1/10th second -#Sampling frequency. The effective maximum frequency we can analyze is -#half of this (see Nyquist's theorem) +# Sampling frequency. The effective maximum frequency we can analyze is +# half of this (see Nyquist's theorem) SAMPLING_FREQUENCY = 44100 -#The default test frequency is in the middle of the band that contains 5000Hz -#This frequency was determined experimentally to be high enough but more -#reliable than others we tried. +# The default test frequency is in the middle of the band that contains 5000Hz +# This frequency was determined experimentally to be high enough but more +# reliable than others we tried. DEFAULT_TEST_FREQUENCY = 5035 -#only sample a signal when peak level is in this range (in dB attenuation, -#0 means no attenuation (and horrible clipping). +# only sample a signal when peak level is in this range (in dB attenuation, +# 0 means no attenuation (and horrible clipping). REC_LEVEL_RANGE = (-2.0, -12.0) -#For our test signal to be considered present, it has to be this much higher -#than the base level (minimum magnitude). This is in dB. +# For our test signal to be considered present, it has to be this much higher +# than the base level (minimum magnitude). This is in dB. MAGNITUDE_THRESHOLD = 2.5 -#Volume for the sample tone (in %) +# Volume for the sample tone (in %) PLAY_VOLUME = 70 + class PIDController(object): """ A Proportional-Integrative-Derivative controller (PID) controls a process's output to try to maintain a desired output value (known as @@ -141,7 +142,7 @@ class PAVolumeController(object): 'set-%s-volume' % (self.pa_types[self.type]), str(self.identifier[0]), str(int(volume)) + "%"] - if False == self.method(command): + if not self.method(command): return False self._volume = volume return True @@ -159,7 +160,7 @@ class PAVolumeController(object): 'set-%s-mute' % (self.pa_types[self.type]), str(self.identifier[0]), mute] - if False == self.method(command): + if not self.method(command): return False return True @@ -187,10 +188,10 @@ class PAVolumeController(object): return None command = ['pactl', 'list', self.pa_types[type] + "s", 'short'] - #Expect lines of this form (field separator is tab): - #\t\t\t\t - #What we need to return is the ID for the first element on this list - #that does not contain auto_null or monitor. + # Expect lines of this form (field separator is tab): + # \t\t\t\t + # What we need to return is the ID for the first element on this list + # that does not contain auto_null or monitor. pa_info = self.method(command) valid_elements = None @@ -203,17 +204,17 @@ class PAVolumeController(object): self.logger.error("No valid PulseAudio elements" " for %s" % (self.type)) return None - #We only need the pulseaudio numeric ID and long name for each element + # We only need the pulseaudio numeric ID and long name for each element valid_elements = [(int(e.split()[0]), e.split()[1]) for e in valid_elements] return valid_elements[0] def _pactl_output(self, command): - #This method mainly calls pactl (hence the name). Since pactl may - #return a failure if the audio layer is not yet initialized, we will - #try running a few times in case of failure. All our invocations of - #pactl should be "idempotent" so repeating them should not have - #any bad effects. + # This method mainly calls pactl (hence the name). Since pactl may + # return a failure if the audio layer is not yet initialized, we will + # try running a few times in case of failure. All our invocations of + # pactl should be "idempotent" so repeating them should not have + # any bad effects. for attempt in range(0, 3): try: return subprocess.check_output(command, @@ -242,8 +243,8 @@ class SpectrumAnalyzer(object): self.number_of_samples = 0 self.wanted_samples = wanted_samples self.sampling_frequency = sampling_frequency - #Frequencies should contain *real* frequency which is half of - #the sampling frequency + # Frequencies should contain *real* frequency which is half of + # the sampling frequency self.frequencies = [((sampling_frequency / 2.0) / points) * i for i in range(points)] @@ -259,14 +260,14 @@ class SpectrumAnalyzer(object): self.number_of_samples += 1 def frequencies_with_peak_magnitude(self, threshold=1.0): - #First establish the base level + # First establish the base level per_magnitude_bins = collections.defaultdict(int) for magnitude in self.spectrum: per_magnitude_bins[magnitude] += 1 base_level = max(per_magnitude_bins, key=lambda x: per_magnitude_bins[x]) - #Now return all values that are higher (more positive) - #than base_level + threshold + # Now return all values that are higher (more positive) + # than base_level + threshold peaks = [] for i in range(1, len(self.spectrum) - 1): first_index = i - 1 @@ -282,10 +283,10 @@ class SpectrumAnalyzer(object): """Convenience function to tell me which band a frequency is contained in """ - #Note that actual frequencies are half of what the sampling - #frequency would tell us. If SF is 44100 then maximum actual - #frequency is 22050, and if I have 10 frequency bins each will - #contain only 2205 Hz, not 4410 Hz. + # Note that actual frequencies are half of what the sampling + # frequency would tell us. If SF is 44100 then maximum actual + # frequency is 22050, and if I have 10 frequency bins each will + # contain only 2205 Hz, not 4410 Hz. max_frequency = self.sampling_frequency / 2 if frequency > max_frequency or frequency < 0: return None @@ -339,34 +340,33 @@ class GStreamerMessageHandler(object): if message.type == Gst.MessageType.ELEMENT: message_name = message.get_structure().get_name() if message_name == 'spectrum': - #TODO: Due to an upstream bug, a structure's get_value method - #doesn't work if the value in question is an array (as is the - #case with the magnitudes). - #https://bugzilla.gnome.org/show_bug.cgi?id=693168 - #We have to resort to parsing the string representation of the - #structure. It's an ugly hack but it works. - #Ideally we'd be able to say this to get fft_magnitudes: - #message.get_structure.get_value('magnitude'). - #If an upstream fix ever makes it into gstreamer, - #remember to remove this hack and the parse_spectrum - #method + # TODO: Due to an upstream bug, a structure's get_value method + # doesn't work if the value in question is an array (as is the + # case with the magnitudes). + # https://bugzilla.gnome.org/show_bug.cgi?id=693168 + # We have to resort to parsing the string representation of the + # structure. It's an ugly hack but it works. + # Ideally we'd be able to say this to get fft_magnitudes: + # message.get_structure.get_value('magnitude'). + # If an upstream fix ever makes it into gstreamer, + # remember to remove this hack and the parse_spectrum method struct_string = message.get_structure().to_string() structure = parse_spectrum_message_structure(struct_string) fft_magnitudes = structure['magnitude'] self.spectrum_method(self.spectrum_analyzer, fft_magnitudes) if message_name == 'level': - #peak_value is our process feedback - #It's returned as an array, so I need the first (and only) - #element + # peak_value is our process feedback + # It's returned as an array, so I need the first (and only) + # element peak_value = message.get_structure().get_value('peak')[0] self.level_method(peak_value, self.pid_controller, self.volume_controller) - #Adjust recording level + # Adjust recording level def level_method(self, level, pid_controller, volume_controller): - #If volume controller doesn't return a valid volume, - #we can't control it :( + # If volume controller doesn't return a valid volume, + # we can't control it :( current_volume = volume_controller.get_volume() if current_volume is None: self.logger.error("Unable to control recording volume." @@ -375,19 +375,20 @@ class GStreamerMessageHandler(object): self.current_level = level change = pid_controller.input_change(level, 0.10) if self.logger: - self.logger.debug("Peak level: %(peak_level).2f, " - "volume: %(volume)d%%, Volume change: %(change)f%%" % - {'peak_level': level, - 'change': change, - 'volume': current_volume}) + self.logger.debug( + "Peak level: %(peak_level).2f, " + "volume: %(volume)d%%, Volume change: %(change)f%%" % { + 'peak_level': level, + 'change': change, + 'volume': current_volume}) volume_controller.set_volume(current_volume + change) - #Only sample if level is within the threshold + # Only sample if level is within the threshold def spectrum_method(self, analyzer, spectrum): if self.rec_level_range[1] <= self.current_level \ or self.current_level <= self.rec_level_range[0]: - self.logger.debug("Sampling, recorded %d samples" % - analyzer.number_of_samples) + self.logger.debug( + "Sampling, recorded %d samples" % analyzer.number_of_samples) analyzer.sample(spectrum) if analyzer.sampling_complete() and self._quit_method: self.logger.info("Sampling complete, ending process") @@ -414,10 +415,11 @@ class GstAudioObject(object): class Player(GstAudioObject): def __init__(self, frequency=DEFAULT_TEST_FREQUENCY, logger=None): super(Player, self).__init__() - self.pipeline_description = ("audiotestsrc wave=sine freq=%s " - "! audioconvert " - "! audioresample " - "! autoaudiosink" % int(frequency)) + self.pipeline_description = ( + "audiotestsrc wave=sine freq=%s " + "! audioconvert " + "! audioresample " + "! autoaudiosink" % int(frequency)) self.logger = logger if self.logger: self.logger.debug(self.pipeline_description) @@ -437,11 +439,11 @@ class Recorder(GstAudioObject): ! audioresample ! spectrum interval=%(fft_interval)s bands = %(bands)s ! wavenc - ! filesink location=%(file)s''' % - {'bands': bins, - 'rate': sampling_frequency, - 'fft_interval': fft_interval, - 'file': output_file}) + ! filesink location=%(file)s''' % { + 'bands': bins, + 'rate': sampling_frequency, + 'fft_interval': fft_interval, + 'file': output_file}) self.logger = logger if self.logger: self.logger.debug(pipeline_description) @@ -457,25 +459,25 @@ class Recorder(GstAudioObject): def parse_spectrum_message_structure(struct_string): - #First let's jsonize this - #This is the message name, which we don't need + # First let's jsonize this + # This is the message name, which we don't need text = struct_string.replace("spectrum, ", "") - #name/value separator in json is : and not = - text = text.replace("=",": ") - #Mutate the {} array notation from the structure to - #[] notation for json. - text = text.replace("{","[") - text = text.replace("}","]") - #Remove a few stray semicolons that aren't needed - text = text.replace(";","") - #Remove the data type fields, as json doesn't need them + # name/value separator in json is : and not = + text = text.replace("=", ": ") + # Mutate the {} array notation from the structure to + # [] notation for json. + text = text.replace("{", "[") + text = text.replace("}", "]") + # Remove a few stray semicolons that aren't needed + text = text.replace(";", "") + # Remove the data type fields, as json doesn't need them text = re.sub(r"\(.+?\)", "", text) - #double-quote the identifiers + # double-quote the identifiers text = re.sub(r"([\w-]+):", r'"\1":', text) - #Wrap the whole thing in brackets + # Wrap the whole thing in brackets text = ("{"+text+"}") - #Try to parse and return something sensible here, even if - #the data was unparsable. + # Try to parse and return something sensible here, even if + # the data was unparsable. try: return json.loads(text) except ValueError: @@ -489,32 +491,38 @@ def process_arguments(): presence of the played frequency, if present it exits with success. """ parser = argparse.ArgumentParser(description=description) - parser.add_argument("-t", "--time", + parser.add_argument( + "-t", "--time", dest='test_duration', action='store', default=30, type=int, help="""Maximum test duration, default %(default)s seconds. It may exit sooner if it determines it has enough data.""") - parser.add_argument("-a", "--audio", + parser.add_argument( + "-a", "--audio", action='store', default="/dev/null", type=str, help="File to save recorded audio in .wav format") - parser.add_argument("-q", "--quiet", + parser.add_argument( + "-q", "--quiet", action='store_true', default=False, help="Be quiet, no output unless there's an error.") - parser.add_argument("-d", "--debug", + parser.add_argument( + "-d", "--debug", action='store_true', default=False, help="Debugging output") - parser.add_argument("-f", "--frequency", + parser.add_argument( + "-f", "--frequency", action='store', default=DEFAULT_TEST_FREQUENCY, type=int, help="Frequency for test signal, default %(default)s Hz") - parser.add_argument("-u", "--spectrum", + parser.add_argument( + "-u", "--spectrum", action='store', type=str, help="""File to save spectrum information for plotting @@ -524,10 +532,10 @@ def process_arguments(): # def main(): - #Get arguments. + # Get arguments. args = process_arguments() - #Setup logging + # Setup logging level = logging.INFO if args.debug: level = logging.DEBUG @@ -535,56 +543,57 @@ def main(): level = logging.ERROR logging.basicConfig(level=level) try: - #Launches recording pipeline. I need to hook up into the gst - #messages. + # Launches recording pipeline. I need to hook up into the gst + # messages. recorder = Recorder(output_file=args.audio, logger=logging) - #Just launches the playing pipeline + # Just launches the playing pipeline player = Player(frequency=args.frequency, logger=logging) except GObject.GError as excp: logging.critical("Unable to initialize GStreamer pipelines: %s", excp) sys.exit(127) - #This just receives a process feedback and tells me how much to change to - #achieve the setpoint + # This just receives a process feedback and tells me how much to change to + # achieve the setpoint pidctrl = PIDController(Kp=0.7, Ki=.01, Kd=0.01, setpoint=REC_LEVEL_RANGE[0]) pidctrl.set_change_limit(5) - #This gathers spectrum data. + # This gathers spectrum data. analyzer = SpectrumAnalyzer(points=BINS, sampling_frequency=SAMPLING_FREQUENCY) - #Volume controllers actually set volumes for their device types. - #we should at least issue a warning + # Volume controllers actually set volumes for their device types. + # we should at least issue a warning recorder.volumecontroller = PAVolumeController(type='input', logger=logging) if not recorder.volumecontroller.get_identifier(): - logging.warning("Unable to get input volume control identifier. " - "Test results will probably be invalid") + logging.warning( + "Unable to get input volume control identifier. " + "Test results will probably be invalid") recorder.volumecontroller.set_volume(0) recorder.volumecontroller.mute(False) player.volumecontroller = PAVolumeController(type='output', logger=logging) if not player.volumecontroller.get_identifier(): - logging.warning("Unable to get output volume control identifier. " - "Test results will probably be invalid") + logging.warning( + "Unable to get output volume control identifier. " + "Test results will probably be invalid") player.volumecontroller.set_volume(PLAY_VOLUME) player.volumecontroller.mute(False) - #This handles the messages from gstreamer and orchestrates - #the passed volume controllers, pid controller and spectrum analyzer - #accordingly. + # This handles the messages from gstreamer and orchestrates + # the passed volume controllers, pid controller and spectrum analyzer + # accordingly. gmh = GStreamerMessageHandler(rec_level_range=REC_LEVEL_RANGE, logger=logging, volumecontroller=recorder.volumecontroller, pidcontroller=pidctrl, spectrum_analyzer=analyzer) - #I need to tell the recorder which method will handle messages. + # I need to tell the recorder which method will handle messages. recorder.register_message_handler(gmh.bus_message_handler) - #Create the loop and add a few triggers -# GObject.threads_init() #Not needed? + # Create the loop and add a few triggers loop = GLib.MainLoop() GLib.timeout_add_seconds(0, player.start) GLib.timeout_add_seconds(0, recorder.start) @@ -595,29 +604,32 @@ def main(): loop.run() - #When the loop ends, set things back to reasonable states + # When the loop ends, set things back to reasonable states player.stop() recorder.stop() player.volumecontroller.set_volume(50) recorder.volumecontroller.set_volume(10) - #See if data gathering was successful. + # See if data gathering was successful. test_band = analyzer.frequency_band_for(args.frequency) - candidate_bands = analyzer.frequencies_with_peak_magnitude(MAGNITUDE_THRESHOLD) + candidate_bands = analyzer.frequencies_with_peak_magnitude( + MAGNITUDE_THRESHOLD) for band in candidate_bands: logging.debug("Band (%.2f,%.2f) contains a magnitude peak" % analyzer.frequencies_for_band(band)) if test_band in candidate_bands: freqs_for_band = analyzer.frequencies_for_band(test_band) - logging.info("PASS: Test frequency of %s in band (%.2f, %.2f) " - "which contains a magnitude peak" % + logging.info( + "PASS: Test frequency of %s in band (%.2f, %.2f) " + "which contains a magnitude peak" % ((args.frequency,) + freqs_for_band)) return_value = 0 else: - logging.info("FAIL: Test frequency of %s is not in one of the " - "bands with magnitude peaks" % args.frequency) + logging.info( + "FAIL: Test frequency of %s is not in one of the " + "bands with magnitude peaks" % args.frequency) return_value = 1 - #Is the microphone broken? + # Is the microphone broken? if len(set(analyzer.spectrum)) <= 1: logging.info("WARNING: Microphone seems broken, didn't even " "record ambient noise") @@ -625,14 +637,17 @@ def main(): if args.spectrum: logging.info("Saving spectrum data for plotting as %s" % args.spectrum) - if not FileDumper().write_to_file(args.spectrum, - ["%s,%s" % t for t in - zip(analyzer.frequencies, - analyzer.spectrum)]): + if ( + not FileDumper().write_to_file( + args.spectrum, + ["%s,%s" % t for t in zip( + analyzer.frequencies, analyzer.spectrum)]) + ): logging.error("Couldn't save spectrum data for plotting", file=sys.stderr) return return_value + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/battery_test.py b/bin/battery_test.py index b2682c7..112ef5e 100755 --- a/bin/battery_test.py +++ b/bin/battery_test.py @@ -1,14 +1,13 @@ #!/usr/bin/env python3 import gi -import os import time import re import subprocess import sys import argparse gi.require_version('Gio', '2.0') -from gi.repository import Gio +from gi.repository import Gio # noqa: E402 class Battery(): @@ -89,10 +88,10 @@ def get_battery_state(): def validate_battery_info(battery): if battery is None: - print ("Error obtaining battery info") + print("Error obtaining battery info") return False if battery._state != "discharging": - print ("Error: battery is not discharging, test will not be valid") + print("Error: battery is not discharging, test will not be valid") return False return True @@ -173,5 +172,6 @@ def main(): return(battery_life(battery_before, battery_after, test_time)) + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bluetooth_test.py b/bin/bluetooth_test.py index 8fafad6..fd3c956 100755 --- a/bin/bluetooth_test.py +++ b/bin/bluetooth_test.py @@ -145,5 +145,6 @@ def main(): format='%(levelname)s: %(message)s') return getattr(ObexFTPTest(args.file.name, args.btaddr), args.action)() + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bmc_info.py b/bin/bmc_info.py index d7aec9b..674d7e8 100755 --- a/bin/bmc_info.py +++ b/bin/bmc_info.py @@ -4,6 +4,7 @@ import sys import shlex from subprocess import check_output, CalledProcessError + def main(): # First, we need to get output cmd = "ipmitool mc info" @@ -14,7 +15,7 @@ def main(): file=sys.stderr) return 1 except CalledProcessError as e: - print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr) + print("Problem running %s. Error was %s" % (cmd, e), file=sys.stderr) return 1 result = result.split('\n') @@ -47,12 +48,11 @@ def main(): if type(data[field]) is list: # Sometimes the first item in the list is ''. This will remove it data[field].remove('') - print(field.ljust(30),':',data[field].pop(0)) + print(field.ljust(30), ':', data[field].pop(0)) for item in data[field]: - print(' '.ljust(32),item) + print(' '.ljust(32), item) else: - print(field.ljust(30),":",data[field]) - #print("{}:\t{}".format(field, data[field])) + print(field.ljust(30), ":", data[field]) return 0 diff --git a/bin/boot_mode_test_snappy.py b/bin/boot_mode_test_snappy.py index 864be29..23b30e4 100755 --- a/bin/boot_mode_test_snappy.py +++ b/bin/boot_mode_test_snappy.py @@ -14,7 +14,6 @@ import tempfile import yaml -from checkbox_support.parsers.kernel_cmdline import parse_kernel_cmdline from checkbox_support.snap_utils.system import get_lk_bootimg_path diff --git a/bin/brightness_test.py b/bin/brightness_test.py index 52d478a..e3a8e90 100755 --- a/bin/brightness_test.py +++ b/bin/brightness_test.py @@ -28,7 +28,6 @@ import sys import os import time -from sys import stdout, stderr from glob import glob @@ -42,7 +41,7 @@ class Brightness(object): # See if the source is a file or a file object # and act accordingly file = path - if file == None: + if file is None: lines_list = [] else: # It's a file @@ -102,8 +101,10 @@ class Brightness(object): Note: this doesn't guarantee that screen brightness changed. ''' - if (abs(self.get_actual_brightness(interface) - - self.get_last_set_brightness(interface)) > 1): + if ( + abs(self.get_actual_brightness(interface) - + self.get_last_set_brightness(interface)) > 1 + ): return 1 else: return 0 @@ -132,9 +133,9 @@ def main(): max_brightness = brightness.get_max_brightness(interface) # Set the brightness to half the max value - brightness.write_value(max_brightness / 2, - os.path.join(interface, - 'brightness')) + brightness.write_value( + max_brightness / 2, + os.path.join(interface, 'brightness')) # Check that "actual_brightness" reports the same value we # set "brightness" to @@ -144,9 +145,9 @@ def main(): time.sleep(2) # Set the brightness back to its original value - brightness.write_value(current_brightness, - os.path.join(interface, - 'brightness')) + brightness.write_value( + current_brightness, + os.path.join(interface, 'brightness')) exit(exit_status) diff --git a/bin/bt_connect.py b/bin/bt_connect.py index 1fed30e..8d6757c 100755 --- a/bin/bt_connect.py +++ b/bin/bt_connect.py @@ -131,5 +131,6 @@ def main(): # capture all other silence failures return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bt_list_adapters.py b/bin/bt_list_adapters.py index e791442..c8db787 100755 --- a/bin/bt_list_adapters.py +++ b/bin/bt_list_adapters.py @@ -38,7 +38,7 @@ def main(): with open(namef, 'r') as f: name = f.read().strip() print(rfdev, name) - if found_adatper == False: + if found_adatper is False: raise SystemExit('No bluetooth adatpers registered with rfkill') diff --git a/bin/color_depth_info.py b/bin/color_depth_info.py index 129f051..fcab953 100755 --- a/bin/color_depth_info.py +++ b/bin/color_depth_info.py @@ -53,7 +53,7 @@ def get_color_depth(log_dir='/var/log/'): return (depth, pixmap_format) with open(current_log, 'rb') as stream: - for match in re.finditer('Depth (\d+) pixmap format is (\d+) bpp', + for match in re.finditer(r'Depth (\d+) pixmap format is (\d+) bpp', str(stream.read())): depth = int(match.group(1)) pixmap_format = int(match.group(2)) @@ -72,5 +72,6 @@ def main(): return 1 return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/cpu_offlining.py b/bin/cpu_offlining.py index 5616e30..3d269ac 100755 --- a/bin/cpu_offlining.py +++ b/bin/cpu_offlining.py @@ -68,5 +68,6 @@ def main(): print(' '.join(failed_onlines)) return 1 + if __name__ == '__main__': main() diff --git a/bin/cpu_topology.py b/bin/cpu_topology.py index 0ce68c5..425e3c4 100755 --- a/bin/cpu_topology.py +++ b/bin/cpu_topology.py @@ -21,8 +21,8 @@ class proc_cpuinfo(): finally: cpu_fh.close() - r_s390 = re.compile("processor [0-9]") - r_x86 = re.compile("processor\s+:") + r_s390 = re.compile(r"processor [0-9]") + r_x86 = re.compile(r"processor\s+:") for i in temp: # Handle s390 first if r_s390.match(i): @@ -111,5 +111,6 @@ def main(): else: return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/cpuid.py b/bin/cpuid.py index d0cdea9..6543dc4 100755 --- a/bin/cpuid.py +++ b/bin/cpuid.py @@ -25,10 +25,9 @@ # Modifications: 2019 Jeffrey Lane (jeffrey.lane@canonical.com) import ctypes -import os import platform import sys -from ctypes import (c_uint32, c_int, c_long, c_ulong, c_size_t, c_void_p, +from ctypes import (c_uint32, c_int, c_size_t, c_void_p, POINTER, CFUNCTYPE) from subprocess import check_output @@ -84,7 +83,8 @@ CPUIDS = { "Broadwell": ['0x4067', '0x306d4', '0x5066', '0x406f'], "Canon Lake": ['0x6066'], "Cascade Lake": ['0x50655', '0x50656', '0x50657'], - "Coffee Lake": ['0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'], + "Coffee Lake": [ + '0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'], "Haswell": ['0x306c', '0x4065', '0x4066', '0x306f'], "Ice Lake": ['0x706e'], "Ivy Bridge": ['0x306a', '0x306e'], diff --git a/bin/create_connection.py b/bin/create_connection.py index 71e9572..880151d 100755 --- a/bin/create_connection.py +++ b/bin/create_connection.py @@ -6,9 +6,8 @@ import time from subprocess import check_call, check_output, CalledProcessError try: - from subprocess import DEVNULL # >= python3.3 + from subprocess import DEVNULL # >= python3.3 except ImportError: - import os DEVNULL = open(os.devnull, 'wb') from uuid import uuid4 @@ -154,7 +153,7 @@ def block_until_created(connection, retries, interval): sys.exit(1) else: try: - nmcli_con_up = check_call(['nmcli', 'con', 'up', 'id', connection]) + check_call(['nmcli', 'con', 'up', 'id', connection]) print("Connection %s activated." % connection) except CalledProcessError as error: print("Failed to activate %s." % connection, file=sys.stderr) @@ -216,7 +215,7 @@ def create_mobilebroadband_connection(args): args.apn, args.username, args.password, - args.pin) + args.pin) if args.type == 'cdma': mobilebroadband_connection += mobilebroadband_ppp_section() @@ -288,5 +287,6 @@ def main(): # Make sure we don't exit until the connection is fully created block_until_created(connection_name, args.retries, args.interval) + if __name__ == "__main__": main() diff --git a/bin/dmitest.py b/bin/dmitest.py index 642af3a..2be50cc 100755 --- a/bin/dmitest.py +++ b/bin/dmitest.py @@ -146,26 +146,26 @@ def standard_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Chassis Information', 'Manufacturer:', ['empty', 'chassis manufacture', 'null', 'insyde', - 'to be filled by o\.e\.m\.', 'no enclosure', - '\.\.\.\.\.'], True): + r'to be filled by o\.e\.m\.', 'no enclosure', + r'\.\.\.\.\.'], True): dmi_data['Chassis Information']['Manufacturer'] += \ " *** Invalid chassis manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Manufacturer:', ['system manufacture', 'insyde', 'standard', - 'to be filled by o\.e\.m\.', 'no enclosure'], True): + r'to be filled by o\.e\.m\.', 'no enclosure'], True): dmi_data['System Information']['Manufacturer'] += \ " *** Invalid system manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'Base Board Information', 'Manufacturer:', - ['to be filled by o\.e\.m\.'], True): + [r'to be filled by o\.e\.m\.'], True): dmi_data['Base Board Information']['Manufacturer'] += \ " *** Invalid base board manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Product Name:', - ['system product name', 'to be filled by o\.e\.m\.'], + ['system product name', r'to be filled by o\.e\.m\.'], False): dmi_data['System Information']['Product Name'] += \ " *** Invalid system product name!" @@ -173,7 +173,7 @@ def standard_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Base Board Information', 'Product Name:', ['base board product name', - 'to be filled by o\.e\.m\.'], False): + r'to be filled by o\.e\.m\.'], False): dmi_data['Base Board Information']['Product Name'] += \ " *** Invalid base board product name!" retval += 1 @@ -193,21 +193,21 @@ def version_tests(args, stream, dmi_data): """ retval = 0 if find_in_section(stream, dmi_data, 'Chassis Information', 'Version:', - ['to be filled by o\.e\.m\.', 'empty', 'x\.x'], + [r'to be filled by o\.e\.m\.', 'empty', r'x\.x'], False): dmi_data['Chassis Information']['Version'] += \ " *** Invalid chassis version!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Version:', - ['to be filled by o\.e\.m\.', '\(none\)', + [r'to be filled by o\.e\.m\.', r'\(none\)', 'null', 'system version', 'not applicable', - '\.\.\.\.\.'], False): + r'\.\.\.\.\.'], False): dmi_data['System Information']['Version'] += \ " *** Invalid system information version!" retval += 1 if find_in_section(stream, dmi_data, 'Base Board Information', 'Version:', - ['base board version', 'x\.x', - 'empty', 'to be filled by o\.e\.m\.'], False): + ['base board version', r'x\.x', + 'empty', r'to be filled by o\.e\.m\.'], False): dmi_data['Base Board Information']['Version'] += \ " *** Invalid base board version!" retval += 1 @@ -228,8 +228,8 @@ def serial_tests(args, stream, dmi_data): retval = 0 if find_in_section(stream, dmi_data, 'System Information', 'Serial Number:', - ['to be filled by o\.e\.m\.', - 'system serial number', '\.\.\.\.\.'], + [r'to be filled by o\.e\.m\.', + 'system serial number', r'\.\.\.\.\.'], False): dmi_data['System Information']['Serial Number'] += \ " *** Invalid system information serial number!" @@ -237,8 +237,8 @@ def serial_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Base Board Information', 'Serial Number:', ['n/a', 'base board serial number', - 'to be filled by o\.e\.m\.', - 'empty', '\.\.\.'], + r'to be filled by o\.e\.m\.', + 'empty', r'\.\.\.'], False): dmi_data['Base Board Information']['Serial Number'] += \ " *** Invalid base board serial number!" @@ -306,7 +306,7 @@ def main(): if args.test_serials: retval += serial_tests(args, stream, dmi_data) if find_in_section(stream, dmi_data, 'Processor Information', 'Version:', - ['sample', 'Genuine Intel\(R\) CPU 0000'], False): + ['sample', r'Genuine Intel\(R\) CPU 0000'], False): dmi_data['Processor Information']['Version'] += \ " *** Invalid processor information!" retval += 1 diff --git a/bin/edid_cycle.py b/bin/edid_cycle.py index a6daf1c..e327cb0 100755 --- a/bin/edid_cycle.py +++ b/bin/edid_cycle.py @@ -21,7 +21,7 @@ def check_resolution(): output = subprocess.check_output('xdpyinfo') for line in output.decode(sys.stdout.encoding).splitlines(): if 'dimensions' in line: - match = re.search('(\d+)x(\d+)\ pixels', line) + match = re.search(r'(\d+)x(\d+)\ pixels', line) if match and len(match.groups()) == 2: return '{}x{}'.format(*match.groups()) @@ -29,7 +29,8 @@ def check_resolution(): def change_edid(host, edid_file): with open(edid_file, 'rb') as f: cmd = ['ssh', host, '/snap/bin/pigbox', 'run', - '\'v4l2-ctl --set-edid=file=-,format=raw --fix-edid-checksums\''] + '\'v4l2-ctl --set-edid=file=-,' + 'format=raw --fix-edid-checksums\''] subprocess.check_output(cmd, input=f.read()) @@ -52,5 +53,6 @@ def main(): print('PASS') return failed + if __name__ == '__main__': raise SystemExit(main()) diff --git a/bin/evdev_touch_test.py b/bin/evdev_touch_test.py index 5649701..bc52cc4 100755 --- a/bin/evdev_touch_test.py +++ b/bin/evdev_touch_test.py @@ -53,7 +53,7 @@ while True: for e in dev.read(): tap = args.xfingers if tap == 1: - if (e.type == 3 and e.code == 47 and e.value > 0): + if (e.type == 3 and e.code == 47 and e.value > 0): raise SystemExit( "Multitouch Event detected but Single was expected") # type 1 is evdev.ecodes.EV_KEY diff --git a/bin/fan_reaction_test.py b/bin/fan_reaction_test.py index a4ab744..c37054b 100755 --- a/bin/fan_reaction_test.py +++ b/bin/fan_reaction_test.py @@ -34,6 +34,7 @@ class FanMonitor: if not self._fan_paths: print('Fan monitoring interface not found in SysFS') raise SystemExit(0) + def get_rpm(self): result = {} for p in self._fan_paths: @@ -44,6 +45,7 @@ class FanMonitor: except OSError: print('Fan SysFS node dissappeared ({})'.format(p)) return result + def get_average_rpm(self, period): acc = self.get_rpm() for i in range(period): @@ -83,14 +85,15 @@ class Stressor: def _stress_fun(self): """Actual stress function.""" # generate some random data - data = bytes(random.getrandbits(8) for _ in range(1024)) - hasher = hashlib.sha256() + data = bytes(random.getrandbits(8) for _ in range(1024)) + hasher = hashlib.sha256() hasher.update(data) while True: new_digest = hasher.digest() # use the newly obtained digest as the new data to the hasher hasher.update(new_digest) + def main(): """Entry point.""" @@ -153,5 +156,6 @@ def main(): if not (rpm_rose_during_stress and rpm_dropped_during_cooling): raise SystemExit("Fans did not react to stress expectedly") + if __name__ == '__main__': main() diff --git a/bin/frequency_governors_test.py b/bin/frequency_governors_test.py index 6072ded..52dd74a 100755 --- a/bin/frequency_governors_test.py +++ b/bin/frequency_governors_test.py @@ -8,7 +8,8 @@ import time import argparse import logging -from subprocess import check_output, check_call, CalledProcessError, PIPE +from subprocess import check_output, check_call, CalledProcessError + class CPUScalingTest(object): @@ -37,11 +38,11 @@ class CPUScalingTest(object): for subdirectory in os.listdir(self.sysCPUDirectory): match = pattern.search(subdirectory) if match and match.group("cpuNumber"): - cpufreqDirectory = os.path.join(self.sysCPUDirectory, - subdirectory, "cpufreq") + cpufreqDirectory = os.path.join( + self.sysCPUDirectory, subdirectory, "cpufreq") if not os.path.exists(cpufreqDirectory): - logging.error("CPU %s has no cpufreq directory %s" - % (match.group("cpuNumber"), cpufreqDirectory)) + logging.error("CPU %s has no cpufreq directory %s" % ( + match.group("cpuNumber"), cpufreqDirectory)) return None # otherwise self.cpufreqDirectories.append(cpufreqDirectory) @@ -59,7 +60,8 @@ class CPUScalingTest(object): for cpufreqDirectory in self.cpufreqDirectories: parameters = self.getParameters(cpufreqDirectory, file) if not parameters: - logging.error("Error: could not determine cpu parameters from %s" + logging.error( + "Error: could not determine cpu parameters from %s" % os.path.join(cpufreqDirectory, file)) return None if not current: @@ -91,7 +93,7 @@ class CPUScalingTest(object): return rf return None - logging.debug("Setting %s to %s" % (setFile,value)) + logging.debug("Setting %s to %s" % (setFile, value)) path = None if not skip: if automatch: @@ -115,8 +117,9 @@ class CPUScalingTest(object): parameterFile = open(path) line = parameterFile.readline() if not line or line.strip() != str(value): - logging.error("Error: could not verify that %s was set to %s" - % (path, value)) + logging.error( + "Error: could not verify that %s was set to %s" % ( + path, value)) if line: logging.error("Actual Value: %s" % line) else: @@ -127,6 +130,7 @@ class CPUScalingTest(object): def checkSelectorExecutable(self): logging.debug("Determining if %s is executable" % self.selectorExe) + def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) @@ -179,8 +183,9 @@ class CPUScalingTest(object): parameterFile = open(parameterFilePath) line = parameterFile.readline() if not line: - logging.error("Error: failed to get %s for %s" - % (parameter, self.cpufreqDirectory)) + logging.error( + "Error: failed to get %s for %s" % ( + parameter, self.cpufreqDirectory)) return None value = line.strip() return value @@ -198,8 +203,9 @@ class CPUScalingTest(object): parameterFile = open(path) line = parameterFile.readline() if not line: - logging.error("Error: failed to get %s for %s" - % (parameter, cpufreqDirectory)) + logging.error( + "Error: failed to get %s for %s" % ( + parameter, cpufreqDirectory)) return None values.append(line.strip()) logging.debug("Found parameters:") @@ -231,15 +237,18 @@ class CPUScalingTest(object): runTime = stop_elapsed_time - start_elapsed_time else: thisTime = stop_elapsed_time - start_elapsed_time - if ((abs(thisTime - runTime) / runTime) * 100 - < self.retryTolerance): + if ( + (abs(thisTime - runTime) / runTime) * 100 + < self.retryTolerance + ): return runTime else: runTime = thisTime tries += 1 - logging.error("Could not repeat load test times within %.1f%%" - % self.retryTolerance) + logging.error( + "Could not repeat load test times within %.1f%%" % + self.retryTolerance) return None def pi(self): @@ -262,9 +271,11 @@ class CPUScalingTest(object): logging.info("Done.") minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not minimumFrequency + if ( + not minimumFrequency or not currentFrequency - or (minimumFrequency != currentFrequency)): + or (minimumFrequency != currentFrequency) + ): return False # otherwise @@ -310,7 +321,8 @@ class CPUScalingTest(object): logging.info(" cpu%u: %s" % (i, g)) i += 1 else: - logging.error("Error: could not determine current governor settings") + logging.error( + "Error: could not determine current governor settings") return False self.getCPUFlags() @@ -333,7 +345,7 @@ class CPUScalingTest(object): logging.debug("Found the following CPU Flags:") for line in self.cpuFlags: logging.debug(" %s" % line) - except: + except OSError: logging.warning("Could not read CPU flags") def runUserSpaceTests(self): @@ -356,52 +368,66 @@ class CPUScalingTest(object): # Set the the CPU speed to it's lowest value frequency = self.minFreq - logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) + logging.info( + "Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) if not self.setFrequency(frequency): success = False # Verify the speed is set to the lowest value minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not minimumFrequency + if ( + not minimumFrequency or not currentFrequency - or (minimumFrequency != currentFrequency)): - logging.error("Could not verify that cpu frequency is set to the minimum value of %s" - % minimumFrequency) + or (minimumFrequency != currentFrequency) + ): + logging.error( + "Could not verify that cpu frequency is set to the minimum" + " value of %s" % minimumFrequency) success = False # Run Load Test self.minimumFrequencyTestTime = self.runLoadTest() if not self.minimumFrequencyTestTime: - logging.error("Could not retrieve the minimum frequency test's execution time.") + logging.error( + "Could not retrieve the minimum frequency test's" + " execution time.") success = False else: - logging.info("Minimum frequency load test time: %.2f" - % self.minimumFrequencyTestTime) + logging.info( + "Minimum frequency load test time: %.2f" + % self.minimumFrequencyTestTime) # Set the CPU speed to it's highest value as above. frequency = self.maxFreq - logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) + logging.info( + "Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) if not self.setFrequency(frequency): success = False maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not maximumFrequency + if ( + not maximumFrequency or not currentFrequency - or (maximumFrequency != currentFrequency)): - logging.error("Could not verify that cpu frequency is set to the maximum value of %s" - % maximumFrequency) + or (maximumFrequency != currentFrequency) + ): + logging.error( + "Could not verify that cpu frequency is set to the" + " maximum value of %s" % maximumFrequency) success = False # Repeat workload test self.maximumFrequencyTestTime = self.runLoadTest() if not self.maximumFrequencyTestTime: - logging.error("Could not retrieve the maximum frequency test's execution time.") + logging.error( + "Could not retrieve the maximum frequency test's " + "execution time.") success = False else: - logging.info("Maximum frequency load test time: %.2f" - % self.maximumFrequencyTestTime) + logging.info( + "Maximum frequency load test time: %.2f" + % self.maximumFrequencyTestTime) # Verify MHz increase is comparable to time % decrease predictedSpeedup = (float(maximumFrequency) @@ -409,8 +435,9 @@ class CPUScalingTest(object): # If "ida" turbo thing, increase the expectation by 8% if self.cpuFlags and self.idaFlag in self.cpuFlags: - logging.info("Found %s flag, increasing expected speedup by %.1f%%" - % (self.idaFlag, self.idaSpeedupFactor)) + logging.info( + "Found %s flag, increasing expected speedup by %.1f%%" + % (self.idaFlag, self.idaSpeedupFactor)) predictedSpeedup = \ (predictedSpeedup * (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0)))) @@ -420,18 +447,24 @@ class CPUScalingTest(object): / self.maximumFrequencyTestTime) logging.info("CPU Frequency Speed Up: %.2f" % predictedSpeedup) logging.info("Measured Speed Up: %.2f" % measuredSpeedup) - differenceSpeedUp = (((measuredSpeedup - predictedSpeedup) / predictedSpeedup) * 100) - logging.info("Percentage Difference %.1f%%" % differenceSpeedUp) + differenceSpeedUp = ( + ((measuredSpeedup - predictedSpeedup) / predictedSpeedup) + * 100) + logging.info( + "Percentage Difference %.1f%%" % differenceSpeedUp) if differenceSpeedUp > self.speedUpTolerance: - logging.error("Measured speedup vs expected speedup is %.1f%% and is not within %.1f%% margin." - % (differenceSpeedUp, self.speedUpTolerance)) + logging.error( + "Measured speedup vs expected speedup is %.1f%% " + "and is not within %.1f%% margin." + % (differenceSpeedUp, self.speedUpTolerance)) success = False elif differenceSpeedUp < 0: - logging.info(""" - Measured speed up %.2f exceeded expected speedup %.2f - """ % (measuredSpeedup, predictedSpeedup)) + logging.info( + "Measured speed up %.2f exceeded expected speedup %.2f" + % (measuredSpeedup, predictedSpeedup)) else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success @@ -453,7 +486,9 @@ class CPUScalingTest(object): # Wait a fixed period of time, then verify current speed # is the slowest in as before if not self.verifyMinimumFrequency(): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled to the " + "minimum value") success = False # Repeat workload test @@ -462,7 +497,8 @@ class CPUScalingTest(object): logging.warning("No On Demand load test time available.") success = False else: - logging.info("On Demand load test time: %.2f" % onDemandTestTime) + logging.info( + "On Demand load test time: %.2f" % onDemandTestTime) if onDemandTestTime and self.maximumFrequencyTestTime: # Compare the timing to the max results from earlier, @@ -470,19 +506,24 @@ class CPUScalingTest(object): differenceOnDemandVsMaximum = \ (abs(onDemandTestTime - self.maximumFrequencyTestTime) / self.maximumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. maximum frequency: %.1f%%" - % differenceOnDemandVsMaximum) + logging.info( + "Percentage Difference vs. maximum frequency: %.1f%%" + % differenceOnDemandVsMaximum) if differenceOnDemandVsMaximum > self.speedUpTolerance: - logging.error("On demand performance vs maximum of %.1f%% is not within %.1f%% margin" - % (differenceOnDemandVsMaximum, - self.speedUpTolerance)) + logging.error( + "On demand performance vs maximum of %.1f%% is not " + "within %.1f%% margin" + % (differenceOnDemandVsMaximum, self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") # Verify the current speed has returned to the lowest speed again if not self.verifyMinimumFrequency(): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled to the " + "minimum value") success = False return success @@ -504,11 +545,14 @@ class CPUScalingTest(object): # Verify the current speed is the same as scaling_max_freq maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not maximumFrequency + if ( + not maximumFrequency or not currentFrequency - or (maximumFrequency != currentFrequency)): - logging.error("Current cpu frequency of %s is not set to the maximum value of %s" - % (currentFrequency, maximumFrequency)) + or (maximumFrequency != currentFrequency) + ): + logging.error( + "Current cpu frequency of %s is not set to the maximum " + "value of %s" % (currentFrequency, maximumFrequency)) success = False # Repeat work load test @@ -517,22 +561,27 @@ class CPUScalingTest(object): logging.error("No Performance load test time available.") success = False else: - logging.info("Performance load test time: %.2f" % performanceTestTime) + logging.info( + "Performance load test time: %.2f" % performanceTestTime) if performanceTestTime and self.maximumFrequencyTestTime: # Compare the timing to the max results differencePerformanceVsMaximum = \ (abs(performanceTestTime - self.maximumFrequencyTestTime) / self.maximumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. maximum frequency: %.1f%%" - % differencePerformanceVsMaximum) + logging.info( + "Percentage Difference vs. maximum frequency: %.1f%%" + % differencePerformanceVsMaximum) if differencePerformanceVsMaximum > self.speedUpTolerance: - logging.error("Performance setting vs maximum of %.1f%% is not within %.1f%% margin" - % (differencePerformanceVsMaximum, - self.speedUpTolerance)) + logging.error( + "Performance setting vs maximum of %.1f%% is not " + "within %.1f%% margin" + % (differencePerformanceVsMaximum, + self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success @@ -559,7 +608,9 @@ class CPUScalingTest(object): # Wait a fixed period of time, # then verify current speed is the slowest in as before if not self.verifyMinimumFrequency(10): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled " + "to the minimum value") success = False # Set the frequency step to 0, @@ -573,28 +624,34 @@ class CPUScalingTest(object): logging.error("No Conservative load test time available.") success = False else: - logging.info("Conservative load test time: %.2f" - % conservativeTestTime) + logging.info( + "Conservative load test time: %.2f" + % conservativeTestTime) if conservativeTestTime and self.minimumFrequencyTestTime: # Compare the timing to the max results differenceConservativeVsMinimum = \ (abs(conservativeTestTime - self.minimumFrequencyTestTime) / self.minimumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. minimum frequency: %.1f%%" - % differenceConservativeVsMinimum) + logging.info( + "Percentage Difference vs. minimum frequency: %.1f%%" + % differenceConservativeVsMinimum) if differenceConservativeVsMinimum > self.speedUpTolerance: - logging.error("Performance setting vs minimum of %.1f%% is not within %.1f%% margin" - % (differenceConservativeVsMinimum, - self.speedUpTolerance)) + logging.error( + "Performance setting vs minimum of %.1f%% is not " + "within %.1f%% margin" + % (differenceConservativeVsMinimum, + self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success def restoreGovernors(self): - logging.info("Restoring original governor to %s" % (self.originalGovernors[0])) + logging.info( + "Restoring original governor to %s" % (self.originalGovernors[0])) self.setGovernor(self.originalGovernors[0]) @@ -604,8 +661,9 @@ def main(): help="Suppress output.") parser.add_argument("-c", "--capabilities", action="store_true", help="Only output CPU capabilities.") - parser.add_argument("-d", "--debug", action="store_true", - help="Turn on debug level output for extra info during test run.") + parser.add_argument( + "-d", "--debug", action="store_true", + help="Turn on debug level output for extra info during test run.") args = parser.parse_args() # Set up the logging system (unless we don't want ANY logging) @@ -614,11 +672,11 @@ def main(): logger.setLevel(logging.DEBUG) format = '%(asctime)s %(levelname)-8s %(message)s' date_format = '%Y-%m-%d %H:%M:%S' - + # If we DO want console output if not args.quiet: - console_handler = logging.StreamHandler() #logging.StreamHandler() - console_handler.setFormatter(logging.Formatter(format,date_format)) + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(format, date_format)) console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) diff --git a/bin/fresh_rate_info.py b/bin/fresh_rate_info.py index 43a8f2a..3089b48 100755 --- a/bin/fresh_rate_info.py +++ b/bin/fresh_rate_info.py @@ -38,13 +38,13 @@ def xrandr_paser(data=None): resolution = None xrandrs = list() for line in str(data).split('\n'): - for match in re.finditer('(.+) connected (\d+x\d+)\+', line): + for match in re.finditer(r'(.+) connected (\d+x\d+)\+', line): connector = match.group(1) resolution = match.group(2) break if resolution is None: continue - for match in re.finditer('{0}\s+(.+)\*'.format(resolution), + for match in re.finditer(r'{0}\s+(.+)\*'.format(resolution), line): refresh_rate = match.group(1) xrandr = {'connector': connector, @@ -72,5 +72,6 @@ def main(): xrandr['resolution'], xrandr['refresh_rate'])) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/get_make_and_model.py b/bin/get_make_and_model.py index 496a949..42fb219 100755 --- a/bin/get_make_and_model.py +++ b/bin/get_make_and_model.py @@ -1,26 +1,28 @@ #!/usr/bin/env python3 -import os.path import shlex from subprocess import check_output + def print_header(value): print("{}:".format(value)) + def print_data(key, value): print(" {}: {}".format(key, value)) + def run_cmd(option): cmd = "lshw -C " + option - out = check_output(shlex.split(cmd), - universal_newlines = True) + out = check_output(shlex.split(cmd), universal_newlines=True) return out.split('\n') + def main(): keys = {'Manufacturer': 'vendor', 'Model': 'product', 'Version': 'version'} - lshw_classes = {'system': 'System', + lshw_classes = {'system': 'System', 'bus': 'Mainboard'} for lshw_class in lshw_classes: @@ -38,6 +40,6 @@ def main(): for key in data: print_data(key, data[key]) + if __name__ == "__main__": raise SystemExit(main()) - diff --git a/bin/gpu_test.py b/bin/gpu_test.py index 49ea31a..4f2af2e 100755 --- a/bin/gpu_test.py +++ b/bin/gpu_test.py @@ -30,9 +30,9 @@ import subprocess import sys import time gi.require_version('Gio', '2.0') -from gi.repository import Gio -from math import cos, sin -from threading import Thread +from gi.repository import Gio # noqa: E402 +from math import cos, sin # noqa: E402 +from threading import Thread # noqa: E402 class GlxThread(Thread): @@ -44,14 +44,13 @@ class GlxThread(Thread): try: self.process = subprocess.Popen( - ["glxgears","-geometry", "400x400"], + ["glxgears", "-geometry", "400x400"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) self.process.communicate() except (subprocess.CalledProcessError, FileNotFoundError) as er: print("WARNING: Unable to start glxgears (%s)" % er) - def terminate(self): if not hasattr(self, 'id'): print("WARNING: Attempted to terminate non-existing window.") @@ -121,8 +120,8 @@ class Html5VideoThread(Thread): def html5_path(self): if os.getenv('PLAINBOX_PROVIDER_DATA'): return os.path.join( - os.getenv('PLAINBOX_PROVIDER_DATA'), - 'websites/html5_video_html') + os.getenv('PLAINBOX_PROVIDER_DATA'), + 'websites/html5_video_html') def run(self): if self_html5_path and os.path.isfile(self_html5_path): @@ -136,8 +135,8 @@ class Html5VideoThread(Thread): print("WARNING: test results may be invalid.") def terminate(self): - if self_html5_path and os.path.isfile(self_html5_path): - subprocess.call("pkill firefox", shell=True) + if self_html5_path and os.path.isfile(self_html5_path): + subprocess.call("pkill firefox", shell=True) def check_gpu(log=None): @@ -172,10 +171,10 @@ def main(): print("WARNING: Got an exception %s" % er) windows = "" for app in sorted(windows.splitlines(), reverse=True): - if not b'glxgears' in app: + if b'glxgears' not in app: continue GlxWindows[i].id = str( - re.match(b'^(0x\w+)', app).group(0), 'utf-8') + re.match(b'^(0x\w+)', app).group(0), 'utf-8') # noqa: W605 break if hasattr(GlxWindows[i], "id"): rotator = RotateGlxThread(GlxWindows[i].id, i + 1) @@ -204,7 +203,7 @@ def main(): 'gconftool --get /apps/compiz-1/general/screen0/options/vsize', shell=True)) (x_res, y_res) = re.search( - b'DG:\s+(\d+)x(\d+)', + b'DG:\s+(\d+)x(\d+)', # noqa: W605 subprocess.check_output('wmctrl -d', shell=True)).groups() DesktopSwitch = ChangeWorkspace( hsize, vsize, int(x_res) // hsize, int(y_res) // vsize) @@ -230,5 +229,6 @@ def main(): settings.set_int("vsize", vsize_ori) Gio.Settings.sync() + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/graphic_memory_info.py b/bin/graphic_memory_info.py index ba7eece..29040a5 100755 --- a/bin/graphic_memory_info.py +++ b/bin/graphic_memory_info.py @@ -42,13 +42,13 @@ def vgamem_paser(data=None): device = None vgamems = list() for line in data.split('\n'): - for match in re.finditer('(\d\d:\d\d\.\d) VGA(.+): (.+)', line): + for match in re.finditer(r'(\d\d:\d\d\.\d) VGA(.+): (.+)', line): device = match.group(1) name = match.group(3) if device is None: continue -#Memory at e0000000 (32-bit, prefetchable) [size=256M] - for match in re.finditer('Memory(.+) prefetchable\) \[size=(\d+)M\]', +# Memory at e0000000 (32-bit, prefetchable) [size=256M] + for match in re.finditer(r'Memory(.+) prefetchable\) \[size=(\d+)M\]', line): vgamem_size = match.group(2) vgamem = {'device': device, @@ -77,5 +77,6 @@ def main(): vgamem['name'], vgamem['vgamem_size'])) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/graphics_driver.py b/bin/graphics_driver.py index 6d28f5a..0b41050 100755 --- a/bin/graphics_driver.py +++ b/bin/graphics_driver.py @@ -129,7 +129,7 @@ class XorgLog(object): gathering_module = False module = None m = re.search( - '\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line) + r'\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line) if m: found_ddx = True continue @@ -207,7 +207,8 @@ class XorgLog(object): # For NVIDIA m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line) if not m: - m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line) + m = re.search( + r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line) if m: self.displays[display_name] = display self.video_driver = m.group(1) @@ -314,8 +315,8 @@ def is_laptop(): def hybrid_graphics_check(xlog): '''Check for Hybrid Graphics''' - card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)') - card_id2 = re.compile('.*03..: *(.+):(.+)') + card_id1 = re.compile(r'.*0300: *(.+):(.+) \(.+\)') + card_id2 = re.compile(r'.*03..: *(.+):(.+)') cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'} cards = [] drivers = [] @@ -372,5 +373,6 @@ def main(): return 1 if 1 in results else 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/graphics_modes_info.py b/bin/graphics_modes_info.py index f3c962a..f92cd76 100755 --- a/bin/graphics_modes_info.py +++ b/bin/graphics_modes_info.py @@ -32,7 +32,7 @@ from checkbox_support.contrib import xrandr def print_modes_info(screen): """Print some information about the detected screen and its outputs""" xrandr._check_required_version((1, 0)) - print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %\ + print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" % (screen._screen, screen._width_min, screen._height_min, screen._width, screen._height, @@ -50,10 +50,9 @@ def print_modes_info(screen): for m in range(len(modes)): mode = modes[m] refresh = mode.dotClock / (mode.hTotal * mode.vTotal) - print(" [%s] %s x %s @ %s Hz" % (m, - mode.width, - mode.height, - refresh), end=' ') + print( + " [%s] %s x %s @ %s Hz" % + (m, mode.width, mode.height, refresh), end=' ') if mode.id == output._mode: print("(current)", end=' ') if m == output.get_preferred_mode(): @@ -70,5 +69,6 @@ def main(): except(xrandr.UnsupportedRRError): print('Error: RandR version lower than 1.0', file=sys.stderr) + if __name__ == '__main__': main() diff --git a/bin/graphics_stress_test.py b/bin/graphics_stress_test.py index d467b3f..c0eeb4a 100755 --- a/bin/graphics_stress_test.py +++ b/bin/graphics_stress_test.py @@ -45,7 +45,7 @@ class VtWrapper(object): vt = 0 proc = Popen(['ps', 'aux'], stdout=PIPE, universal_newlines=True) proc_output = proc.communicate()[0].split('\n') - proc_line = re.compile('.*tty(\d+).+/usr/bin/X.*') + proc_line = re.compile(r'.*tty(\d+).+/usr/bin/X.*') for line in proc_output: match = proc_line.match(line) if match: @@ -56,6 +56,7 @@ class VtWrapper(object): retcode = call(['chvt', '%d' % vt]) return retcode + class SuspendWrapper(object): def __init__(self): pass @@ -132,6 +133,7 @@ class SuspendWrapper(object): return status + class RotationWrapper(object): def __init__(self): @@ -178,6 +180,7 @@ class RotationWrapper(object): result = 1 return result + class RenderCheckWrapper(object): """A simple class to run the rendercheck suites""" @@ -234,14 +237,15 @@ class RenderCheckWrapper(object): passed = int(match_output.group(1).strip()) total = int(match_output.group(2).strip()) logging.info('Results:') - logging.info(' %d tests passed out of %d.' - % (passed, total)) + logging.info( + ' %d tests passed out of %d.' % (passed, total)) if show_errors and match_errors: error = match_errors.group(0).strip() if first_error: - logging.debug('Rendercheck %s suite errors ' - 'from iteration %d:' - % (suites, iteration)) + logging.debug( + 'Rendercheck %s suite errors ' + 'from iteration %d:' + % (suites, iteration)) first_error = False logging.debug(' %s' % error) @@ -254,17 +258,17 @@ class RenderCheckWrapper(object): exit_status = 0 for suite in suites: for it in range(iterations): - logging.info('Iteration %d of Rendercheck %s suite...' - % (it + 1, suite)) - (status, passed, total) = \ - self._print_test_info(suites=suite, - iteration=it + 1, - show_errors=show_errors) + logging.info( + 'Iteration %d of Rendercheck %s suite...' + % (it + 1, suite)) + (status, passed, total) = self._print_test_info( + suites=suite, iteration=it + 1, show_errors=show_errors) if status != 0: # Make sure to catch a non-zero exit status - logging.info('Iteration %d of Rendercheck %s suite ' - 'exited with status %d.' - % (it + 1, suite, status)) + logging.info( + 'Iteration %d of Rendercheck %s suite ' + 'exited with status %d.' + % (it + 1, suite, status)) exit_status = status it += 1 @@ -372,7 +376,6 @@ def main(): logfile_handler.setFormatter(logging.Formatter(format)) logger.addHandler(logfile_handler) - log_path = os.path.abspath(logfile) # Write only to stdout else: @@ -402,16 +405,18 @@ def main(): logging.info('Iteration %d...', it) retcode = vt_wrap.set_vt(target_vt) if retcode != 0: - logging.error('Error: switching to tty%d failed with code %d ' - 'on iteration %d' % (target_vt, retcode, it)) + logging.error( + 'Error: switching to tty%d failed with code %d ' + 'on iteration %d' % (target_vt, retcode, it)) status = 1 else: logging.info('Switching to tty%d: passed' % (target_vt)) time.sleep(2) retcode = vt_wrap.set_vt(vt_wrap.x_vt) if retcode != 0: - logging.error('Error: switching to tty%d failed with code %d ' - 'on iteration %d' % (vt_wrap.x_vt, retcode, it)) + logging.error( + 'Error: switching to tty%d failed with code %d ' + 'on iteration %d' % (vt_wrap.x_vt, retcode, it)) else: logging.info('Switching to tty%d: passed' % (vt_wrap.x_vt)) status = 1 @@ -463,5 +468,6 @@ def main(): return status + if __name__ == '__main__': exit(main()) diff --git a/bin/gst_pipeline_test.py b/bin/gst_pipeline_test.py index c7a9af5..f022632 100755 --- a/bin/gst_pipeline_test.py +++ b/bin/gst_pipeline_test.py @@ -9,9 +9,9 @@ import sys import time gi.require_version('Gst', '1.0') gi.require_version('GLib', '2.0') -from gi.repository import Gst -from gi.repository import GLib -from subprocess import check_output +from gi.repository import Gst # noqa: E402 +from gi.repository import GLib # noqa: E402 +from subprocess import check_output # noqa: E402 def check_state(device): @@ -22,8 +22,11 @@ def check_state(device): data = sink_info.split("\n") try: - device_name = re.findall(".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip() - sink = re.findall(".*Name:\s(.*%s.*)" % device, sink_info, re.IGNORECASE)[0].lstrip() + device_name = re.findall( + r".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip() + sink = re.findall( + r".*Name:\s(.*%s.*)" % device, sink_info, + re.IGNORECASE)[0].lstrip() status = data[data.index("\t" + device_name) - 1] except (IndexError, ValueError): logging.error("Failed to find status for device: %s" % device) @@ -32,22 +35,26 @@ def check_state(device): os.environ['PULSE_SINK'] = sink logging.info("[ Pulse sink ]".center(80, '=')) logging.info("Device: %s %s" % (device_name.strip(), status.strip())) - return status + return status def main(): parser = ArgumentParser(description='Simple GStreamer pipeline player') - parser.add_argument('PIPELINE', + parser.add_argument( + 'PIPELINE', help='Quoted GStreamer pipeline to launch') - parser.add_argument('-t', '--timeout', + parser.add_argument( + '-t', '--timeout', type=int, required=True, help='Timeout for running the pipeline') - parser.add_argument('-d', '--device', + parser.add_argument( + '-d', '--device', type=str, help="Device to check for status") args = parser.parse_args() - logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO, + logging.basicConfig( + format='%(levelname)s:%(message)s', level=logging.INFO, stream=sys.stdout) exit_code = 0 @@ -63,7 +70,7 @@ def main(): except GLib.GError as error: print("Specified pipeline couldn't be processed.") print("Error when processing pipeline: {}".format(error)) - #Exit harmlessly + # Exit harmlessly return(2) print("Pipeline initialized, now starting playback.") diff --git a/bin/hdd_parking.py b/bin/hdd_parking.py index 6d34904..01022ba 100755 --- a/bin/hdd_parking.py +++ b/bin/hdd_parking.py @@ -25,7 +25,7 @@ """ This script verifies that a systems HDD protection capabilities are triggered when appropriate. There are many implementations -of HDD protection from different OEMs, each implemented in a +of HDD protection from different OEMs, each implemented in a different way, so this script can only support implementations which are known to work and are testable. Currently the list of supported implementations is: @@ -42,10 +42,12 @@ from subprocess import Popen, PIPE TIMEOUT = 15.0 + def hdaps_test(run_time): try: - hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE, - universal_newlines=True) + hdapsd = Popen( + ['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE, + universal_newlines=True) except OSError as err: print("Unable to start hdapsd: {}".format(err)) return 1 @@ -59,6 +61,7 @@ def hdaps_test(run_time): return 0 return 1 + def main(): # First establish the driver used parser = ArgumentParser("Tests a systems HDD protection capabilities. " @@ -70,5 +73,6 @@ def main(): 'all axis. No particular force should be required.') return hdaps_test(parser.parse_args().timeout) + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/hotkey_tests.py b/bin/hotkey_tests.py index f1b3560..066f36c 100755 --- a/bin/hotkey_tests.py +++ b/bin/hotkey_tests.py @@ -554,5 +554,6 @@ def main(): if failed: raise SystemExit('Some test failed!') + if __name__ == '__main__': main() diff --git a/bin/key_test.py b/bin/key_test.py index 2bd95f9..883bf32 100755 --- a/bin/key_test.py +++ b/bin/key_test.py @@ -29,7 +29,7 @@ import termios from gettext import gettext as _ -from gi.repository import GObject +from gi.repository import GLib from optparse import OptionParser @@ -81,7 +81,7 @@ class Reporter(object): self.scancodes = scancodes self.fileno = os.open("/dev/console", os.O_RDONLY) - GObject.io_add_watch(self.fileno, GObject.IO_IN, self.on_key) + GLib.io_add_watch(self.fileno, GLib.IO_IN, self.on_key) # Set terminal attributes self.saved_attributes = termios.tcgetattr(self.fileno) @@ -324,7 +324,8 @@ class GtkReporter(Reporter): def found_key(self, key): super(GtkReporter, self).found_key(key) - self.icons[key].set_from_stock(self.ICON_TESTED, size=self.ICON_SIZE) + self.icons[key].set_from_icon_name( + self.ICON_TESTED, size=self.ICON_SIZE) self.check_keys() @@ -334,7 +335,7 @@ class GtkReporter(Reporter): stock_icon = self.ICON_UNTESTED else: stock_icon = self.ICON_NOT_REQUIRED - self.icons[key].set_from_stock(stock_icon, self.ICON_SIZE) + self.icons[key].set_from_icon_name(stock_icon, self.ICON_SIZE) self.check_keys() @@ -394,10 +395,10 @@ Hint to find codes: key = Key(codes, name) keys.append(key) - main_loop = GObject.MainLoop() + main_loop = GLib.MainLoop() try: reporter = reporter_factory(main_loop, keys, options.scancodes) - except: + except OSError: parser.error("Failed to initialize interface: %s" % options.interface) try: @@ -408,5 +409,6 @@ Hint to find codes: return reporter.exit_code + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/keyboard_test.py b/bin/keyboard_test.py index 1615106..e87dc2e 100755 --- a/bin/keyboard_test.py +++ b/bin/keyboard_test.py @@ -84,5 +84,6 @@ def main(args): return 0 + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/lock_screen_watcher.py b/bin/lock_screen_watcher.py index 1c30270..e4433c8 100755 --- a/bin/lock_screen_watcher.py +++ b/bin/lock_screen_watcher.py @@ -20,8 +20,8 @@ import dbus import gi from dbus.mainloop.glib import DBusGMainLoop gi.require_version('GLib', '2.0') -from gi.repository import GObject -from gi.repository import GLib +from gi.repository import GObject # noqa: E402 +from gi.repository import GLib # noqa: E402 def filter_cb(bus, message): @@ -41,6 +41,7 @@ def on_timeout_expired(): print("You have failed to perform the required manipulation in time") raise SystemExit(1) + DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() bus.add_match_string("type='signal',interface='com.ubuntu.Upstart0_6'") diff --git a/bin/lsmod_info.py b/bin/lsmod_info.py index 3b81542..cb7c0bc 100755 --- a/bin/lsmod_info.py +++ b/bin/lsmod_info.py @@ -36,5 +36,6 @@ def main(): print('%s: %s' % (module, version)) return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/manage_compiz_plugin.py b/bin/manage_compiz_plugin.py index 03c0ed8..be9541f 100755 --- a/bin/manage_compiz_plugin.py +++ b/bin/manage_compiz_plugin.py @@ -29,11 +29,10 @@ from gettext import gettext as _ import argparse import gettext import os -import sys import subprocess import time -KEY="/org/compiz/profiles/unity/plugins/core/active-plugins" +KEY = "/org/compiz/profiles/unity/plugins/core/active-plugins" gettext.textdomain("com.canonical.certification.checkbox") gettext.bindtextdomain("com.canonical.certification.checkbox", @@ -41,8 +40,9 @@ gettext.bindtextdomain("com.canonical.certification.checkbox", plugins = eval(subprocess.check_output(["dconf", "read", KEY])) -parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"), - epilog=_("Available plugins: {}").format(plugins)) +parser = argparse.ArgumentParser( + description=_("enable/disable compiz plugins"), + epilog=_("Available plugins: {}").format(plugins)) parser.add_argument("plugin", type=str, help=_('Name of plugin to control')) parser.add_argument("action", type=str, choices=['enable', 'disable'], help=_("What to do with the plugin")) diff --git a/bin/memory_compare.py b/bin/memory_compare.py index 02e2def..aaf89f4 100755 --- a/bin/memory_compare.py +++ b/bin/memory_compare.py @@ -25,7 +25,6 @@ import os import sys import re -from math import log, copysign from subprocess import check_output, CalledProcessError, PIPE from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes @@ -44,7 +43,7 @@ class LshwJsonResult: desc_regex = re.compile('System Memory', re.IGNORECASE) def addHardware(self, hardware): - if self.desc_regex.match(str(hardware.get('description',0))): + if self.desc_regex.match(str(hardware.get('description', 0))): self.memory_reported += int(hardware.get('size', 0)) elif 'bank' in hardware['id']: self.banks_reported += int(hardware.get('size', 0)) @@ -134,5 +133,6 @@ def main(): (difference, percentage, threshold), file=sys.stderr) return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network.py b/bin/network.py index a646f9c..60f7b0a 100755 --- a/bin/network.py +++ b/bin/network.py @@ -255,14 +255,15 @@ class IPerfPerformanceTest(object): logging.warning("The network test against {} failed because:". format(self.target)) if percent < self.fail_threshold: - logging.error(" Transfer speed: {} Mb/s". - format(throughput)) - logging.error(" {:03.2f}% of theoretical max {} Mb/s\n". - format(percent, int(self.iface.max_speed))) + logging.error(" Transfer speed: {} Mb/s".format(throughput)) + logging.error( + " {:03.2f}% of theoretical max {} Mb/s\n".format( + percent, int(self.iface.max_speed))) if cpu_load > self.cpu_load_fail_threshold: logging.error(" CPU load: {}%".format(cpu_load)) - logging.error(" CPU load is above {}% maximum\n". - format(self.cpu_load_fail_threshold)) + logging.error( + " CPU load is above {}% maximum\n".format( + self.cpu_load_fail_threshold)) return 30 logging.debug("Passed benchmark against {}".format(self.target)) @@ -277,9 +278,11 @@ class StressPerformanceTest: def run(self): if self.iperf3: - iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(self.target) + iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format( + self.target) else: - iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(self.target) + iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format( + self.target) print("Running iperf...") iperf = subprocess.Popen(shlex.split(iperf_cmd)) @@ -365,14 +368,14 @@ class Interface(socket.socket): ethinfo = check_output(['ethtool', self.interface], universal_newlines=True, stderr=STDOUT).split(' ') - expression = '(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)' + expression = r'(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)' regex = re.compile(expression) if ethinfo: for i in ethinfo: hit = regex.search(i) if hit: - speeds.append(int(re.sub("\D", "", hit.group(0)))) + speeds.append(int(re.sub(r"\D", "", hit.group(0)))) except CalledProcessError as e: logging.error('ethtool returned an error!') logging.error(e.output) @@ -391,7 +394,7 @@ class Interface(socket.socket): for s in line.split(' '): hit = regex.search(s) if hit: - speeds.append(int(re.sub("\D", "", hit.group(0)))) + speeds.append(int(re.sub(r"\D", "", hit.group(0)))) except FileNotFoundError: logging.warning('mii-tool not found! Unable to get max speed') except CalledProcessError as e: diff --git a/bin/network_check.py b/bin/network_check.py index 82c00d9..20b88d5 100755 --- a/bin/network_check.py +++ b/bin/network_check.py @@ -6,7 +6,9 @@ ubuntu.com from subprocess import call from argparse import ArgumentParser import http.client -import urllib.request, urllib.error, urllib.parse +import urllib.request +import urllib.error +import urllib.parse import sys @@ -61,12 +63,14 @@ def main(): try: call(command) except OSError: - print("Zenity missing; unable to report test result:\n %s" % message) + print( + "Zenity missing; unable to report test result:\n %s" % message) if any(results.values()): return 0 else: return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network_ntp_test.py b/bin/network_ntp_test.py index 6e2ccf8..f7a2297 100755 --- a/bin/network_ntp_test.py +++ b/bin/network_ntp_test.py @@ -84,13 +84,13 @@ def StartStopNTPD(state, pid=0): SilentCall('/etc/init.d/ntp start') ntpd_status = CheckNTPD() - if status == 0: + if ntpd_status == 0: logging.debug('ntpd restarted with PID: %s' % ntpd_status[1]) else: logging.error('ntpd restart failed for some reason') else: - logging.error('%s is an unknown state, unable to start/stop ntpd' % - state) + logging.error( + '%s is an unknown state, unable to start/stop ntpd' % state) def SyncTime(server): @@ -141,7 +141,8 @@ def SkewTime(): def main(): - description = 'Tests the ability to skew and sync the clock with an NTP server' + description = ( + 'Tests the ability to skew and sync the clock with an NTP server') parser = ArgumentParser(description=description) parser.add_argument('--server', action='store', @@ -173,7 +174,7 @@ def main(): if args.debug: logger.setLevel(logging.DEBUG) logger.addHandler(handler) - + # Make sure NTP is installed if not os.access('/usr/sbin/ntpdate', os.F_OK): logging.error('NTP is not installed!') @@ -208,5 +209,6 @@ def main(): else: return 1 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/network_reconnect_resume_test.py b/bin/network_reconnect_resume_test.py index 8861eb6..17a220e 100755 --- a/bin/network_reconnect_resume_test.py +++ b/bin/network_reconnect_resume_test.py @@ -43,16 +43,17 @@ def get_wifi_reconnect_times(): Returns a list of all the timestamps for wifi reconnects. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\] wlan.* associated") + syntax = re.compile(r"\[(.*)\] wlan.* associated") results = re.findall(syntax, data) return map(float, results) + def get_wired_reconnect_times(): """ Returns a list of all the timestamps for wired reconnects. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\].*eth.* Link is [uU]p") + syntax = re.compile(r"\[(.*)\].*eth.* Link is [uU]p") results = re.findall(syntax, data) return map(float, results) @@ -63,7 +64,7 @@ def get_resume_time(): If no resume is found, None is returned. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\].ACPI: Waking up from system sleep state S3") + syntax = re.compile(r"\[(.*)\].ACPI: Waking up from system sleep state S3") results = re.findall(syntax, data) if not results: return None @@ -85,7 +86,7 @@ def main(): args = parser.parse_args() timedif = get_time_difference(args.device) - + if not timedif: return 0 @@ -98,5 +99,6 @@ def main(): print("PASS: the network connected within the allotted time") return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network_restart.py b/bin/network_restart.py index 14c7357..63a99fb 100755 --- a/bin/network_restart.py +++ b/bin/network_restart.py @@ -96,7 +96,8 @@ class GtkApplication(Application): def __init__(self, address, times): Application.__init__(self, address, times) - dialog = Gtk.Dialog(title='Network restart', + dialog = Gtk.Dialog( + title='Network restart', buttons=(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL)) dialog.set_default_size(300, 100) @@ -187,7 +188,8 @@ def ping(address): """ logging.info('Pinging {0!r}...'.format(address)) try: - check_call('ping -c 1 -w 5 {0}'.format(address), + check_call( + 'ping -c 1 -w 5 {0}'.format(address), stdout=open(os.devnull, 'w'), stderr=STDOUT, shell=True) except CalledProcessError: return False @@ -209,8 +211,9 @@ class Networking: """ output = check_output(['/sbin/ifconfig', '-s', '-a']) lines = output.splitlines()[1:] - interfaces = ([interface for interface in - [line.split()[0] for line in lines] if interface != 'lo']) + interfaces = ( + [interface for interface in + [line.split()[0] for line in lines] if interface != 'lo']) return interfaces def restart(self): @@ -287,21 +290,24 @@ def parse_args(): """ Parse command line options """ - parser = ArgumentParser('Reboot networking interface ' - 'and verify that is up again afterwards') - parser.add_argument('-a', '--address', default='ubuntu.com', + parser = ArgumentParser( + 'Reboot networking interface and verify that is up again afterwards') + parser.add_argument( + '-a', '--address', default='ubuntu.com', help=('Address to ping to verify that network connection is up ' "('%(default)s' by default)")) parser.add_argument('-o', '--output', default='/var/log', help='The path to the log directory. \ Default is /var/log') - parser.add_argument('-t', '--times', + parser.add_argument( + '-t', '--times', type=int, default=1, help=('Number of times that the network interface has to be restarted ' '(%(default)s by default)')) log_levels = ['notset', 'debug', 'info', 'warning', 'error', 'critical'] - parser.add_argument('--log-level', dest='log_level_str', default='notset', + parser.add_argument( + '--log-level', dest='log_level_str', default='notset', choices=log_levels, help=('Log level. ' 'One of {0} or {1} (%(default)s by default)' diff --git a/bin/optical_read_test.py b/bin/optical_read_test.py index b3cf23d..7560db1 100755 --- a/bin/optical_read_test.py +++ b/bin/optical_read_test.py @@ -7,7 +7,7 @@ import filecmp import shutil from argparse import ArgumentParser -from subprocess import Popen, PIPE +from subprocess import Popen, PIPE, SubprocessError DEFAULT_DIR = '/tmp/checkbox.optical' DEFAULT_DEVICE_DIR = 'device' @@ -18,11 +18,7 @@ CDROM_ID = '/lib/udev/cdrom_id' def _command(command, shell=True): - proc = Popen(command, - shell=shell, - stdout=PIPE, - stderr=PIPE - ) + proc = Popen(command, shell=shell, stdout=PIPE, stderr=PIPE) return proc @@ -33,7 +29,7 @@ def _command_out(command, shell=True): def compare_tree(source, target): for dirpath, dirnames, filenames in os.walk(source): - #if both tree are empty return false + # if both tree are empty return false if dirpath == source and dirnames == [] and filenames == []: return False for name in filenames: @@ -64,21 +60,21 @@ def read_test(device): mount = _command("mount -o ro %s %s" % (device, device_dir)) mount.communicate() if mount.returncode != 0: - print("Unable to mount %s to %s" % - (device, device_dir), file=sys.stderr) + print("Unable to mount %s to %s" % + (device, device_dir), file=sys.stderr) return False file_copy = _command("cp -dpR %s %s" % (device_dir, image_dir)) file_copy.communicate() if file_copy.returncode != 0: - print("Failed to copy files from %s to %s" % - (device_dir, image_dir), file=sys.stderr) + print("Failed to copy files from %s to %s" % + (device_dir, image_dir), file=sys.stderr) return False if compare_tree(device_dir, image_dir): passed = True - except: - print("File Comparison failed while testing %s" % device, - file=sys.stderr) + except (SubprocessError, OSError): + print("File Comparison failed while testing %s" % device, + file=sys.stderr) passed = False finally: _command("umount %s" % device_dir).communicate(3) @@ -88,7 +84,7 @@ def read_test(device): if passed: print("File Comparison passed (%s)" % device) - + return passed @@ -127,8 +123,9 @@ def main(): print("Testing %s on %s ... " % (test, device), file=sys.stdout) tester = "%s_test" % test return_values.append(globals()[tester](device)) - + return False in return_values + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/pm_log_check.py b/bin/pm_log_check.py index cb75685..fae8eca 100755 --- a/bin/pm_log_check.py +++ b/bin/pm_log_check.py @@ -36,8 +36,8 @@ class Parser(object): """ Reboot test log file parser """ - is_logging_line = (re.compile('^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}') - .search) + is_logging_line = (re.compile( + r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}').search) is_getting_info_line = (re.compile('Gathering hardware information...$') .search) is_executing_line = (re.compile("Executing: '(?P.*)'...$") @@ -103,8 +103,10 @@ class Parser(object): if self.is_output_line(line): command_output = {} break - if (self.is_executing_line(line) - or self.is_getting_info_line(line)): + if ( + self.is_executing_line(line) + or self.is_getting_info_line(line) + ): # Skip commands with no output iterator.unnext(line) return None @@ -132,8 +134,10 @@ class Parser(object): # for the field value value = [] for line in iterator: - if (self.is_logging_line(line) - or self.is_field_line(line)): + if ( + self.is_logging_line(line) + or self.is_field_line(line) + ): iterator.unnext(line) break diff --git a/bin/pm_test.py b/bin/pm_test.py index 18cf97c..fcaf7d9 100755 --- a/bin/pm_test.py +++ b/bin/pm_test.py @@ -20,7 +20,7 @@ from configparser import ConfigParser from datetime import datetime, timedelta from time import localtime, time gi.require_version("Gtk", "3.0") -from gi.repository import GObject, Gtk +from gi.repository import GObject, Gtk # noqa: E402 def main(): @@ -266,9 +266,9 @@ class PowerManagementOperation(): count = count_s3 + count_s2idle if count != total_suspends_expected: problems = ( - "Found {} occurrences of '{}'. Expected {}".format( - count, magic_line.strip(), - total_suspends_expected)) + "Found {} occurrences of S3/S2idle." + " Expected {}".format( + count, total_suspends_expected)) except FileNotFoundError: problems = "Error opening {}".format(fwts_log_path) if problems: @@ -367,13 +367,13 @@ class WakeUpAlarm(): with open(cls.ALARM_FILENAME) as alarm_file: wakeup_time_stored_str = alarm_file.read() - if not re.match('\d+', wakeup_time_stored_str): + if not re.match(r'\d+', wakeup_time_stored_str): subprocess.check_call('echo "+%d" > %s' % (timeout, cls.ALARM_FILENAME), shell=True) with open(cls.ALARM_FILENAME) as alarm_file2: wakeup_time_stored_str = alarm_file2.read() - if not re.match('\d+', wakeup_time_stored_str): + if not re.match(r'\d+', wakeup_time_stored_str): logging.error('Invalid wakeup time format: {0!r}' .format(wakeup_time_stored_str)) sys.exit(1) @@ -396,7 +396,7 @@ class WakeUpAlarm(): sys.exit(1) with open(cls.RTC_FILENAME) as rtc_file: - separator_regex = re.compile('\s+:\s+') + separator_regex = re.compile(r'\s+:\s+') rtc_data = dict([separator_regex.split(line.rstrip()) for line in rtc_file]) logging.debug('RTC data:\n{0}' @@ -582,14 +582,15 @@ class CountdownDialog(Gtk.Dialog): .run().stdout), ethernet=(Command('lspci | grep Ethernet') .run().stdout), - ifconfig=(Command("ifconfig -a | grep -A1 '^\w'") + ifconfig=(Command( + r"ifconfig -a | grep -A1 '^\w'") .run().stdout), - iwconfig=(Command("iwconfig | grep -A1 '^\w'") + iwconfig=(Command(r"iwconfig | grep -A1 '^\w'") .run().stdout))) logging.debug('Bluetooth Device:\n' '{hciconfig}' - .format(hciconfig=(Command("hciconfig -a " - "| grep -A2 '^\w'") + .format(hciconfig=(Command(r"hciconfig -a " + r"| grep -A2 '^\w'") .run().stdout))) logging.debug('Video Card:\n' '{lspci}' @@ -770,7 +771,7 @@ Exec=sudo {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay Type=Application X-GNOME-Autostart-enabled=true Hidden=false -""" +""" # noqa: E501 def __init__(self, args, user=None): self.args = args @@ -1012,7 +1013,7 @@ class MyArgumentParser(): '{0}.{1}.{2}.log'.format( os.path.splitext(os.path.basename(__file__))[0], args.pm_operation, - args.total))) + args.total)) return args, extra_args diff --git a/bin/process_wait.py b/bin/process_wait.py index 6107d1b..1295991 100755 --- a/bin/process_wait.py +++ b/bin/process_wait.py @@ -36,14 +36,17 @@ def main(args): usage = "Usage: %prog PROCESS [PROCESS...]" parser = OptionParser(usage=usage) - parser.add_option("-s", "--sleep", + parser.add_option( + "-s", "--sleep", type="int", default=default_sleep, help="Number of seconds to sleep between checks.") - parser.add_option("-t", "--timeout", + parser.add_option( + "-t", "--timeout", type="int", help="Number of seconds to timeout from sleeping.") - parser.add_option("-u", "--uid", + parser.add_option( + "-u", "--uid", help="Effective user name or id of the running processes") (options, processes) = parser.parse_args(args) diff --git a/bin/pulse-active-port-change.py b/bin/pulse-active-port-change.py index 90abed8..baf0152 100755 --- a/bin/pulse-active-port-change.py +++ b/bin/pulse-active-port-change.py @@ -71,20 +71,21 @@ class AudioPlugDetection: doc = parse_pactl_output(text) cfg = set() for record in doc.record_list: - active_port = None - port_availability = None - # We go through the attribute list once to try to find an active port - for attr in record.attribute_list: - if attr.name == "Active Port": - active_port = attr.value - # If there is one, we retrieve its availability flag - if active_port: - for attr in record.attribute_list: - if attr.name == "Ports": - for port in attr.value: - if port.name == active_port: - port_availability = port.availability - cfg.add((record.name, active_port, port_availability)) + active_port = None + port_availability = None + # We go through the attribute list once to try to find + # an active port + for attr in record.attribute_list: + if attr.name == "Active Port": + active_port = attr.value + # If there is one, we retrieve its availability flag + if active_port: + for attr in record.attribute_list: + if attr.name == "Ports": + for port in attr.value: + if port.name == active_port: + port_availability = port.availability + cfg.add((record.name, active_port, port_availability)) return cfg def on_timeout(self, signum, frame): diff --git a/bin/removable_storage_test.py b/bin/removable_storage_test.py index 82f1595..4253cbf 100755 --- a/bin/removable_storage_test.py +++ b/bin/removable_storage_test.py @@ -15,26 +15,30 @@ import time import gi gi.require_version('GUdev', '1.0') -from gi.repository import GUdev - -from checkbox_support.dbus import connect_to_system_bus -from checkbox_support.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_LOOP_INTERFACE -from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer -from checkbox_support.dbus.udisks2 import is_udisks2_supported -from checkbox_support.dbus.udisks2 import lookup_udev_device -from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus -from checkbox_support.heuristics.udisks2 import is_memory_card -from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes -from checkbox_support.parsers.udevadm import CARD_READER_RE -from checkbox_support.parsers.udevadm import GENERIC_RE -from checkbox_support.parsers.udevadm import FLASH_RE -from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint -from checkbox_support.udev import get_interconnect_speed -from checkbox_support.udev import get_udev_block_devices -from checkbox_support.udev import get_udev_xhci_devices +from gi.repository import GUdev # noqa: E402 + +from checkbox_support.dbus import connect_to_system_bus # noqa: E402 +from checkbox_support.dbus.udisks2 import ( + UDISKS2_BLOCK_INTERFACE, + UDISKS2_DRIVE_INTERFACE, + UDISKS2_FILESYSTEM_INTERFACE, + UDISKS2_LOOP_INTERFACE, + UDisks2Model, + UDisks2Observer, + is_udisks2_supported, + lookup_udev_device, + map_udisks1_connection_bus) # noqa: E402 +from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402 +from checkbox_support.helpers.human_readable_bytes import ( + HumanReadableBytes) # noqa: E402 +from checkbox_support.parsers.udevadm import ( + CARD_READER_RE, + GENERIC_RE, + FLASH_RE, + find_pkname_is_root_mountpoint) # noqa: E402 +from checkbox_support.udev import get_interconnect_speed # noqa: E402 +from checkbox_support.udev import get_udev_block_devices # noqa: E402 +from checkbox_support.udev import get_udev_xhci_devices # noqa: E402 class ActionTimer(): @@ -109,8 +113,8 @@ def on_ubuntucore(): snap = os.getenv("SNAP") if snap: with open(os.path.join(snap, 'meta/snap.yaml')) as f: - for l in f.readlines(): - if l == "confinement: classic\n": + for line in f.readlines(): + if line == "confinement: classic\n": return False return True return False @@ -855,7 +859,6 @@ def main(): # controller drivers. # This will raise KeyError for no # associated disk device was found. - xhci_disks = test.get_disks_xhci() if test.get_disks_xhci().get(disk, '') != 'xhci': raise SystemExit( "\t\tDisk does not use xhci_hcd.") diff --git a/bin/removable_storage_watcher.py b/bin/removable_storage_watcher.py index d5a0722..7120f6d 100755 --- a/bin/removable_storage_watcher.py +++ b/bin/removable_storage_watcher.py @@ -9,16 +9,21 @@ import sys import gi gi.require_version('GUdev', '1.0') -from gi.repository import GObject, GUdev - -from checkbox_support.dbus import connect_to_system_bus -from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer -from checkbox_support.dbus.udisks2 import is_udisks2_supported -from checkbox_support.dbus.udisks2 import lookup_udev_device -from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus -from checkbox_support.heuristics.udisks2 import is_memory_card -from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE -from checkbox_support.udev import get_interconnect_speed, get_udev_block_devices +from gi.repository import GObject, GUdev # noqa: E402 + +from checkbox_support.dbus import connect_to_system_bus # noqa: E402 +from checkbox_support.dbus.udisks2 import UDisks2Model # noqa: E402 +from checkbox_support.dbus.udisks2 import UDisks2Observer # noqa: E402 +from checkbox_support.dbus.udisks2 import is_udisks2_supported # noqa: E402 +from checkbox_support.dbus.udisks2 import lookup_udev_device # noqa: E402 +from checkbox_support.dbus.udisks2 import ( + map_udisks1_connection_bus) # noqa: E402 +from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402 +from checkbox_support.parsers.udevadm import CARD_READER_RE # noqa: E402 +from checkbox_support.parsers.udevadm import GENERIC_RE # noqa: E402 +from checkbox_support.parsers.udevadm import FLASH_RE # noqa: E402 +from checkbox_support.udev import get_interconnect_speed # noqa: E402 +from checkbox_support.udev import get_udev_block_devices # noqa: E402 # Record representing properties of a UDisks1 Drive object needed by the # UDisks1 version of the watcher implementation @@ -121,19 +126,24 @@ class UDisks1StorageDeviceListener: logging.debug("Desired bus devices: %s", desired_bus_devices) for dev in desired_bus_devices: if self._memorycard: - if (dev.bus != 'sdio' + if ( + dev.bus != 'sdio' and not FLASH_RE.search(dev.media) and not CARD_READER_RE.search(dev.model) - and not GENERIC_RE.search(dev.vendor)): - logging.debug("The device does not seem to be a memory" - " card (bus: %r, model: %r), skipping", - dev.bus, dev.model) + and not GENERIC_RE.search(dev.vendor) + ): + logging.debug( + "The device does not seem to be a memory" + " card (bus: %r, model: %r), skipping", + dev.bus, dev.model) return print(message % {'bus': 'memory card', 'file': dev.file}) else: - if (FLASH_RE.search(dev.media) + if ( + FLASH_RE.search(dev.media) or CARD_READER_RE.search(dev.model) - or GENERIC_RE.search(dev.vendor)): + or GENERIC_RE.search(dev.vendor) + ): logging.debug("The device seems to be a memory" " card (bus: %r (model: %r), skipping", dev.bus, dev.model) @@ -180,8 +190,7 @@ class UDisks1StorageDeviceListener: message = "Expected memory card device %(file)s inserted" else: message = "Expected %(bus)s device %(file)s inserted" - self.verify_device_change(inserted_devices, - message=message) + self.verify_device_change(inserted_devices, message=message) def add_detected(self, added_path): logging.debug("UDisks1 reports device has been added: %s", added_path) @@ -205,8 +214,9 @@ class UDisks1StorageDeviceListener: removed_devices = list(set(self._starting_devices) - set(current_devices)) logging.debug("Computed removed devices: %s", removed_devices) - self.verify_device_change(removed_devices, - message="Removable %(bus)s device %(file)s has been removed") + self.verify_device_change( + removed_devices, + message="Removable %(bus)s device %(file)s has been removed") def get_existing_devices(self): logging.debug("Getting existing devices from UDisks1") @@ -221,12 +231,9 @@ class UDisks1StorageDeviceListener: device_props = dbus.Interface(device_obj, dbus.PROPERTIES_IFACE) udisks = 'org.freedesktop.UDisks.Device' - _device_file = device_props.Get(udisks, - "DeviceFile") - _bus = device_props.Get(udisks, - "DriveConnectionInterface") - _speed = device_props.Get(udisks, - "DriveConnectionSpeed") + _device_file = device_props.Get(udisks, "DeviceFile") + _bus = device_props.Get(udisks, "DriveConnectionInterface") + _speed = device_props.Get(udisks, "DriveConnectionSpeed") _parent_model = '' _parent_media = '' _parent_vendor = '' @@ -411,7 +418,7 @@ class UDisks2StorageDeviceListener: UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus" def __init__(self, system_bus, loop, action, devices, minimum_speed, - memorycard, unmounted = False): + memorycard, unmounted=False): # Store the desired minimum speed of the device in Mbit/s. The argument # is passed as the number of bits per second so let's fix that. self._desired_minimum_speed = minimum_speed / 10 ** 6 @@ -581,24 +588,30 @@ class UDisks2StorageDeviceListener: continue # For devices with empty "ConnectionBus" property, don't # require the device to be mounted - if (record.value.iface_name == + if ( + record.value.iface_name == "org.freedesktop.UDisks2.Drive" and record.value.delta_type == DELTA_TYPE_PROP and record.value.prop_name == "ConnectionBus" - and record.value.prop_value == ""): + and record.value.prop_value == "" + ): needs.remove('mounted') # Detect block devices designated for filesystems - if (record.value.iface_name == + if ( + record.value.iface_name == "org.freedesktop.UDisks2.Block" and record.value.delta_type == DELTA_TYPE_PROP and record.value.prop_name == "IdUsage" - and record.value.prop_value == "filesystem"): + and record.value.prop_value == "filesystem" + ): found.add('block-fs') # Memorize the block device path - elif (record.value.iface_name == + elif ( + record.value.iface_name == "org.freedesktop.UDisks2.Block" and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "PreferredDevice"): + and record.value.prop_name == "PreferredDevice" + ): object_block_device = record.value.prop_value # Ensure the device is a partition elif (record.value.iface_name == @@ -845,8 +858,9 @@ def main(): description = "Wait for the specified device to be inserted or removed." parser = argparse.ArgumentParser(description=description) parser.add_argument('action', choices=['insert', 'remove']) - parser.add_argument('device', choices=['usb', 'sdio', 'firewire', 'scsi', - 'ata_serial_esata'], nargs="+") + parser.add_argument( + 'device', choices=['usb', 'sdio', 'firewire', 'scsi', + 'ata_serial_esata'], nargs="+") memorycard_help = ("Memory cards devices on bus other than sdio require " "this parameter to identify them as such") parser.add_argument('--memorycard', action="store_true", @@ -902,5 +916,6 @@ def main(): except KeyboardInterrupt: return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/resolution_test.py b/bin/resolution_test.py index 55a7828..77238ff 100755 --- a/bin/resolution_test.py +++ b/bin/resolution_test.py @@ -5,7 +5,7 @@ import sys from argparse import ArgumentParser gi.require_version('Gdk', '3.0') -from gi.repository import Gdk +from gi.repository import Gdk # noqa: E402 def check_resolution(): @@ -28,11 +28,13 @@ def compare_resolution(min_h, min_v): def main(): parser = ArgumentParser() - parser.add_argument("--horizontal", + parser.add_argument( + "--horizontal", type=int, default=0, help="Minimum acceptable horizontal resolution.") - parser.add_argument("--vertical", + parser.add_argument( + "--vertical", type=int, default=0, help="Minimum acceptable vertical resolution.") diff --git a/bin/rotation_test.py b/bin/rotation_test.py index 6bf90fd..de70f4a 100755 --- a/bin/rotation_test.py +++ b/bin/rotation_test.py @@ -27,7 +27,8 @@ import time import subprocess gi.require_version('Gdk', '3.0') -from gi.repository import Gdk +from gi.repository import Gdk # noqa: E402 + def main(): """Run rotation cycling by running xrandr command.""" @@ -41,5 +42,6 @@ def main(): ['xrandr', '--output', output, '--rotation', rotation]) time.sleep(4) + if __name__ == '__main__': exit(main()) diff --git a/bin/sleep_test.py b/bin/sleep_test.py index 423fd6c..3483951 100755 --- a/bin/sleep_test.py +++ b/bin/sleep_test.py @@ -184,19 +184,19 @@ class SuspendTest(): if perf: for idx in range(0, len(loglist)): if 'PM: Syncing filesystems' in loglist[idx]: - sleep_start_time = re.split('[\[\]]', + sleep_start_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Sleep started at %s' % sleep_start_time) if 'ACPI: Low-level resume complete' in loglist[idx]: - sleep_end_time = re.split('[\[\]]', + sleep_end_time = re.split(r'[\[\]]', loglist[idx - 1])[1].strip() logging.debug('Sleep ended at %s' % sleep_end_time) - resume_start_time = re.split('[\[\]]', + resume_start_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Resume started at %s' % resume_start_time) idx += 1 if 'Restarting tasks' in loglist[idx]: - resume_end_time = re.split('[\[\]]', + resume_end_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Resume ended at %s' % resume_end_time) if self.end_marker in loglist[idx]: @@ -395,5 +395,6 @@ def main(): options.iterations) return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/sleep_test_log_check.py b/bin/sleep_test_log_check.py index aae7556..350bbbb 100755 --- a/bin/sleep_test_log_check.py +++ b/bin/sleep_test_log_check.py @@ -138,8 +138,9 @@ def main(): logging.basicConfig(level=args.debug) - #Create a generator and get our lines - log = (line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8")) + # Create a generator and get our lines + log = ( + line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8")) # End result will be a dictionary with a key per level, value is another # dictionary with a key per test (s3, s4, ...) and a list of all failures @@ -166,9 +167,8 @@ def main(): # Report what I found for level in sorted(results.keys()): - if results[level]: # Yes, we can have an empty level. We may have - # seen the levelheader but had it report no - # failures. + if results[level]: # Yes, we can have an empty level. + # We may have seen the levelheader but had it report no failures. print("{} failures:".format(level)) for test in results[level].keys(): print(" {}: {} failures".format(test, @@ -184,8 +184,8 @@ def main(): logging.error("No fwts test summaries found, " "possible malformed fwts log file") return_code = 2 - elif not results.keys(): # If it has no keys, means we didn't see any - # FWTS levels + elif not results.keys(): + # If it has no keys, means we didn't see any FWTS levels logging.error("None of the summaries contained failure levels, " "possible malformed fwts log file") return_code = 2 @@ -197,5 +197,6 @@ def main(): return return_code + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/sleep_time_check.py b/bin/sleep_time_check.py index f9513fa..90e1bc7 100755 --- a/bin/sleep_time_check.py +++ b/bin/sleep_time_check.py @@ -53,13 +53,18 @@ def main(): print(e) return 1 - if (sleep_time is None or resume_time is None) or \ - (len(sleep_times) != len(resume_times)): + if ( + (sleep_time is None or resume_time is None) or + (len(sleep_times) != len(resume_times)) + ): print("ERROR: One or more times was not reported correctly") return 1 - print("Average time to enter sleep state: %.4f seconds" % mean(sleep_times)) - print("Average time to resume from sleep state: %.4f seconds" % mean(resume_times)) + print( + "Average time to enter sleep state: %.4f seconds" % mean(sleep_times)) + print( + "Average time to resume from sleep state: %.4f seconds" % mean( + resume_times)) failed = 0 if sleep_time > args.sleep_threshold: @@ -76,5 +81,6 @@ def main(): return failed + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/stress_ng_test.py b/bin/stress_ng_test.py index b8c2586..35aad2d 100755 --- a/bin/stress_ng_test.py +++ b/bin/stress_ng_test.py @@ -138,7 +138,7 @@ def num_numa_nodes(): try: return int(run(['numactl', '--hardware'], stdout=PIPE).stdout.split()[1]) - except: + except (ValueError, OSError, IndexError): return 1 diff --git a/bin/test_bt_keyboard.py b/bin/test_bt_keyboard.py index 1457a1d..782ae20 100755 --- a/bin/test_bt_keyboard.py +++ b/bin/test_bt_keyboard.py @@ -5,7 +5,7 @@ # Written by: # Maciej Kisielewski -import checkbox_support.bt_helper +import checkbox_support.bt_helper as bt_helper def main(): diff --git a/bin/touchpad_driver_info.py b/bin/touchpad_driver_info.py index 431b874..9f6278f 100755 --- a/bin/touchpad_driver_info.py +++ b/bin/touchpad_driver_info.py @@ -67,8 +67,8 @@ def main(): if attributes: modinfo = TouchpadDriver(attributes['driver']) attributes['version'] = modinfo.driver_version - print("%s: %s\n%s: %s\n%s: %s\n" % - ('Device', attributes['product'], + print("%s: %s\n%s: %s\n%s: %s\n" % ( + 'Device', attributes['product'], 'Driver', attributes['driver'], 'Driver Version', attributes['version'])) else: diff --git a/bin/touchpad_test.py b/bin/touchpad_test.py index bc6a474..6ecde92 100755 --- a/bin/touchpad_test.py +++ b/bin/touchpad_test.py @@ -8,8 +8,8 @@ from gettext import gettext as _ gi.require_version('Gdk', '3.0') gi.require_version('Gio', '2.0') gi.require_version("Gtk", "3.0") -from gi.repository import Gio, Gtk, Gdk -from optparse import OptionParser +from gi.repository import Gio, Gtk, Gdk # noqa: E402 +from optparse import OptionParser # noqa: E402 EXIT_WITH_FAILURE = 1 @@ -205,5 +205,6 @@ def main(args): return scroller.exit_code + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/virtualization.py b/bin/virtualization.py index 2b7cd54..10b827b 100755 --- a/bin/virtualization.py +++ b/bin/virtualization.py @@ -330,7 +330,7 @@ class KVMTest(object): # Attempt download try: - resp = urllib.request.urlretrieve(full_url, cloud_iso) + urllib.request.urlretrieve(full_url, cloud_iso) except (IOError, OSError, urllib.error.HTTPError, @@ -400,25 +400,6 @@ class KVMTest(object): Generate Cloud meta data and creates an iso object to be mounted as virtual device to instance during boot. """ - - user_data = """\ -#cloud-config - -runcmd: - - [ sh, -c, echo "========= CERTIFICATION TEST =========" ] - -power_state: - mode: halt - message: Bye - timeout: 480 - -final_message: CERTIFICATION BOOT COMPLETE -""" - - meta_data = """\ -{ echo instance-id: iid-local01; echo local-hostname, certification; } -""" - for file in ['user-data', 'meta-data']: logging.debug("Creating cloud %s", file) with open(file, "wt") as data_file: @@ -427,11 +408,11 @@ final_message: CERTIFICATION BOOT COMPLETE # Create Data ISO hosting user & meta cloud config data try: - iso_build = check_output( + check_output( ['genisoimage', '-output', 'seed.iso', '-volid', 'cidata', '-joliet', '-rock', 'user-data', 'meta-data'], universal_newlines=True) - except CalledProcessError as exception: + except CalledProcessError: logging.exception("Cloud data disk creation failed") def log_check(self, stream): @@ -483,7 +464,7 @@ final_message: CERTIFICATION BOOT COMPLETE self.create_cloud_disk() # Boot Virtual Machine - instance = self.boot_image(self.image) + self.boot_image(self.image) # If running in console, reset console window to regain # control from VM Serial I/0 @@ -659,7 +640,7 @@ class LXDTest(object): logging.debug("Attempting download of {} from {}".format(filename, url)) try: - resp = urllib.request.urlretrieve(url, filename) + urllib.request.urlretrieve(url, filename) except (IOError, OSError, urllib.error.HTTPError, diff --git a/bin/volume_test.py b/bin/volume_test.py index a949f93..d1aeffd 100755 --- a/bin/volume_test.py +++ b/bin/volume_test.py @@ -8,12 +8,12 @@ from subprocess import check_output TYPES = ("source", "sink") -active_entries_regex = re.compile("\* index.*?(?=properties)", re.DOTALL) -entries_regex = re.compile("index.*?(?=properties)", re.DOTALL) -index_regex = re.compile("(?<=index: )[0-9]*") -muted_regex = re.compile("(?<=muted: ).*") -volume_regex = re.compile("(?<=volume: 0: )\s*[0-9]*") -name_regex = re.compile("(?<=name:).*") +active_entries_regex = re.compile(r"\* index.*?(?=properties)", re.DOTALL) +entries_regex = re.compile(r"index.*?(?=properties)", re.DOTALL) +index_regex = re.compile(r"(?<=index: )[0-9]*") +muted_regex = re.compile(r"(?<=muted: ).*") +volume_regex = re.compile(r"(?<=volume: 0: )\s*[0-9]*") +name_regex = re.compile(r"(?<=name:).*") def check_muted(): @@ -27,26 +27,29 @@ def check_muted(): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), - file=sys.stderr) + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), + file=sys.stderr) return 1 active_entry_match = active_entries_regex.search(pacmd_entries) if active_entry_match: active_entry = active_entry_match.group() else: - print("Unable to find a %s active_entry in the pacmd list-%ss"\ - " output\npacmd output was: %s" % - (vtype, vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to find a %s active_entry in the pacmd list-%ss" + " output\npacmd output was: %s" % + (vtype, vtype, pacmd_entries), file=sys.stderr) return 1 name_match = name_regex.search(active_entry) if name_match: name = name_match.group() else: - print("Unable to determine device bus information from the"\ - " pacmd list-%ss output\npacmd output was: %s" % - (vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to determine device bus information from the" + " pacmd list-%ss output\npacmd output was: %s" % + (vtype, pacmd_entries), file=sys.stderr) return 1 muted_match = muted_regex.search(active_entry) @@ -58,9 +61,10 @@ def check_muted(): else: print("PASS: Audio is not muted on %s %s" % (name, vtype)) else: - print("Unable to find mute information in the pacmd list-%ss"\ - " output for device %s\npacmd output was: %s" % - (vtype, name, pacmd_entries), file=sys.stderr) + print( + "Unable to find mute information in the pacmd list-%ss" + " output for device %s\npacmd output was: %s" % + (vtype, name, pacmd_entries), file=sys.stderr) return 1 return retval @@ -77,8 +81,9 @@ def check_volume(minvol, maxvol): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), - file=sys.stderr) + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), + file=sys.stderr) return 1 entries = entries_regex.findall(pacmd_entries) @@ -88,32 +93,36 @@ def check_volume(minvol, maxvol): if name_match: name = name_match.group() else: - print("Unable to determine device bus information from the"\ - " pacmd list-%ss output\npacmd output was: %s" % - (vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to determine device bus information from the" + " pacmd list-%ss output\npacmd output was: %s" % + (vtype, pacmd_entries), file=sys.stderr) return 1 volume_match = volume_regex.search(entry) if volume_match: volume = int(volume_match.group().strip()) if volume > maxvol: - print ("FAIL: Volume of %d is greater than"\ - " maximum of %d for %s %s" % (volume, maxvol, - name, vtype)) + print( + "FAIL: Volume of %d is greater than" + " maximum of %d for %s %s" % + (volume, maxvol, name, vtype)) retval = 1 elif volume < minvol: - print ("FAIL: Volume of %d is less than"\ - " minimum of %d for %s %s" % (volume, minvol, - name, vtype)) + print( + "FAIL: Volume of %d is less than" + " minimum of %d for %s %s" % + (volume, minvol, name, vtype)) retval = 1 else: - print ("PASS: Volume is %d for %s %s" % (volume, name, - vtype)) + print( + "PASS: Volume is %d for %s %s" % + (volume, name, vtype)) else: - print("Unable to find volume information in the pacmd"\ - " list-%ss output for device %s.\npacmd output "\ - "was: %s" % - (vtype, name, pacmd_entries), file=sys.stderr) + print( + "Unable to find volume information in the pacmd" + " list-%ss output for device %s.\npacmd output " + "was: %s" % (vtype, name, pacmd_entries), file=sys.stderr) return 1 return retval @@ -134,5 +143,6 @@ def main(): check_volume_retval = check_volume(args.minvol, args.maxvol) return check_muted_retval or check_volume_retval + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/wifi_ap_wizard.py b/bin/wifi_ap_wizard.py index 5506d01..9029040 100755 --- a/bin/wifi_ap_wizard.py +++ b/bin/wifi_ap_wizard.py @@ -61,5 +61,6 @@ def main(): raise SystemExit('Did not get prompted ("{}")'.format(prompt)) wizard.writeline(response) + if __name__ == '__main__': main() diff --git a/bin/wifi_client_test_netplan.py b/bin/wifi_client_test_netplan.py index c74320e..6f8281c 100755 --- a/bin/wifi_client_test_netplan.py +++ b/bin/wifi_client_test_netplan.py @@ -21,8 +21,6 @@ import subprocess as sp import textwrap import time import shutil -from struct import pack -from socket import inet_ntoa import sys print = functools.partial(print, flush=True) @@ -135,7 +133,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp): nameservers: {} Static IP no dhcp: - >>> print(generate_test_config("eth0", "my_ap", "s3cr3t", "192.168.1.1", False)) + >>> print(generate_test_config( + "eth0", "my_ap", "s3cr3t", "192.168.1.1", False)) # This is the network config written by checkbox network: version: 2 @@ -162,7 +161,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp): password = "password: " + psk else: password = "" - return textwrap.dedent(np_cfg.format(interface, ssid, password, address, dhcp)) + return textwrap.dedent( + np_cfg.format(interface, ssid, password, address, dhcp)) def write_test_config(config): diff --git a/bin/wifi_master_mode.py b/bin/wifi_master_mode.py index 3888caa..d52d5c0 100755 --- a/bin/wifi_master_mode.py +++ b/bin/wifi_master_mode.py @@ -67,7 +67,7 @@ class WifiMasterMode(): logging.info(output) logging.info("AP successfully established.") child.terminate() - if child.poll() is not 0: + if child.poll() != 0: output = child.stdout.read() logging.error(log + output) logging.error('AP failed to start') diff --git a/bin/wifi_time2reconnect.py b/bin/wifi_time2reconnect.py index d4ae2c0..a1b49b5 100755 --- a/bin/wifi_time2reconnect.py +++ b/bin/wifi_time2reconnect.py @@ -7,9 +7,8 @@ import time import subprocess from datetime import datetime try: - from subprocess import DEVNULL # >= python3.3 + from subprocess import DEVNULL # >= python3.3 except ImportError: - import os DEVNULL = open(os.devnull, 'wb') IFACE = None @@ -21,7 +20,7 @@ def main(): Check the time needed to reconnect an active WIFI connection """ devices = subprocess.getoutput('nmcli dev') - match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices) + match = re.search(r'(\w+)\s+(802-11-wireless|wifi)\s+connected', devices) if match: IFACE = match.group(1) else: @@ -46,7 +45,7 @@ def main(): return 1 subprocess.call( - 'nmcli dev disconnect iface %s' %IFACE, + 'nmcli dev disconnect iface %s' % IFACE, stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT, shell=True) @@ -55,13 +54,13 @@ def main(): start = datetime.now() subprocess.call( - 'nmcli con up uuid %s --timeout %s' %(uuid, TIMEOUT), + 'nmcli con up uuid %s --timeout %s' % (uuid, TIMEOUT), stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT, shell=True) delta = datetime.now() - start - print('%.2f Seconds' %delta.total_seconds()) + print('%.2f Seconds' % delta.total_seconds()) return 0 diff --git a/bin/window_test.py b/bin/window_test.py index ee79a89..895e2db 100755 --- a/bin/window_test.py +++ b/bin/window_test.py @@ -277,8 +277,7 @@ def print_move_window(iterations, timeout, *args): for it in range(iterations): print('Iteration %d of %d:' % (it + 1, iterations)) - exit_status = move_window('glxgears', - timeout) + status = move_window('glxgears', timeout) print('') return status @@ -290,7 +289,8 @@ def main(): 'open-close-multi': print_open_close_multi, 'move': print_move_window} - parser = ArgumentParser(description='Script that performs window operation') + parser = ArgumentParser( + description='Script that performs window operation') parser.add_argument('-t', '--test', default='all', help='The name of the test to run. \ @@ -338,5 +338,6 @@ def main(): return status + if __name__ == '__main__': exit(main()) diff --git a/bin/wwan_tests.py b/bin/wwan_tests.py index df44cf6..9bbf1e7 100755 --- a/bin/wwan_tests.py +++ b/bin/wwan_tests.py @@ -278,7 +278,7 @@ class ThreeGppConnection(): _wwan_radio_on() time.sleep(args.wwan_setup_time) ret_code = _ping_test(args.wwan_net_if) - except: + except subprocess.SubprocessError: pass _destroy_3gpp_connection() _wwan_radio_off() @@ -312,29 +312,6 @@ class Resources(): mm = MMDbus() for m in mm.get_modem_ids(): print("mm_id: {}".format(m)) - # Removed ability to fetch supported RATs when adding CLI support - # to this script. It is somewhat messy to scrape this from mmcli - # output and it wasn't actually used in test definitions. - # - # cap_bits = mm.get_rat_support(m) - # if cap_bits != 0: - # if (cap_bits & MMModemCapability['MM_MODEM_CAPABILITY_POTS']): - # print("pots: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_CDMA_EVDO']): - # print("cdma_evdo: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_GSM_UMTS']): - # print("gsm_umts: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_LTE']): - # print("lte: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_LTE_ADVANCED']): - # print("lte_advanced: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_IRIDIUM']): - # print("iridium: supported") print("hw_id: {}".format(mm.get_equipment_id(m))) print("manufacturer: {}".format(mm.get_manufacturer(m))) print("model: {}".format(mm.get_model_name(m))) diff --git a/bin/xrandr_cycle.py b/bin/xrandr_cycle.py index 6743f6f..8d87217 100755 --- a/bin/xrandr_cycle.py +++ b/bin/xrandr_cycle.py @@ -117,11 +117,11 @@ if 'PLAINBOX_PROVIDER_DATA' in os.environ: shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'], "settings", "shutter_xml") else: - shutter_xml_template = os.path.join(os.path.split(os.path.dirname( - os.path.realpath(__file__)))[0], - "data", - "settings", - "shutter_xml") + shutter_xml_template = os.path.join( + os.path.split(os.path.dirname(os.path.realpath(__file__)))[0], + "data", + "settings", + "shutter_xml") if args.keyword: screenshot_path = screenshot_path + '_' + args.keyword @@ -167,7 +167,7 @@ try: 'folder="%s"' % screenshot_path, content)) new_profile.close() old_profile.close() -except: +except (IOError, OSError): raise SystemExit("ERROR: While updating folder name " "in shutter profile: {}".format(sys.exc_info())) @@ -198,7 +198,7 @@ for mode in highest_modes: print("""Could not capture screenshot - you may need to install the package 'shutter'.""") - except: + except (IOError, OSError): print("""Could not configure screenshot tool - you may need to install the package 'shutter', or check that {}/{} exists and is writable.""".format( @@ -220,7 +220,7 @@ try: with tarfile.open(screenshot_path + '.tgz', 'w:gz') as screen_tar: for screen in os.listdir(screenshot_path): screen_tar.add(screenshot_path + '/' + screen, screen) -except: +except (IOError, OSError): pass # Output some fun facts and knock off for the day -- cgit v1.2.3 From 0397f635a66ffb42031d05cd8977209b4e68a726 Mon Sep 17 00:00:00 2001 From: Sylvain Pineau Date: Fri, 17 Jul 2020 10:50:12 +0200 Subject: requirements: Add manage.py test -f to run Flake8 on Python scripts --- requirements/container-tests-provider-checkbox | 1 + requirements/deb-base.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/requirements/container-tests-provider-checkbox b/requirements/container-tests-provider-checkbox index 9f69ffe..7a4404c 100755 --- a/requirements/container-tests-provider-checkbox +++ b/requirements/container-tests-provider-checkbox @@ -18,3 +18,4 @@ python3 $TMPDIR/plainbox-provider-resource/manage.py develop --force ./manage.py validate ./manage.py test -u +./manage.py test -f diff --git a/requirements/deb-base.txt b/requirements/deb-base.txt index 6f9256f..74893e2 100644 --- a/requirements/deb-base.txt +++ b/requirements/deb-base.txt @@ -1 +1,2 @@ shellcheck +flake8 -- cgit v1.2.3 From 7a017ea84cc35c77055a068858b5ecae8ed30d21 Mon Sep 17 00:00:00 2001 From: Sylvain Pineau Date: Fri, 17 Jul 2020 11:58:54 +0200 Subject: bin:*.py: Fix Flake8 W503 (Xenial version, 2.5.4) Can be forced on recent versions with: flake8 --select=W503 ./accelerometer_test.py --- bin/accelerometer_test.py | 4 +-- bin/create_connection.py | 4 +-- bin/frequency_governors_test.py | 57 ++++++++++++++++++------------------ bin/graphics_driver.py | 10 +++---- bin/key_test.py | 6 ++-- bin/mac_passthrough.py | 4 +-- bin/memory_test.py | 4 +-- bin/pm_log_check.py | 12 ++++---- bin/removable_storage_watcher.py | 62 ++++++++++++++++++++-------------------- 9 files changed, 82 insertions(+), 81 deletions(-) diff --git a/bin/accelerometer_test.py b/bin/accelerometer_test.py index 93ae07f..130e22d 100755 --- a/bin/accelerometer_test.py +++ b/bin/accelerometer_test.py @@ -88,8 +88,8 @@ class AccelerometerUI(Gtk.Window): def update_axis_icon(self, direction): """Change desired directional icon to checkmark""" - exec('self.%s_icon.set_from_stock' % (direction) - + '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)') + exec('self.%s_icon.set_from_stock' % (direction) + + '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)') def update_debug_label(self, text): """Update axis information in center of UI""" diff --git a/bin/create_connection.py b/bin/create_connection.py index 880151d..9dc977e 100755 --- a/bin/create_connection.py +++ b/bin/create_connection.py @@ -167,8 +167,8 @@ def write_connection_file(name, connection_info): os.fchmod(connection_file.fileno(), 0o600) connection_file.close() except IOError: - print("Can't write to " + CONNECTIONS_PATH + name - + ". Is this command being run as root?", file=sys.stderr) + print("Can't write to " + CONNECTIONS_PATH + name + + ". Is this command being run as root?", file=sys.stderr) sys.exit(1) diff --git a/bin/frequency_governors_test.py b/bin/frequency_governors_test.py index 52dd74a..fb652f5 100755 --- a/bin/frequency_governors_test.py +++ b/bin/frequency_governors_test.py @@ -238,8 +238,8 @@ class CPUScalingTest(object): else: thisTime = stop_elapsed_time - start_elapsed_time if ( - (abs(thisTime - runTime) / runTime) * 100 - < self.retryTolerance + (abs(thisTime - runTime) / runTime) * 100 < + self.retryTolerance ): return runTime else: @@ -272,9 +272,9 @@ class CPUScalingTest(object): minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") if ( - not minimumFrequency - or not currentFrequency - or (minimumFrequency != currentFrequency) + not minimumFrequency or + not currentFrequency or + (minimumFrequency != currentFrequency) ): return False @@ -377,9 +377,9 @@ class CPUScalingTest(object): minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") if ( - not minimumFrequency - or not currentFrequency - or (minimumFrequency != currentFrequency) + not minimumFrequency or + not currentFrequency or + (minimumFrequency != currentFrequency) ): logging.error( "Could not verify that cpu frequency is set to the minimum" @@ -408,9 +408,9 @@ class CPUScalingTest(object): maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") if ( - not maximumFrequency - or not currentFrequency - or (maximumFrequency != currentFrequency) + not maximumFrequency or + not currentFrequency or + (maximumFrequency != currentFrequency) ): logging.error( "Could not verify that cpu frequency is set to the" @@ -430,8 +430,8 @@ class CPUScalingTest(object): % self.maximumFrequencyTestTime) # Verify MHz increase is comparable to time % decrease - predictedSpeedup = (float(maximumFrequency) - / float(minimumFrequency)) + predictedSpeedup = (float(maximumFrequency) / + float(minimumFrequency)) # If "ida" turbo thing, increase the expectation by 8% if self.cpuFlags and self.idaFlag in self.cpuFlags: @@ -439,17 +439,17 @@ class CPUScalingTest(object): "Found %s flag, increasing expected speedup by %.1f%%" % (self.idaFlag, self.idaSpeedupFactor)) predictedSpeedup = \ - (predictedSpeedup - * (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0)))) + (predictedSpeedup * + (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0)))) if self.minimumFrequencyTestTime and self.maximumFrequencyTestTime: - measuredSpeedup = (self.minimumFrequencyTestTime - / self.maximumFrequencyTestTime) + measuredSpeedup = (self.minimumFrequencyTestTime / + self.maximumFrequencyTestTime) logging.info("CPU Frequency Speed Up: %.2f" % predictedSpeedup) logging.info("Measured Speed Up: %.2f" % measuredSpeedup) differenceSpeedUp = ( - ((measuredSpeedup - predictedSpeedup) / predictedSpeedup) - * 100) + ((measuredSpeedup - predictedSpeedup) / predictedSpeedup) * + 100) logging.info( "Percentage Difference %.1f%%" % differenceSpeedUp) if differenceSpeedUp > self.speedUpTolerance: @@ -504,8 +504,8 @@ class CPUScalingTest(object): # Compare the timing to the max results from earlier, # again time should be within self.speedUpTolerance differenceOnDemandVsMaximum = \ - (abs(onDemandTestTime - self.maximumFrequencyTestTime) - / self.maximumFrequencyTestTime) * 100 + (abs(onDemandTestTime - self.maximumFrequencyTestTime) / + self.maximumFrequencyTestTime) * 100 logging.info( "Percentage Difference vs. maximum frequency: %.1f%%" % differenceOnDemandVsMaximum) @@ -546,9 +546,9 @@ class CPUScalingTest(object): maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") if ( - not maximumFrequency - or not currentFrequency - or (maximumFrequency != currentFrequency) + not maximumFrequency or + not currentFrequency or + (maximumFrequency != currentFrequency) ): logging.error( "Current cpu frequency of %s is not set to the maximum " @@ -567,8 +567,8 @@ class CPUScalingTest(object): if performanceTestTime and self.maximumFrequencyTestTime: # Compare the timing to the max results differencePerformanceVsMaximum = \ - (abs(performanceTestTime - self.maximumFrequencyTestTime) - / self.maximumFrequencyTestTime) * 100 + (abs(performanceTestTime - self.maximumFrequencyTestTime) / + self.maximumFrequencyTestTime) * 100 logging.info( "Percentage Difference vs. maximum frequency: %.1f%%" % differencePerformanceVsMaximum) @@ -631,8 +631,9 @@ class CPUScalingTest(object): if conservativeTestTime and self.minimumFrequencyTestTime: # Compare the timing to the max results differenceConservativeVsMinimum = \ - (abs(conservativeTestTime - self.minimumFrequencyTestTime) - / self.minimumFrequencyTestTime) * 100 + (abs(conservativeTestTime - + self.minimumFrequencyTestTime) / + self.minimumFrequencyTestTime) * 100 logging.info( "Percentage Difference vs. minimum frequency: %.1f%%" % differenceConservativeVsMinimum) diff --git a/bin/graphics_driver.py b/bin/graphics_driver.py index 0b41050..f9bdff3 100755 --- a/bin/graphics_driver.py +++ b/bin/graphics_driver.py @@ -351,12 +351,12 @@ def hybrid_graphics_check(xlog): drivers.append(module['name']) print('Loaded DDX Drivers: %s' % ', '.join(drivers)) - has_hybrid_graphics = (len(cards) > 1 and is_laptop() - and (cards_dict.get('8086') in formatted_cards - or cards_dict.get('1002') in formatted_cards)) + has_hybrid_graphics = (len(cards) > 1 and is_laptop() and + (cards_dict.get('8086') in formatted_cards or + cards_dict.get('1002') in formatted_cards)) - print('Hybrid Graphics: %s' % (has_hybrid_graphics - and 'yes' or 'no')) + print('Hybrid Graphics: %s' % (has_hybrid_graphics and + 'yes' or 'no')) return 0 diff --git a/bin/key_test.py b/bin/key_test.py index 883bf32..f94abd3 100755 --- a/bin/key_test.py +++ b/bin/key_test.py @@ -122,9 +122,9 @@ class Reporter(object): index = 0 length = len(raw_bytes) while index < length: - if (index + 2 < length and (raw_bytes[index] & 0x7f) == 0 - and (raw_bytes[index + 1] & 0x80) != 0 - and (raw_bytes[index + 2] & 0x80) != 0): + if (index + 2 < length and (raw_bytes[index] & 0x7f) == 0 and + (raw_bytes[index + 1] & 0x80) != 0 and + (raw_bytes[index + 2] & 0x80) != 0): code = (((raw_bytes[index + 1] & 0x7f) << 7) | (raw_bytes[2] & 0x7f)) index += 3 diff --git a/bin/mac_passthrough.py b/bin/mac_passthrough.py index 92d2eb6..1707a1e 100755 --- a/bin/mac_passthrough.py +++ b/bin/mac_passthrough.py @@ -53,8 +53,8 @@ def get_pass_through_mac(): for line in dsdt: if 'AUXMAC' in line: bios_mac = line.split('#')[1] - print('Pass-through MAC address from DSDT table: ' - + bios_mac.lower()) + print('Pass-through MAC address from DSDT table: ' + + bios_mac.lower()) return bios_mac.lower() raise SystemExit('No AUXMAC is found in DSDT table, ' 'MAC address pass-through is not working.') diff --git a/bin/memory_test.py b/bin/memory_test.py index 4cd6008..a472b71 100755 --- a/bin/memory_test.py +++ b/bin/memory_test.py @@ -75,8 +75,8 @@ class MemoryTest(): self.process_memory = self.free_memory try: arch = self._command_out("arch").decode() - if (re.match(r"(i[0-9]86|s390|arm.*)", arch) - and self.free_memory > 1024): + if (re.match(r"(i[0-9]86|s390|arm.*)", arch) and + self.free_memory > 1024): self.is_process_limited = True self.process_memory = 1024 # MB, due to 32-bit address space print("%s arch, Limiting Process Memory: %u" % ( diff --git a/bin/pm_log_check.py b/bin/pm_log_check.py index fae8eca..1d41ed6 100755 --- a/bin/pm_log_check.py +++ b/bin/pm_log_check.py @@ -104,8 +104,8 @@ class Parser(object): command_output = {} break if ( - self.is_executing_line(line) - or self.is_getting_info_line(line) + self.is_executing_line(line) or + self.is_getting_info_line(line) ): # Skip commands with no output iterator.unnext(line) @@ -135,8 +135,8 @@ class Parser(object): value = [] for line in iterator: if ( - self.is_logging_line(line) - or self.is_field_line(line) + self.is_logging_line(line) or + self.is_field_line(line) ): iterator.unnext(line) break @@ -209,8 +209,8 @@ def compare_results(results): result_output = result[command] error_messages = [] - fields = (set(baseline_output.keys()) - | set(result_output.keys())) + fields = (set(baseline_output.keys()) | + set(result_output.keys())) for field in fields: baseline_field = baseline_output.get(field, '') result_field = result_output.get(field, '') diff --git a/bin/removable_storage_watcher.py b/bin/removable_storage_watcher.py index 7120f6d..c13439e 100755 --- a/bin/removable_storage_watcher.py +++ b/bin/removable_storage_watcher.py @@ -127,10 +127,10 @@ class UDisks1StorageDeviceListener: for dev in desired_bus_devices: if self._memorycard: if ( - dev.bus != 'sdio' - and not FLASH_RE.search(dev.media) - and not CARD_READER_RE.search(dev.model) - and not GENERIC_RE.search(dev.vendor) + dev.bus != 'sdio' and + not FLASH_RE.search(dev.media) and + not CARD_READER_RE.search(dev.model) and + not GENERIC_RE.search(dev.vendor) ): logging.debug( "The device does not seem to be a memory" @@ -140,9 +140,9 @@ class UDisks1StorageDeviceListener: print(message % {'bus': 'memory card', 'file': dev.file}) else: if ( - FLASH_RE.search(dev.media) - or CARD_READER_RE.search(dev.model) - or GENERIC_RE.search(dev.vendor) + FLASH_RE.search(dev.media) or + CARD_READER_RE.search(dev.model) or + GENERIC_RE.search(dev.vendor) ): logging.debug("The device seems to be a memory" " card (bus: %r (model: %r), skipping", @@ -590,47 +590,47 @@ class UDisks2StorageDeviceListener: # require the device to be mounted if ( record.value.iface_name == - "org.freedesktop.UDisks2.Drive" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "ConnectionBus" - and record.value.prop_value == "" + "org.freedesktop.UDisks2.Drive" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "ConnectionBus" and + record.value.prop_value == "" ): needs.remove('mounted') # Detect block devices designated for filesystems if ( record.value.iface_name == - "org.freedesktop.UDisks2.Block" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "IdUsage" - and record.value.prop_value == "filesystem" + "org.freedesktop.UDisks2.Block" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "IdUsage" and + record.value.prop_value == "filesystem" ): found.add('block-fs') # Memorize the block device path elif ( record.value.iface_name == - "org.freedesktop.UDisks2.Block" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "PreferredDevice" + "org.freedesktop.UDisks2.Block" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "PreferredDevice" ): object_block_device = record.value.prop_value # Ensure the device is a partition elif (record.value.iface_name == - "org.freedesktop.UDisks2.Partition" - and record.value.delta_type == DELTA_TYPE_IFACE): + "org.freedesktop.UDisks2.Partition" and + record.value.delta_type == DELTA_TYPE_IFACE): found.add('partition') # Ensure the device is not empty elif (record.value.iface_name == - "org.freedesktop.UDisks2.Block" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "Size" - and record.value.prop_value > 0): + "org.freedesktop.UDisks2.Block" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "Size" and + record.value.prop_value > 0): found.add('non-empty') # Ensure the filesystem is mounted elif (record.value.iface_name == - "org.freedesktop.UDisks2.Filesystem" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "MountPoints" - and record.value.prop_value != []): + "org.freedesktop.UDisks2.Filesystem" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "MountPoints" and + record.value.prop_value != []): found.add('mounted') # On some systems partition are reported as mounted # filesystems, without 'partition' record @@ -638,9 +638,9 @@ class UDisks2StorageDeviceListener: needs.remove('partition') # Finally memorize the drive the block device belongs to elif (record.value.iface_name == - "org.freedesktop.UDisks2.Block" - and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "Drive"): + "org.freedesktop.UDisks2.Block" and + record.value.delta_type == DELTA_TYPE_PROP and + record.value.prop_name == "Drive"): drive_object_path = record.value.prop_value logging.debug("Finished analyzing %s, found: %s, needs: %s" " drive_object_path: %s", object_path, found, needs, -- cgit v1.2.3