summaryrefslogtreecommitdiff
path: root/bin
diff options
authorSylvain Pineau <sylvain.pineau@canonical.com>2020-07-17 10:49:33 +0200
committerSylvain Pineau <sylvain.pineau@canonical.com>2020-07-17 10:49:33 +0200
commit53a9dace03fea49a2de030e7cd1999155637b6ef (patch)
tree11ca39912f0ec5993853f8f86b7d37340532195f /bin
parent5476520cd9983b2c8d65c98ea1ba542ac922fe95 (diff)
bin:*.py: Fix all Flake8 errors
Diffstat (limited to 'bin')
-rwxr-xr-xbin/accelerometer_test.py29
-rwxr-xr-xbin/alsa_tests.py1
-rwxr-xr-xbin/ansi_parser.py3
-rwxr-xr-xbin/audio_driver_info.py7
-rwxr-xr-xbin/audio_test.py257
-rwxr-xr-xbin/battery_test.py8
-rwxr-xr-xbin/bluetooth_test.py1
-rwxr-xr-xbin/bmc_info.py10
-rwxr-xr-xbin/boot_mode_test_snappy.py1
-rwxr-xr-xbin/brightness_test.py21
-rwxr-xr-xbin/bt_connect.py1
-rwxr-xr-xbin/bt_list_adapters.py2
-rwxr-xr-xbin/color_depth_info.py3
-rwxr-xr-xbin/cpu_offlining.py1
-rwxr-xr-xbin/cpu_topology.py5
-rwxr-xr-xbin/cpuid.py6
-rwxr-xr-xbin/create_connection.py8
-rwxr-xr-xbin/dmitest.py32
-rwxr-xr-xbin/edid_cycle.py6
-rwxr-xr-xbin/evdev_touch_test.py2
-rwxr-xr-xbin/fan_reaction_test.py8
-rwxr-xr-xbin/frequency_governors_test.py222
-rwxr-xr-xbin/fresh_rate_info.py5
-rwxr-xr-xbin/get_make_and_model.py12
-rwxr-xr-xbin/gpu_test.py24
-rwxr-xr-xbin/graphic_memory_info.py7
-rwxr-xr-xbin/graphics_driver.py10
-rwxr-xr-xbin/graphics_modes_info.py10
-rwxr-xr-xbin/graphics_stress_test.py46
-rwxr-xr-xbin/gst_pipeline_test.py29
-rwxr-xr-xbin/hdd_parking.py10
-rwxr-xr-xbin/hotkey_tests.py1
-rwxr-xr-xbin/key_test.py14
-rwxr-xr-xbin/keyboard_test.py1
-rwxr-xr-xbin/lock_screen_watcher.py5
-rwxr-xr-xbin/lsmod_info.py1
-rwxr-xr-xbin/manage_compiz_plugin.py8
-rwxr-xr-xbin/memory_compare.py4
-rwxr-xr-xbin/network.py25
-rwxr-xr-xbin/network_check.py8
-rwxr-xr-xbin/network_ntp_test.py12
-rwxr-xr-xbin/network_reconnect_resume_test.py10
-rwxr-xr-xbin/network_restart.py24
-rwxr-xr-xbin/optical_read_test.py29
-rwxr-xr-xbin/pm_log_check.py16
-rwxr-xr-xbin/pm_test.py27
-rwxr-xr-xbin/process_wait.py9
-rwxr-xr-xbin/pulse-active-port-change.py29
-rwxr-xr-xbin/removable_storage_test.py49
-rwxr-xr-xbin/removable_storage_watcher.py87
-rwxr-xr-xbin/resolution_test.py8
-rwxr-xr-xbin/rotation_test.py4
-rwxr-xr-xbin/sleep_test.py9
-rwxr-xr-xbin/sleep_test_log_check.py15
-rwxr-xr-xbin/sleep_time_check.py14
-rwxr-xr-xbin/stress_ng_test.py2
-rwxr-xr-xbin/test_bt_keyboard.py2
-rwxr-xr-xbin/touchpad_driver_info.py4
-rwxr-xr-xbin/touchpad_test.py5
-rwxr-xr-xbin/virtualization.py29
-rwxr-xr-xbin/volume_test.py78
-rwxr-xr-xbin/wifi_ap_wizard.py1
-rwxr-xr-xbin/wifi_client_test_netplan.py8
-rwxr-xr-xbin/wifi_master_mode.py2
-rwxr-xr-xbin/wifi_time2reconnect.py11
-rwxr-xr-xbin/window_test.py7
-rwxr-xr-xbin/wwan_tests.py25
-rwxr-xr-xbin/xrandr_cycle.py16
68 files changed, 764 insertions, 622 deletions
diff --git a/bin/accelerometer_test.py b/bin/accelerometer_test.py
index 2d95ca8..93ae07f 100755
--- a/bin/accelerometer_test.py
+++ b/bin/accelerometer_test.py
@@ -23,6 +23,7 @@ The purpose of this script is to simply interact with an onboard
accelerometer, and check to be sure that the x, y, z axis respond
to physical movement of hardware.
'''
+
from argparse import ArgumentParser
import gi
import logging
@@ -34,9 +35,10 @@ import time
gi.require_version('Gdk', '3.0')
gi.require_version('GLib', '2.0')
gi.require_version("Gtk", "3.0")
-from gi.repository import Gdk, GLib, Gtk
-from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError
-from checkbox_support.parsers.modinfo import ModinfoParser
+from gi.repository import Gdk, GLib, Gtk # noqa: E402
+from subprocess import Popen, PIPE, check_output, STDOUT # noqa: E402
+from subprocess import CalledProcessError # noqa: E402
+from checkbox_support.parsers.modinfo import ModinfoParser # noqa: E402
handler = logging.StreamHandler()
logger = logging.getLogger()
@@ -86,7 +88,7 @@ class AccelerometerUI(Gtk.Window):
def update_axis_icon(self, direction):
"""Change desired directional icon to checkmark"""
- exec('self.%s_icon.set_from_stock' % (direction) \
+ exec('self.%s_icon.set_from_stock' % (direction)
+ '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)')
def update_debug_label(self, text):
@@ -131,7 +133,7 @@ class AxisData(threading.Thread):
self.x_test_pool = ["up", "down"]
self.y_test_pool = ["left", "right"]
- if self.ui == None:
+ if self.ui is None:
self.ui.enabled = False
def grab_current_readings(self):
@@ -223,8 +225,8 @@ class AxisData(threading.Thread):
def insert_supported_module(oem_module):
"""Try and insert supported module to see if we get any init errors"""
try:
- stream = check_output(['modinfo', oem_module], stderr=STDOUT,
- universal_newlines=True)
+ stream = check_output(
+ ['modinfo', oem_module], stderr=STDOUT, universal_newlines=True)
except CalledProcessError as err:
print("Error accessing modinfo for %s: " % oem_module, file=sys.stderr)
print(err.output, file=sys.stderr)
@@ -232,7 +234,7 @@ def insert_supported_module(oem_module):
parser = ModinfoParser(stream)
module = os.path.basename(parser.get_field('filename'))
-
+
insmod_output = Popen(['insmod %s' % module], stderr=PIPE,
shell=True, universal_newlines=True)
@@ -259,10 +261,10 @@ def check_module_status():
if "Permission denied" in error:
raise PermissionException(error)
- vendor_data = re.findall("Vendor:\s.*", output)
+ vendor_data = re.findall(r"Vendor:\s.*", output)
try:
manufacturer = vendor_data[0].split(":")[1].strip()
- except IndexError as exception:
+ except IndexError:
logging.error("Failed to find Manufacturing data")
return
@@ -275,8 +277,8 @@ def check_module_status():
oem_module = oem_driver_pool.get(vendor)
break # We've found our desired module to probe.
- if oem_module != None:
- if insert_supported_module(oem_module) != None:
+ if oem_module is not None:
+ if insert_supported_module(oem_module) is not None:
logging.error("Failed module insertion")
# Check dmesg status for supported module
driver_status = Popen(['dmesg'], stdout=PIPE, universal_newlines=True)
@@ -347,5 +349,6 @@ def main():
# Reading is not instant.
time.sleep(5)
+
if __name__ == '__main__':
- main();
+ main()
diff --git a/bin/alsa_tests.py b/bin/alsa_tests.py
index 16fb044..e919570 100755
--- a/bin/alsa_tests.py
+++ b/bin/alsa_tests.py
@@ -168,5 +168,6 @@ def main():
device = None
return(actions[args.action](args.duration, device))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/ansi_parser.py b/bin/ansi_parser.py
index c7cf26c..b5cec30 100755
--- a/bin/ansi_parser.py
+++ b/bin/ansi_parser.py
@@ -128,7 +128,8 @@ def parse_filename(filename):
def main(args):
usage = "Usage: %prog [OPTIONS] [FILE...]"
parser = OptionParser(usage=usage)
- parser.add_option("-o", "--output",
+ parser.add_option(
+ "-o", "--output",
metavar="FILE",
help="File where to output the result.")
(options, args) = parser.parse_args(args)
diff --git a/bin/audio_driver_info.py b/bin/audio_driver_info.py
index b9bfaf8..424b125 100755
--- a/bin/audio_driver_info.py
+++ b/bin/audio_driver_info.py
@@ -41,7 +41,6 @@ class PacmdAudioDevice():
print("Error running %s:" % cmd, file=sys.stderr)
print(err.output, file=sys.stderr)
return None
-
if not stream:
print("Error: modinfo returned nothing", file=sys.stderr)
return None
@@ -74,7 +73,8 @@ def list_device_info():
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
file=sys.stderr)
return 1
@@ -102,8 +102,9 @@ def list_device_info():
def main():
parser = ArgumentParser("List audio device and driver information")
- args = parser.parse_args()
+ parser.parse_args()
return list_device_info()
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/audio_test.py b/bin/audio_test.py
index 7bd6c44..2a56df4 100755
--- a/bin/audio_test.py
+++ b/bin/audio_test.py
@@ -13,7 +13,7 @@ import time
try:
import gi
gi.require_version('GLib', '2.0')
- gi.require_version('Gst','1.0')
+ gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gi.repository import GLib
@@ -29,26 +29,27 @@ try:
except ImportError:
from collections import Callable # backward compatible
-#Frequency bands for FFT
+# Frequency bands for FFT
BINS = 256
-#How often to take a sample and do FFT on it.
+# How often to take a sample and do FFT on it.
FFT_INTERVAL = 100000000 # In nanoseconds, so this is every 1/10th second
-#Sampling frequency. The effective maximum frequency we can analyze is
-#half of this (see Nyquist's theorem)
+# Sampling frequency. The effective maximum frequency we can analyze is
+# half of this (see Nyquist's theorem)
SAMPLING_FREQUENCY = 44100
-#The default test frequency is in the middle of the band that contains 5000Hz
-#This frequency was determined experimentally to be high enough but more
-#reliable than others we tried.
+# The default test frequency is in the middle of the band that contains 5000Hz
+# This frequency was determined experimentally to be high enough but more
+# reliable than others we tried.
DEFAULT_TEST_FREQUENCY = 5035
-#only sample a signal when peak level is in this range (in dB attenuation,
-#0 means no attenuation (and horrible clipping).
+# only sample a signal when peak level is in this range (in dB attenuation,
+# 0 means no attenuation (and horrible clipping).
REC_LEVEL_RANGE = (-2.0, -12.0)
-#For our test signal to be considered present, it has to be this much higher
-#than the base level (minimum magnitude). This is in dB.
+# For our test signal to be considered present, it has to be this much higher
+# than the base level (minimum magnitude). This is in dB.
MAGNITUDE_THRESHOLD = 2.5
-#Volume for the sample tone (in %)
+# Volume for the sample tone (in %)
PLAY_VOLUME = 70
+
class PIDController(object):
""" A Proportional-Integrative-Derivative controller (PID) controls a
process's output to try to maintain a desired output value (known as
@@ -141,7 +142,7 @@ class PAVolumeController(object):
'set-%s-volume' % (self.pa_types[self.type]),
str(self.identifier[0]),
str(int(volume)) + "%"]
- if False == self.method(command):
+ if not self.method(command):
return False
self._volume = volume
return True
@@ -159,7 +160,7 @@ class PAVolumeController(object):
'set-%s-mute' % (self.pa_types[self.type]),
str(self.identifier[0]),
mute]
- if False == self.method(command):
+ if not self.method(command):
return False
return True
@@ -187,10 +188,10 @@ class PAVolumeController(object):
return None
command = ['pactl', 'list', self.pa_types[type] + "s", 'short']
- #Expect lines of this form (field separator is tab):
- #<ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE>
- #What we need to return is the ID for the first element on this list
- #that does not contain auto_null or monitor.
+ # Expect lines of this form (field separator is tab):
+ # <ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE>
+ # What we need to return is the ID for the first element on this list
+ # that does not contain auto_null or monitor.
pa_info = self.method(command)
valid_elements = None
@@ -203,17 +204,17 @@ class PAVolumeController(object):
self.logger.error("No valid PulseAudio elements"
" for %s" % (self.type))
return None
- #We only need the pulseaudio numeric ID and long name for each element
+ # We only need the pulseaudio numeric ID and long name for each element
valid_elements = [(int(e.split()[0]), e.split()[1])
for e in valid_elements]
return valid_elements[0]
def _pactl_output(self, command):
- #This method mainly calls pactl (hence the name). Since pactl may
- #return a failure if the audio layer is not yet initialized, we will
- #try running a few times in case of failure. All our invocations of
- #pactl should be "idempotent" so repeating them should not have
- #any bad effects.
+ # This method mainly calls pactl (hence the name). Since pactl may
+ # return a failure if the audio layer is not yet initialized, we will
+ # try running a few times in case of failure. All our invocations of
+ # pactl should be "idempotent" so repeating them should not have
+ # any bad effects.
for attempt in range(0, 3):
try:
return subprocess.check_output(command,
@@ -242,8 +243,8 @@ class SpectrumAnalyzer(object):
self.number_of_samples = 0
self.wanted_samples = wanted_samples
self.sampling_frequency = sampling_frequency
- #Frequencies should contain *real* frequency which is half of
- #the sampling frequency
+ # Frequencies should contain *real* frequency which is half of
+ # the sampling frequency
self.frequencies = [((sampling_frequency / 2.0) / points) * i
for i in range(points)]
@@ -259,14 +260,14 @@ class SpectrumAnalyzer(object):
self.number_of_samples += 1
def frequencies_with_peak_magnitude(self, threshold=1.0):
- #First establish the base level
+ # First establish the base level
per_magnitude_bins = collections.defaultdict(int)
for magnitude in self.spectrum:
per_magnitude_bins[magnitude] += 1
base_level = max(per_magnitude_bins,
key=lambda x: per_magnitude_bins[x])
- #Now return all values that are higher (more positive)
- #than base_level + threshold
+ # Now return all values that are higher (more positive)
+ # than base_level + threshold
peaks = []
for i in range(1, len(self.spectrum) - 1):
first_index = i - 1
@@ -282,10 +283,10 @@ class SpectrumAnalyzer(object):
"""Convenience function to tell me which band
a frequency is contained in
"""
- #Note that actual frequencies are half of what the sampling
- #frequency would tell us. If SF is 44100 then maximum actual
- #frequency is 22050, and if I have 10 frequency bins each will
- #contain only 2205 Hz, not 4410 Hz.
+ # Note that actual frequencies are half of what the sampling
+ # frequency would tell us. If SF is 44100 then maximum actual
+ # frequency is 22050, and if I have 10 frequency bins each will
+ # contain only 2205 Hz, not 4410 Hz.
max_frequency = self.sampling_frequency / 2
if frequency > max_frequency or frequency < 0:
return None
@@ -339,34 +340,33 @@ class GStreamerMessageHandler(object):
if message.type == Gst.MessageType.ELEMENT:
message_name = message.get_structure().get_name()
if message_name == 'spectrum':
- #TODO: Due to an upstream bug, a structure's get_value method
- #doesn't work if the value in question is an array (as is the
- #case with the magnitudes).
- #https://bugzilla.gnome.org/show_bug.cgi?id=693168
- #We have to resort to parsing the string representation of the
- #structure. It's an ugly hack but it works.
- #Ideally we'd be able to say this to get fft_magnitudes:
- #message.get_structure.get_value('magnitude').
- #If an upstream fix ever makes it into gstreamer,
- #remember to remove this hack and the parse_spectrum
- #method
+ # TODO: Due to an upstream bug, a structure's get_value method
+ # doesn't work if the value in question is an array (as is the
+ # case with the magnitudes).
+ # https://bugzilla.gnome.org/show_bug.cgi?id=693168
+ # We have to resort to parsing the string representation of the
+ # structure. It's an ugly hack but it works.
+ # Ideally we'd be able to say this to get fft_magnitudes:
+ # message.get_structure.get_value('magnitude').
+ # If an upstream fix ever makes it into gstreamer,
+ # remember to remove this hack and the parse_spectrum method
struct_string = message.get_structure().to_string()
structure = parse_spectrum_message_structure(struct_string)
fft_magnitudes = structure['magnitude']
self.spectrum_method(self.spectrum_analyzer, fft_magnitudes)
if message_name == 'level':
- #peak_value is our process feedback
- #It's returned as an array, so I need the first (and only)
- #element
+ # peak_value is our process feedback
+ # It's returned as an array, so I need the first (and only)
+ # element
peak_value = message.get_structure().get_value('peak')[0]
self.level_method(peak_value, self.pid_controller,
self.volume_controller)
- #Adjust recording level
+ # Adjust recording level
def level_method(self, level, pid_controller, volume_controller):
- #If volume controller doesn't return a valid volume,
- #we can't control it :(
+ # If volume controller doesn't return a valid volume,
+ # we can't control it :(
current_volume = volume_controller.get_volume()
if current_volume is None:
self.logger.error("Unable to control recording volume."
@@ -375,19 +375,20 @@ class GStreamerMessageHandler(object):
self.current_level = level
change = pid_controller.input_change(level, 0.10)
if self.logger:
- self.logger.debug("Peak level: %(peak_level).2f, "
- "volume: %(volume)d%%, Volume change: %(change)f%%" %
- {'peak_level': level,
- 'change': change,
- 'volume': current_volume})
+ self.logger.debug(
+ "Peak level: %(peak_level).2f, "
+ "volume: %(volume)d%%, Volume change: %(change)f%%" % {
+ 'peak_level': level,
+ 'change': change,
+ 'volume': current_volume})
volume_controller.set_volume(current_volume + change)
- #Only sample if level is within the threshold
+ # Only sample if level is within the threshold
def spectrum_method(self, analyzer, spectrum):
if self.rec_level_range[1] <= self.current_level \
or self.current_level <= self.rec_level_range[0]:
- self.logger.debug("Sampling, recorded %d samples" %
- analyzer.number_of_samples)
+ self.logger.debug(
+ "Sampling, recorded %d samples" % analyzer.number_of_samples)
analyzer.sample(spectrum)
if analyzer.sampling_complete() and self._quit_method:
self.logger.info("Sampling complete, ending process")
@@ -414,10 +415,11 @@ class GstAudioObject(object):
class Player(GstAudioObject):
def __init__(self, frequency=DEFAULT_TEST_FREQUENCY, logger=None):
super(Player, self).__init__()
- self.pipeline_description = ("audiotestsrc wave=sine freq=%s "
- "! audioconvert "
- "! audioresample "
- "! autoaudiosink" % int(frequency))
+ self.pipeline_description = (
+ "audiotestsrc wave=sine freq=%s "
+ "! audioconvert "
+ "! audioresample "
+ "! autoaudiosink" % int(frequency))
self.logger = logger
if self.logger:
self.logger.debug(self.pipeline_description)
@@ -437,11 +439,11 @@ class Recorder(GstAudioObject):
! audioresample
! spectrum interval=%(fft_interval)s bands = %(bands)s
! wavenc
- ! filesink location=%(file)s''' %
- {'bands': bins,
- 'rate': sampling_frequency,
- 'fft_interval': fft_interval,
- 'file': output_file})
+ ! filesink location=%(file)s''' % {
+ 'bands': bins,
+ 'rate': sampling_frequency,
+ 'fft_interval': fft_interval,
+ 'file': output_file})
self.logger = logger
if self.logger:
self.logger.debug(pipeline_description)
@@ -457,25 +459,25 @@ class Recorder(GstAudioObject):
def parse_spectrum_message_structure(struct_string):
- #First let's jsonize this
- #This is the message name, which we don't need
+ # First let's jsonize this
+ # This is the message name, which we don't need
text = struct_string.replace("spectrum, ", "")
- #name/value separator in json is : and not =
- text = text.replace("=",": ")
- #Mutate the {} array notation from the structure to
- #[] notation for json.
- text = text.replace("{","[")
- text = text.replace("}","]")
- #Remove a few stray semicolons that aren't needed
- text = text.replace(";","")
- #Remove the data type fields, as json doesn't need them
+ # name/value separator in json is : and not =
+ text = text.replace("=", ": ")
+ # Mutate the {} array notation from the structure to
+ # [] notation for json.
+ text = text.replace("{", "[")
+ text = text.replace("}", "]")
+ # Remove a few stray semicolons that aren't needed
+ text = text.replace(";", "")
+ # Remove the data type fields, as json doesn't need them
text = re.sub(r"\(.+?\)", "", text)
- #double-quote the identifiers
+ # double-quote the identifiers
text = re.sub(r"([\w-]+):", r'"\1":', text)
- #Wrap the whole thing in brackets
+ # Wrap the whole thing in brackets
text = ("{"+text+"}")
- #Try to parse and return something sensible here, even if
- #the data was unparsable.
+ # Try to parse and return something sensible here, even if
+ # the data was unparsable.
try:
return json.loads(text)
except ValueError:
@@ -489,32 +491,38 @@ def process_arguments():
presence of the played frequency, if present it exits with success.
"""
parser = argparse.ArgumentParser(description=description)
- parser.add_argument("-t", "--time",
+ parser.add_argument(
+ "-t", "--time",
dest='test_duration',
action='store',
default=30,
type=int,
help="""Maximum test duration, default %(default)s seconds.
It may exit sooner if it determines it has enough data.""")
- parser.add_argument("-a", "--audio",
+ parser.add_argument(
+ "-a", "--audio",
action='store',
default="/dev/null",
type=str,
help="File to save recorded audio in .wav format")
- parser.add_argument("-q", "--quiet",
+ parser.add_argument(
+ "-q", "--quiet",
action='store_true',
default=False,
help="Be quiet, no output unless there's an error.")
- parser.add_argument("-d", "--debug",
+ parser.add_argument(
+ "-d", "--debug",
action='store_true',
default=False,
help="Debugging output")
- parser.add_argument("-f", "--frequency",
+ parser.add_argument(
+ "-f", "--frequency",
action='store',
default=DEFAULT_TEST_FREQUENCY,
type=int,
help="Frequency for test signal, default %(default)s Hz")
- parser.add_argument("-u", "--spectrum",
+ parser.add_argument(
+ "-u", "--spectrum",
action='store',
type=str,
help="""File to save spectrum information for plotting
@@ -524,10 +532,10 @@ def process_arguments():
#
def main():
- #Get arguments.
+ # Get arguments.
args = process_arguments()
- #Setup logging
+ # Setup logging
level = logging.INFO
if args.debug:
level = logging.DEBUG
@@ -535,56 +543,57 @@ def main():
level = logging.ERROR
logging.basicConfig(level=level)
try:
- #Launches recording pipeline. I need to hook up into the gst
- #messages.
+ # Launches recording pipeline. I need to hook up into the gst
+ # messages.
recorder = Recorder(output_file=args.audio, logger=logging)
- #Just launches the playing pipeline
+ # Just launches the playing pipeline
player = Player(frequency=args.frequency, logger=logging)
except GObject.GError as excp:
logging.critical("Unable to initialize GStreamer pipelines: %s", excp)
sys.exit(127)
- #This just receives a process feedback and tells me how much to change to
- #achieve the setpoint
+ # This just receives a process feedback and tells me how much to change to
+ # achieve the setpoint
pidctrl = PIDController(Kp=0.7, Ki=.01, Kd=0.01,
setpoint=REC_LEVEL_RANGE[0])
pidctrl.set_change_limit(5)
- #This gathers spectrum data.
+ # This gathers spectrum data.
analyzer = SpectrumAnalyzer(points=BINS,
sampling_frequency=SAMPLING_FREQUENCY)
- #Volume controllers actually set volumes for their device types.
- #we should at least issue a warning
+ # Volume controllers actually set volumes for their device types.
+ # we should at least issue a warning
recorder.volumecontroller = PAVolumeController(type='input',
logger=logging)
if not recorder.volumecontroller.get_identifier():
- logging.warning("Unable to get input volume control identifier. "
- "Test results will probably be invalid")
+ logging.warning(
+ "Unable to get input volume control identifier. "
+ "Test results will probably be invalid")
recorder.volumecontroller.set_volume(0)
recorder.volumecontroller.mute(False)
player.volumecontroller = PAVolumeController(type='output',
logger=logging)
if not player.volumecontroller.get_identifier():
- logging.warning("Unable to get output volume control identifier. "
- "Test results will probably be invalid")
+ logging.warning(
+ "Unable to get output volume control identifier. "
+ "Test results will probably be invalid")
player.volumecontroller.set_volume(PLAY_VOLUME)
player.volumecontroller.mute(False)
- #This handles the messages from gstreamer and orchestrates
- #the passed volume controllers, pid controller and spectrum analyzer
- #accordingly.
+ # This handles the messages from gstreamer and orchestrates
+ # the passed volume controllers, pid controller and spectrum analyzer
+ # accordingly.
gmh = GStreamerMessageHandler(rec_level_range=REC_LEVEL_RANGE,
logger=logging,
volumecontroller=recorder.volumecontroller,
pidcontroller=pidctrl,
spectrum_analyzer=analyzer)
- #I need to tell the recorder which method will handle messages.
+ # I need to tell the recorder which method will handle messages.
recorder.register_message_handler(gmh.bus_message_handler)
- #Create the loop and add a few triggers
-# GObject.threads_init() #Not needed?
+ # Create the loop and add a few triggers
loop = GLib.MainLoop()
GLib.timeout_add_seconds(0, player.start)
GLib.timeout_add_seconds(0, recorder.start)
@@ -595,29 +604,32 @@ def main():
loop.run()
- #When the loop ends, set things back to reasonable states
+ # When the loop ends, set things back to reasonable states
player.stop()
recorder.stop()
player.volumecontroller.set_volume(50)
recorder.volumecontroller.set_volume(10)
- #See if data gathering was successful.
+ # See if data gathering was successful.
test_band = analyzer.frequency_band_for(args.frequency)
- candidate_bands = analyzer.frequencies_with_peak_magnitude(MAGNITUDE_THRESHOLD)
+ candidate_bands = analyzer.frequencies_with_peak_magnitude(
+ MAGNITUDE_THRESHOLD)
for band in candidate_bands:
logging.debug("Band (%.2f,%.2f) contains a magnitude peak" %
analyzer.frequencies_for_band(band))
if test_band in candidate_bands:
freqs_for_band = analyzer.frequencies_for_band(test_band)
- logging.info("PASS: Test frequency of %s in band (%.2f, %.2f) "
- "which contains a magnitude peak" %
+ logging.info(
+ "PASS: Test frequency of %s in band (%.2f, %.2f) "
+ "which contains a magnitude peak" %
((args.frequency,) + freqs_for_band))
return_value = 0
else:
- logging.info("FAIL: Test frequency of %s is not in one of the "
- "bands with magnitude peaks" % args.frequency)
+ logging.info(
+ "FAIL: Test frequency of %s is not in one of the "
+ "bands with magnitude peaks" % args.frequency)
return_value = 1
- #Is the microphone broken?
+ # Is the microphone broken?
if len(set(analyzer.spectrum)) <= 1:
logging.info("WARNING: Microphone seems broken, didn't even "
"record ambient noise")
@@ -625,14 +637,17 @@ def main():
if args.spectrum:
logging.info("Saving spectrum data for plotting as %s" %
args.spectrum)
- if not FileDumper().write_to_file(args.spectrum,
- ["%s,%s" % t for t in
- zip(analyzer.frequencies,
- analyzer.spectrum)]):
+ if (
+ not FileDumper().write_to_file(
+ args.spectrum,
+ ["%s,%s" % t for t in zip(
+ analyzer.frequencies, analyzer.spectrum)])
+ ):
logging.error("Couldn't save spectrum data for plotting",
file=sys.stderr)
return return_value
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/battery_test.py b/bin/battery_test.py
index b2682c7..112ef5e 100755
--- a/bin/battery_test.py
+++ b/bin/battery_test.py
@@ -1,14 +1,13 @@
#!/usr/bin/env python3
import gi
-import os
import time
import re
import subprocess
import sys
import argparse
gi.require_version('Gio', '2.0')
-from gi.repository import Gio
+from gi.repository import Gio # noqa: E402
class Battery():
@@ -89,10 +88,10 @@ def get_battery_state():
def validate_battery_info(battery):
if battery is None:
- print ("Error obtaining battery info")
+ print("Error obtaining battery info")
return False
if battery._state != "discharging":
- print ("Error: battery is not discharging, test will not be valid")
+ print("Error: battery is not discharging, test will not be valid")
return False
return True
@@ -173,5 +172,6 @@ def main():
return(battery_life(battery_before, battery_after, test_time))
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bluetooth_test.py b/bin/bluetooth_test.py
index 8fafad6..fd3c956 100755
--- a/bin/bluetooth_test.py
+++ b/bin/bluetooth_test.py
@@ -145,5 +145,6 @@ def main():
format='%(levelname)s: %(message)s')
return getattr(ObexFTPTest(args.file.name, args.btaddr), args.action)()
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bmc_info.py b/bin/bmc_info.py
index d7aec9b..674d7e8 100755
--- a/bin/bmc_info.py
+++ b/bin/bmc_info.py
@@ -4,6 +4,7 @@ import sys
import shlex
from subprocess import check_output, CalledProcessError
+
def main():
# First, we need to get output
cmd = "ipmitool mc info"
@@ -14,7 +15,7 @@ def main():
file=sys.stderr)
return 1
except CalledProcessError as e:
- print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr)
+ print("Problem running %s. Error was %s" % (cmd, e), file=sys.stderr)
return 1
result = result.split('\n')
@@ -47,12 +48,11 @@ def main():
if type(data[field]) is list:
# Sometimes the first item in the list is ''. This will remove it
data[field].remove('')
- print(field.ljust(30),':',data[field].pop(0))
+ print(field.ljust(30), ':', data[field].pop(0))
for item in data[field]:
- print(' '.ljust(32),item)
+ print(' '.ljust(32), item)
else:
- print(field.ljust(30),":",data[field])
- #print("{}:\t{}".format(field, data[field]))
+ print(field.ljust(30), ":", data[field])
return 0
diff --git a/bin/boot_mode_test_snappy.py b/bin/boot_mode_test_snappy.py
index 864be29..23b30e4 100755
--- a/bin/boot_mode_test_snappy.py
+++ b/bin/boot_mode_test_snappy.py
@@ -14,7 +14,6 @@ import tempfile
import yaml
-from checkbox_support.parsers.kernel_cmdline import parse_kernel_cmdline
from checkbox_support.snap_utils.system import get_lk_bootimg_path
diff --git a/bin/brightness_test.py b/bin/brightness_test.py
index 52d478a..e3a8e90 100755
--- a/bin/brightness_test.py
+++ b/bin/brightness_test.py
@@ -28,7 +28,6 @@ import sys
import os
import time
-from sys import stdout, stderr
from glob import glob
@@ -42,7 +41,7 @@ class Brightness(object):
# See if the source is a file or a file object
# and act accordingly
file = path
- if file == None:
+ if file is None:
lines_list = []
else:
# It's a file
@@ -102,8 +101,10 @@ class Brightness(object):
Note: this doesn't guarantee that screen brightness
changed.
'''
- if (abs(self.get_actual_brightness(interface) -
- self.get_last_set_brightness(interface)) > 1):
+ if (
+ abs(self.get_actual_brightness(interface) -
+ self.get_last_set_brightness(interface)) > 1
+ ):
return 1
else:
return 0
@@ -132,9 +133,9 @@ def main():
max_brightness = brightness.get_max_brightness(interface)
# Set the brightness to half the max value
- brightness.write_value(max_brightness / 2,
- os.path.join(interface,
- 'brightness'))
+ brightness.write_value(
+ max_brightness / 2,
+ os.path.join(interface, 'brightness'))
# Check that "actual_brightness" reports the same value we
# set "brightness" to
@@ -144,9 +145,9 @@ def main():
time.sleep(2)
# Set the brightness back to its original value
- brightness.write_value(current_brightness,
- os.path.join(interface,
- 'brightness'))
+ brightness.write_value(
+ current_brightness,
+ os.path.join(interface, 'brightness'))
exit(exit_status)
diff --git a/bin/bt_connect.py b/bin/bt_connect.py
index 1fed30e..8d6757c 100755
--- a/bin/bt_connect.py
+++ b/bin/bt_connect.py
@@ -131,5 +131,6 @@ def main():
# capture all other silence failures
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bt_list_adapters.py b/bin/bt_list_adapters.py
index e791442..c8db787 100755
--- a/bin/bt_list_adapters.py
+++ b/bin/bt_list_adapters.py
@@ -38,7 +38,7 @@ def main():
with open(namef, 'r') as f:
name = f.read().strip()
print(rfdev, name)
- if found_adatper == False:
+ if found_adatper is False:
raise SystemExit('No bluetooth adatpers registered with rfkill')
diff --git a/bin/color_depth_info.py b/bin/color_depth_info.py
index 129f051..fcab953 100755
--- a/bin/color_depth_info.py
+++ b/bin/color_depth_info.py
@@ -53,7 +53,7 @@ def get_color_depth(log_dir='/var/log/'):
return (depth, pixmap_format)
with open(current_log, 'rb') as stream:
- for match in re.finditer('Depth (\d+) pixmap format is (\d+) bpp',
+ for match in re.finditer(r'Depth (\d+) pixmap format is (\d+) bpp',
str(stream.read())):
depth = int(match.group(1))
pixmap_format = int(match.group(2))
@@ -72,5 +72,6 @@ def main():
return 1
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/cpu_offlining.py b/bin/cpu_offlining.py
index 5616e30..3d269ac 100755
--- a/bin/cpu_offlining.py
+++ b/bin/cpu_offlining.py
@@ -68,5 +68,6 @@ def main():
print(' '.join(failed_onlines))
return 1
+
if __name__ == '__main__':
main()
diff --git a/bin/cpu_topology.py b/bin/cpu_topology.py
index 0ce68c5..425e3c4 100755
--- a/bin/cpu_topology.py
+++ b/bin/cpu_topology.py
@@ -21,8 +21,8 @@ class proc_cpuinfo():
finally:
cpu_fh.close()
- r_s390 = re.compile("processor [0-9]")
- r_x86 = re.compile("processor\s+:")
+ r_s390 = re.compile(r"processor [0-9]")
+ r_x86 = re.compile(r"processor\s+:")
for i in temp:
# Handle s390 first
if r_s390.match(i):
@@ -111,5 +111,6 @@ def main():
else:
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/cpuid.py b/bin/cpuid.py
index d0cdea9..6543dc4 100755
--- a/bin/cpuid.py
+++ b/bin/cpuid.py
@@ -25,10 +25,9 @@
# Modifications: 2019 Jeffrey Lane (jeffrey.lane@canonical.com)
import ctypes
-import os
import platform
import sys
-from ctypes import (c_uint32, c_int, c_long, c_ulong, c_size_t, c_void_p,
+from ctypes import (c_uint32, c_int, c_size_t, c_void_p,
POINTER, CFUNCTYPE)
from subprocess import check_output
@@ -84,7 +83,8 @@ CPUIDS = {
"Broadwell": ['0x4067', '0x306d4', '0x5066', '0x406f'],
"Canon Lake": ['0x6066'],
"Cascade Lake": ['0x50655', '0x50656', '0x50657'],
- "Coffee Lake": ['0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'],
+ "Coffee Lake": [
+ '0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'],
"Haswell": ['0x306c', '0x4065', '0x4066', '0x306f'],
"Ice Lake": ['0x706e'],
"Ivy Bridge": ['0x306a', '0x306e'],
diff --git a/bin/create_connection.py b/bin/create_connection.py
index 71e9572..880151d 100755
--- a/bin/create_connection.py
+++ b/bin/create_connection.py
@@ -6,9 +6,8 @@ import time
from subprocess import check_call, check_output, CalledProcessError
try:
- from subprocess import DEVNULL # >= python3.3
+ from subprocess import DEVNULL # >= python3.3
except ImportError:
- import os
DEVNULL = open(os.devnull, 'wb')
from uuid import uuid4
@@ -154,7 +153,7 @@ def block_until_created(connection, retries, interval):
sys.exit(1)
else:
try:
- nmcli_con_up = check_call(['nmcli', 'con', 'up', 'id', connection])
+ check_call(['nmcli', 'con', 'up', 'id', connection])
print("Connection %s activated." % connection)
except CalledProcessError as error:
print("Failed to activate %s." % connection, file=sys.stderr)
@@ -216,7 +215,7 @@ def create_mobilebroadband_connection(args):
args.apn,
args.username,
args.password,
- args.pin)
+ args.pin)
if args.type == 'cdma':
mobilebroadband_connection += mobilebroadband_ppp_section()
@@ -288,5 +287,6 @@ def main():
# Make sure we don't exit until the connection is fully created
block_until_created(connection_name, args.retries, args.interval)
+
if __name__ == "__main__":
main()
diff --git a/bin/dmitest.py b/bin/dmitest.py
index 642af3a..2be50cc 100755
--- a/bin/dmitest.py
+++ b/bin/dmitest.py
@@ -146,26 +146,26 @@ def standard_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Chassis Information',
'Manufacturer:',
['empty', 'chassis manufacture', 'null', 'insyde',
- 'to be filled by o\.e\.m\.', 'no enclosure',
- '\.\.\.\.\.'], True):
+ r'to be filled by o\.e\.m\.', 'no enclosure',
+ r'\.\.\.\.\.'], True):
dmi_data['Chassis Information']['Manufacturer'] += \
" *** Invalid chassis manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information', 'Manufacturer:',
['system manufacture', 'insyde', 'standard',
- 'to be filled by o\.e\.m\.', 'no enclosure'], True):
+ r'to be filled by o\.e\.m\.', 'no enclosure'], True):
dmi_data['System Information']['Manufacturer'] += \
" *** Invalid system manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'Base Board Information',
'Manufacturer:',
- ['to be filled by o\.e\.m\.'], True):
+ [r'to be filled by o\.e\.m\.'], True):
dmi_data['Base Board Information']['Manufacturer'] += \
" *** Invalid base board manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information',
'Product Name:',
- ['system product name', 'to be filled by o\.e\.m\.'],
+ ['system product name', r'to be filled by o\.e\.m\.'],
False):
dmi_data['System Information']['Product Name'] += \
" *** Invalid system product name!"
@@ -173,7 +173,7 @@ def standard_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Base Board Information',
'Product Name:',
['base board product name',
- 'to be filled by o\.e\.m\.'], False):
+ r'to be filled by o\.e\.m\.'], False):
dmi_data['Base Board Information']['Product Name'] += \
" *** Invalid base board product name!"
retval += 1
@@ -193,21 +193,21 @@ def version_tests(args, stream, dmi_data):
"""
retval = 0
if find_in_section(stream, dmi_data, 'Chassis Information', 'Version:',
- ['to be filled by o\.e\.m\.', 'empty', 'x\.x'],
+ [r'to be filled by o\.e\.m\.', 'empty', r'x\.x'],
False):
dmi_data['Chassis Information']['Version'] += \
" *** Invalid chassis version!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information', 'Version:',
- ['to be filled by o\.e\.m\.', '\(none\)',
+ [r'to be filled by o\.e\.m\.', r'\(none\)',
'null', 'system version', 'not applicable',
- '\.\.\.\.\.'], False):
+ r'\.\.\.\.\.'], False):
dmi_data['System Information']['Version'] += \
" *** Invalid system information version!"
retval += 1
if find_in_section(stream, dmi_data, 'Base Board Information', 'Version:',
- ['base board version', 'x\.x',
- 'empty', 'to be filled by o\.e\.m\.'], False):
+ ['base board version', r'x\.x',
+ 'empty', r'to be filled by o\.e\.m\.'], False):
dmi_data['Base Board Information']['Version'] += \
" *** Invalid base board version!"
retval += 1
@@ -228,8 +228,8 @@ def serial_tests(args, stream, dmi_data):
retval = 0
if find_in_section(stream, dmi_data, 'System Information',
'Serial Number:',
- ['to be filled by o\.e\.m\.',
- 'system serial number', '\.\.\.\.\.'],
+ [r'to be filled by o\.e\.m\.',
+ 'system serial number', r'\.\.\.\.\.'],
False):
dmi_data['System Information']['Serial Number'] += \
" *** Invalid system information serial number!"
@@ -237,8 +237,8 @@ def serial_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Base Board Information',
'Serial Number:',
['n/a', 'base board serial number',
- 'to be filled by o\.e\.m\.',
- 'empty', '\.\.\.'],
+ r'to be filled by o\.e\.m\.',
+ 'empty', r'\.\.\.'],
False):
dmi_data['Base Board Information']['Serial Number'] += \
" *** Invalid base board serial number!"
@@ -306,7 +306,7 @@ def main():
if args.test_serials:
retval += serial_tests(args, stream, dmi_data)
if find_in_section(stream, dmi_data, 'Processor Information', 'Version:',
- ['sample', 'Genuine Intel\(R\) CPU 0000'], False):
+ ['sample', r'Genuine Intel\(R\) CPU 0000'], False):
dmi_data['Processor Information']['Version'] += \
" *** Invalid processor information!"
retval += 1
diff --git a/bin/edid_cycle.py b/bin/edid_cycle.py
index a6daf1c..e327cb0 100755
--- a/bin/edid_cycle.py
+++ b/bin/edid_cycle.py
@@ -21,7 +21,7 @@ def check_resolution():
output = subprocess.check_output('xdpyinfo')
for line in output.decode(sys.stdout.encoding).splitlines():
if 'dimensions' in line:
- match = re.search('(\d+)x(\d+)\ pixels', line)
+ match = re.search(r'(\d+)x(\d+)\ pixels', line)
if match and len(match.groups()) == 2:
return '{}x{}'.format(*match.groups())
@@ -29,7 +29,8 @@ def check_resolution():
def change_edid(host, edid_file):
with open(edid_file, 'rb') as f:
cmd = ['ssh', host, '/snap/bin/pigbox', 'run',
- '\'v4l2-ctl --set-edid=file=-,format=raw --fix-edid-checksums\'']
+ '\'v4l2-ctl --set-edid=file=-,'
+ 'format=raw --fix-edid-checksums\'']
subprocess.check_output(cmd, input=f.read())
@@ -52,5 +53,6 @@ def main():
print('PASS')
return failed
+
if __name__ == '__main__':
raise SystemExit(main())
diff --git a/bin/evdev_touch_test.py b/bin/evdev_touch_test.py
index 5649701..bc52cc4 100755
--- a/bin/evdev_touch_test.py
+++ b/bin/evdev_touch_test.py
@@ -53,7 +53,7 @@ while True:
for e in dev.read():
tap = args.xfingers
if tap == 1:
- if (e.type == 3 and e.code == 47 and e.value > 0):
+ if (e.type == 3 and e.code == 47 and e.value > 0):
raise SystemExit(
"Multitouch Event detected but Single was expected")
# type 1 is evdev.ecodes.EV_KEY
diff --git a/bin/fan_reaction_test.py b/bin/fan_reaction_test.py
index a4ab744..c37054b 100755
--- a/bin/fan_reaction_test.py
+++ b/bin/fan_reaction_test.py
@@ -34,6 +34,7 @@ class FanMonitor:
if not self._fan_paths:
print('Fan monitoring interface not found in SysFS')
raise SystemExit(0)
+
def get_rpm(self):
result = {}
for p in self._fan_paths:
@@ -44,6 +45,7 @@ class FanMonitor:
except OSError:
print('Fan SysFS node dissappeared ({})'.format(p))
return result
+
def get_average_rpm(self, period):
acc = self.get_rpm()
for i in range(period):
@@ -83,14 +85,15 @@ class Stressor:
def _stress_fun(self):
"""Actual stress function."""
# generate some random data
- data = bytes(random.getrandbits(8) for _ in range(1024))
- hasher = hashlib.sha256()
+ data = bytes(random.getrandbits(8) for _ in range(1024))
+ hasher = hashlib.sha256()
hasher.update(data)
while True:
new_digest = hasher.digest()
# use the newly obtained digest as the new data to the hasher
hasher.update(new_digest)
+
def main():
"""Entry point."""
@@ -153,5 +156,6 @@ def main():
if not (rpm_rose_during_stress and rpm_dropped_during_cooling):
raise SystemExit("Fans did not react to stress expectedly")
+
if __name__ == '__main__':
main()
diff --git a/bin/frequency_governors_test.py b/bin/frequency_governors_test.py
index 6072ded..52dd74a 100755
--- a/bin/frequency_governors_test.py
+++ b/bin/frequency_governors_test.py
@@ -8,7 +8,8 @@ import time
import argparse
import logging
-from subprocess import check_output, check_call, CalledProcessError, PIPE
+from subprocess import check_output, check_call, CalledProcessError
+
class CPUScalingTest(object):
@@ -37,11 +38,11 @@ class CPUScalingTest(object):
for subdirectory in os.listdir(self.sysCPUDirectory):
match = pattern.search(subdirectory)
if match and match.group("cpuNumber"):
- cpufreqDirectory = os.path.join(self.sysCPUDirectory,
- subdirectory, "cpufreq")
+ cpufreqDirectory = os.path.join(
+ self.sysCPUDirectory, subdirectory, "cpufreq")
if not os.path.exists(cpufreqDirectory):
- logging.error("CPU %s has no cpufreq directory %s"
- % (match.group("cpuNumber"), cpufreqDirectory))
+ logging.error("CPU %s has no cpufreq directory %s" % (
+ match.group("cpuNumber"), cpufreqDirectory))
return None
# otherwise
self.cpufreqDirectories.append(cpufreqDirectory)
@@ -59,7 +60,8 @@ class CPUScalingTest(object):
for cpufreqDirectory in self.cpufreqDirectories:
parameters = self.getParameters(cpufreqDirectory, file)
if not parameters:
- logging.error("Error: could not determine cpu parameters from %s"
+ logging.error(
+ "Error: could not determine cpu parameters from %s"
% os.path.join(cpufreqDirectory, file))
return None
if not current:
@@ -91,7 +93,7 @@ class CPUScalingTest(object):
return rf
return None
- logging.debug("Setting %s to %s" % (setFile,value))
+ logging.debug("Setting %s to %s" % (setFile, value))
path = None
if not skip:
if automatch:
@@ -115,8 +117,9 @@ class CPUScalingTest(object):
parameterFile = open(path)
line = parameterFile.readline()
if not line or line.strip() != str(value):
- logging.error("Error: could not verify that %s was set to %s"
- % (path, value))
+ logging.error(
+ "Error: could not verify that %s was set to %s" % (
+ path, value))
if line:
logging.error("Actual Value: %s" % line)
else:
@@ -127,6 +130,7 @@ class CPUScalingTest(object):
def checkSelectorExecutable(self):
logging.debug("Determining if %s is executable" % self.selectorExe)
+
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
@@ -179,8 +183,9 @@ class CPUScalingTest(object):
parameterFile = open(parameterFilePath)
line = parameterFile.readline()
if not line:
- logging.error("Error: failed to get %s for %s"
- % (parameter, self.cpufreqDirectory))
+ logging.error(
+ "Error: failed to get %s for %s" % (
+ parameter, self.cpufreqDirectory))
return None
value = line.strip()
return value
@@ -198,8 +203,9 @@ class CPUScalingTest(object):
parameterFile = open(path)
line = parameterFile.readline()
if not line:
- logging.error("Error: failed to get %s for %s"
- % (parameter, cpufreqDirectory))
+ logging.error(
+ "Error: failed to get %s for %s" % (
+ parameter, cpufreqDirectory))
return None
values.append(line.strip())
logging.debug("Found parameters:")
@@ -231,15 +237,18 @@ class CPUScalingTest(object):
runTime = stop_elapsed_time - start_elapsed_time
else:
thisTime = stop_elapsed_time - start_elapsed_time
- if ((abs(thisTime - runTime) / runTime) * 100
- < self.retryTolerance):
+ if (
+ (abs(thisTime - runTime) / runTime) * 100
+ < self.retryTolerance
+ ):
return runTime
else:
runTime = thisTime
tries += 1
- logging.error("Could not repeat load test times within %.1f%%"
- % self.retryTolerance)
+ logging.error(
+ "Could not repeat load test times within %.1f%%" %
+ self.retryTolerance)
return None
def pi(self):
@@ -262,9 +271,11 @@ class CPUScalingTest(object):
logging.info("Done.")
minimumFrequency = self.getParameter("scaling_min_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not minimumFrequency
+ if (
+ not minimumFrequency
or not currentFrequency
- or (minimumFrequency != currentFrequency)):
+ or (minimumFrequency != currentFrequency)
+ ):
return False
# otherwise
@@ -310,7 +321,8 @@ class CPUScalingTest(object):
logging.info(" cpu%u: %s" % (i, g))
i += 1
else:
- logging.error("Error: could not determine current governor settings")
+ logging.error(
+ "Error: could not determine current governor settings")
return False
self.getCPUFlags()
@@ -333,7 +345,7 @@ class CPUScalingTest(object):
logging.debug("Found the following CPU Flags:")
for line in self.cpuFlags:
logging.debug(" %s" % line)
- except:
+ except OSError:
logging.warning("Could not read CPU flags")
def runUserSpaceTests(self):
@@ -356,52 +368,66 @@ class CPUScalingTest(object):
# Set the the CPU speed to it's lowest value
frequency = self.minFreq
- logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
+ logging.info(
+ "Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
if not self.setFrequency(frequency):
success = False
# Verify the speed is set to the lowest value
minimumFrequency = self.getParameter("scaling_min_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not minimumFrequency
+ if (
+ not minimumFrequency
or not currentFrequency
- or (minimumFrequency != currentFrequency)):
- logging.error("Could not verify that cpu frequency is set to the minimum value of %s"
- % minimumFrequency)
+ or (minimumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Could not verify that cpu frequency is set to the minimum"
+ " value of %s" % minimumFrequency)
success = False
# Run Load Test
self.minimumFrequencyTestTime = self.runLoadTest()
if not self.minimumFrequencyTestTime:
- logging.error("Could not retrieve the minimum frequency test's execution time.")
+ logging.error(
+ "Could not retrieve the minimum frequency test's"
+ " execution time.")
success = False
else:
- logging.info("Minimum frequency load test time: %.2f"
- % self.minimumFrequencyTestTime)
+ logging.info(
+ "Minimum frequency load test time: %.2f"
+ % self.minimumFrequencyTestTime)
# Set the CPU speed to it's highest value as above.
frequency = self.maxFreq
- logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
+ logging.info(
+ "Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
if not self.setFrequency(frequency):
success = False
maximumFrequency = self.getParameter("scaling_max_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not maximumFrequency
+ if (
+ not maximumFrequency
or not currentFrequency
- or (maximumFrequency != currentFrequency)):
- logging.error("Could not verify that cpu frequency is set to the maximum value of %s"
- % maximumFrequency)
+ or (maximumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Could not verify that cpu frequency is set to the"
+ " maximum value of %s" % maximumFrequency)
success = False
# Repeat workload test
self.maximumFrequencyTestTime = self.runLoadTest()
if not self.maximumFrequencyTestTime:
- logging.error("Could not retrieve the maximum frequency test's execution time.")
+ logging.error(
+ "Could not retrieve the maximum frequency test's "
+ "execution time.")
success = False
else:
- logging.info("Maximum frequency load test time: %.2f"
- % self.maximumFrequencyTestTime)
+ logging.info(
+ "Maximum frequency load test time: %.2f"
+ % self.maximumFrequencyTestTime)
# Verify MHz increase is comparable to time % decrease
predictedSpeedup = (float(maximumFrequency)
@@ -409,8 +435,9 @@ class CPUScalingTest(object):
# If "ida" turbo thing, increase the expectation by 8%
if self.cpuFlags and self.idaFlag in self.cpuFlags:
- logging.info("Found %s flag, increasing expected speedup by %.1f%%"
- % (self.idaFlag, self.idaSpeedupFactor))
+ logging.info(
+ "Found %s flag, increasing expected speedup by %.1f%%"
+ % (self.idaFlag, self.idaSpeedupFactor))
predictedSpeedup = \
(predictedSpeedup
* (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0))))
@@ -420,18 +447,24 @@ class CPUScalingTest(object):
/ self.maximumFrequencyTestTime)
logging.info("CPU Frequency Speed Up: %.2f" % predictedSpeedup)
logging.info("Measured Speed Up: %.2f" % measuredSpeedup)
- differenceSpeedUp = (((measuredSpeedup - predictedSpeedup) / predictedSpeedup) * 100)
- logging.info("Percentage Difference %.1f%%" % differenceSpeedUp)
+ differenceSpeedUp = (
+ ((measuredSpeedup - predictedSpeedup) / predictedSpeedup)
+ * 100)
+ logging.info(
+ "Percentage Difference %.1f%%" % differenceSpeedUp)
if differenceSpeedUp > self.speedUpTolerance:
- logging.error("Measured speedup vs expected speedup is %.1f%% and is not within %.1f%% margin."
- % (differenceSpeedUp, self.speedUpTolerance))
+ logging.error(
+ "Measured speedup vs expected speedup is %.1f%% "
+ "and is not within %.1f%% margin."
+ % (differenceSpeedUp, self.speedUpTolerance))
success = False
elif differenceSpeedUp < 0:
- logging.info("""
- Measured speed up %.2f exceeded expected speedup %.2f
- """ % (measuredSpeedup, predictedSpeedup))
+ logging.info(
+ "Measured speed up %.2f exceeded expected speedup %.2f"
+ % (measuredSpeedup, predictedSpeedup))
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
@@ -453,7 +486,9 @@ class CPUScalingTest(object):
# Wait a fixed period of time, then verify current speed
# is the slowest in as before
if not self.verifyMinimumFrequency():
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled to the "
+ "minimum value")
success = False
# Repeat workload test
@@ -462,7 +497,8 @@ class CPUScalingTest(object):
logging.warning("No On Demand load test time available.")
success = False
else:
- logging.info("On Demand load test time: %.2f" % onDemandTestTime)
+ logging.info(
+ "On Demand load test time: %.2f" % onDemandTestTime)
if onDemandTestTime and self.maximumFrequencyTestTime:
# Compare the timing to the max results from earlier,
@@ -470,19 +506,24 @@ class CPUScalingTest(object):
differenceOnDemandVsMaximum = \
(abs(onDemandTestTime - self.maximumFrequencyTestTime)
/ self.maximumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. maximum frequency: %.1f%%"
- % differenceOnDemandVsMaximum)
+ logging.info(
+ "Percentage Difference vs. maximum frequency: %.1f%%"
+ % differenceOnDemandVsMaximum)
if differenceOnDemandVsMaximum > self.speedUpTolerance:
- logging.error("On demand performance vs maximum of %.1f%% is not within %.1f%% margin"
- % (differenceOnDemandVsMaximum,
- self.speedUpTolerance))
+ logging.error(
+ "On demand performance vs maximum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differenceOnDemandVsMaximum, self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
# Verify the current speed has returned to the lowest speed again
if not self.verifyMinimumFrequency():
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled to the "
+ "minimum value")
success = False
return success
@@ -504,11 +545,14 @@ class CPUScalingTest(object):
# Verify the current speed is the same as scaling_max_freq
maximumFrequency = self.getParameter("scaling_max_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not maximumFrequency
+ if (
+ not maximumFrequency
or not currentFrequency
- or (maximumFrequency != currentFrequency)):
- logging.error("Current cpu frequency of %s is not set to the maximum value of %s"
- % (currentFrequency, maximumFrequency))
+ or (maximumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Current cpu frequency of %s is not set to the maximum "
+ "value of %s" % (currentFrequency, maximumFrequency))
success = False
# Repeat work load test
@@ -517,22 +561,27 @@ class CPUScalingTest(object):
logging.error("No Performance load test time available.")
success = False
else:
- logging.info("Performance load test time: %.2f" % performanceTestTime)
+ logging.info(
+ "Performance load test time: %.2f" % performanceTestTime)
if performanceTestTime and self.maximumFrequencyTestTime:
# Compare the timing to the max results
differencePerformanceVsMaximum = \
(abs(performanceTestTime - self.maximumFrequencyTestTime)
/ self.maximumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. maximum frequency: %.1f%%"
- % differencePerformanceVsMaximum)
+ logging.info(
+ "Percentage Difference vs. maximum frequency: %.1f%%"
+ % differencePerformanceVsMaximum)
if differencePerformanceVsMaximum > self.speedUpTolerance:
- logging.error("Performance setting vs maximum of %.1f%% is not within %.1f%% margin"
- % (differencePerformanceVsMaximum,
- self.speedUpTolerance))
+ logging.error(
+ "Performance setting vs maximum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differencePerformanceVsMaximum,
+ self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
@@ -559,7 +608,9 @@ class CPUScalingTest(object):
# Wait a fixed period of time,
# then verify current speed is the slowest in as before
if not self.verifyMinimumFrequency(10):
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled "
+ "to the minimum value")
success = False
# Set the frequency step to 0,
@@ -573,28 +624,34 @@ class CPUScalingTest(object):
logging.error("No Conservative load test time available.")
success = False
else:
- logging.info("Conservative load test time: %.2f"
- % conservativeTestTime)
+ logging.info(
+ "Conservative load test time: %.2f"
+ % conservativeTestTime)
if conservativeTestTime and self.minimumFrequencyTestTime:
# Compare the timing to the max results
differenceConservativeVsMinimum = \
(abs(conservativeTestTime - self.minimumFrequencyTestTime)
/ self.minimumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. minimum frequency: %.1f%%"
- % differenceConservativeVsMinimum)
+ logging.info(
+ "Percentage Difference vs. minimum frequency: %.1f%%"
+ % differenceConservativeVsMinimum)
if differenceConservativeVsMinimum > self.speedUpTolerance:
- logging.error("Performance setting vs minimum of %.1f%% is not within %.1f%% margin"
- % (differenceConservativeVsMinimum,
- self.speedUpTolerance))
+ logging.error(
+ "Performance setting vs minimum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differenceConservativeVsMinimum,
+ self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
def restoreGovernors(self):
- logging.info("Restoring original governor to %s" % (self.originalGovernors[0]))
+ logging.info(
+ "Restoring original governor to %s" % (self.originalGovernors[0]))
self.setGovernor(self.originalGovernors[0])
@@ -604,8 +661,9 @@ def main():
help="Suppress output.")
parser.add_argument("-c", "--capabilities", action="store_true",
help="Only output CPU capabilities.")
- parser.add_argument("-d", "--debug", action="store_true",
- help="Turn on debug level output for extra info during test run.")
+ parser.add_argument(
+ "-d", "--debug", action="store_true",
+ help="Turn on debug level output for extra info during test run.")
args = parser.parse_args()
# Set up the logging system (unless we don't want ANY logging)
@@ -614,11 +672,11 @@ def main():
logger.setLevel(logging.DEBUG)
format = '%(asctime)s %(levelname)-8s %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
-
+
# If we DO want console output
if not args.quiet:
- console_handler = logging.StreamHandler() #logging.StreamHandler()
- console_handler.setFormatter(logging.Formatter(format,date_format))
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(logging.Formatter(format, date_format))
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
diff --git a/bin/fresh_rate_info.py b/bin/fresh_rate_info.py
index 43a8f2a..3089b48 100755
--- a/bin/fresh_rate_info.py
+++ b/bin/fresh_rate_info.py
@@ -38,13 +38,13 @@ def xrandr_paser(data=None):
resolution = None
xrandrs = list()
for line in str(data).split('\n'):
- for match in re.finditer('(.+) connected (\d+x\d+)\+', line):
+ for match in re.finditer(r'(.+) connected (\d+x\d+)\+', line):
connector = match.group(1)
resolution = match.group(2)
break
if resolution is None:
continue
- for match in re.finditer('{0}\s+(.+)\*'.format(resolution),
+ for match in re.finditer(r'{0}\s+(.+)\*'.format(resolution),
line):
refresh_rate = match.group(1)
xrandr = {'connector': connector,
@@ -72,5 +72,6 @@ def main():
xrandr['resolution'],
xrandr['refresh_rate']))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/get_make_and_model.py b/bin/get_make_and_model.py
index 496a949..42fb219 100755
--- a/bin/get_make_and_model.py
+++ b/bin/get_make_and_model.py
@@ -1,26 +1,28 @@
#!/usr/bin/env python3
-import os.path
import shlex
from subprocess import check_output
+
def print_header(value):
print("{}:".format(value))
+
def print_data(key, value):
print(" {}: {}".format(key, value))
+
def run_cmd(option):
cmd = "lshw -C " + option
- out = check_output(shlex.split(cmd),
- universal_newlines = True)
+ out = check_output(shlex.split(cmd), universal_newlines=True)
return out.split('\n')
+
def main():
keys = {'Manufacturer': 'vendor',
'Model': 'product',
'Version': 'version'}
- lshw_classes = {'system': 'System',
+ lshw_classes = {'system': 'System',
'bus': 'Mainboard'}
for lshw_class in lshw_classes:
@@ -38,6 +40,6 @@ def main():
for key in data:
print_data(key, data[key])
+
if __name__ == "__main__":
raise SystemExit(main())
-
diff --git a/bin/gpu_test.py b/bin/gpu_test.py
index 49ea31a..4f2af2e 100755
--- a/bin/gpu_test.py
+++ b/bin/gpu_test.py
@@ -30,9 +30,9 @@ import subprocess
import sys
import time
gi.require_version('Gio', '2.0')
-from gi.repository import Gio
-from math import cos, sin
-from threading import Thread
+from gi.repository import Gio # noqa: E402
+from math import cos, sin # noqa: E402
+from threading import Thread # noqa: E402
class GlxThread(Thread):
@@ -44,14 +44,13 @@ class GlxThread(Thread):
try:
self.process = subprocess.Popen(
- ["glxgears","-geometry", "400x400"],
+ ["glxgears", "-geometry", "400x400"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
self.process.communicate()
except (subprocess.CalledProcessError, FileNotFoundError) as er:
print("WARNING: Unable to start glxgears (%s)" % er)
-
def terminate(self):
if not hasattr(self, 'id'):
print("WARNING: Attempted to terminate non-existing window.")
@@ -121,8 +120,8 @@ class Html5VideoThread(Thread):
def html5_path(self):
if os.getenv('PLAINBOX_PROVIDER_DATA'):
return os.path.join(
- os.getenv('PLAINBOX_PROVIDER_DATA'),
- 'websites/html5_video.html')
+ os.getenv('PLAINBOX_PROVIDER_DATA'),
+ 'websites/html5_video.html')
def run(self):
if self.html5_path and os.path.isfile(self.html5_path):
@@ -136,8 +135,8 @@ class Html5VideoThread(Thread):
print("WARNING: test results may be invalid.")
def terminate(self):
- if self.html5_path and os.path.isfile(self.html5_path):
- subprocess.call("pkill firefox", shell=True)
+ if self.html5_path and os.path.isfile(self.html5_path):
+ subprocess.call("pkill firefox", shell=True)
def check_gpu(log=None):
@@ -172,10 +171,10 @@ def main():
print("WARNING: Got an exception %s" % er)
windows = ""
for app in sorted(windows.splitlines(), reverse=True):
- if not b'glxgears' in app:
+ if b'glxgears' not in app:
continue
GlxWindows[i].id = str(
- re.match(b'^(0x\w+)', app).group(0), 'utf-8')
+ re.match(b'^(0x\w+)', app).group(0), 'utf-8') # noqa: W605
break
if hasattr(GlxWindows[i], "id"):
rotator = RotateGlxThread(GlxWindows[i].id, i + 1)
@@ -204,7 +203,7 @@ def main():
'gconftool --get /apps/compiz-1/general/screen0/options/vsize',
shell=True))
(x_res, y_res) = re.search(
- b'DG:\s+(\d+)x(\d+)',
+ b'DG:\s+(\d+)x(\d+)', # noqa: W605
subprocess.check_output('wmctrl -d', shell=True)).groups()
DesktopSwitch = ChangeWorkspace(
hsize, vsize, int(x_res) // hsize, int(y_res) // vsize)
@@ -230,5 +229,6 @@ def main():
settings.set_int("vsize", vsize_ori)
Gio.Settings.sync()
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/graphic_memory_info.py b/bin/graphic_memory_info.py
index ba7eece..29040a5 100755
--- a/bin/graphic_memory_info.py
+++ b/bin/graphic_memory_info.py
@@ -42,13 +42,13 @@ def vgamem_paser(data=None):
device = None
vgamems = list()
for line in data.split('\n'):
- for match in re.finditer('(\d\d:\d\d\.\d) VGA(.+): (.+)', line):
+ for match in re.finditer(r'(\d\d:\d\d\.\d) VGA(.+): (.+)', line):
device = match.group(1)
name = match.group(3)
if device is None:
continue
-#Memory at e0000000 (32-bit, prefetchable) [size=256M]
- for match in re.finditer('Memory(.+) prefetchable\) \[size=(\d+)M\]',
+# Memory at e0000000 (32-bit, prefetchable) [size=256M]
+ for match in re.finditer(r'Memory(.+) prefetchable\) \[size=(\d+)M\]',
line):
vgamem_size = match.group(2)
vgamem = {'device': device,
@@ -77,5 +77,6 @@ def main():
vgamem['name'],
vgamem['vgamem_size']))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/graphics_driver.py b/bin/graphics_driver.py
index 6d28f5a..0b41050 100755
--- a/bin/graphics_driver.py
+++ b/bin/graphics_driver.py
@@ -129,7 +129,7 @@ class XorgLog(object):
gathering_module = False
module = None
m = re.search(
- '\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line)
+ r'\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line)
if m:
found_ddx = True
continue
@@ -207,7 +207,8 @@ class XorgLog(object):
# For NVIDIA
m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line)
if not m:
- m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line)
+ m = re.search(
+ r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line)
if m:
self.displays[display_name] = display
self.video_driver = m.group(1)
@@ -314,8 +315,8 @@ def is_laptop():
def hybrid_graphics_check(xlog):
'''Check for Hybrid Graphics'''
- card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)')
- card_id2 = re.compile('.*03..: *(.+):(.+)')
+ card_id1 = re.compile(r'.*0300: *(.+):(.+) \(.+\)')
+ card_id2 = re.compile(r'.*03..: *(.+):(.+)')
cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'}
cards = []
drivers = []
@@ -372,5 +373,6 @@ def main():
return 1 if 1 in results else 0
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/graphics_modes_info.py b/bin/graphics_modes_info.py
index f3c962a..f92cd76 100755
--- a/bin/graphics_modes_info.py
+++ b/bin/graphics_modes_info.py
@@ -32,7 +32,7 @@ from checkbox_support.contrib import xrandr
def print_modes_info(screen):
"""Print some information about the detected screen and its outputs"""
xrandr._check_required_version((1, 0))
- print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %\
+ print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %
(screen._screen,
screen._width_min, screen._height_min,
screen._width, screen._height,
@@ -50,10 +50,9 @@ def print_modes_info(screen):
for m in range(len(modes)):
mode = modes[m]
refresh = mode.dotClock / (mode.hTotal * mode.vTotal)
- print(" [%s] %s x %s @ %s Hz" % (m,
- mode.width,
- mode.height,
- refresh), end=' ')
+ print(
+ " [%s] %s x %s @ %s Hz" %
+ (m, mode.width, mode.height, refresh), end=' ')
if mode.id == output._mode:
print("(current)", end=' ')
if m == output.get_preferred_mode():
@@ -70,5 +69,6 @@ def main():
except(xrandr.UnsupportedRRError):
print('Error: RandR version lower than 1.0', file=sys.stderr)
+
if __name__ == '__main__':
main()
diff --git a/bin/graphics_stress_test.py b/bin/graphics_stress_test.py
index d467b3f..c0eeb4a 100755
--- a/bin/graphics_stress_test.py
+++ b/bin/graphics_stress_test.py
@@ -45,7 +45,7 @@ class VtWrapper(object):
vt = 0
proc = Popen(['ps', 'aux'], stdout=PIPE, universal_newlines=True)
proc_output = proc.communicate()[0].split('\n')
- proc_line = re.compile('.*tty(\d+).+/usr/bin/X.*')
+ proc_line = re.compile(r'.*tty(\d+).+/usr/bin/X.*')
for line in proc_output:
match = proc_line.match(line)
if match:
@@ -56,6 +56,7 @@ class VtWrapper(object):
retcode = call(['chvt', '%d' % vt])
return retcode
+
class SuspendWrapper(object):
def __init__(self):
pass
@@ -132,6 +133,7 @@ class SuspendWrapper(object):
return status
+
class RotationWrapper(object):
def __init__(self):
@@ -178,6 +180,7 @@ class RotationWrapper(object):
result = 1
return result
+
class RenderCheckWrapper(object):
"""A simple class to run the rendercheck suites"""
@@ -234,14 +237,15 @@ class RenderCheckWrapper(object):
passed = int(match_output.group(1).strip())
total = int(match_output.group(2).strip())
logging.info('Results:')
- logging.info(' %d tests passed out of %d.'
- % (passed, total))
+ logging.info(
+ ' %d tests passed out of %d.' % (passed, total))
if show_errors and match_errors:
error = match_errors.group(0).strip()
if first_error:
- logging.debug('Rendercheck %s suite errors '
- 'from iteration %d:'
- % (suites, iteration))
+ logging.debug(
+ 'Rendercheck %s suite errors '
+ 'from iteration %d:'
+ % (suites, iteration))
first_error = False
logging.debug(' %s' % error)
@@ -254,17 +258,17 @@ class RenderCheckWrapper(object):
exit_status = 0
for suite in suites:
for it in range(iterations):
- logging.info('Iteration %d of Rendercheck %s suite...'
- % (it + 1, suite))
- (status, passed, total) = \
- self._print_test_info(suites=suite,
- iteration=it + 1,
- show_errors=show_errors)
+ logging.info(
+ 'Iteration %d of Rendercheck %s suite...'
+ % (it + 1, suite))
+ (status, passed, total) = self._print_test_info(
+ suites=suite, iteration=it + 1, show_errors=show_errors)
if status != 0:
# Make sure to catch a non-zero exit status
- logging.info('Iteration %d of Rendercheck %s suite '
- 'exited with status %d.'
- % (it + 1, suite, status))
+ logging.info(
+ 'Iteration %d of Rendercheck %s suite '
+ 'exited with status %d.'
+ % (it + 1, suite, status))
exit_status = status
it += 1
@@ -372,7 +376,6 @@ def main():
logfile_handler.setFormatter(logging.Formatter(format))
logger.addHandler(logfile_handler)
- log_path = os.path.abspath(logfile)
# Write only to stdout
else:
@@ -402,16 +405,18 @@ def main():
logging.info('Iteration %d...', it)
retcode = vt_wrap.set_vt(target_vt)
if retcode != 0:
- logging.error('Error: switching to tty%d failed with code %d '
- 'on iteration %d' % (target_vt, retcode, it))
+ logging.error(
+ 'Error: switching to tty%d failed with code %d '
+ 'on iteration %d' % (target_vt, retcode, it))
status = 1
else:
logging.info('Switching to tty%d: passed' % (target_vt))
time.sleep(2)
retcode = vt_wrap.set_vt(vt_wrap.x_vt)
if retcode != 0:
- logging.error('Error: switching to tty%d failed with code %d '
- 'on iteration %d' % (vt_wrap.x_vt, retcode, it))
+ logging.error(
+ 'Error: switching to tty%d failed with code %d '
+ 'on iteration %d' % (vt_wrap.x_vt, retcode, it))
else:
logging.info('Switching to tty%d: passed' % (vt_wrap.x_vt))
status = 1
@@ -463,5 +468,6 @@ def main():
return status
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/gst_pipeline_test.py b/bin/gst_pipeline_test.py
index c7a9af5..f022632 100755
--- a/bin/gst_pipeline_test.py
+++ b/bin/gst_pipeline_test.py
@@ -9,9 +9,9 @@ import sys
import time
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
-from gi.repository import Gst
-from gi.repository import GLib
-from subprocess import check_output
+from gi.repository import Gst # noqa: E402
+from gi.repository import GLib # noqa: E402
+from subprocess import check_output # noqa: E402
def check_state(device):
@@ -22,8 +22,11 @@ def check_state(device):
data = sink_info.split("\n")
try:
- device_name = re.findall(".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip()
- sink = re.findall(".*Name:\s(.*%s.*)" % device, sink_info, re.IGNORECASE)[0].lstrip()
+ device_name = re.findall(
+ r".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip()
+ sink = re.findall(
+ r".*Name:\s(.*%s.*)" % device, sink_info,
+ re.IGNORECASE)[0].lstrip()
status = data[data.index("\t" + device_name) - 1]
except (IndexError, ValueError):
logging.error("Failed to find status for device: %s" % device)
@@ -32,22 +35,26 @@ def check_state(device):
os.environ['PULSE_SINK'] = sink
logging.info("[ Pulse sink ]".center(80, '='))
logging.info("Device: %s %s" % (device_name.strip(), status.strip()))
- return status
+ return status
def main():
parser = ArgumentParser(description='Simple GStreamer pipeline player')
- parser.add_argument('PIPELINE',
+ parser.add_argument(
+ 'PIPELINE',
help='Quoted GStreamer pipeline to launch')
- parser.add_argument('-t', '--timeout',
+ parser.add_argument(
+ '-t', '--timeout',
type=int, required=True,
help='Timeout for running the pipeline')
- parser.add_argument('-d', '--device',
+ parser.add_argument(
+ '-d', '--device',
type=str,
help="Device to check for status")
args = parser.parse_args()
- logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO,
+ logging.basicConfig(
+ format='%(levelname)s:%(message)s', level=logging.INFO,
stream=sys.stdout)
exit_code = 0
@@ -63,7 +70,7 @@ def main():
except GLib.GError as error:
print("Specified pipeline couldn't be processed.")
print("Error when processing pipeline: {}".format(error))
- #Exit harmlessly
+ # Exit harmlessly
return(2)
print("Pipeline initialized, now starting playback.")
diff --git a/bin/hdd_parking.py b/bin/hdd_parking.py
index 6d34904..01022ba 100755
--- a/bin/hdd_parking.py
+++ b/bin/hdd_parking.py
@@ -25,7 +25,7 @@
"""
This script verifies that a systems HDD protection capabilities
are triggered when appropriate. There are many implementations
-of HDD protection from different OEMs, each implemented in a
+of HDD protection from different OEMs, each implemented in a
different way, so this script can only support implementations
which are known to work and are testable. Currently the list
of supported implementations is:
@@ -42,10 +42,12 @@ from subprocess import Popen, PIPE
TIMEOUT = 15.0
+
def hdaps_test(run_time):
try:
- hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE,
- universal_newlines=True)
+ hdapsd = Popen(
+ ['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE,
+ universal_newlines=True)
except OSError as err:
print("Unable to start hdapsd: {}".format(err))
return 1
@@ -59,6 +61,7 @@ def hdaps_test(run_time):
return 0
return 1
+
def main():
# First establish the driver used
parser = ArgumentParser("Tests a systems HDD protection capabilities. "
@@ -70,5 +73,6 @@ def main():
'all axis. No particular force should be required.')
return hdaps_test(parser.parse_args().timeout)
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/hotkey_tests.py b/bin/hotkey_tests.py
index f1b3560..066f36c 100755
--- a/bin/hotkey_tests.py
+++ b/bin/hotkey_tests.py
@@ -554,5 +554,6 @@ def main():
if failed:
raise SystemExit('Some test failed!')
+
if __name__ == '__main__':
main()
diff --git a/bin/key_test.py b/bin/key_test.py
index 2bd95f9..883bf32 100755
--- a/bin/key_test.py
+++ b/bin/key_test.py
@@ -29,7 +29,7 @@ import termios
from gettext import gettext as _
-from gi.repository import GObject
+from gi.repository import GLib
from optparse import OptionParser
@@ -81,7 +81,7 @@ class Reporter(object):
self.scancodes = scancodes
self.fileno = os.open("/dev/console", os.O_RDONLY)
- GObject.io_add_watch(self.fileno, GObject.IO_IN, self.on_key)
+ GLib.io_add_watch(self.fileno, GLib.IO_IN, self.on_key)
# Set terminal attributes
self.saved_attributes = termios.tcgetattr(self.fileno)
@@ -324,7 +324,8 @@ class GtkReporter(Reporter):
def found_key(self, key):
super(GtkReporter, self).found_key(key)
- self.icons[key].set_from_stock(self.ICON_TESTED, size=self.ICON_SIZE)
+ self.icons[key].set_from_icon_name(
+ self.ICON_TESTED, size=self.ICON_SIZE)
self.check_keys()
@@ -334,7 +335,7 @@ class GtkReporter(Reporter):
stock_icon = self.ICON_UNTESTED
else:
stock_icon = self.ICON_NOT_REQUIRED
- self.icons[key].set_from_stock(stock_icon, self.ICON_SIZE)
+ self.icons[key].set_from_icon_name(stock_icon, self.ICON_SIZE)
self.check_keys()
@@ -394,10 +395,10 @@ Hint to find codes:
key = Key(codes, name)
keys.append(key)
- main_loop = GObject.MainLoop()
+ main_loop = GLib.MainLoop()
try:
reporter = reporter_factory(main_loop, keys, options.scancodes)
- except:
+ except OSError:
parser.error("Failed to initialize interface: %s" % options.interface)
try:
@@ -408,5 +409,6 @@ Hint to find codes:
return reporter.exit_code
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/keyboard_test.py b/bin/keyboard_test.py
index 1615106..e87dc2e 100755
--- a/bin/keyboard_test.py
+++ b/bin/keyboard_test.py
@@ -84,5 +84,6 @@ def main(args):
return 0
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/lock_screen_watcher.py b/bin/lock_screen_watcher.py
index 1c30270..e4433c8 100755
--- a/bin/lock_screen_watcher.py
+++ b/bin/lock_screen_watcher.py
@@ -20,8 +20,8 @@ import dbus
import gi
from dbus.mainloop.glib import DBusGMainLoop
gi.require_version('GLib', '2.0')
-from gi.repository import GObject
-from gi.repository import GLib
+from gi.repository import GObject # noqa: E402
+from gi.repository import GLib # noqa: E402
def filter_cb(bus, message):
@@ -41,6 +41,7 @@ def on_timeout_expired():
print("You have failed to perform the required manipulation in time")
raise SystemExit(1)
+
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
bus.add_match_string("type='signal',interface='com.ubuntu.Upstart0_6'")
diff --git a/bin/lsmod_info.py b/bin/lsmod_info.py
index 3b81542..cb7c0bc 100755
--- a/bin/lsmod_info.py
+++ b/bin/lsmod_info.py
@@ -36,5 +36,6 @@ def main():
print('%s: %s' % (module, version))
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/manage_compiz_plugin.py b/bin/manage_compiz_plugin.py
index 03c0ed8..be9541f 100755
--- a/bin/manage_compiz_plugin.py
+++ b/bin/manage_compiz_plugin.py
@@ -29,11 +29,10 @@ from gettext import gettext as _
import argparse
import gettext
import os
-import sys
import subprocess
import time
-KEY="/org/compiz/profiles/unity/plugins/core/active-plugins"
+KEY = "/org/compiz/profiles/unity/plugins/core/active-plugins"
gettext.textdomain("com.canonical.certification.checkbox")
gettext.bindtextdomain("com.canonical.certification.checkbox",
@@ -41,8 +40,9 @@ gettext.bindtextdomain("com.canonical.certification.checkbox",
plugins = eval(subprocess.check_output(["dconf", "read", KEY]))
-parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"),
- epilog=_("Available plugins: {}").format(plugins))
+parser = argparse.ArgumentParser(
+ description=_("enable/disable compiz plugins"),
+ epilog=_("Available plugins: {}").format(plugins))
parser.add_argument("plugin", type=str, help=_('Name of plugin to control'))
parser.add_argument("action", type=str, choices=['enable', 'disable'],
help=_("What to do with the plugin"))
diff --git a/bin/memory_compare.py b/bin/memory_compare.py
index 02e2def..aaf89f4 100755
--- a/bin/memory_compare.py
+++ b/bin/memory_compare.py
@@ -25,7 +25,6 @@
import os
import sys
import re
-from math import log, copysign
from subprocess import check_output, CalledProcessError, PIPE
from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
@@ -44,7 +43,7 @@ class LshwJsonResult:
desc_regex = re.compile('System Memory', re.IGNORECASE)
def addHardware(self, hardware):
- if self.desc_regex.match(str(hardware.get('description',0))):
+ if self.desc_regex.match(str(hardware.get('description', 0))):
self.memory_reported += int(hardware.get('size', 0))
elif 'bank' in hardware['id']:
self.banks_reported += int(hardware.get('size', 0))
@@ -134,5 +133,6 @@ def main():
(difference, percentage, threshold), file=sys.stderr)
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/network.py b/bin/network.py
index a646f9c..60f7b0a 100755
--- a/bin/network.py
+++ b/bin/network.py
@@ -255,14 +255,15 @@ class IPerfPerformanceTest(object):
logging.warning("The network test against {} failed because:".
format(self.target))
if percent < self.fail_threshold:
- logging.error(" Transfer speed: {} Mb/s".
- format(throughput))
- logging.error(" {:03.2f}% of theoretical max {} Mb/s\n".
- format(percent, int(self.iface.max_speed)))
+ logging.error(" Transfer speed: {} Mb/s".format(throughput))
+ logging.error(
+ " {:03.2f}% of theoretical max {} Mb/s\n".format(
+ percent, int(self.iface.max_speed)))
if cpu_load > self.cpu_load_fail_threshold:
logging.error(" CPU load: {}%".format(cpu_load))
- logging.error(" CPU load is above {}% maximum\n".
- format(self.cpu_load_fail_threshold))
+ logging.error(
+ " CPU load is above {}% maximum\n".format(
+ self.cpu_load_fail_threshold))
return 30
logging.debug("Passed benchmark against {}".format(self.target))
@@ -277,9 +278,11 @@ class StressPerformanceTest:
def run(self):
if self.iperf3:
- iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(self.target)
+ iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(
+ self.target)
else:
- iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(self.target)
+ iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(
+ self.target)
print("Running iperf...")
iperf = subprocess.Popen(shlex.split(iperf_cmd))
@@ -365,14 +368,14 @@ class Interface(socket.socket):
ethinfo = check_output(['ethtool', self.interface],
universal_newlines=True,
stderr=STDOUT).split(' ')
- expression = '(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)'
+ expression = r'(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)'
regex = re.compile(expression)
if ethinfo:
for i in ethinfo:
hit = regex.search(i)
if hit:
- speeds.append(int(re.sub("\D", "", hit.group(0))))
+ speeds.append(int(re.sub(r"\D", "", hit.group(0))))
except CalledProcessError as e:
logging.error('ethtool returned an error!')
logging.error(e.output)
@@ -391,7 +394,7 @@ class Interface(socket.socket):
for s in line.split(' '):
hit = regex.search(s)
if hit:
- speeds.append(int(re.sub("\D", "", hit.group(0))))
+ speeds.append(int(re.sub(r"\D", "", hit.group(0))))
except FileNotFoundError:
logging.warning('mii-tool not found! Unable to get max speed')
except CalledProcessError as e:
diff --git a/bin/network_check.py b/bin/network_check.py
index 82c00d9..20b88d5 100755
--- a/bin/network_check.py
+++ b/bin/network_check.py
@@ -6,7 +6,9 @@ ubuntu.com
from subprocess import call
from argparse import ArgumentParser
import http.client
-import urllib.request, urllib.error, urllib.parse
+import urllib.request
+import urllib.error
+import urllib.parse
import sys
@@ -61,12 +63,14 @@ def main():
try:
call(command)
except OSError:
- print("Zenity missing; unable to report test result:\n %s" % message)
+ print(
+ "Zenity missing; unable to report test result:\n %s" % message)
if any(results.values()):
return 0
else:
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/network_ntp_test.py b/bin/network_ntp_test.py
index 6e2ccf8..f7a2297 100755
--- a/bin/network_ntp_test.py
+++ b/bin/network_ntp_test.py
@@ -84,13 +84,13 @@ def StartStopNTPD(state, pid=0):
SilentCall('/etc/init.d/ntp start')
ntpd_status = CheckNTPD()
- if status == 0:
+ if ntpd_status == 0:
logging.debug('ntpd restarted with PID: %s' % ntpd_status[1])
else:
logging.error('ntpd restart failed for some reason')
else:
- logging.error('%s is an unknown state, unable to start/stop ntpd' %
- state)
+ logging.error(
+ '%s is an unknown state, unable to start/stop ntpd' % state)
def SyncTime(server):
@@ -141,7 +141,8 @@ def SkewTime():
def main():
- description = 'Tests the ability to skew and sync the clock with an NTP server'
+ description = (
+ 'Tests the ability to skew and sync the clock with an NTP server')
parser = ArgumentParser(description=description)
parser.add_argument('--server',
action='store',
@@ -173,7 +174,7 @@ def main():
if args.debug:
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
-
+
# Make sure NTP is installed
if not os.access('/usr/sbin/ntpdate', os.F_OK):
logging.error('NTP is not installed!')
@@ -208,5 +209,6 @@ def main():
else:
return 1
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/network_reconnect_resume_test.py b/bin/network_reconnect_resume_test.py
index 8861eb6..17a220e 100755
--- a/bin/network_reconnect_resume_test.py
+++ b/bin/network_reconnect_resume_test.py
@@ -43,16 +43,17 @@ def get_wifi_reconnect_times():
Returns a list of all the timestamps for wifi reconnects.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\] wlan.* associated")
+ syntax = re.compile(r"\[(.*)\] wlan.* associated")
results = re.findall(syntax, data)
return map(float, results)
+
def get_wired_reconnect_times():
"""
Returns a list of all the timestamps for wired reconnects.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\].*eth.* Link is [uU]p")
+ syntax = re.compile(r"\[(.*)\].*eth.* Link is [uU]p")
results = re.findall(syntax, data)
return map(float, results)
@@ -63,7 +64,7 @@ def get_resume_time():
If no resume is found, None is returned.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\].ACPI: Waking up from system sleep state S3")
+ syntax = re.compile(r"\[(.*)\].ACPI: Waking up from system sleep state S3")
results = re.findall(syntax, data)
if not results:
return None
@@ -85,7 +86,7 @@ def main():
args = parser.parse_args()
timedif = get_time_difference(args.device)
-
+
if not timedif:
return 0
@@ -98,5 +99,6 @@ def main():
print("PASS: the network connected within the allotted time")
return 0
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/network_restart.py b/bin/network_restart.py
index 14c7357..63a99fb 100755
--- a/bin/network_restart.py
+++ b/bin/network_restart.py
@@ -96,7 +96,8 @@ class GtkApplication(Application):
def __init__(self, address, times):
Application.__init__(self, address, times)
- dialog = Gtk.Dialog(title='Network restart',
+ dialog = Gtk.Dialog(
+ title='Network restart',
buttons=(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL))
dialog.set_default_size(300, 100)
@@ -187,7 +188,8 @@ def ping(address):
"""
logging.info('Pinging {0!r}...'.format(address))
try:
- check_call('ping -c 1 -w 5 {0}'.format(address),
+ check_call(
+ 'ping -c 1 -w 5 {0}'.format(address),
stdout=open(os.devnull, 'w'), stderr=STDOUT, shell=True)
except CalledProcessError:
return False
@@ -209,8 +211,9 @@ class Networking:
"""
output = check_output(['/sbin/ifconfig', '-s', '-a'])
lines = output.splitlines()[1:]
- interfaces = ([interface for interface in
- [line.split()[0] for line in lines] if interface != 'lo'])
+ interfaces = (
+ [interface for interface in
+ [line.split()[0] for line in lines] if interface != 'lo'])
return interfaces
def restart(self):
@@ -287,21 +290,24 @@ def parse_args():
"""
Parse command line options
"""
- parser = ArgumentParser('Reboot networking interface '
- 'and verify that is up again afterwards')
- parser.add_argument('-a', '--address', default='ubuntu.com',
+ parser = ArgumentParser(
+ 'Reboot networking interface and verify that is up again afterwards')
+ parser.add_argument(
+ '-a', '--address', default='ubuntu.com',
help=('Address to ping to verify that network connection is up '
"('%(default)s' by default)"))
parser.add_argument('-o', '--output',
default='/var/log',
help='The path to the log directory. \
Default is /var/log')
- parser.add_argument('-t', '--times',
+ parser.add_argument(
+ '-t', '--times',
type=int, default=1,
help=('Number of times that the network interface has to be restarted '
'(%(default)s by default)'))
log_levels = ['notset', 'debug', 'info', 'warning', 'error', 'critical']
- parser.add_argument('--log-level', dest='log_level_str', default='notset',
+ parser.add_argument(
+ '--log-level', dest='log_level_str', default='notset',
choices=log_levels,
help=('Log level. '
'One of {0} or {1} (%(default)s by default)'
diff --git a/bin/optical_read_test.py b/bin/optical_read_test.py
index b3cf23d..7560db1 100755
--- a/bin/optical_read_test.py
+++ b/bin/optical_read_test.py
@@ -7,7 +7,7 @@ import filecmp
import shutil
from argparse import ArgumentParser
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, SubprocessError
DEFAULT_DIR = '/tmp/checkbox.optical'
DEFAULT_DEVICE_DIR = 'device'
@@ -18,11 +18,7 @@ CDROM_ID = '/lib/udev/cdrom_id'
def _command(command, shell=True):
- proc = Popen(command,
- shell=shell,
- stdout=PIPE,
- stderr=PIPE
- )
+ proc = Popen(command, shell=shell, stdout=PIPE, stderr=PIPE)
return proc
@@ -33,7 +29,7 @@ def _command_out(command, shell=True):
def compare_tree(source, target):
for dirpath, dirnames, filenames in os.walk(source):
- #if both tree are empty return false
+ # if both tree are empty return false
if dirpath == source and dirnames == [] and filenames == []:
return False
for name in filenames:
@@ -64,21 +60,21 @@ def read_test(device):
mount = _command("mount -o ro %s %s" % (device, device_dir))
mount.communicate()
if mount.returncode != 0:
- print("Unable to mount %s to %s" %
- (device, device_dir), file=sys.stderr)
+ print("Unable to mount %s to %s" %
+ (device, device_dir), file=sys.stderr)
return False
file_copy = _command("cp -dpR %s %s" % (device_dir, image_dir))
file_copy.communicate()
if file_copy.returncode != 0:
- print("Failed to copy files from %s to %s" %
- (device_dir, image_dir), file=sys.stderr)
+ print("Failed to copy files from %s to %s" %
+ (device_dir, image_dir), file=sys.stderr)
return False
if compare_tree(device_dir, image_dir):
passed = True
- except:
- print("File Comparison failed while testing %s" % device,
- file=sys.stderr)
+ except (SubprocessError, OSError):
+ print("File Comparison failed while testing %s" % device,
+ file=sys.stderr)
passed = False
finally:
_command("umount %s" % device_dir).communicate(3)
@@ -88,7 +84,7 @@ def read_test(device):
if passed:
print("File Comparison passed (%s)" % device)
-
+
return passed
@@ -127,8 +123,9 @@ def main():
print("Testing %s on %s ... " % (test, device), file=sys.stdout)
tester = "%s_test" % test
return_values.append(globals()[tester](device))
-
+
return False in return_values
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/pm_log_check.py b/bin/pm_log_check.py
index cb75685..fae8eca 100755
--- a/bin/pm_log_check.py
+++ b/bin/pm_log_check.py
@@ -36,8 +36,8 @@ class Parser(object):
"""
Reboot test log file parser
"""
- is_logging_line = (re.compile('^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}')
- .search)
+ is_logging_line = (re.compile(
+ r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}').search)
is_getting_info_line = (re.compile('Gathering hardware information...$')
.search)
is_executing_line = (re.compile("Executing: '(?P<command>.*)'...$")
@@ -103,8 +103,10 @@ class Parser(object):
if self.is_output_line(line):
command_output = {}
break
- if (self.is_executing_line(line)
- or self.is_getting_info_line(line)):
+ if (
+ self.is_executing_line(line)
+ or self.is_getting_info_line(line)
+ ):
# Skip commands with no output
iterator.unnext(line)
return None
@@ -132,8 +134,10 @@ class Parser(object):
# for the field value
value = []
for line in iterator:
- if (self.is_logging_line(line)
- or self.is_field_line(line)):
+ if (
+ self.is_logging_line(line)
+ or self.is_field_line(line)
+ ):
iterator.unnext(line)
break
diff --git a/bin/pm_test.py b/bin/pm_test.py
index 18cf97c..fcaf7d9 100755
--- a/bin/pm_test.py
+++ b/bin/pm_test.py
@@ -20,7 +20,7 @@ from configparser import ConfigParser
from datetime import datetime, timedelta
from time import localtime, time
gi.require_version("Gtk", "3.0")
-from gi.repository import GObject, Gtk
+from gi.repository import GObject, Gtk # noqa: E402
def main():
@@ -266,9 +266,9 @@ class PowerManagementOperation():
count = count_s3 + count_s2idle
if count != total_suspends_expected:
problems = (
- "Found {} occurrences of '{}'. Expected {}".format(
- count, magic_line.strip(),
- total_suspends_expected))
+ "Found {} occurrences of S3/S2idle."
+ " Expected {}".format(
+ count, total_suspends_expected))
except FileNotFoundError:
problems = "Error opening {}".format(fwts_log_path)
if problems:
@@ -367,13 +367,13 @@ class WakeUpAlarm():
with open(cls.ALARM_FILENAME) as alarm_file:
wakeup_time_stored_str = alarm_file.read()
- if not re.match('\d+', wakeup_time_stored_str):
+ if not re.match(r'\d+', wakeup_time_stored_str):
subprocess.check_call('echo "+%d" > %s'
% (timeout, cls.ALARM_FILENAME),
shell=True)
with open(cls.ALARM_FILENAME) as alarm_file2:
wakeup_time_stored_str = alarm_file2.read()
- if not re.match('\d+', wakeup_time_stored_str):
+ if not re.match(r'\d+', wakeup_time_stored_str):
logging.error('Invalid wakeup time format: {0!r}'
.format(wakeup_time_stored_str))
sys.exit(1)
@@ -396,7 +396,7 @@ class WakeUpAlarm():
sys.exit(1)
with open(cls.RTC_FILENAME) as rtc_file:
- separator_regex = re.compile('\s+:\s+')
+ separator_regex = re.compile(r'\s+:\s+')
rtc_data = dict([separator_regex.split(line.rstrip())
for line in rtc_file])
logging.debug('RTC data:\n{0}'
@@ -582,14 +582,15 @@ class CountdownDialog(Gtk.Dialog):
.run().stdout),
ethernet=(Command('lspci | grep Ethernet')
.run().stdout),
- ifconfig=(Command("ifconfig -a | grep -A1 '^\w'")
+ ifconfig=(Command(
+ r"ifconfig -a | grep -A1 '^\w'")
.run().stdout),
- iwconfig=(Command("iwconfig | grep -A1 '^\w'")
+ iwconfig=(Command(r"iwconfig | grep -A1 '^\w'")
.run().stdout)))
logging.debug('Bluetooth Device:\n'
'{hciconfig}'
- .format(hciconfig=(Command("hciconfig -a "
- "| grep -A2 '^\w'")
+ .format(hciconfig=(Command(r"hciconfig -a "
+ r"| grep -A2 '^\w'")
.run().stdout)))
logging.debug('Video Card:\n'
'{lspci}'
@@ -770,7 +771,7 @@ Exec=sudo {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay
Type=Application
X-GNOME-Autostart-enabled=true
Hidden=false
-"""
+""" # noqa: E501
def __init__(self, args, user=None):
self.args = args
@@ -1012,7 +1013,7 @@ class MyArgumentParser():
'{0}.{1}.{2}.log'.format(
os.path.splitext(os.path.basename(__file__))[0],
args.pm_operation,
- args.total)))
+ args.total))
return args, extra_args
diff --git a/bin/process_wait.py b/bin/process_wait.py
index 6107d1b..1295991 100755
--- a/bin/process_wait.py
+++ b/bin/process_wait.py
@@ -36,14 +36,17 @@ def main(args):
usage = "Usage: %prog PROCESS [PROCESS...]"
parser = OptionParser(usage=usage)
- parser.add_option("-s", "--sleep",
+ parser.add_option(
+ "-s", "--sleep",
type="int",
default=default_sleep,
help="Number of seconds to sleep between checks.")
- parser.add_option("-t", "--timeout",
+ parser.add_option(
+ "-t", "--timeout",
type="int",
help="Number of seconds to timeout from sleeping.")
- parser.add_option("-u", "--uid",
+ parser.add_option(
+ "-u", "--uid",
help="Effective user name or id of the running processes")
(options, processes) = parser.parse_args(args)
diff --git a/bin/pulse-active-port-change.py b/bin/pulse-active-port-change.py
index 90abed8..baf0152 100755
--- a/bin/pulse-active-port-change.py
+++ b/bin/pulse-active-port-change.py
@@ -71,20 +71,21 @@ class AudioPlugDetection:
doc = parse_pactl_output(text)
cfg = set()
for record in doc.record_list:
- active_port = None
- port_availability = None
- # We go through the attribute list once to try to find an active port
- for attr in record.attribute_list:
- if attr.name == "Active Port":
- active_port = attr.value
- # If there is one, we retrieve its availability flag
- if active_port:
- for attr in record.attribute_list:
- if attr.name == "Ports":
- for port in attr.value:
- if port.name == active_port:
- port_availability = port.availability
- cfg.add((record.name, active_port, port_availability))
+ active_port = None
+ port_availability = None
+ # We go through the attribute list once to try to find
+ # an active port
+ for attr in record.attribute_list:
+ if attr.name == "Active Port":
+ active_port = attr.value
+ # If there is one, we retrieve its availability flag
+ if active_port:
+ for attr in record.attribute_list:
+ if attr.name == "Ports":
+ for port in attr.value:
+ if port.name == active_port:
+ port_availability = port.availability
+ cfg.add((record.name, active_port, port_availability))
return cfg
def on_timeout(self, signum, frame):
diff --git a/bin/removable_storage_test.py b/bin/removable_storage_test.py
index 82f1595..4253cbf 100755
--- a/bin/removable_storage_test.py
+++ b/bin/removable_storage_test.py
@@ -15,26 +15,30 @@ import time
import gi
gi.require_version('GUdev', '1.0')
-from gi.repository import GUdev
-
-from checkbox_support.dbus import connect_to_system_bus
-from checkbox_support.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_LOOP_INTERFACE
-from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer
-from checkbox_support.dbus.udisks2 import is_udisks2_supported
-from checkbox_support.dbus.udisks2 import lookup_udev_device
-from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus
-from checkbox_support.heuristics.udisks2 import is_memory_card
-from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
-from checkbox_support.parsers.udevadm import CARD_READER_RE
-from checkbox_support.parsers.udevadm import GENERIC_RE
-from checkbox_support.parsers.udevadm import FLASH_RE
-from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint
-from checkbox_support.udev import get_interconnect_speed
-from checkbox_support.udev import get_udev_block_devices
-from checkbox_support.udev import get_udev_xhci_devices
+from gi.repository import GUdev # noqa: E402
+
+from checkbox_support.dbus import connect_to_system_bus # noqa: E402
+from checkbox_support.dbus.udisks2 import (
+ UDISKS2_BLOCK_INTERFACE,
+ UDISKS2_DRIVE_INTERFACE,
+ UDISKS2_FILESYSTEM_INTERFACE,
+ UDISKS2_LOOP_INTERFACE,
+ UDisks2Model,
+ UDisks2Observer,
+ is_udisks2_supported,
+ lookup_udev_device,
+ map_udisks1_connection_bus) # noqa: E402
+from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402
+from checkbox_support.helpers.human_readable_bytes import (
+ HumanReadableBytes) # noqa: E402
+from checkbox_support.parsers.udevadm import (
+ CARD_READER_RE,
+ GENERIC_RE,
+ FLASH_RE,
+ find_pkname_is_root_mountpoint) # noqa: E402
+from checkbox_support.udev import get_interconnect_speed # noqa: E402
+from checkbox_support.udev import get_udev_block_devices # noqa: E402
+from checkbox_support.udev import get_udev_xhci_devices # noqa: E402
class ActionTimer():
@@ -109,8 +113,8 @@ def on_ubuntucore():
snap = os.getenv("SNAP")
if snap:
with open(os.path.join(snap, 'meta/snap.yaml')) as f:
- for l in f.readlines():
- if l == "confinement: classic\n":
+ for line in f.readlines():
+ if line == "confinement: classic\n":
return False
return True
return False
@@ -855,7 +859,6 @@ def main():
# controller drivers.
# This will raise KeyError for no
# associated disk device was found.
- xhci_disks = test.get_disks_xhci()
if test.get_disks_xhci().get(disk, '') != 'xhci':
raise SystemExit(
"\t\tDisk does not use xhci_hcd.")
diff --git a/bin/removable_storage_watcher.py b/bin/removable_storage_watcher.py
index d5a0722..7120f6d 100755
--- a/bin/removable_storage_watcher.py
+++ b/bin/removable_storage_watcher.py
@@ -9,16 +9,21 @@ import sys
import gi
gi.require_version('GUdev', '1.0')
-from gi.repository import GObject, GUdev
-
-from checkbox_support.dbus import connect_to_system_bus
-from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer
-from checkbox_support.dbus.udisks2 import is_udisks2_supported
-from checkbox_support.dbus.udisks2 import lookup_udev_device
-from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus
-from checkbox_support.heuristics.udisks2 import is_memory_card
-from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE
-from checkbox_support.udev import get_interconnect_speed, get_udev_block_devices
+from gi.repository import GObject, GUdev # noqa: E402
+
+from checkbox_support.dbus import connect_to_system_bus # noqa: E402
+from checkbox_support.dbus.udisks2 import UDisks2Model # noqa: E402
+from checkbox_support.dbus.udisks2 import UDisks2Observer # noqa: E402
+from checkbox_support.dbus.udisks2 import is_udisks2_supported # noqa: E402
+from checkbox_support.dbus.udisks2 import lookup_udev_device # noqa: E402
+from checkbox_support.dbus.udisks2 import (
+ map_udisks1_connection_bus) # noqa: E402
+from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402
+from checkbox_support.parsers.udevadm import CARD_READER_RE # noqa: E402
+from checkbox_support.parsers.udevadm import GENERIC_RE # noqa: E402
+from checkbox_support.parsers.udevadm import FLASH_RE # noqa: E402
+from checkbox_support.udev import get_interconnect_speed # noqa: E402
+from checkbox_support.udev import get_udev_block_devices # noqa: E402
# Record representing properties of a UDisks1 Drive object needed by the
# UDisks1 version of the watcher implementation
@@ -121,19 +126,24 @@ class UDisks1StorageDeviceListener:
logging.debug("Desired bus devices: %s", desired_bus_devices)
for dev in desired_bus_devices:
if self._memorycard:
- if (dev.bus != 'sdio'
+ if (
+ dev.bus != 'sdio'
and not FLASH_RE.search(dev.media)
and not CARD_READER_RE.search(dev.model)
- and not GENERIC_RE.search(dev.vendor)):
- logging.debug("The device does not seem to be a memory"
- " card (bus: %r, model: %r), skipping",
- dev.bus, dev.model)
+ and not GENERIC_RE.search(dev.vendor)
+ ):
+ logging.debug(
+ "The device does not seem to be a memory"
+ " card (bus: %r, model: %r), skipping",
+ dev.bus, dev.model)
return
print(message % {'bus': 'memory card', 'file': dev.file})
else:
- if (FLASH_RE.search(dev.media)
+ if (
+ FLASH_RE.search(dev.media)
or CARD_READER_RE.search(dev.model)
- or GENERIC_RE.search(dev.vendor)):
+ or GENERIC_RE.search(dev.vendor)
+ ):
logging.debug("The device seems to be a memory"
" card (bus: %r (model: %r), skipping",
dev.bus, dev.model)
@@ -180,8 +190,7 @@ class UDisks1StorageDeviceListener:
message = "Expected memory card device %(file)s inserted"
else:
message = "Expected %(bus)s device %(file)s inserted"
- self.verify_device_change(inserted_devices,
- message=message)
+ self.verify_device_change(inserted_devices, message=message)
def add_detected(self, added_path):
logging.debug("UDisks1 reports device has been added: %s", added_path)
@@ -205,8 +214,9 @@ class UDisks1StorageDeviceListener:
removed_devices = list(set(self._starting_devices) -
set(current_devices))
logging.debug("Computed removed devices: %s", removed_devices)
- self.verify_device_change(removed_devices,
- message="Removable %(bus)s device %(file)s has been removed")
+ self.verify_device_change(
+ removed_devices,
+ message="Removable %(bus)s device %(file)s has been removed")
def get_existing_devices(self):
logging.debug("Getting existing devices from UDisks1")
@@ -221,12 +231,9 @@ class UDisks1StorageDeviceListener:
device_props = dbus.Interface(device_obj,
dbus.PROPERTIES_IFACE)
udisks = 'org.freedesktop.UDisks.Device'
- _device_file = device_props.Get(udisks,
- "DeviceFile")
- _bus = device_props.Get(udisks,
- "DriveConnectionInterface")
- _speed = device_props.Get(udisks,
- "DriveConnectionSpeed")
+ _device_file = device_props.Get(udisks, "DeviceFile")
+ _bus = device_props.Get(udisks, "DriveConnectionInterface")
+ _speed = device_props.Get(udisks, "DriveConnectionSpeed")
_parent_model = ''
_parent_media = ''
_parent_vendor = ''
@@ -411,7 +418,7 @@ class UDisks2StorageDeviceListener:
UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus"
def __init__(self, system_bus, loop, action, devices, minimum_speed,
- memorycard, unmounted = False):
+ memorycard, unmounted=False):
# Store the desired minimum speed of the device in Mbit/s. The argument
# is passed as the number of bits per second so let's fix that.
self._desired_minimum_speed = minimum_speed / 10 ** 6
@@ -581,24 +588,30 @@ class UDisks2StorageDeviceListener:
continue
# For devices with empty "ConnectionBus" property, don't
# require the device to be mounted
- if (record.value.iface_name ==
+ if (
+ record.value.iface_name ==
"org.freedesktop.UDisks2.Drive"
and record.value.delta_type == DELTA_TYPE_PROP
and record.value.prop_name == "ConnectionBus"
- and record.value.prop_value == ""):
+ and record.value.prop_value == ""
+ ):
needs.remove('mounted')
# Detect block devices designated for filesystems
- if (record.value.iface_name ==
+ if (
+ record.value.iface_name ==
"org.freedesktop.UDisks2.Block"
and record.value.delta_type == DELTA_TYPE_PROP
and record.value.prop_name == "IdUsage"
- and record.value.prop_value == "filesystem"):
+ and record.value.prop_value == "filesystem"
+ ):
found.add('block-fs')
# Memorize the block device path
- elif (record.value.iface_name ==
+ elif (
+ record.value.iface_name ==
"org.freedesktop.UDisks2.Block"
and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "PreferredDevice"):
+ and record.value.prop_name == "PreferredDevice"
+ ):
object_block_device = record.value.prop_value
# Ensure the device is a partition
elif (record.value.iface_name ==
@@ -845,8 +858,9 @@ def main():
description = "Wait for the specified device to be inserted or removed."
parser = argparse.ArgumentParser(description=description)
parser.add_argument('action', choices=['insert', 'remove'])
- parser.add_argument('device', choices=['usb', 'sdio', 'firewire', 'scsi',
- 'ata_serial_esata'], nargs="+")
+ parser.add_argument(
+ 'device', choices=['usb', 'sdio', 'firewire', 'scsi',
+ 'ata_serial_esata'], nargs="+")
memorycard_help = ("Memory cards devices on bus other than sdio require "
"this parameter to identify them as such")
parser.add_argument('--memorycard', action="store_true",
@@ -902,5 +916,6 @@ def main():
except KeyboardInterrupt:
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/resolution_test.py b/bin/resolution_test.py
index 55a7828..77238ff 100755
--- a/bin/resolution_test.py
+++ b/bin/resolution_test.py
@@ -5,7 +5,7 @@ import sys
from argparse import ArgumentParser
gi.require_version('Gdk', '3.0')
-from gi.repository import Gdk
+from gi.repository import Gdk # noqa: E402
def check_resolution():
@@ -28,11 +28,13 @@ def compare_resolution(min_h, min_v):
def main():
parser = ArgumentParser()
- parser.add_argument("--horizontal",
+ parser.add_argument(
+ "--horizontal",
type=int,
default=0,
help="Minimum acceptable horizontal resolution.")
- parser.add_argument("--vertical",
+ parser.add_argument(
+ "--vertical",
type=int,
default=0,
help="Minimum acceptable vertical resolution.")
diff --git a/bin/rotation_test.py b/bin/rotation_test.py
index 6bf90fd..de70f4a 100755
--- a/bin/rotation_test.py
+++ b/bin/rotation_test.py
@@ -27,7 +27,8 @@ import time
import subprocess
gi.require_version('Gdk', '3.0')
-from gi.repository import Gdk
+from gi.repository import Gdk # noqa: E402
+
def main():
"""Run rotation cycling by running xrandr command."""
@@ -41,5 +42,6 @@ def main():
['xrandr', '--output', output, '--rotation', rotation])
time.sleep(4)
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/sleep_test.py b/bin/sleep_test.py
index 423fd6c..3483951 100755
--- a/bin/sleep_test.py
+++ b/bin/sleep_test.py
@@ -184,19 +184,19 @@ class SuspendTest():
if perf:
for idx in range(0, len(loglist)):
if 'PM: Syncing filesystems' in loglist[idx]:
- sleep_start_time = re.split('[\[\]]',
+ sleep_start_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Sleep started at %s' % sleep_start_time)
if 'ACPI: Low-level resume complete' in loglist[idx]:
- sleep_end_time = re.split('[\[\]]',
+ sleep_end_time = re.split(r'[\[\]]',
loglist[idx - 1])[1].strip()
logging.debug('Sleep ended at %s' % sleep_end_time)
- resume_start_time = re.split('[\[\]]',
+ resume_start_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Resume started at %s' % resume_start_time)
idx += 1
if 'Restarting tasks' in loglist[idx]:
- resume_end_time = re.split('[\[\]]',
+ resume_end_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Resume ended at %s' % resume_end_time)
if self.end_marker in loglist[idx]:
@@ -395,5 +395,6 @@ def main():
options.iterations)
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/sleep_test_log_check.py b/bin/sleep_test_log_check.py
index aae7556..350bbbb 100755
--- a/bin/sleep_test_log_check.py
+++ b/bin/sleep_test_log_check.py
@@ -138,8 +138,9 @@ def main():
logging.basicConfig(level=args.debug)
- #Create a generator and get our lines
- log = (line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8"))
+ # Create a generator and get our lines
+ log = (
+ line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8"))
# End result will be a dictionary with a key per level, value is another
# dictionary with a key per test (s3, s4, ...) and a list of all failures
@@ -166,9 +167,8 @@ def main():
# Report what I found
for level in sorted(results.keys()):
- if results[level]: # Yes, we can have an empty level. We may have
- # seen the levelheader but had it report no
- # failures.
+ if results[level]: # Yes, we can have an empty level.
+ # We may have seen the levelheader but had it report no failures.
print("{} failures:".format(level))
for test in results[level].keys():
print(" {}: {} failures".format(test,
@@ -184,8 +184,8 @@ def main():
logging.error("No fwts test summaries found, "
"possible malformed fwts log file")
return_code = 2
- elif not results.keys(): # If it has no keys, means we didn't see any
- # FWTS levels
+ elif not results.keys():
+ # If it has no keys, means we didn't see any FWTS levels
logging.error("None of the summaries contained failure levels, "
"possible malformed fwts log file")
return_code = 2
@@ -197,5 +197,6 @@ def main():
return return_code
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/sleep_time_check.py b/bin/sleep_time_check.py
index f9513fa..90e1bc7 100755
--- a/bin/sleep_time_check.py
+++ b/bin/sleep_time_check.py
@@ -53,13 +53,18 @@ def main():
print(e)
return 1
- if (sleep_time is None or resume_time is None) or \
- (len(sleep_times) != len(resume_times)):
+ if (
+ (sleep_time is None or resume_time is None) or
+ (len(sleep_times) != len(resume_times))
+ ):
print("ERROR: One or more times was not reported correctly")
return 1
- print("Average time to enter sleep state: %.4f seconds" % mean(sleep_times))
- print("Average time to resume from sleep state: %.4f seconds" % mean(resume_times))
+ print(
+ "Average time to enter sleep state: %.4f seconds" % mean(sleep_times))
+ print(
+ "Average time to resume from sleep state: %.4f seconds" % mean(
+ resume_times))
failed = 0
if sleep_time > args.sleep_threshold:
@@ -76,5 +81,6 @@ def main():
return failed
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/stress_ng_test.py b/bin/stress_ng_test.py
index b8c2586..35aad2d 100755
--- a/bin/stress_ng_test.py
+++ b/bin/stress_ng_test.py
@@ -138,7 +138,7 @@ def num_numa_nodes():
try:
return int(run(['numactl', '--hardware'],
stdout=PIPE).stdout.split()[1])
- except:
+ except (ValueError, OSError, IndexError):
return 1
diff --git a/bin/test_bt_keyboard.py b/bin/test_bt_keyboard.py
index 1457a1d..782ae20 100755
--- a/bin/test_bt_keyboard.py
+++ b/bin/test_bt_keyboard.py
@@ -5,7 +5,7 @@
# Written by:
# Maciej Kisielewski <maciej.kisielewski@canonical.com>
-import checkbox_support.bt_helper
+import checkbox_support.bt_helper as bt_helper
def main():
diff --git a/bin/touchpad_driver_info.py b/bin/touchpad_driver_info.py
index 431b874..9f6278f 100755
--- a/bin/touchpad_driver_info.py
+++ b/bin/touchpad_driver_info.py
@@ -67,8 +67,8 @@ def main():
if attributes:
modinfo = TouchpadDriver(attributes['driver'])
attributes['version'] = modinfo.driver_version
- print("%s: %s\n%s: %s\n%s: %s\n" %
- ('Device', attributes['product'],
+ print("%s: %s\n%s: %s\n%s: %s\n" % (
+ 'Device', attributes['product'],
'Driver', attributes['driver'],
'Driver Version', attributes['version']))
else:
diff --git a/bin/touchpad_test.py b/bin/touchpad_test.py
index bc6a474..6ecde92 100755
--- a/bin/touchpad_test.py
+++ b/bin/touchpad_test.py
@@ -8,8 +8,8 @@ from gettext import gettext as _
gi.require_version('Gdk', '3.0')
gi.require_version('Gio', '2.0')
gi.require_version("Gtk", "3.0")
-from gi.repository import Gio, Gtk, Gdk
-from optparse import OptionParser
+from gi.repository import Gio, Gtk, Gdk # noqa: E402
+from optparse import OptionParser # noqa: E402
EXIT_WITH_FAILURE = 1
@@ -205,5 +205,6 @@ def main(args):
return scroller.exit_code
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/virtualization.py b/bin/virtualization.py
index 2b7cd54..10b827b 100755
--- a/bin/virtualization.py
+++ b/bin/virtualization.py
@@ -330,7 +330,7 @@ class KVMTest(object):
# Attempt download
try:
- resp = urllib.request.urlretrieve(full_url, cloud_iso)
+ urllib.request.urlretrieve(full_url, cloud_iso)
except (IOError,
OSError,
urllib.error.HTTPError,
@@ -400,25 +400,6 @@ class KVMTest(object):
Generate Cloud meta data and creates an iso object
to be mounted as virtual device to instance during boot.
"""
-
- user_data = """\
-#cloud-config
-
-runcmd:
- - [ sh, -c, echo "========= CERTIFICATION TEST =========" ]
-
-power_state:
- mode: halt
- message: Bye
- timeout: 480
-
-final_message: CERTIFICATION BOOT COMPLETE
-"""
-
- meta_data = """\
-{ echo instance-id: iid-local01; echo local-hostname, certification; }
-"""
-
for file in ['user-data', 'meta-data']:
logging.debug("Creating cloud %s", file)
with open(file, "wt") as data_file:
@@ -427,11 +408,11 @@ final_message: CERTIFICATION BOOT COMPLETE
# Create Data ISO hosting user & meta cloud config data
try:
- iso_build = check_output(
+ check_output(
['genisoimage', '-output', 'seed.iso', '-volid',
'cidata', '-joliet', '-rock', 'user-data', 'meta-data'],
universal_newlines=True)
- except CalledProcessError as exception:
+ except CalledProcessError:
logging.exception("Cloud data disk creation failed")
def log_check(self, stream):
@@ -483,7 +464,7 @@ final_message: CERTIFICATION BOOT COMPLETE
self.create_cloud_disk()
# Boot Virtual Machine
- instance = self.boot_image(self.image)
+ self.boot_image(self.image)
# If running in console, reset console window to regain
# control from VM Serial I/0
@@ -659,7 +640,7 @@ class LXDTest(object):
logging.debug("Attempting download of {} from {}".format(filename,
url))
try:
- resp = urllib.request.urlretrieve(url, filename)
+ urllib.request.urlretrieve(url, filename)
except (IOError,
OSError,
urllib.error.HTTPError,
diff --git a/bin/volume_test.py b/bin/volume_test.py
index a949f93..d1aeffd 100755
--- a/bin/volume_test.py
+++ b/bin/volume_test.py
@@ -8,12 +8,12 @@ from subprocess import check_output
TYPES = ("source", "sink")
-active_entries_regex = re.compile("\* index.*?(?=properties)", re.DOTALL)
-entries_regex = re.compile("index.*?(?=properties)", re.DOTALL)
-index_regex = re.compile("(?<=index: )[0-9]*")
-muted_regex = re.compile("(?<=muted: ).*")
-volume_regex = re.compile("(?<=volume: 0: )\s*[0-9]*")
-name_regex = re.compile("(?<=name:).*")
+active_entries_regex = re.compile(r"\* index.*?(?=properties)", re.DOTALL)
+entries_regex = re.compile(r"index.*?(?=properties)", re.DOTALL)
+index_regex = re.compile(r"(?<=index: )[0-9]*")
+muted_regex = re.compile(r"(?<=muted: ).*")
+volume_regex = re.compile(r"(?<=volume: 0: )\s*[0-9]*")
+name_regex = re.compile(r"(?<=name:).*")
def check_muted():
@@ -27,26 +27,29 @@ def check_muted():
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
- file=sys.stderr)
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
+ file=sys.stderr)
return 1
active_entry_match = active_entries_regex.search(pacmd_entries)
if active_entry_match:
active_entry = active_entry_match.group()
else:
- print("Unable to find a %s active_entry in the pacmd list-%ss"\
- " output\npacmd output was: %s" %
- (vtype, vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find a %s active_entry in the pacmd list-%ss"
+ " output\npacmd output was: %s" %
+ (vtype, vtype, pacmd_entries), file=sys.stderr)
return 1
name_match = name_regex.search(active_entry)
if name_match:
name = name_match.group()
else:
- print("Unable to determine device bus information from the"\
- " pacmd list-%ss output\npacmd output was: %s" %
- (vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to determine device bus information from the"
+ " pacmd list-%ss output\npacmd output was: %s" %
+ (vtype, pacmd_entries), file=sys.stderr)
return 1
muted_match = muted_regex.search(active_entry)
@@ -58,9 +61,10 @@ def check_muted():
else:
print("PASS: Audio is not muted on %s %s" % (name, vtype))
else:
- print("Unable to find mute information in the pacmd list-%ss"\
- " output for device %s\npacmd output was: %s" %
- (vtype, name, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find mute information in the pacmd list-%ss"
+ " output for device %s\npacmd output was: %s" %
+ (vtype, name, pacmd_entries), file=sys.stderr)
return 1
return retval
@@ -77,8 +81,9 @@ def check_volume(minvol, maxvol):
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
- file=sys.stderr)
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
+ file=sys.stderr)
return 1
entries = entries_regex.findall(pacmd_entries)
@@ -88,32 +93,36 @@ def check_volume(minvol, maxvol):
if name_match:
name = name_match.group()
else:
- print("Unable to determine device bus information from the"\
- " pacmd list-%ss output\npacmd output was: %s" %
- (vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to determine device bus information from the"
+ " pacmd list-%ss output\npacmd output was: %s" %
+ (vtype, pacmd_entries), file=sys.stderr)
return 1
volume_match = volume_regex.search(entry)
if volume_match:
volume = int(volume_match.group().strip())
if volume > maxvol:
- print ("FAIL: Volume of %d is greater than"\
- " maximum of %d for %s %s" % (volume, maxvol,
- name, vtype))
+ print(
+ "FAIL: Volume of %d is greater than"
+ " maximum of %d for %s %s" %
+ (volume, maxvol, name, vtype))
retval = 1
elif volume < minvol:
- print ("FAIL: Volume of %d is less than"\
- " minimum of %d for %s %s" % (volume, minvol,
- name, vtype))
+ print(
+ "FAIL: Volume of %d is less than"
+ " minimum of %d for %s %s" %
+ (volume, minvol, name, vtype))
retval = 1
else:
- print ("PASS: Volume is %d for %s %s" % (volume, name,
- vtype))
+ print(
+ "PASS: Volume is %d for %s %s" %
+ (volume, name, vtype))
else:
- print("Unable to find volume information in the pacmd"\
- " list-%ss output for device %s.\npacmd output "\
- "was: %s" %
- (vtype, name, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find volume information in the pacmd"
+ " list-%ss output for device %s.\npacmd output "
+ "was: %s" % (vtype, name, pacmd_entries), file=sys.stderr)
return 1
return retval
@@ -134,5 +143,6 @@ def main():
check_volume_retval = check_volume(args.minvol, args.maxvol)
return check_muted_retval or check_volume_retval
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/wifi_ap_wizard.py b/bin/wifi_ap_wizard.py
index 5506d01..9029040 100755
--- a/bin/wifi_ap_wizard.py
+++ b/bin/wifi_ap_wizard.py
@@ -61,5 +61,6 @@ def main():
raise SystemExit('Did not get prompted ("{}")'.format(prompt))
wizard.writeline(response)
+
if __name__ == '__main__':
main()
diff --git a/bin/wifi_client_test_netplan.py b/bin/wifi_client_test_netplan.py
index c74320e..6f8281c 100755
--- a/bin/wifi_client_test_netplan.py
+++ b/bin/wifi_client_test_netplan.py
@@ -21,8 +21,6 @@ import subprocess as sp
import textwrap
import time
import shutil
-from struct import pack
-from socket import inet_ntoa
import sys
print = functools.partial(print, flush=True)
@@ -135,7 +133,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp):
nameservers: {}
Static IP no dhcp:
- >>> print(generate_test_config("eth0", "my_ap", "s3cr3t", "192.168.1.1", False))
+ >>> print(generate_test_config(
+ "eth0", "my_ap", "s3cr3t", "192.168.1.1", False))
# This is the network config written by checkbox
network:
version: 2
@@ -162,7 +161,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp):
password = "password: " + psk
else:
password = ""
- return textwrap.dedent(np_cfg.format(interface, ssid, password, address, dhcp))
+ return textwrap.dedent(
+ np_cfg.format(interface, ssid, password, address, dhcp))
def write_test_config(config):
diff --git a/bin/wifi_master_mode.py b/bin/wifi_master_mode.py
index 3888caa..d52d5c0 100755
--- a/bin/wifi_master_mode.py
+++ b/bin/wifi_master_mode.py
@@ -67,7 +67,7 @@ class WifiMasterMode():
logging.info(output)
logging.info("AP successfully established.")
child.terminate()
- if child.poll() is not 0:
+ if child.poll() != 0:
output = child.stdout.read()
logging.error(log + output)
logging.error('AP failed to start')
diff --git a/bin/wifi_time2reconnect.py b/bin/wifi_time2reconnect.py
index d4ae2c0..a1b49b5 100755
--- a/bin/wifi_time2reconnect.py
+++ b/bin/wifi_time2reconnect.py
@@ -7,9 +7,8 @@ import time
import subprocess
from datetime import datetime
try:
- from subprocess import DEVNULL # >= python3.3
+ from subprocess import DEVNULL # >= python3.3
except ImportError:
- import os
DEVNULL = open(os.devnull, 'wb')
IFACE = None
@@ -21,7 +20,7 @@ def main():
Check the time needed to reconnect an active WIFI connection
"""
devices = subprocess.getoutput('nmcli dev')
- match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices)
+ match = re.search(r'(\w+)\s+(802-11-wireless|wifi)\s+connected', devices)
if match:
IFACE = match.group(1)
else:
@@ -46,7 +45,7 @@ def main():
return 1
subprocess.call(
- 'nmcli dev disconnect iface %s' %IFACE,
+ 'nmcli dev disconnect iface %s' % IFACE,
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT,
shell=True)
@@ -55,13 +54,13 @@ def main():
start = datetime.now()
subprocess.call(
- 'nmcli con up uuid %s --timeout %s' %(uuid, TIMEOUT),
+ 'nmcli con up uuid %s --timeout %s' % (uuid, TIMEOUT),
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT,
shell=True)
delta = datetime.now() - start
- print('%.2f Seconds' %delta.total_seconds())
+ print('%.2f Seconds' % delta.total_seconds())
return 0
diff --git a/bin/window_test.py b/bin/window_test.py
index ee79a89..895e2db 100755
--- a/bin/window_test.py
+++ b/bin/window_test.py
@@ -277,8 +277,7 @@ def print_move_window(iterations, timeout, *args):
for it in range(iterations):
print('Iteration %d of %d:' % (it + 1, iterations))
- exit_status = move_window('glxgears',
- timeout)
+ status = move_window('glxgears', timeout)
print('')
return status
@@ -290,7 +289,8 @@ def main():
'open-close-multi': print_open_close_multi,
'move': print_move_window}
- parser = ArgumentParser(description='Script that performs window operation')
+ parser = ArgumentParser(
+ description='Script that performs window operation')
parser.add_argument('-t', '--test',
default='all',
help='The name of the test to run. \
@@ -338,5 +338,6 @@ def main():
return status
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/wwan_tests.py b/bin/wwan_tests.py
index df44cf6..9bbf1e7 100755
--- a/bin/wwan_tests.py
+++ b/bin/wwan_tests.py
@@ -278,7 +278,7 @@ class ThreeGppConnection():
_wwan_radio_on()
time.sleep(args.wwan_setup_time)
ret_code = _ping_test(args.wwan_net_if)
- except:
+ except subprocess.SubprocessError:
pass
_destroy_3gpp_connection()
_wwan_radio_off()
@@ -312,29 +312,6 @@ class Resources():
mm = MMDbus()
for m in mm.get_modem_ids():
print("mm_id: {}".format(m))
- # Removed ability to fetch supported RATs when adding CLI support
- # to this script. It is somewhat messy to scrape this from mmcli
- # output and it wasn't actually used in test definitions.
- #
- # cap_bits = mm.get_rat_support(m)
- # if cap_bits != 0:
- # if (cap_bits & MMModemCapability['MM_MODEM_CAPABILITY_POTS']):
- # print("pots: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_CDMA_EVDO']):
- # print("cdma_evdo: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_GSM_UMTS']):
- # print("gsm_umts: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_LTE']):
- # print("lte: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_LTE_ADVANCED']):
- # print("lte_advanced: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_IRIDIUM']):
- # print("iridium: supported")
print("hw_id: {}".format(mm.get_equipment_id(m)))
print("manufacturer: {}".format(mm.get_manufacturer(m)))
print("model: {}".format(mm.get_model_name(m)))
diff --git a/bin/xrandr_cycle.py b/bin/xrandr_cycle.py
index 6743f6f..8d87217 100755
--- a/bin/xrandr_cycle.py
+++ b/bin/xrandr_cycle.py
@@ -117,11 +117,11 @@ if 'PLAINBOX_PROVIDER_DATA' in os.environ:
shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'],
"settings", "shutter.xml")
else:
- shutter_xml_template = os.path.join(os.path.split(os.path.dirname(
- os.path.realpath(__file__)))[0],
- "data",
- "settings",
- "shutter.xml")
+ shutter_xml_template = os.path.join(
+ os.path.split(os.path.dirname(os.path.realpath(__file__)))[0],
+ "data",
+ "settings",
+ "shutter.xml")
if args.keyword:
screenshot_path = screenshot_path + '_' + args.keyword
@@ -167,7 +167,7 @@ try:
'folder="%s"' % screenshot_path, content))
new_profile.close()
old_profile.close()
-except:
+except (IOError, OSError):
raise SystemExit("ERROR: While updating folder name "
"in shutter profile: {}".format(sys.exc_info()))
@@ -198,7 +198,7 @@ for mode in highest_modes:
print("""Could not capture screenshot -
you may need to install the package 'shutter'.""")
- except:
+ except (IOError, OSError):
print("""Could not configure screenshot tool -
you may need to install the package 'shutter',
or check that {}/{} exists and is writable.""".format(
@@ -220,7 +220,7 @@ try:
with tarfile.open(screenshot_path + '.tgz', 'w:gz') as screen_tar:
for screen in os.listdir(screenshot_path):
screen_tar.add(screenshot_path + '/' + screen, screen)
-except:
+except (IOError, OSError):
pass
# Output some fun facts and knock off for the day