Skip to content

Commit ebae611

Browse files
committed
Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.
1 parent db181d1 commit ebae611

File tree

5 files changed

+110
-80
lines changed

5 files changed

+110
-80
lines changed

cpp/src/plasma/client.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
130130
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
131131
ObjectID id;
132132
PlasmaObject object;
133-
RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
133+
RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
134134
// If the CreateReply included an error, then the store will not send a file
135135
// descriptor.
136136
int fd = recv_fd(store_conn_);
@@ -204,7 +204,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
204204
std::vector<PlasmaObject> object_data(num_objects);
205205
PlasmaObject* object;
206206
RETURN_NOT_OK(ReadGetReply(
207-
buffer.data(), received_object_ids.data(), object_data.data(), num_objects));
207+
buffer.data(), buffer.size(), received_object_ids.data(), object_data.data(), num_objects));
208208

209209
for (int i = 0; i < num_objects; ++i) {
210210
DCHECK(received_object_ids[i] == object_ids[i]);
@@ -328,7 +328,7 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
328328
std::vector<uint8_t> buffer;
329329
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
330330
ObjectID object_id2;
331-
RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
331+
RETURN_NOT_OK(ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
332332
}
333333
return Status::OK();
334334
}
@@ -436,7 +436,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
436436
std::vector<uint8_t> buffer;
437437
int64_t type;
438438
RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
439-
return ReadEvictReply(buffer.data(), num_bytes_evicted);
439+
return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
440440
}
441441

442442
Status PlasmaClient::Subscribe(int* fd) {
@@ -473,7 +473,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
473473
RETURN_NOT_OK(SendConnectRequest(store_conn_));
474474
std::vector<uint8_t> buffer;
475475
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
476-
RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
476+
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
477477
return Status::OK();
478478
}
479479

@@ -511,7 +511,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
511511
std::vector<uint8_t> buffer;
512512
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
513513
ObjectID id;
514-
RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
514+
RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
515515
ARROW_CHECK(object_id == id);
516516
return Status::OK();
517517
}
@@ -532,7 +532,7 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
532532
num_ready_objects, timeout_ms));
533533
std::vector<uint8_t> buffer;
534534
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
535-
RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
535+
RETURN_NOT_OK(ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));
536536

537537
*num_objects_ready = 0;
538538
for (int i = 0; i < num_object_requests; ++i) {

cpp/src/plasma/protocol.cc

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ Status SendCreateRequest(
6161
}
6262

6363
Status ReadCreateRequest(
64-
uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
64+
uint8_t* data, size_t size, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
6565
DCHECK(data);
6666
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
67+
DCHECK(verify_flatbuffer(message, data, size));
6768
*data_size = message->data_size();
6869
*metadata_size = message->metadata_size();
6970
*object_id = ObjectID::from_binary(message->object_id()->str());
@@ -81,9 +82,10 @@ Status SendCreateReply(
8182
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
8283
}
8384

84-
Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
85+
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object) {
8586
DCHECK(data);
8687
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
88+
DCHECK(verify_flatbuffer(message, data, size));
8789
*object_id = ObjectID::from_binary(message->object_id()->str());
8890
object->handle.store_fd = message->plasma_object()->segment_index();
8991
object->handle.mmap_size = message->plasma_object()->mmap_size();
@@ -104,9 +106,10 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
104106
return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
105107
}
106108

107-
Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
109+
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest) {
108110
DCHECK(data);
109111
auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
112+
DCHECK(verify_flatbuffer(message, data, size));
110113
*object_id = ObjectID::from_binary(message->object_id()->str());
111114
ARROW_CHECK(message->digest()->size() == kDigestSize);
112115
memcpy(digest, message->digest()->data(), kDigestSize);
@@ -120,9 +123,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
120123
return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
121124
}
122125

123-
Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
126+
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
124127
DCHECK(data);
125128
auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
129+
DCHECK(verify_flatbuffer(message, data, size));
126130
*object_id = ObjectID::from_binary(message->object_id()->str());
127131
return plasma_error_status(message->error());
128132
}
@@ -131,13 +135,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
131135

132136
Status SendReleaseRequest(int sock, ObjectID object_id) {
133137
flatbuffers::FlatBufferBuilder fbb;
134-
auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
138+
auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
135139
return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
136140
}
137141

138-
Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
142+
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
139143
DCHECK(data);
140144
auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
145+
DCHECK(verify_flatbuffer(message, data, size));
141146
*object_id = ObjectID::from_binary(message->object_id()->str());
142147
return Status::OK();
143148
}
@@ -149,9 +154,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
149154
return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
150155
}
151156

152-
Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
157+
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
153158
DCHECK(data);
154159
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
160+
DCHECK(verify_flatbuffer(message, data, size));
155161
*object_id = ObjectID::from_binary(message->object_id()->str());
156162
return plasma_error_status(message->error());
157163
}
@@ -164,9 +170,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
164170
return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
165171
}
166172

167-
Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
173+
Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
168174
DCHECK(data);
169175
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
176+
DCHECK(verify_flatbuffer(message, data, size));
170177
*object_id = ObjectID::from_binary(message->object_id()->str());
171178
return Status::OK();
172179
}
@@ -178,9 +185,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
178185
return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
179186
}
180187

181-
Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
188+
Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
182189
DCHECK(data);
183190
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
191+
DCHECK(verify_flatbuffer(message, data, size));
184192
*object_id = ObjectID::from_binary(message->object_id()->str());
185193
return plasma_error_status(message->error());
186194
}
@@ -194,9 +202,10 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
194202
return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
195203
}
196204

