diff options
author | PMR <pmr@pmr-lander> | 2018-06-08 16:26:07 +0000 |
---|---|---|
committer | PMR <pmr@pmr-lander> | 2018-06-08 16:26:07 +0000 |
commit | 17f7dbbf8dd64347fe333f92d1dccb4d951b483e (patch) | |
tree | e2d63d0ed4717508d02376738c4ae6eb184b2d0e | |
parent | 14264463afb5e839075d5349c2d276e62f2346a1 (diff) | |
parent | 640705622f82bc5c119b222dd5f46d4ac41d0f85 (diff) |
Merge #347430 from ~jocave/plainbox-provider-checkbox:create-socketcan-tests
-rwxr-xr-x | bin/socketcan_test.py | 199 | ||||
-rw-r--r-- | units/socketcan/category.pxu | 3 | ||||
-rw-r--r-- | units/socketcan/jobs.pxu | 166 | ||||
-rw-r--r-- | units/socketcan/manifest.pxu | 5 | ||||
-rw-r--r-- | units/socketcan/test-plan.pxu | 28 |
5 files changed, 401 insertions, 0 deletions
diff --git a/bin/socketcan_test.py b/bin/socketcan_test.py new file mode 100755 index 0000000..232fc2c --- /dev/null +++ b/bin/socketcan_test.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +# This file is part of Checkbox. +# +# Copyright 2018 Canonical Ltd. +# Written by: +# Jonathan Cave <jonathan.cave@canonical.com> +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. + +import argparse +import os +import socket +import struct +import sys +import textwrap +import threading +import time + + +class CANSocket(): + + # struct module format strings for CAN packets + # Normal format: + # < little-endian + # I unsigned int (4) : CAN-ID + EFF/RTR/ERR Flags + # B unsigned char (1) : Data length + # 3x padding (3 * 1) : - + # 8s char array (8 * 1) : Data + FORMAT = "<IB3x8s" + # Flexible Data (FD) rate format: + # < little-endian + # I unsigned int (4) : CAN-ID + EFF/RTR/ERR Flags + # B unsigned char (1) : Data length + # B unsigned char (1) : FD Flags + # 2x padding (2 * 1) : - + # 64s char array (64 * 1) : Data + FD_FORMAT = "<IBB2x64s" + + CAN_MTU = struct.Struct(FORMAT).size + CANFD_MTU = struct.Struct(FD_FORMAT).size + + # Socket options from <linux/can/raw.h> + CAN_RAW_FILTER = 1 # set 0 .. n can_filter(s) + CAN_RAW_ERR_FILTER = 2 # set filter for error frames + CAN_RAW_LOOPBACK = 3 # local loopback (default:on) + CAN_RAW_RECV_OWN_MSGS = 4 # receive my own msgs (default:off) + CAN_RAW_FD_FRAMES = 5 # allow CAN FD frames (default:off) + CAN_RAW_JOIN_FILTERS = 6 # all filters must match to trigger + + def __init__(self, interface=None, fdmode=False, loopback=True): + self.sock = socket.socket(socket.PF_CAN, # protocol family + socket.SOCK_RAW, + socket.CAN_RAW) + self._fdmode = fdmode + self._loopback = loopback + if interface is not None: + self._bind(interface) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def close(self): + self.sock.close() + + def _bind(self, interface): + self.sock.bind((interface,)) + if self._fdmode: # default is off + self.sock.setsockopt(socket.SOL_CAN_RAW, self.CAN_RAW_FD_FRAMES, 1) + if not self._loopback: # default is on + self.sock.setsockopt(socket.SOL_CAN_RAW, self.CAN_RAW_LOOPBACK, 0) + + def send(self, can_id, data, id_flags=0, fd_flags=0): + can_id = can_id | id_flags + if self._fdmode: + can_pkt = struct.pack(self.FD_FORMAT, can_id, len(data), fd_flags, + data) + else: + can_pkt = struct.pack(self.FORMAT, can_id, len(data), data) + self.sock.send(can_pkt) + + def recv(self): + if self._fdmode: + can_pkt = self.sock.recv(self.CANFD_MTU) + can_id, length, fd_flags, data = struct.unpack(self.FD_FORMAT, + can_pkt) + else: + can_pkt = self.sock.recv(self.CAN_MTU) + can_id, length, data = struct.unpack(self.FORMAT, can_pkt) + can_id &= socket.CAN_EFF_MASK + return (can_id, data[:length]) + + +def echo_test(args): + # ID conversion and size check + print('Using source ID: {}'.format(args.can_id)) + can_id_i = int(args.can_id, 16) + if can_id_i > 2047 and not args.effid: + raise SystemExit('ERROR: CAN ID to high for SFF') + id_flags = 0 + if args.effid: + print('Setting EFF CAN ID flag') + id_flags = socket.CAN_EFF_FLAG + + # Whether to enable local loopback, required for local only test + # but only want to parse packets from other end if remote + loopback = not args.remote + + # Default data size is 8 bytes but if testing FD Mode use 64 + data_size = 8 + if args.fdmode: + data_size = 64 + data_b = os.urandom(data_size) + print('Sending data: {}'.format(data_b.hex())) + + recv_id_i = None + recv_data_b = None + + def receive(): + nonlocal recv_id_i + nonlocal recv_data_b + print('Opening read socket on {}'.format(args.interface)) + with CANSocket(args.interface, fdmode=args.fdmode, + loopback=loopback) as recv_s: + recv_id_i, recv_data_b = recv_s.recv() + + # Create a receive thread + recv_t = threading.Thread(target=receive, daemon=True) + recv_t.start() + time.sleep(1) + + print('Opening send socket on {}'.format(args.interface)) + # Open socket, will raise OSError on failure + with CANSocket(args.interface, fdmode=args.fdmode, + loopback=loopback) as send_s: + print('Sending data...', flush=True) + try: + send_s.send(can_id_i, data_b, id_flags=id_flags) + except OSError as e: + print(e, file=sys.stderr) + if e.errno == 90: + raise SystemExit('ERROR: interface does not support FD Mode') + else: + raise SystemExit('ERROR: OSError on attempt to send') + + recv_t.join(10) + if recv_t.is_alive(): + raise SystemExit('ERROR: Timeout waiting to receive data') + + print('Received packet') + print(' ID : {:x}'.format(recv_id_i)) + print(' Data: {}'.format(recv_data_b.hex())) + if recv_id_i != can_id_i or recv_data_b != data_b: + raise SystemExit('ERROR: ID/Data received does not match sent') + + print('\nPASSED') + + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description='SocketCAN Tests', + epilog=textwrap.dedent(''' + Examples: + socketcan_test.py can0 123 + socketcan_test.py can0 212 --remote + socketcan_test.py can0 FA123 --effid + socketcan_test.py can0 E407DB --effid --fdmode''').lstrip()) + parser.add_argument('interface', type=str, help='Interface name e.g. can0') + parser.add_argument('can_id', type=str, help=textwrap.dedent(''' + CAN ID of source in Hex, max of 11 bits using Standard Frame + Format (SFF). Specifying use of Extended Frame Format (EFF) + allows the use of up to 29 bit IDs.''').lstrip()) + parser.add_argument('--remote', action='store_true', + help='Expect a remote device to echo the test packet') + parser.add_argument('--effid', action='store_true', + help='Use EFF ID (CAN 2.0 B)') + parser.add_argument('--fdmode', action='store_true', + help='Attempt to send 64 bytes of data i.e. FD mode') + parser.set_defaults(func=echo_test) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/units/socketcan/category.pxu b/units/socketcan/category.pxu new file mode 100644 index 0000000..4c43f37 --- /dev/null +++ b/units/socketcan/category.pxu @@ -0,0 +1,3 @@ +unit: category +id: socketcan +_name: Test SocketCAN interfaces diff --git a/units/socketcan/jobs.pxu b/units/socketcan/jobs.pxu new file mode 100644 index 0000000..0bf22a1 --- /dev/null +++ b/units/socketcan/jobs.pxu @@ -0,0 +1,166 @@ + +id: socketcan/modprobe_vcan +category_id: socketcan +_summary: Create virtual CAN device +_description: + Add a virtual CAN interface for testing kernel CAN support +plugin: shell +user: root +estimated_duration: 2.0 +command: + BASH_XTRACEFD=1 + set -ex + if lsmod | grep -v "vcan" &> /dev/null ; then + modprobe vcan + fi + if ! ip link show vcan0 &> /dev/null ; then + ip link add vcan0 type vcan + fi + ip link set vcan0 up + + +id: socketcan/send_packet_local_sff_virtual +depends: socketcan/modprobe_vcan +_summary: Virtual CAN device support test (Raw, Local) +_description: + Test that the kernel supports CAN networking by sending packets to a + virtual device using a raw socket, this is only a local test as + the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +command: + socketcan_test.py vcan0 111 + + +id: socketcan/send_packet_local_eff_virtual +depends: socketcan/modprobe_vcan +_summary: Virtual CAN device support test (Raw, Local, EFF) +_description: + Test that the kernel supports CAN networking by sending packets to a + virtual device using a raw socket, this is only a local test as + the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +command: + socketcan_test.py vcan0 1F334455 --effid + + +id: socketcan/send_packet_local_fd_virtual +depends: socketcan/modprobe_vcan +_summary: Virtual CAN device support test (Raw, Local, FD) +_description: + Test that the kernel supports CAN networking by sending packets to a + virtual device using a raw socket, this is only a local test as + the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +command: + socketcan_test.py vcan0 1A --fdmode + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_local_sff_{interface} +_summary: CAN device support test {interface} (Raw, Local) +_description: + Test a CAN device by sending packets using a raw socket, this is only a + local test as the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'False' +command: + socketcan_test.py {interface} 111 + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_local_eff_{interface} +_summary: CAN device support test {interface} (Raw, Local, EFF) +_description: + Test a CAN device by sending packets using a raw socket, this is only a + local test as the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'False' +command: + socketcan_test.py {interface} FA123 --effid + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_local_fd_{interface} +_summary: CAN device support test {interface} (Raw, Local, FD) +_description: + Test a CAN device by sending packets using a raw socket, this is only a + local test as the broadcast packet is received on the same device +category_id: socketcan +plugin: shell +estimated_duration: 2.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'False' +command: + socketcan_test.py {interface} 1B --fdmode + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_remote_sff_{interface} +_summary: CAN device support test {interface} (Raw, Remote) +_description: + Test a CAN device by sending packets using a raw socket to a remote device. + As a prerequisite the remote device should have can-echo-server installed so + as to return the predicted packet. +category_id: socketcan +plugin: shell +estimated_duration: 5.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'True' +command: + socketcan_test.py {interface} 111 --remote + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_remote_eff_{interface} +_summary: CAN device support test {interface} (Raw, Remote, EFF) +_description: + Test a CAN device by sending packets using a raw socket to a remote device. + As a prerequisite the remote device should have can-echo-server installed so + as to return the predicted packet. +category_id: socketcan +plugin: shell +estimated_duration: 5.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'True' +command: + socketcan_test.py {interface} E407DB --remote --effid + + +unit: template +template-resource: device +template-filter: device.category == 'SOCKETCAN' +id: socketcan/send_packet_remote_fd_{interface} +_summary: CAN device support test {interface} (Raw, Remote, FD) +_description: + Test a CAN device by sending packets using a raw socket to a remote device. + As a prerequisite the remote device should have can-echo-server installed so + as to return the predicted packet. +category_id: socketcan +plugin: shell +estimated_duration: 5.0 +imports: from com.canonical.plainbox import manifest +requires: manifest.socket_can_echo_server_running == 'True' +command: + socketcan_test.py {interface} 19F --remote --fdmode diff --git a/units/socketcan/manifest.pxu b/units/socketcan/manifest.pxu new file mode 100644 index 0000000..bb140c4 --- /dev/null +++ b/units/socketcan/manifest.pxu @@ -0,0 +1,5 @@ + +unit: manifest entry +id: socket_can_echo_server_running +_name: SocketCAN Echo Server Running +value-type: bool diff --git a/units/socketcan/test-plan.pxu b/units/socketcan/test-plan.pxu new file mode 100644 index 0000000..c4e343e --- /dev/null +++ b/units/socketcan/test-plan.pxu @@ -0,0 +1,28 @@ + +id: socketcan-auto-remote +unit: test plan +_name: SocketCAN Tests (Automated, Remote) +_description: + SocketCAN Tests (Automated, Remote) +include: + socketcan/send_packet_remote_.* + + +id: socketcan-auto-local +unit: test plan +_name: SocketCAN Tests, (Automated, Local) +_description: + SocketCAN Tests, (Automated, Local) +include: + socketcan/send_packet_local_.* + + +id: socketcan-full +unit: test plan +_name: SocketCAN Tests +_description: + SocketCAN Tests +include: +nested_part: + socketcan-auto-remote + socketcan-auto-local |