summaryrefslogtreecommitdiff
path: root/bin
diff options
authorPMR <pmr@pmr-lander>2017-07-05 09:44:54 +0000
committerPMR <pmr@pmr-lander>2017-07-05 09:44:54 +0000
commit1cddd6723ed57f468b3a11d89a1dbca79facac8e (patch)
treeede9ddf7bb4e30ddf2e77693d303d28c1463a49c /bin
parent4ba702d74b44de712fd9d15f1b5fa581bd796229 (diff)
parentdab49b3d0559813341a7c74a5e77c185ed3b52f1 (diff)
Merge #326656 from ~sylvain-pineau/plainbox-provider-checkbox:cleanup_duplicates
Diffstat (limited to 'bin')
-rwxr-xr-xbin/bt_connect2
-rw-r--r--bin/bt_helper.py300
-rwxr-xr-xbin/fwts_test459
-rwxr-xr-xbin/gateway_ping_test2
-rwxr-xr-xbin/network_bandwidth_test671
-rwxr-xr-xbin/network_device_info13
-rwxr-xr-xbin/removable_storage_test117
-rwxr-xr-xbin/removable_storage_watcher15
-rwxr-xr-xbin/storage_test10
9 files changed, 140 insertions, 1449 deletions
diff --git a/bin/bt_connect b/bin/bt_connect
index 0ef5169..b52642a 100755
--- a/bin/bt_connect
+++ b/bin/bt_connect
@@ -23,7 +23,7 @@
import sys
import time
-import bt_helper
+import checkbox_support.bt_helper
from argparse import ArgumentParser
diff --git a/bin/bt_helper.py b/bin/bt_helper.py
deleted file mode 100644
index 91f879a..0000000
--- a/bin/bt_helper.py
+++ /dev/null
@@ -1,300 +0,0 @@
-# Copyright 2016 Canonical Ltd.
-# Written by:
-# Maciej Kisielewski <maciej.kisielewski@canonical.com>
-#
-# This is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3,
-# as published by the Free Software Foundation.
-#
-# This file is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this file. If not, see <http://www.gnu.org/licenses/>.
-"""
-This module provides a set of abstractions to ease the process of automating
-typical Bluetooth task like scanning for devices and pairing with them.
-
-It talks with BlueZ stack using dbus.
-"""
-import logging
-
-import dbus
-import dbus.service
-import dbus.mainloop.glib
-from gi.repository import GObject
-
-logger = logging.getLogger(__file__)
-logger.addHandler(logging.StreamHandler())
-
-IFACE = 'org.bluez.Adapter1'
-ADAPTER_IFACE = 'org.bluez.Adapter1'
-DEVICE_IFACE = 'org.bluez.Device1'
-AGENT_IFACE = 'org.bluez.Agent1'
-
-dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-
-# To get additional Bluetoot CoDs, check
-# https://www.bluetooth.com/specifications/assigned-numbers/baseband
-BT_ANY = 0
-BT_KEYBOARD = int('0x2540', 16)
-
-
-class BtException(Exception):
- pass
-
-
-class BtManager:
- """ Main point of contact with dbus factoring bt objects. """
- def __init__(self, verbose=False):
- if verbose:
- logger.setLevel(logging.DEBUG)
- self._bus = dbus.SystemBus()
- self._bt_root = self._bus.get_object('org.bluez', '/')
- self._manager = dbus.Interface(
- self._bt_root, 'org.freedesktop.DBus.ObjectManager')
- self._main_loop = GObject.MainLoop()
- self._register_agent()
-
- def _register_agent(self):
- path = "/bt_helper/agent"
- BtAgent(self._bus, path)
- obj = self._bus.get_object('org.bluez', "/org/bluez")
- agent_manager = dbus.Interface(obj, "org.bluez.AgentManager1")
- agent_manager.RegisterAgent(path, 'NoInputNoOutput')
- logger.info("Agent registered")
-
- def _get_objects_by_iface(self, iface_name):
- for path, ifaces in self._manager.GetManagedObjects().items():
- if ifaces.get(iface_name):
- yield self._bus.get_object('org.bluez', path)
-
- def get_bt_adapters(self):
- """Yield BtAdapter objects for each BT adapter found."""
- for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
- yield BtAdapter(dbus.Interface(adapter, ADAPTER_IFACE), self)
-
- def get_bt_devices(self, category=BT_ANY, filters={}):
- """Yields BtDevice objects currently known to the system.
-
- filters - specifies the characteristics of that a BT device must have
- to be yielded. The keys of filters dictionary represent names of
- parameters (as specified by the bluetooth DBus Api and represented by
- DBus proxy object), and its values must match proxy values.
- I.e. {'Paired': False}. For a full list of Parameters see:
- http://git.kernel.org/cgit/bluetooth/bluez.git/tree/doc/device-api.txt
-
- Note that this function returns objects corresponding to BT devices
- that were seen last time scanning was done."""
- for device in self._get_objects_by_iface(DEVICE_IFACE):
- obj = self.get_object_by_path(device.object_path)[DEVICE_IFACE]
- try:
- if category != BT_ANY:
- if obj['Class'] != category:
- continue
- rejected = False
- for filter in filters:
- if obj[filter] != filters[filter]:
- rejected = True
- break
- if rejected:
- continue
- yield BtDevice(dbus.Interface(device, DEVICE_IFACE), self)
- except KeyError as exc:
- logger.info('Property %s not found on device %s',
- exc, device.object_path)
- continue
-
- def get_prop_iface(self, obj):
- return dbus.Interface(self._bus.get_object(
- 'org.bluez', obj.object_path), 'org.freedesktop.DBus.Properties')
-
- def get_object_by_path(self, path):
- return self._manager.GetManagedObjects()[path]
-
- def get_proxy_by_path(self, path):
- return self._bus.get_object('org.bluez', path)
-
- def wait(self):
- self._main_loop.run()
-
- def quit_loop(self):
- self._main_loop.quit()
-
- def ensure_adapters_powered(self):
- for adapter in self.get_bt_adapters():
- adapter.ensure_powered()
-
- def scan(self, timeout=10):
- """Scan for BT devices visible to all adapters.'"""
- self._bus.add_signal_receiver(
- interfaces_added,
- dbus_interface="org.freedesktop.DBus.ObjectManager",
- signal_name="InterfacesAdded")
- self._bus.add_signal_receiver(
- properties_changed,
- dbus_interface="org.freedesktop.DBus.Properties",
- signal_name="PropertiesChanged",
- arg0="org.bluez.Device1",
- path_keyword="path")
- for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
- try:
- dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
- except dbus.exceptions.DBusException:
- pass
- dbus.Interface(adapter, ADAPTER_IFACE).StartDiscovery()
- GObject.timeout_add_seconds(timeout, self._scan_timeout)
- self._main_loop.run()
-
- def get_devices(self, timeout=10, rescan=True):
- """Scan for and list all devices visible to all adapters."""
- if rescan:
- self.scan(timeout)
- return list(self.get_bt_devices())
-
- def _scan_timeout(self):
- for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
- dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
- self._main_loop.quit()
-
-
-class BtAdapter:
- def __init__(self, dbus_iface, bt_mgr):
- self._if = dbus_iface
- self._bt_mgr = bt_mgr
- self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
-
- def set_bool_prop(self, prop_name, value):
- self._prop_if.Set(IFACE, prop_name, dbus.Boolean(value))
-
- def ensure_powered(self):
- """Turn the adapter on, and do nothing if already on."""
- powered = self._prop_if.Get(IFACE, 'Powered')
- logger.info('Powering on {}'.format(
- self._if.object_path.split('/')[-1]))
- if powered:
- logger.info('Device already powered')
- return
- try:
- self.set_bool_prop('Powered', True)
- logger.info('Powered on')
- except Exception as exc:
- logging.error('Failed to power on - {}'.format(
- exc.get_dbus_message()))
-
-
-class BtDevice:
- def __init__(self, dbus_iface, bt_mgr):
- self._if = dbus_iface
- self._obj = bt_mgr.get_object_by_path(
- self._if.object_path)[DEVICE_IFACE]
- self._bt_mgr = bt_mgr
- self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
- self._pair_outcome = None
-
- def __str__(self):
- return "{} ({})".format(self.name, self.address)
-
- def __repr__(self):
- return "<BtDevice name:{}, address:{}>".format(self.name, self.address)
-
- def pair(self):
- """Pair the device.
-
- This function will try pairing with the device and block until device
- is paired, error occured or default timeout elapsed (whichever comes
- first).
- """
- self._prop_if.Set(DEVICE_IFACE, 'Trusted', True)
- self._if.Pair(
- reply_handler=self._pair_ok, error_handler=self._pair_error)
- self._bt_mgr.wait()
- if self._pair_outcome:
- raise BtException(self._pair_outcome)
- try:
- self._if.Connect()
- except dbus.exceptions.DBusException as exc:
- logging.error('Failed to connect - {}'.format(
- exc.get_dbus_message()))
-
- def unpair(self):
- self._if.Disconnect()
- adapter = self._bt_mgr.get_proxy_by_path(self._obj['Adapter'])
- dbus.Interface(adapter, ADAPTER_IFACE).RemoveDevice(self._if)
-
- @property
- def name(self):
- return self._obj.get('Name', '<Unnamed>')
-
- @property
- def address(self):
- return self._obj['Address']
-
- @property
- def rssi(self):
- return self._obj.get('RSSI', None)
-
- def _pair_ok(self):
- logger.info('%s successfully paired', self.name)
- self._pair_outcome = None
- self._bt_mgr.quit_loop()
-
- def _pair_error(self, error):
- logger.warning('Pairing of %s device failed. %s', self.name, error)
- self._pair_outcome = error
- self._bt_mgr.quit_loop()
-
-
-class Rejected(dbus.DBusException):
- _dbus_error_name = "org.bluez.Error.Rejected"
-
-
-class BtAgent(dbus.service.Object):
- """Agent authenticating everything that is possible."""
- @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
- def AuthorizeService(self, device, uuid):
- logger.info("AuthorizeService (%s, %s)", device, uuid)
-
- @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="u")
- def RequestPasskey(self, device):
- logger.info("RequestPasskey (%s)", device)
- passkey = input("Enter passkey: ")
- return dbus.UInt32(passkey)
-
- @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="s")
- def RequestPinCode(self, device):
- logger.info("RequestPinCode (%s)", device)
- return input("Enter PIN Code: ")
-
- @dbus.service.method(AGENT_IFACE, in_signature="ouq", out_signature="")
- def DisplayPasskey(self, device, passkey, entered):
- print("DisplayPasskey (%s, %06u entered %u)" %
- (device, passkey, entered), flush=True)
-
- @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
- def DisplayPinCode(self, device, pincode):
- logger.info("DisplayPinCode (%s, %s)", device, pincode)
- print('Type following pin on your device: {}'.format(pincode),
- flush=True)
-
- @dbus.service.method(AGENT_IFACE, in_signature="ou", out_signature="")
- def RequestConfirmation(self, device, passkey):
- logger.info("RequestConfirmation (%s, %06d)", device, passkey)
-
- @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="")
- def RequestAuthorization(self, device):
- logger.info("RequestAuthorization (%s)", device)
-
- @dbus.service.method(AGENT_IFACE, in_signature="", out_signature="")
- def Cancel(self):
- logger.info("Cancelled")
-
-
-def properties_changed(interface, changed, invalidated, path):
- logger.info('Property changed for device @ %s. Change: %s', path, changed)
-
-
-def interfaces_added(path, interfaces):
- logger.info('Added new bt interfaces: %s @ %s', interfaces, path)
diff --git a/bin/fwts_test b/bin/fwts_test
deleted file mode 100755
index c8ff1f7..0000000
--- a/bin/fwts_test
+++ /dev/null
@@ -1,459 +0,0 @@
-#! /usr/bin/python3
-
-import sys
-import re
-from time import time
-from argparse import ArgumentParser, RawTextHelpFormatter, REMAINDER
-from subprocess import Popen, PIPE, check_output
-from syslog import *
-from distutils.spawn import find_executable
-import os
-
-# These tests require user interaction and need either special handling
-# or skipping altogether (right now, we skip them but they're kept here
-# in case we figure out a way to present the interaction to the user).
-INTERACTIVE_TESTS = ['ac_adapter',
- 'battery',
- 'hotkey',
- 'power_button',
- 'brightness',
- 'lid']
-# Tests recommended by the Hardware Enablement Team (HWE)
-# These are performed on QA certification runs
-QA_TESTS = ['acpitests',
- 'apicedge',
- 'aspm',
- 'cpufreq',
- 'dmicheck',
- 'esrt',
- 'klog',
- 'maxfreq',
- 'msr',
- 'mtrr',
- 'nx',
- 'oops',
- 'uefibootpath',
- 'uefirtmisc',
- 'uefirttime',
- 'uefirtvariable',
- 'version',
- 'virt']
-# The following tests will record logs in a separate file for the HWE team
-HWE_TESTS = ['version',
- 'mtrr',
- 'virt',
- 'apicedge',
- 'klog',
- 'oops']
-# By default, we launch all the tests
-TESTS = sorted(list(set(QA_TESTS + HWE_TESTS)))
-
-
-def get_sleep_times(start_marker, end_marker, sleep_time, resume_time):
- logfile = '/var/log/syslog'
- log_fh = open(logfile, 'r', encoding='UTF-8')
- line = ''
- run = 'FAIL'
- sleep_start_time = 0.0
- sleep_end_time = 0.0
- resume_start_time = 0.0
- resume_end_time = 0.0
-
- while start_marker not in line:
- try:
- line = log_fh.readline()
- except UnicodeDecodeError:
- continue
- if start_marker in line:
- loglist = log_fh.readlines()
-
- for idx in range(0, len(loglist)):
- if 'PM: Syncing filesystems' in loglist[idx]:
- sleep_start_time = re.split('[\[\]]', loglist[idx])[1].strip()
- if 'ACPI: Low-level resume complete' in loglist[idx]:
- sleep_end_time = re.split('[\[\]]', loglist[idx - 1])[1].strip()
- resume_start_time = re.split('[\[\]]', loglist[idx])[1].strip()
- idx += 1
- if 'Restarting tasks' in loglist[idx]:
- resume_end_time = re.split('[\[\]]', loglist[idx])[1].strip()
- if end_marker in loglist[idx]:
- run = 'PASS'
- break
-
- sleep_elapsed = float(sleep_end_time) - float(sleep_start_time)
- resume_elapsed = float(resume_end_time) - float(resume_start_time)
- return (run, sleep_elapsed, resume_elapsed)
-
-
-def average_times(runs):
- sleep_total = 0.0
- resume_total = 0.0
- run_count = 0
- for run in runs.keys():
- run_count += 1
- sleep_total += runs[run][1]
- resume_total += runs[run][2]
- sleep_avg = sleep_total / run_count
- resume_avg = resume_total / run_count
- print('Average time to sleep: %0.5f' % sleep_avg)
- print('Average time to resume: %0.5f' % resume_avg)
-
-
-def fix_sleep_args(args):
- new_args = []
- for arg in args:
- if "=" in arg:
- new_args.extend(arg.split('='))
- else:
- new_args.append(arg)
- return new_args
-
-
-def detect_progress_indicator():
- # Return a command suitable for piping progress information to its
- # stdin (invoked via Popen), in list format.
- # Return zenity if installed and DISPLAY (--auto-close)
- # return dialog if installed and no DISPLAY (width height)
- display = os.environ.get('DISPLAY')
- if display and find_executable('zenity'):
- return ["zenity", "--progress", "--text", "Progress", "--auto-close"]
- if not display and find_executable('dialog'):
- return ["dialog", "--gauge", "Progress", "20", "70"]
- # Return None if no progress indicator is to be used
- return None
-
-
-def main():
- description_text = 'Tests the system BIOS using the Firmware Test Suite'
- epilog_text = ('To perform sleep testing, you will need at least some of '
- 'the following options: \n'
- 's3 or s4: tells fwts which type of sleep to perform.\n'
- '--s3-delay-delta\n'
- '--s3-device-check\n'
- '--s3-device-check-delay\n'
- '--s3-hybrid-sleep\n'
- '--s3-max-delay\n'
- '--s3-min-delay\n'
- '--s3-multiple\n'
- '--s3-quirks\n'
- '--s3-sleep-delay\n'
- '--s3power-sleep-delay\n\n'
- 'Example: fwts_test --sleep s3 --s3-min-delay 30 '
- '--s3-multiple 10 --s3-device-check\n\n'
- 'For further help with sleep options:\n'
- 'fwts_test --fwts-help')
- parser = ArgumentParser(description=description_text,
- epilog=epilog_text,
- formatter_class=RawTextHelpFormatter)
- parser.add_argument('-l', '--log',
- default='/tmp/fwts_results.log',
- help=('Specify the location and name '
- 'of the log file.\n'
- '[Default: %(default)s]'))
- parser.add_argument('-f', '--fail-level',
- default='high',
- choices=['critical', 'high', 'medium',
- 'low', 'none', 'aborted'],
- help=('Specify the FWTS failure level that will '
- 'trigger this script to return a failing exit '
- 'code. For example, if you chose "critical" as '
- 'the fail-level, this wrapper will NOT return '
- 'a failing exit code unless FWTS reports a '
- 'test as FAILED_CRITICAL. You will still be '
- 'notified of all FWTS test failures. '
- '[Default level: %(default)s]'))
- sleep_args = parser.add_argument_group('Sleep Options',
- ('The following arguments are to '
- 'only be used with the '
- '--sleep test option'))
- sleep_args.add_argument('--sleep-time',
- dest='sleep_time',
- action='store',
- help=('The max time in seconds that a system '
- 'should take\nto completely enter sleep. '
- 'Anything more than this\ntime will cause '
- 'that test iteration to fail.\n'
- '[Default: 10s]'))
- sleep_args.add_argument('--resume-time',
- dest='resume_time',
- action='store',
- help=('Same as --sleep-time, except this applies '
- 'to the\ntime it takes a system to fully '
- 'wake from sleep.\n[Default: 3s]'))
-
- group = parser.add_mutually_exclusive_group()
- group.add_argument('-t', '--test',
- action='append',
- help='Name of the test to run.')
- group.add_argument('-s', '--sleep',
- nargs=REMAINDER,
- action='store',
- help=('Perform sleep test(s) using the additional\n'
- 'arguments provided after --sleep. Remaining\n'
- 'items on the command line will be passed \n'
- 'through to fwts for performing sleep tests. \n'
- 'For info on these extra fwts options, please \n'
- 'see the epilog below and \n'
- 'the --fwts-help option.'))
- group.add_argument('--hwe',
- action='store_true',
- help='Run HWE concerned tests in fwts')
- group.add_argument('--qa',
- action='store_true',
- help='Run QA concerned tests in fwts')
- group.add_argument('--fwts-help',
- dest='fwts_help',
- action='store_true',
- help='Display the help info for fwts itself (lengthy)')
- group.add_argument('--list',
- action='store_true',
- help='List all tests in fwts.')
- group.add_argument('--list-hwe',
- action='store_true',
- help='List all HWE concerned tests in fwts')
- group.add_argument('--list-qa',
- action='store_true',
- help='List all QA concerned tests in fwts')
- args = parser.parse_args()
-
- tests = []
- results = {}
- critical_fails = []
- high_fails = []
- medium_fails = []
- low_fails = []
- passed = []
- aborted = []
-
- # Set correct fail level
- if args.fail_level is not 'none':
- args.fail_level = 'FAILED_%s' % args.fail_level.upper()
-
- # Get our failure priority and create the priority values
- fail_levels = {'FAILED_CRITICAL': 4,
- 'FAILED_HIGH': 3,
- 'FAILED_MEDIUM': 2,
- 'FAILED_LOW': 1,
- 'FAILED_NONE': 0,
- 'FAILED_ABORTED': -1}
- fail_priority = fail_levels[args.fail_level]
-
- # Enforce only using sleep opts with --sleep
- if args.sleep_time or args.resume_time and not args.sleep:
- parser.error('--sleep-time and --resume-time only apply to the '
- '--sleep testing option.')
- if args.fwts_help:
- Popen('fwts -h', shell=True).communicate()[0]
- return 0
- elif args.list:
- print('\n'.join(TESTS))
- return 0
- elif args.list_hwe:
- print('\n'.join(HWE_TESTS))
- return 0
- elif args.list_qa:
- print('\n'.join(QA_TESTS))
- return 0
- elif args.test:
- tests.extend(args.test)
- elif args.hwe:
- tests.extend(HWE_TESTS)
- elif args.qa:
- tests.extend(QA_TESTS)
- elif args.sleep:
- args.sleep = fix_sleep_args(args.sleep)
- iterations = 1
- # if multiple iterations are requested, we need to intercept
- # that argument and keep it from being presented to fwts since
- # we're handling the iterations directly.
- s3 = '--s3-multiple'
- s4 = '--s4-multiple'
- if s3 in args.sleep:
- iterations = int(args.sleep.pop(args.sleep.index(s3) + 1))
- args.sleep.remove(s3)
- if s4 in args.sleep:
- iterations = int(args.sleep.pop(args.sleep.index(s4) + 1))
- args.sleep.remove(s4)
- # if we've passed our custom sleep arguments for resume or sleep
- # time, we need to intercept those as well.
- resume_time_arg = '--resume-time'
- sleep_time_arg = '--sleep-time'
- if resume_time_arg in args.sleep:
- args.resume_time = int(args.sleep.pop(
- args.sleep.index(resume_time_arg) + 1))
- args.sleep.remove(resume_time_arg)
- if sleep_time_arg in args.sleep:
- args.sleep_time = int(args.sleep.pop(
- args.sleep.index(sleep_time_arg) + 1))
- args.sleep.remove(sleep_time_arg)
- # if we still haven't set a sleep or resume time, use defauts.
- if not args.sleep_time:
- args.sleep_time = 10
- if not args.resume_time:
- args.resume_time = 3
- tests.extend(args.sleep)
- else:
- tests.extend(TESTS)
-
- # run the tests we want
- if args.sleep:
- iteration_results = {}
- print('=' * 20 + ' Test Results ' + '=' * 20)
- progress_indicator = None
- if detect_progress_indicator():
- progress_indicator = Popen(detect_progress_indicator(),
- stdin=PIPE)
- for iteration in range(0, iterations):
- timestamp = int(time())
- start_marker = 'CHECKBOX SLEEP TEST START %s' % timestamp
- end_marker = 'CHECKBOX SLEEP TEST STOP %s' % timestamp
- syslog(LOG_INFO, '---' + start_marker + '---' + str(time()))
- command = ('fwts -q --stdout-summary -r %s %s'
- % (args.log, ' '.join(tests)))
- results['sleep'] = (Popen(command, stdout=PIPE, shell=True)
- .communicate()[0].strip()).decode()
- syslog(LOG_INFO, '---' + end_marker + '---' + str(time()))
- if 's4' not in args.sleep:
- sleep_times = get_sleep_times(start_marker,
- end_marker,
- args.sleep_time,
- args.resume_time)
- iteration_results[iteration] = sleep_times
- progress_tuple = (iteration,
- iteration_results[iteration][0],
- iteration_results[iteration][1],
- iteration_results[iteration][2])
- progress_string = (' - Cycle %s: Status: %s '
- 'Sleep Elapsed: %0.5f '
- 'Resume Elapsed: '
- ' %0.5f' % progress_tuple)
- progress_pct = "{}".format(int(100 * iteration / iterations))
- if "zenity" in detect_progress_indicator():
- progress_indicator.stdin.write("# {}\n".format(
- progress_string).encode('utf-8'))
- progress_indicator.stdin.write("{}\n".format(
- progress_pct).encode('utf-8'))
- progress_indicator.stdin.flush()
- elif "dialog" in detect_progress_indicator():
- progress_indicator.stdin.write("XXX\n".encode('utf-8'))
- progress_indicator.stdin.write(
- progress_pct.encode('utf-8'))
- progress_indicator.stdin.write(
- "\nTest progress\n".encode('utf-8'))
- progress_indicator.stdin.write(
- progress_string.encode('utf-8'))
- progress_indicator.stdin.write(
- "\nXXX\n".encode('utf-8'))
- progress_indicator.stdin.flush()
- else:
- print(progress_string)
- progress_indicator.terminate()
-
- if 's4' not in args.sleep:
- average_times(iteration_results)
- for run in iteration_results.keys():
- if 'FAIL' in iteration_results[run]:
- results['sleep'] = 'FAILED_CRITICAL'
- else:
- for test in tests:
- # ACPI tests can now be run with --acpitests (fwts >= 15.07.00)
- log = args.log
- # Split the log file for HWE (only if -t is not used)
- if test == 'acpitests':
- test = '--acpitests'
- command = ('fwts -q --stdout-summary -r %s %s'
- % (log, test))
- results[test] = (Popen(command, stdout=PIPE, shell=True)
- .communicate()[0].strip()).decode()
-
- # lp:1584607
- # We append the content of dmesg and syslog at the end of the logfile
- # generated by FWTS.
- # FIXME: Commented out after discovered that it created HUGE log files
- # during stress tests.
- #with open(args.log, 'a') as logfile:
- # logfile.write("--- beginning of dmesg ---\n")
- # logfile.write(check_output('dmesg').decode('utf-8', 'ignore'))
- # logfile.write("--- end of dmesg ---\n")
- # logfile.write("--- beginning of syslog ---\n")
- # logfile.write(check_output(['cat', '/var/log/syslog']).decode('utf-8', 'ignore'))
- # logfile.write("--- end of syslog ---\n")
-
- # parse the summaries
- for test in results.keys():
- if 'FAILED_CRITICAL' in results[test]:
- critical_fails.append(test)
- if 'FAILED_HIGH' in results[test]:
- high_fails.append(test)
- if 'FAILED_MEDIUM' in results[test]:
- medium_fails.append(test)
- if 'FAILED_LOW' in results[test]:
- low_fails.append(test)
- if 'PASSED' in results[test]:
- passed.append(test)
- if 'ABORTED' in results[test]:
- aborted.append(test)
- else:
- continue
-
- if critical_fails:
- print("Critical Failures: %d" % len(critical_fails))
- print("WARNING: The following test cases were reported as critical\n"
- "level failures by fwts. Please review the log at\n"
- "%s for more information." % args.log)
- for test in critical_fails:
- print(" - " + test)
- if high_fails:
- print("High Failures: %d" % len(high_fails))
- print("WARNING: The following test cases were reported as high\n"
- "level failures by fwts. Please review the log at\n"
- "%s for more information." % args.log)
- for test in high_fails:
- print(" - " + test)
- if medium_fails:
- print("Medium Failures: %d" % len(medium_fails))
- print("WARNING: The following test cases were reported as medium\n"
- "level failures by fwts. Please review the log at\n"
- "%s for more information." % args.log)
- for test in medium_fails:
- print(" - " + test)
- if low_fails:
- print("Low Failures: %d" % len(low_fails))
- print("WARNING: The following test cases were reported as low\n"
- "level failures by fwts. Please review the log at\n"
- "%s for more information." % args.log)
- for test in low_fails:
- print(" - " + test)
- if passed:
- print("Passed: %d" % len(passed))
- for test in passed:
- print(" - " + test)
- if aborted:
- print("Aborted Tests: %d" % len(aborted))
- print("WARNING: The following test cases were aborted by fwts\n"
- "Please review the log at %s for more information."
- % args.log)
- for test in aborted:
- print(" - " + test)
-
- if args.fail_level is not 'none':
- if fail_priority == fail_levels['FAILED_CRITICAL']:
- if critical_fails:
- return 1
- if fail_priority == fail_levels['FAILED_HIGH']:
- if critical_fails or high_fails:
- return 1
- if fail_priority == fail_levels['FAILED_MEDIUM']:
- if critical_fails or high_fails or medium_fails:
- return 1
- if fail_priority == fail_levels['FAILED_LOW']:
- if critical_fails or high_fails or medium_fails or low_fails:
- return 1
- if fail_priority == fail_levels['FAILED_ABORTED']:
- if aborted or critical_fails or high_fails:
- return 1
-
- return 0
-
-if __name__ == '__main__':
- sys.exit(main())
diff --git a/bin/gateway_ping_test b/bin/gateway_ping_test
index 22aec6a..3e58c21 100755
--- a/bin/gateway_ping_test
+++ b/bin/gateway_ping_test
@@ -278,7 +278,7 @@ def main(args):
).format(ping_summary['pct_loss'], args.threshold))
return 0
else:
- print(_("Internet connection fully established"))
+ print(_("Connection to test host fully established"))
return 0
diff --git a/bin/network_bandwidth_test b/bin/network_bandwidth_test
deleted file mode 100755
index c9b1dfb..0000000
--- a/bin/network_bandwidth_test
+++ /dev/null
@@ -1,671 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-import re
-import sys
-import random
-import logging
-import subprocess
-
-from datetime import datetime, timedelta
-from time import sleep
-
-from logging import StreamHandler, FileHandler, Formatter
-from optparse import OptionParser
-
-from checkbox_support.lib.conversion import string_to_type
-
-
-class CommandException(Exception):
-
- pass
-
-
-class CommandOutput(object):
-
- def __init__(self, **attributes):
- self._attributes = attributes
-
- def __getattr__(self, name):
- if name in self._attributes:
- return self._attributes.get(name)
-
- return None
-
-
-class Command(object):
-
- # Name of the command to run
- name = None
-
- # Number of command line arguments
- argument_count = 0
-
- # Option processing
- option_strings = {}
- option_defaults = {}
-
- # Ouput processing
- output_factory = CommandOutput
- output_patterns = {}
-
- # Convenient output patterns
- non_space = r"[^ ]+"
-
- def __init__(self, *arguments, **options):
- if len(arguments) != self.argument_count:
- raise TypeError("Invalid number of arguments: %d" % len(arguments))
-
- self._arguments = arguments
-
- self._options = self.option_defaults.copy()
- for name, string in options.items():
- if name not in self.option_strings:
- raise TypeError("Unknown option: %s" % name)
- self._options[name] = string
-
- def get_command(self):
- command = [self.name]
- for name, string in self._options.items():
- # Match option from string
- if isinstance(string, bool):
- option = self.option_strings[name]
- else:
- option = self.option_strings[name] % string
-
- command.append(option)
-
- command.extend(self._arguments)
-
- return " ".join(command)
-
- def parse_lines(self, lines):
- attributes = {}
- for line in lines:
- # Match patterns from lines
- for name, pattern in self.output_patterns.items():
- match = re.search(pattern, line)
- if match:
- attributes[name] = string_to_type(match.group(1))
-
- return self.output_factory(**attributes)
-
- def parse_output(self, output):
- lines = output.split("\n")
- # Strip leading and trailing spaces
- lines = [l.strip() for l in lines]
- # Skip blank lines
- lines = [l for l in lines if l]
-
- return self.parse_lines(lines)
-
- def run(self):
- command = self.get_command()
- logging.debug("Running command: %s" % command)
- process = subprocess.Popen(command, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- error = process.stderr.read()
- if error:
- raise CommandException(error.decode("utf-8"))
-
- output = process.stdout.read()
- return self.parse_output(output.decode("utf-8"))
-
-
-class NetworkConfigOutput(CommandOutput):
-
- @property
- def speed(self):
- if self.name == "lo":
- return 10000
-
- try:
- wireless = WirelessConfig(self.name).run()
- speed = wireless.bit_rate
- except CommandException:
- wired = WiredConfig(self.name).run()
- speed = wired.speed
-
- return speed / 1024 / 1024
-
-
-class NetworkConfig(Command):
-
- name = "ifconfig"
-
- argument_count = 1
-
- ipv4 = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
- ipv6 = r"[\w:]+/\d+"
- mac_address = r"\w\w:\w\w:\w\w:\w\w:\w\w:\w\w"
-
- output_factory = NetworkConfigOutput
- output_patterns = {
- "name": r"(%s).*Link encap" % Command.non_space,
- "broadcast": r"Bcast:(%s)" % ipv4,
- "collisions": "collisions:(\d+)",
- "hwaddr": r"HWaddr (%s)" % mac_address,
- "inet_addr": r"inet addr:(%s)" % ipv4,
- "link_encap": r"Link encap:(%s)" % Command.non_space,
- "netmask": r"Mask:(%s)" % ipv4,
- "metric": r"Metric:(\d+)",
- "mtu": r"MTU:(\d+)",
- "rx_bytes": "RX bytes:(\d+)",
- "rx_dropped": "RX packets:.* dropped:(\d+)",
- "rx_errors": "RX packets:.* errors:(\d+)",
- "rx_frame": "RX packets:.* frame:(\d+)",
- "rx_overruns": "RX packets:.* overruns:(\d+)",
- "rx_packets": "RX packets:(\d+)",
- "tx_bytes": "TX bytes:(\d+)",
- "tx_carrier": "TX packets:.* carrier:(\d+)",
- "tx_dropped": "TX packets:.* dropped:(\d+)",
- "tx_errors": "TX packets:.* errors:(\d+)",
- "tx_overruns": "TX packets:.* overruns:(\d+)",
- "tx_packets": "TX packets:(\d+)",
- "txqueuelen": "txqueuelen:(\d+)"}
-
-
-class NetworkConfigs(Command):
-
- name = "ifconfig -a"
-
- def parse_output(self, output):
- outputs = []
- for paragraph in output.split("\n\n"):
- if not paragraph:
- continue
-
- lines = paragraph.split("\n")
- name = re.split(r"\s+", lines[0])[0]
- config = NetworkConfig(name).parse_lines(lines)
- outputs.append(config)
-
- return outputs
-
-
-class WiredConfig(Command):
-
- name = "ethtool"
-
- argument_count = 1
-
- output_patterns = {
- "advertised_auto_negotiation": r"Advertised auto-negotiation:\s+(.*)",
- "advertised_link_modes": r"Advertised link modes:\s+(.*)",
- "auto_negotiation": r"Auto-negotiation:\s+(.*)",
- "current_message_level": r"Current message level:\s+(.*)",
- "duplex": r"Duplex:\s+(.*)",
- "link_detected": r"Link detected:\s+(.*)",
- "phyad": r"PHYAD:\s+(.*)",
- "port": r"Port:\s+(.*)",
- "speed": r"Speed:\s+(.*)/s",
- "supported_auto_negotiation": r"Supports auto-negotiation:\s+(.*)",
- "supported_link_modes": r"Supported link modes:\s+(.*)",
- "supported_ports": r"Supported ports:\s+(.*)",
- "supports_wake_on": r"Supports Wake-on:\s+(.*)",
- "transceiver": r"Transceiver:\s+(.*)",
- "wake_on": r"Wake-on:\s+(.*)"}
-
- def parse_lines(self, lines):
- new_lines = []
- # Skip header line
- for line in lines[1:]:
- if not re.search(r": ", line):
- new_lines[-1] += " " + line
- else:
- new_lines.append(line)
-
- return super(WiredConfig, self).parse_lines(new_lines)
-
-
-class WirelessConfig(Command):
-
- name = "iwconfig"
-
- argument_count = 1
-
- fraction = r"\d+(/\d+)?"
- numeric = r"[\d\.]+"
- numeric_with_unit = r"%s( %s)?" % (numeric, Command.non_space)
-
- output_patterns = {
- "access_point": r"Access Point: (.*)",
- "bit_rate": r"Bit Rate[=:](%s)/s" % numeric_with_unit,
- "channel": r"Channel=(%s)" % Command.non_space,
- "essid": r"ESSID:\"?([^\"]+)\"?",
- "fragment_thr": r"Fragment thr:(\w+)",
- "frequency": r"Frequency:(%s)" % numeric_with_unit,
- "invalid_misc": r"Invalid misc:(\d+)",
- "link_quality": r"Link Quality[=:](%s)" % fraction,
- "missed_beacon": r"Missed beacon:(\d+)",
- "mode": r"Mode:(%s)" % Command.non_space,
- "noise_level": r"Noise level[=:](%s)" % numeric_with_unit,
- "power_management": r"Power Management:(.*)",
- "retry_limit": r"Retry limit:(\w+)",
- "rts_thr": r"RTS thr:(\w+)",
- "rx_invalid_crypt": r"Rx invalid crypt:(\d+)",
- "rx_invalid_frag": r"Rx invalid frag:(\d+)",
- "rx_invalid_nwid": r"Rx invalid nwid:(\d+)",
- "sensitivity": r"Sensitivity=(%s)" % fraction,
- "signal_level": r"Signal level[=:](%s)" % numeric_with_unit,
- "tx_excessive_retries": r"Tx excessive retries:(\d+)",
- "tx_power": r"Tx-Power=(%s)" % numeric_with_unit}
-
-
-class Ping(Command):
-
- name = "ping"
-
- argument_count = 1
-
- option_strings = {
- "count": "-c %d",
- "flood": "-f",
- "interface": "-I %s",
- "quiet": "-q",
- "size": "-s %d",
- "ttl": "-t %d"}
-
- option_defaults = {
- "count": 1,
- "quiet": True}
-
- ms = r"\d+\.\d+"
- rtt = (ms, ms, ms, ms)
-
- output_patterns = {
- "packet_loss": r"(\d+)% packet loss,",
- "packets_received": r"(\d+) received,",
- "packets_transmitted": r"(\d+) packets transmitted,",
- "rtt_avg": r"rtt min/avg/max/mdev = %s/(%s)/%s/%s ms" % rtt,
- "rtt_max": r"rtt min/avg/max/mdev = %s/%s/(%s)/%s ms" % rtt,
- "rtt_mdev": r"rtt min/avg/max/mdev = %s/%s/%s/(%s) ms" % rtt,
- "rtt_min": r"rtt min/avg/max/mdev = (%s)/%s/%s/%s ms" % rtt,
- "time": r"time (\d+)ms"}
-
- def parse_lines(self, lines):
- # Skip ping lines
- return super(Ping, self).parse_lines(lines[-2:])
-
-
-class PingLarge(Ping):
-
- # Some wired environments can handle the maximum ping packet
- # size, (65507+28)=65535 bytes. With a count of 191 packets, 65535
- # bytes/packet, 8 bits/byte, the sum payload is 100137480 bits ~
- # 100Mb. This is preferred and will be tried first.
- packet_size = 65507
- packet_count = 191
-
- option_defaults = {
- "count": packet_count,
- "flood": True,
- "quiet": True,
- "size": packet_size,
- "ttl": 1}
-
-
-class PingSmall(PingLarge):
-
- # If the large packet test was too lossy, we fall back to a packet
- # equal to the default MTU size of 1500, (1472+28)=1500 bytes.
- # With a count of 8334 packets, 1500 bytes/packet, 8 bits/byte, the
- # sum payload is 100008000 bits ~ 100Mb.
- packet_size = 1472
- packet_count = 8334
-
- option_defaults = PingLarge.option_defaults.copy()
- option_defaults.update({
- "count": packet_count,
- "size": packet_size})
-
-
-class PingHost(Command):
-
- output_patterns = {
- "host": r"(?:Host|Nmap scan report for) (%s)" % NetworkConfig.ipv4,
- "mac_address": r"MAC Address: (%s)" % NetworkConfig.mac_address}
-
-
-class PingScan(Command):
-
- name = "nmap -n -sP"
-
- argument_count = 1
-
- def parse_lines(self, lines):
- hosts = []
- host_lines = []
- # Skip header lines
- for line in lines[1:]:
- host_lines.append(line)
- if line.startswith("MAC Address"):
- host = PingHost().parse_lines(host_lines)
- hosts.append(host)
- host_lines = []
-
- return hosts
-
-
-class Ip(object):
-
- def __init__(self, address):
- self.address = address
- self.binary = self._address_to_binary(address)
-
- def __str__(self):
- return self.address
-
- def _address_to_binary(self, address):
- binary = 0
- for position, part in enumerate(address.split(".")):
- if position >= 4:
- raise ValueError("Address contains more than four parts.")
- try:
- if not part:
- part = 0
- else:
- part = int(part)
- if not 0 <= part < 256:
- raise ValueError
- except ValueError:
- raise ValueError("Address part out of range.")
- binary <<= 8
- binary += part
- return binary
-
- def count_1_bits(self):
- ret = 0
- num = self.binary
- while num > 0:
- num = num >> 1
- ret += 1
- return ret
-
- def count_0_bits(self):
- num = int(self.binary)
- if num < 0:
- raise ValueError("Only positive Numbers please: %s" % (num))
- ret = 0
- while num > 0:
- if num & 1 == 1:
- break
- num = num >> 1
- ret += 1
- return ret
-
-
-class IpRange(object):
-
- def __init__(self, address, netmask):
- self.address = Ip(address)
- self.netmask = Ip(netmask)
- self.prefix = self._netmask_to_prefix(self.netmask)
-
- def __str__(self):
- return "%s/%s" % (self.address, self.prefix)
-
- def _check_netmask(self, masklen):
- num = int(self.netmask.binary)
- bits = masklen
-
- # remove zero bits at the end
- while (num & 1) == 0:
- num = num >> 1
- bits -= 1
- if bits == 0:
- break
- # now check if the rest consists only of ones
- while bits > 0:
- if (num & 1) == 0:
- raise ValueError("Netmask %s can't be expressed as an prefix."
- % (hex(self.netmask.binary)))
- num = num >> 1
- bits -= 1
-
- def _netmask_to_prefix(self, netmask):
- netlen = netmask.count_0_bits()
- masklen = netmask.count_1_bits()
- self._check_netmask(masklen)
- return masklen - netlen
-
- def contains(self, address):
- address = Ip(address)
- if self.address.binary & self.netmask.binary \
- == address.binary & self.netmask.binary:
- return True
-
- return False
-
- def scan(self, max=None):
- scan = PingScan(str(self)).run()
- targets = [s.host for s in scan]
- random.shuffle(targets)
-
- if max is not None:
- targets = targets[:max]
-
- return targets
-
-
-class NetworkManagerException(Exception):
-
- pass
-
-
-class NetworkManager(object):
-
- NM_SERVICE = "org.freedesktop.NetworkManager"
- NM_PATH = "/org/freedesktop/NetworkManager"
- NM_INTERFACE = NM_SERVICE
-
- NM_PATH_DEVICES = "/org/freedesktop/NetworkManager/Devices"
- NM_INTERFACE_DEVICES = "org.freedesktop.NetworkManager.Devices"
-
- NMI_SERVICE = "org.freedesktop.NetworkManagerInfo"
- NMI_PATH = "/org/freedesktop/NetworkManagerInfo"
- NMI_INTERFACE = NMI_SERVICE
-
- HAL_SERVICE = "org.freedesktop.Hal"
- HAL_PATH = "/org/freedesktop/Hal/Manager"
- HAL_INTERFACE = "org.freedesktop.Hal.Manager"
- HAL_INTERFACE_DEVICE = "org.freedesktop.Hal.Device"
-
- #http://projects.gnome.org/NetworkManager/developers/
- #NetworkManager D-Bus API Specifications, look for the
- #NM_STATE enumeration to see which statuses indicate connection
- #established and put them in this list. "3" works for NM 0.7
- #and 0.8, while "60" and "70" work for NM 0.9.
- STATES_CONNECTED = [3, 60, 70]
-
- def __init__(self):
- try:
- import dbus
- except ImportError:
- raise NetworkManagerException("Python module not found: dbus")
-
- try:
- self._bus = dbus.SystemBus()
- self.nm_object = self._bus.get_object(self.NM_SERVICE,
- self.NM_PATH)
- self.nm_service = dbus.Interface(self.nm_object, self.NM_INTERFACE)
- except dbus.exceptions.DBusException:
- raise NetworkManagerException("Failed to connect to dbus service")
-
- def is_connected(self):
- state = self.nm_service.state()
- return state in self.STATES_CONNECTED
-
-
-class Application(object):
-
- def __init__(self, targets, interfaces, scan):
- self.targets = targets
- self.interfaces = interfaces
- self.scan = scan
-
- def test_interface(self, interface, targets):
- logging.info("Testing %s at %s-Mbps", interface.name, interface.speed)
- for target in targets:
- ping = PingLarge(target, interface=interface.name)
- result = ping.run()
- if result.packet_loss:
- ping = PingSmall(target, interface=interface.name)
- result = ping.run()
- if result.packet_loss:
- logging.warning("SKIP: Non-zero packet loss (%s%%) "
- "for [%s] [%s]->[%s]",
- result.packet_loss, interface.name,
- interface.inet_addr, target)
- continue
-
- mbps = (8 * (ping.packet_size + 28) * ping.packet_count
- / result.time / 1000)
- percent = (100 * 8 * (ping.packet_size + 28) * ping.packet_count
- / result.time / 1000 / interface.speed)
- if percent >= 10:
- logging.info("PASS: Effective rate: %3.4f Mbps, "
- "%3.2f%% of theoretical max (%5.2f Mbps)",
- mbps, percent, interface.speed)
- return True
- else:
- logging.warning("Unacceptable network effective rate found for [%s]" % interface.name)
- logging.warning("Effective rate %3.4f Mbps, %3.2f%% of theoretical max (%5.2f Mbps)" %
- (mbps, percent, interface.speed))
- return False
-
- def run(self):
- logging.debug("Acquiring network Interfaces")
- if self.interfaces:
- interfaces = [NetworkConfig(i).run() for i in self.interfaces]
- else:
- interfaces = NetworkConfigs().run()
- interfaces = [i for i in interfaces if i.inet_addr]
-
- for interface in interfaces:
- if not interface.inet_addr:
- logging.debug("No network address for [%s]", interface.name)
- continue
-
- targets = []
- ip_range = IpRange(interface.inet_addr, interface.netmask)
- if self.targets:
- for target in self.targets:
- if ip_range.contains(target):
- targets.append(target)
- elif interface.name != "lo":
- targets = ip_range.scan(self.scan)
- logging.info("The following targets were found for %s:" % interface.name)
- for target in targets:
- logging.info("\t%s" % target)
-
- if not targets:
- logging.debug("No targets found for [%s]", interface.name)
- continue
-
- if not self.test_interface(interface, targets):
- return False
-
- return True
-
-
-class ApplicationManager(object):
-
- application_factory = Application
-
- default_log_level = "critical"
- default_scan = 1
- default_timeout = 60
-
- def get_parser(self, args):
- usage = "Usage: %prog [TARGETS]"
-
- parser = OptionParser(usage=usage)
- parser.add_option("-i", "--interface",
- dest="interfaces",
- action="append",
- type="string",
- default=[],
- help="Interface to test.")
- parser.add_option("-s", "--scan",
- default=self.default_scan,
- type="int",
- help="Number of targets to scan when not provided.")
- parser.add_option("-t", "--timeout",
- default=self.default_timeout,
- type="int",
- help="Time to wait for network manager to connect.")
- parser.add_option("-l", "--log",
- metavar="FILE",
- help="The file to write the log to.")
- parser.add_option("--log-level",
- default=self.default_log_level,
- help=("One of debug, info, warning, "
- "error or critical."))
-
- return parser
-
- def check_uid(self):
- return os.getuid() == 0
-
- def check_network(self, timeout):
- try:
- nm = NetworkManager()
- except NetworkManagerException:
- return True
-
- start = datetime.now()
- while True:
- if nm.is_connected():
- return True
- if datetime.now() - start > timedelta(seconds=timeout):
- return False
- sleep(5)
-
- def create_application(self, args=sys.argv[1:]):
- parser = self.get_parser(args)
- (options, args) = parser.parse_args(args)
-
- log_level = logging.getLevelName(options.log_level.upper())
- log_handlers = []
- if options.log:
- log_filename = options.log
- log_handlers.append(FileHandler(log_filename))
- else:
- log_handlers.append(StreamHandler())
-
- # Logging setup
- format = ("%(asctime)s %(levelname)-8s %(message)s")
- date_format = '%Y-%m-%d %H:%M:%S'
- if log_handlers:
- for handler in log_handlers:
- handler.setFormatter(Formatter(format, date_format))
- logging.getLogger().addHandler(handler)
- if log_level:
- logging.getLogger().setLevel(log_level)
- elif not logging.getLogger().handlers:
- logging.disable(logging.CRITICAL)
-
- if not self.check_uid():
- parser.error("Must be run as root.")
-
- if not self.check_network(options.timeout):
- parser.error("Network devices must be configured and connected to a LAN segment before testing")
-
- targets = args
- return self.application_factory(targets,
- options.interfaces, options.scan)
-
-
-def main():
- application_manager = ApplicationManager()
- application = application_manager.create_application()
- if not application.run():
- return 1
-
- return 0
-
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/bin/network_device_info b/bin/network_device_info
index 19e5b68..9aa4f90 100755
--- a/bin/network_device_info
+++ b/bin/network_device_info
@@ -126,12 +126,9 @@ class NetworkingDevice():
try:
stream = check_output(cmd, stderr=STDOUT, universal_newlines=True)
except CalledProcessError as err:
- print("Error running %s:" % ' '.join(cmd), file=sys.stderr)
- print(err.output, file=sys.stderr)
return None
if not stream:
- print("Error: modinfo returned nothing", file=sys.stderr)
return None
else:
parser = ModinfoParser(stream)
@@ -193,7 +190,11 @@ def match_counts(nm_devices, udev_devices, devtype):
"""
# now check that the count (by type) matches
nm_type_devices = [dev for dev in nm_devices if dev.gettype() in devtype]
- udevtype = 'WIRELESS' if devtype == 'WiFi' else 'NETWORK'
+ udevtype = 'NETWORK'
+ if devtype == 'WiFi':
+ udevtype = 'WIRELESS'
+ elif devtype == 'Modem':
+ udevtype = 'WWAN'
udev_type_devices = [
udev
for udev in udev_devices
@@ -262,7 +263,9 @@ def main(args):
if not match_counts(nm_devices, udev_devices, "WiFi"):
return 1
- elif not match_counts(nm_devices, udev_devices, ("Ethernet", "Modem")):
+ elif not match_counts(nm_devices, udev_devices, "Ethernet"):
+ return 1
+ elif not match_counts(nm_devices, udev_devices, "Modem"):
return 1
else:
return 0
diff --git a/bin/removable_storage_test b/bin/removable_storage_test
index 3502455..ddd4e1b 100755
--- a/bin/removable_storage_test
+++ b/bin/removable_storage_test
@@ -177,11 +177,120 @@ class DiskTest():
Indirectly sets:
self.rem_disks{,_nm,_memory_cards,_memory_cards_nm,_speed}
"""
- bus, loop = connect_to_system_bus()
- if is_udisks2_supported(bus):
- self._probe_disks_udisks2(bus)
+ if "SNAP" in os.environ:
+ self._probe_disks_udisks2_cli()
else:
- self._probe_disks_udisks1(bus)
+ bus, loop = connect_to_system_bus()
+ if is_udisks2_supported(bus):
+ self._probe_disks_udisks2(bus)
+ else:
+ self._probe_disks_udisks1(bus)
+
+ def _probe_disks_udisks2_cli(self):
+ # First we will build up a db of udisks info by scraping the output
+ # of the dump command
+ # TODO: remove the snap prefix when the alias becomes available
+ proc = subprocess.Popen(['udisks2.udisksctl', 'dump'],
+ stdout=subprocess.PIPE)
+ udisks_devices = {}
+ current_bd = None
+ current_interface = None
+ while True:
+ line = proc.stdout.readline().decode(sys.stdout.encoding)
+ if line == '':
+ break
+ if line == '\n':
+ current_bd = None
+ current_interface = None
+ if line.startswith('/org/freedesktop/UDisks2/'):
+ path = line.strip()
+ current_bd = os.path.basename(path).rstrip(':')
+ udisks_devices[current_bd] = {}
+ continue
+ if current_bd is None:
+ continue
+ if line.startswith(' org.freedesktop'):
+ current_interface = line.strip().rstrip(':')
+ udisks_devices[current_bd][current_interface] = {}
+ continue
+ if current_interface is None:
+ continue
+ entry = ''.join(c for c in line if c not in '\n\t\' ')
+ wanted_keys = ('Device:', 'Drive:', 'MountPoints:', 'Vendor:',
+ 'ConnectionBus:', 'Model:', 'Media:',)
+ for key in wanted_keys:
+ if entry.startswith(key):
+ udisks_devices[current_bd][current_interface][key] = (
+ entry[len(key):])
+
+ # Now use the populated udisks structure to fill out the API used by
+ # other _probe disks functions
+ for device, interfaces in udisks_devices.items():
+ # iterate over udisks objects that have both filesystem and
+ # block device interfaces
+ if (UDISKS2_FILESYSTEM_INTERFACE in interfaces and
+ UDISKS2_BLOCK_INTERFACE in interfaces):
+ # To be an IO candidate there must be a drive object
+ drive = interfaces[UDISKS2_BLOCK_INTERFACE].get('Drive:')
+ if drive is None or drive is '/':
+ continue
+ drive_object = udisks_devices[os.path.basename(drive)]
+
+ # Get the connection bus property from the drive interface of
+ # the drive object. This is required to filter out the devices
+ # we don't want to look at now.
+ connection_bus = (
+ drive_object[UDISKS2_DRIVE_INTERFACE]['ConnectionBus:'])
+ desired_connection_buses = set([
+ map_udisks1_connection_bus(device)
+ for device in self.device])
+ # Skip devices that are attached to undesired connection buses
+ if connection_bus not in desired_connection_buses:
+ continue
+
+ dev_file = (
+ interfaces[UDISKS2_BLOCK_INTERFACE].get('Device:'))
+
+ parent = self._find_parent(dev_file.replace('/dev/', ''))
+ if (parent and
+ find_pkname_is_root_mountpoint(parent, self.lsblk)):
+ continue
+
+ # XXX: we actually only scrape the first one currently
+ mount_point = (
+ interfaces[UDISKS2_FILESYSTEM_INTERFACE].get(
+ 'MountPoints:'))
+ if mount_point == '':
+ mount_point = None
+
+ # We need to skip-non memory cards if we look for memory cards
+ # and vice-versa so let's inspect the drive and use heuristics
+ # to detect memory cards (a memory card reader actually) now.
+ if self.memorycard != is_memory_card(
+ drive_object[UDISKS2_DRIVE_INTERFACE]['Vendor:'],
+ drive_object[UDISKS2_DRIVE_INTERFACE]['Model:'],
+ drive_object[UDISKS2_DRIVE_INTERFACE]['Media:']):
+ continue
+
+ if mount_point is None:
+ self.rem_disks_memory_cards_nm[dev_file] = None
+ self.rem_disks_nm[dev_file] = None
+ else:
+ self.rem_disks_memory_cards[dev_file] = mount_point
+ self.rem_disks[dev_file] = mount_point
+
+ # Get the speed of the interconnect that is associated with the
+ # block device we're looking at. This is purely informational
+ # but it is a part of the required API
+ udev_devices = get_udev_block_devices(GUdev.Client())
+ for udev_device in udev_devices:
+ if udev_device.get_device_file() == dev_file:
+ interconnect_speed = get_interconnect_speed(udev_device)
+ if interconnect_speed:
+ self.rem_disks_speed[dev_file] = (
+ interconnect_speed * 10 ** 6)
+ else:
+ self.rem_disks_speed[dev_file] = None
def _probe_disks_udisks2(self, bus):
"""
diff --git a/bin/removable_storage_watcher b/bin/removable_storage_watcher
index 794bb1b..88cae9b 100755
--- a/bin/removable_storage_watcher
+++ b/bin/removable_storage_watcher
@@ -411,7 +411,7 @@ class UDisks2StorageDeviceListener:
UDISKS2_DRIVE_PROPERTY_CONNECTION_BUS = "ConnectionBus"
def __init__(self, system_bus, loop, action, devices, minimum_speed,
- memorycard):
+ memorycard, unmounted = False):
# Store the desired minimum speed of the device in Mbit/s. The argument
# is passed as the number of bits per second so let's fix that.
self._desired_minimum_speed = minimum_speed / 10 ** 6
@@ -421,6 +421,9 @@ class UDisks2StorageDeviceListener:
map_udisks1_connection_bus(device) for device in devices])
# Check if we are explicitly looking for memory cards
self._desired_memory_card = memorycard
+ # Store information whether we also want detected, but unmounted
+ # devices too
+ self._allow_unmounted = unmounted
# Store the desired "delta" direction depending on
# whether we test for insertion or removal
if action == "insert":
@@ -561,7 +564,10 @@ class UDisks2StorageDeviceListener:
# Skip objects we already ignored and complained about before
if object_path in self._ignored_objects:
continue
- needs = set(('block-fs', 'partition', 'non-empty', 'mounted'))
+ needs = set(('block-fs', 'partition', 'non-empty'))
+ if not self._allow_unmounted:
+ needs.add('mounted')
+
# As a special exception when the ConnectionBus is allowed to be
# empty, as is the case with eSATA devices, do not require the
# filesystem to be mounted as gvfs may choose not to mount it
@@ -851,6 +857,8 @@ def main():
dest='logging_level', help="Enable verbose output")
parser.add_argument('--debug', action='store_const', const=logging.DEBUG,
dest='logging_level', help="Enable debugging")
+ parser.add_argument('--unmounted', action='store_true',
+ help="Don't require drive being automounted")
parser.set_defaults(logging_level=logging.WARNING)
args = parser.parse_args()
@@ -873,7 +881,8 @@ def main():
logging.debug("Using UDisks2 interface")
listener = UDisks2StorageDeviceListener(
system_bus, loop,
- args.action, args.device, args.minimum_speed, args.memorycard)
+ args.action, args.device, args.minimum_speed, args.memorycard,
+ args.unmounted)
else:
# Construct the listener with all of the arguments provided on the
# command line and the explicit system_bus, loop objects.
diff --git a/bin/storage_test b/bin/storage_test
index 5f7516b..da4b717 100755
--- a/bin/storage_test
+++ b/bin/storage_test
@@ -116,7 +116,7 @@ fi
echo "Set disk to $disk"
scripted_mount=0
-if [ -b $disk ]
+if [ -b "$disk" ]
then
echo "$disk is a block device"
@@ -133,7 +133,7 @@ then
# Regex changed to better handle when $disk appears more than once
# in parted output (such as in warning messages or not caught in the
# check above)
- size=`parted -l -s |grep "Disk.*${disk}" |awk '{print $3}'`
+ size=`parted -l -s 2>&1 | grep "Disk .*${disk}:" | awk '{print $3}'`
if [ -n "$size" ]
then
@@ -163,15 +163,15 @@ then
fi
- if [ $size_range == "KB" ]
+ if [ "$size_range" == "KB" ]
then
echo "$disk size reported in KB, seems to be too small for testing."
exit 1
- elif [ $size_range == "MB" ]
+ elif [ "$size_range" == "MB" ]
then
size_int=${size::${#size}-2}
- if [ $size_int -gt 10 ]
+ if [ "$size_int" -gt 10 ]
then
run_bonnie $disk
if [[ $scripted_mount == 1 ]]