diff options
author | Sylvain Pineau <sylvain.pineau@canonical.com> | 2016-10-07 09:48:55 +0200 |
---|---|---|
committer | Sylvain Pineau <sylvain.pineau@canonical.com> | 2016-10-07 09:48:55 +0200 |
commit | 33a440473829a82188a9547146789fae72185c5e (patch) | |
tree | aa8efa1434f252432ab99274eaca45a9427207bc /bin | |
parent | cbbe2fe63a1b22f6d0873bfe11ff3e834a1a883c (diff) |
Import plainbox-provider-checkbox_0.21.orig.tar.gzupstream-0.21patched-0.21-1
Diffstat (limited to 'bin')
40 files changed, 2901 insertions, 1028 deletions
diff --git a/bin/audio_bluetooth_loopback_test b/bin/audio_bluetooth_loopback_test new file mode 100755 index 0000000..9661ec4 --- /dev/null +++ b/bin/audio_bluetooth_loopback_test @@ -0,0 +1,57 @@ +#!/bin/bash +# +# This file is part of Checkbox. +# +# Copyright 2014 Canonical Ltd. +# +# Authors: Daniel Manrique <roadmr@ubuntu.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +# +# This simple script finds a bluetooth source and sink, and records from the +# source for 6 seconds, playing the recording back into the sink. It helps a +# human validate that record/playback is working, human can speak into +# microphone and just ensure the speech can be heard instantly in the headset. + +[ -x "`which pactl`" ] || exit 1 +[ -x "`which pacat`" ] || exit 1 + +SINK=$(pactl list | sed -n '/monitor/d;s/Name: \(bluez_sink\.\)/\1/p') +SOURCE=$(pactl list | sed -n '/monitor/d;s/Name: \(bluez_source\.\)/\1/p') + + +if [ -n "$SINK" ] && [ -n "$SOURCE" ]; then + PLAYBACK_LOG=$(mktemp --tmpdir audio_bluetooth_loopback.XXXXX) + RECORD_LOG=$(mktemp --tmpdir audio_bluetooth_loopback.XXXXX) + trap "rm $PLAYBACK_LOG $RECORD_LOG" EXIT + # ensure we exit with failure if parec fails, and not with pacat + # --playback's error code + set -o pipefail + # Use a short latency parameter so time between speech and hearing it is + # short, makes for a nicer interactive experience + LATENCY="--latency-msec=50" + # time out after 6 seconds, forcibly kill after 8 seconds if pacat didn't + # respond + echo "Recording and playing back, please speak into bluetooth microphone" + timeout -k 8 6 pacat $LATENCY --record -v -d $SOURCE 2>$RECORD_LOG | \ + pacat $LATENCY --playback -v -d $SINK 2>$PLAYBACK_LOG + + echo "RECORD LOG" + cat $RECORD_LOG + echo "" + echo "PLAYBACK LOG" + cat $PLAYBACK_LOG +else + echo "No bluetooth audio device found" + exit 1 +fi diff --git a/bin/bluetooth_transfer_stress b/bin/bluetooth_transfer_stress new file mode 100755 index 0000000..2add0b1 --- /dev/null +++ b/bin/bluetooth_transfer_stress @@ -0,0 +1,54 @@ +#!/bin/bash +# +# Copyright (C) 2014 Canonical +# +# Authors: +# Daniel Manrique <daniel.manrique@canonical.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +BTDEVADDR="$1" + +if [ -z "$BTDEVADDR" ]; then + echo "Please give Bluetooth device address as first parameter" + exit 1 +fi + +ORIGIN=$(mktemp --tmpdir bluetooth-stress.XXXXXX) +DESTINATION=$(mktemp --tmpdir bluetooth-stress.XXXXXX) +REMOTE=$RANDOM +SIZEKB=10240 +echo "Creating ${SIZEKB}KB file to test transfer" +dd if=/dev/urandom of=$ORIGIN count=$SIZEKB bs=1024 +ORIGIN_SUM=$(sha256sum $ORIGIN | cut -f 1 -d ' ') +set -o pipefail +echo "Sending file using Bluetooth" +time obexftp -v -b $BTDEVADDR -o $REMOTE --put $ORIGIN 2>&1 | ansi_parser +sleep 5 +echo "Receiving file using Bluetooth" +time obexftp -v -b $BTDEVADDR -o $DESTINATION --get $REMOTE 2>&1 | ansi_parser +# Now checksum destination and compare +DESTINATION_SUM=$(sha256sum $DESTINATION | cut -f 1 -d ' ') +# Clean up before reporting +rm $ORIGIN $DESTINATION +if [ "$ORIGIN_SUM" = "$DESTINATION_SUM" ]; then + echo "Checksums match, file transfer succeeded" + exit 0 +else + echo "Checksums don't match, file was corrupted during transfers." + echo "Original checksum: $ORIGIN_SUM" + echo "Checksum of received file: $DESTINATION_SUM" + exit 1 +fi diff --git a/bin/bmc_info b/bin/bmc_info new file mode 100755 index 0000000..d7aec9b --- /dev/null +++ b/bin/bmc_info @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 + +import sys +import shlex +from subprocess import check_output, CalledProcessError + +def main(): + # First, we need to get output + cmd = "ipmitool mc info" + try: + result = check_output(shlex.split(cmd), universal_newlines=True) + except FileNotFoundError: + print("ipmitool was not found! Please install it and try again.", + file=sys.stderr) + return 1 + except CalledProcessError as e: + print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr) + return 1 + result = result.split('\n') + + # We need some bits that are formatted oddly so we need to do some parsing + data = {} + for line in result: + if ':' in line: + key = line.split(':')[0].strip() + value = line.split(':')[1].strip() + data[key] = value + last = (key, [value]) + else: + # since the last line we matched had a ':', it's key is likely the + # key for the next few lines that don't have a ':' + # This should keep adding items to our last key's list until we hit + # another line with a :, and we start the cycle over again. + last[1].append(line.strip()) + data[last[0]] = last[1] + + # Now print out what we care about: + we_care_about = ['Manufacturer Name', + 'Manufacturer ID', + 'Product Name', + 'Product ID', + 'Firmware Revision', + 'IPMI Version', + 'Additional Device Support'] + + for field in we_care_about: + if type(data[field]) is list: + # Sometimes the first item in the list is ''. This will remove it + data[field].remove('') + print(field.ljust(30),':',data[field].pop(0)) + for item in data[field]: + print(' '.ljust(32),item) + else: + print(field.ljust(30),":",data[field]) + #print("{}:\t{}".format(field, data[field])) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/broadband_info b/bin/broadband_info new file mode 100755 index 0000000..aec2ccb --- /dev/null +++ b/bin/broadband_info @@ -0,0 +1,6 @@ +#!/bin/bash + +for i in $(mmcli --simple-status -L | \ + awk '/freedesktop\/ModemManager1\/Modem/ {print $1;}'); do + mmcli -m $i +done diff --git a/bin/camera_test b/bin/camera_test index 019736c..c3e554c 100755 --- a/bin/camera_test +++ b/bin/camera_test @@ -26,20 +26,22 @@ # along with Checkbox. If not, see <http://www.gnu.org/licenses/>. # +import argparse +import ctypes +import errno +import fcntl +import imghdr +import logging import os import re +import struct import sys import time -import errno -import fcntl -import ctypes -import struct -import imghdr -from tempfile import NamedTemporaryFile -from subprocess import check_call, CalledProcessError, STDOUT -import argparse -from glob import glob + from gi.repository import GObject +from glob import glob +from subprocess import check_call, CalledProcessError, STDOUT +from tempfile import NamedTemporaryFile _IOC_NRBITS = 8 @@ -189,8 +191,8 @@ class CameraTest: print(" driver : %s" % cp.driver.decode('UTF-8')) print(" version: %s.%s.%s" % (cp.version >> 16, - (cp.version >> 8) & 0xff, - cp.version & 0xff)) + (cp.version >> 8) & 0xff, + cp.version & 0xff)) print(" flags : 0x%x [" % cp.capabilities, ' CAPTURE' if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE else '', @@ -205,7 +207,7 @@ class CameraTest: resolutions = self._get_supported_resolutions(device) print(' ', self._supported_resolutions_to_string(resolutions).replace( - "\n", " "), + "\n", " "), sep="") if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE: @@ -223,6 +225,7 @@ class CameraTest: % {'device': self.args.device, 'type': self._gst_video_type, 'plugin': self._gst_plugin}) + logging.debug("LED test with pipeline %s", pipespec) self._pipeline = Gst.parse_launch(pipespec) self._pipeline.set_state(Gst.State.PLAYING) time.sleep(10) @@ -241,6 +244,7 @@ class CameraTest: 'width': self._width, 'height': self._height, 'plugin': self._gst_plugin}) + logging.debug("display test with pipeline %s", pipespec) self._pipeline = Gst.parse_launch(pipespec) self._pipeline.set_state(Gst.State.PLAYING) time.sleep(10) @@ -289,6 +293,8 @@ class CameraTest: 'height': height, 'plugin': self._gst_plugin, 'filename': filename}) + logging.debug("still test with gstreamer and " + "pipeline %s", pipespec) self._pipeline = Gst.parse_launch(pipespec) self._pipeline.set_state(Gst.State.PLAYING) time.sleep(3) @@ -425,8 +431,8 @@ class CameraTest: # for continuous and stepwise, let's just use min and # max they use the same structure and only return # one result - elif framesize.type == V4L2_FRMSIZE_TYPE_CONTINUOUS or\ - framesize.type == V4L2_FRMSIZE_TYPE_STEPWISE: + elif (framesize.type in (V4L2_FRMSIZE_TYPE_CONTINUOUS, + V4L2_FRMSIZE_TYPE_STEPWISE)): resolutions.append([framesize.stepwise.min_width, framesize.stepwise.min_height] ) @@ -441,7 +447,6 @@ class CameraTest: if e.errno != errno.EINVAL: print("Unable to determine supported framesizes " "(resolutions), this may be a driver issue.") - return supported_formats supported_format['resolutions'] = resolutions return supported_formats @@ -495,6 +500,10 @@ def parse_arguments(argv): title='test', description='Available camera tests') + parser.add_argument('--debug', dest='log_level', + action="store_const", const=logging.DEBUG, + default=logging.INFO, help="Show debugging messages") + def add_device_parameter(parser): group = parser.add_mutually_exclusive_group() group.add_argument("-d", "--device", default="/dev/video0", @@ -505,7 +514,6 @@ def parse_arguments(argv): group.add_argument("--lowest-device", action="store_true", help=("Use the /dev/videoN " "where N is the lowest value available")) - subparsers.add_parser('detect') led_parser = subparsers.add_parser('led') add_device_parameter(led_parser) @@ -541,6 +549,8 @@ if __name__ == "__main__": if not args.test: args.test = 'detect' + logging.basicConfig(level=args.log_level) + # Import Gst only for the test cases that will need it if args.test in ['display', 'still', 'led', 'resolutions']: from gi.repository import Gst diff --git a/bin/connect_wireless b/bin/connect_wireless index 54f61f7..c1ee36c 100755 --- a/bin/connect_wireless +++ b/bin/connect_wireless @@ -1,23 +1,42 @@ #!/bin/bash +# Check nmcli version +NMCLI_GTE_0_9_10=0 +nmcli general 2>&1 >/dev/null +if [ $? -eq 0 ]; then + NMCLI_GTE_0_9_10=1 +fi + # Any active connections? conn='' -active_connection=$(nmcli -f SSID,ACTIVE dev wifi list | grep yes) - -if [ $? -eq 0 ] -then - ap=$(echo $active_connection | awk -F\' '{print $2}') - conn=$(nmcli -t -f UUID,TYPE,NAME con list | grep wireless | grep -e "$ap$" | awk -F\: '{print $1}') +if [ $NMCLI_GTE_0_9_10 -eq 0 ]; then + active_connection=$(nmcli -f SSID,ACTIVE dev wifi list | grep yes) + if [ $? -eq 0 ]; then + ap=$(echo $active_connection | awk -F\' '{print $2}') + conn=$(nmcli -t -f UUID,TYPE,NAME con list | grep wireless | grep -e "$ap$" | head -n 1 | awk -F\: '{print $1}') + else + conn=$(nmcli -t -f UUID,TYPE con list | grep wireless | head -n 1 | awk -F\: '{print $1}') + fi else - conn=$(nmcli -t -f UUID,TYPE con list | grep wireless | head -n 1 | awk -F\: '{print $1}') + active_connection=$(nmcli -f SSID,ACTIVE dev wifi | grep yes) + if [ $? -eq 0 ]; then + ap=$(echo $active_connection | awk '{print $1}') + conn=$(nmcli -t -f UUID,TYPE,NAME con show | grep wireless | grep -e "$ap$" | head -n 1 | awk -F\: '{print $1}') + else + conn=$(nmcli -t -f UUID,TYPE con show | grep wireless | head -n 1 | awk -F\: '{print $1}') + fi fi #Strip trailing/leading whitespace conn=$(echo $conn |sed 's/^[ \t]*//;s/[ \t]*$//') # Find out if wireless is enabled -nmcli nm wifi | grep -q 'enabled' +if [ $NMCLI_GTE_0_9_10 -eq 0 ]; then + nmcli nm wifi | grep -q 'enabled' +else + nmcli radio wifi | grep -q 'enabled' +fi if [ $? -ne 0 ] then # Find out why @@ -35,7 +54,7 @@ nmcli dev status | grep -q '\<connected\>' if [ $? -eq 0 ] then # Disconnect, pause for a short time - for iface in `nmcli -f GENERAL dev list | grep 'GENERAL.DEVICE' | awk '{print $2}'` + for iface in `(nmcli -f GENERAL dev list 2>/dev/null || nmcli -f GENERAL dev show) | grep 'GENERAL.DEVICE' | awk '{print $2}'` do nmcli dev disconnect iface $iface done diff --git a/bin/cpu_offlining b/bin/cpu_offlining index c1c02a8..0e88af1 100755 --- a/bin/cpu_offlining +++ b/bin/cpu_offlining @@ -40,7 +40,7 @@ done if [ $result -eq 0 ]; then echo "Successfully turned $cpu_count cores off and back on" else - echo "Error with offlining one or more cores." 1>&2 + echo "Error with offlining one or more cores. CPU offline may not work if this is an ARM system." 1>&2 fi exit $result diff --git a/bin/create_connection b/bin/create_connection index 6f92859..71e9572 100755 --- a/bin/create_connection +++ b/bin/create_connection @@ -5,6 +5,11 @@ import os import time from subprocess import check_call, check_output, CalledProcessError +try: + from subprocess import DEVNULL # >= python3.3 +except ImportError: + import os + DEVNULL = open(os.devnull, 'wb') from uuid import uuid4 from argparse import ArgumentParser @@ -127,8 +132,15 @@ baud=115200 def block_until_created(connection, retries, interval): while retries > 0: - nmcli_con_list = check_output(['nmcli', 'con', 'list'], - universal_newlines=True) + try: + nmcli_con_list = check_output(['nmcli', 'con', 'list'], + stderr=DEVNULL, + universal_newlines=True) + except CalledProcessError: + check_call(['nmcli', 'con', 'reload']) + nmcli_con_list = check_output(['nmcli', 'con', 'show'], + stderr=DEVNULL, + universal_newlines=True) if connection in nmcli_con_list: print("Connection %s registered" % connection) diff --git a/bin/disk_smart b/bin/disk_smart index 3545700..7e6929f 100755 --- a/bin/disk_smart +++ b/bin/disk_smart @@ -7,6 +7,7 @@ Copyright (C) 2010 Canonical Ltd. Authors Jeff Lane <jeffrey.lane@canonical.com> Brendan Donegan <brendan.donegan@canonical.com> + Rod Smith <rod.smith@canonical.com> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License version 2, @@ -34,6 +35,12 @@ USB/eSATA/eSAS attached storage devices. Changelog: +v1.4: Fix script failure on disks with no pre-existing SMART tests +v1.3: Fix detection of SMART availability & activate SMART if available + but deactivated. Also use smartctl return value rather than string- + matching to determine if a test has failed; this should be more + robust, as output strings vary between disks. +v1.2: Handle multiple output formats for "smartctl -l" v1.1: Put delay before first attempt to acces log, rather than after v1.0: added debugger class and code to allow for verbose debug output if needed @@ -58,8 +65,10 @@ import os import sys import time import logging +import shlex -from subprocess import Popen, PIPE +from subprocess import Popen, PIPE, check_call, check_output +from subprocess import CalledProcessError from argparse import ArgumentParser @@ -72,27 +81,57 @@ class ListHandler(logging.StreamHandler): msg = msg.decode() logger = logging.getLogger(record.name) new_record = logger.makeRecord(record.name, record.levelno, - record.pathname, record.lineno, msg, record.args, - record.exc_info, record.funcName) + record.pathname, record.lineno, + msg, record.args, + record.exc_info, + record.funcName) logging.StreamHandler.emit(self, new_record) else: logging.StreamHandler.emit(self, record) +def enable_smart(disk): + """ Enable SMART on the specified disk + :param disk: + disk device filename (e.g., /dev/sda) + :returns: + True if enabling smart was successful, False otherwise + """ + logging.debug('SMART disabled; attempting to enable it.') + command = 'smartctl -s on %s' % disk + try: + check_call(shlex.split(command)) + return True + except CalledProcessError: + return False + + +# Returns True if SMART is enabled. If not enabled, attempt to +# enable it and return result of that attempt. def is_smart_enabled(disk): # Check with smartctl to see if SMART is available and enabled on the disk command = 'smartctl -i %s' % disk diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True) - .communicate()[0]) + .communicate()[0]) diskinfo = diskinfo_bytes.decode().splitlines() logging.debug('SMART Info for disk %s', disk) logging.debug(diskinfo) - return (len(diskinfo) > 2 - and 'Enabled' in diskinfo[-2] - and 'Available' in diskinfo[-3]) + # Return True if the output (diskinfo) includes BOTH + # "SMART support is.*Available" AND "SMART support is.*Enabled". + # If SMART is available but not enabled, try to enable it. + if len(diskinfo) > 2: + if any("SMART support is" in s and "Available" in s + for s in diskinfo): + if any("SMART support is" in s and "Enabled" in s + for s in diskinfo): + return True + else: + return enable_smart(disk) + else: + return False def run_smart_test(disk, type='short'): @@ -110,22 +149,26 @@ def run_smart_test(disk, type='short'): def get_smart_entries(disk, type='selftest'): entries = [] - command = 'smartctl -l %s %s' % (type, disk) - stdout = Popen(command, stdout=PIPE, shell=True).stdout + try: + stdout = check_output(['smartctl', '-l', type, disk], + universal_newlines=True) + returncode = 0 + except CalledProcessError as err: + stdout = err.output + returncode = err.returncode # Skip intro lines - while True: - line = stdout.readline().decode() - if not line: - raise Exception('Failed to parse SMART log entries') - - if line.startswith('SMART'): + stdout_lines = iter(stdout.splitlines()) + for line in stdout_lines: + if (line.startswith('SMART') or + line.startswith('No self-tests have been logged')): break # Get lengths from header - line = stdout.readline().decode() + line = next(stdout_lines) if not line.startswith('Num'): - return entries + entries.append('No entries found in log yet') + return entries, returncode columns = ['number', 'description', 'status', 'remaining', 'lifetime', 'lba'] lengths = [line.index(i) for i in line.split()] @@ -134,8 +177,7 @@ def get_smart_entries(disk, type='selftest'): # Get remaining lines entries = [] - for line_bytes in stdout.readlines(): - line = line_bytes.decode() + for line in stdout_lines: if line.startswith('#'): entry = {} for i, column in enumerate(columns): @@ -143,10 +185,64 @@ def get_smart_entries(disk, type='selftest'): # Convert some columns to integers entry['number'] = int(entry['number'][1:]) - entry['lifetime'] = int(entry['lifetime']) entries.append(entry) - return entries + return entries, returncode + + +# Returns True if an "in-progress" message is found in the smartctl +# output, False if such a message is not found. In the former case, +# the in-progress message entries are logged. +def in_progress(current_entries): + statuses = [entry for entry in current_entries + if isinstance(entry, dict) + and 'status' in entry + and (entry['status'] == 'Self-test routine in progress' + or "Self test in progress" in entry['status'])] + if statuses: + for entry in statuses: + logging.debug('%s %s %s %s' % (entry['number'], + entry['description'], + entry['status'], + entry['remaining'])) + return True + else: + return False + + +# Wait for SMART test to complete; return status and return code. +# Note that different disks return different types of values. +# Some return no status reports while a test is ongoing; others +# show a status line at the START of the list of tests, and +# others show a status line at the END of the list of tests +# (and then move it to the top once the tests are done). +def poll_for_status(args, disk, previous_entries): + # Priming read... this is here in case our test is finished or fails + # immediate after it beginsAccording to. + logging.debug('Polling selftest.log for status') + keep_going = True + + while keep_going: + # Poll every sleep seconds until test is complete$ + time.sleep(args.sleep) + + current_entries, returncode = get_smart_entries(disk) + if current_entries != previous_entries: + if not in_progress(current_entries): + keep_going = False + + if args.timeout is not None: + if args.timeout <= 0: + logging.debug('Polling timed out') + return 'Polling timed out', 1 + else: + args.timeout -= args.sleep + + if isinstance(current_entries[0], str): + return current_entries[0], returncode + else: + return current_entries[0]['status'], returncode + def main(): description = 'Tests that SMART capabilities on disks that support SMART function.' @@ -183,7 +279,7 @@ def main(): logger.setLevel(logging.INFO) # Make sure we're root, because smartctl doesn't work otherwise. - if not os.geteuid()==0: + if not os.geteuid() == 0: parser.error("You must be root to run this program") # If SMART is available and enabled, we proceed. Otherwise, we exit as the @@ -194,8 +290,8 @@ def main(): return 0 # Initiate a self test and start polling until the test is done - previous_entries = get_smart_entries(disk) - logging.info("Starting SMART self-test on %s" % disk) + previous_entries, returncode = get_smart_entries(disk) + logging.info("Starting SMART self-test on %s", disk) if run_smart_test(disk) != 0: logging.error("Error reported during smartctl test") return 1 @@ -204,39 +300,17 @@ def main(): # Abort the previous instance # so that polling can identify the difference run_smart_test(disk) - previous_entries = get_smart_entries(disk) - - # Priming read... this is here in case our test is finished or fails - # immediate after it begins. - logging.debug('Polling selftest.log for status') - - while True: - # Poll every sleep seconds until test is complete$ - time.sleep(args.sleep) - - current_entries = get_smart_entries(disk) - logging.debug('%s %s %s %s' % (current_entries[0]['number'], - current_entries[0]['description'], - current_entries[0]['status'], - current_entries[0]['remaining'])) - if current_entries != previous_entries \ - and current_entries[0]["status"] != 'Self-test routine in progress': - break - - if args.timeout is not None: - if args.timeout <= 0: - logging.debug('Polling timed out') - return 1 - else: - args.timeout -= args.sleep + previous_entries, returncode = get_smart_entries(disk) - status = current_entries[0]['status'] + status, returncode = poll_for_status(args, disk, previous_entries) - if status != 'Completed without error': - log = get_smart_entries(disk) + if returncode != 0: + log, returncode = get_smart_entries(disk) logging.error("FAIL: SMART Self-Test appears to have failed for some reason. " - "Run 'sudo smartctl -l selftest %s' to see the SMART log" % disk) - logging.debug("Last self-test run status: %s" % status) + "Run 'sudo smartctl -l selftest %s' to see the SMART log", + disk) + logging.debug("Last smartctl return code: %d", returncode) + logging.debug("Last smartctl run status: %s", status) return 1 else: logging.info("PASS: SMART Self-Test completed without error") diff --git a/bin/dkms_info b/bin/dkms_info new file mode 100755 index 0000000..ae9a997 --- /dev/null +++ b/bin/dkms_info @@ -0,0 +1,569 @@ +#!/usr/bin/env python3 +# encoding: utf-8 +# Copyright 2015 Canonical Ltd. +# Written by: +# Shawn Wang <shawn.wang@canonical.com> +# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +dkms_info provides device package information. + +supported package types: + - dkms (Dynamic Kernel Module Support): provides kernel modules + - non-dkms: packages that with modaliases header and don't exist in dkms list + - hardware modalias: might be unused dkms or config package + - oemalias: It is like non-dkms(w/o modalias) +supported output format: + - onelines: one line per packages with matched modaliases information + - dumps: json output (fully information) +""" + +import fnmatch +import functools +import email.parser +import io +import json +import logging +import os +import subprocess +import sys +import unittest + +from guacamole import Command + +try: + from unittest import mock +except ImportError: + from plainbox.vendor import mock + +_logger = logging.getLogger(None) + + +@functools.lru_cache() +def get_system_module_list(): + """ + Use lsmod to list current kernel modules. + + :returns: + A list of module names that are loaded into the current kernel. + """ + _logger.info("Looking at inserted kernel modules") + modules = [] + with io.open("/proc/modules", 'rt', + encoding='UTF-8') as stream: + for line in stream.readlines(): + modules.append(line.split()[0].strip()) + return modules + + +@functools.lru_cache() +def get_system_modaliases(): + r""" + List machine modaliases. + + :returns: + dict of hardware modaliases. + key: modalias_type + value: list of modalias_string + """ + _logger.info("Looking for modalias files in /sys/devices") + + result = {} + name = "modalias" + for root, dirs, files in os.walk("/sys/devices/"): + if name in files: + with io.open(os.path.join(root, name), 'rt', + encoding='UTF-8') as stream: + data = stream.read().strip() + (modalias_type, modalias_string) = data.split(":", 1) + + if modalias_type not in result: + result[modalias_type] = [] + if modalias_string not in result[modalias_type]: + result[modalias_type].append(modalias_string) + return result + + +def get_installed_dkms_modules(): + """ + Query dkms_status from /var/lib/dkms/. + + An installed dkms module has the below directory. + /var/lib/dkms/<dkms_name>/<dkms_ver>/<kernel_ver> + + :returns: + list of (<dkms_name>, <dkms_ver>) + """ + _dkmses = [] + _logger.info("Querying dkms database") + + path = "/var/lib/dkms/" + for root, dirs, files in os.walk(path): + if os.uname().release in dirs: + dkms = root[len(path):].split("/") + if len(dkms) != 2: + continue + _dkmses.append(dkms) + return _dkmses + + +@functools.lru_cache() +def match_patterns(patterns): + """ + Check modalias patterns matched with modalias, or type is oemalias. + + oemalias is a special pattern_type for oem. + :param patterns: + list of modalias pattern from debian package. + :returns: + list of modalias pattern matched with hardware modalias + """ + _logger.info("Looking for modalias objects matching") + matched = [] + if not patterns: + return matched + hw_modaliases = get_system_modaliases() + for pattern in patterns: + pattern_array = pattern.split(":", 1) + if len(pattern_array) < 2: + _logger.info("skip pattern {}, can't find type".format(pattern)) + continue + + (pattern_type, pattern_string) = pattern_array + + if pattern_type == "oemalias": + matched.append(pattern) + if pattern_type not in hw_modaliases: + continue + for item in hw_modaliases[pattern_type]: + if fnmatch.fnmatch(item, pattern_string): + matched.append(pattern) + return matched + + +class SystemInfoTests(unittest.TestCase): + + """Tests for System Information Parsing and Collection.""" + + _proc_modules = """\ +xt_REDIRECT 16384 3 - Live 0x0000000000000000 +nf_nat_redirect 16384 1 xt_REDIRECT, Live 0x0000000000000000 +xt_hl 16384 3 - Live 0x0000000000000000 +hid_generic 16384 0 - Live 0x0000000000000000 +usbhid 53248 0 - Live 0x0000000000000000 +hid 110592 2 hid_generic,usbhid, Live 0x0000000000000000 +overlay 45056 1 - Live 0x0000000000000000 +""" + _modalias = """\ +usb:v1D6Bp0003d0319dc09dsc00dp03ic09isc00ip00in00 +""" + + def setUp(self): + """Common setup code.""" + get_system_module_list.cache_clear() + get_system_modaliases.cache_clear() + + @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) + def test_get_module_list__calls_and_parses_lsmod(self): + """Ensure that get_module_list() parses lsmod output.""" + # NOTE: Return value was loaded from my system running kernel 4.0. + # The first few and last rows to be precise. + modules = get_system_module_list() + self.assertEqual(modules, [ + 'xt_REDIRECT', 'nf_nat_redirect', 'xt_hl', 'hid_generic', + 'usbhid', 'hid', 'overlay']) + + @mock.patch('io.open', mock.mock_open(read_data=_proc_modules)) + def test_get_module_list_is_cached(self): + """Ensure that get_module_list() cache works.""" + modules1 = get_system_module_list() + modules2 = get_system_module_list() + self.assertIn('xt_REDIRECT', modules1) + self.assertIn('overlay', modules2) + self.assertEqual(modules1, modules2) + + @mock.patch('os.walk') + @mock.patch('io.open', mock.mock_open(read_data=_modalias)) + def test_get_system_modalias(self, mock_os_walk): + """test_get_system_modalias.""" + mock_os_walk.return_value = [ + ("/sys/devices/pci0000:00/0000:00:14.0/usb2/2-0:1.0/modalias", + ["driver", "subsystem"], + ["modalias", "uevent"]), + ] + + """fetch hw_modaliases from machine and check modalis types.""" + modaliases = get_system_modaliases() + self.assertEqual(len(modaliases), 1) + self.assertIn("usb", modaliases) + + @mock.patch('os.uname') + @mock.patch('os.walk') + def test_get_installed_dkms_modules(self, mock_os_walk, mock_os_uname): + """test_get_installed_dkms_modules.""" + mock_os_walk.return_value = [ + ("/var/lib/dkms/hello/0.1", + ["3.19.0-15-generic", "build", "source"], + []), + ] + o = mock.Mock() + o.release = "3.19.0-15-generic" + mock_os_uname.return_value = o + self.assertEqual([['hello', '0.1']], + get_installed_dkms_modules()) + + @mock.patch('__main__.get_system_modaliases') + def test_match_patterns(self, mock_get_system_modaliases): + """Test of match_patterns.""" + mock_get_system_modaliases.return_value = { + "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", + "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], + "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", + "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], + } + pkg_modalieses = ["pci:v00008086d00008C26sv*sd*bc*sc*i*", + "usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", + "oemalias:test"] + matched_modalieses = match_patterns(tuple(pkg_modalieses)) + # match_patterns + self.assertIn("pci:v00008086d00008C26sv*sd*bc*sc*i*", + matched_modalieses) + self.assertIn("oemalias:test", + matched_modalieses) + self.assertNotIn("usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*", + matched_modalieses) + + +class DkmsPackage(object): + + """ + Handle DKMS type device package, DKMS is a kernel module framework. + + It generate modules for installed kernel or different kernel versions. + The dkms modules will be copied to /lib/modulesa/`uname -r`/updates/dkms/. + Those modules might be load by modaliases information. + """ + + def __init__(self, name, version): + """ + init of DkmsPackage, define all the attribute. + + :param name: + DKMS module name + :param version: + DKMS module version + """ + self.dkms_name = name + self.dkms_ver = version + self.pkg_name = self._query_package() + self.kernel_ver = os.uname().release + self.arch = os.uname().machine + self.mods = self._list_modules() + self.install_mods = self._list_install_modules() + self.pkg = None + + def _query_package(self): + """ + Query debian package of dkms. + + Use dpkg -S to check dkms src path of debian package. + + :return: + string of package name or None + """ + path = "/usr/src/{}-{}/dkms.conf".format(self.dkms_name, self.dkms_ver) + _logger.info("Looking for packages that provide: %s", path) + dpkg_info_root = "/var/lib/dpkg/info" + for fn in os.listdir(dpkg_info_root): + if not fn.endswith(".list"): + continue + with io.open(os.path.join(dpkg_info_root, fn), 'rt', + encoding='UTF-8') as stream: + if path in stream.read(): + return fn[:-len(".list")] + return None + + def _list_modules(self): + """ + List all the kernel modules that provide by the dkms package. + + Module name (.ko) with "-" will be replace to "_" when module loaded. + + :param path: + The directory to look at. + :return: + List of kernel modules + """ + path = "/var/lib/dkms/{}/{}/{}/{}/module".format( + self.dkms_name, self.dkms_ver, self.kernel_ver, self.arch) + _logger.info("Looking for kernel modules in %s", path) + result = [] + for module_file in os.listdir(path): + (module, extension) = os.path.splitext(module_file) + if extension == ".ko": + result.append(module.replace("-", "_")) + return result + + def _list_install_modules(self): + """ + Return a dict of install_modules. + + key is installed module name + value is tuple of matched patterns + + :return: + Dict of installed dkms modules + """ + install_mods = {} + for m in self.mods: + if m not in get_system_module_list(): + continue + _logger.info("Inspecting module %s", m) + + output = subprocess.check_output(["modinfo", m], + universal_newlines=True) + aliases = [] + for line in output.splitlines(): + if not line.startswith("alias:"): + continue + + key, value = line.split(':', 1) + aliases.append(value.strip()) + + install_mods[m] = match_patterns(tuple(aliases)) + return install_mods + +def _headers_to_dist(pkg_str): + """ + Convert rft822 headers string to dict. + + :param headers: + deb822 headers object + :return: + dict, the key is lowercase of deb822 headers key + """ + + header = email.parser.Parser().parsestr(pkg_str) + target = {} + for key in header.keys(): + target[key.lower()] = header[key] + return target + +class DebianPackageHandler(object): + + """Use rtf822(email) to handle the package information from file_object.""" + + def __init__(self, extra_pkgs=[], file_object=None): + """ + DebianPackageHandler. + + :param file_object: + default file open from /var/lib/dpkg/status, + where stored system package information + """ + if file_object is None: + file_object = io.open('/var/lib/dpkg/status', 'rt', + encoding='UTF-8') + self._file_object = file_object + self.extra_pkgs = extra_pkgs + self.pkgs = self._get_device_pkgs() + + def _gen_all_pkg_strs(self): + """ + Get package information in /var/lib/dpkg/status. + + :returns: + A generator of debian package. + """ + _logger.info("Loading information about all packages") + for pkg_str in self._file_object.read().split('\n\n'): + yield pkg_str + + + def _get_device_pkgs(self): + """ + Only device packages have debian package header 'Modaliases'. + + This method get packages with the key ``modaliases``. + Use the method instead of get_all_pkgs for performance issues. + + :returns: + A list of dict , the dict is converted from debian package header. + """ + + _logger.info("Looking for packages providing modaliases") + _modalias_pkgs = {} + target_str = "" + result = {} + + for pkg_str in self._gen_all_pkg_strs(): + + for pkg in self.extra_pkgs: + if pkg.pkg_name is None: + continue + pstr = "Package: {}".format(pkg.pkg_name) + if pstr in pkg_str: + _logger.info("Gathering information of package, {}".format( + pkg.pkg_name)) + pkg.pkg = _headers_to_dist(pkg_str) + break + else: + if "Modaliases:" in pkg_str: + pkg = _headers_to_dist(pkg_str) + + (modalias_header, pattern_str) = \ + pkg['modaliases'].strip(")").split("(") + patterns = pattern_str.split(', ') + patterns.sort() + pkg['match_patterns'] = match_patterns(tuple(patterns)) + + with io.open("/var/lib/dpkg/info/{}.list".format(pkg['package']), + 'rt', encoding='UTF-8') as stream: + if "/dkms.conf" in stream.read(): + pkg['unused_dkms'] = True + result[pkg['package']] = pkg + return result + + def to_json(self): + return json.dumps({ "dkms": self.extra_pkgs, "non-dkms": self.pkgs }, default=lambda o: o.__dict__, sort_keys=True, indent=4) + + def to_outline(self): + result = "" + for pkg in self.extra_pkgs: + if pkg.pkg is None: + continue + result = "{}\n{}_{}: {}".format( + result, + pkg.pkg_name, + pkg.pkg["version"], + pkg.install_mods) + for pkg_name, pkg in self.pkgs.items(): + extra_str = "" + if "unused_dkms" in pkg: + extra_str = "- " + result = "{}\n{}{}_{}: {}".format( + result, + extra_str, + pkg_name, + pkg["version"], + pkg['match_patterns']) + return result + + +class DebianPackageHandlerTest(unittest.TestCase): + + """Test of DebianPackageHandler.""" + + _var_lib_dpkg_status = """\ +Package: foo +Status: install ok installed +Modaliases: hwe(pci:v000099DDd00000036sv*sd*bc*sc*i*) + +Package: foo1 +Status: install ok installed +Modaliases: hwe(pci:v0000AADDd00000036sv*sd*bc*sc*i*) + +Package: foo2 +Status: install ok installed + +Package: foo3 +Status: install ok installed + +Package: bar +Status: install ok installed + +""" + + + @mock.patch('io.open', mock.mock_open(read_data=_var_lib_dpkg_status)) + @mock.patch('__main__.get_system_modaliases') + def test_get_pkgs(self, mock_get_system_modaliases): + """Test of test_get_pkgs.""" + mock_get_system_modaliases.return_value = { + "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00", + "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"], + "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00", + "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"], + } + + self.pkg_handler = DebianPackageHandler( + file_object=io.StringIO(self._var_lib_dpkg_status)) + self.assertEqual(len(self.pkg_handler.pkgs), 2) + + +class DeviceInfo(Command): + + """ + Implementation of the dkms-info command. + + dkms_info provides device package information. + + @EPILOG@ + + supported package types: + - dkms (Dynamic Kernel Module Support): provides kernel modules + - non-dkms: packages that with modaliases header and don't exist in dkms + list + - hardware modalias: might be unused dkms or config package + - oemalias: It is like non-dkms(w/o modalias) + + supported output formats: + - onelines: one line per packages with matched modaliases information + - dumps: json output (fully information) + """ + + def register_arguments(self, parser): + """Register command line arguments for dkms-info.""" + + parser.add_argument( + '--format', default="onelines", + choices=["summary", "json"], + help=("Choose output format type: " + "summary (one line per packages) " + "or json (json format, fully information)")) + + parser.add_argument( + '--output', default=None, + help=("Output filename to store the output date")) + + def invoked(self, ctx): + """Invoke dkms-info.""" + logging.basicConfig( + level=logging.INFO, format='[%(relativeCreated)06dms] %(message)s') + _logger.info("Started") + + dkms_pkgs = [] + for (dkms_name, dkms_ver) in get_installed_dkms_modules(): + dkms_pkg = DkmsPackage(dkms_name, dkms_ver) + dkms_pkgs.append(dkms_pkg) + + output = sys.stdout + if ctx.args.output is not None: + output = open(ctx.args.output, 'wt', encoding='UTF-8') + + pkg_handler = DebianPackageHandler(extra_pkgs=dkms_pkgs) + if ctx.args.format == "summary": + output.write(pkg_handler.to_outline()) + else: + output.write(pkg_handler.to_json()) + _logger.info("Data collected") + + +if __name__ == '__main__': + if '--test' in sys.argv: + sys.argv.remove('--test') + unittest.main() + else: + DeviceInfo().main() diff --git a/bin/dmitest b/bin/dmitest new file mode 100755 index 0000000..02e67a9 --- /dev/null +++ b/bin/dmitest @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# +# This file is part of Checkbox. +# +# Copyright 2014 Canonical Ltd. +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +# +# Test for sane dmidecode output, particularly with respect to +# various manufacturer information fields. Also, verify that the +# system reports a chassis type that suits its class (server or +# desktop/laptop) +# +# By: Rod Smith +# +# Parameters: +# * --dmifile {filename} -- Input filename; optional. If specified, +# file is used instead of dmidecode output. +# * --test_versions -- Include chassis, system, and base boad version +# numbers among tests. +# * --test_serials -- Include system and base board serial numbers among +# tests. +# * 'desktop' or 'server' -- Type of system being tested. + +import re +import subprocess +import sys + +from argparse import ArgumentParser + +# Command to retrieve DMI information +COMMAND = "dmidecode" + + +def find_in_section(stream, section, label, strings, find_empty): + """ Search for a set of strings on a labelled line in a section + of the output. + :param stream: + input text stream (dmidecode output) + :param section: + section label in which to search (e.g., "Chassis Information") + :param label: + label of line on which to search (e.g., "Type:") + :param strings: + set of strings for which to search (e.g., ["server", "blade"]) + :param find_empty: + if True, matches empty label field (as if '""' were passed as + a strings value) + :returns found: + True if one or more of strings was found on "label" line in "section" + section, or if "label" line is empty AND find_empty is True; + False otherwise + """ + start_looking = False + found = False + empty = True + for line in stream: + if line == section: + start_looking = True + if start_looking and re.search(label, line): + print("\n" + section) + print(line.strip()) + empty = len(line.strip()) == len(label) + if empty and find_empty: + found = True + for s in strings: + if re.search(s, line, flags=re.IGNORECASE): + found = True + break + start_looking = False + + return found + + +def main(): + parser = ArgumentParser("dmitest") + parser.add_argument('system_type', + help="System type ('server' or 'desktop').", + choices=['server', 'desktop']) + parser.add_argument('--dmifile', + help="File to use in lieu of dmidecode.") + parser.add_argument('--test_versions', action="store_true", + help="Set to check version information") + parser.add_argument('--test_serials', action="store_true", + help="Set to check serial number information") + args = parser.parse_args() + + try: + if args.dmifile: + print("Reading " + args.dmifile + " as DMI data") + stream = subprocess.check_output(['cat', args.dmifile], universal_newlines=True).splitlines() + else: + stream = subprocess.check_output(COMMAND, universal_newlines=True).splitlines() + except subprocess.CalledProcessError as err: + print("Error running {}: {}".format(COMMAND, err)) + return 1 + + retval = 0 + + """ + NOTE: System type is encoded in both the "Chassis Information" and "Base + Board Type" sections. The former is more reliable, so we do a whitelist + test on it -- the type MUST be specified correctly. The "Base Board Type" + section is less reliable, so rather than flag large numbers of systems + for having "Unknown", "Other", or something similar here, we just flag + it when it's at odds with the type passed on the command line. Also, + the "Base Board Type" may specify a desktop or tower system on servers + shipped in those form factors, so we don't flag that combination as an + error. + """ + if args.system_type == 'server': + if not find_in_section(stream, 'Chassis Information', 'Type:', + ['server', 'rack mount', 'blade', + 'expansion chassis', 'multi-system'], False): + print("*** Incorrect or unknown server chassis type!") + retval = 1 + if find_in_section(stream, 'Base Board Information', 'Type:', + ['portable', 'notebook', 'space-saving', + 'all in one'], False): + print("*** Incorrect server base board type!") + retval = 1 + else: + if not find_in_section(stream, 'Chassis Information', 'Type:', + ['notebook', 'portable', 'laptop', 'desktop', + 'lunch box', 'space-saving', 'tower', + 'all in one', 'hand held'], False): + print("*** Incorrect or unknown desktop chassis type!") + retval = 1 + if find_in_section(stream, 'Base Board Information', 'Type:', + ['rack mount', 'server', 'multi-system', + 'interconnect board'], False): + print("*** Incorrect desktop base board type!") + retval = 1 + + if find_in_section(stream, 'Chassis Information', 'Manufacturer:', + ['empty', 'chassis manufacture', 'null', 'insyde', + 'to be filled by o\.e\.m\.', 'no enclosure', + '\.\.\.\.\.'], True): + print("*** Invalid chassis manufacturer!") + retval = 1 + + if find_in_section(stream, 'System Information', 'Manufacturer:', + ['system manufacture', 'insyde', 'standard', + 'to be filled by o\.e\.m\.', 'no enclosure'], True): + print("*** Invalid system manufacturer!") + retval = 1 + + if find_in_section(stream, 'Base Board Information', 'Manufacturer:', + ['to be filled by o\.e\.m\.'], True): + print("*** Invalid base board manufacturer!") + retval = 1 + + if find_in_section(stream, 'System Information', 'Product Name:', + ['system product name', 'to be filled by o\.e\.m\.'], + False): + print("*** Invalid system product name!") + retval = 1 + + if find_in_section(stream, 'Base Board Information', 'Product Name:', + ['base board product name', + 'to be filled by o\.e\.m\.'], False): + print("*** Invalid base board product name!") + retval = 1 + + if args.test_versions: + + if find_in_section(stream, 'Chassis Information', 'Version:', + ['to be filled by o\.e\.m\.', 'empty'], + False): + print("*** Invalid chassis version!") + retval = 1 + + if find_in_section(stream, 'System Information', 'Version:', + ['to be filled by o\.e\.m\.', '\(none\)', + 'null', 'system version', 'not applicable', + '\.\.\.\.\.'], False): + print("*** Invalid system information version!") + retval = 1 + + if find_in_section(stream, 'Base Board Information', 'Version:', + ['base board version', + 'empty', 'to be filled by o\.e\.m\.'], False): + print("*** Invalid base board version!") + retval = 1 + + if args.test_serials: + + if find_in_section(stream, 'System Information', 'Serial Number:', + ['to be filled by o\.e\.m\.', + 'system serial number', '\.\.\.\.\.'], + False): + print("*** Invalid system information serial number!") + retval = 1 + + if find_in_section(stream, 'Base Board Information', 'Serial Number:', + ['n/a', 'base board serial number', + 'to be filled by o\.e\.m\.', 'empty', '\.\.\.'], + False): + print("*** Invalid base board serial number!") + retval = 1 + + if find_in_section(stream, 'Processor Information', 'Version:', + ['sample'], False): + print("*** Invalid processor information!") + retval = 1 + + # In review of dmidecode data on 10/23/2014, no conspicuous problems + # found in BIOS Information section's Vendor, Version, or Release Date + # fields. Therefore, no tests based on these fields have been written. + + if retval: + print("\nFailed one or more tests (see above)") + else: + print("\nPassed all tests") + + return retval + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/fwts_test b/bin/fwts_test index f48cae3..127858a 100755 --- a/bin/fwts_test +++ b/bin/fwts_test @@ -2,37 +2,80 @@ import sys import re -from time import time, sleep +from time import time from argparse import ArgumentParser, RawTextHelpFormatter, REMAINDER from subprocess import Popen, PIPE from syslog import * +from distutils.spawn import find_executable +import os -TESTS = ['acpidump', - 'acpiinfo', - 'acpitables', - 'apicedge', - 'apicinstance', - 'aspm', - 'bios32', - 'bios_info', - 'checksum', - 'cstates', - 'dmar', - 'ebda', - 'fadt', - 'hpet_check', - 'klog', - 'mcfg', - 'method', - 'mpcheck', - 'msr', - 'mtrr', - 'nx', - 'oops', - 'uefirtvariable', - 'version', - 'virt', - 'wmi'] +# These tests require user interaction and need either special handling +# or skipping altogether (right now, we skip them but they're kept here +# in case we figure out a way to present the interaction to the user). +INTERACTIVE_TESTS = ['ac_adapter', + 'battery', + 'hotkey', + 'power_button', + 'brightness', + 'lid'] +# These are usually performed on normal certification runs +CERT_TESTS = ['acpidump', + 'acpitables', + 'apicedge', + 'apicinstance', + 'aspm', + 'bios32', + 'checksum', + 'cstates', + 'dmar', + 'ebda', + 'fadt', + 'hpet_check', + 'klog', + 'mcfg', + 'method', + 'mpcheck', + 'msr', + 'mtrr', + 'nx', + 'oops', + 'uefirtvariable', + 'version', + 'virt', + 'wmi'] +# These are advanced tests that shouldn't affect certification status +NON_CERT_TESTS = ['acpiinfo', + 'bios_info', + 'cmosdump', + 'crs', + 'crsdump', + 'csm', + 'dmicheck', + 'ebdadump', + 'fan', + 'gpedump', + 'hda_audio', + 'maxfreq', + 'maxreadreq', + 'memmapdump', + 'microcode', + 'mpdump', + 'os2gap', + 'osilinux', + 'pcc', + 'pciirq', + 'plddump', + 'pnp', + 'prsdump', + 'romdump', + 'securebootcert', + 'syntaxcheck', + 'uefidump', + 'uefirtmisc', + 'uefirttime', + 'uefivarinfo' + ] +TESTS = sorted(CERT_TESTS + NON_CERT_TESTS) def get_sleep_times(start_marker, end_marker, sleep_time, resume_time): @@ -65,7 +108,7 @@ def get_sleep_times(start_marker, end_marker, sleep_time, resume_time): if end_marker in loglist[idx]: run = 'PASS' break - + sleep_elapsed = float(sleep_end_time) - float(sleep_start_time) resume_elapsed = float(resume_end_time) - float(resume_start_time) return (run, sleep_elapsed, resume_elapsed) @@ -95,6 +138,20 @@ def fix_sleep_args(args): return new_args +def detect_progress_indicator(): + # Return a command suitable for piping progress information to its + # stdin (invoked via Popen), in list format. + # Return zenity if installed and DISPLAY (--auto-close) + # return dialog if installed and no DISPLAY (width height) + display = os.environ.get('DISPLAY') + if display and find_executable('zenity'): + return ["zenity", "--progress", "--text", "Progress", "--auto-close"] + if not display and find_executable('dialog'): + return ["dialog", "--gauge", "Progress", "20", "70"] + # Return None if no progress indicator is to be used + return None + + def main(): description_text = 'Tests the system BIOS using the Firmware Test Suite' epilog_text = ('To perform sleep testing, you will need at least some of ' @@ -103,6 +160,7 @@ def main(): '--s3-delay-delta\n' '--s3-device-check\n' '--s3-device-check-delay\n' + '--s3-hybrid-sleep\n' '--s3-max-delay\n' '--s3-min-delay\n' '--s3-multiple\n' @@ -117,24 +175,26 @@ def main(): epilog=epilog_text, formatter_class=RawTextHelpFormatter) parser.add_argument('-l', '--log', - default='/tmp/fwts_results.log', - help=('Specify the location and name of the log file.\n' - '[Default: %(default)s]')) + default='/tmp/fwts_results.log', + help=('Specify the location and name ' + 'of the log file.\n' + '[Default: %(default)s]')) parser.add_argument('-f', '--fail-level', - default='high', - choices=['critical', 'high', 'medium', - 'low', 'none', 'aborted'], - help=('Specify the FWTS failure level that will trigger ' - 'this script to return a failing exit code. For ' - 'example, if you chose "critical" as the ' - 'fail-level, this wrapper will NOT return a ' - 'failing exit code unless FWTS reports a test as ' - 'FAILED_CRITICAL. You will still be notified of ' - 'all FWTS test failures. [Default level: ' - '%(default)s]')) + default='high', + choices=['critical', 'high', 'medium', + 'low', 'none', 'aborted'], + help=('Specify the FWTS failure level that will ' + 'trigger this script to return a failing exit ' + 'code. For example, if you chose "critical" as ' + 'the fail-level, this wrapper will NOT return ' + 'a failing exit code unless FWTS reports a ' + 'test as FAILED_CRITICAL. You will still be ' + 'notified of all FWTS test failures. ' + '[Default level: %(default)s]')) sleep_args = parser.add_argument_group('Sleep Options', - ('The following arguments are to only be used with the ' - '--sleep test option')) + ('The following arguments are to ' + 'only be used with the ' + '--sleep test option')) sleep_args.add_argument('--sleep-time', dest='sleep_time', action='store', @@ -155,25 +215,31 @@ def main(): action='append', help='Name of the test to run.') group.add_argument('-a', '--all', - action='store_true', - help='Run ALL FWTS automated tests (assumes -w and -c)') + action='store_true', + help='Run ALL FWTS automated tests (assumes -w and -c)') group.add_argument('-s', '--sleep', - nargs=REMAINDER, - action='store', - help=('Perform sleep test(s) using the additional\n' - 'arguments provided after --sleep. All remaining\n' - 'items on the command line will be passed \n' - 'through to fwts for performing sleep tests. \n' - 'For info on these extra fwts options, please \n' - 'see the epilog below and \n' - 'the --fwts-help option.')) + nargs=REMAINDER, + action='store', + help=('Perform sleep test(s) using the additional\n' + 'arguments provided after --sleep. Remaining\n' + 'items on the command line will be passed \n' + 'through to fwts for performing sleep tests. \n' + 'For info on these extra fwts options, please \n' + 'see the epilog below and \n' + 'the --fwts-help option.')) group.add_argument('--fwts-help', - dest='fwts_help', - action='store_true', - help='Display the help info for fwts itself (lengthy)') + dest='fwts_help', + action='store_true', + help='Display the help info for fwts itself (lengthy)') group.add_argument('--list', action='store_true', help='List all tests in fwts.') + group.add_argument('--list-cert', + action='store_true', + help='List all certification tests in fwts.') + group.add_argument('--list-advanced', + action='store_true', + help='List all advanced tests in fwts.') args = parser.parse_args() tests = [] @@ -188,16 +254,16 @@ def main(): # Set correct fail level if args.fail_level is not 'none': args.fail_level = 'FAILED_%s' % args.fail_level.upper() - + # Get our failure priority and create the priority values - fail_levels = {'FAILED_CRITICAL':4, - 'FAILED_HIGH':3, - 'FAILED_MEDIUM':2, - 'FAILED_LOW':1, - 'FAILED_NONE':0, - 'FAILED_ABORTED':-1} + fail_levels = {'FAILED_CRITICAL': 4, + 'FAILED_HIGH': 3, + 'FAILED_MEDIUM': 2, + 'FAILED_LOW': 1, + 'FAILED_NONE': 0, + 'FAILED_ABORTED': -1} fail_priority = fail_levels[args.fail_level] - + # Enforce only using sleep opts with --sleep if args.sleep_time or args.resume_time and not args.sleep: parser.error('--sleep-time and --resume-time only apply to the ' @@ -208,6 +274,12 @@ def main(): elif args.list: print('\n'.join(TESTS)) return 0 + elif args.list_cert: + print('\n'.join(CERT_TESTS)) + return 0 + elif args.list_advanced: + print('\n'.join(NON_CERT_TESTS)) + return 0 elif args.test: tests.extend(args.test) elif args.all: @@ -232,11 +304,11 @@ def main(): sleep_time_arg = '--sleep-time' if resume_time_arg in args.sleep: args.resume_time = int(args.sleep.pop( - args.sleep.index(resume_time_arg) + 1)) + args.sleep.index(resume_time_arg) + 1)) args.sleep.remove(resume_time_arg) if sleep_time_arg in args.sleep: args.sleep_time = int(args.sleep.pop( - args.sleep.index(sleep_time_arg) + 1)) + args.sleep.index(sleep_time_arg) + 1)) args.sleep.remove(sleep_time_arg) # if we still haven't set a sleep or resume time, use defauts. if not args.sleep_time: @@ -245,31 +317,62 @@ def main(): args.resume_time = 3 tests.extend(args.sleep) else: - tests.extend(TESTS) + tests.extend(CERT_TESTS) # run the tests we want if args.sleep: iteration_results = {} print('=' * 20 + ' Test Results ' + '=' * 20) + progress_indicator = None + if detect_progress_indicator(): + progress_indicator = Popen(detect_progress_indicator(), + stdin=PIPE) for iteration in range(0, iterations): timestamp = int(time()) start_marker = 'CHECKBOX SLEEP TEST START %s' % timestamp end_marker = 'CHECKBOX SLEEP TEST STOP %s' % timestamp syslog(LOG_INFO, '---' + start_marker + '---' + str(time())) command = ('fwts -q --stdout-summary -r %s %s' - % (args.log, ' '.join(tests))) + % (args.log, ' '.join(tests))) results['sleep'] = (Popen(command, stdout=PIPE, shell=True) .communicate()[0].strip()).decode() syslog(LOG_INFO, '---' + end_marker + '---' + str(time())) if 's4' not in args.sleep: - iteration_results[iteration] = get_sleep_times(start_marker, - end_marker, args.sleep_time, - args.resume_time) - print(' - Cycle %s: Status: %s Sleep Elapsed: %0.5f ' - 'Resume Elapsed: %0.5f' % (iteration, - iteration_results[iteration][0], - iteration_results[iteration][1], - iteration_results[iteration][2])) + sleep_times = get_sleep_times(start_marker, + end_marker, + args.sleep_time, + args.resume_time) + iteration_results[iteration] = sleep_times + progress_tuple = (iteration, + iteration_results[iteration][0], + iteration_results[iteration][1], + iteration_results[iteration][2]) + progress_string = (' - Cycle %s: Status: %s ' + 'Sleep Elapsed: %0.5f ' + 'Resume Elapsed: ' + ' %0.5f' % progress_tuple) + progress_pct = "{}".format(int(100 * iteration / iterations)) + if "zenity" in detect_progress_indicator(): + progress_indicator.stdin.write("# {}\n".format( + progress_string).encode('utf-8')) + progress_indicator.stdin.write("{}\n".format( + progress_pct).encode('utf-8')) + progress_indicator.stdin.flush() + elif "dialog" in detect_progress_indicator(): + progress_indicator.stdin.write("XXX\n".encode('utf-8')) + progress_indicator.stdin.write( + progress_pct.encode('utf-8')) + progress_indicator.stdin.write( + "\nTest progress\n".encode('utf-8')) + progress_indicator.stdin.write( + progress_string.encode('utf-8')) + progress_indicator.stdin.write( + "\nXXX\n".encode('utf-8')) + progress_indicator.stdin.flush() + else: + print(progress_string) + progress_indicator.terminate() + if 's4' not in args.sleep: average_times(iteration_results) for run in iteration_results.keys(): @@ -304,41 +407,41 @@ def main(): print("WARNING: The following test cases were reported as critical\n" "level failures by fwts. Please review the log at\n" "%s for more information." % args.log) - for test in critical_fails: + for test in critical_fails: print(" - " + test) if high_fails: print("High Failures: %d" % len(high_fails)) print("WARNING: The following test cases were reported as high\n" "level failures by fwts. Please review the log at\n" "%s for more information." % args.log) - for test in high_fails: + for test in high_fails: print(" - " + test) if medium_fails: print("Medium Failures: %d" % len(medium_fails)) print("WARNING: The following test cases were reported as medium\n" "level failures by fwts. Please review the log at\n" "%s for more information." % args.log) - for test in medium_fails: + for test in medium_fails: print(" - " + test) if low_fails: print("Low Failures: %d" % len(low_fails)) print("WARNING: The following test cases were reported as low\n" "level failures by fwts. Please review the log at\n" "%s for more information." % args.log) - for test in low_fails: + for test in low_fails: print(" - " + test) if passed: print("Passed: %d" % len(passed)) - for test in passed: + for test in passed: print(" - " + test) if aborted: print("Aborted Tests: %d" % len(aborted)) print("WARNING: The following test cases were aborted by fwts\n" "Please review the log at %s for more information." % args.log) - for test in aborted: + for test in aborted: print(" - " + test) - + if args.fail_level is not 'none': if fail_priority == fail_levels['FAILED_CRITICAL']: if critical_fails: diff --git a/bin/gateway_ping_test b/bin/gateway_ping_test index 19d9f2b..87af5d3 100755 --- a/bin/gateway_ping_test +++ b/bin/gateway_ping_test @@ -1,45 +1,64 @@ #!/usr/bin/python3 +# This file is part of Checkbox. +# +# Copyright 2007-2014 Canonical Ltd. +# Written by: +# Brendan Donegan <brendan.donegan@canonical.com> +# Daniel Manrique <daniel.manrique@canonical.com> +# David Murphy <david.murphy@canonical.com> +# Javier Collado <javier.collado@canonical.com> +# Jeff Lane <jeffrey.lane@canonical.com> +# Marc Tardif <marc.tardif@canonical.com> +# Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> +# Zygmunt Krynicki <zygmunt.krynicki@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +from gettext import gettext as _ +import argparse +import errno +import gettext +import logging import os import re -import sys - -import logging import socket import struct import subprocess -import gettext +import sys import time -from gettext import gettext as _ - -from argparse import ArgumentParser - -class Route(object): - """Gets routing information from the system. +class Route: + """ + Gets routing information from the system. """ - - # auxiliary functions - def _hex_to_dec(self, string): - """Returns the integer value of a hexadecimal string s - """ - return int(string, 16) def _num_to_dotted_quad(self, number): - """Convert long int to dotted quad string + """ + Convert long int to dotted quad string """ return socket.inet_ntoa(struct.pack("<L", number)) def _get_default_gateway_from_proc(self): - """"Returns the current default gateway, reading that from /proc """ - logging.debug("Reading default gateway information from /proc") + Returns the current default gateway, reading that from /proc + """ + logging.debug(_("Reading default gateway information from /proc")) try: - file = open("/proc/net/route") - route = file.read() + with open("/proc/net/route", "rt") as stream: + route = stream.read() except: - logging.error("Failed to read def gateway from /proc") + logging.error(_("Failed to read def gateway from /proc")) return None else: h = re.compile("\n(?P<interface>\w+)\s+00000000\s+" @@ -47,24 +66,26 @@ class Route(object): w = h.search(route) if w: if w.group("def_gateway"): - return (self._num_to_dotted_quad( - self._hex_to_dec(w.group("def_gateway")))) + return self._num_to_dotted_quad( + int(w.group("def_gateway"), 16)) else: - logging.error("Could not find def gateway info in /proc") + logging.error( + _("Could not find def gateway info in /proc")) return None else: - logging.error("Could not find def gateway info in /proc") + logging.error(_("Could not find def gateway info in /proc")) return None def _get_default_gateway_from_bin_route(self): - """Get default gateway from /sbin/route -n + """ + Get default gateway from /sbin/route -n Called by get_default_gateway and is only used if could not get that from /proc """ - logging.debug("Reading default gateway information from route binary") - routebin = subprocess.getstatusoutput("export LANGUAGE=C; " - "/usr/bin/env route -n") - + logging.debug( + _("Reading default gateway information from route binary")) + routebin = subprocess.getstatusoutput( + "export LANGUAGE=C; " "/usr/bin/env route -n") if routebin[0] == 0: h = re.compile("\n0.0.0.0\s+(?P<def_gateway>[\w.]+)\s+") w = h.search(routebin[1]) @@ -72,8 +93,7 @@ class Route(object): def_gateway = w.group("def_gateway") if def_gateway: return def_gateway - - logging.error("Could not find default gateway by running route") + logging.error(_("Could not find default gateway by running route")) return None def get_hostname(self): @@ -83,29 +103,25 @@ class Route(object): t1 = self._get_default_gateway_from_proc() if not t1: t1 = self._get_default_gateway_from_bin_route() - return t1 def get_host_to_ping(interface=None, verbose=False, default=None): - #Get list of all IPs from all my interfaces, + # Get list of all IPs from all my interfaces, interface_list = subprocess.check_output(["ip", "-o", 'addr', 'show']) - reg = re.compile('\d: (?P<iface>\w+) +inet (?P<address>[\d\.]+)/' '(?P<netmask>[\d]+) brd (?P<broadcast>[\d\.]+)') # Will magically exclude lo because it lacks brd field interfaces = reg.findall(interface_list.decode()) - # ping -b the network on each one (one ping only) # exclude the ones not specified in iface for iface in interfaces: if not interface or iface[0] == interface: - #Use check_output even if I'll discard the output - #looks cleaner than using .call and redirecting stdout to null + # Use check_output even if I'll discard the output + # looks cleaner than using .call and redirecting stdout to null try: - (subprocess - .check_output(["ping", "-q", "-c", "1", "-b", iface[3]], - stderr=subprocess.STDOUT)) + subprocess.check_output(["ping", "-q", "-c", "1", "-b", + iface[3]], stderr=subprocess.STDOUT) except subprocess.CalledProcessError: pass # If default host given, ping it as well, @@ -117,157 +133,154 @@ def get_host_to_ping(interface=None, verbose=False, default=None): stderr=subprocess.STDOUT) except subprocess.CalledProcessError: pass - ARP_POPULATE_TRIES = 10 num_tries = 0 - while num_tries < ARP_POPULATE_TRIES: - #Get output from arp -a -n to get known IPs + # Get output from arp -a -n to get known IPs known_ips = subprocess.check_output(["arp", "-a", "-n"]) reg = re.compile('\? \((?P<ip>[\d.]+)\) at (?P<mac>[a-f0-9\:]+) ' '\[ether\] on (?P<iface>[\w\d]+)') - - #Filter (if needed) IPs not on the specified interface + # Filter (if needed) IPs not on the specified interface pingable_ips = [pingable[0] for pingable in reg.findall( known_ips.decode()) if not interface or pingable[2] == interface] - # If the default given ip is among the remaining ones, # ping that. if default and default in pingable_ips: if verbose: - print("Desired ip address %s is reachable, using it" % default) + print(_( + "Desired ip address {0} is reachable, using it" + ).format(default)) return default - - #If not, choose another IP. + # If not, choose another IP. address_to_ping = pingable_ips[0] if len(pingable_ips) else None if verbose: - print("Desired ip address %s is not reachable from %s. " - % (default, interface)) - print("using %s instead." % address_to_ping) - + print(_( + "Desired ip address {0} is not reachable from {1}," + " using {2} instead" + ).format(default, interface, address_to_ping)) if address_to_ping: return address_to_ping - time.sleep(2) num_tries += 1 - # Wait time expired return None def ping(host, interface, count, deadline, verbose=False): - command = "ping -c %s -w %s %s" % (count, deadline, host) + command = ["ping", str(host), "-c", str(count), "-w", str(deadline)] if interface: - command = ("ping -I%s -c %s -w %s %s" - % (interface, count, deadline, host)) - - reg = re.compile(r"(\d+) packets transmitted, (\d+) received, (\d+)% packet loss") - ping_summary = None - - output = os.popen(command) - for line in output.readlines(): + command.append("-I{}".format(interface)) + reg = re.compile( + r"(\d+) packets transmitted, (\d+) received, (\d+)% packet loss") + ping_summary = {'transmitted': 0, 'received': 0, 'pct_loss': 0} + try: + output = subprocess.check_output(command, universal_newlines=True) + except OSError as exc: + if exc.errno == errno.ENOENT: + # No ping command present; + # default exception message is informative enough. + print(exc) + else: + raise + except subprocess.CalledProcessError as excp: + # Ping returned fail exit code + print(_("ERROR: ping result: {0}").format(excp)) + else: if verbose: - print(line.rstrip()) - - received = re.findall(reg, line) + print(output) + received = re.findall(reg, output) if received: ping_summary = received[0] - - ping_summary={'transmitted': int(ping_summary[0]), - 'received': int(ping_summary[1]), - 'pct_loss': int(ping_summary[2])} + ping_summary = { + 'transmitted': int(ping_summary[0]), + 'received': int(ping_summary[1]), + 'pct_loss': int(ping_summary[2])} return ping_summary def main(args): - - gettext.textdomain("checkbox") - + gettext.textdomain("2013.com.canonical.certification.checkbox") + gettext.bindtextdomain("2013.com.canonical.certification.checkbox", + os.getenv("CHECKBOX_PROVIDER_LOCALE_DIR", None)) default_count = 2 default_delay = 4 - route = Route() - - parser = ArgumentParser() - parser.add_argument("host", - nargs='?', - default=route.get_default_gateway(), - help="Host to ping") - parser.add_argument("-c", "--count", - default=default_count, - type=int, - help="Number of packets to send.") - parser.add_argument("-d", "--deadline", - default=default_delay, - type=int, - help="Timeouts in seconds.") - parser.add_argument("-t", "--threshold", - default=0, - type=int, - help="Percentage of allowed packet loss before " - "considering test failed. Defaults to 0 " - "(meaning any packet loss will fail the test)") - parser.add_argument("-v", "--verbose", - action='store_true', - help="Be verbose.") - parser.add_argument("-I", "--interface", - help="Interface to ping from.") + parser = argparse.ArgumentParser() + parser.add_argument( + "host", nargs='?', default=route.get_default_gateway(), + help=_("host to ping")) + parser.add_argument( + "-c", "--count", default=default_count, type=int, + help=_("number of packets to send")) + parser.add_argument( + "-d", "--deadline", default=default_delay, type=int, + help=_("timeout in seconds")) + parser.add_argument( + "-t", "--threshold", default=0, type=int, + help=_("allowed packet loss percentage (default: %(default)s)")) + parser.add_argument( + "-v", "--verbose", action='store_true', help=_("be verbose")) + parser.add_argument( + "-I", "--interface", help=_("use specified interface to send packets")) args = parser.parse_args() - - #Ensure count and deadline make sense. Adjust them if not. - + # Ensure count and deadline make sense. Adjust them if not. if args.deadline != default_delay and args.count != default_count: - #Ensure they're both consistent, and exit with a warning if - #not, rather than modifying what the user explicitly set. + # Ensure they're both consistent, and exit with a warning if not, + # rather than modifying what the user explicitly set. if args.deadline <= args.count: - print("ERROR: not enough time for %s pings in %s seconds" % - (args.count, args.deadline)) - return(1) + # FIXME: this cannot ever be translated correctly + print(_( + "ERROR: not enough time for {0} pings in {1} seconds" + ).format(args.count, args.deadline)) + return 1 elif args.deadline != default_delay: - #Adjust count according to delay. + # Adjust count according to delay. args.count = args.deadline - 1 if args.count < 1: args.count = 1 if args.verbose: - print("Adjusting ping count to %s to fit in %s-second deadline" % - (args.count, args.deadline)) + # FIXME: this cannot ever be translated correctly + print(_( + "Adjusting ping count to {0} to fit in {1}-second deadline" + ).format(args.count, args.deadline)) else: - #Adjust delay according to count + # Adjust delay according to count args.deadline = args.count + 1 if args.verbose: - print("Adjusting deadline to %s seconds to fit %s pings" % - (args.deadline, args.count)) - - #If given host is not pingable, override with something pingable. - host = get_host_to_ping(interface=args.interface, - verbose=args.verbose, default=args.host) - + # FIXME: this cannot ever be translated correctly + print(_( + "Adjusting deadline to {0} seconds to fit {1} pings" + ).format(args.deadline, args.count)) + # If given host is not pingable, override with something pingable. + host = get_host_to_ping( + interface=args.interface, verbose=args.verbose, default=args.host) if args.verbose: - print("Checking connectivity to %s" % host) + print(_("Checking connectivity to {0}").format(host)) ping_summary = None if host: ping_summary = ping(host, args.interface, args.count, args.deadline, args.verbose) - - if ping_summary == None or ping_summary['received'] == 0: + if ping_summary is None or ping_summary['received'] == 0: print(_("No Internet connection")) return 1 elif ping_summary['transmitted'] != ping_summary['received']: - print(_("Connection established, but lost {}% of packets".format( - ping_summary['pct_loss']))) + print(_("Connection established, but lost {0}% of packets").format( + ping_summary['pct_loss'])) if ping_summary['pct_loss'] > args.threshold: - print(_("FAIL: {}% packet loss is higher" - "than {}% threshold").format(ping_summary['pct_loss'], - args.threshold)) + print(_( + "FAIL: {0}% packet loss is higher than {1}% threshold" + ).format(ping_summary['pct_loss'], args.threshold)) return 1 else: - print(_("PASS: {}% packet loss is within {}% threshold").format( - ping_summary['pct_loss'], args.threshold)) + print(_( + "PASS: {0}% packet loss is within {1}% threshold" + ).format(ping_summary['pct_loss'], args.threshold)) return 0 else: print(_("Internet connection fully established")) return 0 + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/bin/get_make_and_model b/bin/get_make_and_model new file mode 100755 index 0000000..bb5efd9 --- /dev/null +++ b/bin/get_make_and_model @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +import os.path +import shlex +from subprocess import check_output + +def print_data(key, value): + print("{}: {}".format(key, value)) + +def main(): + keys = {'Manufacturer': 'vendor', + 'Model': 'product', + 'Version': 'version'} + + cmd = "lshw -C system" + + out = check_output(shlex.split(cmd), + universal_newlines = True) + output = out.split('\n') + + data = {} + for key in keys: + for line in output: + if keys[key] in line: + data[key] = line.split(':')[1].strip() + break + else: + data[key] = "NOT FOUND" + + for key in data: + print_data(key, data[key]) + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/bin/glob_test b/bin/glob_test deleted file mode 100755 index df13e0f..0000000 --- a/bin/glob_test +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/python -from Globs import benchmarks, hwd -import argparse -import locale -import logging -import sys -import os - -#Magic to override the _ function from gettext, since -#it's overkill for our needs here. -def underscore(string, *args, **kwargs): - return(string) - -__builtins__._ = underscore - -class Application: - - def __init__(self, share_dir='', bench_dir='', width=800, height=600, - time=5, repetitions=1, fullscreen=False, min_fps=60, - ignore_problems=False): - self.opts= {} - self.opts['width'] = width - self.opts['height'] = height - self.opts['time'] = time - self.opts['repetitions'] = repetitions - self.opts['fullscreen'] = fullscreen - self.min_fps = min_fps - self.ignore_problems = ignore_problems - - self.share_dir = share_dir - self.bench_dir = bench_dir - - - def run(self): - test_pass = True - - self.hwd = hwd.HWDetect() - self.bm = benchmarks.Benchmarks(os.path.join(self.bench_dir, - 'benchmarks')) - ver_str = self.hwd.get_gl_info()['version'] - hwd_ext = self.hwd.get_gl_info()['extensions'] - bench_list = [name for name in self.bm.get_names() if name != 'Fake'] - for benchmark_name in bench_list : - runnable = True - if self.bm.check_ver(benchmark_name, ver_str) == False: - logging.warning("%s requires OpenGL version %s, I have %s", - benchmark_name, - self.bm.get_info(benchmark_name)['glversion'], - ver_str) - runnable = False - test_pass = False - - ext_list = self.bm.check_ext(benchmark_name, hwd_ext) - if ext_list.__class__ == list: # Returned a list of missing exts - missing_ext = '' - for ext in ext_list: - missing_ext += ext - if ext_list.index(ext) != len(ext_list) - 1: - missing_ext += ', ' - logging.warning("Missing extensions: %s",missing_ext) - runnable = False - test_pass = False - if runnable: - fps = self.bm.run(benchmark_name, self.opts) - if fps is None: - #oops, test failed to produce usable result! - print("Test failed to produce FPS measurement.") - print("Possible causes: OpenGL version too low/high") - if self.ignore_problems: - print("Ignoring this as requested") - else: - print("Considering this a FAIL test") - test_pass = False - else: - print("{} {} fps".format(benchmark_name, fps)) - if ( self.min_fps > fps): - print("(Failed to meet minimum {} FPS)".format( - self.min_fps)) - test_pass = False - return test_pass - - - - -share_dir = '/usr/share/globs' -locale_dir = '/usr/share/locale' -bench_dir = '/usr/lib/globs' - -parser = argparse.ArgumentParser("Executes gl benchmarks non-interactively") -parser.add_argument("--width", action='store', - default=800, type=int) -parser.add_argument("--height", action='store', - default=600, type=int) -parser.add_argument("--repetitions", action='store', - default=1, type=int) -parser.add_argument("--time", action='store', - default=10, type=int) -parser.add_argument("--ignore-problems", action='store_true', - default=False, help=("If a test fails to " - "produce a FPS rating, ignore it for global test " - "outcome purposes")) -parser.add_argument("--fullscreen", action='store_true', - default=False) -parser.add_argument("--min-fps", action='store', - default=60.0, type=float, help=("If any of the benchmarks" - "obtains less than this FPS, the test will be considered" - "failed")) - -args = parser.parse_args() - -app = Application(share_dir, bench_dir, args.width, args.height, - args.time, args.repetitions, args.fullscreen, args.min_fps, - args.ignore_problems) - -if app.run(): - print("PASS") - sys.exit(0) -else: - print("FAIL") - sys.exit(1) - diff --git a/bin/gpu_test b/bin/gpu_test index eaf4dda..028eedc 100755 --- a/bin/gpu_test +++ b/bin/gpu_test @@ -140,11 +140,10 @@ class Html5VideoThread(Thread): def check_gpu(log=None): if not log: - log = '/var/log/kern.log' - with open(log, 'rb') as f: - if re.findall(r'gpu\s+hung', str(f.read()), flags=re.I): - print("GPU hung Detected") - return 1 + log = subprocess.check_output(['dmesg'], universal_newlines=True) + if re.findall(r'gpu\s+hung', log, flags=re.I): + print("GPU hung Detected") + return 1 def main(): diff --git a/bin/graphics_driver b/bin/graphics_driver index 102cb61..91568e1 100755 --- a/bin/graphics_driver +++ b/bin/graphics_driver @@ -190,6 +190,8 @@ class XorgLog(object): # For NVIDIA m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line) + if not m: + m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line) if m: self.displays[display_name] = display self.video_driver = m.group(1) @@ -309,7 +311,7 @@ def is_laptop(): def hybrid_graphics_check(xlog): '''Check for Hybrid Graphics''' card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)') - card_id2 = re.compile('.*0300: *(.+):(.+)') + card_id2 = re.compile('.*03..: *(.+):(.+)') cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'} cards = [] drivers = [] diff --git a/bin/hdd_parking b/bin/hdd_parking new file mode 100755 index 0000000..18151b4 --- /dev/null +++ b/bin/hdd_parking @@ -0,0 +1,74 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# +# hdd_parking +# +# This file is part of Checkbox. +# +# Copyright 2014 Canonical Ltd. +# +# Authors: Brendan Donegan <brendan.donegan@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. + +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. + +""" +This script verifies that a systems HDD protection capabilities +are triggered when appropriate. There are many implementations +of HDD protection from different OEMs, each implemented in a +different way, so this script can only support implementations +which are known to work and are testable. Currently the list +of supported implementations is: + +- HDAPS (Lenovo) +""" + + +import sys +import time + +from argparse import ArgumentParser +from subprocess import Popen, PIPE + +TIMEOUT = 15.0 + +def hdaps_test(run_time): + try: + hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE, + universal_newlines=True) + except OSError as err: + print("Unable to start hdapsd: {}".format(err)) + return 1 + time.sleep(float(run_time)) + hdapsd.terminate() + # Look for parking message in hdapsd output. + stdout = hdapsd.communicate()[0] + print(stdout) + for line in stdout.split('\n'): + if line.endswith('parking'): + return 0 + return 1 + +def main(): + # First establish the driver used + parser = ArgumentParser("Tests a systems HDD protection capabilities. " + "Requires the system to be moved by the tester.") + parser.add_argument('-t', '--timeout', + default=TIMEOUT, + help='The time allowed before the test fails.') + print('Starting HDD protection test - move the system around on ' + 'all axis. No particular force should be required.') + return hdaps_test(parser.parse_args().timeout) + +if __name__ == "__main__": + sys.exit(main()) diff --git a/bin/lock_screen_watcher b/bin/lock_screen_watcher index 1ba9779..b8dd886 100755 --- a/bin/lock_screen_watcher +++ b/bin/lock_screen_watcher @@ -2,6 +2,7 @@ import argparse import sys import subprocess +import lsb_release from gi.repository import GObject @@ -20,13 +21,27 @@ class SceenSaverStatusHelper(threading.Thread): self.quit = False def query(self): - p = subprocess.Popen(["gnome-screensaver-command", "-q"], stdout=subprocess.PIPE) - stdout, stderr = p.communicate() - # parse the stdout string from the command "gnome-screensaver-command -q" - # the result should be "active" or "inactive" - if "active" == stdout.decode().split(" ")[-1][0:-1] : - print("the screensaver is active") - self._loop.quit() + if (lsb_release.get_distro_information()["ID"] == "Ubuntu"): + if (lsb_release.get_distro_information()["CODENAME"] == "trusty"): + # trusty uses login screen as screen saver + process_ps = subprocess.Popen(["ps", "aux"], stdout=subprocess.PIPE) + process_grep = subprocess.Popen(["grep", + "/usr/lib/unity/unity-panel-service --lockscreen-mode"], + stdin=process_ps.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + process_ps.stdout.close() + stdout = process_grep.communicate()[0] + if (len(stdout.decode().split("\n")) == 3): + print("the screensaver is active") + self._loop.quit() + p = subprocess.Popen(["gnome-screensaver-command", "-q"], stdout=subprocess.PIPE) + stdout, stderr = p.communicate() + # parse the stdout string from the command "gnome-screensaver-command -q" + # the result should be "active" or "inactive" + if "active" == stdout.decode().split(" ")[-1].rstrip() : + print("the screensaver is active") + self._loop.quit() def run(self): while not self.quit: diff --git a/bin/manage_compiz_plugin b/bin/manage_compiz_plugin new file mode 100755 index 0000000..0b8e479 --- /dev/null +++ b/bin/manage_compiz_plugin @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# This file is part of Checkbox. +# +# Copyright 2014-2015 Canonical Ltd. +# Written by: +# Daniel Manrique <roadmr@ubuntu.com> +# Sylvain Pineau <sylvain.pineau@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. +""" +manage_compiz_plugin +==================== + +This script allows enabling or disabling compiz plugins using +gsettings. Changes take effect on the fly. +""" + +from gettext import gettext as _ +import argparse +import gettext +import os +import sys +import subprocess +import time + +PATH="org.compiz.core:/org/compiz/profiles/unity/plugins/core/" +KEY="active-plugins" + +gettext.textdomain("2013.com.canonical.certification.checkbox") +gettext.bindtextdomain("2013.com.canonical.certification.checkbox", + os.getenv("CHECKBOX_PROVIDER_LOCALE_DIR", None)) + +plugins = eval(subprocess.check_output(["gsettings", "get", PATH, KEY])) + +parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"), + epilog=_("Available plugins: {}").format(plugins)) +parser.add_argument("plugin", type=str, help=_('Name of plugin to control')) +parser.add_argument("action", type=str, choices=['enable', 'disable'], + help=_("What to do with the plugin")) + +args = parser.parse_args() + +if args.action == 'enable': + if args.plugin in plugins: + raise SystemExit(_("Plugin {} already enabled").format(args.plugin)) + plugins.append(args.plugin) +else: + if args.plugin not in plugins: + raise SystemExit(_("Plugin {} doesn't exist").format(args.plugin)) + plugins.remove(args.plugin) +subprocess.call(["gsettings", "set", PATH, KEY, str(plugins)]) + +time.sleep(3) diff --git a/bin/memory_compare b/bin/memory_compare index 24d6c84..12a14ec 100755 --- a/bin/memory_compare +++ b/bin/memory_compare @@ -1,13 +1,36 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# This file is part of Checkbox. +# +# Copyright 2014 Canonical Ltd. +# +# Authors: +# Brendan Donegan <brendan.donegan@canonical.com> +# Jeff Lane <jeff.lane@canonical.com> +# Sylvain Pineau <sylvain.pineau@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. import os import sys +from math import log, copysign +from subprocess import check_output, PIPE +from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes from checkbox_support.parsers.lshwjson import LshwJsonParser from checkbox_support.parsers.meminfo import MeminfoParser -from subprocess import check_output, PIPE -THRESHOLD = 25 class LshwJsonResult: @@ -20,8 +43,9 @@ class LshwJsonResult: elif 'bank' in hardware['id']: self.banks_reported += int(hardware.get('size', 0)) + def get_installed_memory_size(): - lshw = LshwJsonParser(check_output(['lshw','-json'], + lshw = LshwJsonParser(check_output(['lshw', '-json'], universal_newlines=True, stderr=PIPE)) result = LshwJsonResult() @@ -32,6 +56,7 @@ def get_installed_memory_size(): else: return result.banks_reported + class MeminfoResult: memtotal = 0 @@ -39,6 +64,7 @@ class MeminfoResult: def setMemory(self, memory): self.memtotal = memory['total'] + def get_visible_memory_size(): parser = MeminfoParser(open('/proc/meminfo')) result = MeminfoResult() @@ -46,6 +72,7 @@ def get_visible_memory_size(): return result.memtotal + def get_threshold(installed_memory): GB = 1024**3 if installed_memory <= 2 * GB: @@ -55,36 +82,46 @@ def get_threshold(installed_memory): else: return 10 + def main(): if os.geteuid() != 0: print("This script must be run as root.", file=sys.stderr) return 1 - installed_memory = get_installed_memory_size() - visible_memory = get_visible_memory_size() + installed_memory = HumanReadableBytes(get_installed_memory_size()) + visible_memory = HumanReadableBytes(get_visible_memory_size()) threshold = get_threshold(installed_memory) - difference = installed_memory - visible_memory + difference = HumanReadableBytes(installed_memory - visible_memory) try: percentage = difference / installed_memory * 100 except ZeroDivisionError: print("Results:") - print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024), file=sys.stderr) - print("\tlshw reports:\t%s kB" % (installed_memory / 1024), file=sys.stderr) - print("\nFAIL: Either lshw or /proc/meminfo returned a memory size of 0 kB", file=sys.stderr) + print("\t/proc/meminfo reports:\t{}".format(visible_memory), + file=sys.stderr) + print("\tlshw reports:\t{}".format(installed_memory), + file=sys.stderr) + print("\nFAIL: Either lshw or /proc/meminfo returned a memory size " + "of 0 kB", file=sys.stderr) return 1 - + if percentage <= threshold: print("Results:") - print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024)) - print("\tlshw reports:\t%s kB" % (installed_memory / 1024)) - print("\nPASS: Meminfo reports %d bytes less than lshw, a difference of %.2f%%. This is less than the %d%% variance allowed." % (difference, percentage, threshold)) + print("\t/proc/meminfo reports:\t{}".format(visible_memory)) + print("\tlshw reports:\t{}".format(installed_memory)) + print("\nPASS: Meminfo reports %s less than lshw, a " + "difference of %.2f%%. This is less than the " + "%d%% variance allowed." % (difference, percentage, threshold)) return 0 else: - print("Results") - print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024), file=sys.stderr) - print("\tlshw reports:\t%s kB" % (installed_memory / 1024), file=sys.stderr) - print("\nFAIL: Meminfo reports %d bytes less than lshw, a difference of %.2f%%. Only a variance of %d%% in reported memory is allowed." % (difference, percentage, threshold), file=sys.stderr) + print("Results:", file=sys.stderr) + print("\t/proc/meminfo reports:\t{}".format(visible_memory), + file=sys.stderr) + print("\tlshw reports:\t{}".format(installed_memory), file=sys.stderr) + print("\nFAIL: Meminfo reports %d less than lshw, " + "a difference of %.2f%%. Only a variance of %d%% in " + "reported memory is allowed." % + (difference, percentage, threshold), file=sys.stderr) return 1 if __name__ == "__main__": diff --git a/bin/memory_test b/bin/memory_test index c42b05f..4cd6008 100755 --- a/bin/memory_test +++ b/bin/memory_test @@ -131,7 +131,7 @@ class MemoryTest(): print("Running threaded memory test:") run_time = 60 # sec. - if not self.run_processes(processes, "%s -qpv -m%um -t%u" % ( + if not self.run_processes(processes, "%s -qv -m%um -t%u" % ( self.threaded_memtest_script, self.process_memory, run_time)): print("Multi-process, threaded memory Test FAILED", file=sys.stderr) @@ -166,7 +166,7 @@ class MemoryTest(): print("Running for more than free memory at %u MB for %u sec." % ( memory, run_time)) - command = "%s -qpv -m%um -t%u" % ( + command = "%s -qv -m%um -t%u" % ( self.threaded_memtest_script, memory, run_time) print("Command is: %s" % command) process = self._command(command) @@ -180,7 +180,7 @@ class MemoryTest(): # run again for 15 minutes print("Running for free memory") - process = self._command("%s -qpv" % self.threaded_memtest_script) + process = self._command("%s -qv" % self.threaded_memtest_script) process.communicate() if process.returncode != 0: print("Free Memory Test failed", file=sys.stderr) diff --git a/bin/network b/bin/network index 6ab5b16..daef596 100755 --- a/bin/network +++ b/bin/network @@ -1,10 +1,11 @@ #!/usr/bin/env python3 """ -Copyright (C) 2012-2014 Canonical Ltd. +Copyright (C) 2012-2015 Canonical Ltd. Authors Jeff Marcom <jeff.marcom@canonical.com> Daniel Manrique <roadmr@ubuntu.com> + Jeff Lane <jeff@ubuntu.com> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License version 3, @@ -23,10 +24,7 @@ from argparse import ( ArgumentParser, RawTextHelpFormatter ) -import configparser import fcntl -import ftplib -from ftplib import FTP import logging import os import re @@ -34,15 +32,16 @@ import shlex import socket import struct import subprocess +import tempfile from subprocess import ( CalledProcessError, check_call, - check_output + check_output, + STDOUT ) import sys import time -logging.basicConfig(level=logging.DEBUG) class IPerfPerformanceTest(object): @@ -56,19 +55,41 @@ class IPerfPerformanceTest(object): target, fail_threshold, protocol="tcp", - mbytes="1024M"): + data_size="1"): self.iface = Interface(interface) self.target = target self.protocol = protocol self.fail_threshold = fail_threshold - - self.mbytes = mbytes + self.data_size = data_size def run(self): - cmd = "timeout 180 iperf -c {} -n {}".format(self.target, self.mbytes) + # if max_speed is 0, assume it's wifi and move on + if self.iface.max_speed == 0: + logging.warning("No max speed detected, assuming Wireless device " + "and continuing with test.") + # Otherwise, no sense in running if we're not running at full speed. + elif self.iface.max_speed > self.iface.link_speed: + logging.error("Detected link speed (%s) is lower than " + "detected max speed (%s)" % + (self.iface.link_speed, self.iface.max_speed)) + logging.error("Check your device configuration and try again") + return 1 + + # Because we can vary the data size, we need to vary the timeout as + # well. It takes an estimated 15 minutes to send 1GB over 10Mb/s. + # 802.11b is 11 Mb/s. So we'll assume 1.2x15 minutes or 18 minutes + # or 1080 seconds per Gigabit. This will allow for a long period of + # time without timeout to catch devices that slow down, and also not + # prematurely end iperf on low-bandwidth devices. + + self.timeout = 1080*int(self.data_size) + + cmd = "timeout {} iperf -c {} -n {}G -i 1 -f m".format( + self.timeout, self.target, self.data_size) logging.debug(cmd) + print("Starting iperf, this could take some time...") try: iperf_return = check_output( shlex.split(cmd), universal_newlines=True) @@ -84,46 +105,45 @@ class IPerfPerformanceTest(object): # "pass through" whatever output iperf did manage to produce. # When confronted with SIGTERM iperf should stop and output # a partial (but usable) result. - logging.warning("iperf timed out - this should be OK") + logging.info("iperf timed out - this should be OK") iperf_return = iperf_exception.output # 930 Mbits/sec\n' - print(iperf_return) - match = re.search(r'[\d\.]+\s([GM])bits', iperf_return) - if match: - throughput = match.group(0).split()[0] - units = match.group(1) - # self.iface.max_speed is always in mb/s, so we need to scale - # throughput to match - scaled_throughput = float(throughput) - if units == 'G': - scaled_throughput *= 1000 - if units == 'K': - scaled_throughput /= 1000 + logging.debug(iperf_return) + speeds = list(map(float, re.findall(r"([\w\.]+)\sMbits/sec", + iperf_return))) + invalid_speed = False + if speeds: + throughput = sum(speeds)/len(speeds) try: - percent = scaled_throughput / int(self.iface.max_speed) * 100 - except (ZeroDivisionError, TypeError) as error: + percent = throughput / int(self.iface.max_speed) * 100 + except (ZeroDivisionError, TypeError): # Catches a condition where the interface functions fine but # ethtool fails to properly report max speed. In this case # it's up to the reviewer to pass or fail. - logging.error("Max Speed was not reported properly. Run " - "ethtool and verify that the card is properly " - "reporting its capabilities.") - logging.error(error) percent = 0 - - print("\nTransfer speed: {} {}b/s".format(throughput, units)) - print("%3.2f%% of " % percent, end="") - try: - print("theoretical max %sMb/s\n" % int(self.iface.max_speed)) - except TypeError as error: - logging.error("Max Speed was not reported properly. Run " - "ethtool and verify that the card is properly " - "reporting its capabilities.") - logging.error(error) - + invalid_speed = True + + logging.info("Min Transfer speed: {} mb/s".format(min(speeds))) + logging.info("Max Transfer speed: {} mb/s".format(max(speeds))) + logging.info("Avg Transfer speed: {} mb/s".format(throughput)) + if invalid_speed: + # If we have no link_speed (e.g. wireless interfaces don't + # report this), then we shouldn't penalize them because + # the transfer may have been reasonable. So in this case, + # we'll exit with a pass-warning. + logging.info("Unable to obtain maximum speed.") + logging.info("Considering the test as passed.") + return 0 + # Below is guaranteed to not throw an exception because we'll + # have exited above if it did. + logging.info("{:03.2f}% of theoretical max {} Mb/s".format(percent, + int(self.iface.max_speed))) if percent < self.fail_threshold: logging.warn("Poor network performance detected") + logging.warn(" Transfer speed: {} mb/s".format(throughput)) + logging.warn(" {:03.2f}% of theoretical max {}Mb/s\n".format( + percent, int(self.iface.max_speed))) return 30 logging.debug("Passed benchmark") @@ -132,152 +152,6 @@ class IPerfPerformanceTest(object): return 1 -class FTPPerformanceTest(object): - """Provides file transfer rate based information while - using the FTP protocol and sending a file (DEFAULT=1GB) - over the local or public network using a specified network - interface on the host.""" - - def __init__( - self, - target, - username, - password, - interface, - binary_size=1, - file2send="ftp_performance_test"): - - self.target = target - self.username = username - self.password = password - self.iface = Interface(interface) - self.binary_size = binary_size - self.file2send = file2send - - def _make_file2send(self): - """ - Makes binary file to send over FTP. - Size defaults to 1GB if not supplied. - """ - - logging.debug("Creating %sGB file", self.binary_size) - - file_size = (1024 * 1024 * 1024) * self.binary_size - with open(self.file2send, "wb") as out: - out.seek((file_size) - 1) - out.write('\0'.encode()) - - def send_file(self, filename=None): - """ - Sends file over the network using FTP and returns the - amount of bytes sent and delay between send and completed. - """ - - if filename is None: - file = open(self.file2send, 'rb') - filename = self.file2send - - send_time = time.time() - - try: - logging.debug("Sending file") - self.remote.storbinary("STOR " + filename, file, 1024) - except (ftplib.all_errors) as send_failure: - logging.error("Failed to send file to %s", self.target) - logging.error("Reason: %s", send_failure) - return 0, 0 - - file.close() - - time_lapse = time.time() - send_time - bytes_sent = os.stat(filename).st_size - - return bytes_sent, time_lapse - - def close_connection(self): - """ - Close connection to remote FTP target - """ - self.remote.close() - - def connect(self): - """ - Connects to FTP target and set the current directory as / - """ - - logging.debug("Connecting to %s", self.target) - try: - self.remote = FTP(self.target) - self.remote.set_debuglevel(2) - self.remote.set_pasv(True) - except socket.error as connect_exception: - logging.error("Failed to connect to: %s", self.target) - return False - - logging.debug("Logging in") - logging.debug("{USER:%s, PASS:%s}", self.username, self.password) - - try: - self.remote.login(self.username, self.password) - except ftplib.error_perm as login_exception: - logging.error("failed to log into target: %s", self.target) - return False - - default_out_dir = "" - self.remote.cwd(default_out_dir) - return True - - def run(self): - - info = { - "Interface": self.iface.interface, - "HWAddress": self.iface.macaddress, - "Duplex": self.iface.duplex_mode, - "Speed": self.iface.max_speed, - "Status": self.iface.status - } - - logging.debug(info) - - if not os.path.isfile(self.file2send): - self._make_file2send() - - # Connect to FTP target and send file - connected = self.connect() - - if connected is False: - return 3 - - filesize, delay = self.send_file() - - # Remove created binary - try: - os.remove(self.file2send) - except (IOError, OSError) as file_delete_error: - logging.error("Could not remove previous ftp file") - logging.error(file_delete_error) - - if connected and filesize > 0: - - logging.debug("Bytes sent (%s): %.2f seconds", filesize, delay) - - # Calculate transfer rate and determine pass/fail status - mbs_speed = float(filesize / 131072) / float(delay) - percent = (mbs_speed / int(info["Speed"])) * 100 - print("Transfer speed:") - print("%3.2f%% of" % percent) - print("theoretical max %smbs" % int(info["Speed"])) - - if percent < 40: - logging.warn("Poor network performance detected") - return 30 - - logging.debug("Passed benchmark") - else: - print("Failed sending file via ftp") - return 1 - - class StressPerformanceTest: def __init__(self, interface, target): @@ -334,7 +208,7 @@ class Interface(socket.socket): try: return open(os.path.join(self.dev_path, type)).read().strip() except OSError: - print("{}: Attribute not found".format(type)) + logging.info("%s: Attribute not found", type) @property def ipaddress(self): @@ -359,8 +233,37 @@ class Interface(socket.socket): return socket.inet_ntoa(mask_data[20:24]) @property + def link_speed(self): + return int(self._read_data("speed")) + + @property def max_speed(self): - return self._read_data("speed") + # Parse ethtool data for max speed since /sys/class/net/DEV/speed only + # reports link speed. + + # Search for things that look like 100baseSX, + # 40000baseNX, 10000baseT + try: + ethinfo = check_output(['ethtool', self.interface], + universal_newlines=True, + stderr=STDOUT).split(' ') + except FileNotFoundError: + logging.warning('ethtool not found! Unable to get max speed') + ethinfo = None + except CalledProcessError as e: + logging.error('ethtool returned an error!') + logging.error(e.output) + ethinfo = None + finally: + expression = '(\\d+)(base)([A-Z]+)' + regex = re.compile(expression) + speeds = [0] + if ethinfo: + for i in ethinfo: + hit = regex.search(i) + if hit: + speeds.append(int(hit.group(1))) + return max(speeds) @property def macaddress(self): @@ -379,91 +282,88 @@ class Interface(socket.socket): return self._read_data("device/label") -def get_test_parameters(args, environ, config_filename): +def get_test_parameters(args, environ): # Decide the actual values for test parameters, which can come - # from one of three possible sources: a config file, command-line + # from one of two possible sources: command-line # arguments, or environment variables. # - If command-line args were given, they take precedence # - Next come environment variables, if set. - # - Last, values in the config file are used if present. - params = {"test_target_ftp": None, - "test_user": None, - "test_pass": None, - "test_target_iperf": None} + params = {"test_target_iperf": None} - #First (try to) load values from config file - config = configparser.SafeConfigParser() - - try: - with open(config_filename) as config_file: - config.readfp(config_file) - params["test_target_ftp"] = config.get("FTP", "Target") - params["test_user"] = config.get("FTP", "User") - params["test_pass"] = config.get("FTP", "Pass") - params["test_target_iperf"] = config.get("IPERF", "Target") - except FileNotFoundError as err: - pass # No biggie, we can still get configs from elsewhere - - # Next see if we have environment variables to override the config file - # "partial" overrides are not allowed; if an env variable is missing, - # we won't use this at all. - if all([param.upper() in os.environ for param in params.keys()]): - for key in params.keys(): - params[key] = os.environ[key.upper()] + # See if we have environment variables + for key in params.keys(): + params[key] = os.environ.get(key.upper(), "") # Finally, see if we have the command-line arguments that are the ultimate - # override. Again, we will only override if we have all of them. - if args.target and args.username and args.password: - params["test_target_ftp"] = args.target - params["test_user"] = args.username - params["test_pass"] = args.password + # override. + if args.target: params["test_target_iperf"] = args.target return params +def can_ping(the_interface, test_target): + working_interface = False + num_loops = 0 + while (not working_interface) and (num_loops < 48): + working_interface = True + + try: + with open(os.devnull, 'wb') as DEVNULL: + check_call(["ping", "-I", the_interface, + "-c", "1", test_target], + stdout=DEVNULL, stderr=DEVNULL) + except CalledProcessError as excp: + working_interface = False + logging.warning("Ping failure on %s (%s)", the_interface, excp) + + if not working_interface: + time.sleep(5) + num_loops += 1 + + return working_interface + + def interface_test(args): - if not "test_type" in vars(args): + if not ("test_type" in vars(args)): return - # Determine whether to use the default or user-supplied config - # file name. - DEFAULT_CFG = "/etc/checkbox.d/network.cfg" - if not "config" in vars(args): - config_filename = DEFAULT_CFG - else: - config_filename = args.config + # Get the actual test data from one of two possible sources + test_parameters = get_test_parameters(args, os.environ) - # Get the actual test data from one of three possible sources - test_parameters = get_test_parameters(args, os.environ, config_filename) - - test_user = test_parameters["test_user"] - test_pass = test_parameters["test_pass"] if (args.test_type.lower() == "iperf" or args.test_type.lower() == "stress"): test_target = test_parameters["test_target_iperf"] - else: - test_target = test_parameters["test_target_ftp"] # Validate that we got reasonable values if not test_target or "example.com" in test_target: # Default values found in config file - logging.error("Please supply target via: %s", config_filename) + logging.error("Target server has not been supplied.") + logging.info("Configuration settings can be configured 3 different ways:") + logging.info("1- If calling the script directly, pass the --target option") + logging.info("2- Define the TEST_TARGET_IPERF environment variable") + logging.info("3- (If running the test via checkbox/plainbox, define the ") + logging.info("target in /etc/xdg/canonical-certification.conf)") + logging.info("Please run this script with -h to see more details on how to configure") sys.exit(1) # Testing begins here! # - # Check and make sure that interface is indeed connected + # Make sure that the interface is indeed connected try: - cmd = "ip link set dev %s up" % args.interface - check_call(shlex.split(cmd)) + check_call(["ip", "link", "set", "dev", args.interface, "up"]) except CalledProcessError as interface_failure: - logging.error("Failed to use %s:%s", cmd, interface_failure) + logging.error("Failed to use %s:%s", args.interface, interface_failure) return 1 - # Give interface enough time to get DHCP address - time.sleep(10) + # Back up routing table, since network down/up process + # tends to trash it.... + temp = tempfile.TemporaryFile() + try: + check_call(["ip", "route", "save", "table", "all"], stdout=temp) + except CalledProcessError as route_error: + logging.warning("Unable to save routing table: %s", route_error) result = 0 # Stop all other interfaces @@ -474,25 +374,25 @@ def interface_test(args): for iface in extra_interfaces: logging.debug("Shutting down interface:%s", iface) try: - cmd = "ip link set dev %s down" % iface - check_call(shlex.split(cmd)) + check_call(["ip", "link", "set", "dev", iface, "down"]) except CalledProcessError as interface_failure: - logging.error("Failed to use %s:%s", cmd, interface_failure) + logging.error("Failed to shut down %s:%s", + iface, interface_failure) result = 3 if result == 0: - # Execute FTP transfer benchmarking test - if args.test_type.lower() == "ftp": - ftp_benchmark = FTPPerformanceTest( - test_target, test_user, test_pass, args.interface) - - if args.filesize: - ftp_benchmark.binary_size = int(args.filesize) - result = ftp_benchmark.run() - - elif args.test_type.lower() == "iperf": - iperf_benchmark = IPerfPerformanceTest(args.interface, test_target, - args.fail_threshold) + # Ensure that interface is fully up by waiting until it can + # ping the test server + if not can_ping(args.interface, test_target): + logging.error("Can't ping test server on %s", args.interface) + return 1 + + # Execute requested networking test + if args.test_type.lower() == "iperf": + iperf_benchmark = IPerfPerformanceTest(args.interface, test_target, + args.fail_threshold) + if args.datasize: + iperf_benchmark.data_size = args.datasize result = iperf_benchmark.run() elif args.test_type.lower() == "stress": @@ -503,12 +403,22 @@ def interface_test(args): for iface in extra_interfaces: logging.debug("Restoring interface:%s", iface) try: - cmd = "ip link set dev %s up" % iface - check_call(shlex.split(cmd)) + check_call(["ip", "link", "set", "dev", iface, "up"]) except CalledProcessError as interface_failure: - logging.error("Failed to use %s:%s", cmd, interface_failure) + logging.error("Failed to restore %s:%s", iface, interface_failure) result = 3 + # Restore routing table to original state + temp.seek(0) + try: + # Harmless "RTNETLINK answers: File exists" messages on stderr + with open(os.devnull, 'wb') as DEVNULL: + check_call(["ip", "route", "restore"], stdin=temp, + stderr=DEVNULL) + except CalledProcessError as restore_failure: + logging.warning("Unable to restore routing table: %s", restore_failure) + temp.close() + return result @@ -540,10 +450,10 @@ interface. Example NIC information usage: network info -i eth0 --max-speed -For running ftp benchmark test: -network test -i eth0 -t ftp ---target 192.168.0.1 --username USERID --password PASSW0RD ---filesize-2 +For running iperf test: +network test -i eth0 -t iperf --target 192.168.0.1 +NOTE: The iperf test requires an iperf server running on the same network +segment that the test machine is running on. Configuration ============= @@ -553,28 +463,22 @@ priorities: 1- Command-line parameters (see above). 2- Environment variables (example will follow). -3- Configuration file (example will follow). - Default config location is /etc/checkbox.d/network.cfg +3- If run via checkbox/plainbox, /etc/xdg/checkbox-certification.conf + can have the below-mentioned environment variables defined in the + [environment] section. An example file is provided and can be simply + modified with the correct values. Environment variables ===================== -ALL environment variables must be defined, even if empty, for them to be -picked up. The variables are: -TEST_TARGET_FTP -TEST_USER -TEST_PASS +The variables are: TEST_TARGET_IPERF example config file =================== -[FTP] +[environment] +TEST_TARGET_IPERF = iperf-server.example.com -Target: 192.168.1.23 -User: FTPUser -Pass:PassW0Rd -[IPERF] -Target: 192.168.1.45 **NOTE** """ @@ -594,16 +498,14 @@ Target: 192.168.1.45 '-i', '--interface', type=str, required=True) test_parser.add_argument( '-t', '--test_type', type=str, - choices=("ftp", "iperf", "stress"), default="ftp", - help=("[FTP *Default*]")) + choices=("iperf", "stress"), default="iperf", + help=("[iperf *Default*]")) test_parser.add_argument('--target', type=str) test_parser.add_argument( - '--username', type=str, help=("For FTP test only")) - test_parser.add_argument( - '--password', type=str, help=("For FTP test only")) - test_parser.add_argument( - '--filesize', type=str, - help="Size (GB) of binary file to send **Note** for FTP test only") + '--datasize', type=str, + default="1", + help=("Amount of data to send. For iperf tests this will direct " + "iperf to send DATASIZE GB of data to the target.")) test_parser.add_argument( '--config', type=str, default="/etc/checkbox.d/network.cfg", @@ -614,6 +516,9 @@ Target: 192.168.1.45 help=("IPERF Test ONLY. Set the failure threshold (Percent of maximum " "theoretical bandwidth) as a number like 80. (Default is " "%(default)s)")) + test_parser.add_argument( + '--debug', default=False, action="store_true", + help="Turn on verbose output") # Sub info options info_parser.add_argument( @@ -623,6 +528,8 @@ Target: 192.168.1.45 info_parser.add_argument( '--duplex-mode', default=False, action="store_true") info_parser.add_argument( + '--link-speed', default=False, action="store_true") + info_parser.add_argument( '--max-speed', default=False, action="store_true") info_parser.add_argument( '--ipaddress', default=False, action="store_true") @@ -635,13 +542,24 @@ Target: 192.168.1.45 info_parser.add_argument( '--status', default=False, action="store_true", help=("displays connection status")) + info_parser.add_argument( + '--debug', default=False, action="store_true", + help="Turn on verbose output") test_parser.set_defaults(func=interface_test) info_parser.set_defaults(func=interface_info) args = parser.parse_args() - - return args.func(args) + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + if 'func' not in args: + parser.print_help() + else: + return args.func(args) if __name__ == "__main__": diff --git a/bin/network_device_info b/bin/network_device_info index b8c0267..f4b7911 100755 --- a/bin/network_device_info +++ b/bin/network_device_info @@ -200,8 +200,10 @@ def match_counts(nm_devices, udev_devices, devtype): if udev.category == udevtype] if len(nm_type_devices) != len(udev_type_devices): print("ERROR: devices missing - udev showed %d %s devices, but " - "NetworkManager saw %d devices in %s" % (len(udev_type_devices), - udevtype, len(nm_type_devices), devtype), file=sys.stderr) + "NetworkManager saw %d devices in %s" % + (len(udev_type_devices), udevtype, + len(nm_type_devices), devtype), + file=sys.stderr) return False else: return True @@ -253,7 +255,7 @@ def main(args): if not match_counts(nm_devices, udev_devices, "WiFi"): return 1 - elif not match_counts(nm_devices, udev_devices, ("Ethernet","Modem")): + elif not match_counts(nm_devices, udev_devices, ("Ethernet", "Modem")): return 1 else: return 0 diff --git a/bin/network_info b/bin/network_info index 2e5fae3..9c20b89 100755 --- a/bin/network_info +++ b/bin/network_info @@ -2,6 +2,7 @@ import os import sys +import subprocess import socket import fcntl import struct @@ -38,6 +39,13 @@ def get_ip_address(interface): )[20:24]) +def get_ipv6_address(interface): + cmd = ['/sbin/ip', '-6', 'addr', 'show', 'dev', interface] + proc = subprocess.check_output(cmd, universal_newlines=True) + ipaddr = proc.split()[8].strip() + return ipaddr + + def get_mac_address(interface): address_file = os.path.join(SYS_PATH, interface, 'address') @@ -56,9 +64,15 @@ def main(args): print("Interface: %s" % interface) print("Connected: %s" % connected) try: - print("IP: %s" % get_ip_address(interface)) + print("IPv4: %s" % get_ip_address(interface)) + except IOError: + print("IPv4: n/a") + try: + print("IPv6: %s" % get_ipv6_address(interface)) except IOError: - print("IP: n/a") + print("IPv6: n/a") + except: + print("IPv6: n/a") print("MAC: %s\n" % get_mac_address(interface)) return 0 diff --git a/bin/network_wait b/bin/network_wait index 34e31fe..f0637d0 100755 --- a/bin/network_wait +++ b/bin/network_wait @@ -1,10 +1,15 @@ #!/bin/bash -set -e - x=1 while true; do - state=$(/usr/bin/nmcli -t -f STATE nm) + state=$(/usr/bin/nmcli -t -f STATE nm 2>/dev/null) + if [[ $? != 0 ]]; then + state=$(/usr/bin/nmcli -t -f STATE general 2>/dev/null) + rc=$? + if [[ $rc != 0 ]]; then + exit $rc + fi + fi if [ "$state" = "connected" ]; then echo $state exit 0 diff --git a/bin/optical_write_test b/bin/optical_write_test index d3f772b..b127667 100755 --- a/bin/optical_write_test +++ b/bin/optical_write_test @@ -44,7 +44,7 @@ check_md5(){ generate_iso(){ # Generate ISO image echo "Creating ISO Image ..." - genisoimage -r -J -o $ISO_NAME $SAMPLE_FILE + genisoimage -input-charset UTF-8 -r -J -o $ISO_NAME $SAMPLE_FILE return $? } @@ -123,7 +123,7 @@ cleanup(){ cd $START_DIR echo "Now residing in $PWD" echo "Cleaning up ..." - umount $MOUNT_PT + umount "$MOUNT_PT" rm -fr $TEMP_DIR echo "Ejecting spent media ..." eject $OPTICAL_DRIVE diff --git a/bin/piglit_test b/bin/piglit_test deleted file mode 100755 index f03e628..0000000 --- a/bin/piglit_test +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/python3 - -import os -import sys - -from argparse import ArgumentParser, FileType -from subprocess import check_output, STDOUT - - -class PiglitTests: - - def __init__(self, tests, name): - self._tests = tests - self._name = name - self._results = {} - - def run(self): - piglit_output = '' - - log_path = os.path.join(os.environ.get('CHECKBOX_DATA', '.'), - 'piglit-results', self._name) - - run_command = ["piglit-run.py"] - - for test in self._tests: - run_command.extend(["-t", test]) - - run_command.extend(['/usr/share/piglit/tests/all.tests', log_path]) - - piglit_output = check_output(run_command, - universal_newlines=True, - stderr=STDOUT) - # TODO: Capture stderr instead? - for line in piglit_output.split('\n'): - if ' :: ' in line: - self._results[line.split(' :: ')[-1].strip()] = \ - line.split(' :: ')[-2].strip() - - def get_tests_by_status(self, status): - """ - Return a list of the tests with the given status in the last piglit run - """ - tests = [] - for test in self._results: - if self._results[test] == status: - tests.append(test) - - return tests - - -def main(): - parser = ArgumentParser("A wrapper script for the Piglit graphics test " - "framework which runs the tests and parses the " - "results.") - parser.add_argument("--test", "-t", - required=True, - action='append', - help="The expression used to get the tests to run.") - parser.add_argument("--name", "-n", - required=True, - help="""A friendly name for this group of tests - to use in reporting.""") - parser.add_argument("--verbose", "-v", - action='store_true', - help='Have more verbose output') - args = parser.parse_args() - - piglit = PiglitTests(args.test, args.name) - piglit.run() - - passed_tests = piglit.get_tests_by_status('pass') - print("%d tests passed" % len(passed_tests)) - - if args.verbose: - print("\n".join(["- %s" % test for test in passed_tests])) - - failed_tests = piglit.get_tests_by_status('fail') - if failed_tests: - print("%d tests failed" % len(failed_tests)) - print("\n".join(["- %s" % test for test in failed_tests])) - - crashed_tests = piglit.get_tests_by_status('crash') - if crashed_tests: - print("%d tests crashed" % len(crashed_tests)) - print("\n".join(["- %s" % test for test in crashed_tests])) - - skipped_tests = piglit.get_tests_by_status('skip') - if skipped_tests: - print("%d tests were skipped" % len(skipped_tests)) - print("\n".join(["- %s" % test for test in skipped_tests])) - - if len(failed_tests) > 0 or len(crashed_tests) > 0: - return 1 - else: - return 0 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/bin/pm_test b/bin/pm_test index 18c0def..827758f 100755 --- a/bin/pm_test +++ b/bin/pm_test @@ -7,6 +7,7 @@ import re import shutil import subprocess import sys +import signal from argparse import ArgumentParser, SUPPRESS from calendar import timegm @@ -143,7 +144,11 @@ class PowerManagementOperation(object): logging.info('Executing new {0!r} operation...' .format(self.args.pm_operation)) logging.debug('Executing: {0!r}...'.format(command_str)) - subprocess.Popen(command_str, shell=True) + # The PM operation is performed asynchronously so let's just wait + # indefinitely until it happens and someone comes along to kill us. + # This addresses LP: #1413134 + subprocess.check_call(command_str, shell=True) + signal.pause() def summary(self): """ @@ -569,7 +574,7 @@ class SudoersConfigurator(object): """ logging.debug('Enabling user to execute test as root...') command = ("sed -i -e '$a{mark}\\n" - "{user} ALL=NOPASSWD: /usr/bin/python' " + "{user} ALL=NOPASSWD: /usr/bin/python3' " "{filename}".format(mark=self.MARK, user=self.user, script=os.path.realpath(__file__), @@ -597,7 +602,7 @@ class AutoStartFile(object): [Desktop Entry] Name={pm_operation} test Comment=Verify {pm_operation} works properly -Exec=sudo /usr/bin/python {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay} --pm-delay {pm_delay} --min-pm-time {min_pm_time} --max-pm-time {max_pm_time} --append --total {total} --start {start} --pm-timestamp {pm_timestamp} {silent} --log-level={log_level} --log-dir={log_dir} {pm_operation} +Exec=sudo /usr/bin/python3 {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay} --pm-delay {pm_delay} --min-pm-time {min_pm_time} --max-pm-time {max_pm_time} --append --total {total} --start {start} --pm-timestamp {pm_timestamp} {silent} --log-level={log_level} --log-dir={log_dir} {pm_operation} Type=Application X-GNOME-Autostart-enabled=true Hidden=false @@ -617,6 +622,11 @@ Hidden=false autostart_directory = os.path.join(config_directory, 'autostart') if not os.path.exists(autostart_directory): os.makedirs(autostart_directory) + user_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_UID') + group_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_GID') + if user_id: + os.chown(config_directory, int(user_id), int(group_id)) + os.chown(autostart_directory, int(user_id), int(group_id)) basename = '{0}.desktop'.format(os.path.basename(__file__)) self.desktop_filename = os.path.join(autostart_directory, diff --git a/bin/pts_run b/bin/pts_run index 7b4440e..76921e8 100755 --- a/bin/pts_run +++ b/bin/pts_run @@ -9,6 +9,10 @@ echo -e "Y\nn\nn" | phoronix-test-suite > /dev/null # Disable batch result saving and all test options selection echo -e "n\nn" | phoronix-test-suite batch-setup > /dev/null +# Don't show the browser after each test. +# The implementation is a bit hacky but works. +phoronix-test-suite user-config-set DefaultBrowser=/bin/true + # Run each test only one time export FORCE_TIMES_TO_RUN=1 diff --git a/bin/recovery_info b/bin/recovery_info new file mode 100755 index 0000000..4aa03cb --- /dev/null +++ b/bin/recovery_info @@ -0,0 +1,391 @@ +#!/usr/bin/env python3 +# Copyright 2015 Canonical Ltd. +# Written by: +# Shawn Wang <shawn.wang@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Show the recovery partition information for the preinstalled OS.""" + +import os +import re +import subprocess +import sys +import tempfile +import unittest +import xml.dom.minidom as minidom + +from guacamole import Command + +try: + from unittest import mock +except ImportError: + from plainbox.vendor import mock + +RECOVERY_PACKAGES = ["dell-recovery", "ubuntu-recovery"] + + +def get_recovery_package(): + """ + Test with RECOVERY_PACKAGES. + + to check recovery application is installed or not + + :return: + string of package_version or None + """ + for pkg in RECOVERY_PACKAGES: + output = subprocess.check_output(["apt-cache", "policy", pkg], + universal_newlines=True) + for line in output.split("\n"): + if line.startswith(" Installed:"): + ver = line.split(": ")[1] + return "{}_{}".format(pkg, ver.strip()) + return None + + +RECOVERY_LABELS = {"HP_TOOLS": "HP", + "PQSERVICE": "UBUNTU", + "BACKUP": "TEST", + "INSTALL": "DELL", + "OS": "DELL", + "RECOVERY": "DELL"} + + +_escape_pattern = re.compile(r'\\x([0-9a-fA-F][0-9a-fA-F])') + + +def lsblk_unescape(label): + """Un-escape text escaping done by lsblk(8).""" + return _escape_pattern.sub( + lambda match: chr(int(match.group(1), 16)), label) + + +def get_recovery_partition(): + """ + Get the type and location of the recovery partition. + + :return: + (recovery_type, recovery_partition) or None + + Use lsblk(8) to inspect available block devices looking + for a partition with FAT or NTFS and a well-known label. + """ + cmd = ['lsblk', '-o', 'TYPE,FSTYPE,NAME,LABEL', '--raw'] + for line in subprocess.check_output(cmd).splitlines()[1:]: + type, fstype, name, label = line.split(b' ', 3) + # Skip everything but partitions + if type != b'part': + continue + # Skip everything but FAT and NTFS + if fstype != b'vfat' and fstype != b'ntfs': + continue + label = lsblk_unescape(label.decode('utf-8')) + recovery_type = RECOVERY_LABELS.get(label) + # Skip unknown labels + if recovery_type is None: + continue + recovery_partition = '/dev/{}'.format(name.decode('utf-8')) + return (recovery_type, recovery_partition) + + +class FunctionTests(unittest.TestCase): + + """Tests for several functions.""" + + @mock.patch('subprocess.check_output') + def test_get_recovery_package(self, mock_subprocess_check_output): + """Smoke test for get_recovery_package().""" + mock_subprocess_check_output.return_value = """\ +dell-recovery: + Installed: 1.11 + Candidate: 1.11 + Version table: + 1.11 + 500 https://archive/cesg-mirror/ test/public amd64 Packages +""" + self.assertEqual(get_recovery_package(), + "dell-recovery_1.11") + + @mock.patch('subprocess.check_output') + def test_get_recovery_partition(self, mock_subprocess_check_output): + """Smoke test for get_recovery_partition().""" + mock_subprocess_check_output.return_value = ( + b'TYPE FSTYPE NAME LABEL\n' + b'disk linux_raid_member sda fx:2x250GB\n' + b'raid1 bcache md127 \n' + b'disk ext4 bcache0 Ultra\n' + b'disk linux_raid_member sdb fx:2x250GB\n' + b'raid1 bcache md127 \n' + b'disk ext4 bcache0 Ultra\n' + b'disk sdc \n' + b'part btrfs sdc1 vol1\n' + b'disk sdd \n' + b'part ntfs sdd1 Windows\x208.1\n' + b'part sdd2 \n' + b'part ext4 sdd5 Utopic\n' + b'part swap sdd6 \n' + b'disk bcache sde \n' + b'disk ext4 bcache0 Ultra\n' + b'disk sdf \n' + b'part ntfs sda3 RECOVERY\n') + self.assertEqual(get_recovery_partition(), ("DELL", "/dev/sda3")) + + def test_lsblk_unescape(self): + """Smoke tests for lsblk_unescape().""" + self.assertEqual(lsblk_unescape('Windows\\x208.1'), 'Windows 8.1') + self.assertEqual(lsblk_unescape('Windows XP'), 'Windows XP') + + +class MountedPartition(object): + + """ + Mount Manager to mount partition on tempdir. + + e.g. + with MountedPartition("/dev/sda1") as tmp: + print("This is the mount point: {}".format(tmp)) + do_stuff() + """ + + def __init__(self, part): + """ + Prepare the mntdir point. + + :param part: string of the partition device file, like /dev/sda2 + """ + self.part = part + self.mntdir = tempfile.mkdtemp() + + def __enter__(self): + """ + __enter__ method for python's with statement. + + Mount the partition device to the mntdir. + """ + cmd = ["mount", self.part, self.mntdir] + subprocess.check_output(cmd, universal_newlines=True) + return self.mntdir + + def __exit__(self, type, value, traceback): + """ + __exit__ method for python's with statement. + + Unmount and remove the mntdir. + """ + subprocess.check_output(["umount", self.mntdir], + universal_newlines=True) + os.rmdir(self.mntdir) + + +class MountedPartitionTests(unittest.TestCase): + + """Unittest of MountedPartition.""" + + @mock.patch('subprocess.check_output') + def test_with_of_MountedPartition(self, mock_subprocess_check_output): + """Test mount point.""" + test_dir = "" + with MountedPartition("/dev/test") as tmp: + test_dir = tmp + self.assertTrue(os.path.exists(test_dir)) + mock_subprocess_check_output.assert_has_calls( + [mock.call(['mount', '/dev/test', test_dir], + universal_newlines=True)]) + self.assertFalse(os.path.exists(test_dir)) + mock_subprocess_check_output.assert_has_calls( + [mock.call(['umount', test_dir], + universal_newlines=True)]) + + +class RecoveryVersion(Command): + + """ + print the version of recovery image. + + @EPILOG@ + + This commands prints information such as: + + image_version: xxx + bto_version: REV_xxx.iso (dell only) + """ + + def invoked(self, ctx): + """ + Guacamole method called when the command is invoked. + + /etc/buildstamp is a image information file, + it created by the oem image builder. + + oilpalm Fri, 20 Jun 2014 04:02:07 +0000 + somerville-trusty-amd64-20140620-0 + + If /etc/buildstamp exist, print out the second line (image iso name). + + For Dell-recovery partition, /etc/buildstamp shows base image info. + If recovery_partition/bto.xml, + print out the bto_version (read from xml file). + """ + if os.path.isfile("/etc/buildstamp"): + with open('/etc/buildstamp', 'rt', encoding='UTF-8') as stream: + data = stream.readlines() + print("image_version: {}".format(data[1].strip())) + + with MountedPartition(ctx.recovery_partition) as mntdir: + fname = "{}/bto.xml".format(mntdir) + if os.path.isfile(fname): + o = minidom.parse("{}/bto.xml".format(mntdir)) + bto_version = o.getElementsByTagName("iso")[0].firstChild.data + print("bto_version: {}".format(bto_version)) + + +class RecoveryFile(Command): + + """ + display a single file from the recovery partition + + This command can be used to ``cat`` any file from the recovery partition + """ + + def register_arguments(self, parser): + """ + Guacamole method used by the argparse ingredient. + + :param parser: + Argument parser (from :mod:`argparse`) specific to this command. + """ + parser.add_argument('file', help='name of the file to display') + + def invoked(self, ctx): + """ + Guacamole method used by the command ingredient. + + :param ctx: + The guacamole context object. Context provides access to all + features of guacamole. The argparse ingredient adds the ``args`` + attribute to it. That attribute contains the result of parsing + command line arguments. + :returns: + The return code of the command. Guacamole translates ``None`` to a + successful exit status (return code zero). + """ + with MountedPartition(ctx.recovery_partition) as mnt: + return subprocess.call([ + 'cat', '--', os.path.join(mnt, ctx.args.file)]) + + +class RecoveryCheckType(Command): + + """ + test if the recovery partition is of the given type. + + This command can be used for scripted tests, to see if the recovery + partition on the current system is of a concrete type or not (e.g. + DELL-specific) + + @EPILOG@ + + The exit code is 0 if the recovery partition type matches and 1 otherwise. + """ + + def register_arguments(self, parser): + """ + Guacamole method used by the argparse ingredient. + + :param parser: + Argument parser (from :mod:`argparse`) specific to this command. + """ + parser.add_argument( + 'type', help="expected type of the recovery partition") + + def invoked(self, ctx): + """ + Guacamole method used by the command ingredient. + + :param ctx: + The guacamole context object. Context provides access to all + features of guacamole. The argparse ingredient adds the ``args`` + attribute to it. That attribute contains the result of parsing + command line arguments. + :returns: + The return code of the command. Guacamole translates ``None`` to a + successful exit status (return code zero). + """ + if ctx.recovery_type != ctx.args.type: + return 1 + + +class RecoveryInfo(Command): + + """ + Inspect the recovery partition. + + This command can be used to inspect the recovery partition. It has several + sub-commands that do various tasks. If the system has no recovery + partition, the command exits with the error code 1. + """ + + sub_commands = ( + ('version', RecoveryVersion), + ('file', RecoveryFile), + ('checktype', RecoveryCheckType), + ) + + def invoked(self, ctx): + """ + Guacamole method used by the command ingredient. + + :param ctx: + The guacamole context object. Context provides access to all + features of guacamole. The argparse ingredient adds the ``args`` + attribute to it. That attribute contains the result of parsing + command line arguments. + :returns: + The return code of the command. Guacamole translates ``None`` to a + successful exit status (return code zero). + """ + partition = get_recovery_partition() + + if partition is None: + print("Recovery partition not found", file=sys.stderr) + return 1 + (recovery_type, recovery_partition) = partition + ctx.recovery_partition = recovery_partition + ctx.recovery_type = recovery_type + + +class RecoveryInfoTests(unittest.TestCase): + + """Tests for RecoveryInfo.""" + + @mock.patch('__main__.get_recovery_package') + @mock.patch('__main__.get_recovery_partition') + def test_smoke(self, mock_get_recovery_partition, + mock_get_recovery_package): + """Smoke tests for running recovery_info.""" + mock_get_recovery_partition.return_value = ("DELL", "/dev/sda3") + mock_get_recovery_package.return_value = "dell-recovery_1.11" + self.assertEqual(RecoveryInfo().main(argv=[], exit=False), 0) + self.assertEqual( + RecoveryInfo().main(argv=["checktype", "HP"], exit=False), 1) + self.assertEqual( + RecoveryInfo().main(argv=["checktype", "DELL"], exit=False), 0) + + +if __name__ == '__main__': + if '--test' in sys.argv: + sys.argv.remove('--test') + unittest.main() + else: + RecoveryInfo().main() diff --git a/bin/removable_storage_test b/bin/removable_storage_test index 81c11f8..8e7732b 100755 --- a/bin/removable_storage_test +++ b/bin/removable_storage_test @@ -22,9 +22,13 @@ from checkbox_support.dbus.udisks2 import is_udisks2_supported from checkbox_support.dbus.udisks2 import lookup_udev_device from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus from checkbox_support.heuristics.udisks2 import is_memory_card -from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE +from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes +from checkbox_support.parsers.udevadm import CARD_READER_RE +from checkbox_support.parsers.udevadm import GENERIC_RE +from checkbox_support.parsers.udevadm import FLASH_RE from checkbox_support.udev import get_interconnect_speed from checkbox_support.udev import get_udev_block_devices +from checkbox_support.udev import get_udev_xhci_devices class ActionTimer(): @@ -99,6 +103,8 @@ class DiskTest(): self.rem_disks_memory_cards = {} self.rem_disks_memory_cards_nm = {} self.rem_disks_speed = {} + # LP: #1313581, TODO: extend to be rem_disks_driver + self.rem_disks_xhci = {} self.data = '' self.device = device self.memorycard = memorycard @@ -174,8 +180,8 @@ class DiskTest(): have both the filesystem and block device interfaces """ for udisks2_object_path, interfaces in udisks2_objects.items(): - if (UDISKS2_FILESYSTEM_INTERFACE in interfaces - and UDISKS2_BLOCK_INTERFACE in interfaces): + if (UDISKS2_FILESYSTEM_INTERFACE in interfaces and + UDISKS2_BLOCK_INTERFACE in interfaces): yield udisks2_object_path # We need to know about all IO candidates, # let's iterate over all the block devices reported by udisks2 @@ -275,14 +281,14 @@ class DiskTest(): parent_media = parent_props.Get(udisks, "DriveMedia") if self.memorycard: if (dev_bus != 'sdio' - and not FLASH_RE.search(parent_media) - and not CARD_READER_RE.search(parent_model) - and not GENERIC_RE.search(parent_vendor)): + and not FLASH_RE.search(parent_media) + and not CARD_READER_RE.search(parent_model) + and not GENERIC_RE.search(parent_vendor)): continue else: if (FLASH_RE.search(parent_media) - or CARD_READER_RE.search(parent_model) - or GENERIC_RE.search(parent_vendor)): + or CARD_READER_RE.search(parent_model) + or GENERIC_RE.search(parent_vendor)): continue dev_file = str(device_props.Get(udisks, "DeviceFile")) dev_speed = str(device_props.Get(udisks, @@ -297,6 +303,30 @@ class DiskTest(): self.rem_disks_nm[dev_file] = None self.rem_disks_memory_cards_nm[dev_file] = None + def get_disks_xhci(self): + """ + Compare + 1. the pci slot name of the devices using xhci + 2. the pci slot name of the disks, + which is usb3 disks in this case so far, + to make sure the usb3 disk does be on the controller using xhci + """ + # LP: #1378724 + udev_client = GUdev.Client() + # Get a collection of all udev devices corresponding to block devices + udev_devices = get_udev_block_devices(udev_client) + # Get a collection of all udev devices corresponding to xhci devices + udev_devices_xhci = get_udev_xhci_devices(udev_client) + for udev_device_xhci in udev_devices_xhci: + pci_slot_name = udev_device_xhci.get_property('PCI_SLOT_NAME') + for udev_device in udev_devices: + devpath = udev_device.get_property('DEVPATH') + if (self._compare_pci_slot_from_devpath(devpath, + pci_slot_name)): + self.rem_disks_xhci[ + udev_device.get_property('DEVNAME')] = 'xhci' + return self.rem_disks_xhci + def mount(self): passed_mount = {} @@ -340,6 +370,23 @@ class DiskTest(): if not os.path.ismount(self.rem_disks_nm[disk]): os.rmdir(self.rem_disks_nm[disk]) + def _compare_pci_slot_from_devpath(self, devpath, pci_slot_name): + # LP: #1334991 + # a smarter parser to get and validate a pci slot name from DEVPATH + # then compare this pci slot name to the other + dl = devpath.split('/') + s = set([x for x in dl if dl.count(x) > 1]) + if ((pci_slot_name in dl) + and (dl.index(pci_slot_name) < dl.index('block')) + and (not(pci_slot_name in s))): + # 1. there is such pci_slot_name + # 2. sysfs topology looks like + # DEVPATH = ....../pci_slot_name/....../block/...... + # 3. pci_slot_name should be unique in DEVPATH + return True + else: + return False + def main(): parser = argparse.ArgumentParser() @@ -380,10 +427,17 @@ def main(): help='The number of random data files to generate') parser.add_argument('-s', '--size', action='store', - type=int, - default=1048576, - help=("The size of the test data file to use " - "in Bytes. Default is %(default)s")) + type=HumanReadableBytes, + default='1MiB', + help=("The size of the test data file to use. " + "You may use SI or IEC suffixes like: 'K', 'M'," + "'G', 'T', 'Ki', 'Mi', 'Gi', 'Ti', etc. Default" + " is %(default)s")) + parser.add_argument('--auto-reduce-size', + action='store_true', + default=False, + help=("Automatically reduce size to fit in the target" + "filesystem. Reducing until fits in 1MiB")) parser.add_argument('-n', '--skip-not-mount', action='store_true', default=False, @@ -393,6 +447,10 @@ def main(): help=("Memory cards devices on bus other than sdio " "require this parameter to identify " "them as such")) + parser.add_argument('--driver', + choices=['xhci_hcd'], + help=("Detect the driver of the host controller." + "Only xhci_hcd for usb3 is supported so far.")) args = parser.parse_args() @@ -444,7 +502,7 @@ def main(): if errors_mount > 0: print("There're total %d device(s) failed at mounting." - % errors_mount) + % errors_mount) errors += errors_mount disks_all = dict(list(test.rem_disks.items()) @@ -452,17 +510,17 @@ def main(): if len(disks_all) > 0: print("Found the following mounted %s partitions:" - % ', '.join(args.device)) + % ', '.join(args.device)) for disk, mount_point in disks_all.items(): supported_speed = test.rem_disks_speed[disk] print(" %s : %s : %s bits/s" % - (disk, mount_point, supported_speed), - end="") - if (args.min_speed - and int(args.min_speed) > int(supported_speed)): + (disk, mount_point, supported_speed), + end="") + if (args.min_speed and + int(args.min_speed) > int(supported_speed)): print(" (Will not test it, speed is below %s bits/s)" % - args.min_speed, end="") + args.min_speed, end="") print("") @@ -472,13 +530,38 @@ def main(): if not args.min_speed or int(test.rem_disks_speed[disk]) >= int(args.min_speed)} + if len(disks_eligible) == 0: + logging.error( + "No %s disks with speed higher than %s bits/s", + args.device, args.min_speed) + return 1 write_sizes = [] test_files = {} + disks_freespace = {} + for disk, path in disks_eligible.items(): + stat = os.statvfs(path) + disks_freespace[disk] = stat.f_bfree * stat.f_bsize + smallest_freespace = min(disks_freespace.values()) + desired_size = args.size + if desired_size > smallest_freespace: + if args.auto_reduce_size: + min_space = HumanReadableBytes("1MiB") + if smallest_freespace < min_space: + raise IOError("Not enough space. {} is required" + .format(min_space)) + new_size = HumanReadableBytes(int(0.8 * smallest_freespace)) + logging.warning("Automatically reducing test data size" + ". {} requested. Reducing to {}." + .format(desired_size, new_size)) + desired_size = new_size + else: + raise IOError("Not enough space. {} is required" + .format(desired_size)) # Generate our data file(s) for count in range(args.count): - test_files[count] = RandomData(args.size) + test_files[count] = RandomData(desired_size) write_sizes.append(os.path.getsize( - test_files[count].tfile.name)) + test_files[count].tfile.name)) total_write_size = sum(write_sizes) try: @@ -525,7 +608,7 @@ def main(): for file in target_file_list: test.clean_up(file) total_write_time = sum(write_times) - avg_write_time = total_write_time / args.count + # avg_write_time = total_write_time / args.count try: avg_write_speed = (( total_write_size / total_write_time) @@ -547,7 +630,7 @@ def main(): (iteration_write_time / args.iterations)) try: avg_write_speed = (iteration_write_size / - iteration_write_time) + iteration_write_time) except ZeroDivisionError: avg_write_speed = 0.00 finally: @@ -566,16 +649,63 @@ def main(): "Completed %s test iterations, but there were" " errors", args.count) return 1 - elif len(disks_eligible) == 0: - logging.error( - "No %s disks with speed higher than %s bits/s", - args.device, args.min_speed) - return 1 - else: - #Pass is not assured! + # LP: 1313581 + # Try to figure out whether the disk + # is SuperSpeed USB and using xhci_hcd driver. + if (args.driver == 'xhci_hcd'): + # The speed reported by udisks is sometimes + # less than 5G bits/s, for example, + # it may be 705032705 bits/s + # So using + # 500000000 + # = 500 M bits/s + # > 480 M bits/s ( USB 2.0 spec.) + # to make sure that it is higher USB version than 2.0 + # + # int() for int(test.rem_disks_speed[disk]) + # is necessary + # because the speed value of + # the dictionary rem_disks_speed is + # 1. str or int from _probe_disks_udisks2 + # 2. int from _probe_disks_udisks1. + # This is really a mess. : ( + print("\t\t--------------------------------") + if(500000000 < int(test.rem_disks_speed[disk])): + print("\t\tDevice Detected: SuperSpeed USB") + # Unlike rem_disks_speed, + # which must has the connect speed + # for each disk devices, + # disk devices may not use xhci as + # controller drivers. + # This will raise KeyError for no + # associated disk device was found. + xhci_disks = test.get_disks_xhci() + # pep8 style suggest to limit the try clause + # to the absolute minimum amount of code necessary + try: + disk_xhci_flag = xhci_disks[disk] + except KeyError: + print("\t\tDisk does not use xhci_hci.") + return 1 + else: + if('xhci' == disk_xhci_flag): + print("\t\tDriver Detected: xhci_hcd") + else: + print("\t\tDisk does not use xhci_hci.") + logging.debug("disk_xhci_flag is not xhci") + return 1 + else: + # Give it a hint for the detection failure. + # LP: #1362902 + print(("\t\tNo SuperSpeed USB using xhci_hcd " + "was detected correctly.")) + print(("\t\tHint: please use dmesg to check " + "the system status again.")) + return 1 + # Pass is not assured if (not args.pass_speed or - avg_write_speed >= args.pass_speed): + avg_write_speed >= args.pass_speed): return 0 else: print("FAIL: Average speed was lower than desired " diff --git a/bin/rotation_test b/bin/rotation_test index 8fcf727..5187401 100755 --- a/bin/rotation_test +++ b/bin/rotation_test @@ -45,8 +45,9 @@ def main(): for rot in rotations: try: status = rotate_screen(rotations[rot]) - except(xrandr.RRError, xrandr.UnsupportedRRError) as error: + except (xrandr.RRError, xrandr.UnsupportedRRError) as exc: status = 1 + error = exc else: error = 'N/A' # Collect the status and the error message diff --git a/bin/sleep_test b/bin/sleep_test index f63ab06..088a7a1 100755 --- a/bin/sleep_test +++ b/bin/sleep_test @@ -1,11 +1,12 @@ -#!/usr/bin/python +#!/usr/bin/python3 ''' Program to automate system entering and resuming from sleep states -Copyright (C) 2010,2011 Canonical Ltd. +Copyright (C) 2010-2014 Canonical Ltd. Author: Jeff Lane <jeffrey.lane@canonical.com> + Daniel Manrique <roadmr@ubuntu.com> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License version 2, @@ -20,13 +21,14 @@ You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ''' -import os -import sys import logging +import os import re -from subprocess import call +import sys +import syslog + from optparse import OptionParser, OptionGroup -from syslog import * +from subprocess import call class ListDictHandler(logging.StreamHandler): @@ -41,16 +43,20 @@ class ListDictHandler(logging.StreamHandler): for msg in record.msg: logger = logging.getLogger(record.name) new_record = logger.makeRecord(record.name, record.levelno, - record.pathname, record.lineno, msg, record.args, - record.exc_info, record.funcName) + record.pathname, record.lineno, + msg, record.args, + record.exc_info, + record.funcName) logging.StreamHandler.emit(self, new_record) elif isinstance(record.msg, dict): - for key, val in record.msg.iteritems(): + for key, val in record.msg.items(): logger = logging.getLogger(record.name) new_msg = '%s: %s' % (key, val) new_record = logger.makeRecord(record.name, record.levelno, - record.pathname, record.lineno, new_msg, record.args, - record.exc_info, record.funcName) + record.pathname, record.lineno, + new_msg, record.args, + record.exc_info, + record.funcName) logging.StreamHandler.emit(self, new_record) else: logging.StreamHandler.emit(self, record) @@ -74,9 +80,9 @@ class SuspendTest(): future kernels. ''' - states_fh = open('/sys/power/state', 'r', 0) + states_fh = open('/sys/power/state', 'rb', 0) try: - states = states_fh.read().split() + states = states_fh.read().decode('ascii').split() finally: states_fh.close() logging.debug('The following sleep states were found:') @@ -89,9 +95,9 @@ class SuspendTest(): def GetCurrentTime(self): - time_fh = open('/sys/class/rtc/rtc0/since_epoch', 'r', 0) + time_fh = open('/sys/class/rtc/rtc0/since_epoch', 'rb', 0) try: - time = int(time_fh.read()) + time = int(time_fh.read().decode('ascii')) finally: time_fh.close() return time @@ -110,13 +116,13 @@ class SuspendTest(): self.last_time = self.GetCurrentTime() logging.debug('Current epoch time: %s' % self.last_time) - wakealarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'w', 0) + wakealarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'wb', 0) try: - wakealarm_fh.write('0\n') + wakealarm_fh.write('0\n'.encode('ascii')) wakealarm_fh.flush() - wakealarm_fh.write('+%s\n' % time) + wakealarm_fh.write('+{}\n'.format(time).encode('ascii')) wakealarm_fh.flush() finally: wakealarm_fh.close() @@ -153,10 +159,10 @@ class SuspendTest(): def GetResults(self, mode, perf): ''' - This will parse /var/log/messages for our start and end markers. Then - it'll find a few key phrases that are part of the sleep and resume - process, grab their timestamps, Bob's your Uncle and return a - three-tuple consisting of: (PASS/FAIL,Sleep elapsed time, Resume + This will parse /var/log/messages for our start and end markers. Then + it'll find a few key phrases that are part of the sleep and resume + process, grab their timestamps, Bob's your Uncle and return a + three-tuple consisting of: (PASS/FAIL,Sleep elapsed time, Resume elapsed time) ''' # figure out our elapsed time @@ -185,7 +191,7 @@ class SuspendTest(): sleep_end_time = re.split('[\[\]]', loglist[idx - 1])[1].strip() logging.debug('Sleep ended at %s' % sleep_end_time) - resume_start_time = re.split('[\[\]]', + resume_start_time = re.split('[\[\]]', loglist[idx])[1].strip() logging.debug('Resume started at %s' % resume_start_time) idx += 1 @@ -194,7 +200,7 @@ class SuspendTest(): loglist[idx])[1].strip() logging.debug('Resume ended at %s' % resume_end_time) if self.end_marker in loglist[idx]: - logging.debug('End Marker found, run appears to ' + logging.debug('End Marker found, run appears to ' 'have completed') run_complete = 'Pass' break @@ -206,7 +212,7 @@ class SuspendTest(): else: if self.end_marker in loglist: logging.debug('End Marker found, ' - 'run appears to have completed') + 'run appears to have completed') run_complete = 'Pass' sleep_elapsed = None resume_elapsed = None @@ -218,7 +224,7 @@ class SuspendTest(): Write a stamped marker to syslogd (will appear in /var/log/messages). This is used to calculate the elapsed time for each iteration. ''' - syslog(LOG_INFO, '---' + marker + '---') + syslog(syslog.LOG_INFO, '---' + marker + '---') def CheckAlarm(self, mode): ''' @@ -229,10 +235,10 @@ class SuspendTest(): the system did not wake by alarm IRQ, but by some other means. ''' rtc = {} - rtc_fh = open('/proc/driver/rtc', 'r', 0) - alarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'r', 0) + rtc_fh = open('/proc/driver/rtc', 'rb', 0) + alarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'rb', 0) try: - rtc_data = rtc_fh.read().splitlines() + rtc_data = rtc_fh.read().decode('ascii').splitlines() for item in rtc_data: rtc_entry = item.partition(':') rtc[rtc_entry[0].strip()] = rtc_entry[2].strip() @@ -240,7 +246,7 @@ class SuspendTest(): rtc_fh.close() try: - alarm = int(alarm_fh.read()) + alarm = int(alarm_fh.read().decode('ascii')) except ValueError: alarm = None finally: @@ -275,44 +281,44 @@ class SuspendTest(): def main(): usage = 'Usage: %prog [OPTIONS]' parser = OptionParser(usage) - group = OptionGroup(parser, 'This will not work for hibernat testing due' \ - ' to a kernel timestamp bug when doing an S4 ' \ + group = OptionGroup(parser, 'This will not work for hibernat testing due' + ' to a kernel timestamp bug when doing an S4 ' '(hibernate/resume) sleep cycle') group.add_option('-p', '--perf', - action='store_true', - default=False, - help='Add some output that tells you how long it ' \ - 'takes to enter a sleep state and how long it ' \ - 'takes to resume.') + action='store_true', + default=False, + help='Add some output that tells you how long it ' + 'takes to enter a sleep state and how long it ' + 'takes to resume.') parser.add_option('-i', '--iterations', - action='store', - type='int', - metavar='NUM', - default=1, - help='The number of times to run the suspend/resume \ - loop. Default is %default') + action='store', + type='int', + metavar='NUM', + default=1, + help='The number of times to run the suspend/resume ' + 'loop. Default is %default') parser.add_option('-w', '--wake-in', - action='store', - type='int', - metavar='NUM', - default=60, - dest='wake_time', - help='Sets wake up time (in seconds) in the future \ - from now. Default is %default.') + action='store', + type='int', + metavar='NUM', + default=60, + dest='wake_time', + help='Sets wake up time (in seconds) in the future ' + 'from now. Default is %default.') parser.add_option('-s', '--sleep-state', - action='store', - default='mem', - metavar='MODE', - dest='mode', - help='Sets the sleep state to test. Passing mem will \ - set the sleep state to Suspend-To-Ram or S3. Passing \ - disk will set the sleep state to Suspend-To-Disk or S4\ - (hibernate). Default sleep state is %default') + action='store', + default='mem', + metavar='MODE', + dest='mode', + help='Sets the sleep state to test. Passing mem will ' + 'set the sleep state to Suspend-To-Ram or S3. Passing ' + 'disk will set the sleep state to Suspend-To-Disk or S4 ' + '(hibernate). Default sleep state is %default') parser.add_option('-d', '--debug', - action='store_true', - default=False, - help='Choose this to add verbose output for debug \ - purposes') + action='store_true', + default=False, + help='Choose this to add verbose output for debug \ + purposes') parser.add_option_group(group) (options, args) = parser.parse_args() options_dict = vars(options) @@ -330,10 +336,10 @@ def main(): # Set up the logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) - + if options.debug: handler.setLevel(logging.DEBUG) - + logger.addHandler(handler) logging.debug('Running with these options') logging.debug(options_dict) @@ -349,7 +355,7 @@ def main(): return 1 else: logging.debug('%s sleep state supported, continuing test' - % options.mode) + % options.mode) # We run the following for the number of iterations requested for iteration in range(0, options.iterations): @@ -358,7 +364,7 @@ def main(): suspender.DoSuspend(options.mode) run_count += 1 run_result[run_count] = suspender.GetResults(options.mode, - options.perf) + options.perf) if suspender.CheckAlarm(options.mode): logging.debug('The alarm is still set') @@ -367,7 +373,7 @@ def main(): resume_total = 0.0 logging.info('=' * 20 + ' Test Results ' + '=' * 20) logging.info(run_result) - + for k in run_result.iterkeys(): sleep_total += run_result[k][1] resume_total += run_result[k][2] @@ -381,7 +387,7 @@ def main(): if fail_count > 0: logging.error('%s sleep/resume test cycles failed' % - fail_count) + fail_count) logging.error(run_result) return 1 else: diff --git a/bin/sleep_time_check b/bin/sleep_time_check index 6c85c9f..d392769 100755 --- a/bin/sleep_time_check +++ b/bin/sleep_time_check @@ -21,18 +21,21 @@ def main(): action='store', dest='resume_threshold', type=float, - default=3.00, + default=5.00, help=('The max time a system should have taken to ' 'resume from a sleep state. (Default: ' '%(default)s)')) args = parser.parse_args() - + try: - file = open(args.filename) - lines = file.readlines() - finally: - file.close() + with open(args.filename) as file: + lines = file.readlines() + except IOError as e: + print(e) + return False + sleep_time = None + resume_time = None # find our times for line in lines: if "Average time to sleep" in line: @@ -40,17 +43,21 @@ def main(): elif "Average time to resume" in line: resume_time = float(line.split(':')[1].strip()) + if sleep_time is None or resume_time is None: + print("ERROR: One or more times was not reported correctly") + return False + print("Average time to enter sleep state: %s seconds" % sleep_time) print("Average time to resume from sleep state: %s seconds" % resume_time) - + failed = False if sleep_time > args.sleep_threshold: - print("System failed to suspend in less than %s seconds" % - args.sleep_threshold) + print("System failed to suspend in less than %s seconds" % + args.sleep_threshold) failed = True if resume_time > args.resume_threshold: print("System failed to resume in less than %s seconds" % - args.resume_threshold) + args.resume_threshold) failed = True if sleep_time <= 0.00 or resume_time <= 0.00: print("ERROR: One or more times was not reported correctly") diff --git a/bin/storage_test b/bin/storage_test index 54e604f..ef19c5e 100755 --- a/bin/storage_test +++ b/bin/storage_test @@ -35,7 +35,21 @@ disk=/dev/$1 if [ -b $disk ] then echo "$disk is a block device" - size=`parted -l -s | grep $disk | awk '{print $3}'` + + #Add a check for warnings + WARN=$(parted -s ${disk} print | grep "^Warning.*${disk}.*[Rr]ead-only" 2>&1) + if [[ $? == 0 ]] + then + echo "Warning found in parted output:" + echo $WARN + echo "Aborting Test" + exit 1 + fi + + # Regex changed to better handle when $disk appears more than once + # in parted output (such as in warning messages or not caught in the + # check above) + size=`parted -l -s |grep "Disk.*${disk}" |awk '{print $3}'` if [ -n "$size" ] then @@ -54,7 +68,7 @@ then if [ $size_range == "KB" ] then - echo "$disk is too small to be functioning." + echo "$disk size reported in KB, seems to be too small for testing." exit 1 elif [ $size_range == "MB" ] then @@ -64,7 +78,7 @@ then then run_bonnie $disk else - echo "$disk is too small to be functioning." + echo "$disk is too small to be used for testing." exit 1 fi else diff --git a/bin/touchpad_driver_info b/bin/touchpad_driver_info index 3f38968..431b874 100755 --- a/bin/touchpad_driver_info +++ b/bin/touchpad_driver_info @@ -15,7 +15,7 @@ class TouchResult: attributes = {} def addDevice(self, device): - if getattr(device, 'category') == 'TOUCH': + if getattr(device, 'category') == 'TOUCHPAD': self.attributes['driver'] = getattr(device, 'driver') self.attributes['product'] = getattr(device, 'product') diff --git a/bin/virtualization b/bin/virtualization index 486a554..f1d043c 100755 --- a/bin/virtualization +++ b/bin/virtualization @@ -50,14 +50,137 @@ DEFAULT_TIMEOUT = 500 class XENTest(object): pass - +# The "TAR" type is a tarball that contains both +# a disk image and a kernel binary. This is useful +# on architectures that don't (yet) have a bootloader +# in the disk image that we can chain to, and instead +# we need to have qemu load boot files externally +CLOUD_IMAGE_TYPE_TAR = 1 +CLOUD_IMAGE_TYPE_DISK = 2 + +QEMU_DISK_TYPE_SD = 1 +QEMU_DISK_TYPE_VIRTIO = 2 +QEMU_DISK_TYPE_VIRTIO_BLK = 3 + +QEMU_ARCH_CONFIG = { + 'arm64': { + 'cloudimg_type': CLOUD_IMAGE_TYPE_TAR, + 'cloudimg_arch': 'arm64', + 'qemu_bin': 'qemu-system-aarch64', + 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO_BLK, + 'qemu_extra_args': [ + '-machine', 'virt', + '-cpu', 'host', + '-enable-kvm', + '-serial', 'stdio', + ], + }, + 'armhf': { + 'cloudimg_type': CLOUD_IMAGE_TYPE_TAR, + 'cloudimg_arch': 'armhf', + 'qemu_bin': 'qemu-system-arm', + 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO_BLK, + 'qemu_extra_args': [ + '-machine', 'virt', + '-cpu', 'host', + '-enable-kvm', + '-serial', 'stdio', + ], + }, + 'amd64': { + 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK, + 'cloudimg_arch': 'i386', + 'qemu_bin': 'qemu-system-x86_64', + 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO, + 'qemu_extra_args': [ + '-machine', 'accel=kvm:tcg', + ], + }, + 'i386': { + 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK, + 'cloudimg_arch': 'i386', + 'qemu_bin': 'qemu-system-x86_64', + 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO, + 'qemu_extra_args': [ + '-machine', 'accel=kvm:tcg', + ], + }, + 'ppc64el': { + 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK, + 'cloudimg_arch': 'ppc64el', + 'qemu_bin': 'qemu-system-ppc64', + 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO, + 'qemu_extra_args': [ + '-machine', 'prep', + ], + }, +} + +class QemuRunner(object): + def __init__(self, arch): + self.arch = arch + self.config = QEMU_ARCH_CONFIG[arch] + self.drive_id = 0 + # Parameters common to all architectures + self.params = [ + self.config['qemu_bin'], + "-m", "256", + "-display", "none", + "-nographic", + "-net", "nic", + "-net", "user,net=10.0.0.0/8,host=10.0.0.1,hostfwd=tcp::2222-:22", + ] + # Add any architecture-specific parameters + if 'qemu_extra_args' in self.config: + self.params = self.params + self.config['qemu_extra_args'] + + self.append = [] + if self.config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR: + self.append = self.append + [ + 'console=ttyAMA0', + 'earlyprintk=serial', + 'ro', + 'rootfstype=ext4', + 'root=LABEL=cloudimg-rootfs', + 'rootdelay=10', + ] + + def add_boot_files(self, kernel=None, initrd=None, dtb=None): + if kernel: + self.params = self.params + ['-kernel', kernel] + if initrd: + self.params = self.params + ['-initrd', initrd] + if dtb: + self.params = self.params + ['-dtb', dtb] + + def add_drive(self, cloudimg): + drive = ["-drive"] + if self.config['qemu_disk_type'] == QEMU_DISK_TYPE_SD: + drive = drive + ["file=%s,if=sd,cache=writeback" % (cloudimg)] + elif self.config['qemu_disk_type'] == QEMU_DISK_TYPE_VIRTIO: + drive = drive + ["file=%s,if=virtio" % (cloudimg)] + elif self.config['qemu_disk_type'] == QEMU_DISK_TYPE_VIRTIO_BLK: + drive = drive + [ "file=%s,if=none,id=disk.%d" + % (cloudimg, self.drive_id) ] + drive = drive + [ "-device", "virtio-blk-device,drive=disk.%d" + % (self.drive_id) ] + self.params = self.params + drive + self.drive_id = self.drive_id + 1 + + def get_params(self): + params = self.params + if self.append: + params = params + ['-append', '"%s"' % (" ".join(self.append))] + return params + class KVMTest(object): - def __init__(self, image=None, timeout=500, debug_file="virt_debug"): + def __init__(self, image=None, timeout=500, debug_file=None): self.image = image self.timeout = timeout - self.debug_file = os.path.join(os.getcwd(), debug_file) - self.arch = check_output(['arch'], universal_newlines=True) + self.debug_file = debug_file + self.arch = check_output(['dpkg', '--print-architecture'], universal_newlines=True).strip() + self.qemu_config = QEMU_ARCH_CONFIG[self.arch] def download_image(self): """ @@ -70,10 +193,13 @@ class KVMTest(object): # Construct URL cloud_url = "http://cloud-images.ubuntu.com" - if re.match("arm.*", self.arch): - cloud_iso = release + "-server-cloudimg-armhf.tar.gz" + if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR: + cloud_iso = "%s-server-cloudimg-%s.tar.gz" % (release, self.qemu_config['cloudimg_arch']) + elif self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_DISK: + cloud_iso = "%s-server-cloudimg-%s-disk1.img" % (release, self.qemu_config['cloudimg_arch']) else: - cloud_iso = release + "-server-cloudimg-i386-disk1.img" + logging.error("Unknown cloud image type") + return False image_url = "/".join(( cloud_url, release, "current", cloud_iso)) @@ -82,12 +208,15 @@ class KVMTest(object): # Attempt download try: resp = urllib.request.urlretrieve(image_url, cloud_iso) - except (IOError, OSError, urllib.error.HTTPError) as exception: + except (IOError, + OSError, + urllib.error.HTTPError, + urllib.error.URLError) as exception: logging.error("Failed download of image from %s: %s", image_url, exception) return False # Unpack img file from tar - if re.match("arm.*", self.arch): + if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR: cloud_iso_tgz = tarfile.open(cloud_iso) cloud_iso = cloud_iso.replace('tar.gz', 'img') cloud_iso_tgz.extract(cloud_iso) @@ -105,42 +234,29 @@ class KVMTest(object): logging.debug("Attempting boot for:{}".format(data_disk)) - # Set Arbitrary IP values - netrange = "10.0.0.0/8" - image_ip = "10.0.0.1" - hostfwd = "tcp::2222-:22" - cloud_disk = "" + qemu = QemuRunner(self.arch) + + # Assume that a tar type image is not self-bootable, so + # therefore requires explicit bootfiles (otherwise, why + # not just use the disk format directly? + if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR: + for dir in ['/boot', '/']: + kernel = os.path.join(dir, 'vmlinuz') + initrd = os.path.join(dir, 'initrd.img') + if os.path.isfile(kernel): + qemu.add_boot_files(kernel=kernel, initrd=initrd) + break + + qemu.add_drive(data_disk) # Should we attach the cloud config disk if os.path.isfile("seed.iso"): logging.debug("Attaching Cloud config disk") - cloud_disk = "-drive file=seed.iso,if=virtio" + qemu.add_drive("seed.iso") + + params = qemu.get_params() + logging.debug("Using params:{}".format(" ".join(params))) - if re.match("(arm.*|aarch64)", self.arch): - uname = check_output(['uname', '-r'], universal_newlines=True) - cloud_disk = cloud_disk.replace("virtio", "sd") - params = 'qemu-system-arm -machine vexpress-a15 -cpu cortex-a15 -enable-kvm -m {} -kernel /boot/vmlinuz -append "console=ttyAMA0 earlyprintk=serial root=/dev/mmcblk0 ro rootfstype=ext4" -serial stdio -dtb /lib/firmware/{}/device-tree/vexpress-v2p-ca15-tc1.dtb -initrd /boot/initrd.img -net nic -net user,net={},host={},hostfwd={} -drive file={},if=sd,cache=writeback {} -display none -nographic'.format(uname, "256", netrange, image_ip, hostfwd, data_disk, cloud_disk) - else: - params = \ - ''' - qemu-system-x86_64 -machine accel=kvm:tcg -m {0} -net nic - -net user,net={1},host={2},hostfwd={3} - -drive file={4},if=virtio {5} -display none -nographic - '''.format( - "256", - netrange, - image_ip, - hostfwd, - data_disk, - cloud_disk).replace("\n", "").replace(" ", "") - - logging.debug("Using params:{}".format(params)) - - # Default file location for log file is in checkbox output directory - checkbox_dir = os.getenv("CHECKBOX_DATA") - - if checkbox_dir is not None: - self.debug_file = os.path.join(checkbox_dir, self.debug_file) logging.info("Storing VM console output in {}".format( os.path.realpath(self.debug_file))) # Open VM STDERR/STDOUT log file for writing @@ -152,7 +268,7 @@ class KVMTest(object): # Start Virtual machine self.process = Popen( - shlex.split(params), stdin=PIPE, stderr=file, stdout=file, + params, stdin=PIPE, stderr=file, stdout=file, universal_newlines=True, shell=False) def create_cloud_disk(self): @@ -220,8 +336,10 @@ final_message: CERTIFICATION BOOT COMPLETE instance = self.boot_image(self.image) time.sleep(self.timeout) - # Reset Console window to regain control from VM Serial I/0 - call('reset') + # If running in console, reset console window to regain + # control from VM Serial I/0 + if sys.stdout.isatty(): + call('reset') # Check to be sure VM boot was successful with open(self.debug_file, 'r') as debug_file: file_contents = debug_file.read() @@ -296,7 +414,7 @@ def test_kvm(args): if args.image: image = args.image - kvm_test = KVMTest(image, timeout) + kvm_test = KVMTest(image, timeout, args.log_file) result = kvm_test.start() sys.exit(result) @@ -319,6 +437,9 @@ def main(): '-i', '--image', type=str, default=None) kvm_test_parser.add_argument( '-t', '--timeout', type=int) + kvm_test_parser.add_argument( + '-l', '--log-file', default='virt_debug', + help="Location for debugging output log. Defaults to %(default)s.") kvm_test_parser.add_argument('--debug', dest='log_level', action="store_const", const=logging.DEBUG, default=logging.INFO) diff --git a/bin/wifi_time2reconnect b/bin/wifi_time2reconnect index bfaa2ce..d4ae2c0 100755 --- a/bin/wifi_time2reconnect +++ b/bin/wifi_time2reconnect @@ -6,6 +6,11 @@ import sys import time import subprocess from datetime import datetime +try: + from subprocess import DEVNULL # >= python3.3 +except ImportError: + import os + DEVNULL = open(os.devnull, 'wb') IFACE = None TIMEOUT = 30 @@ -16,14 +21,23 @@ def main(): Check the time needed to reconnect an active WIFI connection """ devices = subprocess.getoutput('nmcli dev') - match = re.search('(\w+)\s+802-11-wireless\s+connected', devices) + match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices) if match: IFACE = match.group(1) else: print("No active wifi connection detected", file=sys.stderr) return 1 - dev_status = subprocess.getoutput('nmcli -t -f devices,uuid con status') + try: + dev_status = subprocess.check_output( + ['nmcli', '-t', '-f', 'devices,uuid', 'con', 'status'], + stderr=DEVNULL, + universal_newlines=True) + except subprocess.CalledProcessError: + dev_status = subprocess.check_output( + ['nmcli', '-t', '-f', 'device,uuid', 'con', 'show'], + stderr=DEVNULL, + universal_newlines=True) match = re.search(IFACE+':(.*)', dev_status) uuid = None if match: diff --git a/bin/xrandr_cycle b/bin/xrandr_cycle index ef20c41..f778562 100755 --- a/bin/xrandr_cycle +++ b/bin/xrandr_cycle @@ -1,13 +1,14 @@ #!/usr/bin/env python3 -import subprocess import argparse -import tarfile -import shutil -import time -import sys +import errno import os import re +import shutil +import subprocess +import sys +import tarfile +import time parser = argparse.ArgumentParser() parser.add_argument('--keyword', default='', @@ -68,7 +69,19 @@ for line in output: # Now we have a list of the modes we need to test. So let's do just that. profile_path = os.environ['HOME'] + '/.shutter/profiles/' screenshot_path = os.path.join(args.screenshot_dir, 'xrandr_screens') -script_home = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + +# Where to find the shutter.xml template? Two possible locations. +shutter_xml_template = None + +if 'PLAINBOX_PROVIDER_DATA' in os.environ: + shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'], + "settings", "shutter.xml") +else: + shutter_xml_template = os.path.join(os.path.split(os.path.dirname( + os.path.realpath(__file__)))[0], + "data", + "settings", + "shutter.xml") if args.keyword: screenshot_path = screenshot_path + '_' + args.keyword @@ -76,21 +89,27 @@ if args.keyword: regex = re.compile(r'filename="[^"\r\n]*"') # Keep the shutter profile in place before starting -try: - os.makedirs(profile_path) -except OSError: - pass +# Any errors creating the directories or copying the template is fatal, +# since things won't work if we fail. try: - os.makedirs(screenshot_path) -except OSError: - pass + os.makedirs(profile_path, exist_ok=True) + os.makedirs(screenshot_path, exist_ok=True) +except OSError as excp: + raise SystemExit("ERROR: Unable to create " + "required directories: {}".format(excp)) try: - shutil.copy(script_home + '/data/settings/shutter.xml', - profile_path) -except IOError: - pass + shutil.copy(shutter_xml_template, profile_path) +except (IOError, OSError) as excp: + print("ERROR: Unable to copy {} to {}: {}".format(shutter_xml_template, + profile_path, + excp)) + if excp.errno == errno.ENOENT: + print("Try setting PLAINBOX_PROVIDER_DATA to the the data path of a") + print("provider shipping the 'shutter.xml' template file, usually ") + print("found under /usr/share.") + raise SystemExit() try: old_profile = open(profile_path + 'shutter.xml', 'r') @@ -102,7 +121,8 @@ try: new_profile.close() old_profile.close() except: - pass + raise SystemExit("ERROR: While updating folder name " + "in shutter profile: {}".format(sys.exc_info())) for mode in modes: cmd = 'xrandr --output ' + mode[0] + ' --mode ' + mode[1] @@ -134,7 +154,9 @@ for mode in modes: except: print("""Could not configure screenshot tool - you may need to install the package 'shutter', - or check that %s exists.""" % profile_path) + or check that {}/{} exists and is writable.""".format( + profile_path, + 'shutter.xml')) message = 'Set mode ' + mode[1] + ' for output ' + mode[0] success_messages.append(message) |