A scalable inference platform that provides multi-node management and control for AI/ML inference workloads.
It enables easy deployment and management of inference pipelines across distributed nodes with auto-discovery, telemetry, and flexible result publishing.
(Demo video: quick demo of the computer vision inference pipeline builder.)
Key features:

- Multi-engine support: Ultralytics YOLO, Geti, ONNX Runtime, and custom engines
- Auto-discovery: Nodes automatically discover each other on the network
- Real-time telemetry: System monitoring and performance metrics via MQTT
- Flexible result publishing: MQTT, webhooks, serial, and custom destinations
- RESTful API: Complete HTTP API for remote management
- Rate limiting: Built-in rate limiting for all result destinations
Supported inference engines:

- Ultralytics: YOLO object detection models (YOLOv8, YOLOv11, etc.)
- Geti: Intel's computer vision platform
- ONNX Runtime: Cross-platform ML model inference with CPU, OpenVINO, and GPU acceleration
- Pass-through: For testing and development
- Custom: Extensible framework for custom implementations
Supported result destinations (a configuration sketch follows the list):

- MQTT: Publish results to MQTT brokers
- Webhook: HTTP POST to custom endpoints
- Serial: Output to serial ports (RS-232, USB)
- OPC UA: Industrial automation protocol
- ROS2: Robot Operating System 2
- ZeroMQ: High-performance messaging
- Folder: Save to local/network filesystem
- Roboflow: Integration with Roboflow platform
- Geti: Geti platform integration
- Custom: Implement your own destinations
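All destinations follow the same configure-and-add pattern shown in the quick-start examples further down. The sketch below is illustrative only; the folder/serial option names (`path`, `port`, `baudrate`) are assumptions rather than the plugins' documented parameters, so check the plugin source for the exact options.

```python
from ResultPublisher import ResultPublisher, ResultDestination

rp = ResultPublisher()

# Save results to a local folder (the 'path' option name is hypothetical)
rd_folder = ResultDestination('folder')
rd_folder.configure(path='./results')
rp.add(rd_folder)

# Stream results over a serial port (the 'port'/'baudrate' option names are hypothetical)
rd_serial = ResultDestination('serial')
rd_serial.configure(port='COM3', baudrate=115200, rate_limit=0.1)
rp.add(rd_serial)

rp.publish({"inference_results": "data"})
```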
Requirements:

- Python 3.10+
- Compatible with Windows, Linux
- Optional: CUDA for GPU acceleration
- Optional: MQTT broker for telemetry and result publishing
Note: Only tested on a limited set of configurations so far, (Windows / Ubuntu) x (Intel / NVIDIA); AMD and other configurations are on the to-do list.
Clone the repository and run the setup script:

```bash
# Clone the repository
git clone https://github.com/olkham/inference_node.git
cd inference_node

# Run the setup script (Windows)
setup.bat

# Or on Linux/macOS
chmod +x setup.sh
./setup.sh
```

Install the Python dependencies:

```bash
# Install core dependencies
pip install -r requirements.txt

# Optional: Install AI/ML frameworks (if not already in requirements.txt)
pip install torch torchvision ultralytics geti-sdk

# Optional: Install ONNX Runtime (choose based on your hardware)
pip install "onnxruntime>=1.16.0"                      # CPU version
pip install "onnxruntime-openvino>=1.16.0"             # Intel OpenVINO acceleration
pip install "onnxruntime-gpu[cuda12,cudnn]>=1.16.0"    # NVIDIA GPU acceleration

# Or use optional dependency groups from pyproject.toml
pip install -e .[onnx]           # CPU version
pip install -e .[onnx-openvino]  # Intel OpenVINO
pip install -e .[onnx-gpu]       # NVIDIA GPU

# Optional: Install GPU monitoring (uses nvidia-ml-py, not deprecated pynvml)
pip install "nvidia-ml-py>=12.0.0"

# Optional: Install serial communication
pip install "pyserial>=3.5"
```

Start a node from Python:

```python
from InferenceNode import InferenceNode

# Create and start a node
node = InferenceNode("MyNode", port=5555)
node.start(enable_discovery=True, enable_telemetry=True)
```

Or use the command line:
```bash
# Start full node with all services using Flask
python main.py

# Start full node with all services using waitress (production mode)
python main.py --production

# Start with custom settings
python main.py --port 8080 --name "ProductionNode" --no-telemetry
```

Create and use inference engines:

```python
from InferenceEngine import InferenceEngine

# Create different engine types
ie_ultralytics = InferenceEngine('ultralytics')
ie_onnx = InferenceEngine('onnx')
ie_geti = InferenceEngine('geti')
ie_custom = InferenceEngine('custom')

# Upload and load a model (Ultralytics example)
model_id = ie_ultralytics.upload('path/to/model.pt')
ie_ultralytics.load(model_id, device='cuda')

# Upload and load an ONNX model
onnx_model_id = ie_onnx.upload('path/to/model.onnx')
ie_onnx.load(onnx_model_id, device='cpu')

# Run inference
result = ie_ultralytics.infer('path/to/image.jpg')
onnx_result = ie_onnx.infer('path/to/image.jpg')
```

Publish results to one or more destinations:

```python
from ResultPublisher import ResultPublisher, ResultDestination

# Create result publisher
rp = ResultPublisher()

# Configure MQTT destination
rd_mqtt = ResultDestination('mqtt')
rd_mqtt.configure(
    server='localhost',
    topic='infernode/results',
    rate_limit=1.0  # 1 second between publishes
)
rp.add(rd_mqtt)

# Configure webhook destination
rd_webhook = ResultDestination('webhook')
rd_webhook.configure(
    url='http://myserver.com/webhook',
    rate_limit=0.5
)
rp.add(rd_webhook)

# Publish results
rp.publish({"inference_results": "data"})
```

Node information:

```
GET /api/info
```

Returns node capabilities and status.
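The endpoint can be queried with any HTTP client; a minimal sketch using the `requests` library, assuming a node running on the quick-start port 5555:

```python
import requests

# Query node capabilities and status (address/port assumed from the quick-start example)
response = requests.get("http://localhost:5555/api/info", timeout=5)
response.raise_for_status()
print(response.json())
```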
Engine management:

```
# Load an inference engine
POST /api/engine/load
{
    "engine_type": "ultralytics",
    "model_id": "model_123",
    "device": "cuda"
}

# Upload a model
POST /api/engine/upload
# Form data with file upload
```

Run inference:

```
POST /api/inference
# Form data with image file or JSON with image_path
```

Configure result publishing:

```
POST /api/publisher/configure
{
    "type": "mqtt",
    "config": {
        "server": "localhost",
        "topic": "results",
        "rate_limit": 1.0
    }
}
```

Control telemetry:

```
# Start telemetry
POST /api/telemetry/start
{
    "mqtt": {
        "mqtt_server": "localhost",
        "mqtt_topic": "telemetry"
    }
}

# Stop telemetry
POST /api/telemetry/stop
```

Project structure:

```
inference_node/
├── InferenceEngine/                    # Inference engine implementations
│   ├── engines/
│   │   ├── base_engine.py              # Base class for all engines
│   │   ├── ultralytics_engine.py       # Ultralytics YOLO support
│   │   ├── geti_engine.py              # Geti support
│   │   ├── onnx_engine.py              # ONNX Runtime support
│   │   ├── pass_engine.py              # Pass-through engine
│   │   └── example_engine_template.py  # Custom engine template
│   ├── inference_engine_factory.py
│   └── result_converters.py
├── InferenceNode/                      # Main node implementation
│   ├── inference_node.py               # Core node class
│   ├── pipeline_manager.py             # Pipeline orchestration
│   ├── pipeline.py                     # Pipeline definitions
│   ├── discovery_manager.py            # Network discovery
│   ├── telemetry.py                    # System telemetry
│   ├── model_repo.py                   # Model repository
│   ├── hardware_detector.py            # Hardware detection
│   ├── log_manager.py                  # Logging
│   ├── static/                         # Web UI assets
│   └── templates/                      # Web UI templates
├── ResultPublisher/                    # Result publishing system
│   ├── publisher.py                    # Main publisher class
│   ├── base_destination.py             # Base destination class
│   ├── result_destinations.py          # Built-in destinations
│   └── plugins/                        # Pluggable destinations
│       ├── mqtt_destination.py
│       ├── webhook_destination.py
│       ├── serial_destination.py
│       ├── opcua_destination.py
│       ├── ros2_destination.py
│       ├── zeromq_destination.py
│       ├── folder_destination.py
│       ├── roboflow_destination.py
│       ├── geti_destination.py
│       └── null_destination.py
├── main.py                             # Entry point
├── setup.bat                           # Windows setup script
├── setup.sh                            # Linux/macOS setup script
├── requirements.txt                    # Dependencies
├── pyproject.toml                      # Project configuration
├── Dockerfile                          # Docker container
├── docker-compose.yml                  # Docker compose configuration
└── readme.md                           # This file
```

The node can be configured through:
- Command-line arguments: `python main.py --port 8080 --name "MyNode"`
- Web UI: Access the dashboard at `http://localhost:8080`
- REST API: Configure via API endpoints (see the sketch below)
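As an example of the REST API route, the publisher configuration payload shown earlier can be posted to a running node; a minimal sketch assuming a node on port 8080 and the `requests` library:

```python
import requests

# Add an MQTT result destination on a running node via the REST API
payload = {
    "type": "mqtt",
    "config": {
        "server": "localhost",
        "topic": "results",
        "rate_limit": 1.0
    }
}
response = requests.post("http://localhost:8080/api/publisher/configure", json=payload, timeout=5)
response.raise_for_status()
print(response.status_code)  # the exact response body depends on the API
```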
Default settings:
- Node Port: 5555
- Discovery: Enabled
- Telemetry: Disabled by default
- Model Repository: `InferenceNode/model_repository/models/`
- Pipelines: `InferenceNode/pipelines/`
InferNode provides comprehensive system monitoring:
- CPU usage and frequency
- Memory utilization
- Disk usage
- Network statistics
- GPU information (NVIDIA)
- Inference performance metrics
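The node's own collection code lives in `InferenceNode/telemetry.py`; the sketch below is only an illustration of how metrics like these can be gathered with `psutil` and the optional `nvidia-ml-py` package (`psutil` is an assumption here, not a documented dependency), not the project's actual implementation:

```python
import psutil

# CPU, memory, disk, and network counters via psutil
cpu = {"usage_percent": psutil.cpu_percent(interval=1), "count": psutil.cpu_count()}
mem = psutil.virtual_memory()
memory = {"usage_percent": mem.percent, "total_gb": round(mem.total / 1024**3, 1)}
disk = {"usage_percent": psutil.disk_usage("/").percent}
network = psutil.net_io_counters()._asdict()

# GPU information via nvidia-ml-py (exposes the pynvml module); optional
try:
    import pynvml
    pynvml.nvmlInit()
    devices = []
    for i in range(pynvml.nvmlDeviceGetCount()):
        handle = pynvml.nvmlDeviceGetHandleByIndex(i)
        devices.append({
            "name": pynvml.nvmlDeviceGetName(handle),
            "utilization_percent": pynvml.nvmlDeviceGetUtilizationRates(handle).gpu,
        })
    gpu = {"available": True, "devices": devices}
except Exception:
    gpu = {"available": False, "devices": []}

print({"cpu": cpu, "memory": memory, "disk": disk, "network": network, "gpu": gpu})
```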
Telemetry data is published to MQTT in JSON format:
{ "node_id": "uuid-here", "timestamp": "2025-07-28T10:30:00Z", "cpu": {"usage_percent": 45.2, "count": 8}, "memory": {"usage_percent": 67.3, "total_gb": 16}, "gpu": {"available": true, "devices": [...]} }Nodes automatically discover each other using UDP broadcasts:
Nodes automatically discover each other using UDP broadcasts:

```python
from discovery import NodeDiscovery

# Discover nodes on network
discovered = NodeDiscovery.discover_nodes(timeout=5.0)
for node_id, info in discovered.items():
    print(f"Found node: {node_id} at {info['address']}")
```

To add a custom inference engine, subclass the base engine:

```python
from InferenceEngine.base_engine import BaseInferenceEngine

class MyCustomEngine(BaseInferenceEngine):
    def _load_model(self):
        # Load your model
        pass

    def _preprocess(self, image):
        # Preprocess input
        return processed_image

    def _infer(self, preprocessed_input):
        # Run inference
        return raw_output

    def _postprocess(self, raw_output):
        # Process results
        return final_results
```

To add a custom result destination, subclass the base destination:

```python
from ResultPublisher.result_destinations import BaseResultDestination

class MyCustomDestination(BaseResultDestination):
    def configure(self, **kwargs):
        # Configure your destination
        self.is_configured = True

    def _publish(self, data):
        # Publish data to your destination
        return True  # Success
```

To contribute:

- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Known issues:

- Issue: Geti SDK support is limited to Python 3.10-3.13
- Impact: Users running Python 3.14+ cannot use Geti integration features
- Workaround:
- Use Python 3.10-3.13 for Geti functionality
- Or install in a separate virtual environment with a compatible Python version
- Geti SDK is an optional dependency and won't block installation on incompatible Python versions
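One way to keep a deployment working on an unsupported interpreter is to gate Geti usage on the Python version; an illustrative sketch using the `InferenceEngine` factory from the quick-start examples:

```python
import sys

# Geti SDK currently supports Python 3.10-3.13
GETI_SUPPORTED = (3, 10) <= sys.version_info[:2] <= (3, 13)

if GETI_SUPPORTED:
    from InferenceEngine import InferenceEngine
    ie_geti = InferenceEngine('geti')
else:
    print(f"Python {sys.version_info.major}.{sys.version_info.minor}: Geti integration unavailable")
```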
- Issue: On first run with Ultralytics models on Intel hardware, nodes may report failure to start
- Cause: Extra dependencies and model downloads required for OpenVINO conversion are not pre-installed
- Impact: Initial startup may fail or take longer than expected
- Workaround:
- Re-run the node after the initial failure - subsequent starts should work correctly
- The required dependencies will be downloaded automatically on first run
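Alternatively, the OpenVINO export can be triggered ahead of time so the first real start finds the converted model in place; a sketch assuming the `ultralytics` package and an illustrative model file:

```python
from ultralytics import YOLO

# Load (and download if needed) a YOLO model, then export it to OpenVINO format up front
# so the node does not have to perform the conversion on its first start.
model = YOLO("yolov8n.pt")        # illustrative model; use the model your pipeline loads
model.export(format="openvino")   # writes an <model>_openvino_model/ directory next to the .pt file
```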
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
For questions and support:
- Create an issue on GitHub
- Check the documentation
- Review the example code
Planned features:

- Web-based management interface
- Integration with FrameSource library
- Docker containers and orchestration
- Advanced load balancing
- Model versioning and A/B testing
- Enhanced pipeline builder UI
- Additional inference engine integrations