Skip to content

Commit 4d786fd

Browse files
committed
PYTHON-1329 - OP_MSG bulk writes
1 parent ea8cb6c commit 4d786fd

File tree

4 files changed

+495
-25
lines changed

4 files changed

+495
-25
lines changed

pymongo/_cmessagemodule.c

Lines changed: 315 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,315 @@ static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
11171117
#define _UPDATE 1
11181118
#define _DELETE 2
11191119

1120+
/* OP_MSG ----------------------------------------------- */
1121+
1122+
static int
1123+
_batched_op_msg(
1124+
unsigned char op, unsigned char check_keys, unsigned char ack,
1125+
PyObject* command, PyObject* docs, PyObject* ctx,
1126+
PyObject* to_publish, codec_options_t options,
1127+
buffer_t buffer, struct module_state *state) {
1128+
1129+
long max_bson_size;
1130+
long max_write_batch_size;
1131+
long max_message_size;
1132+
int idx = 0;
1133+
int size_location;
1134+
int position;
1135+
int length;
1136+
PyObject* max_bson_size_obj;
1137+
PyObject* max_write_batch_size_obj;
1138+
PyObject* max_message_size_obj;
1139+
PyObject* doc;
1140+
PyObject* iterator;
1141+
char* flags = ack ? "\x00\x00\x00\x00" : "\x02\x00\x00\x00";
1142+
1143+
max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
1144+
#if PY_MAJOR_VERSION >= 3
1145+
max_bson_size = PyLong_AsLong(max_bson_size_obj);
1146+
#else
1147+
max_bson_size = PyInt_AsLong(max_bson_size_obj);
1148+
#endif
1149+
Py_XDECREF(max_bson_size_obj);
1150+
if (max_bson_size == -1) {
1151+
return 0;
1152+
}
1153+
1154+
max_write_batch_size_obj = PyObject_GetAttrString(ctx, "max_write_batch_size");
1155+
#if PY_MAJOR_VERSION >= 3
1156+
max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj);
1157+
#else
1158+
max_write_batch_size = PyInt_AsLong(max_write_batch_size_obj);
1159+
#endif
1160+
Py_XDECREF(max_write_batch_size_obj);
1161+
if (max_write_batch_size == -1) {
1162+
return 0;
1163+
}
1164+
1165+
max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size");
1166+
#if PY_MAJOR_VERSION >= 3
1167+
max_message_size = PyLong_AsLong(max_message_size_obj);
1168+
#else
1169+
max_message_size = PyInt_AsLong(max_message_size_obj);
1170+
#endif
1171+
Py_XDECREF(max_message_size_obj);
1172+
if (max_message_size == -1) {
1173+
return 0;
1174+
}
1175+
1176+
if (!buffer_write_bytes(buffer, flags, 4)) {
1177+
return 0;
1178+
}
1179+
/* Type 0 Section */
1180+
if (!buffer_write_bytes(buffer, "\x00", 1)) {
1181+
return 0;
1182+
}
1183+
if (!write_dict(state->_cbson, buffer, command, 0,
1184+
&options, 0)) {
1185+
return 0;
1186+
}
1187+
1188+
/* Type 1 Section */
1189+
if (!buffer_write_bytes(buffer, "\x01", 1)) {
1190+
return 0;
1191+
}
1192+
/* Save space for size */
1193+
size_location = buffer_save_space(buffer, 4);
1194+
1195+
switch (op) {
1196+
case _INSERT:
1197+
{
1198+
if (!buffer_write_bytes(buffer, "documents\x00", 10))
1199+
goto cmdfail;
1200+
break;
1201+
}
1202+
case _UPDATE:
1203+
{
1204+
/* MongoDB does key validation for update. */
1205+
check_keys = 0;
1206+
if (!buffer_write_bytes(buffer, "updates\x00", 8))
1207+
goto cmdfail;
1208+
break;
1209+
}
1210+
case _DELETE:
1211+
{
1212+
/* Never check keys in a delete command. */
1213+
check_keys = 0;
1214+
if (!buffer_write_bytes(buffer, "deletes\x00", 8))
1215+
goto cmdfail;
1216+
break;
1217+
}
1218+
default:
1219+
{
1220+
PyObject* InvalidOperation = _error("InvalidOperation");
1221+
if (InvalidOperation) {
1222+
PyErr_SetString(InvalidOperation, "Unknown command");
1223+
Py_DECREF(InvalidOperation);
1224+
}
1225+
return 0;
1226+
}
1227+
}
1228+
1229+
iterator = PyObject_GetIter(docs);
1230+
if (iterator == NULL) {
1231+
PyObject* InvalidOperation = _error("InvalidOperation");
1232+
if (InvalidOperation) {
1233+
PyErr_SetString(InvalidOperation, "input is not iterable");
1234+
Py_DECREF(InvalidOperation);
1235+
}
1236+
return 0;
1237+
}
1238+
while ((doc = PyIter_Next(iterator)) != NULL) {
1239+
int cur_doc_begin = buffer_get_position(buffer);
1240+
int cur_size;
1241+
int enough_data = 0;
1242+
int enough_documents = 0;
1243+
if (!write_dict(state->_cbson, buffer, doc, check_keys,
1244+
&options, 1)) {
1245+
goto cmditerfail;
1246+
}
1247+
/* We have enough data, return this batch. */
1248+
enough_data = (buffer_get_position(buffer) > max_message_size);
1249+
enough_documents = (idx >= max_write_batch_size);
1250+
if (enough_data || enough_documents) {
1251+
cur_size = buffer_get_position(buffer) - cur_doc_begin;
1252+
1253+
/* This single document is too large for the message. */
1254+
if (!idx) {
1255+
if (op == _INSERT) {
1256+
_set_document_too_large(cur_size, max_bson_size);
1257+
} else {
1258+
PyObject* DocumentTooLarge = _error("DocumentTooLarge");
1259+
if (DocumentTooLarge) {
1260+
/*
1261+
* There's nothing intelligent we can say
1262+
* about size for update and remove.
1263+
*/
1264+
PyErr_SetString(DocumentTooLarge,
1265+
"operation document too large");
1266+
Py_DECREF(DocumentTooLarge);
1267+
}
1268+
}
1269+
goto cmditerfail;
1270+
}
1271+
/*
1272+
* Roll the existing buffer back to the beginning
1273+
* of the last document encoded.
1274+
*/
1275+
buffer_update_position(buffer, cur_doc_begin);
1276+
break;
1277+
}
1278+
if (PyList_Append(to_publish, doc) < 0) {
1279+
goto cmditerfail;
1280+
}
1281+
Py_CLEAR(doc);
1282+
idx += 1;
1283+
}
1284+
Py_DECREF(iterator);
1285+
1286+
if (PyErr_Occurred()) {
1287+
goto cmdfail;
1288+
}
1289+
1290+
position = buffer_get_position(buffer);
1291+
length = position - size_location;
1292+
buffer_write_int32_at_position(buffer, size_location, (int32_t)length);
1293+
return 1;
1294+
1295+
cmditerfail:
1296+
Py_XDECREF(doc);
1297+
Py_DECREF(iterator);
1298+
cmdfail:
1299+
return 0;
1300+
}
1301+
1302+
/*
 * Python entry point: encode the next batched insert, update or delete
 * as an OP_MSG body (no message header is written here).
 *
 * Arguments (parsed with "bOObbO&O"): op, command, docs, check_keys, ack,
 * codec options, ctx.
 *
 * Returns a 2-tuple (encoded bytes, list of documents included in the
 * batch), or NULL with an exception set on failure.
 */
