Skip to content

Commit 7eb2ad8

Browse files
committed
feat: add EOS constant and map None to EOS, fix processes closing procedures
2 parents 3b47ae2 + c2ab0ac commit 7eb2ad8

14 files changed

+136
-142
lines changed

fastflow_module.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@
1313
#include "process/ff_node_process.hpp"
1414
#include "py_ff_constant.hpp"
1515

16-
PyDoc_STRVAR(ff_send_out_to_doc, "Send out to a node");
17-
18-
PyObject* empty_ff_send_out_to(PyObject *self, PyObject *args) {
19-
assert(self);
20-
21-
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
22-
return NULL;
23-
}
24-
2516
/* initialization function */
2617
static int
2718
fastflow_module_exec(PyObject *module)
@@ -70,9 +61,22 @@ fastflow_module_exec(PyObject *module)
7061
return -1;
7162
}
7263

64+
if (PyModule_AddObject(module, EOS_CONSTANT_NAME, build_py_ff_constant(ff::FF_EOS)) < 0) {
65+
return -1;
66+
}
67+
7368
return 0;
7469
}
7570

71+
PyDoc_STRVAR(ff_send_out_to_doc, "Send out to a node");
72+
73+
PyObject* empty_ff_send_out_to(PyObject *self, PyObject *args) {
74+
assert(self);
75+
76+
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
77+
return NULL;
78+
}
79+
7680
static PyMethodDef module_methods[] = {
7781
{ "ff_send_out_to", (PyCFunction) empty_ff_send_out_to, METH_VARARGS, ff_send_out_to_doc },
7882
{NULL, NULL} /* Sentinel */

include/process/base_process.hpp

Lines changed: 54 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
2424
close(send_fd);
2525
close(read_fd);
2626
pickl.~pickling();
27+
// finalize python interpreter
2728
Py_Finalize();
2829
exit(0);
2930
};
@@ -33,12 +34,10 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
3334
// receive serialized node
3435
int err = receiveMessage(read_fd, send_fd, message);
3536
if (err <= 0) handleError("[child] read serialized node", cleanup_exit());
36-
LOG("[child] read serialized node");
3737

3838
// deserialize Python node
3939
auto node = pickl.unpickle(message.data);
4040
CHECK_ERROR_THEN("[child] unpickle node failure: ", cleanup_exit();)
41-
LOG("[child] deserialized node");
4241

4342
py_ff_callback_object* callback = (py_ff_callback_object*) PyObject_CallObject(
4443
(PyObject *) &py_ff_callback_type, NULL
@@ -76,8 +75,6 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
7675
return (PyObject*) NULL;
7776
}
7877

