Skip to content
1 change: 1 addition & 0 deletions docs/changelog/2074.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add AppData and Cache protocols to discovery for decoupling - by :user:`esafak`.
2 changes: 1 addition & 1 deletion src/virtualenv/create/via_global_ref/venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Venv(ViaGlobalRefApi):
def __init__(self, options, interpreter) -> None:
self.describe = options.describe
super().__init__(options, interpreter)
current = PythonInfo.current()
current = PythonInfo.current(options.app_data, options.cache)
self.can_be_inline = interpreter is current and interpreter.executable == interpreter.system_executable
self._context = None

Expand Down
23 changes: 23 additions & 0 deletions src/virtualenv/discovery/app_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, ContextManager, Protocol

if TYPE_CHECKING:
from pathlib import Path


class AppData(Protocol):
"""Protocol for application data store."""

def py_info(self, path: Path) -> Any: ...

def py_info_clear(self) -> None: ...

@contextmanager
def ensure_extracted(self, path: Path, to_folder: Path | None = None) -> ContextManager[Path]: ...

@contextmanager
def extract(self, path: Path, to_folder: Path | None = None) -> ContextManager[Path]: ...

def close(self) -> None: ...
31 changes: 18 additions & 13 deletions src/virtualenv/discovery/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from argparse import ArgumentParser
from collections.abc import Callable, Generator, Iterable, Mapping, Sequence

from virtualenv.app_data.base import AppData
from .app_data import AppData
LOGGER = logging.getLogger(__name__)


Expand All @@ -27,8 +27,8 @@ class Builtin(Discover):
app_data: AppData
try_first_with: Sequence[str]

def __init__(self, options) -> None:
super().__init__(options)
def __init__(self, options, cache=None) -> None:
super().__init__(options, cache)
self.python_spec = options.python or [sys.executable]
if self._env.get("VIRTUALENV_PYTHON"):
self.python_spec = self.python_spec[1:] + self.python_spec[:1] # Rotate the list
Expand Down Expand Up @@ -60,7 +60,7 @@ def add_parser_arguments(cls, parser: ArgumentParser) -> None:

def run(self) -> PythonInfo | None:
for python_spec in self.python_spec:
result = get_interpreter(python_spec, self.try_first_with, self.app_data, self._env)
result = get_interpreter(python_spec, self.try_first_with, self.app_data, self.cache, self._env)
if result is not None:
return result
return None
Expand All @@ -71,13 +71,17 @@ def __repr__(self) -> str:


def get_interpreter(
key, try_first_with: Iterable[str], app_data: AppData | None = None, env: Mapping[str, str] | None = None
key,
try_first_with: Iterable[str],
app_data: AppData | None = None,
cache=None,
env: Mapping[str, str] | None = None,
) -> PythonInfo | None:
spec = PythonSpec.from_string_spec(key)
LOGGER.info("find interpreter for spec %r", spec)
proposed_paths = set()
env = os.environ if env is None else env
for interpreter, impl_must_match in propose_interpreters(spec, try_first_with, app_data, env):
for interpreter, impl_must_match in propose_interpreters(spec, try_first_with, app_data, cache, env):
key = interpreter.system_executable, impl_must_match
if key in proposed_paths:
continue
Expand All @@ -93,6 +97,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
spec: PythonSpec,
try_first_with: Iterable[str],
app_data: AppData | None = None,
cache=None,
env: Mapping[str, str] | None = None,
) -> Generator[tuple[PythonInfo, bool], None, None]:
# 0. if it's a path and exists, and is absolute path, this is the only option we consider
Expand All @@ -108,7 +113,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
exe_id = fs_path_id(exe_raw)
if exe_id not in tested_exes:
tested_exes.add(exe_id)
yield PythonInfo.from_exe(exe_raw, app_data, env=env), True
yield PythonInfo.from_exe(exe_raw, app_data, cache, env=env), True
return

# 1. try with first
Expand All @@ -124,7 +129,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
if exe_id in tested_exes:
continue
tested_exes.add(exe_id)
yield PythonInfo.from_exe(exe_raw, app_data, env=env), True
yield PythonInfo.from_exe(exe_raw, app_data, cache, env=env), True

# 1. if it's a path and exists
if spec.path is not None:
Expand All @@ -137,12 +142,12 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
exe_id = fs_path_id(exe_raw)
if exe_id not in tested_exes:
tested_exes.add(exe_id)
yield PythonInfo.from_exe(exe_raw, app_data, env=env), True
yield PythonInfo.from_exe(exe_raw, app_data, cache, env=env), True
if spec.is_abs:
return
else:
# 2. otherwise try with the current
current_python = PythonInfo.current_system(app_data)
current_python = PythonInfo.current_system(app_data, cache)
exe_raw = str(current_python.executable)
exe_id = fs_path_id(exe_raw)
if exe_id not in tested_exes:
Expand All @@ -153,7 +158,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
if IS_WIN:
from .windows import propose_interpreters # noqa: PLC0415

