52 changes: 42 additions & 10 deletions executorlib/__init__.py
@@ -12,6 +12,9 @@
     pandas.DataFrame.
 """
 
+from typing import Optional
+
+import executorlib._version
 from executorlib.executor.base import BaseExecutor
 from executorlib.executor.flux import (
     FluxClusterExecutor,
@@ -22,12 +25,48 @@
     SlurmClusterExecutor,
     SlurmJobExecutor,
 )
-from executorlib.standalone.cache import get_cache_data
 
-from . import _version
 
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries, each representing one of the HDF5 files in the cache directory.
+    """
+    from executorlib.standalone.hdf import get_cache_data
+
+    return get_cache_data(cache_directory=cache_directory)
+
+
+def terminate_tasks_in_cache(
+    cache_directory: str,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+):
+    """
+    Delete all jobs stored in the cache directory from the queuing system.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+        config_directory (str, optional): Path to the configuration directory.
+        backend (str, optional): Name of the backend used to spawn tasks ["slurm", "flux"].
+    """
+    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
+
+    return terminate_tasks_in_cache(
+        cache_directory=cache_directory,
+        config_directory=config_directory,
+        backend=backend,
+    )
+
+
 __all__: list[str] = [
     "get_cache_data",
+    "terminate_tasks_in_cache",
     "BaseExecutor",
     "FluxJobExecutor",
     "FluxClusterExecutor",
@@ -36,11 +75,4 @@
     "SlurmClusterExecutor",
 ]
 
-try:
-    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
-
-    __all__ += ["terminate_tasks_in_cache"]
-except ImportError:
-    pass
-
-__version__ = _version.__version__
+__version__ = executorlib._version.__version__
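
Note on the new top-level API: both functions defer their heavy imports (h5py via executorlib.standalone.hdf, pysqa via the file-based queue spawner) to call time, so `import executorlib` keeps working without the optional dependencies installed; this replaces the previous conditional try/except export of `terminate_tasks_in_cache`. A minimal usage sketch, assuming a populated cache directory at `./cache` and a SLURM queuing system (both illustrative):

```python
import executorlib

# Inspect every *_o.h5 result file below the cache directory;
# "./cache" is an assumed path for illustration.
for entry in executorlib.get_cache_data(cache_directory="./cache"):
    print(entry["filename"], entry.get("output"))

# Remove the corresponding jobs from the queuing system; this path
# requires pysqa and a matching backend ("slurm" or "flux").
executorlib.terminate_tasks_in_cache(
    cache_directory="./cache",
    backend="slurm",
)
```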
57 changes: 0 additions & 57 deletions executorlib/standalone/cache.py

This file was deleted.

executorlib/task_scheduler/file/hdf.py → executorlib/standalone/hdf.py
@@ -5,7 +5,16 @@
 import h5py
 import numpy as np
 
-from executorlib.standalone.cache import group_dict
+group_dict = {
+    "fn": "function",
+    "args": "input_args",
+    "kwargs": "input_kwargs",
+    "output": "output",
+    "error": "error",
+    "runtime": "runtime",
+    "queue_id": "queue_id",
+    "error_log_file": "error_log_file",
+}
 
 
 def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -110,3 +119,54 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
         if "queue_id" in hdf:
             return cloudpickle.loads(np.void(hdf["/queue_id"]))
     return None
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries, each representing one of the HDF5 files in the cache directory.
+    """
+    return [
+        _get_content_of_file(file_name=file_name) | {"filename": file_name}
+        for file_name in get_cache_files(cache_directory=cache_directory)
+    ]
+
+
+def get_cache_files(cache_directory: str) -> list[str]:
+    """
+    Recursively find all HDF5 files in the cache_directory which contain outputs.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[str]: List of HDF5 files in the cache directory which contain outputs.
+    """
+    file_lst = []
+    cache_directory_abs = os.path.abspath(cache_directory)
+    for dirpath, _, filenames in os.walk(cache_directory_abs):
+        file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")]
+    return file_lst
+
+
+def _get_content_of_file(file_name: str) -> dict:
+    """
+    Get the content of an HDF5 file.
+
+    Args:
+        file_name (str): file name
+
+    Returns:
+        dict: Content of the HDF5 file
+    """
+    with h5py.File(file_name, "r") as hdf:
+        return {
+            key: cloudpickle.loads(np.void(hdf["/" + key]))
+            for key in group_dict.values()
+            if key in hdf
+        }
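
The inlined `group_dict` doubles as the schema of a cache file: each value names an HDF5 dataset holding a cloudpickle-serialized object, and `_get_content_of_file` decodes only the datasets that are present. A minimal sketch of that read path, assuming a single `task_o.h5` file previously written by `dump` (the file name is illustrative):

```python
import cloudpickle
import h5py
import numpy as np

# Decode one cache file the same way _get_content_of_file does;
# "task_o.h5" is an assumed file name for illustration.
with h5py.File("task_o.h5", "r") as hdf:
    content = {
        key: cloudpickle.loads(np.void(hdf["/" + key]))
        for key in ("function", "input_args", "input_kwargs", "output")
        if key in hdf
    }

print(content.get("output"))
```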
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/backend.py
@@ -3,7 +3,7 @@
 from typing import Any
 
 from executorlib.standalone.error import backend_write_error_file
-from executorlib.task_scheduler.file.hdf import dump, load
+from executorlib.standalone.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/queue_spawner.py
@@ -3,9 +3,9 @@
 
 from pysqa import QueueAdapter
 
+from executorlib.standalone.hdf import dump, get_queue_id
 from executorlib.standalone.inputcheck import check_file_exists
 from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
-from executorlib.task_scheduler.file.hdf import dump, get_queue_id
 
 
 def execute_with_pysqa(
3 changes: 1 addition & 2 deletions executorlib/task_scheduler/file/shared.py
@@ -4,10 +4,9 @@
 from concurrent.futures import Future
 from typing import Any, Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_cache_execute_command
+from executorlib.standalone.hdf import get_cache_files, get_output
 from executorlib.standalone.serialize import serialize_funct
-from executorlib.task_scheduler.file.hdf import get_output
 from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
2 changes: 1 addition & 1 deletion executorlib/task_scheduler/file/subprocess_spawner.py
@@ -3,8 +3,8 @@
 import time
 from typing import Optional
 
+from executorlib.standalone.hdf import dump
 from executorlib.standalone.inputcheck import check_file_exists
-from executorlib.task_scheduler.file.hdf import dump
 
 
 def execute_in_subprocess(
3 changes: 1 addition & 2 deletions executorlib/task_scheduler/interactive/shared.py
@@ -4,7 +4,6 @@
 import time
 from typing import Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_interactive_execute_command
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
@@ -130,7 +129,7 @@ def _execute_task_with_cache(
         cache_key (str, optional): By default the cache_key is generated based on the function hash; this can be
             overwritten by setting the cache_key.
     """
-    from executorlib.task_scheduler.file.hdf import dump, get_output
+    from executorlib.standalone.hdf import dump, get_cache_files, get_output
 
     task_key, data_dict = serialize_funct(
         fn=task_dict["fn"],
2 changes: 1 addition & 1 deletion tests/test_cache_backend_execute.py
@@ -7,7 +7,7 @@
 try:
     from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
     from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem
-    from executorlib.task_scheduler.file.hdf import dump, get_runtime
+    from executorlib.standalone.hdf import dump, get_runtime
     from executorlib.standalone.serialize import serialize_funct
 
     skip_h5io_test = False
5 changes: 3 additions & 2 deletions tests/test_fluxclusterexecutor.py
@@ -10,8 +10,9 @@
 
 try:
     import flux.job
-    from executorlib.task_scheduler.file.hdf import dump
-    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa
+    from executorlib import terminate_tasks_in_cache
+    from executorlib.standalone.hdf import dump
+    from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa
     from executorlib.standalone.scheduler import terminate_with_pysqa
 
     skip_flux_test = "FLUX_URI" not in os.environ
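
With the module relocations, the test suites keep guarding these imports behind try/except so environments without the optional h5py or flux dependencies simply skip; a minimal sketch of that guard pattern, with the class name and flag name chosen here for illustration:

```python
import unittest

try:
    # Import paths below match the updated locations from this PR.
    from executorlib import terminate_tasks_in_cache
    from executorlib.standalone.hdf import dump

    skip_h5py_test = False
except ImportError:
    skip_h5py_test = True


@unittest.skipIf(skip_h5py_test, "h5py is not installed, so the HDF5-based tests are skipped.")
class TestRelocatedImports(unittest.TestCase):
    def test_callables(self):
        self.assertTrue(callable(dump))
        self.assertTrue(callable(terminate_tasks_in_cache))
```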
2 changes: 1 addition & 1 deletion tests/test_slurmclusterexecutor.py
@@ -14,7 +14,7 @@
 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
 
 try:
-    from executorlib.task_scheduler.file.hdf import dump
+    from executorlib.standalone.hdf import dump
 
     skip_h5py_test = False
 except ImportError:
2 changes: 1 addition & 1 deletion tests/test_standalone_hdf.py
@@ -4,7 +4,7 @@
 
 
 try:
-    from executorlib.task_scheduler.file.hdf import (
+    from executorlib.standalone.hdf import (
         dump,
         load,
         get_output,