Skip to content

Commit cb98116

Browse files
committed
feat: serialize environment once before running the nodes
1 parent 57a1fe8 commit cb98116

File tree

11 files changed

+245
-157
lines changed

11 files changed

+245
-157
lines changed

include/debugging.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#ifndef LOG_HPP
22
#define LOG_HPP
33

4-
//#define DEBUG
4+
#define DEBUG
55

66
#ifdef DEBUG
77
# define LOG(msg) std::cerr << msg << std::endl

include/node_utils.hpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,56 @@ ff::ff_node* args_to_node(PyObject *args, PyObject *kwds, bool use_subints, bool
4646
return multi_output ? (ff::ff_node*)new ff_monode_process(py_node):new ff_node_process(py_node);
4747
}
4848

49+
template<typename T>
50+
PyObject* run_and_wait_end(T *node, bool use_subinterpreters) {
51+
PyObject* globals = NULL;
52+
if (use_subinterpreters) {
53+
// Load pickling/unpickling functions but in the main interpreter
54+
pickling pickling_main;
55+
CHECK_ERROR_THEN("load pickle and unpickle failure: ", return NULL;)
56+
57+
globals = get_globals();
58+
59+
// run code to compute global declarations, imports, etc...
60+
PyRun_String(R"PY(
61+
glb = [[k,v] for k,v in globals().items() if not (k.startswith('__') and k.endswith('__'))]
62+
63+
import inspect
64+
__ff_environment_string = ""
65+
for [k, v] in glb:
66+
try:
67+
if inspect.ismodule(v):
68+
if v.__package__:
69+
__ff_environment_string += f"from {v.__package__} import {k}"
70+
else: # v.__package__ is empty the module and the package are the same
71+
__ff_environment_string += f"import {k}"
72+
elif inspect.isclass(v) or inspect.isfunction(v):
73+
__ff_environment_string += inspect.getsource(v)
74+
# else:
75+
# __ff_environment_string += f"{k} = {v}"
76+
__ff_environment_string += "\n"
77+
except:
78+
pass
79+
)PY", Py_file_input, globals, NULL);
80+
CHECK_ERROR_THEN("PyRun_String failure: ", return NULL;)
81+
// Cleanup of objects created
82+
pickling_main.~pickling();
83+
}
84+
85+
// Release GIL while waiting for thread.
86+
int val = 0;
87+
Py_BEGIN_ALLOW_THREADS
88+
val = node->run_and_wait_end();
89+
Py_END_ALLOW_THREADS
90+
91+
if (use_subinterpreters) {
92+
// cleanup of the environment string to free memory
93+
PyRun_String(R"PY(
94+
__ff_environment_string = ""
95+
)PY", Py_file_input, globals, NULL);
96+
}
97+
98+
return PyLong_FromLong(val);
99+
}
100+
49101
#endif // NODE_UTILS

include/process/base_process.hpp

Lines changed: 71 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,25 @@
22
#define BASE_PROCESS
33

44
#include <Python.h>
5+
#include "object.h"
56
#include <ff/ff.hpp>
67
#include <iostream>
78
#include <sys/wait.h>
89
#include <typeinfo>
910
#include <unordered_map>
1011
#include "pickle.hpp"
1112
#include "debugging.hpp"
12-
#include "../py_ff_callback.hpp"
1313
#include "messaging.hpp"
14-
#include "object.h"
15-
#include "../py_ff_constant.hpp"
14+
#include "python_utils.hpp"
15+
#include "py_ff_callback.hpp"
16+
#include "py_ff_constant.hpp"
1617

1718
#define handleError(msg, then) do { perror(msg); then; } while(0)
1819

1920
#define SERIALIZED_EMPTY_TUPLE "(t."
2021
#define SERIALIZED_NONE "N."
2122