for interpreter in propose_interpreters(spec, app_data, env):
for interpreter in propose_interpreters(spec, app_data, cache, env):
exe_raw = str(interpreter.executable)
exe_id = fs_path_id(exe_raw)
if exe_id in tested_exes:
Expand All @@ -171,7 +176,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
if exe_id in tested_exes:
continue
tested_exes.add(exe_id)
interpreter = PathPythonInfo.from_exe(exe_raw, app_data, raise_on_error=False, env=env)
interpreter = PathPythonInfo.from_exe(exe_raw, app_data, cache, raise_on_error=False, env=env)
if interpreter is not None:
yield interpreter, impl_must_match

Expand All @@ -184,7 +189,7 @@ def propose_interpreters( # noqa: C901, PLR0912, PLR0915
uv_python_path = user_data_path("uv") / "python"

for exe_path in uv_python_path.glob("*/bin/python"):
interpreter = PathPythonInfo.from_exe(str(exe_path), app_data, raise_on_error=False, env=env)
interpreter = PathPythonInfo.from_exe(str(exe_path), app_data, cache, raise_on_error=False, env=env)
if interpreter is not None:
yield interpreter, True

Expand Down
18 changes: 18 additions & 0 deletions src/virtualenv/discovery/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol

if TYPE_CHECKING:
from pathlib import Path


class Cache(Protocol):
"""A protocol for a cache."""

def get(self, path: Path) -> Any: ...

def set(self, path: Path, data: Any) -> None: ...

def remove(self, path: Path) -> None: ...

def clear(self) -> None: ...
41 changes: 18 additions & 23 deletions src/virtualenv/discovery/cached_py_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
from string import ascii_lowercase, ascii_uppercase, digits
from typing import TYPE_CHECKING

from virtualenv.app_data.na import AppDataDisabled
from virtualenv.cache import FileCache
from .py_info import PythonInfo

if TYPE_CHECKING:
from virtualenv.app_data.base import AppData
from virtualenv.cache import Cache
from virtualenv.discovery.py_info import PythonInfo
from .app_data import AppData
from .cache import Cache

_CACHE = OrderedDict()
_CACHE[Path(sys.executable)] = PythonInfo()
Expand All @@ -35,19 +33,15 @@

def from_exe( # noqa: PLR0913
cls,
app_data,
exe,
env=None,
app_data: AppData,
exe: str,
env: dict[str, str] | None = None,
*,
raise_on_error=True,
ignore_cache=False,
cache: Cache | None = None,
raise_on_error: bool = True,
ignore_cache: bool = False,
cache: Cache,
) -> PythonInfo | None:
env = os.environ if env is None else env
if cache is None:
if app_data is None:
app_data = AppDataDisabled()
cache = FileCache(store_factory=app_data.py_info, clearer=app_data.py_info_clear)
result = _get_from_cache(cls, app_data, exe, env, cache, ignore_cache=ignore_cache)
if isinstance(result, Exception):
if raise_on_error:
Expand Down Expand Up @@ -123,7 +117,12 @@ def gen_cookie():
)


def _run_subprocess(cls, exe, app_data, env):
def _run_subprocess(
cls,
exe: str,
app_data: AppData,
env: dict[str, str],
) -> tuple[Exception | None, PythonInfo | None]:
py_info_script = Path(os.path.abspath(__file__)).parent / "py_info.py"
# Cookies allow to split the serialized stdout output generated by the script collecting the info from the output
# generated by something else. The right way to deal with it is to create an anonymous pipe and pass its descriptor
Expand All @@ -135,10 +134,8 @@ def _run_subprocess(cls, exe, app_data, env):

start_cookie = gen_cookie()
end_cookie = gen_cookie()
if app_data is None:
app_data = AppDataDisabled()
with app_data.ensure_extracted(py_info_script) as py_info_script:
cmd = [exe, str(py_info_script), start_cookie, end_cookie]
with app_data.ensure_extracted(py_info_script) as py_info_script_path:
cmd = [exe, str(py_info_script_path), start_cookie, end_cookie]
# prevent sys.prefix from leaking into the child process - see https://bugs.python.org/issue22490
env = env.copy()
env.pop("__PYVENV_LAUNCHER__", None)
Expand Down Expand Up @@ -199,10 +196,8 @@ def __repr__(self) -> str:
return cmd_repr


def clear(app_data=None, cache=None):
def clear(cache: Cache | None = None) -> None:
"""Clear the cache."""
if cache is None and app_data is not None:
cache = FileCache(store_factory=app_data.py_info, clearer=app_data.py_info_clear)
if cache is not None:
cache.clear()
_CACHE.clear()
Expand Down
3 changes: 2 additions & 1 deletion src/virtualenv/discovery/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def add_parser_arguments(cls, parser):
"""
raise NotImplementedError

def __init__(self, options) -> None:
def __init__(self, options, cache=None) -> None:
"""
Create a new discovery mechanism.

