Unlock the full potential of parallel computing in Python with FastFlow, a powerful C++ library now available in Python that brings high-performance, scalable parallelism right to your fingertips.
- 🤩 Implement advanced parallel patterns and building blocks, such as pipelines, farms, and all-to-all, with ease.
- 🚀🚀 Experience lightning-fast parallel execution with zero boilerplate code.
- 💡 Overcome Python's Global Interpreter Lock (GIL) limitations by leveraging multiple processes or subinterpreters.
 
Whether you’re processing massive datasets, building scalable applications, or designing high-throughput systems, FastFlow delivers the speed and efficiency you've been looking for.
To install fastflow, ensure you have the following dependencies: `python3`, `python3-venv`, and `python3-dev`. On Ubuntu, for example, you can install them with `apt install`. Make sure the submodules are up to date by running `git submodule init && git submodule update`.
- From the root directory, create a virtual environment: `python3 -m venv .venv`
- From the root directory, activate the virtual environment: `source .venv/bin/activate`
- Build and install by running `make` from the root directory
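The steps above can be collected into a single shell session. This is a sketch assuming an Ubuntu host with `sudo` privileges; package names and the package manager may differ on other distributions:

```shell
# Install system dependencies (Ubuntu)
sudo apt install -y python3 python3-venv python3-dev

# Fetch the FastFlow C++ submodule
git submodule init && git submodule update

# Create and activate a virtual environment in the repository root
python3 -m venv .venv
source .venv/bin/activate

# Build and install the Python bindings
make
```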
The Farm pattern consists of an emitter, multiple workers, and a collector. It distributes tasks from the emitter to the workers and collects the results in the collector. Both the emitter and the collector are optional.
```python
from fastflow import FFFarm, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self, *arg):
        if self.counter == 12:
            return EOS
        self.counter += 1
        return list(['source'])

class worker():
    def __init__(self, id):
        self.id = id

    def svc(self, lis: list):
        lis.append(self.id)
        return lis

class sink():
    def svc(self, lis: list):
        print(lis)

# create a farm, pass True as argument if you want to use subinterpreters
farm = FFFarm()
sourcenode = source()
sinknode = sink()
w_lis = []
for i in range(3):
    w = worker(f"worker{i+1}")
    w_lis.append(w)
farm.add_emitter(sourcenode)
farm.add_workers(w_lis)
farm.add_collector(sinknode)
# finally run the farm. Blocking call: will resume when the farm ends
farm.run_and_wait_end()
```

The Pipeline pattern consists of multiple stages that process tasks in a linear sequence, where the output of one stage becomes the input of the next.
```python
from fastflow import FFPipeline, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return list(["source"])

class stage():
    def __init__(self, id):
        self.id = id

    def svc(self, lis: list):
        lis.append(self.id)
        return lis

class sink():
    def svc(self, lis: list):
        lis.append("sink")
        print(lis)

# create a pipeline, pass True if you want to use subinterpreters
pipe = FFPipeline(True)
# create the first stage
sourcenode = source()
# create the second stage
st1 = stage('st1')
# create last stage
sinknode = sink()
# add all the stages
pipe.add_stage(sourcenode)
pipe.add_stage(st1)
pipe.add_stage(sinknode)
# finally run the pipeline. Blocking call: will resume when the pipeline ends
pipe.run_and_wait_end()
# print how many milliseconds the pipeline took
print(f"pipeline done in {pipe.ffTime()}ms")
```
The All-to-All pattern connects a set of left-hand nodes to a set of right-hand nodes, where every left node can send data to every right node.
```python
from fastflow import FFAllToAll, EOS

class leftnode():
    def __init__(self, id):
        self.counter = 1
        self.id = id

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return list([self.id])

class rightnode():
    def __init__(self, id):
        self.id = id

    def svc(self, lis: list):
        lis.append(f"rightnode{self.id}")
        print(lis)

# All-to-All with 4 nodes to the left and 3 nodes to the right
a2a = FFAllToAll()
# Create and add left nodes
a2a.add_firstset([leftnode(i+1) for i in range(4)])
# Create and add right nodes
a2a.add_secondset([rightnode(i+1) for i in range(3)])
# Run All-to-All
a2a.run_and_wait_end()
```

In the All-to-All building block, two different scheduling policies can be used to distribute tasks from the left nodes to the right nodes:
- Round Robin (default): This policy distributes tasks evenly across the available right nodes in a cyclic manner. Each task is assigned to the next available right node in turn, regardless of the task's nature or the load on the right nodes. This method is efficient when tasks are uniform in size and processing time.
- On-Demand: In this policy, tasks are sent to a right node only when that node is ready to receive more work. It dynamically adapts to the load of each right node, making it better suited to scenarios where tasks have varying sizes or processing times. To enable on-demand scheduling, pass the `ondemand=True` option to the `add_firstset()` method.
The following is an example of On-Demand scheduling:
```python
from fastflow import FFAllToAll, EOS

class source():
    def __init__(self, id):
        self.counter = 1
        self.id = id

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [f"data from source {self.id}"]

class sink():
    def __init__(self, id):
        self.id = id

    def svc(self, lis: list):
        lis.append(f"processed by sink {self.id}")
        print(lis)

# All-to-All with on-demand scheduling
a2a = FFAllToAll()
# First stage (sources)
first_lis = [source(i+1) for i in range(3)]
# Second stage (sinks)
second_lis = [sink(i+1) for i in range(2)]
# Add stages using on-demand scheduling
a2a.add_firstset(first_lis, ondemand=True)
a2a.add_secondset(second_lis)
# Run All-to-All
a2a.run_and_wait_end()
```

You can define `svc_init` and `svc_end` methods for initialization and cleanup purposes. The following example shows how to use `svc_init` for initialization and `svc_end` for finalization logic in stages.
```python
from fastflow import FFAllToAll, EOS

class source():
    def __init__(self, id):
        self.id = id
        self.counter = 1

    def svc_init(self):
        print(f"Source {self.id} initialized")

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [self.id]

    def svc_end(self):
        print(f"Source {self.id} finished")

class sink():
    def __init__(self, id):
        self.id = id

    def svc_init(self):
        print(f"Sink {self.id} initialized")

    def svc(self, lis: list):
        lis.append(f"sink{self.id}")
        print(lis)

    def svc_end(self):
        print(f"Sink {self.id} finished")

# All-to-All setup
a2a = FFAllToAll()
first_stage_size = 3
second_stage_size = 3
# Create and add first stages (sources)
first_lis = [source(i+1) for i in range(first_stage_size)]
a2a.add_firstset(first_lis)
# Create and add second stages (sinks)
second_lis = [sink(i+1) for i in range(second_stage_size)]
a2a.add_secondset(second_lis)
# Run All-to-All
a2a.run_and_wait_end()
```

You can send data multiple times, or to specific nodes, using `ff_send_out`.
```python
from fastflow import FFAllToAll, EOS, ff_send_out

class source():
    def svc(self, *arg):
        # Send data items to sink2 and sink3 (0-based indices 1 and 2)
        ff_send_out(["source-to-sink2"], 1)
        ff_send_out(["source-to-sink3"], 2)
        # Send multiple data items to any node
        ff_send_out(["source-to-any"])
        ff_send_out(["source-to-any"])
        return EOS

class sink():
    def __init__(self, id):
        self.id = id

    def svc(self, lis: list):
        lis.append(f"sink{self.id}")
        print(lis)

# All-to-All setup
a2a = FFAllToAll()
first_stage_size = 4
second_stage_size = 3
# Create and add first stage (sources)
first_lis = [source() for _ in range(first_stage_size)]
a2a.add_firstset(first_lis)
# Create and add second stage (sinks)
second_lis = [sink(i+1) for i in range(second_stage_size)]
a2a.add_secondset(second_lis)
# Run All-to-All
a2a.run_and_wait_end()
```

The following example demonstrates nesting an All-to-All inside a Pipeline: one of the pipeline stages is itself an All-to-All pattern.
```python
from fastflow import FFPipeline, FFAllToAll, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [f"data{self.counter}"]

class stage():
    def svc(self, lis: list):
        lis.append("stage")
        return lis

class sink():
    def svc(self, lis: list):
        lis.append("sink")
        print(lis)

# Inner All-to-All setup
a2a = FFAllToAll()
first_lis = [stage() for _ in range(3)]
second_lis = [stage() for _ in range(2)]
a2a.add_firstset(first_lis)
a2a.add_secondset(second_lis)

# Pipeline setup
pipeline = FFPipeline()
pipeline.add_stage(source())
pipeline.add_stage(a2a)  # All-to-All as a stage in the pipeline
pipeline.add_stage(sink())

# Run pipeline
pipeline.run_and_wait_end()
```

The next example shows the opposite composition, a Pipeline used inside an All-to-All stage.
```python
from fastflow import FFAllToAll, FFPipeline, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [f"data{self.counter}"]

class stage():
    def svc(self, lis: list):
        lis.append("stage")
        return lis

class sink():
    def svc(self, lis: list):
        lis.append("sink")
        print(lis)

# build a Pipeline to be used inside the second stage of All-to-All
def build_pipeline():
    pipeline = FFPipeline()
    pipeline.add_stage(stage())
    pipeline.add_stage(stage())
    pipeline.add_stage(sink())
    return pipeline

# All-to-All setup
a2a = FFAllToAll()
first_lis = [source() for _ in range(3)]          # 3 first stages
second_lis = [build_pipeline() for _ in range(2)]  # 2 Pipelines in the second stage
a2a.add_firstset(first_lis)
a2a.add_secondset(second_lis)
# Run All-to-All with embedded pipelines
a2a.run_and_wait_end()
```

The `blocking_mode(boolean)` function controls whether nodes block and wait for input or continuously poll for data. Calling `blocking_mode(True)` enables blocking mode, reducing resource usage, while `blocking_mode(False)` disables it, making the nodes more responsive but more resource-hungry. By default, building blocks have blocking mode disabled. All building blocks support it; here is an example using a Farm:
```python
from fastflow import FFFarm, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self, *arg):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [f"data{self.counter}"]

class worker():
    def svc(self, lis: list):
        lis.append("processed")
        return lis

class sink():
    def svc(self, lis: list):
        print(lis)

# Create a Farm with blocking mode enabled
farm = FFFarm()
farm.blocking_mode(True)  # Enable blocking mode
sourcenode = source()
sinknode = sink()
workers = [worker() for _ in range(3)]
farm.add_emitter(sourcenode)
farm.add_workers(workers)
farm.add_collector(sinknode)
farm.run_and_wait_end()
```

By default, each node is pinned to a single core, which improves performance. The `no_mapping()` function disables FastFlow's pinning of nodes to specific cores, allowing more flexibility in resource allocation but potentially increasing overhead as nodes migrate between cores. All building blocks support it; here is an example using a Pipeline:
```python
from fastflow import FFPipeline, EOS

class source():
    def __init__(self):
        self.counter = 1

    def svc(self):
        if self.counter > 5:
            return EOS
        self.counter += 1
        return [f"data{self.counter}"]

class stage():
    def svc(self, lis: list):
        lis.append("processed")
        return lis

class sink():
    def svc(self, lis: list):
        lis.append("final")
        print(lis)

# Create a Pipeline with no mapping
pipeline = FFPipeline()
pipeline.no_mapping()  # Disable core pinning for nodes
pipeline.add_stage(source())
pipeline.add_stage(stage())
pipeline.add_stage(sink())
pipeline.run_and_wait_end()
```

To test the CI locally (optional, but useful for validating changes before touching the whole CI), run `pip install cibuildwheel` and then `cibuildwheel --platform linux` or `make ci`.
Contributions are welcome! Please submit pull requests or open issues to help improve this project.