22-
void process_body(int read_fd, int send_fd, bool isMultiOutput) {
23+
void process_body(std::string &node_ser, int read_fd, int send_fd, bool isMultiOutput, bool hasSvcInit) {
2324
Messaging messaging{ send_fd, read_fd };
2425
Message message;
2526

@@ -38,14 +39,8 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
3839
};
3940

4041
CHECK_ERROR_THEN("[child] load pickle/unpickle failure: ", cleanup_exit();)
41-
42-
// receive serialized node
43-
int err = messaging.recv_message(message);
44-
if (err <= 0) handleError("[child] read serialized node", cleanup_exit());
4542

46-
// deserialize Python node
47-
auto node = pickl.unpickle(message.data[0]);
48-
CHECK_ERROR_THEN("[child] unpickle node failure: ", cleanup_exit();)
43+
PyObject* node = pickl.unpickle(node_ser);
4944

5045
// create the callback object
5146
py_ff_callback_object* callback = (py_ff_callback_object*) PyObject_CallObject(
@@ -67,20 +62,24 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
6762
// we may have a fastflow constant as data to send out to index
6863
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(pydata);
6964
int err = messaging.call_remote(response, "ff_send_out_to", index, _const_result->ff_const);
65+
if (err <= 0) {
66+
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out_to request");
67+
return (PyObject*) NULL;
68+
}
7069
} else {
7170
int err = messaging.call_remote(response, "ff_send_out_to", index, pickl.pickle(pydata));
7271
if (PyErr_Occurred()) return (PyObject*) NULL;
72+
if (err <= 0) {
73+
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out_to request");
74+
return (PyObject*) NULL;
75+
}
7376
}
7477

7578
auto result = messaging.parse_data<bool>(response.data);
7679
return std::get<0>(result) ? Py_True:Py_False;
7780
};
7881

79-
PyObject* main_module = PyImport_ImportModule("__main__");
80-
CHECK_ERROR_THEN("PyImport_ImportModule __main__ failure: ", cleanup_exit();)
81-
PyObject* globals = PyModule_GetDict(main_module);
82-
CHECK_ERROR_THEN("PyModule_GetDict failure: ", cleanup_exit();)
83-
82+
PyObject* globals = get_globals();
8483
// if you access the methods from the module itself, replace it with the callback
8584
if (PyDict_SetItemString(globals, "fastflow", (PyObject*) callback) == -1) {
8685
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", cleanup_exit();)
@@ -90,21 +89,37 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
9089
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", cleanup_exit();)
9190
}
9291

