diff options
author | Jonathan Cave <jonathan.cave@canonical.com> | 2019-05-22 17:14:15 +0100 |
---|---|---|
committer | Jonathan Cave <jonathan.cave@canonical.com> | 2019-05-29 14:51:04 +0100 |
commit | 8c99262b24fdf3f9fbe5519c34ad2096ddd55ad2 (patch) | |
tree | 6fe535d6b5653b4ee16d0e0cb73303a5e421ea3e /bin | |
parent | 6167a9da66f1c85399a96ac529c7d7b1e99227f8 (diff) |
network_device_info: refactor with argparse
Refactoring of the network_device_info which appears to have grown organically to the point where it is somewhat unreadable
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/network_device_info.py | 523 |
1 files changed, 317 insertions, 206 deletions
diff --git a/bin/network_device_info.py b/bin/network_device_info.py index 29a4aed..e36c201 100755 --- a/bin/network_device_info.py +++ b/bin/network_device_info.py @@ -19,6 +19,11 @@ # # Copyright (C) 2012-2019 Canonical, Ltd. +import argparse +import fcntl +import os +import socket +import struct from subprocess import check_output, CalledProcessError, STDOUT import sys @@ -28,234 +33,340 @@ from checkbox_support.parsers.modinfo import ModinfoParser from checkbox_support.parsers.udevadm import UdevadmParser -# This example lists basic information about network interfaces known to NM -devtypes = {1: "Ethernet", - 2: "WiFi", - 5: "Bluetooth", - 6: "OLPC", - 7: "WiMAX", - 8: "Modem"} +class Utils(): -states = {0: "Unknown", - 10: "Unmanaged", - 20: "Unavailable", - 30: "Disconnected", - 40: "Prepare", - 50: "Config", - 60: "Need Auth", - 70: "IP Config", - 80: "IP Check", - 90: "Secondaries", - 100: "Activated", - 110: "Deactivating", - 120: "Failed"} + sys_path = '/sys/class/net' -attributes = ("category", "interface", "product", "vendor", "driver", "path") - -udev_devices = [] -nm_devices = [] - - -class UdevResult: - def addDevice(self, device): - if device.category == 'NETWORK' and device.interface != "UNKNOWN": - udev_devices.append(device) - - -class NetworkingDevice(): - def __init__(self, devtype, props, dev_proxy, bus): - self._devtype = devtype + @classmethod + def is_iface_connected(cls, iface): try: - self._interface = props['Interface'] - except KeyError: - self._interface = "Unknown" - - try: - self._ip = self._int_to_ip(props['Ip4Address']) - except KeyError: - self._ip = "Unknown" - - try: - self._driver = props['Driver'] - except KeyError: - self._driver = "Unknown" - self._driver_ver = "Unknown" - - if self._driver != "Unknown": - self._modinfo = self._modinfo_parser(props['Driver']) - if self._modinfo: - self._driver_ver = self._find_driver_ver() - else: - self._driver_ver = "Unknown" - - try: - self._firmware_missing = props['FirmwareMissing'] - except KeyError: - self._firmware_missing = False - + carrier_file = os.path.join(cls.sys_path, iface, 'carrier') + return int(open(carrier_file, 'r').read()) == 1 + except Exception: + pass + return False + + @staticmethod + def get_ipv4_address(interface): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + return socket.inet_ntoa(fcntl.ioctl( + s.fileno(), + 0x8915, # SIOCGIFADDR + struct.pack('256s', interface[:15].encode()) + )[20:24]) + + @staticmethod + def get_ipv6_address(interface): + cmd = ['/sbin/ip', '-6', '-o', 'addr', 'show', 'dev', interface, + 'scope', 'link'] + proc = check_output(cmd, universal_newlines=True) + return proc.split()[3].strip() + + @classmethod + def get_mac_address(cls, interface): + address_file = os.path.join(cls.sys_path, interface, 'address') try: - self._state = states[props['State']] - except KeyError: - self._state = "Unknown" - - def __str__(self): - ret = "Category: %s\n" % self._devtype - ret += "Interface: %s\n" % self._interface - ret += "IP: %s\n" % self._ip - ret += "Driver: %s (ver: %s)\n" % (self._driver, self._driver_ver) - if self._firmware_missing: - ret += "Warning: Required Firmware Missing for device\n" - ret += "State: %s\n" % self._state - return ret - - def getstate(self): - return self._state - - def gettype(self): - return self._devtype + return open(address_file, 'r').read().strip() + except IOError: + return 'UNKNOWN' - def _bitrate_to_mbps(self, bitrate): + @classmethod + def get_speed(cls, interface): + speed_file = os.path.join(cls.sys_path, interface, 'speed') try: - intbr = int(bitrate) - return str(intbr / 1000) - except Exception: - return "NaN" + return open(speed_file, 'r').read().strip() + except IOError: + return 'UNKNOWN' - def _modinfo_parser(self, driver): + @staticmethod + def get_driver_version(driver): cmd = ['/sbin/modinfo', driver] try: - stream = check_output(cmd, stderr=STDOUT, universal_newlines=True) + output = check_output(cmd, stderr=STDOUT, universal_newlines=True) except CalledProcessError: return None - - if not stream: + if not output: return None - else: - parser = ModinfoParser(stream) - modinfo = parser.get_all() + parser = ModinfoParser(output) + modinfo = parser.get_all() - return modinfo - - def _find_driver_ver(self): # try the version field first, then vermagic second, some audio # drivers don't report version if the driver is in-tree - if self._modinfo['version'] and self._modinfo['version'] != 'in-tree:': - return self._modinfo['version'] - else: + version = modinfo.get('version') + + if version is None or version == 'in-tree': # vermagic will look like this (below) and we only care about the # first part: # "3.2.0-29-generic SMP mod_unload modversions" - return self._modinfo['vermagic'].split()[0] - - def _int_to_ip(self, int_ip): - ip = [0, 0, 0, 0] - ip[0] = int_ip & 0xff - ip[1] = (int_ip >> 8) & 0xff - ip[2] = (int_ip >> 16) & 0xff - ip[3] = (int_ip >> 24) & 0xff - return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3]) - - -def print_udev_devices(): - print("[ Devices found by udev ]".center(80, '-')) - for device in udev_devices: - for attribute in attributes: - value = getattr(device, attribute) - if value is not None: - if attribute == 'driver': - props = {} - props['Driver'] = value - network_dev = NetworkingDevice( - None, props, None, None) - print("%s: %s (ver: %s)" % ( - attribute.capitalize(), value, - network_dev._driver_ver)) - else: - print("%s: %s" % (attribute.capitalize(), value)) - vendor_id = getattr(device, 'vendor_id') - product_id = getattr(device, 'product_id') - subvendor_id = getattr(device, 'subvendor_id') - subproduct_id = getattr(device, 'subproduct_id') - if vendor_id and product_id: - print("ID: [{0:04x}:{1:04x}]".format( - vendor_id, product_id)) - if subvendor_id and subproduct_id: - print("Subsystem ID: [{0:04x}:{1:04x}]".format( - subvendor_id, subproduct_id)) - print() - - -def get_nm_devices(): - devices = [] - bus = dbus.SystemBus() - - # Get a proxy for the base NetworkManager object - proxy = bus.get_object("org.freedesktop.NetworkManager", - "/org/freedesktop/NetworkManager") - manager = dbus.Interface(proxy, "org.freedesktop.NetworkManager") - - # Get all devices known to NM and print their properties - nm_devices = manager.GetDevices() - for d in nm_devices: - dev_proxy = bus.get_object("org.freedesktop.NetworkManager", d) - prop_iface = dbus.Interface(dev_proxy, - "org.freedesktop.DBus.Properties") - props = prop_iface.GetAll("org.freedesktop.NetworkManager.Device") - try: - devtype = devtypes[props['DeviceType']] - except KeyError: - devtype = "Unknown" - - # only return Ethernet devices - if devtype == "Ethernet": - devices.append(NetworkingDevice(devtype, props, dev_proxy, bus)) - return devices - - -def main(): - if len(sys.argv) != 2 or sys.argv[1] not in ('detect', 'info'): - raise SystemExit('ERROR: please specify detect or info') - action = sys.argv[1] - - try: - output = check_output(['udevadm', 'info', '--export-db']) - except CalledProcessError as err: - raise SystemExit(err) - try: - output = output.decode("UTF-8", errors='ignore') - except UnicodeDecodeError as err: - raise SystemExit("udevadm output is not valid UTF-8") - udev = UdevadmParser(output) - result = UdevResult() - udev.run(result) - - # The detect action should indicate presence of an ethernet adatper and - # cause the job to fail if none present - rely on udev for this - if action == 'detect': - if udev_devices: - print_udev_devices() + version = modinfo.get('vermagic').split()[0] + + return version + + +class NetworkDeviceInfo(): + + def __init__(self): + self._category = None + self._interface = None + self._product = None + self._vendor = None + self._driver = None + self._driver_version = None + self._firmware_missing = None + self._path = None + self._id = None + self._subsystem_id = None + self._mac = None + self._carrier_status = None + self._ipv4 = None + self._ipv6 = None + self._speed = None + + def __str__(self): + ret = "" + for key, val in vars(self).items(): + if val is not None: + # leading _ removed, remaining ones spaces + pretty_key = key.lstrip('_').replace('_', ' ').title() + ret += '{}: {}\n'.format(pretty_key, val) + return ret + + @property + def interface(self): + return self._interface + + @interface.setter + def interface(self, value): + self._interface = value + self._interface_populate() + + @property + def driver(self): + return self._driver + + @driver.setter + def driver(self, value): + self._driver = value + self._driver_populate() + + @property + def category(self): + return self._category + + @category.setter + def category(self, value): + self._category = value + + @property + def product(self): + return self._product + + @product.setter + def product(self, value): + self._product = value + + @property + def vendor(self): + return self._vendor + + @vendor.setter + def vendor(self, value): + self._vendor = value + + @property + def firmware_missing(self): + return self._firmware_missing + + @firmware_missing.setter + def firmware_missing(self, value): + self._firmware_missing = value + + @property + def path(self): + return self._path + + @path.setter + def path(self, value): + self._path = value + + @property + def id(self): + return self._id + + @id.setter + def id(self, value): + self._id = value + + @property + def subsystem_id(self): + return self._subsystem_id + + @subsystem_id.setter + def subsystem_id(self, value): + self._subsystem_id = value + + def _interface_populate(self): + """Get extra attributes based on interface""" + if self.interface is None: + return + self._mac = Utils.get_mac_address(self.interface) + if Utils.is_iface_connected(self.interface): + self._carrier_status = 'Connected' + self._ipv4 = Utils.get_ipv4_address(self.interface) + self._ipv6 = Utils.get_ipv6_address(self.interface) + self._speed = Utils.get_speed(self.interface) else: - raise SystemExit('No devices detected by udev') + self._carrier_status = 'Disconnected' + + def _driver_populate(self): + """Get extra attributes based on driver""" + if self.driver is None: + return + self._driver_version = Utils.get_driver_version(self.driver) + + +class NMDevices(): + + # This example lists basic information about network interfaces known to NM + devtypes = {1: "Ethernet", + 2: "WiFi", + 5: "Bluetooth", + 6: "OLPC", + 7: "WiMAX", + 8: "Modem"} + + def __init__(self, category="NETWORK"): + self.category = category + self._devices = [] + self._collect_devices() + + def __len__(self): + return len(self._devices) + + def _collect_devices(self): + bus = dbus.SystemBus() + proxy = bus.get_object("org.freedesktop.NetworkManager", + "/org/freedesktop/NetworkManager") + manager = dbus.Interface(proxy, "org.freedesktop.NetworkManager") + self._devices = manager.GetDevices() + + def devices(self): + """Convert to list of NetworkDevice with NM derived attrs set""" + for d in self._devices: + bus = dbus.SystemBus() + dev_proxy = bus.get_object("org.freedesktop.NetworkManager", d) + prop_iface = dbus.Interface(dev_proxy, + "org.freedesktop.DBus.Properties") + props = prop_iface.GetAll("org.freedesktop.NetworkManager.Device") + devtype = self.devtypes.get(props.get('DeviceType')) + if devtype is None: + continue + nd = NetworkDeviceInfo() + if self.category == "NETWORK": + if devtype == "Ethernet": + nd.category = self.category + else: + continue + if self.category == "WIRELESS": + if devtype == "WiFi": + nd.category = self.category + else: + continue + nd.interface = props.get('Interface') + nd.driver = props.get('Driver') + nd.firmware_missing = props.get('FirmwareMissing') + yield nd - # The info action should just gather infomation about any ethernet devices - # found and report for inclusion in e.g. an attachment job - if action == 'info': - # Report udev detected devices first - if udev_devices: - print_udev_devices() - # Attempt to report devices found by NetworkManager - this doesn't - # make sense for server installs so skipping is acceptable +class UdevDevices(): + + def __init__(self, category='NETWORK'): + self.category = category + self._devices = [] + self._collect_devices() + + def __len__(self): + return len(self._devices) + + def _collect_devices(self): + cmd = ['udevadm', 'info', '--export-db'] try: - nm_devices = get_nm_devices() - except dbus.exceptions.DBusException: - # server's don't have network manager installed - print('Network Manager not found') - else: - print("[ Devices found by Network Manager ]".center(80, '-')) - for nm_dev in nm_devices: - print(nm_dev) + output = check_output(cmd).decode(sys.stdout.encoding) + except CalledProcessError as err: + sys.stderr.write(err) + return + udev = UdevadmParser(output) + udev.run(self) + + def addDevice(self, device): + """Callback for UdevadmParser""" + if device.category == self.category and device.interface != 'UNKNOWN': + self._devices.append(device) + + def devices(self): + """Convert to list of NetworkDevice with UDev derived attrs set""" + for device in self._devices: + nd = NetworkDeviceInfo() + nd.category = getattr(device, 'category') + nd.interface = getattr(device, 'interface') + nd.product = getattr(device, 'product') + nd.vendor = getattr(device, 'vendor') + nd.driver = getattr(device, 'driver') + nd.path = getattr(device, 'path') + nd.id = '[{0:04x}:{1:04x}]'.format( + getattr(device, 'vendor_id'), + getattr(device, 'product_id')) + nd.subsystem_id = '[{0:04x}:{1:04x}]'.format( + getattr(device, 'subvendor_id'), + getattr(device, 'subproduct_id')) + yield nd if __name__ == "__main__": - main() + parser = argparse.ArgumentParser( + description='Gather information about network devices') + parser.add_argument('action', choices=('detect', 'info'), + help='Detect mode or just report info') + parser.add_argument('category', choices=('NETWORK', 'WIRELESS'), + help='Either ethernet or WLAN devices') + parser.add_argument('--no-nm', action='store_true', + help='Don\'t attempt to get info from network manager') + parser.add_argument('--interface', + help='Restrict info action to specified interface') + args = parser.parse_args() + + udev = UdevDevices(args.category) + + # The detect action should indicate presence of a device belonging to the + # category and cause the job to fail if none present + if args.action == 'detect': + if len(udev) == 0: + raise SystemExit('No devices detected by udev') + else: + print("[ Devices found by udev ]".center(80, '-')) + for device in udev.devices(): + print(device) + + # The info action should just gather infomation about any ethernet devices + # found and report for inclusion in e.g. an attachment job and include + # NetworkManager as a source if desired + if args.action == 'info': + # If interface has been specified + if args.interface: + for device in udev.devices(): + if device.interface == args.interface: + print(device) + sys.exit(0) + + # Report udev detected devices first + print("[ Devices found by udev ]".center(80, '-')) + for device in udev.devices(): + print(device) + + # Attempt to report devices found by NetworkManager. This can be + # skipped as doesn't make sense for server installs + if not args.no_nm: + nm = NMDevices(args.category) + print("[ Devices found by Network Manager ]".center(80, '-')) + for device in nm.devices(): + print(device) |