diff options
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/accelerometer_test | 6 | ||||
-rwxr-xr-x | bin/audio_test | 1 | ||||
-rwxr-xr-x | bin/battery_test | 2 | ||||
-rwxr-xr-x | bin/boot_mode_test_snappy.py | 125 | ||||
-rwxr-xr-x | bin/camera_test_legacy | 578 | ||||
-rwxr-xr-x | bin/check-prerelease | 137 | ||||
-rwxr-xr-x | bin/cycle_vts | 6 | ||||
-rwxr-xr-x | bin/disk_info | 2 | ||||
-rwxr-xr-x | bin/disk_read_performance_test | 2 | ||||
-rwxr-xr-x | bin/efi-pxeboot | 3 | ||||
-rwxr-xr-x | bin/fde_tests.py | 101 | ||||
-rwxr-xr-x | bin/gatt-notify-test.py | 241 | ||||
-rwxr-xr-x | bin/gpu_test | 2 | ||||
-rwxr-xr-x | bin/graphics_driver | 5 | ||||
-rwxr-xr-x | bin/graphics_env | 2 | ||||
-rwxr-xr-x | bin/gst_pipeline_test | 3 | ||||
-rwxr-xr-x | bin/ipmi_test | 8 | ||||
-rwxr-xr-x | bin/key_test | 4 | ||||
-rwxr-xr-x | bin/keyboard_test | 4 | ||||
-rwxr-xr-x | bin/lock_screen_watcher | 2 | ||||
-rwxr-xr-x | bin/manage_compiz_plugin | 9 | ||||
-rwxr-xr-x | bin/net_if_watcher.py | 43 | ||||
-rwxr-xr-x | bin/network_restart | 3 | ||||
-rwxr-xr-x | bin/pm_test | 93 | ||||
-rwxr-xr-x | bin/removable_storage_test | 18 | ||||
-rwxr-xr-x | bin/resolution_test | 3 | ||||
-rwxr-xr-x | bin/socketcan_test.py | 199 | ||||
-rwxr-xr-x | bin/touchpad_test | 4 | ||||
-rwxr-xr-x | bin/xrandr_cycle | 7 |
29 files changed, 980 insertions, 633 deletions
diff --git a/bin/accelerometer_test b/bin/accelerometer_test index 93130fc..2d95ca8 100755 --- a/bin/accelerometer_test +++ b/bin/accelerometer_test @@ -24,13 +24,17 @@ accelerometer, and check to be sure that the x, y, z axis respond to physical movement of hardware. ''' from argparse import ArgumentParser -from gi.repository import Gdk, GLib, Gtk +import gi import logging import os import re import sys import threading import time +gi.require_version('Gdk', '3.0') +gi.require_version('GLib', '2.0') +gi.require_version("Gtk", "3.0") +from gi.repository import Gdk, GLib, Gtk from subprocess import Popen, PIPE, check_output, STDOUT, CalledProcessError from checkbox_support.parsers.modinfo import ModinfoParser diff --git a/bin/audio_test b/bin/audio_test index f0b73a4..466b760 100755 --- a/bin/audio_test +++ b/bin/audio_test @@ -12,6 +12,7 @@ import sys import time try: import gi + gi.require_version('GLib', '2.0') gi.require_version('Gst','1.0') from gi.repository import GObject from gi.repository import Gst diff --git a/bin/battery_test b/bin/battery_test index cdfd227..b2682c7 100755 --- a/bin/battery_test +++ b/bin/battery_test @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +import gi import os import time import re import subprocess import sys import argparse +gi.require_version('Gio', '2.0') from gi.repository import Gio diff --git a/bin/boot_mode_test_snappy.py b/bin/boot_mode_test_snappy.py new file mode 100755 index 0000000..46bed92 --- /dev/null +++ b/bin/boot_mode_test_snappy.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +# Copyright 2018 Canonical Ltd. +# Written by: +# Jonathan Cave <jonathan.cave@canonical.com> + +import io +import os +import re +import sys +import subprocess as sp + +import yaml + + +def fitdumpimage(filename): + cmd = 'dumpimage -l {}'.format(filename) + out = sp.check_output(cmd, shell=True).decode(sys.stdout.encoding) + buf = io.StringIO(out) + + # first line should identify FIT file + if not buf.readline().startswith('FIT description'): + raise SystemExit('ERROR: expected FIT image description') + + # second line contains some metadata, skip it + buf.readline() + + # from then on should get blocks of text describing the objects that were + # combined in to the FIT image e.g. kernel, ramdisk, device tree + image_re = re.compile(r'(?:^\ Image)\ \d+\ \((\S+)\)$') + config_re = re.compile(r'^\ Default Configuration|^\ Configuration') + objects = {} + name = '' + while True: + line = buf.readline() + # stop at end + if line == '': + break + # interested in storing image information + match = image_re.search(line) + if match: + name = match.group(1) + objects[name] = {} + continue + # not interested in configurations + if config_re.search(line): + name = '' + continue + # while in an image section store the info + if name != '': + entries = [s.strip() for s in line.split(':', 1)] + objects[name][entries[0]] = entries[1] + return objects + + +def main(): + if len(sys.argv) != 3: + raise SystemExit('ERROR: please supply gadget & kernel name') + gadget = sys.argv[1] + kernel = sys.argv[2] + + gadget_yaml = os.path.join('/snap', gadget, 'current/meta/gadget.yaml') + + if not os.path.exists(gadget_yaml): + raise SystemExit( + 'ERROR: failed to find gadget.yaml at {}'.format(gadget_yaml)) + + with open(gadget_yaml) as f: + data = yaml.load(f) + for k in data['volumes'].keys(): + bootloader = data['volumes'][k]['bootloader'] + if not bootloader: + raise SystemExit('ERROR: could not find name of bootloader') + + if bootloader not in ('u-boot', 'grub'): + raise SystemExit( + 'ERROR: Unexpected bootloader name {}'.format(bootloader)) + print('Bootloader is {}\n'.format(bootloader)) + + if bootloader == 'u-boot': + print('Parsing FIT image information...\n') + + kernel_rev = os.path.basename( + os.path.realpath('/snap/{}/current'.format(kernel))) + boot_kernel = '/boot/uboot/{}_{}.snap/kernel.img'.format( + kernel, kernel_rev) + boot_objects = fitdumpimage(boot_kernel) + + for obj, attrs in boot_objects.items(): + print('Checking object {}'.format(obj)) + if 'Sign value' not in attrs: + raise SystemExit('ERROR: no sign value found for object') + print('Found "Sign value"') + if len(attrs['Sign value']) != 512: + raise SystemExit('ERROR: unexpected sign value size') + if all(s in attrs['Sign algo'] for s in ['sha256', 'rsa2048']): + print('Found expected signing algorithms') + else: + raise SystemExit( + 'ERROR: unexpected signing algorithms {}'.format( + attrs['Sign algo'])) + print() + + # check that all parts of the fit image have + snap_kernel = '/snap/{}/current/kernel.img'.format(kernel) + snap_objects = fitdumpimage(snap_kernel) + if snap_objects != boot_objects: + raise SystemExit( + 'ERROR: boot kernel and current snap kernel do not match') + print('Kernel images in current snap and u-boot match\n') + + print('Secure Boot appears to be enabled on this system') + + if bootloader == 'grub': + cmd = 'mokutil --sb-state' + print('+', cmd, flush=True) + out = sp.check_output(cmd, shell=True).decode(sys.stdout.encoding) + print(out, flush=True) + if out != 'SecureBoot enabled\n': + raise SystemExit('ERROR: mokutil reports Secure Boot not in use') + + print('Secure Boot appears to be enabled on this system') + + +if __name__ == '__main__': + main() diff --git a/bin/camera_test_legacy b/bin/camera_test_legacy deleted file mode 100755 index 794578b..0000000 --- a/bin/camera_test_legacy +++ /dev/null @@ -1,578 +0,0 @@ -#!/usr/bin/env python3 -# -# This file is part of Checkbox. -# -# Copyright 2008-2012 Canonical Ltd. -# -# The v4l2 ioctl code comes from the Python bindings for the v4l2 -# userspace api (http://pypi.python.org/pypi/v4l2): -# Copyright (C) 1999-2009 the contributors -# -# The JPEG metadata parser is a part of bfg-pages: -# http://code.google.com/p/bfg-pages/source/browse/trunk/pages/getimageinfo.py -# Copyright (C) Tim Hoffman -# -# Checkbox is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, -# as published by the Free Software Foundation. - -# -# Checkbox is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. -# - -import argparse -import ctypes -import errno -import fcntl -import imghdr -import logging -import os -import re -import struct -import sys -import time - -from gi.repository import GObject -from glob import glob -from subprocess import check_call, CalledProcessError, STDOUT -from tempfile import NamedTemporaryFile - - -_IOC_NRBITS = 8 -_IOC_TYPEBITS = 8 -_IOC_SIZEBITS = 14 - -_IOC_NRSHIFT = 0 -_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS -_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS -_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS - -_IOC_WRITE = 1 -_IOC_READ = 2 - - -def _IOC(dir_, type_, nr, size): - return ( - ctypes.c_int32(dir_ << _IOC_DIRSHIFT).value | - ctypes.c_int32(ord(type_) << _IOC_TYPESHIFT).value | - ctypes.c_int32(nr << _IOC_NRSHIFT).value | - ctypes.c_int32(size << _IOC_SIZESHIFT).value) - - -def _IOC_TYPECHECK(t): - return ctypes.sizeof(t) - - -def _IOR(type_, nr, size): - return _IOC(_IOC_READ, type_, nr, ctypes.sizeof(size)) - - -def _IOWR(type_, nr, size): - return _IOC(_IOC_READ | _IOC_WRITE, type_, nr, _IOC_TYPECHECK(size)) - - -class v4l2_capability(ctypes.Structure): - """ - Driver capabilities - """ - _fields_ = [ - ('driver', ctypes.c_char * 16), - ('card', ctypes.c_char * 32), - ('bus_info', ctypes.c_char * 32), - ('version', ctypes.c_uint32), - ('capabilities', ctypes.c_uint32), - ('reserved', ctypes.c_uint32 * 4), - ] - - -# Values for 'capabilities' field -V4L2_CAP_VIDEO_CAPTURE = 0x00000001 -V4L2_CAP_VIDEO_OVERLAY = 0x00000004 -V4L2_CAP_READWRITE = 0x01000000 -V4L2_CAP_STREAMING = 0x04000000 - -v4l2_frmsizetypes = ctypes.c_uint -( - V4L2_FRMSIZE_TYPE_DISCRETE, - V4L2_FRMSIZE_TYPE_CONTINUOUS, - V4L2_FRMSIZE_TYPE_STEPWISE, -) = range(1, 4) - - -class v4l2_frmsize_discrete(ctypes.Structure): - _fields_ = [ - ('width', ctypes.c_uint32), - ('height', ctypes.c_uint32), - ] - - -class v4l2_frmsize_stepwise(ctypes.Structure): - _fields_ = [ - ('min_width', ctypes.c_uint32), - ('min_height', ctypes.c_uint32), - ('step_width', ctypes.c_uint32), - ('min_height', ctypes.c_uint32), - ('max_height', ctypes.c_uint32), - ('step_height', ctypes.c_uint32), - ] - - -class v4l2_frmsizeenum(ctypes.Structure): - class _u(ctypes.Union): - _fields_ = [ - ('discrete', v4l2_frmsize_discrete), - ('stepwise', v4l2_frmsize_stepwise), - ] - - _fields_ = [ - ('index', ctypes.c_uint32), - ('pixel_format', ctypes.c_uint32), - ('type', ctypes.c_uint32), - ('_u', _u), - ('reserved', ctypes.c_uint32 * 2) - ] - - _anonymous_ = ('_u',) - - -class v4l2_fmtdesc(ctypes.Structure): - _fields_ = [ - ('index', ctypes.c_uint32), - ('type', ctypes.c_int), - ('flags', ctypes.c_uint32), - ('description', ctypes.c_char * 32), - ('pixelformat', ctypes.c_uint32), - ('reserved', ctypes.c_uint32 * 4), - ] - -V4L2_FMT_FLAG_COMPRESSED = 0x0001 -V4L2_FMT_FLAG_EMULATED = 0x0002 - -# ioctl code for video devices -VIDIOC_QUERYCAP = _IOR('V', 0, v4l2_capability) -VIDIOC_ENUM_FRAMESIZES = _IOWR('V', 74, v4l2_frmsizeenum) -VIDIOC_ENUM_FMT = _IOWR('V', 2, v4l2_fmtdesc) - - -class CameraTest: - """ - A simple class that displays a test image via GStreamer. - """ - def __init__(self, args, gst_plugin=None, gst_video_type=None): - self.args = args - self._mainloop = GObject.MainLoop() - self._width = 640 - self._height = 480 - self._gst_plugin = gst_plugin - self._gst_video_type = gst_video_type - - def detect(self): - """ - Display information regarding webcam hardware - """ - cap_status = dev_status = 1 - for i in range(10): - cp = v4l2_capability() - device = '/dev/video%d' % i - try: - with open(device, 'r') as vd: - fcntl.ioctl(vd, VIDIOC_QUERYCAP, cp) - except IOError: - continue - dev_status = 0 - print("%s: OK" % device) - print(" name : %s" % cp.card.decode('UTF-8')) - print(" driver : %s" % cp.driver.decode('UTF-8')) - print(" version: %s.%s.%s" - % (cp.version >> 16, - (cp.version >> 8) & 0xff, - cp.version & 0xff)) - print(" flags : 0x%x [" % cp.capabilities, - ' CAPTURE' if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE - else '', - ' OVERLAY' if cp.capabilities & V4L2_CAP_VIDEO_OVERLAY - else '', - ' READWRITE' if cp.capabilities & V4L2_CAP_READWRITE - else '', - ' STREAMING' if cp.capabilities & V4L2_CAP_STREAMING - else '', - ' ]', sep="") - - resolutions = self._get_supported_resolutions(device) - print(' ', - self._supported_resolutions_to_string(resolutions).replace( - "\n", " "), - sep="") - - if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE: - cap_status = 0 - return dev_status | cap_status - - def led(self): - """ - Activate camera (switch on led), but don't display any output - """ - pipespec = ("v4l2src device=%(device)s " - "! %(type)s " - "! %(plugin)s " - "! testsink" - % {'device': self.args.device, - 'type': self._gst_video_type, - 'plugin': self._gst_plugin}) - logging.debug("LED test with pipeline %s", pipespec) - self._pipeline = Gst.parse_launch(pipespec) - self._pipeline.set_state(Gst.State.PLAYING) - time.sleep(3) - self._pipeline.set_state(Gst.State.NULL) - - def display(self): - """ - Displays the preview window - """ - pipespec = ("v4l2src device=%(device)s " - "! %(type)s,width=%(width)d,height=%(height)d " - "! %(plugin)s " - "! autovideosink" - % {'device': self.args.device, - 'type': self._gst_video_type, - 'width': self._width, - 'height': self._height, - 'plugin': self._gst_plugin}) - logging.debug("display test with pipeline %s", pipespec) - self._pipeline = Gst.parse_launch(pipespec) - self._pipeline.set_state(Gst.State.PLAYING) - time.sleep(10) - self._pipeline.set_state(Gst.State.NULL) - - def still(self): - """ - Captures an image to a file - """ - if self.args.filename: - self._still_helper(self.args.filename, self._width, self._height, - self.args.quiet) - else: - with NamedTemporaryFile(prefix='camera_test_', suffix='.jpg') as f: - self._still_helper(f.name, self._width, self._height, - self.args.quiet) - - def _still_helper(self, filename, width, height, quiet, pixelformat=None): - """ - Captures an image to a given filename. width and height specify the - image size and quiet controls whether the image is displayed to the - user (quiet = True means do not display image). - """ - command = ["fswebcam", "-D 1", "-S 50", "--no-banner", - "-d", self.args.device, - "-r", "%dx%d" - % (width, height), filename] - use_gstreamer = False - if pixelformat: - if 'MJPG' == pixelformat: # special tweak for fswebcam - pixelformat = 'MJPEG' - command.extend(["-p", pixelformat]) - - try: - check_call(command, stdout=open(os.devnull, 'w'), stderr=STDOUT) - except (CalledProcessError, OSError): - use_gstreamer = True - - if use_gstreamer: - pipespec = ("v4l2src device=%(device)s " - "! %(type)s,width=%(width)d,height=%(height)d " - "! %(plugin)s " - "! jpegenc " - "! filesink location=%(filename)s" - % {'device': self.args.device, - 'type': self._gst_video_type, - 'width': width, - 'height': height, - 'plugin': self._gst_plugin, - 'filename': filename}) - logging.debug("still test with gstreamer and " - "pipeline %s", pipespec) - self._pipeline = Gst.parse_launch(pipespec) - self._pipeline.set_state(Gst.State.PLAYING) - time.sleep(3) - self._pipeline.set_state(Gst.State.NULL) - - if not quiet: - import imghdr - image_type = imghdr.what(filename) - pipespec = ("filesrc location=%(filename)s ! " - "%(type)sdec ! " - "videoscale ! " - "imagefreeze ! autovideosink" - % {'filename': filename, - 'type': image_type}) - self._pipeline = Gst.parse_launch(pipespec) - self._pipeline.set_state(Gst.State.PLAYING) - time.sleep(10) - self._pipeline.set_state(Gst.State.NULL) - - def _supported_resolutions_to_string(self, supported_resolutions): - """ - Return a printable string representing a list of supported resolutions - """ - ret = "" - for resolution in supported_resolutions: - ret += "Format: %s (%s)\n" % (resolution['pixelformat'], - resolution['description']) - ret += "Resolutions: " - for res in resolution['resolutions']: - ret += "%sx%s," % (res[0], res[1]) - # truncate the extra comma with :-1 - ret = ret[:-1] + "\n" - return ret - - def resolutions(self): - """ - After querying the webcam for supported formats and resolutions, - take multiple images using the first format returned by the driver, - and see if they are valid - """ - resolutions = self._get_supported_resolutions(self.args.device) - # print supported formats and resolutions for the logs - print(self._supported_resolutions_to_string(resolutions)) - - # pick the first format, which seems to be what the driver wants for a - # default. This also matches the logic that fswebcam uses to select - # a default format. - resolution = resolutions[0] - if resolution: - print("Taking multiple images using the %s format" - % resolution['pixelformat']) - for res in resolution['resolutions']: - w = res[0] - h = res[1] - f = NamedTemporaryFile(prefix='camera_test_%s%sx%s' % - (resolution['pixelformat'], w, h), - suffix='.jpg', delete=False) - print("Taking a picture at %sx%s" % (w, h)) - self._still_helper(f.name, w, h, True, - pixelformat=resolution['pixelformat']) - if self._validate_image(f.name, w, h): - print("Validated image %s" % f.name) - os.remove(f.name) - else: - print("Failed to validate image %s" % f.name, - file=sys.stderr) - os.remove(f.name) - return 1 - return 0 - - def _get_pixel_formats(self, device, maxformats=5): - """ - Query the camera to see what pixel formats it supports. A list of - dicts is returned consisting of format and description. The caller - should check whether this camera supports VIDEO_CAPTURE before - calling this function. - """ - supported_formats = [] - fmt = v4l2_fmtdesc() - fmt.index = 0 - fmt.type = V4L2_CAP_VIDEO_CAPTURE - try: - while fmt.index < maxformats: - with open(device, 'r') as vd: - if fcntl.ioctl(vd, VIDIOC_ENUM_FMT, fmt) == 0: - pixelformat = {} - # save the int type for re-use later - pixelformat['pixelformat_int'] = fmt.pixelformat - pixelformat['pixelformat'] = "%s%s%s%s" % \ - (chr(fmt.pixelformat & 0xFF), - chr((fmt.pixelformat >> 8) & 0xFF), - chr((fmt.pixelformat >> 16) & 0xFF), - chr((fmt.pixelformat >> 24) & 0xFF)) - pixelformat['description'] = fmt.description.decode() - supported_formats.append(pixelformat) - fmt.index = fmt.index + 1 - except IOError as e: - # EINVAL is the ioctl's way of telling us that there are no - # more formats, so we ignore it - if e.errno != errno.EINVAL: - print("Unable to determine Pixel Formats, this may be a " - "driver issue.") - return supported_formats - return supported_formats - - def _get_supported_resolutions(self, device): - """ - Query the camera for supported resolutions for a given pixel_format. - Data is returned in a list of dictionaries with supported pixel - formats as the following example shows: - resolution['pixelformat'] = "YUYV" - resolution['description'] = "(YUV 4:2:2 (YUYV))" - resolution['resolutions'] = [[width, height], [640, 480], [1280, 720] ] - - If we are unable to gather any information from the driver, then we - return YUYV and 640x480 which seems to be a safe default. - Per the v4l2 spec the ioctl used here is experimental - but seems to be well supported. - """ - supported_formats = self._get_pixel_formats(device) - if not supported_formats: - resolution = {} - resolution['description'] = "YUYV" - resolution['pixelformat'] = "YUYV" - resolution['resolutions'] = [[640, 480]] - supported_formats.append(resolution) - return supported_formats - - for supported_format in supported_formats: - resolutions = [] - framesize = v4l2_frmsizeenum() - framesize.index = 0 - framesize.pixel_format = supported_format['pixelformat_int'] - with open(device, 'r') as vd: - try: - while fcntl.ioctl(vd, - VIDIOC_ENUM_FRAMESIZES, - framesize) == 0: - if framesize.type == V4L2_FRMSIZE_TYPE_DISCRETE: - resolutions.append([framesize.discrete.width, - framesize.discrete.height]) - # for continuous and stepwise, let's just use min and - # max they use the same structure and only return - # one result - elif (framesize.type in (V4L2_FRMSIZE_TYPE_CONTINUOUS, - V4L2_FRMSIZE_TYPE_STEPWISE)): - resolutions.append([framesize.stepwise.min_width, - framesize.stepwise.min_height] - ) - resolutions.append([framesize.stepwise.max_width, - framesize.stepwise.max_height] - ) - break - framesize.index = framesize.index + 1 - except IOError as e: - # EINVAL is the ioctl's way of telling us that there are no - # more formats, so we ignore it - if e.errno != errno.EINVAL: - print("Unable to determine supported framesizes " - "(resolutions), this may be a driver issue.") - supported_format['resolutions'] = resolutions - return supported_formats - - def _validate_image(self, filename, width, height): - """ - Given a filename, ensure that the image is the width and height - specified and is a valid image file. - """ - if imghdr.what(filename) != 'jpeg': - return False - - outw = outh = 0 - with open(filename, mode='rb') as jpeg: - jpeg.seek(2) - b = jpeg.read(1) - try: - while (b and ord(b) != 0xDA): - while (ord(b) != 0xFF): - b = jpeg.read(1) - while (ord(b) == 0xFF): - b = jpeg.read(1) - if (ord(b) >= 0xC0 and ord(b) <= 0xC3): - jpeg.seek(3, 1) - h, w = struct.unpack(">HH", jpeg.read(4)) - break - b = jpeg.read(1) - outw, outh = int(w), int(h) - except (struct.error, ValueError): - pass - - if outw != width: - print("Image width does not match, was %s should be %s" % - (outw, width), file=sys.stderr) - return False - if outh != height: - print("Image width does not match, was %s should be %s" % - (outh, height), file=sys.stderr) - return False - - return True - - return True - - -def parse_arguments(argv): - """ - Parse command line arguments - """ - parser = argparse.ArgumentParser(description="Run a camera-related test") - subparsers = parser.add_subparsers(dest='test', - title='test', - description='Available camera tests') - - parser.add_argument('--debug', dest='log_level', - action="store_const", const=logging.DEBUG, - default=logging.INFO, help="Show debugging messages") - - def add_device_parameter(parser): - group = parser.add_mutually_exclusive_group() - group.add_argument("-d", "--device", default="/dev/video0", - help="Device for the webcam to use") - group.add_argument("--highest-device", action="store_true", - help=("Use the /dev/videoN " - "where N is the highest value available")) - group.add_argument("--lowest-device", action="store_true", - help=("Use the /dev/videoN " - "where N is the lowest value available")) - subparsers.add_parser('detect') - led_parser = subparsers.add_parser('led') - add_device_parameter(led_parser) - display_parser = subparsers.add_parser('display') - add_device_parameter(display_parser) - still_parser = subparsers.add_parser('still') - add_device_parameter(still_parser) - still_parser.add_argument("-f", "--filename", - help="Filename to store the picture") - still_parser.add_argument("-q", "--quiet", action="store_true", - help=("Don't display picture, " - "just write the picture to a file")) - resolutions_parser = subparsers.add_parser('resolutions') - add_device_parameter(resolutions_parser) - args = parser.parse_args(argv) - - def get_video_devices(): - devices = sorted(glob('/dev/video[0-9]'), - key=lambda d: re.search(r'\d', d).group(0)) - assert len(devices) > 0, "No video devices found" - return devices - - if hasattr(args, 'highest_device') and args.highest_device: - args.device = get_video_devices()[-1] - elif hasattr(args, 'lowest_device') and args.lowest_device: - args.device = get_video_devices()[0] - return args - - -if __name__ == "__main__": - args = parse_arguments(sys.argv[1:]) - - if not args.test: - args.test = 'detect' - - logging.basicConfig(level=args.log_level) - - # Import Gst only for the test cases that will need it - if args.test in ['display', 'still', 'led', 'resolutions']: - from gi.repository import Gst - if Gst.version()[0] > 0: - gst_plugin = 'videoconvert' - gst_video_type = 'video/x-raw' - else: - gst_plugin = 'ffmpegcolorspace' - gst_video_type = 'video/x-raw-yuv' - Gst.init(None) - camera = CameraTest(args, gst_plugin, gst_video_type) - else: - camera = CameraTest(args) - - sys.exit(getattr(camera, args.test)()) diff --git a/bin/check-prerelease b/bin/check-prerelease new file mode 100755 index 0000000..1f3d017 --- /dev/null +++ b/bin/check-prerelease @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 + +""" +Script to test that the system is NOT running prerelease software + +Copyright (c) 2018 Canonical Ltd. + +Authors + Rod Smith <rod.smith@canonical.com> + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License version 3, +as published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + +The purpose of this script is to identify whether an EFI-based +system booted from the network (test passes) or from a local disk +(test fails). + +Usage: + check-prerelease +""" + +import platform +import shlex +import sys + +from subprocess import Popen, PIPE + + +def check_kernel_status(): + """Check kernel to see if it's supported for certification + + :returns: + True if OK, False if not + """ + kernel_release = platform.release() + + retval = False + command = "apt-cache show linux-image-{}".format(kernel_release) + aptinfo = [] + aptinfo_bytes = (Popen(shlex.split(command), stdout=PIPE) + .communicate()[0]) + aptinfo = (aptinfo_bytes.decode(encoding="utf-8", errors="ignore") + .splitlines()) + + """Kernel apt-cache info includes a 'Supported:' line on release + kernels to identify the period of support. This line is missing on + pre-release kernels. Thus, we return a True value only if this + line is present and shows a 5-year support period.""" + if any("Supported: 5y" in s for s in aptinfo): + retval = True + if any("Supported: 9m" in s for s in aptinfo): + print("* Kernel is supported for 9 months; it is an interim release!") + if not any("Supported:" in s for s in aptinfo): + print("* Could not find a kernel support period; " + "may be a pre-release kernel!") + + """We also want to exclude 'edge' kernels, which are identified via + the 'Source:' line in the apt-cache output....""" + if any("Source: linux-signed-hwe-edge" in s for s in aptinfo): + print("* Kernel is an 'edge' kernel!") + retval = False + if any("Source: linux-hwe-edge" in s for s in aptinfo): + print("* Kernel is an 'edge' kernel!") + retval = False + + """We also want to exclude low-latency kernels, which are identified + via the kernel name string itself....""" + if "lowlatency" in kernel_release: + print("* Kernel is a low-latency kernel!") + retval = False + + if (not retval): + print("* Kernel release is {}".format(kernel_release)) + print("* Kernel is ineligible for certification!") + + return retval + + +def check_os_status(): + """Check OS to see if it's supported for certification. Note that this + passes any release version (even a non-LTS version), but not pre-release + versions. + + :returns: + True if OK, False if not + """ + retval = True + command = "lsb_release -s -d" + lsbinfo = [] + lsbinfo_bytes = (Popen(shlex.split(command), stdout=PIPE) + .communicate()[0]) + lsbinfo = (lsbinfo_bytes.decode(encoding="utf-8", errors="ignore") + .rstrip()) + + """OS information include '(development branch)' on pre-release + installations. Such installations should fail this test.""" + if "(development branch)" in lsbinfo: + print("* 'lsb_release -s -d' result is '{}'".format(lsbinfo)) + print("* OS is reported as a development branch:") + print("* {}".format(lsbinfo)) + retval = False + print("") + + return retval + + +def main(): + """Check to see if the machine is running pre-release kernel or OS.""" + + retval = 0 + if (not check_kernel_status()): + retval = 1 + if (not check_os_status()): + retval += 2 + if (retval == 0): + print("** All OK; production kernel and OS.") + elif (retval == 1): + print("** Test FAILS; running ineligible kernel!") + elif (retval == 2): + print("** Test FAILS; running pre-release OS!") + else: + print("** Test FAILS; running pre-release OS with ineligible kernel!") + + return(retval) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/bin/cycle_vts b/bin/cycle_vts index 8b7f5bc..c5e1677 100755 --- a/bin/cycle_vts +++ b/bin/cycle_vts @@ -12,14 +12,14 @@ then exit 1 fi -if [ "$CURRENT_VT" -ne "1" ] +if [ "$CURRENT_VT" == "7" ] then chvt 1 else - chvt 2 + chvt 3 fi -sleep 2 +sleep 5 chvt "$CURRENT_VT" sleep 2 diff --git a/bin/disk_info b/bin/disk_info index 5efedbd..0b45443 100755 --- a/bin/disk_info +++ b/bin/disk_info @@ -53,7 +53,7 @@ def main(): disks = 0 for line in lsblk.splitlines(): m = pattern.match(line) - if not m or m.group('TYPE') != 'disk': + if not m or m.group('TYPE') not in ('disk', 'crypt'): continue # Only consider MMC block devices if one of their mounted partitions is # root (/) diff --git a/bin/disk_read_performance_test b/bin/disk_read_performance_test index e2a219d..00810a5 100755 --- a/bin/disk_read_performance_test +++ b/bin/disk_read_performance_test @@ -4,7 +4,7 @@ # #Default to a lower bound of 15 MB/s -DEFAULT_BUF_READ=15 +DEFAULT_BUF_READ=${DISK_READ_PERF:-15} for disk in $@; do diff --git a/bin/efi-pxeboot b/bin/efi-pxeboot index 724fefb..24ac231 100755 --- a/bin/efi-pxeboot +++ b/bin/efi-pxeboot @@ -49,6 +49,7 @@ def discover_data(): .splitlines()) boot_entries = {} boot_order = [] + boot_current = "" if len(bootinfo) > 1: for s in bootinfo: if "BootOrder" in s: @@ -103,7 +104,7 @@ def is_pxe_booted(boot_entries, boot_order, boot_current): or "refind_" in desc: # This string indicates a boot directly from the normal Ubuntu GRUB # or rEFInd installation on the hard disk. - print("The system seems to have booted directly from the hard disk!") + print("FAIL: The system has booted directly from the hard disk!") retval = 1 elif "SATA" in desc or "Sata" in desc or "Hard Drive" in desc: # These strings indicate booting with a "generic" disk entry (one diff --git a/bin/fde_tests.py b/bin/fde_tests.py new file mode 100755 index 0000000..3008735 --- /dev/null +++ b/bin/fde_tests.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +# Copyright 2018 Canonical Ltd. +# Written by: +# Jonathan Cave <jonathan.cave@canonical.com> + +"""Test that Full Disk Encryption is in use. + +$ fde_tests.py +Canonical has a reference implementation of full disk encryption for IoT +devices. With no arguments passed this test checks this implementation is in +operation on the device under test. + +$ fde_tests.py desktop +Checks if the system appears to be using full disk encryption as configured by +the desktop installer. +""" + +import os +import re +import subprocess as sp +import sys + + +def main(): + on_desktop = len(sys.argv) > 1 and sys.argv[1] == 'desktop' + + # the mountpoint corresponding to the on disk encrypted partition + base_mount = '/' if on_desktop else '/writable' + + # discover the underlying mount point for the encrypted part + cmd = 'findmnt {} -n -o SOURCE'.format(base_mount) + print('+ {}'.format(cmd)) + try: + source = sp.check_output(cmd, shell=True).decode( + sys.stdout.encoding).strip() + except sp.CalledProcessError: + raise SystemExit( + 'ERROR: could not find mountpoint for {}'.format(base_mount)) + print(source, '\n') + + # resolve the source to an actual device node + print('+ realpath {}'.format(source)) + device = os.path.realpath(source) + print(device, '\n') + + # work upwards through the tree of devices until we find the one that has + # the type 'crypt' + kname = os.path.basename(device) + while True: + cmd = 'lsblk -r -n -i -o KNAME,TYPE,PKNAME | grep "^{}"'.format(kname) + print('+ {}'.format(cmd)) + try: + lsblk = sp.check_output(cmd, shell=True).decode( + sys.stdout.encoding).strip() + except sp.CalledProcessError: + raise SystemExit('ERROR: lsblk call failed') + _, devtype, parent = lsblk.split(maxsplit=2) + print(devtype, '\n') + if devtype == 'crypt': + # found the device + break + if devtype == 'disk': + # reached the physical device, end the search + raise SystemExit( + 'ERROR: could not find a block device of type "crypt"') + kname = parent + + # the presence of device with type 'crypt' is probably confirmation enough + # but to be really sure lets check to see it is found by cryptsetup + + # first we need to know its mapper name + cmd = 'dmsetup info /dev/{} | grep "^Name:"'.format(kname) + print('+ {}'.format(cmd)) + try: + mapper_name = sp.check_output(cmd, shell=True).decode( + sys.stdout.encoding).strip().split()[-1] + except sp.CalledProcessError: + raise SystemExit( + 'ERROR: dmsetup info on device {} failed'.format(kname)) + print(mapper_name, '\n') + + # then query the info in cryptsetup + cmd = 'cryptsetup status {}'.format(mapper_name) + print('+ {}'.format(cmd)) + try: + cryptinfo = sp.check_output(cmd, shell=True).decode( + sys.stdout.encoding).strip() + except sp.CalledProcessError: + raise SystemExit('ERROR: dmsetup failed') + print(cryptinfo, '\n') + + # use the type as the final arbiter of success + regexp = re.compile(r'type:\ *LUKS1') + if regexp.search(cryptinfo): + print('Full Disk Encryption is operational on this device') + else: + raise SystemExit('ERROR: cryptsetup did not report LUKS1 in use') + + +if __name__ == "__main__": + main() diff --git a/bin/gatt-notify-test.py b/bin/gatt-notify-test.py new file mode 100755 index 0000000..9923a93 --- /dev/null +++ b/bin/gatt-notify-test.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 Canonical Ltd. +# Written by: +# Sylvain Pineau <sylvain.pineau@canonical.com> +# +# This is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This file is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this file. If not, see <http://www.gnu.org/licenses/>. + +import argparse +import logging +import os +import sys +import time + +import dbus +import dbus.service +import dbus.mainloop.glib +from gi.repository import GObject + +logger = logging.getLogger(__file__) +logger.addHandler(logging.StreamHandler(sys.stdout)) + +ADAPTER_INTERFACE = 'org.bluez.Adapter1' +DEVICE_INTERFACE = 'org.bluez.Device1' +PROP_INTERFACE = 'org.freedesktop.DBus.Properties' +OM_INTERFACE = 'org.freedesktop.DBus.ObjectManager' +GATT_SERVICE_INTERFACE = 'org.bluez.GattService1' +GATT_CHRC_INTERFACE = 'org.bluez.GattCharacteristic1' + +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + +class BtAdapter: + """Bluetooth LE Adapter class.""" + def __init__(self, pattern): + self._pattern = os.path.basename(pattern) + self._bus = dbus.SystemBus() + self._manager = dbus.Interface( + self._bus.get_object("org.bluez", "/"), OM_INTERFACE) + self._main_loop = GObject.MainLoop() + self._adapter = self._find_adapter() + self._path = self._adapter.object_path + self._props = dbus.Interface(self._adapter, PROP_INTERFACE) + self._name = self._props.Get(ADAPTER_INTERFACE, "Name") + self._addr = self._props.Get(ADAPTER_INTERFACE, "Address") + self._alias = self._props.Get(ADAPTER_INTERFACE, "Alias") + logger.info('Adapter found: [ {} ] {} - {}'.format( + self._path, self._addr, self._alias)) + + def _get_managed_objects(self): + return self._manager.GetManagedObjects() + + def _find_adapter(self): + for path, ifaces in self._get_managed_objects().items(): + adapter = ifaces.get(ADAPTER_INTERFACE) + if adapter is None: + continue + if (self._pattern == adapter["Address"] or + path.endswith(self._pattern)): + obj = self._bus.get_object("org.bluez", path) + return dbus.Interface(obj, ADAPTER_INTERFACE) + raise SystemExit("Bluetooth adapter not found!") + + def ensure_powered(self): + """Turn the adapter on.""" + self._props.Set(ADAPTER_INTERFACE, "Powered", dbus.Boolean(1)) + logger.info('Adapter powered on') + + def scan(self, timeout=10): + """Scan for BT devices.""" + dbus.Interface(self._adapter, ADAPTER_INTERFACE).StartDiscovery() + logger.info('Adapter scan on ({}s)'.format(timeout)) + GObject.timeout_add_seconds(timeout, self._scan_timeout) + self._main_loop.run() + + def _scan_timeout(self): + dbus.Interface(self._adapter, ADAPTER_INTERFACE).StopDiscovery() + logger.info('Adapter scan completed') + self._main_loop.quit() + + def find_device_with_service(self, ADV_SVC_UUID): + """Find a device with a given remote service.""" + for path, ifaces in self._get_managed_objects().items(): + device = ifaces.get(DEVICE_INTERFACE) + if device is None: + continue + logger.debug("{} {} {}".format( + path, device["Address"], device["Alias"])) + if ADV_SVC_UUID in device["UUIDs"] and path.startswith(self._path): + obj = self._bus.get_object("org.bluez", path) + logger.info('Device found: [ {} ] {} - {}'.format( + path, device["Name"], device["Address"])) + return dbus.Interface(obj, DEVICE_INTERFACE) + raise SystemExit("Bluetooth device not found!") + + def remove_device(self, device): + """Remove the remote device object at the given path.""" + try: + self._adapter.RemoveDevice(device) + except dbus.exceptions.DBusException as msg: + logging.error(msg) + raise SystemExit(1) + logger.info('Device properly removed') + + +class BtGATTRemoteService: + """Bluetooth LE GATT Remote Service class.""" + def __init__(self, SVC_UUID, adapter, device, max_notif): + self.SVC_UUID = SVC_UUID + self._adapter = adapter + self.device = device + self._max_notif = max_notif + self._notifications = 0 + self._bus = dbus.SystemBus() + self._manager = dbus.Interface( + self._bus.get_object("org.bluez", "/"), OM_INTERFACE) + self._main_loop = GObject.MainLoop() + self._service = self._find_service() + self._path = self._service.object_path + + def _get_managed_objects(self): + return self._manager.GetManagedObjects() + + def _find_service(self): + for path, ifaces in self._get_managed_objects().items(): + if GATT_SERVICE_INTERFACE not in ifaces.keys(): + continue + service = self._bus.get_object('org.bluez', path) + props = dbus.Interface(service, PROP_INTERFACE) + if props.Get(GATT_SERVICE_INTERFACE, "UUID") == self.SVC_UUID: + logger.info('Service found: {}'.format(path)) + return service + self._adapter.remove_device(self._device) + raise SystemExit("Bluetooth Service not found!") + + def find_chrc(self, MSRMT_UUID): + for path, ifaces in self._get_managed_objects().items(): + if GATT_CHRC_INTERFACE not in ifaces.keys(): + continue + chrc = self._bus.get_object('org.bluez', path) + props = dbus.Interface(chrc, PROP_INTERFACE) + if props.Get(GATT_CHRC_INTERFACE, "UUID") == MSRMT_UUID: + logger.info('Characteristic found: {}'.format(path)) + return chrc + self._adapter.remove_device(self._device) + raise SystemExit("Bluetooth Characteristic not found!") + + def _generic_error_cb(self, error): + self._adapter.remove_device(self._device) + self._main_loop.quit() + raise SystemExit('D-Bus call failed: ' + str(error)) + + def _start_notify_cb(self): + logger.info('Notifications enabled') + + def _notify_timeout(self): + self._adapter.remove_device(self._device) + self._main_loop.quit() + raise SystemExit('Notification test failed') + + def _changed_cb(self, iface, changed_props, invalidated_props): + if iface != GATT_CHRC_INTERFACE: + return + if not len(changed_props): + return + value = changed_props.get('Value', None) + if not value: + return + logger.debug('New Notification') + self._notifications += 1 + if self._notifications >= self._max_notif: + logger.info('Notification test succeeded') + self._main_loop.quit() + + def check_notification(self, chrc, timeout=20): + # Listen to PropertiesChanged signals from the BLE Measurement + # Characteristic. + prop_iface = dbus.Interface(chrc, PROP_INTERFACE) + prop_iface.connect_to_signal("PropertiesChanged", self._changed_cb) + + # Subscribe to BLE Measurement notifications. + chrc.StartNotify(reply_handler=self._start_notify_cb, + error_handler=self._generic_error_cb, + dbus_interface=GATT_CHRC_INTERFACE) + GObject.timeout_add_seconds(timeout, self._notify_timeout) + self._main_loop.run() + + +def main(): + logger.setLevel(logging.DEBUG) + parser = argparse.ArgumentParser() + parser.add_argument( + "id", + help='Address, udev path or name (hciX) of the BT adapter') + parser.add_argument( + "ADV_SVC_UUID", help='Beacon Gatt configuration service UUID') + parser.add_argument( + "SVC_UUID", help='Beacon Gatt notification service UUID') + parser.add_argument("MSRMT_UUID", help='Beacon Gatt measurement UUID') + parser.add_argument( + "--max-notif", "-m", type=int, default=5, + help="Maximum notification threshold") + args = parser.parse_args() + adapter = BtAdapter(args.id) + adapter.ensure_powered() + adapter.scan() + device = adapter.find_device_with_service(args.ADV_SVC_UUID) + try: + device.Connect() + except dbus.exceptions.DBusException as msg: + logging.error(msg) + adapter.remove_device(device) + raise SystemExit(1) + logger.info('Device connected, waiting 10s for services to be available') + time.sleep(10) # Let all the services to broadcast their UUIDs + service = BtGATTRemoteService( + args.SVC_UUID, adapter, device, args.max_notif) + chrc = service.find_chrc(args.MSRMT_UUID) + service.check_notification(chrc) + try: + device.Disconnect() + except dbus.exceptions.DBusException as msg: + logging.error(msg) + adapter.remove_device(device) + raise SystemExit(1) + logger.info('Device properly disconnected') + adapter.remove_device(device) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/gpu_test b/bin/gpu_test index bd22acc..49ea31a 100755 --- a/bin/gpu_test +++ b/bin/gpu_test @@ -23,11 +23,13 @@ lockups. Inspired by the workload directory of the xdiagnose package. """ +import gi import os import re import subprocess import sys import time +gi.require_version('Gio', '2.0') from gi.repository import Gio from math import cos, sin from threading import Thread diff --git a/bin/graphics_driver b/bin/graphics_driver index dfb5522..6d28f5a 100755 --- a/bin/graphics_driver +++ b/bin/graphics_driver @@ -361,7 +361,10 @@ def hybrid_graphics_check(xlog): def main(): - xlog = XorgLog("/var/log/Xorg.0.log") + if os.path.isfile("/var/log/Xorg.0.log"): + xlog = XorgLog("/var/log/Xorg.0.log") + else: + xlog = XorgLog(os.path.expanduser("~/.local/share/xorg/Xorg.0.log")) results = [] results.append(get_driver_info(xlog)) diff --git a/bin/graphics_env b/bin/graphics_env index 9511560..89c761d 100755 --- a/bin/graphics_env +++ b/bin/graphics_env @@ -19,7 +19,7 @@ if [[ $DRIVER == "amdgpu" || $DRIVER == "radeon" ]]; then if [ $INDEX -gt 1 ]; then # See https://wiki.archlinux.org/index.php/PRIME echo "Setting up PRIME GPU offloading for AMD discrete GPU" - if ! grep -q DRI3 /var/log/Xorg.0.log; then + if ! cat /var/log/Xorg.0.log ~/.local/share/xorg/Xorg.0.log 2>&1 | grep -q DRI3; then PROVIDER_ID=`xrandr --listproviders | grep "Sink Output" | awk {'print $4'} | tail -1` SINK_ID=`xrandr --listproviders | grep "Source Output" | awk {'print $4'} | tail -1` xrandr --setprovideroffloadsink ${PROVIDER_ID} ${SINK_ID} diff --git a/bin/gst_pipeline_test b/bin/gst_pipeline_test index cde7fbb..4d9f3c4 100755 --- a/bin/gst_pipeline_test +++ b/bin/gst_pipeline_test @@ -1,11 +1,14 @@ #!/usr/bin/env python3 from argparse import ArgumentParser +import gi import logging import re import os import sys import time +gi.require_version('Gst', '1.0') +gi.require_version('GLib', '2.0') from gi.repository import Gst from gi.repository import GLib from subprocess import check_output diff --git a/bin/ipmi_test b/bin/ipmi_test index 8be8b96..71e7f36 100755 --- a/bin/ipmi_test +++ b/bin/ipmi_test @@ -28,13 +28,15 @@ done echo echo "Checking for chassis status" ipmitool chassis status && echo "Successfully got chassis status" && chassis=0 || chassis=1 -echo "Checking to see if we can get sensor data" -ipmitool sdr list full && echo "Successfully got sensor data" && sensor=0 || sensor=1 +echo "Checking to see if we can get power status" +ipmitool power status && echo "Successfully got power status" && power=0 || power=1 +echo "Checking to see if we can get user data" +ipmitool user list && echo "Successfully got user data" && user=0 || user=1 echo "Checking to see if we can get info on the BMC" ipmitool bmc info && echo "Successfully got BMC information" && bmc=0 || bmc=1 # if everything passes, exit 0 -[ $chassis -eq 0 ] && [ $sensor -eq 0 ] && [ $bmc -eq 0 ] && exit 0 || echo "FAILURE: chassis: $chassis sensor: $sensor bmc: $bmc" +[ $chassis -eq 0 ] && [ $power -eq 0 ] && [ $user -eq 0 ] && [ $bmc -eq 0 ] && exit 0 || echo "FAILURE: chassis: $chassis power: $power user: $user bmc: $bmc" # otherwise exit 1 exit 1 diff --git a/bin/key_test b/bin/key_test index 3cf7ae0..2bd95f9 100755 --- a/bin/key_test +++ b/bin/key_test @@ -18,6 +18,7 @@ # along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +import gi import os import sys @@ -27,6 +28,7 @@ import struct import termios from gettext import gettext as _ + from gi.repository import GObject from optparse import OptionParser @@ -221,6 +223,8 @@ class GtkReporter(Reporter): def __init__(self, *args, **kwargs): super(GtkReporter, self).__init__(*args, **kwargs) + gi.require_version('Gdk', '3.0') + gi.require_version("Gtk", "3.0") from gi.repository import Gdk, Gtk # Initialize GTK constants diff --git a/bin/keyboard_test b/bin/keyboard_test index 309b845..1615106 100755 --- a/bin/keyboard_test +++ b/bin/keyboard_test @@ -33,8 +33,10 @@ def cli_prompt(): def gtk_prompt(): + import gi + gi.require_version('Gdk', '3.0') + gi.require_version("Gtk", "3.0") from gi.repository import Gtk, Gdk - # create a new window window = Gtk.Window() window.set_type_hint(Gdk.WindowType.TOPLEVEL) diff --git a/bin/lock_screen_watcher b/bin/lock_screen_watcher index 4e9847e..1c30270 100755 --- a/bin/lock_screen_watcher +++ b/bin/lock_screen_watcher @@ -17,7 +17,9 @@ # along with Checkbox. If not, see <http://www.gnu.org/licenses/>. import dbus +import gi from dbus.mainloop.glib import DBusGMainLoop +gi.require_version('GLib', '2.0') from gi.repository import GObject from gi.repository import GLib diff --git a/bin/manage_compiz_plugin b/bin/manage_compiz_plugin index a1236ea..9650a9d 100755 --- a/bin/manage_compiz_plugin +++ b/bin/manage_compiz_plugin @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # This file is part of Checkbox. # -# Copyright 2014-2015 Canonical Ltd. +# Copyright 2014-2018 Canonical Ltd. # Written by: # Daniel Manrique <roadmr@ubuntu.com> # Sylvain Pineau <sylvain.pineau@canonical.com> @@ -33,14 +33,13 @@ import sys import subprocess import time -PATH="org.compiz.core:/org/compiz/profiles/unity/plugins/core/" -KEY="active-plugins" +KEY="/org/compiz/profiles/unity/plugins/core/active-plugins" gettext.textdomain("com.canonical.certification.checkbox") gettext.bindtextdomain("com.canonical.certification.checkbox", os.getenv("CHECKBOX_PROVIDER_LOCALE_DIR", None)) -plugins = eval(subprocess.check_output(["gsettings", "get", PATH, KEY])) +plugins = eval(subprocess.check_output(["dconf", "read", KEY])) parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"), epilog=_("Available plugins: {}").format(plugins)) @@ -58,6 +57,6 @@ else: if args.plugin not in plugins: raise SystemExit(_("Plugin {} doesn't exist").format(args.plugin)) plugins.remove(args.plugin) -subprocess.call(["gsettings", "set", PATH, KEY, str(plugins)]) +subprocess.call(["dconf", "write", KEY, str(plugins)]) time.sleep(3) diff --git a/bin/net_if_watcher.py b/bin/net_if_watcher.py new file mode 100755 index 0000000..0ba61df --- /dev/null +++ b/bin/net_if_watcher.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# Copyright 2018 Canonical Ltd. +# All rights reserved. +# +# Written by: +# Maciej Kisielewski <maciej.kisielewski@canonical.com> +""" +Detect insertion of a new network interface. +""" + +from pathlib import Path +import time + + +def get_ifaces(): + return set([i.name for i in Path("/sys/class/net").iterdir()]) + + +def main(): + print("INSERT NOW") + starting_ifaces = get_ifaces() + attempts = 20 + while attempts > 0: + now_ifaces = get_ifaces() + # check if something disappeared + if not starting_ifaces == now_ifaces & starting_ifaces: + raise SystemExit("Interface(s) disappeared: {}".format( + ", ".join(list(starting_ifaces - now_ifaces)))) + new_ifaces = now_ifaces - starting_ifaces + if new_ifaces: + print() + print("New interface(s) detected: {}".format( + ", ".join(list(new_ifaces)))) + return + time.sleep(1) + print('.', end='', flush=True) + attempts -= 1 + print() + raise SystemExit("Failed to detect new network interface") + + +if __name__ == '__main__': + main() diff --git a/bin/network_restart b/bin/network_restart index 31f1da7..d01e983 100755 --- a/bin/network_restart +++ b/bin/network_restart @@ -14,6 +14,9 @@ from subprocess import check_output, check_call, CalledProcessError, STDOUT from argparse import ArgumentParser try: + import gi + gi.require_version('GLib', '2.0') + gi.require_version("Gtk", "3.0") from gi.repository import Gtk, GObject, GLib GLib.threads_init() GObject.threads_init() diff --git a/bin/pm_test b/bin/pm_test index d1bd3b1..70cd121 100755 --- a/bin/pm_test +++ b/bin/pm_test @@ -1,4 +1,10 @@ #!/usr/bin/env python3 +""" +If you're debugging this program, set PM_TEST_DRY_RUN in your environment. +It will make the script not run actual S3, S4, reboot and poweroff commands. +""" +import gi +import json import logging import logging.handlers import os @@ -13,7 +19,7 @@ from calendar import timegm from configparser import ConfigParser from datetime import datetime, timedelta from time import localtime, time - +gi.require_version("Gtk", "3.0") from gi.repository import GObject, Gtk @@ -41,8 +47,13 @@ def main(): logging.debug('Arguments: {0!r}'.format(args)) logging.debug('Extra Arguments: {0!r}'.format(extra_args)) + dry_run = os.environ.get('PM_TEST_DRY_RUN', False) + if dry_run: + logging.info("Running in dry-run mode") + try: - operation = PowerManagementOperation(args, extra_args, user=username) + operation = PowerManagementOperation( + args, extra_args, user=username, dry_run=dry_run) operation.setup() operation.run() except (TestCancelled, TestFailed) as exception: @@ -64,10 +75,11 @@ def main(): class PowerManagementOperation(): SLEEP_TIME = 5 - def __init__(self, args, extra_args, user=None): + def __init__(self, args, extra_args, user=None, dry_run=False): self.args = args self.extra_args = extra_args self.user = user + self.dry_run = dry_run def setup(self): """ @@ -128,11 +140,17 @@ class PowerManagementOperation(): logging.info('Executing new {0!r} operation...' .format(self.args.pm_operation)) logging.debug('Executing: {0!r}...'.format(command_str)) - # The PM operation is performed asynchronously so let's just wait - # indefinitely until it happens and someone comes along to kill us. - # This addresses LP: #1413134 - subprocess.check_call(command_str, shell=True) - signal.pause() + if self.dry_run: + print("\n\nRUNNING IN DRY-RUN MODE") + print("Normally the program would run: {}".format(command_str)) + print("Waiting for Enter instead") + input() + else: + # The PM operation is performed asynchronously so let's just wait + # indefinitely until it happens and someone comes along to kill us. + # This addresses LP: #1413134 + subprocess.check_call(command_str, shell=True) + signal.pause() def run_suspend_cycles(self, cycles_count, fwts): """Run suspend and resume cycles.""" @@ -155,15 +173,21 @@ class PowerManagementOperation(): command_str = command_tpl.format(script_path, cycles_count) logging.info('Running suspend/resume cycles') logging.debug('Executing: {0!r}...'.format(command_str)) - try: - # We call sleep_test or fwts_test script and log its output as it - # contains average times we need to compute global average times - # later. - logging.info(subprocess.check_output( - command_str, universal_newlines=True, shell=True)) - except subprocess.CalledProcessError as e: - logging.error('Error while running {0}:'.format(e.cmd)) - logging.error(e.output) + if self.dry_run: + print("\n\nRUNNING IN DRY-RUN MODE") + print("Normally the program would run: {}".format(command_str)) + print("Waiting for Enter instead") + input() + else: + try: + # We call sleep_test or fwts_test script and log its output as + # it contains average times we need to compute global average + # times later. + logging.info(subprocess.check_output( + command_str, universal_newlines=True, shell=True)) + except subprocess.CalledProcessError as e: + logging.error('Error while running {0}:'.format(e.cmd)) + logging.error(e.output) def summary(self): """ @@ -199,6 +223,29 @@ class PowerManagementOperation(): message = ('{0} test complete' .format(self.args.pm_operation.capitalize())) + total_suspends_expected = ( + self.args.suspends_before_reboot * self.args.total) + problems = '' + fwts_log_path = os.path.join(self.args.log_dir, 'fwts.log') + try: + with open(fwts_log_path, 'rt') as f: + magic_line = 'Completed S3 cycle(s) \n' + count = f.readlines().count(magic_line) + if count != total_suspends_expected: + problems = ( + "Found {} occurrences of '{}'. Expected {}".format( + count, magic_line.strip(), + total_suspends_expected)) + except FileNotFoundError: + problems = "Error opening {}".format(fwts_log_path) + if problems: + result = { + 'outcome': 'fail' if problems else 'pass', + 'comment': problems, + } + with open(os.path.join(self.args.log_dir, '__result'), 'wt') as f: + json.dump(result, f) + if self.args.silent: logging.info(message) else: @@ -206,6 +253,8 @@ class PowerManagementOperation(): MessageDialog(title, message).run() if self.args.checkbox_respawn_cmd: subprocess.run( + r'unset LD_LIBRARY_PATH;' + r'unset PYTHONPATH; unset PYTHONHOME;' r'DISPLAY=:0 x-terminal-emulator -e "sudo -u ' r'{} bash -c \"source {}; exec bash\""'.format( self.user, self.args.checkbox_respawn_cmd), shell=True) @@ -572,11 +621,11 @@ autologin-user-timeout=0 shutil.copystat(self.config_filename, backup_filename) break - with open(self.config_filename, 'w') as f: - if self.config_filename == '/etc/lightdm/lightdm.conf': - f.write(self.template.format(username=self.user)) - elif self.config_filename == '/etc/gdm3/custom.conf': - self.parser.write(f) + with open(self.config_filename, 'w') as f: + if self.config_filename == '/etc/lightdm/lightdm.conf': + f.write(self.template.format(username=self.user)) + elif self.config_filename == '/etc/gdm3/custom.conf': + self.parser.write(f) def disable(self): """ diff --git a/bin/removable_storage_test b/bin/removable_storage_test index 42e197a..6de3f97 100755 --- a/bin/removable_storage_test +++ b/bin/removable_storage_test @@ -841,20 +841,10 @@ def main(): # This will raise KeyError for no # associated disk device was found. xhci_disks = test.get_disks_xhci() - # pep8 style suggest to limit the try clause - # to the absolute minimum amount of code necessary - try: - disk_xhci_flag = xhci_disks[disk] - except KeyError: - print("\t\tDisk does not use xhci_hci.") - return 1 - else: - if('xhci' == disk_xhci_flag): - print("\t\tDriver Detected: xhci_hcd") - else: - print("\t\tDisk does not use xhci_hci.") - logging.debug("disk_xhci_flag is not xhci") - return 1 + if test.get_disks_xhci().get(disk, '') != 'xhci': + raise SystemExit( + "\t\tDisk does not use xhci_hcd.") + print("\t\tDriver Detected: xhci_hcd") else: # Give it a hint for the detection failure. # LP: #1362902 diff --git a/bin/resolution_test b/bin/resolution_test index a50868b..55a7828 100755 --- a/bin/resolution_test +++ b/bin/resolution_test @@ -1,9 +1,10 @@ #!/usr/bin/env python3 +import gi import sys from argparse import ArgumentParser - +gi.require_version('Gdk', '3.0') from gi.repository import Gdk diff --git a/bin/socketcan_test.py b/bin/socketcan_test.py new file mode 100755 index 0000000..232fc2c --- /dev/null +++ b/bin/socketcan_test.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +# This file is part of Checkbox. +# +# Copyright 2018 Canonical Ltd. +# Written by: +# Jonathan Cave <jonathan.cave@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. + +import argparse +import os +import socket +import struct +import sys +import textwrap +import threading +import time + + +class CANSocket(): + + # struct module format strings for CAN packets + # Normal format: + # < little-endian + # I unsigned int (4) : CAN-ID + EFF/RTR/ERR Flags + # B unsigned char (1) : Data length + # 3x padding (3 * 1) : - + # 8s char array (8 * 1) : Data + FORMAT = "<IB3x8s" + # Flexible Data (FD) rate format: + # < little-endian + # I unsigned int (4) : CAN-ID + EFF/RTR/ERR Flags + # B unsigned char (1) : Data length + # B unsigned char (1) : FD Flags + # 2x padding (2 * 1) : - + # 64s char array (64 * 1) : Data + FD_FORMAT = "<IBB2x64s" + + CAN_MTU = struct.Struct(FORMAT).size + CANFD_MTU = struct.Struct(FD_FORMAT).size + + # Socket options from <linux/can/raw.h> + CAN_RAW_FILTER = 1 # set 0 .. n can_filter(s) + CAN_RAW_ERR_FILTER = 2 # set filter for error frames + CAN_RAW_LOOPBACK = 3 # local loopback (default:on) + CAN_RAW_RECV_OWN_MSGS = 4 # receive my own msgs (default:off) + CAN_RAW_FD_FRAMES = 5 # allow CAN FD frames (default:off) + CAN_RAW_JOIN_FILTERS = 6 # all filters must match to trigger + + def __init__(self, interface=None, fdmode=False, loopback=True): + self.sock = socket.socket(socket.PF_CAN, # protocol family + socket.SOCK_RAW, + socket.CAN_RAW) + self._fdmode = fdmode + self._loopback = loopback + if interface is not None: + self._bind(interface) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def close(self): + self.sock.close() + + def _bind(self, interface): + self.sock.bind((interface,)) + if self._fdmode: # default is off + self.sock.setsockopt(socket.SOL_CAN_RAW, self.CAN_RAW_FD_FRAMES, 1) + if not self._loopback: # default is on + self.sock.setsockopt(socket.SOL_CAN_RAW, self.CAN_RAW_LOOPBACK, 0) + + def send(self, can_id, data, id_flags=0, fd_flags=0): + can_id = can_id | id_flags + if self._fdmode: + can_pkt = struct.pack(self.FD_FORMAT, can_id, len(data), fd_flags, + data) + else: + can_pkt = struct.pack(self.FORMAT, can_id, len(data), data) + self.sock.send(can_pkt) + + def recv(self): + if self._fdmode: + can_pkt = self.sock.recv(self.CANFD_MTU) + can_id, length, fd_flags, data = struct.unpack(self.FD_FORMAT, + can_pkt) + else: + can_pkt = self.sock.recv(self.CAN_MTU) + can_id, length, data = struct.unpack(self.FORMAT, can_pkt) + can_id &= socket.CAN_EFF_MASK + return (can_id, data[:length]) + + +def echo_test(args): + # ID conversion and size check + print('Using source ID: {}'.format(args.can_id)) + can_id_i = int(args.can_id, 16) + if can_id_i > 2047 and not args.effid: + raise SystemExit('ERROR: CAN ID to high for SFF') + id_flags = 0 + if args.effid: + print('Setting EFF CAN ID flag') + id_flags = socket.CAN_EFF_FLAG + + # Whether to enable local loopback, required for local only test + # but only want to parse packets from other end if remote + loopback = not args.remote + + # Default data size is 8 bytes but if testing FD Mode use 64 + data_size = 8 + if args.fdmode: + data_size = 64 + data_b = os.urandom(data_size) + print('Sending data: {}'.format(data_b.hex())) + + recv_id_i = None + recv_data_b = None + + def receive(): + nonlocal recv_id_i + nonlocal recv_data_b + print('Opening read socket on {}'.format(args.interface)) + with CANSocket(args.interface, fdmode=args.fdmode, + loopback=loopback) as recv_s: + recv_id_i, recv_data_b = recv_s.recv() + + # Create a receive thread + recv_t = threading.Thread(target=receive, daemon=True) + recv_t.start() + time.sleep(1) + + print('Opening send socket on {}'.format(args.interface)) + # Open socket, will raise OSError on failure + with CANSocket(args.interface, fdmode=args.fdmode, + loopback=loopback) as send_s: + print('Sending data...', flush=True) + try: + send_s.send(can_id_i, data_b, id_flags=id_flags) + except OSError as e: + print(e, file=sys.stderr) + if e.errno == 90: + raise SystemExit('ERROR: interface does not support FD Mode') + else: + raise SystemExit('ERROR: OSError on attempt to send') + + recv_t.join(10) + if recv_t.is_alive(): + raise SystemExit('ERROR: Timeout waiting to receive data') + + print('Received packet') + print(' ID : {:x}'.format(recv_id_i)) + print(' Data: {}'.format(recv_data_b.hex())) + if recv_id_i != can_id_i or recv_data_b != data_b: + raise SystemExit('ERROR: ID/Data received does not match sent') + + print('\nPASSED') + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description='SocketCAN Tests', + epilog=textwrap.dedent(''' + Examples: + socketcan_test.py can0 123 + socketcan_test.py can0 212 --remote + socketcan_test.py can0 FA123 --effid + socketcan_test.py can0 E407DB --effid --fdmode''').lstrip()) + parser.add_argument('interface', type=str, help='Interface name e.g. can0') + parser.add_argument('can_id', type=str, help=textwrap.dedent(''' + CAN ID of source in Hex, max of 11 bits using Standard Frame + Format (SFF). Specifying use of Extended Frame Format (EFF) + allows the use of up to 29 bit IDs.''').lstrip()) + parser.add_argument('--remote', action='store_true', + help='Expect a remote device to echo the test packet') + parser.add_argument('--effid', action='store_true', + help='Use EFF ID (CAN 2.0 B)') + parser.add_argument('--fdmode', action='store_true', + help='Attempt to send 64 bytes of data i.e. FD mode') + parser.set_defaults(func=echo_test) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/bin/touchpad_test b/bin/touchpad_test index 7152745..bc6a474 100755 --- a/bin/touchpad_test +++ b/bin/touchpad_test @@ -1,9 +1,13 @@ #!/usr/bin/env python3 +import gi import sys import gettext from gettext import gettext as _ +gi.require_version('Gdk', '3.0') +gi.require_version('Gio', '2.0') +gi.require_version("Gtk", "3.0") from gi.repository import Gio, Gtk, Gdk from optparse import OptionParser diff --git a/bin/xrandr_cycle b/bin/xrandr_cycle index b9f615b..34f953c 100755 --- a/bin/xrandr_cycle +++ b/bin/xrandr_cycle @@ -87,6 +87,13 @@ for adapter, mode in modes: highest_modes = [] for adapter, params in top_res_per_aspect.items(): for aspect, width in params.items(): + # xrandr can list modes that are unsupported, unity-control-center + # defines minimum width and height, below which the resolution + # is not listed as a choice in display settings panel in UCC + # see should_show_resolution function in cc-display-panel.c + # from lp:unity-control-center + if width < 675 or width / aspect < 530: + continue mode = '{}x{}'.format(width, width/aspect) highest_modes.append((adapter, mode)) |