Skip to content

Commit 914dc11

Browse files
committed
feat: add support to sending out constants
1 parent 160a673 commit 914dc11

File tree

4 files changed

+93
-59
lines changed

4 files changed

+93
-59
lines changed

include/process/base_process.hpp

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,20 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
5454
return (PyObject*) NULL;
5555
}
5656

57-
// serialize pydata
58-
auto pydata_str = pickl.pickle(pydata);
59-
if (PyErr_Occurred()) return (PyObject*) NULL;
57+
// we may have a fastflow constant as data to send out to index
58+
void *constant = NULL;
59+
std::string pydata_str = "";
60+
if (PyObject_TypeCheck(pydata, &py_ff_constant_type) != 0) {
61+
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(pydata);
62+
constant = _const_result->ff_const;
63+
} else {
64+
// serialize pydata
65+
pydata_str = pickl.pickle(pydata);
66+
if (PyErr_Occurred()) return (PyObject*) NULL;
67+
}
6068

6169
Message message;
62-
create_message_ff_send_out_to(message, pydata_str, index);
70+
create_message_ff_send_out_to(message, index, constant, pydata_str);
6371

6472
// send response
6573
int err = sendMessage(read_fd, send_fd, message);
@@ -140,7 +148,7 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
140148
cleanup_exit();
141149
}
142150

143-
#define EMPTY_TUPLE_STR "(t."
151+
#define SERIALIZED_EMPTY_TUPLE "(t."
144152

145153
class base_process {
146154
public:
@@ -214,22 +222,22 @@ class base_process {
214222
send_fd = mainToChildFD[1];
215223
read_fd = childToMainFD[0];
216224

217-
int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_REMOTE_FUNCTION_CALL, .data = node_str, .f_name = "" });
225+
int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_REMOTE_PROCEDURE_CALL, .data = node_str, .f_name = "" });
218226
if (err <= 0) handleError("send serialized node", returnValue = -1);
219227

