
Commit e94db06

feat: set fastflow as module name

1 parent 70b4dc3 commit e94db06

38 files changed: +150 −668 lines

README.md

Lines changed: 2 additions & 2 deletions

````diff
@@ -3,7 +3,7 @@
 ### Farm
 
 ```py
-from fastflow_module import FFFarm, GO_ON
+from fastflow import FFFarm, GO_ON
 import sys
 
 class source():
@@ -51,7 +51,7 @@ farm.run_and_wait_end()
 ### Pipeline
 
 ```py
-from fastflow_module import FFPipeline, GO_ON
+from fastflow import FFPipeline, GO_ON
 
 # define a stage
 class stage():
````

benchmark/README.md

Lines changed: 26 additions & 0 deletions

````diff
@@ -0,0 +1,26 @@
+# How to run the benchmarks
+
+1. From the root directory, create virtual environment via `python3.12 -m venv .venv`
+2. From the root directory, activate the virtual environment by running `source .venv/bin/activate`
+3. Build and install `busy_wait`. Read the README.md file inside busy_wait's directory
+4. Ensure you have built and installed `fastflow` Python API by running `make` from the root directory
+
+# Usage
+
+```
+usage: bench.py [-h] -tasks TASKS -workers WORKERS -ms MS -bytes BYTES [-blocking-mode] [-no-mapping] [-proc | -sub | -seq]
+
+Run a farm of <WORKERS> workers and <TASKS> tasks. Each task is <MS>ms long and has a size of <BYTES> bytes. Using subinterpreters or multiprocessing based strategy
+
+options:
+  -h, --help        show this help message and exit
+  -tasks TASKS      Number of tasks to process
+  -workers WORKERS  Number of workers of the farm
+  -ms MS            Duration, in milliseconds, of one task
+  -bytes BYTES      The size, in bytes, of one task
+  -blocking-mode    Enable blocking mode of the farm
+  -no-mapping       Disable fastflow's mapping
+  -proc             Use multiprocessing to process tasks
+  -sub              Use subinterpreters to process tasks
+  -seq              Run tasks sequentially
+```
````
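
For example, a single run that pushes 512 tasks of 100 ms and 4096 bytes through 8 subinterpreter-based workers can be launched as follows (the flag values are illustrative):

```
python3.12 benchmark/farm/bench.py -tasks 512 -workers 8 -ms 100 -bytes 4096 -sub
```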

benchmark/busy_wait/README.md

Lines changed: 5 additions & 0 deletions

```diff
@@ -0,0 +1,5 @@
+# How to build
+
+1. ensure you have activated the virtual environment
+2. `cd` to busy_wait's directory
+3. run `./install.sh`
```

benchmark/busy_wait/install.sh

Lines changed: 2 additions & 1 deletion

```diff
@@ -1,2 +1,3 @@
 rm -rf build
-python3.12 setup.py install
+pip install setuptools
+pip install .
```

benchmark/farm/bench.py

Lines changed: 19 additions & 59 deletions

```diff
@@ -1,24 +1,8 @@
-from fastflow_module import FFFarm, GO_ON
+from fastflow import FFFarm, EOS, GO_ON
 import argparse
 import sys
 import busy_wait
 
-"""class DummyData:
-    def __init__(self, size, val):
-        self.astr = f"a string {val}"
-        self.num = 1704 + val
-        self.adict = {
-            "val": 17 + val,
-            "str": f"string inside a dictionary {val}"
-        }
-        self.atup = (4, 17, 16, val, 15, 30)
-        self.aset = set([1, 2, 9, 6, val])
-
-    def __str__(self):
-        return f"{self.astr}, {self.num}, {self.adict}, {self.atup}, {self.aset}"
-
-    __repr__ = __str__"""
-
 class DummyData:
     def __init__(self, data):
         self.data = [data]
@@ -35,7 +19,7 @@ def __init__(self, data_sample, n_tasks):
 
     def svc(self, *args):
         if self.n_tasks == 0:
-            return
+            return EOS
         self.n_tasks -= 1
 
         return DummyData(self.data_sample)
@@ -75,8 +59,9 @@ def build_farm(n_tasks, task_ms, nworkers, data_sample, use_subinterpreters = Fa
 
     return farm
 
-def run_farm(n_tasks, task_ms, nworkers, data_sample, use_subinterpreters = False, blocking_mode = None, no_mapping = False, use_main_thread = False):
-    print(f"run farm of {nworkers} workers and {n_tasks} tasks", file=sys.stderr)
+def run_farm(n_tasks, task_ms, nworkers, data_sample, data_size, use_subinterpreters = False, blocking_mode = None, no_mapping = False, use_main_thread = False):
+    strategy = "Not using any strategy" if use_main_thread else f"Using {'subinterpreters' if use_subinterpreters else 'processes'}-based strategy"
+    print(f"Running a farm of {nworkers} workers and {n_tasks} tasks. Each task is {task_ms}ms long and has a size of {data_size} bytes. {strategy}", file=sys.stderr)
     farm = build_farm(n_tasks, task_ms, nworkers, data_sample, use_subinterpreters, use_main_thread)
     if blocking_mode is not None:
         farm.blocking_mode(blocking_mode)
@@ -90,55 +75,30 @@ def get_data_sample(task_bytes):
     result = ascii_letters * int(task_bytes / len(ascii_letters))
     return result
 
-def print_res(title, res, args):
-    print(title)
-    print(f"res[{args.bytes}] = res.get({args.bytes}, []); res.get({args.bytes}).append(({args.workers}, {res})) # bytes =", args.bytes, "ms =", args.ms)
-
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description='Process some tasks.')
+    parser = argparse.ArgumentParser(description='Run a farm of <WORKERS> workers and <TASKS> tasks. Each task is <MS>ms long and has a size of <BYTES> bytes. Using subinterpreters or multiprocessing based strategy')
     parser.add_argument('-tasks', type=int, help='Number of tasks to process', required=True)
     parser.add_argument('-workers', type=int, help='Number of workers of the farm', required=True)
-    parser.add_argument('-ms', type=int, help='Duration in milliseconds of one task', required=True)
+    parser.add_argument('-ms', type=int, help='Duration, in milliseconds, of one task', required=True)
     parser.add_argument('-bytes', type=int, help='The size, in bytes, of one task', required=True)
-    parser.add_argument('-blocking-mode', help='The blocking mode of the farm', required=False, action='store_true', default=None)
+    parser.add_argument('-blocking-mode', help='Enable blocking mode of the farm', required=False, action='store_true', default=None)
     parser.add_argument('-no-mapping', help="Disable fastflow's mapping", required=False, action='store_true')
-    group = parser.add_mutually_exclusive_group(required=False)
+    group = parser.add_mutually_exclusive_group(required=True)
     group.add_argument('-proc', action='store_true', help='Use multiprocessing to process tasks')
     group.add_argument('-sub', action='store_true', help='Use subinterpreters to process tasks')
    group.add_argument('-seq', action='store_true', help='Run tasks sequentially')
     args = parser.parse_args()
 
     # test the serialization to adjust the number of bytes
     data_sample = get_data_sample(args.bytes)
+
+    res = run_farm(args.tasks, args.ms, args.workers, data_sample, args.bytes, use_subinterpreters = args.sub and not args.proc, blocking_mode = args.blocking_mode, no_mapping = args.no_mapping, use_main_thread = args.seq)
+    print(f"res[{args.bytes}] = res.get({args.bytes}, []); res.get({args.bytes}).append(({args.workers}, {res})) # bytes =", args.bytes, "ms =", args.ms)
+
 
-    if args.proc:
-        processes = [[],[]]
-        res = run_farm(args.tasks, args.ms, args.workers, data_sample, use_subinterpreters = False, blocking_mode = args.blocking_mode, no_mapping = args.no_mapping)
-        print(f"done in {res}ms")
-        processes[0].append(args.workers) # x
-        processes[1].append(res) # y
-
-        print_res("processes", res, args)
-    elif args.sub:
-        subinterpreters = [[],[]]
-        res = run_farm(args.tasks, args.ms, args.workers, data_sample, use_subinterpreters = True, blocking_mode = args.blocking_mode, no_mapping = args.no_mapping)
-        print(f"done in {res}ms")
-        subinterpreters[0].append(args.workers) # x
-        subinterpreters[1].append(res) # y
-
-        print_res("subinterpreters", res, args)
-    else:
-        standard = [[],[]]
-        res = run_farm(args.tasks, args.ms, args.workers, data_sample, use_subinterpreters = False, use_main_thread = True)
-        print(f"done in {res}ms")
-        standard[0].append(args.workers) # x
-        standard[1].append(res) # y
-
-        print_res("standard", res, args)
-
-
-# for i in 1 2 4 8 10 12 16 20 26 30 36 42 48 54 60 64; do for size in 1024 4096 8192 16384 32768 65536 524288 1048576; do echo $i $size; done; done
-# python3.12 benchmark/farm/bench.py -tasks 128 -workers 4 -ms 100 -bytes 512000 -sub
-# python3.12 benchmark/farm/bench.py -tasks 128 -workers 4 -ms 100 -bytes 320024 -sub 2>1 | grep subinterp
-
-# for i in 1 2 4 8 10 12 16 20 26 30 36 42 48 54 60 64; do for size in 1024 4096 8192 16384 32768 65536 524288 1048576; do python3.12 benchmark/farm/bench.py -tasks 512 -workers $i -ms 500 -bytes $size -sub 2>1 | grep subinterp; done; done
+"""
+Some examples to run this benchmark
+- python3.12 benchmark/farm/bench.py -tasks 128 -workers 4 -ms 100 -bytes 512000 -sub
+- python3.12 benchmark/farm/bench.py -tasks 128 -workers 4 -ms 100 -bytes 320024 -sub 2>1 | grep subinterp
+- for i in 1 2 4 8 10 12 16 20 26 30 36 42 48 54 60 64; do for size in 1024 4096 8192 16384 32768 65536 524288 1048576; do python3.12 benchmark/farm/bench.py -tasks 512 -workers $i -ms 500 -bytes $size -sub 2>1 | grep subinterp; done; done
+"""
```

fastflow_module.cpp

Lines changed: 4 additions & 4 deletions

```diff
@@ -15,7 +15,7 @@
 
 /* initialization function */
 static int
-fastflow_module_exec(PyObject *module)
+fastflow_exec(PyObject *module)
 {
     // add FFPipeline
     if (PyType_Ready(&py_ff_pipeline_type) < 0)
@@ -83,7 +83,7 @@ static PyMethodDef module_methods[] = {
 };
 
 static PyModuleDef_Slot module_slots[] = {
-    {Py_mod_exec, (void*) fastflow_module_exec},
+    {Py_mod_exec, (void*) fastflow_exec},
 #if PY_MINOR_VERSION >= 12
     {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
 #endif
@@ -92,15 +92,15 @@ static PyModuleDef_Slot module_slots[] = {
 
 static struct PyModuleDef moduledef = {
     .m_base = PyModuleDef_HEAD_INIT,
-    .m_name = "fastflow_module",
+    .m_name = "fastflow",
     .m_doc = "This is FastFlow's docstring",
     .m_size = 0,
     .m_methods = module_methods,
     .m_slots = module_slots,
 };
 
 PyMODINIT_FUNC
-PyInit_fastflow_module(void) {
+PyInit_fastflow(void) {
     return PyModuleDef_Init(&moduledef);
 }
 
```
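
CPython resolves an extension module through its `PyInit_<name>` symbol, which must match the importable name, so the rename has to touch the `PyModuleDef`, the exec slot, and the init function together. A quick smoke test of the result (a sketch, assuming the extension has been rebuilt and installed via `make`):

```py
import fastflow

print(fastflow.__name__)  # "fastflow"
print(fastflow.__doc__)   # "This is FastFlow's docstring" (from moduledef.m_doc)
```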

include/process/base_process.hpp

Lines changed: 29 additions & 23 deletions

```diff
@@ -12,6 +12,9 @@
 #include "object.h"
 #include "../py_ff_constant.hpp"
 
+#define SERIALIZED_EMPTY_TUPLE "(t."
+#define SERIALIZED_NONE "N."
+
 void process_body(int read_fd, int send_fd, bool isMultiOutput) {
     Message message;
 
@@ -91,7 +94,7 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
     CHECK_ERROR_THEN("PyModule_GetDict failure: ", cleanup_exit();)
 
     // if you access the methods from the module itself, replace it with the callback
-    if (PyDict_SetItemString(globals, "fastflow_module", (PyObject*) callback) == -1) {
+    if (PyDict_SetItemString(globals, "fastflow", (PyObject*) callback) == -1) {
         CHECK_ERROR_THEN("PyDict_SetItemString failure: ", cleanup_exit();)
     }
     // if you access the methods by importing them from the module, replace each method with the delegate's one
@@ -121,18 +124,19 @@
     if (PyObject_TypeCheck(py_result, &py_ff_constant_type) != 0) {
         py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_result);
         err = sendMessage(read_fd, send_fd, {
-            .type = _const_result->ff_const == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON,
-            .data = "",
-            .f_name = ""
+            .type = _const_result->ff_const == ff::FF_EOS ? MESSAGE_TYPE_EOS:MESSAGE_TYPE_GO_ON
         });
         if (err <= 0) handleError("[child] send constant response", cleanup_exit());
     } else {
+        std::string result_str = SERIALIZED_NONE;
         // serialize response
-        auto result_str = pickl.pickle(py_result);
-        CHECK_ERROR_THEN("[child] pickle result failure: ", cleanup_exit();)
+        if (py_result != Py_None) {
+            result_str = pickl.pickle(py_result);
+            CHECK_ERROR_THEN("[child] pickle result failure: ", cleanup_exit();)
+        }
 
         // send response
-        err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result_str, .f_name = "" });
+        err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result_str });
         if (err <= 0) handleError("[child] send response", cleanup_exit());
 
         Py_DECREF(py_result);
@@ -146,8 +150,6 @@ void process_body(int read_fd, int send_fd, bool isMultiOutput) {
     cleanup_exit();
 }
 
-#define SERIALIZED_EMPTY_TUPLE "(t."
-
 class base_process {
 public:
     base_process(PyObject* node): node(node), send_fd(-1), read_fd(-1), registered_callback(NULL) {
@@ -175,18 +177,19 @@ class base_process {
         auto node_str = pickl.pickle(node);
         CHECK_ERROR_THEN("pickle node failure: ", return -1;)
 
-        none_str = std::string(pickl.pickle(Py_None));
-        CHECK_ERROR_THEN("pickle None failure: ", return -1;)
-
         int mainToChildFD[2]; // data to be sent from main process to child process
         int childToMainFD[2]; // data to be sent from child process to main process
         // create pipes
         if (pipe(mainToChildFD) == -1) {
-            perror("pipe mainToChildFD");
+            std::string msg = "Failed to create pipe. ";
+            msg.append(strerror(errno));
+            PyErr_SetString(PyExc_Exception, msg.c_str());
             return -1;
         }
         if (pipe(childToMainFD) == -1) {
-            perror("pipe childToMainFD");
+            std::string msg = "Failed to create pipe. ";
+            msg.append(strerror(errno));
+            PyErr_SetString(PyExc_Exception, msg.c_str());
             return -1;
         }
 
@@ -196,7 +199,9 @@ class base_process {
         PyOS_BeforeFork();
         pid = fork();
         if (pid == -1) {
-            perror("fork failed");
+            std::string msg = "Failed to fork. ";
+            msg.append(strerror(errno));
+            PyErr_SetString(PyExc_Exception, msg.c_str());
             returnValue = -1;
         } else if (pid == 0) { // child
             PyOS_AfterFork_Child();
@@ -205,7 +210,9 @@ class base_process {
             close(childToMainFD[0]); // Close read end of childToMainFD
 
             process_body(mainToChildFD[0], childToMainFD[1], registered_callback != NULL);
-            perror("[child] shouldn't be here...");
+            std::string msg = "[child] shouldn't be here... ";
+            msg.append(strerror(errno));
+            PyErr_SetString(PyExc_Exception, msg.c_str());
            returnValue = -1;
         } else { // parent
             PyOS_AfterFork_Parent();
@@ -219,7 +226,8 @@ class base_process {
             send_fd = mainToChildFD[1];
             read_fd = childToMainFD[0];
 
-            int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_REMOTE_PROCEDURE_CALL, .data = node_str, .f_name = "" });
+            // send serialized node
+            int err = sendMessage(read_fd, send_fd, { .data = node_str });
             if (err <= 0) handleError("send serialized node", returnValue = -1);
 
             if (err > 0 && has_svc_init) {
@@ -277,12 +285,11 @@ class base_process {
             std::string *data = new std::string();
             parse_message_ff_send_out_to(response, &constant, &index, data);
 
-            if (constant != NULL) free(data);
             // finally perform ff_send_out_to
             bool result = registered_callback->ff_send_out_to(constant != NULL ? constant:data, index);
-
+
             // send ff_send_out_to result
-            err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result ? "t":"f", .f_name = "" });
+            err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_RESPONSE, .data = result ? "t":"f" });
             if (err <= 0) {
                 handleError("error sending ff_send_out_to response", );
                 return NULL;
@@ -298,7 +305,7 @@ class base_process {
 
         // got response of svc
         if (response.type == MESSAGE_TYPE_EOS) return ff::FF_EOS;
-        if (response.type == MESSAGE_TYPE_GO_ON || response.data.compare(none_str) == 0) {
+        if (response.type == MESSAGE_TYPE_GO_ON || response.data.compare(SERIALIZED_NONE) == 0) {
            return ff::FF_GO_ON;
         }
 
@@ -314,7 +321,7 @@ class base_process {
         }
 
         // send end of life. Meanwhile we acquire the GIL and cleanup, the process will stop
-        int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_END_OF_LIFE, .data = "", .f_name = "" });
+        int err = sendMessage(read_fd, send_fd, { .type = MESSAGE_TYPE_END_OF_LIFE });
 
         // Acquire the main GIL
         PyEval_RestoreThread(tstate);
@@ -347,7 +354,6 @@ class base_process {
     int send_fd;
     int read_fd;
     pid_t pid;
-    std::string none_str;
     ff::ff_monode* registered_callback;
 };
 
```
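
`SERIALIZED_NONE` ("N.") and `SERIALIZED_EMPTY_TUPLE` ("(t.") are the protocol-0 pickle encodings of `None` and the empty tuple; hard-coding them lets the parent compare `response.data` against a constant instead of pickling `None` at startup (hence the removal of `none_str`). The byte patterns can be confirmed from Python, assuming the embedded pickler emits protocol 0 for these values:

```py
import pickle

# protocol-0 pickles of None and () match the constants in base_process.hpp
assert pickle.dumps(None, protocol=0) == b"N."
assert pickle.dumps((), protocol=0) == b"(t."
```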

include/process/messaging.hpp

Lines changed: 2 additions & 2 deletions

```diff
@@ -18,8 +18,8 @@ enum message_type {
 
 struct Message {
     message_type type;
-    std::string data;
-    std::string f_name;
+    std::string data = "";
+    std::string f_name = "";
 };
 
 int sendMessage(int read_fd, int send_fd, const Message& message) {
```
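
With these default member initializers, call sites only need to designate the fields they care about, e.g. `{ .type = MESSAGE_TYPE_END_OF_LIFE }`, and the strings default to empty. A Python analogue of the same design choice, purely for illustration:

```py
from dataclasses import dataclass

@dataclass
class Message:
    type: int = 0
    data: str = ""
    f_name: str = ""

# mirrors `{ .type = MESSAGE_TYPE_END_OF_LIFE }`: omitted fields fall back to defaults
msg = Message(type=5)
assert msg.data == "" and msg.f_name == ""
```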

include/py_ff_a2a.hpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -278,7 +278,7 @@ static PyType_Spec spec_py_ff_a2a = {
 
 PyTypeObject py_ff_a2a_type = {
     PyVarObject_HEAD_INIT(NULL, 0)
-    .tp_name = "fastflow_module.FFAllToAll",
+    .tp_name = "fastflow.FFAllToAll",
     .tp_basicsize = sizeof(py_ff_a2a_object) + sizeof(ff::ff_a2a),
     .tp_itemsize = 0,
     .tp_dealloc = (destructor) py_ff_a2a_dealloc,
```
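
`tp_name` is the string Python reports as the type's qualified name, so after the rename instances identify themselves under the new module. A hypothetical check (it assumes `FFAllToAll` is exported by the module's exec function, as `FFPipeline` is):

```py
import fastflow

print(fastflow.FFAllToAll)  # expected: <class 'fastflow.FFAllToAll'>
```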

include/py_ff_callback.hpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -37,7 +37,7 @@ static PyMethodDef py_ff_callback_methods[] = {
 
 static PyTypeObject py_ff_callback_type = {
     PyVarObject_HEAD_INIT(NULL, 0)
-    .tp_name = "fastflow_module.FFCallback",
+    .tp_name = "fastflow.FFCallback",
     .tp_basicsize = sizeof(py_ff_callback_object),
     .tp_itemsize = 0,
     .tp_flags = Py_TPFLAGS_DEFAULT,
```
