@@ -40,6 +40,10 @@ def expand_action(data):
4040 return action , data .get ('_source' , data )
4141
4242def _chunk_actions (actions , chunk_size , max_chunk_bytes , serializer ):
43+ """
44+ Split actions into chunks by number or size, serialize them into strings in
45+ the process.
46+ """
4347 bulk_actions = []
4448 size , action_count = 0 , 0
4549 for action , data in actions :
@@ -65,6 +69,64 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
6569 if bulk_actions :
6670 yield bulk_actions
6771
72+ def _process_bulk_chunk (client , bulk_actions , raise_on_exception = True , raise_on_error = True , ** kwargs ):
73+ """
74+ Send a bulk request to elasticsearch and process the output.
75+ """
76+ # if raise on error is set, we need to collect errors per chunk before raising them
77+ errors = []
78+
79+ try :
80+ # send the actual request
81+ resp = client .bulk ('\n ' .join (bulk_actions ) + '\n ' , ** kwargs )
82+ except TransportError as e :
83+ # default behavior - just propagate exception
84+ if raise_on_exception :
85+ raise e
86+
87+ # if we are not propagating, mark all actions in current chunk as failed
88+ err_message = str (e )
89+ exc_errors = []
90+
91+ # deserialize the data back, thisis expensive but only run on
92+ # errors if raise_on_exception is false, so shouldn't be a real
93+ # issue
94+ bulk_data = iter (map (client .transport .serializer .loads , bulk_actions ))
95+ while True :
96+ try :
97+ # collect all the information about failed actions
98+ action = next (bulk_data )
99+ op_type , action = action .popitem ()
100+ info = {"error" : err_message , "status" : e .status_code , "exception" : e }
101+ if op_type != 'delete' :
102+ info ['data' ] = next (bulk_data )
103+ info .update (action )
104+ exc_errors .append ({op_type : info })
105+ except StopIteration :
106+ break
107+
108+ # emulate standard behavior for failed actions
109+ if raise_on_error :
110+ raise BulkIndexError ('%i document(s) failed to index.' % len (exc_errors ), exc_errors )
111+ else :
112+ for err in exc_errors :
113+ yield False , err
114+ return
115+
116+ # go through request-reponse pairs and detect failures
117+ for op_type , item in map (methodcaller ('popitem' ), resp ['items' ]):
118+ ok = 200 <= item .get ('status' , 500 ) < 300
119+ if not ok and raise_on_error :
120+ errors .append ({op_type : item })
121+
122+ if ok or not errors :
123+ # if we are not just recording all errors to be able to raise
124+ # them all at once, yield items individually
125+ yield ok , {op_type : item }
126+
127+ if errors :
128+ raise BulkIndexError ('%i document(s) failed to index.' % len (errors ), errors )
129+
68130def streaming_bulk (client , actions , chunk_size = 500 , max_chunk_bytes = 100 * 1014 * 1024 ,
69131 raise_on_error = True , expand_action_callback = expand_action ,
70132 raise_on_exception = True , ** kwargs ):
@@ -122,63 +184,11 @@ def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1014 *
122184 should return a tuple containing the action line and the data line
123185 (`None` if data line should be omitted).
124186 """
125- serializer = client .transport .serializer
126187 actions = map (expand_action_callback , actions )
127188
128- # if raise on error is set, we need to collect errors per chunk before raising them
129- errors = []
130-
131- for bulk_actions in _chunk_actions (actions , chunk_size , max_chunk_bytes , serializer ):
132- try :
133- # send the actual request
134- resp = client .bulk ('\n ' .join (bulk_actions ) + '\n ' , ** kwargs )
135- except TransportError as e :
136- # default behavior - just propagate exception
137- if raise_on_exception :
138- raise e
139-
140- # if we are not propagating, mark all actions in current chunk as failed
141- err_message = str (e )
142- exc_errors = []
143-
144- # deserialize the data back, thisis expensive but only run on
145- # errors if raise_on_exception is false, so shouldn't be a real
146- # issue
147- bulk_data = iter (map (serializer .loads , bulk_actions ))
148- while True :
149- try :
150- # collect all the information about failed actions
151- action = next (bulk_data )
152- op_type , action = action .popitem ()
153- info = {"error" : err_message , "status" : e .status_code , "exception" : e }
154- if op_type != 'delete' :
155- info ['data' ] = next (bulk_data )
156- info .update (action )
157- exc_errors .append ({op_type : info })
158- except StopIteration :
159- break
160-
161- # emulate standard behavior for failed actions
162- if raise_on_error :
163- raise BulkIndexError ('%i document(s) failed to index.' % len (exc_errors ), exc_errors )
164- else :
165- for err in exc_errors :
166- yield False , err
167- continue
168-
169- # go through request-reponse pairs and detect failures
170- for op_type , item in map (methodcaller ('popitem' ), resp ['items' ]):
171- ok = 200 <= item .get ('status' , 500 ) < 300
172- if not ok and raise_on_error :
173- errors .append ({op_type : item })
174-
175- if not errors :
176- # if we are not just recording all errors to be able to raise
177- # them all at once, yield items individually
178- yield ok , {op_type : item }
179-
180- if errors :
181- raise BulkIndexError ('%i document(s) failed to index.' % len (errors ), errors )
189+ for bulk_actions in _chunk_actions (actions , chunk_size , max_chunk_bytes , client .transport .serializer ):
190+ for result in _process_bulk_chunk (client , bulk_actions , raise_on_exception , raise_on_error , ** kwargs ):
191+ yield result
182192
183193def bulk (client , actions , stats_only = False , ** kwargs ):
184194 """
0 commit comments