diff options
author | Sylvain Pineau <sylvain.pineau@canonical.com> | 2020-07-17 10:49:33 +0200 |
---|---|---|
committer | Sylvain Pineau <sylvain.pineau@canonical.com> | 2020-07-17 10:49:33 +0200 |
commit | 53a9dace03fea49a2de030e7cd1999155637b6ef (patch) | |
tree | 11ca39912f0ec5993853f8f86b7d37340532195f /bin | |
parent | 5476520cd9983b2c8d65c98ea1ba542ac922fe95 (diff) |
bin:*.py: Fix all Flake8 errors
Diffstat (limited to 'bin')
68 files changed, 764 insertions, 622 deletions
diff --git a/bin/accelerometer_test.py b/bin/accelerometer_test.py index 2d95ca8..93ae07f 100755 --- a/bin/accelerometer_test.py +++ b/bin/accelerometer_test.py @@ -23,6 +23,7 @@ The purpose of this script is to simply interact with an onboard accelerometer, and check to be sure that the x, y, z axis respond to physical movement of hardware. ''' + from argparse import ArgumentParser import gi import logging @@ -34,9 +35,10 @@ import time gi.require_version('Gdk', '3.0') gi.require_version('GLib', '2.0') gi.require_version("Gtk", "3.0") -from gi.repository import Gdk, GLib, Gtk -from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError -from checkbox_support.parsers.modinfo import ModinfoParser +from gi.repository import Gdk, GLib, Gtk # noqa: E402 +from subprocess import Popen, PIPE, check_output, STDOUT # noqa: E402 +from subprocess import CalledProcessError # noqa: E402 +from checkbox_support.parsers.modinfo import ModinfoParser # noqa: E402 handler = logging.StreamHandler() logger = logging.getLogger() @@ -86,7 +88,7 @@ class AccelerometerUI(Gtk.Window): def update_axis_icon(self, direction): """Change desired directional icon to checkmark""" - exec('self.%s_icon.set_from_stock' % (direction) \ + exec('self.%s_icon.set_from_stock' % (direction) + '(Gtk.STOCK_YES, size=Gtk.IconSize.BUTTON)') def update_debug_label(self, text): @@ -131,7 +133,7 @@ class AxisData(threading.Thread): self.x_test_pool = ["up", "down"] self.y_test_pool = ["left", "right"] - if self.ui == None: + if self.ui is None: self.ui.enabled = False def grab_current_readings(self): @@ -223,8 +225,8 @@ class AxisData(threading.Thread): def insert_supported_module(oem_module): """Try and insert supported module to see if we get any init errors""" try: - stream = check_output(['modinfo', oem_module], stderr=STDOUT, - universal_newlines=True) + stream = check_output( + ['modinfo', oem_module], stderr=STDOUT, universal_newlines=True) except CalledProcessError as err: print("Error accessing modinfo for %s: " % oem_module, file=sys.stderr) print(err.output, file=sys.stderr) @@ -232,7 +234,7 @@ def insert_supported_module(oem_module): parser = ModinfoParser(stream) module = os.path.basename(parser.get_field('filename')) - + insmod_output = Popen(['insmod %s' % module], stderr=PIPE, shell=True, universal_newlines=True) @@ -259,10 +261,10 @@ def check_module_status(): if "Permission denied" in error: raise PermissionException(error) - vendor_data = re.findall("Vendor:\s.*", output) + vendor_data = re.findall(r"Vendor:\s.*", output) try: manufacturer = vendor_data[0].split(":")[1].strip() - except IndexError as exception: + except IndexError: logging.error("Failed to find Manufacturing data") return @@ -275,8 +277,8 @@ def check_module_status(): oem_module = oem_driver_pool.get(vendor) break # We've found our desired module to probe. - if oem_module != None: - if insert_supported_module(oem_module) != None: + if oem_module is not None: + if insert_supported_module(oem_module) is not None: logging.error("Failed module insertion") # Check dmesg status for supported module driver_status = Popen(['dmesg'], stdout=PIPE, universal_newlines=True) @@ -347,5 +349,6 @@ def main(): # Reading is not instant. time.sleep(5) + if __name__ == '__main__': - main(); + main() diff --git a/bin/alsa_tests.py b/bin/alsa_tests.py index 16fb044..e919570 100755 --- a/bin/alsa_tests.py +++ b/bin/alsa_tests.py @@ -168,5 +168,6 @@ def main(): device = None return(actions[args.action](args.duration, device)) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/ansi_parser.py b/bin/ansi_parser.py index c7cf26c..b5cec30 100755 --- a/bin/ansi_parser.py +++ b/bin/ansi_parser.py @@ -128,7 +128,8 @@ def parse_filename(filename): def main(args): usage = "Usage: %prog [OPTIONS] [FILE...]" parser = OptionParser(usage=usage) - parser.add_option("-o", "--output", + parser.add_option( + "-o", "--output", metavar="FILE", help="File where to output the result.") (options, args) = parser.parse_args(args) diff --git a/bin/audio_driver_info.py b/bin/audio_driver_info.py index b9bfaf8..424b125 100755 --- a/bin/audio_driver_info.py +++ b/bin/audio_driver_info.py @@ -41,7 +41,6 @@ class PacmdAudioDevice(): print("Error running %s:" % cmd, file=sys.stderr) print(err.output, file=sys.stderr) return None - if not stream: print("Error: modinfo returned nothing", file=sys.stderr) return None @@ -74,7 +73,8 @@ def list_device_info(): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), file=sys.stderr) return 1 @@ -102,8 +102,9 @@ def list_device_info(): def main(): parser = ArgumentParser("List audio device and driver information") - args = parser.parse_args() + parser.parse_args() return list_device_info() + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/audio_test.py b/bin/audio_test.py index 7bd6c44..2a56df4 100755 --- a/bin/audio_test.py +++ b/bin/audio_test.py @@ -13,7 +13,7 @@ import time try: import gi gi.require_version('GLib', '2.0') - gi.require_version('Gst','1.0') + gi.require_version('Gst', '1.0') from gi.repository import GObject from gi.repository import Gst from gi.repository import GLib @@ -29,26 +29,27 @@ try: except ImportError: from collections import Callable # backward compatible -#Frequency bands for FFT +# Frequency bands for FFT BINS = 256 -#How often to take a sample and do FFT on it. +# How often to take a sample and do FFT on it. FFT_INTERVAL = 100000000 # In nanoseconds, so this is every 1/10th second -#Sampling frequency. The effective maximum frequency we can analyze is -#half of this (see Nyquist's theorem) +# Sampling frequency. The effective maximum frequency we can analyze is +# half of this (see Nyquist's theorem) SAMPLING_FREQUENCY = 44100 -#The default test frequency is in the middle of the band that contains 5000Hz -#This frequency was determined experimentally to be high enough but more -#reliable than others we tried. +# The default test frequency is in the middle of the band that contains 5000Hz +# This frequency was determined experimentally to be high enough but more +# reliable than others we tried. DEFAULT_TEST_FREQUENCY = 5035 -#only sample a signal when peak level is in this range (in dB attenuation, -#0 means no attenuation (and horrible clipping). +# only sample a signal when peak level is in this range (in dB attenuation, +# 0 means no attenuation (and horrible clipping). REC_LEVEL_RANGE = (-2.0, -12.0) -#For our test signal to be considered present, it has to be this much higher -#than the base level (minimum magnitude). This is in dB. +# For our test signal to be considered present, it has to be this much higher +# than the base level (minimum magnitude). This is in dB. MAGNITUDE_THRESHOLD = 2.5 -#Volume for the sample tone (in %) +# Volume for the sample tone (in %) PLAY_VOLUME = 70 + class PIDController(object): """ A Proportional-Integrative-Derivative controller (PID) controls a process's output to try to maintain a desired output value (known as @@ -141,7 +142,7 @@ class PAVolumeController(object): 'set-%s-volume' % (self.pa_types[self.type]), str(self.identifier[0]), str(int(volume)) + "%"] - if False == self.method(command): + if not self.method(command): return False self._volume = volume return True @@ -159,7 +160,7 @@ class PAVolumeController(object): 'set-%s-mute' % (self.pa_types[self.type]), str(self.identifier[0]), mute] - if False == self.method(command): + if not self.method(command): return False return True @@ -187,10 +188,10 @@ class PAVolumeController(object): return None command = ['pactl', 'list', self.pa_types[type] + "s", 'short'] - #Expect lines of this form (field separator is tab): - #<ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE> - #What we need to return is the ID for the first element on this list - #that does not contain auto_null or monitor. + # Expect lines of this form (field separator is tab): + # <ID>\t<NAME>\t<MODULE>\t<SAMPLE_SPEC_WITH_SPACES>\t<STATE> + # What we need to return is the ID for the first element on this list + # that does not contain auto_null or monitor. pa_info = self.method(command) valid_elements = None @@ -203,17 +204,17 @@ class PAVolumeController(object): self.logger.error("No valid PulseAudio elements" " for %s" % (self.type)) return None - #We only need the pulseaudio numeric ID and long name for each element + # We only need the pulseaudio numeric ID and long name for each element valid_elements = [(int(e.split()[0]), e.split()[1]) for e in valid_elements] return valid_elements[0] def _pactl_output(self, command): - #This method mainly calls pactl (hence the name). Since pactl may - #return a failure if the audio layer is not yet initialized, we will - #try running a few times in case of failure. All our invocations of - #pactl should be "idempotent" so repeating them should not have - #any bad effects. + # This method mainly calls pactl (hence the name). Since pactl may + # return a failure if the audio layer is not yet initialized, we will + # try running a few times in case of failure. All our invocations of + # pactl should be "idempotent" so repeating them should not have + # any bad effects. for attempt in range(0, 3): try: return subprocess.check_output(command, @@ -242,8 +243,8 @@ class SpectrumAnalyzer(object): self.number_of_samples = 0 self.wanted_samples = wanted_samples self.sampling_frequency = sampling_frequency - #Frequencies should contain *real* frequency which is half of - #the sampling frequency + # Frequencies should contain *real* frequency which is half of + # the sampling frequency self.frequencies = [((sampling_frequency / 2.0) / points) * i for i in range(points)] @@ -259,14 +260,14 @@ class SpectrumAnalyzer(object): self.number_of_samples += 1 def frequencies_with_peak_magnitude(self, threshold=1.0): - #First establish the base level + # First establish the base level per_magnitude_bins = collections.defaultdict(int) for magnitude in self.spectrum: per_magnitude_bins[magnitude] += 1 base_level = max(per_magnitude_bins, key=lambda x: per_magnitude_bins[x]) - #Now return all values that are higher (more positive) - #than base_level + threshold + # Now return all values that are higher (more positive) + # than base_level + threshold peaks = [] for i in range(1, len(self.spectrum) - 1): first_index = i - 1 @@ -282,10 +283,10 @@ class SpectrumAnalyzer(object): """Convenience function to tell me which band a frequency is contained in """ - #Note that actual frequencies are half of what the sampling - #frequency would tell us. If SF is 44100 then maximum actual - #frequency is 22050, and if I have 10 frequency bins each will - #contain only 2205 Hz, not 4410 Hz. + # Note that actual frequencies are half of what the sampling + # frequency would tell us. If SF is 44100 then maximum actual + # frequency is 22050, and if I have 10 frequency bins each will + # contain only 2205 Hz, not 4410 Hz. max_frequency = self.sampling_frequency / 2 if frequency > max_frequency or frequency < 0: return None @@ -339,34 +340,33 @@ class GStreamerMessageHandler(object): if message.type == Gst.MessageType.ELEMENT: message_name = message.get_structure().get_name() if message_name == 'spectrum': - #TODO: Due to an upstream bug, a structure's get_value method - #doesn't work if the value in question is an array (as is the - #case with the magnitudes). - #https://bugzilla.gnome.org/show_bug.cgi?id=693168 - #We have to resort to parsing the string representation of the - #structure. It's an ugly hack but it works. - #Ideally we'd be able to say this to get fft_magnitudes: - #message.get_structure.get_value('magnitude'). - #If an upstream fix ever makes it into gstreamer, - #remember to remove this hack and the parse_spectrum - #method + # TODO: Due to an upstream bug, a structure's get_value method + # doesn't work if the value in question is an array (as is the + # case with the magnitudes). + # https://bugzilla.gnome.org/show_bug.cgi?id=693168 + # We have to resort to parsing the string representation of the + # structure. It's an ugly hack but it works. + # Ideally we'd be able to say this to get fft_magnitudes: + # message.get_structure.get_value('magnitude'). + # If an upstream fix ever makes it into gstreamer, + # remember to remove this hack and the parse_spectrum method struct_string = message.get_structure().to_string() structure = parse_spectrum_message_structure(struct_string) fft_magnitudes = structure['magnitude'] self.spectrum_method(self.spectrum_analyzer, fft_magnitudes) if message_name == 'level': - #peak_value is our process feedback - #It's returned as an array, so I need the first (and only) - #element + # peak_value is our process feedback + # It's returned as an array, so I need the first (and only) + # element peak_value = message.get_structure().get_value('peak')[0] self.level_method(peak_value, self.pid_controller, self.volume_controller) - #Adjust recording level + # Adjust recording level def level_method(self, level, pid_controller, volume_controller): - #If volume controller doesn't return a valid volume, - #we can't control it :( + # If volume controller doesn't return a valid volume, + # we can't control it :( current_volume = volume_controller.get_volume() if current_volume is None: self.logger.error("Unable to control recording volume." @@ -375,19 +375,20 @@ class GStreamerMessageHandler(object): self.current_level = level change = pid_controller.input_change(level, 0.10) if self.logger: - self.logger.debug("Peak level: %(peak_level).2f, " - "volume: %(volume)d%%, Volume change: %(change)f%%" % - {'peak_level': level, - 'change': change, - 'volume': current_volume}) + self.logger.debug( + "Peak level: %(peak_level).2f, " + "volume: %(volume)d%%, Volume change: %(change)f%%" % { + 'peak_level': level, + 'change': change, + 'volume': current_volume}) volume_controller.set_volume(current_volume + change) - #Only sample if level is within the threshold + # Only sample if level is within the threshold def spectrum_method(self, analyzer, spectrum): if self.rec_level_range[1] <= self.current_level \ or self.current_level <= self.rec_level_range[0]: - self.logger.debug("Sampling, recorded %d samples" % - analyzer.number_of_samples) + self.logger.debug( + "Sampling, recorded %d samples" % analyzer.number_of_samples) analyzer.sample(spectrum) if analyzer.sampling_complete() and self._quit_method: self.logger.info("Sampling complete, ending process") @@ -414,10 +415,11 @@ class GstAudioObject(object): class Player(GstAudioObject): def __init__(self, frequency=DEFAULT_TEST_FREQUENCY, logger=None): super(Player, self).__init__() - self.pipeline_description = ("audiotestsrc wave=sine freq=%s " - "! audioconvert " - "! audioresample " - "! autoaudiosink" % int(frequency)) + self.pipeline_description = ( + "audiotestsrc wave=sine freq=%s " + "! audioconvert " + "! audioresample " + "! autoaudiosink" % int(frequency)) self.logger = logger if self.logger: self.logger.debug(self.pipeline_description) @@ -437,11 +439,11 @@ class Recorder(GstAudioObject): ! audioresample ! spectrum interval=%(fft_interval)s bands = %(bands)s ! wavenc - ! filesink location=%(file)s''' % - {'bands': bins, - 'rate': sampling_frequency, - 'fft_interval': fft_interval, - 'file': output_file}) + ! filesink location=%(file)s''' % { + 'bands': bins, + 'rate': sampling_frequency, + 'fft_interval': fft_interval, + 'file': output_file}) self.logger = logger if self.logger: self.logger.debug(pipeline_description) @@ -457,25 +459,25 @@ class Recorder(GstAudioObject): def parse_spectrum_message_structure(struct_string): - #First let's jsonize this - #This is the message name, which we don't need + # First let's jsonize this + # This is the message name, which we don't need text = struct_string.replace("spectrum, ", "") - #name/value separator in json is : and not = - text = text.replace("=",": ") - #Mutate the {} array notation from the structure to - #[] notation for json. - text = text.replace("{","[") - text = text.replace("}","]") - #Remove a few stray semicolons that aren't needed - text = text.replace(";","") - #Remove the data type fields, as json doesn't need them + # name/value separator in json is : and not = + text = text.replace("=", ": ") + # Mutate the {} array notation from the structure to + # [] notation for json. + text = text.replace("{", "[") + text = text.replace("}", "]") + # Remove a few stray semicolons that aren't needed + text = text.replace(";", "") + # Remove the data type fields, as json doesn't need them text = re.sub(r"\(.+?\)", "", text) - #double-quote the identifiers + # double-quote the identifiers text = re.sub(r"([\w-]+):", r'"\1":', text) - #Wrap the whole thing in brackets + # Wrap the whole thing in brackets text = ("{"+text+"}") - #Try to parse and return something sensible here, even if - #the data was unparsable. + # Try to parse and return something sensible here, even if + # the data was unparsable. try: return json.loads(text) except ValueError: @@ -489,32 +491,38 @@ def process_arguments(): presence of the played frequency, if present it exits with success. """ parser = argparse.ArgumentParser(description=description) - parser.add_argument("-t", "--time", + parser.add_argument( + "-t", "--time", dest='test_duration', action='store', default=30, type=int, help="""Maximum test duration, default %(default)s seconds. It may exit sooner if it determines it has enough data.""") - parser.add_argument("-a", "--audio", + parser.add_argument( + "-a", "--audio", action='store', default="/dev/null", type=str, help="File to save recorded audio in .wav format") - parser.add_argument("-q", "--quiet", + parser.add_argument( + "-q", "--quiet", action='store_true', default=False, help="Be quiet, no output unless there's an error.") - parser.add_argument("-d", "--debug", + parser.add_argument( + "-d", "--debug", action='store_true', default=False, help="Debugging output") - parser.add_argument("-f", "--frequency", + parser.add_argument( + "-f", "--frequency", action='store', default=DEFAULT_TEST_FREQUENCY, type=int, help="Frequency for test signal, default %(default)s Hz") - parser.add_argument("-u", "--spectrum", + parser.add_argument( + "-u", "--spectrum", action='store', type=str, help="""File to save spectrum information for plotting @@ -524,10 +532,10 @@ def process_arguments(): # def main(): - #Get arguments. + # Get arguments. args = process_arguments() - #Setup logging + # Setup logging level = logging.INFO if args.debug: level = logging.DEBUG @@ -535,56 +543,57 @@ def main(): level = logging.ERROR logging.basicConfig(level=level) try: - #Launches recording pipeline. I need to hook up into the gst - #messages. + # Launches recording pipeline. I need to hook up into the gst + # messages. recorder = Recorder(output_file=args.audio, logger=logging) - #Just launches the playing pipeline + # Just launches the playing pipeline player = Player(frequency=args.frequency, logger=logging) except GObject.GError as excp: logging.critical("Unable to initialize GStreamer pipelines: %s", excp) sys.exit(127) - #This just receives a process feedback and tells me how much to change to - #achieve the setpoint + # This just receives a process feedback and tells me how much to change to + # achieve the setpoint pidctrl = PIDController(Kp=0.7, Ki=.01, Kd=0.01, setpoint=REC_LEVEL_RANGE[0]) pidctrl.set_change_limit(5) - #This gathers spectrum data. + # This gathers spectrum data. analyzer = SpectrumAnalyzer(points=BINS, sampling_frequency=SAMPLING_FREQUENCY) - #Volume controllers actually set volumes for their device types. - #we should at least issue a warning + # Volume controllers actually set volumes for their device types. + # we should at least issue a warning recorder.volumecontroller = PAVolumeController(type='input', logger=logging) if not recorder.volumecontroller.get_identifier(): - logging.warning("Unable to get input volume control identifier. " - "Test results will probably be invalid") + logging.warning( + "Unable to get input volume control identifier. " + "Test results will probably be invalid") recorder.volumecontroller.set_volume(0) recorder.volumecontroller.mute(False) player.volumecontroller = PAVolumeController(type='output', logger=logging) if not player.volumecontroller.get_identifier(): - logging.warning("Unable to get output volume control identifier. " - "Test results will probably be invalid") + logging.warning( + "Unable to get output volume control identifier. " + "Test results will probably be invalid") player.volumecontroller.set_volume(PLAY_VOLUME) player.volumecontroller.mute(False) - #This handles the messages from gstreamer and orchestrates - #the passed volume controllers, pid controller and spectrum analyzer - #accordingly. + # This handles the messages from gstreamer and orchestrates + # the passed volume controllers, pid controller and spectrum analyzer + # accordingly. gmh = GStreamerMessageHandler(rec_level_range=REC_LEVEL_RANGE, logger=logging, volumecontroller=recorder.volumecontroller, pidcontroller=pidctrl, spectrum_analyzer=analyzer) - #I need to tell the recorder which method will handle messages. + # I need to tell the recorder which method will handle messages. recorder.register_message_handler(gmh.bus_message_handler) - #Create the loop and add a few triggers -# GObject.threads_init() #Not needed? + # Create the loop and add a few triggers loop = GLib.MainLoop() GLib.timeout_add_seconds(0, player.start) GLib.timeout_add_seconds(0, recorder.start) @@ -595,29 +604,32 @@ def main(): loop.run() - #When the loop ends, set things back to reasonable states + # When the loop ends, set things back to reasonable states player.stop() recorder.stop() player.volumecontroller.set_volume(50) recorder.volumecontroller.set_volume(10) - #See if data gathering was successful. + # See if data gathering was successful. test_band = analyzer.frequency_band_for(args.frequency) - candidate_bands = analyzer.frequencies_with_peak_magnitude(MAGNITUDE_THRESHOLD) + candidate_bands = analyzer.frequencies_with_peak_magnitude( + MAGNITUDE_THRESHOLD) for band in candidate_bands: logging.debug("Band (%.2f,%.2f) contains a magnitude peak" % analyzer.frequencies_for_band(band)) if test_band in candidate_bands: freqs_for_band = analyzer.frequencies_for_band(test_band) - logging.info("PASS: Test frequency of %s in band (%.2f, %.2f) " - "which contains a magnitude peak" % + logging.info( + "PASS: Test frequency of %s in band (%.2f, %.2f) " + "which contains a magnitude peak" % ((args.frequency,) + freqs_for_band)) return_value = 0 else: - logging.info("FAIL: Test frequency of %s is not in one of the " - "bands with magnitude peaks" % args.frequency) + logging.info( + "FAIL: Test frequency of %s is not in one of the " + "bands with magnitude peaks" % args.frequency) return_value = 1 - #Is the microphone broken? + # Is the microphone broken? if len(set(analyzer.spectrum)) <= 1: logging.info("WARNING: Microphone seems broken, didn't even " "record ambient noise") @@ -625,14 +637,17 @@ def main(): if args.spectrum: logging.info("Saving spectrum data for plotting as %s" % args.spectrum) - if not FileDumper().write_to_file(args.spectrum, - ["%s,%s" % t for t in - zip(analyzer.frequencies, - analyzer.spectrum)]): + if ( + not FileDumper().write_to_file( + args.spectrum, + ["%s,%s" % t for t in zip( + analyzer.frequencies, analyzer.spectrum)]) + ): logging.error("Couldn't save spectrum data for plotting", file=sys.stderr) return return_value + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/battery_test.py b/bin/battery_test.py index b2682c7..112ef5e 100755 --- a/bin/battery_test.py +++ b/bin/battery_test.py @@ -1,14 +1,13 @@ #!/usr/bin/env python3 import gi -import os import time import re import subprocess import sys import argparse gi.require_version('Gio', '2.0') -from gi.repository import Gio +from gi.repository import Gio # noqa: E402 class Battery(): @@ -89,10 +88,10 @@ def get_battery_state(): def validate_battery_info(battery): if battery is None: - print ("Error obtaining battery info") + print("Error obtaining battery info") return False if battery._state != "discharging": - print ("Error: battery is not discharging, test will not be valid") + print("Error: battery is not discharging, test will not be valid") return False return True @@ -173,5 +172,6 @@ def main(): return(battery_life(battery_before, battery_after, test_time)) + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bluetooth_test.py b/bin/bluetooth_test.py index 8fafad6..fd3c956 100755 --- a/bin/bluetooth_test.py +++ b/bin/bluetooth_test.py @@ -145,5 +145,6 @@ def main(): format='%(levelname)s: %(message)s') return getattr(ObexFTPTest(args.file.name, args.btaddr), args.action)() + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bmc_info.py b/bin/bmc_info.py index d7aec9b..674d7e8 100755 --- a/bin/bmc_info.py +++ b/bin/bmc_info.py @@ -4,6 +4,7 @@ import sys import shlex from subprocess import check_output, CalledProcessError + def main(): # First, we need to get output cmd = "ipmitool mc info" @@ -14,7 +15,7 @@ def main(): file=sys.stderr) return 1 except CalledProcessError as e: - print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr) + print("Problem running %s. Error was %s" % (cmd, e), file=sys.stderr) return 1 result = result.split('\n') @@ -47,12 +48,11 @@ def main(): if type(data[field]) is list: # Sometimes the first item in the list is ''. This will remove it data[field].remove('') - print(field.ljust(30),':',data[field].pop(0)) + print(field.ljust(30), ':', data[field].pop(0)) for item in data[field]: - print(' '.ljust(32),item) + print(' '.ljust(32), item) else: - print(field.ljust(30),":",data[field]) - #print("{}:\t{}".format(field, data[field])) + print(field.ljust(30), ":", data[field]) return 0 diff --git a/bin/boot_mode_test_snappy.py b/bin/boot_mode_test_snappy.py index 864be29..23b30e4 100755 --- a/bin/boot_mode_test_snappy.py +++ b/bin/boot_mode_test_snappy.py @@ -14,7 +14,6 @@ import tempfile import yaml -from checkbox_support.parsers.kernel_cmdline import parse_kernel_cmdline from checkbox_support.snap_utils.system import get_lk_bootimg_path diff --git a/bin/brightness_test.py b/bin/brightness_test.py index 52d478a..e3a8e90 100755 --- a/bin/brightness_test.py +++ b/bin/brightness_test.py @@ -28,7 +28,6 @@ import sys import os import time -from sys import stdout, stderr from glob import glob @@ -42,7 +41,7 @@ class Brightness(object): # See if the source is a file or a file object # and act accordingly file = path - if file == None: + if file is None: lines_list = [] else: # It's a file @@ -102,8 +101,10 @@ class Brightness(object): Note: this doesn't guarantee that screen brightness changed. ''' - if (abs(self.get_actual_brightness(interface) - - self.get_last_set_brightness(interface)) > 1): + if ( + abs(self.get_actual_brightness(interface) - + self.get_last_set_brightness(interface)) > 1 + ): return 1 else: return 0 @@ -132,9 +133,9 @@ def main(): max_brightness = brightness.get_max_brightness(interface) # Set the brightness to half the max value - brightness.write_value(max_brightness / 2, - os.path.join(interface, - 'brightness')) + brightness.write_value( + max_brightness / 2, + os.path.join(interface, 'brightness')) # Check that "actual_brightness" reports the same value we # set "brightness" to @@ -144,9 +145,9 @@ def main(): time.sleep(2) # Set the brightness back to its original value - brightness.write_value(current_brightness, - os.path.join(interface, - 'brightness')) + brightness.write_value( + current_brightness, + os.path.join(interface, 'brightness')) exit(exit_status) diff --git a/bin/bt_connect.py b/bin/bt_connect.py index 1fed30e..8d6757c 100755 --- a/bin/bt_connect.py +++ b/bin/bt_connect.py @@ -131,5 +131,6 @@ def main(): # capture all other silence failures return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/bt_list_adapters.py b/bin/bt_list_adapters.py index e791442..c8db787 100755 --- a/bin/bt_list_adapters.py +++ b/bin/bt_list_adapters.py @@ -38,7 +38,7 @@ def main(): with open(namef, 'r') as f: name = f.read().strip() print(rfdev, name) - if found_adatper == False: + if found_adatper is False: raise SystemExit('No bluetooth adatpers registered with rfkill') diff --git a/bin/color_depth_info.py b/bin/color_depth_info.py index 129f051..fcab953 100755 --- a/bin/color_depth_info.py +++ b/bin/color_depth_info.py @@ -53,7 +53,7 @@ def get_color_depth(log_dir='/var/log/'): return (depth, pixmap_format) with open(current_log, 'rb') as stream: - for match in re.finditer('Depth (\d+) pixmap format is (\d+) bpp', + for match in re.finditer(r'Depth (\d+) pixmap format is (\d+) bpp', str(stream.read())): depth = int(match.group(1)) pixmap_format = int(match.group(2)) @@ -72,5 +72,6 @@ def main(): return 1 return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/cpu_offlining.py b/bin/cpu_offlining.py index 5616e30..3d269ac 100755 --- a/bin/cpu_offlining.py +++ b/bin/cpu_offlining.py @@ -68,5 +68,6 @@ def main(): print(' '.join(failed_onlines)) return 1 + if __name__ == '__main__': main() diff --git a/bin/cpu_topology.py b/bin/cpu_topology.py index 0ce68c5..425e3c4 100755 --- a/bin/cpu_topology.py +++ b/bin/cpu_topology.py @@ -21,8 +21,8 @@ class proc_cpuinfo(): finally: cpu_fh.close() - r_s390 = re.compile("processor [0-9]") - r_x86 = re.compile("processor\s+:") + r_s390 = re.compile(r"processor [0-9]") + r_x86 = re.compile(r"processor\s+:") for i in temp: # Handle s390 first if r_s390.match(i): @@ -111,5 +111,6 @@ def main(): else: return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/cpuid.py b/bin/cpuid.py index d0cdea9..6543dc4 100755 --- a/bin/cpuid.py +++ b/bin/cpuid.py @@ -25,10 +25,9 @@ # Modifications: 2019 Jeffrey Lane (jeffrey.lane@canonical.com) import ctypes -import os import platform import sys -from ctypes import (c_uint32, c_int, c_long, c_ulong, c_size_t, c_void_p, +from ctypes import (c_uint32, c_int, c_size_t, c_void_p, POINTER, CFUNCTYPE) from subprocess import check_output @@ -84,7 +83,8 @@ CPUIDS = { "Broadwell": ['0x4067', '0x306d4', '0x5066', '0x406f'], "Canon Lake": ['0x6066'], "Cascade Lake": ['0x50655', '0x50656', '0x50657'], - "Coffee Lake": ['0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'], + "Coffee Lake": [ + '0x806ea', '0x906ea', '0x906eb', '0x906ec', '0x906ed'], "Haswell": ['0x306c', '0x4065', '0x4066', '0x306f'], "Ice Lake": ['0x706e'], "Ivy Bridge": ['0x306a', '0x306e'], diff --git a/bin/create_connection.py b/bin/create_connection.py index 71e9572..880151d 100755 --- a/bin/create_connection.py +++ b/bin/create_connection.py @@ -6,9 +6,8 @@ import time from subprocess import check_call, check_output, CalledProcessError try: - from subprocess import DEVNULL # >= python3.3 + from subprocess import DEVNULL # >= python3.3 except ImportError: - import os DEVNULL = open(os.devnull, 'wb') from uuid import uuid4 @@ -154,7 +153,7 @@ def block_until_created(connection, retries, interval): sys.exit(1) else: try: - nmcli_con_up = check_call(['nmcli', 'con', 'up', 'id', connection]) + check_call(['nmcli', 'con', 'up', 'id', connection]) print("Connection %s activated." % connection) except CalledProcessError as error: print("Failed to activate %s." % connection, file=sys.stderr) @@ -216,7 +215,7 @@ def create_mobilebroadband_connection(args): args.apn, args.username, args.password, - args.pin) + args.pin) if args.type == 'cdma': mobilebroadband_connection += mobilebroadband_ppp_section() @@ -288,5 +287,6 @@ def main(): # Make sure we don't exit until the connection is fully created block_until_created(connection_name, args.retries, args.interval) + if __name__ == "__main__": main() diff --git a/bin/dmitest.py b/bin/dmitest.py index 642af3a..2be50cc 100755 --- a/bin/dmitest.py +++ b/bin/dmitest.py @@ -146,26 +146,26 @@ def standard_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Chassis Information', 'Manufacturer:', ['empty', 'chassis manufacture', 'null', 'insyde', - 'to be filled by o\.e\.m\.', 'no enclosure', - '\.\.\.\.\.'], True): + r'to be filled by o\.e\.m\.', 'no enclosure', + r'\.\.\.\.\.'], True): dmi_data['Chassis Information']['Manufacturer'] += \ " *** Invalid chassis manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Manufacturer:', ['system manufacture', 'insyde', 'standard', - 'to be filled by o\.e\.m\.', 'no enclosure'], True): + r'to be filled by o\.e\.m\.', 'no enclosure'], True): dmi_data['System Information']['Manufacturer'] += \ " *** Invalid system manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'Base Board Information', 'Manufacturer:', - ['to be filled by o\.e\.m\.'], True): + [r'to be filled by o\.e\.m\.'], True): dmi_data['Base Board Information']['Manufacturer'] += \ " *** Invalid base board manufacturer!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Product Name:', - ['system product name', 'to be filled by o\.e\.m\.'], + ['system product name', r'to be filled by o\.e\.m\.'], False): dmi_data['System Information']['Product Name'] += \ " *** Invalid system product name!" @@ -173,7 +173,7 @@ def standard_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Base Board Information', 'Product Name:', ['base board product name', - 'to be filled by o\.e\.m\.'], False): + r'to be filled by o\.e\.m\.'], False): dmi_data['Base Board Information']['Product Name'] += \ " *** Invalid base board product name!" retval += 1 @@ -193,21 +193,21 @@ def version_tests(args, stream, dmi_data): """ retval = 0 if find_in_section(stream, dmi_data, 'Chassis Information', 'Version:', - ['to be filled by o\.e\.m\.', 'empty', 'x\.x'], + [r'to be filled by o\.e\.m\.', 'empty', r'x\.x'], False): dmi_data['Chassis Information']['Version'] += \ " *** Invalid chassis version!" retval += 1 if find_in_section(stream, dmi_data, 'System Information', 'Version:', - ['to be filled by o\.e\.m\.', '\(none\)', + [r'to be filled by o\.e\.m\.', r'\(none\)', 'null', 'system version', 'not applicable', - '\.\.\.\.\.'], False): + r'\.\.\.\.\.'], False): dmi_data['System Information']['Version'] += \ " *** Invalid system information version!" retval += 1 if find_in_section(stream, dmi_data, 'Base Board Information', 'Version:', - ['base board version', 'x\.x', - 'empty', 'to be filled by o\.e\.m\.'], False): + ['base board version', r'x\.x', + 'empty', r'to be filled by o\.e\.m\.'], False): dmi_data['Base Board Information']['Version'] += \ " *** Invalid base board version!" retval += 1 @@ -228,8 +228,8 @@ def serial_tests(args, stream, dmi_data): retval = 0 if find_in_section(stream, dmi_data, 'System Information', 'Serial Number:', - ['to be filled by o\.e\.m\.', - 'system serial number', '\.\.\.\.\.'], + [r'to be filled by o\.e\.m\.', + 'system serial number', r'\.\.\.\.\.'], False): dmi_data['System Information']['Serial Number'] += \ " *** Invalid system information serial number!" @@ -237,8 +237,8 @@ def serial_tests(args, stream, dmi_data): if find_in_section(stream, dmi_data, 'Base Board Information', 'Serial Number:', ['n/a', 'base board serial number', - 'to be filled by o\.e\.m\.', - 'empty', '\.\.\.'], + r'to be filled by o\.e\.m\.', + 'empty', r'\.\.\.'], False): dmi_data['Base Board Information']['Serial Number'] += \ " *** Invalid base board serial number!" @@ -306,7 +306,7 @@ def main(): if args.test_serials: retval += serial_tests(args, stream, dmi_data) if find_in_section(stream, dmi_data, 'Processor Information', 'Version:', - ['sample', 'Genuine Intel\(R\) CPU 0000'], False): + ['sample', r'Genuine Intel\(R\) CPU 0000'], False): dmi_data['Processor Information']['Version'] += \ " *** Invalid processor information!" retval += 1 diff --git a/bin/edid_cycle.py b/bin/edid_cycle.py index a6daf1c..e327cb0 100755 --- a/bin/edid_cycle.py +++ b/bin/edid_cycle.py @@ -21,7 +21,7 @@ def check_resolution(): output = subprocess.check_output('xdpyinfo') for line in output.decode(sys.stdout.encoding).splitlines(): if 'dimensions' in line: - match = re.search('(\d+)x(\d+)\ pixels', line) + match = re.search(r'(\d+)x(\d+)\ pixels', line) if match and len(match.groups()) == 2: return '{}x{}'.format(*match.groups()) @@ -29,7 +29,8 @@ def check_resolution(): def change_edid(host, edid_file): with open(edid_file, 'rb') as f: cmd = ['ssh', host, '/snap/bin/pigbox', 'run', - '\'v4l2-ctl --set-edid=file=-,format=raw --fix-edid-checksums\''] + '\'v4l2-ctl --set-edid=file=-,' + 'format=raw --fix-edid-checksums\''] subprocess.check_output(cmd, input=f.read()) @@ -52,5 +53,6 @@ def main(): print('PASS') return failed + if __name__ == '__main__': raise SystemExit(main()) diff --git a/bin/evdev_touch_test.py b/bin/evdev_touch_test.py index 5649701..bc52cc4 100755 --- a/bin/evdev_touch_test.py +++ b/bin/evdev_touch_test.py @@ -53,7 +53,7 @@ while True: for e in dev.read(): tap = args.xfingers if tap == 1: - if (e.type == 3 and e.code == 47 and e.value > 0): + if (e.type == 3 and e.code == 47 and e.value > 0): raise SystemExit( "Multitouch Event detected but Single was expected") # type 1 is evdev.ecodes.EV_KEY diff --git a/bin/fan_reaction_test.py b/bin/fan_reaction_test.py index a4ab744..c37054b 100755 --- a/bin/fan_reaction_test.py +++ b/bin/fan_reaction_test.py @@ -34,6 +34,7 @@ class FanMonitor: if not self._fan_paths: print('Fan monitoring interface not found in SysFS') raise SystemExit(0) + def get_rpm(self): result = {} for p in self._fan_paths: @@ -44,6 +45,7 @@ class FanMonitor: except OSError: print('Fan SysFS node dissappeared ({})'.format(p)) return result + def get_average_rpm(self, period): acc = self.get_rpm() for i in range(period): @@ -83,14 +85,15 @@ class Stressor: def _stress_fun(self): """Actual stress function.""" # generate some random data - data = bytes(random.getrandbits(8) for _ in range(1024)) - hasher = hashlib.sha256() + data = bytes(random.getrandbits(8) for _ in range(1024)) + hasher = hashlib.sha256() hasher.update(data) while True: new_digest = hasher.digest() # use the newly obtained digest as the new data to the hasher hasher.update(new_digest) + def main(): """Entry point.""" @@ -153,5 +156,6 @@ def main(): if not (rpm_rose_during_stress and rpm_dropped_during_cooling): raise SystemExit("Fans did not react to stress expectedly") + if __name__ == '__main__': main() diff --git a/bin/frequency_governors_test.py b/bin/frequency_governors_test.py index 6072ded..52dd74a 100755 --- a/bin/frequency_governors_test.py +++ b/bin/frequency_governors_test.py @@ -8,7 +8,8 @@ import time import argparse import logging -from subprocess import check_output, check_call, CalledProcessError, PIPE +from subprocess import check_output, check_call, CalledProcessError + class CPUScalingTest(object): @@ -37,11 +38,11 @@ class CPUScalingTest(object): for subdirectory in os.listdir(self.sysCPUDirectory): match = pattern.search(subdirectory) if match and match.group("cpuNumber"): - cpufreqDirectory = os.path.join(self.sysCPUDirectory, - subdirectory, "cpufreq") + cpufreqDirectory = os.path.join( + self.sysCPUDirectory, subdirectory, "cpufreq") if not os.path.exists(cpufreqDirectory): - logging.error("CPU %s has no cpufreq directory %s" - % (match.group("cpuNumber"), cpufreqDirectory)) + logging.error("CPU %s has no cpufreq directory %s" % ( + match.group("cpuNumber"), cpufreqDirectory)) return None # otherwise self.cpufreqDirectories.append(cpufreqDirectory) @@ -59,7 +60,8 @@ class CPUScalingTest(object): for cpufreqDirectory in self.cpufreqDirectories: parameters = self.getParameters(cpufreqDirectory, file) if not parameters: - logging.error("Error: could not determine cpu parameters from %s" + logging.error( + "Error: could not determine cpu parameters from %s" % os.path.join(cpufreqDirectory, file)) return None if not current: @@ -91,7 +93,7 @@ class CPUScalingTest(object): return rf return None - logging.debug("Setting %s to %s" % (setFile,value)) + logging.debug("Setting %s to %s" % (setFile, value)) path = None if not skip: if automatch: @@ -115,8 +117,9 @@ class CPUScalingTest(object): parameterFile = open(path) line = parameterFile.readline() if not line or line.strip() != str(value): - logging.error("Error: could not verify that %s was set to %s" - % (path, value)) + logging.error( + "Error: could not verify that %s was set to %s" % ( + path, value)) if line: logging.error("Actual Value: %s" % line) else: @@ -127,6 +130,7 @@ class CPUScalingTest(object): def checkSelectorExecutable(self): logging.debug("Determining if %s is executable" % self.selectorExe) + def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) @@ -179,8 +183,9 @@ class CPUScalingTest(object): parameterFile = open(parameterFilePath) line = parameterFile.readline() if not line: - logging.error("Error: failed to get %s for %s" - % (parameter, self.cpufreqDirectory)) + logging.error( + "Error: failed to get %s for %s" % ( + parameter, self.cpufreqDirectory)) return None value = line.strip() return value @@ -198,8 +203,9 @@ class CPUScalingTest(object): parameterFile = open(path) line = parameterFile.readline() if not line: - logging.error("Error: failed to get %s for %s" - % (parameter, cpufreqDirectory)) + logging.error( + "Error: failed to get %s for %s" % ( + parameter, cpufreqDirectory)) return None values.append(line.strip()) logging.debug("Found parameters:") @@ -231,15 +237,18 @@ class CPUScalingTest(object): runTime = stop_elapsed_time - start_elapsed_time else: thisTime = stop_elapsed_time - start_elapsed_time - if ((abs(thisTime - runTime) / runTime) * 100 - < self.retryTolerance): + if ( + (abs(thisTime - runTime) / runTime) * 100 + < self.retryTolerance + ): return runTime else: runTime = thisTime tries += 1 - logging.error("Could not repeat load test times within %.1f%%" - % self.retryTolerance) + logging.error( + "Could not repeat load test times within %.1f%%" % + self.retryTolerance) return None def pi(self): @@ -262,9 +271,11 @@ class CPUScalingTest(object): logging.info("Done.") minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not minimumFrequency + if ( + not minimumFrequency or not currentFrequency - or (minimumFrequency != currentFrequency)): + or (minimumFrequency != currentFrequency) + ): return False # otherwise @@ -310,7 +321,8 @@ class CPUScalingTest(object): logging.info(" cpu%u: %s" % (i, g)) i += 1 else: - logging.error("Error: could not determine current governor settings") + logging.error( + "Error: could not determine current governor settings") return False self.getCPUFlags() @@ -333,7 +345,7 @@ class CPUScalingTest(object): logging.debug("Found the following CPU Flags:") for line in self.cpuFlags: logging.debug(" %s" % line) - except: + except OSError: logging.warning("Could not read CPU flags") def runUserSpaceTests(self): @@ -356,52 +368,66 @@ class CPUScalingTest(object): # Set the the CPU speed to it's lowest value frequency = self.minFreq - logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) + logging.info( + "Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) if not self.setFrequency(frequency): success = False # Verify the speed is set to the lowest value minimumFrequency = self.getParameter("scaling_min_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not minimumFrequency + if ( + not minimumFrequency or not currentFrequency - or (minimumFrequency != currentFrequency)): - logging.error("Could not verify that cpu frequency is set to the minimum value of %s" - % minimumFrequency) + or (minimumFrequency != currentFrequency) + ): + logging.error( + "Could not verify that cpu frequency is set to the minimum" + " value of %s" % minimumFrequency) success = False # Run Load Test self.minimumFrequencyTestTime = self.runLoadTest() if not self.minimumFrequencyTestTime: - logging.error("Could not retrieve the minimum frequency test's execution time.") + logging.error( + "Could not retrieve the minimum frequency test's" + " execution time.") success = False else: - logging.info("Minimum frequency load test time: %.2f" - % self.minimumFrequencyTestTime) + logging.info( + "Minimum frequency load test time: %.2f" + % self.minimumFrequencyTestTime) # Set the CPU speed to it's highest value as above. frequency = self.maxFreq - logging.info("Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) + logging.info( + "Setting CPU frequency to %u MHz" % (int(frequency) / 1000)) if not self.setFrequency(frequency): success = False maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not maximumFrequency + if ( + not maximumFrequency or not currentFrequency - or (maximumFrequency != currentFrequency)): - logging.error("Could not verify that cpu frequency is set to the maximum value of %s" - % maximumFrequency) + or (maximumFrequency != currentFrequency) + ): + logging.error( + "Could not verify that cpu frequency is set to the" + " maximum value of %s" % maximumFrequency) success = False # Repeat workload test self.maximumFrequencyTestTime = self.runLoadTest() if not self.maximumFrequencyTestTime: - logging.error("Could not retrieve the maximum frequency test's execution time.") + logging.error( + "Could not retrieve the maximum frequency test's " + "execution time.") success = False else: - logging.info("Maximum frequency load test time: %.2f" - % self.maximumFrequencyTestTime) + logging.info( + "Maximum frequency load test time: %.2f" + % self.maximumFrequencyTestTime) # Verify MHz increase is comparable to time % decrease predictedSpeedup = (float(maximumFrequency) @@ -409,8 +435,9 @@ class CPUScalingTest(object): # If "ida" turbo thing, increase the expectation by 8% if self.cpuFlags and self.idaFlag in self.cpuFlags: - logging.info("Found %s flag, increasing expected speedup by %.1f%%" - % (self.idaFlag, self.idaSpeedupFactor)) + logging.info( + "Found %s flag, increasing expected speedup by %.1f%%" + % (self.idaFlag, self.idaSpeedupFactor)) predictedSpeedup = \ (predictedSpeedup * (1.0 / (1.0 - (self.idaSpeedupFactor / 100.0)))) @@ -420,18 +447,24 @@ class CPUScalingTest(object): / self.maximumFrequencyTestTime) logging.info("CPU Frequency Speed Up: %.2f" % predictedSpeedup) logging.info("Measured Speed Up: %.2f" % measuredSpeedup) - differenceSpeedUp = (((measuredSpeedup - predictedSpeedup) / predictedSpeedup) * 100) - logging.info("Percentage Difference %.1f%%" % differenceSpeedUp) + differenceSpeedUp = ( + ((measuredSpeedup - predictedSpeedup) / predictedSpeedup) + * 100) + logging.info( + "Percentage Difference %.1f%%" % differenceSpeedUp) if differenceSpeedUp > self.speedUpTolerance: - logging.error("Measured speedup vs expected speedup is %.1f%% and is not within %.1f%% margin." - % (differenceSpeedUp, self.speedUpTolerance)) + logging.error( + "Measured speedup vs expected speedup is %.1f%% " + "and is not within %.1f%% margin." + % (differenceSpeedUp, self.speedUpTolerance)) success = False elif differenceSpeedUp < 0: - logging.info(""" - Measured speed up %.2f exceeded expected speedup %.2f - """ % (measuredSpeedup, predictedSpeedup)) + logging.info( + "Measured speed up %.2f exceeded expected speedup %.2f" + % (measuredSpeedup, predictedSpeedup)) else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success @@ -453,7 +486,9 @@ class CPUScalingTest(object): # Wait a fixed period of time, then verify current speed # is the slowest in as before if not self.verifyMinimumFrequency(): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled to the " + "minimum value") success = False # Repeat workload test @@ -462,7 +497,8 @@ class CPUScalingTest(object): logging.warning("No On Demand load test time available.") success = False else: - logging.info("On Demand load test time: %.2f" % onDemandTestTime) + logging.info( + "On Demand load test time: %.2f" % onDemandTestTime) if onDemandTestTime and self.maximumFrequencyTestTime: # Compare the timing to the max results from earlier, @@ -470,19 +506,24 @@ class CPUScalingTest(object): differenceOnDemandVsMaximum = \ (abs(onDemandTestTime - self.maximumFrequencyTestTime) / self.maximumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. maximum frequency: %.1f%%" - % differenceOnDemandVsMaximum) + logging.info( + "Percentage Difference vs. maximum frequency: %.1f%%" + % differenceOnDemandVsMaximum) if differenceOnDemandVsMaximum > self.speedUpTolerance: - logging.error("On demand performance vs maximum of %.1f%% is not within %.1f%% margin" - % (differenceOnDemandVsMaximum, - self.speedUpTolerance)) + logging.error( + "On demand performance vs maximum of %.1f%% is not " + "within %.1f%% margin" + % (differenceOnDemandVsMaximum, self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") # Verify the current speed has returned to the lowest speed again if not self.verifyMinimumFrequency(): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled to the " + "minimum value") success = False return success @@ -504,11 +545,14 @@ class CPUScalingTest(object): # Verify the current speed is the same as scaling_max_freq maximumFrequency = self.getParameter("scaling_max_freq") currentFrequency = self.getParameter("scaling_cur_freq") - if (not maximumFrequency + if ( + not maximumFrequency or not currentFrequency - or (maximumFrequency != currentFrequency)): - logging.error("Current cpu frequency of %s is not set to the maximum value of %s" - % (currentFrequency, maximumFrequency)) + or (maximumFrequency != currentFrequency) + ): + logging.error( + "Current cpu frequency of %s is not set to the maximum " + "value of %s" % (currentFrequency, maximumFrequency)) success = False # Repeat work load test @@ -517,22 +561,27 @@ class CPUScalingTest(object): logging.error("No Performance load test time available.") success = False else: - logging.info("Performance load test time: %.2f" % performanceTestTime) + logging.info( + "Performance load test time: %.2f" % performanceTestTime) if performanceTestTime and self.maximumFrequencyTestTime: # Compare the timing to the max results differencePerformanceVsMaximum = \ (abs(performanceTestTime - self.maximumFrequencyTestTime) / self.maximumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. maximum frequency: %.1f%%" - % differencePerformanceVsMaximum) + logging.info( + "Percentage Difference vs. maximum frequency: %.1f%%" + % differencePerformanceVsMaximum) if differencePerformanceVsMaximum > self.speedUpTolerance: - logging.error("Performance setting vs maximum of %.1f%% is not within %.1f%% margin" - % (differencePerformanceVsMaximum, - self.speedUpTolerance)) + logging.error( + "Performance setting vs maximum of %.1f%% is not " + "within %.1f%% margin" + % (differencePerformanceVsMaximum, + self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success @@ -559,7 +608,9 @@ class CPUScalingTest(object): # Wait a fixed period of time, # then verify current speed is the slowest in as before if not self.verifyMinimumFrequency(10): - logging.error("Could not verify that cpu frequency has settled to the minimum value") + logging.error( + "Could not verify that cpu frequency has settled " + "to the minimum value") success = False # Set the frequency step to 0, @@ -573,28 +624,34 @@ class CPUScalingTest(object): logging.error("No Conservative load test time available.") success = False else: - logging.info("Conservative load test time: %.2f" - % conservativeTestTime) + logging.info( + "Conservative load test time: %.2f" + % conservativeTestTime) if conservativeTestTime and self.minimumFrequencyTestTime: # Compare the timing to the max results differenceConservativeVsMinimum = \ (abs(conservativeTestTime - self.minimumFrequencyTestTime) / self.minimumFrequencyTestTime) * 100 - logging.info("Percentage Difference vs. minimum frequency: %.1f%%" - % differenceConservativeVsMinimum) + logging.info( + "Percentage Difference vs. minimum frequency: %.1f%%" + % differenceConservativeVsMinimum) if differenceConservativeVsMinimum > self.speedUpTolerance: - logging.error("Performance setting vs minimum of %.1f%% is not within %.1f%% margin" - % (differenceConservativeVsMinimum, - self.speedUpTolerance)) + logging.error( + "Performance setting vs minimum of %.1f%% is not " + "within %.1f%% margin" + % (differenceConservativeVsMinimum, + self.speedUpTolerance)) success = False else: - logging.error("Not enough timing data to calculate speed differences.") + logging.error( + "Not enough timing data to calculate speed differences.") return success def restoreGovernors(self): - logging.info("Restoring original governor to %s" % (self.originalGovernors[0])) + logging.info( + "Restoring original governor to %s" % (self.originalGovernors[0])) self.setGovernor(self.originalGovernors[0]) @@ -604,8 +661,9 @@ def main(): help="Suppress output.") parser.add_argument("-c", "--capabilities", action="store_true", help="Only output CPU capabilities.") - parser.add_argument("-d", "--debug", action="store_true", - help="Turn on debug level output for extra info during test run.") + parser.add_argument( + "-d", "--debug", action="store_true", + help="Turn on debug level output for extra info during test run.") args = parser.parse_args() # Set up the logging system (unless we don't want ANY logging) @@ -614,11 +672,11 @@ def main(): logger.setLevel(logging.DEBUG) format = '%(asctime)s %(levelname)-8s %(message)s' date_format = '%Y-%m-%d %H:%M:%S' - + # If we DO want console output if not args.quiet: - console_handler = logging.StreamHandler() #logging.StreamHandler() - console_handler.setFormatter(logging.Formatter(format,date_format)) + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(format, date_format)) console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) diff --git a/bin/fresh_rate_info.py b/bin/fresh_rate_info.py index 43a8f2a..3089b48 100755 --- a/bin/fresh_rate_info.py +++ b/bin/fresh_rate_info.py @@ -38,13 +38,13 @@ def xrandr_paser(data=None): resolution = None xrandrs = list() for line in str(data).split('\n'): - for match in re.finditer('(.+) connected (\d+x\d+)\+', line): + for match in re.finditer(r'(.+) connected (\d+x\d+)\+', line): connector = match.group(1) resolution = match.group(2) break if resolution is None: continue - for match in re.finditer('{0}\s+(.+)\*'.format(resolution), + for match in re.finditer(r'{0}\s+(.+)\*'.format(resolution), line): refresh_rate = match.group(1) xrandr = {'connector': connector, @@ -72,5 +72,6 @@ def main(): xrandr['resolution'], xrandr['refresh_rate'])) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/get_make_and_model.py b/bin/get_make_and_model.py index 496a949..42fb219 100755 --- a/bin/get_make_and_model.py +++ b/bin/get_make_and_model.py @@ -1,26 +1,28 @@ #!/usr/bin/env python3 -import os.path import shlex from subprocess import check_output + def print_header(value): print("{}:".format(value)) + def print_data(key, value): print(" {}: {}".format(key, value)) + def run_cmd(option): cmd = "lshw -C " + option - out = check_output(shlex.split(cmd), - universal_newlines = True) + out = check_output(shlex.split(cmd), universal_newlines=True) return out.split('\n') + def main(): keys = {'Manufacturer': 'vendor', 'Model': 'product', 'Version': 'version'} - lshw_classes = {'system': 'System', + lshw_classes = {'system': 'System', 'bus': 'Mainboard'} for lshw_class in lshw_classes: @@ -38,6 +40,6 @@ def main(): for key in data: print_data(key, data[key]) + if __name__ == "__main__": raise SystemExit(main()) - diff --git a/bin/gpu_test.py b/bin/gpu_test.py index 49ea31a..4f2af2e 100755 --- a/bin/gpu_test.py +++ b/bin/gpu_test.py @@ -30,9 +30,9 @@ import subprocess import sys import time gi.require_version('Gio', '2.0') -from gi.repository import Gio -from math import cos, sin -from threading import Thread +from gi.repository import Gio # noqa: E402 +from math import cos, sin # noqa: E402 +from threading import Thread # noqa: E402 class GlxThread(Thread): @@ -44,14 +44,13 @@ class GlxThread(Thread): try: self.process = subprocess.Popen( - ["glxgears","-geometry", "400x400"], + ["glxgears", "-geometry", "400x400"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) self.process.communicate() except (subprocess.CalledProcessError, FileNotFoundError) as er: print("WARNING: Unable to start glxgears (%s)" % er) - def terminate(self): if not hasattr(self, 'id'): print("WARNING: Attempted to terminate non-existing window.") @@ -121,8 +120,8 @@ class Html5VideoThread(Thread): def html5_path(self): if os.getenv('PLAINBOX_PROVIDER_DATA'): return os.path.join( - os.getenv('PLAINBOX_PROVIDER_DATA'), - 'websites/html5_video.html') + os.getenv('PLAINBOX_PROVIDER_DATA'), + 'websites/html5_video.html') def run(self): if self.html5_path and os.path.isfile(self.html5_path): @@ -136,8 +135,8 @@ class Html5VideoThread(Thread): print("WARNING: test results may be invalid.") def terminate(self): - if self.html5_path and os.path.isfile(self.html5_path): - subprocess.call("pkill firefox", shell=True) + if self.html5_path and os.path.isfile(self.html5_path): + subprocess.call("pkill firefox", shell=True) def check_gpu(log=None): @@ -172,10 +171,10 @@ def main(): print("WARNING: Got an exception %s" % er) windows = "" for app in sorted(windows.splitlines(), reverse=True): - if not b'glxgears' in app: + if b'glxgears' not in app: continue GlxWindows[i].id = str( - re.match(b'^(0x\w+)', app).group(0), 'utf-8') + re.match(b'^(0x\w+)', app).group(0), 'utf-8') # noqa: W605 break if hasattr(GlxWindows[i], "id"): rotator = RotateGlxThread(GlxWindows[i].id, i + 1) @@ -204,7 +203,7 @@ def main(): 'gconftool --get /apps/compiz-1/general/screen0/options/vsize', shell=True)) (x_res, y_res) = re.search( - b'DG:\s+(\d+)x(\d+)', + b'DG:\s+(\d+)x(\d+)', # noqa: W605 subprocess.check_output('wmctrl -d', shell=True)).groups() DesktopSwitch = ChangeWorkspace( hsize, vsize, int(x_res) // hsize, int(y_res) // vsize) @@ -230,5 +229,6 @@ def main(): settings.set_int("vsize", vsize_ori) Gio.Settings.sync() + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/graphic_memory_info.py b/bin/graphic_memory_info.py index ba7eece..29040a5 100755 --- a/bin/graphic_memory_info.py +++ b/bin/graphic_memory_info.py @@ -42,13 +42,13 @@ def vgamem_paser(data=None): device = None vgamems = list() for line in data.split('\n'): - for match in re.finditer('(\d\d:\d\d\.\d) VGA(.+): (.+)', line): + for match in re.finditer(r'(\d\d:\d\d\.\d) VGA(.+): (.+)', line): device = match.group(1) name = match.group(3) if device is None: continue -#Memory at e0000000 (32-bit, prefetchable) [size=256M] - for match in re.finditer('Memory(.+) prefetchable\) \[size=(\d+)M\]', +# Memory at e0000000 (32-bit, prefetchable) [size=256M] + for match in re.finditer(r'Memory(.+) prefetchable\) \[size=(\d+)M\]', line): vgamem_size = match.group(2) vgamem = {'device': device, @@ -77,5 +77,6 @@ def main(): vgamem['name'], vgamem['vgamem_size'])) + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/graphics_driver.py b/bin/graphics_driver.py index 6d28f5a..0b41050 100755 --- a/bin/graphics_driver.py +++ b/bin/graphics_driver.py @@ -129,7 +129,7 @@ class XorgLog(object): gathering_module = False module = None m = re.search( - '\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line) + r'\(II\) Loading.*modules\/drivers\/(.+)_drv\.so', line) if m: found_ddx = True continue @@ -207,7 +207,8 @@ class XorgLog(object): # For NVIDIA m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line) if not m: - m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line) + m = re.search( + r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line) if m: self.displays[display_name] = display self.video_driver = m.group(1) @@ -314,8 +315,8 @@ def is_laptop(): def hybrid_graphics_check(xlog): '''Check for Hybrid Graphics''' - card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)') - card_id2 = re.compile('.*03..: *(.+):(.+)') + card_id1 = re.compile(r'.*0300: *(.+):(.+) \(.+\)') + card_id2 = re.compile(r'.*03..: *(.+):(.+)') cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'} cards = [] drivers = [] @@ -372,5 +373,6 @@ def main(): return 1 if 1 in results else 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/graphics_modes_info.py b/bin/graphics_modes_info.py index f3c962a..f92cd76 100755 --- a/bin/graphics_modes_info.py +++ b/bin/graphics_modes_info.py @@ -32,7 +32,7 @@ from checkbox_support.contrib import xrandr def print_modes_info(screen): """Print some information about the detected screen and its outputs""" xrandr._check_required_version((1, 0)) - print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" %\ + print("Screen %s: minimum %s x %s, current %s x %s, maximum %s x %s" % (screen._screen, screen._width_min, screen._height_min, screen._width, screen._height, @@ -50,10 +50,9 @@ def print_modes_info(screen): for m in range(len(modes)): mode = modes[m] refresh = mode.dotClock / (mode.hTotal * mode.vTotal) - print(" [%s] %s x %s @ %s Hz" % (m, - mode.width, - mode.height, - refresh), end=' ') + print( + " [%s] %s x %s @ %s Hz" % + (m, mode.width, mode.height, refresh), end=' ') if mode.id == output._mode: print("(current)", end=' ') if m == output.get_preferred_mode(): @@ -70,5 +69,6 @@ def main(): except(xrandr.UnsupportedRRError): print('Error: RandR version lower than 1.0', file=sys.stderr) + if __name__ == '__main__': main() diff --git a/bin/graphics_stress_test.py b/bin/graphics_stress_test.py index d467b3f..c0eeb4a 100755 --- a/bin/graphics_stress_test.py +++ b/bin/graphics_stress_test.py @@ -45,7 +45,7 @@ class VtWrapper(object): vt = 0 proc = Popen(['ps', 'aux'], stdout=PIPE, universal_newlines=True) proc_output = proc.communicate()[0].split('\n') - proc_line = re.compile('.*tty(\d+).+/usr/bin/X.*') + proc_line = re.compile(r'.*tty(\d+).+/usr/bin/X.*') for line in proc_output: match = proc_line.match(line) if match: @@ -56,6 +56,7 @@ class VtWrapper(object): retcode = call(['chvt', '%d' % vt]) return retcode + class SuspendWrapper(object): def __init__(self): pass @@ -132,6 +133,7 @@ class SuspendWrapper(object): return status + class RotationWrapper(object): def __init__(self): @@ -178,6 +180,7 @@ class RotationWrapper(object): result = 1 return result + class RenderCheckWrapper(object): """A simple class to run the rendercheck suites""" @@ -234,14 +237,15 @@ class RenderCheckWrapper(object): passed = int(match_output.group(1).strip()) total = int(match_output.group(2).strip()) logging.info('Results:') - logging.info(' %d tests passed out of %d.' - % (passed, total)) + logging.info( + ' %d tests passed out of %d.' % (passed, total)) if show_errors and match_errors: error = match_errors.group(0).strip() if first_error: - logging.debug('Rendercheck %s suite errors ' - 'from iteration %d:' - % (suites, iteration)) + logging.debug( + 'Rendercheck %s suite errors ' + 'from iteration %d:' + % (suites, iteration)) first_error = False logging.debug(' %s' % error) @@ -254,17 +258,17 @@ class RenderCheckWrapper(object): exit_status = 0 for suite in suites: for it in range(iterations): - logging.info('Iteration %d of Rendercheck %s suite...' - % (it + 1, suite)) - (status, passed, total) = \ - self._print_test_info(suites=suite, - iteration=it + 1, - show_errors=show_errors) + logging.info( + 'Iteration %d of Rendercheck %s suite...' + % (it + 1, suite)) + (status, passed, total) = self._print_test_info( + suites=suite, iteration=it + 1, show_errors=show_errors) if status != 0: # Make sure to catch a non-zero exit status - logging.info('Iteration %d of Rendercheck %s suite ' - 'exited with status %d.' - % (it + 1, suite, status)) + logging.info( + 'Iteration %d of Rendercheck %s suite ' + 'exited with status %d.' + % (it + 1, suite, status)) exit_status = status it += 1 @@ -372,7 +376,6 @@ def main(): logfile_handler.setFormatter(logging.Formatter(format)) logger.addHandler(logfile_handler) - log_path = os.path.abspath(logfile) # Write only to stdout else: @@ -402,16 +405,18 @@ def main(): logging.info('Iteration %d...', it) retcode = vt_wrap.set_vt(target_vt) if retcode != 0: - logging.error('Error: switching to tty%d failed with code %d ' - 'on iteration %d' % (target_vt, retcode, it)) + logging.error( + 'Error: switching to tty%d failed with code %d ' + 'on iteration %d' % (target_vt, retcode, it)) status = 1 else: logging.info('Switching to tty%d: passed' % (target_vt)) time.sleep(2) retcode = vt_wrap.set_vt(vt_wrap.x_vt) if retcode != 0: - logging.error('Error: switching to tty%d failed with code %d ' - 'on iteration %d' % (vt_wrap.x_vt, retcode, it)) + logging.error( + 'Error: switching to tty%d failed with code %d ' + 'on iteration %d' % (vt_wrap.x_vt, retcode, it)) else: logging.info('Switching to tty%d: passed' % (vt_wrap.x_vt)) status = 1 @@ -463,5 +468,6 @@ def main(): return status + if __name__ == '__main__': exit(main()) diff --git a/bin/gst_pipeline_test.py b/bin/gst_pipeline_test.py index c7a9af5..f022632 100755 --- a/bin/gst_pipeline_test.py +++ b/bin/gst_pipeline_test.py @@ -9,9 +9,9 @@ import sys import time gi.require_version('Gst', '1.0') gi.require_version('GLib', '2.0') -from gi.repository import Gst -from gi.repository import GLib -from subprocess import check_output +from gi.repository import Gst # noqa: E402 +from gi.repository import GLib # noqa: E402 +from subprocess import check_output # noqa: E402 def check_state(device): @@ -22,8 +22,11 @@ def check_state(device): data = sink_info.split("\n") try: - device_name = re.findall(".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip() - sink = re.findall(".*Name:\s(.*%s.*)" % device, sink_info, re.IGNORECASE)[0].lstrip() + device_name = re.findall( + r".*Name:\s.*%s.*" % device, sink_info, re.IGNORECASE)[0].lstrip() + sink = re.findall( + r".*Name:\s(.*%s.*)" % device, sink_info, + re.IGNORECASE)[0].lstrip() status = data[data.index("\t" + device_name) - 1] except (IndexError, ValueError): logging.error("Failed to find status for device: %s" % device) @@ -32,22 +35,26 @@ def check_state(device): os.environ['PULSE_SINK'] = sink logging.info("[ Pulse sink ]".center(80, '=')) logging.info("Device: %s %s" % (device_name.strip(), status.strip())) - return status + return status def main(): parser = ArgumentParser(description='Simple GStreamer pipeline player') - parser.add_argument('PIPELINE', + parser.add_argument( + 'PIPELINE', help='Quoted GStreamer pipeline to launch') - parser.add_argument('-t', '--timeout', + parser.add_argument( + '-t', '--timeout', type=int, required=True, help='Timeout for running the pipeline') - parser.add_argument('-d', '--device', + parser.add_argument( + '-d', '--device', type=str, help="Device to check for status") args = parser.parse_args() - logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO, + logging.basicConfig( + format='%(levelname)s:%(message)s', level=logging.INFO, stream=sys.stdout) exit_code = 0 @@ -63,7 +70,7 @@ def main(): except GLib.GError as error: print("Specified pipeline couldn't be processed.") print("Error when processing pipeline: {}".format(error)) - #Exit harmlessly + # Exit harmlessly return(2) print("Pipeline initialized, now starting playback.") diff --git a/bin/hdd_parking.py b/bin/hdd_parking.py index 6d34904..01022ba 100755 --- a/bin/hdd_parking.py +++ b/bin/hdd_parking.py @@ -25,7 +25,7 @@ """ This script verifies that a systems HDD protection capabilities are triggered when appropriate. There are many implementations -of HDD protection from different OEMs, each implemented in a +of HDD protection from different OEMs, each implemented in a different way, so this script can only support implementations which are known to work and are testable. Currently the list of supported implementations is: @@ -42,10 +42,12 @@ from subprocess import Popen, PIPE TIMEOUT = 15.0 + def hdaps_test(run_time): try: - hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE, - universal_newlines=True) + hdapsd = Popen( + ['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE, + universal_newlines=True) except OSError as err: print("Unable to start hdapsd: {}".format(err)) return 1 @@ -59,6 +61,7 @@ def hdaps_test(run_time): return 0 return 1 + def main(): # First establish the driver used parser = ArgumentParser("Tests a systems HDD protection capabilities. " @@ -70,5 +73,6 @@ def main(): 'all axis. No particular force should be required.') return hdaps_test(parser.parse_args().timeout) + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/hotkey_tests.py b/bin/hotkey_tests.py index f1b3560..066f36c 100755 --- a/bin/hotkey_tests.py +++ b/bin/hotkey_tests.py @@ -554,5 +554,6 @@ def main(): if failed: raise SystemExit('Some test failed!') + if __name__ == '__main__': main() diff --git a/bin/key_test.py b/bin/key_test.py index 2bd95f9..883bf32 100755 --- a/bin/key_test.py +++ b/bin/key_test.py @@ -29,7 +29,7 @@ import termios from gettext import gettext as _ -from gi.repository import GObject +from gi.repository import GLib from optparse import OptionParser @@ -81,7 +81,7 @@ class Reporter(object): self.scancodes = scancodes self.fileno = os.open("/dev/console", os.O_RDONLY) - GObject.io_add_watch(self.fileno, GObject.IO_IN, self.on_key) + GLib.io_add_watch(self.fileno, GLib.IO_IN, self.on_key) # Set terminal attributes self.saved_attributes = termios.tcgetattr(self.fileno) @@ -324,7 +324,8 @@ class GtkReporter(Reporter): def found_key(self, key): super(GtkReporter, self).found_key(key) - self.icons[key].set_from_stock(self.ICON_TESTED, size=self.ICON_SIZE) + self.icons[key].set_from_icon_name( + self.ICON_TESTED, size=self.ICON_SIZE) self.check_keys() @@ -334,7 +335,7 @@ class GtkReporter(Reporter): stock_icon = self.ICON_UNTESTED else: stock_icon = self.ICON_NOT_REQUIRED - self.icons[key].set_from_stock(stock_icon, self.ICON_SIZE) + self.icons[key].set_from_icon_name(stock_icon, self.ICON_SIZE) self.check_keys() @@ -394,10 +395,10 @@ Hint to find codes: key = Key(codes, name) keys.append(key) - main_loop = GObject.MainLoop() + main_loop = GLib.MainLoop() try: reporter = reporter_factory(main_loop, keys, options.scancodes) - except: + except OSError: parser.error("Failed to initialize interface: %s" % options.interface) try: @@ -408,5 +409,6 @@ Hint to find codes: return reporter.exit_code + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/keyboard_test.py b/bin/keyboard_test.py index 1615106..e87dc2e 100755 --- a/bin/keyboard_test.py +++ b/bin/keyboard_test.py @@ -84,5 +84,6 @@ def main(args): return 0 + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/lock_screen_watcher.py b/bin/lock_screen_watcher.py index 1c30270..e4433c8 100755 --- a/bin/lock_screen_watcher.py +++ b/bin/lock_screen_watcher.py @@ -20,8 +20,8 @@ import dbus import gi from dbus.mainloop.glib import DBusGMainLoop gi.require_version('GLib', '2.0') -from gi.repository import GObject -from gi.repository import GLib +from gi.repository import GObject # noqa: E402 +from gi.repository import GLib # noqa: E402 def filter_cb(bus, message): @@ -41,6 +41,7 @@ def on_timeout_expired(): print("You have failed to perform the required manipulation in time") raise SystemExit(1) + DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() bus.add_match_string("type='signal',interface='com.ubuntu.Upstart0_6'") diff --git a/bin/lsmod_info.py b/bin/lsmod_info.py index 3b81542..cb7c0bc 100755 --- a/bin/lsmod_info.py +++ b/bin/lsmod_info.py @@ -36,5 +36,6 @@ def main(): print('%s: %s' % (module, version)) return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/manage_compiz_plugin.py b/bin/manage_compiz_plugin.py index 03c0ed8..be9541f 100755 --- a/bin/manage_compiz_plugin.py +++ b/bin/manage_compiz_plugin.py @@ -29,11 +29,10 @@ from gettext import gettext as _ import argparse import gettext import os -import sys import subprocess import time -KEY="/org/compiz/profiles/unity/plugins/core/active-plugins" +KEY = "/org/compiz/profiles/unity/plugins/core/active-plugins" gettext.textdomain("com.canonical.certification.checkbox") gettext.bindtextdomain("com.canonical.certification.checkbox", @@ -41,8 +40,9 @@ gettext.bindtextdomain("com.canonical.certification.checkbox", plugins = eval(subprocess.check_output(["dconf", "read", KEY])) -parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"), - epilog=_("Available plugins: {}").format(plugins)) +parser = argparse.ArgumentParser( + description=_("enable/disable compiz plugins"), + epilog=_("Available plugins: {}").format(plugins)) parser.add_argument("plugin", type=str, help=_('Name of plugin to control')) parser.add_argument("action", type=str, choices=['enable', 'disable'], help=_("What to do with the plugin")) diff --git a/bin/memory_compare.py b/bin/memory_compare.py index 02e2def..aaf89f4 100755 --- a/bin/memory_compare.py +++ b/bin/memory_compare.py @@ -25,7 +25,6 @@ import os import sys import re -from math import log, copysign from subprocess import check_output, CalledProcessError, PIPE from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes @@ -44,7 +43,7 @@ class LshwJsonResult: desc_regex = re.compile('System Memory', re.IGNORECASE) def addHardware(self, hardware): - if self.desc_regex.match(str(hardware.get('description',0))): + if self.desc_regex.match(str(hardware.get('description', 0))): self.memory_reported += int(hardware.get('size', 0)) elif 'bank' in hardware['id']: self.banks_reported += int(hardware.get('size', 0)) @@ -134,5 +133,6 @@ def main(): (difference, percentage, threshold), file=sys.stderr) return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network.py b/bin/network.py index a646f9c..60f7b0a 100755 --- a/bin/network.py +++ b/bin/network.py @@ -255,14 +255,15 @@ class IPerfPerformanceTest(object): logging.warning("The network test against {} failed because:". format(self.target)) if percent < self.fail_threshold: - logging.error(" Transfer speed: {} Mb/s". - format(throughput)) - logging.error(" {:03.2f}% of theoretical max {} Mb/s\n". - format(percent, int(self.iface.max_speed))) + logging.error(" Transfer speed: {} Mb/s".format(throughput)) + logging.error( + " {:03.2f}% of theoretical max {} Mb/s\n".format( + percent, int(self.iface.max_speed))) if cpu_load > self.cpu_load_fail_threshold: logging.error(" CPU load: {}%".format(cpu_load)) - logging.error(" CPU load is above {}% maximum\n". - format(self.cpu_load_fail_threshold)) + logging.error( + " CPU load is above {}% maximum\n".format( + self.cpu_load_fail_threshold)) return 30 logging.debug("Passed benchmark against {}".format(self.target)) @@ -277,9 +278,11 @@ class StressPerformanceTest: def run(self): if self.iperf3: - iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format(self.target) + iperf_cmd = 'timeout -k 1 320 iperf3 -c {} -t 300'.format( + self.target) else: - iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format(self.target) + iperf_cmd = 'timeout -k 1 320 iperf -c {} -t 300'.format( + self.target) print("Running iperf...") iperf = subprocess.Popen(shlex.split(iperf_cmd)) @@ -365,14 +368,14 @@ class Interface(socket.socket): ethinfo = check_output(['ethtool', self.interface], universal_newlines=True, stderr=STDOUT).split(' ') - expression = '(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)' + expression = r'(\\d+)(base)([A-Z]+)|(\d+)(Mb/s)' regex = re.compile(expression) if ethinfo: for i in ethinfo: hit = regex.search(i) if hit: - speeds.append(int(re.sub("\D", "", hit.group(0)))) + speeds.append(int(re.sub(r"\D", "", hit.group(0)))) except CalledProcessError as e: logging.error('ethtool returned an error!') logging.error(e.output) @@ -391,7 +394,7 @@ class Interface(socket.socket): for s in line.split(' '): hit = regex.search(s) if hit: - speeds.append(int(re.sub("\D", "", hit.group(0)))) + speeds.append(int(re.sub(r"\D", "", hit.group(0)))) except FileNotFoundError: logging.warning('mii-tool not found! Unable to get max speed') except CalledProcessError as e: diff --git a/bin/network_check.py b/bin/network_check.py index 82c00d9..20b88d5 100755 --- a/bin/network_check.py +++ b/bin/network_check.py @@ -6,7 +6,9 @@ ubuntu.com from subprocess import call from argparse import ArgumentParser import http.client -import urllib.request, urllib.error, urllib.parse +import urllib.request +import urllib.error +import urllib.parse import sys @@ -61,12 +63,14 @@ def main(): try: call(command) except OSError: - print("Zenity missing; unable to report test result:\n %s" % message) + print( + "Zenity missing; unable to report test result:\n %s" % message) if any(results.values()): return 0 else: return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network_ntp_test.py b/bin/network_ntp_test.py index 6e2ccf8..f7a2297 100755 --- a/bin/network_ntp_test.py +++ b/bin/network_ntp_test.py @@ -84,13 +84,13 @@ def StartStopNTPD(state, pid=0): SilentCall('/etc/init.d/ntp start') ntpd_status = CheckNTPD() - if status == 0: + if ntpd_status == 0: logging.debug('ntpd restarted with PID: %s' % ntpd_status[1]) else: logging.error('ntpd restart failed for some reason') else: - logging.error('%s is an unknown state, unable to start/stop ntpd' % - state) + logging.error( + '%s is an unknown state, unable to start/stop ntpd' % state) def SyncTime(server): @@ -141,7 +141,8 @@ def SkewTime(): def main(): - description = 'Tests the ability to skew and sync the clock with an NTP server' + description = ( + 'Tests the ability to skew and sync the clock with an NTP server') parser = ArgumentParser(description=description) parser.add_argument('--server', action='store', @@ -173,7 +174,7 @@ def main(): if args.debug: logger.setLevel(logging.DEBUG) logger.addHandler(handler) - + # Make sure NTP is installed if not os.access('/usr/sbin/ntpdate', os.F_OK): logging.error('NTP is not installed!') @@ -208,5 +209,6 @@ def main(): else: return 1 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/network_reconnect_resume_test.py b/bin/network_reconnect_resume_test.py index 8861eb6..17a220e 100755 --- a/bin/network_reconnect_resume_test.py +++ b/bin/network_reconnect_resume_test.py @@ -43,16 +43,17 @@ def get_wifi_reconnect_times(): Returns a list of all the timestamps for wifi reconnects. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\] wlan.* associated") + syntax = re.compile(r"\[(.*)\] wlan.* associated") results = re.findall(syntax, data) return map(float, results) + def get_wired_reconnect_times(): """ Returns a list of all the timestamps for wired reconnects. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\].*eth.* Link is [uU]p") + syntax = re.compile(r"\[(.*)\].*eth.* Link is [uU]p") results = re.findall(syntax, data) return map(float, results) @@ -63,7 +64,7 @@ def get_resume_time(): If no resume is found, None is returned. """ data = subprocess.check_output(['dmesg'], universal_newlines=True) - syntax = re.compile("\[(.*)\].ACPI: Waking up from system sleep state S3") + syntax = re.compile(r"\[(.*)\].ACPI: Waking up from system sleep state S3") results = re.findall(syntax, data) if not results: return None @@ -85,7 +86,7 @@ def main(): args = parser.parse_args() timedif = get_time_difference(args.device) - + if not timedif: return 0 @@ -98,5 +99,6 @@ def main(): print("PASS: the network connected within the allotted time") return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/network_restart.py b/bin/network_restart.py index 14c7357..63a99fb 100755 --- a/bin/network_restart.py +++ b/bin/network_restart.py @@ -96,7 +96,8 @@ class GtkApplication(Application): def __init__(self, address, times): Application.__init__(self, address, times) - dialog = Gtk.Dialog(title='Network restart', + dialog = Gtk.Dialog( + title='Network restart', buttons=(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL)) dialog.set_default_size(300, 100) @@ -187,7 +188,8 @@ def ping(address): """ logging.info('Pinging {0!r}...'.format(address)) try: - check_call('ping -c 1 -w 5 {0}'.format(address), + check_call( + 'ping -c 1 -w 5 {0}'.format(address), stdout=open(os.devnull, 'w'), stderr=STDOUT, shell=True) except CalledProcessError: return False @@ -209,8 +211,9 @@ class Networking: """ output = check_output(['/sbin/ifconfig', '-s', '-a']) lines = output.splitlines()[1:] - interfaces = ([interface for interface in - [line.split()[0] for line in lines] if interface != 'lo']) + interfaces = ( + [interface for interface in + [line.split()[0] for line in lines] if interface != 'lo']) return interfaces def restart(self): @@ -287,21 +290,24 @@ def parse_args(): """ Parse command line options """ - parser = ArgumentParser('Reboot networking interface ' - 'and verify that is up again afterwards') - parser.add_argument('-a', '--address', default='ubuntu.com', + parser = ArgumentParser( + 'Reboot networking interface and verify that is up again afterwards') + parser.add_argument( + '-a', '--address', default='ubuntu.com', help=('Address to ping to verify that network connection is up ' "('%(default)s' by default)")) parser.add_argument('-o', '--output', default='/var/log', help='The path to the log directory. \ Default is /var/log') - parser.add_argument('-t', '--times', + parser.add_argument( + '-t', '--times', type=int, default=1, help=('Number of times that the network interface has to be restarted ' '(%(default)s by default)')) log_levels = ['notset', 'debug', 'info', 'warning', 'error', 'critical'] - parser.add_argument('--log-level', dest='log_level_str', default='notset', + parser.add_argument( + '--log-level', dest='log_level_str', default='notset', choices=log_levels, help=('Log level. ' 'One of {0} or {1} (%(default)s by default)' diff --git a/bin/optical_read_test.py b/bin/optical_read_test.py index b3cf23d..7560db1 100755 --- a/bin/optical_read_test.py +++ b/bin/optical_read_test.py @@ -7,7 +7,7 @@ import filecmp import shutil from argparse import ArgumentParser -from subprocess import Popen, PIPE +from subprocess import Popen, PIPE, SubprocessError DEFAULT_DIR = '/tmp/checkbox.optical' DEFAULT_DEVICE_DIR = 'device' @@ -18,11 +18,7 @@ CDROM_ID = '/lib/udev/cdrom_id' def _command(command, shell=True): - proc = Popen(command, - shell=shell, - stdout=PIPE, - stderr=PIPE - ) + proc = Popen(command, shell=shell, stdout=PIPE, stderr=PIPE) return proc @@ -33,7 +29,7 @@ def _command_out(command, shell=True): def compare_tree(source, target): for dirpath, dirnames, filenames in os.walk(source): - #if both tree are empty return false + # if both tree are empty return false if dirpath == source and dirnames == [] and filenames == []: return False for name in filenames: @@ -64,21 +60,21 @@ def read_test(device): mount = _command("mount -o ro %s %s" % (device, device_dir)) mount.communicate() if mount.returncode != 0: - print("Unable to mount %s to %s" % - (device, device_dir), file=sys.stderr) + print("Unable to mount %s to %s" % + (device, device_dir), file=sys.stderr) return False file_copy = _command("cp -dpR %s %s" % (device_dir, image_dir)) file_copy.communicate() if file_copy.returncode != 0: - print("Failed to copy files from %s to %s" % - (device_dir, image_dir), file=sys.stderr) + print("Failed to copy files from %s to %s" % + (device_dir, image_dir), file=sys.stderr) return False if compare_tree(device_dir, image_dir): passed = True - except: - print("File Comparison failed while testing %s" % device, - file=sys.stderr) + except (SubprocessError, OSError): + print("File Comparison failed while testing %s" % device, + file=sys.stderr) passed = False finally: _command("umount %s" % device_dir).communicate(3) @@ -88,7 +84,7 @@ def read_test(device): if passed: print("File Comparison passed (%s)" % device) - + return passed @@ -127,8 +123,9 @@ def main(): print("Testing %s on %s ... " % (test, device), file=sys.stdout) tester = "%s_test" % test return_values.append(globals()[tester](device)) - + return False in return_values + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/pm_log_check.py b/bin/pm_log_check.py index cb75685..fae8eca 100755 --- a/bin/pm_log_check.py +++ b/bin/pm_log_check.py @@ -36,8 +36,8 @@ class Parser(object): """ Reboot test log file parser """ - is_logging_line = (re.compile('^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}') - .search) + is_logging_line = (re.compile( + r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}').search) is_getting_info_line = (re.compile('Gathering hardware information...$') .search) is_executing_line = (re.compile("Executing: '(?P<command>.*)'...$") @@ -103,8 +103,10 @@ class Parser(object): if self.is_output_line(line): command_output = {} break - if (self.is_executing_line(line) - or self.is_getting_info_line(line)): + if ( + self.is_executing_line(line) + or self.is_getting_info_line(line) + ): # Skip commands with no output iterator.unnext(line) return None @@ -132,8 +134,10 @@ class Parser(object): # for the field value value = [] for line in iterator: - if (self.is_logging_line(line) - or self.is_field_line(line)): + if ( + self.is_logging_line(line) + or self.is_field_line(line) + ): iterator.unnext(line) break diff --git a/bin/pm_test.py b/bin/pm_test.py index 18cf97c..fcaf7d9 100755 --- a/bin/pm_test.py +++ b/bin/pm_test.py @@ -20,7 +20,7 @@ from configparser import ConfigParser from datetime import datetime, timedelta from time import localtime, time gi.require_version("Gtk", "3.0") -from gi.repository import GObject, Gtk +from gi.repository import GObject, Gtk # noqa: E402 def main(): @@ -266,9 +266,9 @@ class PowerManagementOperation(): count = count_s3 + count_s2idle if count != total_suspends_expected: problems = ( - "Found {} occurrences of '{}'. Expected {}".format( - count, magic_line.strip(), - total_suspends_expected)) + "Found {} occurrences of S3/S2idle." + " Expected {}".format( + count, total_suspends_expected)) except FileNotFoundError: problems = "Error opening {}".format(fwts_log_path) if problems: @@ -367,13 +367,13 @@ class WakeUpAlarm(): with open(cls.ALARM_FILENAME) as alarm_file: wakeup_time_stored_str = alarm_file.read() - if not re.match('\d+', wakeup_time_stored_str): + if not re.match(r'\d+', wakeup_time_stored_str): subprocess.check_call('echo "+%d" > %s' % (timeout, cls.ALARM_FILENAME), shell=True) with open(cls.ALARM_FILENAME) as alarm_file2: wakeup_time_stored_str = alarm_file2.read() - if not re.match('\d+', wakeup_time_stored_str): + if not re.match(r'\d+', wakeup_time_stored_str): logging.error('Invalid wakeup time format: {0!r}' .format(wakeup_time_stored_str)) sys.exit(1) @@ -396,7 +396,7 @@ class WakeUpAlarm(): sys.exit(1) with open(cls.RTC_FILENAME) as rtc_file: - separator_regex = re.compile('\s+:\s+') + separator_regex = re.compile(r'\s+:\s+') rtc_data = dict([separator_regex.split(line.rstrip()) for line in rtc_file]) logging.debug('RTC data:\n{0}' @@ -582,14 +582,15 @@ class CountdownDialog(Gtk.Dialog): .run().stdout), ethernet=(Command('lspci | grep Ethernet') .run().stdout), - ifconfig=(Command("ifconfig -a | grep -A1 '^\w'") + ifconfig=(Command( + r"ifconfig -a | grep -A1 '^\w'") .run().stdout), - iwconfig=(Command("iwconfig | grep -A1 '^\w'") + iwconfig=(Command(r"iwconfig | grep -A1 '^\w'") .run().stdout))) logging.debug('Bluetooth Device:\n' '{hciconfig}' - .format(hciconfig=(Command("hciconfig -a " - "| grep -A2 '^\w'") + .format(hciconfig=(Command(r"hciconfig -a " + r"| grep -A2 '^\w'") .run().stdout))) logging.debug('Video Card:\n' '{lspci}' @@ -770,7 +771,7 @@ Exec=sudo {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay Type=Application X-GNOME-Autostart-enabled=true Hidden=false -""" +""" # noqa: E501 def __init__(self, args, user=None): self.args = args @@ -1012,7 +1013,7 @@ class MyArgumentParser(): '{0}.{1}.{2}.log'.format( os.path.splitext(os.path.basename(__file__))[0], args.pm_operation, - args.total))) + args.total)) return args, extra_args diff --git a/bin/process_wait.py b/bin/process_wait.py index 6107d1b..1295991 100755 --- a/bin/process_wait.py +++ b/bin/process_wait.py @@ -36,14 +36,17 @@ def main(args): usage = "Usage: %prog PROCESS [PROCESS...]" parser = OptionParser(usage=usage) - parser.add_option("-s", "--sleep", + parser.add_option( + "-s", "--sleep", type="int", default=default_sleep, help="Number of seconds to sleep between checks.") - parser.add_option("-t", "--timeout", + parser.add_option( + "-t", "--timeout", type="int", help="Number of seconds to timeout from sleeping.") - parser.add_option("-u", "--uid", + parser.add_option( + "-u", "--uid", help="Effective user name or id of the running processes") (options, processes) = parser.parse_args(args) diff --git a/bin/pulse-active-port-change.py b/bin/pulse-active-port-change.py index 90abed8..baf0152 100755 --- a/bin/pulse-active-port-change.py +++ b/bin/pulse-active-port-change.py @@ -71,20 +71,21 @@ class AudioPlugDetection: doc = parse_pactl_output(text) cfg = set() for record in doc.record_list: - active_port = None - port_availability = None - # We go through the attribute list once to try to find an active port - for attr in record.attribute_list: - if attr.name == "Active Port": - active_port = attr.value - # If there is one, we retrieve its availability flag - if active_port: - for attr in record.attribute_list: - if attr.name == "Ports": - for port in attr.value: - if port.name == active_port: - port_availability = port.availability - cfg.add((record.name, active_port, port_availability)) + active_port = None + port_availability = None + # We go through the attribute list once to try to find + # an active port + for attr in record.attribute_list: + if attr.name == "Active Port": + active_port = attr.value + # If there is one, we retrieve its availability flag + if active_port: + for attr in record.attribute_list: + if attr.name == "Ports": + for port in attr.value: + if port.name == active_port: + port_availability = port.availability + cfg.add((record.name, active_port, port_availability)) return cfg def on_timeout(self, signum, frame): diff --git a/bin/removable_storage_test.py b/bin/removable_storage_test.py index 82f1595..4253cbf 100755 --- a/bin/removable_storage_test.py +++ b/bin/removable_storage_test.py @@ -15,26 +15,30 @@ import time import gi gi.require_version('GUdev', '1.0') -from gi.repository import GUdev - -from checkbox_support.dbus import connect_to_system_bus -from checkbox_support.dbus.udisks2 import UDISKS2_BLOCK_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_DRIVE_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_FILESYSTEM_INTERFACE -from checkbox_support.dbus.udisks2 import UDISKS2_LOOP_INTERFACE -from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer -from checkbox_support.dbus.udisks2 import is_udisks2_supported -from checkbox_support.dbus.udisks2 import lookup_udev_device -from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus -from checkbox_support.heuristics.udisks2 import is_memory_card -from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes -from checkbox_support.parsers.udevadm import CARD_READER_RE -from checkbox_support.parsers.udevadm import GENERIC_RE -from checkbox_support.parsers.udevadm import FLASH_RE -from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint -from checkbox_support.udev import get_interconnect_speed -from checkbox_support.udev import get_udev_block_devices -from checkbox_support.udev import get_udev_xhci_devices +from gi.repository import GUdev # noqa: E402 + +from checkbox_support.dbus import connect_to_system_bus # noqa: E402 +from checkbox_support.dbus.udisks2 import ( + UDISKS2_BLOCK_INTERFACE, + UDISKS2_DRIVE_INTERFACE, + UDISKS2_FILESYSTEM_INTERFACE, + UDISKS2_LOOP_INTERFACE, + UDisks2Model, + UDisks2Observer, + is_udisks2_supported, + lookup_udev_device, + map_udisks1_connection_bus) # noqa: E402 +from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402 +from checkbox_support.helpers.human_readable_bytes import ( + HumanReadableBytes) # noqa: E402 +from checkbox_support.parsers.udevadm import ( + CARD_READER_RE, + GENERIC_RE, + FLASH_RE, + find_pkname_is_root_mountpoint) # noqa: E402 +from checkbox_support.udev import get_interconnect_speed # noqa: E402 +from checkbox_support.udev import get_udev_block_devices # noqa: E402 +from checkbox_support.udev import get_udev_xhci_devices # noqa: E402 class ActionTimer(): @@ -109,8 +113,8 @@ def on_ubuntucore(): snap = os.getenv("SNAP") if snap: with open(os.path.join(snap, 'meta/snap.yaml')) as f: - for l in f.readlines(): - if l == "confinement: classic\n": + for line in f.readlines(): + if line == "confinement: classic\n": return False return True return False @@ -855,7 +859,6 @@ def main(): # controller drivers. # This will raise KeyError for no # associated disk device was found. - xhci_disks = test.get_disks_xhci() if test.get_disks_xhci().get(disk, '') != 'xhci': raise SystemExit( "\t\tDisk does not use xhci_hcd.") diff --git a/bin/removable_storage_watcher.py b/bin/removable_storage_watcher.py index d5a0722..7120f6d 100755 --- a/bin/removable_storage_watcher.py +++ b/bin/removable_storage_watcher.py @@ -9,16 +9,21 @@ import sys import gi gi.require_version('GUdev', '1.0') -from gi.repository import GObject, GUdev - -from checkbox_support.dbus import connect_to_system_bus -from checkbox_support.dbus.udisks2 import UDisks2Model, UDisks2Observer -from checkbox_support.dbus.udisks2 import is_udisks2_supported -from checkbox_support.dbus.udisks2 import lookup_udev_device -from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus -from checkbox_support.heuristics.udisks2 import is_memory_card -from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE -from checkbox_support.udev import get_interconnect_speed, get_udev_block_devices +from gi.repository import GObject, GUdev # noqa: E402 + +from checkbox_support.dbus import connect_to_system_bus # noqa: E402 +from checkbox_support.dbus.udisks2 import UDisks2Model # noqa: E402 +from checkbox_support.dbus.udisks2 import UDisks2Observer # noqa: E402 +from checkbox_support.dbus.udisks2 import is_udisks2_supported # noqa: E402 +from checkbox_support.dbus.udisks2 import lookup_udev_device # noqa: E402 +from checkbox_support.dbus.udisks2 import ( + map_udisks1_connection_bus) # noqa: E402 +from checkbox_support.heuristics.udisks2 import is_memory_card # noqa: E402 +from checkbox_support.parsers.udevadm import CARD_READER_RE # noqa: E402 +from checkbox_support.parsers.udevadm import GENERIC_RE # noqa: E402 +from checkbox_support.parsers.udevadm import FLASH_RE # noqa: E402 +from checkbox_support.udev import get_interconnect_speed # noqa: E402 +from checkbox_support.udev import get_udev_block_devices # noqa: E402 # Record representing properties of a UDisks1 Drive object needed by the # UDisks1 version of the watcher implementation @@ -121,19 +126,24 @@ class UDisks1StorageDeviceListener: logging.debug("Desired bus devices: %s", desired_bus_devices) for dev in desired_bus_devices: if self._memorycard: - if (dev.bus != 'sdio' + if ( + dev.bus != 'sdio' and not FLASH_RE.search(dev.media) and not CARD_READER_RE.search(dev.model) - and not GENERIC_RE.search(dev.vendor)): - logging.debug("The device does not seem to be a memory" - " card (bus: %r, model: %r), skipping", - dev.bus, dev.model) + and not GENERIC_RE.search(dev.vendor) + ): + logging.debug( + "The device does not seem to be a memory" + " card (bus: %r, model: %r), skipping", + dev.bus, dev.model) return print(message % {'bus': 'memory card', 'file': dev.file}) else: - if (FLASH_RE.search(dev.media) + if ( + FLASH_RE.search(dev.media) or CARD_READER_RE.search(dev.model) - or GENERIC_RE.search(dev.vendor)): + or GENERIC_RE.search(dev.vendor) + ): logging.debug("The device seems to be a memory" " card (bus: %r (model: %r), skipping", dev.bus, dev.model) @@ -180,8 +190,7 @@ class UDisks1StorageDeviceListener: message = "Expected memory card device %(file)s inserted" else: message = "Expected %(bus)s device %(file)s inserted" - self.verify_device_change(inserted_devices, - message=message) + self.verify_device_change(inserted_devices, message=message) def add_detected(self, added_path): logging.debug("UDisks1 reports device has been added: %s", added_path) @@ -205,8 +214,9 @@ class UDisks1StorageDeviceListener: removed_devices = list(set(self._starting_devices) - set(current_devices)) logging.debug("Computed removed devices: %s", removed_devices) - self.verify_device_change(removed_devices, - message="Removable %(bus)s device %(file)s has been removed") + self.verify_device_change( + removed_devices, + message="Removable %(bus)s device %(file)s has been removed") def get_existing_devices(self): logging.debug("Getting existing devices from UDisks1") @@ -221,12 +231,9 @@ class UDisks1StorageDeviceListener: device_props = dbus.Interface(device_obj, dbus.PROPERTIES_IFACE) udisks = 'org.freedesktop.UDisks.Device' - _device_file = device_props.Get(udisks, - "DeviceFile") - _bus = device_props.Get(udisks, - "DriveConnectionInterface") - _speed = device_props.Get(udisks, - "DriveConnectionSpeed") + _device_file = device_props.Get(udisks, "DeviceFile") + _bus = device_props.Get(udisks, "DriveConnectionInterface") + _speed = device_props.Get(udisks, "DriveConnectionSpeed") _parent_model = '' _parent_media = '' _parent_vendor = '' @@ -411,7 +418,7 @@ class UDisks2StorageDeviceListener: UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus" def __init__(self, system_bus, loop, action, devices, minimum_speed, - memorycard, unmounted = False): + memorycard, unmounted=False): # Store the desired minimum speed of the device in Mbit/s. The argument # is passed as the number of bits per second so let's fix that. self._desired_minimum_speed = minimum_speed / 10 ** 6 @@ -581,24 +588,30 @@ class UDisks2StorageDeviceListener: continue # For devices with empty "ConnectionBus" property, don't # require the device to be mounted - if (record.value.iface_name == + if ( + record.value.iface_name == "org.freedesktop.UDisks2.Drive" and record.value.delta_type == DELTA_TYPE_PROP and record.value.prop_name == "ConnectionBus" - and record.value.prop_value == ""): + and record.value.prop_value == "" + ): needs.remove('mounted') # Detect block devices designated for filesystems - if (record.value.iface_name == + if ( + record.value.iface_name == "org.freedesktop.UDisks2.Block" and record.value.delta_type == DELTA_TYPE_PROP and record.value.prop_name == "IdUsage" - and record.value.prop_value == "filesystem"): + and record.value.prop_value == "filesystem" + ): found.add('block-fs') # Memorize the block device path - elif (record.value.iface_name == + elif ( + record.value.iface_name == "org.freedesktop.UDisks2.Block" and record.value.delta_type == DELTA_TYPE_PROP - and record.value.prop_name == "PreferredDevice"): + and record.value.prop_name == "PreferredDevice" + ): object_block_device = record.value.prop_value # Ensure the device is a partition elif (record.value.iface_name == @@ -845,8 +858,9 @@ def main(): description = "Wait for the specified device to be inserted or removed." parser = argparse.ArgumentParser(description=description) parser.add_argument('action', choices=['insert', 'remove']) - parser.add_argument('device', choices=['usb', 'sdio', 'firewire', 'scsi', - 'ata_serial_esata'], nargs="+") + parser.add_argument( + 'device', choices=['usb', 'sdio', 'firewire', 'scsi', + 'ata_serial_esata'], nargs="+") memorycard_help = ("Memory cards devices on bus other than sdio require " "this parameter to identify them as such") parser.add_argument('--memorycard', action="store_true", @@ -902,5 +916,6 @@ def main(): except KeyboardInterrupt: return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/resolution_test.py b/bin/resolution_test.py index 55a7828..77238ff 100755 --- a/bin/resolution_test.py +++ b/bin/resolution_test.py @@ -5,7 +5,7 @@ import sys from argparse import ArgumentParser gi.require_version('Gdk', '3.0') -from gi.repository import Gdk +from gi.repository import Gdk # noqa: E402 def check_resolution(): @@ -28,11 +28,13 @@ def compare_resolution(min_h, min_v): def main(): parser = ArgumentParser() - parser.add_argument("--horizontal", + parser.add_argument( + "--horizontal", type=int, default=0, help="Minimum acceptable horizontal resolution.") - parser.add_argument("--vertical", + parser.add_argument( + "--vertical", type=int, default=0, help="Minimum acceptable vertical resolution.") diff --git a/bin/rotation_test.py b/bin/rotation_test.py index 6bf90fd..de70f4a 100755 --- a/bin/rotation_test.py +++ b/bin/rotation_test.py @@ -27,7 +27,8 @@ import time import subprocess gi.require_version('Gdk', '3.0') -from gi.repository import Gdk +from gi.repository import Gdk # noqa: E402 + def main(): """Run rotation cycling by running xrandr command.""" @@ -41,5 +42,6 @@ def main(): ['xrandr', '--output', output, '--rotation', rotation]) time.sleep(4) + if __name__ == '__main__': exit(main()) diff --git a/bin/sleep_test.py b/bin/sleep_test.py index 423fd6c..3483951 100755 --- a/bin/sleep_test.py +++ b/bin/sleep_test.py @@ -184,19 +184,19 @@ class SuspendTest(): if perf: for idx in range(0, len(loglist)): if 'PM: Syncing filesystems' in loglist[idx]: - sleep_start_time = re.split('[\[\]]', + sleep_start_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Sleep started at %s' % sleep_start_time) if 'ACPI: Low-level resume complete' in loglist[idx]: - sleep_end_time = re.split('[\[\]]', + sleep_end_time = re.split(r'[\[\]]', loglist[idx - 1])[1].strip() logging.debug('Sleep ended at %s' % sleep_end_time) - resume_start_time = re.split('[\[\]]', + resume_start_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Resume started at %s' % resume_start_time) idx += 1 if 'Restarting tasks' in loglist[idx]: - resume_end_time = re.split('[\[\]]', + resume_end_time = re.split(r'[\[\]]', loglist[idx])[1].strip() logging.debug('Resume ended at %s' % resume_end_time) if self.end_marker in loglist[idx]: @@ -395,5 +395,6 @@ def main(): options.iterations) return 0 + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/sleep_test_log_check.py b/bin/sleep_test_log_check.py index aae7556..350bbbb 100755 --- a/bin/sleep_test_log_check.py +++ b/bin/sleep_test_log_check.py @@ -138,8 +138,9 @@ def main(): logging.basicConfig(level=args.debug) - #Create a generator and get our lines - log = (line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8")) + # Create a generator and get our lines + log = ( + line.rstrip() for line in open(args.logfile, 'rt', encoding="UTF-8")) # End result will be a dictionary with a key per level, value is another # dictionary with a key per test (s3, s4, ...) and a list of all failures @@ -166,9 +167,8 @@ def main(): # Report what I found for level in sorted(results.keys()): - if results[level]: # Yes, we can have an empty level. We may have - # seen the levelheader but had it report no - # failures. + if results[level]: # Yes, we can have an empty level. + # We may have seen the levelheader but had it report no failures. print("{} failures:".format(level)) for test in results[level].keys(): print(" {}: {} failures".format(test, @@ -184,8 +184,8 @@ def main(): logging.error("No fwts test summaries found, " "possible malformed fwts log file") return_code = 2 - elif not results.keys(): # If it has no keys, means we didn't see any - # FWTS levels + elif not results.keys(): + # If it has no keys, means we didn't see any FWTS levels logging.error("None of the summaries contained failure levels, " "possible malformed fwts log file") return_code = 2 @@ -197,5 +197,6 @@ def main(): return return_code + if __name__ == '__main__': sys.exit(main()) diff --git a/bin/sleep_time_check.py b/bin/sleep_time_check.py index f9513fa..90e1bc7 100755 --- a/bin/sleep_time_check.py +++ b/bin/sleep_time_check.py @@ -53,13 +53,18 @@ def main(): print(e) return 1 - if (sleep_time is None or resume_time is None) or \ - (len(sleep_times) != len(resume_times)): + if ( + (sleep_time is None or resume_time is None) or + (len(sleep_times) != len(resume_times)) + ): print("ERROR: One or more times was not reported correctly") return 1 - print("Average time to enter sleep state: %.4f seconds" % mean(sleep_times)) - print("Average time to resume from sleep state: %.4f seconds" % mean(resume_times)) + print( + "Average time to enter sleep state: %.4f seconds" % mean(sleep_times)) + print( + "Average time to resume from sleep state: %.4f seconds" % mean( + resume_times)) failed = 0 if sleep_time > args.sleep_threshold: @@ -76,5 +81,6 @@ def main(): return failed + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/stress_ng_test.py b/bin/stress_ng_test.py index b8c2586..35aad2d 100755 --- a/bin/stress_ng_test.py +++ b/bin/stress_ng_test.py @@ -138,7 +138,7 @@ def num_numa_nodes(): try: return int(run(['numactl', '--hardware'], stdout=PIPE).stdout.split()[1]) - except: + except (ValueError, OSError, IndexError): return 1 diff --git a/bin/test_bt_keyboard.py b/bin/test_bt_keyboard.py index 1457a1d..782ae20 100755 --- a/bin/test_bt_keyboard.py +++ b/bin/test_bt_keyboard.py @@ -5,7 +5,7 @@ # Written by: # Maciej Kisielewski <maciej.kisielewski@canonical.com> -import checkbox_support.bt_helper +import checkbox_support.bt_helper as bt_helper def main(): diff --git a/bin/touchpad_driver_info.py b/bin/touchpad_driver_info.py index 431b874..9f6278f 100755 --- a/bin/touchpad_driver_info.py +++ b/bin/touchpad_driver_info.py @@ -67,8 +67,8 @@ def main(): if attributes: modinfo = TouchpadDriver(attributes['driver']) attributes['version'] = modinfo.driver_version - print("%s: %s\n%s: %s\n%s: %s\n" % - ('Device', attributes['product'], + print("%s: %s\n%s: %s\n%s: %s\n" % ( + 'Device', attributes['product'], 'Driver', attributes['driver'], 'Driver Version', attributes['version'])) else: diff --git a/bin/touchpad_test.py b/bin/touchpad_test.py index bc6a474..6ecde92 100755 --- a/bin/touchpad_test.py +++ b/bin/touchpad_test.py @@ -8,8 +8,8 @@ from gettext import gettext as _ gi.require_version('Gdk', '3.0') gi.require_version('Gio', '2.0') gi.require_version("Gtk", "3.0") -from gi.repository import Gio, Gtk, Gdk -from optparse import OptionParser +from gi.repository import Gio, Gtk, Gdk # noqa: E402 +from optparse import OptionParser # noqa: E402 EXIT_WITH_FAILURE = 1 @@ -205,5 +205,6 @@ def main(args): return scroller.exit_code + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/virtualization.py b/bin/virtualization.py index 2b7cd54..10b827b 100755 --- a/bin/virtualization.py +++ b/bin/virtualization.py @@ -330,7 +330,7 @@ class KVMTest(object): # Attempt download try: - resp = urllib.request.urlretrieve(full_url, cloud_iso) + urllib.request.urlretrieve(full_url, cloud_iso) except (IOError, OSError, urllib.error.HTTPError, @@ -400,25 +400,6 @@ class KVMTest(object): Generate Cloud meta data and creates an iso object to be mounted as virtual device to instance during boot. """ - - user_data = """\ -#cloud-config - -runcmd: - - [ sh, -c, echo "========= CERTIFICATION TEST =========" ] - -power_state: - mode: halt - message: Bye - timeout: 480 - -final_message: CERTIFICATION BOOT COMPLETE -""" - - meta_data = """\ -{ echo instance-id: iid-local01; echo local-hostname, certification; } -""" - for file in ['user-data', 'meta-data']: logging.debug("Creating cloud %s", file) with open(file, "wt") as data_file: @@ -427,11 +408,11 @@ final_message: CERTIFICATION BOOT COMPLETE # Create Data ISO hosting user & meta cloud config data try: - iso_build = check_output( + check_output( ['genisoimage', '-output', 'seed.iso', '-volid', 'cidata', '-joliet', '-rock', 'user-data', 'meta-data'], universal_newlines=True) - except CalledProcessError as exception: + except CalledProcessError: logging.exception("Cloud data disk creation failed") def log_check(self, stream): @@ -483,7 +464,7 @@ final_message: CERTIFICATION BOOT COMPLETE self.create_cloud_disk() # Boot Virtual Machine - instance = self.boot_image(self.image) + self.boot_image(self.image) # If running in console, reset console window to regain # control from VM Serial I/0 @@ -659,7 +640,7 @@ class LXDTest(object): logging.debug("Attempting download of {} from {}".format(filename, url)) try: - resp = urllib.request.urlretrieve(url, filename) + urllib.request.urlretrieve(url, filename) except (IOError, OSError, urllib.error.HTTPError, diff --git a/bin/volume_test.py b/bin/volume_test.py index a949f93..d1aeffd 100755 --- a/bin/volume_test.py +++ b/bin/volume_test.py @@ -8,12 +8,12 @@ from subprocess import check_output TYPES = ("source", "sink") -active_entries_regex = re.compile("\* index.*?(?=properties)", re.DOTALL) -entries_regex = re.compile("index.*?(?=properties)", re.DOTALL) -index_regex = re.compile("(?<=index: )[0-9]*") -muted_regex = re.compile("(?<=muted: ).*") -volume_regex = re.compile("(?<=volume: 0: )\s*[0-9]*") -name_regex = re.compile("(?<=name:).*") +active_entries_regex = re.compile(r"\* index.*?(?=properties)", re.DOTALL) +entries_regex = re.compile(r"index.*?(?=properties)", re.DOTALL) +index_regex = re.compile(r"(?<=index: )[0-9]*") +muted_regex = re.compile(r"(?<=muted: ).*") +volume_regex = re.compile(r"(?<=volume: 0: )\s*[0-9]*") +name_regex = re.compile(r"(?<=name:).*") def check_muted(): @@ -27,26 +27,29 @@ def check_muted(): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), - file=sys.stderr) + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), + file=sys.stderr) return 1 active_entry_match = active_entries_regex.search(pacmd_entries) if active_entry_match: active_entry = active_entry_match.group() else: - print("Unable to find a %s active_entry in the pacmd list-%ss"\ - " output\npacmd output was: %s" % - (vtype, vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to find a %s active_entry in the pacmd list-%ss" + " output\npacmd output was: %s" % + (vtype, vtype, pacmd_entries), file=sys.stderr) return 1 name_match = name_regex.search(active_entry) if name_match: name = name_match.group() else: - print("Unable to determine device bus information from the"\ - " pacmd list-%ss output\npacmd output was: %s" % - (vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to determine device bus information from the" + " pacmd list-%ss output\npacmd output was: %s" % + (vtype, pacmd_entries), file=sys.stderr) return 1 muted_match = muted_regex.search(active_entry) @@ -58,9 +61,10 @@ def check_muted(): else: print("PASS: Audio is not muted on %s %s" % (name, vtype)) else: - print("Unable to find mute information in the pacmd list-%ss"\ - " output for device %s\npacmd output was: %s" % - (vtype, name, pacmd_entries), file=sys.stderr) + print( + "Unable to find mute information in the pacmd list-%ss" + " output for device %s\npacmd output was: %s" % + (vtype, name, pacmd_entries), file=sys.stderr) return 1 return retval @@ -77,8 +81,9 @@ def check_volume(minvol, maxvol): pacmd_entries = check_output(["pacmd", "list-%ss" % vtype], universal_newlines=True) except Exception as e: - print("Error when running pacmd list-%ss: %s" % (vtype, e), - file=sys.stderr) + print( + "Error when running pacmd list-%ss: %s" % (vtype, e), + file=sys.stderr) return 1 entries = entries_regex.findall(pacmd_entries) @@ -88,32 +93,36 @@ def check_volume(minvol, maxvol): if name_match: name = name_match.group() else: - print("Unable to determine device bus information from the"\ - " pacmd list-%ss output\npacmd output was: %s" % - (vtype, pacmd_entries), file=sys.stderr) + print( + "Unable to determine device bus information from the" + " pacmd list-%ss output\npacmd output was: %s" % + (vtype, pacmd_entries), file=sys.stderr) return 1 volume_match = volume_regex.search(entry) if volume_match: volume = int(volume_match.group().strip()) if volume > maxvol: - print ("FAIL: Volume of %d is greater than"\ - " maximum of %d for %s %s" % (volume, maxvol, - name, vtype)) + print( + "FAIL: Volume of %d is greater than" + " maximum of %d for %s %s" % + (volume, maxvol, name, vtype)) retval = 1 elif volume < minvol: - print ("FAIL: Volume of %d is less than"\ - " minimum of %d for %s %s" % (volume, minvol, - name, vtype)) + print( + "FAIL: Volume of %d is less than" + " minimum of %d for %s %s" % + (volume, minvol, name, vtype)) retval = 1 else: - print ("PASS: Volume is %d for %s %s" % (volume, name, - vtype)) + print( + "PASS: Volume is %d for %s %s" % + (volume, name, vtype)) else: - print("Unable to find volume information in the pacmd"\ - " list-%ss output for device %s.\npacmd output "\ - "was: %s" % - (vtype, name, pacmd_entries), file=sys.stderr) + print( + "Unable to find volume information in the pacmd" + " list-%ss output for device %s.\npacmd output " + "was: %s" % (vtype, name, pacmd_entries), file=sys.stderr) return 1 return retval @@ -134,5 +143,6 @@ def main(): check_volume_retval = check_volume(args.minvol, args.maxvol) return check_muted_retval or check_volume_retval + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/wifi_ap_wizard.py b/bin/wifi_ap_wizard.py index 5506d01..9029040 100755 --- a/bin/wifi_ap_wizard.py +++ b/bin/wifi_ap_wizard.py @@ -61,5 +61,6 @@ def main(): raise SystemExit('Did not get prompted ("{}")'.format(prompt)) wizard.writeline(response) + if __name__ == '__main__': main() diff --git a/bin/wifi_client_test_netplan.py b/bin/wifi_client_test_netplan.py index c74320e..6f8281c 100755 --- a/bin/wifi_client_test_netplan.py +++ b/bin/wifi_client_test_netplan.py @@ -21,8 +21,6 @@ import subprocess as sp import textwrap import time import shutil -from struct import pack -from socket import inet_ntoa import sys print = functools.partial(print, flush=True) @@ -135,7 +133,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp): nameservers: {} Static IP no dhcp: - >>> print(generate_test_config("eth0", "my_ap", "s3cr3t", "192.168.1.1", False)) + >>> print(generate_test_config( + "eth0", "my_ap", "s3cr3t", "192.168.1.1", False)) # This is the network config written by checkbox network: version: 2 @@ -162,7 +161,8 @@ def generate_test_config(interface, ssid, psk, address, dhcp): password = "password: " + psk else: password = "" - return textwrap.dedent(np_cfg.format(interface, ssid, password, address, dhcp)) + return textwrap.dedent( + np_cfg.format(interface, ssid, password, address, dhcp)) def write_test_config(config): diff --git a/bin/wifi_master_mode.py b/bin/wifi_master_mode.py index 3888caa..d52d5c0 100755 --- a/bin/wifi_master_mode.py +++ b/bin/wifi_master_mode.py @@ -67,7 +67,7 @@ class WifiMasterMode(): logging.info(output) logging.info("AP successfully established.") child.terminate() - if child.poll() is not 0: + if child.poll() != 0: output = child.stdout.read() logging.error(log + output) logging.error('AP failed to start') diff --git a/bin/wifi_time2reconnect.py b/bin/wifi_time2reconnect.py index d4ae2c0..a1b49b5 100755 --- a/bin/wifi_time2reconnect.py +++ b/bin/wifi_time2reconnect.py @@ -7,9 +7,8 @@ import time import subprocess from datetime import datetime try: - from subprocess import DEVNULL # >= python3.3 + from subprocess import DEVNULL # >= python3.3 except ImportError: - import os DEVNULL = open(os.devnull, 'wb') IFACE = None @@ -21,7 +20,7 @@ def main(): Check the time needed to reconnect an active WIFI connection """ devices = subprocess.getoutput('nmcli dev') - match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices) + match = re.search(r'(\w+)\s+(802-11-wireless|wifi)\s+connected', devices) if match: IFACE = match.group(1) else: @@ -46,7 +45,7 @@ def main(): return 1 subprocess.call( - 'nmcli dev disconnect iface %s' %IFACE, + 'nmcli dev disconnect iface %s' % IFACE, stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT, shell=True) @@ -55,13 +54,13 @@ def main(): start = datetime.now() subprocess.call( - 'nmcli con up uuid %s --timeout %s' %(uuid, TIMEOUT), + 'nmcli con up uuid %s --timeout %s' % (uuid, TIMEOUT), stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT, shell=True) delta = datetime.now() - start - print('%.2f Seconds' %delta.total_seconds()) + print('%.2f Seconds' % delta.total_seconds()) return 0 diff --git a/bin/window_test.py b/bin/window_test.py index ee79a89..895e2db 100755 --- a/bin/window_test.py +++ b/bin/window_test.py @@ -277,8 +277,7 @@ def print_move_window(iterations, timeout, *args): for it in range(iterations): print('Iteration %d of %d:' % (it + 1, iterations)) - exit_status = move_window('glxgears', - timeout) + status = move_window('glxgears', timeout) print('') return status @@ -290,7 +289,8 @@ def main(): 'open-close-multi': print_open_close_multi, 'move': print_move_window} - parser = ArgumentParser(description='Script that performs window operation') + parser = ArgumentParser( + description='Script that performs window operation') parser.add_argument('-t', '--test', default='all', help='The name of the test to run. \ @@ -338,5 +338,6 @@ def main(): return status + if __name__ == '__main__': exit(main()) diff --git a/bin/wwan_tests.py b/bin/wwan_tests.py index df44cf6..9bbf1e7 100755 --- a/bin/wwan_tests.py +++ b/bin/wwan_tests.py @@ -278,7 +278,7 @@ class ThreeGppConnection(): _wwan_radio_on() time.sleep(args.wwan_setup_time) ret_code = _ping_test(args.wwan_net_if) - except: + except subprocess.SubprocessError: pass _destroy_3gpp_connection() _wwan_radio_off() @@ -312,29 +312,6 @@ class Resources(): mm = MMDbus() for m in mm.get_modem_ids(): print("mm_id: {}".format(m)) - # Removed ability to fetch supported RATs when adding CLI support - # to this script. It is somewhat messy to scrape this from mmcli - # output and it wasn't actually used in test definitions. - # - # cap_bits = mm.get_rat_support(m) - # if cap_bits != 0: - # if (cap_bits & MMModemCapability['MM_MODEM_CAPABILITY_POTS']): - # print("pots: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_CDMA_EVDO']): - # print("cdma_evdo: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_GSM_UMTS']): - # print("gsm_umts: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_LTE']): - # print("lte: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_LTE_ADVANCED']): - # print("lte_advanced: supported") - # if (cap_bits & - # MMModemCapability['MM_MODEM_CAPABILITY_IRIDIUM']): - # print("iridium: supported") print("hw_id: {}".format(mm.get_equipment_id(m))) print("manufacturer: {}".format(mm.get_manufacturer(m))) print("model: {}".format(mm.get_model_name(m))) diff --git a/bin/xrandr_cycle.py b/bin/xrandr_cycle.py index 6743f6f..8d87217 100755 --- a/bin/xrandr_cycle.py +++ b/bin/xrandr_cycle.py @@ -117,11 +117,11 @@ if 'PLAINBOX_PROVIDER_DATA' in os.environ: shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'], "settings", "shutter.xml") else: - shutter_xml_template = os.path.join(os.path.split(os.path.dirname( - os.path.realpath(__file__)))[0], - "data", - "settings", - "shutter.xml") + shutter_xml_template = os.path.join( + os.path.split(os.path.dirname(os.path.realpath(__file__)))[0], + "data", + "settings", + "shutter.xml") if args.keyword: screenshot_path = screenshot_path + '_' + args.keyword @@ -167,7 +167,7 @@ try: 'folder="%s"' % screenshot_path, content)) new_profile.close() old_profile.close() -except: +except (IOError, OSError): raise SystemExit("ERROR: While updating folder name " "in shutter profile: {}".format(sys.exc_info())) @@ -198,7 +198,7 @@ for mode in highest_modes: print("""Could not capture screenshot - you may need to install the package 'shutter'.""") - except: + except (IOError, OSError): print("""Could not configure screenshot tool - you may need to install the package 'shutter', or check that {}/{} exists and is writable.""".format( @@ -220,7 +220,7 @@ try: with tarfile.open(screenshot_path + '.tgz', 'w:gz') as screen_tar: for screen in os.listdir(screenshot_path): screen_tar.add(screenshot_path + '/' + screen, screen) -except: +except (IOError, OSError): pass # Output some fun facts and knock off for the day |