1515"""Cursor class to iterate over Mongo query results."""
1616
1717import copy
18- import datetime
1918import warnings
2019
2120from collections import deque
2928from pymongo import helpers
3029from pymongo .common import validate_boolean , validate_is_mapping
3130from pymongo .collation import validate_collation_or_none
32- from pymongo .errors import (AutoReconnect ,
33- ConnectionFailure ,
31+ from pymongo .errors import (ConnectionFailure ,
3432 InvalidOperation ,
3533 NotMasterError ,
3634 OperationFailure )
37- from pymongo .message import (_convert_exception ,
38- _CursorAddress ,
35+ from pymongo .message import (_CursorAddress ,
3936 _GetMore ,
4037 _RawBatchGetMore ,
4138 _Query ,
5047 "await_data" : 32 ,
5148 "exhaust" : 64 ,
5249 "partial" : 128 }
53- _CURSOR_DOC_FIELDS = {'cursor' : {'firstBatch' : 1 , 'nextBatch' : 1 }}
5450
5551
5652class CursorType (object ):
@@ -941,129 +937,54 @@ def __send_message(self, operation):
941937 Can raise ConnectionFailure.
942938 """
943939 client = self .__collection .database .client
944- listeners = client ._event_listeners
945- publish = listeners .enabled_for_commands
946- from_command = False
947- start = datetime .datetime .now ()
948-
949- def duration (): return datetime .datetime .now () - start
950-
951- if operation :
952- try :
953- response = client ._send_message_with_response (
954- operation , exhaust = self .__exhaust , address = self .__address )
955- self .__address = response .address
956- if self .__exhaust :
957- # 'response' is an ExhaustResponse.
958- self .__exhaust_mgr = _SocketManager (response .socket_info ,
959- response .pool )
960-
961- cmd_name = operation .name
962- reply = response .data
963- rqst_id = response .request_id
964- from_command = response .from_command
965- except AutoReconnect :
966- # Don't try to send kill cursors on another socket
967- # or to another server. It can cause a _pinValue
968- # assertion on some server releases if we get here
969- # due to a socket timeout.
970- self .__killed = True
971- self .__die ()
972- raise
973- else :
974- # Exhaust cursor - no getMore message.
975- rqst_id = 0
976- cmd_name = 'getMore'
977- if publish :
978- # Fake a getMore command.
979- cmd = SON ([('getMore' , self .__id ),
980- ('collection' , self .__collection .name )])
981- if self .__batch_size :
982- cmd ['batchSize' ] = self .__batch_size
983- if self .__max_time_ms :
984- cmd ['maxTimeMS' ] = self .__max_time_ms
985- listeners .publish_command_start (
986- cmd , self .__collection .database .name , 0 , self .__address )
987- try :
988- reply = self .__exhaust_mgr .sock .receive_message (None )
989- except Exception as exc :
990- if publish :
991- listeners .publish_command_failure (
992- duration (), _convert_exception (exc ), cmd_name , rqst_id ,
993- self .__address )
994- if isinstance (exc , ConnectionFailure ):
995- self .__die ()
996- raise
997-
998940 try :
999- with client ._reset_on_error (self .__address , self .__session ):
1000- user_fields = None
1001- legacy_response = True
1002- if from_command :
1003- user_fields = _CURSOR_DOC_FIELDS
1004- legacy_response = False
1005- docs = self ._unpack_response (
1006- reply , self .__id , self .__collection .codec_options ,
1007- legacy_response = legacy_response , user_fields = user_fields )
1008- if from_command :
1009- first = docs [0 ]
1010- client ._process_response (first , self .__session )
1011- helpers ._check_command_response (first )
1012- except OperationFailure as exc :
941+ response = client ._send_message_with_response (
942+ operation , exhaust = self .__exhaust , address = self .__address ,
943+ unpack_res = self ._unpack_response )
944+ except OperationFailure :
1013945 self .__killed = True
1014946
1015947 # Make sure exhaust socket is returned immediately, if necessary.
1016948 self .__die ()
1017949
1018- if publish :
1019- listeners .publish_command_failure (
1020- duration (), exc .details , cmd_name , rqst_id , self .__address )
1021-
1022950 # If this is a tailable cursor the error is likely
1023951 # due to capped collection roll over. Setting
1024952 # self.__killed to True ensures Cursor.alive will be
1025953 # False. No need to re-raise.
1026954 if self .__query_flags & _QUERY_OPTIONS ["tailable_cursor" ]:
1027955 return
1028956 raise
1029- except NotMasterError as exc :
957+ except NotMasterError :
1030958 # Don't send kill cursors to another server after a "not master"
1031959 # error. It's completely pointless.
1032960 self .__killed = True
1033961
1034962 # Make sure exhaust socket is returned immediately, if necessary.
1035963 self .__die ()
1036964
1037- if publish :
1038- listeners .publish_command_failure (
1039- duration (), exc .details , cmd_name , rqst_id , self .__address )
1040-
1041965 raise
1042- except Exception as exc :
1043- if publish :
1044- listeners .publish_command_failure (
1045- duration (), _convert_exception (exc ), cmd_name , rqst_id ,
1046- self .__address )
966+ except ConnectionFailure :
967+ # Don't try to send kill cursors on another socket
968+ # or to another server. It can cause a _pinValue
969+ # assertion on some server releases if we get here
970+ # due to a socket timeout.
971+ self .__killed = True
972+ self .__die ()
973+ raise
974+ except Exception :
975+ # Close the cursor
976+ self .__die ()
1047977 raise
1048978
1049- if publish :
1050- # Must publish in find / getMore / explain command response format.
1051- if from_command :
1052- res = docs [0 ]
1053- elif cmd_name == "explain" :
1054- res = docs [0 ] if docs else {}
1055- else :
1056- res = {"cursor" : {"id" : reply .cursor_id ,
1057- "ns" : self .__collection .full_name },
1058- "ok" : 1 }
1059- if cmd_name == "find" :
1060- res ["cursor" ]["firstBatch" ] = docs
1061- else :
1062- res ["cursor" ]["nextBatch" ] = docs
1063- listeners .publish_command_success (
1064- duration (), res , cmd_name , rqst_id , self .__address )
979+ self .__address = response .address
980+ if self .__exhaust and not self .__exhaust_mgr :
981+ # 'response' is an ExhaustResponse.
982+ self .__exhaust_mgr = _SocketManager (response .socket_info ,
983+ response .pool )
1065984
1066- if from_command :
985+ cmd_name = operation .name
986+ docs = response .docs
987+ if response .from_command :
1067988 if cmd_name != "explain" :
1068989 cursor = docs [0 ]['cursor' ]
1069990 self .__id = cursor ['id' ]
@@ -1078,9 +999,9 @@ def duration(): return datetime.datetime.now() - start
1078999 self .__data = deque (docs )
10791000 self .__retrieved += len (docs )
10801001 else :
1081- self .__id = reply .cursor_id
1002+ self .__id = response . data .cursor_id
10821003 self .__data = deque (docs )
1083- self .__retrieved += reply .number_returned
1004+ self .__retrieved += response . data .number_returned
10841005
10851006 if self .__id == 0 :
10861007 self .__killed = True
@@ -1147,19 +1068,17 @@ def _refresh(self):
11471068 limit = self .__batch_size
11481069
11491070 # Exhaust cursors don't send getMore messages.
1150- if self .__exhaust :
1151- self .__send_message (None )
1152- else :
1153- g = self ._getmore_class (self .__collection .database .name ,
1154- self .__collection .name ,
1155- limit ,
1156- self .__id ,
1157- self .__codec_options ,
1158- self ._read_preference (),
1159- self .__session ,
1160- self .__collection .database .client ,
1161- self .__max_await_time_ms )
1162- self .__send_message (g )
1071+ g = self ._getmore_class (self .__collection .database .name ,
1072+ self .__collection .name ,
1073+ limit ,
1074+ self .__id ,
1075+ self .__codec_options ,
1076+ self ._read_preference (),
1077+ self .__session ,
1078+ self .__collection .database .client ,
1079+ self .__max_await_time_ms ,
1080+ self .__exhaust_mgr )
1081+ self .__send_message (g )
11631082
11641083 return len (self .__data )
11651084
0 commit comments