197-
Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
205+
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects) {
198206
DCHECK(data);
199207
auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
208+
DCHECK(verify_flatbuffer(message, data, size));
200209
for (uoffset_t i = 0; i < num_objects; ++i) {
201210
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
202211
}
@@ -212,16 +221,18 @@ Status SendStatusReply(
212221
return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
213222
}
214223

215-
int64_t ReadStatusReply_num_objects(uint8_t* data) {
224+
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
216225
DCHECK(data);
217226
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
227+
DCHECK(verify_flatbuffer(message, data, size));
218228
return message->object_ids()->size();
219229
}
220230

221231
Status ReadStatusReply(
222-
uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) {
232+
uint8_t* data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects) {
223233
DCHECK(data);
224234
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
235+
DCHECK(verify_flatbuffer(message, data, size));
225236
for (uoffset_t i = 0; i < num_objects; ++i) {
226237
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
227238
}
@@ -239,9 +250,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
239250
return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
240251
}
241252

242-
Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
253+
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
243254
DCHECK(data);
244255
auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
256+
DCHECK(verify_flatbuffer(message, data, size));
245257
*object_id = ObjectID::from_binary(message->object_id()->str());
246258
return Status::OK();
247259
}
@@ -253,9 +265,10 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
253265
return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
254266
}
255267

256-
Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
268+
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object) {
257269
DCHECK(data);
258270
auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
271+
DCHECK(verify_flatbuffer(message, data, size));
259272
*object_id = ObjectID::from_binary(message->object_id()->str());
260273
*has_object = message->has_object();
261274
return Status::OK();
@@ -279,9 +292,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
279292
return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
280293
}
281294

282-
Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
295+
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
283296
DCHECK(data);
284297
auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
298+
DCHECK(verify_flatbuffer(message, data, size));
285299
*memory_capacity = message->memory_capacity();
286300
return Status::OK();
287301
}
@@ -294,9 +308,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
294308
return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
295309
}
296310

297-
Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
311+
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
298312
DCHECK(data);
299313
auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
314+
DCHECK(verify_flatbuffer(message, data, size));
300315
*num_bytes = message->num_bytes();
301316
return Status::OK();
302317
}
@@ -307,9 +322,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
307322
return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
308323
}
309324

310-
Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
325+
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
311326
DCHECK(data);
312327
auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
328+
DCHECK(verify_flatbuffer(message, data, size));
313329
num_bytes = message->num_bytes();
314330
return Status::OK();
315331
}
@@ -325,9 +341,10 @@ Status SendGetRequest(
325341
}
326342

327343
Status ReadGetRequest(
328-
uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
344+
uint8_t* data, size_t size, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
329345
DCHECK(data);
330346
auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
347+
DCHECK(verify_flatbuffer(message, data, size));
331348
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
332349
auto object_id = message->object_ids()->Get(i)->str();
333350
object_ids.push_back(ObjectID::from_binary(object_id));
@@ -353,10 +370,11 @@ Status SendGetReply(int sock, ObjectID object_ids[],
353370
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
354371
}
355372

356-
Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
373+
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[],
357374
int64_t num_objects) {
358375
DCHECK(data);
359376
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
377+
DCHECK(verify_flatbuffer(message, data, size));
360378
for (uoffset_t i = 0; i < num_objects; ++i) {
361379
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
362380
}
@@ -381,9 +399,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
381399
return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
382400
}
383401

384-
Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
402+
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
385403
DCHECK(data);
386404
auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
405+
DCHECK(verify_flatbuffer(message, data, size));
387406
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
388407
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
389408
}
@@ -408,10 +427,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
408427
return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
409428
}
410429

411-
Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
430+
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
412431
int64_t* timeout_ms, int* num_ready_objects) {
413432
DCHECK(data);
414433
auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
434+
DCHECK(verify_flatbuffer(message, data, size));
415435
*num_ready_objects = message->num_ready_objects();
416436
*timeout_ms = message->timeout();
417437

@@ -442,10 +462,11 @@ Status SendWaitReply(
442462
}
443463

444464
Status ReadWaitReply(
445-
uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
465+
uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects) {
446466
DCHECK(data);
447467

448468
auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
469+
DCHECK(verify_flatbuffer(message, data, size));
449470
*num_ready_objects = message->num_ready_objects();
450471
for (int i = 0; i < *num_ready_objects; i++) {
451472
object_requests[i].object_id =
@@ -473,9 +494,10 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
473494
return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
474495
}
475496

476-
Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
497+
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port) {
477498
DCHECK(data);
478499
auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
500+
DCHECK(verify_flatbuffer(message, data, size));
479501
DCHECK(message->object_id()->size() == sizeof(ObjectID));
480502
*object_id = ObjectID::from_binary(message->object_id()->str());
481503
*address = strdup(message->address()->c_str());
@@ -492,9 +514,10 @@ Status SendDataReply(
492514
}
493515

494516
Status ReadDataReply(
495-
uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
517+
uint8_t* data, size_t size, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
496518
DCHECK(data);
497519
auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
520+
DCHECK(verify_flatbuffer(message, data, size));
498521
*object_id = ObjectID::from_binary(message->object_id()->str());
499522
*object_size = (int64_t)message->object_size();
500523
*metadata_size = (int64_t)message->metadata_size();

0 commit comments

Comments
 (0)