Skip to content

Commit bebe3c5

Browse files
committed
refactor: cleanup old files and update README.md
1 parent 6bd4195 commit bebe3c5

File tree

8 files changed

+37
-198
lines changed

8 files changed

+37
-198
lines changed

README.md

Lines changed: 32 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,68 +3,55 @@
33
### Farm
44

55
```py
6-
from fastflow_subint_module import FFFarm
6+
from fastflow_module import FFFarm, GO_ON
7+
import sys
8+
9+
class source():
10+
def __init__(self):
11+
self.counter = 1
712

8-
# define an emitter
9-
class emitter():
1013
def svc(self, *args):
11-
data = do_some_work()
12-
return data # data to send to workers
14+
# args are sent by the previous stage
15+
# or is an empty tuple if it is the first stage
16+
if self.counter == 12:
17+
return
18+
self.counter += 1
1319

14-
def svc_end(self):
15-
do_some_work() # do some work when the emitter ends
20+
return list(['source'])
1621

1722
class worker():
18-
def svc(self, *args): # args are sent by the emitter
19-
# perform some work
20-
data = busy_work(args)
21-
22-
return data # data to send to collector
23-
24-
def svc_end(self):
25-
do_some_work() # do some work when the worker ends
26-
27-
class collector():
28-
def svc_init(self):
29-
do_some_work() # do some work when the collector starts
30-
return 0 # 0 for success, error otherwise
23+
def __init__(self, id):
24+
self.id = id
3125

32-
def svc(self, *args): # args are sent by the workers
33-
# perform some work
34-
busy_work(args)
35-
36-
return 0
26+
def svc(self, lis: list):
27+
lis.append(self.id)
28+
return lis
3729

30+
class sink():
31+
def svc(self, lis: list):
32+
print(lis)
33+
return GO_ON
3834

3935
# create a farm, pass True or False if you want to use subinterpreters or not
40-
farm = FFFarm(True)
41-
42-
# create and add emitter
43-
em = emitter()
44-
farm.add_emitter(em)
45-
46-
# create and add collector
47-
coll = collector()
48-
farm.add_collector(coll)
49-
50-
# create 4 workers and put them into a list
36+
farm = FFFarm(False)
37+
sourcenode = source()
38+
sinknode = sink()
5139
w_lis = []
52-
for i in range(4):
40+
for i in range(3):
5341
w = worker(f"{i+1}")
5442
w_lis.append(w)
55-
# add the list of workers to the farm
43+
farm.add_emitter(sourcenode)
5644
farm.add_workers(w_lis)
57-
45+
farm.add_collector(sinknode)
5846
# finally run the farm. Blocking call: will resume when the farm ends
5947
farm.run_and_wait_end()
60-
# print how many milliseconds the farm took
61-
print(f"farm done in {farm.ffTime()}ms")
48+
6249
```
6350

6451
### Pipeline
6552

6653
```py
67-
from fastflow_subint_module import FFPipeline
54+
from fastflow_module import FFPipeline, GO_ON
6855

6956
# define a stage
7057
class stage():
@@ -85,7 +72,7 @@ class sinkstage():
8572

8673
def svc(self, *args): # args are sent by the previous stage
8774
data = do_some_work()
88-
return 0
75+
return GO_ON
8976

9077
# create a pipeline, pass True or False if you want to use subinterpreters or not
9178
pipe = FFPipeline(use_subinterpreters)
@@ -105,24 +92,5 @@ pipe.add_stage(sink)
10592
# finally run the pipeline. Blocking call: will resume when the pipeline ends
10693
pipe.run_and_wait_end()
10794
# print how many milliseconds the pipeline took
108-
print(f"farm done in {farm.pipeline()}ms")
109-
```
110-
111-
## Open questions
112-
- How to handle multi input/output nodes? The function ff_send_out(...) is not a member of the node class. A possible solution:
113-
```
114-
class worker():
115-
def svc(self, *args): # args are sent by the emitter
116-
# perform some work
117-
data = busy_work(args)
118-
fastflow.ff_send_out(data, 1) # <----- call ff_send_out
119-
return # the underline ff_node should continue instead of stopping
120-
121-
def svc_end(self):
122-
do_some_work() # do some work when the worker ends
123-
```
124-
- How to handle renaming of imported modules when recreating the environment in subinterpreters? For example `import numpy as np` would cause the statement `import np` when recreating the environment.
125-
- Can we use shared memory instead of pipes?
126-
- Why calling `svc` inside subinterpreters is slower than calling it inside processes (including the time needed to send through pipe + recv ack + recv response + send ack)...?
127-
- Memory leaks...
128-
- Workers are added to the farm at the same time. The environment is serialized for each worker, but it may be just serialized once and shared accross all the workers. However it is not easy: if done during svc_init, how can we share the env accross all the workers and how we choose which worker does the serialization?
95+
print(f"pipeline done in {farm.ffTime()}ms")
96+
```

benchmark/busy_wait/busy_wait.pyx.old

Lines changed: 0 additions & 28 deletions
This file was deleted.
-20.4 KB
Binary file not shown.

benchmark/dummydata.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

benchmark/farm/bench.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow_module import FFFarm
1+
from fastflow_module import FFFarm, GO_ON
22
import argparse
33
import sys
44
import busy_wait
@@ -33,52 +33,25 @@ def __init__(self, data_sample, n_tasks):
3333
self.n_tasks = n_tasks
3434
self.data_sample = data_sample
3535

36-
def svc_init(self):
37-
print('[emitter] svc_init was called')
38-
return 0
39-
4036
def svc(self, *args):
41-
print(f'[emitter] svc, remaining tasks = {self.n_tasks}')
4237
if self.n_tasks == 0:
43-
return None
38+
return
4439
self.n_tasks -= 1
4540

4641
return DummyData(self.data_sample)
4742

48-
def svc_end(self):
49-
print(f'[emitter] svc_end was called')
50-
5143
class worker():
5244
def __init__(self, ms, id):
5345
self.ms = ms
5446
self.id = id
5547

56-
def svc_init(self):
57-
print(f'[{self.id} | worker] svc_init was called')
58-
return 0
59-
60-
def svc(self, *args):
61-
print(f'[{self.id} | worker] svc')
62-
48+
def svc(self, *args):
6349
busy_wait.wait(self.ms)
64-
6550
return args
6651

67-
def svc_end(self):
68-
print(f'[{self.id} | worker] svc_end was called')
69-
70-
class collector():
71-
def svc_init(self):
72-
print('[collector] svc_init was called')
73-
return 0
74-
52+
class collector():
7553
def svc(self, *args):
76-
print('[collector] svc')
77-
78-
return args
79-
80-
def svc_end(self):
81-
print(f'[collector] svc_end was called')
54+
return GO_ON
8255

8356
def build_farm(n_tasks, task_ms, nworkers, data_sample, use_subinterpreters = False, use_main_thread = False):
8457
farm = FFFarm(use_subinterpreters)

benchmark/introspection.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

benchmark/single_exec.py

Lines changed: 0 additions & 9 deletions
This file was deleted.

benchmark/single_exec_python.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

0 commit comments

Comments
 (0)