pdf下载[1]线程池是一种并发编程的设计模式,它由一组预先创建的线程组成,用于执行多个任务。线程池的主要作用是在任务到达时,重用已创建的线程,避免频繁地创建和销毁线程,从而提高系统的性能和资源利用率。线程池通常用于需要处理大量短期任务或并发请求的应用程序。
线程池的优势包括:
减少线程创建和销毁的开销:线程的创建和销毁是一项昂贵的操作,线程池通过重用线程减少了这些开销,提高了系统的响应速度和效率。
控制并发度:线程池可以限制同时执行的线程数量,从而有效控制系统的并发度,避免资源耗尽和过度竞争。
任务调度和负载均衡:线程池使用任务队列和调度算法来管理和分配任务,确保任务按照合理的方式分配给可用的线程,实现负载均衡和最优的资源利用。
Rayon 是 Rust 中的一个并行计算库,它可以让你更容易地编写并行代码,以充分利用多核处理器。Rayon 提供了一种简单的 API,允许你将迭代操作并行化,从而加速处理大规模数据集的能力。除了这些核心功能外,它还提供构建线程池的能力。
rayon::ThreadPoolBuilder
是 Rayon 库中的一个结构体,用于自定义和配置 Rayon 线程池的行为。线程池是 Rayon 的核心部分,它管理并行任务的执行。通过使用 ThreadPoolBuilder
,你可以根据你的需求定制 Rayon 线程池的行为,以便更好地适应你的并行计算任务。在创建线程池之后,你可以使用 Rayon 提供的方法来并行执行任务,利用多核处理器的性能优势。
ThreadPoolBuilder 是以设计模式中的构建者模式设计的,以下是一些 ThreadPoolBuilder
的主要方法:
new()
方法:创建一个新的 ThreadPoolBuilder
实例。use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new();
}
num_threads()
方法:设置线程池的线程数量。你可以通过这个方法指定线程池中的线程数,以控制并行度。默认情况下,Rayon 会根据 CPU 内核数量自动设置线程数。use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new().num_threads(4); // 设置线程池有 4 个线程
}
thread_name()
方法:为线程池中的线程设置一个名称,这可以帮助你在调试时更容易识别线程。use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new().thread_name(|i| format!("worker-{}", i));
}
build()
方法:通过 build
方法来创建线程池。这个方法会将之前的配置应用于线程池并返回一个 rayon::ThreadPool
实例。use rayon::ThreadPoolBuilder;
fn main() {
let pool = ThreadPoolBuilder::new()
.num_threads(4)
.thread_name(|i| format!("worker-{}", i))
.build()
.unwrap(); // 使用 unwrap() 来处理潜在的错误
}
build_global
方法 通过build_global
方法创建一个全局的线程池。不推荐你主动调用这个方法初始化全局的线程池,使用默认的配置就好,记得全局的线程池只会初始化一次。rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
其他方法:ThreadPoolBuilder
还提供了其他一些方法,用于配置线程池的行为,如 stack_size()
用于设置线程栈的大小。
它还提供了一些回调函数的设置,start_handler()
用于设置线程启动时的回调函数等。spawn_handler
实现定制化的函数来产生线程。panic_handler
提供对 panic 处理的回调函数。exit_handler
提供线程退出时的回调。
下面这个例子演示了使用 rayon 线程池计算斐波那契数列:
fn fib(n: usize) -> usize {
if n == 0 || n == 1 {
return n;
}
let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // 运行在rayon线程池中
return a + b;
}
pub fn rayon_threadpool() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build()
.unwrap();
let n = pool.install(|| fib(20));
println!("{}", n);
}
rayon::ThreadPoolBuilder 用来创建一个线程池。设置使用 8 个线程
pool.install() 在线程池中运行 fib
rayon::join 用于并行执行两个函数并等待它们的结果。它使得你可以同时执行两个独立的任务,然后等待它们都完成,以便将它们的结果合并到一起。
通过在 join 中传入 fib 递归任务,实现并行计算 fib 数列
与直接 spawn thread 相比,使用 rayon 的线程池有以下优点:
线程可重用,避免频繁创建/销毁线程的开销
线程数可配置,一般根据 CPU 核心数设置
避免大量线程造成资源竞争问题
接下来在看一段使用build_scoped
的代码:
scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
pub fn rayon_threadpool2() {
let pool_data = vec![1, 2, 3];
// We haven't assigned any TLS data yet.
assert!(!POOL_DATA.is_set());
rayon::ThreadPoolBuilder::new()
.build_scoped(
// Borrow `pool_data` in TLS for each thread.
|thread| POOL_DATA.set(&pool_data, || thread.run()),
// Do some work that needs the TLS data.
|pool| pool.install(|| assert!(POOL_DATA.is_set())),
).unwrap();
// Once we've returned, `pool_data` is no longer borrowed.
drop(pool_data);
}
这段 Rust 代码使用了一些 Rust 库来演示线程池的使用以及如何在线程池中共享线程本地存储(TLS,Thread-Local Storage)。
scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
这一行代码使用了 scoped_tls
库的宏 scoped_thread_local!
来创建一个静态的线程本地存储变量 POOL_DATA
,其类型是 Vec<i32>
。这意味着每个线程都可以拥有自己的 POOL_DATA
值,而这些值在不同线程之间是相互独立的。
let pool_data = vec![1, 2, 3];
在 main
函数内,创建了一个 Vec<i32>
类型的变量 pool_data
,其中包含了整数 1、2 和 3。
assert!(!POOL_DATA.is_set());
这一行代码用来检查在线程本地存储中是否已经设置了 POOL_DATA
。在此初始阶段,我们还没有为它的任何线程分配值,因此应该返回 false
。
rayon::ThreadPoolBuilder::new()
这一行开始构建一个 Rayon 线程池。
.build_scoped
在线程池建立之后,这里使用 .build_scoped
方法来定义线程池的行为。这个方法需要两个闭包作为参数。
第一个闭包 |thread| POOL_DATA.set(&pool_data, || thread.run())
用于定义每个线程在启动时要执行的操作。它将 pool_data
的引用设置为 POOL_DATA
的线程本地存储值,并在一个新的线程中运行 thread.run()
,这个闭包的目的是为每个线程设置线程本地存储数据。
第二个闭包 |pool| pool.install(|| assert!(POOL_DATA.is_set()))
定义了线程池启动后要执行的操作。它使用 pool.install
方法来确保在线程池中的每个线程中都能够访问到线程本地存储的值,并且执行了一个断言来验证 POOL_DATA
在这个线程的线程本地存储中已经被设置。
drop(pool_data);
在线程池的作用域结束后,这一行代码用来释放 pool_data
变量。这是因为线程本地存储中的值是按线程管理的,所以在这个作用域结束后,我们需要手动释放 pool_data
,以确保它不再被任何线程访问。threadpool
是一个 Rust 库,用于创建和管理线程池,使并行化任务变得更加容易。线程池是一种管理线程的机制,它可以在应用程序中重用线程,以减少线程创建和销毁的开销,并允许您有效地管理并行任务。下面是关于 threadpool
库的一些基本介绍:
创建线程池: threadpool
允许您轻松创建线程池,可以指定线程池的大小(即同时运行的线程数量)。这可以确保您不会创建过多的线程,从而避免不必要的开销。
提交任务: 一旦创建了线程池,您可以将任务提交给线程池进行执行。这可以是任何实现了 FnOnce()
特质的闭包,通常用于表示您想要并行执行的工作单元。
任务调度: 线程池会自动将任务分发给可用线程,并在任务完成后回收线程,以便其他任务可以使用。这种任务调度可以减少线程创建和销毁的开销,并更好地利用系统资源。
等待任务完成: 您可以等待线程池中所有任务完成,以确保在继续执行后续代码之前,所有任务都已完成。这对于需要等待并行任务的结果的情况非常有用。
错误处理: threadpool
提供了一些错误处理机制,以便您可以检测和处理任务执行期间可能发生的错误。
下面是一个简单的示例,演示如何使用 threadpool
库创建一个线程池并提交任务:
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
fn main() {
// 创建一个线程池,其中包含 4 个线程
let pool = threadpool::ThreadPool::new(4);
// 创建一个通道,用于接收任务的结果
let (sender, receiver) = channel();
// 提交一些任务给线程池
for i in 0..8 {
let sender = sender.clone();
pool.execute(move || {
let result = i * 2;
sender.send(result).expect("发送失败");
});
}
// 等待所有任务完成,并接收它们的结果
for _ in 0..8 {
let result = receiver.recv().expect("接收失败");
println!("任务结果: {}", result);
}
}
上述示例创建了一个包含 4 个线程的线程池,并向线程池提交了 8 个任务,每个任务计算一个数字的两倍并将结果发送到通道。最后,它等待所有任务完成并打印结果。
接下来我们再看一个 threadpool + barrier 的例子。并发执行多个任务,并且使用 barrier 等待所有的任务完成。注意任务数一定不能大于 worker 的数量,否则会导致死锁:
// create at least as many workers as jobs or you will deadlock yourself
let n_workers = 42;
let n_jobs = 23;
let pool = threadpool::ThreadPool::new(n_workers);
let an_atomic = Arc::new(AtomicUsize::new(0));
assert!(n_jobs <= n_workers, "too many jobs, will deadlock");
// 创建一个barrier,等待所有的任务完成
let barrier = Arc::new(Barrier::new(n_jobs + 1));
for _ in 0..n_jobs {
let barrier = barrier.clone();
let an_atomic = an_atomic.clone();
pool.execute(move || {
// 执行一个很重的任务
an_atomic.fetch_add(1, Ordering::Relaxed);
// 等待其他线程完成
barrier.wait();
});
}
// 等待线程完成
barrier.wait();
assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);
这是基于 crossbeam 多生产者多消费者通道实现的自适应线程池。它具有以下特点:
总之,该线程池实现了自动扩缩容、空闲回收、异步任务支持等功能。
其自适应控制和异步任务的支持使其可以很好地应对突发大流量,而平时也可以节省资源。
从实现来看,作者运用了 crossbeam 通道等 Rust 并发编程地道的方式,代码质量很高。
所以这是一个非常先进实用的线程池实现,值得深入学习借鉴。可以成为我们编写弹性伸缩的 Rust 并发程序的很好选择
pub fn rusty_pool_example() {
let pool = rusty_pool::ThreadPool::default();
for _ in 1..10 {
pool.execute(|| {
println!("Hello from a rusty_pool!");
});
}
pool.join();
}
这个例子展示了如何使用另一个线程池 rusty_pool 来实现并发。
主要步骤包括:
与之前的 threadpool 类似,rusty_pool 也提供了一个方便的线程池抽象,使用起来更简单些。
下面这段代码是提交一个任务给线程池运行后,等到结果返回的例子:
let handle = pool.evaluate(|| {
thread::sleep(Duration::from_secs(5));
return 4;
});
let result = handle.await_complete();
assert_eq!(result, 4);
下面这个例子展示了如何在 rusty_pool 线程池中执行异步任务。
主要包含两个处理方式:
a1、创建默认的 rusty_pool 线程池
a2、使用 pool.complete 来同步执行一个 async 块
b1、使用 pool.spawn 来异步执行 async 块
b2、在主线程中调用 join,等待异步任务完成
b3、检验异步任务的结果
通过 complete 和 spawn 的结合,可以灵活地在线程池中同步或异步地执行 Future 任务。
rusty_pool 通过内置的 async 运行时,很好地支持了 Future based 的异步编程。
我们可以利用这种方式来实现复杂的异步业务,而不需要自己管理线程和 Future。
pub fn rusty_pool_example2() {
let pool = rusty_pool::ThreadPool::default();
let handle = pool.complete(async {
let a = some_async_fn(4, 6).await; // 10
let b = some_async_fn(a, 3).await; // 13
let c = other_async_fn(b, a).await; // 3
some_async_fn(c, 5).await // 8
});
assert_eq!(handle.await_complete(), 8);
let count = Arc::new(AtomicI32::new(0));
let clone = count.clone();
pool.spawn(async move {
let a = some_async_fn(3, 6).await; // 9
let b = other_async_fn(a, 4).await; // 5
let c = some_async_fn(b, 7).await; // 12
clone.fetch_add(c, Ordering::SeqCst);
});
pool.join();
assert_eq!(count.load(Ordering::SeqCst), 12);
}
接下来是等待超时以及关闭线程池的例子:
pub fn rusty_pool_example3() {
let pool = ThreadPool::default();
for _ in 0..10 {
pool.execute(|| thread::sleep(Duration::from_secs(10)))
}
// 等待所有线程变得空闲,即所有任务都完成,包括此线程调用join()后由其他线程添加的任务,或者等待超时
pool.join_timeout(Duration::from_secs(5));
let count = Arc::new(AtomicI32::new(0));
for _ in 0..15 {
let clone = count.clone();
pool.execute(move || {
thread::sleep(Duration::from_secs(5));
clone.fetch_add(1, Ordering::SeqCst);
});
}
// 关闭并删除此“ ThreadPool”的唯一实例(无克隆),导致通道被中断,从而导致所有worker在完成当前工作后退出
pool.shutdown_join();
assert_eq!(count.load(Ordering::SeqCst), 15);
}
这个线程池实现经过优化以获取最小化延迟。特别是,保证你在执行你的任务之前不会支付线程生成的成本。新线程仅在工作线程的 "闲置时间"(例如,在返回作业结果后)期间生成。
唯一可能导致延迟的情况是 "可用" 工作线程不足。为了最小化这种情况的发生概率,这个线程池会不断保持一定数量的可用工作线程(可配置)。
这个实现允许你以异步方式等待任务的执行结果,因此你可以将其用作替代异步运行时的 spawn_blocking
函数。
pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected>{
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
assert_eq!(4, threadpool.execute(|_| { 2 + 2 })?);
Ok(())
}
这个例子展示了 fast_threadpool crate 的用法。
主要步骤包括:
下面这个例子异步执行任务的例子,这里我们使用了 tokio 的异步运行时:
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_async_handler();
assert_eq!(4, threadpool.execute(|_| { 2 + 2 }).await.unwrap());
});
在 Rust 多线程编程中,scoped 是一个特定的概念,指的是一种限定作用域的线程。
scoped 线程的主要特征是:
一个典型的 scoped 线程池用法如下:
pool.scoped(|scope| {
scope.execute(|| {
// 可以直接访问外部状态
});
}); // 作用域结束时,线程被Join
scoped 线程的优点是:
scoped 线程适用于:
总之,scoped 线程在 Rust 中提供了一种更安全便捷的多线程模式,值得我们在多线程编程中考虑使用。
这一节我们就介绍一个专门的 scoped_threadpool 库。
pub fn scoped_threadpool() {
let mut pool = scoped_threadpool::Pool::new(4);
let mut vec = vec![0, 1, 2, 3, 4, 5, 6, 7];
// Use the threads as scoped threads that can reference anything outside this closure
pool.scoped(|s| {
// Create references to each element in the vector ...
for e in &mut vec {
// ... and add 1 to it in a seperate thread
s.execute(move || {
*e += 1;
});
}
});
assert_eq!(vec, vec![1, 2, 3, 4, 5, 6, 7, 8]);
}
这个例子展示了如何使用 scoped_threadpool 库创建一个 scoped 线程池。
scoped 线程池的主要特点:
相比全局线程池,scoped 线程池的优势在于:
接下来可以扩展介绍:
总之,scoped 线程池提供了一种更安全方便的并发模式,很适合在 Rust 中使用。
scheduled-thread-pool 是一个 Rust 库,它提供了一个支持任务调度的线程池实现。下面我来介绍其主要功能和用法:
pub fn scheduled_thread_pool() {
let (sender, receiver) = channel();
let pool = scheduled_thread_pool::ScheduledThreadPool::new(4);
let handle = pool.execute_after(Duration::from_millis(1000), move ||{
println!("Hello from a scheduled thread!");
sender.send("done").unwrap();
});
let _ = handle;
receiver.recv().unwrap();
}
这个例子展示了如何使用 scheduled_thread_pool crate 创建一个可调度的线程池。
scheduled 线程池的主要功能:
相比普通线程池,scheduled 线程池的优势在于:
poolite 是一个非常轻量级的 Rust 线程池库,主要有以下特性:
提供了基础的创建池子、添加任务等接口:
let pool = poolite::Pool::new()?;
pool.push(|| println!("hello"));
scoped 可以自动等待任务完成:
pool.scoped(|scope| {
scope.push(|| println!("hello"));
});
可以通过 Builder 自定义线程数:
let pool = poolite::Pool::builder().thread_num(8).build()?;
对于我们常见的共享资源的访问,poollite 也提供了很好的支持。下面的例子是计算斐波那契数列的并发版本:
use poolite::Pool;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
/// `cargo run --example arc_mutex`
fn main() {
let pool = Pool::new().unwrap();
// You also can use RwLock instead of Mutex if you read more than write.
let map = Arc::new(Mutex::new(BTreeMap::<i32, i32>::new()));
for i in 0..10 {
let map = map.clone();
pool.push(move || test(i, map));
}
pool.join(); //wait for the pool
for (k, v) in map.lock().unwrap().iter() {
println!("key: {}\tvalue: {}", k, v);
}
}
fn test(msg: i32, map: Arc<Mutex<BTreeMap<i32, i32>>>) {
let res = fib(msg);
let mut maplock = map.lock().unwrap();
maplock.insert(msg, res);
}
fn fib(msg: i32) -> i32 {
match msg {
0...2 => 1,
x => fib(x - 1) + fib(x - 2),
}
}
fn main() {
let pool = Pool::new().unwrap();
let (mp, sc) = channel();
for i in 0..38 {
let mp = mp.clone();
pool.push(move || test(i, mp));
}
pool.join(); // wait for the pool
println!("{:?}", pool);
while let Ok((k, v)) = sc.try_recv() {
println!("key: {}\tvalue: {}", k, v);
}
}
fn main() {
let pool = Builder::new()
.min(1)
.max(9)
.daemon(None) // Close
.timeout(None) //Close
.name("Worker")
.stack_size(1024*1024*2) //2Mib
.build()
.unwrap();
for i in 0..38 {
pool.push(move || test(i));
}
pool.join(); //wait for the pool
println!("{:?}", pool);
}
poolite 整个库只有约 500 多行代码,非常精简。
poolite 提供了一个简单实用的线程池实现,适合对性能要求不高,但需要稳定和易用的场景,如脚本语言的运行时等。
如果需要一个小而精的 Rust 线程池,poolite 是一个很不错的选择。
executor_service 是一个提供线程池抽象的 Rust 库,模仿 Java 的 ExecutorService,主要特征如下:
executor_service 是一个提供线程池抽象的 Rust 库,主要特征如下:
可以按需创建不同类型的线程池:
// 固定线程数线程池
let pool = Executors::new_fixed_thread_pool(4)?;
// 缓存线程池
let pool = Executors::new_cached_thread_pool()?;
固定线程数的线程池顾名思义,也就是创建固定数量的线程,线程数量不会变化。
缓存线程池会按需创建线程,创建的新线程会被缓存起来。默认初始化 10 个线程,最多 150 个线程。最大线程值是个常量,看起来不能修改,但是初始化的线程数可以在初始化的时候设置,但也不能超过 150。
支持闭包、Future 等任务形式:
// 执行闭包
pool.execute(|| println!("hello"));
// 提交future
pool.spawn(async {
// ...
});
submit_sync 可以同步提交任务并获取返回值:
let result = pool.submit_sync(|| {
// run task
return result;
})?;
可以自定义线程池参数:
ThreadPoolExecutor::builder()
.core_threads(4)
.max_threads(8)
.build()?;
这个例子展示了如何使用 executor_service 这个线程池库:
pub fn executor_service_example() {
use executor_service::Executors;
let mut executor_service =
Executors::new_fixed_thread_pool(10).expect("Failed to create the thread pool");
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..10 {
let counter = counter.clone();
executor_service.execute(move || {
thread::sleep(Duration::from_millis(100));
counter.fetch_add(1, Ordering::SeqCst);
});
}
thread::sleep(Duration::from_millis(1000));
assert_eq!(counter.load(Ordering::SeqCst), 10);
let mut executor_service = Executors::new_fixed_thread_pool(2).expect("Failed to create the thread pool");
let some_param = "Mr White";
let res = executor_service.submit_sync(move || {
sleep(Duration::from_secs(5));
println!("Hello {:}", some_param);
println!("Long computation finished");
2
}).expect("Failed to submit function");
println!("Result: {:#?}", res);
assert_eq!(res, 2);
}
示例中做了以下几件事:
threadpool_executor 是一个功能丰富的 Rust 线程池库,提供了高度可配置的线程池实现。主要特性如下:
通过构建器可以自定义线程池所有方面的参数:
ThreadPool::builder()
.core_threads(4)
.max_threads(8)
.keep_alive(Duration::from_secs(30))
.build();
闭包、async 块、回调函数等:
// 闭包
pool.execute(|| println!("hello"));
// 异步任务
pool.execute(async {
// ...
});
所有任务执行后返回 Result<T, E>:
let result = pool.execute(|| {
Ok(1 + 2)
})?;
let res = result.unwrap().get_result_timeout(std::time::Duration::from_secs(3));
assert!(res.is_err());
if let Err(err) = res {
matches!(err.kind(), threadpool_executor::error::ErrorKind::TimeOut);
}
可以随时取消已提交的任务:
let mut task = pool.execute(|| {}).unwrap();
task.cancel();
按需创建线程,自动回收空闲线程。
threadpool_executor 提供了完整可控的线程池实现,适合对线程管理要求较高的场景。它的配置能力非常强大,值得深入研究和使用。
这个例子展示了如何使用 threadpool_executor 这个线程池库:
pub fn threadpool_executor_example() {
let pool = threadpool_executor::ThreadPool::new(1);
let mut expectation = pool.execute(|| "hello, thread pool!").unwrap();
assert_eq!(expectation.get_result().unwrap(), "hello, thread pool!");
let pool = threadpool_executor::threadpool::Builder::new()
.core_pool_size(1)
.maximum_pool_size(3)
.keep_alive_time(std::time::Duration::from_secs(300))
.exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Wait)
.build();
pool.execute(|| {
std::thread::sleep(std::time::Duration::from_secs(3));
})
.unwrap();
let mut exp = pool.execute(|| {}).unwrap();
exp.cancel();
}
示例中做了以下几件事:
创建一个单线程线程池,提交一个任务并获取结果
使用 Builder 创建一个可配置的线程池
提交一个长时间任务到线程池
提交一个任务后立即取消它
threadpool_executor 的一些关键特性:
后续可以扩展介绍:
threadpool_executor 提供了功能完备的线程池实现,适合需要细粒度控制的场景。
pdf下载: https://github.com/smallnest/concurrency-programming-via-rust/blob/master/book_cn/rust_concurrency_cookbook.pdf