Description:
UnAlignedComputeWorker's support for dynamic/streaming scene processing needs improvement.
Proposed Solution:
I. Problem
1.1 Core Issue
Issue 1: Incomplete Window Initialization Logic
The current init() method has the following problems:
- The condition check processingWindowIdQueue.isEmpty() && windowId <= context.getWindowId() is overly restrictive.
- In dynamic graph scenarios, windows may arrive out of order, which this logic cannot handle correctly.
- There is no protection against duplicate initialization.
- It does not consider continuous window processing in streaming scenarios.
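To make the first three problems concrete, here is a minimal sketch that models only the condition quoted above. The class InitConditionDemo, its fields, and the list used to record parent init calls are hypothetical stand-ins, not GeaFlow code; the real worker calls super.init(windowId) where the sketch records the ID.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the worker; only the init() condition is modeled.
class InitConditionDemo {
    private final Queue<Long> processingWindowIdQueue = new ArrayDeque<>();
    private final List<Long> parentInitCalls = new ArrayList<>();
    private final long contextWindowId;

    InitConditionDemo(long contextWindowId) {
        this.contextWindowId = contextWindowId;
    }

    // Mirrors the condition quoted in the issue text.
    void init(long windowId) {
        if (processingWindowIdQueue.isEmpty() && windowId <= contextWindowId) {
            parentInitCalls.add(windowId); // stands in for super.init(windowId)
        }
        processingWindowIdQueue.add(windowId);
    }

    List<Long> parentInitCalls() {
        return parentInitCalls;
    }

    int queueSize() {
        return processingWindowIdQueue.size();
    }

    public static void main(String[] args) {
        InitConditionDemo worker = new InitConditionDemo(3L);
        worker.init(5L); // ahead of the context window: queued, parent init skipped
        worker.init(3L); // queue is no longer empty: parent init skipped
        worker.init(3L); // duplicate: queued a second time
        System.out.println("parent init calls: " + worker.parentInitCalls()); // prints []
        System.out.println("queue size: " + worker.queueSize());              // prints 3
    }
}
```

In this model, one early window arriving first is enough to keep the parent initialization from ever running for the current window, and the duplicate ID sits in the queue twice.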
Issue 2: Forced Alignment Processing in LoadGraphProcessEvent
LoadGraphProcessEvent is forced to use alignment processing mode, which severely limits the performance advantages of unaligned workers in dynamic graph loading scenarios.
Issue 3: Window Queue Management Defects
In the finishBarrier() method:
- The queue poll operation may fail due to timeout.
- The window ID validation range [currentWindowId - 1, currentWindowId] may not be flexible enough for streaming scenarios.
- There is no queue recovery mechanism for exception cases.
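The timeout failure can be shown with plain JDK classes. The sketch below assumes the window ID queue is a LinkedBlockingDeque of Long and that the polled value is used as a primitive long; both are assumptions about the surrounding code, and TimedPollDemo and its method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing what a timed poll returns when the queue stays empty.
class TimedPollDemo {

    // Unboxes the result directly; a timed-out poll returns null and unboxing throws.
    static long pollUnchecked(LinkedBlockingDeque<Long> queue, long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Checks for null first and reports the timeout with a sentinel value of -1.
    static long pollOrDefault(LinkedBlockingDeque<Long> queue, long timeoutMs) {
        try {
            Long windowId = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return windowId == null ? -1L : windowId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<>();
        System.out.println(pollOrDefault(queue, 10L)); // prints -1, the poll timed out
        try {
            pollUnchecked(queue, 10L);
        } catch (NullPointerException e) {
            System.out.println("timed-out poll surfaced as NullPointerException");
        }
        queue.add(7L);
        System.out.println(pollOrDefault(queue, 10L)); // prints 7
    }
}
```

A null check turns the timeout into an explicit condition that the caller can retry or report, instead of an unrelated-looking NullPointerException.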
Issue 4: Finish Method Lacks State Synchronization
The finish method directly calls the processor's finish without:
- Checking if queue state is consistent
- Verifying if windows have truly completed processing
- Handling potential race conditions
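One way to close the race between two callers finishing the same window is an atomic state transition. The sketch below uses ConcurrentHashMap.replace(key, oldValue, newValue) from the JDK; FinishGuardDemo, its two-value state enum, and tryFinish are hypothetical names for illustration and not part of the worker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical guard: only one caller may move a window from INITIALIZED to FINISHED.
class FinishGuardDemo {
    enum WindowState { INITIALIZED, FINISHED }

    private final ConcurrentMap<Long, WindowState> states = new ConcurrentHashMap<>();

    void init(long windowId) {
        states.putIfAbsent(windowId, WindowState.INITIALIZED);
    }

    // Atomic compare-and-set: returns true for exactly one caller per initialized window.
    boolean tryFinish(long windowId) {
        return states.replace(windowId, WindowState.INITIALIZED, WindowState.FINISHED);
    }

    public static void main(String[] args) {
        FinishGuardDemo guard = new FinishGuardDemo();
        guard.init(1L);
        System.out.println(guard.tryFinish(1L)); // prints true, first finish wins
        System.out.println(guard.tryFinish(1L)); // prints false, already finished
        System.out.println(guard.tryFinish(2L)); // prints false, never initialized
    }
}
```

A caller would invoke the processor's finish only when tryFinish returns true, so a separate get followed by put cannot interleave with another thread.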
II. Detailed Improvement Plan
2.1 Improved UnAlignedComputeWorker Class
Solution 1: Enhanced Window Initialization Management
Improvement Goals:
- Support out-of-order window arrival
- Add window state tracking
- Prevent duplicate initialization
- Support streaming continuous window processing
Implementation Plan:

    public class UnAlignedComputeWorker<T, R> extends AbstractUnAlignedWorker<T, R> {

        private static final Logger LOGGER = LoggerFactory.getLogger(UnAlignedComputeWorker.class);

        // Added: Window state management
        private enum WindowState {
            PENDING,      // Waiting for initialization
            INITIALIZED,  // Initialized
            PROCESSING,   // Processing
            FINISHED      // Completed
        }

        // Added: Window state tracking map
        private final Map<Long, WindowState> windowStateMap = new ConcurrentHashMap<>();

        // Added: Initialized window set
        private final Set<Long> initializedWindows = ConcurrentHashMap.newKeySet();

        // Added: Configuration parameters
        private int maxPendingWindows = 10;             // Maximum pending windows
        private boolean allowOutOfOrderWindows = true;  // Allow out-of-order windows

        @Override
        public void init(long windowId) {
            // Check if window is already initialized
            if (initializedWindows.contains(windowId)) {
                LOGGER.warn("taskId {} windowId {} already initialized, skipping",
                    context.getTaskId(), windowId);
                return;
            }

            // Check queue capacity
            if (processingWindowIdQueue.size() >= maxPendingWindows) {
                LOGGER.warn("taskId {} processing queue is full, size: {}, windowId: {}",
                    context.getTaskId(), processingWindowIdQueue.size(), windowId);
                // Optional: Block waiting or throw exception
                throw new GeaflowRuntimeException("Processing window queue is full");
            }

            long currentWindowId = context.getWindowId();

            // Dynamic/streaming scenario improvement logic
            if (allowOutOfOrderWindows) {
                // Allow out-of-order: process as long as window is not initialized
                if (windowId < currentWindowId - maxPendingWindows) {
                    LOGGER.error("taskId {} windowId {} is too old, current: {}",
                        context.getTaskId(), windowId, currentWindowId);
                    throw new GeaflowRuntimeException("Window ID too old: " + windowId);
                }

                // Update window state
                windowStateMap.put(windowId, WindowState.INITIALIZED);
                initializedWindows.add(windowId);

                // Call parent initialization (only when queue is empty and windowId is ordered)
                if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                    super.init(windowId);
                }

                // Add to processing queue
                processingWindowIdQueue.add(windowId);

                LOGGER.info("taskId {} init windowId {} (out-of-order mode), current: {}, queue size: {}",
                    context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
            } else {
                // Strict ordering mode (original logic)
                if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                    super.init(windowId);
                    windowStateMap.put(windowId, WindowState.INITIALIZED);
                    initializedWindows.add(windowId);
                }
                processingWindowIdQueue.add(windowId);

                LOGGER.info("taskId {} init windowId {} (ordered mode), current: {}, queue size: {}",
                    context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
            }
        }

        @Override
        public void finish(long windowId) {
            LOGGER.info("taskId {} finishing windowId {}, currentBatchId {}, real currentBatchId {}, queue size: {}",
                context.getTaskId(), windowId, windowId, context.getCurrentWindowId(),
                processingWindowIdQueue.size());

            // Check window state
            WindowState state = windowStateMap.get(windowId);
            if (state == null) {
                LOGGER.warn("taskId {} windowId {} has no state record",
                    context.getTaskId(), windowId);
            } else if (state == WindowState.FINISHED) {
                LOGGER.warn("taskId {} windowId {} already finished",
                    context.getTaskId(), windowId);
                return;
            }

            // Update state to finished
            windowStateMap.put(windowId, WindowState.FINISHED);

            // Call processor finish
            context.getProcessor().finish(windowId);

            // Complete window processing
            finishWindow(windowId);

            // Clean up old window states (prevent memory leaks)
            cleanupOldWindows(windowId);
        }

        // Added: Clean up old window states
        private void cleanupOldWindows(long currentWindowId) {
            // Retain recent window states, clean up old ones
            long threshold = currentWindowId - maxPendingWindows * 2;

            windowStateMap.keySet().removeIf(wid -> wid < threshold);
            initializedWindows.removeIf(wid -> wid < threshold);

            LOGGER.debug("taskId {} cleaned up windows before {}, remaining states: {}",
                context.getTaskId(), threshold, windowStateMap.size());
        }

        @Override
        public WorkerType getWorkerType() {
            return WorkerType.unaligned_compute;
        }

        // Added: Configuration methods
        public void setMaxPendingWindows(int maxPendingWindows) {
            this.maxPendingWindows = maxPendingWindows;
        }

        public void setAllowOutOfOrderWindows(boolean allowOutOfOrderWindows) {
            this.allowOutOfOrderWindows = allowOutOfOrderWindows;
        }
    }

2.2 Improve AbstractUnAlignedWorker Base Class
Solution 2: Optimize LoadGraphProcessEvent Handling
Improvement Goals:
- Support unaligned processing of LoadGraphProcessEvent
- Maintain data consistency
- Improve dynamic graph loading performance
Implementation Plan:
Add new methods in AbstractUnAlignedWorker:

    /**
     * Process graph loading events in an unaligned manner.
     * Ensure consistency through phased checkpoints.
     */
    public void processLoadGraphUnaligned(long fetchCount) {
        LOGGER.info("taskId {} start unaligned graph loading, fetchCount: {}",
            context.getTaskId(), fetchCount);

        // Phase 1: Asynchronously load vertex and edge data
        long processedCount = 0;
        Set<Long> receivedWindowIds = new HashSet<>();
        // Barriers consumed during phase 1 (windowId -> totalCount), so that
        // phase 2 does not wait for a barrier that has already been read
        Map<Long, Long> receivedBarriers = new HashMap<>();

        while (processedCount < fetchCount && running) {
            try {
                InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (input != null) {
                    long windowId = input.getWindowId();
                    receivedWindowIds.add(windowId);

                    if (input.getMessage() != null) {
                        PipelineMessage message = input.getMessage();
                        processMessage(windowId, message);
                        processedCount++;
                    } else {
                        // Encounter barrier, record but continue processing
                        long totalCount = input.getWindowCount();
                        receivedBarriers.put(windowId, totalCount);
                        LOGGER.debug("taskId {} received barrier for windowId {}, totalCount: {}",
                            context.getTaskId(), windowId, totalCount);
                    }
                }
            } catch (Throwable t) {
                if (running) {
                    LOGGER.error("Error during unaligned graph loading", t);
                    throw new GeaflowRuntimeException(t);
                }
            }
        }

        // Phase 2: Wait for barriers of all windows to arrive (ensure consistency)
        LOGGER.info("taskId {} graph data loaded, waiting for barriers, windows: {}",
            context.getTaskId(), receivedWindowIds);

        for (Long windowId : receivedWindowIds) {
            Long totalCount = receivedBarriers.get(windowId);
            if (totalCount != null) {
                // Barrier was already read in phase 1, process it directly
                processBarrier(windowId, totalCount);
            } else {
                // Wait for barrier of each remaining window
                waitForWindowBarrier(windowId);
            }
        }

        LOGGER.info("taskId {} completed unaligned graph loading", context.getTaskId());
    }

    private void waitForWindowBarrier(long windowId) {
        // Wait for barrier of specific window to arrive
        long startTime = System.currentTimeMillis();
        long timeout = 30000; // 30-second timeout

        while (System.currentTimeMillis() - startTime < timeout) {
            try {
                // Note: inputs that are not the target barrier are discarded by this loop
                InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (input != null && input.getMessage() == null
                    && input.getWindowId() == windowId) {
                    // Received barrier of target window
                    long totalCount = input.getWindowCount();
                    processBarrier(windowId, totalCount);
                    return;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating
                Thread.currentThread().interrupt();
                throw new GeaflowRuntimeException(e);
            }
        }

        throw new GeaflowRuntimeException(
            String.format("Timeout waiting for barrier of window %d", windowId));
    }

2.3 Improve AbstractIterationComputeCommand
Solution 3: Dynamically Select Processing Mode
Modify the AbstractIterationComputeCommand.execute() method:

    @Override
    public void execute(ITaskContext taskContext) {
        final long start = System.currentTimeMillis();
        super.execute(taskContext);

        AbstractWorker abstractWorker = (AbstractWorker) worker;
        abstractWorker.init(windowId);

        fetcherRunner.add(new FetchRequest(((WorkerContext) this.context).getTaskId(),
            fetchWindowId, fetchCount));

        // Improvement: Select processing mode based on configuration and event type
        boolean useAligned = determineProcessingMode(abstractWorker);
        abstractWorker.process(fetchCount, useAligned);

        ((AbstractWorkerContext) this.context).getEventMetrics()
            .addProcessCostMs(System.currentTimeMillis() - start);
    }

    private boolean determineProcessingMode(AbstractWorker worker) {
        // If it's an aligned worker, always use aligned mode
        if (worker instanceof AbstractAlignedWorker) {
            return true;
        }

        // Check if it's LoadGraphProcessEvent
        if (this instanceof LoadGraphProcessEvent) {
            // Read from configuration whether to force alignment
            Configuration config = ((AbstractWorkerContext) context).getConfiguration();
            boolean forceAlignedForGraphLoad = config.getBoolean(
                "geaflow.graph.load.force.aligned", false);

            if (forceAlignedForGraphLoad) {
                LOGGER.info("taskId {} forcing aligned mode for graph loading",
                    context.getTaskId());
                return true;
            } else {
                LOGGER.info("taskId {} using unaligned mode for graph loading",
                    context.getTaskId());
                return false;
            }
        }

        // Other cases, use unaligned mode
        return false;
    }

2.4 Enhanced Window Queue Management
Solution 4: Improve finishBarrier Method
In AbstractUnAlignedWorker, improve finishBarrier() as follows:

    @Override
    protected void finishBarrier(long totalCount, long processedCount) {
        // Validate counts
        if (totalCount != processedCount) {
            LOGGER.error("taskId {} count mismatch, TotalCount:{} != ProcessCount:{}",
                context.getTaskId(), totalCount, processedCount);

            // Decide whether to throw exception or continue based on configuration
            boolean strictMode = context.getConfiguration()
                .getBoolean("geaflow.strict.count.check", true);

            if (strictMode) {
                throw new GeaflowRuntimeException(
                    String.format("Count mismatch: %d != %d", totalCount, processedCount));
            }
        }

        context.getEventMetrics().addShuffleReadRecords(totalCount);

        // Improvement: Add retry and timeout handling
        long currentWindowId = pollWindowIdWithRetry();

        // Validate window ID (relax restrictions to support streaming scenarios)
        validateWindowId(currentWindowId);

        // Complete window processing
        finish(currentWindowId);

        // Initialize next window
        super.init(currentWindowId + 1);
    }

    private long pollWindowIdWithRetry() {
        int maxRetries = 3;
        int retryCount = 0;
        long timeout = DEFAULT_TIMEOUT_MS;

        while (retryCount < maxRetries) {
            try {
                Long windowId = processingWindowIdQueue.poll(timeout, TimeUnit.MILLISECONDS);
                if (windowId != null) {
                    LOGGER.debug("taskId {} polled windowId {} from queue",
                        context.getTaskId(), windowId);
                    return windowId;
                }

                retryCount++;
                timeout *= 2; // Exponential backoff
                LOGGER.warn("taskId {} failed to poll windowId, retry {}/{}",
                    context.getTaskId(), retryCount, maxRetries);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new GeaflowRuntimeException("Interrupted while polling window ID", e);
            }
        }

        // All retries failed
        throw new GeaflowRuntimeException(
            String.format("Failed to poll window ID after %d retries", maxRetries));
    }

    private void validateWindowId(long currentWindowId) {
        long contextWindowId = context.getCurrentWindowId();

        // Relax validation range to support dynamic/streaming scenarios
        // Allow windows in [contextWindowId - maxWindowSkew, contextWindowId + maxWindowSkew] range
        int maxWindowSkew = context.getConfiguration()
            .getInteger("geaflow.window.skew.tolerance", 2);

        long lowerBound = contextWindowId - maxWindowSkew;
        long upperBound = contextWindowId + maxWindowSkew;

        if (currentWindowId < lowerBound || currentWindowId > upperBound) {
            String errorMessage = String.format(
                "Window ID %d out of valid range [%d, %d], context window: %d",
                currentWindowId, lowerBound, upperBound, contextWindowId);
            LOGGER.error("taskId {} {}", context.getTaskId(), errorMessage);
            throw new GeaflowRuntimeException(errorMessage);
        }

        if (currentWindowId != contextWindowId) {
            LOGGER.warn("taskId {} window ID mismatch: queue={}, context={}",
                context.getTaskId(), currentWindowId, contextWindowId);
        }
    }

III. Configuration Parameter Design
3.1 New Configuration Items
Add to FrameworkConfigKeys:

    // Whether to allow unaligned mode for graph loading
    public static final ConfigKey GRAPH_LOAD_UNALIGNED_ENABLE = ConfigKeys
        .key("geaflow.graph.load.unaligned.enable")
        .defaultValue(false)
        .description("enable unaligned processing for graph loading, default is false");

    // Window ID tolerance range
    public static final ConfigKey WINDOW_SKEW_TOLERANCE = ConfigKeys
        .key("geaflow.window.skew.tolerance")
        .defaultValue(2)
        .description("tolerance for window ID skew in dynamic scenarios");

    // Maximum pending windows
    public static final ConfigKey MAX_PENDING_WINDOWS = ConfigKeys
        .key("geaflow.max.pending.windows")
        .defaultValue(10)
        .description("maximum number of pending windows in unaligned worker");

    // Allow out-of-order windows
    public static final ConfigKey ALLOW_OUT_OF_ORDER_WINDOWS = ConfigKeys
        .key("geaflow.allow.out.of.order.windows")
        .defaultValue(true)
        .description("allow out-of-order window processing in stream scenarios");

    // Strict count checking
    public static final ConfigKey STRICT_COUNT_CHECK = ConfigKeys
        .key("geaflow.strict.count.check")
        .defaultValue(true)
        .description("enable strict count checking for barriers");

IV. Test Plan
4.1 Unit Tests

    @Test
    public void testOutOfOrderWindowInitialization() {
        UnAlignedComputeWorker worker = new UnAlignedComputeWorker();
        worker.setAllowOutOfOrderWindows(true);

        // Test out-of-order window initialization
        worker.init(5);
        worker.init(3);
        worker.init(4);

        // Verify all windows are correctly initialized
        assertEquals(3, worker.getInitializedWindows().size());
    }

    @Test
    public void testDuplicateWindowInitialization() {
        UnAlignedComputeWorker worker = new UnAlignedComputeWorker();

        // Test duplicate initialization
        worker.init(1);
        worker.init(1); // Should be ignored

        assertEquals(1, worker.getProcessingWindowIdQueue().size());
    }

    @Test
    public void testWindowQueueCapacity() {
        UnAlignedComputeWorker worker = new UnAlignedComputeWorker();
        worker.setMaxPendingWindows(3);

        // Test queue capacity limit
        worker.init(1);
        worker.init(2);
        worker.init(3);

        assertThrows(GeaflowRuntimeException.class, () -> {
            worker.init(4); // Should throw exception
        });
    }

4.2 Integration Tests

    @Test
    public void testDynamicGraphStreamProcessing() {
        // Create dynamic graph streaming scenario
        Configuration config = new Configuration();
        config.put(FrameworkConfigKeys.ASP_ENABLE, true);
        config.put(FrameworkConfigKeys.ALLOW_OUT_OF_ORDER_WINDOWS, true);
        config.put(FrameworkConfigKeys.GRAPH_LOAD_UNALIGNED_ENABLE, true);

        // Build incremental graph view
        PIncGraphView<Integer, Integer, Integer> incGraphView =
            buildIncrementalGraphView(config);

        // Execute traversal for multiple windows
        for (int i = 0; i < 10; i++) {
            incGraphView.incrementalTraversal(new TestTraversal())
                .start(getRequests())
                .sink(new TestSink());
        }

        // Verify result correctness
        verifyResults();
    }

V. Performance Optimization Recommendations
5.1 Memory Management
- Automatic Window State Cleanup: Periodically clean up completed window states to prevent memory leaks
- Queue Capacity Limitation: Set reasonable queue size limits to prevent OOM
- Use Weak References: Consider using WeakHashMap for historical window states
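As one possible shape for the automatic cleanup item, the sketch below keeps window states in a sorted concurrent map so that everything older than a threshold can be dropped in a single range operation. It uses the JDK's ConcurrentSkipListMap.headMap; WindowStateCleanupDemo, the String state values, and the retainedWindows parameter are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical holder for per-window state, ordered by window ID.
class WindowStateCleanupDemo {
    private final ConcurrentNavigableMap<Long, String> windowStates = new ConcurrentSkipListMap<>();
    private final int retainedWindows;

    WindowStateCleanupDemo(int retainedWindows) {
        this.retainedWindows = retainedWindows;
    }

    void record(long windowId, String state) {
        windowStates.put(windowId, state);
    }

    // Removes every entry with windowId < currentWindowId - retainedWindows
    // and returns how many entries the stale view held before clearing.
    int cleanup(long currentWindowId) {
        long threshold = currentWindowId - retainedWindows;
        // headMap is a live view with an exclusive upper bound
        ConcurrentNavigableMap<Long, String> stale = windowStates.headMap(threshold);
        int removed = stale.size();
        stale.clear();
        return removed;
    }

    int size() {
        return windowStates.size();
    }

    public static void main(String[] args) {
        WindowStateCleanupDemo demo = new WindowStateCleanupDemo(3);
        for (long id = 1; id <= 10; id++) {
            demo.record(id, "FINISHED");
        }
        System.out.println(demo.cleanup(10L)); // prints 6, windows 1..6 are below threshold 7
        System.out.println(demo.size());       // prints 4, windows 7..10 remain
    }
}
```

Compared with scanning the whole key set on every finish, a sorted map only touches the stale prefix, at the cost of logarithmic rather than constant-time lookups.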
5.2 Concurrency Optimization
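- Lock-Free Queues: Consider using ConcurrentLinkedQueue instead of LinkedBlockingDeque; since ConcurrentLinkedQueue has no blocking or timed poll, finishBarrier() would then need another way to wait for a window ID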
- Segmented Locks: Use segmented locks for window state map to reduce contention
- Batch Processing: Batch process window completion events to reduce RPC calls
5.3 Monitoring Metrics
Add the following monitoring metrics:

    // Queue length
    metrics.gauge("unaligned.worker.queue.size",
        () -> processingWindowIdQueue.size());

    // Pending windows count
    metrics.gauge("unaligned.worker.pending.windows",
        () -> windowStateMap.size());

    // Window processing latency
    metrics.histogram("unaligned.worker.window.latency");

    // Out-of-order window count
    metrics.counter("unaligned.worker.out.of.order.windows");

VI. Migration Path
6.1 Phase 1: Basic Improvements (1-2 weeks)
- Implement enhanced window state management
- Improve init() and finish() methods
- Add basic configuration parameters
6.2 Phase 2: Performance Optimization (2-3 weeks)
- Implement unaligned processing for LoadGraphProcessEvent
- Optimize queue management and barrier handling
- Complete monitoring and logging
6.3 Phase 3: Comprehensive Testing (2 weeks)
- Unit test coverage
- Integration test verification
- Performance stress testing and tuning
6.4 Phase 4: Gradual Rollout (2-3 weeks)
- Small traffic validation
- Gradually expand scope
- Monitoring and issue resolution