static PyObject*
_cbson_encode_batched_op_msg(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);
    codec_options_t options;
    buffer_t buffer;
    unsigned char op;
    unsigned char check_keys;
    unsigned char ack;
    PyObject* command;
    PyObject* docs;
    PyObject* ctx = NULL;
    PyObject* to_publish = NULL;
    PyObject* result = NULL;
    int parsed;

    parsed = PyArg_ParseTuple(args, "bOObbO&O",
                              &op, &command, &docs, &check_keys, &ack,
                              convert_codec_options, &options,
                              &ctx);
    if (!parsed) {
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        /* Nothing but the codec options has been acquired so far. */
        PyErr_NoMemory();
        destroy_codec_options(&options);
        return NULL;
    }

    to_publish = PyList_New(0);
    if (to_publish) {
        int encoded = _batched_op_msg(op, check_keys, ack,
                                      command, docs, ctx,
                                      to_publish, options,
                                      buffer, state);
        if (encoded) {
            result = Py_BuildValue(BYTES_FORMAT_STRING "O",
                                   buffer_get_buffer(buffer),
                                   buffer_get_position(buffer),
                                   to_publish);
        }
    }

    /* Shared teardown for both the success and the failure paths. */
    destroy_codec_options(&options);
    buffer_free(buffer);
    Py_XDECREF(to_publish);
    return result;
}
1355+
1356+
/*
 * Python entry point: build a complete OP_MSG message for the next batched
 * insert, update or delete.
 *
 * Arguments (parsed with "bOObbO&O"): op, command, docs, check_keys, ack,
 * codec options, ctx.
 *
 * The first 8 bytes of the buffer are reserved up front and filled in last
 * with the total message length (offset 0) and a request id taken from
 * rand() (offset 4). They are followed by a zero responseTo field and the
 * opcode bytes, then the body produced by _batched_op_msg.
 *
 * Returns a 3-tuple (request id, message bytes, list of documents included
 * in the batch), or NULL with an exception set on failure.
 */