220228
if (err > 0 && has_svc_init) {
221229
Message response;
222-
auto empty_tuple = std::string(EMPTY_TUPLE_STR);
223-
int err = remote_function_call(send_fd, read_fd, empty_tuple, "svc_init", response);
230+
auto empty_tuple = std::string(SERIALIZED_EMPTY_TUPLE);
231+
int err = remote_procedure_call(send_fd, read_fd, empty_tuple, "svc_init", response);
224232
if (err <= 0) {
225233
handleError("read result of remote call of svc_init", );
226234
} else {
227235
// Hold the main GIL
228236
PyEval_RestoreThread(tstate);
229237
PyObject *svc_init_result = pickl.unpickle(response.data);
230238
CHECK_ERROR_THEN("unpickle svc_init_result failure: ", returnValue = -1;)
239+
231240
returnValue = 0;
232-
// if we are here, then the GIL was acquired before
233241
if (PyLong_Check(svc_init_result)) {
234242
long res_as_long = PyLong_AsLong(svc_init_result);
235243
returnValue = static_cast<int>(res_as_long);
@@ -249,31 +257,35 @@ class base_process {
249257
}
250258

251259
void* svc(void *arg) {
252-
auto is_ff_marker = arg == ff::FF_GO_ON;
253-
std::string serialized_data = is_ff_marker || arg == NULL ? EMPTY_TUPLE_STR:*reinterpret_cast<std::string*>(arg);
260+
// arg may be equal to ff::FF_GO_ON in case of a node of a first set of an a2a that hasn't input channels
261+
std::string serialized_data = arg == NULL || arg == ff::FF_GO_ON ? SERIALIZED_EMPTY_TUPLE:*reinterpret_cast<std::string*>(arg);
254262

255263
Message response;
256-
int err = remote_function_call(send_fd, read_fd, serialized_data, "svc", response);
264+
int err = remote_procedure_call(send_fd, read_fd, serialized_data, "svc", response);
257265
if (err <= 0) {
258266
handleError("remote call of svc", );
259267
LOG("an error occurred, abort.");
260268
return NULL;
261269
}
262270

263-
while(response.type == MESSAGE_TYPE_REMOTE_FUNCTION_CALL) {
271+
while(response.type == MESSAGE_TYPE_REMOTE_PROCEDURE_CALL) {
272+
// the only supported remote procedure call from the child process
273+
// if the call of ff_send_out_to (as of today...)
264274
if (response.f_name.compare("ff_send_out_to") != 0) {
265275
handleError("got invalid f_name", );
266276
LOG("an error occurred, got invalid f_name. Abort.");
267277
return NULL;
268278
}
269279

270280
// parse received ff_send_out_to request
271-
int sendout_index = 0;
272-
std::string* ser_data = new std::string();
273-
parse_message_ff_send_out_to(response, ser_data, &sendout_index);
281+
int index;
282+
void *constant = NULL;
283+
std::string *data = new std::string();
284+
parse_message_ff_send_out_to(response, &constant, &index, data);
274285

286+
if (constant != NULL) free(data);
275287
// finally perform ff_send_out_to
276-
bool result = registered_callback->ff_send_out_to(ser_data, sendout_index);
288+
bool result = registered_callback->ff_send_out_to(constant != NULL ? constant:data, index);
277289

278290
// send ff_send_out_to result
279291
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result ? "t":"f", .f_name = "" });
@@ -294,6 +306,7 @@ class base_process {
294306

295307
// got response of svc
296308
if (response.type == MESSAGE_TYPE_RESPONSE_GO_ON) return ff::FF_GO_ON;
309+
if (response.type == MESSAGE_TYPE_RESPONSE_EOS) return ff::FF_EOS;
297310
if (response.data.compare(none_str) == 0) return ff::FF_EOS;
298311

299312
return new std::string(response.data);
@@ -317,8 +330,8 @@ class base_process {
317330

318331
if (has_svc_end) {
319332
Message response;
320-
auto empty_tuple = std::string(EMPTY_TUPLE_STR);
321-
int err = remote_function_call(send_fd, read_fd, empty_tuple, "svc_end", response);
333+
auto empty_tuple = std::string(SERIALIZED_EMPTY_TUPLE);
334+
int err = remote_procedure_call(send_fd, read_fd, empty_tuple, "svc_end", response);
322335
if (err <= 0) handleError("read result of remote call of svc_end", );
323336
}
324337

include/process/messaging.hpp

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77

88
#define handleError(msg, then) do { if (errno < 0) { perror(msg); } else { LOG(msg": errno == 0, fd closed" << std::endl); } then; } while(0)
99

10-
#define MESSAGE_TYPE_RESPONSE '1'
11-
#define MESSAGE_TYPE_RESPONSE_GO_ON '2'
12-
#define MESSAGE_TYPE_REMOTE_FUNCTION_CALL '3'
13-
#define MESSAGE_TYPE_ACK '4'
10+
enum message_type {
11+
MESSAGE_TYPE_RESPONSE = 1,
12+
MESSAGE_TYPE_REMOTE_PROCEDURE_CALL,
13+
MESSAGE_TYPE_RESPONSE_GO_ON,
14+
MESSAGE_TYPE_RESPONSE_EOS,
15+
MESSAGE_TYPE_ACK
16+
};
1417

1518
struct Message {
16-
char type;
19+
message_type type;
1720
std::string data;
1821
std::string f_name;
1922
};
@@ -34,7 +37,7 @@ int sendMessage(int read_fd, int send_fd, const Message& message) {
3437
}
3538

3639
// send f_name
37-
uint32_t fnameSize = message.f_name.size();
40+
uint32_t fnameSize = message.f_name.length();
3841
if (write(send_fd, &fnameSize, sizeof(fnameSize)) == -1) {
3942
handleError("write f_name size", return -1);
4043
}
@@ -92,36 +95,54 @@ int receiveMessage(int read_fd, int send_fd, Message& message) {
9295
return 1; // 0 = EOF, -1 = ERROR, 1 = SUCCESS
9396
}
9497

95-
void create_message_ff_send_out_to(Message &message, std::string &data, int index) {
96-
message.type = MESSAGE_TYPE_REMOTE_FUNCTION_CALL;
97-
message.f_name = "ff_send_out_to";
98-
std::string index_str = std::to_string(index);
99-
message.data.erase();
100-
message.data.reserve(index_str.length() + 3 + data.length());
101-
message.data.append(index_str);
102-
message.data.append("~");
103-
message.data.append(data);
104-
}
105-
106-
void parse_message_ff_send_out_to(Message &message, std::string *data, int* sendout_index) {
107-
int dividerPos = message.data.find('~');
108-
*sendout_index = std::stoi(message.data.substr(0, dividerPos));
109-
*data = message.data.substr(dividerPos+1, message.data.length() - dividerPos);
110-
}
111-
112-
int remote_function_call(int send_fd, int read_fd, std::string &data, const char *f_name, Message &response) {
98+
int remote_procedure_call(int send_fd, int read_fd, std::string &data, const char *f_name, Message &response) {
11399
if (send_fd == -1 || read_fd == -1) {
114100
// they are -1 if an error occurred during svc_init or svc
115101
return -1;
116102
}
117103

118104
int err = sendMessage(read_fd, send_fd, {
119-
.type = MESSAGE_TYPE_REMOTE_FUNCTION_CALL,
105+
.type = MESSAGE_TYPE_REMOTE_PROCEDURE_CALL,
120106
.data = data,
121107
.f_name = f_name });
122108
if (err <= 0) return err;
123109

124110
return receiveMessage(read_fd, send_fd, response);
125111
}
126112

113+
void create_message_ff_send_out_to(Message &message, int index, void* &constant, std::string &data) {
114+
if (constant != NULL && constant != ff::FF_EOS
115+
&& constant != ff::FF_GO_ON) {
116+
throw "fastflow constant not supported";
117+
}
118+
119+
message.type = MESSAGE_TYPE_REMOTE_PROCEDURE_CALL;
120+
message.f_name = "ff_send_out_to";
121+
std::string index_str = std::to_string(index);
122+
message.data.erase();
123+
message.data.reserve(index_str.length() + 8 + data.length()); // 8 is just to be sure there is enough space for the constant
124+
message.data.append(index_str);
125+
message.data.append("~");
126+
message.data.append(constant != NULL ? "t":"f");
127+
message.data.append(constant != NULL ? std::to_string(constant == ff::FF_EOS ? MESSAGE_TYPE_RESPONSE_EOS:MESSAGE_TYPE_RESPONSE_GO_ON):data);
128+
}
129+
130+
void parse_message_ff_send_out_to(Message &message, void **constant, int *index, std::string *data) {
131+
int dividerPos = message.data.find('~');
132+
*index = std::stoi(message.data.substr(0, dividerPos));
133+
if (message.data.at(dividerPos+1) == 't') {
134+
*data = "";
135+
std::string inner_data = message.data.substr(dividerPos+2, message.data.length() - dividerPos - 1);
136+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_EOS)) == 0) {
137+
*constant = ff::FF_EOS;
138+
}
139+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_GO_ON)) == 0) {
140+
*constant = ff::FF_GO_ON;
141+
}
142+
} else {
143+
*constant = NULL;
144+
*data = message.data.substr(dividerPos+2, message.data.length() - dividerPos - 1);
145+
}
146+
}
147+
127148
#endif //MESSAGING_HPP

tests/python/a2a-ff-send-to.test.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
import sys
33

44
"""
5-
first ______ second
6-
|
7-
first ___| second
8-
|
9-
first ___| second
10-
|
11-
first ___|
5+
source ______ sink
6+
|
7+
source ___| sink
8+
|
9+
source ___| sink
10+
|
11+
source ___|
1212
"""
1313

1414
class source():

tests/python/a2a.test.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
import sys
33

44
"""
5-
first _ _ second
6-
| |
7-
first _|_|_ second
8-
| |
9-
first _|_|_ second
10-
|
11-
first _|
5+
source _ _ sink
6+
| |
7+
source _|_|_ sink
8+
| |
9+
source _|_|_ sink
10+
|
11+
source _|
1212
"""
1313

1414
class source():
@@ -18,7 +18,7 @@ def __init__(self, id):
1818

1919
def svc(self, *arg):
2020
if self.counter > 5:
21-
return None
21+
return
2222
self.counter += 1
2323

2424
return list([self.id])

0 commit comments

Comments
 (0)