Diffstat (limited to 'bin')
-rwxr-xr-x  bin/boot_mode_test              96
-rwxr-xr-x  bin/bt_connect                 135
-rw-r--r--  bin/bt_helper.py               300
-rwxr-xr-x  bin/camera_test                157
-rwxr-xr-x  bin/camera_test_legacy         578
-rwxr-xr-x  bin/connect_wireless             6
-rwxr-xr-x  bin/cpu_offlining               32
-rwxr-xr-x  bin/cpu_stress                  70
-rwxr-xr-x  bin/cpu_topology                23
-rwxr-xr-x  bin/disk_cpu_load              138
-rwxr-xr-x  bin/disk_info                   81
-rwxr-xr-x  bin/disk_smart                 269
-rwxr-xr-x  bin/disk_stress_ng             242
-rwxr-xr-x  bin/dmitest                      2
-rwxr-xr-x  bin/fwts_test                   29
-rwxr-xr-x  bin/graphics_env                31
-rwxr-xr-x  bin/key_test                     2
-rwxr-xr-x  bin/memory_compare              12
-rwxr-xr-x  bin/memory_stress_ng           158
-rwxr-xr-x  bin/network                     97
-rwxr-xr-x  bin/network_device_info          9
-rwxr-xr-x  bin/pm_test                     46
-rwxr-xr-x  bin/pulse-active-port-change     1
-rwxr-xr-x  bin/removable_storage_test      83
-rwxr-xr-x  bin/removable_storage_watcher    2
-rwxr-xr-x  bin/sleep_time_check            12
-rwxr-xr-x  bin/touchpad_test               38
-rwxr-xr-x  bin/virtualization             144
28 files changed, 2463 insertions, 330 deletions
diff --git a/bin/boot_mode_test b/bin/boot_mode_test
new file mode 100755
index 0000000..d4c2956
--- /dev/null
+++ b/bin/boot_mode_test
@@ -0,0 +1,96 @@
+#!/usr/bin/env python3
+"""
+Test that the computer booted in EFI mode, with Secure Boot active.
+
+Copyright (C) 2016 Canonical Ltd.
+
+Authors:
+ Rod Smith <rod.smith@canonical.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License version 3,
+as published by the Free Software Foundation.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import os
+import sys
+import logging
+from argparse import ArgumentParser
+
+
+def efi_boot_check():
+ """Test that the computer booted in EFI mode
+
+ :returns:
+ 0 if /sys/firmware/efi/ exists, meaning the system booted in EFI mode
+ 1 if booted in BIOS mode
+ """
+ efi_dir = "/sys/firmware/efi/"
+ if os.path.isdir(efi_dir):
+ logging.info("PASS: System booted in EFI mode")
+ return 0
+ else:
+ logging.error("FAIL: System did not boot in EFI mode")
+ return 1
+
+
+def secure_boot_check():
+ """Test that the computer booted with Secure Boot active.
+
+ :returns:
+ 0 if Secure Boot is active
+ 1 if Secure Boot is inactive (could be disabled, not supported,
+ or not booted in EFI mode)
+ """
+ sb_dir = "/sys/firmware/efi/efivars/"
+ sb_var = sb_dir + "SecureBoot-8be4df61-93ca-11d2-aa0d-00e098032b8c"
+ if os.path.isdir(sb_dir):
+ if os.path.isfile(sb_var):
+ sb_info = open(sb_var).read()
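+ # The efivarfs file begins with a 4-byte attribute header; the
+ # fifth byte (index 4) holds the variable's value, 1 = SB active.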
+ if ord(sb_info[4]) == 1:
+ logging.info("PASS: System booted with Secure Boot active.")
+ return 0
+ else:
+ logging.error("FAIL: System booted with "
+ "Secure Boot available but inactive.")
+ return 1
+ else:
+ # NOTE: Normally, lack of sb_var indicates that the system
+ # doesn't support SB, as on many pre-Windows 8 UEFI systems.
+ # Below is therefore a bit harsh, but is done to ensure that
+ # no system slips through because it supports Secure Boot but
+ # does not create the sb_var when SB is inactive or has never
+ # been activated.
+ logging.error("FAIL: System does not appear to support "
+ "Secure Boot.")
+ return 1
+ else:
+ logging.info("FAIL: System did NOT boot in EFI mode.")
+ return 1
+
+
+def main():
+ parser = ArgumentParser()
+ parser.add_argument('check',
+ choices=['efi', 'secureboot'],
+ help='The type of check to perform')
+ args = parser.parse_args()
+
+ FORMAT = '%(levelname)s: %(message)s'
+ logging.basicConfig(level=logging.INFO, format=FORMAT)
+ if args.check == 'efi':
+ return efi_boot_check()
+ else:
+ return secure_boot_check()
+
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/bin/bt_connect b/bin/bt_connect
new file mode 100755
index 0000000..0ef5169
--- /dev/null
+++ b/bin/bt_connect
@@ -0,0 +1,135 @@
+#!/usr/bin/env python3
+#
+# This file is part of Checkbox.
+#
+# Copyright 2016 Canonical Ltd.
+#
+# Authors:
+# Po-Hsu Lin <po-hsu.lin@canonical.com>
+# Yung Shen <yung.shen@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import time
+
+import bt_helper
+
+from argparse import ArgumentParser
+
+
+def unpair_all(devices, manager):
+ """ Unpairing paired devices and scanning again for rerun jobs."""
+ for dev in devices:
+ try:
+ print("INFO: Unpairing", dev)
+ dev.unpair()
+ except bt_helper.BtException as exc:
+ print("Warning: Unpairing failed", exc)
+ else:
+ # print(flush=True) to bypass plainbox output buffer,
+ # see LP: #1569808 for more details.
+ print("Please reset the device to pairing mode in 13 seconds",
+ flush=True)
+ time.sleep(13)
+ print("INFO: Re-scaning for devices in pairing mode", flush=True)
+ manager.scan()
+
+
+def main():
+ """Add argument parser here and do most of the job."""
+ parser = ArgumentParser(description=("Bluetooth auto paring and connect. "
+ "Please select one option."))
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument("--mac", type=str,
+ help="Pair with a given MAC, not using scan result,")
+ group.add_argument("--mouse", action="store_const",
+ const="input-mouse", dest="target",
+ help="List and pair with mouse devices")
+ group.add_argument("--keyboard", action="store_const",
+ const="input-keyboard", dest="target",
+ help="List and pair with keyboard devices")
+ args = parser.parse_args()
+
+ manager = bt_helper.BtManager()
+ # Power on the Bluetooth adapter and scan for devices in advance.
+ manager.ensure_adapters_powered()
+ manager.scan()
+
+ if args.mac:
+ # TODO check MAC format
+ print("INFO: Trying to pair with {}".format(args.mac))
+ device = list(manager.get_bt_devices(filters={'Address': args.mac}))
+ paired_device = list(manager.get_bt_devices(
+ filters={'Address': args.mac, 'Paired': True}))
+ if not device:
+ print("ERROR: No pairable device found, terminating")
+ return 1
+
+ unpair_all(paired_device, manager)
+
+ for dev in device:
+ try:
+ dev.pair()
+ except bt_helper.BtException as exc:
+ print("ERROR: Unable to pair: ", exc)
+ return 1
+ else:
+ print("INFO: Device paired")
+ return 0
+ else:
+ print("INFO: Listing targeting devices")
+ # Listing device based on RSSI
+ paired_targets = list(manager.get_bt_devices(category=bt_helper.BT_ANY,
+ filters={'Paired': True, 'Icon': args.target}))
+ if not paired_targets:
+ print("INFO: No paired targeting devices found")
+ manager.scan()
+ else:
+ unpair_all(paired_targets, manager)
+
+ target_devices = sorted(manager.get_bt_devices(
+ category=bt_helper.BT_ANY, filters={
+ 'Paired': False, 'Icon': args.target}),
+ key=lambda x: int(x.rssi or -255), reverse=True)
+ if not target_devices:
+ print("ERROR: No target devices found, terminating")
+ return 1
+ print("INFO: Detected devices (sorted by RSSI; highest first).")
+ # let's assign numbers to devices
+ devices = dict(enumerate(target_devices, 1))
+ for num, dev in devices.items():
+ print("{}. {} (RSSI: {})".format(num, dev, dev.rssi))
+ chosen = False
+ while not chosen:
+ print("Which one would you like to connect to? (0 to exit)")
+ num = input()
+ # TODO: enter as default to 1st device
+ if num == '0':
+ return 1
+ chosen = num.isnumeric() and int(num) in devices.keys()
+ print("INFO: {} chosen.".format(devices[int(num)]))
+ print("INFO: Pairing selected device..")
+ try:
+ devices[int(num)].pair()
+ except bt_helper.BtException as exc:
+ print("ERROR: something wrong: ", exc)
+ return 1
+ else:
+ print("Paired successfully.")
+ return 0
+ # catch any other silent failures
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/bin/bt_helper.py b/bin/bt_helper.py
new file mode 100644
index 0000000..91f879a
--- /dev/null
+++ b/bin/bt_helper.py
@@ -0,0 +1,300 @@
+# Copyright 2016 Canonical Ltd.
+# Written by:
+# Maciej Kisielewski <maciej.kisielewski@canonical.com>
+#
+# This is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This file is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this file. If not, see <http://www.gnu.org/licenses/>.
+"""
+This module provides a set of abstractions to ease the process of automating
+typical Bluetooth tasks like scanning for devices and pairing with them.
+
+It talks to the BlueZ stack using D-Bus.
+"""
+import logging
+
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+from gi.repository import GObject
+
+logger = logging.getLogger(__file__)
+logger.addHandler(logging.StreamHandler())
+
+IFACE = 'org.bluez.Adapter1'
+ADAPTER_IFACE = 'org.bluez.Adapter1'
+DEVICE_IFACE = 'org.bluez.Device1'
+AGENT_IFACE = 'org.bluez.Agent1'
+
+dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+# To get additional Bluetooth CoDs, check
+# https://www.bluetooth.com/specifications/assigned-numbers/baseband
+BT_ANY = 0
+BT_KEYBOARD = int('0x2540', 16)
+
+
+class BtException(Exception):
+ pass
+
+
+class BtManager:
+ """ Main point of contact with dbus factoring bt objects. """
+ def __init__(self, verbose=False):
+ if verbose:
+ logger.setLevel(logging.DEBUG)
+ self._bus = dbus.SystemBus()
+ self._bt_root = self._bus.get_object('org.bluez', '/')
+ self._manager = dbus.Interface(
+ self._bt_root, 'org.freedesktop.DBus.ObjectManager')
+ self._main_loop = GObject.MainLoop()
+ self._register_agent()
+
+ def _register_agent(self):
+ path = "/bt_helper/agent"
+ BtAgent(self._bus, path)
+ obj = self._bus.get_object('org.bluez', "/org/bluez")
+ agent_manager = dbus.Interface(obj, "org.bluez.AgentManager1")
+ agent_manager.RegisterAgent(path, 'NoInputNoOutput')
+ logger.info("Agent registered")
+
+ def _get_objects_by_iface(self, iface_name):
+ for path, ifaces in self._manager.GetManagedObjects().items():
+ if ifaces.get(iface_name):
+ yield self._bus.get_object('org.bluez', path)
+
+ def get_bt_adapters(self):
+ """Yield BtAdapter objects for each BT adapter found."""
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ yield BtAdapter(dbus.Interface(adapter, ADAPTER_IFACE), self)
+
+ def get_bt_devices(self, category=BT_ANY, filters={}):
+ """Yields BtDevice objects currently known to the system.
+
+ filters - specifies the characteristics that a BT device must have
+ to be yielded. The keys of the filters dictionary are parameter names
+ (as specified by the Bluetooth D-Bus API and exposed on the DBus
+ proxy object), and their values must match the proxy values.
+ E.g. {'Paired': False}. For a full list of parameters see:
+ http://git.kernel.org/cgit/bluetooth/bluez.git/tree/doc/device-api.txt
+
+ Note that this function returns objects corresponding to BT devices
+ that were seen last time scanning was done."""
+ for device in self._get_objects_by_iface(DEVICE_IFACE):
+ obj = self.get_object_by_path(device.object_path)[DEVICE_IFACE]
+ try:
+ if category != BT_ANY:
+ if obj['Class'] != category:
+ continue
+ rejected = False
+ for filter in filters:
+ if obj[filter] != filters[filter]:
+ rejected = True
+ break
+ if rejected:
+ continue
+ yield BtDevice(dbus.Interface(device, DEVICE_IFACE), self)
+ except KeyError as exc:
+ logger.info('Property %s not found on device %s',
+ exc, device.object_path)
+ continue
+
+ def get_prop_iface(self, obj):
+ return dbus.Interface(self._bus.get_object(
+ 'org.bluez', obj.object_path), 'org.freedesktop.DBus.Properties')
+
+ def get_object_by_path(self, path):
+ return self._manager.GetManagedObjects()[path]
+
+ def get_proxy_by_path(self, path):
+ return self._bus.get_object('org.bluez', path)
+
+ def wait(self):
+ self._main_loop.run()
+
+ def quit_loop(self):
+ self._main_loop.quit()
+
+ def ensure_adapters_powered(self):
+ for adapter in self.get_bt_adapters():
+ adapter.ensure_powered()
+
+ def scan(self, timeout=10):
+ """Scan for BT devices visible to all adapters.'"""
+ self._bus.add_signal_receiver(
+ interfaces_added,
+ dbus_interface="org.freedesktop.DBus.ObjectManager",
+ signal_name="InterfacesAdded")
+ self._bus.add_signal_receiver(
+ properties_changed,
+ dbus_interface="org.freedesktop.DBus.Properties",
+ signal_name="PropertiesChanged",
+ arg0="org.bluez.Device1",
+ path_keyword="path")
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ try:
+ dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
+ except dbus.exceptions.DBusException:
+ pass
+ dbus.Interface(adapter, ADAPTER_IFACE).StartDiscovery()
+ GObject.timeout_add_seconds(timeout, self._scan_timeout)
+ self._main_loop.run()
+
+ def get_devices(self, timeout=10, rescan=True):
+ """Scan for and list all devices visible to all adapters."""
+ if rescan:
+ self.scan(timeout)
+ return list(self.get_bt_devices())
+
+ def _scan_timeout(self):
+ for adapter in self._get_objects_by_iface(ADAPTER_IFACE):
+ dbus.Interface(adapter, ADAPTER_IFACE).StopDiscovery()
+ self._main_loop.quit()
+
+
+class BtAdapter:
+ def __init__(self, dbus_iface, bt_mgr):
+ self._if = dbus_iface
+ self._bt_mgr = bt_mgr
+ self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
+
+ def set_bool_prop(self, prop_name, value):
+ self._prop_if.Set(IFACE, prop_name, dbus.Boolean(value))
+
+ def ensure_powered(self):
+ """Turn the adapter on, and do nothing if already on."""
+ powered = self._prop_if.Get(IFACE, 'Powered')
+ logger.info('Powering on {}'.format(
+ self._if.object_path.split('/')[-1]))
+ if powered:
+ logger.info('Device already powered')
+ return
+ try:
+ self.set_bool_prop('Powered', True)
+ logger.info('Powered on')
+ except dbus.exceptions.DBusException as exc:
+ logger.error('Failed to power on - {}'.format(
+ exc.get_dbus_message()))
+
+
+class BtDevice:
+ def __init__(self, dbus_iface, bt_mgr):
+ self._if = dbus_iface
+ self._obj = bt_mgr.get_object_by_path(
+ self._if.object_path)[DEVICE_IFACE]
+ self._bt_mgr = bt_mgr
+ self._prop_if = bt_mgr.get_prop_iface(dbus_iface)
+ self._pair_outcome = None
+
+ def __str__(self):
+ return "{} ({})".format(self.name, self.address)
+
+ def __repr__(self):
+ return "<BtDevice name:{}, address:{}>".format(self.name, self.address)
+
+ def pair(self):
+ """Pair the device.
+
+ This function will try pairing with the device and block until device
+ is paired, an error occurs, or the default timeout elapses (whichever comes
+ first).
+ """
+ self._prop_if.Set(DEVICE_IFACE, 'Trusted', True)
+ self._if.Pair(
+ reply_handler=self._pair_ok, error_handler=self._pair_error)
+ self._bt_mgr.wait()
+ if self._pair_outcome:
+ raise BtException(self._pair_outcome)
+ try:
+ self._if.Connect()
+ except dbus.exceptions.DBusException as exc:
+ logging.error('Failed to connect - {}'.format(
+ exc.get_dbus_message()))
+
+ def unpair(self):
+ self._if.Disconnect()
+ adapter = self._bt_mgr.get_proxy_by_path(self._obj['Adapter'])
+ dbus.Interface(adapter, ADAPTER_IFACE).RemoveDevice(self._if)
+
+ @property
+ def name(self):
+ return self._obj.get('Name', '<Unnamed>')
+
+ @property
+ def address(self):
+ return self._obj['Address']
+
+ @property
+ def rssi(self):
+ return self._obj.get('RSSI', None)
+
+ def _pair_ok(self):
+ logger.info('%s successfully paired', self.name)
+ self._pair_outcome = None
+ self._bt_mgr.quit_loop()
+
+ def _pair_error(self, error):
+ logger.warning('Pairing of %s device failed. %s', self.name, error)
+ self._pair_outcome = error
+ self._bt_mgr.quit_loop()
+
+
+class Rejected(dbus.DBusException):
+ _dbus_error_name = "org.bluez.Error.Rejected"
+
+
+class BtAgent(dbus.service.Object):
+ """Agent authenticating everything that is possible."""
+ @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
+ def AuthorizeService(self, device, uuid):
+ logger.info("AuthorizeService (%s, %s)", device, uuid)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="u")
+ def RequestPasskey(self, device):
+ logger.info("RequestPasskey (%s)", device)
+ passkey = input("Enter passkey: ")
+ return dbus.UInt32(passkey)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="s")
+ def RequestPinCode(self, device):
+ logger.info("RequestPinCode (%s)", device)
+ return input("Enter PIN Code: ")
+
+ @dbus.service.method(AGENT_IFACE, in_signature="ouq", out_signature="")
+ def DisplayPasskey(self, device, passkey, entered):
+ print("DisplayPasskey (%s, %06u entered %u)" %
+ (device, passkey, entered), flush=True)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="os", out_signature="")
+ def DisplayPinCode(self, device, pincode):
+ logger.info("DisplayPinCode (%s, %s)", device, pincode)
+ print('Type the following PIN on your device: {}'.format(pincode),
+ flush=True)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="ou", out_signature="")
+ def RequestConfirmation(self, device, passkey):
+ logger.info("RequestConfirmation (%s, %06d)", device, passkey)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="o", out_signature="")
+ def RequestAuthorization(self, device):
+ logger.info("RequestAuthorization (%s)", device)
+
+ @dbus.service.method(AGENT_IFACE, in_signature="", out_signature="")
+ def Cancel(self):
+ logger.info("Cancelled")
+
+
+def properties_changed(interface, changed, invalidated, path):
+ logger.info('Property changed for device @ %s. Change: %s', path, changed)
+
+
+def interfaces_added(path, interfaces):
+ logger.info('Added new bt interfaces: %s @ %s', interfaces, path)
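+
+
+# Example usage (illustrative sketch only; assumes a powered adapter and a
+# device already in pairing mode): pair the first unpaired mouse found.
+#
+#   mgr = BtManager()
+#   mgr.ensure_adapters_powered()
+#   mgr.scan()
+#   for dev in mgr.get_bt_devices(filters={'Paired': False,
+#                                          'Icon': 'input-mouse'}):
+#       dev.pair()
+#       break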
diff --git a/bin/camera_test b/bin/camera_test
index 235eaaf..6128c0f 100755
--- a/bin/camera_test
+++ b/bin/camera_test
@@ -164,13 +164,10 @@ class CameraTest:
"""
A simple class that displays a test image via GStreamer.
"""
- def __init__(self, args, gst_plugin=None, gst_video_type=None):
+ def __init__(self, args):
self.args = args
- self._mainloop = GObject.MainLoop()
self._width = 640
self._height = 480
- self._gst_plugin = gst_plugin
- self._gst_video_type = gst_video_type
def detect(self):
"""
@@ -204,51 +201,62 @@ class CameraTest:
else '',
' ]', sep="")
- resolutions = self._get_supported_resolutions(device)
- print(' ',
- self._supported_resolutions_to_string(resolutions).replace(
- "\n", " "),
- sep="")
+ resolutions = self._supported_resolutions_to_string(
+ self._get_supported_resolutions(device))
+ resolutions = resolutions.replace(
+ "Resolutions:", " Resolutions:")
+ resolutions = resolutions.replace("Format:", " Format:")
+ print(resolutions)
if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE:
cap_status = 0
return dev_status | cap_status
+ def _on_destroy(self, *args):
+ Clutter.main_quit()
+
+ def _take_photo(self, *args):
+ Cheese.Camera.take_photo(self.camera, self.filename)
+
def led(self):
"""
Activate camera (switch on led), but don't display any output
"""
- pipespec = ("v4l2src device=%(device)s "
- "! %(type)s "
- "! %(plugin)s "
- "! testsink"
- % {'device': self.args.device,
- 'type': self._gst_video_type,
- 'plugin': self._gst_plugin})
- logging.debug("LED test with pipeline %s", pipespec)
- self._pipeline = Gst.parse_launch(pipespec)
- self._pipeline.set_state(Gst.State.PLAYING)
- time.sleep(3)
- self._pipeline.set_state(Gst.State.NULL)
+ Clutter.threads_add_timeout(0, 3000, self._on_destroy, None, None)
+ video_texture = Clutter.Actor()
+ try:
+ camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ except TypeError: # libcheese < 3.18 still uses Clutter.Texture
+ video_texture = Clutter.Texture()
+ camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ Cheese.Camera.setup(camera, None)
+ Cheese.Camera.play(camera)
+ Clutter.main()
def display(self):
"""
Displays the preview window
"""
- pipespec = ("v4l2src device=%(device)s "
- "! %(type)s,width=%(width)d,height=%(height)d "
- "! %(plugin)s "
- "! autovideosink"
- % {'device': self.args.device,
- 'type': self._gst_video_type,
- 'width': self._width,
- 'height': self._height,
- 'plugin': self._gst_plugin})
- logging.debug("display test with pipeline %s", pipespec)
- self._pipeline = Gst.parse_launch(pipespec)
- self._pipeline.set_state(Gst.State.PLAYING)
- time.sleep(10)
- self._pipeline.set_state(Gst.State.NULL)
+ stage = Clutter.Stage()
+ stage.set_title('Camera test')
+ stage.set_size(self._width, self._height)
+ stage.connect('destroy', self._on_destroy)
+ Clutter.threads_add_timeout(0, 10000, self._on_destroy, None, None)
+ video_texture = Clutter.Actor()
+ try:
+ camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ except TypeError: # libcheese < 3.18 still uses Clutter.Texture
+ video_texture = Clutter.Texture()
+ camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ stage.add_actor(video_texture)
+ Cheese.Camera.setup(camera, None)
+ Cheese.Camera.play(camera)
+ stage.show()
+ Clutter.main()
def still(self):
"""
@@ -272,7 +280,7 @@ class CameraTest:
"-d", self.args.device,
"-r", "%dx%d"
% (width, height), filename]
- use_gstreamer = False
+ use_cheese = False
if pixelformat:
if 'MJPG' == pixelformat: # special tweak for fswebcam
pixelformat = 'MJPEG'
@@ -281,32 +289,37 @@ class CameraTest:
try:
check_call(command, stdout=open(os.devnull, 'w'), stderr=STDOUT)
except (CalledProcessError, OSError):
- use_gstreamer = True
-
- if use_gstreamer:
- pipespec = ("v4l2src device=%(device)s "
- "! %(type)s,width=%(width)d,height=%(height)d "
- "! %(plugin)s "
- "! jpegenc "
- "! filesink location=%(filename)s"
- % {'device': self.args.device,
- 'type': self._gst_video_type,
- 'width': width,
- 'height': height,
- 'plugin': self._gst_plugin,
- 'filename': filename})
- logging.debug("still test with gstreamer and "
- "pipeline %s", pipespec)
- self._pipeline = Gst.parse_launch(pipespec)
- self._pipeline.set_state(Gst.State.PLAYING)
- time.sleep(3)
- self._pipeline.set_state(Gst.State.NULL)
+ use_cheese = True
- if not quiet:
+ if use_cheese:
+ stage = Clutter.Stage()
+ stage.connect('destroy', self._on_destroy)
+ video_texture = Clutter.Actor()
try:
- check_call(["timeout", "-k", "11", "10", "eog", filename])
- except CalledProcessError:
- pass
+ self.camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ except TypeError: # libcheese < 3.18 still uses Clutter.Texture
+ video_texture = Clutter.Texture()
+ self.camera = Cheese.Camera.new(
+ video_texture, self.args.device, self._width, self._height)
+ Cheese.Camera.setup(self.camera, None)
+ Cheese.Camera.play(self.camera)
+ self.filename = filename
+ Clutter.threads_add_timeout(0, 3000, self._take_photo, None, None)
+ Clutter.threads_add_timeout(0, 4000, self._on_destroy, None, None)
+ Clutter.main()
+ Cheese.Camera.stop(self.camera)
+
+ if not quiet:
+ stage = Clutter.Stage()
+ stage.set_title('Camera still picture test')
+ stage.set_size(width, height)
+ stage.connect('destroy', self._on_destroy)
+ Clutter.threads_add_timeout(0, 10000, self._on_destroy, None, None)
+ still_texture = Clutter.Texture.new_from_file(filename)
+ stage.add_actor(still_texture)
+ stage.show()
+ Clutter.main()
def _supported_resolutions_to_string(self, supported_resolutions):
"""
@@ -555,16 +568,20 @@ if __name__ == "__main__":
# Import Gst only for the test cases that will need it
if args.test in ['display', 'still', 'led', 'resolutions']:
+ import contextlib
+ # Workaround to avoid "cluttervideosink missing"
+ # See https://bugzilla.gnome.org/show_bug.cgi?id=721277
+ with contextlib.suppress(FileNotFoundError):
+ gst_registry = '~/.cache/gstreamer-1.0/registry.x86_64.bin'
+ os.remove(os.path.expanduser(gst_registry))
+ import gi
+ gi.require_version('Gst', '1.0')
from gi.repository import Gst
- if Gst.version()[0] > 0:
- gst_plugin = 'videoconvert'
- gst_video_type = 'video/x-raw'
- else:
- gst_plugin = 'ffmpegcolorspace'
- gst_video_type = 'video/x-raw-yuv'
+ gi.require_version('Cheese', '3.0')
+ from gi.repository import Cheese
+ gi.require_version('Clutter', '1.0')
+ from gi.repository import Clutter
Gst.init(None)
- camera = CameraTest(args, gst_plugin, gst_video_type)
- else:
- camera = CameraTest(args)
-
+ Clutter.init()
+ camera = CameraTest(args)
sys.exit(getattr(camera, args.test)())
diff --git a/bin/camera_test_legacy b/bin/camera_test_legacy
new file mode 100755
index 0000000..794578b
--- /dev/null
+++ b/bin/camera_test_legacy
@@ -0,0 +1,578 @@
+#!/usr/bin/env python3
+#
+# This file is part of Checkbox.
+#
+# Copyright 2008-2012 Canonical Ltd.
+#
+# The v4l2 ioctl code comes from the Python bindings for the v4l2
+# userspace api (http://pypi.python.org/pypi/v4l2):
+# Copyright (C) 1999-2009 the contributors
+#
+# The JPEG metadata parser is a part of bfg-pages:
+# http://code.google.com/p/bfg-pages/source/browse/trunk/pages/getimageinfo.py
+# Copyright (C) Tim Hoffman
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import argparse
+import ctypes
+import errno
+import fcntl
+import imghdr
+import logging
+import os
+import re
+import struct
+import sys
+import time
+
+from gi.repository import GObject
+from glob import glob
+from subprocess import check_call, CalledProcessError, STDOUT
+from tempfile import NamedTemporaryFile
+
+
+_IOC_NRBITS = 8
+_IOC_TYPEBITS = 8
+_IOC_SIZEBITS = 14
+
+_IOC_NRSHIFT = 0
+_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
+_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
+_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS
+
+_IOC_WRITE = 1
+_IOC_READ = 2
+
+
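+# An ioctl request number packs direction, payload size, a type character
+# and a sequence number into 32 bits, using the shift constants above.
+# For instance, _IOR('V', 0, v4l2_capability) below builds the read-only
+# VIDIOC_QUERYCAP request carrying a v4l2_capability struct.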
+def _IOC(dir_, type_, nr, size):
+ return (
+ ctypes.c_int32(dir_ << _IOC_DIRSHIFT).value |
+ ctypes.c_int32(ord(type_) << _IOC_TYPESHIFT).value |
+ ctypes.c_int32(nr << _IOC_NRSHIFT).value |
+ ctypes.c_int32(size << _IOC_SIZESHIFT).value)
+
+
+def _IOC_TYPECHECK(t):
+ return ctypes.sizeof(t)
+
+
+def _IOR(type_, nr, size):
+ return _IOC(_IOC_READ, type_, nr, ctypes.sizeof(size))
+
+
+def _IOWR(type_, nr, size):
+ return _IOC(_IOC_READ | _IOC_WRITE, type_, nr, _IOC_TYPECHECK(size))
+
+
+class v4l2_capability(ctypes.Structure):
+ """
+ Driver capabilities
+ """
+ _fields_ = [
+ ('driver', ctypes.c_char * 16),
+ ('card', ctypes.c_char * 32),
+ ('bus_info', ctypes.c_char * 32),
+ ('version', ctypes.c_uint32),
+ ('capabilities', ctypes.c_uint32),
+ ('reserved', ctypes.c_uint32 * 4),
+ ]
+
+
+# Values for 'capabilities' field
+V4L2_CAP_VIDEO_CAPTURE = 0x00000001
+V4L2_CAP_VIDEO_OVERLAY = 0x00000004
+V4L2_CAP_READWRITE = 0x01000000
+V4L2_CAP_STREAMING = 0x04000000
+
+v4l2_frmsizetypes = ctypes.c_uint
+(
+ V4L2_FRMSIZE_TYPE_DISCRETE,
+ V4L2_FRMSIZE_TYPE_CONTINUOUS,
+ V4L2_FRMSIZE_TYPE_STEPWISE,
+) = range(1, 4)
+
+
+class v4l2_frmsize_discrete(ctypes.Structure):
+ _fields_ = [
+ ('width', ctypes.c_uint32),
+ ('height', ctypes.c_uint32),
+ ]
+
+
+class v4l2_frmsize_stepwise(ctypes.Structure):
+ _fields_ = [
+ ('min_width', ctypes.c_uint32),
+ ('max_width', ctypes.c_uint32),
+ ('step_width', ctypes.c_uint32),
+ ('min_height', ctypes.c_uint32),
+ ('max_height', ctypes.c_uint32),
+ ('step_height', ctypes.c_uint32),
+ ]
+
+
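+# Depending on the 'type' field, the kernel fills either the 'discrete' or
+# the 'stepwise' member of the anonymous union inside v4l2_frmsizeenum.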
+class v4l2_frmsizeenum(ctypes.Structure):
+ class _u(ctypes.Union):
+ _fields_ = [
+ ('discrete', v4l2_frmsize_discrete),
+ ('stepwise', v4l2_frmsize_stepwise),
+ ]
+
+ _fields_ = [
+ ('index', ctypes.c_uint32),
+ ('pixel_format', ctypes.c_uint32),
+ ('type', ctypes.c_uint32),
+ ('_u', _u),
+ ('reserved', ctypes.c_uint32 * 2)
+ ]
+
+ _anonymous_ = ('_u',)
+
+
+class v4l2_fmtdesc(ctypes.Structure):
+ _fields_ = [
+ ('index', ctypes.c_uint32),
+ ('type', ctypes.c_int),
+ ('flags', ctypes.c_uint32),
+ ('description', ctypes.c_char * 32),
+ ('pixelformat', ctypes.c_uint32),
+ ('reserved', ctypes.c_uint32 * 4),
+ ]
+
+V4L2_FMT_FLAG_COMPRESSED = 0x0001
+V4L2_FMT_FLAG_EMULATED = 0x0002
+
+# ioctl code for video devices
+VIDIOC_QUERYCAP = _IOR('V', 0, v4l2_capability)
+VIDIOC_ENUM_FRAMESIZES = _IOWR('V', 74, v4l2_frmsizeenum)
+VIDIOC_ENUM_FMT = _IOWR('V', 2, v4l2_fmtdesc)
+
+
+class CameraTest:
+ """
+ A simple class that displays a test image via GStreamer.
+ """
+ def __init__(self, args, gst_plugin=None, gst_video_type=None):
+ self.args = args
+ self._mainloop = GObject.MainLoop()
+ self._width = 640
+ self._height = 480
+ self._gst_plugin = gst_plugin
+ self._gst_video_type = gst_video_type
+
+ def detect(self):
+ """
+ Display information regarding webcam hardware
+ """
+ cap_status = dev_status = 1
+ for i in range(10):
+ cp = v4l2_capability()
+ device = '/dev/video%d' % i
+ try:
+ with open(device, 'r') as vd:
+ fcntl.ioctl(vd, VIDIOC_QUERYCAP, cp)
+ except IOError:
+ continue
+ dev_status = 0
+ print("%s: OK" % device)
+ print(" name : %s" % cp.card.decode('UTF-8'))
+ print(" driver : %s" % cp.driver.decode('UTF-8'))
+ print(" version: %s.%s.%s"
+ % (cp.version >> 16,
+ (cp.version >> 8) & 0xff,
+ cp.version & 0xff))
+ print(" flags : 0x%x [" % cp.capabilities,
+ ' CAPTURE' if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE
+ else '',
+ ' OVERLAY' if cp.capabilities & V4L2_CAP_VIDEO_OVERLAY
+ else '',
+ ' READWRITE' if cp.capabilities & V4L2_CAP_READWRITE
+ else '',
+ ' STREAMING' if cp.capabilities & V4L2_CAP_STREAMING
+ else '',
+ ' ]', sep="")
+
+ resolutions = self._get_supported_resolutions(device)
+ print(' ',
+ self._supported_resolutions_to_string(resolutions).replace(
+ "\n", " "),
+ sep="")
+
+ if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE:
+ cap_status = 0
+ return dev_status | cap_status
+
+ def led(self):
+ """
+ Activate camera (switch on led), but don't display any output
+ """
+ pipespec = ("v4l2src device=%(device)s "
+ "! %(type)s "
+ "! %(plugin)s "
+ "! testsink"
+ % {'device': self.args.device,
+ 'type': self._gst_video_type,
+ 'plugin': self._gst_plugin})
+ logging.debug("LED test with pipeline %s", pipespec)
+ self._pipeline = Gst.parse_launch(pipespec)
+ self._pipeline.set_state(Gst.State.PLAYING)
+ time.sleep(3)
+ self._pipeline.set_state(Gst.State.NULL)
+
+ def display(self):
+ """
+ Displays the preview window
+ """
+ pipespec = ("v4l2src device=%(device)s "
+ "! %(type)s,width=%(width)d,height=%(height)d "
+ "! %(plugin)s "
+ "! autovideosink"
+ % {'device': self.args.device,
+ 'type': self._gst_video_type,
+ 'width': self._width,
+ 'height': self._height,
+ 'plugin': self._gst_plugin})
+ logging.debug("display test with pipeline %s", pipespec)
+ self._pipeline = Gst.parse_launch(pipespec)
+ self._pipeline.set_state(Gst.State.PLAYING)
+ time.sleep(10)
+ self._pipeline.set_state(Gst.State.NULL)
+
+ def still(self):
+ """
+ Captures an image to a file
+ """
+ if self.args.filename:
+ self._still_helper(self.args.filename, self._width, self._height,
+ self.args.quiet)
+ else:
+ with NamedTemporaryFile(prefix='camera_test_', suffix='.jpg') as f:
+ self._still_helper(f.name, self._width, self._height,
+ self.args.quiet)
+
+ def _still_helper(self, filename, width, height, quiet, pixelformat=None):
+ """
+ Captures an image to a given filename. width and height specify the
+ image size and quiet controls whether the image is displayed to the
+ user (quiet = True means do not display image).
+ """
+ command = ["fswebcam", "-D 1", "-S 50", "--no-banner",
+ "-d", self.args.device,
+ "-r", "%dx%d"
+ % (width, height), filename]
+ use_gstreamer = False
+ if pixelformat:
+ if 'MJPG' == pixelformat: # special tweak for fswebcam
+ pixelformat = 'MJPEG'
+ command.extend(["-p", pixelformat])
+
+ try:
+ check_call(command, stdout=open(os.devnull, 'w'), stderr=STDOUT)
+ except (CalledProcessError, OSError):
+ use_gstreamer = True
+
+ if use_gstreamer:
+ pipespec = ("v4l2src device=%(device)s "
+ "! %(type)s,width=%(width)d,height=%(height)d "
+ "! %(plugin)s "
+ "! jpegenc "
+ "! filesink location=%(filename)s"
+ % {'device': self.args.device,
+ 'type': self._gst_video_type,
+ 'width': width,
+ 'height': height,
+ 'plugin': self._gst_plugin,
+ 'filename': filename})
+ logging.debug("still test with gstreamer and "
+ "pipeline %s", pipespec)
+ self._pipeline = Gst.parse_launch(pipespec)
+ self._pipeline.set_state(Gst.State.PLAYING)
+ time.sleep(3)
+ self._pipeline.set_state(Gst.State.NULL)
+
+ if not quiet:
+ import imghdr
+ image_type = imghdr.what(filename)
+ pipespec = ("filesrc location=%(filename)s ! "
+ "%(type)sdec ! "
+ "videoscale ! "
+ "imagefreeze ! autovideosink"
+ % {'filename': filename,
+ 'type': image_type})
+ self._pipeline = Gst.parse_launch(pipespec)
+ self._pipeline.set_state(Gst.State.PLAYING)
+ time.sleep(10)
+ self._pipeline.set_state(Gst.State.NULL)
+
+ def _supported_resolutions_to_string(self, supported_resolutions):
+ """
+ Return a printable string representing a list of supported resolutions
+ """
+ ret = ""
+ for resolution in supported_resolutions:
+ ret += "Format: %s (%s)\n" % (resolution['pixelformat'],
+ resolution['description'])
+ ret += "Resolutions: "
+ for res in resolution['resolutions']:
+ ret += "%sx%s," % (res[0], res[1])
+ # truncate the extra comma with :-1
+ ret = ret[:-1] + "\n"
+ return ret
+
+ def resolutions(self):
+ """
+ After querying the webcam for supported formats and resolutions,
+ take multiple images using the first format returned by the driver,
+ and see if they are valid
+ """
+ resolutions = self._get_supported_resolutions(self.args.device)
+ # print supported formats and resolutions for the logs
+ print(self._supported_resolutions_to_string(resolutions))
+
+ # pick the first format, which seems to be what the driver wants for a
+ # default. This also matches the logic that fswebcam uses to select
+ # a default format.
+ resolution = resolutions[0]
+ if resolution:
+ print("Taking multiple images using the %s format"
+ % resolution['pixelformat'])
+ for res in resolution['resolutions']:
+ w = res[0]
+ h = res[1]
+ f = NamedTemporaryFile(prefix='camera_test_%s%sx%s' %
+ (resolution['pixelformat'], w, h),
+ suffix='.jpg', delete=False)
+ print("Taking a picture at %sx%s" % (w, h))
+ self._still_helper(f.name, w, h, True,
+ pixelformat=resolution['pixelformat'])
+ if self._validate_image(f.name, w, h):
+ print("Validated image %s" % f.name)
+ os.remove(f.name)
+ else:
+ print("Failed to validate image %s" % f.name,
+ file=sys.stderr)
+ os.remove(f.name)
+ return 1
+ return 0
+
+ def _get_pixel_formats(self, device, maxformats=5):
+ """
+ Query the camera to see what pixel formats it supports. A list of
+ dicts is returned consisting of format and description. The caller
+ should check whether this camera supports VIDEO_CAPTURE before
+ calling this function.
+ """
+ supported_formats = []
+ fmt = v4l2_fmtdesc()
+ fmt.index = 0
+ fmt.type = V4L2_CAP_VIDEO_CAPTURE
+ try:
+ while fmt.index < maxformats:
+ with open(device, 'r') as vd:
+ if fcntl.ioctl(vd, VIDIOC_ENUM_FMT, fmt) == 0:
+ pixelformat = {}
+ # save the int type for re-use later
+ pixelformat['pixelformat_int'] = fmt.pixelformat
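+ # The pixelformat is a FOURCC code: four ASCII characters
+ # packed into a 32-bit integer, least significant byte first.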
+ pixelformat['pixelformat'] = "%s%s%s%s" % \
+ (chr(fmt.pixelformat & 0xFF),
+ chr((fmt.pixelformat >> 8) & 0xFF),
+ chr((fmt.pixelformat >> 16) & 0xFF),
+ chr((fmt.pixelformat >> 24) & 0xFF))
+ pixelformat['description'] = fmt.description.decode()
+ supported_formats.append(pixelformat)
+ fmt.index = fmt.index + 1
+ except IOError as e:
+ # EINVAL is the ioctl's way of telling us that there are no
+ # more formats, so we ignore it
+ if e.errno != errno.EINVAL:
+ print("Unable to determine Pixel Formats, this may be a "
+ "driver issue.")
+ return supported_formats
+ return supported_formats
+
+ def _get_supported_resolutions(self, device):
+ """
+ Query the camera for supported resolutions for a given pixel_format.
+ Data is returned in a list of dictionaries with supported pixel
+ formats as the following example shows:
+ resolution['pixelformat'] = "YUYV"
+ resolution['description'] = "(YUV 4:2:2 (YUYV))"
+ resolution['resolutions'] = [[width, height], [640, 480], [1280, 720] ]
+
+ If we are unable to gather any information from the driver, then we
+ return YUYV and 640x480 which seems to be a safe default.
+ Per the v4l2 spec the ioctl used here is experimental
+ but seems to be well supported.
+ """
+ supported_formats = self._get_pixel_formats(device)
+ if not supported_formats:
+ resolution = {}
+ resolution['description'] = "YUYV"
+ resolution['pixelformat'] = "YUYV"
+ resolution['resolutions'] = [[640, 480]]
+ supported_formats.append(resolution)
+ return supported_formats
+
+ for supported_format in supported_formats:
+ resolutions = []
+ framesize = v4l2_frmsizeenum()
+ framesize.index = 0
+ framesize.pixel_format = supported_format['pixelformat_int']
+ with open(device, 'r') as vd:
+ try:
+ while fcntl.ioctl(vd,
+ VIDIOC_ENUM_FRAMESIZES,
+ framesize) == 0:
+ if framesize.type == V4L2_FRMSIZE_TYPE_DISCRETE:
+ resolutions.append([framesize.discrete.width,
+ framesize.discrete.height])
+ # for continuous and stepwise, let's just use min and
+ # max they use the same structure and only return
+ # one result
+ elif (framesize.type in (V4L2_FRMSIZE_TYPE_CONTINUOUS,
+ V4L2_FRMSIZE_TYPE_STEPWISE)):
+ resolutions.append([framesize.stepwise.min_width,
+ framesize.stepwise.min_height]
+ )
+ resolutions.append([framesize.stepwise.max_width,
+ framesize.stepwise.max_height]
+ )
+ break
+ framesize.index = framesize.index + 1
+ except IOError as e:
+ # EINVAL is the ioctl's way of telling us that there are no
+ # more formats, so we ignore it
+ if e.errno != errno.EINVAL:
+ print("Unable to determine supported framesizes "
+ "(resolutions), this may be a driver issue.")
+ supported_format['resolutions'] = resolutions
+ return supported_formats
+
+ def _validate_image(self, filename, width, height):
+ """
+ Given a filename, ensure that the image is the width and height
+ specified and is a valid image file.
+ """
+ if imghdr.what(filename) != 'jpeg':
+ return False
+
+ outw = outh = 0
+ with open(filename, mode='rb') as jpeg:
+ jpeg.seek(2)
+ b = jpeg.read(1)
+ try:
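+ # Walk the JPEG marker stream until a Start-Of-Frame marker
+ # (0xC0-0xC3), whose payload carries the image height and width.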
+ while (b and ord(b) != 0xDA):
+ while (ord(b) != 0xFF):
+ b = jpeg.read(1)
+ while (ord(b) == 0xFF):
+ b = jpeg.read(1)
+ if (ord(b) >= 0xC0 and ord(b) <= 0xC3):
+ jpeg.seek(3, 1)
+ h, w = struct.unpack(">HH", jpeg.read(4))
+ break
+ b = jpeg.read(1)
+ outw, outh = int(w), int(h)
+ except (struct.error, ValueError):
+ pass
+
+ if outw != width:
+ print("Image width does not match, was %s should be %s" %
+ (outw, width), file=sys.stderr)
+ return False
+ if outh != height:
+ print("Image width does not match, was %s should be %s" %
+ (outh, height), file=sys.stderr)
+ return False
+
+ return True
+
+
+def parse_arguments(argv):
+ """
+ Parse command line arguments
+ """
+ parser = argparse.ArgumentParser(description="Run a camera-related test")
+ subparsers = parser.add_subparsers(dest='test',
+ title='test',
+ description='Available camera tests')
+
+ parser.add_argument('--debug', dest='log_level',
+ action="store_const", const=logging.DEBUG,
+ default=logging.INFO, help="Show debugging messages")
+
+ def add_device_parameter(parser):
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("-d", "--device", default="/dev/video0",
+ help="Device for the webcam to use")
+ group.add_argument("--highest-device", action="store_true",
+ help=("Use the /dev/videoN "
+ "where N is the highest value available"))
+ group.add_argument("--lowest-device", action="store_true",
+ help=("Use the /dev/videoN "
+ "where N is the lowest value available"))
+ subparsers.add_parser('detect')
+ led_parser = subparsers.add_parser('led')
+ add_device_parameter(led_parser)
+ display_parser = subparsers.add_parser('display')
+ add_device_parameter(display_parser)
+ still_parser = subparsers.add_parser('still')
+ add_device_parameter(still_parser)
+ still_parser.add_argument("-f", "--filename",
+ help="Filename to store the picture")
+ still_parser.add_argument("-q", "--quiet", action="store_true",
+ help=("Don't display picture, "
+ "just write the picture to a file"))
+ resolutions_parser = subparsers.add_parser('resolutions')
+ add_device_parameter(resolutions_parser)
+ args = parser.parse_args(argv)
+
+ def get_video_devices():
+ devices = sorted(glob('/dev/video[0-9]'),
+ key=lambda d: re.search(r'\d', d).group(0))
+ assert len(devices) > 0, "No video devices found"
+ return devices
+
+ if hasattr(args, 'highest_device') and args.highest_device:
+ args.device = get_video_devices()[-1]
+ elif hasattr(args, 'lowest_device') and args.lowest_device:
+ args.device = get_video_devices()[0]
+ return args
+
+
+if __name__ == "__main__":
+ args = parse_arguments(sys.argv[1:])
+
+ if not args.test:
+ args.test = 'detect'
+
+ logging.basicConfig(level=args.log_level)
+
+ # Import Gst only for the test cases that will need it
+ if args.test in ['display', 'still', 'led', 'resolutions']:
+ from gi.repository import Gst
+ if Gst.version()[0] > 0:
+ gst_plugin = 'videoconvert'
+ gst_video_type = 'video/x-raw'
+ else:
+ gst_plugin = 'ffmpegcolorspace'
+ gst_video_type = 'video/x-raw-yuv'
+ Gst.init(None)
+ camera = CameraTest(args, gst_plugin, gst_video_type)
+ else:
+ camera = CameraTest(args)
+
+ sys.exit(getattr(camera, args.test)())
diff --git a/bin/connect_wireless b/bin/connect_wireless
index c1ee36c..e39f52c 100755
--- a/bin/connect_wireless
+++ b/bin/connect_wireless
@@ -56,7 +56,11 @@ then
# Disconnect, pause for a short time
for iface in `(nmcli -f GENERAL dev list 2>/dev/null || nmcli -f GENERAL dev show) | grep 'GENERAL.DEVICE' | awk '{print $2}'`
do
- nmcli dev disconnect iface $iface
+ if [ $NMCLI_GTE_0_9_10 -eq 0 ]; then
+ nmcli dev disconnect iface $iface
+ else
+ nmcli dev disconnect $iface
+ fi
done
sleep 2
fi
diff --git a/bin/cpu_offlining b/bin/cpu_offlining
index 0e88af1..7f1095a 100755
--- a/bin/cpu_offlining
+++ b/bin/cpu_offlining
@@ -1,22 +1,24 @@
#!/bin/bash
-echo "Beginning CPU Offlining Test" 1>&2
-
result=0
cpu_count=0
+offline_fails="Offline Failed:"
+online_fails="Online Failed:"
+exitcode=0
# Turn CPU cores off
for cpu_num in `ls /sys/devices/system/cpu | grep -o cpu[0-9]*`; do
if [ -f /sys/devices/system/cpu/$cpu_num/online ]; then
if [ "$cpu_num" != "cpu0" ]; then
((cpu_count++))
- echo "Offlining $cpu_num" 1>&2
echo 0 > /sys/devices/system/cpu/$cpu_num/online
-
- grep -w -i -q $cpu_num /proc/interrupts
- if [ $? -eq 0 ]; then
+ sleep 0.5
+ output=`grep -w -i $cpu_num /proc/interrupts`
+ result=$?
+ if [ $result -eq 0 ]; then
echo "ERROR: Failed to offline $cpu_num" 1>&2
- result=1
+ offline_fails="$offline_fails $cpu_num"
+ exitcode=1
fi
fi
fi
@@ -26,21 +28,25 @@ done
for cpu_num in `ls /sys/devices/system/cpu | grep -o cpu[0-9]*`; do
if [ -f /sys/devices/system/cpu/$cpu_num/online ]; then
if [ "$cpu_num" != "cpu0" ]; then
- echo "Onlining $cpu_num" 1>&2
echo 1 > /sys/devices/system/cpu/$cpu_num/online
- grep -w -i -q $cpu_num /proc/interrupts
- if [ $? -eq 1 ]; then
+ sleep 0.5
+ output=`grep -w -i $cpu_num /proc/interrupts`
+ result=$?
+ if [ $result -eq 1 ]; then
echo "ERROR: Failed to online $cpu_num" 1>&2
- result=1
+ online_fails="$online_fails $cpu_num"
+ exitcode=1
fi
fi
fi
done
-if [ $result -eq 0 ]; then
+if [ $exitcode -eq 0 ]; then
echo "Successfully turned $cpu_count cores off and back on"
else
echo "Error with offlining one or more cores. CPU offline may not work if this is an ARM system." 1>&2
+ echo $offline_fails 1>&2
+ echo $online_fails 1>&2
fi
-exit $result
+exit $exitcode
diff --git a/bin/cpu_stress b/bin/cpu_stress
new file mode 100755
index 0000000..abcb983
--- /dev/null
+++ b/bin/cpu_stress
@@ -0,0 +1,70 @@
+#!/bin/sh
+
+# Script to perform CPU stress tests
+#
+# Copyright (c) 2016 Canonical Ltd.
+#
+# Authors
+# Rod Smith <rod.smith@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The purpose of this script is to run CPU stress tests using the
+# stress-ng program.
+#
+# Usage:
+# cpu_stress [ --runtime <time-in-seconds> ]
+#
+# If --runtime is not specified, it defaults to 7200 (2 hours).
+
+runtime=7200
+if [ "$#" = "2" ] && [ "$1" = "--runtime" ] && [ "$2" -eq "$2" ] ; then
+ runtime=$2
+elif [ "$#" != "0" ] ; then
+ echo "Usage:"
+ echo " $0 [ --runtime <time-in-seconds> ]"
+ exit 1
+fi
+echo "Setting run time to $runtime seconds"
+# Add 10% to runtime; will forcefully terminate if stress-ng
+# fails to return in that time.
+end_time=$((runtime*11/10))
+
+# NOTE:
+# Options --af-alg 0 through --wcs 0 specify CPU stressors. As of stress-ng
+# version 0.05.12, this is equivalent to --class cpu --all 0 --exclude numa,cpu_online.
+# This script specifies stressors individually because the list of stressors keeps
+# increasing, and we want consistency -- if the stress-ng version bumps up, we
+# don't want new stressors being run. We're omitting numa because it's most
+# useful on systems with massive numbers of CPUs, and cpu_online because it's
+# failed on 4 of 8 test systems, so it seems too strict.
+# Use "timeout" command to launch stress-ng, to catch it should it go into la-la land
+timeout -s 9 $end_time stress-ng --aggressive --verify --timeout $runtime \
+ --metrics-brief --tz --times \
+ --af-alg 0 --bsearch 0 --context 0 --cpu 0 \
+ --crypt 0 --hsearch 0 --longjmp 0 --lsearch 0 \
+ --matrix 0 --qsort 0 --str 0 --stream 0 \
+ --tsearch 0 --vecmath 0 --wcs 0
+result="$?"
+
+echo "**********************************************************"
+if [ $result = "0" ] ; then
+ echo "* stress-ng CPU test passed!"
+else
+ if [ $result = "137" ] ; then
+ echo "* stress-ng CPU test timed out and was forcefully terminated!"
+ fi
+ echo "* stress-ng CPU test failed with result $result"
+fi
+echo "**********************************************************"
+exit $result
diff --git a/bin/cpu_topology b/bin/cpu_topology
index 7822f9e..78317b7 100755
--- a/bin/cpu_topology
+++ b/bin/cpu_topology
@@ -5,6 +5,7 @@ Written by Jeffrey Lane <jeffrey.lane@canonical.com>
'''
import sys
import os
+import re
class proc_cpuinfo():
@@ -20,10 +21,18 @@ class proc_cpuinfo():
finally:
cpu_fh.close()
+ r_s390 = re.compile("processor [0-9]")
+ r_x86 = re.compile("processor\s+:")
for i in temp:
- if i.startswith('processor'):
+ # Handle s390 first
+ if r_s390.match(i):
+ cpu_num = i.split(':')[0].split()[1].strip()
+ key = 'cpu' + cpu_num
+ self.cpuinfo[key] = {'core_id': cpu_num,
+ 'physical_package_id': cpu_num}
+ elif r_x86.match(i):
key = 'cpu' + (i.split(':')[1].strip())
- self.cpuinfo[key] = {'core_id':'', 'physical_package_id':''}
+ self.cpuinfo[key] = {'core_id': '', 'physical_package_id': ''}
elif i.startswith('core id'):
self.cpuinfo[key].update({'core_id': i.split(':')[1].strip()})
elif i.startswith('physical id'):
@@ -84,12 +93,12 @@ def main():
print("FAIL: CPU Topology is incorrect", file=sys.stderr)
print("-" * 52, file=sys.stderr)
print("{0}{1}".format("/proc/cpuinfo".center(30), "sysfs".center(25)),
- file=sys.stderr)
+ file=sys.stderr)
print("{0}{1}{2}{3}{1}{2}".format(
- "CPU".center(6),
- "Physical ID".center(13),
- "Core ID".center(9),
- "|".center(3)), file=sys.stderr)
+ "CPU".center(6),
+ "Physical ID".center(13),
+ "Core ID".center(9),
+ "|".center(3)), file=sys.stderr)
for key in sorted(sys_cpu.keys()):
print("{0}{1}{2}{3}{4}{5}".format(
key.center(6),
diff --git a/bin/disk_cpu_load b/bin/disk_cpu_load
new file mode 100755
index 0000000..5e0d0eb
--- /dev/null
+++ b/bin/disk_cpu_load
@@ -0,0 +1,138 @@
+#!/bin/bash
+
+# Script to test CPU load imposed by a simple disk read operation
+#
+# Copyright (c) 2016 Canonical Ltd.
+#
+# Authors
+# Rod Smith <rod.smith@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The purpose of this script is to run disk stress tests using the
+# stress-ng program.
+#
+# Usage:
+# disk_cpu_load [ --max-load <load> ] [ --xfer <mebibytes> ]
+# [ <device-filename> ]
+#
+# Parameters:
+# --max-load <load> -- The maximum acceptable CPU load, as a percentage.
+# Defaults to 30.
+# --xfer <mebibytes> -- The amount of data to read from the disk, in
+# mebibytes. Defaults to 4096 (4 GiB).
+# <device-filename> -- This is the WHOLE-DISK device filename (with or
+# without "/dev/"), e.g. "sda" or "/dev/sda". The
+# script finds a filesystem on that device, mounts
+# it if necessary, and runs the tests on that mounted
+# filesystem. Defaults to /dev/sda.
+
+
+set -e
+
+
+get_params() {
+ disk_device="/dev/sda"
+ short_device="sda"
+ max_load=30
+ xfer=4096
+ while [ $# -gt 0 ] ; do
+ case $1 in
+ --max-load) max_load="$2"
+ shift
+ ;;
+ --xfer) xfer="$2"
+ shift
+ ;;
+ *) disk_device="/dev/$1"
+ disk_device=`echo $disk_device | sed "s/\/dev\/\/dev/\/dev/g"`
+ short_device=$(echo $disk_device | sed "s/\/dev//g")
+ if [ ! -b $disk_device ] ; then
+ echo "Unknown block device \"$disk_device\""
+ echo "Usage: $0 [ --max-load <load> ] [ --xfer <mebibytes> ]"
+ echo " [ device-file ]"
+ exit 1
+ fi
+ ;;
+ esac
+ shift
+ done
+} # get_params()
+
+
+# Find the sum of all values in an array
+# Input:
+# $1 - The array whose values are to be summed
+# Output:
+# $total - The sum of the values
+sum_array() {
+ local array=("${@}")
+ total=0
+ for i in ${array[@]}; do
+ let total+=$i
+ done
+} # sum_array()
+
+
+# Computes the CPU load between two points in time.
+# Input:
+# $1 - CPU statistics from /proc/stat from START point, in a string of numbers
+# $2 - CPU statistics from /proc/stat from END point, in a string of numbers
+# These values can be obtained via $(grep "cpu " /proc/stat | tr -s " " | cut -d " " -f 2-)
+# Output:
+# $cpu_load - CPU load over the two measurements, as a percentage (0-100)
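+# Example: if the idle counter grew by 800 ticks while the summed counters
+# grew by 1000, cpu_load = (1000 - 800) * 100 / 1000 = 20.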
+compute_cpu_load() {
+ local start_use
+ local end_use
+ IFS=' ' read -r -a start_use <<< $1
+ IFS=' ' read -r -a end_use <<< $2
+ local diff_idle
+ let diff_idle=${end_use[3]}-${start_use[3]}
+
+ sum_array "${start_use[@]}"
+ local start_total=$total
+ sum_array "${end_use[@]}"
+ local end_total=$total
+
+ local diff_total
+ local diff_used
+ let diff_total=${end_total}-${start_total}
+ let diff_used=$diff_total-$diff_idle
+
+ if [ "$diff_total" != "0" ] ; then
+ let cpu_load=($diff_used*100)/$diff_total
+ else
+ cpu_load=0
+ fi
+} # compute_cpu_load()
+
+
+#
+# Main program body....
+#
+
+get_params "$@"
+retval=0
+echo "Testing CPU load when reading $xfer MiB from $disk_device"
+echo "Maximum acceptable CPU load is $max_load"
+blockdev --flushbufs $disk_device
+start_load="$(grep "cpu " /proc/stat | tr -s " " | cut -d " " -f 2-)"
+dd if="$disk_device" of=/dev/null bs=1048576 count="$xfer" &> /dev/null
+end_load="$(grep "cpu " /proc/stat | tr -s " " | cut -d " " -f 2-)"
+compute_cpu_load "$start_load" "$end_load"
+echo "Detected disk read CPU load is $cpu_load"
+if [ "$cpu_load" -gt "$max_load" ] ; then
+ retval=1
+ echo "*** DISK CPU LOAD TEST HAS FAILED! ***"
+fi
+exit $retval
diff --git a/bin/disk_info b/bin/disk_info
index b1b73df..5efedbd 100755
--- a/bin/disk_info
+++ b/bin/disk_info
@@ -4,10 +4,12 @@
#
# Copyright (C) 2010-2013 by Cloud Computing Center for Mobile Applications
# Industrial Technology Research Institute
+# Copyright 2016 Canonical Ltd.
#
# Authors:
# Nelson Chu <Nelson.Chu@itri.org.tw>
# Jeff Lane <jeff@ubuntu.com>
+# Sylvain Pineau <sylvain.pineau@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
@@ -20,55 +22,60 @@
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
-"""
-disk_info
-
-Uses lshw to gather information about disks seen by the OS.
-Outputs logical name, vendor, description, size and product data
-"""
+"""disk_info utility."""
+import re
import sys
-import xml.etree.ElementTree as ET
-from subprocess import check_output
-
+from subprocess import check_output, CalledProcessError
-def get_item(disk, attribute):
- try:
- return disk.find(attribute).text
- except AttributeError:
- return "Unknown"
+from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint
def main():
- hwinfo_xml = check_output(['lshw', '-c', 'disk', '-xml'])
- root = ET.fromstring(hwinfo_xml)
+ """
+ disk_info.
- # Parse lshw XML for gathering disk information.
- disk_list = root.findall(".//node[@class='disk']")
+ Uses lsblk to gather information about disks seen by the OS.
+ Outputs kernel name, model and size data
+ """
+ pattern = re.compile('KNAME="(?P<KNAME>.*)" '
+ 'TYPE="(?P<TYPE>.*)" '
+ 'SIZE="(?P<SIZE>.*)" '
+ 'MODEL="(?P<MODEL>.*)" '
+ 'MOUNTPOINT="(?P<MOUNTPOINT>.*)"')
+ try:
+ lsblk = check_output(["lsblk", "-i", "-n", "-P", "-o",
+ "KNAME,TYPE,SIZE,MODEL,MOUNTPOINT"],
+ universal_newlines=True)
+ except CalledProcessError as e:
+ sys.exit(e)
+
+ disks = 0
+ for line in lsblk.splitlines():
+ m = pattern.match(line)
+ if not m or m.group('TYPE') != 'disk':
+ continue
+ # Only consider MMC block devices if one of their mounted partitions is
+ # root (/)
+ if (
+ m.group('KNAME').startswith('mmcblk') and not
+ find_pkname_is_root_mountpoint(m.group('KNAME'), lsblk)
+ ):
+ continue
+ disks += 1
+ model = m.group('MODEL')
+ if not model:
+ model = 'Unknown'
+ print("Name: /dev/{}".format(m.group('KNAME')))
+ print("\t{:7}\t{}".format('Model:', model))
+ print("\t{:7}\t{}".format('Size:', m.group('SIZE')))
- if not disk_list:
+ if not disks:
print("No disk information discovered.")
return 10
- for disk in disk_list:
- if disk.get('id') == 'disk':
- print("Name: {}".format(get_item(disk, 'logicalname')))
- print("\t{k:15}\t{v}".format(k="Description:",
- v=get_item(disk, 'description')))
- print("\t{k:15}\t{v}".format(k="Vendor:",
- v=get_item(disk, 'vendor')))
- print("\t{k:15}\t{v}".format(k="Product:",
- v=get_item(disk, 'product')))
- try:
- disk_size = ("%dGB" % (
- int(disk.find('size').text) / (1000**3)))
- except TypeError:
- disk_size = "No Reported Size"
- except AttributeError:
- disk_size = "No Reported Size"
- print("\t{k:15}\t{v}".format(k="Size:",
- v=disk_size))
return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/bin/disk_smart b/bin/disk_smart
index d984a45..804b11f 100755
--- a/bin/disk_smart
+++ b/bin/disk_smart
@@ -77,6 +77,9 @@ from subprocess import Popen, PIPE, check_call, check_output
from subprocess import CalledProcessError
from argparse import ArgumentParser
+# NOTE: If raid_types changes, also change it in block_device_resource script!
+raid_types = ["megaraid", "cciss", "3ware", "areca"]
+
class ListHandler(logging.StreamHandler):
@@ -97,38 +100,102 @@ class ListHandler(logging.StreamHandler):
logging.StreamHandler.emit(self, record)
-def enable_smart(disk):
+def enable_smart(disk, raid_element, raid_type):
"""Log data and, if necessary, enable SMART on the specified disk.
See also smart_support() in block_device_resource script.
:param disk:
disk device filename (e.g., /dev/sda)
+ :param raid_element:
+ element number to enable in RAID array; undefined if not a RAID device
+ :param raid_type:
+ type of raid device (none, megaraid, etc.)
:returns:
True if enabling smart was successful, False otherwise
"""
# Check with smartctl to record basic SMART data on the disk
- command = 'smartctl -i %s' % disk
+ if raid_type == 'none':
+ command = 'smartctl -i {}'.format(disk)
+ logging.debug('SMART Info for disk {}'.format(disk))
+ else:
+ command = 'smartctl -i {} -d {},{}'.format(disk, raid_type,
+ raid_element)
+ logging.debug('SMART Info for disk {}, element {}'.
+ format(disk, raid_element))
diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True)
.communicate()[0])
diskinfo = (diskinfo_bytes.decode(encoding='utf-8', errors='ignore')
.splitlines())
- logging.debug('SMART Info for disk %s', disk)
logging.debug(diskinfo)
if len(diskinfo) > 2 and not any("SMART support is" in s and "Enabled"
in s for s in diskinfo):
logging.debug('SMART disabled; attempting to enable it.')
- command = 'smartctl -s on %s' % disk
+ if raid_type == 'none':
+ command = 'smartctl -s on {}'.format(disk)
+ else:
+ command = ('smartctl -s on {} -d {},{}'.
+ format(disk, raid_type, raid_element))
try:
check_call(shlex.split(command))
return True
except CalledProcessError:
+ if raid_type == 'none':
+ logging.warning('SMART could not be enabled on {}'.
+ format(disk))
+ else:
+ logging.warning('SMART could not be enabled on {}, element '
+ '{}'.format(disk, raid_element))
return False
return True
-def run_smart_test(disk, type='short'):
- ctl_command = 'smartctl -t %s %s' % (type, disk)
- logging.debug('Beginning test with %s', ctl_command)
+def count_raid_disks(disk):
+ """Count the disks in a RAID array.
+
+ :param disk:
+ Disk device filename (e.g., /dev/sda)
+ :returns:
+ Number of disks in array (0 for non-RAID disk)
+ Type of RAID (none, megaraid, 3ware, areca, or cciss; note that only
+ none and megaraid are tested, as of Jan. 2016)
+ """
+ raid_element = 0
+ raid_type = 'none'
+ command = 'smartctl -i {}'.format(disk)
+ diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True)
+ .communicate()[0])
+ diskinfo = (diskinfo_bytes.decode(encoding='utf-8', errors='ignore')
+ .splitlines())
+ for type in raid_types:
+ if any("-d {},N".format(type) in s for s in diskinfo):
+ logging.info('Found RAID controller of type {}'.format(type))
+ raid_type = type
+ break
+ if raid_type != 'none':
+ # This is a hardware RAID controller, so count individual disks....
+ disk_exists = True
+ while disk_exists:
+ command = ('smartctl -i {} -d {},{}'.
+ format(disk, raid_type, raid_element))
+ try:
+ check_output(shlex.split(command))
+ raid_element += 1
+ except CalledProcessError:
+ disk_exists = False
+ logging.info("Counted {} RAID disks on {}\n".
+ format(raid_element, disk))
+ return raid_element, raid_type
+
+
+def initiate_smart_test(disk, raid_element, raid_type, type='short'):
+ # Note, '-t force' ensures we abort any existing smart test in progress
+ # and start a clean run.
+ if raid_type == 'none':
+ ctl_command = 'smartctl -t {} -t force {}'.format(type, disk)
+ else:
+ ctl_command = ('smartctl -t {} -t force {} -d {},{}'.
+ format(type, disk, raid_type, raid_element))
+ logging.debug('Beginning test with {}'.format(ctl_command))
smart_proc = Popen(ctl_command, stderr=PIPE, stdout=PIPE,
universal_newlines=True, shell=True)
@@ -139,13 +206,22 @@ def run_smart_test(disk, type='short'):
return smart_proc.returncode
-def get_smart_entries(disk, type='selftest'):
+def get_smart_entries(disk, raid_element, raid_type, type='selftest'):
entries = []
try:
- stdout = check_output(['smartctl', '-l', type, disk],
- universal_newlines=True)
+ if raid_type == 'none':
+ stdout = check_output(['smartctl', '-l', type, disk],
+ universal_newlines=True)
+ else:
+ stdout = check_output(['smartctl', '-l', type, disk,
+ '-d', '{},{}'.
+ format(raid_type, raid_element)],
+ universal_newlines=True)
returncode = 0
except CalledProcessError as err:
+ logging.error("Error encountered checking SMART Log")
+ logging.error("\tsmartctl returned code: {}".format(err.returncode))
+ logging.error("\tSee 'man smartctl' for info on return code meanings")
stdout = err.output
returncode = err.returncode
@@ -185,15 +261,22 @@ def get_smart_entries(disk, type='selftest'):
return entries, returncode
-# Returns True if an "in-progress" message is found in the smartctl
-# output, False if such a message is not found. In the former case,
-# the in-progress message entries are logged.
def in_progress(current_entries):
+ """Check to see if the test is in progress.
+
+ :param current_entries:
+ Output of smartctl command to be checked for status indicator.
+ :returns:
+ True if an "in-progress" message is found, False otherwise
+ """
+ # LP:1612220 Only check first log entry for status to avoid false triggers
+ # on older interrupted tests that may still show an "in progress" status.
statuses = [entry for entry in current_entries
- if isinstance(entry, dict)
- and 'status' in entry
- and (entry['status'] == 'Self-test routine in progress'
- or "Self test in progress" in entry['status'])]
+                if isinstance(entry, dict) and 'status' in entry and
+                entry['number'] == 1 and
+                (entry['status'] == 'Self-test routine in progress' or
+                 "Self test in progress" in entry['status'])]
if statuses:
for entry in statuses:
logging.debug('%s %s %s %s' % (entry['number'],
@@ -205,25 +288,43 @@ def in_progress(current_entries):
return False
-# Wait for SMART test to complete; return status and return code.
-# Note that different disks return different types of values.
-# Some return no status reports while a test is ongoing; others
-# show a status line at the START of the list of tests, and
-# others show a status line at the END of the list of tests
-# (and then move it to the top once the tests are done).
-def poll_for_status(args, disk, previous_entries):
+def poll_for_status(args, disk, raid_element, raid_type, previous_entries):
+ """Poll a disk for its SMART status.
+
+ Wait for SMART test to complete; return status and return code.
+ Note that different disks return different types of values.
+ Some return no status reports while a test is ongoing; others
+ show a status line at the START of the list of tests, and
+ others show a status line at the END of the list of tests
+ (and then move it to the top once the tests are done).
+ :param args:
+ Script's command-line arguments
+ :param disk:
+ Disk device (e.g., /dev/sda)
+ :param raid_element:
+ RAID disk number (undefined for non-RAID disk)
+ :param raid_type:
+ Type of RAID device (megaraid, etc.)
+ :param previous_entries:
+ Previous SMART output; used to spot a change
+ :returns:
+ Current output and return code
+ """
# Priming read... this is here in case our test is finished or fails
    # immediately after it begins.
- logging.debug('Polling selftest.log for status')
+ logging.debug('Polling SMART selftest log for status')
keep_going = True
while keep_going:
        # Poll every args.sleep seconds until the test is complete
time.sleep(args.sleep)
- current_entries, returncode = get_smart_entries(disk)
+ current_entries, returncode = get_smart_entries(disk, raid_element,
+ raid_type)
if current_entries != previous_entries:
if not in_progress(current_entries):
+ logging.debug("Current log entries differ from starting log"
+ " entries. Stopping polling.")
keep_going = False
if args.timeout is not None:
@@ -239,8 +340,82 @@ def poll_for_status(args, disk, previous_entries):
return current_entries[0]['status'], returncode
+def run_smart_test(args, disk, raid_element, raid_type):
+ """Run a test on a single disk device (possibly multiple RAID elements).
+
+ :param args:
+ Command-line arguments passed to script
+ :param disk:
+ Disk device filename (e.g., /dev/sda)
+ :param raid_element:
+ Number of RAID array element or undefined for non-RAID disk
+ :param raid_type:
+ Type of RAID device (e.g., megaraid)
+ :returns:
+ True for success, False for failure
+ """
+ previous_entries, returncode = get_smart_entries(disk, raid_element,
+ raid_type)
+ if raid_type == 'none':
+ logging.info("Starting SMART self-test on {}".format(disk))
+ else:
+ logging.info("Starting SMART self-test on {}, element {}".
+ format(disk, raid_element))
+ if initiate_smart_test(disk, raid_element, raid_type) != 0:
+ logging.error("Error reported during smartctl test")
+ return False
+
+ if len(previous_entries) > 20:
+ # Abort the previous instance
+ # so that polling can identify the difference
+
+ # The proper way to kill the test is using -X
+ # kill_smart_test(disk, raid_element, raid_type)
+ # Then re-initiate the test
+ logging.debug("Log is 20+ entries long. Restarting test to add an"
+ " abort message to make the log diff easier")
+ initiate_smart_test(disk, raid_element, raid_type)
+ previous_entries, returncode = get_smart_entries(disk, raid_element,
+ raid_type)
+
+ status, returncode = poll_for_status(args, disk, raid_element, raid_type,
+ previous_entries)
+
+ if returncode != 0:
+ log, returncode = get_smart_entries(disk, raid_element, raid_type)
+ if raid_type == 'none':
+ logging.error("FAIL: SMART Self-Test appears to have failed "
+ "for some reason. Run 'sudo smartctl -l selftest "
+ "{}' to see the SMART log".format(disk))
+ else:
+ logging.error("FAIL: SMART Self-Test appears to have failed "
+ "for some reason. Run 'sudo smartctl -l selftest "
+ "{} -d {},{}' to see the SMART log".
+ format(disk, raid_type, raid_element))
+ logging.error("\tLast smartctl return code: %d", returncode)
+ logging.error("\tLast smartctl run status: %s", status)
+ logging.debug("\tMost Recent SMART LOG Entry:")
+ for log_entry in log:
+ if log_entry['number'] == 1:
+ logging.debug("\t# {}\t{}\t{}\t{}\t{}\t{}".format(
+ log_entry['number'], log_entry['description'],
+ log_entry['status'], log_entry['remaining'],
+ log_entry['lifetime'], log_entry['lba']))
+ return False
+ else:
+ if raid_type == 'none':
+ logging.info("PASS: SMART Self-Test on {} completed without error".
+ format(disk))
+ else:
+ logging.info("PASS: SMART Self-Test on {}, element {} completed "
+ "without error\n".format(disk, raid_element))
+ return True
+
+
def main():
- description = 'Tests that SMART capabilities on disks that support SMART function.'
+ """Test SMART capabilities on disks that support SMART functions."""
+ description = ('Tests SMART capabilities on disks that support '
+ 'SMART functions.')
parser = ArgumentParser(description=description)
parser.add_argument('-b', '--block-dev',
metavar='DISK',
@@ -278,35 +453,21 @@ def main():
parser.error("You must be root to run this program")
disk = args.block_dev
- if not enable_smart(disk):
- logging.warning('SMART could not be enabled on %s' % disk)
- return 1
-
- # Initiate a self test and start polling until the test is done
- previous_entries, returncode = get_smart_entries(disk)
- logging.info("Starting SMART self-test on %s", disk)
- if run_smart_test(disk) != 0:
- logging.error("Error reported during smartctl test")
- return 1
-
- if len(previous_entries) > 20:
- # Abort the previous instance
- # so that polling can identify the difference
- run_smart_test(disk)
- previous_entries, returncode = get_smart_entries(disk)
-
- status, returncode = poll_for_status(args, disk, previous_entries)
-
- if returncode != 0:
- log, returncode = get_smart_entries(disk)
- logging.error("FAIL: SMART Self-Test appears to have failed for some reason. "
- "Run 'sudo smartctl -l selftest %s' to see the SMART log",
- disk)
- logging.debug("Last smartctl return code: %d", returncode)
- logging.debug("Last smartctl run status: %s", status)
+ num_disks, raid_type = count_raid_disks(disk)
+ if num_disks == 0:
+ success = enable_smart(disk, -1, raid_type)
+ success = success and run_smart_test(args, disk, -1, raid_type)
+ else:
+ success = True
+ for raid_element in range(0, num_disks):
+ if enable_smart(disk, raid_element, raid_type):
+ success = (run_smart_test(args, disk, raid_element,
+ raid_type) and success)
+ else:
+ success = False
+ if success is False:
return 1
else:
- logging.info("PASS: SMART Self-Test completed without error")
return 0
diff --git a/bin/disk_stress_ng b/bin/disk_stress_ng
new file mode 100755
index 0000000..8445488
--- /dev/null
+++ b/bin/disk_stress_ng
@@ -0,0 +1,242 @@
+#!/bin/bash
+
+# Script to run disk stress tests using stress-ng
+#
+# Copyright (c) 2016 Canonical Ltd.
+#
+# Authors
+# Rod Smith <rod.smith@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The purpose of this script is to run disk stress tests using the
+# stress-ng program.
+#
+# Usage:
+# disk_stress_ng [ <device-filename> ]
+# [ --base-time <time-in-seconds> ]
+# [ --really-run ]
+#
+# Parameters:
+#  <device-filename> -- The WHOLE-DISK device filename WITHOUT "/dev/"
+#                       (e.g., sda). The script finds a filesystem on that
+#                       device, mounts it if necessary, and runs the tests
+#                       on that mounted filesystem. Defaults to sda.
+#  --base-time       -- Time, in seconds, to run each stressor (default 240).
+#  --really-run      -- Actually run the tests; without this option the
+#                       script only prints the estimated run time and exits.
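+#
+# Example invocation (hypothetical device name):
+#   disk_stress_ng sdb --base-time 120 --really-run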
+# Test with iostat
+
+get_params() {
+ disk_device="/dev/sda"
+ short_device="sda"
+ base_time="240"
+ really_run="N"
+ while [ $# -gt 0 ] ; do
+ case $1 in
+ --base-time) base_time="$2"
+ shift
+ ;;
+ --really-run) really_run="Y"
+ ;;
+ *) disk_device="/dev/$1"
+ disk_device=`echo $disk_device | sed "s/\/dev\/\/dev/\/dev/g"`
+ short_device=$(echo $disk_device | sed "s/\/dev//g")
+ if [ ! -b $disk_device ] ; then
+ echo "Unknown block device \"$disk_device\""
+ echo "Usage: $0 [ --base-time <time-in-seconds> ] [ --really-run ]"
+ echo " [ device-file ]"
+ exit 1
+ fi
+ ;;
+ esac
+ shift
+ done
+ mounted_part="N"
+} # get_params()
+
+
+# Find the largest partition that holds a supported filesystem on $disk_device.
+# Output:
+# $largest_part -- Device filename of largest qualifying partition
+# $largest_size -- Size of largest qualifying partition
+# $largest_fs -- Filesystem (ext4, etc.) used on largest qualifying partition
+# $unsupported_fs -- Empty or contains name of unsupported filesystem found on disk
+find_largest_partition() {
+ largest_part=""
+ largest_size=0
+ partitions=$(lsblk -b -l -n -o NAME,SIZE,TYPE,MOUNTPOINT $disk_device | grep part | tr -s " ")
+ unsupported_fs=""
+ for partition in $(echo "$partitions" | cut -d " " -f 1) ; do
+ part_size=$(echo "$partitions" | grep "$partition " | cut -d " " -f 2)
+ local blkid_info=$(blkid -s TYPE /dev/$partition | grep -E ext2\|ext3\|ext4\|xfs\|jfs\|btrfs)
+ if [ "$part_size" -gt "$largest_size" ] && [ -n "$blkid_info" ] ; then
+ largest_size=$part_size
+ largest_part="/dev/$partition"
+ largest_fs=$(blkid -s TYPE "/dev/$partition" | cut -d "=" -f 2)
+ fi
+ local blkid_info=$(blkid -s TYPE /dev/$partition | grep -E ntfs\|vfat\|hfs)
+ if [ -n "$blkid_info" ] ; then
+ # If there's an NTFS, HFS+, or FAT filesystem on the disk make note of it....
+ unsupported_fs=$(blkid -s TYPE "/dev/$partition" | cut -d "=" -f 2)
+ fi
+ done
+} # find_largest_partition()
+
+# Find the largest filesystem on $disk_device. If that partition is not
+# already mounted, try to mount it.
+# Output:
+# $test_dir -- Directory in which tests will occur
+# $mount_point -- Location where filesystem is mounted
+# $mounted_part -- Sets to "Y" if script mounted partition
+# $made_mountpoint -- Sets to "Y" if script created the mount point
+mount_filesystem() {
+ test_dir="/tmp/disk_stress_ng"
+ if [ -b $disk_device ]
+ then
+ echo "$disk_device is a block device"
+
+ #Add a check for warnings
+        WARN=$(parted -s ${disk_device} print | grep "^Warning.*${disk_device}.*[Rr]ead-only" 2>&1)
+ if [[ $? == 0 ]]
+ then
+ echo "Warning found in parted output:"
+ echo $WARN
+ echo "Aborting Test"
+ exit 1
+ fi
+ else
+ echo "$disk_device is not a block device! Aborting!"
+ exit 1
+ fi
+
+ find_largest_partition
+
+ if [ -n "$largest_part" ] ; then
+ echo "Found largest partition: \"$largest_part\""
+ mount_point=$(df | grep "$largest_part " | tr -s " " | cut -d " " -f 6)
+ if [ "$mount_point" == "" ] && [ "$really_run" == "Y" ] ; then
+ disk_device=$(echo $disk_device | sed "s/\/dev\/\/dev/\/dev/g")
+ mount_point="/mnt$short_device"
+ echo "No partition is mounted from $disk_device; attempting to mount one...."
+ if [ ! -d $mount_point ] ; then
+ mkdir -p "$mount_point"
+ made_mountpoint="Y"
+ fi
+ mount "$largest_part" "$mount_point"
+ mounted_part="Y"
+ fi
+ if [ "$mount_point" == "/" ] ; then
+ test_dir="/tmp/disk_stress_ng"
+ else
+ test_dir="$mount_point/tmp/disk_stress_ng"
+ fi
+ echo "Test will use $largest_part, mounted at \"$mount_point\", using $largest_fs"
+ else
+ echo "There appears to be no partition with a suitable filesystem"
+ echo "on $disk_device; please create a suitable partition and re-run"
+ echo "this test."
+ if [ -n "unsupported_fs" ] ; then
+ echo "NOTE: A filesystem of type $unsupported_fs was found, but is not supported"
+ echo "by this test. A Linux-native filesystem (ext2/3/4fs, XFS, JFS, or Btrfs)"
+ echo "is required."
+ fi
+ exit 1
+ fi
+} # mount_filesystem()
+
+
+# Run an individual stressor
+# Input:
+# $1 = stressor name (e.g., copyfile, dentry)
+# $2 = run time
+# Output:
+# had_error -- sets to "1" if an error occurred
+run_stressor() {
+ local runtime="$2"
+ # Multiply runtime by 5; will forcefully terminate if stress-ng
+ # fails to return in that time.
+ end_time=$((runtime*5))
+ echo "Running stress-ng $1 stressor for $2 seconds...."
+ # Use "timeout" command to launch stress-ng, to catch it should it go into
+ # la-la land
+ timeout -s 9 $end_time stress-ng --aggressive --verify --timeout $runtime \
+ --temp-path $test_dir --$1 0
+ return_code="$?"
+ echo "return_code is $return_code"
+ if [ "$return_code" != "0" ] ; then
+ had_error=1
+ echo "*****************************************************************"
+ if [ $return_code = "137" ] ; then
+ echo "** stress-ng disk test timed out and was forcefully terminated!"
+ else
+ echo "** Error $return_code reported on stressor $stressor!)"
+ fi
+ echo "*****************************************************************"
+ had_error=1
+ result=$return_code
+ fi
+} # run_stressor()
+
+
+#
+# Main program body....
+#
+
+
+get_params "$@"
+mount_filesystem
+echo "test_dir is $test_dir"
+
+had_error=0
+result=0
+
+# Tests Colin said to try but that aren't present as of standard stress-ng
+# in Ubuntu 16.04:
+#
+# "chown" "copyfile" "ioprio" "locka" "lockofd" "madvise" "msync" "seal"
+#
+# TODO: Consider adding these tests for Ubuntu 18.04, or earlier with an
+# updated stress-ng in the certification PPA....
+
+disk_stressors=("aio" "aiol" "chdir" "chmod" "dentry" "dir" "fallocate" \
+ "fiemap" "filename" "flock" "fstat" "hdd" "lease" "lockf" \
+ "mknod" "readahead" "seek" "sync-file" "xattr")
+
+total_runtime=$((${#disk_stressors[@]}*$base_time))
+
+echo "Estimated total run time is $total_runtime seconds"
+echo ""
+
+if [ "$really_run" == "Y" ] ; then
+ mkdir -p "$test_dir"
+ for stressor in ${disk_stressors[@]}; do
+ run_stressor $stressor $base_time
+ done
+ rm -rf "$test_dir"
+ if [ "$mounted_part" == "Y" ] ; then
+ umount "$mount_point"
+ if [ "$made_mountpoint" == "Y" ] ; then
+ rmdir "$mount_point"
+ fi
+ fi
+else
+ echo "To actually run tests, pass the --really-run option."
+ echo "Script is now terminating...."
+ exit 1
+fi
+
+echo "*******************************************************************"
+if [ $had_error = "0" ] ; then
+ echo "** stress-ng disk test passed!"
+else
+ echo "** stress-ng disk test failed; most recent error was $result"
+fi
+echo "*******************************************************************"
+exit $result
diff --git a/bin/dmitest b/bin/dmitest
index cacd22d..fca67ca 100755
--- a/bin/dmitest
+++ b/bin/dmitest
@@ -256,7 +256,7 @@ def main():
if args.test_serials:
retval += serial_tests(args, stream)
if find_in_section(stream, 'Processor Information', 'Version:',
- ['sample'], False):
+ ['sample', 'Genuine Intel\(R\) CPU 0000'], False):
print("*** Invalid processor information!")
retval += 1
diff --git a/bin/fwts_test b/bin/fwts_test
index 31b0545..0e5ffc9 100755
--- a/bin/fwts_test
+++ b/bin/fwts_test
@@ -4,7 +4,7 @@ import sys
import re
from time import time
from argparse import ArgumentParser, RawTextHelpFormatter, REMAINDER
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, check_output
from syslog import *
from distutils.spawn import find_executable
import os
@@ -76,7 +76,7 @@ TESTS = sorted(QA_TESTS + NON_CERT_TESTS + HWE_TESTS)
def get_sleep_times(start_marker, end_marker, sleep_time, resume_time):
logfile = '/var/log/syslog'
- log_fh = open(logfile, 'r')
+ log_fh = open(logfile, 'r', encoding='UTF-8')
line = ''
run = 'FAIL'
sleep_start_time = 0.0
@@ -408,19 +408,32 @@ def main():
results[test] = (Popen(command, stdout=PIPE, shell=True)
.communicate()[0].strip()).decode()
+ # lp:1584607
+ # We append the content of dmesg and syslog at the end of the logfile
+ # generated by FWTS.
+ # FIXME: Commented out after discovered that it created HUGE log files
+ # during stress tests.
+ #with open(args.log, 'a') as logfile:
+ # logfile.write("--- beginning of dmesg ---\n")
+ # logfile.write(check_output('dmesg').decode('utf-8', 'ignore'))
+ # logfile.write("--- end of dmesg ---\n")
+ # logfile.write("--- beginning of syslog ---\n")
+ # logfile.write(check_output(['cat', '/var/log/syslog']).decode('utf-8', 'ignore'))
+ # logfile.write("--- end of syslog ---\n")
+
# parse the summaries
for test in results.keys():
- if results[test] == 'FAILED_CRITICAL':
+ if 'FAILED_CRITICAL' in results[test]:
critical_fails.append(test)
- elif results[test] == 'FAILED_HIGH':
+ if 'FAILED_HIGH' in results[test]:
high_fails.append(test)
- elif results[test] == 'FAILED_MEDIUM':
+ if 'FAILED_MEDIUM' in results[test]:
medium_fails.append(test)
- elif results[test] == 'FAILED_LOW':
+ if 'FAILED_LOW' in results[test]:
low_fails.append(test)
- elif results[test] == 'PASSED':
+ if 'PASSED' in results[test]:
passed.append(test)
- elif results[test] == 'ABORTED':
+ if 'ABORTED' in results[test]:
aborted.append(test)
else:
continue
diff --git a/bin/graphics_env b/bin/graphics_env
new file mode 100755
index 0000000..55a3486
--- /dev/null
+++ b/bin/graphics_env
@@ -0,0 +1,31 @@
+#!/bin/bash
+# This script checks if the submitted VIDEO resource is from AMD and if it is
+# a discrete GPU (graphics_card_resource orders GPUs by index: 1 is the
+# integrated one, 2 is the discrete one).
+#
+# This script has to be sourced in order to set an environment variable that
+# is used by the open source AMD driver to trigger the use of discrete GPU.
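+#
+# Example (illustrative): to target the discrete AMD GPU, source this script
+# with the driver name and the GPU index reported by the VIDEO resource:
+#   . graphics_env amdgpu 2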
+
+DRIVER=$1
+INDEX=$2
+UBUNTU_CODENAME=`lsb_release -c | awk {'print $2'}`
+
+# We only want to set the DRI_PRIME env variable on Xenial (16.04) systems
+# with more than 1 GPU running the amdgpu driver.
+if [[ $DRIVER == "amdgpu" ]]; then
+ if [[ $UBUNTU_CODENAME == "xenial" ]]; then
+ NB_GPU=`udev_resource -l VIDEO | grep -oP -m1 '\d+'`
+ if [ $NB_GPU -gt 1 ]; then
+ if [ $INDEX -gt 1 ]; then
+ # See https://wiki.archlinux.org/index.php/PRIME
+ echo "Setting up PRIME GPU offloading for AMD discrete GPU"
+ PROVIDER_ID=`xrandr --listproviders | grep "Sink Output" | awk {'print $4'} | tail -1`
+ SINK_ID=`xrandr --listproviders | grep "Source Output" | awk {'print $4'} | tail -1`
+ xrandr --setprovideroffloadsink ${PROVIDER_ID} ${SINK_ID}
+ export DRI_PRIME=1
+ else
+ export DRI_PRIME=
+ fi
+ fi
+ fi
+fi
diff --git a/bin/key_test b/bin/key_test
index 35fae77..3cf7ae0 100755
--- a/bin/key_test
+++ b/bin/key_test
@@ -163,7 +163,7 @@ class Reporter(object):
# Check for ESC key pressed
self.show_text(_("Test cancelled"))
self.quit()
- elif code > 1 and code < 10:
+ elif 1 < code < 10 and type(self) == CLIReporter:
# Check for number to skip
self.toggle_key(self.keys[code - 2])
else:
diff --git a/bin/memory_compare b/bin/memory_compare
index c145038..02e2def 100755
--- a/bin/memory_compare
+++ b/bin/memory_compare
@@ -26,7 +26,7 @@ import os
import sys
import re
from math import log, copysign
-from subprocess import check_output, PIPE
+from subprocess import check_output, CalledProcessError, PIPE
from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
from checkbox_support.parsers.lshwjson import LshwJsonParser
@@ -51,9 +51,13 @@ class LshwJsonResult:
def get_installed_memory_size():
- lshw = LshwJsonParser(check_output(['lshw', '-json'],
- universal_newlines=True,
- stderr=PIPE))
+ try:
+ output = check_output(['lshw', '-json'],
+ universal_newlines=True,
+ stderr=PIPE)
+ except CalledProcessError:
+ return 0
+ lshw = LshwJsonParser(output)
result = LshwJsonResult()
lshw.run(result)
diff --git a/bin/memory_stress_ng b/bin/memory_stress_ng
new file mode 100755
index 0000000..ab65e86
--- /dev/null
+++ b/bin/memory_stress_ng
@@ -0,0 +1,158 @@
+#!/bin/bash
+
+# Script to perform memory stress tests
+#
+# Copyright (c) 2016 Canonical Ltd.
+#
+# Authors
+# Rod Smith <rod.smith@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The purpose of this script is to run memory stress tests using the
+# stress-ng program. It also happens to impose a heavy CPU load, but
+# that's a side effect of the memory stressors, not their purpose.
+#
+# Usage:
+# memory_stress_ng [ --base-time <time> ] [ --time-per-gig <time> ]
+#
+# Parameters:
+# --base-time is the time in seconds to run each stressor. (The default
+# is 300 seconds, or five minutes.)
+#   --time-per-gig is extra time given to SOME stressors, measured in
+#      seconds per GiB of RAM. (The default is 10 seconds per GiB.)
+#
+# There are a total of 22 constant-run-time stressors and 6 variable-
+# run-time stressors. Given the defaults, this works out to a total
+# expected default run time of 8400 seconds (140 minutes) plus 60 seconds
+# per GiB of RAM -- so a system with 16 GiB should take 156 minutes; one
+# with 32 GiB should take 172 minutes, and so on, using the default
+# values.
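+#
+# Worked example (defaults, hypothetical 16 GiB system):
+#   constant-run-time stressors: 22 * 300 s            = 6600 s
+#   variable-run-time stressors:  6 * (300 + 10*16) s  = 2760 s
+#   total                                              = 9360 s (156 minutes)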
+
+
+get_params() {
+ base_time=300
+ time_per_gig=10
+ while [ $# -gt 0 ] ; do
+ case $1 in
+ --base-time) base_time="$2"
+ shift
+ ;;
+ --time-per-gig) time_per_gig="$2"
+ shift
+ ;;
+ *) echo "Usage: $0 [ --base-time <time> ] [ --time-per-gig <time> ]"
+ exit 1
+ ;;
+ esac
+ shift
+ done
+ local extra_time=$(($time_per_gig * $total_mem_in_GiB))
+ variable_time=$(($base_time + $extra_time ))
+} # get_params()
+
+
+# Run an individual stressor
+# Input:
+# $1 = stressor name (e.g., malloc, brk)
+# $2 = run time
+# Output:
+# had_error -- sets to "1" if an error occurred
+run_stressor() {
+ local runtime="$2"
+ # Add 50% to runtime; will forcefully terminate if stress-ng
+ # fails to return in that time.
+ end_time=$((runtime*15/10))
+ echo "Running stress-ng $1 stressor for $2 seconds...."
+ # Use "timeout" command to launch stress-ng, to catch it should it go into la-la land
+ timeout -s 9 $end_time stress-ng --aggressive --verify --timeout $runtime --$1 0
+ return_code="$?"
+ echo "return_code is $return_code"
+ if [ "$return_code" != "0" ] ; then
+ had_error=1
+ echo "*****************************************************************"
+ if [ $return_code = "137" ] ; then
+ echo "** stress-ng memory test timed out and was forcefully terminated!"
+ else
+ echo "** Error $return_code reported on stressor $stressor!)"
+ fi
+ echo "*****************************************************************"
+ had_error=1
+ result=$return_code
+ fi
+} # run_stressor()
+
+
+#
+# Main program body....
+#
+
+swap_space=`cat /proc/meminfo | grep -i SwapTotal | tr -s " " | cut -f 2 -d " "`
+if [ -z $swap_space ] || [ $swap_space = "0" ] ; then
+ echo "Swap space unavailable! Please activate swap space and re-run this test!"
+ exit 1
+fi
+
+# Total memory in KiB....
+total_mem_in_KiB=`cat /proc/meminfo | grep MemTotal | tr -s " " | cut -f 2 -d " "`
+total_mem_in_GiB=$((($total_mem_in_KiB/1048576)+1))
+echo "Total memory is $total_mem_in_GiB GiB"
+
+get_params "$@"
+echo "Constant run time is $base_time seconds per stressor"
+echo "Variable run time is $variable_time seconds per stressor"
+
+had_error=0
+result=0
+
+# NOTE: Specify stressors in two arrays rather than rely on stress-ng's
+# --class memory,vm option for two reasons:
+# 1. We want to run some stressors (those that exhaust all memory)
+# for longer than others, so we need to specify different run
+# times for different stressors.
+# 2. stress-ng is constantly being updated with new tests. We don't
+# want to run one set of tests on SUT 1 and a larger set of tests
+# on SUT 2 if we happen to have updated stress-ng for some unrelated
+# reason (like a bug fix); thus, we specify tests individually.
+
+# Constant-run-time stressors -- run them for the same length of time on all
+# systems....
+crt_stressors=("bsearch" "context" "hsearch" "lsearch" "matrix" \
+ "memcpy" "null" "pipe" "qsort" "stack" "str" "stream" \
+ "tsearch" "vm-rw" "wcs" "zero" "mlock" "mmapfork" "mmapmany" \
+ "mremap" "shm-sysv" "vm-splice")
+crt_runtime=$((${#crt_stressors[@]}*$base_time))
+
+# Variable-run-time stressors -- run them longer on systems with more RAM....
+vrt_stressors=("malloc" "mincore" "vm" "bigheap" "brk" "mmap")
+vrt_runtime=$((${#vrt_stressors[@]}*$variable_time))
+
+total_runtime=$((($crt_runtime + $vrt_runtime) / 60))
+echo "Estimated total run time is $total_runtime minutes"
+echo ""
+
+for stressor in ${crt_stressors[@]}; do
+ run_stressor $stressor $base_time
+done
+
+for stressor in ${vrt_stressors[@]}; do
+ run_stressor $stressor $variable_time
+done
+
+echo "*******************************************************************"
+if [ $had_error = "0" ] ; then
+ echo "** stress-ng memory test passed!"
+else
+ echo "** stress-ng memory test failed; most recent error was $return_code"
+fi
+echo "*******************************************************************"
+exit $result
diff --git a/bin/network b/bin/network
index 06d106e..a7f4b88 100755
--- a/bin/network
+++ b/bin/network
@@ -24,6 +24,7 @@ from argparse import (
ArgumentParser,
RawTextHelpFormatter
)
+import datetime
import fcntl
import logging
import os
@@ -56,7 +57,9 @@ class IPerfPerformanceTest(object):
cpu_load_fail_threshold,
iperf3,
protocol="tcp",
- data_size="1"):
+ data_size="1",
+ run_time=None,
+ scan_timeout=3600):
self.iface = Interface(interface)
self.target = target
@@ -65,6 +68,8 @@ class IPerfPerformanceTest(object):
self.cpu_load_fail_threshold = cpu_load_fail_threshold
self.iperf3 = iperf3
self.data_size = data_size
+ self.run_time = run_time
+ self.scan_timeout = scan_timeout
def run(self):
# if max_speed is 0, assume it's wifi and move on
@@ -79,23 +84,28 @@ class IPerfPerformanceTest(object):
logging.error("Check your device configuration and try again")
return 1
- # Because we can vary the data size, we need to vary the timeout as
- # well. It takes an estimated 15 minutes to send 1GB over 10Mb/s.
- # 802.11b is 11 Mb/s. So we'll assume 1.2x15 minutes or 18 minutes
- # or 1080 seconds per Gigabit. This will allow for a long period of
- # time without timeout to catch devices that slow down, and also not
- # prematurely end iperf on low-bandwidth devices.
-
- self.timeout = 1080*int(self.data_size)
-
+ # Set the correct binary to run
if (self.iperf3):
- cmd = "timeout {} iperf3 -c {} -n {}G -i 1 -f m -V".format(
- self.timeout, self.target, self.data_size)
+ self.executable = "iperf3 -V"
else:
- cmd = "timeout {} iperf -c {} -n {}G -i 1 -f m".format(
- self.timeout, self.target, self.data_size)
+ self.executable = "iperf"
- logging.debug(cmd)
+ # If we set run_time, use that instead to build the command.
+ if self.run_time is not None:
+ cmd = "{} -c {} -t {} -i 1 -f m".format(
+ self.executable, self.target, self.run_time)
+ else:
+ # Because we can vary the data size, we need to vary the timeout as
+ # well. It takes an estimated 15 minutes to send 1GB over 10Mb/s.
+ # 802.11b is 11 Mb/s. So we'll assume 1.2x15 minutes or 18 minutes
+ # or 1080 seconds per Gigabit. This will allow for a long period of
+ # time without timeout to catch devices that slow down, and also
+ # not prematurely end iperf on low-bandwidth devices.
+ self.timeout = 1080*int(self.data_size)
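+                # e.g. (illustrative): --datasize 4 gives a 4320 second
+                # timeout before the transfer is cut off.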
+ cmd = "timeout {} {} -c {} -n {}G -i 1 -f -m".format(
+ self.timeout, self.executable, self.target, self.data_size)
+
+ logging.debug("Executing command {}".format(cmd))
logging.info("Starting iperf against {}, this could take a while...".
format(self.target))
try:
@@ -375,6 +385,8 @@ def run_test(args, test_target):
args.iperf3)
if args.datasize:
iperf_benchmark.data_size = args.datasize
+ if args.runtime:
+ iperf_benchmark.run_time = args.runtime
run_num = 0
while not error_number and run_num < args.num_runs:
error_number = iperf_benchmark.run()
@@ -405,12 +417,16 @@ def interface_test(args):
if not test_targets or "example.com" in test_targets:
# Default values found in config file
logging.error("Target server has not been supplied.")
- logging.info("Configuration settings can be configured 3 different ways:")
- logging.info("1- If calling the script directly, pass the --target option")
+ logging.info("Configuration settings can be configured 3 different "
+ "ways:")
+ logging.info("1- If calling the script directly, pass the --target "
+ "option")
logging.info("2- Define the TEST_TARGET_IPERF environment variable")
- logging.info("3- (If running the test via checkbox/plainbox, define the ")
+ logging.info("3- (If running the test via checkbox/plainbox, define "
+ "the ")
logging.info("target in /etc/xdg/canonical-certification.conf)")
- logging.info("Please run this script with -h to see more details on how to configure")
+ logging.info("Please run this script with -h to see more details on "
+ "how to configure")
sys.exit(1)
# Testing begins here!
@@ -448,12 +464,23 @@ def interface_test(args):
if error_number == 0:
test_targets_list = test_targets.split(",")
test_targets_list.reverse()
- # Keep testing until a success or we run out of targets
+ start_time = datetime.datetime.now()
+ first_loop = True
+ # Keep testing until a success or we run out of both targets and time
while test_targets_list:
test_target = test_targets_list.pop().strip()
error_number = run_test(args, test_target)
- if not error_number:
+ elapsed_seconds = (datetime.datetime.now() - start_time).seconds
+ if (elapsed_seconds > args.scan_timeout and not first_loop) or \
+ not error_number:
break
+ if not test_targets_list:
+ logging.info(" Exhausted test target list; trying again "
+ .center(60, "="))
+ test_targets_list = test_targets.split(",")
+ test_targets_list.reverse()
+ time.sleep(30)
+ first_loop = False
for iface in extra_interfaces:
logging.debug("Restoring interface:%s", iface)
@@ -549,6 +576,8 @@ TEST_TARGET_IPERF = iperf-server.example.com
'info', help=("Gather network info"))
# Sub test options
+ action = test_parser.add_mutually_exclusive_group()
+
test_parser.add_argument(
'-i', '--interface', type=str, required=True)
test_parser.add_argument(
@@ -558,11 +587,24 @@ TEST_TARGET_IPERF = iperf-server.example.com
test_parser.add_argument(
'-3', '--iperf3', default=False, action="store_true")
test_parser.add_argument('--target', type=str)
- test_parser.add_argument(
+ action.add_argument(
'--datasize', type=str,
default="1",
- help=("Amount of data to send. For iperf tests this will direct "
- "iperf to send DATASIZE GB of data to the target."))
+ help=("CANNOT BE USED WITH --runtime. Amount of data to send. For "
+ "iperf tests this will direct iperf to send DATASIZE GB of "
+ "data to the target."))
+ action.add_argument(
+ '--runtime', type=int,
+ default=60,
+ help=("CANNOT BE USED WITH --datasize. Send data for *runtime* "
+ "seconds. For iperf tests, this will send data for the amount "
+ "of time indicated, rather than until a certain file size is "
+ "reached."))
+ test_parser.add_argument(
+ '--scan-timeout', type=int,
+ default=60,
+ help=("Sets the maximum time, in seconds, the test will scan for "
+ "iperf servers before giving up."))
test_parser.add_argument(
'--config', type=str,
default="/etc/checkbox.d/network.cfg",
@@ -617,8 +659,11 @@ TEST_TARGET_IPERF = iperf-server.example.com
info_parser.set_defaults(func=interface_info)
args = parser.parse_args()
- if args.cpu_load_fail_threshold != 100 and not args.iperf3:
- parser.error('--cpu-load-fail-threshold can only be set with --iperf3.')
+    if (args.func is interface_test and
+            args.cpu_load_fail_threshold != 100 and
+ not args.iperf3):
+ parser.error('--cpu-load-fail-threshold can only be set with '
+ '--iperf3.')
if args.debug:
logging.basicConfig(level=logging.DEBUG)
diff --git a/bin/network_device_info b/bin/network_device_info
index f4b7911..19e5b68 100755
--- a/bin/network_device_info
+++ b/bin/network_device_info
@@ -236,7 +236,14 @@ def main(args):
value, network_dev._driver_ver))
else:
print("%s: %s" % (attribute.capitalize(), value))
-
+ vendor_id = getattr(device, 'vendor_id')
+ product_id = getattr(device, 'product_id')
+ subvendor_id = getattr(device, 'subvendor_id')
+ subproduct_id = getattr(device, 'subproduct_id')
+ if vendor_id and product_id:
+ print("ID: [{0:04x}:{1:04x}]".format(vendor_id, product_id))
+ if subvendor_id and subproduct_id:
+ print("Subsystem ID: [{0:04x}:{1:04x}]".format(subvendor_id, subproduct_id))
print()
try:
diff --git a/bin/pm_test b/bin/pm_test
index c2e0efb..c0fb879 100755
--- a/bin/pm_test
+++ b/bin/pm_test
@@ -139,7 +139,7 @@ class PowerManagementOperation(object):
return
if fwts:
script_name = 'fwts_test'
- command_tpl = '{} -s s3 --s3-sleep-delay=30 --s3-multiple={}'
+ command_tpl = '{} -s s3 --s3-device-check --s3-sleep-delay=30 --s3-multiple={}'
else:
script_name = 'sleep_test'
command_tpl = '{} -s mem -p -i {} -w 10'
@@ -148,10 +148,14 @@ class PowerManagementOperation(object):
command_str = command_tpl.format(script_path, cycles_count)
logging.info('Running suspend/resume cycles')
logging.debug('Executing: {0!r}...'.format(command_str))
- # We call sleep_test script and log its output as it contains
- # average times we need to compute global average times later.
- logging.info(subprocess.check_output(command_str,
- universal_newlines=True, shell=True))
+ try:
+ # We call sleep_test or fwts_test script and log its output as it
+ # contains average times we need to compute global average times later.
+ logging.info(subprocess.check_output(command_str,
+ universal_newlines=True, shell=True))
+ except subprocess.CalledProcessError as e:
+ logging.error('Error while running {0}:'.format(e.cmd))
+ logging.exception(e.output)
def summary(self):
"""
@@ -320,11 +324,11 @@ class Command(object):
'- returncode:\n{0}'.format(self.process.returncode)]
if stdout:
if type(stdout) is bytes:
- stdout = stdout.decode('utf-8')
+ stdout = stdout.decode('utf-8', 'ignore')
message.append('- stdout:\n{0}'.format(stdout))
if stderr:
if type(stderr) is bytes:
- stderr = stderr.decode('utf-8')
+ stderr = stderr.decode('utf-8', 'ignore')
message.append('- stderr:\n{0}'.format(stderr))
logging.debug('\n'.join(message))
@@ -365,21 +369,27 @@ class CountdownDialog(Gtk.Dialog):
{'template': 'Gathering hardware information in {time} seconds...',
'timeout': hardware_delay,
'callback': self.on_hardware_info_timeout_cb}
+ system_info_event = \
+ {'template': 'Gathering system information in {time} seconds...',
+ 'timeout': 2,
+ 'callback': self.on_system_info_timeout_cb}
if iterations == 0:
- # In first iteration, gather hardware information directly
- # and perform pm-operation
+ # In first iteration, gather hardware and system information
+ # directly and perform pm-operation
self.on_hardware_info_timeout_cb()
+ self.on_system_info_timeout_cb()
self.events = [operation_event]
elif iterations < iterations_count:
            # In intermediate iterations, wait before gathering hardware
            # information and perform pm-operation
self.events = [operation_event,
- hardware_info_event]
+ hardware_info_event,
+ system_info_event]
else:
# In last iteration, wait before gathering hardware information
# and finish the test
- self.events = [hardware_info_event]
+ self.events = [hardware_info_event, system_info_event]
self.label = Gtk.Label()
self.vbox.pack_start(self.label, True, True, 0)
@@ -477,6 +487,20 @@ class CountdownDialog(Gtk.Dialog):
# errors can be retrieved by pm_log_check
logging.error('Problem found in logs by fwts')
+ def on_system_info_timeout_cb(self):
+ """
+ Gather system information and print it to logs
+ """
+ logging.info('Gathering system information...')
+ # FIXME: Commented out as it created huge log files
+ # during stress tests.
+ #logging.debug('--- beginning of dmesg ---')
+ #logging.debug(Command('dmesg').run().stdout)
+ #logging.debug('--- end of dmesg ---')
+ #logging.debug('--- beginning of syslog ---')
+ #logging.debug(Command('cat /var/log/syslog').run().stdout)
+ #logging.debug('--- end of syslog ---')
+
class MessageDialog(object):
"""
diff --git a/bin/pulse-active-port-change b/bin/pulse-active-port-change
index 0632444..f2d9527 100755
--- a/bin/pulse-active-port-change
+++ b/bin/pulse-active-port-change
@@ -72,6 +72,7 @@ class AudioPlugDetection:
cfg = set()
for record in doc.record_list:
active_port = None
+ port_availability = None
# We go through the attribute list once to try to find an active port
for attr in record.attribute_list:
if attr.name == "Active Port":
diff --git a/bin/removable_storage_test b/bin/removable_storage_test
index 8e7732b..3502455 100755
--- a/bin/removable_storage_test
+++ b/bin/removable_storage_test
@@ -6,11 +6,15 @@ import dbus
import hashlib
import logging
import os
+import re
+import shlex
import subprocess
import sys
import tempfile
import time
+import gi
+gi.require_version('GUdev', '1.0')
from gi.repository import GUdev
from checkbox_support.dbus import connect_to_system_bus
@@ -26,6 +30,7 @@ from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
from checkbox_support.parsers.udevadm import CARD_READER_RE
from checkbox_support.parsers.udevadm import GENERIC_RE
from checkbox_support.parsers.udevadm import FLASH_RE
+from checkbox_support.parsers.udevadm import find_pkname_is_root_mountpoint
from checkbox_support.udev import get_interconnect_speed
from checkbox_support.udev import get_udev_block_devices
from checkbox_support.udev import get_udev_xhci_devices
@@ -97,7 +102,7 @@ def md5_hash_file(path):
class DiskTest():
''' Class to contain various methods for testing removable disks '''
- def __init__(self, device, memorycard):
+ def __init__(self, device, memorycard, lsblkcommand):
self.rem_disks = {} # mounted before the script running
self.rem_disks_nm = {} # not mounted before the script running
self.rem_disks_memory_cards = {}
@@ -106,8 +111,10 @@ class DiskTest():
# LP: #1313581, TODO: extend to be rem_disks_driver
self.rem_disks_xhci = {}
self.data = ''
+ self.lsblk = ''
self.device = device
self.memorycard = memorycard
+ self._run_lsblk(lsblkcommand)
self._probe_disks()
def read_file(self, source):
@@ -145,6 +152,24 @@ class DiskTest():
logging.error("Unable to remove tempfile %s", target)
logging.error(" %s", exc)
+ def _find_parent(self, device):
+ if self.lsblk:
+ pattern = re.compile('KNAME="(?P<KNAME>.*)" '
+ 'TYPE="(?P<TYPE>.*)" '
+ 'MOUNTPOINT="(?P<MOUNTPOINT>.*)"')
+ for line in self.lsblk.splitlines():
+ m = pattern.match(line)
+ if m and device.startswith(m.group('KNAME')):
+ return m.group('KNAME')
+ return False
+
+ def _run_lsblk(self, lsblkcommand):
+ try:
+ self.lsblk = subprocess.check_output(shlex.split(lsblkcommand),
+ universal_newlines=True)
+ except subprocess.CalledProcessError as exc:
+ raise SystemExit(exc)
+
def _probe_disks(self):
"""
Internal method used to probe for available disks
@@ -223,6 +248,9 @@ class DiskTest():
# Get the block device pathname,
# to avoid the confusion, this is something like /dev/sdbX
dev_file = udev_device.get_device_file()
+ parent = self._find_parent(dev_file.replace('/dev/', ''))
+ if parent and find_pkname_is_root_mountpoint(parent, self.lsblk):
+ continue
# Get the list of mount points of this block device
mount_points = (
udisks2_object[UDISKS2_FILESYSTEM_INTERFACE]['MountPoints'])
@@ -280,15 +308,15 @@ class DiskTest():
parent_vendor = parent_props.Get(udisks, "DriveVendor")
parent_media = parent_props.Get(udisks, "DriveMedia")
if self.memorycard:
- if (dev_bus != 'sdio'
- and not FLASH_RE.search(parent_media)
- and not CARD_READER_RE.search(parent_model)
- and not GENERIC_RE.search(parent_vendor)):
+ if (dev_bus != 'sdio' and not
+ FLASH_RE.search(parent_media) and not
+ CARD_READER_RE.search(parent_model) and not
+ GENERIC_RE.search(parent_vendor)):
continue
else:
- if (FLASH_RE.search(parent_media)
- or CARD_READER_RE.search(parent_model)
- or GENERIC_RE.search(parent_vendor)):
+ if (FLASH_RE.search(parent_media) or
+ CARD_READER_RE.search(parent_model) or
+ GENERIC_RE.search(parent_vendor)):
continue
dev_file = str(device_props.Get(udisks, "DeviceFile"))
dev_speed = str(device_props.Get(udisks,
@@ -376,9 +404,11 @@ class DiskTest():
# then compare this pci slot name to the other
dl = devpath.split('/')
s = set([x for x in dl if dl.count(x) > 1])
- if ((pci_slot_name in dl)
- and (dl.index(pci_slot_name) < dl.index('block'))
- and (not(pci_slot_name in s))):
+ if (
+ (pci_slot_name in dl) and
+ (dl.index(pci_slot_name) < dl.index('block')) and
+ (not(pci_slot_name in s))
+ ):
# 1. there is such pci_slot_name
# 2. sysfs topology looks like
# DEVPATH = ....../pci_slot_name/....../block/......
@@ -451,10 +481,14 @@ def main():
choices=['xhci_hcd'],
help=("Detect the driver of the host controller."
"Only xhci_hcd for usb3 is supported so far."))
+ parser.add_argument("--lsblkcommand", action='store', type=str,
+ default="lsblk -i -n -P -o KNAME,TYPE,MOUNTPOINT",
+ help=("Command to execute to get lsblk information. "
+ "Only change it if you know what you're doing."))
args = parser.parse_args()
- test = DiskTest(args.device, args.memorycard)
+ test = DiskTest(args.device, args.memorycard, args.lsblkcommand)
errors = 0
# If we do have removable drives attached and mounted
@@ -505,8 +539,8 @@ def main():
% errors_mount)
errors += errors_mount
- disks_all = dict(list(test.rem_disks.items())
- + list(test.rem_disks_nm.items()))
+ disks_all = dict(list(test.rem_disks.items()) +
+ list(test.rem_disks_nm.items()))
if len(disks_all) > 0:
print("Found the following mounted %s partitions:"
@@ -528,8 +562,8 @@ def main():
disks_eligible = {disk: disks_all[disk] for disk in disks_all
if not args.min_speed or
- int(test.rem_disks_speed[disk])
- >= int(args.min_speed)}
+ int(test.rem_disks_speed[disk]) >=
+ int(args.min_speed)}
if len(disks_eligible) == 0:
logging.error(
"No %s disks with speed higher than %s bits/s",
@@ -542,21 +576,24 @@ def main():
stat = os.statvfs(path)
disks_freespace[disk] = stat.f_bfree * stat.f_bsize
smallest_freespace = min(disks_freespace.values())
+ smallest_partition = [d for d, v in disks_freespace.items() if
+ v == smallest_freespace][0]
desired_size = args.size
if desired_size > smallest_freespace:
if args.auto_reduce_size:
min_space = HumanReadableBytes("1MiB")
if smallest_freespace < min_space:
- raise IOError("Not enough space. {} is required"
- .format(min_space))
- new_size = HumanReadableBytes(int(0.8 * smallest_freespace))
+ sys.exit("Not enough space. {} is required on {}"
+ .format(min_space, smallest_partition))
+ new_size = HumanReadableBytes(
+ int(0.8 * smallest_freespace))
logging.warning("Automatically reducing test data size"
". {} requested. Reducing to {}."
.format(desired_size, new_size))
desired_size = new_size
else:
- raise IOError("Not enough space. {} is required"
- .format(desired_size))
+ sys.exit("Not enough space. {} is required on {}"
+ .format(desired_size, smallest_partition))
# Generate our data file(s)
for count in range(args.count):
test_files[count] = RandomData(desired_size)
@@ -611,8 +648,8 @@ def main():
# avg_write_time = total_write_time / args.count
try:
avg_write_speed = ((
- total_write_size / total_write_time)
- / 1024 / 1024)
+ total_write_size / total_write_time) /
+ 1024 / 1024)
except ZeroDivisionError:
avg_write_speed = 0.00
finally:
diff --git a/bin/removable_storage_watcher b/bin/removable_storage_watcher
index 97fd071..794bb1b 100755
--- a/bin/removable_storage_watcher
+++ b/bin/removable_storage_watcher
@@ -7,6 +7,8 @@ import dbus
import logging
import sys
+import gi
+gi.require_version('GUdev', '1.0')
from gi.repository import GObject, GUdev
from checkbox_support.dbus import connect_to_system_bus
diff --git a/bin/sleep_time_check b/bin/sleep_time_check
index 87fce15..9e7f084 100755
--- a/bin/sleep_time_check
+++ b/bin/sleep_time_check
@@ -33,7 +33,7 @@ def main():
lines = file.readlines()
except IOError as e:
print(e)
- return False
+ return 1
sleep_time = None
sleep_times = []
@@ -51,23 +51,23 @@ def main():
if (sleep_time is None or resume_time is None) or \
(len(sleep_times) != len(resume_times)):
print("ERROR: One or more times was not reported correctly")
- return False
+ return 1
print("Average time to enter sleep state: %.4f seconds" % mean(sleep_times))
print("Average time to resume from sleep state: %.4f seconds" % mean(resume_times))
- failed = False
+ failed = 0
if sleep_time > args.sleep_threshold:
print("System failed to suspend in less than %s seconds" %
args.sleep_threshold)
- failed = True
+ failed = 1
if resume_time > args.resume_threshold:
print("System failed to resume in less than %s seconds" %
args.resume_threshold)
- failed = True
+ failed = 1
if sleep_time <= 0.00 or resume_time <= 0.00:
print("ERROR: One or more times was not reported correctly")
- failed = True
+ failed = 1
return failed
diff --git a/bin/touchpad_test b/bin/touchpad_test
index 814f0c2..7152745 100755
--- a/bin/touchpad_test
+++ b/bin/touchpad_test
@@ -1,12 +1,10 @@
#!/usr/bin/env python3
import sys
-
import gettext
-from gettext import gettext as _
+from gettext import gettext as _
from gi.repository import Gio, Gtk, Gdk
-
from optparse import OptionParser
@@ -25,12 +23,18 @@ class Direction(object):
class GtkScroller(object):
- touchpad_key = "org.gnome.settings-daemon.peripherals.touchpad"
exit_code = EXIT_WITH_FAILURE
def __init__(self, directions, edge_scroll=False):
self.directions = directions
self.edge_scroll = edge_scroll
+ self.touchpad_key = "org.gnome.settings-daemon.peripherals.touchpad"
+ self.horiz_scroll_key = True
+ source = Gio.SettingsSchemaSource.get_default()
+ if not source.lookup(self.touchpad_key, True):
+ self.touchpad_key = "org.gnome.desktop.peripherals.touchpad"
+ self.horiz_scroll_key = False
+ self.touchpad_settings = Gio.Settings.new(self.touchpad_key)
# Initialize GTK constants
self.ICON_SIZE = Gtk.IconSize.BUTTON
@@ -74,7 +78,7 @@ class GtkScroller(object):
self.show_text(
_("Please move the mouse cursor to this window.") +
- "\n" +
+ "\n" +
_("Then scroll in each direction on your touchpad."))
def _add_button(self, context, stock):
@@ -116,25 +120,27 @@ class GtkScroller(object):
def run(self):
# Save touchpad settings.
- touchpad_settings = Gio.Settings.new(self.touchpad_key)
- self.saved_horiz_scroll_enabled = touchpad_settings.get_boolean(
- "horiz-scroll-enabled")
- self.saved_scroll_method = touchpad_settings.get_string(
+ if self.horiz_scroll_key:
+ self.saved_horiz_scroll_enabled = \
+ self.touchpad_settings.get_boolean("horiz-scroll-enabled")
+ self.saved_scroll_method = self.touchpad_settings.get_string(
"scroll-method")
# Set touchpad settings.
- touchpad_settings.set_boolean("horiz-scroll-enabled", True)
+ if self.horiz_scroll_key:
+ self.touchpad_settings.set_boolean("horiz-scroll-enabled", True)
if self.edge_scroll:
- touchpad_settings.set_string("scroll-method", "edge-scrolling")
+ self.touchpad_settings.set_string(
+ "scroll-method", "edge-scrolling")
Gtk.main()
def quit(self):
# Reset touchpad settings.
- touchpad_settings = Gio.Settings.new(self.touchpad_key)
- touchpad_settings.set_boolean(
- "horiz-scroll-enabled", self.saved_horiz_scroll_enabled)
- touchpad_settings.set_string(
+ if self.horiz_scroll_key:
+ self.touchpad_settings.set_boolean(
+ "horiz-scroll-enabled", self.saved_horiz_scroll_enabled)
+ self.touchpad_settings.set_string(
"scroll-method", self.saved_scroll_method)
Gtk.main_quit()
@@ -190,7 +196,7 @@ def main(args):
try:
scroller.run()
except KeyboardInterrupt:
- scroller.show_text(_("Test interrupted"), self.status)
+ scroller.show_text(_("Test interrupted"), scroller.status)
scroller.quit()
return scroller.exit_code
diff --git a/bin/virtualization b/bin/virtualization
index f32c7ce..fe59656 100755
--- a/bin/virtualization
+++ b/bin/virtualization
@@ -44,13 +44,11 @@ import tempfile
import tarfile
import time
import urllib.request
+from urllib.parse import urlparse
DEFAULT_TIMEOUT = 500
-class XENTest(object):
- pass
-
# The "TAR" type is a tarball that contains both
# a disk image and a kernel binary. This is useful
# on architectures that don't (yet) have a bootloader
@@ -88,7 +86,7 @@ QEMU_ARCH_CONFIG = {
},
'amd64': {
'cloudimg_type': CLOUD_IMAGE_TYPE_DISK,
- 'cloudimg_arch': 'i386',
+ 'cloudimg_arch': 'amd64',
'qemu_bin': 'qemu-system-x86_64',
'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
'qemu_extra_args': [
@@ -110,7 +108,19 @@ QEMU_ARCH_CONFIG = {
'qemu_bin': 'qemu-system-ppc64',
'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
'qemu_extra_args': [
- '-machine', 'pseries',
+ '-enable-kvm',
+ '-machine', 'pseries,usb=off',
+ '-cpu', 'POWER8',
+ ],
+ },
+ 's390x': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK,
+ 'cloudimg_arch': 's390x',
+ 'qemu_bin': 'qemu-system-s390x',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
+ 'qemu_extra_args': [
+ '-enable-kvm',
+ '-machine', 's390-ccw-virtio-2.5',
],
},
}
@@ -124,7 +134,7 @@ class QemuRunner(object):
# Parameters common to all architectures
self.params = [
self.config['qemu_bin'],
- "-m", "256",
+ "-m", "1024",
"-display", "none",
"-nographic",
"-net", "nic",
@@ -133,11 +143,8 @@ class QemuRunner(object):
# If arch is arm64, add the machine type for gicv3, or default to old
# type
if self.arch == 'arm64':
- if glob("/sys/firmware/devicetree/base/interrupt-controller@*/compatible"):
- self.config['qemu_extra_args'].extend(
- ['-machine', 'virt,gic_version=3'])
- else:
- self.config['qemu_extra_args'].extend(['-machine', 'virt'])
+            self.config['qemu_extra_args'].extend(
+                ['-machine', 'virt,gic_version=host'])
# Add any architecture-specific parameters
if 'qemu_extra_args' in self.config:
self.params = self.params + self.config['qemu_extra_args']
@@ -191,35 +198,68 @@ class KVMTest(object):
self.arch = check_output(['dpkg', '--print-architecture'],
universal_newlines=True).strip()
self.qemu_config = QEMU_ARCH_CONFIG[self.arch]
+ self.release = lsb_release.get_lsb_information()["CODENAME"]
- def download_image(self):
+ def url_to_path(self, image_path):
"""
- Downloads Cloud image for same release as host machine
+        Test the provided image path to determine whether it's a URL or a
+        simple file path
+ """
+ url = urlparse(image_path)
+ if url.scheme == '' or url.scheme == 'file':
+            # Gives us a path whether we specify a filesystem path or a file URL
+ logging.debug("Cloud image exists locally at %s" % url.path)
+ return url.path
+ elif url.scheme == 'http' or url.scheme == 'ftp':
+ # Gives us the stuff needed to build the URL to download the image
+ return self.download_image(image_path)
+
+ def construct_cloud_filename(self):
+ """
+ Build a URL for official Ubuntu images hosted either at
+ cloud-images.ubuntu.com or on a maas server hosting a mirror of
+ cloud-images.ubuntu.com
"""
-
- # Check Ubuntu release info. Example {quantal, precise}
- release = lsb_release.get_lsb_information()["CODENAME"]
-
- # Construct URL
- cloud_url = "http://cloud-images.ubuntu.com"
-
if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR:
cloud_iso = "%s-server-cloudimg-%s.tar.gz" % (
- release, self.qemu_config['cloudimg_arch'])
+ self.release, self.qemu_config['cloudimg_arch'])
elif self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_DISK:
cloud_iso = "%s-server-cloudimg-%s-disk1.img" % (
- release, self.qemu_config['cloudimg_arch'])
+ self.release, self.qemu_config['cloudimg_arch'])
else:
logging.error("Unknown cloud image type")
- return False
- image_url = "/".join((
- cloud_url, release, "current", cloud_iso))
+ sys.exit(1)
+ return cloud_iso
+ def download_image(self, image_url=None):
+ """
+ Download the cloud image for the same release as the host machine,
+ either from the given URL or from cloud-images.ubuntu.com
+ """
+ if image_url is None:
+ # If no URL to fetch images from was specified, default to
+ # cloud-images.ubuntu.com
+ cloud_url = "http://cloud-images.ubuntu.com"
+ cloud_iso = self.construct_cloud_filename()
+ full_url = "/".join((
+ cloud_url, self.release, "current", cloud_iso))
+ else:
+ url = urlparse(image_url)
+ if url.path.endswith('/') or url.path == '':
+ # Directory-style URL (e.g. a MAAS server mirror): append the filename
+ cloud_url = image_url
+ cloud_iso = self.construct_cloud_filename()
+ full_url = "/".join((
+ cloud_url, cloud_iso))
+ else:
+ # Assume anything else is a full URL pointing directly at an image
+ cloud_iso = url.path.split('/')[-1]
+ cloud_url = "{}://{}".format(url.scheme, url.netloc)
+ full_url = image_url
logging.debug("Downloading {}, from {}".format(cloud_iso, cloud_url))
# Attempt download
try:
- resp = urllib.request.urlretrieve(image_url, cloud_iso)
+ resp = urllib.request.urlretrieve(full_url, cloud_iso)
except (IOError,
OSError,
urllib.error.HTTPError,
@@ -324,6 +364,22 @@ final_message: CERTIFICATION BOOT COMPLETE
logging.exception("Cloud data disk creation failed")
def start(self):
+ if self.arch == 'arm64':
+ # lp:1548539 - For arm64, we need to make sure we're running a qemu
+ # version later than 2.0.0 to enable gic_version functionality
+ logging.debug('Checking QEMU version for arm64 arch')
+ cmd = 'apt-cache policy qemu-system-arm | grep Installed'
+ installed_version = (check_output(['/bin/bash', '-c', cmd]).
+ decode().split(':', 1)[1].strip())
+
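+ # dpkg --compare-versions exits 0 when the relation holds, i.e. when
+ # 2.0.0 is strictly earlier than the installed version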
+ cmd = ('dpkg --compare-versions "2.0.0" "lt" "{}"'
+ .format(installed_version))
+ retcode = call(['/bin/bash', '-c', cmd])
+ if retcode != 0:
+ logging.error('arm64 needs qemu-system version later than '
+ '2.0.0')
+ return 1
+
logging.debug('Starting KVM Test')
status = 1
# Create temp directory:
@@ -337,6 +393,10 @@ final_message: CERTIFICATION BOOT COMPLETE
logging.debug('No image specified, downloading one now.')
# Download cloud image
self.image = self.download_image()
+ else:
+ logging.debug('Cloud image location specified: %s.' %
+ self.image)
+ self.image = self.url_to_path(self.image)
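+ # url_to_path() either resolves a local path/file URL or downloads
+ # the image from the given remote URL first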
if self.image and os.path.isfile(self.image):
@@ -385,34 +445,10 @@ final_message: CERTIFICATION BOOT COMPLETE
def test_kvm(args):
print("Executing KVM Test", file=sys.stderr)
- DEFAULT_CFG = "/etc/checkbox.d/virtualization.cfg"
image = ""
timeout = ""
- # Configuration data can come from three sources.
- # Lowest priority is the config file.
- config_file = DEFAULT_CFG
- config = configparser.SafeConfigParser()
-
- try:
- config.readfp(open(config_file))
- except IOError:
- logging.warn("No config file found")
- else:
- try:
- timeout = config.getfloat("KVM", "timeout")
- except ValueError:
- logging.warning('Invalid or Empty timeout in config file. '
- 'Falling back to default')
- except configparser.NoSectionError as e:
- logging.exception(e)
-
- try:
- image = config.get("KVM", "image")
- except configparser.NoSectionError:
- logging.exception('Invalid or Empty image in config file.')
-
- # Next in priority are environment variables.
+ # First in priority are environment variables.
if 'KVM_TIMEOUT' in os.environ:
try:
timeout = float(os.environ['KVM_TIMEOUT'])
@@ -431,7 +467,13 @@ def test_kvm(args):
image = args.image
kvm_test = KVMTest(image, timeout, args.log_file)
+ # If arch is ppc64el, disable SMT (KVM guests on POWER8 hosts need it off)
+ if kvm_test.arch == 'ppc64el':
+ os.system("/usr/sbin/ppc64_cpu --smt=off")
result = kvm_test.start()
+ # If arch is ppc64el, re-enable SMT once the test has finished
+ if kvm_test.arch == 'ppc64el':
+ os.system("/usr/sbin/ppc64_cpu --smt=on")
sys.exit(result)