Skip to content

Commit 8dad0a2

Browse files
async_cluster: optimize determine_nodes
1 parent c4abee1 commit 8dad0a2

File tree

1 file changed

+51
-68
lines changed

1 file changed

+51
-68
lines changed

redis/asyncio/cluster.py

Lines changed: 51 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -415,39 +415,13 @@ def set_response_callback(self, command: KeyT, callback: Callable) -> None:
415415
"""Set a custom response callback."""
416416
self.cluster_response_callbacks[command] = callback
417417

418-
async def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
419-
command = args[0]
420-
nodes_flag = kwargs.pop("nodes_flag", None)
421-
if nodes_flag is not None:
422-
# nodes flag passed by the user
423-
command_flag = nodes_flag
424-
else:
425-
# get the nodes group for this command if it was predefined
426-
command_flag = self.command_flags.get(command)
427-
if command_flag == self.__class__.RANDOM:
428-
# return a random node
429-
return [self.get_random_node()]
430-
elif command_flag == self.__class__.PRIMARIES:
431-
# return all primaries
432-
return self.get_primaries()
433-
elif command_flag == self.__class__.REPLICAS:
434-
# return all replicas
435-
return self.get_replicas()
436-
elif command_flag == self.__class__.ALL_NODES:
437-
# return all nodes
438-
return self.get_nodes()
439-
elif command_flag == self.__class__.DEFAULT_NODE:
440-
# return the cluster's default node
441-
return [self.nodes_manager.default_node]
442-
elif command in self.__class__.SEARCH_COMMANDS[0]:
443-
return [self.nodes_manager.default_node]
444-
else:
445-
# get the node that holds the key's slot
446-
slot = await self.determine_slot(*args)
447-
node = self.nodes_manager.get_node_from_slot(
448-
slot, self.read_from_replicas and command in READ_COMMANDS
449-
)
450-
return [node]
418+
def get_encoder(self) -> Encoder:
419+
"""Get the encoder object of the client."""
420+
return self.encoder
421+
422+
def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
423+
"""Get the connection kwargs passed to :class:`~redis.asyncio.client.Redis`."""
424+
return self.connection_kwargs
451425

452426
def keyslot(self, key: EncodableT) -> int:
453427
"""
@@ -458,7 +432,39 @@ def keyslot(self, key: EncodableT) -> int:
458432
k = self.encoder.encode(key)
459433
return key_slot(k)
460434

461-
async def determine_slot(self, *args) -> int:
435+
async def _determine_nodes(
436+
self, *args, node_flag: Optional[str] = None
437+
) -> List["ClusterNode"]:
438+
command = args[0]
439+
if not node_flag:
440+
# get the nodes group for this command if it was predefined
441+
node_flag = self.command_flags.get(command)
442+
443+
if node_flag in self.node_flags:
444+
if node_flag == self.__class__.DEFAULT_NODE:
445+
# return the cluster's default node
446+
return [self.nodes_manager.default_node]
447+
if node_flag == self.__class__.PRIMARIES:
448+
# return all primaries
449+
return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
450+
if node_flag == self.__class__.REPLICAS:
451+
# return all replicas
452+
return self.nodes_manager.get_nodes_by_server_type(REPLICA)
453+
if node_flag == self.__class__.ALL_NODES:
454+
# return all nodes
455+
return list(self.nodes_manager.nodes_cache.values())
456+
if node_flag == self.__class__.RANDOM:
457+
# return a random node
458+
return [random.choice(list(self.nodes_manager.nodes_cache.values()))]
459+
460+
# get the node that holds the key's slot
461+
slot = await self._determine_slot(*args)
462+
node = self.nodes_manager.get_node_from_slot(
463+
slot, self.read_from_replicas and command in READ_COMMANDS
464+
)
465+
return [node]
466+
467+
async def _determine_slot(self, *args) -> int:
462468
command = args[0]
463469
if self.command_flags.get(command) == SLOT_ID:
464470
# The command contains the slot ID
@@ -514,17 +520,7 @@ async def determine_slot(self, *args) -> int:
514520

515521
return slots.pop()
516522

517-
def get_encoder(self) -> Encoder:
518-
return self.encoder
519-
520-
def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
521-
"""
522-
Get the kwargs passed to the :class:`~redis.asyncio.client.Redis` object of
523-
each node.
524-
"""
525-
return self.connection_kwargs
526-
527-
def _is_nodes_flag(
523+
def _is_node_flag(
528524
self, target_nodes: Union[List["ClusterNode"], "ClusterNode", str]
529525
) -> bool:
530526
return isinstance(target_nodes, str) and target_nodes in self.node_flags
@@ -570,32 +566,23 @@ async def execute_command(self, *args: Union[KeyT, EncodableT], **kwargs) -> Any
570566
can't be mapped to a slot
571567
"""
572568
command = args[0]
573-
target_nodes_specified = False
574-
target_nodes = None
569+
target_nodes_specified = target_nodes = exception = None
570+
retry_attempts = self.cluster_error_retry_attempts
571+
575572
passed_targets = kwargs.pop("target_nodes", None)
576-
if passed_targets and not self._is_nodes_flag(passed_targets):
573+
if passed_targets and not self._is_node_flag(passed_targets):
577574
target_nodes = self._parse_target_nodes(passed_targets)
578575
target_nodes_specified = True
579-
# If an error that allows retrying was thrown, the nodes and slots
580-
# cache were reinitialized. We will retry executing the command with
581-
# the updated cluster setup only when the target nodes can be
582-
# determined again with the new cache tables. Therefore, when target
583-
# nodes were passed to this function, we cannot retry the command
584-
# execution since the nodes may not be valid anymore after the tables
585-
# were reinitialized. So in case of passed target nodes,
586-
# retry_attempts will be set to 1.
587-
retry_attempts = (
588-
1 if target_nodes_specified else self.cluster_error_retry_attempts
589-
)
590-
exception = None
576+
retry_attempts = 1
577+
591578
for _ in range(0, retry_attempts):
592579
if self._initialize:
593580
await self.initialize()
594581
try:
595582
if not target_nodes_specified:
596583
# Determine the nodes to execute the command on
597584
target_nodes = await self._determine_nodes(
598-
*args, **kwargs, nodes_flag=passed_targets
585+
*args, node_flag=passed_targets
599586
)
600587
if not target_nodes:
601588
raise RedisClusterException(
@@ -642,12 +629,8 @@ async def _execute_command(
642629
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs
643630
) -> Any:
644631
command = args[0]
645-
redis_connection = None
646-
connection = None
647-
redirect_addr = None
648-
asking = False
649-
moved = False
650-
ttl = int(self.RedisClusterRequestTTL)
632+
redis_connection = connection = redirect_addr = asking = moved = None
633+
ttl = self.RedisClusterRequestTTL
651634
connection_error_retry_counter = 0
652635

653636
while ttl > 0:
@@ -658,7 +641,7 @@ async def _execute_command(
658641
elif moved:
659642
# MOVED occurred and the slots cache was updated,
660643
# refresh the target node
661-
slot = await self.determine_slot(*args)
644+
slot = await self._determine_slot(*args)
662645
target_node = self.nodes_manager.get_node_from_slot(
663646
slot, self.read_from_replicas and command in READ_COMMANDS
664647
)

0 commit comments

Comments
 (0)