|
|
6 F6 Z$ ^& s, m( T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>$ }2 a3 K) e. u2 Y: @
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>' p# q x3 j8 B! P) F
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
6 ?& w* B. _. n9 p" V<p>线程池Pool</p>
$ H# x: X& }2 \& w0 s% p<pre><code>pub struct Pool {
' a0 j% u0 X( ^5 ^ max_workers: usize, // 定义最大线程数
2 V. x Z( W* r}
" h) z( b$ b, _# y3 m |! z; r+ [5 z5 H& C' U9 o
impl Pool {4 |1 \' s- T; I: a3 d4 S8 V
fn new(max_workers: usize) -> Pool {}7 v; w' j! j1 G, ~; m! ~9 k
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
/ u$ M: y" u4 t I- ]: ]% g" m}
$ u( B! i$ h2 K- [/ `& k9 T& O" z7 o% s0 [
</code></pre>
9 I2 {+ o6 L6 q8 A<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 y4 [9 M2 u' W<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
% ]4 H- }6 N5 k2 ~- e可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
! n- }% w5 S% Z* \, @/ h; F8 E<pre><code>struct Worker where
$ L, d) Z# V# p* e3 a, c9 }{
, Q: F3 u# z# t. ] _id: usize, // worker 编号
$ E7 l. I8 L+ {}
) ~, O3 W) M0 P0 b4 j& Z3 u' [</code></pre>- k2 _ I! i6 g( v
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>9 q) }9 r2 i+ w! g
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
N. T x$ U) ^6 Y$ t, A<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>! e9 G& E3 z8 W+ f# r; U
<p>Pool的完整定义</p>
0 h) h+ s9 b4 ~! _& [8 y8 Y<pre><code>pub struct Pool {
$ Z, R8 b k. |* v workers: Vec<Worker>,
1 g0 I7 K2 c* l+ k, b max_workers: usize,
+ R7 z3 w# J' Q, J sender: mpsc::Sender<Message>
- ? q' D: R% y/ ]}* p- h: j( [( z" s' N+ I% X" K. d) V
</code></pre>: k6 q! t0 x, |/ ~2 G8 K8 a* H5 \
<p>该是时候定义我们要发给Worker的消息Message了<br>
: z& y( ~( G- r; R% |3 J定义如下的枚举值</p>
; K7 \0 ^- J$ D2 z4 H+ I0 j<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;% V! |# \% w; I
enum Message {" A- L& C# D1 H+ j( n8 v1 r
ByeBye,; g9 P2 i- f* o
NewJob(Job),8 q+ l& q2 ?* {( i( E2 G
}
* E4 \( m4 b Q4 S4 L) P</code></pre>- O: O$ b* k% C8 p: P% X
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>+ Z! F. N# K: `6 T1 H3 S
<p>只剩下实现Worker和Pool的具体逻辑了。</p>. ^# X" \6 D. `/ _
<p>Worker的实现</p>% T1 w3 e, t6 C
<pre><code>impl Worker
6 B$ Y2 _5 P: ~3 C{
# i6 G* {' p7 N/ n& {+ D fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {8 q, X$ X' T7 s( Y% l$ c
let t = thread::spawn( move || {1 |0 \; M, V1 L: a, [3 [% l* m
loop {) j) p& X2 d! [
let receiver = receiver.lock().unwrap();1 s5 r5 P' k" z( }* T
let message= receiver.recv().unwrap();
9 L$ Y: |$ Y5 w) U8 l7 z- S6 j match message {0 P+ S3 ]" C' T& \0 v$ u
Message::NewJob(job) => {, e* K- I v2 ~9 ~
println!("do job from worker[{}]", id);" E, k0 x7 B0 o C2 G3 Z# X
job();) V. V, d* \- J6 d. J5 z. ?
},
7 _; h8 r+ J. U$ F/ N8 X Message::ByeBye => {
4 q" Z, \" b& T println!("ByeBye from worker[{}]", id);
& {' u. N H& l3 b5 I! p% [ break
0 n( ?( H- h0 O8 _; d! N },5 Q7 Z. W6 E5 W4 |
} + j0 C8 o% O3 I7 C
}# C7 F" E- `; t: k: l) r& y
});
/ @/ t, v% a! ^$ V
% F- D" P3 i. i1 B/ E Worker {
6 ^# [8 d& c% ? _id: id,
4 G3 m, @1 [: U2 E' U7 U t: Some(t),
, |5 K( k T3 v$ l- u! y. }; Z' | }
7 R. Z; l: n1 ^ @ }; t2 T$ s1 C1 f8 ~$ Q
}
" Y: H+ p- u2 ?5 N</code></pre>
5 u$ \! [# d( e, ~/ Z<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>( f' H: y. B+ B5 n! l/ y
但如果写成</p>
5 Z! X: M+ e- ?3 u H<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
* O7 g( |# t. h( x1 h9 e/ o};
. k4 e; ?0 w9 v A; X! O8 A</code></pre>
$ R& Z+ k$ R j+ L4 `<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>7 D8 g' ^( a9 D% q5 b
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
5 d+ h3 y0 j# g0 E N# F# Z<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
5 D! q( T8 A, b5 Q<pre><code>impl Drop for Pool {* `3 {% N1 K _7 r, l$ x2 \9 V
fn drop(&mut self) {
! ~2 _0 N$ B2 p8 J for _ in 0..self.max_workers {, |8 b% e# X$ ?, d" H. y
self.sender.send(Message::ByeBye).unwrap();8 `3 i4 x' I7 B1 V/ s
}7 M! o8 D/ w& n3 ^: [
for w in self.workers.iter_mut() {
* v, A0 x. ]( Y if let Some(t) = w.t.take() { F2 a8 D: n4 Q8 l0 B
t.join().unwrap();4 \6 e9 h5 V# |0 k8 G
}" H. |0 G4 d: ^+ z2 t5 c Y2 N A
}. {9 `7 \/ {7 f4 C
}2 @" ?9 s9 L; @' G1 z5 B
}
5 s, k2 ^# c3 R- H
d! I8 u- b& e& s- [</code></pre>: Z$ Z( L9 r. j1 x) ^: H w
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
7 f: i) V5 s9 F: Q, `1 h<pre><code>for w in self.workers.iter_mut() {' m; f6 ?, n( j6 O5 T; @# [$ {
if let Some(t) = w.t.take() {) \) P9 f+ p: L, y
self.sender.send(Message::ByeBye).unwrap();; K, {9 B9 d; e/ c
t.join().unwrap();
- k: S( o. ^ @; i- u* R }% K5 j/ C, V- Y) H: m2 Y; I2 c/ Y
}3 \" d5 f# D3 ~: [
# F; ?8 f; v/ `+ q</code></pre>
4 l5 z4 x$ t' \7 x! H; {, a<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>0 h9 h; { v+ w+ U+ h; x
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 Y0 H5 |7 y# h; g- Y: H
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>" d3 x9 c& c: Z
<ol>
* o+ q7 [% u) s6 {$ K<li>t.join 需要持有t的所有权</li>2 P. ]: U: N; p5 f9 k' O5 N
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
, l4 E, P4 x' g: [2 b</ol>' e( w3 p, _- N% \7 h) I
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, g3 {$ A4 b, o# R/ I [8 v换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>" \0 g m8 N) L- |
<pre><code>struct Worker where
/ e0 X/ w* |; p{
6 y4 x) _5 ^% i$ m, E0 N5 }1 } _id: usize, i) l, ?6 ?/ m3 k9 k/ F
t: Option<JoinHandle<()>>,) `$ D/ F) I* `& F. Q
}
5 z6 P E' M7 G8 |5 M( h! u</code></pre>0 }/ b' S4 d9 {8 Y9 Y
<h1 id="要点总结">要点总结</h1>
- A" ~3 A! P' T<ul>
) D* x; F& @( }3 V- k% [2 w<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li> M0 X% U& q/ f! y, p8 T
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
; F: }) B/ w; d9 p, M7 N</ul>
7 e4 o) _: V4 `% D: Y) V8 j0 v<h1 id="完整代码">完整代码</h1>
5 Z( B& o0 B$ f7 T. I<pre><code>use std::thread::{self, JoinHandle};6 A! d8 ^! ?" f% v3 V. b
use std::sync::{Arc, mpsc, Mutex};, P* j: F8 C' t$ y
# F' x: k( j& @
* C# J) J, B( G& l- Qtype Job = Box<dyn FnOnce() + 'static + Send>;
; ~- J" d0 J% F4 {; W. P1 K8 s+ tenum Message {, x0 h4 l# v2 G
ByeBye,' E& E, O- U" i9 }" r h/ t$ C
NewJob(Job),
. s% Z$ Z& t$ Z. z; _. q}
8 A3 V: X* e& W+ l. K2 K
) C! O9 O+ P+ W/ Zstruct Worker where
# \2 t8 ]. F* K% K- q! L3 P) w{
: F f$ C K/ {. u _id: usize,% }- ?9 Z; k7 c
t: Option<JoinHandle<()>>,
; _# E) [. O' {* |4 t% m}7 C. ~: K7 m: O. g4 J4 N
. H0 T1 p) B% d1 r
impl Worker, c% n o5 o( T
{
$ ]" Q4 _: P/ f0 R fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
4 p# ?% Y. _- l$ F8 N+ [ let t = thread::spawn( move || {: W* h8 ~2 h/ U
loop {
/ n o' i9 n/ @% i0 \' ] let message = receiver.lock().unwrap().recv().unwrap();3 Y1 \0 p- [7 @! r9 D/ F6 P. \
match message {
- H5 y3 T( S/ S5 C. B Message::NewJob(job) => {$ L4 u; h: K! n% M/ X$ ^7 I
println!("do job from worker[{}]", id);
( p5 e9 b& b5 D/ ?; O p/ { job();! A) g; | H1 I; U
},
0 j1 \9 _# ?8 Z) J( ] Message::ByeBye => {
+ r6 H# V1 g t/ E& I, Y println!("ByeBye from worker[{}]", id);
6 O3 h% n) M3 D/ ?, c8 t* V+ x4 V break; Q% x7 m1 Z4 }) q0 x+ i: s
},
+ e+ g. f# F" p4 \5 Z5 |$ ~ } 4 q, {# w, [3 d! d1 _
}
/ A2 L& @7 ? p9 D2 s4 t });( l3 c# h8 h6 l! D: o
( [6 |" X+ z6 M: x; N Worker {
$ w5 E/ A% I* M, x5 C5 f _id: id,
) O: z$ B* h# V; y t: Some(t),5 e3 ]. w5 ]6 i# J6 ~7 e
}
* f- ?4 g5 k- ~5 ]4 y4 C. D7 \; k: L }$ r' S2 D0 n$ S+ f) O5 j. ]# u
}$ r6 @8 J3 `8 [8 y9 t
+ k8 q- g; ?/ @# rpub struct Pool {* ^9 M. N: D) I
workers: Vec<Worker>,
2 z0 u3 t9 l6 Y* `5 T( d max_workers: usize,
* T! J1 X4 c+ E" \1 w& F sender: mpsc::Sender<Message>; T2 W) ~4 C: N4 G }
}
0 a' _8 U+ j: L) D+ D3 A0 N) ~# t7 i, L6 A! D' d" E
impl Pool where {
. J7 Z- M! C( n. W; J pub fn new(max_workers: usize) -> Pool {* G/ E# Z7 Y2 V5 [
if max_workers == 0 {
+ S& w0 ]4 |! k% y( I panic!("max_workers must be greater than zero!")% {1 L3 I& l( V B' O
}
0 P' Z1 \& R/ x% e2 U% R let (tx, rx) = mpsc::channel();$ F7 z& L/ N) f5 t/ P- |: s V
. w* I, ~. k, U8 {* V4 i
let mut workers = Vec::with_capacity(max_workers);
0 b. q6 P' s3 d let receiver = Arc::new(Mutex::new(rx));
8 ~: l2 b" D* E" O7 b3 y" Q, M for i in 0..max_workers {
j! C9 Y6 A+ w! Z, L workers.push(Worker::new(i, Arc::clone(&receiver)));% D4 F$ u) \( y5 c9 ?4 U1 H8 p4 K- u, n
}
& E! {% H! l5 V4 m v( F- d8 ?5 Y4 [ }2 J Q$ M* f
Pool { workers: workers, max_workers: max_workers, sender: tx }
7 [! U) s7 u7 f2 W O, x }& x( X8 p; t. }$ N- |6 c, c
+ G+ {8 H! O7 q4 [
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
2 {3 c( I1 N& I {" n: j5 I1 _" \
! m/ ]: \8 T" W s' b9 a. S
let job = Message::NewJob(Box::new(f));
, ~ z( Q4 z2 ^: q- M$ O1 \8 V x4 c self.sender.send(job).unwrap();
8 R/ A1 a1 T: V" D! T! C }
4 A8 ~* u- O; w+ G/ V: a& |- c" _}. f a; P+ P; g! ]( ]9 B Q
& f" o: H& j* ^1 U4 Timpl Drop for Pool {
1 _. C. ^# O" y) d& g- _7 t$ c fn drop(&mut self) {
( F! y" e. l9 b for _ in 0..self.max_workers {1 j; u( V. a9 g3 e6 g( o' |
self.sender.send(Message::ByeBye).unwrap();# O" G5 c& S/ \7 A( N% C M6 h, J& Z3 l
}
6 Y4 `1 ?) ~* N0 L9 x, T for w in self.workers {: P. B0 @& m4 [9 ^6 ]. {
if let Some(t) = w.t.take() {6 C4 W) T/ P" a8 X! E( r& N1 `- Q
t.join().unwrap();* o* D$ v1 P2 O: d+ L# _% y
}
5 x) N- X1 w4 h }! c# ^& a$ a* m! A7 Y% R
}! C) T l' p4 p4 a! ?
}& h; z9 Q. i3 `/ y# f
6 }: Z# g% A1 `: j( O6 A
; |8 h! ?& k1 i#[cfg(test)]
# I7 n' A, @ Omod tests {, H, A( R8 b8 n5 R8 T
use super::*;
/ V& b- C% a+ ^1 q I2 _% d #[test]
8 [, E! }( V9 Q6 u4 l9 l fn it_works() {
9 \" H: Z& l+ m: p: ` let p = Pool::new(4);
& v" d) F, B/ B: m* E* j& w% k" e) \; n p.execute(|| println!("do new job1"));. k9 X7 H) j4 M! Y& X0 w# b1 y
p.execute(|| println!("do new job2"));3 @% D/ T8 P" ?6 A$ [
p.execute(|| println!("do new job3"));1 S' M9 a- d5 i0 N5 M- K
p.execute(|| println!("do new job4"));
1 e/ K8 E8 g% j1 V: ~+ u, H) A' y }
0 {" h3 }6 x7 u}$ I$ Q) |# ~8 q* `9 r/ o
</code></pre>/ _7 p/ t' Q+ c: z
* V, _ h: y" X6 d% \
|
|