Skip to content

Commit 3afd684

Browse files
committed
Automatically split batch inserts PYTHON-414
With this change, PyMongo automatically splits large batch inserts into multiple messages based on client.max_message_size (the largest message a mongod or mongos server will accept).
1 parent 55dd328 commit 3afd684

File tree

8 files changed

+502
-22
lines changed

8 files changed

+502
-22
lines changed

bson/buffer.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,7 @@ int buffer_get_position(buffer_t buffer) {
140140
/* Expose the raw storage pointer held by `buffer`.
 *
 * The caller receives the pointer stored in the `buffer` field itself,
 * not a copy of the data. */
char* buffer_get_buffer(buffer_t buffer) {
    char* storage = buffer->buffer;
    return storage;
}
143+
144+
/* Overwrite the position stored in `buffer` with `new_position`.
 *
 * This is a plain assignment: the new value is not validated here, so
 * the caller is responsible for passing a position that makes sense
 * for this buffer. */
void buffer_update_position(buffer_t buffer, buffer_position new_position) {
    (*buffer).position = new_position;
}

bson/buffer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,6 @@ int buffer_write_at_position(buffer_t buffer, buffer_position position, const ch
5151
* since they break the abstraction. */
5252
/* Presumably returns the buffer's current position (definition not shown
 * here -- confirm against buffer.c). */
buffer_position buffer_get_position(buffer_t buffer);
5353
/* Returns the pointer stored in the buffer's `buffer` field, i.e. direct
 * access to the underlying storage. */
char* buffer_get_buffer(buffer_t buffer);
54+
/* Sets the buffer's position to `new_position`. The implementation is a
 * plain assignment with no validation of the new value. */
void buffer_update_position(buffer_t buffer, buffer_position new_position);
5455

5556
#endif

pymongo/_cmessagemodule.c

Lines changed: 316 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,36 @@ static int add_last_error(PyObject* self, buffer_t buffer,
131131
return 1;
132132
}
133133

134+
/* Write the leading part of an insert message into `buffer`.
 *
 * Layout written, in order:
 *   - 4 reserved bytes for the message length (filled in by the caller),
 *   - the 4 bytes of `request_id`,
 *   - 8 constant bytes: four zero bytes followed by d2 07 00 00
 *     (presumably responseTo = 0 and opcode 2002 / OP_INSERT in the
 *     MongoDB wire protocol -- confirm against the protocol docs),
 *   - the 4 bytes of `options`,
 *   - `coll_name` including one extra trailing byte (coll_name_len + 1),
 *     i.e. its NUL terminator.
 *
 * Returns the location of the reserved length field on success, or -1 on
 * failure. PyErr_NoMemory() is raised here only when reserving the length
 * field fails; for the later writes this function relies on
 * buffer_write_bytes to have reported the error. */
static int init_insert_buffer(buffer_t buffer, int request_id, int options,
                              const char* coll_name, int coll_name_len) {
    int ok;
    int header_start = buffer_save_space(buffer, 4);

    if (header_start == -1) {
        PyErr_NoMemory();
        return -1;
    }

    /* Each write is attempted only if every previous one succeeded. */
    ok = buffer_write_bytes(buffer, (const char*)&request_id, 4);
    if (ok) {
        ok = buffer_write_bytes(buffer,
                                "\x00\x00\x00\x00"
                                "\xd2\x07\x00\x00",
                                8);
    }
    if (ok) {
        ok = buffer_write_bytes(buffer, (const char*)&options, 4);
    }
    if (ok) {
        ok = buffer_write_bytes(buffer, coll_name, coll_name_len + 1);
    }

    return ok ? header_start : -1;
}
155+
134156
static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
135-
/* NOTE just using a random number as the request_id */
157+
/* Note: As of PyMongo 2.6, this function is no longer used. It
158+
* is being kept (with tests) for backwards compatibility with 3rd
159+
* party libraries that may currently be using it, but will likely
160+
* be removed in a future release. */
136161
struct module_state *state = GETSTATE(self);
137162

163+
/* NOTE just using a random number as the request_id */
138164
int request_id = rand();
139165
char* collection_name = NULL;
140166
int collection_name_length;
@@ -172,23 +198,13 @@ static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
172198
return NULL;
173199
}
174200

175-
// save space for message length
176-
length_location = buffer_save_space(buffer, 4);
201+
length_location = init_insert_buffer(buffer,
202+
request_id,
203+
options,
204+
collection_name,
205+
collection_name_length);
177206
if (length_location == -1) {
178207
PyMem_Free(collection_name);
179-
PyErr_NoMemory();
180-
return NULL;
181-
}
182-
if (!buffer_write_bytes(buffer, (const char*)&request_id, 4) ||
183-
!buffer_write_bytes(buffer,
184-
"\x00\x00\x00\x00"
185-
"\xd2\x07\x00\x00",
186-
8) ||
187-
!buffer_write_bytes(buffer, (const char*)&options, 4) ||
188-
!buffer_write_bytes(buffer,
189-
collection_name,
190-
collection_name_length + 1)) {
191-
PyMem_Free(collection_name);
192208
buffer_free(buffer);
193209
return NULL;
194210
}
@@ -259,6 +275,14 @@ static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
259275
return result;
260276
}
261277