79-
LOG("[child] sent ff_send_out_to message");
80-
8178
// receive response
8279
err = receiveMessage(read_fd, send_fd, message);
8380
if (err <= 0) {
@@ -104,41 +101,42 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
104101

105102
while(err > 0) {
106103
err = receiveMessage(read_fd, send_fd, message);
107-
if (err < 0) handleError("[child] read next", cleanup_exit());
108-
109-
if (err > 0) {
110-
// deserialize data
111-
auto py_args_tuple = pickl.unpickle(message.data);
112-
CHECK_ERROR_THEN("[child] deserialize data failure: ", cleanup_exit();)
113-
// call function
114-
PyObject *py_func = PyObject_GetAttrString(node, message.f_name.c_str());
115-
CHECK_ERROR_THEN("[child] get node function: ", cleanup_exit();)
116-
117-
if (py_func) {
118-
// finally call the function
119-
PyObject* py_result = PyTuple_Check(py_args_tuple) == 1 ? PyObject_CallObject(py_func, py_args_tuple):PyObject_CallFunctionObjArgs(py_func, py_args_tuple, nullptr);
120-
CHECK_ERROR_THEN("[child] call function failure: ", cleanup_exit();)
121-
122-
// we may have a fastflow constant as result
123-
if (PyObject_TypeCheck(py_result, &py_ff_constant_type) != 0) {
124-
// py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_result);
125-
// the only constant available is FF_GO_ON
126-
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE_GO_ON, .data = "", .f_name = "" });
127-
if (err <= 0) handleError("[child] send constant response", cleanup_exit());
128-
LOG("[child] sent constant response");
129-
} else {
130-
// serialize response
131-
auto result_str = pickl.pickle(py_result);
132-
CHECK_ERROR_THEN("[child] pickle result failure: ", cleanup_exit();)
133-
134-
// send response
135-
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result_str, .f_name = "" });
136-
if (err <= 0) handleError("[child] send response", cleanup_exit());
137-
LOG("[child] sent response");
138-
139-
Py_DECREF(py_result);
140-
Py_DECREF(py_func);
141-
}
104+
if (err <= 0) handleError("[child] read next", cleanup_exit());
105+
106+
if (message.type == MESSAGE_TYPE_END_OF_LIFE) break;
107+
108+
// deserialize data
109+
auto py_args_tuple = pickl.unpickle(message.data);
110+
CHECK_ERROR_THEN("[child] deserialize data failure: ", cleanup_exit();)
111+
// call function
112+
PyObject *py_func = PyObject_GetAttrString(node, message.f_name.c_str());
113+
CHECK_ERROR_THEN("[child] get node function: ", cleanup_exit();)
114+
115+
if (py_func) {
116+
// finally call the function
117+
PyObject* py_result = PyTuple_Check(py_args_tuple) == 1 ? PyObject_CallObject(py_func, py_args_tuple):PyObject_CallFunctionObjArgs(py_func, py_args_tuple, nullptr);
118+
CHECK_ERROR_THEN("[child] call function failure: ", cleanup_exit();)
119+
120+
// we may have a fastflow constant as result
121+
if (PyObject_TypeCheck(py_result, &py_ff_constant_type) != 0) {
122+
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_result);
123+
err = sendMessage(read_fd, send_fd, {
124+
.type = _const_result->ff_const == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON,
125+
.data = "",
126+
.f_name = ""
127+
});
128+
if (err <= 0) handleError("[child] send constant response", cleanup_exit());
129+
} else {
130+
// serialize response
131+
auto result_str = pickl.pickle(py_result);
132+
CHECK_ERROR_THEN("[child] pickle result failure: ", cleanup_exit();)
133+
134+
// send response
135+
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result_str, .f_name = "" });
136+
if (err <= 0) handleError("[child] send response", cleanup_exit());
137+
138+
Py_DECREF(py_result);
139+
Py_DECREF(py_func);
142140
}
143141
}
144142
}
@@ -159,7 +157,6 @@ class base_process {
159157
}
160158

161159
int svc_init() {
162-
TIMESTART(svc_init_start_time);
163160
// associate a new thread state with ff_node's thread
164161
PyThreadState* cached_tstate = tstate;
165162
tstate = PyThreadState_New(cached_tstate->interp);
@@ -252,7 +249,6 @@ class base_process {
252249
}
253250

254251
// from here the GIL is NOT acquired
255-
LOGELAPSED("svc_init time ", svc_init_start_time);
256252
return returnValue;
257253
}
258254

@@ -264,7 +260,6 @@ class base_process {
264260
int err = remote_procedure_call(send_fd, read_fd, serialized_data, "svc", response);
265261
if (err <= 0) {
266262
handleError("remote call of svc", );
267-
LOG("an error occurred, abort.");
268263
return NULL;
269264
}
270265

@@ -273,7 +268,6 @@ class base_process {
273268
// if the call of ff_send_out_to (as of today...)
274269
if (response.f_name.compare("ff_send_out_to") != 0) {
275270
handleError("got invalid f_name", );
276-
LOG("an error occurred, got invalid f_name. Abort.");
277271
return NULL;
278272
}
279273

@@ -291,63 +285,51 @@ class base_process {
291285
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result ? "t":"f", .f_name = "" });
292286
if (err <= 0) {
293287
handleError("error sending ff_send_out_to response", );
294-
LOG("an error occurred, sending ff_send_out_to. Abort.");
295288
return NULL;
296289
}
297290

298291
// prepare for next iteration
299292
err = receiveMessage(read_fd, send_fd, response);
300293
if (err <= 0) {
301294
handleError("waiting for svc response", );
302-
LOG("an error occurred, abort.");
303295
return NULL;
304296
}
305297
}
306298

307299
// got response of svc
308-
if (response.type == MESSAGE_TYPE_RESPONSE_GO_ON) return ff::FF_GO_ON;
309-
if (response.type == MESSAGE_TYPE_RESPONSE_EOS) return ff::FF_EOS;
310-
if (response.data.compare(none_str) == 0) return ff::FF_EOS;
300+
if (response.type == MESSAGE_TYPE_EOS) return ff::FF_EOS;
301+
if (response.type == MESSAGE_TYPE_GO_ON || response.data.compare(none_str) == 0) {
302+
return ff::FF_GO_ON;
303+
}
311304

