Skip to content
102 changes: 102 additions & 0 deletions adafruit_portalbase/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,108 @@ def process_json(self, json_data, json_path):
values = json.dumps(json_data)
return values

def _tz_from_env(self, default: float = 0.0) -> float:
tz = default
v = os.getenv("NTP_TZ")
if v:
try:
tz = float(v)
except ValueError:
pass
v = os.getenv("NTP_DST")
if v:
try:
tz += float(v)
except ValueError:
pass
return tz

def _socketpool_for_wifi(self):
"""Return a SocketPool for whichever Wi-Fi backend is available.
Works with native ESP32-S2/S3/C6 (wifi.radio) and ESP32SPI coprocessors.
Some CP10 board wrappers may not expose .radio/.esp; in that case,
try the native wifi.radio directly.
"""
wm = getattr(self, "_wifi", None)
radio = getattr(wm, "radio", None)
esp = getattr(wm, "esp", None) or getattr(wm, "_esp", None)

# CP10/MagTag fallback: wrapper didn't expose .radio/.esp -> use native radio
if (radio is None) and (esp is None):
try:
import wifi as _wifi_mod # type: ignore

radio = getattr(_wifi_mod, "radio", None)
except Exception:
radio = None

target = radio if (radio is not None) else esp
if target is None:
raise RuntimeError("No WiFi radio/esp found")

# Prefer connection_manager helper, else direct SocketPool fallback
try:
from adafruit_connection_manager import get_radio_socketpool # lazy import

return get_radio_socketpool(target)
except Exception:
import socketpool # type: ignore

return socketpool.SocketPool(target)

def _wait_for_ready_optional(self, timeout: float = 10.0, poll: float = 0.05) -> None:
wm = getattr(self, "_wifi", None)
ready = getattr(wm, "esp32_ready", None)
if ready is None:
return
start = time.monotonic()
while time.monotonic() - start < timeout:
if not ready.value:
return
time.sleep(poll)
raise TimeoutError("ESP32 not responding")

def time_sync(self, server=None, timeout=None, retries=None, tz=None):
server = server or os.getenv("NTP_SERVER", "0.adafruit.pool.ntp.org")
retries = int(os.getenv("NTP_RETRIES", "8")) if retries is None else int(retries)
timeout = float(os.getenv("NTP_TIMEOUT", "5.0")) if timeout is None else float(timeout)
tz = self._tz_from_env() if tz is None else float(tz)

if not self.is_connected:
self.connect()

try:
self._wait_for_ready_optional()
except Exception:
pass

pool = self._socketpool_for_wifi()

last_exc = None
for _ in range(max(1, retries)):
try:
from adafruit_ntp import NTP # lazy import for CI/tests

try:
ntp = NTP(pool, server=server, tz=tz, socket_timeout=timeout)
except TypeError:
ntp = NTP(pool, server=server, tz_offset=tz, socket_timeout=timeout)

setter = getattr(ntp, "set_time", None)
if setter:
setter()
return time.localtime()

now = ntp.datetime
if rtc:
rtc.RTC().datetime = now
return now
except Exception as ex:
last_exc = ex
time.sleep(0.5)

raise last_exc or RuntimeError("NTP sync failed")

@property
def is_connected(self):
"""Return whether we are connected."""
Expand Down
2 changes: 1 addition & 1 deletion adafruit_portalbase/wifi_coprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import adafruit_connection_manager
import adafruit_requests
import board
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from adafruit_esp32spi import adafruit_esp32spi
from digitalio import DigitalInOut

__version__ = "0.0.0+auto.0"
Expand Down
30 changes: 30 additions & 0 deletions examples/portalbase_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2025 Mikey Sklar for Adafruit Industries
#
# SPDX-License-Identifier: MIT

# Uncomment ONE board setup:

# --- PyPortal ---
# from adafruit_pyportal import PyPortal
# portal = PyPortal(status_neopixel=None)
# net = portal.network

# --- MatrixPortal M4 ---
# from adafruit_matrixportal.matrixportal import MatrixPortal
# portal = MatrixPortal(status_neopixel=None)
# net = portal.network

# --- MagTag (ESP32-S2 native WiFi) ---
# from adafruit_magtag.magtag import MagTag
# magtag = MagTag(status_neopixel=None)
# net = magtag.network

# --- Fruit Jam (wrap its WiFi) ---
from adafruit_fruitjam import FruitJam

jam = FruitJam(status_neopixel=None)
net = jam.network


# --- shared output ---
print(net.time_sync())