278+
PyDoc_STRVAR(_cbson_insert_message_doc,
279+
"Create an insert message to be sent to MongoDB\n\
280+
\n\
281+
Note: As of PyMongo 2.6, this function is no longer used. It\n\
282+
is being kept (with tests) for backwards compatibility with 3rd\n\
283+
party libraries that may currently be using it, but will likely\n\
284+
be removed in a future release.");
285+
262286
static PyObject* _cbson_update_message(PyObject* self, PyObject* args) {
263287
/* NOTE just using a random number as the request_id */
264288
struct module_state *state = GETSTATE(self);
@@ -512,15 +536,290 @@ static PyObject* _cbson_get_more_message(PyObject* self, PyObject* args) {
512536
return result;
513537
}
514538

539+
/* Read attribute `name` of `obj` and convert it to a C long.
 *
 * Returns 1 and stores the value in *out on success. Returns 0 with a
 * Python exception set when the attribute lookup or the conversion fails.
 * The temporary reference returned by the attribute lookup is always
 * released, and a legitimate value of -1 is not mistaken for an error. */
static int batched_insert_long_attr(PyObject* obj, const char* name,
                                    long* out) {
    long value;
    PyObject* attr = PyObject_GetAttrString(obj, name);
    if (!attr) {
        return 0;
    }
#if PY_MAJOR_VERSION >= 3
    value = PyLong_AsLong(attr);
#else
    value = PyInt_AsLong(attr);
#endif
    Py_DECREF(attr);
    if (value == -1 && PyErr_Occurred()) {
        return 0;
    }
    *out = value;
    return 1;
}

/* Encode `docs` into one or more insert messages and send each through
 * client._send_message(), starting a new message whenever the current
 * one grows past client.max_message_size.
 *
 * Python arguments (format "et#ObbObbO"):
 *   collection_name   - encoded as utf-8
 *   docs              - iterable of documents to insert
 *   check_keys        - passed through to write_dict
 *   safe              - non-zero for acknowledged writes
 *   last_error_args   - passed through to add_last_error
 *   continue_on_error - non-zero sets bit 0 of the insert options
 *   uuid_subtype      - passed through to write_dict
 *   client            - object providing max_bson_size, max_message_size
 *                       and _send_message
 *
 * Returns None on success. Returns NULL with an exception set on failure:
 *   - InvalidOperation if `docs` is not iterable or yields no documents,
 *   - InvalidDocument if an encoded document exceeds max_bson_size,
 *   - whatever _send_message raised. With safe and continue_on_error both
 *     set, an OperationFailure from an intermediate batch is remembered
 *     and re-raised only after the final batch has been sent. With safe
 *     unset, an OperationFailure from an intermediate batch stops the
 *     insert and None is returned. */
static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);

    /* NOTE just using a random number as the request_id */
    int request_id = rand();
    int options = 0, max_size = 0;
    int length_location, message_length;
    int collection_name_length;
    char* collection_name = NULL;
    PyObject* docs;
    PyObject* doc;
    PyObject* iterator;
    PyObject* client;
    PyObject* last_error_args;
    PyObject* result;
    PyObject* send_result;
    unsigned char check_keys;
    unsigned char safe;
    unsigned char continue_on_error;
    unsigned char uuid_subtype;
    long max_bson_size;
    long max_message_size;
    buffer_t buffer;
    /* OperationFailure deferred until after the last batch
     * (only used when continue_on_error is set). */
    PyObject *exc_type = NULL, *exc_value = NULL, *exc_trace = NULL;

    if (!PyArg_ParseTuple(args, "et#ObbObbO",
                          "utf-8",
                          &collection_name,
                          &collection_name_length,
                          &docs, &check_keys, &safe,
                          &last_error_args,
                          &continue_on_error,
                          &uuid_subtype, &client)) {
        return NULL;
    }
    if (continue_on_error) {
        options += 1;
    }

    /* Size limits come from the client object. */
    if (!batched_insert_long_attr(client, "max_bson_size",
                                  &max_bson_size) ||
        !batched_insert_long_attr(client, "max_message_size",
                                  &max_message_size)) {
        PyMem_Free(collection_name);
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        PyErr_NoMemory();
        PyMem_Free(collection_name);
        return NULL;
    }

    length_location = init_insert_buffer(buffer,
                                         request_id,
                                         options,
                                         collection_name,
                                         collection_name_length);
    if (length_location == -1) {
        goto insertfail;
    }

    iterator = PyObject_GetIter(docs);
    if (iterator == NULL) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "input is not iterable");
            Py_DECREF(InvalidOperation);
        }
        goto insertfail;
    }
    while ((doc = PyIter_Next(iterator)) != NULL) {
        int before = buffer_get_position(buffer);
        int cur_size;
        if (!write_dict(state->_cbson, buffer, doc, check_keys, uuid_subtype, 1)) {
            Py_DECREF(doc);
            goto iterfail;
        }
        Py_DECREF(doc);

        cur_size = buffer_get_position(buffer) - before;
        max_size = (cur_size > max_size) ? cur_size : max_size;
        if (cur_size > max_bson_size) {
            PyObject* InvalidDocument = _error("InvalidDocument");
            if (InvalidDocument) {
                const char* msg = ("BSON document too large (%ld bytes)"
                                   " - the connected server supports"
                                   " BSON document sizes up to %ld bytes.");
                /* %ld consumes a long from the varargs, so cur_size (an
                 * int) must be widened explicitly. */
#if PY_MAJOR_VERSION >= 3
                PyObject* error = PyUnicode_FromFormat(msg,
                                                       (long)cur_size,
                                                       max_bson_size);
#else
                PyObject* error = PyString_FromFormat(msg,
                                                      (long)cur_size,
                                                      max_bson_size);
#endif
                if (error) {
                    PyErr_SetObject(InvalidDocument, error);
                    Py_DECREF(error);
                }
                Py_DECREF(InvalidDocument);
            }
            goto iterfail;
        }

        /* We have enough data, send this batch. */
        if (buffer_get_position(buffer) > max_message_size) {
            int new_request_id = rand();
            int message_start;
            PyObject* send_gle = Py_False;
            buffer_t new_buffer = buffer_new();
            if (!new_buffer) {
                PyErr_NoMemory();
                goto iterfail;
            }
            message_start = init_insert_buffer(new_buffer,
                                               new_request_id,
                                               options,
                                               collection_name,
                                               collection_name_length);
            if (message_start == -1) {
                buffer_free(new_buffer);
                goto iterfail;
            }

            /* Copy the overflow encoded document into the new buffer. */
            if (!buffer_write_bytes(new_buffer,
                (const char*)buffer_get_buffer(buffer) + before, cur_size)) {
                buffer_free(new_buffer);
                goto iterfail;
            }

            /* Roll back to the beginning of this document. */
            buffer_update_position(buffer, before);
            message_length = buffer_get_position(buffer) - length_location;
            memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);

            /* If we are doing unacknowledged writes *and* continue_on_error
             * is True it's pointless (and slower) to send GLE. */
            if (safe || !continue_on_error) {
                send_gle = Py_True;
                if (!add_last_error(self, buffer, request_id, collection_name,
                                    collection_name_length, last_error_args)) {
                    buffer_free(new_buffer);
                    goto iterfail;
                }
            }
            /* Objectify buffer */
            result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
                                   buffer_get_buffer(buffer),
                                   buffer_get_position(buffer));
            /* The finished batch now lives in `result` (if it could be
             * built); continue with the fresh buffer. */
            buffer_free(buffer);
            buffer = new_buffer;
            request_id = new_request_id;
            length_location = message_start;

            if (!result) {
                goto iterfail;
            }

            /* "N" hands our reference to `result` over to the call. */
            send_result = PyObject_CallMethod(client, "_send_message", "NO",
                                              result, send_gle);
            if (send_result) {
                /* The return value is not needed; release it. */
                Py_DECREF(send_result);
            } else {
                PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
                PyObject* OperationFailure;
                PyErr_Fetch(&etype, &evalue, &etrace);
                OperationFailure = _error("OperationFailure");
                if (OperationFailure) {
                    if (PyErr_GivenExceptionMatches(etype, OperationFailure)) {
                        if (!safe || continue_on_error) {
                            Py_DECREF(OperationFailure);
                            if (!safe) {
                                /* We're doing unacknowledged writes and
                                 * continue_on_error is False. Just return.
                                 * The value and traceback returned by
                                 * PyErr_Fetch may be NULL. */
                                Py_XDECREF(etype);
                                Py_XDECREF(evalue);
                                Py_XDECREF(etrace);
                                Py_DECREF(iterator);
                                buffer_free(buffer);
                                PyMem_Free(collection_name);
                                Py_RETURN_NONE;
                            }
                            /* continue_on_error is True, store the error
                             * details to re-raise after the final batch */
                            Py_XDECREF(exc_type);
                            Py_XDECREF(exc_value);
                            Py_XDECREF(exc_trace);
                            exc_type = etype;
                            exc_value = evalue;
                            exc_trace = etrace;
                            continue;
                        }
                    }
                    Py_DECREF(OperationFailure);
                }
                /* This isn't OperationFailure, we couldn't
                 * import OperationFailure, or we are doing
                 * acknowledged writes. Re-raise immediately. */
                PyErr_Restore(etype, evalue, etrace);
                goto iterfail;
            }
        }
    }
    Py_DECREF(iterator);

    /* PyIter_Next returns NULL both at exhaustion and on error. */
    if (PyErr_Occurred()) {
        goto insertfail;
    }

    /* max_size stays 0 only if no document was ever encoded. */
    if (!max_size) {
        PyObject* InvalidOperation = _error("InvalidOperation");
        if (InvalidOperation) {
            PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
            Py_DECREF(InvalidOperation);
        }
        goto insertfail;
    }

    message_length = buffer_get_position(buffer) - length_location;
    memcpy(buffer_get_buffer(buffer) + length_location, &message_length, 4);

    if (safe) {
        if (!add_last_error(self, buffer, request_id, collection_name,
                            collection_name_length, last_error_args)) {
            goto insertfail;
        }
    }

    PyMem_Free(collection_name);

    /* objectify buffer */
    result = Py_BuildValue("i" BYTES_FORMAT_STRING, request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer));
    buffer_free(buffer);

    /* collection_name and buffer are already released here, so the
     * shared cleanup labels below must not be used from this point on. */
    if (!result) {
        Py_XDECREF(exc_type);
        Py_XDECREF(exc_value);
        Py_XDECREF(exc_trace);
        return NULL;
    }

    /* Send the last (or only) batch */
    send_result = PyObject_CallMethod(client, "_send_message", "NN",
                                      result, PyBool_FromLong((long)safe));
    if (!send_result) {
        Py_XDECREF(exc_type);
        Py_XDECREF(exc_value);
        Py_XDECREF(exc_trace);
        return NULL;
    }
    Py_DECREF(send_result);

    if (exc_type) {
        /* Re-raise any previously stored exception
         * due to continue_on_error being True */
        PyErr_Restore(exc_type, exc_value, exc_trace);
        return NULL;
    }

    Py_RETURN_NONE;