92+
int err = 1;
93+
if (hasSvcInit) {
94+
int result = 0;
95+
PyObject *py_func = PyObject_GetAttrString(node, "svc_init");
96+
PyObject* svc_init_result = PyObject_CallObject(py_func, nullptr);
97+
if (PyLong_Check(svc_init_result)) {
98+
result = static_cast<int>(PyLong_AsLong(svc_init_result));
99+
}
100+
err = messaging.send_response(result);
101+
if (err <= 0) handleError("[child] send svc_init result", cleanup_exit());
102+
}
103+
93104
while(err > 0) {
94105
err = messaging.recv_message(message);
95-
if (err <= 0) handleError("[child] read next", cleanup_exit());
106+
if (err <= 0) handleError("[child] recv remote call", cleanup_exit());
96107
if (message.type == MESSAGE_TYPE_END_OF_LIFE) break;
97108

98109
// deserialize data
99110
auto py_args_tuple = pickl.unpickle(message.data[0]);
100111
CHECK_ERROR_THEN("[child] deserialize data failure: ", std::cout << message.data[0] << std::endl; cleanup_exit();)
101-
// call function
112+
113+
// get the function reference
102114
PyObject *py_func = PyObject_GetAttrString(node, message.f_name.c_str());
115+
Py_XINCREF(py_func);
103116
CHECK_ERROR_THEN("[child] get node function: ", cleanup_exit();)
104117

105118
if (py_func) {
106-
// finally call the function
107-
PyObject* py_result = PyTuple_Check(py_args_tuple) == 1 ? PyObject_CallObject(py_func, py_args_tuple):PyObject_CallFunctionObjArgs(py_func, py_args_tuple, nullptr);
119+
// finally call the function. Use PyObject_CallObject if we have a tuple already, PyObject_CallFunctionObjArgs otherwise
120+
PyObject* py_result;
121+
if (PyTuple_Check(py_args_tuple) == 1) py_result = PyObject_CallObject(py_func, py_args_tuple);
122+
else py_result = PyObject_CallFunctionObjArgs(py_func, py_args_tuple, nullptr);
108123
CHECK_ERROR_THEN("[child] call function failure: ", cleanup_exit();)
109124

110125
// we may have a fastflow constant as result
@@ -141,27 +156,16 @@ class base_process {
141156
// initialize the thread state with main thread state
142157
tstate = PyThreadState_Get();
143158
Py_INCREF(node);
159+
has_svc_init = PyObject_HasAttrString(node, "svc_init") == 1;
160+
has_svc_end = PyObject_HasAttrString(node, "svc_end") == 1;
144161
}
145162

146163
int svc_init() {
164+
TIMESTART(svc_init_start_time);
147165
// associate a new thread state with ff_node's thread
148166
PyThreadState* cached_tstate = tstate;
149167
tstate = PyThreadState_New(cached_tstate->interp);
150168

151-
// Hold the main GIL
152-
PyEval_RestoreThread(tstate);
153-
154-
bool has_svc_init = PyObject_HasAttrString(node, "svc_init") == 1;
155-
has_svc_end = PyObject_HasAttrString(node, "svc_end") == 1;
156-
157-
// Load pickling/unpickling functions
158-
pickling pickl;
159-
CHECK_ERROR_THEN("load pickle and unpickle failure: ", return -1;)
160-
161-
// serialize Python node to string
162-
auto node_str = pickl.pickle(node);
163-
CHECK_ERROR_THEN("pickle node failure: ", return -1;)
164-
165169
int mainToChildFD[2]; // data to be sent from main process to child process
166170
int childToMainFD[2]; // data to be sent from child process to main process
167171
// create pipes
@@ -178,9 +182,16 @@ class base_process {
178182
return -1;
179183
}
180184

185+
// Hold the main GIL
186+
PyEval_RestoreThread(tstate);
187+
181188
int returnValue = 0;
189+
pickling pickl;
190+
CHECK_ERROR_THEN("load pickle/unpickle failure: ", return -1;)
191+
auto node_ser = pickl.pickle(node);
192+
182193
// from cpython source code
183-
// https://github.com/python/cpython/blob/main/Modules/posixmodule.c#L8056
194+
// https://github.com/python/cpython/blob/9d0a75269c6ae361b1ed5910c3b3424ed93b6f6d/Modules/posixmodule.c#L8044
184195
PyOS_BeforeFork();
185196
pid = fork();
186197
if (pid == -1) {
@@ -194,62 +205,45 @@ class base_process {
194205
close(mainToChildFD[1]); // Close write end of mainToChildFD
195206
close(childToMainFD[0]); // Close read end of childToMainFD
196207

197-
process_body(mainToChildFD[0], childToMainFD[1], registered_callback != NULL);
208+
pickl.~pickling();
209+
process_body(node_ser, mainToChildFD[0], childToMainFD[1], registered_callback != NULL, has_svc_init);
198210
std::string msg = "[child] shouldn't be here... ";
199211
msg.append(strerror(errno));
200212
PyErr_SetString(PyExc_Exception, msg.c_str());
201213
returnValue = -1;
202-
} else { // parent
203-
PyOS_AfterFork_Parent();
204-
205-
// Release the main GIL
206-
PyEval_SaveThread();
214+
}
207215

208-
close(mainToChildFD[0]); // Close read end of mainToChildFD
209-
close(childToMainFD[1]); // Close write end of childToMainFD
216+
// parent
217+
PyOS_AfterFork_Parent();
218+
pickl.~pickling();
210219

211-
messaging = { mainToChildFD[1], childToMainFD[0] };
220+
// Release the main GIL
221+
PyEval_SaveThread();
212222

213-
// send serialized node
214-
int err = messaging.send_data(node_str);
215-
if (err <= 0) handleError("send serialized node", returnValue = -1);
223+
close(mainToChildFD[0]); // Close read end of mainToChildFD
224+
close(childToMainFD[1]); // Close write end of childToMainFD
216225

217-
if (err > 0 && has_svc_init) {
218-
Message response;
219-
int err = messaging.call_remote(response, "svc_init", SERIALIZED_EMPTY_TUPLE);
220-
if (err <= 0) {
221-
handleError("read result of remote call of svc_init", );
222-
} else {
223-
// Hold the main GIL
224-
PyEval_RestoreThread(tstate);
225-
PyObject *svc_init_result = pickl.unpickle(response.data[0]);
226-
CHECK_ERROR_THEN("unpickle svc_init_result failure: ", returnValue = -1;)
227-
228-
returnValue = 0;
229-
if (PyLong_Check(svc_init_result)) {
230-
long res_as_long = PyLong_AsLong(svc_init_result);
231-
returnValue = static_cast<int>(res_as_long);
232-
}
233-
Py_DECREF(svc_init_result);
234-
pickl.~pickling();
235-
236-
// Release the main GIL
237-
PyEval_SaveThread();
238-
}
239-
}
226+
messaging = { mainToChildFD[1], childToMainFD[0] };
227+
if (has_svc_init) {
228+
// if the node has svc_init, the child process will call it
229+
// wait for svc_init result
230+
Message response;
231+
messaging.recv_message(response);
232+
returnValue = deserialize<int>(response.data[0]);
240233
}
241234

235+
LOGELAPSED("svc_init time ", svc_init_start_time);
242236
// from here the GIL is NOT acquired
243237
return returnValue;
244238
}
245239

246240
void* svc(void *arg) {
241+
TIMESTART(svc_start_time);
247242
// arg may be equal to ff::FF_GO_ON in case of a node of a first set of an a2a that hasn't input channels
248243
std::string serialized_data = arg == NULL || arg == ff::FF_GO_ON ? SERIALIZED_EMPTY_TUPLE:*reinterpret_cast<std::string*>(arg);
249244

250245
Message response;
251246
int err = messaging.call_remote(response, "svc", serialized_data);
252-
//int err = messaging.remote_procedure_call(serialized_data, "svc", response);
253247
if (err <= 0) {
254248
handleError("remote call of svc", );
255249
return NULL;
@@ -288,10 +282,13 @@ class base_process {
288282
void* constant = deserialize<void*>(response.data[0]);
289283
if (constant != NULL) return constant;
290284

285+
LOGELAPSED("svc time ", svc_start_time);
286+
291287
return new std::string(response.data[0]);
292288
}
293289

294290
void svc_end() {
291+
TIMESTART(svc_end_start_time);
295292
if (has_svc_end) {
296293
Message response;
297294
int err = messaging.call_remote(response, "svc_end", SERIALIZED_EMPTY_TUPLE);
@@ -300,6 +297,7 @@ class base_process {
300297

301298
// send end of life. Meanwhile we acquire the GIL and cleanup, the process will stop
302299
int err = messaging.eol();
300+
if (err <= 0) handleError("sending EOL", );
303301

304302
// Acquire the main GIL
305303
PyEval_RestoreThread(tstate);
@@ -312,6 +310,7 @@ class base_process {
312310
waitpid(pid, nullptr, 0);
313311

314312
messaging.closefds();
313+
LOGELAPSED("svc_end time ", svc_end_start_time);
315314
}
316315

317316
void register_callback(ff::ff_monode* cb_node) {
@@ -326,6 +325,7 @@ class base_process {
326325
PyThreadState *tstate;
327326
PyObject* node;
328327
bool has_svc_end;
328+
bool has_svc_init;
329329
Messaging messaging;
330330
pid_t pid;
331331
ff::ff_monode* registered_callback;

include/process/messaging.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class Messaging {
9090
int res = read(read_fd, &message.type, sizeof(message.type));
9191
if (res <= 0) return res;
9292

93-
int data_vector_size;
93+
size_t data_vector_size;
9494
res = read(read_fd, &data_vector_size, sizeof(data_vector_size));
9595
if (res <= 0) return res;
9696
message.data.resize(data_vector_size);
@@ -162,7 +162,7 @@ class Messaging {
162162
if (write(send_fd, &type, sizeof(type)) == -1) return -1;
163163

164164
// send vector data size
165-
int data_vector_size = serialized_args.size();
165+
size_t data_vector_size = serialized_args.size();
166166
if (write(send_fd, &data_vector_size, sizeof(data_vector_size)) == -1) return -1;
167167

168168
// send vector of data

include/py_ff_a2a.hpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,7 @@ PyObject* py_ff_a2a_run_and_wait_end(PyObject *self, PyObject *args)
105105
assert(self);
106106

107107
py_ff_a2a_object* _self = reinterpret_cast<py_ff_a2a_object*>(self);
108-
109-
// Release GIL while waiting for thread.
110-
int val = 0;
111-
Py_BEGIN_ALLOW_THREADS
112-
val = _self->a2a->run_and_wait_end();
113-
Py_END_ALLOW_THREADS
114-
115-
return PyLong_FromLong(val);
108+
return run_and_wait_end(_self->a2a, _self->use_subinterpreters);
116109
}
117110

118111
PyDoc_STRVAR(py_ff_a2a_add_firstset_doc, "Add first set to the a2a");

0 commit comments

Comments
 (0)