|
|
3 e4 A4 n4 j; W7 s<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 ?" U1 m5 z0 Q6 k# o" G( v9 q/ g# ?
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>2 v. y" E6 @* _8 E
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>. A1 Q' S( L1 Z
<p>线程池Pool</p>; S: }7 U" d; i% \. M, f
<pre><code>pub struct Pool {/ P; [$ D, H1 T( w S5 p+ d
max_workers: usize, // 定义最大线程数4 _6 Q# D, T$ K% X( {& W
}, ~& Y, f6 u1 z
# C4 a. W5 l: b$ W$ F9 ]impl Pool {
7 q0 r) I" Y" P8 y. p* m- R fn new(max_workers: usize) -> Pool {}0 h( K# d) j. S v3 v3 W8 k2 ~; r
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}/ O* Q" v2 X1 ^/ y5 H0 @! x
}
+ s l V# ~- a. i! w/ ^4 x- x/ H, m4 X6 w1 W- X( e, k
</code></pre>
% ~+ X9 X' X5 W<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>/ k, W- y0 b$ l
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
9 h9 Z. `/ o* x% Y. U可以看作在一个线程里不断执行获取任务并执行的Worker。</p>, D% ^6 {! E! ~* h) t- D
<pre><code>struct Worker where
' N: W! D4 E/ w{) {1 D& o1 e1 X+ f" Q
_id: usize, // worker 编号' W. e% g& \1 O8 c% Y/ c
} f7 P+ {' @& ~- r4 a
</code></pre>
2 E) j4 ?/ Q# l: z<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
) R! J7 `2 t- V7 j& L把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
3 k7 U& n7 k$ Z5 ~* m: {<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
1 l3 g6 }$ h6 L* p, s<p>Pool的完整定义</p>0 U$ ^6 |2 y$ K2 h! Y) Z0 y
<pre><code>pub struct Pool {
# m. b! H1 ^% K5 U z3 c workers: Vec<Worker>,
8 P5 p d0 k4 P max_workers: usize,
+ M' }% o; I! i( M" L sender: mpsc::Sender<Message>8 `' h9 P3 g% w- s5 e& M! V3 z
}! I' a* P2 W6 ]" x
</code></pre>
" |# k! O+ K0 T( o<p>该是时候定义我们要发给Worker的消息Message了<br>
3 m/ O+ c9 }; J! N( ]4 {5 }定义如下的枚举值</p>
1 ~* a6 c& s x% M5 U$ ]$ s( v: j<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
1 X, L' b3 L9 w3 P, `. ~3 renum Message {
: ^& L( @ [3 K. j- |, x( } ByeBye,9 n, I& T# x+ ^( C; D
NewJob(Job),; t) V" \# h) D! v+ ~
}' J/ [2 b" u- @' x1 M6 P+ q
</code></pre>2 L& t, }* Y9 k9 v5 G! \
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
& n9 d! l; V* |6 g, N5 q<p>只剩下实现Worker和Pool的具体逻辑了。</p>
7 }5 T/ B% j+ d* Q1 I# F<p>Worker的实现</p>. ?, Z1 z1 c5 ~2 c/ S
<pre><code>impl Worker
1 y: u; w& \& U0 K{1 c: F4 c3 S- G: @
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {3 s: e/ G5 j+ d! _3 {' ~! ]: h7 H
let t = thread::spawn( move || {* i7 C) t) v' N. J( b5 }3 J
loop {
9 G+ G6 c2 ~ X* h/ ?* Z let receiver = receiver.lock().unwrap();6 z/ l( j* h( I4 z
let message= receiver.recv().unwrap();( Y# L! t& O {- e2 {1 ~' N
match message {! n9 p1 j4 q, g8 D
Message::NewJob(job) => {
5 X" W) V2 b/ b# t) i println!("do job from worker[{}]", id);$ h* L( \2 T: q' G
job();! B2 y* `" ^ ^* K0 s
},
- Y! t) k( d1 H; j1 B Message::ByeBye => {: P! b/ s1 Y9 Q* R3 M1 i& j
println!("ByeBye from worker[{}]", id);' v) S! o: f4 ~9 Z$ ]% J0 B
break. c6 j8 C# `, o* {
},8 o1 P' ]5 j) ~4 k: z
} 1 E$ U7 q- M& w4 h1 _- }
}" N. i8 Y# S6 y& H' a7 w) H& h: j
});
2 l0 H* @* Y& Z8 a! x! p$ M7 u, \. A5 D2 c/ _( P
Worker {
. J' ^# J0 O% {% V# H* c5 n* f* n _id: id,
3 c+ Q0 H7 ]$ I8 Y& y6 C6 ?0 P t: Some(t),
: G, }7 Y# S1 r0 q. U }
( `: G+ t6 {( g8 n3 g: u }2 @/ [# R5 m/ i6 E, t: ]) j+ W
}5 ]: O! {& A9 Q, X9 F. }1 |& Q
</code></pre>
; d9 O, O, v- _1 d; L6 |# C3 \! X9 y- y<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>0 ~; c7 L$ d* x
但如果写成</p>% o) t5 p6 w3 I1 K; C9 D2 f/ d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {8 w% j3 h5 M$ D _' f3 p, v) R
};
7 M' y) [' {% ^# W7 x& P0 J }</code></pre>0 l: x, ^0 A4 o, s, y
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- K! I1 e; l6 @9 K% h! ?1 ^rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
9 O% f; \0 U" c* q* H0 [7 O9 w2 _<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ `+ z) W. L: X
<pre><code>impl Drop for Pool {$ ~. {# i# k1 \: |5 v3 I
fn drop(&mut self) {
3 s( b( ~/ _: N for _ in 0..self.max_workers {
& z& H9 E, W8 E self.sender.send(Message::ByeBye).unwrap();6 O, K+ ]$ j6 c' ~7 T
}1 G2 d, P4 @& `/ h, p: a) r
for w in self.workers.iter_mut() {/ S4 n' W' ?& |5 g. m$ K i5 {
if let Some(t) = w.t.take() {/ y2 d! _! Y2 U# \1 s+ W2 A' {
t.join().unwrap();
) }- M4 t2 g i; E% ]# b9 g. f }- c: w- Z4 j: D: `6 y
}
9 ^/ s) g6 `! M7 c1 r }9 b. Q# t9 k: B9 U" w
}; y/ O' H) r4 ^0 } m9 p3 v1 s
0 M2 o! Q7 n# Q# `
</code></pre>- C( {$ _1 G; f7 w
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>9 u; T0 Q. E1 R! v+ @' i' o
<pre><code>for w in self.workers.iter_mut() {$ L$ h* Z" N' J( s
if let Some(t) = w.t.take() {
6 j% m& Z& q: |7 Y6 H3 z self.sender.send(Message::ByeBye).unwrap();
U0 W3 g6 V) k4 k8 g t.join().unwrap();8 w7 ?9 I0 W. D+ F9 p/ j' K' a
}8 `, P: ]2 N3 u I
}7 {" H( f- D4 O
2 U' u+ I, \5 }4 |' h: V
</code></pre>
. @1 x; X: A: q o$ H W<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>' @$ M0 v# c0 Z, S" _
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
' m0 `5 F q) R5 c3 [; E<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>8 l0 o; m O# p/ n4 Q% K. t
<ol>" g3 a% p; N, [4 }! S$ P
<li>t.join 需要持有t的所有权</li>
0 F$ w5 Q1 |! W' T$ t<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>0 u* e- q$ ~. m/ p- `5 S8 x0 K
</ol>7 s) A- J( n' O4 {5 k+ K8 R) l3 J
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>5 i& g9 X( r8 V/ p
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
+ |5 k1 H* y/ M. e<pre><code>struct Worker where
, C( `; T& W# |; ~{. [) l( l6 k+ I7 M3 a7 Y2 v9 L8 A
_id: usize,
# h& d5 d- a) \" U t: Option<JoinHandle<()>>,
: O$ V# j2 t& g4 g9 V}4 F/ G. A+ U o9 n
</code></pre>
! c; \: C; S' L<h1 id="要点总结">要点总结</h1>& A7 o2 i. c, F l
<ul>, O; S! S; d! P6 }. g
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
3 J& w! t6 n: {4 z6 H<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
8 O) I L9 b& ?( ]</ul>9 n( n" p* s& t+ c! j( S& {1 V
<h1 id="完整代码">完整代码</h1>
$ T& E4 C5 ?0 L& v. R/ M<pre><code>use std::thread::{self, JoinHandle};
3 ]4 F7 a( |2 q( X! X5 b: r @use std::sync::{Arc, mpsc, Mutex};
$ r7 r: ?4 G3 ]; @; ^. o ?: A( }$ K5 S' G4 D, e$ `3 @+ j
( h, O7 |6 G4 g G8 y8 U7 b; Rtype Job = Box<dyn FnOnce() + 'static + Send>;( f: c6 l! e6 D3 _3 y0 \
enum Message {
: W( a4 e7 Q a: M: H ByeBye,
" @# v7 x2 P+ t+ S( i NewJob(Job),
3 d2 O6 o$ k7 x1 C! X+ I}
) m. k( u- F7 p# @$ ~+ K5 @( t/ A5 r! _2 c% i" \$ Z
struct Worker where
, J% b: C" Q8 A) \{
: q* K- r5 E( \8 s& G' ]" f _id: usize,9 q+ f% |+ E% Y
t: Option<JoinHandle<()>>,
! T4 C0 J. L( x/ J _, T W+ I}0 E( h# E( n4 ^* \$ j( b* F
9 i$ n- b( q0 [& J
impl Worker
( G* N7 r3 R) \% t5 G) _{
5 O& R, F1 N! ~4 x3 s+ E% L3 f fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {, J- U8 X. C5 r1 h' \0 C
let t = thread::spawn( move || {: z( S1 V) `# H0 I: q( \
loop {/ {+ x9 K5 u) B9 c/ {
let message = receiver.lock().unwrap().recv().unwrap();
. D( N' ]8 X M' d& u match message {- U$ K) d, B& h; ^" s; T
Message::NewJob(job) => {
$ h8 `. C+ a3 A) J# t println!("do job from worker[{}]", id);# U" n* _/ W2 p7 n6 W
job();: o: U) A5 k# d# y$ y
},
/ Z- P/ Q, _3 |/ M5 Q Message::ByeBye => {$ k% v; k5 @+ h" P1 h4 c
println!("ByeBye from worker[{}]", id);
" S2 Q4 h- o9 Q1 [ break/ y! [/ N5 F: x* h" j, |
},
+ q3 ^$ F8 d! B7 i2 v6 |9 r6 y } : Y& p2 g2 m; T; i3 j1 L
}- c2 N u" t( v
});
& f @5 B0 [" a/ s; A
! r8 z. c" I# I- z% K) F. a Worker {, A, {) c2 ?* D: p+ G9 |: x
_id: id,
3 j- H: W4 P7 G7 t( w) F t: Some(t),
. A1 b6 V$ B' H }. W8 y1 L& \+ N6 n* @
}
, k% X" S! ^: }1 f( [: L8 \. H8 g( n% W* k}
; p5 W0 n2 S7 @% W/ A: W8 v! W# S" Z' p* W; _/ I; i6 ~* V- t
pub struct Pool {
3 G0 l; v# i& `9 ~/ f! [ workers: Vec<Worker>,* h+ S1 \. `' X% a7 J8 t
max_workers: usize,; U0 V8 F/ N1 Z. Z
sender: mpsc::Sender<Message>9 I; H K, V7 \4 u+ a0 J
}
% J% p5 k: l1 j! f$ L4 J
7 P6 S- J+ P4 U) g1 f! m- Limpl Pool where {
4 B# ]6 k8 ]" C7 V5 v. ]( J( \ pub fn new(max_workers: usize) -> Pool {0 [4 X+ D- c6 E# k/ J+ r$ K
if max_workers == 0 {
" n8 ~1 O% y! v; Q9 h panic!("max_workers must be greater than zero!")
5 D- y' [# D1 ^. X. c0 O2 f }' z3 S- s( }7 N' d( R
let (tx, rx) = mpsc::channel();
% \! B4 F* g) c; \
. L4 W5 P6 b; {8 p v let mut workers = Vec::with_capacity(max_workers);
$ S% ?/ Q7 ]8 t" d& ], [4 v let receiver = Arc::new(Mutex::new(rx));4 A# H# `: J* E: D U; k
for i in 0..max_workers {
7 S$ Q! z" L' l0 g( M6 i: \ workers.push(Worker::new(i, Arc::clone(&receiver)));
2 @5 Y: C: |/ Z' e8 r9 j: b }% X5 C5 F" U1 R
& N- J, I) t/ R# a9 Y) | Pool { workers: workers, max_workers: max_workers, sender: tx }
- o7 g' g' ^, ^- D$ J+ Z8 P# E { }
% x4 R: B3 }% {+ z" y( ` r- {; e2 } 7 J) | [: [& Y+ v# K, N
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
& D" k: P4 u2 {4 ? c7 w {; E# s& U! O( K. Q2 O
$ J6 y1 I k* v3 F; I( { let job = Message::NewJob(Box::new(f));
5 A7 E3 g. `, K* o self.sender.send(job).unwrap();: z5 ?" }7 d& D& I1 v
}
- p* a/ V" g G% \3 h}
; c: K: S+ s+ k: W9 j8 P2 ^- i& T
8 n; ~- v* z, E5 T# Aimpl Drop for Pool {
, Z/ n3 u, w& I" n% @5 r fn drop(&mut self) {
5 `! I: a* y& B, U for _ in 0..self.max_workers {2 s& a+ x0 m7 t. e
self.sender.send(Message::ByeBye).unwrap();0 q7 o: q7 Z6 b2 X9 \
}: Q+ t2 k) y' |& g
for w in self.workers {
2 L- F- q: v5 M: Y( Z if let Some(t) = w.t.take() {9 W: d) h8 _7 ~% c1 Q& }, N
t.join().unwrap();
0 O4 n3 b) g! O3 `7 T }1 q: N3 y2 K+ d& J! M, K" V' o! ]
}7 L8 \$ |) M5 C r
}* H; M/ |* @8 G5 ?6 _% }
}
, ?- h2 ^, H/ ]3 x# g( w
7 D P R/ H4 W( {; Z; b4 x& `- A9 }; Q
#[cfg(test)]: p6 M8 n( c, o' P7 D1 P8 u
mod tests {
, {- d( f* N1 T0 R8 m use super::*;! |6 R2 g3 m5 {. L- W0 ?4 u+ k" n3 z
#[test]& n3 v. v3 ?+ d3 `
fn it_works() {& L- s M" i5 {- F; U) y/ B
let p = Pool::new(4);
6 e9 ]( t, S$ S9 K9 `; k& ^$ l5 X p.execute(|| println!("do new job1"));$ r! j7 h; L# A1 f8 a
p.execute(|| println!("do new job2"));
8 l$ k/ d# c3 O% j# ? p.execute(|| println!("do new job3"));
: l+ O, ]& ]- q p.execute(|| println!("do new job4"));( A3 _* P0 b( Y/ m: H. v: d( N0 n
}, O& N/ d( d5 {$ t- N9 u8 W
}3 _9 t3 c8 @' K% R
</code></pre>
) W$ v. n0 T, P0 S, H6 `" Z! G+ ` ]* M
|
|