summaryrefslogtreecommitdiff
path: root/bin
diff options
authorSylvain Pineau <sylvain.pineau@canonical.com>2016-10-07 09:48:55 +0200
committerSylvain Pineau <sylvain.pineau@canonical.com>2016-10-07 09:48:55 +0200
commit33a440473829a82188a9547146789fae72185c5e (patch)
treeaa8efa1434f252432ab99274eaca45a9427207bc /bin
parentcbbe2fe63a1b22f6d0873bfe11ff3e834a1a883c (diff)
Import plainbox-provider-checkbox_0.21.orig.tar.gzupstream-0.21patched-0.21-1
Diffstat (limited to 'bin')
-rwxr-xr-xbin/audio_bluetooth_loopback_test57
-rwxr-xr-xbin/bluetooth_transfer_stress54
-rwxr-xr-xbin/bmc_info61
-rwxr-xr-xbin/broadband_info6
-rwxr-xr-xbin/camera_test42
-rwxr-xr-xbin/connect_wireless37
-rwxr-xr-xbin/cpu_offlining2
-rwxr-xr-xbin/create_connection16
-rwxr-xr-xbin/disk_smart182
-rwxr-xr-xbin/dkms_info569
-rwxr-xr-xbin/dmitest230
-rwxr-xr-xbin/fwts_test273
-rwxr-xr-xbin/gateway_ping_test275
-rwxr-xr-xbin/get_make_and_model35
-rwxr-xr-xbin/glob_test121
-rwxr-xr-xbin/gpu_test9
-rwxr-xr-xbin/graphics_driver4
-rwxr-xr-xbin/hdd_parking74
-rwxr-xr-xbin/lock_screen_watcher29
-rwxr-xr-xbin/manage_compiz_plugin63
-rwxr-xr-xbin/memory_compare71
-rwxr-xr-xbin/memory_test6
-rwxr-xr-xbin/network490
-rwxr-xr-xbin/network_device_info8
-rwxr-xr-xbin/network_info18
-rwxr-xr-xbin/network_wait11
-rwxr-xr-xbin/optical_write_test4
-rwxr-xr-xbin/piglit_test98
-rwxr-xr-xbin/pm_test16
-rwxr-xr-xbin/pts_run4
-rwxr-xr-xbin/recovery_info391
-rwxr-xr-xbin/removable_storage_test192
-rwxr-xr-xbin/rotation_test3
-rwxr-xr-xbin/sleep_test142
-rwxr-xr-xbin/sleep_time_check27
-rwxr-xr-xbin/storage_test20
-rwxr-xr-xbin/touchpad_driver_info2
-rwxr-xr-xbin/virtualization209
-rwxr-xr-xbin/wifi_time2reconnect18
-rwxr-xr-xbin/xrandr_cycle60
40 files changed, 2901 insertions, 1028 deletions
diff --git a/bin/audio_bluetooth_loopback_test b/bin/audio_bluetooth_loopback_test
new file mode 100755
index 0000000..9661ec4
--- /dev/null
+++ b/bin/audio_bluetooth_loopback_test
@@ -0,0 +1,57 @@
+#!/bin/bash
+#
+# This file is part of Checkbox.
+#
+# Copyright 2014 Canonical Ltd.
+#
+# Authors: Daniel Manrique <roadmr@ubuntu.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+#
+# This simple script finds a bluetooth source and sink, and records from the
+# source for 6 seconds, playing the recording back into the sink. It helps a
+# human validate that record/playback is working, human can speak into
+# microphone and just ensure the speech can be heard instantly in the headset.
+
+[ -x "`which pactl`" ] || exit 1
+[ -x "`which pacat`" ] || exit 1
+
+SINK=$(pactl list | sed -n '/monitor/d;s/Name: \(bluez_sink\.\)/\1/p')
+SOURCE=$(pactl list | sed -n '/monitor/d;s/Name: \(bluez_source\.\)/\1/p')
+
+
+if [ -n "$SINK" ] && [ -n "$SOURCE" ]; then
+ PLAYBACK_LOG=$(mktemp --tmpdir audio_bluetooth_loopback.XXXXX)
+ RECORD_LOG=$(mktemp --tmpdir audio_bluetooth_loopback.XXXXX)
+ trap "rm $PLAYBACK_LOG $RECORD_LOG" EXIT
+ # ensure we exit with failure if parec fails, and not with pacat
+ # --playback's error code
+ set -o pipefail
+ # Use a short latency parameter so time between speech and hearing it is
+ # short, makes for a nicer interactive experience
+ LATENCY="--latency-msec=50"
+ # time out after 6 seconds, forcibly kill after 8 seconds if pacat didn't
+ # respond
+ echo "Recording and playing back, please speak into bluetooth microphone"
+ timeout -k 8 6 pacat $LATENCY --record -v -d $SOURCE 2>$RECORD_LOG | \
+ pacat $LATENCY --playback -v -d $SINK 2>$PLAYBACK_LOG
+
+ echo "RECORD LOG"
+ cat $RECORD_LOG
+ echo ""
+ echo "PLAYBACK LOG"
+ cat $PLAYBACK_LOG
+else
+ echo "No bluetooth audio device found"
+ exit 1
+fi
diff --git a/bin/bluetooth_transfer_stress b/bin/bluetooth_transfer_stress
new file mode 100755
index 0000000..2add0b1
--- /dev/null
+++ b/bin/bluetooth_transfer_stress
@@ -0,0 +1,54 @@
+#!/bin/bash
+#
+# Copyright (C) 2014 Canonical
+#
+# Authors:
+# Daniel Manrique <daniel.manrique@canonical.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+BTDEVADDR="$1"
+
+if [ -z "$BTDEVADDR" ]; then
+ echo "Please give Bluetooth device address as first parameter"
+ exit 1
+fi
+
+ORIGIN=$(mktemp --tmpdir bluetooth-stress.XXXXXX)
+DESTINATION=$(mktemp --tmpdir bluetooth-stress.XXXXXX)
+REMOTE=$RANDOM
+SIZEKB=10240
+echo "Creating ${SIZEKB}KB file to test transfer"
+dd if=/dev/urandom of=$ORIGIN count=$SIZEKB bs=1024
+ORIGIN_SUM=$(sha256sum $ORIGIN | cut -f 1 -d ' ')
+set -o pipefail
+echo "Sending file using Bluetooth"
+time obexftp -v -b $BTDEVADDR -o $REMOTE --put $ORIGIN 2>&1 | ansi_parser
+sleep 5
+echo "Receiving file using Bluetooth"
+time obexftp -v -b $BTDEVADDR -o $DESTINATION --get $REMOTE 2>&1 | ansi_parser
+# Now checksum destination and compare
+DESTINATION_SUM=$(sha256sum $DESTINATION | cut -f 1 -d ' ')
+# Clean up before reporting
+rm $ORIGIN $DESTINATION
+if [ "$ORIGIN_SUM" = "$DESTINATION_SUM" ]; then
+ echo "Checksums match, file transfer succeeded"
+ exit 0
+else
+ echo "Checksums don't match, file was corrupted during transfers."
+ echo "Original checksum: $ORIGIN_SUM"
+ echo "Checksum of received file: $DESTINATION_SUM"
+ exit 1
+fi
diff --git a/bin/bmc_info b/bin/bmc_info
new file mode 100755
index 0000000..d7aec9b
--- /dev/null
+++ b/bin/bmc_info
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+import sys
+import shlex
+from subprocess import check_output, CalledProcessError
+
+def main():
+ # First, we need to get output
+ cmd = "ipmitool mc info"
+ try:
+ result = check_output(shlex.split(cmd), universal_newlines=True)
+ except FileNotFoundError:
+ print("ipmitool was not found! Please install it and try again.",
+ file=sys.stderr)
+ return 1
+ except CalledProcessError as e:
+ print("Problem running %s. Error was %s" % (cmd,e),file=sys.stderr)
+ return 1
+ result = result.split('\n')
+
+ # We need some bits that are formatted oddly so we need to do some parsing
+ data = {}
+ for line in result:
+ if ':' in line:
+ key = line.split(':')[0].strip()
+ value = line.split(':')[1].strip()
+ data[key] = value
+ last = (key, [value])
+ else:
+ # since the last line we matched had a ':', it's key is likely the
+ # key for the next few lines that don't have a ':'
+ # This should keep adding items to our last key's list until we hit
+ # another line with a :, and we start the cycle over again.
+ last[1].append(line.strip())
+ data[last[0]] = last[1]
+
+ # Now print out what we care about:
+ we_care_about = ['Manufacturer Name',
+ 'Manufacturer ID',
+ 'Product Name',
+ 'Product ID',
+ 'Firmware Revision',
+ 'IPMI Version',
+ 'Additional Device Support']
+
+ for field in we_care_about:
+ if type(data[field]) is list:
+ # Sometimes the first item in the list is ''. This will remove it
+ data[field].remove('')
+ print(field.ljust(30),':',data[field].pop(0))
+ for item in data[field]:
+ print(' '.ljust(32),item)
+ else:
+ print(field.ljust(30),":",data[field])
+ #print("{}:\t{}".format(field, data[field]))
+
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/bin/broadband_info b/bin/broadband_info
new file mode 100755
index 0000000..aec2ccb
--- /dev/null
+++ b/bin/broadband_info
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+for i in $(mmcli --simple-status -L | \
+ awk '/freedesktop\/ModemManager1\/Modem/ {print $1;}'); do
+ mmcli -m $i
+done
diff --git a/bin/camera_test b/bin/camera_test
index 019736c..c3e554c 100755
--- a/bin/camera_test
+++ b/bin/camera_test
@@ -26,20 +26,22 @@
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
#
+import argparse
+import ctypes
+import errno
+import fcntl
+import imghdr
+import logging
import os
import re
+import struct
import sys
import time
-import errno
-import fcntl
-import ctypes
-import struct
-import imghdr
-from tempfile import NamedTemporaryFile
-from subprocess import check_call, CalledProcessError, STDOUT
-import argparse
-from glob import glob
+
from gi.repository import GObject
+from glob import glob
+from subprocess import check_call, CalledProcessError, STDOUT
+from tempfile import NamedTemporaryFile
_IOC_NRBITS = 8
@@ -189,8 +191,8 @@ class CameraTest:
print(" driver : %s" % cp.driver.decode('UTF-8'))
print(" version: %s.%s.%s"
% (cp.version >> 16,
- (cp.version >> 8) & 0xff,
- cp.version & 0xff))
+ (cp.version >> 8) & 0xff,
+ cp.version & 0xff))
print(" flags : 0x%x [" % cp.capabilities,
' CAPTURE' if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE
else '',
@@ -205,7 +207,7 @@ class CameraTest:
resolutions = self._get_supported_resolutions(device)
print(' ',
self._supported_resolutions_to_string(resolutions).replace(
- "\n", " "),
+ "\n", " "),
sep="")
if cp.capabilities & V4L2_CAP_VIDEO_CAPTURE:
@@ -223,6 +225,7 @@ class CameraTest:
% {'device': self.args.device,
'type': self._gst_video_type,
'plugin': self._gst_plugin})
+ logging.debug("LED test with pipeline %s", pipespec)
self._pipeline = Gst.parse_launch(pipespec)
self._pipeline.set_state(Gst.State.PLAYING)
time.sleep(10)
@@ -241,6 +244,7 @@ class CameraTest:
'width': self._width,
'height': self._height,
'plugin': self._gst_plugin})
+ logging.debug("display test with pipeline %s", pipespec)
self._pipeline = Gst.parse_launch(pipespec)
self._pipeline.set_state(Gst.State.PLAYING)
time.sleep(10)
@@ -289,6 +293,8 @@ class CameraTest:
'height': height,
'plugin': self._gst_plugin,
'filename': filename})
+ logging.debug("still test with gstreamer and "
+ "pipeline %s", pipespec)
self._pipeline = Gst.parse_launch(pipespec)
self._pipeline.set_state(Gst.State.PLAYING)
time.sleep(3)
@@ -425,8 +431,8 @@ class CameraTest:
# for continuous and stepwise, let's just use min and
# max they use the same structure and only return
# one result
- elif framesize.type == V4L2_FRMSIZE_TYPE_CONTINUOUS or\
- framesize.type == V4L2_FRMSIZE_TYPE_STEPWISE:
+ elif (framesize.type in (V4L2_FRMSIZE_TYPE_CONTINUOUS,
+ V4L2_FRMSIZE_TYPE_STEPWISE)):
resolutions.append([framesize.stepwise.min_width,
framesize.stepwise.min_height]
)
@@ -441,7 +447,6 @@ class CameraTest:
if e.errno != errno.EINVAL:
print("Unable to determine supported framesizes "
"(resolutions), this may be a driver issue.")
- return supported_formats
supported_format['resolutions'] = resolutions
return supported_formats
@@ -495,6 +500,10 @@ def parse_arguments(argv):
title='test',
description='Available camera tests')
+ parser.add_argument('--debug', dest='log_level',
+ action="store_const", const=logging.DEBUG,
+ default=logging.INFO, help="Show debugging messages")
+
def add_device_parameter(parser):
group = parser.add_mutually_exclusive_group()
group.add_argument("-d", "--device", default="/dev/video0",
@@ -505,7 +514,6 @@ def parse_arguments(argv):
group.add_argument("--lowest-device", action="store_true",
help=("Use the /dev/videoN "
"where N is the lowest value available"))
-
subparsers.add_parser('detect')
led_parser = subparsers.add_parser('led')
add_device_parameter(led_parser)
@@ -541,6 +549,8 @@ if __name__ == "__main__":
if not args.test:
args.test = 'detect'
+ logging.basicConfig(level=args.log_level)
+
# Import Gst only for the test cases that will need it
if args.test in ['display', 'still', 'led', 'resolutions']:
from gi.repository import Gst
diff --git a/bin/connect_wireless b/bin/connect_wireless
index 54f61f7..c1ee36c 100755
--- a/bin/connect_wireless
+++ b/bin/connect_wireless
@@ -1,23 +1,42 @@
#!/bin/bash
+# Check nmcli version
+NMCLI_GTE_0_9_10=0
+nmcli general 2>&1 >/dev/null
+if [ $? -eq 0 ]; then
+ NMCLI_GTE_0_9_10=1
+fi
+
# Any active connections?
conn=''
-active_connection=$(nmcli -f SSID,ACTIVE dev wifi list | grep yes)
-
-if [ $? -eq 0 ]
-then
- ap=$(echo $active_connection | awk -F\' '{print $2}')
- conn=$(nmcli -t -f UUID,TYPE,NAME con list | grep wireless | grep -e "$ap$" | awk -F\: '{print $1}')
+if [ $NMCLI_GTE_0_9_10 -eq 0 ]; then
+ active_connection=$(nmcli -f SSID,ACTIVE dev wifi list | grep yes)
+ if [ $? -eq 0 ]; then
+ ap=$(echo $active_connection | awk -F\' '{print $2}')
+ conn=$(nmcli -t -f UUID,TYPE,NAME con list | grep wireless | grep -e "$ap$" | head -n 1 | awk -F\: '{print $1}')
+ else
+ conn=$(nmcli -t -f UUID,TYPE con list | grep wireless | head -n 1 | awk -F\: '{print $1}')
+ fi
else
- conn=$(nmcli -t -f UUID,TYPE con list | grep wireless | head -n 1 | awk -F\: '{print $1}')
+ active_connection=$(nmcli -f SSID,ACTIVE dev wifi | grep yes)
+ if [ $? -eq 0 ]; then
+ ap=$(echo $active_connection | awk '{print $1}')
+ conn=$(nmcli -t -f UUID,TYPE,NAME con show | grep wireless | grep -e "$ap$" | head -n 1 | awk -F\: '{print $1}')
+ else
+ conn=$(nmcli -t -f UUID,TYPE con show | grep wireless | head -n 1 | awk -F\: '{print $1}')
+ fi
fi
#Strip trailing/leading whitespace
conn=$(echo $conn |sed 's/^[ \t]*//;s/[ \t]*$//')
# Find out if wireless is enabled
-nmcli nm wifi | grep -q 'enabled'
+if [ $NMCLI_GTE_0_9_10 -eq 0 ]; then
+ nmcli nm wifi | grep -q 'enabled'
+else
+ nmcli radio wifi | grep -q 'enabled'
+fi
if [ $? -ne 0 ]
then
# Find out why
@@ -35,7 +54,7 @@ nmcli dev status | grep -q '\<connected\>'
if [ $? -eq 0 ]
then
# Disconnect, pause for a short time
- for iface in `nmcli -f GENERAL dev list | grep 'GENERAL.DEVICE' | awk '{print $2}'`
+ for iface in `(nmcli -f GENERAL dev list 2>/dev/null || nmcli -f GENERAL dev show) | grep 'GENERAL.DEVICE' | awk '{print $2}'`
do
nmcli dev disconnect iface $iface
done
diff --git a/bin/cpu_offlining b/bin/cpu_offlining
index c1c02a8..0e88af1 100755
--- a/bin/cpu_offlining
+++ b/bin/cpu_offlining
@@ -40,7 +40,7 @@ done
if [ $result -eq 0 ]; then
echo "Successfully turned $cpu_count cores off and back on"
else
- echo "Error with offlining one or more cores." 1>&2
+ echo "Error with offlining one or more cores. CPU offline may not work if this is an ARM system." 1>&2
fi
exit $result
diff --git a/bin/create_connection b/bin/create_connection
index 6f92859..71e9572 100755
--- a/bin/create_connection
+++ b/bin/create_connection
@@ -5,6 +5,11 @@ import os
import time
from subprocess import check_call, check_output, CalledProcessError
+try:
+ from subprocess import DEVNULL # >= python3.3
+except ImportError:
+ import os
+ DEVNULL = open(os.devnull, 'wb')
from uuid import uuid4
from argparse import ArgumentParser
@@ -127,8 +132,15 @@ baud=115200
def block_until_created(connection, retries, interval):
while retries > 0:
- nmcli_con_list = check_output(['nmcli', 'con', 'list'],
- universal_newlines=True)
+ try:
+ nmcli_con_list = check_output(['nmcli', 'con', 'list'],
+ stderr=DEVNULL,
+ universal_newlines=True)
+ except CalledProcessError:
+ check_call(['nmcli', 'con', 'reload'])
+ nmcli_con_list = check_output(['nmcli', 'con', 'show'],
+ stderr=DEVNULL,
+ universal_newlines=True)
if connection in nmcli_con_list:
print("Connection %s registered" % connection)
diff --git a/bin/disk_smart b/bin/disk_smart
index 3545700..7e6929f 100755
--- a/bin/disk_smart
+++ b/bin/disk_smart
@@ -7,6 +7,7 @@ Copyright (C) 2010 Canonical Ltd.
Authors
Jeff Lane <jeffrey.lane@canonical.com>
Brendan Donegan <brendan.donegan@canonical.com>
+ Rod Smith <rod.smith@canonical.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2,
@@ -34,6 +35,12 @@ USB/eSATA/eSAS attached storage devices.
Changelog:
+v1.4: Fix script failure on disks with no pre-existing SMART tests
+v1.3: Fix detection of SMART availability & activate SMART if available
+ but deactivated. Also use smartctl return value rather than string-
+ matching to determine if a test has failed; this should be more
+ robust, as output strings vary between disks.
+v1.2: Handle multiple output formats for "smartctl -l"
v1.1: Put delay before first attempt to acces log, rather than after
v1.0: added debugger class and code to allow for verbose debug output if needed
@@ -58,8 +65,10 @@ import os
import sys
import time
import logging
+import shlex
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, check_call, check_output
+from subprocess import CalledProcessError
from argparse import ArgumentParser
@@ -72,27 +81,57 @@ class ListHandler(logging.StreamHandler):
msg = msg.decode()
logger = logging.getLogger(record.name)
new_record = logger.makeRecord(record.name, record.levelno,
- record.pathname, record.lineno, msg, record.args,
- record.exc_info, record.funcName)
+ record.pathname, record.lineno,
+ msg, record.args,
+ record.exc_info,
+ record.funcName)
logging.StreamHandler.emit(self, new_record)
else:
logging.StreamHandler.emit(self, record)
+def enable_smart(disk):
+ """ Enable SMART on the specified disk
+ :param disk:
+ disk device filename (e.g., /dev/sda)
+ :returns:
+ True if enabling smart was successful, False otherwise
+ """
+ logging.debug('SMART disabled; attempting to enable it.')
+ command = 'smartctl -s on %s' % disk
+ try:
+ check_call(shlex.split(command))
+ return True
+ except CalledProcessError:
+ return False
+
+
+# Returns True if SMART is enabled. If not enabled, attempt to
+# enable it and return result of that attempt.
def is_smart_enabled(disk):
# Check with smartctl to see if SMART is available and enabled on the disk
command = 'smartctl -i %s' % disk
diskinfo_bytes = (Popen(command, stdout=PIPE, shell=True)
- .communicate()[0])
+ .communicate()[0])
diskinfo = diskinfo_bytes.decode().splitlines()
logging.debug('SMART Info for disk %s', disk)
logging.debug(diskinfo)
- return (len(diskinfo) > 2
- and 'Enabled' in diskinfo[-2]
- and 'Available' in diskinfo[-3])
+ # Return True if the output (diskinfo) includes BOTH
+ # "SMART support is.*Available" AND "SMART support is.*Enabled".
+ # If SMART is available but not enabled, try to enable it.
+ if len(diskinfo) > 2:
+ if any("SMART support is" in s and "Available" in s
+ for s in diskinfo):
+ if any("SMART support is" in s and "Enabled" in s
+ for s in diskinfo):
+ return True
+ else:
+ return enable_smart(disk)
+ else:
+ return False
def run_smart_test(disk, type='short'):
@@ -110,22 +149,26 @@ def run_smart_test(disk, type='short'):
def get_smart_entries(disk, type='selftest'):
entries = []
- command = 'smartctl -l %s %s' % (type, disk)
- stdout = Popen(command, stdout=PIPE, shell=True).stdout
+ try:
+ stdout = check_output(['smartctl', '-l', type, disk],
+ universal_newlines=True)
+ returncode = 0
+ except CalledProcessError as err:
+ stdout = err.output
+ returncode = err.returncode
# Skip intro lines
- while True:
- line = stdout.readline().decode()
- if not line:
- raise Exception('Failed to parse SMART log entries')
-
- if line.startswith('SMART'):
+ stdout_lines = iter(stdout.splitlines())
+ for line in stdout_lines:
+ if (line.startswith('SMART') or
+ line.startswith('No self-tests have been logged')):
break
# Get lengths from header
- line = stdout.readline().decode()
+ line = next(stdout_lines)
if not line.startswith('Num'):
- return entries
+ entries.append('No entries found in log yet')
+ return entries, returncode
columns = ['number', 'description', 'status',
'remaining', 'lifetime', 'lba']
lengths = [line.index(i) for i in line.split()]
@@ -134,8 +177,7 @@ def get_smart_entries(disk, type='selftest'):
# Get remaining lines
entries = []
- for line_bytes in stdout.readlines():
- line = line_bytes.decode()
+ for line in stdout_lines:
if line.startswith('#'):
entry = {}
for i, column in enumerate(columns):
@@ -143,10 +185,64 @@ def get_smart_entries(disk, type='selftest'):
# Convert some columns to integers
entry['number'] = int(entry['number'][1:])
- entry['lifetime'] = int(entry['lifetime'])
entries.append(entry)
- return entries
+ return entries, returncode
+
+
+# Returns True if an "in-progress" message is found in the smartctl
+# output, False if such a message is not found. In the former case,
+# the in-progress message entries are logged.
+def in_progress(current_entries):
+ statuses = [entry for entry in current_entries
+ if isinstance(entry, dict)
+ and 'status' in entry
+ and (entry['status'] == 'Self-test routine in progress'
+ or "Self test in progress" in entry['status'])]
+ if statuses:
+ for entry in statuses:
+ logging.debug('%s %s %s %s' % (entry['number'],
+ entry['description'],
+ entry['status'],
+ entry['remaining']))
+ return True
+ else:
+ return False
+
+
+# Wait for SMART test to complete; return status and return code.
+# Note that different disks return different types of values.
+# Some return no status reports while a test is ongoing; others
+# show a status line at the START of the list of tests, and
+# others show a status line at the END of the list of tests
+# (and then move it to the top once the tests are done).
+def poll_for_status(args, disk, previous_entries):
+ # Priming read... this is here in case our test is finished or fails
+ # immediate after it beginsAccording to.
+ logging.debug('Polling selftest.log for status')
+ keep_going = True
+
+ while keep_going:
+ # Poll every sleep seconds until test is complete$
+ time.sleep(args.sleep)
+
+ current_entries, returncode = get_smart_entries(disk)
+ if current_entries != previous_entries:
+ if not in_progress(current_entries):
+ keep_going = False
+
+ if args.timeout is not None:
+ if args.timeout <= 0:
+ logging.debug('Polling timed out')
+ return 'Polling timed out', 1
+ else:
+ args.timeout -= args.sleep
+
+ if isinstance(current_entries[0], str):
+ return current_entries[0], returncode
+ else:
+ return current_entries[0]['status'], returncode
+
def main():
description = 'Tests that SMART capabilities on disks that support SMART function.'
@@ -183,7 +279,7 @@ def main():
logger.setLevel(logging.INFO)
# Make sure we're root, because smartctl doesn't work otherwise.
- if not os.geteuid()==0:
+ if not os.geteuid() == 0:
parser.error("You must be root to run this program")
# If SMART is available and enabled, we proceed. Otherwise, we exit as the
@@ -194,8 +290,8 @@ def main():
return 0
# Initiate a self test and start polling until the test is done
- previous_entries = get_smart_entries(disk)
- logging.info("Starting SMART self-test on %s" % disk)
+ previous_entries, returncode = get_smart_entries(disk)
+ logging.info("Starting SMART self-test on %s", disk)
if run_smart_test(disk) != 0:
logging.error("Error reported during smartctl test")
return 1
@@ -204,39 +300,17 @@ def main():
# Abort the previous instance
# so that polling can identify the difference
run_smart_test(disk)
- previous_entries = get_smart_entries(disk)
-
- # Priming read... this is here in case our test is finished or fails
- # immediate after it begins.
- logging.debug('Polling selftest.log for status')
-
- while True:
- # Poll every sleep seconds until test is complete$
- time.sleep(args.sleep)
-
- current_entries = get_smart_entries(disk)
- logging.debug('%s %s %s %s' % (current_entries[0]['number'],
- current_entries[0]['description'],
- current_entries[0]['status'],
- current_entries[0]['remaining']))
- if current_entries != previous_entries \
- and current_entries[0]["status"] != 'Self-test routine in progress':
- break
-
- if args.timeout is not None:
- if args.timeout <= 0:
- logging.debug('Polling timed out')
- return 1
- else:
- args.timeout -= args.sleep
+ previous_entries, returncode = get_smart_entries(disk)
- status = current_entries[0]['status']
+ status, returncode = poll_for_status(args, disk, previous_entries)
- if status != 'Completed without error':
- log = get_smart_entries(disk)
+ if returncode != 0:
+ log, returncode = get_smart_entries(disk)
logging.error("FAIL: SMART Self-Test appears to have failed for some reason. "
- "Run 'sudo smartctl -l selftest %s' to see the SMART log" % disk)
- logging.debug("Last self-test run status: %s" % status)
+ "Run 'sudo smartctl -l selftest %s' to see the SMART log",
+ disk)
+ logging.debug("Last smartctl return code: %d", returncode)
+ logging.debug("Last smartctl run status: %s", status)
return 1
else:
logging.info("PASS: SMART Self-Test completed without error")
diff --git a/bin/dkms_info b/bin/dkms_info
new file mode 100755
index 0000000..ae9a997
--- /dev/null
+++ b/bin/dkms_info
@@ -0,0 +1,569 @@
+#!/usr/bin/env python3
+# encoding: utf-8
+# Copyright 2015 Canonical Ltd.
+# Written by:
+# Shawn Wang <shawn.wang@canonical.com>
+# Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+dkms_info provides device package information.
+
+supported package types:
+ - dkms (Dynamic Kernel Module Support): provides kernel modules
+ - non-dkms: packages that with modaliases header and don't exist in dkms list
+ - hardware modalias: might be unused dkms or config package
+ - oemalias: It is like non-dkms(w/o modalias)
+supported output format:
+ - onelines: one line per packages with matched modaliases information
+ - dumps: json output (fully information)
+"""
+
+import fnmatch
+import functools
+import email.parser
+import io
+import json
+import logging
+import os
+import subprocess
+import sys
+import unittest
+
+from guacamole import Command
+
+try:
+ from unittest import mock
+except ImportError:
+ from plainbox.vendor import mock
+
+_logger = logging.getLogger(None)
+
+
+@functools.lru_cache()
+def get_system_module_list():
+ """
+ Use lsmod to list current kernel modules.
+
+ :returns:
+ A list of module names that are loaded into the current kernel.
+ """
+ _logger.info("Looking at inserted kernel modules")
+ modules = []
+ with io.open("/proc/modules", 'rt',
+ encoding='UTF-8') as stream:
+ for line in stream.readlines():
+ modules.append(line.split()[0].strip())
+ return modules
+
+
+@functools.lru_cache()
+def get_system_modaliases():
+ r"""
+ List machine modaliases.
+
+ :returns:
+ dict of hardware modaliases.
+ key: modalias_type
+ value: list of modalias_string
+ """
+ _logger.info("Looking for modalias files in /sys/devices")
+
+ result = {}
+ name = "modalias"
+ for root, dirs, files in os.walk("/sys/devices/"):
+ if name in files:
+ with io.open(os.path.join(root, name), 'rt',
+ encoding='UTF-8') as stream:
+ data = stream.read().strip()
+ (modalias_type, modalias_string) = data.split(":", 1)
+
+ if modalias_type not in result:
+ result[modalias_type] = []
+ if modalias_string not in result[modalias_type]:
+ result[modalias_type].append(modalias_string)
+ return result
+
+
+def get_installed_dkms_modules():
+ """
+ Query dkms_status from /var/lib/dkms/.
+
+ An installed dkms module has the below directory.
+ /var/lib/dkms/<dkms_name>/<dkms_ver>/<kernel_ver>
+
+ :returns:
+ list of (<dkms_name>, <dkms_ver>)
+ """
+ _dkmses = []
+ _logger.info("Querying dkms database")
+
+ path = "/var/lib/dkms/"
+ for root, dirs, files in os.walk(path):
+ if os.uname().release in dirs:
+ dkms = root[len(path):].split("/")
+ if len(dkms) != 2:
+ continue
+ _dkmses.append(dkms)
+ return _dkmses
+
+
+@functools.lru_cache()
+def match_patterns(patterns):
+ """
+ Check modalias patterns matched with modalias, or type is oemalias.
+
+ oemalias is a special pattern_type for oem.
+ :param patterns:
+ list of modalias pattern from debian package.
+ :returns:
+ list of modalias pattern matched with hardware modalias
+ """
+ _logger.info("Looking for modalias objects matching")
+ matched = []
+ if not patterns:
+ return matched
+ hw_modaliases = get_system_modaliases()
+ for pattern in patterns:
+ pattern_array = pattern.split(":", 1)
+ if len(pattern_array) < 2:
+ _logger.info("skip pattern {}, can't find type".format(pattern))
+ continue
+
+ (pattern_type, pattern_string) = pattern_array
+
+ if pattern_type == "oemalias":
+ matched.append(pattern)
+ if pattern_type not in hw_modaliases:
+ continue
+ for item in hw_modaliases[pattern_type]:
+ if fnmatch.fnmatch(item, pattern_string):
+ matched.append(pattern)
+ return matched
+
+
+class SystemInfoTests(unittest.TestCase):
+
+ """Tests for System Information Parsing and Collection."""
+
+ _proc_modules = """\
+xt_REDIRECT 16384 3 - Live 0x0000000000000000
+nf_nat_redirect 16384 1 xt_REDIRECT, Live 0x0000000000000000
+xt_hl 16384 3 - Live 0x0000000000000000
+hid_generic 16384 0 - Live 0x0000000000000000
+usbhid 53248 0 - Live 0x0000000000000000
+hid 110592 2 hid_generic,usbhid, Live 0x0000000000000000
+overlay 45056 1 - Live 0x0000000000000000
+"""
+ _modalias = """\
+usb:v1D6Bp0003d0319dc09dsc00dp03ic09isc00ip00in00
+"""
+
+ def setUp(self):
+ """Common setup code."""
+ get_system_module_list.cache_clear()
+ get_system_modaliases.cache_clear()
+
+ @mock.patch('io.open', mock.mock_open(read_data=_proc_modules))
+ def test_get_module_list__calls_and_parses_lsmod(self):
+ """Ensure that get_module_list() parses lsmod output."""
+ # NOTE: Return value was loaded from my system running kernel 4.0.
+ # The first few and last rows to be precise.
+ modules = get_system_module_list()
+ self.assertEqual(modules, [
+ 'xt_REDIRECT', 'nf_nat_redirect', 'xt_hl', 'hid_generic',
+ 'usbhid', 'hid', 'overlay'])
+
+ @mock.patch('io.open', mock.mock_open(read_data=_proc_modules))
+ def test_get_module_list_is_cached(self):
+ """Ensure that get_module_list() cache works."""
+ modules1 = get_system_module_list()
+ modules2 = get_system_module_list()
+ self.assertIn('xt_REDIRECT', modules1)
+ self.assertIn('overlay', modules2)
+ self.assertEqual(modules1, modules2)
+
+ @mock.patch('os.walk')
+ @mock.patch('io.open', mock.mock_open(read_data=_modalias))
+ def test_get_system_modalias(self, mock_os_walk):
+ """test_get_system_modalias."""
+ mock_os_walk.return_value = [
+ ("/sys/devices/pci0000:00/0000:00:14.0/usb2/2-0:1.0/modalias",
+ ["driver", "subsystem"],
+ ["modalias", "uevent"]),
+ ]
+
+ """fetch hw_modaliases from machine and check modalis types."""
+ modaliases = get_system_modaliases()
+ self.assertEqual(len(modaliases), 1)
+ self.assertIn("usb", modaliases)
+
+ @mock.patch('os.uname')
+ @mock.patch('os.walk')
+ def test_get_installed_dkms_modules(self, mock_os_walk, mock_os_uname):
+ """test_get_installed_dkms_modules."""
+ mock_os_walk.return_value = [
+ ("/var/lib/dkms/hello/0.1",
+ ["3.19.0-15-generic", "build", "source"],
+ []),
+ ]
+ o = mock.Mock()
+ o.release = "3.19.0-15-generic"
+ mock_os_uname.return_value = o
+ self.assertEqual([['hello', '0.1']],
+ get_installed_dkms_modules())
+
+ @mock.patch('__main__.get_system_modaliases')
+ def test_match_patterns(self, mock_get_system_modaliases):
+ """Test of match_patterns."""
+ mock_get_system_modaliases.return_value = {
+ "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00",
+ "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"],
+ "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00",
+ "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"],
+ }
+ pkg_modalieses = ["pci:v00008086d00008C26sv*sd*bc*sc*i*",
+ "usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*",
+ "oemalias:test"]
+ matched_modalieses = match_patterns(tuple(pkg_modalieses))
+ # match_patterns
+ self.assertIn("pci:v00008086d00008C26sv*sd*bc*sc*i*",
+ matched_modalieses)
+ self.assertIn("oemalias:test",
+ matched_modalieses)
+ self.assertNotIn("usb:v07B4p010Ad0102dc*dsc*dp*ic*isc*ip*in*",
+ matched_modalieses)
+
+
+class DkmsPackage(object):
+
+ """
+ Handle DKMS type device package, DKMS is a kernel module framework.
+
+ It generate modules for installed kernel or different kernel versions.
+ The dkms modules will be copied to /lib/modulesa/`uname -r`/updates/dkms/.
+ Those modules might be load by modaliases information.
+ """
+
+ def __init__(self, name, version):
+ """
+ init of DkmsPackage, define all the attribute.
+
+ :param name:
+ DKMS module name
+ :param version:
+ DKMS module version
+ """
+ self.dkms_name = name
+ self.dkms_ver = version
+ self.pkg_name = self._query_package()
+ self.kernel_ver = os.uname().release
+ self.arch = os.uname().machine
+ self.mods = self._list_modules()
+ self.install_mods = self._list_install_modules()
+ self.pkg = None
+
+ def _query_package(self):
+ """
+ Query debian package of dkms.
+
+ Use dpkg -S to check dkms src path of debian package.
+
+ :return:
+ string of package name or None
+ """
+ path = "/usr/src/{}-{}/dkms.conf".format(self.dkms_name, self.dkms_ver)
+ _logger.info("Looking for packages that provide: %s", path)
+ dpkg_info_root = "/var/lib/dpkg/info"
+ for fn in os.listdir(dpkg_info_root):
+ if not fn.endswith(".list"):
+ continue
+ with io.open(os.path.join(dpkg_info_root, fn), 'rt',
+ encoding='UTF-8') as stream:
+ if path in stream.read():
+ return fn[:-len(".list")]
+ return None
+
+ def _list_modules(self):
+ """
+ List all the kernel modules that provide by the dkms package.
+
+ Module name (.ko) with "-" will be replace to "_" when module loaded.
+
+ :param path:
+ The directory to look at.
+ :return:
+ List of kernel modules
+ """
+ path = "/var/lib/dkms/{}/{}/{}/{}/module".format(
+ self.dkms_name, self.dkms_ver, self.kernel_ver, self.arch)
+ _logger.info("Looking for kernel modules in %s", path)
+ result = []
+ for module_file in os.listdir(path):
+ (module, extension) = os.path.splitext(module_file)
+ if extension == ".ko":
+ result.append(module.replace("-", "_"))
+ return result
+
+ def _list_install_modules(self):
+ """
+ Return a dict of install_modules.
+
+ key is installed module name
+ value is tuple of matched patterns
+
+ :return:
+ Dict of installed dkms modules
+ """
+ install_mods = {}
+ for m in self.mods:
+ if m not in get_system_module_list():
+ continue
+ _logger.info("Inspecting module %s", m)
+
+ output = subprocess.check_output(["modinfo", m],
+ universal_newlines=True)
+ aliases = []
+ for line in output.splitlines():
+ if not line.startswith("alias:"):
+ continue
+
+ key, value = line.split(':', 1)
+ aliases.append(value.strip())
+
+ install_mods[m] = match_patterns(tuple(aliases))
+ return install_mods
+
+def _headers_to_dist(pkg_str):
+ """
+ Convert rft822 headers string to dict.
+
+ :param headers:
+ deb822 headers object
+ :return:
+ dict, the key is lowercase of deb822 headers key
+ """
+
+ header = email.parser.Parser().parsestr(pkg_str)
+ target = {}
+ for key in header.keys():
+ target[key.lower()] = header[key]
+ return target
+
+class DebianPackageHandler(object):
+
+ """Use rtf822(email) to handle the package information from file_object."""
+
+ def __init__(self, extra_pkgs=[], file_object=None):
+ """
+ DebianPackageHandler.
+
+ :param file_object:
+ default file open from /var/lib/dpkg/status,
+ where stored system package information
+ """
+ if file_object is None:
+ file_object = io.open('/var/lib/dpkg/status', 'rt',
+ encoding='UTF-8')
+ self._file_object = file_object
+ self.extra_pkgs = extra_pkgs
+ self.pkgs = self._get_device_pkgs()
+
+ def _gen_all_pkg_strs(self):
+ """
+ Get package information in /var/lib/dpkg/status.
+
+ :returns:
+ A generator of debian package.
+ """
+ _logger.info("Loading information about all packages")
+ for pkg_str in self._file_object.read().split('\n\n'):
+ yield pkg_str
+
+
+ def _get_device_pkgs(self):
+ """
+ Only device packages have debian package header 'Modaliases'.
+
+ This method get packages with the key ``modaliases``.
+ Use the method instead of get_all_pkgs for performance issues.
+
+ :returns:
+ A list of dict , the dict is converted from debian package header.
+ """
+
+ _logger.info("Looking for packages providing modaliases")
+ _modalias_pkgs = {}
+ target_str = ""
+ result = {}
+
+ for pkg_str in self._gen_all_pkg_strs():
+
+ for pkg in self.extra_pkgs:
+ if pkg.pkg_name is None:
+ continue
+ pstr = "Package: {}".format(pkg.pkg_name)
+ if pstr in pkg_str:
+ _logger.info("Gathering information of package, {}".format(
+ pkg.pkg_name))
+ pkg.pkg = _headers_to_dist(pkg_str)
+ break
+ else:
+ if "Modaliases:" in pkg_str:
+ pkg = _headers_to_dist(pkg_str)
+
+ (modalias_header, pattern_str) = \
+ pkg['modaliases'].strip(")").split("(")
+ patterns = pattern_str.split(', ')
+ patterns.sort()
+ pkg['match_patterns'] = match_patterns(tuple(patterns))
+
+ with io.open("/var/lib/dpkg/info/{}.list".format(pkg['package']),
+ 'rt', encoding='UTF-8') as stream:
+ if "/dkms.conf" in stream.read():
+ pkg['unused_dkms'] = True
+ result[pkg['package']] = pkg
+ return result
+
+ def to_json(self):
+ return json.dumps({ "dkms": self.extra_pkgs, "non-dkms": self.pkgs }, default=lambda o: o.__dict__, sort_keys=True, indent=4)
+
+ def to_outline(self):
+ result = ""
+ for pkg in self.extra_pkgs:
+ if pkg.pkg is None:
+ continue
+ result = "{}\n{}_{}: {}".format(
+ result,
+ pkg.pkg_name,
+ pkg.pkg["version"],
+ pkg.install_mods)
+ for pkg_name, pkg in self.pkgs.items():
+ extra_str = ""
+ if "unused_dkms" in pkg:
+ extra_str = "- "
+ result = "{}\n{}{}_{}: {}".format(
+ result,
+ extra_str,
+ pkg_name,
+ pkg["version"],
+ pkg['match_patterns'])
+ return result
+
+
+class DebianPackageHandlerTest(unittest.TestCase):
+
+ """Test of DebianPackageHandler."""
+
+ _var_lib_dpkg_status = """\
+Package: foo
+Status: install ok installed
+Modaliases: hwe(pci:v000099DDd00000036sv*sd*bc*sc*i*)
+
+Package: foo1
+Status: install ok installed
+Modaliases: hwe(pci:v0000AADDd00000036sv*sd*bc*sc*i*)
+
+Package: foo2
+Status: install ok installed
+
+Package: foo3
+Status: install ok installed
+
+Package: bar
+Status: install ok installed
+
+"""
+
+
+ @mock.patch('io.open', mock.mock_open(read_data=_var_lib_dpkg_status))
+ @mock.patch('__main__.get_system_modaliases')
+ def test_get_pkgs(self, mock_get_system_modaliases):
+ """Test of test_get_pkgs."""
+ mock_get_system_modaliases.return_value = {
+ "pci": ["v0000168Cd00000036sv0000103Csd0000217Fbc02sc80i00",
+ "v00008086d00008C26sv0000103Csd000022D9bc0Csc03i20"],
+ "usb": ["v8087p8000d0005dc09dsc00dp01ic09isc00ip00in00",
+ "v1D6Bp0002d0319dc09dsc00dp00ic09isc00ip00in00"],
+ }
+
+ self.pkg_handler = DebianPackageHandler(
+ file_object=io.StringIO(self._var_lib_dpkg_status))
+ self.assertEqual(len(self.pkg_handler.pkgs), 2)
+
+
+class DeviceInfo(Command):
+
+ """
+ Implementation of the dkms-info command.
+
+ dkms_info provides device package information.
+
+ @EPILOG@
+
+ supported package types:
+ - dkms (Dynamic Kernel Module Support): provides kernel modules
+ - non-dkms: packages that with modaliases header and don't exist in dkms
+ list
+ - hardware modalias: might be unused dkms or config package
+ - oemalias: It is like non-dkms(w/o modalias)
+
+ supported output formats:
+ - onelines: one line per packages with matched modaliases information
+ - dumps: json output (fully information)
+ """
+
+ def register_arguments(self, parser):
+ """Register command line arguments for dkms-info."""
+
+ parser.add_argument(
+ '--format', default="onelines",
+ choices=["summary", "json"],
+ help=("Choose output format type: "
+ "summary (one line per packages) "
+ "or json (json format, fully information)"))
+
+ parser.add_argument(
+ '--output', default=None,
+ help=("Output filename to store the output date"))
+
+ def invoked(self, ctx):
+ """Invoke dkms-info."""
+ logging.basicConfig(
+ level=logging.INFO, format='[%(relativeCreated)06dms] %(message)s')
+ _logger.info("Started")
+
+ dkms_pkgs = []
+ for (dkms_name, dkms_ver) in get_installed_dkms_modules():
+ dkms_pkg = DkmsPackage(dkms_name, dkms_ver)
+ dkms_pkgs.append(dkms_pkg)
+
+ output = sys.stdout
+ if ctx.args.output is not None:
+ output = open(ctx.args.output, 'wt', encoding='UTF-8')
+
+ pkg_handler = DebianPackageHandler(extra_pkgs=dkms_pkgs)
+ if ctx.args.format == "summary":
+ output.write(pkg_handler.to_outline())
+ else:
+ output.write(pkg_handler.to_json())
+ _logger.info("Data collected")
+
+
+if __name__ == '__main__':
+ if '--test' in sys.argv:
+ sys.argv.remove('--test')
+ unittest.main()
+ else:
+ DeviceInfo().main()
diff --git a/bin/dmitest b/bin/dmitest
new file mode 100755
index 0000000..02e67a9
--- /dev/null
+++ b/bin/dmitest
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+#
+# This file is part of Checkbox.
+#
+# Copyright 2014 Canonical Ltd.
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+#
+# Test for sane dmidecode output, particularly with respect to
+# various manufacturer information fields. Also, verify that the
+# system reports a chassis type that suits its class (server or
+# desktop/laptop)
+#
+# By: Rod Smith
+#
+# Parameters:
+# * --dmifile {filename} -- Input filename; optional. If specified,
+# file is used instead of dmidecode output.
+# * --test_versions -- Include chassis, system, and base boad version
+# numbers among tests.
+# * --test_serials -- Include system and base board serial numbers among
+# tests.
+# * 'desktop' or 'server' -- Type of system being tested.
+
+import re
+import subprocess
+import sys
+
+from argparse import ArgumentParser
+
+# Command to retrieve DMI information
+COMMAND = "dmidecode"
+
+
+def find_in_section(stream, section, label, strings, find_empty):
+ """ Search for a set of strings on a labelled line in a section
+ of the output.
+ :param stream:
+ input text stream (dmidecode output)
+ :param section:
+ section label in which to search (e.g., "Chassis Information")
+ :param label:
+ label of line on which to search (e.g., "Type:")
+ :param strings:
+ set of strings for which to search (e.g., ["server", "blade"])
+ :param find_empty:
+ if True, matches empty label field (as if '""' were passed as
+ a strings value)
+ :returns found:
+ True if one or more of strings was found on "label" line in "section"
+ section, or if "label" line is empty AND find_empty is True;
+ False otherwise
+ """
+ start_looking = False
+ found = False
+ empty = True
+ for line in stream:
+ if line == section:
+ start_looking = True
+ if start_looking and re.search(label, line):
+ print("\n" + section)
+ print(line.strip())
+ empty = len(line.strip()) == len(label)
+ if empty and find_empty:
+ found = True
+ for s in strings:
+ if re.search(s, line, flags=re.IGNORECASE):
+ found = True
+ break
+ start_looking = False
+
+ return found
+
+
+def main():
+ parser = ArgumentParser("dmitest")
+ parser.add_argument('system_type',
+ help="System type ('server' or 'desktop').",
+ choices=['server', 'desktop'])
+ parser.add_argument('--dmifile',
+ help="File to use in lieu of dmidecode.")
+ parser.add_argument('--test_versions', action="store_true",
+ help="Set to check version information")
+ parser.add_argument('--test_serials', action="store_true",
+ help="Set to check serial number information")
+ args = parser.parse_args()
+
+ try:
+ if args.dmifile:
+ print("Reading " + args.dmifile + " as DMI data")
+ stream = subprocess.check_output(['cat', args.dmifile], universal_newlines=True).splitlines()
+ else:
+ stream = subprocess.check_output(COMMAND, universal_newlines=True).splitlines()
+ except subprocess.CalledProcessError as err:
+ print("Error running {}: {}".format(COMMAND, err))
+ return 1
+
+ retval = 0
+
+ """
+ NOTE: System type is encoded in both the "Chassis Information" and "Base
+ Board Type" sections. The former is more reliable, so we do a whitelist
+ test on it -- the type MUST be specified correctly. The "Base Board Type"
+ section is less reliable, so rather than flag large numbers of systems
+ for having "Unknown", "Other", or something similar here, we just flag
+ it when it's at odds with the type passed on the command line. Also,
+ the "Base Board Type" may specify a desktop or tower system on servers
+ shipped in those form factors, so we don't flag that combination as an
+ error.
+ """
+ if args.system_type == 'server':
+ if not find_in_section(stream, 'Chassis Information', 'Type:',
+ ['server', 'rack mount', 'blade',
+ 'expansion chassis', 'multi-system'], False):
+ print("*** Incorrect or unknown server chassis type!")
+ retval = 1
+ if find_in_section(stream, 'Base Board Information', 'Type:',
+ ['portable', 'notebook', 'space-saving',
+ 'all in one'], False):
+ print("*** Incorrect server base board type!")
+ retval = 1
+ else:
+ if not find_in_section(stream, 'Chassis Information', 'Type:',
+ ['notebook', 'portable', 'laptop', 'desktop',
+ 'lunch box', 'space-saving', 'tower',
+ 'all in one', 'hand held'], False):
+ print("*** Incorrect or unknown desktop chassis type!")
+ retval = 1
+ if find_in_section(stream, 'Base Board Information', 'Type:',
+ ['rack mount', 'server', 'multi-system',
+ 'interconnect board'], False):
+ print("*** Incorrect desktop base board type!")
+ retval = 1
+
+ if find_in_section(stream, 'Chassis Information', 'Manufacturer:',
+ ['empty', 'chassis manufacture', 'null', 'insyde',
+ 'to be filled by o\.e\.m\.', 'no enclosure',
+ '\.\.\.\.\.'], True):
+ print("*** Invalid chassis manufacturer!")
+ retval = 1
+
+ if find_in_section(stream, 'System Information', 'Manufacturer:',
+ ['system manufacture', 'insyde', 'standard',
+ 'to be filled by o\.e\.m\.', 'no enclosure'], True):
+ print("*** Invalid system manufacturer!")
+ retval = 1
+
+ if find_in_section(stream, 'Base Board Information', 'Manufacturer:',
+ ['to be filled by o\.e\.m\.'], True):
+ print("*** Invalid base board manufacturer!")
+ retval = 1
+
+ if find_in_section(stream, 'System Information', 'Product Name:',
+ ['system product name', 'to be filled by o\.e\.m\.'],
+ False):
+ print("*** Invalid system product name!")
+ retval = 1
+
+ if find_in_section(stream, 'Base Board Information', 'Product Name:',
+ ['base board product name',
+ 'to be filled by o\.e\.m\.'], False):
+ print("*** Invalid base board product name!")
+ retval = 1
+
+ if args.test_versions:
+
+ if find_in_section(stream, 'Chassis Information', 'Version:',
+ ['to be filled by o\.e\.m\.', 'empty'],
+ False):
+ print("*** Invalid chassis version!")
+ retval = 1
+
+ if find_in_section(stream, 'System Information', 'Version:',
+ ['to be filled by o\.e\.m\.', '\(none\)',
+ 'null', 'system version', 'not applicable',
+ '\.\.\.\.\.'], False):
+ print("*** Invalid system information version!")
+ retval = 1
+
+ if find_in_section(stream, 'Base Board Information', 'Version:',
+ ['base board version',
+ 'empty', 'to be filled by o\.e\.m\.'], False):
+ print("*** Invalid base board version!")
+ retval = 1
+
+ if args.test_serials:
+
+ if find_in_section(stream, 'System Information', 'Serial Number:',
+ ['to be filled by o\.e\.m\.',
+ 'system serial number', '\.\.\.\.\.'],
+ False):
+ print("*** Invalid system information serial number!")
+ retval = 1
+
+ if find_in_section(stream, 'Base Board Information', 'Serial Number:',
+ ['n/a', 'base board serial number',
+ 'to be filled by o\.e\.m\.', 'empty', '\.\.\.'],
+ False):
+ print("*** Invalid base board serial number!")
+ retval = 1
+
+ if find_in_section(stream, 'Processor Information', 'Version:',
+ ['sample'], False):
+ print("*** Invalid processor information!")
+ retval = 1
+
+ # In review of dmidecode data on 10/23/2014, no conspicuous problems
+ # found in BIOS Information section's Vendor, Version, or Release Date
+ # fields. Therefore, no tests based on these fields have been written.
+
+ if retval:
+ print("\nFailed one or more tests (see above)")
+ else:
+ print("\nPassed all tests")
+
+ return retval
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/bin/fwts_test b/bin/fwts_test
index f48cae3..127858a 100755
--- a/bin/fwts_test
+++ b/bin/fwts_test
@@ -2,37 +2,80 @@
import sys
import re
-from time import time, sleep
+from time import time
from argparse import ArgumentParser, RawTextHelpFormatter, REMAINDER
from subprocess import Popen, PIPE
from syslog import *
+from distutils.spawn import find_executable
+import os
-TESTS = ['acpidump',
- 'acpiinfo',
- 'acpitables',
- 'apicedge',
- 'apicinstance',
- 'aspm',
- 'bios32',
- 'bios_info',
- 'checksum',
- 'cstates',
- 'dmar',
- 'ebda',
- 'fadt',
- 'hpet_check',
- 'klog',
- 'mcfg',
- 'method',
- 'mpcheck',
- 'msr',
- 'mtrr',
- 'nx',
- 'oops',
- 'uefirtvariable',
- 'version',
- 'virt',
- 'wmi']
+# These tests require user interaction and need either special handling
+# or skipping altogether (right now, we skip them but they're kept here
+# in case we figure out a way to present the interaction to the user).
+INTERACTIVE_TESTS = ['ac_adapter',
+ 'battery',
+ 'hotkey',
+ 'power_button',
+ 'brightness',
+ 'lid']
+# These are usually performed on normal certification runs
+CERT_TESTS = ['acpidump',
+ 'acpitables',
+ 'apicedge',
+ 'apicinstance',
+ 'aspm',
+ 'bios32',
+ 'checksum',
+ 'cstates',
+ 'dmar',
+ 'ebda',
+ 'fadt',
+ 'hpet_check',
+ 'klog',
+ 'mcfg',
+ 'method',
+ 'mpcheck',
+ 'msr',
+ 'mtrr',
+ 'nx',
+ 'oops',
+ 'uefirtvariable',
+ 'version',
+ 'virt',
+ 'wmi']
+# These are advanced tests that shouldn't affect certification status
+NON_CERT_TESTS = ['acpiinfo',
+ 'bios_info',
+ 'cmosdump',
+ 'crs',
+ 'crsdump',
+ 'csm',
+ 'dmicheck',
+ 'ebdadump',
+ 'fan',
+ 'gpedump',
+ 'hda_audio',
+ 'maxfreq',
+ 'maxreadreq',
+ 'memmapdump',
+ 'microcode',
+ 'mpdump',
+ 'os2gap',
+ 'osilinux',
+ 'pcc',
+ 'pciirq',
+ 'plddump',
+ 'pnp',
+ 'prsdump',
+ 'romdump',
+ 'securebootcert',
+ 'syntaxcheck',
+ 'uefidump',
+ 'uefirtmisc',
+ 'uefirttime',
+ 'uefivarinfo'
+ ]
+TESTS = sorted(CERT_TESTS + NON_CERT_TESTS)
def get_sleep_times(start_marker, end_marker, sleep_time, resume_time):
@@ -65,7 +108,7 @@ def get_sleep_times(start_marker, end_marker, sleep_time, resume_time):
if end_marker in loglist[idx]:
run = 'PASS'
break
-
+
sleep_elapsed = float(sleep_end_time) - float(sleep_start_time)
resume_elapsed = float(resume_end_time) - float(resume_start_time)
return (run, sleep_elapsed, resume_elapsed)
@@ -95,6 +138,20 @@ def fix_sleep_args(args):
return new_args
+def detect_progress_indicator():
+ # Return a command suitable for piping progress information to its
+ # stdin (invoked via Popen), in list format.
+ # Return zenity if installed and DISPLAY (--auto-close)
+ # return dialog if installed and no DISPLAY (width height)
+ display = os.environ.get('DISPLAY')
+ if display and find_executable('zenity'):
+ return ["zenity", "--progress", "--text", "Progress", "--auto-close"]
+ if not display and find_executable('dialog'):
+ return ["dialog", "--gauge", "Progress", "20", "70"]
+ # Return None if no progress indicator is to be used
+ return None
+
+
def main():
description_text = 'Tests the system BIOS using the Firmware Test Suite'
epilog_text = ('To perform sleep testing, you will need at least some of '
@@ -103,6 +160,7 @@ def main():
'--s3-delay-delta\n'
'--s3-device-check\n'
'--s3-device-check-delay\n'
+ '--s3-hybrid-sleep\n'
'--s3-max-delay\n'
'--s3-min-delay\n'
'--s3-multiple\n'
@@ -117,24 +175,26 @@ def main():
epilog=epilog_text,
formatter_class=RawTextHelpFormatter)
parser.add_argument('-l', '--log',
- default='/tmp/fwts_results.log',
- help=('Specify the location and name of the log file.\n'
- '[Default: %(default)s]'))
+ default='/tmp/fwts_results.log',
+ help=('Specify the location and name '
+ 'of the log file.\n'
+ '[Default: %(default)s]'))
parser.add_argument('-f', '--fail-level',
- default='high',
- choices=['critical', 'high', 'medium',
- 'low', 'none', 'aborted'],
- help=('Specify the FWTS failure level that will trigger '
- 'this script to return a failing exit code. For '
- 'example, if you chose "critical" as the '
- 'fail-level, this wrapper will NOT return a '
- 'failing exit code unless FWTS reports a test as '
- 'FAILED_CRITICAL. You will still be notified of '
- 'all FWTS test failures. [Default level: '
- '%(default)s]'))
+ default='high',
+ choices=['critical', 'high', 'medium',
+ 'low', 'none', 'aborted'],
+ help=('Specify the FWTS failure level that will '
+ 'trigger this script to return a failing exit '
+ 'code. For example, if you chose "critical" as '
+ 'the fail-level, this wrapper will NOT return '
+ 'a failing exit code unless FWTS reports a '
+ 'test as FAILED_CRITICAL. You will still be '
+ 'notified of all FWTS test failures. '
+ '[Default level: %(default)s]'))
sleep_args = parser.add_argument_group('Sleep Options',
- ('The following arguments are to only be used with the '
- '--sleep test option'))
+ ('The following arguments are to '
+ 'only be used with the '
+ '--sleep test option'))
sleep_args.add_argument('--sleep-time',
dest='sleep_time',
action='store',
@@ -155,25 +215,31 @@ def main():
action='append',
help='Name of the test to run.')
group.add_argument('-a', '--all',
- action='store_true',
- help='Run ALL FWTS automated tests (assumes -w and -c)')
+ action='store_true',
+ help='Run ALL FWTS automated tests (assumes -w and -c)')
group.add_argument('-s', '--sleep',
- nargs=REMAINDER,
- action='store',
- help=('Perform sleep test(s) using the additional\n'
- 'arguments provided after --sleep. All remaining\n'
- 'items on the command line will be passed \n'
- 'through to fwts for performing sleep tests. \n'
- 'For info on these extra fwts options, please \n'
- 'see the epilog below and \n'
- 'the --fwts-help option.'))
+ nargs=REMAINDER,
+ action='store',
+ help=('Perform sleep test(s) using the additional\n'
+ 'arguments provided after --sleep. Remaining\n'
+ 'items on the command line will be passed \n'
+ 'through to fwts for performing sleep tests. \n'
+ 'For info on these extra fwts options, please \n'
+ 'see the epilog below and \n'
+ 'the --fwts-help option.'))
group.add_argument('--fwts-help',
- dest='fwts_help',
- action='store_true',
- help='Display the help info for fwts itself (lengthy)')
+ dest='fwts_help',
+ action='store_true',
+ help='Display the help info for fwts itself (lengthy)')
group.add_argument('--list',
action='store_true',
help='List all tests in fwts.')
+ group.add_argument('--list-cert',
+ action='store_true',
+ help='List all certification tests in fwts.')
+ group.add_argument('--list-advanced',
+ action='store_true',
+ help='List all advanced tests in fwts.')
args = parser.parse_args()
tests = []
@@ -188,16 +254,16 @@ def main():
# Set correct fail level
if args.fail_level is not 'none':
args.fail_level = 'FAILED_%s' % args.fail_level.upper()
-
+
# Get our failure priority and create the priority values
- fail_levels = {'FAILED_CRITICAL':4,
- 'FAILED_HIGH':3,
- 'FAILED_MEDIUM':2,
- 'FAILED_LOW':1,
- 'FAILED_NONE':0,
- 'FAILED_ABORTED':-1}
+ fail_levels = {'FAILED_CRITICAL': 4,
+ 'FAILED_HIGH': 3,
+ 'FAILED_MEDIUM': 2,
+ 'FAILED_LOW': 1,
+ 'FAILED_NONE': 0,
+ 'FAILED_ABORTED': -1}
fail_priority = fail_levels[args.fail_level]
-
+
# Enforce only using sleep opts with --sleep
if args.sleep_time or args.resume_time and not args.sleep:
parser.error('--sleep-time and --resume-time only apply to the '
@@ -208,6 +274,12 @@ def main():
elif args.list:
print('\n'.join(TESTS))
return 0
+ elif args.list_cert:
+ print('\n'.join(CERT_TESTS))
+ return 0
+ elif args.list_advanced:
+ print('\n'.join(NON_CERT_TESTS))
+ return 0
elif args.test:
tests.extend(args.test)
elif args.all:
@@ -232,11 +304,11 @@ def main():
sleep_time_arg = '--sleep-time'
if resume_time_arg in args.sleep:
args.resume_time = int(args.sleep.pop(
- args.sleep.index(resume_time_arg) + 1))
+ args.sleep.index(resume_time_arg) + 1))
args.sleep.remove(resume_time_arg)
if sleep_time_arg in args.sleep:
args.sleep_time = int(args.sleep.pop(
- args.sleep.index(sleep_time_arg) + 1))
+ args.sleep.index(sleep_time_arg) + 1))
args.sleep.remove(sleep_time_arg)
# if we still haven't set a sleep or resume time, use defauts.
if not args.sleep_time:
@@ -245,31 +317,62 @@ def main():
args.resume_time = 3
tests.extend(args.sleep)
else:
- tests.extend(TESTS)
+ tests.extend(CERT_TESTS)
# run the tests we want
if args.sleep:
iteration_results = {}
print('=' * 20 + ' Test Results ' + '=' * 20)
+ progress_indicator = None
+ if detect_progress_indicator():
+ progress_indicator = Popen(detect_progress_indicator(),
+ stdin=PIPE)
for iteration in range(0, iterations):
timestamp = int(time())
start_marker = 'CHECKBOX SLEEP TEST START %s' % timestamp
end_marker = 'CHECKBOX SLEEP TEST STOP %s' % timestamp
syslog(LOG_INFO, '---' + start_marker + '---' + str(time()))
command = ('fwts -q --stdout-summary -r %s %s'
- % (args.log, ' '.join(tests)))
+ % (args.log, ' '.join(tests)))
results['sleep'] = (Popen(command, stdout=PIPE, shell=True)
.communicate()[0].strip()).decode()
syslog(LOG_INFO, '---' + end_marker + '---' + str(time()))
if 's4' not in args.sleep:
- iteration_results[iteration] = get_sleep_times(start_marker,
- end_marker, args.sleep_time,
- args.resume_time)
- print(' - Cycle %s: Status: %s Sleep Elapsed: %0.5f '
- 'Resume Elapsed: %0.5f' % (iteration,
- iteration_results[iteration][0],
- iteration_results[iteration][1],
- iteration_results[iteration][2]))
+ sleep_times = get_sleep_times(start_marker,
+ end_marker,
+ args.sleep_time,
+ args.resume_time)
+ iteration_results[iteration] = sleep_times
+ progress_tuple = (iteration,
+ iteration_results[iteration][0],
+ iteration_results[iteration][1],
+ iteration_results[iteration][2])
+ progress_string = (' - Cycle %s: Status: %s '
+ 'Sleep Elapsed: %0.5f '
+ 'Resume Elapsed: '
+ ' %0.5f' % progress_tuple)
+ progress_pct = "{}".format(int(100 * iteration / iterations))
+ if "zenity" in detect_progress_indicator():
+ progress_indicator.stdin.write("# {}\n".format(
+ progress_string).encode('utf-8'))
+ progress_indicator.stdin.write("{}\n".format(
+ progress_pct).encode('utf-8'))
+ progress_indicator.stdin.flush()
+ elif "dialog" in detect_progress_indicator():
+ progress_indicator.stdin.write("XXX\n".encode('utf-8'))
+ progress_indicator.stdin.write(
+ progress_pct.encode('utf-8'))
+ progress_indicator.stdin.write(
+ "\nTest progress\n".encode('utf-8'))
+ progress_indicator.stdin.write(
+ progress_string.encode('utf-8'))
+ progress_indicator.stdin.write(
+ "\nXXX\n".encode('utf-8'))
+ progress_indicator.stdin.flush()
+ else:
+ print(progress_string)
+ progress_indicator.terminate()
+
if 's4' not in args.sleep:
average_times(iteration_results)
for run in iteration_results.keys():
@@ -304,41 +407,41 @@ def main():
print("WARNING: The following test cases were reported as critical\n"
"level failures by fwts. Please review the log at\n"
"%s for more information." % args.log)
- for test in critical_fails:
+ for test in critical_fails:
print(" - " + test)
if high_fails:
print("High Failures: %d" % len(high_fails))
print("WARNING: The following test cases were reported as high\n"
"level failures by fwts. Please review the log at\n"
"%s for more information." % args.log)
- for test in high_fails:
+ for test in high_fails:
print(" - " + test)
if medium_fails:
print("Medium Failures: %d" % len(medium_fails))
print("WARNING: The following test cases were reported as medium\n"
"level failures by fwts. Please review the log at\n"
"%s for more information." % args.log)
- for test in medium_fails:
+ for test in medium_fails:
print(" - " + test)
if low_fails:
print("Low Failures: %d" % len(low_fails))
print("WARNING: The following test cases were reported as low\n"
"level failures by fwts. Please review the log at\n"
"%s for more information." % args.log)
- for test in low_fails:
+ for test in low_fails:
print(" - " + test)
if passed:
print("Passed: %d" % len(passed))
- for test in passed:
+ for test in passed:
print(" - " + test)
if aborted:
print("Aborted Tests: %d" % len(aborted))
print("WARNING: The following test cases were aborted by fwts\n"
"Please review the log at %s for more information."
% args.log)
- for test in aborted:
+ for test in aborted:
print(" - " + test)
-
+
if args.fail_level is not 'none':
if fail_priority == fail_levels['FAILED_CRITICAL']:
if critical_fails:
diff --git a/bin/gateway_ping_test b/bin/gateway_ping_test
index 19d9f2b..87af5d3 100755
--- a/bin/gateway_ping_test
+++ b/bin/gateway_ping_test
@@ -1,45 +1,64 @@
#!/usr/bin/python3
+# This file is part of Checkbox.
+#
+# Copyright 2007-2014 Canonical Ltd.
+# Written by:
+# Brendan Donegan <brendan.donegan@canonical.com>
+# Daniel Manrique <daniel.manrique@canonical.com>
+# David Murphy <david.murphy@canonical.com>
+# Javier Collado <javier.collado@canonical.com>
+# Jeff Lane <jeffrey.lane@canonical.com>
+# Marc Tardif <marc.tardif@canonical.com>
+# Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com>
+# Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+from gettext import gettext as _
+import argparse
+import errno
+import gettext
+import logging
import os
import re
-import sys
-
-import logging
import socket
import struct
import subprocess
-import gettext
+import sys
import time
-from gettext import gettext as _
-
-from argparse import ArgumentParser
-
-class Route(object):
- """Gets routing information from the system.
+class Route:
+ """
+ Gets routing information from the system.
"""
-
- # auxiliary functions
- def _hex_to_dec(self, string):
- """Returns the integer value of a hexadecimal string s
- """
- return int(string, 16)
def _num_to_dotted_quad(self, number):
- """Convert long int to dotted quad string
+ """
+ Convert long int to dotted quad string
"""
return socket.inet_ntoa(struct.pack("<L", number))
def _get_default_gateway_from_proc(self):
- """"Returns the current default gateway, reading that from /proc
"""
- logging.debug("Reading default gateway information from /proc")
+ Returns the current default gateway, reading that from /proc
+ """
+ logging.debug(_("Reading default gateway information from /proc"))
try:
- file = open("/proc/net/route")
- route = file.read()
+ with open("/proc/net/route", "rt") as stream:
+ route = stream.read()
except:
- logging.error("Failed to read def gateway from /proc")
+ logging.error(_("Failed to read def gateway from /proc"))
return None
else:
h = re.compile("\n(?P<interface>\w+)\s+00000000\s+"
@@ -47,24 +66,26 @@ class Route(object):
w = h.search(route)
if w:
if w.group("def_gateway"):
- return (self._num_to_dotted_quad(
- self._hex_to_dec(w.group("def_gateway"))))
+ return self._num_to_dotted_quad(
+ int(w.group("def_gateway"), 16))
else:
- logging.error("Could not find def gateway info in /proc")
+ logging.error(
+ _("Could not find def gateway info in /proc"))
return None
else:
- logging.error("Could not find def gateway info in /proc")
+ logging.error(_("Could not find def gateway info in /proc"))
return None
def _get_default_gateway_from_bin_route(self):
- """Get default gateway from /sbin/route -n
+ """
+ Get default gateway from /sbin/route -n
Called by get_default_gateway
and is only used if could not get that from /proc
"""
- logging.debug("Reading default gateway information from route binary")
- routebin = subprocess.getstatusoutput("export LANGUAGE=C; "
- "/usr/bin/env route -n")
-
+ logging.debug(
+ _("Reading default gateway information from route binary"))
+ routebin = subprocess.getstatusoutput(
+ "export LANGUAGE=C; " "/usr/bin/env route -n")
if routebin[0] == 0:
h = re.compile("\n0.0.0.0\s+(?P<def_gateway>[\w.]+)\s+")
w = h.search(routebin[1])
@@ -72,8 +93,7 @@ class Route(object):
def_gateway = w.group("def_gateway")
if def_gateway:
return def_gateway
-
- logging.error("Could not find default gateway by running route")
+ logging.error(_("Could not find default gateway by running route"))
return None
def get_hostname(self):
@@ -83,29 +103,25 @@ class Route(object):
t1 = self._get_default_gateway_from_proc()
if not t1:
t1 = self._get_default_gateway_from_bin_route()
-
return t1
def get_host_to_ping(interface=None, verbose=False, default=None):
- #Get list of all IPs from all my interfaces,
+ # Get list of all IPs from all my interfaces,
interface_list = subprocess.check_output(["ip", "-o", 'addr', 'show'])
-
reg = re.compile('\d: (?P<iface>\w+) +inet (?P<address>[\d\.]+)/'
'(?P<netmask>[\d]+) brd (?P<broadcast>[\d\.]+)')
# Will magically exclude lo because it lacks brd field
interfaces = reg.findall(interface_list.decode())
-
# ping -b the network on each one (one ping only)
# exclude the ones not specified in iface
for iface in interfaces:
if not interface or iface[0] == interface:
- #Use check_output even if I'll discard the output
- #looks cleaner than using .call and redirecting stdout to null
+ # Use check_output even if I'll discard the output
+ # looks cleaner than using .call and redirecting stdout to null
try:
- (subprocess
- .check_output(["ping", "-q", "-c", "1", "-b", iface[3]],
- stderr=subprocess.STDOUT))
+ subprocess.check_output(["ping", "-q", "-c", "1", "-b",
+ iface[3]], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
pass
# If default host given, ping it as well,
@@ -117,157 +133,154 @@ def get_host_to_ping(interface=None, verbose=False, default=None):
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
pass
-
ARP_POPULATE_TRIES = 10
num_tries = 0
-
while num_tries < ARP_POPULATE_TRIES:
- #Get output from arp -a -n to get known IPs
+ # Get output from arp -a -n to get known IPs
known_ips = subprocess.check_output(["arp", "-a", "-n"])
reg = re.compile('\? \((?P<ip>[\d.]+)\) at (?P<mac>[a-f0-9\:]+) '
'\[ether\] on (?P<iface>[\w\d]+)')
-
- #Filter (if needed) IPs not on the specified interface
+ # Filter (if needed) IPs not on the specified interface
pingable_ips = [pingable[0] for pingable in reg.findall(
known_ips.decode()) if not interface
or pingable[2] == interface]
-
# If the default given ip is among the remaining ones,
# ping that.
if default and default in pingable_ips:
if verbose:
- print("Desired ip address %s is reachable, using it" % default)
+ print(_(
+ "Desired ip address {0} is reachable, using it"
+ ).format(default))
return default
-
- #If not, choose another IP.
+ # If not, choose another IP.
address_to_ping = pingable_ips[0] if len(pingable_ips) else None
if verbose:
- print("Desired ip address %s is not reachable from %s. "
- % (default, interface))
- print("using %s instead." % address_to_ping)
-
+ print(_(
+ "Desired ip address {0} is not reachable from {1},"
+ " using {2} instead"
+ ).format(default, interface, address_to_ping))
if address_to_ping:
return address_to_ping
-
time.sleep(2)
num_tries += 1
-
# Wait time expired
return None
def ping(host, interface, count, deadline, verbose=False):
- command = "ping -c %s -w %s %s" % (count, deadline, host)
+ command = ["ping", str(host), "-c", str(count), "-w", str(deadline)]
if interface:
- command = ("ping -I%s -c %s -w %s %s"
- % (interface, count, deadline, host))
-
- reg = re.compile(r"(\d+) packets transmitted, (\d+) received, (\d+)% packet loss")
- ping_summary = None
-
- output = os.popen(command)
- for line in output.readlines():
+ command.append("-I{}".format(interface))
+ reg = re.compile(
+ r"(\d+) packets transmitted, (\d+) received, (\d+)% packet loss")
+ ping_summary = {'transmitted': 0, 'received': 0, 'pct_loss': 0}
+ try:
+ output = subprocess.check_output(command, universal_newlines=True)
+ except OSError as exc:
+ if exc.errno == errno.ENOENT:
+ # No ping command present;
+ # default exception message is informative enough.
+ print(exc)
+ else:
+ raise
+ except subprocess.CalledProcessError as excp:
+ # Ping returned fail exit code
+ print(_("ERROR: ping result: {0}").format(excp))
+ else:
if verbose:
- print(line.rstrip())
-
- received = re.findall(reg, line)
+ print(output)
+ received = re.findall(reg, output)
if received:
ping_summary = received[0]
-
- ping_summary={'transmitted': int(ping_summary[0]),
- 'received': int(ping_summary[1]),
- 'pct_loss': int(ping_summary[2])}
+ ping_summary = {
+ 'transmitted': int(ping_summary[0]),
+ 'received': int(ping_summary[1]),
+ 'pct_loss': int(ping_summary[2])}
return ping_summary
def main(args):
-
- gettext.textdomain("checkbox")
-
+ gettext.textdomain("2013.com.canonical.certification.checkbox")
+ gettext.bindtextdomain("2013.com.canonical.certification.checkbox",
+ os.getenv("CHECKBOX_PROVIDER_LOCALE_DIR", None))
default_count = 2
default_delay = 4
-
route = Route()
-
- parser = ArgumentParser()
- parser.add_argument("host",
- nargs='?',
- default=route.get_default_gateway(),
- help="Host to ping")
- parser.add_argument("-c", "--count",
- default=default_count,
- type=int,
- help="Number of packets to send.")
- parser.add_argument("-d", "--deadline",
- default=default_delay,
- type=int,
- help="Timeouts in seconds.")
- parser.add_argument("-t", "--threshold",
- default=0,
- type=int,
- help="Percentage of allowed packet loss before "
- "considering test failed. Defaults to 0 "
- "(meaning any packet loss will fail the test)")
- parser.add_argument("-v", "--verbose",
- action='store_true',
- help="Be verbose.")
- parser.add_argument("-I", "--interface",
- help="Interface to ping from.")
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "host", nargs='?', default=route.get_default_gateway(),
+ help=_("host to ping"))
+ parser.add_argument(
+ "-c", "--count", default=default_count, type=int,
+ help=_("number of packets to send"))
+ parser.add_argument(
+ "-d", "--deadline", default=default_delay, type=int,
+ help=_("timeout in seconds"))
+ parser.add_argument(
+ "-t", "--threshold", default=0, type=int,
+ help=_("allowed packet loss percentage (default: %(default)s)"))
+ parser.add_argument(
+ "-v", "--verbose", action='store_true', help=_("be verbose"))
+ parser.add_argument(
+ "-I", "--interface", help=_("use specified interface to send packets"))
args = parser.parse_args()
-
- #Ensure count and deadline make sense. Adjust them if not.
-
+ # Ensure count and deadline make sense. Adjust them if not.
if args.deadline != default_delay and args.count != default_count:
- #Ensure they're both consistent, and exit with a warning if
- #not, rather than modifying what the user explicitly set.
+ # Ensure they're both consistent, and exit with a warning if not,
+ # rather than modifying what the user explicitly set.
if args.deadline <= args.count:
- print("ERROR: not enough time for %s pings in %s seconds" %
- (args.count, args.deadline))
- return(1)
+ # FIXME: this cannot ever be translated correctly
+ print(_(
+ "ERROR: not enough time for {0} pings in {1} seconds"
+ ).format(args.count, args.deadline))
+ return 1
elif args.deadline != default_delay:
- #Adjust count according to delay.
+ # Adjust count according to delay.
args.count = args.deadline - 1
if args.count < 1:
args.count = 1
if args.verbose:
- print("Adjusting ping count to %s to fit in %s-second deadline" %
- (args.count, args.deadline))
+ # FIXME: this cannot ever be translated correctly
+ print(_(
+ "Adjusting ping count to {0} to fit in {1}-second deadline"
+ ).format(args.count, args.deadline))
else:
- #Adjust delay according to count
+ # Adjust delay according to count
args.deadline = args.count + 1
if args.verbose:
- print("Adjusting deadline to %s seconds to fit %s pings" %
- (args.deadline, args.count))
-
- #If given host is not pingable, override with something pingable.
- host = get_host_to_ping(interface=args.interface,
- verbose=args.verbose, default=args.host)
-
+ # FIXME: this cannot ever be translated correctly
+ print(_(
+ "Adjusting deadline to {0} seconds to fit {1} pings"
+ ).format(args.deadline, args.count))
+ # If given host is not pingable, override with something pingable.
+ host = get_host_to_ping(
+ interface=args.interface, verbose=args.verbose, default=args.host)
if args.verbose:
- print("Checking connectivity to %s" % host)
+ print(_("Checking connectivity to {0}").format(host))
ping_summary = None
if host:
ping_summary = ping(host, args.interface, args.count,
args.deadline, args.verbose)
-
- if ping_summary == None or ping_summary['received'] == 0:
+ if ping_summary is None or ping_summary['received'] == 0:
print(_("No Internet connection"))
return 1
elif ping_summary['transmitted'] != ping_summary['received']:
- print(_("Connection established, but lost {}% of packets".format(
- ping_summary['pct_loss'])))
+ print(_("Connection established, but lost {0}% of packets").format(
+ ping_summary['pct_loss']))
if ping_summary['pct_loss'] > args.threshold:
- print(_("FAIL: {}% packet loss is higher"
- "than {}% threshold").format(ping_summary['pct_loss'],
- args.threshold))
+ print(_(
+ "FAIL: {0}% packet loss is higher than {1}% threshold"
+ ).format(ping_summary['pct_loss'], args.threshold))
return 1
else:
- print(_("PASS: {}% packet loss is within {}% threshold").format(
- ping_summary['pct_loss'], args.threshold))
+ print(_(
+ "PASS: {0}% packet loss is within {1}% threshold"
+ ).format(ping_summary['pct_loss'], args.threshold))
return 0
else:
print(_("Internet connection fully established"))
return 0
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
diff --git a/bin/get_make_and_model b/bin/get_make_and_model
new file mode 100755
index 0000000..bb5efd9
--- /dev/null
+++ b/bin/get_make_and_model
@@ -0,0 +1,35 @@
+#!/usr/bin/env python3
+
+import os.path
+import shlex
+from subprocess import check_output
+
+def print_data(key, value):
+ print("{}: {}".format(key, value))
+
+def main():
+ keys = {'Manufacturer': 'vendor',
+ 'Model': 'product',
+ 'Version': 'version'}
+
+ cmd = "lshw -C system"
+
+ out = check_output(shlex.split(cmd),
+ universal_newlines = True)
+ output = out.split('\n')
+
+ data = {}
+ for key in keys:
+ for line in output:
+ if keys[key] in line:
+ data[key] = line.split(':')[1].strip()
+ break
+ else:
+ data[key] = "NOT FOUND"
+
+ for key in data:
+ print_data(key, data[key])
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
diff --git a/bin/glob_test b/bin/glob_test
deleted file mode 100755
index df13e0f..0000000
--- a/bin/glob_test
+++ /dev/null
@@ -1,121 +0,0 @@
-#!/usr/bin/python
-from Globs import benchmarks, hwd
-import argparse
-import locale
-import logging
-import sys
-import os
-
-#Magic to override the _ function from gettext, since
-#it's overkill for our needs here.
-def underscore(string, *args, **kwargs):
- return(string)
-
-__builtins__._ = underscore
-
-class Application:
-
- def __init__(self, share_dir='', bench_dir='', width=800, height=600,
- time=5, repetitions=1, fullscreen=False, min_fps=60,
- ignore_problems=False):
- self.opts= {}
- self.opts['width'] = width
- self.opts['height'] = height
- self.opts['time'] = time
- self.opts['repetitions'] = repetitions
- self.opts['fullscreen'] = fullscreen
- self.min_fps = min_fps
- self.ignore_problems = ignore_problems
-
- self.share_dir = share_dir
- self.bench_dir = bench_dir
-
-
- def run(self):
- test_pass = True
-
- self.hwd = hwd.HWDetect()
- self.bm = benchmarks.Benchmarks(os.path.join(self.bench_dir,
- 'benchmarks'))
- ver_str = self.hwd.get_gl_info()['version']
- hwd_ext = self.hwd.get_gl_info()['extensions']
- bench_list = [name for name in self.bm.get_names() if name != 'Fake']
- for benchmark_name in bench_list :
- runnable = True
- if self.bm.check_ver(benchmark_name, ver_str) == False:
- logging.warning("%s requires OpenGL version %s, I have %s",
- benchmark_name,
- self.bm.get_info(benchmark_name)['glversion'],
- ver_str)
- runnable = False
- test_pass = False
-
- ext_list = self.bm.check_ext(benchmark_name, hwd_ext)
- if ext_list.__class__ == list: # Returned a list of missing exts
- missing_ext = ''
- for ext in ext_list:
- missing_ext += ext
- if ext_list.index(ext) != len(ext_list) - 1:
- missing_ext += ', '
- logging.warning("Missing extensions: %s",missing_ext)
- runnable = False
- test_pass = False
- if runnable:
- fps = self.bm.run(benchmark_name, self.opts)
- if fps is None:
- #oops, test failed to produce usable result!
- print("Test failed to produce FPS measurement.")
- print("Possible causes: OpenGL version too low/high")
- if self.ignore_problems:
- print("Ignoring this as requested")
- else:
- print("Considering this a FAIL test")
- test_pass = False
- else:
- print("{} {} fps".format(benchmark_name, fps))
- if ( self.min_fps > fps):
- print("(Failed to meet minimum {} FPS)".format(
- self.min_fps))
- test_pass = False
- return test_pass
-
-
-
-
-share_dir = '/usr/share/globs'
-locale_dir = '/usr/share/locale'
-bench_dir = '/usr/lib/globs'
-
-parser = argparse.ArgumentParser("Executes gl benchmarks non-interactively")
-parser.add_argument("--width", action='store',
- default=800, type=int)
-parser.add_argument("--height", action='store',
- default=600, type=int)
-parser.add_argument("--repetitions", action='store',
- default=1, type=int)
-parser.add_argument("--time", action='store',
- default=10, type=int)
-parser.add_argument("--ignore-problems", action='store_true',
- default=False, help=("If a test fails to "
- "produce a FPS rating, ignore it for global test "
- "outcome purposes"))
-parser.add_argument("--fullscreen", action='store_true',
- default=False)
-parser.add_argument("--min-fps", action='store',
- default=60.0, type=float, help=("If any of the benchmarks"
- "obtains less than this FPS, the test will be considered"
- "failed"))
-
-args = parser.parse_args()
-
-app = Application(share_dir, bench_dir, args.width, args.height,
- args.time, args.repetitions, args.fullscreen, args.min_fps,
- args.ignore_problems)
-
-if app.run():
- print("PASS")
- sys.exit(0)
-else:
- print("FAIL")
- sys.exit(1)
-
diff --git a/bin/gpu_test b/bin/gpu_test
index eaf4dda..028eedc 100755
--- a/bin/gpu_test
+++ b/bin/gpu_test
@@ -140,11 +140,10 @@ class Html5VideoThread(Thread):
def check_gpu(log=None):
if not log:
- log = '/var/log/kern.log'
- with open(log, 'rb') as f:
- if re.findall(r'gpu\s+hung', str(f.read()), flags=re.I):
- print("GPU hung Detected")
- return 1
+ log = subprocess.check_output(['dmesg'], universal_newlines=True)
+ if re.findall(r'gpu\s+hung', log, flags=re.I):
+ print("GPU hung Detected")
+ return 1
def main():
diff --git a/bin/graphics_driver b/bin/graphics_driver
index 102cb61..91568e1 100755
--- a/bin/graphics_driver
+++ b/bin/graphics_driver
@@ -190,6 +190,8 @@ class XorgLog(object):
# For NVIDIA
m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(.*?):', line)
+ if not m:
+ m = re.search(r'\(II\) (.*)\(\d+\): Setting mode "(NULL)"', line)
if m:
self.displays[display_name] = display
self.video_driver = m.group(1)
@@ -309,7 +311,7 @@ def is_laptop():
def hybrid_graphics_check(xlog):
'''Check for Hybrid Graphics'''
card_id1 = re.compile('.*0300: *(.+):(.+) \(.+\)')
- card_id2 = re.compile('.*0300: *(.+):(.+)')
+ card_id2 = re.compile('.*03..: *(.+):(.+)')
cards_dict = {'8086': 'Intel', '10de': 'NVIDIA', '1002': 'AMD'}
cards = []
drivers = []
diff --git a/bin/hdd_parking b/bin/hdd_parking
new file mode 100755
index 0000000..18151b4
--- /dev/null
+++ b/bin/hdd_parking
@@ -0,0 +1,74 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+#
+# hdd_parking
+#
+# This file is part of Checkbox.
+#
+# Copyright 2014 Canonical Ltd.
+#
+# Authors: Brendan Donegan <brendan.donegan@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+This script verifies that a systems HDD protection capabilities
+are triggered when appropriate. There are many implementations
+of HDD protection from different OEMs, each implemented in a
+different way, so this script can only support implementations
+which are known to work and are testable. Currently the list
+of supported implementations is:
+
+- HDAPS (Lenovo)
+"""
+
+
+import sys
+import time
+
+from argparse import ArgumentParser
+from subprocess import Popen, PIPE
+
+TIMEOUT = 15.0
+
+def hdaps_test(run_time):
+ try:
+ hdapsd = Popen(['/usr/sbin/hdapsd'], stdout=PIPE, stderr=PIPE,
+ universal_newlines=True)
+ except OSError as err:
+ print("Unable to start hdapsd: {}".format(err))
+ return 1
+ time.sleep(float(run_time))
+ hdapsd.terminate()
+ # Look for parking message in hdapsd output.
+ stdout = hdapsd.communicate()[0]
+ print(stdout)
+ for line in stdout.split('\n'):
+ if line.endswith('parking'):
+ return 0
+ return 1
+
+def main():
+ # First establish the driver used
+ parser = ArgumentParser("Tests a systems HDD protection capabilities. "
+ "Requires the system to be moved by the tester.")
+ parser.add_argument('-t', '--timeout',
+ default=TIMEOUT,
+ help='The time allowed before the test fails.')
+ print('Starting HDD protection test - move the system around on '
+ 'all axis. No particular force should be required.')
+ return hdaps_test(parser.parse_args().timeout)
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/bin/lock_screen_watcher b/bin/lock_screen_watcher
index 1ba9779..b8dd886 100755
--- a/bin/lock_screen_watcher
+++ b/bin/lock_screen_watcher
@@ -2,6 +2,7 @@
import argparse
import sys
import subprocess
+import lsb_release
from gi.repository import GObject
@@ -20,13 +21,27 @@ class SceenSaverStatusHelper(threading.Thread):
self.quit = False
def query(self):
- p = subprocess.Popen(["gnome-screensaver-command", "-q"], stdout=subprocess.PIPE)
- stdout, stderr = p.communicate()
- # parse the stdout string from the command "gnome-screensaver-command -q"
- # the result should be "active" or "inactive"
- if "active" == stdout.decode().split(" ")[-1][0:-1] :
- print("the screensaver is active")
- self._loop.quit()
+ if (lsb_release.get_distro_information()["ID"] == "Ubuntu"):
+ if (lsb_release.get_distro_information()["CODENAME"] == "trusty"):
+ # trusty uses login screen as screen saver
+ process_ps = subprocess.Popen(["ps", "aux"], stdout=subprocess.PIPE)
+ process_grep = subprocess.Popen(["grep",
+ "/usr/lib/unity/unity-panel-service --lockscreen-mode"],
+ stdin=process_ps.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ process_ps.stdout.close()
+ stdout = process_grep.communicate()[0]
+ if (len(stdout.decode().split("\n")) == 3):
+ print("the screensaver is active")
+ self._loop.quit()
+ p = subprocess.Popen(["gnome-screensaver-command", "-q"], stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ # parse the stdout string from the command "gnome-screensaver-command -q"
+ # the result should be "active" or "inactive"
+ if "active" == stdout.decode().split(" ")[-1].rstrip() :
+ print("the screensaver is active")
+ self._loop.quit()
def run(self):
while not self.quit:
diff --git a/bin/manage_compiz_plugin b/bin/manage_compiz_plugin
new file mode 100755
index 0000000..0b8e479
--- /dev/null
+++ b/bin/manage_compiz_plugin
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+# This file is part of Checkbox.
+#
+# Copyright 2014-2015 Canonical Ltd.
+# Written by:
+# Daniel Manrique <roadmr@ubuntu.com>
+# Sylvain Pineau <sylvain.pineau@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
+"""
+manage_compiz_plugin
+====================
+
+This script allows enabling or disabling compiz plugins using
+gsettings. Changes take effect on the fly.
+"""
+
+from gettext import gettext as _
+import argparse
+import gettext
+import os
+import sys
+import subprocess
+import time
+
+PATH="org.compiz.core:/org/compiz/profiles/unity/plugins/core/"
+KEY="active-plugins"
+
+gettext.textdomain("2013.com.canonical.certification.checkbox")
+gettext.bindtextdomain("2013.com.canonical.certification.checkbox",
+ os.getenv("CHECKBOX_PROVIDER_LOCALE_DIR", None))
+
+plugins = eval(subprocess.check_output(["gsettings", "get", PATH, KEY]))
+
+parser = argparse.ArgumentParser(description=_("enable/disable compiz plugins"),
+ epilog=_("Available plugins: {}").format(plugins))
+parser.add_argument("plugin", type=str, help=_('Name of plugin to control'))
+parser.add_argument("action", type=str, choices=['enable', 'disable'],
+ help=_("What to do with the plugin"))
+
+args = parser.parse_args()
+
+if args.action == 'enable':
+ if args.plugin in plugins:
+ raise SystemExit(_("Plugin {} already enabled").format(args.plugin))
+ plugins.append(args.plugin)
+else:
+ if args.plugin not in plugins:
+ raise SystemExit(_("Plugin {} doesn't exist").format(args.plugin))
+ plugins.remove(args.plugin)
+subprocess.call(["gsettings", "set", PATH, KEY, str(plugins)])
+
+time.sleep(3)
diff --git a/bin/memory_compare b/bin/memory_compare
index 24d6c84..12a14ec 100755
--- a/bin/memory_compare
+++ b/bin/memory_compare
@@ -1,13 +1,36 @@
#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# This file is part of Checkbox.
+#
+# Copyright 2014 Canonical Ltd.
+#
+# Authors:
+# Brendan Donegan <brendan.donegan@canonical.com>
+# Jeff Lane <jeff.lane@canonical.com>
+# Sylvain Pineau <sylvain.pineau@canonical.com>
+#
+# Checkbox is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# Checkbox is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
+from math import log, copysign
+from subprocess import check_output, PIPE
+from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
from checkbox_support.parsers.lshwjson import LshwJsonParser
from checkbox_support.parsers.meminfo import MeminfoParser
-from subprocess import check_output, PIPE
-THRESHOLD = 25
class LshwJsonResult:
@@ -20,8 +43,9 @@ class LshwJsonResult:
elif 'bank' in hardware['id']:
self.banks_reported += int(hardware.get('size', 0))
+
def get_installed_memory_size():
- lshw = LshwJsonParser(check_output(['lshw','-json'],
+ lshw = LshwJsonParser(check_output(['lshw', '-json'],
universal_newlines=True,
stderr=PIPE))
result = LshwJsonResult()
@@ -32,6 +56,7 @@ def get_installed_memory_size():
else:
return result.banks_reported
+
class MeminfoResult:
memtotal = 0
@@ -39,6 +64,7 @@ class MeminfoResult:
def setMemory(self, memory):
self.memtotal = memory['total']
+
def get_visible_memory_size():
parser = MeminfoParser(open('/proc/meminfo'))
result = MeminfoResult()
@@ -46,6 +72,7 @@ def get_visible_memory_size():
return result.memtotal
+
def get_threshold(installed_memory):
GB = 1024**3
if installed_memory <= 2 * GB:
@@ -55,36 +82,46 @@ def get_threshold(installed_memory):
else:
return 10
+
def main():
if os.geteuid() != 0:
print("This script must be run as root.", file=sys.stderr)
return 1
- installed_memory = get_installed_memory_size()
- visible_memory = get_visible_memory_size()
+ installed_memory = HumanReadableBytes(get_installed_memory_size())
+ visible_memory = HumanReadableBytes(get_visible_memory_size())
threshold = get_threshold(installed_memory)
- difference = installed_memory - visible_memory
+ difference = HumanReadableBytes(installed_memory - visible_memory)
try:
percentage = difference / installed_memory * 100
except ZeroDivisionError:
print("Results:")
- print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024), file=sys.stderr)
- print("\tlshw reports:\t%s kB" % (installed_memory / 1024), file=sys.stderr)
- print("\nFAIL: Either lshw or /proc/meminfo returned a memory size of 0 kB", file=sys.stderr)
+ print("\t/proc/meminfo reports:\t{}".format(visible_memory),
+ file=sys.stderr)
+ print("\tlshw reports:\t{}".format(installed_memory),
+ file=sys.stderr)
+ print("\nFAIL: Either lshw or /proc/meminfo returned a memory size "
+ "of 0 kB", file=sys.stderr)
return 1
-
+
if percentage <= threshold:
print("Results:")
- print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024))
- print("\tlshw reports:\t%s kB" % (installed_memory / 1024))
- print("\nPASS: Meminfo reports %d bytes less than lshw, a difference of %.2f%%. This is less than the %d%% variance allowed." % (difference, percentage, threshold))
+ print("\t/proc/meminfo reports:\t{}".format(visible_memory))
+ print("\tlshw reports:\t{}".format(installed_memory))
+ print("\nPASS: Meminfo reports %s less than lshw, a "
+ "difference of %.2f%%. This is less than the "
+ "%d%% variance allowed." % (difference, percentage, threshold))
return 0
else:
- print("Results")
- print("\t/proc/meminfo reports:\t%s kB" % (visible_memory / 1024), file=sys.stderr)
- print("\tlshw reports:\t%s kB" % (installed_memory / 1024), file=sys.stderr)
- print("\nFAIL: Meminfo reports %d bytes less than lshw, a difference of %.2f%%. Only a variance of %d%% in reported memory is allowed." % (difference, percentage, threshold), file=sys.stderr)
+ print("Results:", file=sys.stderr)
+ print("\t/proc/meminfo reports:\t{}".format(visible_memory),
+ file=sys.stderr)
+ print("\tlshw reports:\t{}".format(installed_memory), file=sys.stderr)
+ print("\nFAIL: Meminfo reports %d less than lshw, "
+ "a difference of %.2f%%. Only a variance of %d%% in "
+ "reported memory is allowed." %
+ (difference, percentage, threshold), file=sys.stderr)
return 1
if __name__ == "__main__":
diff --git a/bin/memory_test b/bin/memory_test
index c42b05f..4cd6008 100755
--- a/bin/memory_test
+++ b/bin/memory_test
@@ -131,7 +131,7 @@ class MemoryTest():
print("Running threaded memory test:")
run_time = 60 # sec.
- if not self.run_processes(processes, "%s -qpv -m%um -t%u" % (
+ if not self.run_processes(processes, "%s -qv -m%um -t%u" % (
self.threaded_memtest_script, self.process_memory, run_time)):
print("Multi-process, threaded memory Test FAILED",
file=sys.stderr)
@@ -166,7 +166,7 @@ class MemoryTest():
print("Running for more than free memory at %u MB for %u sec." % (
memory, run_time))
- command = "%s -qpv -m%um -t%u" % (
+ command = "%s -qv -m%um -t%u" % (
self.threaded_memtest_script, memory, run_time)
print("Command is: %s" % command)
process = self._command(command)
@@ -180,7 +180,7 @@ class MemoryTest():
# run again for 15 minutes
print("Running for free memory")
- process = self._command("%s -qpv" % self.threaded_memtest_script)
+ process = self._command("%s -qv" % self.threaded_memtest_script)
process.communicate()
if process.returncode != 0:
print("Free Memory Test failed", file=sys.stderr)
diff --git a/bin/network b/bin/network
index 6ab5b16..daef596 100755
--- a/bin/network
+++ b/bin/network
@@ -1,10 +1,11 @@
#!/usr/bin/env python3
"""
-Copyright (C) 2012-2014 Canonical Ltd.
+Copyright (C) 2012-2015 Canonical Ltd.
Authors
Jeff Marcom <jeff.marcom@canonical.com>
Daniel Manrique <roadmr@ubuntu.com>
+ Jeff Lane <jeff@ubuntu.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License version 3,
@@ -23,10 +24,7 @@ from argparse import (
ArgumentParser,
RawTextHelpFormatter
)
-import configparser
import fcntl
-import ftplib
-from ftplib import FTP
import logging
import os
import re
@@ -34,15 +32,16 @@ import shlex
import socket
import struct
import subprocess
+import tempfile
from subprocess import (
CalledProcessError,
check_call,
- check_output
+ check_output,
+ STDOUT
)
import sys
import time
-logging.basicConfig(level=logging.DEBUG)
class IPerfPerformanceTest(object):
@@ -56,19 +55,41 @@ class IPerfPerformanceTest(object):
target,
fail_threshold,
protocol="tcp",
- mbytes="1024M"):
+ data_size="1"):
self.iface = Interface(interface)
self.target = target
self.protocol = protocol
self.fail_threshold = fail_threshold
-
- self.mbytes = mbytes
+ self.data_size = data_size
def run(self):
- cmd = "timeout 180 iperf -c {} -n {}".format(self.target, self.mbytes)
+ # if max_speed is 0, assume it's wifi and move on
+ if self.iface.max_speed == 0:
+ logging.warning("No max speed detected, assuming Wireless device "
+ "and continuing with test.")
+ # Otherwise, no sense in running if we're not running at full speed.
+ elif self.iface.max_speed > self.iface.link_speed:
+ logging.error("Detected link speed (%s) is lower than "
+ "detected max speed (%s)" %
+ (self.iface.link_speed, self.iface.max_speed))
+ logging.error("Check your device configuration and try again")
+ return 1
+
+ # Because we can vary the data size, we need to vary the timeout as
+ # well. It takes an estimated 15 minutes to send 1GB over 10Mb/s.
+ # 802.11b is 11 Mb/s. So we'll assume 1.2x15 minutes or 18 minutes
+ # or 1080 seconds per Gigabit. This will allow for a long period of
+ # time without timeout to catch devices that slow down, and also not
+ # prematurely end iperf on low-bandwidth devices.
+
+ self.timeout = 1080*int(self.data_size)
+
+ cmd = "timeout {} iperf -c {} -n {}G -i 1 -f m".format(
+ self.timeout, self.target, self.data_size)
logging.debug(cmd)
+ print("Starting iperf, this could take some time...")
try:
iperf_return = check_output(
shlex.split(cmd), universal_newlines=True)
@@ -84,46 +105,45 @@ class IPerfPerformanceTest(object):
# "pass through" whatever output iperf did manage to produce.
# When confronted with SIGTERM iperf should stop and output
# a partial (but usable) result.
- logging.warning("iperf timed out - this should be OK")
+ logging.info("iperf timed out - this should be OK")
iperf_return = iperf_exception.output
# 930 Mbits/sec\n'
- print(iperf_return)
- match = re.search(r'[\d\.]+\s([GM])bits', iperf_return)
- if match:
- throughput = match.group(0).split()[0]
- units = match.group(1)
- # self.iface.max_speed is always in mb/s, so we need to scale
- # throughput to match
- scaled_throughput = float(throughput)
- if units == 'G':
- scaled_throughput *= 1000
- if units == 'K':
- scaled_throughput /= 1000
+ logging.debug(iperf_return)
+ speeds = list(map(float, re.findall(r"([\w\.]+)\sMbits/sec",
+ iperf_return)))
+ invalid_speed = False
+ if speeds:
+ throughput = sum(speeds)/len(speeds)
try:
- percent = scaled_throughput / int(self.iface.max_speed) * 100
- except (ZeroDivisionError, TypeError) as error:
+ percent = throughput / int(self.iface.max_speed) * 100
+ except (ZeroDivisionError, TypeError):
# Catches a condition where the interface functions fine but
# ethtool fails to properly report max speed. In this case
# it's up to the reviewer to pass or fail.
- logging.error("Max Speed was not reported properly. Run "
- "ethtool and verify that the card is properly "
- "reporting its capabilities.")
- logging.error(error)
percent = 0
-
- print("\nTransfer speed: {} {}b/s".format(throughput, units))
- print("%3.2f%% of " % percent, end="")
- try:
- print("theoretical max %sMb/s\n" % int(self.iface.max_speed))
- except TypeError as error:
- logging.error("Max Speed was not reported properly. Run "
- "ethtool and verify that the card is properly "
- "reporting its capabilities.")
- logging.error(error)
-
+ invalid_speed = True
+
+ logging.info("Min Transfer speed: {} mb/s".format(min(speeds)))
+ logging.info("Max Transfer speed: {} mb/s".format(max(speeds)))
+ logging.info("Avg Transfer speed: {} mb/s".format(throughput))
+ if invalid_speed:
+ # If we have no link_speed (e.g. wireless interfaces don't
+ # report this), then we shouldn't penalize them because
+ # the transfer may have been reasonable. So in this case,
+ # we'll exit with a pass-warning.
+ logging.info("Unable to obtain maximum speed.")
+ logging.info("Considering the test as passed.")
+ return 0
+ # Below is guaranteed to not throw an exception because we'll
+ # have exited above if it did.
+ logging.info("{:03.2f}% of theoretical max {} Mb/s".format(percent,
+ int(self.iface.max_speed)))
if percent < self.fail_threshold:
logging.warn("Poor network performance detected")
+ logging.warn(" Transfer speed: {} mb/s".format(throughput))
+ logging.warn(" {:03.2f}% of theoretical max {}Mb/s\n".format(
+ percent, int(self.iface.max_speed)))
return 30
logging.debug("Passed benchmark")
@@ -132,152 +152,6 @@ class IPerfPerformanceTest(object):
return 1
-class FTPPerformanceTest(object):
- """Provides file transfer rate based information while
- using the FTP protocol and sending a file (DEFAULT=1GB)
- over the local or public network using a specified network
- interface on the host."""
-
- def __init__(
- self,
- target,
- username,
- password,
- interface,
- binary_size=1,
- file2send="ftp_performance_test"):
-
- self.target = target
- self.username = username
- self.password = password
- self.iface = Interface(interface)
- self.binary_size = binary_size
- self.file2send = file2send
-
- def _make_file2send(self):
- """
- Makes binary file to send over FTP.
- Size defaults to 1GB if not supplied.
- """
-
- logging.debug("Creating %sGB file", self.binary_size)
-
- file_size = (1024 * 1024 * 1024) * self.binary_size
- with open(self.file2send, "wb") as out:
- out.seek((file_size) - 1)
- out.write('\0'.encode())
-
- def send_file(self, filename=None):
- """
- Sends file over the network using FTP and returns the
- amount of bytes sent and delay between send and completed.
- """
-
- if filename is None:
- file = open(self.file2send, 'rb')
- filename = self.file2send
-
- send_time = time.time()
-
- try:
- logging.debug("Sending file")
- self.remote.storbinary("STOR " + filename, file, 1024)
- except (ftplib.all_errors) as send_failure:
- logging.error("Failed to send file to %s", self.target)
- logging.error("Reason: %s", send_failure)
- return 0, 0
-
- file.close()
-
- time_lapse = time.time() - send_time
- bytes_sent = os.stat(filename).st_size
-
- return bytes_sent, time_lapse
-
- def close_connection(self):
- """
- Close connection to remote FTP target
- """
- self.remote.close()
-
- def connect(self):
- """
- Connects to FTP target and set the current directory as /
- """
-
- logging.debug("Connecting to %s", self.target)
- try:
- self.remote = FTP(self.target)
- self.remote.set_debuglevel(2)
- self.remote.set_pasv(True)
- except socket.error as connect_exception:
- logging.error("Failed to connect to: %s", self.target)
- return False
-
- logging.debug("Logging in")
- logging.debug("{USER:%s, PASS:%s}", self.username, self.password)
-
- try:
- self.remote.login(self.username, self.password)
- except ftplib.error_perm as login_exception:
- logging.error("failed to log into target: %s", self.target)
- return False
-
- default_out_dir = ""
- self.remote.cwd(default_out_dir)
- return True
-
- def run(self):
-
- info = {
- "Interface": self.iface.interface,
- "HWAddress": self.iface.macaddress,
- "Duplex": self.iface.duplex_mode,
- "Speed": self.iface.max_speed,
- "Status": self.iface.status
- }
-
- logging.debug(info)
-
- if not os.path.isfile(self.file2send):
- self._make_file2send()
-
- # Connect to FTP target and send file
- connected = self.connect()
-
- if connected is False:
- return 3
-
- filesize, delay = self.send_file()
-
- # Remove created binary
- try:
- os.remove(self.file2send)
- except (IOError, OSError) as file_delete_error:
- logging.error("Could not remove previous ftp file")
- logging.error(file_delete_error)
-
- if connected and filesize > 0:
-
- logging.debug("Bytes sent (%s): %.2f seconds", filesize, delay)
-
- # Calculate transfer rate and determine pass/fail status
- mbs_speed = float(filesize / 131072) / float(delay)
- percent = (mbs_speed / int(info["Speed"])) * 100
- print("Transfer speed:")
- print("%3.2f%% of" % percent)
- print("theoretical max %smbs" % int(info["Speed"]))
-
- if percent < 40:
- logging.warn("Poor network performance detected")
- return 30
-
- logging.debug("Passed benchmark")
- else:
- print("Failed sending file via ftp")
- return 1
-
-
class StressPerformanceTest:
def __init__(self, interface, target):
@@ -334,7 +208,7 @@ class Interface(socket.socket):
try:
return open(os.path.join(self.dev_path, type)).read().strip()
except OSError:
- print("{}: Attribute not found".format(type))
+ logging.info("%s: Attribute not found", type)
@property
def ipaddress(self):
@@ -359,8 +233,37 @@ class Interface(socket.socket):
return socket.inet_ntoa(mask_data[20:24])
@property
+ def link_speed(self):
+ return int(self._read_data("speed"))
+
+ @property
def max_speed(self):
- return self._read_data("speed")
+ # Parse ethtool data for max speed since /sys/class/net/DEV/speed only
+ # reports link speed.
+
+ # Search for things that look like 100baseSX,
+ # 40000baseNX, 10000baseT
+ try:
+ ethinfo = check_output(['ethtool', self.interface],
+ universal_newlines=True,
+ stderr=STDOUT).split(' ')
+ except FileNotFoundError:
+ logging.warning('ethtool not found! Unable to get max speed')
+ ethinfo = None
+ except CalledProcessError as e:
+ logging.error('ethtool returned an error!')
+ logging.error(e.output)
+ ethinfo = None
+ finally:
+ expression = '(\\d+)(base)([A-Z]+)'
+ regex = re.compile(expression)
+ speeds = [0]
+ if ethinfo:
+ for i in ethinfo:
+ hit = regex.search(i)
+ if hit:
+ speeds.append(int(hit.group(1)))
+ return max(speeds)
@property
def macaddress(self):
@@ -379,91 +282,88 @@ class Interface(socket.socket):
return self._read_data("device/label")
-def get_test_parameters(args, environ, config_filename):
+def get_test_parameters(args, environ):
# Decide the actual values for test parameters, which can come
- # from one of three possible sources: a config file, command-line
+ # from one of two possible sources: command-line
# arguments, or environment variables.
# - If command-line args were given, they take precedence
# - Next come environment variables, if set.
- # - Last, values in the config file are used if present.
- params = {"test_target_ftp": None,
- "test_user": None,
- "test_pass": None,
- "test_target_iperf": None}
+ params = {"test_target_iperf": None}
- #First (try to) load values from config file
- config = configparser.SafeConfigParser()
-
- try:
- with open(config_filename) as config_file:
- config.readfp(config_file)
- params["test_target_ftp"] = config.get("FTP", "Target")
- params["test_user"] = config.get("FTP", "User")
- params["test_pass"] = config.get("FTP", "Pass")
- params["test_target_iperf"] = config.get("IPERF", "Target")
- except FileNotFoundError as err:
- pass # No biggie, we can still get configs from elsewhere
-
- # Next see if we have environment variables to override the config file
- # "partial" overrides are not allowed; if an env variable is missing,
- # we won't use this at all.
- if all([param.upper() in os.environ for param in params.keys()]):
- for key in params.keys():
- params[key] = os.environ[key.upper()]
+ # See if we have environment variables
+ for key in params.keys():
+ params[key] = os.environ.get(key.upper(), "")
# Finally, see if we have the command-line arguments that are the ultimate
- # override. Again, we will only override if we have all of them.
- if args.target and args.username and args.password:
- params["test_target_ftp"] = args.target
- params["test_user"] = args.username
- params["test_pass"] = args.password
+ # override.
+ if args.target:
params["test_target_iperf"] = args.target
return params
+def can_ping(the_interface, test_target):
+ working_interface = False
+ num_loops = 0
+ while (not working_interface) and (num_loops < 48):
+ working_interface = True
+
+ try:
+ with open(os.devnull, 'wb') as DEVNULL:
+ check_call(["ping", "-I", the_interface,
+ "-c", "1", test_target],
+ stdout=DEVNULL, stderr=DEVNULL)
+ except CalledProcessError as excp:
+ working_interface = False
+ logging.warning("Ping failure on %s (%s)", the_interface, excp)
+
+ if not working_interface:
+ time.sleep(5)
+ num_loops += 1
+
+ return working_interface
+
+
def interface_test(args):
- if not "test_type" in vars(args):
+ if not ("test_type" in vars(args)):
return
- # Determine whether to use the default or user-supplied config
- # file name.
- DEFAULT_CFG = "/etc/checkbox.d/network.cfg"
- if not "config" in vars(args):
- config_filename = DEFAULT_CFG
- else:
- config_filename = args.config
+ # Get the actual test data from one of two possible sources
+ test_parameters = get_test_parameters(args, os.environ)
- # Get the actual test data from one of three possible sources
- test_parameters = get_test_parameters(args, os.environ, config_filename)
-
- test_user = test_parameters["test_user"]
- test_pass = test_parameters["test_pass"]
if (args.test_type.lower() == "iperf" or
args.test_type.lower() == "stress"):
test_target = test_parameters["test_target_iperf"]
- else:
- test_target = test_parameters["test_target_ftp"]
# Validate that we got reasonable values
if not test_target or "example.com" in test_target:
# Default values found in config file
- logging.error("Please supply target via: %s", config_filename)
+ logging.error("Target server has not been supplied.")
+ logging.info("Configuration settings can be configured 3 different ways:")
+ logging.info("1- If calling the script directly, pass the --target option")
+ logging.info("2- Define the TEST_TARGET_IPERF environment variable")
+ logging.info("3- (If running the test via checkbox/plainbox, define the ")
+ logging.info("target in /etc/xdg/canonical-certification.conf)")
+ logging.info("Please run this script with -h to see more details on how to configure")
sys.exit(1)
# Testing begins here!
#
- # Check and make sure that interface is indeed connected
+ # Make sure that the interface is indeed connected
try:
- cmd = "ip link set dev %s up" % args.interface
- check_call(shlex.split(cmd))
+ check_call(["ip", "link", "set", "dev", args.interface, "up"])
except CalledProcessError as interface_failure:
- logging.error("Failed to use %s:%s", cmd, interface_failure)
+ logging.error("Failed to use %s:%s", args.interface, interface_failure)
return 1
- # Give interface enough time to get DHCP address
- time.sleep(10)
+ # Back up routing table, since network down/up process
+ # tends to trash it....
+ temp = tempfile.TemporaryFile()
+ try:
+ check_call(["ip", "route", "save", "table", "all"], stdout=temp)
+ except CalledProcessError as route_error:
+ logging.warning("Unable to save routing table: %s", route_error)
result = 0
# Stop all other interfaces
@@ -474,25 +374,25 @@ def interface_test(args):
for iface in extra_interfaces:
logging.debug("Shutting down interface:%s", iface)
try:
- cmd = "ip link set dev %s down" % iface
- check_call(shlex.split(cmd))
+ check_call(["ip", "link", "set", "dev", iface, "down"])
except CalledProcessError as interface_failure:
- logging.error("Failed to use %s:%s", cmd, interface_failure)
+ logging.error("Failed to shut down %s:%s",
+ iface, interface_failure)
result = 3
if result == 0:
- # Execute FTP transfer benchmarking test
- if args.test_type.lower() == "ftp":
- ftp_benchmark = FTPPerformanceTest(
- test_target, test_user, test_pass, args.interface)
-
- if args.filesize:
- ftp_benchmark.binary_size = int(args.filesize)
- result = ftp_benchmark.run()
-
- elif args.test_type.lower() == "iperf":
- iperf_benchmark = IPerfPerformanceTest(args.interface, test_target,
- args.fail_threshold)
+ # Ensure that interface is fully up by waiting until it can
+ # ping the test server
+ if not can_ping(args.interface, test_target):
+ logging.error("Can't ping test server on %s", args.interface)
+ return 1
+
+ # Execute requested networking test
+ if args.test_type.lower() == "iperf":
+ iperf_benchmark = IPerfPerformanceTest(args.interface, test_target,
+ args.fail_threshold)
+ if args.datasize:
+ iperf_benchmark.data_size = args.datasize
result = iperf_benchmark.run()
elif args.test_type.lower() == "stress":
@@ -503,12 +403,22 @@ def interface_test(args):
for iface in extra_interfaces:
logging.debug("Restoring interface:%s", iface)
try:
- cmd = "ip link set dev %s up" % iface
- check_call(shlex.split(cmd))
+ check_call(["ip", "link", "set", "dev", iface, "up"])
except CalledProcessError as interface_failure:
- logging.error("Failed to use %s:%s", cmd, interface_failure)
+ logging.error("Failed to restore %s:%s", iface, interface_failure)
result = 3
+ # Restore routing table to original state
+ temp.seek(0)
+ try:
+ # Harmless "RTNETLINK answers: File exists" messages on stderr
+ with open(os.devnull, 'wb') as DEVNULL:
+ check_call(["ip", "route", "restore"], stdin=temp,
+ stderr=DEVNULL)
+ except CalledProcessError as restore_failure:
+ logging.warning("Unable to restore routing table: %s", restore_failure)
+ temp.close()
+
return result
@@ -540,10 +450,10 @@ interface.
Example NIC information usage:
network info -i eth0 --max-speed
-For running ftp benchmark test:
-network test -i eth0 -t ftp
---target 192.168.0.1 --username USERID --password PASSW0RD
---filesize-2
+For running iperf test:
+network test -i eth0 -t iperf --target 192.168.0.1
+NOTE: The iperf test requires an iperf server running on the same network
+segment that the test machine is running on.
Configuration
=============
@@ -553,28 +463,22 @@ priorities:
1- Command-line parameters (see above).
2- Environment variables (example will follow).
-3- Configuration file (example will follow).
- Default config location is /etc/checkbox.d/network.cfg
+3- If run via checkbox/plainbox, /etc/xdg/checkbox-certification.conf
+ can have the below-mentioned environment variables defined in the
+ [environment] section. An example file is provided and can be simply
+ modified with the correct values.
Environment variables
=====================
-ALL environment variables must be defined, even if empty, for them to be
-picked up. The variables are:
-TEST_TARGET_FTP
-TEST_USER
-TEST_PASS
+The variables are:
TEST_TARGET_IPERF
example config file
===================
-[FTP]
+[environment]
+TEST_TARGET_IPERF = iperf-server.example.com
-Target: 192.168.1.23
-User: FTPUser
-Pass:PassW0Rd
-[IPERF]
-Target: 192.168.1.45
**NOTE**
"""
@@ -594,16 +498,14 @@ Target: 192.168.1.45
'-i', '--interface', type=str, required=True)
test_parser.add_argument(
'-t', '--test_type', type=str,
- choices=("ftp", "iperf", "stress"), default="ftp",
- help=("[FTP *Default*]"))
+ choices=("iperf", "stress"), default="iperf",
+ help=("[iperf *Default*]"))
test_parser.add_argument('--target', type=str)
test_parser.add_argument(
- '--username', type=str, help=("For FTP test only"))
- test_parser.add_argument(
- '--password', type=str, help=("For FTP test only"))
- test_parser.add_argument(
- '--filesize', type=str,
- help="Size (GB) of binary file to send **Note** for FTP test only")
+ '--datasize', type=str,
+ default="1",
+ help=("Amount of data to send. For iperf tests this will direct "
+ "iperf to send DATASIZE GB of data to the target."))
test_parser.add_argument(
'--config', type=str,
default="/etc/checkbox.d/network.cfg",
@@ -614,6 +516,9 @@ Target: 192.168.1.45
help=("IPERF Test ONLY. Set the failure threshold (Percent of maximum "
"theoretical bandwidth) as a number like 80. (Default is "
"%(default)s)"))
+ test_parser.add_argument(
+ '--debug', default=False, action="store_true",
+ help="Turn on verbose output")
# Sub info options
info_parser.add_argument(
@@ -623,6 +528,8 @@ Target: 192.168.1.45
info_parser.add_argument(
'--duplex-mode', default=False, action="store_true")
info_parser.add_argument(
+ '--link-speed', default=False, action="store_true")
+ info_parser.add_argument(
'--max-speed', default=False, action="store_true")
info_parser.add_argument(
'--ipaddress', default=False, action="store_true")
@@ -635,13 +542,24 @@ Target: 192.168.1.45
info_parser.add_argument(
'--status', default=False, action="store_true",
help=("displays connection status"))
+ info_parser.add_argument(
+ '--debug', default=False, action="store_true",
+ help="Turn on verbose output")
test_parser.set_defaults(func=interface_test)
info_parser.set_defaults(func=interface_info)
args = parser.parse_args()
-
- return args.func(args)
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+ else:
+ logging.basicConfig(level=logging.INFO)
+
+ if 'func' not in args:
+ parser.print_help()
+ else:
+ return args.func(args)
if __name__ == "__main__":
diff --git a/bin/network_device_info b/bin/network_device_info
index b8c0267..f4b7911 100755
--- a/bin/network_device_info
+++ b/bin/network_device_info
@@ -200,8 +200,10 @@ def match_counts(nm_devices, udev_devices, devtype):
if udev.category == udevtype]
if len(nm_type_devices) != len(udev_type_devices):
print("ERROR: devices missing - udev showed %d %s devices, but "
- "NetworkManager saw %d devices in %s" % (len(udev_type_devices),
- udevtype, len(nm_type_devices), devtype), file=sys.stderr)
+ "NetworkManager saw %d devices in %s" %
+ (len(udev_type_devices), udevtype,
+ len(nm_type_devices), devtype),
+ file=sys.stderr)
return False
else:
return True
@@ -253,7 +255,7 @@ def main(args):
if not match_counts(nm_devices, udev_devices, "WiFi"):
return 1
- elif not match_counts(nm_devices, udev_devices, ("Ethernet","Modem")):
+ elif not match_counts(nm_devices, udev_devices, ("Ethernet", "Modem")):
return 1
else:
return 0
diff --git a/bin/network_info b/bin/network_info
index 2e5fae3..9c20b89 100755
--- a/bin/network_info
+++ b/bin/network_info
@@ -2,6 +2,7 @@
import os
import sys
+import subprocess
import socket
import fcntl
import struct
@@ -38,6 +39,13 @@ def get_ip_address(interface):
)[20:24])
+def get_ipv6_address(interface):
+ cmd = ['/sbin/ip', '-6', 'addr', 'show', 'dev', interface]
+ proc = subprocess.check_output(cmd, universal_newlines=True)
+ ipaddr = proc.split()[8].strip()
+ return ipaddr
+
+
def get_mac_address(interface):
address_file = os.path.join(SYS_PATH, interface, 'address')
@@ -56,9 +64,15 @@ def main(args):
print("Interface: %s" % interface)
print("Connected: %s" % connected)
try:
- print("IP: %s" % get_ip_address(interface))
+ print("IPv4: %s" % get_ip_address(interface))
+ except IOError:
+ print("IPv4: n/a")
+ try:
+ print("IPv6: %s" % get_ipv6_address(interface))
except IOError:
- print("IP: n/a")
+ print("IPv6: n/a")
+ except:
+ print("IPv6: n/a")
print("MAC: %s\n" % get_mac_address(interface))
return 0
diff --git a/bin/network_wait b/bin/network_wait
index 34e31fe..f0637d0 100755
--- a/bin/network_wait
+++ b/bin/network_wait
@@ -1,10 +1,15 @@
#!/bin/bash
-set -e
-
x=1
while true; do
- state=$(/usr/bin/nmcli -t -f STATE nm)
+ state=$(/usr/bin/nmcli -t -f STATE nm 2>/dev/null)
+ if [[ $? != 0 ]]; then
+ state=$(/usr/bin/nmcli -t -f STATE general 2>/dev/null)
+ rc=$?
+ if [[ $rc != 0 ]]; then
+ exit $rc
+ fi
+ fi
if [ "$state" = "connected" ]; then
echo $state
exit 0
diff --git a/bin/optical_write_test b/bin/optical_write_test
index d3f772b..b127667 100755
--- a/bin/optical_write_test
+++ b/bin/optical_write_test
@@ -44,7 +44,7 @@ check_md5(){
generate_iso(){
# Generate ISO image
echo "Creating ISO Image ..."
- genisoimage -r -J -o $ISO_NAME $SAMPLE_FILE
+ genisoimage -input-charset UTF-8 -r -J -o $ISO_NAME $SAMPLE_FILE
return $?
}
@@ -123,7 +123,7 @@ cleanup(){
cd $START_DIR
echo "Now residing in $PWD"
echo "Cleaning up ..."
- umount $MOUNT_PT
+ umount "$MOUNT_PT"
rm -fr $TEMP_DIR
echo "Ejecting spent media ..."
eject $OPTICAL_DRIVE
diff --git a/bin/piglit_test b/bin/piglit_test
deleted file mode 100755
index f03e628..0000000
--- a/bin/piglit_test
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/usr/bin/python3
-
-import os
-import sys
-
-from argparse import ArgumentParser, FileType
-from subprocess import check_output, STDOUT
-
-
-class PiglitTests:
-
- def __init__(self, tests, name):
- self._tests = tests
- self._name = name
- self._results = {}
-
- def run(self):
- piglit_output = ''
-
- log_path = os.path.join(os.environ.get('CHECKBOX_DATA', '.'),
- 'piglit-results', self._name)
-
- run_command = ["piglit-run.py"]
-
- for test in self._tests:
- run_command.extend(["-t", test])
-
- run_command.extend(['/usr/share/piglit/tests/all.tests', log_path])
-
- piglit_output = check_output(run_command,
- universal_newlines=True,
- stderr=STDOUT)
- # TODO: Capture stderr instead?
- for line in piglit_output.split('\n'):
- if ' :: ' in line:
- self._results[line.split(' :: ')[-1].strip()] = \
- line.split(' :: ')[-2].strip()
-
- def get_tests_by_status(self, status):
- """
- Return a list of the tests with the given status in the last piglit run
- """
- tests = []
- for test in self._results:
- if self._results[test] == status:
- tests.append(test)
-
- return tests
-
-
-def main():
- parser = ArgumentParser("A wrapper script for the Piglit graphics test "
- "framework which runs the tests and parses the "
- "results.")
- parser.add_argument("--test", "-t",
- required=True,
- action='append',
- help="The expression used to get the tests to run.")
- parser.add_argument("--name", "-n",
- required=True,
- help="""A friendly name for this group of tests
- to use in reporting.""")
- parser.add_argument("--verbose", "-v",
- action='store_true',
- help='Have more verbose output')
- args = parser.parse_args()
-
- piglit = PiglitTests(args.test, args.name)
- piglit.run()
-
- passed_tests = piglit.get_tests_by_status('pass')
- print("%d tests passed" % len(passed_tests))
-
- if args.verbose:
- print("\n".join(["- %s" % test for test in passed_tests]))
-
- failed_tests = piglit.get_tests_by_status('fail')
- if failed_tests:
- print("%d tests failed" % len(failed_tests))
- print("\n".join(["- %s" % test for test in failed_tests]))
-
- crashed_tests = piglit.get_tests_by_status('crash')
- if crashed_tests:
- print("%d tests crashed" % len(crashed_tests))
- print("\n".join(["- %s" % test for test in crashed_tests]))
-
- skipped_tests = piglit.get_tests_by_status('skip')
- if skipped_tests:
- print("%d tests were skipped" % len(skipped_tests))
- print("\n".join(["- %s" % test for test in skipped_tests]))
-
- if len(failed_tests) > 0 or len(crashed_tests) > 0:
- return 1
- else:
- return 0
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/bin/pm_test b/bin/pm_test
index 18c0def..827758f 100755
--- a/bin/pm_test
+++ b/bin/pm_test
@@ -7,6 +7,7 @@ import re
import shutil
import subprocess
import sys
+import signal
from argparse import ArgumentParser, SUPPRESS
from calendar import timegm
@@ -143,7 +144,11 @@ class PowerManagementOperation(object):
logging.info('Executing new {0!r} operation...'
.format(self.args.pm_operation))
logging.debug('Executing: {0!r}...'.format(command_str))
- subprocess.Popen(command_str, shell=True)
+ # The PM operation is performed asynchronously so let's just wait
+ # indefinitely until it happens and someone comes along to kill us.
+ # This addresses LP: #1413134
+ subprocess.check_call(command_str, shell=True)
+ signal.pause()
def summary(self):
"""
@@ -569,7 +574,7 @@ class SudoersConfigurator(object):
"""
logging.debug('Enabling user to execute test as root...')
command = ("sed -i -e '$a{mark}\\n"
- "{user} ALL=NOPASSWD: /usr/bin/python' "
+ "{user} ALL=NOPASSWD: /usr/bin/python3' "
"{filename}".format(mark=self.MARK,
user=self.user,
script=os.path.realpath(__file__),
@@ -597,7 +602,7 @@ class AutoStartFile(object):
[Desktop Entry]
Name={pm_operation} test
Comment=Verify {pm_operation} works properly
-Exec=sudo /usr/bin/python {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay} --pm-delay {pm_delay} --min-pm-time {min_pm_time} --max-pm-time {max_pm_time} --append --total {total} --start {start} --pm-timestamp {pm_timestamp} {silent} --log-level={log_level} --log-dir={log_dir} {pm_operation}
+Exec=sudo /usr/bin/python3 {script} -r {repetitions} -w {wakeup} --hardware-delay {hardware_delay} --pm-delay {pm_delay} --min-pm-time {min_pm_time} --max-pm-time {max_pm_time} --append --total {total} --start {start} --pm-timestamp {pm_timestamp} {silent} --log-level={log_level} --log-dir={log_dir} {pm_operation}
Type=Application
X-GNOME-Autostart-enabled=true
Hidden=false
@@ -617,6 +622,11 @@ Hidden=false
autostart_directory = os.path.join(config_directory, 'autostart')
if not os.path.exists(autostart_directory):
os.makedirs(autostart_directory)
+ user_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_UID')
+ group_id = os.getenv('PKEXEC_UID') or os.getenv('SUDO_GID')
+ if user_id:
+ os.chown(config_directory, int(user_id), int(group_id))
+ os.chown(autostart_directory, int(user_id), int(group_id))
basename = '{0}.desktop'.format(os.path.basename(__file__))
self.desktop_filename = os.path.join(autostart_directory,
diff --git a/bin/pts_run b/bin/pts_run
index 7b4440e..76921e8 100755
--- a/bin/pts_run
+++ b/bin/pts_run
@@ -9,6 +9,10 @@ echo -e "Y\nn\nn" | phoronix-test-suite > /dev/null
# Disable batch result saving and all test options selection
echo -e "n\nn" | phoronix-test-suite batch-setup > /dev/null
+# Don't show the browser after each test.
+# The implementation is a bit hacky but works.
+phoronix-test-suite user-config-set DefaultBrowser=/bin/true
+
# Run each test only one time
export FORCE_TIMES_TO_RUN=1
diff --git a/bin/recovery_info b/bin/recovery_info
new file mode 100755
index 0000000..4aa03cb
--- /dev/null
+++ b/bin/recovery_info
@@ -0,0 +1,391 @@
+#!/usr/bin/env python3
+# Copyright 2015 Canonical Ltd.
+# Written by:
+# Shawn Wang <shawn.wang@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Show the recovery partition information for the preinstalled OS."""
+
+import os
+import re
+import subprocess
+import sys
+import tempfile
+import unittest
+import xml.dom.minidom as minidom
+
+from guacamole import Command
+
+try:
+ from unittest import mock
+except ImportError:
+ from plainbox.vendor import mock
+
+RECOVERY_PACKAGES = ["dell-recovery", "ubuntu-recovery"]
+
+
+def get_recovery_package():
+ """
+ Test with RECOVERY_PACKAGES.
+
+ to check recovery application is installed or not
+
+ :return:
+ string of package_version or None
+ """
+ for pkg in RECOVERY_PACKAGES:
+ output = subprocess.check_output(["apt-cache", "policy", pkg],
+ universal_newlines=True)
+ for line in output.split("\n"):
+ if line.startswith(" Installed:"):
+ ver = line.split(": ")[1]
+ return "{}_{}".format(pkg, ver.strip())
+ return None
+
+
+RECOVERY_LABELS = {"HP_TOOLS": "HP",
+ "PQSERVICE": "UBUNTU",
+ "BACKUP": "TEST",
+ "INSTALL": "DELL",
+ "OS": "DELL",
+ "RECOVERY": "DELL"}
+
+
+_escape_pattern = re.compile(r'\\x([0-9a-fA-F][0-9a-fA-F])')
+
+
+def lsblk_unescape(label):
+ """Un-escape text escaping done by lsblk(8)."""
+ return _escape_pattern.sub(
+ lambda match: chr(int(match.group(1), 16)), label)
+
+
+def get_recovery_partition():
+ """
+ Get the type and location of the recovery partition.
+
+ :return:
+ (recovery_type, recovery_partition) or None
+
+ Use lsblk(8) to inspect available block devices looking
+ for a partition with FAT or NTFS and a well-known label.
+ """
+ cmd = ['lsblk', '-o', 'TYPE,FSTYPE,NAME,LABEL', '--raw']
+ for line in subprocess.check_output(cmd).splitlines()[1:]:
+ type, fstype, name, label = line.split(b' ', 3)
+ # Skip everything but partitions
+ if type != b'part':
+ continue
+ # Skip everything but FAT and NTFS
+ if fstype != b'vfat' and fstype != b'ntfs':
+ continue
+ label = lsblk_unescape(label.decode('utf-8'))
+ recovery_type = RECOVERY_LABELS.get(label)
+ # Skip unknown labels
+ if recovery_type is None:
+ continue
+ recovery_partition = '/dev/{}'.format(name.decode('utf-8'))
+ return (recovery_type, recovery_partition)
+
+
+class FunctionTests(unittest.TestCase):
+
+ """Tests for several functions."""
+
+ @mock.patch('subprocess.check_output')
+ def test_get_recovery_package(self, mock_subprocess_check_output):
+ """Smoke test for get_recovery_package()."""
+ mock_subprocess_check_output.return_value = """\
+dell-recovery:
+ Installed: 1.11
+ Candidate: 1.11
+ Version table:
+ 1.11
+ 500 https://archive/cesg-mirror/ test/public amd64 Packages
+"""
+ self.assertEqual(get_recovery_package(),
+ "dell-recovery_1.11")
+
+ @mock.patch('subprocess.check_output')
+ def test_get_recovery_partition(self, mock_subprocess_check_output):
+ """Smoke test for get_recovery_partition()."""
+ mock_subprocess_check_output.return_value = (
+ b'TYPE FSTYPE NAME LABEL\n'
+ b'disk linux_raid_member sda fx:2x250GB\n'
+ b'raid1 bcache md127 \n'
+ b'disk ext4 bcache0 Ultra\n'
+ b'disk linux_raid_member sdb fx:2x250GB\n'
+ b'raid1 bcache md127 \n'
+ b'disk ext4 bcache0 Ultra\n'
+ b'disk sdc \n'
+ b'part btrfs sdc1 vol1\n'
+ b'disk sdd \n'
+ b'part ntfs sdd1 Windows\x208.1\n'
+ b'part sdd2 \n'
+ b'part ext4 sdd5 Utopic\n'
+ b'part swap sdd6 \n'
+ b'disk bcache sde \n'
+ b'disk ext4 bcache0 Ultra\n'
+ b'disk sdf \n'
+ b'part ntfs sda3 RECOVERY\n')
+ self.assertEqual(get_recovery_partition(), ("DELL", "/dev/sda3"))
+
+ def test_lsblk_unescape(self):
+ """Smoke tests for lsblk_unescape()."""
+ self.assertEqual(lsblk_unescape('Windows\\x208.1'), 'Windows 8.1')
+ self.assertEqual(lsblk_unescape('Windows XP'), 'Windows XP')
+
+
+class MountedPartition(object):
+
+ """
+ Mount Manager to mount partition on tempdir.
+
+ e.g.
+ with MountedPartition("/dev/sda1") as tmp:
+ print("This is the mount point: {}".format(tmp))
+ do_stuff()
+ """
+
+ def __init__(self, part):
+ """
+ Prepare the mntdir point.
+
+ :param part: string of the partition device file, like /dev/sda2
+ """
+ self.part = part
+ self.mntdir = tempfile.mkdtemp()
+
+ def __enter__(self):
+ """
+ __enter__ method for python's with statement.
+
+ Mount the partition device to the mntdir.
+ """
+ cmd = ["mount", self.part, self.mntdir]
+ subprocess.check_output(cmd, universal_newlines=True)
+ return self.mntdir
+
+ def __exit__(self, type, value, traceback):
+ """
+ __exit__ method for python's with statement.
+
+ Unmount and remove the mntdir.
+ """
+ subprocess.check_output(["umount", self.mntdir],
+ universal_newlines=True)
+ os.rmdir(self.mntdir)
+
+
+class MountedPartitionTests(unittest.TestCase):
+
+ """Unittest of MountedPartition."""
+
+ @mock.patch('subprocess.check_output')
+ def test_with_of_MountedPartition(self, mock_subprocess_check_output):
+ """Test mount point."""
+ test_dir = ""
+ with MountedPartition("/dev/test") as tmp:
+ test_dir = tmp
+ self.assertTrue(os.path.exists(test_dir))
+ mock_subprocess_check_output.assert_has_calls(
+ [mock.call(['mount', '/dev/test', test_dir],
+ universal_newlines=True)])
+ self.assertFalse(os.path.exists(test_dir))
+ mock_subprocess_check_output.assert_has_calls(
+ [mock.call(['umount', test_dir],
+ universal_newlines=True)])
+
+
+class RecoveryVersion(Command):
+
+ """
+ print the version of recovery image.
+
+ @EPILOG@
+
+ This commands prints information such as:
+
+ image_version: xxx
+ bto_version: REV_xxx.iso (dell only)
+ """
+
+ def invoked(self, ctx):
+ """
+ Guacamole method called when the command is invoked.
+
+ /etc/buildstamp is a image information file,
+ it created by the oem image builder.
+
+ oilpalm Fri, 20 Jun 2014 04:02:07 +0000
+ somerville-trusty-amd64-20140620-0
+
+ If /etc/buildstamp exist, print out the second line (image iso name).
+
+ For Dell-recovery partition, /etc/buildstamp shows base image info.
+ If recovery_partition/bto.xml,
+ print out the bto_version (read from xml file).
+ """
+ if os.path.isfile("/etc/buildstamp"):
+ with open('/etc/buildstamp', 'rt', encoding='UTF-8') as stream:
+ data = stream.readlines()
+ print("image_version: {}".format(data[1].strip()))
+
+ with MountedPartition(ctx.recovery_partition) as mntdir:
+ fname = "{}/bto.xml".format(mntdir)
+ if os.path.isfile(fname):
+ o = minidom.parse("{}/bto.xml".format(mntdir))
+ bto_version = o.getElementsByTagName("iso")[0].firstChild.data
+ print("bto_version: {}".format(bto_version))
+
+
+class RecoveryFile(Command):
+
+ """
+ display a single file from the recovery partition
+
+ This command can be used to ``cat`` any file from the recovery partition
+ """
+
+ def register_arguments(self, parser):
+ """
+ Guacamole method used by the argparse ingredient.
+
+ :param parser:
+ Argument parser (from :mod:`argparse`) specific to this command.
+ """
+ parser.add_argument('file', help='name of the file to display')
+
+ def invoked(self, ctx):
+ """
+ Guacamole method used by the command ingredient.
+
+ :param ctx:
+ The guacamole context object. Context provides access to all
+ features of guacamole. The argparse ingredient adds the ``args``
+ attribute to it. That attribute contains the result of parsing
+ command line arguments.
+ :returns:
+ The return code of the command. Guacamole translates ``None`` to a
+ successful exit status (return code zero).
+ """
+ with MountedPartition(ctx.recovery_partition) as mnt:
+ return subprocess.call([
+ 'cat', '--', os.path.join(mnt, ctx.args.file)])
+
+
+class RecoveryCheckType(Command):
+
+ """
+ test if the recovery partition is of the given type.
+
+ This command can be used for scripted tests, to see if the recovery
+ partition on the current system is of a concrete type or not (e.g.
+ DELL-specific)
+
+ @EPILOG@
+
+ The exit code is 0 if the recovery partition type matches and 1 otherwise.
+ """
+
+ def register_arguments(self, parser):
+ """
+ Guacamole method used by the argparse ingredient.
+
+ :param parser:
+ Argument parser (from :mod:`argparse`) specific to this command.
+ """
+ parser.add_argument(
+ 'type', help="expected type of the recovery partition")
+
+ def invoked(self, ctx):
+ """
+ Guacamole method used by the command ingredient.
+
+ :param ctx:
+ The guacamole context object. Context provides access to all
+ features of guacamole. The argparse ingredient adds the ``args``
+ attribute to it. That attribute contains the result of parsing
+ command line arguments.
+ :returns:
+ The return code of the command. Guacamole translates ``None`` to a
+ successful exit status (return code zero).
+ """
+ if ctx.recovery_type != ctx.args.type:
+ return 1
+
+
+class RecoveryInfo(Command):
+
+ """
+ Inspect the recovery partition.
+
+ This command can be used to inspect the recovery partition. It has several
+ sub-commands that do various tasks. If the system has no recovery
+ partition, the command exits with the error code 1.
+ """
+
+ sub_commands = (
+ ('version', RecoveryVersion),
+ ('file', RecoveryFile),
+ ('checktype', RecoveryCheckType),
+ )
+
+ def invoked(self, ctx):
+ """
+ Guacamole method used by the command ingredient.
+
+ :param ctx:
+ The guacamole context object. Context provides access to all
+ features of guacamole. The argparse ingredient adds the ``args``
+ attribute to it. That attribute contains the result of parsing
+ command line arguments.
+ :returns:
+ The return code of the command. Guacamole translates ``None`` to a
+ successful exit status (return code zero).
+ """
+ partition = get_recovery_partition()
+
+ if partition is None:
+ print("Recovery partition not found", file=sys.stderr)
+ return 1
+ (recovery_type, recovery_partition) = partition
+ ctx.recovery_partition = recovery_partition
+ ctx.recovery_type = recovery_type
+
+
+class RecoveryInfoTests(unittest.TestCase):
+
+ """Tests for RecoveryInfo."""
+
+ @mock.patch('__main__.get_recovery_package')
+ @mock.patch('__main__.get_recovery_partition')
+ def test_smoke(self, mock_get_recovery_partition,
+ mock_get_recovery_package):
+ """Smoke tests for running recovery_info."""
+ mock_get_recovery_partition.return_value = ("DELL", "/dev/sda3")
+ mock_get_recovery_package.return_value = "dell-recovery_1.11"
+ self.assertEqual(RecoveryInfo().main(argv=[], exit=False), 0)
+ self.assertEqual(
+ RecoveryInfo().main(argv=["checktype", "HP"], exit=False), 1)
+ self.assertEqual(
+ RecoveryInfo().main(argv=["checktype", "DELL"], exit=False), 0)
+
+
+if __name__ == '__main__':
+ if '--test' in sys.argv:
+ sys.argv.remove('--test')
+ unittest.main()
+ else:
+ RecoveryInfo().main()
diff --git a/bin/removable_storage_test b/bin/removable_storage_test
index 81c11f8..8e7732b 100755
--- a/bin/removable_storage_test
+++ b/bin/removable_storage_test
@@ -22,9 +22,13 @@ from checkbox_support.dbus.udisks2 import is_udisks2_supported
from checkbox_support.dbus.udisks2 import lookup_udev_device
from checkbox_support.dbus.udisks2 import map_udisks1_connection_bus
from checkbox_support.heuristics.udisks2 import is_memory_card
-from checkbox_support.parsers.udevadm import CARD_READER_RE, GENERIC_RE, FLASH_RE
+from checkbox_support.helpers.human_readable_bytes import HumanReadableBytes
+from checkbox_support.parsers.udevadm import CARD_READER_RE
+from checkbox_support.parsers.udevadm import GENERIC_RE
+from checkbox_support.parsers.udevadm import FLASH_RE
from checkbox_support.udev import get_interconnect_speed
from checkbox_support.udev import get_udev_block_devices
+from checkbox_support.udev import get_udev_xhci_devices
class ActionTimer():
@@ -99,6 +103,8 @@ class DiskTest():
self.rem_disks_memory_cards = {}
self.rem_disks_memory_cards_nm = {}
self.rem_disks_speed = {}
+ # LP: #1313581, TODO: extend to be rem_disks_driver
+ self.rem_disks_xhci = {}
self.data = ''
self.device = device
self.memorycard = memorycard
@@ -174,8 +180,8 @@ class DiskTest():
have both the filesystem and block device interfaces
"""
for udisks2_object_path, interfaces in udisks2_objects.items():
- if (UDISKS2_FILESYSTEM_INTERFACE in interfaces
- and UDISKS2_BLOCK_INTERFACE in interfaces):
+ if (UDISKS2_FILESYSTEM_INTERFACE in interfaces and
+ UDISKS2_BLOCK_INTERFACE in interfaces):
yield udisks2_object_path
# We need to know about all IO candidates,
# let's iterate over all the block devices reported by udisks2
@@ -275,14 +281,14 @@ class DiskTest():
parent_media = parent_props.Get(udisks, "DriveMedia")
if self.memorycard:
if (dev_bus != 'sdio'
- and not FLASH_RE.search(parent_media)
- and not CARD_READER_RE.search(parent_model)
- and not GENERIC_RE.search(parent_vendor)):
+ and not FLASH_RE.search(parent_media)
+ and not CARD_READER_RE.search(parent_model)
+ and not GENERIC_RE.search(parent_vendor)):
continue
else:
if (FLASH_RE.search(parent_media)
- or CARD_READER_RE.search(parent_model)
- or GENERIC_RE.search(parent_vendor)):
+ or CARD_READER_RE.search(parent_model)
+ or GENERIC_RE.search(parent_vendor)):
continue
dev_file = str(device_props.Get(udisks, "DeviceFile"))
dev_speed = str(device_props.Get(udisks,
@@ -297,6 +303,30 @@ class DiskTest():
self.rem_disks_nm[dev_file] = None
self.rem_disks_memory_cards_nm[dev_file] = None
+ def get_disks_xhci(self):
+ """
+ Compare
+ 1. the pci slot name of the devices using xhci
+ 2. the pci slot name of the disks,
+ which is usb3 disks in this case so far,
+ to make sure the usb3 disk does be on the controller using xhci
+ """
+ # LP: #1378724
+ udev_client = GUdev.Client()
+ # Get a collection of all udev devices corresponding to block devices
+ udev_devices = get_udev_block_devices(udev_client)
+ # Get a collection of all udev devices corresponding to xhci devices
+ udev_devices_xhci = get_udev_xhci_devices(udev_client)
+ for udev_device_xhci in udev_devices_xhci:
+ pci_slot_name = udev_device_xhci.get_property('PCI_SLOT_NAME')
+ for udev_device in udev_devices:
+ devpath = udev_device.get_property('DEVPATH')
+ if (self._compare_pci_slot_from_devpath(devpath,
+ pci_slot_name)):
+ self.rem_disks_xhci[
+ udev_device.get_property('DEVNAME')] = 'xhci'
+ return self.rem_disks_xhci
+
def mount(self):
passed_mount = {}
@@ -340,6 +370,23 @@ class DiskTest():
if not os.path.ismount(self.rem_disks_nm[disk]):
os.rmdir(self.rem_disks_nm[disk])
+ def _compare_pci_slot_from_devpath(self, devpath, pci_slot_name):
+ # LP: #1334991
+ # a smarter parser to get and validate a pci slot name from DEVPATH
+ # then compare this pci slot name to the other
+ dl = devpath.split('/')
+ s = set([x for x in dl if dl.count(x) > 1])
+ if ((pci_slot_name in dl)
+ and (dl.index(pci_slot_name) < dl.index('block'))
+ and (not(pci_slot_name in s))):
+ # 1. there is such pci_slot_name
+ # 2. sysfs topology looks like
+ # DEVPATH = ....../pci_slot_name/....../block/......
+ # 3. pci_slot_name should be unique in DEVPATH
+ return True
+ else:
+ return False
+
def main():
parser = argparse.ArgumentParser()
@@ -380,10 +427,17 @@ def main():
help='The number of random data files to generate')
parser.add_argument('-s', '--size',
action='store',
- type=int,
- default=1048576,
- help=("The size of the test data file to use "
- "in Bytes. Default is %(default)s"))
+ type=HumanReadableBytes,
+ default='1MiB',
+ help=("The size of the test data file to use. "
+ "You may use SI or IEC suffixes like: 'K', 'M',"
+ "'G', 'T', 'Ki', 'Mi', 'Gi', 'Ti', etc. Default"
+ " is %(default)s"))
+ parser.add_argument('--auto-reduce-size',
+ action='store_true',
+ default=False,
+ help=("Automatically reduce size to fit in the target"
+ "filesystem. Reducing until fits in 1MiB"))
parser.add_argument('-n', '--skip-not-mount',
action='store_true',
default=False,
@@ -393,6 +447,10 @@ def main():
help=("Memory cards devices on bus other than sdio "
"require this parameter to identify "
"them as such"))
+ parser.add_argument('--driver',
+ choices=['xhci_hcd'],
+ help=("Detect the driver of the host controller."
+ "Only xhci_hcd for usb3 is supported so far."))
args = parser.parse_args()
@@ -444,7 +502,7 @@ def main():
if errors_mount > 0:
print("There're total %d device(s) failed at mounting."
- % errors_mount)
+ % errors_mount)
errors += errors_mount
disks_all = dict(list(test.rem_disks.items())
@@ -452,17 +510,17 @@ def main():
if len(disks_all) > 0:
print("Found the following mounted %s partitions:"
- % ', '.join(args.device))
+ % ', '.join(args.device))
for disk, mount_point in disks_all.items():
supported_speed = test.rem_disks_speed[disk]
print(" %s : %s : %s bits/s" %
- (disk, mount_point, supported_speed),
- end="")
- if (args.min_speed
- and int(args.min_speed) > int(supported_speed)):
+ (disk, mount_point, supported_speed),
+ end="")
+ if (args.min_speed and
+ int(args.min_speed) > int(supported_speed)):
print(" (Will not test it, speed is below %s bits/s)" %
- args.min_speed, end="")
+ args.min_speed, end="")
print("")
@@ -472,13 +530,38 @@ def main():
if not args.min_speed or
int(test.rem_disks_speed[disk])
>= int(args.min_speed)}
+ if len(disks_eligible) == 0:
+ logging.error(
+ "No %s disks with speed higher than %s bits/s",
+ args.device, args.min_speed)
+ return 1
write_sizes = []
test_files = {}
+ disks_freespace = {}
+ for disk, path in disks_eligible.items():
+ stat = os.statvfs(path)
+ disks_freespace[disk] = stat.f_bfree * stat.f_bsize
+ smallest_freespace = min(disks_freespace.values())
+ desired_size = args.size
+ if desired_size > smallest_freespace:
+ if args.auto_reduce_size:
+ min_space = HumanReadableBytes("1MiB")
+ if smallest_freespace < min_space:
+ raise IOError("Not enough space. {} is required"
+ .format(min_space))
+ new_size = HumanReadableBytes(int(0.8 * smallest_freespace))
+ logging.warning("Automatically reducing test data size"
+ ". {} requested. Reducing to {}."
+ .format(desired_size, new_size))
+ desired_size = new_size
+ else:
+ raise IOError("Not enough space. {} is required"
+ .format(desired_size))
# Generate our data file(s)
for count in range(args.count):
- test_files[count] = RandomData(args.size)
+ test_files[count] = RandomData(desired_size)
write_sizes.append(os.path.getsize(
- test_files[count].tfile.name))
+ test_files[count].tfile.name))
total_write_size = sum(write_sizes)
try:
@@ -525,7 +608,7 @@ def main():
for file in target_file_list:
test.clean_up(file)
total_write_time = sum(write_times)
- avg_write_time = total_write_time / args.count
+ # avg_write_time = total_write_time / args.count
try:
avg_write_speed = ((
total_write_size / total_write_time)
@@ -547,7 +630,7 @@ def main():
(iteration_write_time / args.iterations))
try:
avg_write_speed = (iteration_write_size /
- iteration_write_time)
+ iteration_write_time)
except ZeroDivisionError:
avg_write_speed = 0.00
finally:
@@ -566,16 +649,63 @@ def main():
"Completed %s test iterations, but there were"
" errors", args.count)
return 1
- elif len(disks_eligible) == 0:
- logging.error(
- "No %s disks with speed higher than %s bits/s",
- args.device, args.min_speed)
- return 1
-
else:
- #Pass is not assured!
+ # LP: 1313581
+ # Try to figure out whether the disk
+ # is SuperSpeed USB and using xhci_hcd driver.
+ if (args.driver == 'xhci_hcd'):
+ # The speed reported by udisks is sometimes
+ # less than 5G bits/s, for example,
+ # it may be 705032705 bits/s
+ # So using
+ # 500000000
+ # = 500 M bits/s
+ # > 480 M bits/s ( USB 2.0 spec.)
+ # to make sure that it is higher USB version than 2.0
+ #
+ # int() for int(test.rem_disks_speed[disk])
+ # is necessary
+ # because the speed value of
+ # the dictionary rem_disks_speed is
+ # 1. str or int from _probe_disks_udisks2
+ # 2. int from _probe_disks_udisks1.
+ # This is really a mess. : (
+ print("\t\t--------------------------------")
+ if(500000000 < int(test.rem_disks_speed[disk])):
+ print("\t\tDevice Detected: SuperSpeed USB")
+ # Unlike rem_disks_speed,
+ # which must has the connect speed
+ # for each disk devices,
+ # disk devices may not use xhci as
+ # controller drivers.
+ # This will raise KeyError for no
+ # associated disk device was found.
+ xhci_disks = test.get_disks_xhci()
+ # pep8 style suggest to limit the try clause
+ # to the absolute minimum amount of code necessary
+ try:
+ disk_xhci_flag = xhci_disks[disk]
+ except KeyError:
+ print("\t\tDisk does not use xhci_hci.")
+ return 1
+ else:
+ if('xhci' == disk_xhci_flag):
+ print("\t\tDriver Detected: xhci_hcd")
+ else:
+ print("\t\tDisk does not use xhci_hci.")
+ logging.debug("disk_xhci_flag is not xhci")
+ return 1
+ else:
+ # Give it a hint for the detection failure.
+ # LP: #1362902
+ print(("\t\tNo SuperSpeed USB using xhci_hcd "
+ "was detected correctly."))
+ print(("\t\tHint: please use dmesg to check "
+ "the system status again."))
+ return 1
+ # Pass is not assured
if (not args.pass_speed or
- avg_write_speed >= args.pass_speed):
+ avg_write_speed >= args.pass_speed):
return 0
else:
print("FAIL: Average speed was lower than desired "
diff --git a/bin/rotation_test b/bin/rotation_test
index 8fcf727..5187401 100755
--- a/bin/rotation_test
+++ b/bin/rotation_test
@@ -45,8 +45,9 @@ def main():
for rot in rotations:
try:
status = rotate_screen(rotations[rot])
- except(xrandr.RRError, xrandr.UnsupportedRRError) as error:
+ except (xrandr.RRError, xrandr.UnsupportedRRError) as exc:
status = 1
+ error = exc
else:
error = 'N/A'
# Collect the status and the error message
diff --git a/bin/sleep_test b/bin/sleep_test
index f63ab06..088a7a1 100755
--- a/bin/sleep_test
+++ b/bin/sleep_test
@@ -1,11 +1,12 @@
-#!/usr/bin/python
+#!/usr/bin/python3
'''
Program to automate system entering and resuming from sleep states
-Copyright (C) 2010,2011 Canonical Ltd.
+Copyright (C) 2010-2014 Canonical Ltd.
Author:
Jeff Lane <jeffrey.lane@canonical.com>
+ Daniel Manrique <roadmr@ubuntu.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2,
@@ -20,13 +21,14 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
-import os
-import sys
import logging
+import os
import re
-from subprocess import call
+import sys
+import syslog
+
from optparse import OptionParser, OptionGroup
-from syslog import *
+from subprocess import call
class ListDictHandler(logging.StreamHandler):
@@ -41,16 +43,20 @@ class ListDictHandler(logging.StreamHandler):
for msg in record.msg:
logger = logging.getLogger(record.name)
new_record = logger.makeRecord(record.name, record.levelno,
- record.pathname, record.lineno, msg, record.args,
- record.exc_info, record.funcName)
+ record.pathname, record.lineno,
+ msg, record.args,
+ record.exc_info,
+ record.funcName)
logging.StreamHandler.emit(self, new_record)
elif isinstance(record.msg, dict):
- for key, val in record.msg.iteritems():
+ for key, val in record.msg.items():
logger = logging.getLogger(record.name)
new_msg = '%s: %s' % (key, val)
new_record = logger.makeRecord(record.name, record.levelno,
- record.pathname, record.lineno, new_msg, record.args,
- record.exc_info, record.funcName)
+ record.pathname, record.lineno,
+ new_msg, record.args,
+ record.exc_info,
+ record.funcName)
logging.StreamHandler.emit(self, new_record)
else:
logging.StreamHandler.emit(self, record)
@@ -74,9 +80,9 @@ class SuspendTest():
future kernels.
'''
- states_fh = open('/sys/power/state', 'r', 0)
+ states_fh = open('/sys/power/state', 'rb', 0)
try:
- states = states_fh.read().split()
+ states = states_fh.read().decode('ascii').split()
finally:
states_fh.close()
logging.debug('The following sleep states were found:')
@@ -89,9 +95,9 @@ class SuspendTest():
def GetCurrentTime(self):
- time_fh = open('/sys/class/rtc/rtc0/since_epoch', 'r', 0)
+ time_fh = open('/sys/class/rtc/rtc0/since_epoch', 'rb', 0)
try:
- time = int(time_fh.read())
+ time = int(time_fh.read().decode('ascii'))
finally:
time_fh.close()
return time
@@ -110,13 +116,13 @@ class SuspendTest():
self.last_time = self.GetCurrentTime()
logging.debug('Current epoch time: %s' % self.last_time)
- wakealarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'w', 0)
+ wakealarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'wb', 0)
try:
- wakealarm_fh.write('0\n')
+ wakealarm_fh.write('0\n'.encode('ascii'))
wakealarm_fh.flush()
- wakealarm_fh.write('+%s\n' % time)
+ wakealarm_fh.write('+{}\n'.format(time).encode('ascii'))
wakealarm_fh.flush()
finally:
wakealarm_fh.close()
@@ -153,10 +159,10 @@ class SuspendTest():
def GetResults(self, mode, perf):
'''
- This will parse /var/log/messages for our start and end markers. Then
- it'll find a few key phrases that are part of the sleep and resume
- process, grab their timestamps, Bob's your Uncle and return a
- three-tuple consisting of: (PASS/FAIL,Sleep elapsed time, Resume
+ This will parse /var/log/messages for our start and end markers. Then
+ it'll find a few key phrases that are part of the sleep and resume
+ process, grab their timestamps, Bob's your Uncle and return a
+ three-tuple consisting of: (PASS/FAIL,Sleep elapsed time, Resume
elapsed time)
'''
# figure out our elapsed time
@@ -185,7 +191,7 @@ class SuspendTest():
sleep_end_time = re.split('[\[\]]',
loglist[idx - 1])[1].strip()
logging.debug('Sleep ended at %s' % sleep_end_time)
- resume_start_time = re.split('[\[\]]',
+ resume_start_time = re.split('[\[\]]',
loglist[idx])[1].strip()
logging.debug('Resume started at %s' % resume_start_time)
idx += 1
@@ -194,7 +200,7 @@ class SuspendTest():
loglist[idx])[1].strip()
logging.debug('Resume ended at %s' % resume_end_time)
if self.end_marker in loglist[idx]:
- logging.debug('End Marker found, run appears to '
+ logging.debug('End Marker found, run appears to '
'have completed')
run_complete = 'Pass'
break
@@ -206,7 +212,7 @@ class SuspendTest():
else:
if self.end_marker in loglist:
logging.debug('End Marker found, '
- 'run appears to have completed')
+ 'run appears to have completed')
run_complete = 'Pass'
sleep_elapsed = None
resume_elapsed = None
@@ -218,7 +224,7 @@ class SuspendTest():
Write a stamped marker to syslogd (will appear in /var/log/messages).
This is used to calculate the elapsed time for each iteration.
'''
- syslog(LOG_INFO, '---' + marker + '---')
+ syslog(syslog.LOG_INFO, '---' + marker + '---')
def CheckAlarm(self, mode):
'''
@@ -229,10 +235,10 @@ class SuspendTest():
the system did not wake by alarm IRQ, but by some other means.
'''
rtc = {}
- rtc_fh = open('/proc/driver/rtc', 'r', 0)
- alarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'r', 0)
+ rtc_fh = open('/proc/driver/rtc', 'rb', 0)
+ alarm_fh = open('/sys/class/rtc/rtc0/wakealarm', 'rb', 0)
try:
- rtc_data = rtc_fh.read().splitlines()
+ rtc_data = rtc_fh.read().decode('ascii').splitlines()
for item in rtc_data:
rtc_entry = item.partition(':')
rtc[rtc_entry[0].strip()] = rtc_entry[2].strip()
@@ -240,7 +246,7 @@ class SuspendTest():
rtc_fh.close()
try:
- alarm = int(alarm_fh.read())
+ alarm = int(alarm_fh.read().decode('ascii'))
except ValueError:
alarm = None
finally:
@@ -275,44 +281,44 @@ class SuspendTest():
def main():
usage = 'Usage: %prog [OPTIONS]'
parser = OptionParser(usage)
- group = OptionGroup(parser, 'This will not work for hibernat testing due' \
- ' to a kernel timestamp bug when doing an S4 ' \
+ group = OptionGroup(parser, 'This will not work for hibernat testing due'
+ ' to a kernel timestamp bug when doing an S4 '
'(hibernate/resume) sleep cycle')
group.add_option('-p', '--perf',
- action='store_true',
- default=False,
- help='Add some output that tells you how long it ' \
- 'takes to enter a sleep state and how long it ' \
- 'takes to resume.')
+ action='store_true',
+ default=False,
+ help='Add some output that tells you how long it '
+ 'takes to enter a sleep state and how long it '
+ 'takes to resume.')
parser.add_option('-i', '--iterations',
- action='store',
- type='int',
- metavar='NUM',
- default=1,
- help='The number of times to run the suspend/resume \
- loop. Default is %default')
+ action='store',
+ type='int',
+ metavar='NUM',
+ default=1,
+ help='The number of times to run the suspend/resume '
+ 'loop. Default is %default')
parser.add_option('-w', '--wake-in',
- action='store',
- type='int',
- metavar='NUM',
- default=60,
- dest='wake_time',
- help='Sets wake up time (in seconds) in the future \
- from now. Default is %default.')
+ action='store',
+ type='int',
+ metavar='NUM',
+ default=60,
+ dest='wake_time',
+ help='Sets wake up time (in seconds) in the future '
+ 'from now. Default is %default.')
parser.add_option('-s', '--sleep-state',
- action='store',
- default='mem',
- metavar='MODE',
- dest='mode',
- help='Sets the sleep state to test. Passing mem will \
- set the sleep state to Suspend-To-Ram or S3. Passing \
- disk will set the sleep state to Suspend-To-Disk or S4\
- (hibernate). Default sleep state is %default')
+ action='store',
+ default='mem',
+ metavar='MODE',
+ dest='mode',
+ help='Sets the sleep state to test. Passing mem will '
+ 'set the sleep state to Suspend-To-Ram or S3. Passing '
+ 'disk will set the sleep state to Suspend-To-Disk or S4 '
+ '(hibernate). Default sleep state is %default')
parser.add_option('-d', '--debug',
- action='store_true',
- default=False,
- help='Choose this to add verbose output for debug \
- purposes')
+ action='store_true',
+ default=False,
+ help='Choose this to add verbose output for debug \
+ purposes')
parser.add_option_group(group)
(options, args) = parser.parse_args()
options_dict = vars(options)
@@ -330,10 +336,10 @@ def main():
# Set up the logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
-
+
if options.debug:
handler.setLevel(logging.DEBUG)
-
+
logger.addHandler(handler)
logging.debug('Running with these options')
logging.debug(options_dict)
@@ -349,7 +355,7 @@ def main():
return 1
else:
logging.debug('%s sleep state supported, continuing test'
- % options.mode)
+ % options.mode)
# We run the following for the number of iterations requested
for iteration in range(0, options.iterations):
@@ -358,7 +364,7 @@ def main():
suspender.DoSuspend(options.mode)
run_count += 1
run_result[run_count] = suspender.GetResults(options.mode,
- options.perf)
+ options.perf)
if suspender.CheckAlarm(options.mode):
logging.debug('The alarm is still set')
@@ -367,7 +373,7 @@ def main():
resume_total = 0.0
logging.info('=' * 20 + ' Test Results ' + '=' * 20)
logging.info(run_result)
-
+
for k in run_result.iterkeys():
sleep_total += run_result[k][1]
resume_total += run_result[k][2]
@@ -381,7 +387,7 @@ def main():
if fail_count > 0:
logging.error('%s sleep/resume test cycles failed' %
- fail_count)
+ fail_count)
logging.error(run_result)
return 1
else:
diff --git a/bin/sleep_time_check b/bin/sleep_time_check
index 6c85c9f..d392769 100755
--- a/bin/sleep_time_check
+++ b/bin/sleep_time_check
@@ -21,18 +21,21 @@ def main():
action='store',
dest='resume_threshold',
type=float,
- default=3.00,
+ default=5.00,
help=('The max time a system should have taken to '
'resume from a sleep state. (Default: '
'%(default)s)'))
args = parser.parse_args()
-
+
try:
- file = open(args.filename)
- lines = file.readlines()
- finally:
- file.close()
+ with open(args.filename) as file:
+ lines = file.readlines()
+ except IOError as e:
+ print(e)
+ return False
+ sleep_time = None
+ resume_time = None
# find our times
for line in lines:
if "Average time to sleep" in line:
@@ -40,17 +43,21 @@ def main():
elif "Average time to resume" in line:
resume_time = float(line.split(':')[1].strip())
+ if sleep_time is None or resume_time is None:
+ print("ERROR: One or more times was not reported correctly")
+ return False
+
print("Average time to enter sleep state: %s seconds" % sleep_time)
print("Average time to resume from sleep state: %s seconds" % resume_time)
-
+
failed = False
if sleep_time > args.sleep_threshold:
- print("System failed to suspend in less than %s seconds" %
- args.sleep_threshold)
+ print("System failed to suspend in less than %s seconds" %
+ args.sleep_threshold)
failed = True
if resume_time > args.resume_threshold:
print("System failed to resume in less than %s seconds" %
- args.resume_threshold)
+ args.resume_threshold)
failed = True
if sleep_time <= 0.00 or resume_time <= 0.00:
print("ERROR: One or more times was not reported correctly")
diff --git a/bin/storage_test b/bin/storage_test
index 54e604f..ef19c5e 100755
--- a/bin/storage_test
+++ b/bin/storage_test
@@ -35,7 +35,21 @@ disk=/dev/$1
if [ -b $disk ]
then
echo "$disk is a block device"
- size=`parted -l -s | grep $disk | awk '{print $3}'`
+
+ #Add a check for warnings
+ WARN=$(parted -s ${disk} print | grep "^Warning.*${disk}.*[Rr]ead-only" 2>&1)
+ if [[ $? == 0 ]]
+ then
+ echo "Warning found in parted output:"
+ echo $WARN
+ echo "Aborting Test"
+ exit 1
+ fi
+
+ # Regex changed to better handle when $disk appears more than once
+ # in parted output (such as in warning messages or not caught in the
+ # check above)
+ size=`parted -l -s |grep "Disk.*${disk}" |awk '{print $3}'`
if [ -n "$size" ]
then
@@ -54,7 +68,7 @@ then
if [ $size_range == "KB" ]
then
- echo "$disk is too small to be functioning."
+ echo "$disk size reported in KB, seems to be too small for testing."
exit 1
elif [ $size_range == "MB" ]
then
@@ -64,7 +78,7 @@ then
then
run_bonnie $disk
else
- echo "$disk is too small to be functioning."
+ echo "$disk is too small to be used for testing."
exit 1
fi
else
diff --git a/bin/touchpad_driver_info b/bin/touchpad_driver_info
index 3f38968..431b874 100755
--- a/bin/touchpad_driver_info
+++ b/bin/touchpad_driver_info
@@ -15,7 +15,7 @@ class TouchResult:
attributes = {}
def addDevice(self, device):
- if getattr(device, 'category') == 'TOUCH':
+ if getattr(device, 'category') == 'TOUCHPAD':
self.attributes['driver'] = getattr(device, 'driver')
self.attributes['product'] = getattr(device, 'product')
diff --git a/bin/virtualization b/bin/virtualization
index 486a554..f1d043c 100755
--- a/bin/virtualization
+++ b/bin/virtualization
@@ -50,14 +50,137 @@ DEFAULT_TIMEOUT = 500
class XENTest(object):
pass
-
+# The "TAR" type is a tarball that contains both
+# a disk image and a kernel binary. This is useful
+# on architectures that don't (yet) have a bootloader
+# in the disk image that we can chain to, and instead
+# we need to have qemu load boot files externally
+CLOUD_IMAGE_TYPE_TAR = 1
+CLOUD_IMAGE_TYPE_DISK = 2
+
+QEMU_DISK_TYPE_SD = 1
+QEMU_DISK_TYPE_VIRTIO = 2
+QEMU_DISK_TYPE_VIRTIO_BLK = 3
+
+QEMU_ARCH_CONFIG = {
+ 'arm64': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_TAR,
+ 'cloudimg_arch': 'arm64',
+ 'qemu_bin': 'qemu-system-aarch64',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO_BLK,
+ 'qemu_extra_args': [
+ '-machine', 'virt',
+ '-cpu', 'host',
+ '-enable-kvm',
+ '-serial', 'stdio',
+ ],
+ },
+ 'armhf': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_TAR,
+ 'cloudimg_arch': 'armhf',
+ 'qemu_bin': 'qemu-system-arm',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO_BLK,
+ 'qemu_extra_args': [
+ '-machine', 'virt',
+ '-cpu', 'host',
+ '-enable-kvm',
+ '-serial', 'stdio',
+ ],
+ },
+ 'amd64': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK,
+ 'cloudimg_arch': 'i386',
+ 'qemu_bin': 'qemu-system-x86_64',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
+ 'qemu_extra_args': [
+ '-machine', 'accel=kvm:tcg',
+ ],
+ },
+ 'i386': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK,
+ 'cloudimg_arch': 'i386',
+ 'qemu_bin': 'qemu-system-x86_64',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
+ 'qemu_extra_args': [
+ '-machine', 'accel=kvm:tcg',
+ ],
+ },
+ 'ppc64el': {
+ 'cloudimg_type': CLOUD_IMAGE_TYPE_DISK,
+ 'cloudimg_arch': 'ppc64el',
+ 'qemu_bin': 'qemu-system-ppc64',
+ 'qemu_disk_type': QEMU_DISK_TYPE_VIRTIO,
+ 'qemu_extra_args': [
+ '-machine', 'prep',
+ ],
+ },
+}
+
+class QemuRunner(object):
+ def __init__(self, arch):
+ self.arch = arch
+ self.config = QEMU_ARCH_CONFIG[arch]
+ self.drive_id = 0
+ # Parameters common to all architectures
+ self.params = [
+ self.config['qemu_bin'],
+ "-m", "256",
+ "-display", "none",
+ "-nographic",
+ "-net", "nic",
+ "-net", "user,net=10.0.0.0/8,host=10.0.0.1,hostfwd=tcp::2222-:22",
+ ]
+ # Add any architecture-specific parameters
+ if 'qemu_extra_args' in self.config:
+ self.params = self.params + self.config['qemu_extra_args']
+
+ self.append = []
+ if self.config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR:
+ self.append = self.append + [
+ 'console=ttyAMA0',
+ 'earlyprintk=serial',
+ 'ro',
+ 'rootfstype=ext4',
+ 'root=LABEL=cloudimg-rootfs',
+ 'rootdelay=10',
+ ]
+
+ def add_boot_files(self, kernel=None, initrd=None, dtb=None):
+ if kernel:
+ self.params = self.params + ['-kernel', kernel]
+ if initrd:
+ self.params = self.params + ['-initrd', initrd]
+ if dtb:
+ self.params = self.params + ['-dtb', dtb]
+
+ def add_drive(self, cloudimg):
+ drive = ["-drive"]
+ if self.config['qemu_disk_type'] == QEMU_DISK_TYPE_SD:
+ drive = drive + ["file=%s,if=sd,cache=writeback" % (cloudimg)]
+ elif self.config['qemu_disk_type'] == QEMU_DISK_TYPE_VIRTIO:
+ drive = drive + ["file=%s,if=virtio" % (cloudimg)]
+ elif self.config['qemu_disk_type'] == QEMU_DISK_TYPE_VIRTIO_BLK:
+ drive = drive + [ "file=%s,if=none,id=disk.%d"
+ % (cloudimg, self.drive_id) ]
+ drive = drive + [ "-device", "virtio-blk-device,drive=disk.%d"
+ % (self.drive_id) ]
+ self.params = self.params + drive
+ self.drive_id = self.drive_id + 1
+
+ def get_params(self):
+ params = self.params
+ if self.append:
+ params = params + ['-append', '"%s"' % (" ".join(self.append))]
+ return params
+
class KVMTest(object):
- def __init__(self, image=None, timeout=500, debug_file="virt_debug"):
+ def __init__(self, image=None, timeout=500, debug_file=None):
self.image = image
self.timeout = timeout
- self.debug_file = os.path.join(os.getcwd(), debug_file)
- self.arch = check_output(['arch'], universal_newlines=True)
+ self.debug_file = debug_file
+ self.arch = check_output(['dpkg', '--print-architecture'], universal_newlines=True).strip()
+ self.qemu_config = QEMU_ARCH_CONFIG[self.arch]
def download_image(self):
"""
@@ -70,10 +193,13 @@ class KVMTest(object):
# Construct URL
cloud_url = "http://cloud-images.ubuntu.com"
- if re.match("arm.*", self.arch):
- cloud_iso = release + "-server-cloudimg-armhf.tar.gz"
+ if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR:
+ cloud_iso = "%s-server-cloudimg-%s.tar.gz" % (release, self.qemu_config['cloudimg_arch'])
+ elif self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_DISK:
+ cloud_iso = "%s-server-cloudimg-%s-disk1.img" % (release, self.qemu_config['cloudimg_arch'])
else:
- cloud_iso = release + "-server-cloudimg-i386-disk1.img"
+ logging.error("Unknown cloud image type")
+ return False
image_url = "/".join((
cloud_url, release, "current", cloud_iso))
@@ -82,12 +208,15 @@ class KVMTest(object):
# Attempt download
try:
resp = urllib.request.urlretrieve(image_url, cloud_iso)
- except (IOError, OSError, urllib.error.HTTPError) as exception:
+ except (IOError,
+ OSError,
+ urllib.error.HTTPError,
+ urllib.error.URLError) as exception:
logging.error("Failed download of image from %s: %s", image_url, exception)
return False
# Unpack img file from tar
- if re.match("arm.*", self.arch):
+ if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR:
cloud_iso_tgz = tarfile.open(cloud_iso)
cloud_iso = cloud_iso.replace('tar.gz', 'img')
cloud_iso_tgz.extract(cloud_iso)
@@ -105,42 +234,29 @@ class KVMTest(object):
logging.debug("Attempting boot for:{}".format(data_disk))
- # Set Arbitrary IP values
- netrange = "10.0.0.0/8"
- image_ip = "10.0.0.1"
- hostfwd = "tcp::2222-:22"
- cloud_disk = ""
+ qemu = QemuRunner(self.arch)
+
+ # Assume that a tar type image is not self-bootable, so
+ # therefore requires explicit bootfiles (otherwise, why
+ # not just use the disk format directly?
+ if self.qemu_config['cloudimg_type'] == CLOUD_IMAGE_TYPE_TAR:
+ for dir in ['/boot', '/']:
+ kernel = os.path.join(dir, 'vmlinuz')
+ initrd = os.path.join(dir, 'initrd.img')
+ if os.path.isfile(kernel):
+ qemu.add_boot_files(kernel=kernel, initrd=initrd)
+ break
+
+ qemu.add_drive(data_disk)
# Should we attach the cloud config disk
if os.path.isfile("seed.iso"):
logging.debug("Attaching Cloud config disk")
- cloud_disk = "-drive file=seed.iso,if=virtio"
+ qemu.add_drive("seed.iso")
+
+ params = qemu.get_params()
+ logging.debug("Using params:{}".format(" ".join(params)))
- if re.match("(arm.*|aarch64)", self.arch):
- uname = check_output(['uname', '-r'], universal_newlines=True)
- cloud_disk = cloud_disk.replace("virtio", "sd")
- params = 'qemu-system-arm -machine vexpress-a15 -cpu cortex-a15 -enable-kvm -m {} -kernel /boot/vmlinuz -append "console=ttyAMA0 earlyprintk=serial root=/dev/mmcblk0 ro rootfstype=ext4" -serial stdio -dtb /lib/firmware/{}/device-tree/vexpress-v2p-ca15-tc1.dtb -initrd /boot/initrd.img -net nic -net user,net={},host={},hostfwd={} -drive file={},if=sd,cache=writeback {} -display none -nographic'.format(uname, "256", netrange, image_ip, hostfwd, data_disk, cloud_disk)
- else:
- params = \
- '''
- qemu-system-x86_64 -machine accel=kvm:tcg -m {0} -net nic
- -net user,net={1},host={2},hostfwd={3}
- -drive file={4},if=virtio {5} -display none -nographic
- '''.format(
- "256",
- netrange,
- image_ip,
- hostfwd,
- data_disk,
- cloud_disk).replace("\n", "").replace(" ", "")
-
- logging.debug("Using params:{}".format(params))
-
- # Default file location for log file is in checkbox output directory
- checkbox_dir = os.getenv("CHECKBOX_DATA")
-
- if checkbox_dir is not None:
- self.debug_file = os.path.join(checkbox_dir, self.debug_file)
logging.info("Storing VM console output in {}".format(
os.path.realpath(self.debug_file)))
# Open VM STDERR/STDOUT log file for writing
@@ -152,7 +268,7 @@ class KVMTest(object):
# Start Virtual machine
self.process = Popen(
- shlex.split(params), stdin=PIPE, stderr=file, stdout=file,
+ params, stdin=PIPE, stderr=file, stdout=file,
universal_newlines=True, shell=False)
def create_cloud_disk(self):
@@ -220,8 +336,10 @@ final_message: CERTIFICATION BOOT COMPLETE
instance = self.boot_image(self.image)
time.sleep(self.timeout)
- # Reset Console window to regain control from VM Serial I/0
- call('reset')
+ # If running in console, reset console window to regain
+ # control from VM Serial I/0
+ if sys.stdout.isatty():
+ call('reset')
# Check to be sure VM boot was successful
with open(self.debug_file, 'r') as debug_file:
file_contents = debug_file.read()
@@ -296,7 +414,7 @@ def test_kvm(args):
if args.image:
image = args.image
- kvm_test = KVMTest(image, timeout)
+ kvm_test = KVMTest(image, timeout, args.log_file)
result = kvm_test.start()
sys.exit(result)
@@ -319,6 +437,9 @@ def main():
'-i', '--image', type=str, default=None)
kvm_test_parser.add_argument(
'-t', '--timeout', type=int)
+ kvm_test_parser.add_argument(
+ '-l', '--log-file', default='virt_debug',
+ help="Location for debugging output log. Defaults to %(default)s.")
kvm_test_parser.add_argument('--debug', dest='log_level',
action="store_const", const=logging.DEBUG,
default=logging.INFO)
diff --git a/bin/wifi_time2reconnect b/bin/wifi_time2reconnect
index bfaa2ce..d4ae2c0 100755
--- a/bin/wifi_time2reconnect
+++ b/bin/wifi_time2reconnect
@@ -6,6 +6,11 @@ import sys
import time
import subprocess
from datetime import datetime
+try:
+ from subprocess import DEVNULL # >= python3.3
+except ImportError:
+ import os
+ DEVNULL = open(os.devnull, 'wb')
IFACE = None
TIMEOUT = 30
@@ -16,14 +21,23 @@ def main():
Check the time needed to reconnect an active WIFI connection
"""
devices = subprocess.getoutput('nmcli dev')
- match = re.search('(\w+)\s+802-11-wireless\s+connected', devices)
+ match = re.search('(\w+)\s+(802-11-wireless|wifi)\s+connected', devices)
if match:
IFACE = match.group(1)
else:
print("No active wifi connection detected", file=sys.stderr)
return 1
- dev_status = subprocess.getoutput('nmcli -t -f devices,uuid con status')
+ try:
+ dev_status = subprocess.check_output(
+ ['nmcli', '-t', '-f', 'devices,uuid', 'con', 'status'],
+ stderr=DEVNULL,
+ universal_newlines=True)
+ except subprocess.CalledProcessError:
+ dev_status = subprocess.check_output(
+ ['nmcli', '-t', '-f', 'device,uuid', 'con', 'show'],
+ stderr=DEVNULL,
+ universal_newlines=True)
match = re.search(IFACE+':(.*)', dev_status)
uuid = None
if match:
diff --git a/bin/xrandr_cycle b/bin/xrandr_cycle
index ef20c41..f778562 100755
--- a/bin/xrandr_cycle
+++ b/bin/xrandr_cycle
@@ -1,13 +1,14 @@
#!/usr/bin/env python3
-import subprocess
import argparse
-import tarfile
-import shutil
-import time
-import sys
+import errno
import os
import re
+import shutil
+import subprocess
+import sys
+import tarfile
+import time
parser = argparse.ArgumentParser()
parser.add_argument('--keyword', default='',
@@ -68,7 +69,19 @@ for line in output:
# Now we have a list of the modes we need to test. So let's do just that.
profile_path = os.environ['HOME'] + '/.shutter/profiles/'
screenshot_path = os.path.join(args.screenshot_dir, 'xrandr_screens')
-script_home = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0]
+
+# Where to find the shutter.xml template? Two possible locations.
+shutter_xml_template = None
+
+if 'PLAINBOX_PROVIDER_DATA' in os.environ:
+ shutter_xml_template = os.path.join(os.environ['PLAINBOX_PROVIDER_DATA'],
+ "settings", "shutter.xml")
+else:
+ shutter_xml_template = os.path.join(os.path.split(os.path.dirname(
+ os.path.realpath(__file__)))[0],
+ "data",
+ "settings",
+ "shutter.xml")
if args.keyword:
screenshot_path = screenshot_path + '_' + args.keyword
@@ -76,21 +89,27 @@ if args.keyword:
regex = re.compile(r'filename="[^"\r\n]*"')
# Keep the shutter profile in place before starting
-try:
- os.makedirs(profile_path)
-except OSError:
- pass
+# Any errors creating the directories or copying the template is fatal,
+# since things won't work if we fail.
try:
- os.makedirs(screenshot_path)
-except OSError:
- pass
+ os.makedirs(profile_path, exist_ok=True)
+ os.makedirs(screenshot_path, exist_ok=True)
+except OSError as excp:
+ raise SystemExit("ERROR: Unable to create "
+ "required directories: {}".format(excp))
try:
- shutil.copy(script_home + '/data/settings/shutter.xml',
- profile_path)
-except IOError:
- pass
+ shutil.copy(shutter_xml_template, profile_path)
+except (IOError, OSError) as excp:
+ print("ERROR: Unable to copy {} to {}: {}".format(shutter_xml_template,
+ profile_path,
+ excp))
+ if excp.errno == errno.ENOENT:
+ print("Try setting PLAINBOX_PROVIDER_DATA to the the data path of a")
+ print("provider shipping the 'shutter.xml' template file, usually ")
+ print("found under /usr/share.")
+ raise SystemExit()
try:
old_profile = open(profile_path + 'shutter.xml', 'r')
@@ -102,7 +121,8 @@ try:
new_profile.close()
old_profile.close()
except:
- pass
+ raise SystemExit("ERROR: While updating folder name "
+ "in shutter profile: {}".format(sys.exc_info()))
for mode in modes:
cmd = 'xrandr --output ' + mode[0] + ' --mode ' + mode[1]
@@ -134,7 +154,9 @@ for mode in modes:
except:
print("""Could not configure screenshot tool -
you may need to install the package 'shutter',
- or check that %s exists.""" % profile_path)
+ or check that {}/{} exists and is writable.""".format(
+ profile_path,
+ 'shutter.xml'))
message = 'Set mode ' + mode[1] + ' for output ' + mode[0]
success_messages.append(message)