Skip to content
133 changes: 80 additions & 53 deletions adafruit_mcp2515/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,26 @@
# Filters & Masks
_RXM0SIDH = const(0x20)
_RXM1SIDH = const(0x24)
MASKS = [_RXM0SIDH, _RXM1SIDH]

_RXF0SIDH = const(0x00)
_RXF1SIDH = const(0x04)
_RXF2SIDH = const(0x08)
_RXF3SIDH = const(0x10)
_RXF4SIDH = const(0x14)
_RXF5SIDH = const(0x18)
FILTERS = [[_RXF0SIDH, _RXF1SIDH], [_RXF2SIDH, _RXF3SIDH, _RXF4SIDH, _RXF5SIDH]]

MASKS_FILTERS = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This data should be an instance member (self.masks_filters), because otherwise a system with more than one MCP2515 chip in it would not function properly.

_RXM0SIDH: [None, {
_RXF0SIDH: None,
_RXF1SIDH: None
}],
_RXM1SIDH: [None, {
_RXF2SIDH: None,
_RXF3SIDH: None,
_RXF4SIDH: None,
_RXF5SIDH: None
}]
}
# bits/flags
_RX0IF = const(0x01)
_RX1IF = const(0x02)
Expand Down Expand Up @@ -309,8 +320,6 @@ def __init__(
self._tx_buffers = []
self._rx0_overflow = False
self._rx1_overflow = False
self._masks_in_use = []
self._filters_in_use = [[], []]
self._mode = None
self._bus_state = BusState.ERROR_ACTIVE
self._baudrate = baudrate
Expand Down Expand Up @@ -538,13 +547,8 @@ def _start_transmit(self, tx_buffer):
in_end=1,
)

def _set_filter_register(self, filter_index, mask, extended):
filter_reg_addr = FILTERS[filter_index]
self._write_id_to_register(filter_reg_addr, mask, extended)

def _set_mask_register(self, mask_index, mask, extended):
mask_reg_addr = MASKS[mask_index]
self._write_id_to_register(mask_reg_addr, mask, extended)
def _set_acceptance_register(self, register, value, is_extended):
self._write_id_to_register(register, value, is_extended)

@staticmethod
def _unload_ids(raw_ids):
Expand Down Expand Up @@ -774,45 +778,72 @@ def _get_bus_status(self):
else:
self._bus_state = BusState.ERROR_ACTIVE

def _create_mask(self, match):
mask = match.mask
if mask == 0:
if match.extended:
mask = EXTID_BOTTOM_29_MASK
else:
mask = STDID_BOTTOM_11_MASK

masks_used = len(self._masks_in_use)
if masks_used < len(MASKS):
next_mask_index = masks_used

self._set_mask_register(next_mask_index, mask, match.extended)
self._masks_in_use.append(MASKS[next_mask_index])
return next_mask_index

raise RuntimeError("No Masks Available")

def _create_filter(self, match, mask_index):
def _find_mask_and_filter(self, match_mask: int):
"""Finds the optimal mask and filter registers for the given mask.

next_filter_index = len(self._filters_in_use[mask_index])
if next_filter_index == len(FILTERS[mask_index]):
raise RuntimeError("No Filters Available")
Returns a tuple of (mask, filter) registers.
Returns `None` if no mask and filter are available.

filter_register = FILTERS[mask_index][next_filter_index]
Args:
mask (int): The mask value to fit.
"""

self._write_id_to_register(filter_register, match.address, match.extended)
self._filters_in_use[mask_index].append(filter_register)
mask: int = None
filt: int = None

# First try to find a matching mask with a free filter
for mask_reg, [mask_val, filts] in MASKS_FILTERS.items():
if mask_val == match_mask:
# Matching mask, look for a free filter
for filt_reg, filt_val in filts.items():
if not filt_val:
# Free filter
mask = mask_reg
filt = filt_reg
break

if mask:
# We're done here
break

if mask is None:
# Then try to find a free mask
for mask_reg, [mask_val, filts] in MASKS_FILTERS.items():
if mask_val is None:
mask = mask_reg
filt = next(iter(filts.keys()))
break

return (mask, filt) if mask is not None else None

def _create_mask_and_filter(self, match: Match):
actual_mask = match.mask
if match.mask == 0:
actual_mask = EXTID_BOTTOM_29_MASK if match.extended \
else STDID_BOTTOM_11_MASK

result = self._find_mask_and_filter(actual_mask)
if not result:
raise Exception(
"No mask and filter is available for "
f"mask 0x{actual_mask:03x}, addr 0x{match.address:03x}"
)

(mask, filt) = result
self._set_acceptance_register(mask, actual_mask, match.extended)
self._set_acceptance_register(filt, match.address, match.extended)
MASKS_FILTERS[mask][0] = actual_mask
MASKS_FILTERS[mask][1][filt] = match.address

def deinit_filtering_registers(self):
"""Clears the Receive Mask and Filter Registers"""

for mask_index, mask_reg in enumerate(MASKS):
for mask_reg, mask_data in MASKS_FILTERS.items():
self._set_register(mask_reg, 0)
mask_data[0] = None # Mask value
for filt_reg in mask_data[1]:
self._set_register(filt_reg, 0)
mask_data[1][filt_reg] = None

for filter_reg in FILTERS[mask_index]:
self._set_register(filter_reg, 0)
self._masks_in_use = []
self._filters_in_use = [[], []]

######## CANIO API METHODS #############
@property
Expand Down Expand Up @@ -905,17 +936,13 @@ def listen(self, matches=None, *, timeout: float = 10):

for match in matches:
self._dbg("match:", match)
mask_index_used = self._create_mask(match)
self._create_filter(match, mask_index=mask_index_used)

used_masks = len(self._masks_in_use)
# if matches were made and there are unused masks
# set the unused masks to prevent them from leaking packets
if len(matches) > 0 and used_masks < len(MASKS):
next_mask_index = used_masks
for idx in range(next_mask_index, len(MASKS)):
print("using unused mask index:", idx)
self._create_mask(matches[-1])
self._create_mask_and_filter(match)

for mask_reg, [mask_val, _] in MASKS_FILTERS.items():
if mask_val is None:
# Mask unused, set it to a match to prevent leakage
self._set_acceptance_register(mask_reg, match.mask, match.extended)


return Listener(self, timeout)

Expand Down