Skip to content
4 changes: 2 additions & 2 deletions .ci_support/environment-old.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
channels:
- conda-forge
dependencies:
- lammps =2022.06.23
- lammps =2024.06.27=*_mpi_openmpi_*
- openmpi
- numpy =1.23.5
- mpi4py =3.1.4
- mpi4py =4.0.1
- executorlib =1.2.0
- ase =3.23.0
- scipy =1.9.3
Expand Down
139 changes: 139 additions & 0 deletions tests/test_mpi_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import unittest
import os
import numpy as np
from lammps import lammps
from pylammpsmpi.mpi.lmpmpi import select_cmd


class TestMpiBackend(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.execution_path = os.path.dirname(os.path.abspath(__file__))
cls.lammps_file = os.path.join(cls.execution_path, "in.simple")
cls.lmp = lammps()
select_cmd("get_file")(job=cls.lmp, funct_args=[cls.lammps_file])

@classmethod
def tearDownClass(cls):
cls.lmp.close()

def test_extract_atom(self):
f = select_cmd("extract_atom")(job=self.lmp, funct_args=["error"])
self.assertEqual(len(f), 0)
f = select_cmd("extract_atom")(job=self.lmp, funct_args=["f"])
self.assertEqual(len(f), 256)
self.assertEqual(np.round(f[0][0], 2), -0.26)

ids = select_cmd("extract_atom")(job=self.lmp, funct_args=["id"])
self.assertEqual(len(ids), 256)

def test_extract_compute_global(self):
compute_temp = select_cmd("extract_compute")(
job=self.lmp, funct_args=["1", 0, 0, 0, 0]
)
self.assertIsInstance(compute_temp, float)

def test_extract_compute_per_atom(self):
compute_ke_atom = select_cmd("extract_compute")(
job=self.lmp, funct_args=["ke", 1, 1, 256, 0]
)
self.assertEqual(len(compute_ke_atom), 256)

def test_gather_atoms(self):
f = select_cmd("gather_atoms")(job=self.lmp, funct_args=["f"])
self.assertEqual(len(f), 256)
# this checks if info was gathered from
# all processors
self.assertEqual(np.round(f[-22][0], 2), 0.31)

ids = select_cmd("extract_atom")(job=self.lmp, funct_args=["id"])
self.assertEqual(len(ids), 256)
self.assertEqual(select_cmd("get_natoms")(job=self.lmp, funct_args=[]), 256)

def test_extract_fix(self):
x = select_cmd("extract_fix")(job=self.lmp, funct_args=["2", 0, 1, 1])
self.assertEqual(np.round(x, 2), -2.61)

def test_extract_variable(self):
x = select_cmd("extract_variable")(job=self.lmp, funct_args=["tt", "all", 0])
self.assertEqual(np.round(x, 2), 1.13)
x = select_cmd("extract_variable")(job=self.lmp, funct_args=["fx", "all", 1])
self.assertEqual(len(x), 256)
self.assertEqual(np.round(x[0], 2), -0.26)

def test_scatter_atoms(self):
f = select_cmd("gather_atoms")(job=self.lmp, funct_args=["f"])
val = np.random.randint(0, 100)
f[1][0] = val
select_cmd("scatter_atoms")(job=self.lmp, funct_args=["f", f])
f1 = select_cmd("gather_atoms")(job=self.lmp, funct_args=["f"])
self.assertEqual(f1[1][0], val)

f = select_cmd("gather_atoms")(job=self.lmp, funct_args=["f", 2, [1, 2]])
val = np.random.randint(0, 100)
f[1][1] = val
select_cmd("scatter_atoms")(job=self.lmp, funct_args=["f", f, 2, [1, 2]])
f1 = select_cmd("gather_atoms")(job=self.lmp, funct_args=["f", 2, [1, 2]])
self.assertEqual(f1[1][1], val)

def test_extract_box(self):
box = select_cmd("extract_box")(job=self.lmp, funct_args=[])
self.assertEqual(len(box), 7)

self.assertEqual(box[0][0], 0.0)
self.assertEqual(np.round(box[1][0], 2), 6.72)

def test_version(self):
self.assertTrue(
select_cmd("get_version")(job=self.lmp, funct_args=[])
in [20220623, 20230802, 20231121, 20240207, 20240627, 20240829]
)

def test_extract_global(self):
self.assertEqual(
select_cmd("extract_global")(job=self.lmp, funct_args=["boxhi"]),
[6.718384765530029, 6.718384765530029, 6.718384765530029],
)
self.assertEqual(
select_cmd("extract_global")(job=self.lmp, funct_args=["boxlo"]),
[0.0, 0.0, 0.0],
)

def test_properties(self):
self.assertEqual(
select_cmd("has_exceptions")(job=self.lmp, funct_args=[]), True
)
self.assertEqual(
select_cmd("has_gzip_support")(job=self.lmp, funct_args=[]), True
)
self.assertEqual(
select_cmd("has_png_support")(job=self.lmp, funct_args=[]), True
)
self.assertEqual(
select_cmd("has_jpeg_support")(job=self.lmp, funct_args=[]), True
)
self.assertEqual(
select_cmd("has_ffmpeg_support")(job=self.lmp, funct_args=[]), False
)

def test_get_thermo(self):
self.assertEqual(
float(select_cmd("get_thermo")(job=self.lmp, funct_args=["temp"])),
1.1298532212880312,
)
select_cmd("command")(job=self.lmp, funct_args="run 0")
self.assertEqual(
float(select_cmd("get_thermo")(job=self.lmp, funct_args=["temp"])),
1.129853221288031,
)

def test_installed_packages(self):
packages = select_cmd("installed_packages")(job=self.lmp, funct_args=[])
self.assertIsInstance(packages, list)
self.assertIn("MANYBODY", packages)
self.assertIn("KSPACE", packages)
self.assertIn("MC", packages)


if __name__ == "__main__":
unittest.main()
159 changes: 159 additions & 0 deletions tests/test_mpi_backend_extended.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import unittest
import os
import numpy as np
from lammps import lammps
from pylammpsmpi.mpi.lmpmpi import select_cmd


class TestMpiBackendExtended(unittest.TestCase):
def setUp(self):
self.execution_path = os.path.dirname(os.path.abspath(__file__))
self.lammps_file = os.path.join(self.execution_path, "in.simple")
self.lmp = lammps()
select_cmd("get_file")(job=self.lmp, funct_args=[self.lammps_file])

def tearDown(self):
self.lmp.close()

def test_gather_atoms_invalid(self):
f = select_cmd("gather_atoms")(job=self.lmp, funct_args=["error"])
self.assertEqual(len(f), 0)

def test_extract_compute_style_error(self):
with self.assertRaises(ValueError):
select_cmd("extract_compute")(job=self.lmp, funct_args=["1", 2, 0, 0, 0])

def test_get_file(self):
lmp = lammps()
ret = select_cmd("get_file")(job=lmp, funct_args=[self.lammps_file])
self.assertEqual(ret, 1)
lmp.close()

def test_commands(self):
ret = select_cmd("commands_list")(job=self.lmp, funct_args=[["run 0", "run 1"]])
self.assertEqual(ret, 1)
ret = select_cmd("commands_string")(job=self.lmp, funct_args=["run 0\nrun 1"])
self.assertEqual(ret, 1)

def test_set_variable_deprecated(self):
ret = select_cmd("set_variable")(job=self.lmp, funct_args=["tt", 1.0])
self.assertEqual(ret, -1)

def test_variable_setting_with_command(self):
select_cmd("command")(job=self.lmp, funct_args="variable my_var equal 1.0")
select_cmd("command")(job=self.lmp, funct_args="variable my_var equal 2.0")
x = select_cmd("extract_variable")(
job=self.lmp, funct_args=["my_var", "all", 0]
)
self.assertEqual(x, 2.0)

def test_reset_box(self):
lmp = lammps()
select_cmd("command")(job=lmp, funct_args="units lj")
select_cmd("command")(job=lmp, funct_args="dimension 3")
select_cmd("command")(job=lmp, funct_args="boundary p p p")
select_cmd("command")(job=lmp, funct_args="atom_style atomic")
select_cmd("command")(job=lmp, funct_args="region mybox block 0 10 0 10 0 10")
select_cmd("command")(job=lmp, funct_args="create_box 1 mybox")
box = select_cmd("extract_box")(job=lmp, funct_args=[])
box[0][0] = -1.0
box[1][0] = 7.0
ret = select_cmd("reset_box")(
job=lmp, funct_args=[box[0], box[1], box[2], box[3], box[4]]
)
self.assertEqual(ret, 1)
new_box = select_cmd("extract_box")(job=lmp, funct_args=[])
self.assertEqual(new_box[0][0], -1.0)
self.assertEqual(new_box[1][0], 7.0)
lmp.close()

def test_command(self):
ret = select_cmd("command")(job=self.lmp, funct_args="run 0")
self.assertEqual(ret, 1)

def test_extract_variable_error(self):
v = select_cmd("extract_variable")(
job=self.lmp, funct_args=["var_not_exist", "all", 0]
)
self.assertIsNone(v)

def test_gather_atoms_concat(self):
f = select_cmd("gather_atoms_concat")(job=self.lmp, funct_args=["f"])
self.assertEqual(len(f), 256)
self.assertEqual(np.round(f[-22][0], 2), 0.31)
f = select_cmd("gather_atoms_concat")(job=self.lmp, funct_args=["error"])
self.assertEqual(len(f), 0)

def test_gather_atoms_subset(self):
ids = [1, 5, 10]
f = select_cmd("gather_atoms_subset")(
job=self.lmp, funct_args=["f", len(ids), ids]
)
self.assertEqual(len(f), len(ids))
f = select_cmd("gather_atoms_subset")(job=self.lmp, funct_args=["error", 0, []])
self.assertEqual(len(f), 0)

def test_scatter_atoms_subset(self):
ids = [1, 5, 10]
f = select_cmd("gather_atoms_subset")(
job=self.lmp, funct_args=["f", len(ids), ids]
)
val = np.random.randint(0, 100)
f[1][0] = val
ret = select_cmd("scatter_atoms_subset")(
job=self.lmp, funct_args=["f", f, len(ids), ids]
)
self.assertEqual(ret, 1)
f1 = select_cmd("gather_atoms_subset")(
job=self.lmp, funct_args=["f", len(ids), ids]
)
self.assertEqual(f1[1][0], val)

def test_scatter_atoms_integer(self):
types = select_cmd("gather_atoms")(job=self.lmp, funct_args=["type"])
val = 2
types[10] = val
ret = select_cmd("scatter_atoms")(
job=self.lmp, funct_args=["type", types.astype(int)]
)
self.assertEqual(ret, 1)
types_new = select_cmd("gather_atoms")(job=self.lmp, funct_args=["type"])
self.assertEqual(types_new[10], val)

def test_neighlist(self):
nl = select_cmd("get_neighlist")(job=self.lmp, funct_args=[0])
self.assertIsNotNone(nl)
pnl = select_cmd("find_pair_neighlist")(job=self.lmp, funct_args=["lj/cut"])
self.assertIsInstance(pnl, int)
fnl = select_cmd("find_fix_neighlist")(job=self.lmp, funct_args=["2"])
self.assertIsInstance(fnl, int)
cnl = select_cmd("find_compute_neighlist")(job=self.lmp, funct_args=["ke"])
self.assertIsInstance(cnl, int)
size = select_cmd("get_neighlist_size")(job=self.lmp, funct_args=[0])
self.assertIsInstance(size, int)
elem = select_cmd("get_neighlist_element_neighbors")(
job=self.lmp, funct_args=[0, 0]
)
self.assertIsInstance(elem, object)

def test_extract_atom_error(self):
f = select_cmd("extract_atom")(job=self.lmp, funct_args=["error"])
self.assertEqual(len(f), 0)

def test_create_atoms(self):
lmp = lammps()
select_cmd("command")(job=lmp, funct_args="units lj")
select_cmd("command")(job=lmp, funct_args="dimension 3")
select_cmd("command")(job=lmp, funct_args="boundary p p p")
select_cmd("command")(job=lmp, funct_args="atom_style atomic")
select_cmd("command")(job=lmp, funct_args="region box block 0 10 0 10 0 10")
select_cmd("command")(job=lmp, funct_args="create_box 1 box")
ret = select_cmd("create_atoms")(
job=lmp, funct_args=[1, [1], [1], [[1.0, 1.0, 1.0]], [0], 0]
)
self.assertEqual(ret, 1)
lmp.close()


if __name__ == "__main__":
unittest.main()
13 changes: 13 additions & 0 deletions tests/test_pylammpsmpi_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.lmp.close()

def test_version(self):
self.assertTrue(
self.lmp.version
in [20220623, 20230802, 20231121, 20240207, 20240627, 20240829]
)

def test_extract_atom(self):
f = self.lmp.extract_atom("f")
self.assertEqual(len(f), 256)
Expand Down Expand Up @@ -94,6 +100,13 @@ def test_extract_box(self):
def test_cmdarg_options(self):
self.assertTrue(os.path.isfile(self.citation_file))

def test_properties(self):
self.assertEqual(self.lmp.has_exceptions, True)
self.assertEqual(self.lmp.has_gzip_support, True)
self.assertEqual(self.lmp.has_png_support, True)
self.assertEqual(self.lmp.has_jpeg_support, True)
self.assertEqual(self.lmp.has_ffmpeg_support, False)


if __name__ == "__main__":
unittest.main()
Loading