
[refactor] Dynamic/streaming scene processing needs improvement. #660

@kaori-seasons


Description:

UnAlignedComputeWorker needs better support for dynamic/streaming scenarios.

Proposed Solution:

I. Problem

1.1 Core Issues

Issue 1: Incomplete Window Initialization Logic

The current init() method has the following problems:

  • The condition processingWindowIdQueue.isEmpty() && windowId <= context.getWindowId() is overly restrictive
  • In dynamic graph scenarios, windows may arrive out of order, which this logic cannot handle correctly
  • There is no protection against duplicate initialization
  • It does not account for continuous window processing in streaming scenarios
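To make Issue 1 concrete, the following standalone sketch models only the condition quoted above. It is an assumption-based model, not GeaFlow code: a plain ArrayDeque stands in for processingWindowIdQueue, a long stands in for context.getWindowId(), and super.init is replaced by a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the current init() condition (hypothetical, not the real GeaFlow class).
class OriginalInitModel {
    final Deque<Long> processingWindowIdQueue = new ArrayDeque<>();
    final long contextWindowId;   // stands in for context.getWindowId()
    int parentInitCalls = 0;      // counts calls that would reach super.init(windowId)

    OriginalInitModel(long contextWindowId) {
        this.contextWindowId = contextWindowId;
    }

    void init(long windowId) {
        // Same condition as the current implementation.
        if (processingWindowIdQueue.isEmpty() && windowId <= contextWindowId) {
            parentInitCalls++;
        }
        // No duplicate check: every call enqueues the window id.
        processingWindowIdQueue.add(windowId);
    }
}
```

With a context window of 3, calling init(5), init(3), init(3) never reaches parent initialization: window 5 arrives ahead of the context, and the in-order window 3 queued behind it is skipped because the queue is no longer empty. The repeated init(3) is also queued twice.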

Issue 2: Forced Alignment Processing in LoadGraphProcessEvent

LoadGraphProcessEvent is always processed in aligned mode, which limits the performance advantage of unaligned workers in dynamic graph loading scenarios.

Issue 3: Window Queue Management Defects

In the finishBarrier() method:

  • The queue poll may time out and return null
  • The window ID validation range [currentWindowId - 1, currentWindowId] may be too narrow for streaming scenarios
  • There is no mechanism to recover the queue state after an exception
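The first bullet has a concrete failure mode in Java: BlockingQueue.poll(timeout, unit) returns null when the timeout elapses, so assigning the result directly to a primitive long throws a NullPointerException rather than a descriptive error. A minimal, self-contained illustration, where the queue is only a stand-in for processingWindowIdQueue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class PollTimeoutDemo {
    // Returns true if unboxing the timed-out poll result failed with a NullPointerException.
    static boolean unboxingFailsOnTimeout() {
        BlockingQueue<Long> queue = new LinkedBlockingDeque<>();
        Long polled;
        try {
            polled = queue.poll(10, TimeUnit.MILLISECONDS); // null: nothing arrived in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            long windowId = polled; // auto-unboxing a null Long
            return windowId < 0;    // not reached when polled is null
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```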

Issue 4: Finish Method Lacks State Synchronization

The finish() method calls the processor's finish() directly, without:

  • Checking that the queue state is consistent
  • Verifying that the window has actually finished processing
  • Handling potential race conditions
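One way to close the race in the last bullet, sketched under the assumption that window state lives in a ConcurrentHashMap (as the proposed windowStateMap does), is to make the INITIALIZED to FINISHED transition a single atomic ConcurrentMap.replace(key, expectedValue, newValue) call, so only one caller proceeds to the processor's finish. WindowFinishGuard and its methods are illustrative names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper: guards finish() with an atomic state transition.
class WindowFinishGuard {
    enum WindowState { INITIALIZED, FINISHED }

    private final ConcurrentMap<Long, WindowState> states = new ConcurrentHashMap<>();

    void markInitialized(long windowId) {
        states.putIfAbsent(windowId, WindowState.INITIALIZED);
    }

    // Atomically moves a window from INITIALIZED to FINISHED.
    // Returns true only for the single caller that performed the transition;
    // unknown or already-finished windows return false.
    boolean tryFinish(long windowId) {
        return states.replace(windowId, WindowState.INITIALIZED, WindowState.FINISHED);
    }
}
```

A caller would invoke the processor's finish only when tryFinish returns true, replacing the separate get-then-put on the state map.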

II. Detailed Improvement Plan

2.1 Improved UnAlignedComputeWorker Class

Solution 1: Enhanced Window Initialization Management

Improvement Goals:

  • Support out-of-order window arrival
  • Add window state tracking
  • Prevent duplicate initialization
  • Support streaming continuous window processing

Implementation Plan:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnAlignedComputeWorker<T, R> extends AbstractUnAlignedWorker<T, R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(UnAlignedComputeWorker.class);

    // Added: window lifecycle states
    private enum WindowState {
        PENDING,      // Waiting for initialization
        INITIALIZED,  // Initialized
        PROCESSING,   // Processing
        FINISHED      // Completed
    }

    // Added: window state tracking map
    private final Map<Long, WindowState> windowStateMap = new ConcurrentHashMap<>();

    // Added: set of initialized windows
    private final Set<Long> initializedWindows = ConcurrentHashMap.newKeySet();

    // Added: configuration parameters
    private int maxPendingWindows = 10;             // Maximum pending windows
    private boolean allowOutOfOrderWindows = true;  // Allow out-of-order windows

    @Override
    public void init(long windowId) {
        // Skip windows that are already initialized
        if (initializedWindows.contains(windowId)) {
            LOGGER.warn("taskId {} windowId {} already initialized, skipping",
                context.getTaskId(), windowId);
            return;
        }

        // Check queue capacity
        if (processingWindowIdQueue.size() >= maxPendingWindows) {
            LOGGER.warn("taskId {} processing queue is full, size: {}, windowId: {}",
                context.getTaskId(), processingWindowIdQueue.size(), windowId);
            // Alternative: block until space is available instead of failing
            throw new GeaflowRuntimeException("Processing window queue is full");
        }

        long currentWindowId = context.getWindowId();

        if (allowOutOfOrderWindows) {
            // Out-of-order mode: accept any uninitialized window that is not too old
            if (windowId < currentWindowId - maxPendingWindows) {
                LOGGER.error("taskId {} windowId {} is too old, current: {}",
                    context.getTaskId(), windowId, currentWindowId);
                throw new GeaflowRuntimeException("Window ID too old: " + windowId);
            }

            // Update window state
            windowStateMap.put(windowId, WindowState.INITIALIZED);
            initializedWindows.add(windowId);

            // Call parent init only when the queue is empty and the window is not ahead of the context
            if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                super.init(windowId);
            }

            // Add to processing queue
            processingWindowIdQueue.add(windowId);
            LOGGER.info("taskId {} init windowId {} (out-of-order mode), current: {}, queue size: {}",
                context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
        } else {
            // Strict ordering mode (original logic)
            if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                super.init(windowId);
                windowStateMap.put(windowId, WindowState.INITIALIZED);
                initializedWindows.add(windowId);
            }
            processingWindowIdQueue.add(windowId);
            LOGGER.info("taskId {} init windowId {} (ordered mode), current: {}, queue size: {}",
                context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
        }
    }

    @Override
    public void finish(long windowId) {
        LOGGER.info("taskId {} finishing windowId {}, currentBatchId {}, real currentBatchId {}, queue size: {}",
            context.getTaskId(), windowId, windowId, context.getCurrentWindowId(),
            processingWindowIdQueue.size());

        // Check window state
        WindowState state = windowStateMap.get(windowId);
        if (state == null) {
            LOGGER.warn("taskId {} windowId {} has no state record", context.getTaskId(), windowId);
        } else if (state == WindowState.FINISHED) {
            LOGGER.warn("taskId {} windowId {} already finished", context.getTaskId(), windowId);
            return;
        }

        // Update state to finished
        windowStateMap.put(windowId, WindowState.FINISHED);
        // Call processor finish
        context.getProcessor().finish(windowId);
        // Complete window processing
        finishWindow(windowId);
        // Clean up old window states (prevents unbounded growth)
        cleanupOldWindows(windowId);
    }

    // Added: drop state for windows far behind the current one
    private void cleanupOldWindows(long currentWindowId) {
        long threshold = currentWindowId - 2L * maxPendingWindows;
        windowStateMap.keySet().removeIf(wid -> wid < threshold);
        initializedWindows.removeIf(wid -> wid < threshold);
        LOGGER.debug("taskId {} cleaned up windows before {}, remaining states: {}",
            context.getTaskId(), threshold, windowStateMap.size());
    }

    @Override
    public WorkerType getWorkerType() {
        return WorkerType.unaligned_compute;
    }

    // Added: configuration methods
    public void setMaxPendingWindows(int maxPendingWindows) {
        this.maxPendingWindows = maxPendingWindows;
    }

    public void setAllowOutOfOrderWindows(boolean allowOutOfOrderWindows) {
        this.allowOutOfOrderWindows = allowOutOfOrderWindows;
    }
}
```
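Because the proposed init() depends on a live WorkerContext, its bookkeeping is hard to unit test in isolation. The sketch below extracts only that bookkeeping (duplicate rejection, capacity check, too-old check, threshold cleanup) into a plain class with no GeaFlow dependencies, following the same check order. WindowTracker and its methods are hypothetical names, and IllegalStateException stands in for GeaflowRuntimeException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, framework-free model of the window bookkeeping in the proposed init().
class WindowTracker {
    private final int maxPendingWindows;
    private final Set<Long> initializedWindows = ConcurrentHashMap.newKeySet();
    private final Deque<Long> pendingQueue = new ArrayDeque<>();

    WindowTracker(int maxPendingWindows) {
        this.maxPendingWindows = maxPendingWindows;
    }

    /** Returns false for a duplicate; throws for a full queue or a window that is too old. */
    boolean register(long windowId, long currentWindowId) {
        if (initializedWindows.contains(windowId)) {
            return false; // duplicate: ignored, like the warn-and-return branch
        }
        if (pendingQueue.size() >= maxPendingWindows) {
            throw new IllegalStateException("Processing window queue is full");
        }
        if (windowId < currentWindowId - maxPendingWindows) {
            throw new IllegalStateException("Window ID too old: " + windowId);
        }
        initializedWindows.add(windowId);
        pendingQueue.add(windowId);
        return true;
    }

    /** Mirrors cleanupOldWindows: forget ids below currentWindowId - 2 * maxPendingWindows. */
    void cleanup(long currentWindowId) {
        long threshold = currentWindowId - 2L * maxPendingWindows;
        initializedWindows.removeIf(wid -> wid < threshold);
    }

    int pendingSize() { return pendingQueue.size(); }

    int initializedCount() { return initializedWindows.size(); }
}
```

The scenarios in the unit tests of section 4.1 (out-of-order 5, 3, 4; a duplicate; a capacity of 3) can be exercised against a model like this without mocking a context.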

2.2 Improve AbstractUnAlignedWorker Base Class

Solution 2: Optimize LoadGraphProcessEvent Handling

Improvement Goals:

  • Support unaligned processing of LoadGraphProcessEvent
  • Maintain data consistency
  • Improve dynamic graph loading performance

Implementation Plan:

Add the following methods to AbstractUnAlignedWorker:

```java
/**
 * Process graph loading events in an unaligned manner.
 * Consistency is ensured by waiting for every window's barrier before returning.
 */
public void processLoadGraphUnaligned(long fetchCount) {
    LOGGER.info("taskId {} start unaligned graph loading, fetchCount: {}",
        context.getTaskId(), fetchCount);

    // Phase 1: consume vertex and edge data without blocking on barriers
    long processedCount = 0;
    // TreeSet so that windows are later finished in ascending id order
    Set<Long> receivedWindowIds = new TreeSet<>();
    // Barriers consumed in phase 1 (windowId -> totalCount); phase 2 must not wait for them again
    Map<Long, Long> arrivedBarriers = new HashMap<>();
    while (processedCount < fetchCount && running) {
        try {
            InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (input != null) {
                long windowId = input.getWindowId();
                receivedWindowIds.add(windowId);
                if (input.getMessage() != null) {
                    PipelineMessage message = input.getMessage();
                    processMessage(windowId, message);
                    processedCount++;
                } else {
                    // Barrier: remember it and keep processing data
                    long totalCount = input.getWindowCount();
                    arrivedBarriers.put(windowId, totalCount);
                    LOGGER.debug("taskId {} received barrier for windowId {}, totalCount: {}",
                        context.getTaskId(), windowId, totalCount);
                }
            }
        } catch (Throwable t) {
            if (running) {
                LOGGER.error("Error during unaligned graph loading", t);
                throw new GeaflowRuntimeException(t);
            }
        }
    }

    // Phase 2: make sure every window's barrier has arrived, then finish windows in order
    LOGGER.info("taskId {} graph data loaded, waiting for barriers, windows: {}",
        context.getTaskId(), receivedWindowIds);
    for (Long windowId : receivedWindowIds) {
        if (!arrivedBarriers.containsKey(windowId)) {
            waitForWindowBarrier(windowId, arrivedBarriers);
        }
        processBarrier(windowId, arrivedBarriers.get(windowId));
    }
    LOGGER.info("taskId {} completed unaligned graph loading", context.getTaskId());
}

private void waitForWindowBarrier(long windowId, Map<Long, Long> arrivedBarriers) {
    // Wait for the barrier of a specific window to arrive
    long startTime = System.currentTimeMillis();
    long timeout = 30000; // 30-second timeout
    while (System.currentTimeMillis() - startTime < timeout) {
        try {
            InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (input == null) {
                continue;
            }
            if (input.getMessage() != null) {
                // Late data must still be processed, not dropped
                processMessage(input.getWindowId(), input.getMessage());
            } else {
                // Record every barrier, including those of other windows
                arrivedBarriers.put(input.getWindowId(), input.getWindowCount());
                if (input.getWindowId() == windowId) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new GeaflowRuntimeException(e);
        }
    }
    throw new GeaflowRuntimeException(
        String.format("Timeout waiting for barrier of window %d", windowId));
}
```
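The essential invariant of the two-phase approach is that a barrier consumed while streaming data in phase 1 must be remembered; otherwise phase 2 waits for a message that will never arrive again. The self-contained simulation below models InputMessage as a small hypothetical class Msg (data == null marks a barrier, as in the snippet above), drains an in-memory queue instead of inputReader, and records finished windows in a list instead of calling processBarrier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical simulation of two-phase unaligned loading; not GeaFlow code.
class TwoPhaseLoadSimulation {
    // Stand-in for InputMessage: data == null marks a barrier.
    static final class Msg {
        final long windowId;
        final String data;
        final long windowCount;

        private Msg(long windowId, String data, long windowCount) {
            this.windowId = windowId;
            this.data = data;
            this.windowCount = windowCount;
        }

        static Msg ofData(long windowId, String data) { return new Msg(windowId, data, 0); }

        static Msg ofBarrier(long windowId, long count) { return new Msg(windowId, null, count); }
    }

    final List<String> processed = new ArrayList<>();
    final List<Long> finishedWindows = new ArrayList<>();

    void run(Queue<Msg> input, long fetchCount) {
        // windowId -> barrier count; a null value means "data seen, barrier not yet seen".
        Map<Long, Long> barriers = new TreeMap<>();
        long processedCount = 0;
        // Phase 1: process data without blocking on barriers.
        while (processedCount < fetchCount && !input.isEmpty()) {
            Msg m = input.poll();
            if (m.data != null) {
                barriers.putIfAbsent(m.windowId, null);
                processed.add(m.data);
                processedCount++;
            } else {
                barriers.put(m.windowId, m.windowCount); // remember it, do not drop it
            }
        }
        // Phase 2: keep draining until every known window has its barrier.
        while (barriers.containsValue(null)) {
            Msg m = input.poll();
            if (m == null) {
                throw new IllegalStateException("Missing barrier for windows " + barriers);
            }
            if (m.data != null) {
                processed.add(m.data); // late data is processed, not discarded
                barriers.putIfAbsent(m.windowId, null);
            } else {
                barriers.put(m.windowId, m.windowCount);
            }
        }
        // TreeMap iterates keys in ascending order, so windows finish in id order.
        finishedWindows.addAll(barriers.keySet());
    }
}
```

Iterating a HashSet of window ids gives no ordering guarantee, which is why the sketch keys its bookkeeping by a TreeMap.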

2.3 Improve AbstractIterationComputeCommand

Solution 3: Dynamically Select Processing Mode

Modify the AbstractIterationComputeCommand.execute() method:

```java
@Override
public void execute(ITaskContext taskContext) {
    final long start = System.currentTimeMillis();
    super.execute(taskContext);
    AbstractWorker abstractWorker = (AbstractWorker) worker;
    abstractWorker.init(windowId);
    fetcherRunner.add(new FetchRequest(((WorkerContext) this.context).getTaskId(),
        fetchWindowId, fetchCount));

    // Improvement: select the processing mode based on worker type, event type and configuration
    boolean useAligned = determineProcessingMode(abstractWorker);
    abstractWorker.process(fetchCount, useAligned);

    ((AbstractWorkerContext) this.context).getEventMetrics()
        .addProcessCostMs(System.currentTimeMillis() - start);
}

private boolean determineProcessingMode(AbstractWorker worker) {
    // Aligned workers always use aligned mode
    if (worker instanceof AbstractAlignedWorker) {
        return true;
    }
    // Graph loading stays aligned unless unaligned loading is explicitly enabled
    // via geaflow.graph.load.unaligned.enable (default false, see section III)
    if (this instanceof LoadGraphProcessEvent) {
        Configuration config = ((AbstractWorkerContext) this.context).getConfiguration();
        boolean unalignedGraphLoad = config.getBoolean(
            "geaflow.graph.load.unaligned.enable", false);
        if (unalignedGraphLoad) {
            LOGGER.info("taskId {} using unaligned mode for graph loading",
                ((WorkerContext) this.context).getTaskId());
            return false;
        }
        LOGGER.info("taskId {} using aligned mode for graph loading",
            ((WorkerContext) this.context).getTaskId());
        return true;
    }
    // All other events on unaligned workers use unaligned mode
    return false;
}
```
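Stripped of the GeaFlow types, the mode decision is a small pure function, which makes it easy to pin down with a truth-table test. The sketch below assumes the opt-in semantics of geaflow.graph.load.unaligned.enable from section III (unaligned graph loading only when explicitly enabled); the three boolean parameters are hypothetical stand-ins for the two instanceof checks and the configuration lookup.

```java
// Hypothetical pure-function version of determineProcessingMode.
class ProcessingModeDecision {
    /**
     * Returns true if the worker should run in aligned mode.
     *
     * @param alignedWorker        stands in for "worker instanceof AbstractAlignedWorker"
     * @param loadGraphEvent       stands in for "this instanceof LoadGraphProcessEvent"
     * @param unalignedLoadEnabled value of geaflow.graph.load.unaligned.enable (default false)
     */
    static boolean useAligned(boolean alignedWorker, boolean loadGraphEvent,
                              boolean unalignedLoadEnabled) {
        if (alignedWorker) {
            return true;                  // aligned workers never switch modes
        }
        if (loadGraphEvent) {
            return !unalignedLoadEnabled; // graph loading stays aligned unless opted in
        }
        return false;                     // other events on unaligned workers
    }
}
```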

2.4 Enhanced Window Queue Management

Solution 4: Improve finishBarrier Method

Change the following in AbstractUnAlignedWorker:

```java
@Override
protected void finishBarrier(long totalCount, long processedCount) {
    // Validate counts
    if (totalCount != processedCount) {
        LOGGER.error("taskId {} count mismatch, TotalCount:{} != ProcessCount:{}",
            context.getTaskId(), totalCount, processedCount);
        // Throw or continue depending on configuration
        boolean strictMode = context.getConfiguration()
            .getBoolean("geaflow.strict.count.check", true);
        if (strictMode) {
            throw new GeaflowRuntimeException(
                String.format("Count mismatch: %d != %d", totalCount, processedCount));
        }
    }
    context.getEventMetrics().addShuffleReadRecords(totalCount);

    // Improvement: poll with retry and timeout handling
    long currentWindowId = pollWindowIdWithRetry();

    // Validate window ID (relaxed range to support streaming scenarios)
    validateWindowId(currentWindowId);

    // Complete window processing
    finish(currentWindowId);

    // Initialize next window
    super.init(currentWindowId + 1);
}

private long pollWindowIdWithRetry() {
    int maxRetries = 3;
    int retryCount = 0;
    long timeout = DEFAULT_TIMEOUT_MS;
    while (retryCount < maxRetries) {
        try {
            Long windowId = processingWindowIdQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (windowId != null) {
                LOGGER.debug("taskId {} polled windowId {} from queue", context.getTaskId(), windowId);
                return windowId;
            }
            retryCount++;
            timeout *= 2; // Exponential backoff
            LOGGER.warn("taskId {} failed to poll windowId, retry {}/{}",
                context.getTaskId(), retryCount, maxRetries);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new GeaflowRuntimeException("Interrupted while polling window ID", e);
        }
    }
    // All retries failed
    throw new GeaflowRuntimeException(
        String.format("Failed to poll window ID after %d retries", maxRetries));
}

private void validateWindowId(long currentWindowId) {
    long contextWindowId = context.getCurrentWindowId();
    // Relax the validation range to support dynamic/streaming scenarios:
    // allow windows in [contextWindowId - maxWindowSkew, contextWindowId + maxWindowSkew]
    int maxWindowSkew = context.getConfiguration()
        .getInteger("geaflow.window.skew.tolerance", 2);
    long lowerBound = contextWindowId - maxWindowSkew;
    long upperBound = contextWindowId + maxWindowSkew;
    if (currentWindowId < lowerBound || currentWindowId > upperBound) {
        String errorMessage = String.format(
            "Window ID %d out of valid range [%d, %d], context window: %d",
            currentWindowId, lowerBound, upperBound, contextWindowId);
        LOGGER.error("taskId {} {}", context.getTaskId(), errorMessage);
        throw new GeaflowRuntimeException(errorMessage);
    }
    if (currentWindowId != contextWindowId) {
        LOGGER.warn("taskId {} window ID mismatch: queue={}, context={}",
            context.getTaskId(), currentWindowId, contextWindowId);
    }
}
```
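The retry and skew logic can be exercised without a worker. The sketch keeps the same shape (three attempts, doubling timeout, symmetric [context - skew, context + skew] range) but takes the queue, base timeout and skew as parameters. WindowQueueUtil is a hypothetical name, and IllegalStateException stands in for GeaflowRuntimeException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, worker-independent version of pollWindowIdWithRetry and validateWindowId.
class WindowQueueUtil {
    /** Polls with exponential backoff: waits base, 2*base, 4*base, ... ms over maxRetries attempts. */
    static long pollWithRetry(BlockingQueue<Long> queue, long baseTimeoutMs, int maxRetries) {
        long timeout = baseTimeoutMs;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                Long windowId = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (windowId != null) {
                    return windowId;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while polling window ID", e);
            }
            timeout *= 2;
        }
        throw new IllegalStateException("Failed to poll window ID after " + maxRetries + " retries");
    }

    /** True if queueWindowId lies in [contextWindowId - skew, contextWindowId + skew]. */
    static boolean withinSkew(long queueWindowId, long contextWindowId, int skew) {
        return queueWindowId >= contextWindowId - skew && queueWindowId <= contextWindowId + skew;
    }
}
```

With three attempts doubling from DEFAULT_TIMEOUT_MS, a caller waits up to seven times DEFAULT_TIMEOUT_MS (1 + 2 + 4) before giving up, which is worth keeping in mind when choosing the base timeout.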

III. Configuration Parameter Design

3.1 New Configuration Items

Add the following keys to FrameworkConfigKeys:

```java
// Whether to allow unaligned mode for graph loading
public static final ConfigKey GRAPH_LOAD_UNALIGNED_ENABLE = ConfigKeys
    .key("geaflow.graph.load.unaligned.enable")
    .defaultValue(false)
    .description("enable unaligned processing for graph loading, default is false");

// Window ID tolerance range
public static final ConfigKey WINDOW_SKEW_TOLERANCE = ConfigKeys
    .key("geaflow.window.skew.tolerance")
    .defaultValue(2)
    .description("tolerance for window ID skew in dynamic scenarios");

// Maximum pending windows
public static final ConfigKey MAX_PENDING_WINDOWS = ConfigKeys
    .key("geaflow.max.pending.windows")
    .defaultValue(10)
    .description("maximum number of pending windows in unaligned worker");

// Allow out-of-order windows
public static final ConfigKey ALLOW_OUT_OF_ORDER_WINDOWS = ConfigKeys
    .key("geaflow.allow.out.of.order.windows")
    .defaultValue(true)
    .description("allow out-of-order window processing in stream scenarios");

// Strict count checking
public static final ConfigKey STRICT_COUNT_CHECK = ConfigKeys
    .key("geaflow.strict.count.check")
    .defaultValue(true)
    .description("enable strict count checking for barriers");
```
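To review the five defaults together, here is a framework-independent sketch that reads the keys from a plain Map<String, String> and falls back to the defaults listed above. It is not GeaFlow's Configuration API: UnalignedWorkerSettings is a hypothetical holder, and the range check at the end is an added assumption rather than part of the proposal.

```java
import java.util.Map;

// Hypothetical typed view of the proposed keys, with the defaults from FrameworkConfigKeys above.
final class UnalignedWorkerSettings {
    final boolean graphLoadUnalignedEnable;
    final int windowSkewTolerance;
    final int maxPendingWindows;
    final boolean allowOutOfOrderWindows;
    final boolean strictCountCheck;

    private UnalignedWorkerSettings(Map<String, String> conf) {
        graphLoadUnalignedEnable = Boolean.parseBoolean(
            conf.getOrDefault("geaflow.graph.load.unaligned.enable", "false"));
        windowSkewTolerance = Integer.parseInt(
            conf.getOrDefault("geaflow.window.skew.tolerance", "2"));
        maxPendingWindows = Integer.parseInt(
            conf.getOrDefault("geaflow.max.pending.windows", "10"));
        allowOutOfOrderWindows = Boolean.parseBoolean(
            conf.getOrDefault("geaflow.allow.out.of.order.windows", "true"));
        strictCountCheck = Boolean.parseBoolean(
            conf.getOrDefault("geaflow.strict.count.check", "true"));
        // Assumed sanity check: a zero-capacity queue or negative skew would reject every window.
        if (maxPendingWindows <= 0 || windowSkewTolerance < 0) {
            throw new IllegalArgumentException("pending windows must be > 0 and skew >= 0");
        }
    }

    static UnalignedWorkerSettings from(Map<String, String> conf) {
        return new UnalignedWorkerSettings(conf);
    }
}
```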

IV. Test Plan

4.1 Unit Tests

```java
// These tests assume a WorkerContext is injected into the worker (for example via a mock)
// and that getInitializedWindows() / getProcessingWindowIdQueue() are added as test accessors.

@Test
public void testOutOfOrderWindowInitialization() {
    UnAlignedComputeWorker<Object, Object> worker = new UnAlignedComputeWorker<>();
    worker.setAllowOutOfOrderWindows(true);
    // Test out-of-order window initialization
    worker.init(5);
    worker.init(3);
    worker.init(4);
    // Verify all windows are correctly initialized
    assertEquals(3, worker.getInitializedWindows().size());
}

@Test
public void testDuplicateWindowInitialization() {
    UnAlignedComputeWorker<Object, Object> worker = new UnAlignedComputeWorker<>();
    // Test duplicate initialization
    worker.init(1);
    worker.init(1); // Should be ignored
    assertEquals(1, worker.getProcessingWindowIdQueue().size());
}

@Test
public void testWindowQueueCapacity() {
    UnAlignedComputeWorker<Object, Object> worker = new UnAlignedComputeWorker<>();
    worker.setMaxPendingWindows(3);
    // Test queue capacity limit
    worker.init(1);
    worker.init(2);
    worker.init(3);
    assertThrows(GeaflowRuntimeException.class, () -> {
        worker.init(4); // Should throw exception
    });
}
```

4.2 Integration Tests

```java
@Test
public void testDynamicGraphStreamProcessing() {
    // Create dynamic graph streaming scenario
    Configuration config = new Configuration();
    config.put(FrameworkConfigKeys.ASP_ENABLE, true);
    config.put(FrameworkConfigKeys.ALLOW_OUT_OF_ORDER_WINDOWS, true);
    config.put(FrameworkConfigKeys.GRAPH_LOAD_UNALIGNED_ENABLE, true);

    // Build incremental graph view
    PIncGraphView<Integer, Integer, Integer> incGraphView = buildIncrementalGraphView(config);

    // Execute traversal for multiple windows
    for (int i = 0; i < 10; i++) {
        incGraphView.incrementalTraversal(new TestTraversal())
            .start(getRequests())
            .sink(new TestSink());
    }

    // Verify result correctness
    verifyResults();
}
```

V. Performance Optimization Recommendations

5.1 Memory Management

  1. Automatic Window State Cleanup: Periodically clean up completed window states to prevent memory leaks
  2. Queue Capacity Limitation: Set reasonable queue size limits to prevent OOM
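  3. Use Weak References: Consider using WeakHashMap for historical window states, with care: WeakHashMap drops an entry when its key object becomes unreachable, not when the window finishes, so eviction of boxed Long window IDs is unpredictable, and IDs in the Long cache range (-128 to 127) are never evicted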
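As an alternative to weak references (a swapped-in technique, not part of the original proposal), keeping window state in a sorted concurrent map lets threshold cleanup drop all windows below a bound with one headMap(threshold).clear() call, whose cost is proportional to the entries removed rather than a scan of every key as with removeIf. A minimal sketch; SortedWindowStates is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sorted window-state store with range-based cleanup.
class SortedWindowStates<V> {
    private final ConcurrentNavigableMap<Long, V> states = new ConcurrentSkipListMap<>();

    void put(long windowId, V state) {
        states.put(windowId, state);
    }

    /** Removes every window id strictly below threshold (headMap excludes the bound). */
    void cleanupBefore(long threshold) {
        states.headMap(threshold).clear();
    }

    int size() {
        return states.size();
    }

    /** Smallest tracked window id, or null when empty. */
    Long oldest() {
        Map.Entry<Long, V> first = states.firstEntry();
        return first == null ? null : first.getKey();
    }
}
```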

5.2 Concurrency Optimization

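  1. Lock-Free Queues: Consider using ConcurrentLinkedQueue instead of LinkedBlockingDeque; note that ConcurrentLinkedQueue does not implement BlockingQueue, so the timed poll(timeout, unit) used when polling window IDs would need to be replaced
  2. Segmented Locks: windowStateMap is already a ConcurrentHashMap, which locks at a fine granularity; add segmented locking only if profiling shows contention on it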
  3. Batch Processing: Batch process window completion events to reduce RPC calls
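The batching idea in item 3 can be sketched with BlockingQueue.drainTo, which moves up to N queued completion events into a list in one call so they can be reported together. The reporter callback is a hypothetical placeholder for whatever RPC the worker would issue; CompletionBatcher is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical batcher: collects finished window ids and reports them in groups.
class CompletionBatcher {
    private final BlockingQueue<Long> completedWindows = new LinkedBlockingQueue<>();
    private final int maxBatchSize;
    private final Consumer<List<Long>> reporter; // placeholder for one RPC per batch

    CompletionBatcher(int maxBatchSize, Consumer<List<Long>> reporter) {
        this.maxBatchSize = maxBatchSize;
        this.reporter = reporter;
    }

    void onWindowFinished(long windowId) {
        completedWindows.add(windowId);
    }

    /** Drains up to maxBatchSize completions, reports them in one call, returns the batch size. */
    int flush() {
        List<Long> batch = new ArrayList<>(maxBatchSize);
        completedWindows.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            reporter.accept(batch);
        }
        return batch.size();
    }
}
```

A periodic task or the barrier path could call flush(); the batch size bounds how many windows a single report covers.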

5.3 Monitoring Metrics

Add the following monitoring metrics:

```java
// Queue length
metrics.gauge("unaligned.worker.queue.size", () -> processingWindowIdQueue.size());
// Pending windows count
metrics.gauge("unaligned.worker.pending.windows", () -> windowStateMap.size());
// Window processing latency
metrics.histogram("unaligned.worker.window.latency");
// Out-of-order window count
metrics.counter("unaligned.worker.out.of.order.windows");
```
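The out-of-order counter needs a definition of "out of order". One plausible choice, assumed here rather than taken from the proposal, is to count an arriving window as out of order when its id is lower than the highest id seen so far. A lock-free sketch using AtomicLong.getAndAccumulate and LongAdder; OutOfOrderCounter is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical detector feeding the unaligned.worker.out.of.order.windows counter.
class OutOfOrderCounter {
    private final AtomicLong maxSeen = new AtomicLong(Long.MIN_VALUE);
    private final LongAdder outOfOrder = new LongAdder();

    /** Records an arriving window id; returns true if it arrived after a higher id. */
    boolean record(long windowId) {
        long previousMax = maxSeen.getAndAccumulate(windowId, Math::max);
        boolean late = windowId < previousMax;
        if (late) {
            outOfOrder.increment();
        }
        return late;
    }

    long count() {
        return outOfOrder.sum();
    }
}
```

For arrivals 1, 2, 5, 3, 4, 6 this counts 2 (windows 3 and 4 arrive after 5).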

VI. Migration Path

6.1 Phase 1: Basic Improvements (1-2 weeks)

  • Implement enhanced window state management
  • Improve init() and finish() methods
  • Add basic configuration parameters

6.2 Phase 2: Performance Optimization (2-3 weeks)

  • Implement unaligned processing for LoadGraphProcessEvent
  • Optimize queue management and barrier handling
  • Complete monitoring and logging

6.3 Phase 3: Comprehensive Testing (2 weeks)

  • Unit test coverage
  • Integration test verification
  • Performance stress testing and tuning

6.4 Phase 4: Gradual Rollout (2-3 weeks)

  • Validate with a small share of traffic
  • Gradually expand the rollout
  • Monitoring and issue resolution
