diff options
author | PMR <pmr@pmr-lander> | 2017-07-05 09:44:54 +0000 |
---|---|---|
committer | PMR <pmr@pmr-lander> | 2017-07-05 09:44:54 +0000 |
commit | 1cddd6723ed57f468b3a11d89a1dbca79facac8e (patch) | |
tree | ede9ddf7bb4e30ddf2e77693d303d28c1463a49c /bin | |
parent | 4ba702d74b44de712fd9d15f1b5fa581bd796229 (diff) | |
parent | dab49b3d0559813341a7c74a5e77c185ed3b52f1 (diff) |
Merge #326656 from ~sylvain-pineau/plainbox-provider-checkbox:cleanup_duplicates
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/bt_connect | 2 | ||||
-rw-r--r-- | bin/bt_helper.py | 300 | ||||
-rwxr-xr-x | bin/fwts_test | 459 | ||||
-rwxr-xr-x | bin/gateway_ping_test | 2 | ||||
-rwxr-xr-x | bin/network_bandwidth_test | 671 | ||||
-rwxr-xr-x | bin/network_device_info | 13 | ||||
-rwxr-xr-x | bin/removable_storage_test | 117 | ||||
-rwxr-xr-x | bin/removable_storage_watcher | 15 | ||||
-rwxr-xr-x | bin/storage_test | 10 |
9 files changed, 140 insertions, 1449 deletions
diff --git a/bin/bt_connect b/bin/bt_connect index 0ef5169..b52642a 100755 --- a/bin/bt_connect +++ b/bin/bt_connect @@ -23,7 +23,7 @@ import sys import time -import bt_helper +import checkbox_support.bt_helper from argparse import ArgumentParser diff --git a/bin/bt_helper.py b/bin/bt_helper.py deleted file mode 100644 index 91f879a..0000000 --- a/bin/bt_helper.py +++ /dev/null @@ -1,300 +0,0 @@ -# Copyright 2016 Canonical Ltd. -# Written by: -# Maciej Kisielewski <maciej.kisielewski@canonical.com> -# -# This is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, -# as published by the Free Software Foundation. -# -# This file is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this file. If not, see <http://www.gnu.org/licenses/>. -""" -This module provides a set of abstractions to ease the process of automating -typical Bluetooth task like scanning for devices and pairing with them. - -It talks with BlueZ stack using dbus. -""" -import logging - -import dbus -import dbus.service -import dbus.mainloop.glib -from gi.repository import GObject - -logger = logging.getLogger(__file__) -logger.addHandler(logging.StreamHandler()) - -IFACE = 'org.bluez.Adapter1' -ADAPTER_IFACE = 'org.bluez.Adapter1' -DEVICE_IFACE = 'org.bluez.Device1' -AGENT_IFACE = 'org.bluez.Agent1' - -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - -# To get additional Bluetoot CoDs, check -# https://www.bluetooth.com/specifications/assigned-numbers/baseband -BT_ANY = 0 -BT_KEYBOARD = int('0x2540', 16) - - -class BtException(Exception): - pass - - -class BtManager: - """ Main point of contact with dbus factoring bt objects. """ - def __init__(self, verbose=False): - if verbose: - logger.setLevel(logging.DEBUG) - self._bus = dbus.SystemBus() - self._bt_root = self._bus.get_object('org.bluez', '/') - self._manager = dbus.Interface( - self._bt_root, 'org.freedesktop.DBus.ObjectManager') - self._main_loop = GObject.MainLoop() - self._register_agent() - - def _register_agent(self): - path = "/bt_helper/agent" - BtAgent(self._bus, path) - obj = self._bus.get_object('org.bluez', "/org/bluez") - agent_manager = dbus.Interface(obj, "org.bluez.AgentManager1") - agent_manager.RegisterAgent(path, 'NoInputNoOutput') - logger.info("Agent registered") - - def _get_objects_by_iface(self, iface_name): - for path, ifaces in self._manager.GetManagedObjects().items(): - if ifaces.get(iface_name): - yield self._bus.get_object('org.bluez', path) - - def get_bt_adapters(self): - """Yield BtAdapter objects for each BT adapter found.""" - for adapter in self._get_objects_by_iface(ADAPTER_IFACE): - yield BtAdapter(dbus.Interface(adapter, ADAPTER_IFACE), self) - - def get_bt_devices(self, category=BT_ANY, filters={}): - """Yields BtDevice objects currently known to the system. - - filters - specifies the characteristics of that a BT device must have - to be yielded. The keys of filters dictionary represent names of - parameters (as specified by the bluetooth DBus Api and represented by - DBus proxy object), and its values must match proxy values. - I.e. {'Paired': False}. For a full list of Parameters see: - http://git.kernel.org/cgit/bluetooth/bluez.git/tree/doc/device-api.txt - - Note that this function returns objects corresponding to BT devices - that were seen last time scanning was done.""" - for device in self._get_objects_by_iface(DEVICE_IFACE): - obj = self.get_object_by_path(device.object_path)[DEVICE_IFACE] - try: - if category != BT_ANY: - if obj['Class'] != category: - continue - rejected = False - for filter in filters: - if obj[filter] != filters[filter]: - rejected = True - break - if rejected: - continue - yield BtDevice(dbus.Interface(device, DEVICE_IFACE), self) - except KeyError as exc: - logger.info('Property %s not found on device %s', - exc, device.object_path) - continue - - def get_prop_iface(self, obj): - return dbus.Interface(self._bus.get_object( - 'org.bluez', obj.object_path), 'org.freedesktop.DBus.Properties') - - def get_object_by_path(self, path): - return self._manager.GetManagedObjects()[path] - - def get_proxy_by_path(self, path): - return self._bus.get_object('org.bluez', path) - - def wait(self): - self._main_loop.run() - - def quit_loop(self): - self._main_loop.quit() - - def ensure_adapters_powered(self): - for adapter in self.get_bt_adapters(): - adapter.ensure_powered() - - def scan(self, timeout=10): - """Scan for BT devices visible to all adapters.'""" - self._bus.add_signal_receiver( - interfaces_added, - dbus_interface="org.freedesktop.DBus.ObjectManager", - signal_name="InterfacesAdded") - self._bus.add_signal_receiver( - properties_changed, - dbus_interface="org.freedesktop.DBus.Properties", - signal_name="PropertiesChanged", - arg0="org.bluez.Device1", - path_keyword="path") - for adapter in self._get_objects_by_iface(ADAPTER_IFACE): - try: - dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery() - except dbus.exceptions.DBusException: - pass - dbus.Interface(adapter, ADAPTER_IFACE).StartDiscovery() - GObject.timeout_add_seconds(timeout, self._scan_timeout) - self._main_loop.run() - - def get_devices(self, timeout=10, rescan=True): - """Scan for and list all devices visible to all adapters.""" - if rescan: - self.scan(timeout) - return list(self.get_bt_devices()) - - def _scan_timeout(self): - for adapter in self._get_objects_by_iface(ADAPTER_IFACE): - dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery() - self._main_loop.quit() - - -class BtAdapter: - def __init__(self, dbus_iface, bt_mgr): - self._if = dbus_iface - self._bt_mgr = bt_mgr - self._prop_if = bt_mgr.get_prop_iface(dbus_iface) - - def set_bool_prop(self, prop_name, value): - self._prop_if.Set(IFACE, prop_name, dbus.Boolean(value)) - - def ensure_powered(self): - """Turn the adapter on, and do nothing if already on.""" - powered = self._prop_if.Get(IFACE, 'Powered') - logger.info('Powering on {}'.format( - self._if.object_path.split('/')[-1])) - if powered: - logger.info('Device already powered') - return - try: - self.set_bool_prop('Powered', True) - logger.info('Powered on') - except Exception as exc: - logging.error('Failed to power on - {}'.format( - exc.get_dbus_message())) - - -class BtDevice: - def __init__(self, dbus_iface, bt_mgr): - self._if = dbus_iface - self._obj = bt_mgr.get_object_by_path( - self._if.object_path)[DEVICE_IFACE] - self._bt_mgr = bt_mgr - self._prop_if = bt_mgr.get_prop_iface(dbus_iface) - self._pair_outcome = None - - def __str__(self): - return "{} ({})".format(self.name, self.address) - - def __repr__(self): - return "<BtDevice name:{}, address:{}>".format(self.name, self.address) - - def pair(self): - """Pair the device. - - This function will try pairing with the device and block until device - is paired, error occured or default timeout elapsed (whichever comes - first). - """ - self._prop_if.Set(DEVICE_IFACE, 'Trusted', True) - self._if.Pair( - reply_handler=self._pair_ok, error_handler=self._pair_error) - self._bt_mgr.wait() - if self._pair_outcome: - raise BtException(self._pair_outcome) - try: - self._if.Connect() - except dbus.exceptions.DBusException as exc: - logging.error('Failed to connect - {}'.format( - exc.get_dbus_message())) - - def unpair(self): - self._if.Disconnect() - adapter = self._bt_mgr.get_proxy_by_path(self._obj['Adapter']) - dbus.Interface(adapter, ADAPTER_IFACE).RemoveDevice(self._if) - - @property - def name(self): - return self._obj.get('Name', '<Unnamed>') - - @property - def address(self): - return self._obj['Address'] - - @property - def rssi(self): - return self._obj.get('RSSI', None) - - def _pair_ok(self): - logger.info('%s successfully paired', self.name) - self._pair_outcome = None - self._bt_mgr.quit_loop() - - def _pair_error(self, error): - logger.warning('Pairing of %s device failed. %s', self.name, error) - self._pair_outcome = error - self._bt_mgr.quit_loop() - - -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - - -class BtAgent(dbus.service.Object): - """Agent authenticating everything that is possible.""" - @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="") - def AuthorizeService(self, device, uuid): - logger.info("AuthorizeService (%s, %s)", device, uuid) - - @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="u") - def RequestPasskey(self, device): - logger.info("RequestPasskey (%s)", device) - passkey = input("Enter passkey: ") - return dbus.UInt32(passkey) - - @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="s") - def RequestPinCode(self, device): - logger.info("RequestPinCode (%s)", device) - return input("Enter PIN Code: ") - - @dbus.service.method(AGENT_IFACE, in_signature="ouq", out_signature="") - def DisplayPasskey(self, device, passkey, entered): - print("DisplayPasskey (%s, %06u entered %u)" % - (device, passkey, entered), flush=True) - - @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="") - def DisplayPinCode(self, device, pincode): - logger.info("DisplayPinCode (%s, %s)", device, pincode) - print('Type following pin on your device: {}'.format(pincode), - flush=True) - - @dbus.service.method(AGENT_IFACE, in_signature="ou", out_signature="") - def RequestConfirmation(self, device, passkey): - logger.info("RequestConfirmation (%s, %06d)", device, passkey) - - @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="") - def RequestAuthorization(self, device): - logger.info("RequestAuthorization (%s)", device) - - @dbus.service.method(AGENT_IFACE, in_signature="", out_signature="") - def Cancel(self): - logger.info("Cancelled") - - -def properties_changed(interface, changed, invalidated, path): - logger.info('Property changed for device @ %s. Change: %s', path, changed) - - -def interfaces_added(path, interfaces): - logger.info('Added new bt interfaces: %s @ %s', interfaces, path) diff --git a/bin/fwts_test b/bin/fwts_test deleted file mode 100755 index c8ff1f7..0000000 --- a/bin/fwts_test +++ /dev/null @@ -1,459 +0,0 @@ -#! /usr/bin/python3 - -import sys -import re -from time import time -from argparse import ArgumentParser, RawTextHelpFormatter, REMAINDER -from subprocess import Popen, PIPE, check_output -from syslog import * -from distutils.spawn import find_executable -import os - -# These tests require user interaction and need either special handling -# or skipping altogether (right now, we skip them but they're kept here -# in case we figure out a way to present the interaction to the user). -INTERACTIVE_TESTS = ['ac_adapter', - 'battery', - 'hotkey', - 'power_button', - 'brightness', - 'lid'] -# Tests recommended by the Hardware Enablement Team (HWE) -# These are performed on QA certification runs -QA_TESTS = ['acpitests', - 'apicedge', - 'aspm', - 'cpufreq', - 'dmicheck', - 'esrt', - 'klog', - 'maxfreq', - 'msr', - 'mtrr', - 'nx', - 'oops', - 'uefibootpath', - 'uefirtmisc', - 'uefirttime', - 'uefirtvariable', - 'version', - 'virt'] -# The following tests will record logs in a separate file for the HWE team -HWE_TESTS = ['version', - 'mtrr', - 'virt', - 'apicedge', - 'klog', - 'oops'] -# By default, we launch all the tests -TESTS = sorted(list(set(QA_TESTS + HWE_TESTS))) - - -def get_sleep_times(start_marker, end_marker, sleep_time, resume_time): - logfile = '/var/log/syslog' - log_fh = open(logfile, 'r', encoding='UTF-8') - line = '' - run = 'FAIL' - sleep_start_time = 0.0 - sleep_end_time = 0.0 - resume_start_time = 0.0 - resume_end_time = 0.0 - - while start_marker not in line: - try: - line = log_fh.readline() - except UnicodeDecodeError: - continue - if start_marker in line: - loglist = log_fh.readlines() - - for idx in range(0, len(loglist)): - if 'PM: Syncing filesystems' in loglist[idx]: - sleep_start_time = re.split('[\[\]]', loglist[idx])[1].strip() - if 'ACPI: Low-level resume complete' in loglist[idx]: - sleep_end_time = re.split('[\[\]]', loglist[idx - 1])[1].strip() - resume_start_time = re.split('[\[\]]', loglist[idx])[1].strip() - idx += 1 - if 'Restarting tasks' in loglist[idx]: - resume_end_time = re.split('[\[\]]', loglist[idx])[1].strip() - if end_marker in loglist[idx]: - run = 'PASS' - break - - sleep_elapsed = float(sleep_end_time) - float(sleep_start_time) - resume_elapsed = float(resume_end_time) - float(resume_start_time) - return (run, sleep_elapsed, resume_elapsed) - - -def average_times(runs): - sleep_total = 0.0 - resume_total = 0.0 - run_count = 0 - for run in runs.keys(): - run_count += 1 - sleep_total += runs[run][1] - resume_total += runs[run][2] - sleep_avg = sleep_total / run_count - resume_avg = resume_total / run_count - print('Average time to sleep: %0.5f' % sleep_avg) - print('Average time to resume: %0.5f' % resume_avg) - - -def fix_sleep_args(args): - new_args = [] - for arg in args: - if "=" in arg: - new_args.extend(arg.split('=')) - else: - new_args.append(arg) - return new_args - - -def detect_progress_indicator(): - # Return a command suitable for piping progress information to its - # stdin (invoked via Popen), in list format. - # Return zenity if installed and DISPLAY (--auto-close) - # return dialog if installed and no DISPLAY (width height) - display = os.environ.get('DISPLAY') - if display and find_executable('zenity'): - return ["zenity", "--progress", "--text", "Progress", "--auto-close"] - if not display and find_executable('dialog'): - return ["dialog", "--gauge", "Progress", "20", "70"] - # Return None if no progress indicator is to be used - return None - - -def main(): - description_text = 'Tests the system BIOS using the Firmware Test Suite' - epilog_text = ('To perform sleep testing, you will need at least some of ' - 'the following options: \n' - 's3 or s4: tells fwts which type of sleep to perform.\n' - '--s3-delay-delta\n' - '--s3-device-check\n' - '--s3-device-check-delay\n' - '--s3-hybrid-sleep\n' - '--s3-max-delay\n' - '--s3-min-delay\n' - '--s3-multiple\n' - '--s3-quirks\n' - '--s3-sleep-delay\n' - '--s3power-sleep-delay\n\n' - 'Example: fwts_test --sleep s3 --s3-min-delay 30 ' - '--s3-multiple 10 --s3-device-check\n\n' - 'For further help with sleep options:\n' - 'fwts_test --fwts-help') - parser = ArgumentParser(description=description_text, - epilog=epilog_text, - formatter_class=RawTextHelpFormatter) - parser.add_argument('-l', '--log', - default='/tmp/fwts_results.log', - help=('Specify the location and name ' - 'of the log file.\n' - '[Default: %(default)s]')) - parser.add_argument('-f', '--fail-level', - default='high', - choices=['critical', 'high', 'medium', - 'low', 'none', 'aborted'], - help=('Specify the FWTS failure level that will ' - 'trigger this script to return a failing exit ' - 'code. For example, if you chose "critical" as ' - 'the fail-level, this wrapper will NOT return ' - 'a failing exit code unless FWTS reports a ' - 'test as FAILED_CRITICAL. You will still be ' - 'notified of all FWTS test failures. ' - '[Default level: %(default)s]')) - sleep_args = parser.add_argument_group('Sleep Options', - ('The following arguments are to ' - 'only be used with the ' - '--sleep test option')) - sleep_args.add_argument('--sleep-time', - dest='sleep_time', - action='store', - help=('The max time in seconds that a system ' - 'should take\nto completely enter sleep. ' - 'Anything more than this\ntime will cause ' - 'that test iteration to fail.\n' - '[Default: 10s]')) - sleep_args.add_argument('--resume-time', - dest='resume_time', - action='store', - help=('Same as --sleep-time, except this applies ' - 'to the\ntime it takes a system to fully ' - 'wake from sleep.\n[Default: 3s]')) - - group = parser.add_mutually_exclusive_group() - group.add_argument('-t', '--test', - action='append', - help='Name of the test to run.') - group.add_argument('-s', '--sleep', - nargs=REMAINDER, - action='store', - help=('Perform sleep test(s) using the additional\n' - 'arguments provided after --sleep. Remaining\n' - 'items on the command line will be passed \n' - 'through to fwts for performing sleep tests. \n' - 'For info on these extra fwts options, please \n' - 'see the epilog below and \n' - 'the --fwts-help option.')) - group.add_argument('--hwe', - action='store_true', - help='Run HWE concerned tests in fwts') - group.add_argument('--qa', - action='store_true', - help='Run QA concerned tests in fwts') - group.add_argument('--fwts-help', - dest='fwts_help', - action='store_true', - help='Display the help info for fwts itself (lengthy)') - group.add_argument('--list', - action='store_true', - help='List all tests in fwts.') - group.add_argument('--list-hwe', - action='store_true', - help='List all HWE concerned tests in fwts') - group.add_argument('--list-qa', - action='store_true', - help='List all QA concerned tests in fwts') - args = parser.parse_args() - - tests = [] - results = {} - critical_fails = [] - high_fails = [] - medium_fails = [] - low_fails = [] - passed = [] - aborted = [] - - # Set correct fail level - if args.fail_level is not 'none': - args.fail_level = 'FAILED_%s' % args.fail_level.upper() - - # Get our failure priority and create the priority values - fail_levels = {'FAILED_CRITICAL': 4, - 'FAILED_HIGH': 3, - 'FAILED_MEDIUM': 2, - 'FAILED_LOW': 1, - 'FAILED_NONE': 0, - 'FAILED_ABORTED': -1} - fail_priority = fail_levels[args.fail_level] - - # Enforce only using sleep opts with --sleep - if args.sleep_time or args.resume_time and not args.sleep: - parser.error('--sleep-time and --resume-time only apply to the ' - '--sleep testing option.') - if args.fwts_help: - Popen('fwts -h', shell=True).communicate()[0] - return 0 - elif args.list: - print('\n'.join(TESTS)) - return 0 - elif args.list_hwe: - print('\n'.join(HWE_TESTS)) - return 0 - elif args.list_qa: - print('\n'.join(QA_TESTS)) - return 0 - elif args.test: - tests.extend(args.test) - elif args.hwe: - tests.extend(HWE_TESTS) - elif args.qa: - tests.extend(QA_TESTS) - elif args.sleep: - args.sleep = fix_sleep_args(args.sleep) - iterations = 1 - # if multiple iterations are requested, we need to intercept - # that argument and keep it from being presented to fwts since - # we're handling the iterations directly. - s3 = '--s3-multiple' - s4 = '--s4-multiple' - if s3 in args.sleep: - iterations = int(args.sleep.pop(args.sleep.index(s3) + 1)) - args.sleep.remove(s3) - if s4 in args.sleep: - iterations = int(args.sleep.pop(args.sleep.index(s4) + 1)) - args.sleep.remove(s4) - # if we've passed our custom sleep arguments for resume or sleep - # time, we need to intercept those as well. - resume_time_arg = '--resume-time' - sleep_time_arg = '--sleep-time' - if resume_time_arg in args.sleep: - args.resume_time = int(args.sleep.pop( - args.sleep.index(resume_time_arg) + 1)) - args.sleep.remove(resume_time_arg) - if sleep_time_arg in args.sleep: - args.sleep_time = int(args.sleep.pop( - args.sleep.index(sleep_time_arg) + 1)) - args.sleep.remove(sleep_time_arg) - # if we still haven't set a sleep or resume time, use defauts. - if not args.sleep_time: - args.sleep_time = 10 - if not args.resume_time: - args.resume_time = 3 - tests.extend(args.sleep) - else: - tests.extend(TESTS) - - # run the tests we want - if args.sleep: - iteration_results = {} - print('=' * 20 + ' Test Results ' + '=' * 20) - progress_indicator = None - if detect_progress_indicator(): - progress_indicator = Popen(detect_progress_indicator(), - stdin=PIPE) - for iteration in range(0, iterations): - timestamp = int(time()) - start_marker = 'CHECKBOX SLEEP TEST START %s' % timestamp - end_marker = 'CHECKBOX SLEEP TEST STOP %s' % timestamp - syslog(LOG_INFO, '---' + start_marker + '---' + str(time())) - command = ('fwts -q --stdout-summary -r %s %s' - % (args.log, ' '.join(tests))) - results['sleep'] = (Popen(command, stdout=PIPE, shell=True) - .communicate()[0].strip()).decode() - syslog(LOG_INFO, '---' + end_marker + '---' + str(time())) - if 's4' not in args.sleep: - sleep_times = get_sleep_times(start_marker, - end_marker, - args.sleep_time, - args.resume_time) - iteration_results[iteration] = sleep_times - progress_tuple = (iteration, - iteration_results[iteration][0], - iteration_results[iteration][1], - iteration_results[iteration][2]) - progress_string = (' - Cycle %s: Status: %s ' - 'Sleep Elapsed: %0.5f ' - 'Resume Elapsed: ' - ' %0.5f' % progress_tuple) - progress_pct = "{}".format(int(100 * iteration / iterations)) - if "zenity" in detect_progress_indicator(): - progress_indicator.stdin.write("# {}\n".format( - progress_string).encode('utf-8')) - progress_indicator.stdin.write("{}\n".format( - progress_pct).encode('utf-8')) - progress_indicator.stdin.flush() - elif "dialog" in detect_progress_indicator(): - progress_indicator.stdin.write("XXX\n".encode('utf-8')) - progress_indicator.stdin.write( - progress_pct.encode('utf-8')) - progress_indicator.stdin.write( - "\nTest progress\n".encode('utf-8')) - progress_indicator.stdin.write( - progress_string.encode('utf-8')) - progress_indicator.stdin.write( - "\nXXX\n".encode('utf-8')) - progress_indicator.stdin.flush() - else: - print(progress_string) - progress_indicator.terminate() - - if 's4' not in args.sleep: - average_times(iteration_results) - for run in iteration_results.keys(): - if 'FAIL' in iteration_results[run]: - results['sleep'] = 'FAILED_CRITICAL' - else: - for test in tests: - # ACPI tests can now be run with --acpitests (fwts >= 15.07.00) - log = args.log - # Split the log file for HWE (only if -t is not used) - if test == 'acpitests': - test = '--acpitests' - command = ('fwts -q --stdout-summary -r %s %s' - % (log, test)) - results[test] = (Popen(command, stdout=PIPE, shell=True) - .communicate()[0].strip()).decode() - - # lp:1584607 - # We append the content of dmesg and syslog at the end of the logfile - # generated by FWTS. - # FIXME: Commented out after discovered that it created HUGE log files - # during stress tests. - #with open(args.log, 'a') as logfile: - # logfile.write("--- beginning of dmesg ---\n") - # logfile.write(check_output('dmesg').decode('utf-8', 'ignore')) - # logfile.write("--- end of dmesg ---\n") - # logfile.write("--- beginning of syslog ---\n") - # logfile.write(check_output(['cat', '/var/log/syslog']).decode('utf-8', 'ignore')) - # logfile.write("--- end of syslog ---\n") - - # parse the summaries - for test in results.keys(): - if 'FAILED_CRITICAL' in results[test]: - critical_fails.append(test) - if 'FAILED_HIGH' in results[test]: - high_fails.append(test) - if 'FAILED_MEDIUM' in results[test]: - medium_fails.append(test) - if 'FAILED_LOW' in results[test]: - low_fails.append(test) - if 'PASSED' in results[test]: - passed.append(test) - if 'ABORTED' in results[test]: - aborted.append(test) - else: - continue - - if critical_fails: - print("Critical Failures: %d" % len(critical_fails)) - print("WARNING: The following test cases were reported as critical\n" - "level failures by fwts. Please review the log at\n" - "%s for more information." % args.log) - for test in critical_fails: - print(" - " + test) - if high_fails: - print("High Failures: %d" % len(high_fails)) - print("WARNING: The following test cases were reported as high\n" - "level failures by fwts. Please review the log at\n" - "%s for more information." % args.log) - for test in high_fails: - print(" - " + test) - if medium_fails: - print("Medium Failures: %d" % len(medium_fails)) - print("WARNING: The following test cases were reported as medium\n" - "level failures by fwts. Please review the log at\n" - "%s for more information." % args.log) - for test in medium_fails: - print(" - " + test) - if low_fails: - print("Low Failures: %d" % len(low_fails)) - print("WARNING: The following test cases were reported as low\n" - "level failures by fwts. Please review the log at\n" - "%s for more information." % args.log) - for test in low_fails: - print(" - " + test) - if passed: - print("Passed: %d" % len(passed)) - for test in passed: - print(" - " + test) - if aborted: - print("Aborted Tests: %d" % len(aborted)) - print("WARNING: The following test cases were aborted by fwts\n" - "Please review the log at %s for more information." - % args.log) - for test in aborted: - print(" - " + test) - - if args.fail_level is not 'none': - if fail_priority == fail_levels['FAILED_CRITICAL']: - if critical_fails: - return 1 - if fail_priority == fail_levels['FAILED_HIGH']: - if critical_fails or high_fails: - return 1 - if fail_priority == fail_levels['FAILED_MEDIUM']: - if critical_fails or high_fails or medium_fails: - return 1 - if fail_priority == fail_levels['FAILED_LOW']: - if critical_fails or high_fails or medium_fails or low_fails: - return 1 - if fail_priority == fail_levels['FAILED_ABORTED']: - if aborted or critical_fails or high_fails: - return 1 - - return 0 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/bin/gateway_ping_test b/bin/gateway_ping_test index 22aec6a..3e58c21 100755 --- a/bin/gateway_ping_test +++ b/bin/gateway_ping_test @@ -278,7 +278,7 @@ def main(args): ).format(ping_summary['pct_loss'], args.threshold)) return 0 else: - print(_("Internet connection fully established")) + print(_("Connection to test host fully established")) return 0 diff --git a/bin/network_bandwidth_test b/bin/network_bandwidth_test deleted file mode 100755 index c9b1dfb..0000000 --- a/bin/network_bandwidth_test +++ /dev/null @@ -1,671 +0,0 @@ -#!/usr/bin/env python3 - -import os -import re -import sys -import random -import logging -import subprocess - -from datetime import datetime, timedelta -from time import sleep - -from logging import StreamHandler, FileHandler, Formatter -from optparse import OptionParser - -from checkbox_support.lib.conversion import string_to_type - - -class CommandException(Exception): - - pass - - -class CommandOutput(object): - - def __init__(self, **attributes): - self._attributes = attributes - - def __getattr__(self, name): - if name in self._attributes: - return self._attributes.get(name) - - return None - - -class Command(object): - - # Name of the command to run - name = None - - # Number of command line arguments - argument_count = 0 - - # Option processing - option_strings = {} - option_defaults = {} - - # Ouput processing - output_factory = CommandOutput - output_patterns = {} - - # Convenient output patterns - non_space = r"[^ ]+" - - def __init__(self, *arguments, **options): - if len(arguments) != self.argument_count: - raise TypeError("Invalid number of arguments: %d" % len(arguments)) - - self._arguments = arguments - - self._options = self.option_defaults.copy() - for name, string in options.items(): - if name not in self.option_strings: - raise TypeError("Unknown option: %s" % name) - self._options[name] = string - - def get_command(self): - command = [self.name] - for name, string in self._options.items(): - # Match option from string - if isinstance(string, bool): - option = self.option_strings[name] - else: - option = self.option_strings[name] % string - - command.append(option) - - command.extend(self._arguments) - - return " ".join(command) - - def parse_lines(self, lines): - attributes = {} - for line in lines: - # Match patterns from lines - for name, pattern in self.output_patterns.items(): - match = re.search(pattern, line) - if match: - attributes[name] = string_to_type(match.group(1)) - - return self.output_factory(**attributes) - - def parse_output(self, output): - lines = output.split("\n") - # Strip leading and trailing spaces - lines = [l.strip() for l in lines] - # Skip blank lines - lines = [l for l in lines if l] - - return self.parse_lines(lines) - - def run(self): - command = self.get_command() - logging.debug("Running command: %s" % command) - process = subprocess.Popen(command, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - error = process.stderr.read() - if error: - raise CommandException(error.decode("utf-8")) - - output = process.stdout.read() - return self.parse_output(output.decode("utf-8")) - - -class NetworkConfigOutput(CommandOutput): - - @property - def speed(self): - if self.name == "lo": - return 10000 - - try: - wireless = WirelessConfig(self.name).run() - speed = wireless.bit_rate - except CommandException: - wired = WiredConfig(self.name).run() - speed = wired.speed - - return speed / 1024 / 1024 - - -class NetworkConfig(Command): - - name = "ifconfig" - - argument_count = 1 - - ipv4 = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" - ipv6 = r"[\w:]+/\d+" - mac_address = r"\w\w:\w\w:\w\w:\w\w:\w\w:\w\w" - - output_factory = NetworkConfigOutput - output_patterns = { - "name": r"(%s).*Link encap" % Command.non_space, - "broadcast": r"Bcast:(%s)" % ipv4, - "collisions": "collisions:(\d+)", - "hwaddr": r"HWaddr (%s)" % mac_address, - "inet_addr": r"inet addr:(%s)" % ipv4, - "link_encap": r"Link encap:(%s)" % Command.non_space, - "netmask": r"Mask:(%s)" % ipv4, - "metric": r"Metric:(\d+)", - "mtu": r"MTU:(\d+)", - "rx_bytes": "RX bytes:(\d+)", - "rx_dropped": "RX packets:.* dropped:(\d+)", - "rx_errors": "RX packets:.* errors:(\d+)", - "rx_frame": "RX packets:.* frame:(\d+)", - "rx_overruns": "RX packets:.* overruns:(\d+)", - "rx_packets": "RX packets:(\d+)", - "tx_bytes": "TX bytes:(\d+)", - "tx_carrier": "TX packets:.* carrier:(\d+)", - "tx_dropped": "TX packets:.* dropped:(\d+)", - "tx_errors": "TX packets:.* errors:(\d+)", - "tx_overruns": "TX packets:.* overruns:(\d+)", - "tx_packets": "TX packets:(\d+)", - "txqueuelen": "txqueuelen:(\d+)"} - - -class NetworkConfigs(Command): - - name = "ifconfig -a" - - def parse_output(self, output): - outputs = [] - for paragraph in output.split("\n\n"): - if not paragraph: - continue - - lines = paragraph.split("\n") - name = re.split(r"\s+", lines[0])[0] - config = NetworkConfig(name).parse_lines(lines) - outputs.append(config) - - return outputs - - -class WiredConfig(Command): - - name = "ethtool" - - argument_count = 1 - - output_patterns = { - "advertised_auto_negotiation": r"Advertised auto-negotiation:\s+(.*)", - "advertised_link_modes": r"Advertised link modes:\s+(.*)", - "auto_negotiation": r"Auto-negotiation:\s+(.*)", - "current_message_level": r"Current message level:\s+(.*)", - "duplex": r"Duplex:\s+(.*)", - "link_detected": r"Link detected:\s+(.*)", - "phyad": r"PHYAD:\s+(.*)", - "port": r"Port:\s+(.*)", - "speed": r"Speed:\s+(.*)/s", - "supported_auto_negotiation": r"Supports auto-negotiation:\s+(.*)", - "supported_link_modes": r"Supported link modes:\s+(.*)", - "supported_ports": r"Supported ports:\s+(.*)", - "supports_wake_on": r"Supports Wake-on:\s+(.*)", - "transceiver": r"Transceiver:\s+(.*)", - "wake_on": r"Wake-on:\s+(.*)"} - - def parse_lines(self, lines): - new_lines = [] - # Skip header line - for line in lines[1:]: - if not re.search(r": ", line): - new_lines[-1] += " " + line - else: - new_lines.append(line) - - return super(WiredConfig, self).parse_lines(new_lines) - - -class WirelessConfig(Command): - - name = "iwconfig" - - argument_count = 1 - - fraction = r"\d+(/\d+)?" - numeric = r"[\d\.]+" - numeric_with_unit = r"%s( %s)?" % (numeric, Command.non_space) - - output_patterns = { - "access_point": r"Access Point: (.*)", - "bit_rate": r"Bit Rate[=:](%s)/s" % numeric_with_unit, - "channel": r"Channel=(%s)" % Command.non_space, - "essid": r"ESSID:\"?([^\"]+)\"?", - "fragment_thr": r"Fragment thr:(\w+)", - "frequency": r"Frequency:(%s)" % numeric_with_unit, - "invalid_misc": r"Invalid misc:(\d+)", - "link_quality": r"Link Quality[=:](%s)" % fraction, - "missed_beacon": r"Missed beacon:(\d+)", - "mode": r"Mode:(%s)" % Command.non_space, - "noise_level": r"Noise level[=:](%s)" % numeric_with_unit, - "power_management": r"Power Management:(.*)", - "retry_limit": r"Retry limit:(\w+)", - "rts_thr": r"RTS thr:(\w+)", - "rx_invalid_crypt": r"Rx invalid crypt:(\d+)", - "rx_invalid_frag": r"Rx invalid frag:(\d+)", - "rx_invalid_nwid": r"Rx invalid nwid:(\d+)", - "sensitivity": r"Sensitivity=(%s)" % fraction, - "signal_level": r"Signal level[=:](%s)" % numeric_with_unit, - "tx_excessive_retries": r"Tx excessive retries:(\d+)", - "tx_power": r"Tx-Power=(%s)" % numeric_with_unit} - - -class Ping(Command): - - name = "ping" - - argument_count = 1 - - option_strings = { - "count": "-c %d", - "flood": "-f", - "interface": "-I %s", - "quiet": "-q", - "size": "-s %d", - "ttl": "-t %d"} - - option_defaults = { - "count": 1, - "quiet": True} - - ms = r"\d+\.\d+" - rtt = (ms, ms, ms, ms) - - output_patterns = { - "packet_loss": r"(\d+)% packet loss,", - "packets_received": r"(\d+) received,", - "packets_transmitted": r"(\d+) packets transmitted,", - "rtt_avg": r"rtt min/avg/max/mdev = %s/(%s)/%s/%s ms" % rtt, - "rtt_max": r"rtt min/avg/max/mdev = %s/%s/(%s)/%s ms" % rtt, - "rtt_mdev": r"rtt min/avg/max/mdev = %s/%s/%s/(%s) ms" % rtt, - "rtt_min": r"rtt min/avg/max/mdev = (%s)/%s/%s/%s ms" % rtt, - "time": r"time (\d+)ms"} - - def parse_lines(self, lines): - # Skip ping lines - return super(Ping, self).parse_lines(lines[-2:]) - - -class PingLarge(Ping): - - # Some wired environments can handle the maximum ping packet - # size, (65507+28)=65535 bytes. With a count of 191 packets, 65535 - # bytes/packet, 8 bits/byte, the sum payload is 100137480 bits ~ - # 100Mb. This is preferred and will be tried first. - packet_size = 65507 - packet_count = 191 - - option_defaults = { - "count": packet_count, - "flood": True, - "quiet": True, - "size": packet_size, - "ttl": 1} - - -class PingSmall(PingLarge): - - # If the large packet test was too lossy, we fall back to a packet - # equal to the default MTU size of 1500, (1472+28)=1500 bytes. - # With a count of 8334 packets, 1500 bytes/packet, 8 bits/byte, the - # sum payload is 100008000 bits ~ 100Mb. - packet_size = 1472 - packet_count = 8334 - - option_defaults = PingLarge.option_defaults.copy() - option_defaults.update({ - "count": packet_count, - "size": packet_size}) - - -class PingHost(Command): - - output_patterns = { - "host": r"(?:Host|Nmap scan report for) (%s)" % NetworkConfig.ipv4, - "mac_address": r"MAC Address: (%s)" % NetworkConfig.mac_address} - - -class PingScan(Command): - - name = "nmap -n -sP" - - argument_count = 1 - - def parse_lines(self, lines): - hosts = [] - host_lines = [] - # Skip header lines - for line in lines[1:]: - host_lines.append(line) - if line.startswith("MAC Address"): - host = PingHost().parse_lines(host_lines) - hosts.append(host) - host_lines = [] - - return hosts - - -class Ip(object): - - def __init__(self, address): - self.address = address - self.binary = self._address_to_binary(address) - - def __str__(self): - return self.address - - def _address_to_binary(self, address): - binary = 0 - for position, part in enumerate(address.split(".")): - if position >= 4: - raise ValueError("Address contains more than four parts.") - try: - if not part: - part = 0 - else: - part = int(part) - if not 0 <= part < 256: - raise ValueError - except ValueError: - raise ValueError("Address part out of range.") - binary <<= 8 - binary += part - return binary - - def count_1_bits(self): - ret = 0 - num = self.binary - while num > 0: - num = num >> 1 - ret += 1 - return ret - - def count_0_bits(self): - num = int(self.binary) - if num < 0: - raise ValueError("Only positive Numbers please: %s" % (num)) - ret = 0 - while num > 0: - if num & 1 == 1: - break - num = num >> 1 - ret += 1 - return ret - - -class IpRange(object): - - def __init__(self, address, netmask): - self.address = Ip(address) - self.netmask = Ip(netmask) - self.prefix = self._netmask_to_prefix(self.netmask) - - def __str__(self): - return "%s/%s" % (self.address, self.prefix) - - def _check_netmask(self, masklen): - num = int(self.netmask.binary) - bits = masklen - - # remove zero bits at the end - while (num & 1) == 0: - num = num >> 1 - bits -= 1 - if bits == 0: - break - # now check if the rest consists only of ones - while bits > 0: - if (num & 1) == 0: - raise ValueError("Netmask %s can't be expressed as an prefix." - % (hex(self.netmask.binary))) - num = num >> 1 - bits -= 1 - - def _netmask_to_prefix(self, netmask): - netlen = netmask.count_0_bits() - masklen = netmask.count_1_bits() - self._check_netmask(masklen) - return masklen - netlen - - def contains(self, address): - address = Ip(address) - if self.address.binary & self.netmask.binary \ - == address.binary & self.netmask.binary: - return True - - return False - - def scan(self, max=None): - scan = PingScan(str(self)).run() - targets = [s.host for s in scan] - random.shuffle(targets) - - if max is not None: - targets = targets[:max] - - return targets - - -class NetworkManagerException(Exception): - - pass - - -class NetworkManager(object): - - NM_SERVICE = "org.freedesktop.NetworkManager" - NM_PATH = "/org/freedesktop/NetworkManager" - NM_INTERFACE = NM_SERVICE - - NM_PATH_DEVICES = "/org/freedesktop/NetworkManager/Devices" - NM_INTERFACE_DEVICES = "org.freedesktop.NetworkManager.Devices" - - NMI_SERVICE = "org.freedesktop.NetworkManagerInfo" - NMI_PATH = "/org/freedesktop/NetworkManagerInfo" - NMI_INTERFACE = NMI_SERVICE - - HAL_SERVICE = "org.freedesktop.Hal" - HAL_PATH = "/org/freedesktop/Hal/Manager" - HAL_INTERFACE = "org.freedesktop.Hal.Manager" - HAL_INTERFACE_DEVICE = "org.freedesktop.Hal.Device" - - #http://projects.gnome.org/NetworkManager/developers/ - #NetworkManager D-Bus API Specifications, look for the - #NM_STATE enumeration to see which statuses indicate connection - #established and put them in this list. "3" works for NM 0.7 - #and 0.8, while "60" and "70" work for NM 0.9. - STATES_CONNECTED = [3, 60, 70] - - def __init__(self): - try: - import dbus - except ImportError: - raise NetworkManagerException("Python module not found: dbus") - - try: - self._bus = dbus.SystemBus() - self.nm_object = self._bus.get_object(self.NM_SERVICE, - self.NM_PATH) - self.nm_service = dbus.Interface(self.nm_object, self.NM_INTERFACE) - except dbus.exceptions.DBusException: - raise NetworkManagerException("Failed to connect to dbus service") - - def is_connected(self): - state = self.nm_service.state() - return state in self.STATES_CONNECTED - - -class Application(object): - - def __init__(self, targets, interfaces, scan): - self.targets = targets - self.interfaces = interfaces - self.scan = scan - - def test_interface(self, interface, targets): - logging.info("Testing %s at %s-Mbps", interface.name, interface.speed) - for target in targets: - ping = PingLarge(target, interface=interface.name) - result = ping.run() - if result.packet_loss: - ping = PingSmall(target, interface=interface.name) - result = ping.run() - if result.packet_loss: - logging.warning("SKIP: Non-zero packet loss (%s%%) " - "for [%s] [%s]->[%s]", - result.packet_loss, interface.name, - interface.inet_addr, target) - continue - - mbps = (8 * (ping.packet_size + 28) * ping.packet_count - / result.time / 1000) - percent = (100 * 8 * (ping.packet_size + 28) * ping.packet_count - / result.time / 1000 / interface.speed) - if percent >= 10: - logging.info("PASS: Effective rate: %3.4f Mbps, " - "%3.2f%% of theoretical max (%5.2f Mbps)", - mbps, percent, interface.speed) - return True - else: - logging.warning("Unacceptable network effective rate found for [%s]" % interface.name) - logging.warning("Effective rate %3.4f Mbps, %3.2f%% of theoretical max (%5.2f Mbps)" % - (mbps, percent, interface.speed)) - return False - - def run(self): - logging.debug("Acquiring network Interfaces") - if self.interfaces: - interfaces = [NetworkConfig(i).run() for i in self.interfaces] - else: - interfaces = NetworkConfigs().run() - interfaces = [i for i in interfaces if i.inet_addr] - - for interface in interfaces: - if not interface.inet_addr: - logging.debug("No network address for [%s]", interface.name) - continue - - targets = [] - ip_range = IpRange(interface.inet_addr, interface.netmask) - if self.targets: - for target in self.targets: - if ip_range.contains(target): - targets.append(target) - elif interface.name != "lo": - targets = ip_range.scan(self.scan) - logging.info("The following targets were found for %s:" % interface.name) - for target in targets: - logging.info("\t%s" % target) - - if not targets: - logging.debug("No targets found for [%s]", interface.name) - continue - - if not self.test_interface(interface, targets): - return False - - return True - - -class ApplicationManager(object): - - application_factory = Application - - default_log_level = "critical" - default_scan = 1 - default_timeout = 60 - - def get_parser(self, args): - usage = "Usage: %prog [TARGETS]" - - parser = OptionParser(usage=usage) - parser.add_option("-i", "--interface", - dest="interfaces", - action="append", - type="string", - default=[], - help="Interface to test.") - parser.add_option("-s", "--scan", - default=self.default_scan, - type="int", - help="Number of targets to scan when not provided.") - parser.add_option("-t", "--timeout", - default=self.default_timeout, - type="int", - help="Time to wait for network manager to connect.") - parser.add_option("-l", "--log", - metavar="FILE", - help="The file to write the log to.") - parser.add_option("--log-level", - default=self.default_log_level, - help=("One of debug, info, warning, " - "error or critical.")) - - return parser - - def check_uid(self): - return os.getuid() == 0 - - def check_network(self, timeout): - try: - nm = NetworkManager() - except NetworkManagerException: - return True - - start = datetime.now() - while True: - if nm.is_connected(): - return True - if datetime.now() - start > timedelta(seconds=timeout): - return False - sleep(5) - - def create_application(self, args=sys.argv[1:]): - parser = self.get_parser(args) - (options, args) = parser.parse_args(args) - - log_level = logging.getLevelName(options.log_level.upper()) - log_handlers = [] - if options.log: - log_filename = options.log - log_handlers.append(FileHandler(log_filename)) - else: - log_handlers.append(StreamHandler()) - - # Logging setup - format = ("%(asctime)s %(levelname)-8s %(message)s") - date_format = '%Y-%m-%d %H:%M:%S' - if log_handlers: - for handler in log_handlers: - handler.setFormatter(Formatter(format, date_format)) - logging.getLogger().addHandler(handler) - if log_level: - logging.getLogger().setLevel(log_level) - elif not logging.getLogger().handlers: - logging.disable(logging.CRITICAL) - - if not self.check_uid(): - parser.error("Must be run as root.") - - if not self.check_network(options.timeout): - parser.error("Network devices must be configured and connected to a LAN segment before testing") - - targets = args - return self.application_factory(targets, - options.interfaces, options.scan) - - -def main(): - application_manager = ApplicationManager() - application = application_manager.create_application() - if not application.run(): - return 1 - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/bin/network_device_info b/bin/network_device_info index 19e5b68..9aa4f90 100755 --- a/bin/network_device_info +++ b/bin/network_device_info @@ -126,12 +126,9 @@ class NetworkingDevice(): try: stream = check_output(cmd, stderr=STDOUT, universal_newlines=True) except CalledProcessError as err: - print("Error running %s:" % ' '.join(cmd), file=sys.stderr) - print(err.output, file=sys.stderr) return None if not stream: - print("Error: modinfo returned nothing", file=sys.stderr) return None else: parser = ModinfoParser(stream) @@ -193,7 +190,11 @@ def match_counts(nm_devices, udev_devices, devtype): """ # now check that the count (by type) matches nm_type_devices = [dev for dev in nm_devices if dev.gettype() in devtype] - udevtype = 'WIRELESS' if devtype == 'WiFi' else 'NETWORK' + udevtype = 'NETWORK' + if devtype == 'WiFi': + udevtype = 'WIRELESS' + elif devtype == 'Modem': + udevtype = 'WWAN' udev_type_devices = [ udev for udev in udev_devices @@ -262,7 +263,9 @@ def main(args): if not match_counts(nm_devices, udev_devices, "WiFi"): return 1 - elif not match_counts(nm_devices, udev_devices, ("Ethernet", "Modem")): + elif not match_counts(nm_devices, udev_devices, "Ethernet"): + return 1 + elif not match_counts(nm_devices, udev_devices, "Modem"): return 1 else: return 0 diff --git a/bin/removable_storage_test b/bin/removable_storage_test index 3502455..ddd4e1b 100755 --- a/bin/removable_storage_test +++ b/bin/removable_storage_test @@ -177,11 +177,120 @@ class DiskTest(): Indirectly sets: self.rem_disks{,_nm,_memory_cards,_memory_cards_nm,_speed} """ - bus, loop = connect_to_system_bus() - if is_udisks2_supported(bus): - self._probe_disks_udisks2(bus) + if "SNAP" in os.environ: + self._probe_disks_udisks2_cli() else: - self._probe_disks_udisks1(bus) + bus, loop = connect_to_system_bus() + if is_udisks2_supported(bus): + self._probe_disks_udisks2(bus) + else: + self._probe_disks_udisks1(bus) + + def _probe_disks_udisks2_cli(self): + # First we will build up a db of udisks info by scraping the output + # of the dump command + # TODO: remove the snap prefix when the alias becomes available + proc = subprocess.Popen(['udisks2.udisksctl', 'dump'], + stdout=subprocess.PIPE) + udisks_devices = {} + current_bd = None + current_interface = None + while True: + line = proc.stdout.readline().decode(sys.stdout.encoding) + if line == '': + break + if line == '\n': + current_bd = None + current_interface = None + if line.startswith('/org/freedesktop/UDisks2/'): + path = line.strip() + current_bd = os.path.basename(path).rstrip(':') + udisks_devices[current_bd] = {} + continue + if current_bd is None: + continue + if line.startswith(' org.freedesktop'): + current_interface = line.strip().rstrip(':') + udisks_devices[current_bd][current_interface] = {} + continue + if current_interface is None: + continue + entry = ''.join(c for c in line if c not in '\n\t\' ') + wanted_keys = ('Device:', 'Drive:', 'MountPoints:', 'Vendor:', + 'ConnectionBus:', 'Model:', 'Media:',) + for key in wanted_keys: + if entry.startswith(key): + udisks_devices[current_bd][current_interface][key] = ( + entry[len(key):]) + + # Now use the populated udisks structure to fill out the API used by + # other _probe disks functions + for device, interfaces in udisks_devices.items(): + # iterate over udisks objects that have both filesystem and + # block device interfaces + if (UDISKS2_FILESYSTEM_INTERFACE in interfaces and + UDISKS2_BLOCK_INTERFACE in interfaces): + # To be an IO candidate there must be a drive object + drive = interfaces[UDISKS2_BLOCK_INTERFACE].get('Drive:') + if drive is None or drive is '/': + continue + drive_object = udisks_devices[os.path.basename(drive)] + + # Get the connection bus property from the drive interface of + # the drive object. This is required to filter out the devices + # we don't want to look at now. + connection_bus = ( + drive_object[UDISKS2_DRIVE_INTERFACE]['ConnectionBus:']) + desired_connection_buses = set([ + map_udisks1_connection_bus(device) + for device in self.device]) + # Skip devices that are attached to undesired connection buses + if connection_bus not in desired_connection_buses: + continue + + dev_file = ( + interfaces[UDISKS2_BLOCK_INTERFACE].get('Device:')) + + parent = self._find_parent(dev_file.replace('/dev/', '')) + if (parent and + find_pkname_is_root_mountpoint(parent, self.lsblk)): + continue + + # XXX: we actually only scrape the first one currently + mount_point = ( + interfaces[UDISKS2_FILESYSTEM_INTERFACE].get( + 'MountPoints:')) + if mount_point == '': + mount_point = None + + # We need to skip-non memory cards if we look for memory cards + # and vice-versa so let's inspect the drive and use heuristics + # to detect memory cards (a memory card reader actually) now. + if self.memorycard != is_memory_card( + drive_object[UDISKS2_DRIVE_INTERFACE]['Vendor:'], + drive_object[UDISKS2_DRIVE_INTERFACE]['Model:'], + drive_object[UDISKS2_DRIVE_INTERFACE]['Media:']): + continue + + if mount_point is None: + self.rem_disks_memory_cards_nm[dev_file] = None + self.rem_disks_nm[dev_file] = None + else: + self.rem_disks_memory_cards[dev_file] = mount_point + self.rem_disks[dev_file] = mount_point + + # Get the speed of the interconnect that is associated with the + # block device we're looking at. This is purely informational + # but it is a part of the required API + udev_devices = get_udev_block_devices(GUdev.Client()) + for udev_device in udev_devices: + if udev_device.get_device_file() == dev_file: + interconnect_speed = get_interconnect_speed(udev_device) + if interconnect_speed: + self.rem_disks_speed[dev_file] = ( + interconnect_speed * 10 ** 6) + else: + self.rem_disks_speed[dev_file] = None def _probe_disks_udisks2(self, bus): """ diff --git a/bin/removable_storage_watcher b/bin/removable_storage_watcher index 794bb1b..88cae9b 100755 --- a/bin/removable_storage_watcher +++ b/bin/removable_storage_watcher @@ -411,7 +411,7 @@ class UDisks2StorageDeviceListener: UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus" def __init__(self, system_bus, loop, action, devices, minimum_speed, - memorycard): + memorycard, unmounted = False): # Store the desired minimum speed of the device in Mbit/s. The argument # is passed as the number of bits per second so let's fix that. self._desired_minimum_speed = minimum_speed / 10 ** 6 @@ -421,6 +421,9 @@ class UDisks2StorageDeviceListener: map_udisks1_connection_bus(device) for device in devices]) # Check if we are explicitly looking for memory cards self._desired_memory_card = memorycard + # Store information whether we also want detected, but unmounted + # devices too + self._allow_unmounted = unmounted # Store the desired "delta" direction depending on # whether we test for insertion or removal if action == "insert": @@ -561,7 +564,10 @@ class UDisks2StorageDeviceListener: # Skip objects we already ignored and complained about before if object_path in self._ignored_objects: continue - needs = set(('block-fs', 'partition', 'non-empty', 'mounted')) + needs = set(('block-fs', 'partition', 'non-empty')) + if not self._allow_unmounted: + needs.add('mounted') + # As a special exception when the ConnectionBus is allowed to be # empty, as is the case with eSATA devices, do not require the # filesystem to be mounted as gvfs may choose not to mount it @@ -851,6 +857,8 @@ def main(): dest='logging_level', help="Enable verbose output") parser.add_argument('--debug', action='store_const', const=logging.DEBUG, dest='logging_level', help="Enable debugging") + parser.add_argument('--unmounted', action='store_true', + help="Don't require drive being automounted") parser.set_defaults(logging_level=logging.WARNING) args = parser.parse_args() @@ -873,7 +881,8 @@ def main(): logging.debug("Using UDisks2 interface") listener = UDisks2StorageDeviceListener( system_bus, loop, - args.action, args.device, args.minimum_speed, args.memorycard) + args.action, args.device, args.minimum_speed, args.memorycard, + args.unmounted) else: # Construct the listener with all of the arguments provided on the # command line and the explicit system_bus, loop objects. diff --git a/bin/storage_test b/bin/storage_test index 5f7516b..da4b717 100755 --- a/bin/storage_test +++ b/bin/storage_test @@ -116,7 +116,7 @@ fi echo "Set disk to $disk" scripted_mount=0 -if [ -b $disk ] +if [ -b "$disk" ] then echo "$disk is a block device" @@ -133,7 +133,7 @@ then # Regex changed to better handle when $disk appears more than once # in parted output (such as in warning messages or not caught in the # check above) - size=`parted -l -s |grep "Disk.*${disk}" |awk '{print $3}'` + size=`parted -l -s 2>&1 | grep "Disk .*${disk}:" | awk '{print $3}'` if [ -n "$size" ] then @@ -163,15 +163,15 @@ then fi - if [ $size_range == "KB" ] + if [ "$size_range" == "KB" ] then echo "$disk size reported in KB, seems to be too small for testing." exit 1 - elif [ $size_range == "MB" ] + elif [ "$size_range" == "MB" ] then size_int=${size::${#size}-2} - if [ $size_int -gt 10 ] + if [ "$size_int" -gt 10 ] then run_bonnie $disk if [[ $scripted_mount == 1 ]] |