Skip to content

Commit c2ab0ac

Browse files
committed
feat: map None to ff::EOS
1 parent 914dc11 commit c2ab0ac

14 files changed

+76
-61
lines changed

fastflow_module.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@
1313
#include "process/ff_node_process.hpp"
1414
#include "py_ff_constant.hpp"
1515

16-
PyDoc_STRVAR(ff_send_out_to_doc, "Send out to a node");
17-
18-
PyObject* empty_ff_send_out_to(PyObject *self, PyObject *args) {
19-
assert(self);
20-
21-
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
22-
return NULL;
23-
}
24-
2516
/* initialization function */
2617
static int
2718
fastflow_module_exec(PyObject *module)
@@ -70,9 +61,22 @@ fastflow_module_exec(PyObject *module)
7061
return -1;
7162
}
7263

64+
if (PyModule_AddObject(module, EOS_CONSTANT_NAME, build_py_ff_constant(ff::FF_EOS)) < 0) {
65+
return -1;
66+
}
67+
7368
return 0;
7469
}
7570

71+
PyDoc_STRVAR(ff_send_out_to_doc, "Send out to a node");
72+
73+
PyObject* empty_ff_send_out_to(PyObject *self, PyObject *args) {
74+
assert(self);
75+
76+
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
77+
return NULL;
78+
}
79+
7680
static PyMethodDef module_methods[] = {
7781
{ "ff_send_out_to", (PyCFunction) empty_ff_send_out_to, METH_VARARGS, ff_send_out_to_doc },
7882
{NULL, NULL} /* Sentinel */

include/process/base_process.hpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,12 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
121121

122122
// we may have a fastflow constant as result
123123
if (PyObject_TypeCheck(py_result, &py_ff_constant_type) != 0) {
124-
// py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_result);
125-
// the only constant available is FF_GO_ON
126-
err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE_GO_ON, .data = "", .f_name = "" });
124+
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_result);
125+
err = sendMessage(read_fd, send_fd, {
126+
.type = _const_result->ff_const == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON,
127+
.data = "",
128+
.f_name = ""
129+
});
127130
if (err <= 0) handleError("[child] send constant response", cleanup_exit());
128131
LOG("[child] sent constant response");
129132
} else {
@@ -305,9 +308,9 @@ class base_process {
305308
}
306309

307310
// got response of svc
308-
if (response.type == MESSAGE_TYPE_RESPONSE_GO_ON) return ff::FF_GO_ON;
309-
if (response.type == MESSAGE_TYPE_RESPONSE_EOS) return ff::FF_EOS;
310-
if (response.data.compare(none_str) == 0) return ff::FF_EOS;
311+
if (response.type == MESSAGE_TYPE_GO_ON) return ff::FF_GO_ON;
312+
if (response.type == MESSAGE_TYPE_EOS) return ff::FF_EOS;
313+
if (response.data.compare(none_str) == 0) return ff::FF_GO_ON;
311314

312315
return new std::string(response.data);
313316
}

include/process/messaging.hpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
enum message_type {
1111
MESSAGE_TYPE_RESPONSE = 1,
1212
MESSAGE_TYPE_REMOTE_PROCEDURE_CALL,
13-
MESSAGE_TYPE_RESPONSE_GO_ON,
14-
MESSAGE_TYPE_RESPONSE_EOS,
13+
MESSAGE_TYPE_GO_ON,
14+
MESSAGE_TYPE_EOS,
1515
MESSAGE_TYPE_ACK
1616
};
1717

