summaryrefslogtreecommitdiff
path: root/bin
diff options
authorPMR <pmr@pmr-lander>2019-05-29 15:17:12 +0000
committerPMR <pmr@pmr-lander>2019-05-29 15:17:12 +0000
commitf6c2e95152a8ad2fd77b68a8b1d7946be056fbaf (patch)
treeb8b82062475f94d25d9cc5b2ad78ca5fd6f4d455 /bin
parentb0d1e33fb407dfa299574396c868dce33c639ff4 (diff)
parent5d905d4a9bee69919589e85dca7ab9980f213081 (diff)
Merge #367991 from ~jocave/plainbox-provider-checkbox:network-device-info-rework
Diffstat (limited to 'bin')
-rwxr-xr-xbin/network_device_info.py523
-rwxr-xr-xbin/network_info96
2 files changed, 317 insertions, 302 deletions
diff --git a/bin/network_device_info.py b/bin/network_device_info.py
index 29a4aed8..e36c2012 100755
--- a/bin/network_device_info.py
+++ b/bin/network_device_info.py
@@ -19,6 +19,11 @@
#
# Copyright (C) 2012-2019 Canonical, Ltd.
+import argparse
+import fcntl
+import os
+import socket
+import struct
from subprocess import check_output, CalledProcessError, STDOUT
import sys
@@ -28,234 +33,340 @@ from checkbox_support.parsers.modinfo import ModinfoParser
from checkbox_support.parsers.udevadm import UdevadmParser
-# This example lists basic information about network interfaces known to NM
-devtypes = {1: "Ethernet",
- 2: "WiFi",
- 5: "Bluetooth",
- 6: "OLPC",
- 7: "WiMAX",
- 8: "Modem"}
+class Utils():
-states = {0: "Unknown",
- 10: "Unmanaged",
- 20: "Unavailable",
- 30: "Disconnected",
- 40: "Prepare",
- 50: "Config",
- 60: "Need Auth",
- 70: "IP Config",
- 80: "IP Check",
- 90: "Secondaries",
- 100: "Activated",
- 110: "Deactivating",
- 120: "Failed"}
+ sys_path = '/sys/class/net'
-attributes = ("category", "interface", "product", "vendor", "driver", "path")
-
-udev_devices = []
-nm_devices = []
-
-
-class UdevResult:
- def addDevice(self, device):
- if device.category == 'NETWORK' and device.interface != "UNKNOWN":
- udev_devices.append(device)
-
-
-class NetworkingDevice():
- def __init__(self, devtype, props, dev_proxy, bus):
- self._devtype = devtype
+ @classmethod
+ def is_iface_connected(cls, iface):
try:
- self._interface = props['Interface']
- except KeyError:
- self._interface = "Unknown"
-
- try:
- self._ip = self._int_to_ip(props['Ip4Address'])
- except KeyError:
- self._ip = "Unknown"
-
- try:
- self._driver = props['Driver']
- except KeyError:
- self._driver = "Unknown"
- self._driver_ver = "Unknown"
-
- if self._driver != "Unknown":
- self._modinfo = self._modinfo_parser(props['Driver'])
- if self._modinfo:
- self._driver_ver = self._find_driver_ver()
- else:
- self._driver_ver = "Unknown"
-
- try:
- self._firmware_missing = props['FirmwareMissing']
- except KeyError:
- self._firmware_missing = False
-
+ carrier_file = os.path.join(cls.sys_path, iface, 'carrier')
+ return int(open(carrier_file, 'r').read()) == 1
+ except Exception:
+ pass
+ return False
+
+ @staticmethod
+ def get_ipv4_address(interface):
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ return socket.inet_ntoa(fcntl.ioctl(
+ s.fileno(),
+ 0x8915, # SIOCGIFADDR
+ struct.pack('256s', interface[:15].encode())
+ )[20:24])
+
+ @staticmethod
+ def get_ipv6_address(interface):
+ cmd = ['/sbin/ip', '-6', '-o', 'addr', 'show', 'dev', interface,
+ 'scope', 'link']
+ proc = check_output(cmd, universal_newlines=True)
+ return proc.split()[3].strip()
+
+ @classmethod
+ def get_mac_address(cls, interface):
+ address_file = os.path.join(cls.sys_path, interface, 'address')
try:
- self._state = states[props['State']]
- except KeyError:
- self._state = "Unknown"
-
- def __str__(self):
- ret = "Category: %s\n" % self._devtype
- ret += "Interface: %s\n" % self._interface
- ret += "IP: %s\n" % self._ip
- ret += "Driver: %s (ver: %s)\n" % (self._driver, self._driver_ver)
- if self._firmware_missing:
- ret += "Warning: Required Firmware Missing for device\n"
- ret += "State: %s\n" % self._state
- return ret
-
- def getstate(self):
- return self._state
-
- def gettype(self):
- return self._devtype
+ return open(address_file, 'r').read().strip()
+ except IOError:
+ return 'UNKNOWN'
- def _bitrate_to_mbps(self, bitrate):
+ @classmethod
+ def get_speed(cls, interface):
+ speed_file = os.path.join(cls.sys_path, interface, 'speed')
try:
- intbr = int(bitrate)
- return str(intbr / 1000)
- except Exception:
- return "NaN"
+ return open(speed_file, 'r').read().strip()
+ except IOError:
+ return 'UNKNOWN'
- def _modinfo_parser(self, driver):
+ @staticmethod
+ def get_driver_version(driver):
cmd = ['/sbin/modinfo', driver]
try:
- stream = check_output(cmd, stderr=STDOUT, universal_newlines=True)
+ output = check_output(cmd, stderr=STDOUT, universal_newlines=True)
except CalledProcessError:
return None
-
- if not stream:
+ if not output:
return None
- else:
- parser = ModinfoParser(stream)
- modinfo = parser.get_all()
+ parser = ModinfoParser(output)
+ modinfo = parser.get_all()
- return modinfo
-
- def _find_driver_ver(self):
# try the version field first, then vermagic second, some audio
# drivers don't report version if the driver is in-tree
- if self._modinfo['version'] and self._modinfo['version'] != 'in-tree:':
- return self._modinfo['version']
- else:
+ version = modinfo.get('version')
+
+ if version is None or version == 'in-tree':
# vermagic will look like this (below) and we only care about the
# first part:
# "3.2.0-29-generic SMP mod_unload modversions"
- return self._modinfo['vermagic'].split()[0]
-
- def _int_to_ip(self, int_ip):
- ip = [0, 0, 0, 0]
- ip[0] = int_ip & 0xff
- ip[1] = (int_ip >> 8) & 0xff
- ip[2] = (int_ip >> 16) & 0xff
- ip[3] = (int_ip >> 24) & 0xff
- return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])
-
-
-def print_udev_devices():
- print("[ Devices found by udev ]".center(80, '-'))
- for device in udev_devices:
- for attribute in attributes:
- value = getattr(device, attribute)
- if value is not None:
- if attribute == 'driver':
- props = {}
- props['Driver'] = value
- network_dev = NetworkingDevice(
- None, props, None, None)
- print("%s: %s (ver: %s)" % (
- attribute.capitalize(), value,
- network_dev._driver_ver))
- else:
- print("%s: %s" % (attribute.capitalize(), value))
- vendor_id = getattr(device, 'vendor_id')
- product_id = getattr(device, 'product_id')
- subvendor_id = getattr(device, 'subvendor_id')
- subproduct_id = getattr(device, 'subproduct_id')
- if vendor_id and product_id:
- print("ID: [{0:04x}:{1:04x}]".format(
- vendor_id, product_id))
- if subvendor_id and subproduct_id:
- print("Subsystem ID: [{0:04x}:{1:04x}]".format(
- subvendor_id, subproduct_id))
- print()
-
-
-def get_nm_devices():
- devices = []
- bus = dbus.SystemBus()
-
- # Get a proxy for the base NetworkManager object
- proxy = bus.get_object("org.freedesktop.NetworkManager",
- "/org/freedesktop/NetworkManager")
- manager = dbus.Interface(proxy, "org.freedesktop.NetworkManager")
-
- # Get all devices known to NM and print their properties
- nm_devices = manager.GetDevices()
- for d in nm_devices:
- dev_proxy = bus.get_object("org.freedesktop.NetworkManager", d)
- prop_iface = dbus.Interface(dev_proxy,
- "org.freedesktop.DBus.Properties")
- props = prop_iface.GetAll("org.freedesktop.NetworkManager.Device")
- try:
- devtype = devtypes[props['DeviceType']]
- except KeyError:
- devtype = "Unknown"
-
- # only return Ethernet devices
- if devtype == "Ethernet":
- devices.append(NetworkingDevice(devtype, props, dev_proxy, bus))
- return devices
-
-
-def main():
- if len(sys.argv) != 2 or sys.argv[1] not in ('detect', 'info'):
- raise SystemExit('ERROR: please specify detect or info')
- action = sys.argv[1]
-
- try:
- output = check_output(['udevadm', 'info', '--export-db'])
- except CalledProcessError as err:
- raise SystemExit(err)
- try:
- output = output.decode("UTF-8", errors='ignore')
- except UnicodeDecodeError as err:
- raise SystemExit("udevadm output is not valid UTF-8")
- udev = UdevadmParser(output)
- result = UdevResult()
- udev.run(result)
-
- # The detect action should indicate presence of an ethernet adatper and
- # cause the job to fail if none present - rely on udev for this
- if action == 'detect':
- if udev_devices:
- print_udev_devices()
+ version = modinfo.get('vermagic').split()[0]
+
+ return version
+
+
+class NetworkDeviceInfo():
+
+ def __init__(self):
+ self._category = None
+ self._interface = None
+ self._product = None
+ self._vendor = None
+ self._driver = None
+ self._driver_version = None
+ self._firmware_missing = None
+ self._path = None
+ self._id = None
+ self._subsystem_id = None
+ self._mac = None
+ self._carrier_status = None
+ self._ipv4 = None
+ self._ipv6 = None
+ self._speed = None
+
+ def __str__(self):
+ ret = ""
+ for key, val in vars(self).items():
+ if val is not None:
+ # leading _ removed, remaining ones spaces
+ pretty_key = key.lstrip('_').replace('_', ' ').title()
+ ret += '{}: {}\n'.format(pretty_key, val)
+ return ret
+
+ @property
+ def interface(self):
+ return self._interface
+
+ @interface.setter
+ def interface(self, value):
+ self._interface = value
+ self._interface_populate()
+
+ @property
+ def driver(self):
+ return self._driver
+
+ @driver.setter
+ def driver(self, value):
+ self._driver = value
+ self._driver_populate()
+
+ @property
+ def category(self):
+ return self._category
+
+ @category.setter
+ def category(self, value):
+ self._category = value
+
+ @property
+ def product(self):
+ return self._product
+
+ @product.setter
+ def product(self, value):
+ self._product = value
+
+ @property
+ def vendor(self):
+ return self._vendor
+
+ @vendor.setter
+ def vendor(self, value):
+ self._vendor = value
+
+ @property
+ def firmware_missing(self):
+ return self._firmware_missing
+
+ @firmware_missing.setter
+ def firmware_missing(self, value):
+ self._firmware_missing = value
+
+ @property
+ def path(self):
+ return self._path
+
+ @path.setter
+ def path(self, value):
+ self._path = value
+
+ @property
+ def id(self):
+ return self._id
+
+ @id.setter
+ def id(self, value):
+ self._id = value
+
+ @property
+ def subsystem_id(self):
+ return self._subsystem_id
+
+ @subsystem_id.setter
+ def subsystem_id(self, value):
+ self._subsystem_id = value
+
+ def _interface_populate(self):
+ """Get extra attributes based on interface"""
+ if self.interface is None:
+ return
+ self._mac = Utils.get_mac_address(self.interface)
+ if Utils.is_iface_connected(self.interface):
+ self._carrier_status = 'Connected'
+ self._ipv4 = Utils.get_ipv4_address(self.interface)
+ self._ipv6 = Utils.get_ipv6_address(self.interface)
+ self._speed = Utils.get_speed(self.interface)
else:
- raise SystemExit('No devices detected by udev')
+ self._carrier_status = 'Disconnected'
+
+ def _driver_populate(self):
+ """Get extra attributes based on driver"""
+ if self.driver is None:
+ return
+ self._driver_version = Utils.get_driver_version(self.driver)
+
+
+class NMDevices():
+
+ # This example lists basic information about network interfaces known to NM
+ devtypes = {1: "Ethernet",
+ 2: "WiFi",
+ 5: "Bluetooth",
+ 6: "OLPC",
+ 7: "WiMAX",
+ 8: "Modem"}
+
+ def __init__(self, category="NETWORK"):
+ self.category = category
+ self._devices = []
+ self._collect_devices()
+
+ def __len__(self):
+ return len(self._devices)
+
+ def _collect_devices(self):
+ bus = dbus.SystemBus()
+ proxy = bus.get_object("org.freedesktop.NetworkManager",
+ "/org/freedesktop/NetworkManager")
+ manager = dbus.Interface(proxy, "org.freedesktop.NetworkManager")
+ self._devices = manager.GetDevices()
+
+ def devices(self):
+ """Convert to list of NetworkDevice with NM derived attrs set"""
+ for d in self._devices:
+ bus = dbus.SystemBus()
+ dev_proxy = bus.get_object("org.freedesktop.NetworkManager", d)
+ prop_iface = dbus.Interface(dev_proxy,
+ "org.freedesktop.DBus.Properties")
+ props = prop_iface.GetAll("org.freedesktop.NetworkManager.Device")
+ devtype = self.devtypes.get(props.get('DeviceType'))
+ if devtype is None:
+ continue
+ nd = NetworkDeviceInfo()
+ if self.category == "NETWORK":
+ if devtype == "Ethernet":
+ nd.category = self.category
+ else:
+ continue
+ if self.category == "WIRELESS":
+ if devtype == "WiFi":
+ nd.category = self.category
+ else:
+ continue
+ nd.interface = props.get('Interface')
+ nd.driver = props.get('Driver')
+ nd.firmware_missing = props.get('FirmwareMissing')
+ yield nd
- # The info action should just gather infomation about any ethernet devices
- # found and report for inclusion in e.g. an attachment job
- if action == 'info':
- # Report udev detected devices first
- if udev_devices:
- print_udev_devices()
- # Attempt to report devices found by NetworkManager - this doesn't
- # make sense for server installs so skipping is acceptable
+class UdevDevices():
+
+ def __init__(self, category='NETWORK'):
+ self.category = category
+ self._devices = []
+ self._collect_devices()
+
+ def __len__(self):
+ return len(self._devices)
+
+ def _collect_devices(self):
+ cmd = ['udevadm', 'info', '--export-db']
try:
- nm_devices = get_nm_devices()
- except dbus.exceptions.DBusException:
- # server's don't have network manager installed
- print('Network Manager not found')
- else:
- print("[ Devices found by Network Manager ]".center(80, '-'))
- for nm_dev in nm_devices:
- print(nm_dev)
+ output = check_output(cmd).decode(sys.stdout.encoding)
+ except CalledProcessError as err:
+ sys.stderr.write(err)
+ return
+ udev = UdevadmParser(output)
+ udev.run(self)
+
+ def addDevice(self, device):
+ """Callback for UdevadmParser"""
+ if device.category == self.category and device.interface != 'UNKNOWN':
+ self._devices.append(device)
+
+ def devices(self):
+ """Convert to list of NetworkDevice with UDev derived attrs set"""
+ for device in self._devices:
+ nd = NetworkDeviceInfo()
+ nd.category = getattr(device, 'category')
+ nd.interface = getattr(device, 'interface')
+ nd.product = getattr(device, 'product')
+ nd.vendor = getattr(device, 'vendor')
+ nd.driver = getattr(device, 'driver')
+ nd.path = getattr(device, 'path')
+ nd.id = '[{0:04x}:{1:04x}]'.format(
+ getattr(device, 'vendor_id'),
+ getattr(device, 'product_id'))
+ nd.subsystem_id = '[{0:04x}:{1:04x}]'.format(
+ getattr(device, 'subvendor_id'),
+ getattr(device, 'subproduct_id'))
+ yield nd
if __name__ == "__main__":
- main()
+ parser = argparse.ArgumentParser(
+ description='Gather information about network devices')
+ parser.add_argument('action', choices=('detect', 'info'),
+ help='Detect mode or just report info')
+ parser.add_argument('category', choices=('NETWORK', 'WIRELESS'),
+ help='Either ethernet or WLAN devices')
+ parser.add_argument('--no-nm', action='store_true',
+ help='Don\'t attempt to get info from network manager')
+ parser.add_argument('--interface',
+ help='Restrict info action to specified interface')
+ args = parser.parse_args()
+
+ udev = UdevDevices(args.category)
+
+ # The detect action should indicate presence of a device belonging to the
+ # category and cause the job to fail if none present
+ if args.action == 'detect':
+ if len(udev) == 0:
+ raise SystemExit('No devices detected by udev')
+ else:
+ print("[ Devices found by udev ]".center(80, '-'))
+ for device in udev.devices():
+ print(device)
+
+ # The info action should just gather infomation about any ethernet devices
+ # found and report for inclusion in e.g. an attachment job and include
+ # NetworkManager as a source if desired
+ if args.action == 'info':
+ # If interface has been specified
+ if args.interface:
+ for device in udev.devices():
+ if device.interface == args.interface:
+ print(device)
+ sys.exit(0)
+
+ # Report udev detected devices first
+ print("[ Devices found by udev ]".center(80, '-'))
+ for device in udev.devices():
+ print(device)
+
+ # Attempt to report devices found by NetworkManager. This can be
+ # skipped as doesn't make sense for server installs
+ if not args.no_nm:
+ nm = NMDevices(args.category)
+ print("[ Devices found by Network Manager ]".center(80, '-'))
+ for device in nm.devices():
+ print(device)
diff --git a/bin/network_info b/bin/network_info
deleted file mode 100755
index d3ff5df0..00000000
--- a/bin/network_info
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-import sys
-import subprocess
-import socket
-import fcntl
-import struct
-
-SYS_PATH = '/sys/class/net'
-
-
-def _read_file(file):
- source = open(file, 'r')
- content = source.read()
- source.close()
- return content
-
-
-def get_connected(interface):
- STATUS = ('No', 'Yes')
- carrier_file = os.path.join(SYS_PATH, interface, 'carrier')
-
- carrier = 0
- try:
- carrier = int(_read_file(carrier_file))
- except IOError:
- pass
-
- return STATUS[carrier]
-
-
-def get_ip_address(interface):
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- return socket.inet_ntoa(fcntl.ioctl(
- s.fileno(),
- 0x8915, # SIOCGIFADDR
- struct.pack('256s', interface[:15].encode())
- )[20:24])
-
-
-def get_ipv6_address(interface):
- cmd = ['/sbin/ip', '-6', '-o', 'addr', 'show',
- 'dev', interface, 'scope', 'link']
- proc = subprocess.check_output(cmd, universal_newlines=True)
- ipaddr = proc.split()[3].strip()
- return ipaddr
-
-
-def get_mac_address(interface):
- address_file = os.path.join(SYS_PATH, interface, 'address')
-
- address = ''
- try:
- address = _read_file(address_file).strip()
- except IOError:
- pass
-
- return address
-
-
-def get_speed(interface):
- speed_file = os.path.join(SYS_PATH, interface, 'speed')
-
- speed = ''
- try:
- speed = _read_file(speed_file).strip()
- except IOError:
- pass
-
- return speed
-
-
-def main(args):
- for interface in args:
- connected = get_connected(interface)
- print("Interface: %s" % interface)
- print("Connected: %s" % connected)
- try:
- print("IPv4: %s" % get_ip_address(interface))
- except IOError:
- print("IPv4: n/a")
- try:
- print("IPv6 (link local): %s" % get_ipv6_address(interface))
- except IOError:
- print("IPv6 (link local): n/a")
- except:
- print("IPv6 (link local): n/a")
- print("MAC: %s" % get_mac_address(interface))
- print("Connect Speed: %s\n" % get_speed(interface))
-
- return 0
-
-
-if __name__ == "__main__":
- sys.exit(main(sys.argv[1:]))