Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 176 additions & 1 deletion arango/cluster.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
__all__ = ["Cluster"]

from typing import List
from typing import List, Optional

from arango.api import ApiGroup
from arango.exceptions import (
ClusterEndpointsError,
ClusterHealthError,
ClusterMaintenanceModeError,
ClusterRebalanceError,
ClusterServerCountError,
ClusterServerEngineError,
ClusterServerIDError,
Expand Down Expand Up @@ -195,3 +196,177 @@ def response_handler(resp: Response) -> List[str]:
return [item["endpoint"] for item in resp.body["endpoints"]]

return self._execute(request, response_handler)

def calculate_imbalance(self) -> Result[Json]:
    """Fetch the current imbalance report of the cluster.

    According to the upstream description, the report also covers the
    amount of ongoing and pending move shard operations.

    :return: Cluster imbalance information (the "result" attribute of
        the response body).
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If retrieval fails.
    """
    req = Request(method="get", endpoint="/_admin/cluster/rebalance")

    def _unwrap(resp: Response) -> Json:
        # Success path first; any unsuccessful response becomes an error
        # that carries both the response and the originating request.
        if resp.is_success:
            imbalance: Json = resp.body["result"]
            return imbalance
        raise ClusterRebalanceError(resp, req)

    return self._execute(req, _unwrap)

def rebalance(
    self,
    version: int = 1,
    max_moves: Optional[int] = None,
    leader_changes: Optional[bool] = None,
    move_leaders: Optional[bool] = None,
    move_followers: Optional[bool] = None,
    pi_factor: Optional[float] = None,
    exclude_system_collections: Optional[bool] = None,
    databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
    """Compute a cluster rebalance plan and execute it in one request.

    Every optional argument left as None is omitted from the request body.

    :param version: Must be set to 1.
    :type version: int
    :param max_moves: Maximum number of moves to be computed.
    :type max_moves: int | None
    :param leader_changes: Allow leader changes without moving data.
    :type leader_changes: bool | None
    :param move_leaders: Allow moving shard leaders.
    :type move_leaders: bool | None
    :param move_followers: Allow moving shard followers.
    :type move_followers: bool | None
    :param pi_factor: A weighting factor that should remain untouched.
    :type pi_factor: float | None
    :param exclude_system_collections: Ignore system collections in the
        rebalance plan.
    :type exclude_system_collections: bool | None
    :param databases_excluded: List of database names to be excluded
        from the analysis.
    :type databases_excluded: [str] | None
    :return: Cluster rebalance plan that has been executed.
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If request fails.
    """
    # Request attribute name paired with the caller-supplied value, listed
    # in the order the attributes are added to the body.
    optional_settings = (
        ("maximumNumberOfMoves", max_moves),
        ("leaderChanges", leader_changes),
        ("moveLeaders", move_leaders),
        ("moveFollowers", move_followers),
        ("piFactor", pi_factor),
        ("excludeSystemCollections", exclude_system_collections),
        ("databasesExcluded", databases_excluded),
    )

    payload: Json = {"version": version}
    for attribute, value in optional_settings:
        if value is not None:
            payload[attribute] = value

    req = Request(
        method="put",
        endpoint="/_admin/cluster/rebalance",
        data=payload,
    )

    def _unwrap(resp: Response) -> Json:
        if resp.is_success:
            executed_plan: Json = resp.body["result"]
            return executed_plan
        raise ClusterRebalanceError(resp, req)

    return self._execute(req, _unwrap)

def calculate_rebalance_plan(
    self,
    version: int = 1,
    max_moves: Optional[int] = None,
    leader_changes: Optional[bool] = None,
    move_leaders: Optional[bool] = None,
    move_followers: Optional[bool] = None,
    pi_factor: Optional[float] = None,
    exclude_system_collections: Optional[bool] = None,
    databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
    """Compute a cluster rebalance plan.

    The plan is requested via POST and returned to the caller. Every
    optional argument left as None is omitted from the request body.

    :param version: Must be set to 1.
    :type version: int
    :param max_moves: Maximum number of moves to be computed.
    :type max_moves: int | None
    :param leader_changes: Allow leader changes without moving data.
    :type leader_changes: bool | None
    :param move_leaders: Allow moving shard leaders.
    :type move_leaders: bool | None
    :param move_followers: Allow moving shard followers.
    :type move_followers: bool | None
    :param pi_factor: A weighting factor that should remain untouched.
    :type pi_factor: float | None
    :param exclude_system_collections: Ignore system collections in the
        rebalance plan.
    :type exclude_system_collections: bool | None
    :param databases_excluded: List of database names to be excluded
        from the analysis.
    :type databases_excluded: [str] | None
    :return: Cluster rebalance plan.
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If retrieval fails.
    """
    # Candidate attributes in the order they should appear in the body;
    # the comprehension below drops the ones the caller did not set.
    candidates = {
        "maximumNumberOfMoves": max_moves,
        "leaderChanges": leader_changes,
        "moveLeaders": move_leaders,
        "moveFollowers": move_followers,
        "piFactor": pi_factor,
        "excludeSystemCollections": exclude_system_collections,
        "databasesExcluded": databases_excluded,
    }
    payload: Json = {
        "version": version,
        **{name: value for name, value in candidates.items() if value is not None},
    }

    req = Request(
        method="post",
        endpoint="/_admin/cluster/rebalance",
        data=payload,
    )

    def _unwrap(resp: Response) -> Json:
        if resp.is_success:
            plan: Json = resp.body["result"]
            return plan
        raise ClusterRebalanceError(resp, req)

    return self._execute(req, _unwrap)

def execute_rebalance_plan(
    self, moves: List[Json], version: int = 1
) -> Result[bool]:
    """Submit a set of move shard operations for execution.

    The operations can be obtained from
    :meth:`Cluster.calculate_rebalance_plan`, which calculates moves that
    improve the balance of shards, leader shards, and follower shards.

    :param moves: List of move shard operations.
    :type moves: [dict]
    :param version: Must be set to 1.
    :type version: int
    :return: True if the moves have been accepted and scheduled for
        execution, i.e. the "code" attribute of the response body is 202.
    :rtype: bool
    :raise arango.exceptions.ClusterRebalanceError: If request fails.
    """
    payload: Json = {"version": version, "moves": moves}
    req = Request(
        method="post",
        endpoint="/_admin/cluster/rebalance/execute",
        data=payload,
    )

    def _was_accepted(resp: Response) -> bool:
        if resp.is_success:
            # The code reported in the body, not merely a successful
            # response, decides the return value.
            accepted: bool = resp.body["code"] == 202
            return accepted
        raise ClusterRebalanceError(resp, req)

    return self._execute(req, _was_accepted)
4 changes: 4 additions & 0 deletions arango/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,10 @@ class ClusterServerCountError(ArangoServerError):
"""Failed to retrieve cluster server count."""


class ClusterRebalanceError(ArangoServerError):
    """Failed to compute or execute a cluster rebalancing operation.

    Raised by the cluster rebalance API wrappers (imbalance retrieval, plan
    computation and plan execution) when the server response is not
    successful.
    """


##################
# JWT Exceptions #
##################
Expand Down
3 changes: 3 additions & 0 deletions docs/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ Below is an example on how to manage clusters using python-arango.
cluster.toggle_maintenance_mode('on')
cluster.toggle_maintenance_mode('off')

# Rebalance the distribution of shards. Available with ArangoDB 3.10+.
cluster.rebalance()

See :ref:`ArangoClient` and :ref:`Cluster` for API specification.
54 changes: 54 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest
from packaging import version

from arango.errno import DATABASE_NOT_FOUND, FORBIDDEN
from arango.exceptions import (
ClusterEndpointsError,
ClusterHealthError,
ClusterMaintenanceModeError,
ClusterRebalanceError,
ClusterServerCountError,
ClusterServerEngineError,
ClusterServerIDError,
Expand Down Expand Up @@ -134,3 +136,55 @@ def test_cluster_server_count(db, bad_db, cluster):
with assert_raises(ClusterServerCountError) as err:
bad_db.cluster.server_count()
assert err.value.error_code in {FORBIDDEN, DATABASE_NOT_FOUND}


def test_cluster_rebalance(sys_db, bad_db, cluster, db_version):
    """Exercise the cluster rebalance API end to end.

    Covers imbalance retrieval, plan computation, plan execution and the
    one-shot rebalance call, each followed by the same call on ``bad_db``,
    which is expected to fail with a FORBIDDEN error code.
    """
    if not cluster:
        pytest.skip("Only tested in a cluster setup")

    if db_version < version.parse("3.10.0"):
        pytest.skip("Only tested on ArangoDB 3.10+")

    # Keys expected in both a computed plan and an executed rebalance.
    plan_keys = ("imbalanceBefore", "imbalanceAfter", "moves")

    # Test imbalance retrieval
    report = sys_db.cluster.calculate_imbalance()
    for section in ("leader", "shards"):
        assert section in report
    for counter in ("pendingMoveShards", "todoMoveShards"):
        assert report[counter] == 0

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.calculate_imbalance()
    assert err.value.error_code == FORBIDDEN

    # Test rebalance computation
    plan_options = {
        "max_moves": 3,
        "leader_changes": True,
        "move_leaders": True,
        "move_followers": True,
        "pi_factor": 1234.5,
        "databases_excluded": ["_system"],
    }
    plan = sys_db.cluster.calculate_rebalance_plan(**plan_options)
    for key in plan_keys:
        assert key in plan

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.calculate_rebalance_plan()
    assert err.value.error_code == FORBIDDEN

    # Test rebalance execution
    planned_moves = plan["moves"]
    assert sys_db.cluster.execute_rebalance_plan(planned_moves) is True
    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.execute_rebalance_plan(plan["moves"])
    assert err.value.error_code == FORBIDDEN

    # Rebalance cluster in one go
    outcome = sys_db.cluster.rebalance()
    for key in plan_keys:
        assert key in outcome

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.rebalance()
    assert err.value.error_code == FORBIDDEN