diff options
| author | Sylvain Pineau <sylvain.pineau@canonical.com> | 2017-07-03 10:07:56 +0200 |
|---|---|---|
| committer | Sylvain Pineau <sylvain.pineau@canonical.com> | 2017-07-03 10:07:56 +0200 |
| commit | 18f79d91e4376193287b980c60452b9a6696d221 (patch) | |
| tree | c46b7e559a5a96b1b9350c2549f71b163b43c314 | |
| parent | 3604050d7f06069e0f4c2e8999a7276d96088f26 (diff) | |
Remove unused bin/network_bandwidth_test
| -rwxr-xr-x | bin/network_bandwidth_test | 671 |
1 files changed, 0 insertions, 671 deletions
diff --git a/bin/network_bandwidth_test b/bin/network_bandwidth_test deleted file mode 100755 index c9b1dfb..0000000 --- a/bin/network_bandwidth_test +++ /dev/null @@ -1,671 +0,0 @@ -#!/usr/bin/env python3 - -import os -import re -import sys -import random -import logging -import subprocess - -from datetime import datetime, timedelta -from time import sleep - -from logging import StreamHandler, FileHandler, Formatter -from optparse import OptionParser - -from checkbox_support.lib.conversion import string_to_type - - -class CommandException(Exception): - - pass - - -class CommandOutput(object): - - def __init__(self, **attributes): - self._attributes = attributes - - def __getattr__(self, name): - if name in self._attributes: - return self._attributes.get(name) - - return None - - -class Command(object): - - # Name of the command to run - name = None - - # Number of command line arguments - argument_count = 0 - - # Option processing - option_strings = {} - option_defaults = {} - - # Ouput processing - output_factory = CommandOutput - output_patterns = {} - - # Convenient output patterns - non_space = r"[^ ]+" - - def __init__(self, *arguments, **options): - if len(arguments) != self.argument_count: - raise TypeError("Invalid number of arguments: %d" % len(arguments)) - - self._arguments = arguments - - self._options = self.option_defaults.copy() - for name, string in options.items(): - if name not in self.option_strings: - raise TypeError("Unknown option: %s" % name) - self._options[name] = string - - def get_command(self): - command = [self.name] - for name, string in self._options.items(): - # Match option from string - if isinstance(string, bool): - option = self.option_strings[name] - else: - option = self.option_strings[name] % string - - command.append(option) - - command.extend(self._arguments) - - return " ".join(command) - - def parse_lines(self, lines): - attributes = {} - for line in lines: - # Match patterns from lines - for name, pattern in self.output_patterns.items(): - match = re.search(pattern, line) - if match: - attributes[name] = string_to_type(match.group(1)) - - return self.output_factory(**attributes) - - def parse_output(self, output): - lines = output.split("\n") - # Strip leading and trailing spaces - lines = [l.strip() for l in lines] - # Skip blank lines - lines = [l for l in lines if l] - - return self.parse_lines(lines) - - def run(self): - command = self.get_command() - logging.debug("Running command: %s" % command) - process = subprocess.Popen(command, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - error = process.stderr.read() - if error: - raise CommandException(error.decode("utf-8")) - - output = process.stdout.read() - return self.parse_output(output.decode("utf-8")) - - -class NetworkConfigOutput(CommandOutput): - - @property - def speed(self): - if self.name == "lo": - return 10000 - - try: - wireless = WirelessConfig(self.name).run() - speed = wireless.bit_rate - except CommandException: - wired = WiredConfig(self.name).run() - speed = wired.speed - - return speed / 1024 / 1024 - - -class NetworkConfig(Command): - - name = "ifconfig" - - argument_count = 1 - - ipv4 = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" - ipv6 = r"[\w:]+/\d+" - mac_address = r"\w\w:\w\w:\w\w:\w\w:\w\w:\w\w" - - output_factory = NetworkConfigOutput - output_patterns = { - "name": r"(%s).*Link encap" % Command.non_space, - "broadcast": r"Bcast:(%s)" % ipv4, - "collisions": "collisions:(\d+)", - "hwaddr": r"HWaddr (%s)" % mac_address, - "inet_addr": r"inet addr:(%s)" % ipv4, - "link_encap": r"Link encap:(%s)" % Command.non_space, - "netmask": r"Mask:(%s)" % ipv4, - "metric": r"Metric:(\d+)", - "mtu": r"MTU:(\d+)", - "rx_bytes": "RX bytes:(\d+)", - "rx_dropped": "RX packets:.* dropped:(\d+)", - "rx_errors": "RX packets:.* errors:(\d+)", - "rx_frame": "RX packets:.* frame:(\d+)", - "rx_overruns": "RX packets:.* overruns:(\d+)", - "rx_packets": "RX packets:(\d+)", - "tx_bytes": "TX bytes:(\d+)", - "tx_carrier": "TX packets:.* carrier:(\d+)", - "tx_dropped": "TX packets:.* dropped:(\d+)", - "tx_errors": "TX packets:.* errors:(\d+)", - "tx_overruns": "TX packets:.* overruns:(\d+)", - "tx_packets": "TX packets:(\d+)", - "txqueuelen": "txqueuelen:(\d+)"} - - -class NetworkConfigs(Command): - - name = "ifconfig -a" - - def parse_output(self, output): - outputs = [] - for paragraph in output.split("\n\n"): - if not paragraph: - continue - - lines = paragraph.split("\n") - name = re.split(r"\s+", lines[0])[0] - config = NetworkConfig(name).parse_lines(lines) - outputs.append(config) - - return outputs - - -class WiredConfig(Command): - - name = "ethtool" - - argument_count = 1 - - output_patterns = { - "advertised_auto_negotiation": r"Advertised auto-negotiation:\s+(.*)", - "advertised_link_modes": r"Advertised link modes:\s+(.*)", - "auto_negotiation": r"Auto-negotiation:\s+(.*)", - "current_message_level": r"Current message level:\s+(.*)", - "duplex": r"Duplex:\s+(.*)", - "link_detected": r"Link detected:\s+(.*)", - "phyad": r"PHYAD:\s+(.*)", - "port": r"Port:\s+(.*)", - "speed": r"Speed:\s+(.*)/s", - "supported_auto_negotiation": r"Supports auto-negotiation:\s+(.*)", - "supported_link_modes": r"Supported link modes:\s+(.*)", - "supported_ports": r"Supported ports:\s+(.*)", - "supports_wake_on": r"Supports Wake-on:\s+(.*)", - "transceiver": r"Transceiver:\s+(.*)", - "wake_on": r"Wake-on:\s+(.*)"} - - def parse_lines(self, lines): - new_lines = [] - # Skip header line - for line in lines[1:]: - if not re.search(r": ", line): - new_lines[-1] += " " + line - else: - new_lines.append(line) - - return super(WiredConfig, self).parse_lines(new_lines) - - -class WirelessConfig(Command): - - name = "iwconfig" - - argument_count = 1 - - fraction = r"\d+(/\d+)?" - numeric = r"[\d\.]+" - numeric_with_unit = r"%s( %s)?" % (numeric, Command.non_space) - - output_patterns = { - "access_point": r"Access Point: (.*)", - "bit_rate": r"Bit Rate[=:](%s)/s" % numeric_with_unit, - "channel": r"Channel=(%s)" % Command.non_space, - "essid": r"ESSID:\"?([^\"]+)\"?", - "fragment_thr": r"Fragment thr:(\w+)", - "frequency": r"Frequency:(%s)" % numeric_with_unit, - "invalid_misc": r"Invalid misc:(\d+)", - "link_quality": r"Link Quality[=:](%s)" % fraction, - "missed_beacon": r"Missed beacon:(\d+)", - "mode": r"Mode:(%s)" % Command.non_space, - "noise_level": r"Noise level[=:](%s)" % numeric_with_unit, - "power_management": r"Power Management:(.*)", - "retry_limit": r"Retry limit:(\w+)", - "rts_thr": r"RTS thr:(\w+)", - "rx_invalid_crypt": r"Rx invalid crypt:(\d+)", - "rx_invalid_frag": r"Rx invalid frag:(\d+)", - "rx_invalid_nwid": r"Rx invalid nwid:(\d+)", - "sensitivity": r"Sensitivity=(%s)" % fraction, - "signal_level": r"Signal level[=:](%s)" % numeric_with_unit, - "tx_excessive_retries": r"Tx excessive retries:(\d+)", - "tx_power": r"Tx-Power=(%s)" % numeric_with_unit} - - -class Ping(Command): - - name = "ping" - - argument_count = 1 - - option_strings = { - "count": "-c %d", - "flood": "-f", - "interface": "-I %s", - "quiet": "-q", - "size": "-s %d", - "ttl": "-t %d"} - - option_defaults = { - "count": 1, - "quiet": True} - - ms = r"\d+\.\d+" - rtt = (ms, ms, ms, ms) - - output_patterns = { - "packet_loss": r"(\d+)% packet loss,", - "packets_received": r"(\d+) received,", - "packets_transmitted": r"(\d+) packets transmitted,", - "rtt_avg": r"rtt min/avg/max/mdev = %s/(%s)/%s/%s ms" % rtt, - "rtt_max": r"rtt min/avg/max/mdev = %s/%s/(%s)/%s ms" % rtt, - "rtt_mdev": r"rtt min/avg/max/mdev = %s/%s/%s/(%s) ms" % rtt, - "rtt_min": r"rtt min/avg/max/mdev = (%s)/%s/%s/%s ms" % rtt, - "time": r"time (\d+)ms"} - - def parse_lines(self, lines): - # Skip ping lines - return super(Ping, self).parse_lines(lines[-2:]) - - -class PingLarge(Ping): - - # Some wired environments can handle the maximum ping packet - # size, (65507+28)=65535 bytes. With a count of 191 packets, 65535 - # bytes/packet, 8 bits/byte, the sum payload is 100137480 bits ~ - # 100Mb. This is preferred and will be tried first. - packet_size = 65507 - packet_count = 191 - - option_defaults = { - "count": packet_count, - "flood": True, - "quiet": True, - "size": packet_size, - "ttl": 1} - - -class PingSmall(PingLarge): - - # If the large packet test was too lossy, we fall back to a packet - # equal to the default MTU size of 1500, (1472+28)=1500 bytes. - # With a count of 8334 packets, 1500 bytes/packet, 8 bits/byte, the - # sum payload is 100008000 bits ~ 100Mb. - packet_size = 1472 - packet_count = 8334 - - option_defaults = PingLarge.option_defaults.copy() - option_defaults.update({ - "count": packet_count, - "size": packet_size}) - - -class PingHost(Command): - - output_patterns = { - "host": r"(?:Host|Nmap scan report for) (%s)" % NetworkConfig.ipv4, - "mac_address": r"MAC Address: (%s)" % NetworkConfig.mac_address} - - -class PingScan(Command): - - name = "nmap -n -sP" - - argument_count = 1 - - def parse_lines(self, lines): - hosts = [] - host_lines = [] - # Skip header lines - for line in lines[1:]: - host_lines.append(line) - if line.startswith("MAC Address"): - host = PingHost().parse_lines(host_lines) - hosts.append(host) - host_lines = [] - - return hosts - - -class Ip(object): - - def __init__(self, address): - self.address = address - self.binary = self._address_to_binary(address) - - def __str__(self): - return self.address - - def _address_to_binary(self, address): - binary = 0 - for position, part in enumerate(address.split(".")): - if position >= 4: - raise ValueError("Address contains more than four parts.") - try: - if not part: - part = 0 - else: - part = int(part) - if not 0 <= part < 256: - raise ValueError - except ValueError: - raise ValueError("Address part out of range.") - binary <<= 8 - binary += part - return binary - - def count_1_bits(self): - ret = 0 - num = self.binary - while num > 0: - num = num >> 1 - ret += 1 - return ret - - def count_0_bits(self): - num = int(self.binary) - if num < 0: - raise ValueError("Only positive Numbers please: %s" % (num)) - ret = 0 - while num > 0: - if num & 1 == 1: - break - num = num >> 1 - ret += 1 - return ret - - -class IpRange(object): - - def __init__(self, address, netmask): - self.address = Ip(address) - self.netmask = Ip(netmask) - self.prefix = self._netmask_to_prefix(self.netmask) - - def __str__(self): - return "%s/%s" % (self.address, self.prefix) - - def _check_netmask(self, masklen): - num = int(self.netmask.binary) - bits = masklen - - # remove zero bits at the end - while (num & 1) == 0: - num = num >> 1 - bits -= 1 - if bits == 0: - break - # now check if the rest consists only of ones - while bits > 0: - if (num & 1) == 0: - raise ValueError("Netmask %s can't be expressed as an prefix." - % (hex(self.netmask.binary))) - num = num >> 1 - bits -= 1 - - def _netmask_to_prefix(self, netmask): - netlen = netmask.count_0_bits() - masklen = netmask.count_1_bits() - self._check_netmask(masklen) - return masklen - netlen - - def contains(self, address): - address = Ip(address) - if self.address.binary & self.netmask.binary \ - == address.binary & self.netmask.binary: - return True - - return False - - def scan(self, max=None): - scan = PingScan(str(self)).run() - targets = [s.host for s in scan] - random.shuffle(targets) - - if max is not None: - targets = targets[:max] - - return targets - - -class NetworkManagerException(Exception): - - pass - - -class NetworkManager(object): - - NM_SERVICE = "org.freedesktop.NetworkManager" - NM_PATH = "/org/freedesktop/NetworkManager" - NM_INTERFACE = NM_SERVICE - - NM_PATH_DEVICES = "/org/freedesktop/NetworkManager/Devices" - NM_INTERFACE_DEVICES = "org.freedesktop.NetworkManager.Devices" - - NMI_SERVICE = "org.freedesktop.NetworkManagerInfo" - NMI_PATH = "/org/freedesktop/NetworkManagerInfo" - NMI_INTERFACE = NMI_SERVICE - - HAL_SERVICE = "org.freedesktop.Hal" - HAL_PATH = "/org/freedesktop/Hal/Manager" - HAL_INTERFACE = "org.freedesktop.Hal.Manager" - HAL_INTERFACE_DEVICE = "org.freedesktop.Hal.Device" - - #http://projects.gnome.org/NetworkManager/developers/ - #NetworkManager D-Bus API Specifications, look for the - #NM_STATE enumeration to see which statuses indicate connection - #established and put them in this list. "3" works for NM 0.7 - #and 0.8, while "60" and "70" work for NM 0.9. - STATES_CONNECTED = [3, 60, 70] - - def __init__(self): - try: - import dbus - except ImportError: - raise NetworkManagerException("Python module not found: dbus") - - try: - self._bus = dbus.SystemBus() - self.nm_object = self._bus.get_object(self.NM_SERVICE, - self.NM_PATH) - self.nm_service = dbus.Interface(self.nm_object, self.NM_INTERFACE) - except dbus.exceptions.DBusException: - raise NetworkManagerException("Failed to connect to dbus service") - - def is_connected(self): - state = self.nm_service.state() - return state in self.STATES_CONNECTED - - -class Application(object): - - def __init__(self, targets, interfaces, scan): - self.targets = targets - self.interfaces = interfaces - self.scan = scan - - def test_interface(self, interface, targets): - logging.info("Testing %s at %s-Mbps", interface.name, interface.speed) - for target in targets: - ping = PingLarge(target, interface=interface.name) - result = ping.run() - if result.packet_loss: - ping = PingSmall(target, interface=interface.name) - result = ping.run() - if result.packet_loss: - logging.warning("SKIP: Non-zero packet loss (%s%%) " - "for [%s] [%s]->[%s]", - result.packet_loss, interface.name, - interface.inet_addr, target) - continue - - mbps = (8 * (ping.packet_size + 28) * ping.packet_count - / result.time / 1000) - percent = (100 * 8 * (ping.packet_size + 28) * ping.packet_count - / result.time / 1000 / interface.speed) - if percent >= 10: - logging.info("PASS: Effective rate: %3.4f Mbps, " - "%3.2f%% of theoretical max (%5.2f Mbps)", - mbps, percent, interface.speed) - return True - else: - logging.warning("Unacceptable network effective rate found for [%s]" % interface.name) - logging.warning("Effective rate %3.4f Mbps, %3.2f%% of theoretical max (%5.2f Mbps)" % - (mbps, percent, interface.speed)) - return False - - def run(self): - logging.debug("Acquiring network Interfaces") - if self.interfaces: - interfaces = [NetworkConfig(i).run() for i in self.interfaces] - else: - interfaces = NetworkConfigs().run() - interfaces = [i for i in interfaces if i.inet_addr] - - for interface in interfaces: - if not interface.inet_addr: - logging.debug("No network address for [%s]", interface.name) - continue - - targets = [] - ip_range = IpRange(interface.inet_addr, interface.netmask) - if self.targets: - for target in self.targets: - if ip_range.contains(target): - targets.append(target) - elif interface.name != "lo": - targets = ip_range.scan(self.scan) - logging.info("The following targets were found for %s:" % interface.name) - for target in targets: - logging.info("\t%s" % target) - - if not targets: - logging.debug("No targets found for [%s]", interface.name) - continue - - if not self.test_interface(interface, targets): - return False - - return True - - -class ApplicationManager(object): - - application_factory = Application - - default_log_level = "critical" - default_scan = 1 - default_timeout = 60 - - def get_parser(self, args): - usage = "Usage: %prog [TARGETS]" - - parser = OptionParser(usage=usage) - parser.add_option("-i", "--interface", - dest="interfaces", - action="append", - type="string", - default=[], - help="Interface to test.") - parser.add_option("-s", "--scan", - default=self.default_scan, - type="int", - help="Number of targets to scan when not provided.") - parser.add_option("-t", "--timeout", - default=self.default_timeout, - type="int", - help="Time to wait for network manager to connect.") - parser.add_option("-l", "--log", - metavar="FILE", - help="The file to write the log to.") - parser.add_option("--log-level", - default=self.default_log_level, - help=("One of debug, info, warning, " - "error or critical.")) - - return parser - - def check_uid(self): - return os.getuid() == 0 - - def check_network(self, timeout): - try: - nm = NetworkManager() - except NetworkManagerException: - return True - - start = datetime.now() - while True: - if nm.is_connected(): - return True - if datetime.now() - start > timedelta(seconds=timeout): - return False - sleep(5) - - def create_application(self, args=sys.argv[1:]): - parser = self.get_parser(args) - (options, args) = parser.parse_args(args) - - log_level = logging.getLevelName(options.log_level.upper()) - log_handlers = [] - if options.log: - log_filename = options.log - log_handlers.append(FileHandler(log_filename)) - else: - log_handlers.append(StreamHandler()) - - # Logging setup - format = ("%(asctime)s %(levelname)-8s %(message)s") - date_format = '%Y-%m-%d %H:%M:%S' - if log_handlers: - for handler in log_handlers: - handler.setFormatter(Formatter(format, date_format)) - logging.getLogger().addHandler(handler) - if log_level: - logging.getLogger().setLevel(log_level) - elif not logging.getLogger().handlers: - logging.disable(logging.CRITICAL) - - if not self.check_uid(): - parser.error("Must be run as root.") - - if not self.check_network(options.timeout): - parser.error("Network devices must be configured and connected to a LAN segment before testing") - - targets = args - return self.application_factory(targets, - options.interfaces, options.scan) - - -def main(): - application_manager = ApplicationManager() - application = application_manager.create_application() - if not application.run(): - return 1 - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) |