Expand All @@ -24,6 +24,7 @@ def __init__(self, options) -> None:
self._has_run = False
self._interpreter = None
self._env = options.env
self.cache = cache

@abstractmethod
def run(self):
Expand Down
28 changes: 14 additions & 14 deletions src/virtualenv/discovery/py_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,11 @@ def spec(self):
)

@classmethod
def clear_cache(cls, app_data, cache=None):
def clear_cache(cls, cache=None):
# this method is not used by itself, so here and called functions can import stuff locally
from virtualenv.discovery.cached_py_info import clear # noqa: PLC0415

clear(app_data, cache)
clear(cache)
cls._cache_exe_discovery.clear()

def satisfies(self, spec, impl_must_match): # noqa: C901, PLR0911
Expand Down Expand Up @@ -423,7 +423,7 @@ def satisfies(self, spec, impl_must_match): # noqa: C901, PLR0911
_current = None

@classmethod
def current(cls, app_data=None, cache=None):
def current(cls, app_data, cache):
"""
This locates the current host interpreter information. This might be different than what we run into in case
the host python has been upgraded from underneath us.
Expand All @@ -432,14 +432,14 @@ def current(cls, app_data=None, cache=None):
cls._current = cls.from_exe(
sys.executable,
app_data,
cache,
raise_on_error=True,
resolve_to_host=False,
cache=cache,
)
return cls._current

@classmethod
def current_system(cls, app_data=None, cache=None) -> PythonInfo:
def current_system(cls, app_data, cache) -> PythonInfo:
"""
This locates the current host interpreter information. This might be different than what we run into in case
the host python has been upgraded from underneath us.
Expand All @@ -448,9 +448,9 @@ def current_system(cls, app_data=None, cache=None) -> PythonInfo:
cls._current_system = cls.from_exe(
sys.executable,
app_data,
cache,
raise_on_error=True,
resolve_to_host=True,
cache=cache,
)
return cls._current_system

Expand All @@ -467,12 +467,12 @@ def _to_dict(self):
def from_exe( # noqa: PLR0913
cls,
exe,
app_data=None,
app_data,
cache,
raise_on_error=True, # noqa: FBT002
ignore_cache=False, # noqa: FBT002
resolve_to_host=True, # noqa: FBT002
env=None,
cache=None,
):
"""Given a path to an executable get the python information."""
# this method is not used by itself, so here and called functions can import stuff locally
Expand Down Expand Up @@ -513,7 +513,7 @@ def _from_dict(cls, data):
return result

@classmethod
def _resolve_to_system(cls, app_data, target, cache=None):
def _resolve_to_system(cls, app_data, target, cache):
start_executable = target.executable
prefixes = OrderedDict()
while target.system_executable is None:
Expand All @@ -532,13 +532,13 @@ def _resolve_to_system(cls, app_data, target, cache=None):
prefixes[prefix] = target
target = target.discover_exe(app_data, prefix=prefix, exact=False, cache=cache)
if target.executable != target.system_executable:
target = cls.from_exe(target.system_executable, app_data, cache=cache)
target = cls.from_exe(target.system_executable, app_data, cache)
target.executable = start_executable
return target

_cache_exe_discovery = {} # noqa: RUF012

def discover_exe(self, app_data, prefix, exact=True, env=None, cache=None): # noqa: FBT002
def discover_exe(self, app_data, cache, prefix, exact=True, env=None): # noqa: FBT002
key = prefix, exact
if key in self._cache_exe_discovery and prefix:
LOGGER.debug("discover exe from cache %s - exact %s: %r", prefix, exact, self._cache_exe_discovery[key])
Expand All @@ -551,7 +551,7 @@ def discover_exe(self, app_data, prefix, exact=True, env=None, cache=None): # n
env = os.environ if env is None else env
for folder in possible_folders:
for name in possible_names:
info = self._check_exe(app_data, folder, name, exact, discovered, env, cache)
info = self._check_exe(app_data, cache, folder, name, exact, discovered, env)
if info is not None:
self._cache_exe_discovery[key] = info
return info
Expand All @@ -564,17 +564,17 @@ def discover_exe(self, app_data, prefix, exact=True, env=None, cache=None): # n
msg = "failed to detect {} in {}".format("|".join(possible_names), os.pathsep.join(possible_folders))
raise RuntimeError(msg)

def _check_exe(self, app_data, folder, name, exact, discovered, env, cache): # noqa: PLR0913
def _check_exe(self, app_data, cache, folder, name, exact, discovered, env): # noqa: PLR0913
exe_path = os.path.join(folder, name)
if not os.path.exists(exe_path):
return None
info = self.from_exe(
exe_path,
app_data,
cache,
resolve_to_host=False,
raise_on_error=False,
env=env,
cache=cache,
)
if info is None: # ignore if for some reason we can't query
return None
Expand Down
Loading
Loading