static PyObject*
_cbson_batched_op_msg(PyObject* self, PyObject* args) {
    struct module_state *state = GETSTATE(self);
    codec_options_t options;
    buffer_t buffer;
    unsigned char op;
    unsigned char check_keys;
    unsigned char ack;
    int request_id;
    int position;
    PyObject* command;
    PyObject* docs;
    PyObject* ctx = NULL;
    PyObject* to_publish = NULL;
    PyObject* result = NULL;
    int parsed;

    parsed = PyArg_ParseTuple(args, "bOObbO&O",
                              &op, &command, &docs, &check_keys, &ack,
                              convert_codec_options, &options,
                              &ctx);
    if (!parsed) {
        return NULL;
    }

    buffer = buffer_new();
    if (!buffer) {
        /* Nothing but the codec options has been acquired so far. */
        PyErr_NoMemory();
        destroy_codec_options(&options);
        return NULL;
    }

    /* Save space for message length and request id */
    if (buffer_save_space(buffer, 8) == -1) {
        PyErr_NoMemory();
        goto cleanup;
    }
    if (!buffer_write_bytes(buffer,
                            "\x00\x00\x00\x00"  /* responseTo */
                            "\xdd\x07\x00\x00", /* opcode */
                            8)) {
        goto cleanup;
    }

    to_publish = PyList_New(0);
    if (!to_publish) {
        goto cleanup;
    }

    if (!_batched_op_msg(op, check_keys, ack,
                         command, docs, ctx,
                         to_publish, options,
                         buffer, state)) {
        goto cleanup;
    }

    /* Patch the reserved header fields now that the final size is known. */
    request_id = rand();
    position = buffer_get_position(buffer);
    buffer_write_int32_at_position(buffer, 0, (int32_t)position);
    buffer_write_int32_at_position(buffer, 4, (int32_t)request_id);
    result = Py_BuildValue("i" BYTES_FORMAT_STRING "O", request_id,
                           buffer_get_buffer(buffer),
                           buffer_get_position(buffer),
                           to_publish);

cleanup:
    destroy_codec_options(&options);
    buffer_free(buffer);
    Py_XDECREF(to_publish);
    return result;
}
1426+
1427+
/* End OP_MSG -------------------------------------------- */
1428+
11201429
static int
11211430
_batched_write_command(
11221431
char* ns, int ns_len, unsigned char op, int check_keys,
@@ -1376,7 +1685,7 @@ _cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
13761685
}
13771686

13781687
static PyObject*
1379-
_cbson_do_batched_write_command(PyObject* self, PyObject* args) {
1688+
_cbson_batched_write_command(PyObject* self, PyObject* args) {
13801689
char *ns = NULL;
13811690
unsigned char op;
13821691
unsigned char check_keys;
@@ -1463,10 +1772,14 @@ static PyMethodDef _CMessageMethods[] = {
14631772
"create an OP_MSG message to be sent to MongoDB"},
14641773
{"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS,
14651774
"insert a batch of documents, splitting the batch as needed"},
1466-
{"_do_batched_write_command", _cbson_do_batched_write_command, METH_VARARGS,
1775+
{"_batched_write_command", _cbson_batched_write_command, METH_VARARGS,
14671776
"Create the next batched insert, update, or delete command"},
14681777
{"_encode_batched_write_command", _cbson_encode_batched_write_command, METH_VARARGS,
14691778
"Encode the next batched insert, update, or delete command"},
1779+
{"_batched_op_msg", _cbson_batched_op_msg, METH_VARARGS,
1780+
"Create the next batched insert, update, or delete using OP_MSG"},
1781+
{"_encode_batched_op_msg", _cbson_encode_batched_op_msg, METH_VARARGS,
1782+
"Encode the next batched insert, update, or delete using OP_MSG"},
14701783
{NULL, NULL, 0, NULL}
14711784
};
14721785

pymongo/bulk.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,15 @@
2727
validate_is_document_type,
2828
validate_ok_for_replace,
2929
validate_ok_for_update)
30-
from pymongo.helpers import _RETRYABLE_ERROR_CODES, _raise_write_concern_error
30+
from pymongo.helpers import _RETRYABLE_ERROR_CODES
3131
from pymongo.collation import validate_collation_or_none
3232
from pymongo.errors import (BulkWriteError,
3333
ConfigurationError,
3434
InvalidOperation,
3535
OperationFailure)
3636
from pymongo.message import (_INSERT, _UPDATE, _DELETE,
3737
_do_batched_insert,
38-
_do_batched_write_command,
39-
_do_batched_write_command_compressed,
38+
_do_bulk_write_command,
4039
_randint,
4140
_BulkWriteContext)
4241
from pymongo.read_preferences import ReadPreference
@@ -260,11 +259,6 @@ def _execute_command(self, generator, write_concern, session,
260259
self.current_run = next(generator)
261260
run = self.current_run
262261

263-
if sock_info.compression_context:
264-
do_writes = _do_batched_write_command_compressed
265-
else:
266-
do_writes = _do_batched_write_command
267-
268262
# sock_info.command validates the session, but we use
269263
# sock_info.write_command.
270264
sock_info.validate_session(client, session)
@@ -285,7 +279,7 @@ def _execute_command(self, generator, write_concern, session,
285279
check_keys = run.op_type == _INSERT
286280
ops = islice(run.ops, run.idx_offset, None)
287281
# Run as many ops as possible.
288-
request_id, msg, to_send = do_writes(
282+
request_id, msg, to_send = _do_bulk_write_command(
289283
self.namespace, run.op_type, cmd, ops, check_keys,
290284
self.collection.codec_options, bwc)
291285
if not to_send:

0 commit comments

Comments
 (0)