summaryrefslogtreecommitdiff
diff options
authorPMR <pmr@pmr-lander>2020-07-17 10:01:55 +0000
committerPMR <pmr@pmr-lander>2020-07-17 10:01:55 +0000
commit247d2d56699ab3bae1653bd390d2d3924240b6c6 (patch)
treee2d5f7c03cf53eac373dbce70237e794688fe3e5
parent88e6c784f6c3212f076b3a7ec0d911fb4172db47 (diff)
parent7a017ea84cc35c77055a068858b5ecae8ed30d21 (diff)
Merge #387579 from ~sylvain-pineau/plainbox-provider-checkbox:fix_flake8_errors
-rwxr-xr-xbin/accelerometer_test.py31
-rwxr-xr-xbin/alsa_tests.py1
-rwxr-xr-xbin/ansi_parser.py3
-rwxr-xr-xbin/audio_driver_info.py7
-rwxr-xr-xbin/audio_test.py257
-rwxr-xr-xbin/battery_test.py8
-rwxr-xr-xbin/bluetooth_test.py1
-rwxr-xr-xbin/bmc_info.py10
-rwxr-xr-xbin/boot_mode_test_snappy.py1
-rwxr-xr-xbin/brightness_test.py21
-rwxr-xr-xbin/bt_connect.py1
-rwxr-xr-xbin/bt_list_adapters.py2
-rwxr-xr-xbin/color_depth_info.py3
-rwxr-xr-xbin/cpu_offlining.py1
-rwxr-xr-xbin/cpu_topology.py5
-rwxr-xr-xbin/cpuid.py6
-rwxr-xr-xbin/create_connection.py12
-rwxr-xr-xbin/dmitest.py32
-rwxr-xr-xbin/edid_cycle.py6
-rwxr-xr-xbin/evdev_touch_test.py2
-rwxr-xr-xbin/fan_reaction_test.py8
-rwxr-xr-xbin/frequency_governors_test.py255
-rwxr-xr-xbin/fresh_rate_info.py5
-rwxr-xr-xbin/get_make_and_model.py12
-rwxr-xr-xbin/gpu_test.py24
-rwxr-xr-xbin/graphic_memory_info.py7
-rwxr-xr-xbin/graphics_driver.py20
-rwxr-xr-xbin/graphics_modes_info.py10
-rwxr-xr-xbin/graphics_stress_test.py46
-rwxr-xr-xbin/gst_pipeline_test.py29
-rwxr-xr-xbin/hdd_parking.py10
-rwxr-xr-xbin/hotkey_tests.py1
-rwxr-xr-xbin/key_test.py20
-rwxr-xr-xbin/keyboard_test.py1
-rwxr-xr-xbin/lock_screen_watcher.py5
-rwxr-xr-xbin/lsmod_info.py1
-rwxr-xr-xbin/mac_passthrough.py4
-rwxr-xr-xbin/manage_compiz_plugin.py8
-rwxr-xr-xbin/memory_compare.py4
-rwxr-xr-xbin/memory_test.py4
-rwxr-xr-xbin/network.py25
-rwxr-xr-xbin/network_check.py8
-rwxr-xr-xbin/network_ntp_test.py12
-rwxr-xr-xbin/network_reconnect_resume_test.py10
-rwxr-xr-xbin/network_restart.py24
-rwxr-xr-xbin/optical_read_test.py29
-rwxr-xr-xbin/pm_log_check.py20
-rwxr-xr-xbin/pm_test.py27
-rwxr-xr-xbin/process_wait.py9
-rwxr-xr-xbin/pulse-active-port-change.py29
-rwxr-xr-xbin/removable_storage_test.py49
-rwxr-xr-xbin/removable_storage_watcher.py135
-rwxr-xr-xbin/resolution_test.py8
-rwxr-xr-xbin/rotation_test.py4
-rwxr-xr-xbin/sleep_test.py9
-rwxr-xr-xbin/sleep_test_log_check.py15
-rwxr-xr-xbin/sleep_time_check.py14
-rwxr-xr-xbin/stress_ng_test.py2
-rwxr-xr-xbin/test_bt_keyboard.py2
-rwxr-xr-xbin/testlib.py1450
-rwxr-xr-xbin/touchpad_driver_info.py4
-rwxr-xr-xbin/touchpad_test.py5
-rwxr-xr-xbin/virtualization.py29
-rwxr-xr-xbin/volume_test.py78
-rwxr-xr-xbin/wifi_ap_wizard.py1
-rwxr-xr-xbin/wifi_client_test_netplan.py8
-rwxr-xr-xbin/wifi_master_mode.py2
-rwxr-xr-xbin/wifi_time2reconnect.py11
-rwxr-xr-xbin/window_test.py7
-rwxr-xr-xbin/wwan_tests.py25
-rwxr-xr-xbin/xrandr_cycle.py16
-rwxr-xr-xrequirements/container-tests-provider-checkbox1
-rw-r--r--requirements/deb-base.txt1
73 files changed, 824 insertions, 2129 deletions
diff --git a/bin/accelerometer_test.py b/bin/accelerometer_test.py
index 2d95ca8..130e22d 100755
--- a/bin/accelerometer_test.py
+++ b/bin/accelerometer_test.py
@@ -23,6 +23,7 @@ The purpose of this script is to simply interact with an onboard
accelerometer, and check to be sure that the x, y, z axis respond
to physical movement of hardware.
'''
+
from argparse import ArgumentParser
import gi
import logging
@@ -34,9 +35,10 @@ import time
gi.require_version('Gdk', '3.0')
gi.require_version('GLib', '2.0')
gi.require_version("Gtk", "3.0")
-from gi.repository import Gdk, GLib, Gtk
-from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError
-from checkbox_support.parsers.modinfo import ModinfoParser
+from gi.repository import Gdk, GLib, Gtk # noqa: E402
+from subprocess import Popen, PIPE, check_output, STDOUT # noqa: E402
+from subprocess import CalledProcessError # noqa: E402
+from checkbox_support.parsers.modinfo import ModinfoParser # noqa: E402
handler = logging.StreamHandler()
logger = logging.getLogger()
@@ -86,8 +88,8 @@ class AccelerometerUI(Gtk.Window):
def update_axis_icon(self, direction):
"""Change desired directional icon to checkmark"""
- exec('self.%s_icon.set_from_stock' % (direction) \
- + '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)')
+ exec('self.%s_icon.set_from_stock' % (direction) +
+ '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)')
def update_debug_label(self, text):
"""Update axis information in center of UI"""
@@ -131,7 +133,7 @@ class AxisData(threading.Thread):
self.x_test_pool = ["up", "down"]
self.y_test_pool = ["left", "right"]
- if self.ui == None:
+ if self.ui is None:
self.ui.enabled = False
def grab_current_readings(self):
@@ -223,8 +225,8 @@ class AxisData(threading.Thread):
def insert_supported_module(oem_module):
"""Try and insert supported module to see if we get any init errors"""
try:
- stream = check_output(['modinfo', oem_module], stderr=STDOUT,
- universal_newlines=True)
+ stream = check_output(
+ ['modinfo', oem_module], stderr=STDOUT, universal_newlines=True)
except CalledProcessError as err:
print("Error accessing modinfo for %s: " % oem_module, file=sys.stderr)
print(err.output, file=sys.stderr)
@@ -232,7 +234,7 @@ def insert_supported_module(oem_module):
parser = ModinfoParser(stream)
module = os.path.basename(parser.get_field('filename'))
-
+
insmod_output = Popen(['insmod %s' % module], stderr=PIPE,
shell=True, universal_newlines=True)
@@ -259,10 +261,10 @@ def check_module_status():
if "Permission denied" in error:
raise PermissionException(error)
- vendor_data = re.findall("Vendor:\s.*", output)
+ vendor_data = re.findall(r"Vendor:\s.*", output)
try:
manufacturer = vendor_data[0].split(":")[1].strip()
- except IndexError as exception:
+ except IndexError:
logging.error("Failed to find Manufacturing data")
return
@@ -275,8 +277,8 @@ def check_module_status():
oem_module = oem_driver_pool.get(vendor)
break # We've found our desired module to probe.
- if oem_module != None:
- if insert_supported_module(oem_module) != None:
+ if oem_module is not None:
+ if insert_supported_module(oem_module) is not None:
logging.error("Failed module insertion")
# Check dmesg status for supported module
driver_status = Popen(['dmesg'], stdout=PIPE, universal_newlines=True)
@@ -347,5 +349,6 @@ def main():
# Reading is not instant.
time.sleep(5)
+
if __name__ == '__main__':
- main();
+ main()
diff --git a/bin/alsa_tests.py b/bin/alsa_tests.py
index 16fb044..e919570 100755
--- a/bin/alsa_tests.py
+++ b/bin/alsa_tests.py
@@ -168,5 +168,6 @@ def main():
device = None
return(actions[args.action](args.duration, device))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/ansi_parser.py b/bin/ansi_parser.py
index c7cf26c..b5cec30 100755
--- a/bin/ansi_parser.py
+++ b/bin/ansi_parser.py
@@ -128,7 +128,8 @@ def parse_filename(filename):
def main(args):
usage = "Usage: %prog [OPTIONS] [FILE...]"
parser = OptionParser(usage=usage)
- parser.add_option("-o", "--output",
+ parser.add_option(
+ "-o", "--output",
metavar="FILE",
help="File where to output the result.")
(options, args) = parser.parse_args(args)
diff --git a/bin/audio_driver_info.py b/bin/audio_driver_info.py
index b9bfaf8..424b125 100755
--- a/bin/audio_driver_info.py
+++ b/bin/audio_driver_info.py
@@ -41,7 +41,6 @@ class PacmdAudioDevice():
print("Error running %s:" % cmd, file=sys.stderr)
print(err.output, file=sys.stderr)
return None
-
if not stream:
print("Error: modinfo returned nothing", file=sys.stderr)
return None
@@ -74,7 +73,8 @@ def list_device_info():
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
file=sys.stderr)
return 1
@@ -102,8 +102,9 @@ def list_device_info():
def main():
parser = ArgumentParser("List audio device and driver information")
- args = parser.parse_args()
+ parser.parse_args()
return list_device_info()
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/audio_test.py b/bin/audio_test.py
index 7bd6c44..2a56df4 100755
--- a/bin/audio_test.py
+++ b/bin/audio_test.py
@@ -13,7 +13,7 @@ import time
try:
import gi
gi.require_version('GLib', '2.0')
- gi.require_version('Gst','1.0')
+ gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gi.repository import GLib
@@ -29,26 +29,27 @@ try:
except ImportError:
from collections import Callable # backward compatible
-#Frequency bands for FFT
+# Frequency bands for FFT
BINS = 256
-#How often to take a sample and do FFT on it.
+# How often to take a sample and do FFT on it.
FFT_INTERVAL = 100000000 # In nanoseconds, so this is every 1/10th second
-#Sampling frequency. The effective maximum frequency we can analyze is
-#half of this (see Nyquist's theorem)
+# Sampling frequency. The effective maximum frequency we can analyze is
+# half of this (see Nyquist's theorem)
SAMPLING_FREQUENCY = 44100
-#The default test frequency is in the middle of the band that contains 5000Hz
-#This frequency was determined experimentally to be high enough but more
-#reliable than others we tried.
+# The default test frequency is in the middle of the band that contains 5000Hz
+# This frequency was determined experimentally to be high enough but more
+# reliable than others we tried.
DEFAULT_TEST_FREQUENCY = 5035
-#only sample a signal when peak level is in this range (in dB attenuation,
-#0 means no attenuation (and horrible clipping).
+# only sample a signal when peak level is in this range (in dB attenuation,
+# 0 means no attenuation (and horrible clipping).
REC_LEVEL_RANGE = (-2.0, -12.0)
-#For our test signal to be considered present, it has to be this much higher
-#than the base level (minimum magnitude). This is in dB.
+# For our test signal to be considered present, it has to be this much higher
+# than the base level (minimum magnitude). This is in dB.
MAGNITUDE_THRESHOLD = 2.5
-#Volume for the sample tone (in %)
+# Volume for the sample tone (in %)
PLAY_VOLUME = 70
+
class PIDController(object):
""" A Proportional-Integrative-Derivative controller (PID) controls a
process's output to try to maintain a desired output value (known as
@@ -141,7 +142,7 @@ class PAVolumeController(object):
'set-%s-volume' % (self.pa_types[self.type]),
str(self.identifier[0]),
str(int(volume)) + "%"]
- if False == self.method(command):
+ if not self.method(command):
return False
self._volume = volume
return True
@@ -159,7 +160,7 @@ class PAVolumeController(object):
'set-%s-mute' % (self.pa_types[self.type]),
str(self.identifier[0]),
mute]
- if False == self.method(command):
+ if not self.method(command):
return False
return True
@@ -187,10 +188,10 @@ class PAVolumeController(object):
return None
command = ['pactl', 'list', self.pa_types[type] + "s", 'short']
- #Expect lines of this form (field separator is tab):
- #<ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE>
- #What we need to return is the ID for the first element on this list
- #that does not contain auto_null or monitor.
+ # Expect lines of this form (field separator is tab):
+ # <ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE>
+ # What we need to return is the ID for the first element on this list
+ # that does not contain auto_null or monitor.
pa_info = self.method(command)
valid_elements = None
@@ -203,17 +204,17 @@ class PAVolumeController(object):
self.logger.error("No valid PulseAudio elements"
" for %s" % (self.type))
return None
- #We only need the pulseaudio numeric ID and long name for each element
+ # We only need the pulseaudio numeric ID and long name for each element
valid_elements = [(int(e.split()[0]), e.split()[1])
for e in valid_elements]
return valid_elements[0]
def _pactl_output(self, command):
- #This method mainly calls pactl (hence the name). Since pactl may
- #return a failure if the audio layer is not yet initialized, we will
- #try running a few times in case of failure. All our invocations of
- #pactl should be "idempotent" so repeating them should not have
- #any bad effects.
+ # This method mainly calls pactl (hence the name). Since pactl may
+ # return a failure if the audio layer is not yet initialized, we will
+ # try running a few times in case of failure. All our invocations of
+ # pactl should be "idempotent" so repeating them should not have
+ # any bad effects.
for attempt in range(0, 3):
try:
return subprocess.check_output(command,
@@ -242,8 +243,8 @@ class SpectrumAnalyzer(object):
self.number_of_samples = 0
self.wanted_samples = wanted_samples
self.sampling_frequency = sampling_frequency
- #Frequencies should contain *real* frequency which is half of
- #the sampling frequency
+ # Frequencies should contain *real* frequency which is half of
+ # the sampling frequency
self.frequencies = [((sampling_frequency / 2.0) / points) * i
for i in range(points)]
@@ -259,14 +260,14 @@ class SpectrumAnalyzer(object):
self.number_of_samples += 1
def frequencies_with_peak_magnitude(self, threshold=1.0):
- #First establish the base level
+ # First establish the base level
per_magnitude_bins = collections.defaultdict(int)
for magnitude in self.spectrum:
per_magnitude_bins[magnitude] += 1
base_level = max(per_magnitude_bins,
key=lambda x: per_magnitude_bins[x])
- #Now return all values that are higher (more positive)
- #than base_level + threshold
+ # Now return all values that are higher (more positive)
+ # than base_level + threshold
peaks = []
for i in range(1, len(self.spectrum) - 1):
first_index = i - 1
@@ -282,10 +283,10 @@ class SpectrumAnalyzer(object):
"""Convenience function to tell me which band
a frequency is contained in
"""
- #Note that actual frequencies are half of what the sampling
- #frequency would tell us. If SF is 44100 then maximum actual
- #frequency is 22050, and if I have 10 frequency bins each will
- #contain only 2205 Hz, not 4410 Hz.
+ # Note that actual frequencies are half of what the sampling
+ # frequency would tell us. If SF is 44100 then maximum actual
+ # frequency is 22050, and if I have 10 frequency bins each will
+ # contain only 2205 Hz, not 4410 Hz.
max_frequency = self.sampling_frequency / 2
if frequency > max_frequency or frequency < 0:
return None
@@ -339,34 +340,33 @@ class GStreamerMessageHandler(object):
if message.type == Gst.MessageType.ELEMENT:
message_name = message.get_structure().get_name()
if message_name == 'spectrum':
- #TODO: Due to an upstream bug, a structure's get_value method
- #doesn't work if the value in question is an array (as is the
- #case with the magnitudes).
- #https://bugzilla.gnome.org/show_bug.cgi?id=693168
- #We have to resort to parsing the string representation of the
- #structure. It's an ugly hack but it works.
- #Ideally we'd be able to say this to get fft_magnitudes:
- #message.get_structure.get_value('magnitude').
- #If an upstream fix ever makes it into gstreamer,
- #remember to remove this hack and the parse_spectrum
- #method
+ # TODO: Due to an upstream bug, a structure's get_value method
+ # doesn't work if the value in question is an array (as is the
+ # case with the magnitudes).
+ # https://bugzilla.gnome.org/show_bug.cgi?id=693168
+ # We have to resort to parsing the string representation of the
+ # structure. It's an ugly hack but it works.
+ # Ideally we'd be able to say this to get fft_magnitudes:
+ # message.get_structure.get_value('magnitude').
+ # If an upstream fix ever makes it into gstreamer,
+ # remember to remove this hack and the parse_spectrum method
struct_string = message.get_structure().to_string()
structure = parse_spectrum_message_structure(struct_string)
fft_magnitudes = structure['magnitude']
self.spectrum_method(self.spectrum_analyzer, fft_magnitudes)
if message_name == 'level':
- #peak_value is our process feedback
- #It's returned as an array, so I need the first (and only)
- #element
+ # peak_value is our process feedback
+ # It's returned as an array, so I need the first (and only)
+ # element
peak_value = message.get_structure().get_value('peak')[0]
self.level_method(peak_value, self.pid_controller,
self.volume_controller)
- #Adjust recording level
+ # Adjust recording level
def level_method(self, level, pid_controller, volume_controller):
- #If volume controller doesn't return a valid volume,
- #we can't control it :(
+ # If volume controller doesn't return a valid volume,
+ # we can't control it :(
current_volume = volume_controller.get_volume()
if current_volume is None:
self.logger.error("Unable to control recording volume."
@@ -375,19 +375,20 @@ class GStreamerMessageHandler(object):
self.current_level = level
change = pid_controller.input_change(level, 0.10)
if self.logger:
- self.logger.debug("Peak level: %(peak_level).2f, "
- "volume: %(volume)d%%, Volume change: %(change)f%%" %
- {'peak_level': level,
- 'change': change,
- 'volume': current_volume})
+ self.logger.debug(
+ "Peak level: %(peak_level).2f, "
+ "volume: %(volume)d%%, Volume change: %(change)f%%" % {
+ 'peak_level': level,
+ 'change': change,
+ 'volume': current_volume})
volume_controller.set_volume(current_volume + change)
- #Only sample if level is within the threshold
+ # Only sample if level is within the threshold
def spectrum_method(self, analyzer, spectrum):
if self.rec_level_range[1] <= self.current_level \
or self.current_level <= self.rec_level_range[0]:
- self.logger.debug("Sampling, recorded %d samples" %
- analyzer.number_of_samples)
+ self.logger.debug(
+ "Sampling, recorded %d samples" % analyzer.number_of_samples)
analyzer.sample(spectrum)
if analyzer.sampling_complete() and self._quit_method:
self.logger.info("Sampling complete, ending process")
@@ -414,10 +415,11 @@ class GstAudioObject(object):
class Player(GstAudioObject):
def __init__(self, frequency=DEFAULT_TEST_FREQUENCY, logger=None):
super(Player, self).__init__()
- self.pipeline_description = ("audiotestsrc wave=sine freq=%s "
- "! audioconvert "
- "! audioresample "
- "! autoaudiosink" % int(frequency))
+ self.pipeline_description = (
+ "audiotestsrc wave=sine freq=%s "
+ "! audioconvert "
+ "! audioresample "
+ "! autoaudiosink" % int(frequency))
self.logger = logger
if self.logger:
self.logger.debug(self.pipeline_description)
@@ -437,11 +439,11 @@ class Recorder(GstAudioObject):
! audioresample
! spectrum interval=%(fft_interval)s bands = %(bands)s
! wavenc
- ! filesink location=%(file)s''' %
- {'bands': bins,
- 'rate': sampling_frequency,
- 'fft_interval': fft_interval,
- 'file': output_file})
+ ! filesink location=%(file)s''' % {
+ 'bands': bins,
+ 'rate': sampling_frequency,
+ 'fft_interval': fft_interval,
+ 'file': output_file})
self.logger = logger
if self.logger:
self.logger.debug(pipeline_description)
@@ -457,25 +459,25 @@ class Recorder(GstAudioObject):
def parse_spectrum_message_structure(struct_string):
- #First let's jsonize this
- #This is the message name, which we don't need
+ # First let's jsonize this
+ # This is the message name, which we don't need
text = struct_string.replace("spectrum, ", "")
- #name/value separator in json is : and not =
- text = text.replace("=",": ")
- #Mutate the {} array notation from the structure to
- #[] notation for json.
- text = text.replace("{","[")
- text = text.replace("}","]")
- #Remove a few stray semicolons that aren't needed
- text = text.replace(";","")
- #Remove the data type fields, as json doesn't need them
+ # name/value separator in json is : and not =
+ text = text.replace("=", ": ")
+ # Mutate the {} array notation from the structure to
+ # [] notation for json.
+ text = text.replace("{", "[")
+ text = text.replace("}", "]")
+ # Remove a few stray semicolons that aren't needed
+ text = text.replace(";", "")
+ # Remove the data type fields, as json doesn't need them
text = re.sub(r"\(.+?\)", "", text)
- #double-quote the identifiers
+ # double-quote the identifiers
text = re.sub(r"([\w-]+):", r'"\1":', text)
- #Wrap the whole thing in brackets
+ # Wrap the whole thing in brackets
text = ("{"+text+"}")
- #Try to parse and return something sensible here, even if
- #the data was unparsable.
+ # Try to parse and return something sensible here, even if
+ # the data was unparsable.
try:
return json.loads(text)
except ValueError:
@@ -489,32 +491,38 @@ def process_arguments():
presence of the played frequency, if present it exits with success.
"""
parser = argparse.ArgumentParser(description=description)
- parser.add_argument("-t", "--time",
+ parser.add_argument(
+ "-t", "--time",
dest='test_duration',
action='store',
default=30,
type=int,
help="""Maximum test duration, default %(default)s seconds.
It may exit sooner if it determines it has enough data.""")
- parser.add_argument("-a", "--audio",
+ parser.add_argument(
+ "-a", "--audio",
action='store',
default="/dev/null",
type=str,
help="File to save recorded audio in .wav format")
- parser.add_argument("-q", "--quiet",
+ parser.add_argument(
+ "-q", "--quiet",
action='store_true',
default=False,
help="Be quiet, no output unless there's an error.")
- parser.add_argument("-d", "--debug",
+ parser.add_argument(
+ "-d", "--debug",
action='store_true',
default=False,
help="Debugging output")
- parser.add_argument("-f", "--frequency",
+ parser.add_argument(
+ "-f", "--frequency",
action='store',
default=DEFAULT_TEST_FREQUENCY,
type=int,
help="Frequency for test signal, default %(default)s Hz")
- parser.add_argument("-u", "--spectrum",
+ parser.add_argument(
+ "-u", "--spectrum",
action='store',
type=str,
help="""File to save spectrum information for plotting
@@ -524,10 +532,10 @@ def process_arguments():
#
def main():
- #Get arguments.
+ # Get arguments.
args = process_arguments()
- #Setup logging
+ # Setup logging
level = logging.INFO
if args.debug:
level = logging.DEBUG
@@ -535,56 +543,57 @@ def main():
level = logging.ERROR
logging.basicConfig(level=level)
try:
- #Launches recording pipeline. I need to hook up into the gst
- #messages.
+ # Launches recording pipeline. I need to hook up into the gst
+ # messages.
recorder = Recorder(output_file=args.audio, logger=logging)
- #Just launches the playing pipeline
+ # Just launches the playing pipeline
player = Player(frequency=args.frequency, logger=logging)
except GObject.GError as excp:
logging.critical("Unable to initialize GStreamer pipelines: %s", excp)
sys.exit(127)
- #This just receives a process feedback and tells me how much to change to
- #achieve the setpoint
+ # This just receives a process feedback and tells me how much to change to
+ # achieve the setpoint
pidctrl = PIDController(Kp=0.7, Ki=.01, Kd=0.01,
setpoint=REC_LEVEL_RANGE[0])
pidctrl.set_change_limit(5)
- #This gathers spectrum data.
+ # This gathers spectrum data.
analyzer = SpectrumAnalyzer(points=BINS,
sampling_frequency=SAMPLING_FREQUENCY)
- #Volume controllers actually set volumes for their device types.
- #we should at least issue a warning
+ # Volume controllers actually set volumes for their device types.
+ # we should at least issue a warning
recorder.volumecontroller = PAVolumeController(type='input',
logger=logging)
if not recorder.volumecontroller.get_identifier():
- logging.warning("Unable to get input volume control identifier. "
- "Test results will probably be invalid")
+ logging.warning(
+ "Unable to get input volume control identifier. "
+ "Test results will probably be invalid")
recorder.volumecontroller.set_volume(0)
recorder.volumecontroller.mute(False)
player.volumecontroller = PAVolumeController(type='output',
logger=logging)
if not player.volumecontroller.get_identifier():
- logging.warning("Unable to get output volume control identifier. "
- "Test results will probably be invalid")
+ logging.warning(
+ "Unable to get output volume control identifier. "
+ "Test results will probably be invalid")
player.volumecontroller.set_volume(PLAY_VOLUME)
player.volumecontroller.mute(False)
- #This handles the messages from gstreamer and orchestrates
- #the passed volume controllers, pid controller and spectrum analyzer
- #accordingly.
+ # This handles the messages from gstreamer and orchestrates
+ # the passed volume controllers, pid controller and spectrum analyzer
+ # accordingly.
gmh = GStreamerMessageHandler(rec_level_range=REC_LEVEL_RANGE,
logger=logging,
volumecontroller=recorder.volumecontroller,
pidcontroller=pidctrl,
spectrum_analyzer=analyzer)
- #I need to tell the recorder which method will handle messages.
+ # I need to tell the recorder which method will handle messages.
recorder.register_message_handler(gmh.bus_message_handler)
- #Create the loop and add a few triggers
-# GObject.threads_init() #Not needed?
+ # Create the loop and add a few triggers
loop = GLib.MainLoop()
GLib.timeout_add_seconds(0, player.start)
GLib.timeout_add_seconds(0, recorder.start)
@@ -595,29 +604,32 @@ def main():
loop.run()
- #When the loop ends, set things back to reasonable states
+ # When the loop ends, set things back to reasonable states
player.stop()
recorder.stop()
player.volumecontroller.set_volume(50)
recorder.volumecontroller.set_volume(10)
- #See if data gathering was successful.
+ # See if data gathering was successful.
test_band = analyzer.frequency_band_for(args.frequency)
- candidate_bands = analyzer.frequencies_with_peak_magnitude(MAGNITUDE_THRESHOLD)
+ candidate_bands = analyzer.frequencies_with_peak_magnitude(
+ MAGNITUDE_THRESHOLD)
for band in candidate_bands:
logging.debug("Band (%.2f,%.2f) contains a magnitude peak" %
analyzer.frequencies_for_band(band))
if test_band in candidate_bands:
freqs_for_band = analyzer.frequencies_for_band(test_band)
- logging.info("PASS: Test frequency of %s in band (%.2f, %.2f) "
- "which contains a magnitude peak" %
+ logging.info(
+ "PASS: Test frequency of %s in band (%.2f, %.2f) "
+ "which contains a magnitude peak" %
((args.frequency,) + freqs_for_band))
return_value = 0
else:
- logging.info("FAIL: Test frequency of %s is not in one of the "
- "bands with magnitude peaks" % args.frequency)
+ logging.info(
+ "FAIL: Test frequency of %s is not in one of the "
+ "bands with magnitude peaks" % args.frequency)
return_value = 1
- #Is the microphone broken?
+ # Is the microphone broken?
if len(set(analyzer.spectrum)) <= 1:
logging.info("WARNING: Microphone seems broken, didn't even "
"record ambient noise")
@@ -625,14 +637,17 @@ def main():
if args.spectrum:
logging.info("Saving spectrum data for plotting as %s" %
args.spectrum)
- if not FileDumper().write_to_file(args.spectrum,
- ["%s,%s" % t for t in
- zip(analyzer.frequencies,
- analyzer.spectrum)]):
+ if (
+ not FileDumper().write_to_file(
+ args.spectrum,
+ ["%s,%s" % t for t in zip(
+ analyzer.frequencies, analyzer.spectrum)])
+ ):
logging.error("Couldn't save spectrum data for plotting",
file=sys.stderr)
return return_value
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/battery_test.py b/bin/battery_test.py
index b2682c7..112ef5e 100755
--- a/bin/battery_test.py
+++ b/bin/battery_test.py
@@ -1,14 +1,13 @@
#!/usr/bin/env python3
import gi
-import os
import time
import re
import subprocess
import sys
import argparse
gi.require_version('Gio', '2.0')
-from gi.repository import Gio
+from gi.repository import Gio # noqa: E402
class Battery():
@@ -89,10 +88,10 @@ def get_battery_state():
def validate_battery_info(battery):
if battery is None:
- print ("Error obtaining battery info")
+ print("Error obtaining battery info")
return False
if battery._state != "discharging":
- print ("Error: battery is not discharging, test will not be valid")
+ print("Error: battery is not discharging, test will not be valid")
return False
return True
@@ -173,5 +172,6 @@ def main():
return(battery_life(battery_before, battery_after, test_time))
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bluetooth_test.py b/bin/bluetooth_test.py
index 8fafad6..fd3c956 100755
--- a/bin/bluetooth_test.py
+++ b/bin/bluetooth_test.py
@@ -145,5 +145,6 @@ def main():
format='%(levelname)s: %(message)s')
return getattr(ObexFTPTest(args.file.name, args.btaddr), args.action)()
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bmc_info.py b/bin/bmc_info.py
index d7aec9b..674d7e8 100755
--- a/bin/bmc_info.py
+++ b/bin/bmc_info.py
@@ -4,6 +4,7 @@ import sys
import shlex
from subprocess import check_output, CalledProcessError
+
def main():
# First, we need to get output
cmd = "ipmitool mc info"
@@ -14,7 +15,7 @@ def main():
file=sys.stderr)
return 1
except CalledProcessError as e:
- print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr)
+ print("Problem running %s. Error was %s" % (cmd, e), file=sys.stderr)
return 1
result = result.split('\n')
@@ -47,12 +48,11 @@ def main():
if type(data[field]) is list:
# Sometimes the first item in the list is ''. This will remove it
data[field].remove('')
- print(field.ljust(30),':',data[field].pop(0))
+ print(field.ljust(30), ':', data[field].pop(0))
for item in data[field]:
- print(' '.ljust(32),item)
+ print(' '.ljust(32), item)
else:
- print(field.ljust(30),":",data[field])
- #print("{}:\t{}".format(field, data[field]))
+ print(field.ljust(30), ":", data[field])
return 0
diff --git a/bin/boot_mode_test_snappy.py b/bin/boot_mode_test_snappy.py
index 864be29..23b30e4 100755
--- a/bin/boot_mode_test_snappy.py
+++ b/bin/boot_mode_test_snappy.py
@@ -14,7 +14,6 @@ import tempfile
import yaml
-from checkbox_support.parsers.kernel_cmdline import parse_kernel_cmdline
from checkbox_support.snap_utils.system import get_lk_bootimg_path
diff --git a/bin/brightness_test.py b/bin/brightness_test.py
index 52d478a..e3a8e90 100755
--- a/bin/brightness_test.py
+++ b/bin/brightness_test.py
@@ -28,7 +28,6 @@ import sys
import os
import time
-from sys import stdout, stderr
from glob import glob
@@ -42,7 +41,7 @@ class Brightness(object):
# See if the source is a file or a file object
# and act accordingly
file = path
- if file == None:
+ if file is None:
lines_list = []
else:
# It's a file
@@ -102,8 +101,10 @@ class Brightness(object):
Note: this doesn't guarantee that screen brightness
changed.
'''
- if (abs(self.get_actual_brightness(interface) -
- self.get_last_set_brightness(interface)) > 1):
+ if (
+ abs(self.get_actual_brightness(interface) -
+ self.get_last_set_brightness(interface)) > 1
+ ):
return 1
else:
return 0
@@ -132,9 +133,9 @@ def main():
max_brightness = brightness.get_max_brightness(interface)
# Set the brightness to half the max value
- brightness.write_value(max_brightness / 2,
- os.path.join(interface,
- 'brightness'))
+ brightness.write_value(
+ max_brightness / 2,
+ os.path.join(interface, 'brightness'))
# Check that "actual_brightness" reports the same value we
# set "brightness" to
@@ -144,9 +145,9 @@ def main():
time.sleep(2)
# Set the brightness back to its original value
- brightness.write_value(current_brightness,
- os.path.join(interface,
- 'brightness'))
+ brightness.write_value(
+ current_brightness,
+ os.path.join(interface, 'brightness'))
exit(exit_status)
diff --git a/bin/bt_connect.py b/bin/bt_connect.py
index 1fed30e..8d6757c 100755
--- a/bin/bt_connect.py
+++ b/bin/bt_connect.py
@@ -131,5 +131,6 @@ def main():
# capture all other silence failures
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/bt_list_adapters.py b/bin/bt_list_adapters.py
index e791442..c8db787 100755
--- a/bin/bt_list_adapters.py
+++ b/bin/bt_list_adapters.py
@@ -38,7 +38,7 @@ def main():
with open(namef, 'r') as f:
name = f.read().strip()
print(rfdev, name)
- if found_adatper == False:
+ if found_adatper is False:
raise SystemExit('No bluetooth adatpers registered with rfkill')
diff --git a/bin/color_depth_info.py b/bin/color_depth_info.py
index 129f051..fcab953 100755
--- a/bin/color_depth_info.py
+++ b/bin/color_depth_info.py
@@ -53,7 +53,7 @@ def get_color_depth(log_dir='/var/log/'):
return (depth, pixmap_format)
with open(current_log, 'rb') as stream:
- for match in re.finditer('Depth (\d+) pixmap format is (\d+) bpp',
+ for match in re.finditer(r'Depth (\d+) pixmap format is (\d+) bpp',
str(stream.read())):
depth = int(match.group(1))
pixmap_format = int(match.group(2))
@@ -72,5 +72,6 @@ def main():
return 1
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/cpu_offlining.py b/bin/cpu_offlining.py
index 5616e30..3d269ac 100755
--- a/bin/cpu_offlining.py
+++ b/bin/cpu_offlining.py
@@ -68,5 +68,6 @@ def main():
print(' '.join(failed_onlines))
return 1
+
if __name__ == '__main__':
main()
diff --git a/bin/cpu_topology.py b/bin/cpu_topology.py
index 0ce68c5..425e3c4 100755
--- a/bin/cpu_topology.py
+++ b/bin/cpu_topology.py
@@ -21,8 +21,8 @@ class proc_cpuinfo():
finally:
cpu_fh.close()
- r_s390 = re.compile("processor [0-9]")
- r_x86 = re.compile("processor\s+:")
+ r_s390 = re.compile(r"processor [0-9]")
+ r_x86 = re.compile(r"processor\s+:")
for i in temp:
# Handle s390 first
if r_s390.match(i):
@@ -111,5 +111,6 @@ def main():
else:
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/cpuid.py b/bin/cpuid.py
index d0cdea9..6543dc4 100755
--- a/bin/cpuid.py
+++ b/bin/cpuid.py
@@ -25,10 +25,9 @@
# Modifications: 2019 Jeffrey Lane (jeffrey.lane@canonical.com)
import ctypes
-import os
import platform
import sys
-from ctypes import (c_uint32, c_int, c_long, c_ulong, c_size_t, c_void_p,
+from ctypes import (c_uint32, c_int, c_size_t, c_void_p,
POINTER, CFUNCTYPE)
from subprocess import check_output
@@ -84,7 +83,8 @@ CPUIDS = {
"Broadwell": ['0x4067', '0x306d4', '0x5066', '0x406f'],
"Canon Lake": ['0x6066'],
"Cascade Lake": ['0x50655', '0x50656', '0x50657'],
- "Coffee Lake": ['0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'],
+ "Coffee Lake": [
+ '0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'],
"Haswell": ['0x306c', '0x4065', '0x4066', '0x306f'],
"Ice Lake": ['0x706e'],
"Ivy Bridge": ['0x306a', '0x306e'],
diff --git a/bin/create_connection.py b/bin/create_connection.py
index 71e9572..9dc977e 100755
--- a/bin/create_connection.py
+++ b/bin/create_connection.py
@@ -6,9 +6,8 @@ import time
from subprocess import check_call, check_output, CalledProcessError
try:
- from subprocess import DEVNULL # >= python3.3
+ from subprocess import DEVNULL # >= python3.3
except ImportError:
- import os
DEVNULL = open(os.devnull, 'wb')
from uuid import uuid4
@@ -154,7 +153,7 @@ def block_until_created(connection, retries, interval):
sys.exit(1)
else:
try:
- nmcli_con_up = check_call(['nmcli', 'con', 'up', 'id', connection])
+ check_call(['nmcli', 'con', 'up', 'id', connection])
print("Connection %s activated." % connection)
except CalledProcessError as error:
print("Failed to activate %s." % connection, file=sys.stderr)
@@ -168,8 +167,8 @@ def write_connection_file(name, connection_info):
os.fchmod(connection_file.fileno(), 0o600)
connection_file.close()
except IOError:
- print("Can't write to " + CONNECTIONS_PATH + name
- + ". Is this command being run as root?", file=sys.stderr)
+ print("Can't write to " + CONNECTIONS_PATH + name +
+ ". Is this command being run as root?", file=sys.stderr)
sys.exit(1)
@@ -216,7 +215,7 @@ def create_mobilebroadband_connection(args):
args.apn,
args.username,
args.password,
- args.pin)
+ args.pin)
if args.type == 'cdma':
mobilebroadband_connection += mobilebroadband_ppp_section()
@@ -288,5 +287,6 @@ def main():
# Make sure we don't exit until the connection is fully created
block_until_created(connection_name, args.retries, args.interval)
+
if __name__ == "__main__":
main()
diff --git a/bin/dmitest.py b/bin/dmitest.py
index 642af3a..2be50cc 100755
--- a/bin/dmitest.py
+++ b/bin/dmitest.py
@@ -146,26 +146,26 @@ def standard_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Chassis Information',
'Manufacturer:',
['empty', 'chassis manufacture', 'null', 'insyde',
- 'to be filled by o\.e\.m\.', 'no enclosure',
- '\.\.\.\.\.'], True):
+ r'to be filled by o\.e\.m\.', 'no enclosure',
+ r'\.\.\.\.\.'], True):
dmi_data['Chassis Information']['Manufacturer'] += \
" *** Invalid chassis manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information', 'Manufacturer:',
['system manufacture', 'insyde', 'standard',
- 'to be filled by o\.e\.m\.', 'no enclosure'], True):
+ r'to be filled by o\.e\.m\.', 'no enclosure'], True):
dmi_data['System Information']['Manufacturer'] += \
" *** Invalid system manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'Base Board Information',
'Manufacturer:',
- ['to be filled by o\.e\.m\.'], True):
+ [r'to be filled by o\.e\.m\.'], True):
dmi_data['Base Board Information']['Manufacturer'] += \
" *** Invalid base board manufacturer!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information',
'Product Name:',
- ['system product name', 'to be filled by o\.e\.m\.'],
+ ['system product name', r'to be filled by o\.e\.m\.'],
False):
dmi_data['System Information']['Product Name'] += \
" *** Invalid system product name!"
@@ -173,7 +173,7 @@ def standard_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Base Board Information',
'Product Name:',
['base board product name',
- 'to be filled by o\.e\.m\.'], False):
+ r'to be filled by o\.e\.m\.'], False):
dmi_data['Base Board Information']['Product Name'] += \
" *** Invalid base board product name!"
retval += 1
@@ -193,21 +193,21 @@ def version_tests(args, stream, dmi_data):
"""
retval = 0
if find_in_section(stream, dmi_data, 'Chassis Information', 'Version:',
- ['to be filled by o\.e\.m\.', 'empty', 'x\.x'],
+ [r'to be filled by o\.e\.m\.', 'empty', r'x\.x'],
False):
dmi_data['Chassis Information']['Version'] += \
" *** Invalid chassis version!"
retval += 1
if find_in_section(stream, dmi_data, 'System Information', 'Version:',
- ['to be filled by o\.e\.m\.', '\(none\)',
+ [r'to be filled by o\.e\.m\.', r'\(none\)',
'null', 'system version', 'not applicable',
- '\.\.\.\.\.'], False):
+ r'\.\.\.\.\.'], False):
dmi_data['System Information']['Version'] += \
" *** Invalid system information version!"
retval += 1
if find_in_section(stream, dmi_data, 'Base Board Information', 'Version:',
- ['base board version', 'x\.x',
- 'empty', 'to be filled by o\.e\.m\.'], False):
+ ['base board version', r'x\.x',
+ 'empty', r'to be filled by o\.e\.m\.'], False):
dmi_data['Base Board Information']['Version'] += \
" *** Invalid base board version!"
retval += 1
@@ -228,8 +228,8 @@ def serial_tests(args, stream, dmi_data):
retval = 0
if find_in_section(stream, dmi_data, 'System Information',
'Serial Number:',
- ['to be filled by o\.e\.m\.',
- 'system serial number', '\.\.\.\.\.'],
+ [r'to be filled by o\.e\.m\.',
+ 'system serial number', r'\.\.\.\.\.'],
False):
dmi_data['System Information']['Serial Number'] += \
" *** Invalid system information serial number!"
@@ -237,8 +237,8 @@ def serial_tests(args, stream, dmi_data):
if find_in_section(stream, dmi_data, 'Base Board Information',
'Serial Number:',
['n/a', 'base board serial number',
- 'to be filled by o\.e\.m\.',
- 'empty', '\.\.\.'],
+ r'to be filled by o\.e\.m\.',
+ 'empty', r'\.\.\.'],
False):
dmi_data['Base Board Information']['Serial Number'] += \
" *** Invalid base board serial number!"
@@ -306,7 +306,7 @@ def main():
if args.test_serials:
retval += serial_tests(args, stream, dmi_data)
if find_in_section(stream, dmi_data, 'Processor Information', 'Version:',
- ['sample', 'Genuine Intel\(R\) CPU 0000'], False):
+ ['sample', r'Genuine Intel\(R\) CPU 0000'], False):
dmi_data['Processor Information']['Version'] += \
" *** Invalid processor information!"
retval += 1
diff --git a/bin/edid_cycle.py b/bin/edid_cycle.py
index a6daf1c..e327cb0 100755
--- a/bin/edid_cycle.py
+++ b/bin/edid_cycle.py
@@ -21,7 +21,7 @@ def check_resolution():
output = subprocess.check_output('xdpyinfo')
for line in output.decode(sys.stdout.encoding).splitlines():
if 'dimensions' in line:
- match = re.search('(\d+)x(\d+)\ pixels', line)
+ match = re.search(r'(\d+)x(\d+)\ pixels', line)
if match and len(match.groups()) == 2:
return '{}x{}'.format(*match.groups())
@@ -29,7 +29,8 @@ def check_resolution():
def change_edid(host, edid_file):
with open(edid_file, 'rb') as f:
cmd = ['ssh', host, '/snap/bin/pigbox', 'run',
- '\'v4l2-ctl --set-edid=file=-,format=raw --fix-edid-checksums\'']
+ '\'v4l2-ctl --set-edid=file=-,'
+ 'format=raw --fix-edid-checksums\'']
subprocess.check_output(cmd, input=f.read())
@@ -52,5 +53,6 @@ def main():
print('PASS')
return failed
+
if __name__ == '__main__':
raise SystemExit(main())
diff --git a/bin/evdev_touch_test.py b/bin/evdev_touch_test.py
index 5649701..bc52cc4 100755
--- a/bin/evdev_touch_test.py
+++ b/bin/evdev_touch_test.py
@@ -53,7 +53,7 @@ while True:
for e in dev.read():
tap = args.xfingers
if tap == 1:
- if (e.type == 3 and e.code == 47 and e.value > 0):
+ if (e.type == 3 and e.code == 47 and e.value > 0):
raise SystemExit(
"Multitouch Event detected but Single was expected")
# type 1 is evdev.ecodes.EV_KEY
diff --git a/bin/fan_reaction_test.py b/bin/fan_reaction_test.py
index a4ab744..c37054b 100755
--- a/bin/fan_reaction_test.py
+++ b/bin/fan_reaction_test.py
@@ -34,6 +34,7 @@ class FanMonitor:
if not self._fan_paths:
print('Fan monitoring interface not found in SysFS')
raise SystemExit(0)
+
def get_rpm(self):
result = {}
for p in self._fan_paths:
@@ -44,6 +45,7 @@ class FanMonitor:
except OSError:
print('Fan SysFS node dissappeared ({})'.format(p))
return result
+
def get_average_rpm(self, period):
acc = self.get_rpm()
for i in range(period):
@@ -83,14 +85,15 @@ class Stressor:
def _stress_fun(self):
"""Actual stress function."""
# generate some random data
- data = bytes(random.getrandbits(8) for _ in range(1024))
- hasher = hashlib.sha256()
+ data = bytes(random.getrandbits(8) for _ in range(1024))
+ hasher = hashlib.sha256()
hasher.update(data)
while True:
new_digest = hasher.digest()
# use the newly obtained digest as the new data to the hasher
hasher.update(new_digest)
+
def main():
"""Entry point."""
@@ -153,5 +156,6 @@ def main():
if not (rpm_rose_during_stress and rpm_dropped_during_cooling):
raise SystemExit("Fans did not react to stress expectedly")
+
if __name__ == '__main__':
main()
diff --git a/bin/frequency_governors_test.py b/bin/frequency_governors_test.py
index 6072ded..fb652f5 100755
--- a/bin/frequency_governors_test.py
+++ b/bin/frequency_governors_test.py
@@ -8,7 +8,8 @@ import time
import argparse
import logging
-from subprocess import check_output, check_call, CalledProcessError, PIPE
+from subprocess import check_output, check_call, CalledProcessError
+
class CPUScalingTest(object):
@@ -37,11 +38,11 @@ class CPUScalingTest(object):
for subdirectory in os.listdir(self.sysCPUDirectory):
match = pattern.search(subdirectory)
if match and match.group("cpuNumber"):
- cpufreqDirectory = os.path.join(self.sysCPUDirectory,
- subdirectory, "cpufreq")
+ cpufreqDirectory = os.path.join(
+ self.sysCPUDirectory, subdirectory, "cpufreq")
if not os.path.exists(cpufreqDirectory):
- logging.error("CPU %s has no cpufreq directory %s"
- % (match.group("cpuNumber"), cpufreqDirectory))
+ logging.error("CPU %s has no cpufreq directory %s" % (
+ match.group("cpuNumber"), cpufreqDirectory))
return None
# otherwise
self.cpufreqDirectories.append(cpufreqDirectory)
@@ -59,7 +60,8 @@ class CPUScalingTest(object):
for cpufreqDirectory in self.cpufreqDirectories:
parameters = self.getParameters(cpufreqDirectory, file)
if not parameters:
- logging.error("Error: could not determine cpu parameters from %s"
+ logging.error(
+ "Error: could not determine cpu parameters from %s"
% os.path.join(cpufreqDirectory, file))
return None
if not current:
@@ -91,7 +93,7 @@ class CPUScalingTest(object):
return rf
return None
- logging.debug("Setting %s to %s" % (setFile,value))
+ logging.debug("Setting %s to %s" % (setFile, value))
path = None
if not skip:
if automatch:
@@ -115,8 +117,9 @@ class CPUScalingTest(object):
parameterFile = open(path)
line = parameterFile.readline()
if not line or line.strip() != str(value):
- logging.error("Error: could not verify that %s was set to %s"
- % (path, value))
+ logging.error(
+ "Error: could not verify that %s was set to %s" % (
+ path, value))
if line:
logging.error("Actual Value: %s" % line)
else:
@@ -127,6 +130,7 @@ class CPUScalingTest(object):
def checkSelectorExecutable(self):
logging.debug("Determining if %s is executable" % self.selectorExe)
+
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
@@ -179,8 +183,9 @@ class CPUScalingTest(object):
parameterFile = open(parameterFilePath)
line = parameterFile.readline()
if not line:
- logging.error("Error: failed to get %s for %s"
- % (parameter, self.cpufreqDirectory))
+ logging.error(
+ "Error: failed to get %s for %s" % (
+ parameter, self.cpufreqDirectory))
return None
value = line.strip()
return value
@@ -198,8 +203,9 @@ class CPUScalingTest(object):
parameterFile = open(path)
line = parameterFile.readline()
if not line:
- logging.error("Error: failed to get %s for %s"
- % (parameter, cpufreqDirectory))
+ logging.error(
+ "Error: failed to get %s for %s" % (
+ parameter, cpufreqDirectory))
return None
values.append(line.strip())
logging.debug("Found parameters:")
@@ -231,15 +237,18 @@ class CPUScalingTest(object):
runTime = stop_elapsed_time - start_elapsed_time
else:
thisTime = stop_elapsed_time - start_elapsed_time
- if ((abs(thisTime - runTime) / runTime) * 100
- < self.retryTolerance):
+ if (
+ (abs(thisTime - runTime) / runTime) * 100 <
+ self.retryTolerance
+ ):
return runTime
else:
runTime = thisTime
tries += 1
- logging.error("Could not repeat load test times within %.1f%%"
- % self.retryTolerance)
+ logging.error(
+ "Could not repeat load test times within %.1f%%" %
+ self.retryTolerance)
return None
def pi(self):
@@ -262,9 +271,11 @@ class CPUScalingTest(object):
logging.info("Done.")
minimumFrequency = self.getParameter("scaling_min_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not minimumFrequency
- or not currentFrequency
- or (minimumFrequency != currentFrequency)):
+ if (
+ not minimumFrequency or
+ not currentFrequency or
+ (minimumFrequency != currentFrequency)
+ ):
return False
# otherwise
@@ -310,7 +321,8 @@ class CPUScalingTest(object):
logging.info(" cpu%u: %s" % (i, g))
i += 1
else:
- logging.error("Error: could not determine current governor settings")
+ logging.error(
+ "Error: could not determine current governor settings")
return False
self.getCPUFlags()
@@ -333,7 +345,7 @@ class CPUScalingTest(object):
logging.debug("Found the following CPU Flags:")
for line in self.cpuFlags:
logging.debug(" %s" % line)
- except:
+ except OSError:
logging.warning("Could not read CPU flags")
def runUserSpaceTests(self):
@@ -356,82 +368,103 @@ class CPUScalingTest(object):
# Set the the CPU speed to it's lowest value
frequency = self.minFreq
- logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
+ logging.info(
+ "Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
if not self.setFrequency(frequency):
success = False
# Verify the speed is set to the lowest value
minimumFrequency = self.getParameter("scaling_min_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not minimumFrequency
- or not currentFrequency
- or (minimumFrequency != currentFrequency)):
- logging.error("Could not verify that cpu frequency is set to the minimum value of %s"
- % minimumFrequency)
+ if (
+ not minimumFrequency or
+ not currentFrequency or
+ (minimumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Could not verify that cpu frequency is set to the minimum"
+ " value of %s" % minimumFrequency)
success = False
# Run Load Test
self.minimumFrequencyTestTime = self.runLoadTest()
if not self.minimumFrequencyTestTime:
- logging.error("Could not retrieve the minimum frequency test's execution time.")
+ logging.error(
+ "Could not retrieve the minimum frequency test's"
+ " execution time.")
success = False
else:
- logging.info("Minimum frequency load test time: %.2f"
- % self.minimumFrequencyTestTime)
+ logging.info(
+ "Minimum frequency load test time: %.2f"
+ % self.minimumFrequencyTestTime)
# Set the CPU speed to it's highest value as above.
frequency = self.maxFreq
- logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
+ logging.info(
+ "Setting CPU frequency to %u MHz" % (int(frequency) / 1000))
if not self.setFrequency(frequency):
success = False
maximumFrequency = self.getParameter("scaling_max_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not maximumFrequency
- or not currentFrequency
- or (maximumFrequency != currentFrequency)):
- logging.error("Could not verify that cpu frequency is set to the maximum value of %s"
- % maximumFrequency)
+ if (
+ not maximumFrequency or
+ not currentFrequency or
+ (maximumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Could not verify that cpu frequency is set to the"
+ " maximum value of %s" % maximumFrequency)
success = False
# Repeat workload test
self.maximumFrequencyTestTime = self.runLoadTest()
if not self.maximumFrequencyTestTime:
- logging.error("Could not retrieve the maximum frequency test's execution time.")
+ logging.error(
+ "Could not retrieve the maximum frequency test's "
+ "execution time.")
success = False
else:
- logging.info("Maximum frequency load test time: %.2f"
- % self.maximumFrequencyTestTime)
+ logging.info(
+ "Maximum frequency load test time: %.2f"
+ % self.maximumFrequencyTestTime)
# Verify MHz increase is comparable to time % decrease
- predictedSpeedup = (float(maximumFrequency)
- / float(minimumFrequency))
+ predictedSpeedup = (float(maximumFrequency) /
+ float(minimumFrequency))
# If "ida" turbo thing, increase the expectation by 8%
if self.cpuFlags and self.idaFlag in self.cpuFlags:
- logging.info("Found %s flag, increasing expected speedup by %.1f%%"
- % (self.idaFlag, self.idaSpeedupFactor))
+ logging.info(
+ "Found %s flag, increasing expected speedup by %.1f%%"
+ % (self.idaFlag, self.idaSpeedupFactor))
predictedSpeedup = \
- (predictedSpeedup
- * (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0))))
+ (predictedSpeedup *
+ (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0))))
if self.minimumFrequencyTestTime and self.maximumFrequencyTestTime:
- measuredSpeedup = (self.minimumFrequencyTestTime
- / self.maximumFrequencyTestTime)
+ measuredSpeedup = (self.minimumFrequencyTestTime /
+ self.maximumFrequencyTestTime)
logging.info("CPU Frequency Speed Up: %.2f" % predictedSpeedup)
logging.info("Measured Speed Up: %.2f" % measuredSpeedup)
- differenceSpeedUp = (((measuredSpeedup - predictedSpeedup) / predictedSpeedup) * 100)
- logging.info("Percentage Difference %.1f%%" % differenceSpeedUp)
+ differenceSpeedUp = (
+ ((measuredSpeedup - predictedSpeedup) / predictedSpeedup) *
+ 100)
+ logging.info(
+ "Percentage Difference %.1f%%" % differenceSpeedUp)
if differenceSpeedUp > self.speedUpTolerance:
- logging.error("Measured speedup vs expected speedup is %.1f%% and is not within %.1f%% margin."
- % (differenceSpeedUp, self.speedUpTolerance))
+ logging.error(
+ "Measured speedup vs expected speedup is %.1f%% "
+ "and is not within %.1f%% margin."
+ % (differenceSpeedUp, self.speedUpTolerance))
success = False
elif differenceSpeedUp < 0:
- logging.info("""
- Measured speed up %.2f exceeded expected speedup %.2f
- """ % (measuredSpeedup, predictedSpeedup))
+ logging.info(
+ "Measured speed up %.2f exceeded expected speedup %.2f"
+ % (measuredSpeedup, predictedSpeedup))
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
@@ -453,7 +486,9 @@ class CPUScalingTest(object):
# Wait a fixed period of time, then verify current speed
# is the slowest in as before
if not self.verifyMinimumFrequency():
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled to the "
+ "minimum value")
success = False
# Repeat workload test
@@ -462,27 +497,33 @@ class CPUScalingTest(object):
logging.warning("No On Demand load test time available.")
success = False
else:
- logging.info("On Demand load test time: %.2f" % onDemandTestTime)
+ logging.info(
+ "On Demand load test time: %.2f" % onDemandTestTime)
if onDemandTestTime and self.maximumFrequencyTestTime:
# Compare the timing to the max results from earlier,
# again time should be within self.speedUpTolerance
differenceOnDemandVsMaximum = \
- (abs(onDemandTestTime - self.maximumFrequencyTestTime)
- / self.maximumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. maximum frequency: %.1f%%"
- % differenceOnDemandVsMaximum)
+ (abs(onDemandTestTime - self.maximumFrequencyTestTime) /
+ self.maximumFrequencyTestTime) * 100
+ logging.info(
+ "Percentage Difference vs. maximum frequency: %.1f%%"
+ % differenceOnDemandVsMaximum)
if differenceOnDemandVsMaximum > self.speedUpTolerance:
- logging.error("On demand performance vs maximum of %.1f%% is not within %.1f%% margin"
- % (differenceOnDemandVsMaximum,
- self.speedUpTolerance))
+ logging.error(
+ "On demand performance vs maximum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differenceOnDemandVsMaximum, self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
# Verify the current speed has returned to the lowest speed again
if not self.verifyMinimumFrequency():
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled to the "
+ "minimum value")
success = False
return success
@@ -504,11 +545,14 @@ class CPUScalingTest(object):
# Verify the current speed is the same as scaling_max_freq
maximumFrequency = self.getParameter("scaling_max_freq")
currentFrequency = self.getParameter("scaling_cur_freq")
- if (not maximumFrequency
- or not currentFrequency
- or (maximumFrequency != currentFrequency)):
- logging.error("Current cpu frequency of %s is not set to the maximum value of %s"
- % (currentFrequency, maximumFrequency))
+ if (
+ not maximumFrequency or
+ not currentFrequency or
+ (maximumFrequency != currentFrequency)
+ ):
+ logging.error(
+ "Current cpu frequency of %s is not set to the maximum "
+ "value of %s" % (currentFrequency, maximumFrequency))
success = False
# Repeat work load test
@@ -517,22 +561,27 @@ class CPUScalingTest(object):
logging.error("No Performance load test time available.")
success = False
else:
- logging.info("Performance load test time: %.2f" % performanceTestTime)
+ logging.info(
+ "Performance load test time: %.2f" % performanceTestTime)
if performanceTestTime and self.maximumFrequencyTestTime:
# Compare the timing to the max results
differencePerformanceVsMaximum = \
- (abs(performanceTestTime - self.maximumFrequencyTestTime)
- / self.maximumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. maximum frequency: %.1f%%"
- % differencePerformanceVsMaximum)
+ (abs(performanceTestTime - self.maximumFrequencyTestTime) /
+ self.maximumFrequencyTestTime) * 100
+ logging.info(
+ "Percentage Difference vs. maximum frequency: %.1f%%"
+ % differencePerformanceVsMaximum)
if differencePerformanceVsMaximum > self.speedUpTolerance:
- logging.error("Performance setting vs maximum of %.1f%% is not within %.1f%% margin"
- % (differencePerformanceVsMaximum,
- self.speedUpTolerance))
+ logging.error(
+ "Performance setting vs maximum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differencePerformanceVsMaximum,
+ self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
@@ -559,7 +608,9 @@ class CPUScalingTest(object):
# Wait a fixed period of time,
# then verify current speed is the slowest in as before
if not self.verifyMinimumFrequency(10):
- logging.error("Could not verify that cpu frequency has settled to the minimum value")
+ logging.error(
+ "Could not verify that cpu frequency has settled "
+ "to the minimum value")
success = False
# Set the frequency step to 0,
@@ -573,28 +624,35 @@ class CPUScalingTest(object):
logging.error("No Conservative load test time available.")
success = False
else:
- logging.info("Conservative load test time: %.2f"
- % conservativeTestTime)
+ logging.info(
+ "Conservative load test time: %.2f"
+ % conservativeTestTime)
if conservativeTestTime and self.minimumFrequencyTestTime:
# Compare the timing to the max results
differenceConservativeVsMinimum = \
- (abs(conservativeTestTime - self.minimumFrequencyTestTime)
- / self.minimumFrequencyTestTime) * 100
- logging.info("Percentage Difference vs. minimum frequency: %.1f%%"
- % differenceConservativeVsMinimum)
+ (abs(conservativeTestTime -
+ self.minimumFrequencyTestTime) /
+ self.minimumFrequencyTestTime) * 100
+ logging.info(
+ "Percentage Difference vs. minimum frequency: %.1f%%"
+ % differenceConservativeVsMinimum)
if differenceConservativeVsMinimum > self.speedUpTolerance:
- logging.error("Performance setting vs minimum of %.1f%% is not within %.1f%% margin"
- % (differenceConservativeVsMinimum,
- self.speedUpTolerance))
+ logging.error(
+ "Performance setting vs minimum of %.1f%% is not "
+ "within %.1f%% margin"
+ % (differenceConservativeVsMinimum,
+ self.speedUpTolerance))
success = False
else:
- logging.error("Not enough timing data to calculate speed differences.")
+ logging.error(
+ "Not enough timing data to calculate speed differences.")
return success
def restoreGovernors(self):
- logging.info("Restoring original governor to %s" % (self.originalGovernors[0]))
+ logging.info(
+ "Restoring original governor to %s" % (self.originalGovernors[0]))
self.setGovernor(self.originalGovernors[0])
@@ -604,8 +662,9 @@ def main():
help="Suppress output.")
parser.add_argument("-c", "--capabilities", action="store_true",
help="Only output CPU capabilities.")
- parser.add_argument("-d", "--debug", action="store_true",
- help="Turn on debug level output for extra info during test run.")
+ parser.add_argument(
+ "-d", "--debug", action="store_true",
+ help="Turn on debug level output for extra info during test run.")
args = parser.parse_args()
# Set up the logging system (unless we don't want ANY logging)
@@ -614,11 +673,11 @@ def main():
logger.setLevel(logging.DEBUG)
format = '%(asctime)s %(levelname)-8s %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
-
+
# If we DO want console output
if not args.quiet:
- console_handler = logging.StreamHandler() #logging.StreamHandler()
- console_handler.setFormatter(logging.Formatter(format,date_format))
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(logging.Formatter(format, date_format))
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
diff --git a/bin/fresh_rate_info.py b/bin/fresh_rate_info.py
index 43a8f2a..3089b48 100755
--- a/bin/fresh_rate_info.py
+++ b/bin/fresh_rate_info.py
@@ -38,13 +38,13 @@ def xrandr_paser(data=None):
resolution = None
xrandrs = list()
for line in str(data).split('\n'):
- for match in re.finditer('(.+) connected (\d+x\d+)\+', line):
+ for match in re.finditer(r'(.+) connected (\d+x\d+)\+', line):
connector = match.group(1)
resolution = match.group(2)
break
if resolution is None:
continue
- for match in re.finditer('{0}\s+(.+)\*'.format(resolution),
+ for match in re.finditer(r'{0}\s+(.+)\*'.format(resolution),
line):
refresh_rate = match.group(1)
xrandr = {'connector': connector,
@@ -72,5 +72,6 @@ def main():
xrandr['resolution'],
xrandr['refresh_rate']))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/get_make_and_model.py b/bin/get_make_and_model.py
index 496a949..42fb219 100755
--- a/bin/get_make_and_model.py
+++ b/bin/get_make_and_model.py
@@ -1,26 +1,28 @@
#!/usr/bin/env python3
-import os.path
import shlex
from subprocess import check_output
+
def print_header(value):
print("{}:".format(value))
+
def print_data(key, value):
print(" {}: {}".format(key, value))
+
def run_cmd(option):
cmd = "lshw -C " + option
- out = check_output(shlex.split(cmd),
- universal_newlines = True)
+ out = check_output(shlex.split(cmd), universal_newlines=True)
return out.split('\n')
+
def main():
keys = {'Manufacturer': 'vendor',
'Model': 'product',
'Version': 'version'}
- lshw_classes = {'system': 'System',
+ lshw_classes = {'system': 'System',
'bus': 'Mainboard'}
for lshw_class in lshw_classes:
@@ -38,6 +40,6 @@ def main():
for key in data:
print_data(key, data[key])
+
if __name__ == "__main__":
raise SystemExit(main())
-
diff --git a/bin/gpu_test.py b/bin/gpu_test.py
index 49ea31a..4f2af2e 100755
--- a/bin/gpu_test.py
+++ b/bin/gpu_test.py
@@ -30,9 +30,9 @@ import subprocess
import sys
import time
gi.require_version('Gio', '2.0')
-from gi.repository import Gio
-from math import cos, sin
-from threading import Thread
+from gi.repository import Gio # noqa: E402
+from math import cos, sin # noqa: E402
+from threading import Thread # noqa: E402
class GlxThread(Thread):
@@ -44,14 +44,13 @@ class GlxThread(Thread):
try:
self.process = subprocess.Popen(
- ["glxgears","-geometry", "400x400"],
+ ["glxgears", "-geometry", "400x400"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
self.process.communicate()
except (subprocess.CalledProcessError, FileNotFoundError) as er:
print("WARNING: Unable to start glxgears (%s)" % er)
-
def terminate(self):
if not hasattr(self, 'id'):
print("WARNING: Attempted to terminate non-existing window.")
@@ -121,8 +120,8 @@ class Html5VideoThread(Thread):
def html5_path(self):
if os.getenv('PLAINBOX_PROVIDER_DATA'):
return os.path.join(
- os.getenv('PLAINBOX_PROVIDER_DATA'),
- 'websites/html5_video.html')
+ os.getenv('PLAINBOX_PROVIDER_DATA'),
+ 'websites/html5_video.html')
def run(self):
if self.html5_path and os.path.isfile(self.html5_path):
@@ -136,8 +135,8 @@ class Html5VideoThread(Thread):
print("WARNING: test results may be invalid.")
def terminate(self):
- if self.html5_path and os.path.isfile(self.html5_path):
- subprocess.call("pkill firefox", shell=True)
+ if self.html5_path and os.path.isfile(self.html5_path):
+ subprocess.call("pkill firefox", shell=True)
def check_gpu(log=None):
@@ -172,10 +171,10 @@ def main():
print("WARNING: Got an exception %s" % er)
windows = ""
for app in sorted(windows.splitlines(), reverse=True):
- if not b'glxgears' in app:
+ if b'glxgears' not in app:
continue
GlxWindows[i].id = str(
- re.match(b'^(0x\w+)', app).group(0), 'utf-8')
+ re.match(b'^(0x\w+)', app).group(0), 'utf-8') # noqa: W605
break
if hasattr(GlxWindows[i], "id"):
rotator = RotateGlxThread(GlxWindows[i].id, i + 1)
@@ -204,7 +203,7 @@ def main():
'gconftool --get /apps/compiz-1/general/screen0/options/vsize',
shell=True))
(x_res, y_res) = re.search(
- b'DG:\s+(\d+)x(\d+)',
+ b'DG:\s+(\d+)x(\d+)', # noqa: W605
subprocess.check_output('wmctrl -d', shell=True)).groups()
DesktopSwitch = ChangeWorkspace(
hsize, vsize, int(x_res) // hsize, int(y_res) // vsize)
@@ -230,5 +229,6 @@ def main():
settings.set_int("vsize", vsize_ori)
Gio.Settings.sync()
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/graphic_memory_info.py b/bin/graphic_memory_info.py
index ba7eece..29040a5 100755
--- a/bin/graphic_memory_info.py
+++ b/bin/graphic_memory_info.py
@@ -42,13 +42,13 @@ def vgamem_paser(data=None):
device = None
vgamems = list()
for line in data.split('\n'):
- for match in re.finditer('(\d\d:\d\d\.\d) VGA(.+): (.+)', line):
+ for match in re.finditer(r'(\d\d:\d\d\.\d) VGA(.+): (.+)', line):
device = match.group(1)
name = match.group(3)
if device is None:
continue
-#Memory at e0000000 (32-bit, prefetchable) [size=256M]
- for match in re.finditer('Memory(.+) prefetchable\) \[size=(\d+)M\]',
+# Memory at e0000000 (32-bit, prefetchable) [size=256M]
+ for match in re.finditer(r'Memory(.+) prefetchable\) \[size=(\d+)M\]',
line):
vgamem_size = match.group(2)
vgamem = {'device': device,
@@ -77,5 +77,6 @@ def main():
vgamem['name'],
vgamem['vgamem_size']))
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/graphics_driver.py b/bin/graphics_driver.py
index 6d28f5a..f9bdff3 100755
--- a/bin/graphics_driver.py
+++ b/bin/graphics_driver.py
@@ -129,7 +129,7 @@ class XorgLog(object):
gathering_module = False
module = None
m = re.search(
- '\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line)
+ r'\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line)
if m:
found_ddx = True
continue
@@ -207,7 +207,8 @@ class XorgLog(object):
# For NVIDIA
m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line)
if not m:
- m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line)
+ m = re.search(
+ r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line)
if m:
self.displays[display_name] = display
self.video_driver = m.group(1)
@@ -314,8 +315,8 @@ def is_laptop():
def hybrid_graphics_check(xlog):
'''Check for Hybrid Graphics'''
- card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)')
- card_id2 = re.compile('.*03..: *(.+):(.+)')
+ card_id1 = re.compile(r'.*0300: *(.+):(.+) \(.+\)')
+ card_id2 = re.compile(r'.*03..: *(.+):(.+)')
cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'}
cards = []
drivers = []
@@ -350,12 +351,12 @@ def hybrid_graphics_check(xlog):
drivers.append(module['name'])
print('Loaded DDX Drivers: %s' % ', '.join(drivers))
- has_hybrid_graphics = (len(cards) > 1 and is_laptop()
- and (cards_dict.get('8086') in formatted_cards
- or cards_dict.get('1002') in formatted_cards))
+ has_hybrid_graphics = (len(cards) > 1 and is_laptop() and
+ (cards_dict.get('8086') in formatted_cards or
+ cards_dict.get('1002') in formatted_cards))
- print('Hybrid Graphics: %s' % (has_hybrid_graphics
- and 'yes' or 'no'))
+ print('Hybrid Graphics: %s' % (has_hybrid_graphics and
+ 'yes' or 'no'))
return 0
@@ -372,5 +373,6 @@ def main():
return 1 if 1 in results else 0
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/graphics_modes_info.py b/bin/graphics_modes_info.py
index f3c962a..f92cd76 100755
--- a/bin/graphics_modes_info.py
+++ b/bin/graphics_modes_info.py
@@ -32,7 +32,7 @@ from checkbox_support.contrib import xrandr
def print_modes_info(screen):
"""Print some information about the detected screen and its outputs"""
xrandr._check_required_version((1, 0))
- print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %\
+ print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %
(screen._screen,
screen._width_min, screen._height_min,
screen._width, screen._height,
@@ -50,10 +50,9 @@ def print_modes_info(screen):
for m in range(len(modes)):
mode = modes[m]
refresh = mode.dotClock / (mode.hTotal * mode.vTotal)
- print(" [%s] %s x %s @ %s Hz" % (m,
- mode.width,
- mode.height,
- refresh), end=' ')
+ print(
+ " [%s] %s x %s @ %s Hz" %
+ (m, mode.width, mode.height, refresh), end=' ')
if mode.id == output._mode:
print("(current)", end=' ')
if m == output.get_preferred_mode():
@@ -70,5 +69,6 @@ def main():
except(xrandr.UnsupportedRRError):
print('Error: RandR version lower than 1.0', file=sys.stderr)
+
if __name__ == '__main__':
main()
diff --git a/bin/graphics_stress_test.py b/bin/graphics_stress_test.py
index d467b3f..c0eeb4a 100755
--- a/bin/graphics_stress_test.py
+++ b/bin/graphics_stress_test.py
@@ -45,7 +45,7 @@ class VtWrapper(object):
vt = 0
proc = Popen(['ps', 'aux'], stdout=PIPE, universal_newlines=True)
proc_output = proc.communicate()[0].split('\n')
- proc_line = re.compile('.*tty(\d+).+/usr/bin/X.*')
+ proc_line = re.compile(r'.*tty(\d+).+/usr/bin/X.*')
for line in proc_output:
match = proc_line.match(line)
if match:
@@ -56,6 +56,7 @@ class VtWrapper(object):
retcode = call(['chvt', '%d' % vt])
return retcode
+
class SuspendWrapper(object):
def __init__(self):
pass
@@ -132,6 +133,7 @@ class SuspendWrapper(object):
return status
+
class RotationWrapper(object):
def __init__(self):
@@ -178,6 +180,7 @@ class RotationWrapper(object):
result = 1
return result
+
class RenderCheckWrapper(object):
"""A simple class to run the rendercheck suites"""
@@ -234,14 +237,15 @@ class RenderCheckWrapper(object):
passed = int(match_output.group(1).strip())
total = int(match_output.group(2).strip())
logging.info('Results:')
- logging.info(' %d tests passed out of %d.'
- % (passed, total))
+ logging.info(
+ ' %d tests passed out of %d.' % (passed, total))
if show_errors and match_errors:
error = match_errors.group(0).strip()
if first_error:
- logging.debug('Rendercheck %s suite errors '
- 'from iteration %d:'
- % (suites, iteration))
+ logging.debug(
+ 'Rendercheck %s suite errors '
+ 'from iteration %d:'
+ % (suites, iteration))
first_error = False
logging.debug(' %s' % error)
@@ -254,17 +258,17 @@ class RenderCheckWrapper(object):
exit_status = 0
for suite in suites:
for it in range(iterations):
- logging.info('Iteration %d of Rendercheck %s suite...'
- % (it + 1, suite))
- (status, passed, total) = \
- self._print_test_info(suites=suite,
- iteration=it + 1,
- show_errors=show_errors)
+ logging.info(
+ 'Iteration %d of Rendercheck %s suite...'
+ % (it + 1, suite))
+ (status, passed, total) = self._print_test_info(
+ suites=suite, iteration=it + 1, show_errors=show_errors)
if status != 0:
# Make sure to catch a non-zero exit status
- logging.info('Iteration %d of Rendercheck %s suite '
- 'exited with status %d.'
- % (it + 1, suite, status))
+ logging.info(
+ 'Iteration %d of Rendercheck %s suite '
+ 'exited with status %d.'
+ % (it + 1, suite, status))
exit_status = status
it += 1
@@ -372,7 +376,6 @@ def main():
logfile_handler.setFormatter(logging.Formatter(format))
logger.addHandler(logfile_handler)
- log_path = os.path.abspath(logfile)
# Write only to stdout
else:
@@ -402,16 +405,18 @@ def main():
logging.info('Iteration %d...', it)
retcode = vt_wrap.set_vt(target_vt)
if retcode != 0:
- logging.error('Error: switching to tty%d failed with code %d '
- 'on iteration %d' % (target_vt, retcode, it))
+ logging.error(
+ 'Error: switching to tty%d failed with code %d '
+ 'on iteration %d' % (target_vt, retcode, it))
status = 1
else:
logging.info('Switching to tty%d: passed' % (target_vt))
time.sleep(2)
retcode = vt_wrap.set_vt(vt_wrap.x_vt)
if retcode != 0:
- logging.error('Error: switching to tty%d failed with code %d '
- 'on iteration %d' % (vt_wrap.x_vt, retcode, it))
+ logging.error(
+ 'Error: switching to tty%d failed with code %d '
+ 'on iteration %d' % (vt_wrap.x_vt, retcode, it))
else:
logging.info('Switching to tty%d: passed' % (vt_wrap.x_vt))
status = 1
@@ -463,5 +468,6 @@ def main():
return status
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/gst_pipeline_test.py b/bin/gst_pipeline_test.py
index c7a9af5..f022632 100755
--- a/bin/gst_pipeline_test.py
+++ b/bin/gst_pipeline_test.py
@@ -9,9 +9,9 @@ import sys
import time
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
-from gi.repository import Gst
-from gi.repository import GLib
-from subprocess import check_output
+from gi.repository import Gst # noqa: E402
+from gi.repository import GLib # noqa: E402
+from subprocess import check_output # noqa: E402
def check_state(device):
@@ -22,8 +22,11 @@ def check_state(device):
data = sink_info.split("\n")
try:
- device_name = re.findall(".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip()
- sink = re.findall(".*Name:\s(.*%s.*)" % device, sink_info, re.IGNORECASE)[0].lstrip()
+ device_name = re.findall(
+ r".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip()
+ sink = re.findall(
+ r".*Name:\s(.*%s.*)" % device, sink_info,
+ re.IGNORECASE)[0].lstrip()
status = data[data.index("\t" + device_name) - 1]
except (IndexError, ValueError):
logging.error("Failed to find status for device: %s" % device)
@@ -32,22 +35,26 @@ def check_state(device):
os.environ['PULSE_SINK'] = sink
logging.info("[ Pulse sink ]".center(80, '='))
logging.info("Device: %s %s" % (device_name.strip(), status.strip()))
- return status
+ return status
def main():
parser = ArgumentParser(description='Simple GStreamer pipeline player')
- parser.add_argument('PIPELINE',
+ parser.add_argument(
+ 'PIPELINE',
help='Quoted GStreamer pipeline to launch')
- parser.add_argument('-t', '--timeout',
+ parser.add_argument(
+ '-t', '--timeout',
type=int, required=True,
help='Timeout for running the pipeline')
- parser.add_argument('-d', '--device',
+ parser.add_argument(
+ '-d', '--device',
type=str,
help="Device to check for status")
args = parser.parse_args()
- logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO,
+ logging.basicConfig(
+ format='%(levelname)s:%(message)s', level=logging.INFO,
stream=sys.stdout)
exit_code = 0
@@ -63,7 +70,7 @@ def main():
except GLib.GError as error:
print("Specified pipeline couldn't be processed.")
print("Error when processing pipeline: {}".format(error))
- #Exit harmlessly
+ # Exit harmlessly
return(2)
print("Pipeline initialized, now starting playback.")
diff --git a/bin/hdd_parking.py b/bin/hdd_parking.py
index 6d34904..01022ba 100755
--- a/bin/hdd_parking.py
+++ b/bin/hdd_parking.py
@@ -25,7 +25,7 @@
"""
This script verifies that a systems HDD protection capabilities
are triggered when appropriate. There are many implementations
-of HDD protection from different OEMs, each implemented in a
+of HDD protection from different OEMs, each implemented in a
different way, so this script can only support implementations
which are known to work and are testable. Currently the list
of supported implementations is:
@@ -42,10 +42,12 @@ from subprocess import Popen, PIPE
TIMEOUT = 15.0
+
def hdaps_test(run_time):
try:
- hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE,
- universal_newlines=True)
+ hdapsd = Popen(
+ ['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE,
+ universal_newlines=True)
except OSError as err:
print("Unable to start hdapsd: {}".format(err))
return 1
@@ -59,6 +61,7 @@ def hdaps_test(run_time):
return 0
return 1
+
def main():
# First establish the driver used
parser = ArgumentParser("Tests a systems HDD protection capabilities. "
@@ -70,5 +73,6 @@ def main():
'all axis. No particular force should be required.')
return hdaps_test(parser.parse_args().timeout)
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/hotkey_tests.py b/bin/hotkey_tests.py
index f1b3560..066f36c 100755
--- a/bin/hotkey_tests.py
+++ b/bin/hotkey_tests.py
@@ -554,5 +554,6 @@ def main():
if failed:
raise SystemExit('Some test failed!')
+
if __name__ == '__main__':
main()
diff --git a/bin/key_test.py b/bin/key_test.py
index 2bd95f9..f94abd3 100755
--- a/bin/key_test.py
+++ b/bin/key_test.py
@@ -29,7 +29,7 @@ import termios
from gettext import gettext as _
-from gi.repository import GObject
+from gi.repository import GLib
from optparse import OptionParser
@@ -81,7 +81,7 @@ class Reporter(object):
self.scancodes = scancodes
self.fileno = os.open("/dev/console", os.O_RDONLY)
- GObject.io_add_watch(self.fileno, GObject.IO_IN, self.on_key)
+ GLib.io_add_watch(self.fileno, GLib.IO_IN, self.on_key)
# Set terminal attributes
self.saved_attributes = termios.tcgetattr(self.fileno)
@@ -122,9 +122,9 @@ class Reporter(object):
index = 0
length = len(raw_bytes)
while index < length:
- if (index + 2 < length and (raw_bytes[index] & 0x7f) == 0
- and (raw_bytes[index + 1] & 0x80) != 0
- and (raw_bytes[index + 2] & 0x80) != 0):
+ if (index + 2 < length and (raw_bytes[index] & 0x7f) == 0 and
+ (raw_bytes[index + 1] & 0x80) != 0 and
+ (raw_bytes[index + 2] & 0x80) != 0):
code = (((raw_bytes[index + 1] & 0x7f) << 7) |
(raw_bytes[2] & 0x7f))
index += 3
@@ -324,7 +324,8 @@ class GtkReporter(Reporter):
def found_key(self, key):
super(GtkReporter, self).found_key(key)
- self.icons[key].set_from_stock(self.ICON_TESTED, size=self.ICON_SIZE)
+ self.icons[key].set_from_icon_name(
+ self.ICON_TESTED, size=self.ICON_SIZE)
self.check_keys()
@@ -334,7 +335,7 @@ class GtkReporter(Reporter):
stock_icon = self.ICON_UNTESTED
else:
stock_icon = self.ICON_NOT_REQUIRED
- self.icons[key].set_from_stock(stock_icon, self.ICON_SIZE)
+ self.icons[key].set_from_icon_name(stock_icon, self.ICON_SIZE)
self.check_keys()
@@ -394,10 +395,10 @@ Hint to find codes:
key = Key(codes, name)
keys.append(key)
- main_loop = GObject.MainLoop()
+ main_loop = GLib.MainLoop()
try:
reporter = reporter_factory(main_loop, keys, options.scancodes)
- except:
+ except OSError:
parser.error("Failed to initialize interface: %s" % options.interface)
try:
@@ -408,5 +409,6 @@ Hint to find codes:
return reporter.exit_code
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/keyboard_test.py b/bin/keyboard_test.py
index 1615106..e87dc2e 100755
--- a/bin/keyboard_test.py
+++ b/bin/keyboard_test.py
@@ -84,5 +84,6 @@ def main(args):
return 0
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/lock_screen_watcher.py b/bin/lock_screen_watcher.py
index 1c30270..e4433c8 100755
--- a/bin/lock_screen_watcher.py
+++ b/bin/lock_screen_watcher.py
@@ -20,8 +20,8 @@ import dbus
import gi
from dbus.mainloop.glib import DBusGMainLoop
gi.require_version('GLib', '2.0')
-from gi.repository import GObject
-from gi.repository import GLib
+from gi.repository import GObject # noqa: E402
+from gi.repository import GLib # noqa: E402
def filter_cb(bus, message):
@@ -41,6 +41,7 @@ def on_timeout_expired():
print("You have failed to perform the required manipulation in time")
raise SystemExit(1)
+
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
bus.add_match_string("type='signal',interface='com.ubuntu.Upstart0_6'")
diff --git a/bin/lsmod_info.py b/bin/lsmod_info.py
index 3b81542..cb7c0bc 100755
--- a/bin/lsmod_info.py
+++ b/bin/lsmod_info.py
@@ -36,5 +36,6 @@ def main():
print('%s: %s' % (module, version))
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/mac_passthrough.py b/bin/mac_passthrough.py
index 92d2eb6..1707a1e 100755
--- a/bin/mac_passthrough.py
+++ b/bin/mac_passthrough.py
@@ -53,8 +53,8 @@ def get_pass_through_mac():
for line in dsdt:
if 'AUXMAC' in line:
bios_mac = line.split('#')[1]
- print('Pass-through MAC address from DSDT table: '
- + bios_mac.lower())
+ print('Pass-through MAC address from DSDT table: ' +
+ bios_mac.lower())
return bios_mac.lower()
raise SystemExit('No AUXMAC is found in DSDT table, '
'MAC address pass-through is not working.')
diff --git a/bin/manage_compiz_plugin.py b/bin/manage_compiz_plugin.py
index 03c0ed8..be9541f 100755
--- a/bin/manage_compiz_plugin.py
+++ b/bin/manage_compiz_plugin.py
@@ -29,11 +29,10 @@ from gettext import gettext as _
import argparse
import gettext
import os
-import sys
import subprocess
import time
-KEY="/org/compiz/profiles/unity/plugins/core/active-plugins"
+KEY = "/org/compiz/profiles/unity/plugins/core/active-plugins"
gettext.textdomain("com.canonical.certification.checkbox")
gettext.bindtextdomain("com.canonical.certification.checkbox",
@@ -41,8 +40,9 @@ gettext.bindtextdomain("com.canonical.certification.checkbox",
plugins = eval(subprocess.check_output(["dconf", "read", KEY]))
-parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"),
- epilog=_("Available plugins: {}").format(plugins))
+parser = argparse.ArgumentParser(
+ description=_("enable/disable compiz plugins"),
+ epilog=_("Available plugins: {}").format(plugins))
parser.add_argument("plugin", type=str, help=_('Name of plugin to control'))
parser.add_argument("action", type=str, choices=['enable', 'disable'],
help=_("What to do with the plugin"))
diff --git a/bin/memory_compare.py b/bin/memory_compare.py
index 02e2def..aaf89f4 100755
--- a/bin/memory_compare.py
+++ b/bin/memory_compare.py
@@ -25,7 +25,6 @@
import os
import sys
import re
-from math import log, copysign
from subprocess import check_output, CalledProcessError, PIPE
from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
@@ -44,7 +43,7 @@ class LshwJsonResult:
desc_regex = re.compile('System Memory', re.IGNORECASE)
def addHardware(self, hardware):
- if self.desc_regex.match(str(hardware.get('description',0))):
+ if self.desc_regex.match(str(hardware.get('description', 0))):
self.memory_reported += int(hardware.get('size', 0))
elif 'bank' in hardware['id']:
self.banks_reported += int(hardware.get('size', 0))
@@ -134,5 +133,6 @@ def main():
(difference, percentage, threshold), file=sys.stderr)
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/memory_test.py b/bin/memory_test.py
index 4cd6008..a472b71 100755
--- a/bin/memory_test.py
+++ b/bin/memory_test.py
@@ -75,8 +75,8 @@ class MemoryTest():
self.process_memory = self.free_memory
try:
arch = self._command_out("arch").decode()
- if (re.match(r"(i[0-9]86|s390|arm.*)", arch)
- and self.free_memory > 1024):
+ if (re.match(r"(i[0-9]86|s390|arm.*)", arch) and
+ self.free_memory > 1024):
self.is_process_limited = True
self.process_memory = 1024 # MB, due to 32-bit address space
print("%s arch, Limiting Process Memory: %u" % (
diff --git a/bin/network.py b/bin/network.py
index a646f9c..60f7b0a 100755
--- a/bin/network.py
+++ b/bin/network.py
@@ -255,14 +255,15 @@ class IPerfPerformanceTest(object):
logging.warning("The network test against {} failed because:".
format(self.target))
if percent < self.fail_threshold:
- logging.error(" Transfer speed: {} Mb/s".
- format(throughput))
- logging.error(" {:03.2f}% of theoretical max {} Mb/s\n".
- format(percent, int(self.iface.max_speed)))
+ logging.error(" Transfer speed: {} Mb/s".format(throughput))
+ logging.error(
+ " {:03.2f}% of theoretical max {} Mb/s\n".format(
+ percent, int(self.iface.max_speed)))
if cpu_load > self.cpu_load_fail_threshold:
logging.error(" CPU load: {}%".format(cpu_load))
- logging.error(" CPU load is above {}% maximum\n".
- format(self.cpu_load_fail_threshold))
+ logging.error(
+ " CPU load is above {}% maximum\n".format(
+ self.cpu_load_fail_threshold))
return 30
logging.debug("Passed benchmark against {}".format(self.target))
@@ -277,9 +278,11 @@ class StressPerformanceTest:
def run(self):
if self.iperf3:
- iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(self.target)
+ iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(
+ self.target)
else:
- iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(self.target)
+ iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(
+ self.target)
print("Running iperf...")
iperf = subprocess.Popen(shlex.split(iperf_cmd))
@@ -365,14 +368,14 @@ class Interface(socket.socket):
ethinfo = check_output(['ethtool', self.interface],
universal_newlines=True,
stderr=STDOUT).split(' ')
- expression = '(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)'
+ expression = r'(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)'
regex = re.compile(expression)
if ethinfo:
for i in ethinfo:
hit = regex.search(i)
if hit:
- speeds.append(int(re.sub("\D", "", hit.group(0))))
+ speeds.append(int(re.sub(r"\D", "", hit.group(0))))
except CalledProcessError as e:
logging.error('ethtool returned an error!')
logging.error(e.output)
@@ -391,7 +394,7 @@ class Interface(socket.socket):
for s in line.split(' '):
hit = regex.search(s)
if hit:
- speeds.append(int(re.sub("\D", "", hit.group(0))))
+ speeds.append(int(re.sub(r"\D", "", hit.group(0))))
except FileNotFoundError:
logging.warning('mii-tool not found! Unable to get max speed')
except CalledProcessError as e:
diff --git a/bin/network_check.py b/bin/network_check.py
index 82c00d9..20b88d5 100755
--- a/bin/network_check.py
+++ b/bin/network_check.py
@@ -6,7 +6,9 @@ ubuntu.com
from subprocess import call
from argparse import ArgumentParser
import http.client
-import urllib.request, urllib.error, urllib.parse
+import urllib.request
+import urllib.error
+import urllib.parse
import sys
@@ -61,12 +63,14 @@ def main():
try:
call(command)
except OSError:
- print("Zenity missing; unable to report test result:\n %s" % message)
+ print(
+ "Zenity missing; unable to report test result:\n %s" % message)
if any(results.values()):
return 0
else:
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/network_ntp_test.py b/bin/network_ntp_test.py
index 6e2ccf8..f7a2297 100755
--- a/bin/network_ntp_test.py
+++ b/bin/network_ntp_test.py
@@ -84,13 +84,13 @@ def StartStopNTPD(state, pid=0):
SilentCall('/etc/init.d/ntp start')
ntpd_status = CheckNTPD()
- if status == 0:
+ if ntpd_status == 0:
logging.debug('ntpd restarted with PID: %s' % ntpd_status[1])
else:
logging.error('ntpd restart failed for some reason')
else:
- logging.error('%s is an unknown state, unable to start/stop ntpd' %
- state)
+ logging.error(
+ '%s is an unknown state, unable to start/stop ntpd' % state)
def SyncTime(server):
@@ -141,7 +141,8 @@ def SkewTime():
def main():
- description = 'Tests the ability to skew and sync the clock with an NTP server'
+ description = (
+ 'Tests the ability to skew and sync the clock with an NTP server')
parser = ArgumentParser(description=description)
parser.add_argument('--server',
action='store',
@@ -173,7 +174,7 @@ def main():
if args.debug:
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
-
+
# Make sure NTP is installed
if not os.access('/usr/sbin/ntpdate', os.F_OK):
logging.error('NTP is not installed!')
@@ -208,5 +209,6 @@ def main():
else:
return 1
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/network_reconnect_resume_test.py b/bin/network_reconnect_resume_test.py
index 8861eb6..17a220e 100755
--- a/bin/network_reconnect_resume_test.py
+++ b/bin/network_reconnect_resume_test.py
@@ -43,16 +43,17 @@ def get_wifi_reconnect_times():
Returns a list of all the timestamps for wifi reconnects.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\] wlan.* associated")
+ syntax = re.compile(r"\[(.*)\] wlan.* associated")
results = re.findall(syntax, data)
return map(float, results)
+
def get_wired_reconnect_times():
"""
Returns a list of all the timestamps for wired reconnects.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\].*eth.* Link is [uU]p")
+ syntax = re.compile(r"\[(.*)\].*eth.* Link is [uU]p")
results = re.findall(syntax, data)
return map(float, results)
@@ -63,7 +64,7 @@ def get_resume_time():
If no resume is found, None is returned.
"""
data = subprocess.check_output(['dmesg'], universal_newlines=True)
- syntax = re.compile("\[(.*)\].ACPI: Waking up from system sleep state S3")
+ syntax = re.compile(r"\[(.*)\].ACPI: Waking up from system sleep state S3")
results = re.findall(syntax, data)
if not results:
return None
@@ -85,7 +86,7 @@ def main():
args = parser.parse_args()
timedif = get_time_difference(args.device)
-
+
if not timedif:
return 0
@@ -98,5 +99,6 @@ def main():
print("PASS: the network connected within the allotted time")
return 0
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/network_restart.py b/bin/network_restart.py
index 14c7357..63a99fb 100755
--- a/bin/network_restart.py
+++ b/bin/network_restart.py
@@ -96,7 +96,8 @@ class GtkApplication(Application):
def __init__(self, address, times):
Application.__init__(self, address, times)
- dialog = Gtk.Dialog(title='Network restart',
+ dialog = Gtk.Dialog(
+ title='Network restart',
buttons=(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL))
dialog.set_default_size(300, 100)
@@ -187,7 +188,8 @@ def ping(address):
"""
logging.info('Pinging {0!r}...'.format(address))
try:
- check_call('ping -c 1 -w 5 {0}'.format(address),
+ check_call(
+ 'ping -c 1 -w 5 {0}'.format(address),
stdout=open(os.devnull, 'w'), stderr=STDOUT, shell=True)
except CalledProcessError:
return False
@@ -209,8 +211,9 @@ class Networking:
"""
output = check_output(['/sbin/ifconfig', '-s', '-a'])
lines = output.splitlines()[1:]
- interfaces = ([interface for interface in
- [line.split()[0] for line in lines] if interface != 'lo'])
+ interfaces = (
+ [interface for interface in
+ [line.split()[0] for line in lines] if interface != 'lo'])
return interfaces
def restart(self):
@@ -287,21 +290,24 @@ def parse_args():
"""
Parse command line options
"""
- parser = ArgumentParser('Reboot networking interface '
- 'and verify that is up again afterwards')
- parser.add_argument('-a', '--address', default='ubuntu.com',
+ parser = ArgumentParser(
+ 'Reboot networking interface and verify that is up again afterwards')
+ parser.add_argument(
+ '-a', '--address', default='ubuntu.com',
help=('Address to ping to verify that network connection is up '
"('%(default)s' by default)"))
parser.add_argument('-o', '--output',
default='/var/log',
help='The path to the log directory. \
Default is /var/log')
- parser.add_argument('-t', '--times',
+ parser.add_argument(
+ '-t', '--times',
type=int, default=1,
help=('Number of times that the network interface has to be restarted '
'(%(default)s by default)'))
log_levels = ['notset', 'debug', 'info', 'warning', 'error', 'critical']
- parser.add_argument('--log-level', dest='log_level_str', default='notset',
+ parser.add_argument(
+ '--log-level', dest='log_level_str', default='notset',
choices=log_levels,
help=('Log level. '
'One of {0} or {1} (%(default)s by default)'
diff --git a/bin/optical_read_test.py b/bin/optical_read_test.py
index b3cf23d..7560db1 100755
--- a/bin/optical_read_test.py
+++ b/bin/optical_read_test.py
@@ -7,7 +7,7 @@ import filecmp
import shutil
from argparse import ArgumentParser
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, SubprocessError
DEFAULT_DIR = '/tmp/checkbox.optical'
DEFAULT_DEVICE_DIR = 'device'
@@ -18,11 +18,7 @@ CDROM_ID = '/lib/udev/cdrom_id'
def _command(command, shell=True):
- proc = Popen(command,
- shell=shell,
- stdout=PIPE,
- stderr=PIPE
- )
+ proc = Popen(command, shell=shell, stdout=PIPE, stderr=PIPE)
return proc
@@ -33,7 +29,7 @@ def _command_out(command, shell=True):
def compare_tree(source, target):
for dirpath, dirnames, filenames in os.walk(source):
- #if both tree are empty return false
+ # if both tree are empty return false
if dirpath == source and dirnames == [] and filenames == []:
return False
for name in filenames:
@@ -64,21 +60,21 @@ def read_test(device):
mount = _command("mount -o ro %s %s" % (device, device_dir))
mount.communicate()
if mount.returncode != 0:
- print("Unable to mount %s to %s" %
- (device, device_dir), file=sys.stderr)
+ print("Unable to mount %s to %s" %
+ (device, device_dir), file=sys.stderr)
return False
file_copy = _command("cp -dpR %s %s" % (device_dir, image_dir))
file_copy.communicate()
if file_copy.returncode != 0:
- print("Failed to copy files from %s to %s" %
- (device_dir, image_dir), file=sys.stderr)
+ print("Failed to copy files from %s to %s" %
+ (device_dir, image_dir), file=sys.stderr)
return False
if compare_tree(device_dir, image_dir):
passed = True
- except:
- print("File Comparison failed while testing %s" % device,
- file=sys.stderr)
+ except (SubprocessError, OSError):
+ print("File Comparison failed while testing %s" % device,
+ file=sys.stderr)
passed = False
finally:
_command("umount %s" % device_dir).communicate(3)
@@ -88,7 +84,7 @@ def read_test(device):
if passed:
print("File Comparison passed (%s)" % device)
-
+
return passed
@@ -127,8 +123,9 @@ def main():
print("Testing %s on %s ... " % (test, device), file=sys.stdout)
tester = "%s_test" % test
return_values.append(globals()[tester](device))
-
+
return False in return_values
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/pm_log_check.py b/bin/pm_log_check.py
index cb75685..1d41ed6 100755
--- a/bin/pm_log_check.py
+++ b/bin/pm_log_check.py
@@ -36,8 +36,8 @@ class Parser(object):
"""
Reboot test log file parser
"""
- is_logging_line = (re.compile('^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}')
- .search)
+ is_logging_line = (re.compile(
+ r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}').search)
is_getting_info_line = (re.compile('Gathering hardware information...$')
.search)
is_executing_line = (re.compile("Executing: '(?P<command>.*)'...$")
@@ -103,8 +103,10 @@ class Parser(object):
if self.is_output_line(line):
command_output = {}
break
- if (self.is_executing_line(line)
- or self.is_getting_info_line(line)):
+ if (
+ self.is_executing_line(line) or
+ self.is_getting_info_line(line)
+ ):
# Skip commands with no output
iterator.unnext(line)
return None
@@ -132,8 +134,10 @@ class Parser(object):
# for the field value
value = []
for line in iterator:
- if (self.is_logging_line(line)
- or self.is_field_line(line)):
+ if (
+ self.is_logging_line(line) or
+ self.is_field_line(line)
+ ):
iterator.unnext(line)
break
@@ -205,8 +209,8 @@ def compare_results(results):
result_output = result[command]
error_messages = []
- fields = (set(baseline_output.keys())
- | set(result_output.keys()))
+ fields = (set(baseline_output.keys()) |
+ set(result_output.keys()))
for field in fields:
baseline_field = baseline_output.get(field, '')
result_field = result_output.get(field, '')
diff --git a/bin/pm_test.py b/bin/pm_test.py
index 18cf97c..fcaf7d9 100755
--- a/bin/pm_test.py
+++ b/bin/pm_test.py
@@ -20,7 +20,7 @@ from configparser import ConfigParser
from datetime import datetime, timedelta
from time import localtime, time
gi.require_version("Gtk", "3.0")
-from gi.repository import GObject, Gtk
+from gi.repository import GObject, Gtk # noqa: E402
def main():
@@ -266,9 +266,9 @@ class PowerManagementOperation():
count = count_s3 + count_s2idle
if count != total_suspends_expected:
problems = (
- "Found {} occurrences of '{}'. Expected {}".format(
- count, magic_line.strip(),
- total_suspends_expected))
+ "Found {} occurrences of S3/S2idle."
+ " Expected {}".format(
+ count, total_suspends_expected))
except FileNotFoundError:
problems = "Error opening {}".format(fwts_log_path)
if problems:
@@ -367,13 +367,13 @@ class WakeUpAlarm():
with open(cls.ALARM_FILENAME) as alarm_file:
wakeup_time_stored_str = alarm_file.read()
- if not re.match('\d+', wakeup_time_stored_str):
+ if not re.match(r'\d+', wakeup_time_stored_str):
subprocess.check_call('echo "+%d" > %s'
% (timeout, cls.ALARM_FILENAME),
shell=True)
with open(cls.ALARM_FILENAME) as alarm_file2:
wakeup_time_stored_str = alarm_file2.read()
- if not re.match('\d+', wakeup_time_stored_str):
+ if not re.match(r'\d+', wakeup_time_stored_str):
logging.error('Invalid wakeup time format: {0!r}'
.format(wakeup_time_stored_str))
sys.exit(1)
@@ -396,7 +396,7 @@ class WakeUpAlarm():
sys.exit(1)
with open(cls.RTC_FILENAME) as rtc_file:
- separator_regex = re.compile('\s+:\s+')
+ separator_regex = re.compile(r'\s+:\s+')
rtc_data = dict([separator_regex.split(line.rstrip())
for line in rtc_file])
logging.debug('RTC data:\n{0}'
@@ -582,14 +582,15 @@ class CountdownDialog(Gtk.Dialog):
.run().stdout),
ethernet=(Command('lspci | grep Ethernet')
.run().stdout),
- ifconfig=(Command("ifconfig -a | grep -A1 '^\w'")
+ ifconfig=(Command(
+ r"ifconfig -a | grep -A1 '^\w'")
.run().stdout),
- iwconfig=(Command("iwconfig | grep -A1 '^\w'")
+ iwconfig=(Command(r"iwconfig | grep -A1 '^\w'")
.run().stdout)))
logging.debug('Bluetooth Device:\n'
'{hciconfig}'
- .format(hciconfig=(Command("hciconfig -a "
- "| grep -A2 '^\w'")
+ .format(hciconfig=(Command(r"hciconfig -a "
+ r"| grep -A2 '^\w'")
.run().stdout)))
logging.debug('Video Card:\n'
'{lspci}'
@@ -770,7 +771,7 @@ Exec=sudo {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay
Type=Application
X-GNOME-Autostart-enabled=true
Hidden=false
-"""
+""" # noqa: E501
def __init__(self, args, user=None):
self.args = args
@@ -1012,7 +1013,7 @@ class MyArgumentParser():
'{0}.{1}.{2}.log'.format(
os.path.splitext(os.path.basename(__file__))[0],
args.pm_operation,
- args.total)))
+ args.total))
return args, extra_args
diff --git a/bin/process_wait.py b/bin/process_wait.py
index 6107d1b..1295991 100755
--- a/bin/process_wait.py
+++ b/bin/process_wait.py
@@ -36,14 +36,17 @@ def main(args):
usage = "Usage: %prog PROCESS [PROCESS...]"
parser = OptionParser(usage=usage)
- parser.add_option("-s", "--sleep",
+ parser.add_option(
+ "-s", "--sleep",
type="int",
default=default_sleep,
help="Number of seconds to sleep between checks.")
- parser.add_option("-t", "--timeout",
+ parser.add_option(
+ "-t", "--timeout",
type="int",
help="Number of seconds to timeout from sleeping.")
- parser.add_option("-u", "--uid",
+ parser.add_option(
+ "-u", "--uid",
help="Effective user name or id of the running processes")
(options, processes) = parser.parse_args(args)
diff --git a/bin/pulse-active-port-change.py b/bin/pulse-active-port-change.py
index 90abed8..baf0152 100755
--- a/bin/pulse-active-port-change.py
+++ b/bin/pulse-active-port-change.py
@@ -71,20 +71,21 @@ class AudioPlugDetection:
doc = parse_pactl_output(text)
cfg = set()
for record in doc.record_list:
- active_port = None
- port_availability = None
- # We go through the attribute list once to try to find an active port
- for attr in record.attribute_list:
- if attr.name == "Active Port":
- active_port = attr.value
- # If there is one, we retrieve its availability flag
- if active_port:
- for attr in record.attribute_list:
- if attr.name == "Ports":
- for port in attr.value:
- if port.name == active_port:
- port_availability = port.availability
- cfg.add((record.name, active_port, port_availability))
+ active_port = None
+ port_availability = None
+ # We go through the attribute list once to try to find
+ # an active port
+ for attr in record.attribute_list:
+ if attr.name == "Active Port":
+ active_port = attr.value
+ # If there is one, we retrieve its availability flag
+ if active_port:
+ for attr in record.attribute_list:
+ if attr.name == "Ports":
+ for port in attr.value:
+ if port.name == active_port:
+ port_availability = port.availability
+ cfg.add((record.name, active_port, port_availability))
return cfg
def on_timeout(self, signum, frame):
diff --git a/bin/removable_storage_test.py b/bin/removable_storage_test.py
index 82f1595..4253cbf 100755
--- a/bin/removable_storage_test.py
+++ b/bin/removable_storage_test.py
@@ -15,26 +15,30 @@ import time
import gi
gi.require_version('GUdev', '1.0')
-from gi.repository import GUdev
-
-from checkbox_support.dbus import connect_to_system_bus
-from checkbox_support.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE
-from checkbox_support.dbus.udisks2 import UDISKS2_LOOP_INTERFACE
-from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer
-from checkbox_support.dbus.udisks2 import is_udisks2_supported
-from checkbox_support.dbus.udisks2 import lookup_udev_device
-from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus
-from checkbox_support.heuristics.udisks2 import is_memory_card
-from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
-from checkbox_support.parsers.udevadm import CARD_READER_RE
-from checkbox_support.parsers.udevadm import GENERIC_RE
-from checkbox_support.parsers.udevadm import FLASH_RE
-from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint
-from checkbox_support.udev import get_interconnect_speed
-from checkbox_support.udev import get_udev_block_devices
-from checkbox_support.udev import get_udev_xhci_devices
+from gi.repository import GUdev # noqa: E402
+
+from checkbox_support.dbus import connect_to_system_bus # noqa: E402
+from checkbox_support.dbus.udisks2 import (
+ UDISKS2_BLOCK_INTERFACE,
+ UDISKS2_DRIVE_INTERFACE,
+ UDISKS2_FILESYSTEM_INTERFACE,
+ UDISKS2_LOOP_INTERFACE,
+ UDisks2Model,
+ UDisks2Observer,
+ is_udisks2_supported,
+ lookup_udev_device,
+ map_udisks1_connection_bus) # noqa: E402
+from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402
+from checkbox_support.helpers.human_readable_bytes import (
+ HumanReadableBytes) # noqa: E402
+from checkbox_support.parsers.udevadm import (
+ CARD_READER_RE,
+ GENERIC_RE,
+ FLASH_RE,
+ find_pkname_is_root_mountpoint) # noqa: E402
+from checkbox_support.udev import get_interconnect_speed # noqa: E402
+from checkbox_support.udev import get_udev_block_devices # noqa: E402
+from checkbox_support.udev import get_udev_xhci_devices # noqa: E402
class ActionTimer():
@@ -109,8 +113,8 @@ def on_ubuntucore():
snap = os.getenv("SNAP")
if snap:
with open(os.path.join(snap, 'meta/snap.yaml')) as f:
- for l in f.readlines():
- if l == "confinement: classic\n":
+ for line in f.readlines():
+ if line == "confinement: classic\n":
return False
return True
return False
@@ -855,7 +859,6 @@ def main():
# controller drivers.
# This will raise KeyError for no
# associated disk device was found.
- xhci_disks = test.get_disks_xhci()
if test.get_disks_xhci().get(disk, '') != 'xhci':
raise SystemExit(
"\t\tDisk does not use xhci_hcd.")
diff --git a/bin/removable_storage_watcher.py b/bin/removable_storage_watcher.py
index d5a0722..c13439e 100755
--- a/bin/removable_storage_watcher.py
+++ b/bin/removable_storage_watcher.py
@@ -9,16 +9,21 @@ import sys
import gi
gi.require_version('GUdev', '1.0')
-from gi.repository import GObject, GUdev
-
-from checkbox_support.dbus import connect_to_system_bus
-from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer
-from checkbox_support.dbus.udisks2 import is_udisks2_supported
-from checkbox_support.dbus.udisks2 import lookup_udev_device
-from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus
-from checkbox_support.heuristics.udisks2 import is_memory_card
-from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE
-from checkbox_support.udev import get_interconnect_speed, get_udev_block_devices
+from gi.repository import GObject, GUdev # noqa: E402
+
+from checkbox_support.dbus import connect_to_system_bus # noqa: E402
+from checkbox_support.dbus.udisks2 import UDisks2Model # noqa: E402
+from checkbox_support.dbus.udisks2 import UDisks2Observer # noqa: E402
+from checkbox_support.dbus.udisks2 import is_udisks2_supported # noqa: E402
+from checkbox_support.dbus.udisks2 import lookup_udev_device # noqa: E402
+from checkbox_support.dbus.udisks2 import (
+ map_udisks1_connection_bus) # noqa: E402
+from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402
+from checkbox_support.parsers.udevadm import CARD_READER_RE # noqa: E402
+from checkbox_support.parsers.udevadm import GENERIC_RE # noqa: E402
+from checkbox_support.parsers.udevadm import FLASH_RE # noqa: E402
+from checkbox_support.udev import get_interconnect_speed # noqa: E402
+from checkbox_support.udev import get_udev_block_devices # noqa: E402
# Record representing properties of a UDisks1 Drive object needed by the
# UDisks1 version of the watcher implementation
@@ -121,19 +126,24 @@ class UDisks1StorageDeviceListener:
logging.debug("Desired bus devices: %s", desired_bus_devices)
for dev in desired_bus_devices:
if self._memorycard:
- if (dev.bus != 'sdio'
- and not FLASH_RE.search(dev.media)
- and not CARD_READER_RE.search(dev.model)
- and not GENERIC_RE.search(dev.vendor)):
- logging.debug("The device does not seem to be a memory"
- " card (bus: %r, model: %r), skipping",
- dev.bus, dev.model)
+ if (
+ dev.bus != 'sdio' and
+ not FLASH_RE.search(dev.media) and
+ not CARD_READER_RE.search(dev.model) and
+ not GENERIC_RE.search(dev.vendor)
+ ):
+ logging.debug(
+ "The device does not seem to be a memory"
+ " card (bus: %r, model: %r), skipping",
+ dev.bus, dev.model)
return
print(message % {'bus': 'memory card', 'file': dev.file})
else:
- if (FLASH_RE.search(dev.media)
- or CARD_READER_RE.search(dev.model)
- or GENERIC_RE.search(dev.vendor)):
+ if (
+ FLASH_RE.search(dev.media) or
+ CARD_READER_RE.search(dev.model) or
+ GENERIC_RE.search(dev.vendor)
+ ):
logging.debug("The device seems to be a memory"
" card (bus: %r (model: %r), skipping",
dev.bus, dev.model)
@@ -180,8 +190,7 @@ class UDisks1StorageDeviceListener:
message = "Expected memory card device %(file)s inserted"
else:
message = "Expected %(bus)s device %(file)s inserted"
- self.verify_device_change(inserted_devices,
- message=message)
+ self.verify_device_change(inserted_devices, message=message)
def add_detected(self, added_path):
logging.debug("UDisks1 reports device has been added: %s", added_path)
@@ -205,8 +214,9 @@ class UDisks1StorageDeviceListener:
removed_devices = list(set(self._starting_devices) -
set(current_devices))
logging.debug("Computed removed devices: %s", removed_devices)
- self.verify_device_change(removed_devices,
- message="Removable %(bus)s device %(file)s has been removed")
+ self.verify_device_change(
+ removed_devices,
+ message="Removable %(bus)s device %(file)s has been removed")
def get_existing_devices(self):
logging.debug("Getting existing devices from UDisks1")
@@ -221,12 +231,9 @@ class UDisks1StorageDeviceListener:
device_props = dbus.Interface(device_obj,
dbus.PROPERTIES_IFACE)
udisks = 'org.freedesktop.UDisks.Device'
- _device_file = device_props.Get(udisks,
- "DeviceFile")
- _bus = device_props.Get(udisks,
- "DriveConnectionInterface")
- _speed = device_props.Get(udisks,
- "DriveConnectionSpeed")
+ _device_file = device_props.Get(udisks, "DeviceFile")
+ _bus = device_props.Get(udisks, "DriveConnectionInterface")
+ _speed = device_props.Get(udisks, "DriveConnectionSpeed")
_parent_model = ''
_parent_media = ''
_parent_vendor = ''
@@ -411,7 +418,7 @@ class UDisks2StorageDeviceListener:
UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus"
def __init__(self, system_bus, loop, action, devices, minimum_speed,
- memorycard, unmounted = False):
+ memorycard, unmounted=False):
# Store the desired minimum speed of the device in Mbit/s. The argument
# is passed as the number of bits per second so let's fix that.
self._desired_minimum_speed = minimum_speed / 10 ** 6
@@ -581,43 +588,49 @@ class UDisks2StorageDeviceListener:
continue
# For devices with empty "ConnectionBus" property, don't
# require the device to be mounted
- if (record.value.iface_name ==
- "org.freedesktop.UDisks2.Drive"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "ConnectionBus"
- and record.value.prop_value == ""):
+ if (
+ record.value.iface_name ==
+ "org.freedesktop.UDisks2.Drive" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "ConnectionBus" and
+ record.value.prop_value == ""
+ ):
needs.remove('mounted')
# Detect block devices designated for filesystems
- if (record.value.iface_name ==
- "org.freedesktop.UDisks2.Block"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "IdUsage"
- and record.value.prop_value == "filesystem"):
+ if (
+ record.value.iface_name ==
+ "org.freedesktop.UDisks2.Block" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "IdUsage" and
+ record.value.prop_value == "filesystem"
+ ):
found.add('block-fs')
# Memorize the block device path
- elif (record.value.iface_name ==
- "org.freedesktop.UDisks2.Block"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "PreferredDevice"):
+ elif (
+ record.value.iface_name ==
+ "org.freedesktop.UDisks2.Block" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "PreferredDevice"
+ ):
object_block_device = record.value.prop_value
# Ensure the device is a partition
elif (record.value.iface_name ==
- "org.freedesktop.UDisks2.Partition"
- and record.value.delta_type == DELTA_TYPE_IFACE):
+ "org.freedesktop.UDisks2.Partition" and
+ record.value.delta_type == DELTA_TYPE_IFACE):
found.add('partition')
# Ensure the device is not empty
elif (record.value.iface_name ==
- "org.freedesktop.UDisks2.Block"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "Size"
- and record.value.prop_value > 0):
+ "org.freedesktop.UDisks2.Block" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "Size" and
+ record.value.prop_value > 0):
found.add('non-empty')
# Ensure the filesystem is mounted
elif (record.value.iface_name ==
- "org.freedesktop.UDisks2.Filesystem"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "MountPoints"
- and record.value.prop_value != []):
+ "org.freedesktop.UDisks2.Filesystem" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "MountPoints" and
+ record.value.prop_value != []):
found.add('mounted')
# On some systems partition are reported as mounted
# filesystems, without 'partition' record
@@ -625,9 +638,9 @@ class UDisks2StorageDeviceListener:
needs.remove('partition')
# Finally memorize the drive the block device belongs to
elif (record.value.iface_name ==
- "org.freedesktop.UDisks2.Block"
- and record.value.delta_type == DELTA_TYPE_PROP
- and record.value.prop_name == "Drive"):
+ "org.freedesktop.UDisks2.Block" and
+ record.value.delta_type == DELTA_TYPE_PROP and
+ record.value.prop_name == "Drive"):
drive_object_path = record.value.prop_value
logging.debug("Finished analyzing %s, found: %s, needs: %s"
" drive_object_path: %s", object_path, found, needs,
@@ -845,8 +858,9 @@ def main():
description = "Wait for the specified device to be inserted or removed."
parser = argparse.ArgumentParser(description=description)
parser.add_argument('action', choices=['insert', 'remove'])
- parser.add_argument('device', choices=['usb', 'sdio', 'firewire', 'scsi',
- 'ata_serial_esata'], nargs="+")
+ parser.add_argument(
+ 'device', choices=['usb', 'sdio', 'firewire', 'scsi',
+ 'ata_serial_esata'], nargs="+")
memorycard_help = ("Memory cards devices on bus other than sdio require "
"this parameter to identify them as such")
parser.add_argument('--memorycard', action="store_true",
@@ -902,5 +916,6 @@ def main():
except KeyboardInterrupt:
return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/resolution_test.py b/bin/resolution_test.py
index 55a7828..77238ff 100755
--- a/bin/resolution_test.py
+++ b/bin/resolution_test.py
@@ -5,7 +5,7 @@ import sys
from argparse import ArgumentParser
gi.require_version('Gdk', '3.0')
-from gi.repository import Gdk
+from gi.repository import Gdk # noqa: E402
def check_resolution():
@@ -28,11 +28,13 @@ def compare_resolution(min_h, min_v):
def main():
parser = ArgumentParser()
- parser.add_argument("--horizontal",
+ parser.add_argument(
+ "--horizontal",
type=int,
default=0,
help="Minimum acceptable horizontal resolution.")
- parser.add_argument("--vertical",
+ parser.add_argument(
+ "--vertical",
type=int,
default=0,
help="Minimum acceptable vertical resolution.")
diff --git a/bin/rotation_test.py b/bin/rotation_test.py
index 6bf90fd..de70f4a 100755
--- a/bin/rotation_test.py
+++ b/bin/rotation_test.py
@@ -27,7 +27,8 @@ import time
import subprocess
gi.require_version('Gdk', '3.0')
-from gi.repository import Gdk
+from gi.repository import Gdk # noqa: E402
+
def main():
"""Run rotation cycling by running xrandr command."""
@@ -41,5 +42,6 @@ def main():
['xrandr', '--output', output, '--rotation', rotation])
time.sleep(4)
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/sleep_test.py b/bin/sleep_test.py
index 423fd6c..3483951 100755
--- a/bin/sleep_test.py
+++ b/bin/sleep_test.py
@@ -184,19 +184,19 @@ class SuspendTest():
if perf:
for idx in range(0, len(loglist)):
if 'PM: Syncing filesystems' in loglist[idx]:
- sleep_start_time = re.split('[\[\]]',
+ sleep_start_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Sleep started at %s' % sleep_start_time)
if 'ACPI: Low-level resume complete' in loglist[idx]:
- sleep_end_time = re.split('[\[\]]',
+ sleep_end_time = re.split(r'[\[\]]',
loglist[idx - 1])[1].strip()
logging.debug('Sleep ended at %s' % sleep_end_time)
- resume_start_time = re.split('[\[\]]',
+ resume_start_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Resume started at %s' % resume_start_time)
idx += 1
if 'Restarting tasks' in loglist[idx]:
- resume_end_time = re.split('[\[\]]',
+ resume_end_time = re.split(r'[\[\]]',
loglist[idx])[1].strip()
logging.debug('Resume ended at %s' % resume_end_time)
if self.end_marker in loglist[idx]:
@@ -395,5 +395,6 @@ def main():
options.iterations)
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/sleep_test_log_check.py b/bin/sleep_test_log_check.py
index aae7556..350bbbb 100755
--- a/bin/sleep_test_log_check.py
+++ b/bin/sleep_test_log_check.py
@@ -138,8 +138,9 @@ def main():
logging.basicConfig(level=args.debug)
- #Create a generator and get our lines
- log = (line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8"))
+ # Create a generator and get our lines
+ log = (
+ line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8"))
# End result will be a dictionary with a key per level, value is another
# dictionary with a key per test (s3, s4, ...) and a list of all failures
@@ -166,9 +167,8 @@ def main():
# Report what I found
for level in sorted(results.keys()):
- if results[level]: # Yes, we can have an empty level. We may have
- # seen the levelheader but had it report no
- # failures.
+ if results[level]: # Yes, we can have an empty level.
+ # We may have seen the levelheader but had it report no failures.
print("{} failures:".format(level))
for test in results[level].keys():
print(" {}: {} failures".format(test,
@@ -184,8 +184,8 @@ def main():
logging.error("No fwts test summaries found, "
"possible malformed fwts log file")
return_code = 2
- elif not results.keys(): # If it has no keys, means we didn't see any
- # FWTS levels
+ elif not results.keys():
+ # If it has no keys, means we didn't see any FWTS levels
logging.error("None of the summaries contained failure levels, "
"possible malformed fwts log file")
return_code = 2
@@ -197,5 +197,6 @@ def main():
return return_code
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/sleep_time_check.py b/bin/sleep_time_check.py
index f9513fa..90e1bc7 100755
--- a/bin/sleep_time_check.py
+++ b/bin/sleep_time_check.py
@@ -53,13 +53,18 @@ def main():
print(e)
return 1
- if (sleep_time is None or resume_time is None) or \
- (len(sleep_times) != len(resume_times)):
+ if (
+ (sleep_time is None or resume_time is None) or
+ (len(sleep_times) != len(resume_times))
+ ):
print("ERROR: One or more times was not reported correctly")
return 1
- print("Average time to enter sleep state: %.4f seconds" % mean(sleep_times))
- print("Average time to resume from sleep state: %.4f seconds" % mean(resume_times))
+ print(
+ "Average time to enter sleep state: %.4f seconds" % mean(sleep_times))
+ print(
+ "Average time to resume from sleep state: %.4f seconds" % mean(
+ resume_times))
failed = 0
if sleep_time > args.sleep_threshold:
@@ -76,5 +81,6 @@ def main():
return failed
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/stress_ng_test.py b/bin/stress_ng_test.py
index b8c2586..35aad2d 100755
--- a/bin/stress_ng_test.py
+++ b/bin/stress_ng_test.py
@@ -138,7 +138,7 @@ def num_numa_nodes():
try:
return int(run(['numactl', '--hardware'],
stdout=PIPE).stdout.split()[1])
- except:
+ except (ValueError, OSError, IndexError):
return 1
diff --git a/bin/test_bt_keyboard.py b/bin/test_bt_keyboard.py
index 1457a1d..782ae20 100755
--- a/bin/test_bt_keyboard.py
+++ b/bin/test_bt_keyboard.py
@@ -5,7 +5,7 @@
# Written by:
# Maciej Kisielewski <maciej.kisielewski@canonical.com>
-import checkbox_support.bt_helper
+import checkbox_support.bt_helper as bt_helper
def main():
diff --git a/bin/testlib.py b/bin/testlib.py
deleted file mode 100755
index 5f783f8..0000000
--- a/bin/testlib.py
+++ /dev/null
@@ -1,1450 +0,0 @@
-from __future__ import print_function
-#
-# testlib.py quality assurance test script
-# Copyright (C) 2008-2016 Canonical Ltd.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Library General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Library General Public License for more details.
-#
-# You should have received a copy of the GNU Library General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-#
-
-'''Common classes and functions for package tests.'''
-
-import crypt
-import glob
-import grp
-import gzip
-import os
-import os.path
-import platform
-import pwd
-import random
-import re
-import shutil
-import socket
-import string
-import subprocess
-import sys
-import tempfile
-import time
-import unittest
-
-from stat import ST_SIZE
-
-# Don't make python-pexpect mandatory
-try:
- import pexpect
-except ImportError:
- pass
-
-import warnings
-warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
-try:
- import apt_pkg
- # cope with apt_pkg api changes.
- if 'init_system' in dir(apt_pkg):
- apt_pkg.init_system()
- else:
- apt_pkg.InitSystem()
-except:
- # On non-Debian system, fall back to simple comparison without debianisms
- class apt_pkg(object):
- @staticmethod
- def version_compare(one, two):
- list_one = one.split('.')
- list_two = two.split('.')
- while len(list_one) > 0 and len(list_two) > 0:
- try:
- if int(list_one[0]) > int(list_two[0]):
- return 1
- if int(list_one[0]) < int(list_two[0]):
- return -1
- except:
- # ugh, non-numerics in the version, fall back to
- # string comparison, which will be wrong for e.g.
- # 3.2 vs 3.16rc1
- if list_one[0] > list_two[0]:
- return 1
- if list_one[0] < list_two[0]:
- return -1
- list_one.pop(0)
- list_two.pop(0)
- return 0
-
- @staticmethod
- def VersionCompare(one, two):
- return apt_pkg.version_compare(one, two)
-
-bogus_nxdomain = "208.69.32.132"
-
-# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
-# This is needed so that the subprocesses that produce endless output
-# actually quit when the reader goes away.
-import signal
-def subprocess_setup():
- # Python installs a SIGPIPE handler by default. This is usually not what
- # non-Python subprocesses expect.
- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
-class TimedOutException(Exception):
- def __init__(self, value="Timed Out"):
- self.value = value
-
- def __str__(self):
- return repr(self.value)
-
-
-def _restore_backup(path):
- pathbackup = path + '.autotest'
- if os.path.exists(pathbackup):
- shutil.move(pathbackup, path)
-
-
-def _save_backup(path):
- pathbackup = path + '.autotest'
- if os.path.exists(path) and not os.path.exists(pathbackup):
- shutil.copy2(path, pathbackup)
- # copy2 does not copy ownership, so do it here.
- # Reference: http://docs.python.org/library/shutil.html
- a = os.stat(path)
- os.chown(pathbackup, a[4], a[5])
-
-
-def config_copydir(path):
- if os.path.exists(path) and not os.path.isdir(path):
- raise OSError("'%s' is not a directory" % (path))
- _restore_backup(path)
-
- pathbackup = path + '.autotest'
- if os.path.exists(path):
- shutil.copytree(path, pathbackup, symlinks=True)
-
-
-def config_replace(path, contents, append=False):
- '''Replace (or append) to a config file'''
- _restore_backup(path)
- if os.path.exists(path):
- _save_backup(path)
- if append:
- with open(path) as fh:
- contents = fh.read() + contents
- with open(path, 'w') as fh:
- fh.write(contents)
-
-
-def config_comment(path, field):
- _save_backup(path)
- contents = ""
- with open(path) as fh:
- for line in fh:
- if re.search("^\s*%s\s*=" % (field), line):
- line = "#" + line
- contents += line
-
- with open(path + '.new', 'w') as new_fh:
- new_fh.write(contents)
- os.rename(path + '.new', path)
-
-
-def config_set(path, field, value, spaces=True):
- _save_backup(path)
- contents = ""
- if spaces:
- setting = '%s = %s\n' % (field, value)
- else:
- setting = '%s=%s\n' % (field, value)
- found = False
- with open(path) as fh:
- for line in fh:
- if re.search("^\s*%s\s*=" % (field), line):
- found = True
- line = setting
- contents += line
- if not found:
- contents += setting
-
- with open(path + '.new', 'w') as new_config:
- new_config.write(contents)
- os.rename(path + '.new', path)
-
-
-def config_patch(path, patch, depth=1):
- '''Patch a config file'''
- _restore_backup(path)
- _save_backup(path)
-
- handle, name = mkstemp_fill(patch)
- rc = subprocess.call(['/usr/bin/patch', '-p%s' % depth, path], stdin=handle, stdout=subprocess.PIPE)
- os.unlink(name)
- if rc != 0:
- raise Exception("Patch failed")
-
-
-def config_restore(path):
- '''Rename a replaced config file back to its initial state'''
- _restore_backup(path)
-
-
-def timeout(secs, f, *args):
- def handler(signum, frame):
- raise TimedOutException()
-
- old = signal.signal(signal.SIGALRM, handler)
- result = None
- signal.alarm(secs)
- try:
- result = f(*args)
- finally:
- signal.alarm(0)
- signal.signal(signal.SIGALRM, old)
-
- return result
-
-
-def require_nonroot():
- if os.geteuid() == 0:
- print("This series of tests should be run as a regular user with sudo access, not as root.", file=sys.stderr)
- sys.exit(1)
-
-
-def require_root():
- if os.geteuid() != 0:
- print("This series of tests should be run with root privileges (e.g. via sudo).", file=sys.stderr)
- sys.exit(1)
-
-
-def require_sudo():
- if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) is None:
- print("This series of tests must be run under sudo.", file=sys.stderr)
- sys.exit(1)
- if os.environ['SUDO_USER'] == 'root':
- print('Please run this test using sudo from a regular user. (You ran sudo from root.)', file=sys.stderr)
- sys.exit(1)
-
-
-def random_string(length, lower=False):
- '''Return a random string, consisting of ASCII letters, with given
- length.'''
-
- s = ''
- selection = string.ascii_letters
- if lower:
- selection = string.ascii_lowercase
- maxind = len(selection) - 1
- for l in range(length):
- s += selection[random.randint(0, maxind)]
- return s
-
-
-def mkstemp_fill(contents, suffix='', prefix='testlib-', dir=None):
- '''As tempfile.mkstemp does, return a (file, name) pair, but with
- prefilled contents.'''
-
- handle, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
- os.close(handle)
- handle = open(name, "w+")
- handle.write(contents)
- handle.flush()
- handle.seek(0)
-
- return handle, name
-
-
-def create_fill(path, contents, mode=0o644):
- '''Safely create a page'''
- # make the temp file in the same dir as the destination file so we
- # don't get invalid cross-device link errors when we rename
- handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
- handle.close()
- os.rename(name, path)
- os.chmod(path, mode)
-
-
-def login_exists(login):
- '''Checks whether the given login exists on the system.'''
-
- try:
- pwd.getpwnam(login)
- return True
- except KeyError:
- return False
-
-
-def group_exists(group):
- '''Checks whether the given login exists on the system.'''
-
- try:
- grp.getgrnam(group)
- return True
- except KeyError:
- return False
-
-
-def is_empty_file(path):
- '''test if file is empty, returns True if so'''
- with open(path) as fh:
- return (len(fh.read()) == 0)
-
-
-def recursive_rm(dirPath, contents_only=False):
- '''recursively remove directory'''
- names = os.listdir(dirPath)
- for name in names:
- path = os.path.join(dirPath, name)
- if os.path.islink(path) or not os.path.isdir(path):
- os.unlink(path)
- else:
- recursive_rm(path)
- if not contents_only:
- os.rmdir(dirPath)
-
-
-def check_pidfile(exe, pidfile):
- '''Checks if pid in pidfile is running'''
- if not os.path.exists(pidfile):
- return False
-
- # get the pid
- try:
- with open(pidfile, 'r') as fd:
- pid = fd.readline().rstrip('\n')
- except:
- return False
-
- return check_pid(exe, pid)
-
-
-def check_pid(exe, pid):
- '''Checks if pid is running'''
- cmdline = "/proc/%s/cmdline" % (str(pid))
- if not os.path.exists(cmdline):
- return False
-
- # get the command line
- try:
- with open(cmdline, 'r') as fd:
- tmp = fd.readline().split('\0')
- except:
- return False
-
- # this allows us to match absolute paths or just the executable name
- if re.match('^' + exe + '$', tmp[0]) or \
- re.match('.*/' + exe + '$', tmp[0]) or \
- re.match('^' + exe + ': ', tmp[0]) or \
- re.match('^\(' + exe + '\)', tmp[0]):
- return True
-
- return False
-
-
-def check_port(port, proto, ver=4):
- '''Check if something is listening on the specified port.
- WARNING: for some reason this does not work with a bind mounted /proc
- '''
- assert (port >= 1)
- assert (port <= 65535)
- assert (proto.lower() == "tcp" or proto.lower() == "udp")
- assert (ver == 4 or ver == 6)
-
- fn = "/proc/net/%s" % (proto)
- if ver == 6:
- fn += str(ver)
-
- rc, report = cmd(['cat', fn])
- assert (rc == 0)
-
- hport = "%0.4x" % port
-
- if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
- return True
- return False
-
-
-def get_arch():
- '''Get the current architecture'''
- rc, report = cmd(['uname', '-m'])
- assert (rc == 0)
- return report.strip()
-
-
-def get_multiarch_tuple():
- '''Get the current debian multiarch tuple'''
-
- # XXX dpkg-architecture on Ubuntu 12.04 requires -q be adjacent to
- # the variable name
- rc, report = cmd(['dpkg-architecture', '-qDEB_HOST_MULTIARCH'])
- assert (rc == 0)
- return report.strip()
-
-
-def get_bits():
- '''return the default architecture number of bits (32 or 64, usually)'''
- return platform.architecture()[0][0:2]
-
-
-def get_memory():
- '''Gets total ram and swap'''
- meminfo = "/proc/meminfo"
- memtotal = 0
- swaptotal = 0
- if not os.path.exists(meminfo):
- return (False, False)
-
- try:
- fd = open(meminfo, 'r')
- for line in fd.readlines():
- splitline = line.split()
- if splitline[0] == 'MemTotal:':
- memtotal = int(splitline[1])
- elif splitline[0] == 'SwapTotal:':
- swaptotal = int(splitline[1])
- fd.close()
- except:
- return (False, False)
-
- return (memtotal, swaptotal)
-
-
-def is_running_in_vm():
- '''Check if running under a VM'''
- # add other virtualization environments here
- for search in ['QEMU Virtual CPU']:
- rc, report = cmd_pipe(['dmesg'], ['grep', search])
- if rc == 0:
- return True
- return False
-
-
-def ubuntu_release():
- '''Get the Ubuntu release'''
- f = "/etc/lsb-release"
- try:
- size = os.stat(f)[ST_SIZE]
- except:
- return "UNKNOWN"
-
- if size > 1024 * 1024:
- raise IOError('Could not open "%s" (too big)' % f)
-
- with open("/etc/lsb-release", 'r') as fh:
- lines = fh.readlines()
-
- pat = re.compile(r'DISTRIB_CODENAME')
- for line in lines:
- if pat.search(line):
- return line.split('=')[1].rstrip('\n').rstrip('\r')
-
- return "UNKNOWN"
-
-
-def cmd(command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=None, timeout=None, env=None):
- '''Try to execute given command (array) and return its stdout, or return
- a textual error if it failed.'''
-
- try:
- sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup, env=env, universal_newlines=True)
- except OSError as e:
- return [127, str(e)]
-
- out, outerr = sp.communicate(input)
- # Handle redirection of stdout
- if out is None:
- out = ''
- # Handle redirection of stderr
- if outerr is None:
- outerr = ''
- return [sp.returncode, out + outerr]
-
-
-def cmd_pipe(command1, command2, input=None, stderr=subprocess.STDOUT, stdin=None):
- '''Try to pipe command1 into command2.'''
- try:
- sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
- sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
- except OSError as e:
- return [127, str(e)]
-
- out = sp2.communicate(input)[0]
- return [sp2.returncode, out]
-
-
-def cwd_has_enough_space(cdir, total_bytes):
- '''Determine if the partition of the current working directory has 'bytes'
- free.'''
- rc, df_output = cmd(['df'])
- if rc != 0:
- result = 'df failed, got exit code %d, expected %d\n' % (rc, 0)
- raise OSError(result)
- if rc != 0:
- return False
-
- kb = total_bytes / 1024
-
- mounts = dict()
- for line in df_output.splitlines():
- if '/' not in line:
- continue
- tmp = line.split()
- mounts[tmp[5]] = int(tmp[3])
-
- cdir = os.getcwd()
- while cdir != '/':
- if cdir not in mounts:
- cdir = os.path.dirname(cdir)
- continue
- return kb < mounts[cdir]
-
- return kb < mounts['/']
-
-
-def get_md5(filename):
- '''Gets the md5sum of the file specified'''
-
- (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
- expected = 0
- assert (expected == rc)
-
- return report.split(' ')[0]
-
-
-def dpkg_compare_installed_version(pkg, check, version):
- '''Gets the version for the installed package, and compares it to the
- specified version.
- '''
- (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
- assert (rc == 0)
- assert ("Status: install ok installed" in report)
- installed_version = ""
- for line in report.splitlines():
- if line.startswith("Version: "):
- installed_version = line.split()[1]
-
- assert (installed_version != "")
-
- (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
- assert (rc == 0 or rc == 1)
- if rc == 0:
- return True
- return False
-
-
-def _run_apt_command(pkg_list, apt_cmd='install'):
- env = os.environ.copy()
- env['DEBIAN_FRONTEND'] = 'noninteractive'
- # debugging version, but on precise doesn't actually run dpkg
- # command = ['apt-get', '-y', '--force-yes', '-o', 'Dpkg::Options::=--force-confold', '-o', 'Debug::pkgDPkgPM=true', apt_cmd]
- command = ['apt-get', '-y', '--force-yes', '-o', 'Dpkg::Options::=--force-confold', apt_cmd]
- command.extend(pkg_list)
- rc, report = cmd(command)
- return rc, report
-
-
-# note: for the following install_* functions, you probably want the
-# versions in the TestlibCase class (see below)
-def install_builddeps(src_pkg):
- rc, report = _run_apt_command([src_pkg], 'build-dep')
- assert(rc == 0)
-
-
-def install_package(package):
- rc, report = _run_apt_command([package], 'install')
- assert(rc == 0)
-
-
-def install_packages(pkg_list):
- rc, report = _run_apt_command(pkg_list, 'install')
- assert(rc == 0)
-
-
-def prepare_source(source, builder, cached_src, build_src, patch_system):
- '''Download and unpack source package, installing necessary build depends,
- adjusting the permissions for the 'builder' user, and returning the
- directory of the unpacked source. Patch system can be one of:
- - cdbs
- - dpatch
- - quilt
- - quiltv3
- - None (not the string)
-
- This is normally used like this:
-
- def setUp(self):
- ...
- self.topdir = os.getcwd()
- self.cached_src = os.path.join(os.getcwd(), "source")
- self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
- self.builder = testlib.TestUser()
- testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
- os.chmod(self.tmpdir, 0o775)
-
- def tearDown(self):
- ...
- self.builder = None
- self.topdir = os.getcwd()
- if os.path.exists(self.tmpdir):
- testlib.recursive_rm(self.tmpdir)
-
- def test_suite_build(self):
- ...
- build_dir = testlib.prepare_source('foo', \
- self.builder, \
- self.cached_src, \
- os.path.join(self.tmpdir, \
- os.path.basename(self.cached_src)),
- "quilt")
- os.chdir(build_dir)
-
- # Example for typical build, adjust as necessary
- print("")
- print(" make clean")
- rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
-
- print(" configure")
- rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
-
- print(" make (will take a while)")
- rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
-
- print(" make check (will take a while)",)
- rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- self.assertEqual(expected, rc, result + report)
-
- def test_suite_cleanup(self):
- ...
- if os.path.exists(self.cached_src):
- testlib.recursive_rm(self.cached_src)
-
- It is up to the caller to clean up cached_src and build_src (as in the
- above example, often the build_src is in a tmpdir that is cleaned in
- tearDown() and the cached_src is cleaned in a one time clean-up
- operation (eg 'test_suite_cleanup()) which must be run after the build
- suite test (obviously).
- '''
-
- # Make sure we have a clean slate
- assert (os.path.exists(os.path.dirname(build_src)))
- assert (not os.path.exists(build_src))
-
- cdir = os.getcwd()
- if os.path.exists(cached_src):
- shutil.copytree(cached_src, build_src)
- os.chdir(build_src)
- else:
- # Only install the build dependencies on the initial setup
- install_builddeps(source)
- os.makedirs(build_src)
- os.chdir(build_src)
-
- # These are always needed
- pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
- install_packages(pkgs)
-
- rc, report = cmd(['apt-get', 'source', source])
- assert (rc == 0)
- shutil.copytree(build_src, cached_src)
- print("(unpacked %s)" % os.path.basename(glob.glob('%s_*.dsc' % source)[0]), end=' ')
-
- unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
-
- # Now apply the patches. Do it here so that we don't mess up our cached
- # sources.
- os.chdir(unpacked_dir)
- assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
- if patch_system is not None and patch_system != "quiltv3":
- if patch_system == "quilt":
- os.environ.setdefault('QUILT_PATCHES', 'debian/patches')
- rc, report = cmd(['quilt', 'push', '-a'])
- assert (rc == 0)
- elif patch_system == "cdbs":
- rc, report = cmd(['./debian/rules', 'apply-patches'])
- assert (rc == 0)
- elif patch_system == "dpatch":
- rc, report = cmd(['dpatch', 'apply-all'])
- assert (rc == 0)
-
- cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
- os.chdir(cdir)
-
- return unpacked_dir
-
-
-def get_changelog_version(source_dir):
- '''Extract a package version from a changelog'''
- package_version = ""
-
- changelog_file = os.path.join(source_dir, "debian/changelog")
-
- if os.path.exists(changelog_file):
- changelog = open(changelog_file, 'r')
- header = changelog.readline().split()
- package_version = header[1].strip('()')
-
- return package_version
-
-
-def _aa_status():
- '''Get aa-status output'''
- exe = "/usr/sbin/aa-status"
- assert (os.path.exists(exe))
- if os.geteuid() == 0:
- return cmd([exe])
- return cmd(['sudo', exe])
-
-
-def is_apparmor_loaded(path):
- '''Check if profile is loaded'''
- rc, report = _aa_status()
- if rc != 0:
- return False
-
- for line in report.splitlines():
- if line.endswith(path):
- return True
- return False
-
-
-def is_apparmor_confined(path):
- '''Check if application is confined'''
- rc, report = _aa_status()
- if rc != 0:
- return False
-
- for line in report.splitlines():
- if re.search('%s \(' % path, line):
- return True
- return False
-
-
-def check_apparmor(path, first_ubuntu_release, is_running=True):
- '''Check if path is loaded and confined for everything higher than the
- first Ubuntu release specified.
-
- Usage:
- rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
- if rc < 0:
- return self._skipped(report)
-
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- self.assertEqual(expected, rc, result + report)
- '''
- global manager
- rc = -1
-
- if manager.lsb_release["Release"] < first_ubuntu_release:
- return (rc, "Skipped apparmor check")
-
- if not os.path.exists('/sbin/apparmor_parser'):
- return (rc, "Skipped (couldn't find apparmor_parser)")
-
- rc = 0
- msg = ""
- if not is_apparmor_loaded(path):
- rc = 1
- msg = "Profile not loaded for '%s'" % path
-
- # this check only makes sense it the 'path' is currently executing
- if is_running and rc == 0 and not is_apparmor_confined(path):
- rc = 1
- msg = "'%s' is not running in enforce mode" % path
-
- return (rc, msg)
-
-
-def get_gcc_version(gcc, full=True):
- gcc_version = 'none'
- if not gcc.startswith('/'):
- gcc = '/usr/bin/%s' % (gcc)
- if os.path.exists(gcc):
- gcc_version = 'unknown'
- lines = cmd([gcc, '-v'])[1].strip().splitlines()
- version_lines = [x for x in lines if x.startswith('gcc version')]
- if len(version_lines) == 1:
- gcc_version = " ".join(version_lines[0].split()[2:])
- if not full:
- return gcc_version.split()[0]
- return gcc_version
-
-
-def is_kdeinit_running():
- '''Test if kdeinit is running'''
- # applications that use kdeinit will spawn it if it isn't running in the
- # test. This is a problem because it does not exit. This is a helper to
- # check for it.
- rc, report = cmd(['ps', 'x'])
- if 'kdeinit4 Running' not in report:
- print("kdeinit not running (you may start/stop any KDE application then run this script again)", file=sys.stderr)
- return False
- return True
-
-
-def get_pkgconfig_flags(libs=[]):
- '''Find pkg-config flags for libraries'''
- assert (len(libs) > 0)
- rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
- expected = 0
- if rc != expected:
- print('Got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr)
- assert(rc == expected)
- return pkg_config.split()
-
-
-def cap_to_name(cap_num):
- '''given an integer, return the capability name'''
- rc, output = cmd(['capsh', '--decode=%x' % cap_num])
- expected = 0
- if rc != expected:
- print('capsh: got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr)
- cap_name = output.strip().split('=')[1]
- return cap_name
-
-
-def enumerate_capabilities():
- i = 0
- cap_list = []
- done = False
- while not done:
- cap_name = cap_to_name(pow(2, i))
- if cap_name == str(i):
- done = True
- else:
- cap_list.append(cap_name)
- i += 1
- if i > 64:
- done = True
- return cap_list
-
-
-class TestDaemon:
- '''Helper class to manage daemons consistently'''
- def __init__(self, init):
- '''Setup daemon attributes'''
- self.initscript = init
-
- def start(self):
- '''Start daemon'''
- rc, report = cmd([self.initscript, 'start'])
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- time.sleep(2)
- if expected != rc:
- return (False, result + report)
-
- if "fail" in report:
- return (False, "Found 'fail' in report\n" + report)
-
- return (True, "")
-
- def stop(self):
- '''Stop daemon'''
- rc, report = cmd([self.initscript, 'stop'])
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- if expected != rc:
- return (False, result + report)
-
- if "fail" in report:
- return (False, "Found 'fail' in report\n" + report)
-
- return (True, "")
-
- def reload(self):
- '''Reload daemon'''
- rc, report = cmd([self.initscript, 'force-reload'])
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- if expected != rc:
- return (False, result + report)
-
- if "fail" in report:
- return (False, "Found 'fail' in report\n" + report)
-
- return (True, "")
-
- def restart(self):
- '''Restart daemon'''
- (res, str) = self.stop()
- if not res:
- return (res, str)
-
- (res, str) = self.start()
- if not res:
- return (res, str)
-
- return (True, "")
-
- def force_restart(self):
- '''Restart daemon even if already stopped'''
- (res, str) = self.stop()
-
- (res, str) = self.start()
- if not res:
- return (res, str)
-
- return (True, "")
-
- def status(self):
- '''Check daemon status'''
- rc, report = cmd([self.initscript, 'status'])
- expected = 0
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- if expected != rc:
- return (False, result + report)
-
- if "fail" in report:
- return (False, "Found 'fail' in report\n" + report)
-
- return (True, "")
-
-
-class TestlibManager(object):
- '''Singleton class used to set up per-test-run information'''
- def __init__(self):
- # Set glibc aborts to dump to stderr instead of the tty so test output
- # is more sane.
- os.environ.setdefault('LIBC_FATAL_STDERR_', '1')
-
- # check verbosity
- self.verbosity = False
- if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
- self.verbosity = True
-
- # Load LSB release file
- self.lsb_release = dict()
- if os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
- for line in subprocess.Popen(['lsb_release', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True).communicate()[0].splitlines():
- field, value = line.split(':', 1)
- value = value.strip()
- field = field.strip()
- # Convert numerics
- try:
- value = float(value)
- except:
- pass
- self.lsb_release.setdefault(field, value)
- if not self.lsb_release:
- self.lsb_release = {
- 'Distributor ID': 'Ubuntu Core',
- 'Description': 'Ubuntu Core 16',
- 'Release': 16,
- 'Codename': 'xenial',
- }
-
- # FIXME: hack OEM releases into known-Ubuntu versions
- if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
- if self.lsb_release['Release'] == 1.0:
- self.lsb_release['Distributor ID'] = "Ubuntu"
- self.lsb_release['Release'] = 8.04
- else:
- raise OSError("Unknown version of HP MIE")
-
- # FIXME: hack to assume a most-recent release if we're not
- # running under Ubuntu.
- if self.lsb_release['Distributor ID'] not in ["Ubuntu", "Linaro"]:
- self.lsb_release['Release'] = 10000
- # Adjust Linaro release to pretend to be Ubuntu
- if self.lsb_release['Distributor ID'] in ["Linaro"]:
- self.lsb_release['Distributor ID'] = "Ubuntu"
- self.lsb_release['Release'] -= 0.01
-
- # Load arch
- if not os.path.exists('/usr/bin/dpkg'):
- machine = cmd(['uname', '-m'])[1].strip()
- if machine.endswith('86'):
- self.dpkg_arch = 'i386'
- elif machine.endswith('_64'):
- self.dpkg_arch = 'amd64'
- elif machine.startswith('arm'):
- self.dpkg_arch = 'armel'
- else:
- raise ValueError("Unknown machine type '%s'" % (machine))
- else:
- self.dpkg_arch = cmd(['dpkg', '--print-architecture'])[1].strip()
-
- # Find kernel version
- self.kernel_is_ubuntu = False
- self.kernel_version_signature = None
- self.kernel_version = cmd(["uname", "-r"])[1].strip()
- versig = '/proc/version_signature'
- if os.path.exists(versig):
- self.kernel_is_ubuntu = True
- self.kernel_version_signature = open(versig).read().strip()
- self.kernel_version_ubuntu = self.kernel_version
- elif os.path.exists('/usr/bin/dpkg'):
- # this can easily be inaccurate but is only an issue for Dapper
- rc, out = cmd(['dpkg', '-l', 'linux-image-%s' % (self.kernel_version)])
- if rc == 0:
- self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
- self.kernel_version_ubuntu = self.kernel_version_signature
- if self.kernel_version_signature is None:
- # Attempt to fall back to something for non-Debian-based
- self.kernel_version_signature = self.kernel_version
- self.kernel_version_ubuntu = self.kernel_version
- # Build ubuntu version without hardware suffix
- try:
- self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
- except:
- pass
-
- # Find gcc version
- self.gcc_version = get_gcc_version('gcc')
-
- # Find libc
- self.path_libc = [x.split()[2] for x in cmd(['ldd', '/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
-
- # Report self
- if self.verbosity:
- kernel = self.kernel_version_ubuntu
- if kernel != self.kernel_version_signature:
- kernel += " (%s)" % (self.kernel_version_signature)
- print("Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % (
- sys.argv[0],
- self.lsb_release['Distributor ID'],
- self.lsb_release['Release'],
- kernel,
- self.dpkg_arch,
- os.geteuid(), os.getuid(),
- os.environ.get('SUDO_USER', '')), file=sys.stdout)
- sys.stdout.flush()
-
- # Additional heuristics
- # if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
- # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
- # sys.stdout.flush()
- # time.sleep(0.5)
- # sys.stdout.write("destroyed\n")
- # time.sleep(0.5)
-
- def hello(self, msg):
- print("Hello from %s" % (msg), file=sys.stderr)
-# The central instance
-manager = TestlibManager()
-
-
-class TestlibCase(unittest.TestCase):
- def __init__(self, *args):
- '''This is called for each TestCase test instance, which isn't much better
- than SetUp.'''
-
- unittest.TestCase.__init__(self, *args)
-
- # Attach to and duplicate dicts from manager singleton
- self.manager = manager
- # self.manager.hello(repr(self) + repr(*args))
- self.my_verbosity = self.manager.verbosity
- self.lsb_release = self.manager.lsb_release
- self.dpkg_arch = self.manager.dpkg_arch
- self.kernel_version = self.manager.kernel_version
- self.kernel_version_signature = self.manager.kernel_version_signature
- self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
- self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
- self.gcc_version = self.manager.gcc_version
- self.path_libc = self.manager.path_libc
-
- def version_compare(self, one, two):
- if 'version_compare' in dir(apt_pkg):
- return apt_pkg.version_compare(one, two)
- else:
- return apt_pkg.VersionCompare(one, two)
-
- def assertFileType(self, filename, filetype, strict=True):
- '''Checks the file type of the file specified'''
-
- (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
- out = out.strip()
- expected = 0
- # Absolutely no idea why this happens on Hardy
- if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
- rc = 0
- result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
- self.assertEqual(expected, rc, result)
-
- if strict:
- filetype = '^%s$' % (filetype)
- else:
- # accept if the beginning of the line matches
- filetype = '^%s' % (filetype)
- result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
- self.assertNotEqual(None, re.search(filetype, out), result)
-
- def yank_commonname_from_cert(self, certfile):
- '''Extract the commonName from a given PEM'''
- rc, out = cmd(['openssl', 'asn1parse', '-in', certfile])
- if rc == 0:
- ready = False
- for line in out.splitlines():
- if ready:
- return line.split(':')[-1]
- if ':commonName' in line:
- ready = True
- return socket.getfqdn()
-
- def announce(self, text):
- if self.my_verbosity:
- print("(%s) " % (text), file=sys.stderr, end='')
- sys.stdout.flush()
-
- def make_clean(self):
- rc, output = self.shell_cmd(['make', 'clean'])
- self.assertEqual(rc, 0, output)
-
- def get_makefile_compiler(self):
- # Find potential compiler name
- compiler = 'gcc'
- if os.path.exists('Makefile'):
- for line in open('Makefile'):
- if line.startswith('CC') and '=' in line:
- items = [x.strip() for x in line.split('=')]
- if items[0] == 'CC':
- compiler = items[1]
- break
- return compiler
-
- def make_target(self, target, expected=0):
- '''Compile a target and report output'''
-
- compiler = self.get_makefile_compiler()
- rc, output = self.shell_cmd(['make', target])
- self.assertEqual(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
- self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
- return output
-
- # call as return testlib.skipped()
- def _skipped(self, reason=""):
- '''Provide a visible way to indicate that a test was skipped'''
- if reason != "":
- reason = ': %s' % (reason)
- self.announce("skipped%s" % (reason))
- return False
-
- def _testlib_shell_cmd(self, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=None):
- argstr = "'" + "', '".join(args).strip() + "'"
- rc, out = cmd(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env)
- report = 'Command: ' + argstr + '\nOutput:\n' + out
- return rc, report, out
-
- def shell_cmd(self, args, stdin=None):
- return cmd(args, stdin=stdin)
-
- def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=None, msg="", bare_report=False):
- '''Test a shell command matches a specific exit code'''
- rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env)
- result = 'Got exit code %d, expected %d\n' % (rc, expected)
- self.assertEqual(expected, rc, msg + result + report)
- if bare_report:
- return out
- else:
- return report
-
- # make sure exit value is in a list of expected values
- def assertShellExitIn(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
- '''Test a shell command matches a specific exit code'''
- rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
- result = 'Got exit code %d, expected one of %s\n' % (rc, ', '.join(map(str, expected)))
- self.assertIn(rc, expected, msg + result + report)
-
- def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
- '''Test a shell command doesn't match a specific exit code'''
- rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
- result = 'Got (unwanted) exit code %d\n' % rc
- self.assertNotEqual(unwanted, rc, msg + result + report)
-
- def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
- '''Test a shell command contains a specific output'''
- rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
- result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
- if not invert:
- self.assertTrue(text in out, msg + result + report)
- else:
- self.assertFalse(text in out, msg + result + report)
- if expected is not None:
- result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
- self.assertEqual(rc, expected, msg + result + report)
-
- def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
- '''Test a shell command matches a specific output'''
- rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
- result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
- if not invert:
- self.assertEqual(text, out, msg + result + report)
- else:
- self.assertNotEqual(text, out, msg + result + report)
- if expected is not None:
- result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
- self.assertEqual(rc, expected, msg + result + report)
-
- def _word_find(self, report, content, invert=False):
- '''Check for a specific string'''
- if invert:
- warning = 'Found "%s"\n' % content
- self.assertTrue(content not in report, warning + report)
- else:
- warning = 'Could not find "%s"\n' % content
- self.assertTrue(content in report, warning + report)
-
- def _test_sysctl_value(self, path, expected, msg=None, exists=True):
- sysctl = '/proc/sys/%s' % (path)
- self.assertEqual(exists, os.path.exists(sysctl), sysctl)
- value = None
- if exists:
- with open(sysctl) as sysctl_fd:
- value = int(sysctl_fd.read())
- report = "%s is not %d: %d" % (sysctl, expected, value)
- if msg:
- report += " (%s)" % (msg)
- self.assertEqual(value, expected, report)
- return value
-
- def set_sysctl_value(self, path, desired):
- sysctl = '/proc/sys/%s' % (path)
- self.assertTrue(os.path.exists(sysctl), "%s does not exist" % (sysctl))
- with open(sysctl, 'w') as sysctl_fh:
- sysctl_fh.write(str(desired))
- self._test_sysctl_value(path, desired)
-
- def kernel_at_least(self, introduced):
- return self.version_compare(self.kernel_version_ubuntu,
- introduced) >= 0
-
- def kernel_claims_cve_fixed(self, cve):
- changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
- if os.path.exists(changelog):
- for line in gzip.open(changelog):
- if cve in line and "revert" not in line and "Revert" not in line:
- return True
- return False
-
- def install_builddeps(self, src_pkg):
- rc, report = _run_apt_command([src_pkg], 'build-dep')
- self.assertEqual(0, rc, 'Failed to install build-deps for %s\nOutput:\n%s' % (src_pkg, report))
-
- def install_package(self, package):
- rc, report = _run_apt_command([package], 'install')
- self.assertEqual(0, rc, 'Failed to install package %s\nOutput:\n%s' % (package, report))
-
- def install_packages(self, pkg_list):
- rc, report = _run_apt_command(pkg_list, 'install')
- self.assertEqual(0, rc, 'Failed to install packages %s\nOutput:\n%s' % (','.join(pkg_list), report))
-
-
-class TestGroup:
- '''Create a temporary test group and remove it again in the dtor.'''
-
- def __init__(self, group=None, lower=False):
- '''Create a new group'''
-
- self.group = None
- if group:
- if group_exists(group):
- raise ValueError('group name already exists')
- else:
- while(True):
- group = random_string(7, lower=lower)
- if not group_exists(group):
- break
-
- assert subprocess.call(['groupadd', group]) == 0
- self.group = group
- g = grp.getgrnam(self.group)
- self.gid = g[2]
-
- def __del__(self):
- '''Remove the created group.'''
-
- if self.group:
- rc, report = cmd(['groupdel', self.group])
- assert rc == 0
-
-
-class TestUser:
- '''Create a temporary test user and remove it again in the dtor.'''
-
- def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
- '''Create a new user account with a random password.
-
- By default, the login name is random, too, but can be explicitly
- specified with 'login'. By default, a home directory is created, this
- can be suppressed with 'home=False'.'''
-
- self.login = None
-
- if os.geteuid() != 0:
- raise ValueError("You must be root to run this test")
-
- if login:
- if login_exists(login):
- raise ValueError('login name already exists')
- else:
- while(True):
- login = 't' + random_string(7, lower=lower)
- if not login_exists(login):
- break
-
- self.salt = random_string(2)
- self.password = random_string(8, lower=lower)
- self.crypted = crypt.crypt(self.password, self.salt)
-
- creation = ['useradd', '-p', self.crypted]
- if home:
- creation += ['-m']
- if group:
- creation += ['-G', group]
- if uidmin:
- creation += ['-K', 'UID_MIN=%d' % uidmin]
- if shell:
- creation += ['-s', shell]
- creation += [login]
- assert subprocess.call(creation) == 0
- # Set GECOS
- assert subprocess.call(['usermod', '-c', 'Buddy %s' % (login), login]) == 0
-
- self.login = login
- p = pwd.getpwnam(self.login)
- self.uid = p[2]
- self.gid = p[3]
- self.gecos = p[4]
- self.home = p[5]
- self.shell = p[6]
-
- def __del__(self):
- '''Remove the created user account.'''
-
- if self.login:
- # sanity check the login name so we don't accidentally wipe too much
- if len(self.login) > 3 and '/' not in self.login:
- subprocess.call(['rm', '-rf', '/home/' + self.login, '/var/mail/' + self.login])
- rc, report = cmd(['userdel', '-f', self.login])
- assert rc == 0
-
- def add_to_group(self, group):
- '''Add user to the specified group name'''
- rc, report = cmd(['usermod', '-G', group, self.login])
- if rc != 0:
- print(report)
- assert rc == 0
-
-
-class AddUser:
- '''Create a temporary test user and remove it again in the dtor.'''
-
- def __init__(self, login=None, home=True, encrypt_home=False, group=None, lower=False, shell=None):
- '''Create a new user account with a random password.
-
- By default, the login name is random, too, but can be explicitly
- specified with 'login'. By default, a home directory is created, this
- can be suppressed with 'home=False'.
-
- This class differs from the TestUser class in that the adduser/deluser
- tools are used rather than the useradd/user/del tools. The adduser
- program is the only commandline program that can be used to add a new
- user with an encrypted home directory. It is possible that the AddUser
- class may replace the TestUser class in the future.'''
-
- self.login = None
-
- if os.geteuid() != 0:
- raise ValueError("You must be root to run this test")
-
- if login:
- if login_exists(login):
- raise ValueError('login name already exists')
- else:
- while(True):
- login = 't' + random_string(7, lower=True)
- if not login_exists(login):
- break
-
- self.password = random_string(8, lower=lower)
-
- creation = ['adduser', '--quiet']
-
- if not home:
- creation += ['--no-create-home']
- elif encrypt_home:
- creation += ['--encrypt-home']
-
- if shell:
- creation += ['--shell', shell]
-
- creation += ['--gecos', 'Buddy %s' % (login), login]
-
- child = pexpect.spawn(creation.pop(0), creation, timeout=5)
- assert child.expect('Enter new UNIX password:') == 0
- child.sendline(self.password)
- assert child.expect('Retype new UNIX password:') == 0
- child.sendline(self.password)
-
- child.wait()
- child.close()
- assert child.exitstatus == 0
- assert child.signalstatus is None
-
- self.login = login
-
- if group:
- assert self.add_to_group(group) == 0
-
- p = pwd.getpwnam(self.login)
- self.uid = p[2]
- self.gid = p[3]
- self.gecos = p[4]
- self.home = p[5]
- self.shell = p[6]
-
- def __del__(self):
- '''Remove the created user account.'''
-
- if self.login:
- # sanity check the login name so we don't accidentally wipe too much
- rc, report = cmd(['deluser', '--remove-home', self.login])
- assert rc == 0
-
- def add_to_group(self, group):
- '''Add user to the specified group name'''
- rc, report = cmd(['adduser', self.login, group])
- if rc != 0:
- print(report)
- assert rc == 0
-
-
-# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
-class TimeoutFunctionException(Exception):
- """Exception to raise on a timeout"""
- pass
-
-
-class TimeoutFunction:
- def __init__(self, function, timeout):
- self.timeout = timeout
- self.function = function
-
- def handle_timeout(self, signum, frame):
- raise TimeoutFunctionException()
-
- def __call__(self, *args, **kwargs):
- old = signal.signal(signal.SIGALRM, self.handle_timeout)
- signal.alarm(self.timeout)
- try:
- result = self.function(*args, **kwargs)
- finally:
- signal.signal(signal.SIGALRM, old)
- signal.alarm(0)
- return result
-
-
-def main():
- print("hi")
- unittest.main()
diff --git a/bin/touchpad_driver_info.py b/bin/touchpad_driver_info.py
index 431b874..9f6278f 100755
--- a/bin/touchpad_driver_info.py
+++ b/bin/touchpad_driver_info.py
@@ -67,8 +67,8 @@ def main():
if attributes:
modinfo = TouchpadDriver(attributes['driver'])
attributes['version'] = modinfo.driver_version
- print("%s: %s\n%s: %s\n%s: %s\n" %
- ('Device', attributes['product'],
+ print("%s: %s\n%s: %s\n%s: %s\n" % (
+ 'Device', attributes['product'],
'Driver', attributes['driver'],
'Driver Version', attributes['version']))
else:
diff --git a/bin/touchpad_test.py b/bin/touchpad_test.py
index bc6a474..6ecde92 100755
--- a/bin/touchpad_test.py
+++ b/bin/touchpad_test.py
@@ -8,8 +8,8 @@ from gettext import gettext as _
gi.require_version('Gdk', '3.0')
gi.require_version('Gio', '2.0')
gi.require_version("Gtk", "3.0")
-from gi.repository import Gio, Gtk, Gdk
-from optparse import OptionParser
+from gi.repository import Gio, Gtk, Gdk # noqa: E402
+from optparse import OptionParser # noqa: E402
EXIT_WITH_FAILURE = 1
@@ -205,5 +205,6 @@ def main(args):
return scroller.exit_code
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/virtualization.py b/bin/virtualization.py
index 2b7cd54..10b827b 100755
--- a/bin/virtualization.py
+++ b/bin/virtualization.py
@@ -330,7 +330,7 @@ class KVMTest(object):
# Attempt download
try:
- resp = urllib.request.urlretrieve(full_url, cloud_iso)
+ urllib.request.urlretrieve(full_url, cloud_iso)
except (IOError,
OSError,
urllib.error.HTTPError,
@@ -400,25 +400,6 @@ class KVMTest(object):
Generate Cloud meta data and creates an iso object
to be mounted as virtual device to instance during boot.
"""
-
- user_data = """\
-#cloud-config
-
-runcmd:
- - [ sh, -c, echo "========= CERTIFICATION TEST =========" ]
-
-power_state:
- mode: halt
- message: Bye
- timeout: 480
-
-final_message: CERTIFICATION BOOT COMPLETE
-"""
-
- meta_data = """\
-{ echo instance-id: iid-local01; echo local-hostname, certification; }
-"""
-
for file in ['user-data', 'meta-data']:
logging.debug("Creating cloud %s", file)
with open(file, "wt") as data_file:
@@ -427,11 +408,11 @@ final_message: CERTIFICATION BOOT COMPLETE
# Create Data ISO hosting user & meta cloud config data
try:
- iso_build = check_output(
+ check_output(
['genisoimage', '-output', 'seed.iso', '-volid',
'cidata', '-joliet', '-rock', 'user-data', 'meta-data'],
universal_newlines=True)
- except CalledProcessError as exception:
+ except CalledProcessError:
logging.exception("Cloud data disk creation failed")
def log_check(self, stream):
@@ -483,7 +464,7 @@ final_message: CERTIFICATION BOOT COMPLETE
self.create_cloud_disk()
# Boot Virtual Machine
- instance = self.boot_image(self.image)
+ self.boot_image(self.image)
# If running in console, reset console window to regain
# control from VM Serial I/0
@@ -659,7 +640,7 @@ class LXDTest(object):
logging.debug("Attempting download of {} from {}".format(filename,
url))
try:
- resp = urllib.request.urlretrieve(url, filename)
+ urllib.request.urlretrieve(url, filename)
except (IOError,
OSError,
urllib.error.HTTPError,
diff --git a/bin/volume_test.py b/bin/volume_test.py
index a949f93..d1aeffd 100755
--- a/bin/volume_test.py
+++ b/bin/volume_test.py
@@ -8,12 +8,12 @@ from subprocess import check_output
TYPES = ("source", "sink")
-active_entries_regex = re.compile("\* index.*?(?=properties)", re.DOTALL)
-entries_regex = re.compile("index.*?(?=properties)", re.DOTALL)
-index_regex = re.compile("(?<=index: )[0-9]*")
-muted_regex = re.compile("(?<=muted: ).*")
-volume_regex = re.compile("(?<=volume: 0: )\s*[0-9]*")
-name_regex = re.compile("(?<=name:).*")
+active_entries_regex = re.compile(r"\* index.*?(?=properties)", re.DOTALL)
+entries_regex = re.compile(r"index.*?(?=properties)", re.DOTALL)
+index_regex = re.compile(r"(?<=index: )[0-9]*")
+muted_regex = re.compile(r"(?<=muted: ).*")
+volume_regex = re.compile(r"(?<=volume: 0: )\s*[0-9]*")
+name_regex = re.compile(r"(?<=name:).*")
def check_muted():
@@ -27,26 +27,29 @@ def check_muted():
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
- file=sys.stderr)
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
+ file=sys.stderr)
return 1
active_entry_match = active_entries_regex.search(pacmd_entries)
if active_entry_match:
active_entry = active_entry_match.group()
else:
- print("Unable to find a %s active_entry in the pacmd list-%ss"\
- " output\npacmd output was: %s" %
- (vtype, vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find a %s active_entry in the pacmd list-%ss"
+ " output\npacmd output was: %s" %
+ (vtype, vtype, pacmd_entries), file=sys.stderr)
return 1
name_match = name_regex.search(active_entry)
if name_match:
name = name_match.group()
else:
- print("Unable to determine device bus information from the"\
- " pacmd list-%ss output\npacmd output was: %s" %
- (vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to determine device bus information from the"
+ " pacmd list-%ss output\npacmd output was: %s" %
+ (vtype, pacmd_entries), file=sys.stderr)
return 1
muted_match = muted_regex.search(active_entry)
@@ -58,9 +61,10 @@ def check_muted():
else:
print("PASS: Audio is not muted on %s %s" % (name, vtype))
else:
- print("Unable to find mute information in the pacmd list-%ss"\
- " output for device %s\npacmd output was: %s" %
- (vtype, name, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find mute information in the pacmd list-%ss"
+ " output for device %s\npacmd output was: %s" %
+ (vtype, name, pacmd_entries), file=sys.stderr)
return 1
return retval
@@ -77,8 +81,9 @@ def check_volume(minvol, maxvol):
pacmd_entries = check_output(["pacmd", "list-%ss" % vtype],
universal_newlines=True)
except Exception as e:
- print("Error when running pacmd list-%ss: %s" % (vtype, e),
- file=sys.stderr)
+ print(
+ "Error when running pacmd list-%ss: %s" % (vtype, e),
+ file=sys.stderr)
return 1
entries = entries_regex.findall(pacmd_entries)
@@ -88,32 +93,36 @@ def check_volume(minvol, maxvol):
if name_match:
name = name_match.group()
else:
- print("Unable to determine device bus information from the"\
- " pacmd list-%ss output\npacmd output was: %s" %
- (vtype, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to determine device bus information from the"
+ " pacmd list-%ss output\npacmd output was: %s" %
+ (vtype, pacmd_entries), file=sys.stderr)
return 1
volume_match = volume_regex.search(entry)
if volume_match:
volume = int(volume_match.group().strip())
if volume > maxvol:
- print ("FAIL: Volume of %d is greater than"\
- " maximum of %d for %s %s" % (volume, maxvol,
- name, vtype))
+ print(
+ "FAIL: Volume of %d is greater than"
+ " maximum of %d for %s %s" %
+ (volume, maxvol, name, vtype))
retval = 1
elif volume < minvol:
- print ("FAIL: Volume of %d is less than"\
- " minimum of %d for %s %s" % (volume, minvol,
- name, vtype))
+ print(
+ "FAIL: Volume of %d is less than"
+ " minimum of %d for %s %s" %
+ (volume, minvol, name, vtype))
retval = 1
else:
- print ("PASS: Volume is %d for %s %s" % (volume, name,
- vtype))
+ print(
+ "PASS: Volume is %d for %s %s" %
+ (volume, name, vtype))
else:
- print("Unable to find volume information in the pacmd"\
- " list-%ss output for device %s.\npacmd output "\
- "was: %s" %
- (vtype, name, pacmd_entries), file=sys.stderr)
+ print(
+ "Unable to find volume information in the pacmd"
+ " list-%ss output for device %s.\npacmd output "
+ "was: %s" % (vtype, name, pacmd_entries), file=sys.stderr)
return 1
return retval
@@ -134,5 +143,6 @@ def main():
check_volume_retval = check_volume(args.minvol, args.maxvol)
return check_muted_retval or check_volume_retval
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/bin/wifi_ap_wizard.py b/bin/wifi_ap_wizard.py
index 5506d01..9029040 100755
--- a/bin/wifi_ap_wizard.py
+++ b/bin/wifi_ap_wizard.py
@@ -61,5 +61,6 @@ def main():
raise SystemExit('Did not get prompted ("{}")'.format(prompt))
wizard.writeline(response)
+
if __name__ == '__main__':
main()
diff --git a/bin/wifi_client_test_netplan.py b/bin/wifi_client_test_netplan.py
index c74320e..6f8281c 100755
--- a/bin/wifi_client_test_netplan.py
+++ b/bin/wifi_client_test_netplan.py
@@ -21,8 +21,6 @@ import subprocess as sp
import textwrap
import time
import shutil
-from struct import pack
-from socket import inet_ntoa
import sys
print = functools.partial(print, flush=True)
@@ -135,7 +133,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp):
nameservers: {}
Static IP no dhcp:
- >>> print(generate_test_config("eth0", "my_ap", "s3cr3t", "192.168.1.1", False))
+ >>> print(generate_test_config(
+ "eth0", "my_ap", "s3cr3t", "192.168.1.1", False))
# This is the network config written by checkbox
network:
version: 2
@@ -162,7 +161,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp):
password = "password: " + psk
else:
password = ""
- return textwrap.dedent(np_cfg.format(interface, ssid, password, address, dhcp))
+ return textwrap.dedent(
+ np_cfg.format(interface, ssid, password, address, dhcp))
def write_test_config(config):
diff --git a/bin/wifi_master_mode.py b/bin/wifi_master_mode.py
index 3888caa..d52d5c0 100755
--- a/bin/wifi_master_mode.py
+++ b/bin/wifi_master_mode.py
@@ -67,7 +67,7 @@ class WifiMasterMode():
logging.info(output)
logging.info("AP successfully established.")
child.terminate()
- if child.poll() is not 0:
+ if child.poll() != 0:
output = child.stdout.read()
logging.error(log + output)
logging.error('AP failed to start')
diff --git a/bin/wifi_time2reconnect.py b/bin/wifi_time2reconnect.py
index d4ae2c0..a1b49b5 100755
--- a/bin/wifi_time2reconnect.py
+++ b/bin/wifi_time2reconnect.py
@@ -7,9 +7,8 @@ import time
import subprocess
from datetime import datetime
try:
- from subprocess import DEVNULL # >= python3.3
+ from subprocess import DEVNULL # >= python3.3
except ImportError:
- import os
DEVNULL = open(os.devnull, 'wb')
IFACE = None
@@ -21,7 +20,7 @@ def main():
Check the time needed to reconnect an active WIFI connection
"""
devices = subprocess.getoutput('nmcli dev')
- match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices)
+ match = re.search(r'(\w+)\s+(802-11-wireless|wifi)\s+connected', devices)
if match:
IFACE = match.group(1)
else:
@@ -46,7 +45,7 @@ def main():
return 1
subprocess.call(
- 'nmcli dev disconnect iface %s' %IFACE,
+ 'nmcli dev disconnect iface %s' % IFACE,
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT,
shell=True)
@@ -55,13 +54,13 @@ def main():
start = datetime.now()
subprocess.call(
- 'nmcli con up uuid %s --timeout %s' %(uuid, TIMEOUT),
+ 'nmcli con up uuid %s --timeout %s' % (uuid, TIMEOUT),
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT,
shell=True)
delta = datetime.now() - start
- print('%.2f Seconds' %delta.total_seconds())
+ print('%.2f Seconds' % delta.total_seconds())
return 0
diff --git a/bin/window_test.py b/bin/window_test.py
index ee79a89..895e2db 100755
--- a/bin/window_test.py
+++ b/bin/window_test.py
@@ -277,8 +277,7 @@ def print_move_window(iterations, timeout, *args):
for it in range(iterations):
print('Iteration %d of %d:' % (it + 1, iterations))
- exit_status = move_window('glxgears',
- timeout)
+ status = move_window('glxgears', timeout)
print('')
return status
@@ -290,7 +289,8 @@ def main():
'open-close-multi': print_open_close_multi,
'move': print_move_window}
- parser = ArgumentParser(description='Script that performs window operation')
+ parser = ArgumentParser(
+ description='Script that performs window operation')
parser.add_argument('-t', '--test',
default='all',
help='The name of the test to run. \
@@ -338,5 +338,6 @@ def main():
return status
+
if __name__ == '__main__':
exit(main())
diff --git a/bin/wwan_tests.py b/bin/wwan_tests.py
index df44cf6..9bbf1e7 100755
--- a/bin/wwan_tests.py
+++ b/bin/wwan_tests.py
@@ -278,7 +278,7 @@ class ThreeGppConnection():
_wwan_radio_on()
time.sleep(args.wwan_setup_time)
ret_code = _ping_test(args.wwan_net_if)
- except:
+ except subprocess.SubprocessError:
pass
_destroy_3gpp_connection()
_wwan_radio_off()
@@ -312,29 +312,6 @@ class Resources():
mm = MMDbus()
for m in mm.get_modem_ids():
print("mm_id: {}".format(m))
- # Removed ability to fetch supported RATs when adding CLI support
- # to this script. It is somewhat messy to scrape this from mmcli
- # output and it wasn't actually used in test definitions.
- #
- # cap_bits = mm.get_rat_support(m)
- # if cap_bits != 0:
- # if (cap_bits & MMModemCapability['MM_MODEM_CAPABILITY_POTS']):
- # print("pots: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_CDMA_EVDO']):
- # print("cdma_evdo: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_GSM_UMTS']):
- # print("gsm_umts: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_LTE']):
- # print("lte: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_LTE_ADVANCED']):
- # print("lte_advanced: supported")
- # if (cap_bits &
- # MMModemCapability['MM_MODEM_CAPABILITY_IRIDIUM']):
- # print("iridium: supported")
print("hw_id: {}".format(mm.get_equipment_id(m)))
print("manufacturer: {}".format(mm.get_manufacturer(m)))
print("model: {}".format(mm.get_model_name(m)))
diff --git a/bin/xrandr_cycle.py b/bin/xrandr_cycle.py
index 6743f6f..8d87217 100755
--- a/bin/xrandr_cycle.py
+++ b/bin/xrandr_cycle.py
@@ -117,11 +117,11 @@ if 'PLAINBOX_PROVIDER_DATA' in os.environ:
shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'],
"settings", "shutter.xml")
else:
- shutter_xml_template = os.path.join(os.path.split(os.path.dirname(
- os.path.realpath(__file__)))[0],
- "data",
- "settings",
- "shutter.xml")
+ shutter_xml_template = os.path.join(
+ os.path.split(os.path.dirname(os.path.realpath(__file__)))[0],
+ "data",
+ "settings",
+ "shutter.xml")
if args.keyword:
screenshot_path = screenshot_path + '_' + args.keyword
@@ -167,7 +167,7 @@ try:
'folder="%s"' % screenshot_path, content))
new_profile.close()
old_profile.close()
-except:
+except (IOError, OSError):
raise SystemExit("ERROR: While updating folder name "
"in shutter profile: {}".format(sys.exc_info()))
@@ -198,7 +198,7 @@ for mode in highest_modes:
print("""Could not capture screenshot -
you may need to install the package 'shutter'.""")
- except:
+ except (IOError, OSError):
print("""Could not configure screenshot tool -
you may need to install the package 'shutter',
or check that {}/{} exists and is writable.""".format(
@@ -220,7 +220,7 @@ try:
with tarfile.open(screenshot_path + '.tgz', 'w:gz') as screen_tar:
for screen in os.listdir(screenshot_path):
screen_tar.add(screenshot_path + '/' + screen, screen)
-except:
+except (IOError, OSError):
pass
# Output some fun facts and knock off for the day
diff --git a/requirements/container-tests-provider-checkbox b/requirements/container-tests-provider-checkbox
index 9f69ffe..7a4404c 100755
--- a/requirements/container-tests-provider-checkbox
+++ b/requirements/container-tests-provider-checkbox
@@ -18,3 +18,4 @@ python3 $TMPDIR/plainbox-provider-resource/manage.py develop --force
./manage.py validate
./manage.py test -u
+./manage.py test -f
diff --git a/requirements/deb-base.txt b/requirements/deb-base.txt
index 6f9256f..74893e2 100644
--- a/requirements/deb-base.txt
+++ b/requirements/deb-base.txt
@@ -1 +1,2 @@
shellcheck
+flake8