Skip to content

Conversation

@cloud-neutral
Copy link

@cloud-neutral cloud-neutral commented Jul 24, 2025

This PR is for:

  • Agent

Feature:

  • 支持多线程并发发送日志,提升高吞吐量场景稳定性
  • 引入了基于 crossbeam-channel 的 全新 MPMC(多生产者多消费者)队列实现,用于替代或补充现有的 ring-buffer 实现。

背景问题

在专属采集器或高速数据面(如 2 Mpps、L4 Flow 限速 4 万条/s)下,现有链路中仅包含单个发送队列和单个 UniformSenderThread,很容易出现写入过快、队列被写满、日志覆盖丢弃的问题,最终影响日志完整性与可观测性。


本次改动实现

✅ 主要改动

在 agent/crates/public/src/queue/ 下新增:mpmc_queue.rs:实现基于 crossbeam_channel 的 Sender、Receiver 和 StatsHandle,支持 MPMC 模型;

修改 queue/mod.rs: 引入并导出 mpmc_queue,增加 bounded_mpmc() 构造函数

✅ 新增功能:
  • 引入可配置的 日志发送并发度参数,包括:

    • l4_flow_senders
    • l7_flow_senders
    • metric_senders
    • pcap_senders
  • 每类日志类型按配置值创建多个:

    • 有界 DebugSender 队列(queue::bounded_with_debug
    • 对应的 UniformSenderThread 实例
  • 所有 sender 命名后缀追加编号(如 3-flowlog-to-collector-sender-0 ~ -3

✅ 日志发送改为并发分发:
  • 负载均衡策略:目前支持:

    • Round-Robin(默认)
    • 哈希分发(基于五元组)可扩展支持
  • 发送线程存入 Vec<UniformSenderThread<_>>,在 Trident::start() 中统一启动,优雅退出时统一停止

✅ 配置示例(YAML):
log: l4_flow_senders: 4 l7_flow_senders: 2 metric_senders: 2 ## ChatGPT/CodeX 设计思路 1. 配置并发度 为每种日志类型新增一个配置项,如 l4_flow_senders、metrics_senders 等,用于指定要创建的 sender 数量。 2. 创建多组队列及线程 根据上述配置,循环调用 queue::bounded_with_debug 和 UniformSenderThread::new 创建若干队列及对应的 UniformSenderThread。可以在队列名称后追加索引区分,例如 "3-flowlog-to-collector-sender-1"、"3-flowlog-to-collector-sender-2" 等。所有生成的线程存入 Vec<UniformSenderThread<_>> 统一管理。 3. 在聚合/生成阶段分发数据 原先的发送链路(如 FlowAggrThread)只向一个 DebugSender 写数据。扩展后,可在 CollectorThread 或 FlowAggrThread 中实现简单的负载均衡策略: - 轮询:对每个输出条目按顺序选择不同的 DebugSender。 - 哈希:根据流的五元组或其他关键字段计算哈希,选择固定的 sender,从而保持同一流的数据顺序。 分发后即可并行写入多个发送队列。 4. 启动与管理多个线程 在 Trident::start() 中遍历 Vec<UniformSenderThread<_>>,逐一调用 start() 启动。停止时同样遍历 notify_stop 或 stop()。 统计信息和异常处理可沿用现有逻辑,只需将每个线程的计数器注册到 stats_collector。 5. 多消费者/MPMC支持 多消费者(MPMC, Multi-Producer Multi-Consumer)支持是对当前队列模型(MPSC: 多生产者单消费者)的架构级增强,其核心目标是提升消费者处理吞吐能力,降低单消费者瓶颈对系统稳定性的影响
@CLAassistant
Copy link

CLAassistant commented Jul 24, 2025

CLA assistant check
All committers have signed the CLA.

shenlan added 24 commits July 24, 2025 13:28
…or-high-throughput Add MPMC queue implementation 引入了基于 crossbeam-channel 的 全新 MPMC(多生产者多消费者)队列实现,用于替代或补充现有的 ring-buffer 实现。 🔧 主要改动 在 agent/crates/public/src/queue/ 下新增: mpmc_queue.rs:实现基于 crossbeam_channel 的 Sender、Receiver 和 StatsHandle,支持 MPMC 模型; 修改 queue/mod.rs: 引入并导出 mpmc_queue,增加 bounded_mpmc() 构造函数
…ub/workflows Replace custom Cirun runners with default GitHub runners
…ersion Update arm64 runner for verify agent
…-deepflow-agent Fix clone derivation for BoxedTaggedFlow
…ation-for-performance Add dynamic queue with expansion
…ent-of-thread-and-queue-settings Add auto tuning for sender configuration
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants