Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions can/interfaces/socketcan/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# BCM opcodes
CAN_BCM_TX_SETUP = 1
CAN_BCM_TX_DELETE = 2
CAN_BCM_TX_READ = 3

# BCM flags
SETTIMER = 0x0001
Expand Down
26 changes: 26 additions & 0 deletions can/interfaces/socketcan/socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,32 @@ def _tx_setup(self, messages):
ival1 = 0
ival2 = self.period

# First do a TX_READ before creating a new task, and check if we get
# EINVAL. If so, then we are referring to a CAN message with the same
# ID
check_header = build_bcm_header(
opcode=CAN_BCM_TX_READ,
flags=0,
count=0,
ival1_seconds=0,
ival1_usec=0,
ival2_seconds=0,
ival2_usec=0,
can_id=self.can_id_with_flags,
nframes=0,
)
try:
self.bcm_socket.send(check_header)
except OSError as e:
if e.errno != errno.EINVAL:
raise e
else:
raise ValueError(
"A periodic Task for Arbitration ID {} has already been created".format(
messages[0].arbitration_id
)
)

header = build_bcm_transmit_header(
self.can_id_with_flags,
count,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@unittest.skipUnless(TEST_INTERFACE_SOCKETCAN, "skip testing of socketcan")
class SocketCanCyclicMultiple(unittest.TestCase):
class CyclicSocketCan(unittest.TestCase):
BITRATE = 500000
TIMEOUT = 0.1

Expand Down Expand Up @@ -244,6 +244,27 @@ def test_cyclic_initializer_different_arbitration_ids(self):
with self.assertRaises(ValueError):
task = self._send_bus.send_periodic(messages, self.PERIOD)

def test_create_same_id_raises_exception(self):
messages_a = can.Message(
arbitration_id=0x401,
data=[0x11, 0x11, 0x11, 0x11, 0x11, 0x11],
is_extended_id=False,
)

messages_b = can.Message(
arbitration_id=0x401,
data=[0x22, 0x22, 0x22, 0x22, 0x22, 0x22],
is_extended_id=False,
)

task_a = self._send_bus.send_periodic(messages_a, 1)
self.assertIsInstance(task_a, can.broadcastmanager.CyclicSendTaskABC)

# The second one raises a ValueError when we attempt to create a new
# Task, since it has the same arbitration ID.
with self.assertRaises(ValueError):
task_b = self._send_bus.send_periodic(messages_b, 1)

def test_modify_data_list(self):
messages_odd = []
messages_odd.append(
Expand Down