iterfail:
    Py_DECREF(iterator);
insertfail:
    Py_XDECREF(exc_type);
    Py_XDECREF(exc_value);
    Py_XDECREF(exc_trace);
    buffer_free(buffer);
    PyMem_Free(collection_name);
    return NULL;
}
811+
515812
/* Table of PyMethodDef entries pairing each Python-visible name with its
 * C implementation and docstring. Every entry uses METH_VARARGS, and the
 * table ends with an all-NULL sentinel entry. */
static PyMethodDef _CMessageMethods[] = {
516813
{"_insert_message", _cbson_insert_message, METH_VARARGS,
517-
"create an insert message to be sent to MongoDB"},
814+
_cbson_insert_message_doc},
518815
{"_update_message", _cbson_update_message, METH_VARARGS,
519816
"create an update message to be sent to MongoDB"},
520817
{"_query_message", _cbson_query_message, METH_VARARGS,
521818
"create a query message to be sent to MongoDB"},
522819
{"_get_more_message", _cbson_get_more_message, METH_VARARGS,
523820
"create a get more message to be sent to MongoDB"},
821+
{"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS,
822+
"insert a batch of documents, splitting the batch as needed"},
524823
/* Sentinel entry marking the end of the table. */
{NULL, NULL, 0, NULL}
525824
};
526825

0 commit comments

Comments
 (0)