312305
return new std::string(response.data);
313306
}
314307

315-
void cleanup() {
316-
// Cleanup of objects created
317-
Py_DECREF(node);
318-
node = nullptr;
319-
tstate = nullptr;
320-
PyEval_SaveThread();
321-
322-
if (send_fd > 0) close(send_fd);
323-
send_fd = -1;
324-
if (read_fd > 0) close(read_fd);
325-
send_fd = -1;
326-
}
327-
328308
void svc_end() {
329-
TIMESTART(svc_end_start_time);
330-
331309
if (has_svc_end) {
332310
Message response;
333311
auto empty_tuple = std::string(SERIALIZED_EMPTY_TUPLE);
334312
int err = remote_procedure_call(send_fd, read_fd, empty_tuple, "svc_end", response);
335313
if (err <= 0) handleError("read result of remote call of svc_end", );
336314
}
337315

338-
// close the pipes, so the process can stop meanwhile we acquire the GIL and cleanup everything
316+
// send end of life. Meanwhile we acquire the GIL and cleanup, the process will stop
317+
int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_END_OF_LIFE, .data = "", .f_name = "" });
318+
319+
// Acquire the main GIL
320+
PyEval_RestoreThread(tstate);
321+
// Cleanup of objects created
322+
Py_DECREF(node);
323+
node = nullptr;
324+
tstate = nullptr;
325+
// Release the main GIL
326+
PyEval_SaveThread();
327+
waitpid(pid, nullptr, 0);
328+
339329
if (send_fd > 0) close(send_fd);
340330
send_fd = -1;
341331
if (read_fd > 0) close(read_fd);
342332
send_fd = -1;
343-
344-
waitpid(pid, nullptr, 0);
345-
346-
// Acquire the main GIL
347-
PyEval_RestoreThread(tstate);
348-
cleanup();
349-
350-
LOGELAPSED("svc_end time ", svc_end_start_time);
351333
}
352334