@@ -62,14 +62,15 @@ int receiveMessage(int read_fd, int send_fd, Message& message) {
6262
uint32_t dataSize;
6363
res = read(read_fd, &dataSize, sizeof(dataSize));
6464
if (res <= 0) handleError("read data size", return res);
65+
6566
char* bufferData = new char[dataSize + 1];
6667
if (dataSize > 0) {
6768
res = readn(read_fd, bufferData, dataSize);
6869
if (res <= 0) handleError("read data", return res);
6970
}
7071

7172
bufferData[dataSize] = '\0';
72-
message.data = std::string(bufferData);
73+
//message.data = std::string(bufferData);
7374
message.data.assign(bufferData, dataSize);
7475
delete[] bufferData;
7576

@@ -124,7 +125,7 @@ void create_message_ff_send_out_to(Message &message, int index, void* &constant,
124125
message.data.append(index_str);
125126
message.data.append("~");
126127
message.data.append(constant != NULL ? "t":"f");
127-
message.data.append(constant != NULL ? std::to_string(constant == ff::FF_EOS ? MESSAGE_TYPE_RESPONSE_EOS:MESSAGE_TYPE_RESPONSE_GO_ON):data);
128+
message.data.append(constant != NULL ? std::to_string(constant == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON):data);
128129
}
129130

130131
void parse_message_ff_send_out_to(Message &message, void **constant, int *index, std::string *data) {
@@ -133,10 +134,10 @@ void parse_message_ff_send_out_to(Message &message, void **constant, int *index,
133134
if (message.data.at(dividerPos+1) == 't') {
134135
*data = "";
135136
std::string inner_data = message.data.substr(dividerPos+2, message.data.length() - dividerPos - 1);
136-
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_EOS)) == 0) {
137+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_EOS)) == 0) {
137138
*constant = ff::FF_EOS;
138139
}
139-
if (inner_data.compare(std::to_string(MESSAGE_TYPE_RESPONSE_GO_ON)) == 0) {
140+
if (inner_data.compare(std::to_string(MESSAGE_TYPE_GO_ON)) == 0) {
140141
*constant = ff::FF_GO_ON;
141142
}
142143
} else {

include/py_ff_a2a.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,18 @@ PyObject* py_ff_a2a_add_firstset(PyObject *self, PyObject *args, PyObject* kwds)
138138
py_ff_pipeline_object* _pipe = reinterpret_cast<py_ff_pipeline_object*>(item);
139139
if (!_pipe->pipeline->isMultiOutput()) {
140140
ff::ff_node* last_stage = _pipe->pipeline->get_laststage();
141-
ff::ff_node* new_node = new ff::internal_mo_transformer(last_stage, false);
142-
_pipe->pipeline->change_node(last_stage, new_node, true, true);
141+
// let's replace the last stage with a multi output variant
142+
PyObject *last_stage_py_node = _self->use_subinterpreters ?
143+
(reinterpret_cast<ff_node_subint*>(last_stage))->get_python_object():
144+
(reinterpret_cast<ff_node_process*>(last_stage))->get_python_object();
145+
if (last_stage_py_node == NULL) {
146+
PyErr_SetString(PyExc_RuntimeError, "Unable to transform previous stage to multi output (null pointer)");
147+
return NULL;
148+
}
149+
ff::ff_node* new_last_stage = _self->use_subinterpreters ?
150+
(ff::ff_node*) new ff_monode_subint(last_stage_py_node):
151+
(ff::ff_node*) new ff_monode_process(last_stage_py_node);
152+
_pipe->pipeline->change_node(last_stage, new_last_stage, true, true);
143153
}
144154
node = _pipe->pipeline;
145155
} else if (_self->use_subinterpreters) {

include/py_ff_constant.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ static PyTypeObject py_ff_constant_type = {
1919
};
2020

2121
#define GO_ON_CONSTANT_NAME "GO_ON"
22+
#define EOS_CONSTANT_NAME "EOS"
2223

2324
PyObject* build_py_ff_constant(void *val) {
2425
py_ff_constant_object* constant = (py_ff_constant_object*) PyObject_CallObject(

include/subint/base_subint.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ for [k, v] in glb:
141141
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", returnValue = -1;)
142142
}
143143

144+
if (PyDict_SetItemString(globals, EOS_CONSTANT_NAME, build_py_ff_constant(ff::FF_EOS)) == -1) {
145+
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", returnValue = -1;)
146+
}
147+
144148
if (PyObject_HasAttrString(node, "svc_init")) {
145149
PyObject* svc_init_func = PyObject_GetAttrString(node, "svc_init");
146150
if (svc_init_func) {
@@ -162,20 +166,22 @@ for [k, v] in glb:
162166
}
163167

164168
void * svc(void *arg) {
165-
auto is_ff_marker = arg == ff::FF_GO_ON;
166169
TIMESTART(svc_start_time);
167-
PyObject* pickled_bytes = is_ff_marker || arg == NULL ? NULL:reinterpret_cast<PyObject*>(arg);
170+
// arg may be equal to ff::FF_GO_ON in case of a node of a first set of an a2a that hasn't input channels
171+
PyObject* pickled_bytes = arg == ff::FF_GO_ON || arg == NULL ? NULL:reinterpret_cast<PyObject*>(arg);
168172

169-
PyObject* py_args = is_ff_marker || arg == NULL ? nullptr:pickl->unpickle_bytes(pickled_bytes);
173+
PyObject* py_args = arg == ff::FF_GO_ON || arg == NULL ? nullptr:pickl->unpickle_bytes(pickled_bytes);
170174
CHECK_ERROR_THEN("unpickle serialized data failure: ", return NULL;)
171175
//if (pickled_bytes) Py_DECREF(pickled_bytes);
172176

173177
PyObject* py_result = py_args != nullptr && PyTuple_Check(py_args) == 1 ? PyObject_CallObject(svc_func, py_args):PyObject_CallFunctionObjArgs(svc_func, py_args, nullptr);
174178
CHECK_ERROR_THEN("PyObject_CallObject failure: ", return NULL;)
175-
179+
180+
// map None to ff::FF_GO_ON
181+
// we have None also if the svc function returned nothing (e.g. void function in c++)
176182
if (py_result == Py_None) {
177183
Py_DECREF(py_result);
178-
return NULL;
184+
return ff::FF_GO_ON;
179185
}
180186

181187
// we may have a fastflow constant as result

tests/python/a2a-ff-send-to.test.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow_module import FFAllToAll, GO_ON, ff_send_out_to
1+
from fastflow_module import FFAllToAll, EOS, ff_send_out_to
22
import sys
33

44
"""
@@ -18,21 +18,18 @@ def __init__(self, id):
1818

1919
def svc(self, *arg):
2020
if self.counter > 5:
21-
return
21+
return EOS
2222
self.counter += 1
2323

2424
ff_send_out_to(list([f"source{self.id}-to-sink1"]), 0)
2525

26-
return GO_ON
27-
2826
class sink():
2927
def __init__(self, id):
3028
self.id = id
3129

3230
def svc(self, lis: list):
3331
lis.append(f"sink{self.id}")
3432
print(lis)
35-
return GO_ON
3633

3734
def run_test(use_subinterpreters = True):
3835
a2a = FFAllToAll(use_subinterpreters)

tests/python/a2a-with-pipe.test.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow_module import FFAllToAll, FFPipeline, GO_ON
1+
from fastflow_module import FFAllToAll, FFPipeline, EOS
22
import sys
33

44
"""
@@ -18,7 +18,7 @@ def __init__(self, id):
1818

1919
def svc(self, *arg):
2020
if self.counter > 5:
21-
return
21+
return EOS
2222
self.counter += 1
2323

2424
return list([self.id])
@@ -46,7 +46,6 @@ def __init__(self, id):
4646
def svc(self, lis: list):
4747
lis.append(self.id)
4848
print(lis)
49-
return GO_ON
5049

5150
def run_test(use_subinterpreters = True):
5251
a2a = FFAllToAll(use_subinterpreters)

tests/python/a2a.test.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow_module import FFAllToAll, GO_ON
1+
from fastflow_module import FFAllToAll, EOS
22
import sys
33

44
"""
@@ -18,7 +18,7 @@ def __init__(self, id):
1818

1919
def svc(self, *arg):
2020
if self.counter > 5:
21-
return
21+
return EOS
2222
self.counter += 1
2323

2424
return list([self.id])
@@ -30,7 +30,6 @@ def __init__(self, id):
3030
def svc(self, lis: list):
3131
lis.append(self.id)
3232
print(lis)
33-
return GO_ON
3433

3534
def run_test(use_subinterpreters = True):
3635
a2a = FFAllToAll(use_subinterpreters)

tests/python/farm.test.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow_module import FFFarm, GO_ON
1+
from fastflow_module import FFFarm, EOS
22
import sys
33

44
class source():
@@ -7,7 +7,7 @@ def __init__(self):
77

88
def svc(self, *arg):
99
if self.counter == 12:
10-
return
10+
return EOS
1111
self.counter += 1
1212

1313
return list(['source'])
@@ -23,7 +23,6 @@ def svc(self, lis: list):
2323
class sink():
2424
def svc(self, lis: list):
2525
print(lis)
26-
return GO_ON
2726

2827
def run_test(use_subinterpreters = True):
2928
farm = FFFarm(use_subinterpreters)

0 commit comments

Comments
 (0)