温馨提示×

Linux下Rust并发编程实战

小樊
41
2025-11-14 17:26:21
栏目: 编程语言

Linux 下 Rust 并发编程实战指南

一 环境准备与项目初始化

  • Linux 上安装 Rust:执行命令:curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh,随后执行 source $HOME/.cargo/env 重载环境。
  • 创建项目:cargo new concurrent_demo && cd concurrent_demo
  • 运行与调试:cargo run;需要调试时使用 gdblldb,也可借助 IDE 内置调试功能。

二 核心并发模型与最小示例

  • 线程与通道

    • 使用 std::thread 创建线程,借助 std::sync::mpsc 进行线程间消息传递(MPSC)。
    • 示例要点:将发送端 tx 通过 move 转移到子线程;主线程通过 recv() 接收数据。
  • 共享内存与同步原语

    • 使用 Arc(原子引用计数)共享所有权,配合 Mutex/RwLock 保护共享可变状态;读多写少场景优先考虑 RwLock
    • 示例要点:对共享数据加锁后操作,完成后及时释放锁;多线程 join() 等待回收。
  • 原子操作

    • 对简单计数器等场景,使用 std::sync::atomic 提供的原子类型(如 AtomicUsize)与合适的 Ordering(如 SeqCst)可避免锁开销。
  • 异步并发 I/O

    • 使用 async/awaittokio 运行时编写高并发网络服务;典型模式是 TcpListener 接收连接,并为每个连接 tokio::spawn 一个任务处理读写循环。
  • 并行数据并行

    • 使用 rayon 的并行迭代器对计算密集型数据进行分治并行,简化线程池与任务调度。
  • 第三方通道与 Actor 生态

    • 需要更灵活的线程通信时,可使用 crossbeam::channel(支持多生产者多消费者等);若偏好 Actor 模型,可选 actix

三 实战示例 多线程并行计算与原子计数

  • 目标:启动 10 个线程并发累加,分别用 MutexAtomicUsize 两种实现,最后对比结果与性能感受。
  • 代码:
// Cargo.toml // [dependencies] use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; fn main() { const N: usize = 10; const INCR: usize = 100_000; // 1) 使用 Mutex + Arc let mtx = Arc::new(Mutex::new(0usize)); let mut handles = Vec::with_capacity(N); for _ in 0..N { let m = Arc::clone(&mtx); handles.push(thread::spawn(move || { for _ in 0..INCR { *m.lock().unwrap() += 1; } })); } for h in handles { h.join().unwrap(); } println!("Mutex 结果: {}", *mtx.lock().unwrap()); // 2) 使用 AtomicUsize let atomic = Arc::new(AtomicUsize::new(0)); let mut handles = Vec::with_capacity(N); for _ in 0..N { let a = Arc::clone(&atomic); handles.push(thread::spawn(move || { for _ in 0..INCR { a.fetch_add(1, Ordering::SeqCst); } })); } for h in handles { h.join().unwrap(); } println!("Atomic 结果: {}", atomic.load(Ordering::SeqCst)); } 
  • 说明:两种实现均应输出 1_000_000;计算密集型且竞争较低时,原子操作通常开销更小,Mutex 更通用但存在锁竞争与阻塞。

四 实战示例 异步高并发回显服务器

  • 目标:基于 tokio 实现一个 TCP 回显服务器,对每条连接并发处理,读到 0 字节 表示对端关闭。
  • 代码:
// Cargo.toml // [dependencies] // tokio = { version = "1", features = ["full"] } use tokio::net::TcpListener; use tokio::io; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; println!("Listening on 127.0.0.1:8080"); loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = vec![0u8; 1024]; loop { let n = match socket.read(&mut buf).await { Ok(0) => return, // 对端关闭 Ok(n) => n, Err(e) => { eprintln!("read error: {}", e); return; } }; if let Err(e) = socket.write_all(&buf[..n]).await { eprintln!("write error: {}", e); return; } } }); } } 
  • 说明:每个连接由独立任务并发处理,适合 I/O 密集型 高并发场景;运行时负责 异步任务调度I/O 多路复用

五 性能与工程实践建议

  • 选择模型
    • CPU 密集:优先考虑 rayonstd::thread + 工作窃取;共享计数用 AtomicUsize,复杂共享状态用 Arc<Mutex/RwLock>
    • I/O 密集:使用 async/await + tokio(或 async-std),以事件驱动减少线程阻塞与上下文切换。
  • 减少锁争用
    • 细化锁粒度、缩短临界区;优先使用 原子操作 或无锁数据结构;读多写少场景用 RwLock
  • 通道与背压
    • 线程/任务间通信优先 消息传递;当生产者远快于消费者时,引入 有限容量通道批量处理 形成背压,避免内存暴涨。
  • 资源与错误处理
    • 为所有线程/任务 join()/await;为 I/O 操作设置超时与重试;统一错误类型与日志。
  • 可观测性
    • 在关键路径打点与追踪(如连接数、队列长度、处理时延),配合 perf/flamegraphtokio-console 定位瓶颈。
  • 生态扩展
    • 需要更强的并发通道与无锁结构时引入 crossbeam;需要 Actor 模式时考虑 actix

0