353335
void register_callback(ff::ff_monode* cb_node) {

include/process/messaging.hpp

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
#include "debugging.hpp"
66
#include <ff/distributed/ff_network.hpp> // import writen and readn
77

8-
#define handleError(msg, then) do { if (errno < 0) { perror(msg); } else { LOG(msg": errno == 0, fd closed" << std::endl); } then; } while(0)
8+
#define handleError(msg, then) do { perror(msg); then; } while(0)
99

1010
enum message_type {
1111
MESSAGE_TYPE_RESPONSE = 1,
1212
MESSAGE_TYPE_REMOTE_PROCEDURE_CALL,
13-
MESSAGE_TYPE_RESPONSE_GO_ON,
14-
MESSAGE_TYPE_RESPONSE_EOS,
15-
MESSAGE_TYPE_ACK
13+
MESSAGE_TYPE_GO_ON,
14+
MESSAGE_TYPE_EOS,
15+
MESSAGE_TYPE_ACK,
16+
MESSAGE_TYPE_END_OF_LIFE
1617
};
1718

1819
struct Message {
@@ -46,8 +47,8 @@ int sendMessage(int read_fd, int send_fd, const Message& message) {
4647
handleError("write f_name", return -1);
4748
}
4849

49-
int dummy;
50-
int err = read(read_fd, &dummy, sizeof(dummy));
50+
int ack;
51+
int err = read(read_fd, &ack, sizeof(ack));
5152
if (err <= 0) handleError("error receiving ACK", return err);
5253

5354
return 1; // 0 = EOF, -1 = ERROR, 1 = SUCCESS
@@ -62,14 +63,15 @@ int receiveMessage(int read_fd, int send_fd, Message& message) {
6263
uint32_t dataSize;
6364
res = read(read_fd, &dataSize, sizeof(dataSize));
6465
if (res <= 0) handleError("read data size", return res);
66+
6567
char* bufferData = new char[dataSize + 1];
6668
if (dataSize > 0) {
6769
res = readn(read_fd, bufferData, dataSize);
6870
if (res <= 0) handleError("read data", return res);
6971
}
7072

7173
bufferData[dataSize] = '\0';
72-
message.data = std::string(bufferData);
74+
//message.data = std::string(bufferData);
7375
message.data.assign(bufferData, dataSize);
7476
delete[] bufferData;
7577

@@ -88,23 +90,22 @@ int receiveMessage(int read_fd, int send_fd, Message& message) {
8890
message.f_name = std::string(bufferFname);
8991
delete[] bufferFname;
9092

91-
int dummy = 17;
92-
res = write(send_fd, &dummy, sizeof(dummy));
93+
int ack = 17;
94+
res = write(send_fd, &ack, sizeof(ack));
9395
if (res == -1) handleError("error sending ACK", return res);
9496

9597
return 1; // 0 = EOF, -1 = ERROR, 1 = SUCCESS
9698
}
9799

98100
int remote_procedure_call(int send_fd, int read_fd, std::string &data, const char *f_name, Message &response) {
99-
if (send_fd == -1 || read_fd == -1) {
100-
// they are -1 if an error occurred during svc_init or svc
101-
return -1;
102-
}
101+
// they are -1 if an error occurred during svc_init or svc
102+
if (send_fd == -1 || read_fd == -1) return -1;
103103

104104
int err = sendMessage(read_fd, send_fd, {
105105
.type = MESSAGE_TYPE_REMOTE_PROCEDURE_CALL,
106106
.data = data,
107-
.f_name = f_name });
107+
.f_name = f_name
108+
});
108109
if (err <= 0) return err;
109110

110111
return receiveMessage(read_fd, send_fd, response);
@@ -124,7 +125,7 @@ void create_message_ff_send_out_to(Message &message, int index, void* &constant,
124125
message.data.append(index_str);
125126
message.data.append("~");
126127
message.data.append(constant != NULL ? "t":"f");
127-
message.data.append(constant != NULL ? std::to_string(constant == ff::FF_EOS ? MESSAGE_TYPE_RESPONSE_EOS:MESSAGE_TYPE_RESPONSE_GO_ON):data);
128+
message.data.append(constant != NULL ? std::to_string(constant == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON):data);
128129
}
129130

130131
void parse_message_ff_send_out_to(Message &message, void **constant, int *index, std::string *data) {
@@ -133,10 +134,10 @@ void parse_message_ff_send_out_to(Message &message, void **constant, int *index,
133134
if (message.data.at(dividerPos+1) == 't') {
134135
*data = "";
135136
std::string inner_data = message.data.substr(dividerPos+2, message.data.length() - dividerPos - 1);
136-
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_EOS)) == 0) {
137+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_EOS)) == 0) {
137138
*constant = ff::FF_EOS;
138139
}
139-
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_GO_ON)) == 0) {
140+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_GO_ON)) == 0) {
140141
*constant = ff::FF_GO_ON;
141142
}
142143
} else {

include/py_ff_a2a.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,18 @@ PyObject* py_ff_a2a_add_firstset(PyObject *self, PyObject *args, PyObject* kwds)
138138
py_ff_pipeline_object* _pipe = reinterpret_cast<py_ff_pipeline_object*>(item);
139139
if (!_pipe->pipeline->isMultiOutput()) {
140140
ff::ff_node* last_stage = _pipe->pipeline->get_laststage();
141-
ff::ff_node* new_node = new ff::internal_mo_transformer(last_stage, false);
142-
_pipe->pipeline->change_node(last_stage, new_node, true, true);
141+
// let's replace the last stage with a multi output variant
142+
PyObject *last_stage_py_node = _self->use_subinterpreters ?
143+
(reinterpret_cast<ff_node_subint*>(last_stage))->get_python_object():
144+
(reinterpret_cast<ff_node_process*>(last_stage))->get_python_object();
145+
if (last_stage_py_node == NULL) {
146+
PyErr_SetString(PyExc_RuntimeError, "Unable to transform previous stage to multi output (null pointer)");
147+
return NULL;
148+
}
149+
ff::ff_node* new_last_stage = _self->use_subinterpreters ?
150+
(ff::ff_node*) new ff_monode_subint(last_stage_py_node):
151+
(ff::ff_node*) new ff_monode_process(last_stage_py_node);
152+
_pipe->pipeline->change_node(last_stage, new_last_stage, true, true);
143153
}
144154
node = _pipe->pipeline;
145155
} else if (_self->use_subinterpreters) {

include/py_ff_constant.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ static PyTypeObject py_ff_constant_type = {
1919
};
2020

2121
#define GO_ON_CONSTANT_NAME "GO_ON"
22+
#define EOS_CONSTANT_NAME "EOS"
2223

2324
PyObject* build_py_ff_constant(void *val) {
2425
py_ff_constant_object* constant = (py_ff_constant_object*) PyObject_CallObject(

0 commit comments

Comments
 (0)