|
|
6 ]3 V) Y `3 ^% ^2 z- y4 z. h7 E
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
3 {& A [ \. C# J* s<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p># W$ y3 q" h' ~/ {. _- `
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>5 N9 i& M" n, _3 B5 \
<p>线程池Pool</p>
8 o7 @; a+ ~6 |5 `$ L! E3 y. [<pre><code>pub struct Pool {3 U' D, X7 E3 v5 m4 }
max_workers: usize, // 定义最大线程数
& Y+ [& s9 m+ a. [4 s, O: _}
+ |- _9 ]9 e# P+ Y \( r8 E0 o" `7 `( i" k
impl Pool {
; {! }3 s; }0 j9 e fn new(max_workers: usize) -> Pool {}
: v' C! V: W c; B fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}- Y& c" i' j5 ^ p. G6 `4 @/ c
}
; M( m. w2 U0 \# X# v0 @- z! L
0 O# L5 i9 \& S# ^' F) p8 ^</code></pre>
5 u$ M! m. [5 b3 f/ ~$ y<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>2 ~* O' V* |8 W3 }8 n+ l
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ r, O S7 q" `; M% ?6 Q7 }' z# T可以看作在一个线程里不断执行获取任务并执行的Worker。</p>: d. ~ f" C1 f
<pre><code>struct Worker where
# Z/ Z7 Y- \; _8 q: t{
. B6 }* M6 ~" N& k, `/ h _id: usize, // worker 编号
0 S _$ A8 ^( x8 }* d6 I}; t2 G+ H$ a/ Q6 k9 |
</code></pre>4 u3 O3 D0 [( W- s* e
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>/ \4 M2 N5 O; t( G! V
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
& k# ]2 }0 }; I5 w) c<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>: P, a7 b: F b
<p>Pool的完整定义</p>+ O' B1 ?+ i+ h; h
<pre><code>pub struct Pool {
3 C2 b0 M# ?: U% J! { workers: Vec<Worker>,
3 s) R( t) D7 W6 u% v3 Q; \ T max_workers: usize,
0 a, R! v2 H9 ]% o( t% W sender: mpsc::Sender<Message>
4 W, z$ c& b' C1 L* m}
$ R, o% M$ j$ y</code></pre>; n) P" J; y2 q8 r9 Z* ~
<p>该是时候定义我们要发给Worker的消息Message了<br>
- z- k# q9 S$ _. O; d定义如下的枚举值</p>
( f3 L' I$ H& ?8 |4 g, s( Y# A<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;1 _* a3 w9 V* h7 n. Y: n
enum Message {' {6 p- Z) W" @& ?
ByeBye,8 }2 T/ \5 T4 f, a! E! D* `1 A
NewJob(Job),. B, A5 R( Q% s- u, J/ G+ m( h
}
& L' K, ?. N$ [& p- {( H6 Q</code></pre>
, }( q' ~/ M$ w5 D5 P<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
& u: s6 b( X& z+ a( v1 _% b1 s<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! I; n8 X; Y$ g1 }/ c- D9 f# [<p>Worker的实现</p>1 h( P/ _8 k+ m+ i+ X, v. H: ~
<pre><code>impl Worker
) [( Q# R- E! f/ E/ L{7 G; z) y$ Y" I+ `
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
# B7 `8 B* j; M9 O) w let t = thread::spawn( move || {6 l$ Y `3 q7 ~6 r' b* s5 \- p$ Z
loop {
! k; L% x( U0 j6 n let receiver = receiver.lock().unwrap();: }" N, V& @& P4 W; E1 h- ]; M4 ^, o- U9 k
let message= receiver.recv().unwrap();# `/ q3 a. m% x; a
match message {" w$ `! g8 l1 o5 {
Message::NewJob(job) => {4 a# D" }, B- B2 o+ Q# Z9 d
println!("do job from worker[{}]", id);6 O* H# Y6 |/ N3 C2 q5 n
job();
# n" f+ g! R( J5 o },' Z9 ~6 k4 @6 m: Z& k8 P0 L
Message::ByeBye => {
- h6 H3 J6 M: ` println!("ByeBye from worker[{}]", id);# f F: V- z4 z! `3 F* A0 f
break
0 A/ O9 J3 V$ f/ v- |/ N- [" j P },% O u% d5 y$ S ~
}
' v$ X- M% ]! a/ n& E: \ u }$ }' {* r! r2 P
});
+ h: E+ b& h1 S& L7 G: A
( ?( g( j6 I: N Worker {: ?$ m7 `5 B6 w& L: i: B8 T1 v
_id: id,
( k$ K6 _: g* H t: Some(t),
7 N1 J& n( `$ I }7 D' e9 a8 l! \' }1 _
}
/ z( ]3 W* h: _}7 d2 J% \1 B" [% c9 N* P( u2 N
</code></pre>
% q: {! F% I4 X<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>1 L+ L. n. C7 D6 U- z7 m. g2 Z& `
但如果写成</p>: Q8 `( u7 l% ^! g6 B2 C8 z4 d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {3 Z: s( m, y8 I: n; {( ^
};
- {! p# {( J, J. g</code></pre>
3 u, a+ w/ Z2 ?) j" M" L; u<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
t' w& A: L6 ?# [9 frust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>, m) ?) [1 O6 ]# _2 t u' j3 z
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
2 f8 \5 u- d: q) ?( A: }<pre><code>impl Drop for Pool {( ]- i9 m$ W$ d! M {. \
fn drop(&mut self) {; o6 j, @* K; h+ S% o
for _ in 0..self.max_workers {
7 ]/ r& ~5 i+ I" m: l$ S4 R self.sender.send(Message::ByeBye).unwrap();$ R/ x1 W$ i3 E" `
}
6 s8 q7 X0 M3 I# } for w in self.workers.iter_mut() {- `( G6 x& X; q- q! K1 j4 H5 k5 O
if let Some(t) = w.t.take() {: C) D3 j+ c. \# A: p
t.join().unwrap();
: G9 l3 c t. [: I }6 x2 g: D5 O$ T: ?5 P( N" W
}
- ~1 o% [9 P, e. ]* a }
' z6 p1 Q, @1 I% Z, u}
. U Z8 l! i! ^* ?% [ u F0 l1 P6 \' a2 s0 L
</code></pre>
$ Q @+ p4 B- g/ V8 i<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
8 G& V. e; Z3 Z/ c<pre><code>for w in self.workers.iter_mut() {- I! g6 m8 \5 n: H& E' c6 g
if let Some(t) = w.t.take() {
# k/ B3 s' A2 { self.sender.send(Message::ByeBye).unwrap();! Z$ j7 p1 z2 t- k7 Y7 i3 d
t.join().unwrap();
' V( M; X j- f' s. j+ N2 e }% t# M2 m0 e5 b1 \* r( X
}
: F# e! z |9 _ G2 r/ y( n# M1 f, @8 f2 s
</code></pre># S# f) k) `+ w1 Q
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
! v5 [$ i+ a* I我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
' G# I" ]7 r7 }% x* D1 N$ ]! T<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
& M e% g0 @& q- ?, X<ol>$ M/ R5 k7 y; U0 k1 O3 v
<li>t.join 需要持有t的所有权</li>) R5 T" K6 O4 |( I6 p. ^, ?
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>4 B5 h) z% y( O e( ^
</ol>
! h" c) G6 c2 |0 {7 ?<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
9 h3 Y% R# H, E换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
' f& S+ G8 G9 s<pre><code>struct Worker where; B& x! X+ P) c. H" |9 J
{4 ] P. `: k) S: u
_id: usize,+ S( U8 Z6 c9 P9 U A5 x+ N8 T
t: Option<JoinHandle<()>>,
! O/ i0 R/ u$ Y" P$ P. Q8 C* ~}7 I% G' o. w {
</code></pre>
# o. V2 p3 G0 Y$ P) W; S& Q0 p: e. \<h1 id="要点总结">要点总结</h1>
* G8 P/ v8 R8 [# L1 P2 U) M<ul>; \$ @6 `" P& \
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
2 @$ m, B) m# r5 j: V2 D<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
6 d, q9 a$ w$ I</ul>, _ ^0 i. } a3 n* Y1 V/ f
<h1 id="完整代码">完整代码</h1>
3 f( ^+ }: Z* e* q& o+ S" J& V<pre><code>use std::thread::{self, JoinHandle};) e( [% L; C2 T4 v# o) s6 P5 i. c
use std::sync::{Arc, mpsc, Mutex};! x/ @0 ?1 c% D/ e2 L
+ t2 F4 s( c, e* V! P6 D: j! n
$ G H) F! R+ N8 s' Atype Job = Box<dyn FnOnce() + 'static + Send>;( Q+ k+ W$ {& t; o7 w: \7 d# X& i
enum Message {8 F& ?0 B, ~% U/ E; {! u) c; Z4 }
ByeBye,0 h5 u/ \7 w4 F2 J
NewJob(Job),4 n& x# r _' E! [3 T
}. q( S/ z4 h# o/ S9 y6 ?
! m( k6 j0 g6 ^struct Worker where- m/ X8 J) x7 b9 `) ~
{
$ Q4 r7 j9 a: I9 k- G# ?7 w _id: usize,' H. u. ?+ L- U8 `
t: Option<JoinHandle<()>>,
3 B$ j+ |$ B: M}3 ~( D3 e0 h- n7 m0 a* }% j9 }
- M' i( g9 Y+ m/ `- Eimpl Worker% s* d8 L: k& Z. E$ K% f
{
9 D: Y/ o' m0 @/ |7 w fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
: ^9 W2 M1 | b' Y let t = thread::spawn( move || {
# y" i6 g: z. c7 p loop {0 }! ?& M7 v9 D; o0 j
let message = receiver.lock().unwrap().recv().unwrap();6 i7 M7 k, m0 L2 B4 m3 [2 O
match message {
. ]% f; R$ J8 ^: O Message::NewJob(job) => {
, R, r* C+ V( A$ z0 m+ Z. @5 ? println!("do job from worker[{}]", id);
( G7 @( O( j* v* W( H9 a' { job();
& Z8 Y7 G* x! x) @2 k5 ^ },
4 r i2 {4 M0 _; _! d9 @; Y. I& o Message::ByeBye => {
$ V+ l% Z- }3 H- M println!("ByeBye from worker[{}]", id);9 U7 E: b+ g1 O& t
break0 f! v' W9 m! r
},
. k g6 \& `" Q7 i3 U$ S } ; D! J: i, E m2 W2 e
}0 i( n6 k$ Q; Y- W& ?& b
});1 _- z6 x& C/ c+ u
7 l! [6 k( ]& ^5 [* @ Worker {
* P% O7 u0 J; C8 T5 L% V _id: id,5 w4 F3 G' g- }: N+ `1 ~$ O
t: Some(t),- I7 l" P$ t2 Y% F4 ^
}: [ V0 Z* z+ [/ x4 l- a8 M
}9 p- U4 M6 s2 ~) R0 b F- h% I: D1 _
}
( l6 V5 _1 G5 ]0 f' I5 g# [* G6 X1 V K$ j
pub struct Pool {
4 N7 g, o" j# M8 S workers: Vec<Worker>,
- p9 R# h7 j) B% H& N& ` max_workers: usize,
2 _5 P0 F/ s' U- r( ^ sender: mpsc::Sender<Message>
2 t& G- t5 L8 G- E1 g}6 X4 C6 E& r4 k& `% }
+ i* O+ D# {% h& X% @ @impl Pool where {( _$ a# h. J( U9 K/ E
pub fn new(max_workers: usize) -> Pool {
5 t* `' y9 g6 R3 b if max_workers == 0 {
; F& e6 ]9 J8 T+ {; u5 I2 v0 T$ { panic!("max_workers must be greater than zero!")
4 ]! A, O' n/ ~4 K" y3 v0 ` }
( }# ?9 n! j8 Y let (tx, rx) = mpsc::channel();
7 `# L; e+ G7 k8 l- v) {. o* _
: _- x: j) Q% D6 H [6 r let mut workers = Vec::with_capacity(max_workers);) X K1 S% P1 q; F8 L; |
let receiver = Arc::new(Mutex::new(rx));! o7 T0 l5 X. n( G- S9 v3 h! R& s
for i in 0..max_workers {
6 Q2 e3 C6 T% l& a- K* o workers.push(Worker::new(i, Arc::clone(&receiver)));* S6 R B2 `* _+ G* Q0 {
}1 z/ \# v3 w1 i, W2 N
1 d- b/ X* s# u- R, ? Pool { workers: workers, max_workers: max_workers, sender: tx }
. k7 Q/ P/ x' Q5 p }
# K3 Y/ e5 Q. f0 J2 @- c 6 Q+ J' H. B; f( c1 `7 w
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send6 w9 u% }; @0 t) \) M
{
5 T; s. o7 C; D5 j3 H' Q) r
) [! ~* o2 J% \" Z. O let job = Message::NewJob(Box::new(f));% w, x1 M: h( I5 M
self.sender.send(job).unwrap();& I/ \: ?) B+ N7 b
}
) A$ K& k. K/ R} ~, ~) M* N T0 y, N m) ~8 |, E
: T6 @$ J9 b$ t4 f0 _: ^impl Drop for Pool {
6 n4 F' S0 P" E0 |0 [- n$ ]+ v fn drop(&mut self) {
3 @: n1 j# G* D for _ in 0..self.max_workers {( W. K7 T8 ^; L) D" j4 n
self.sender.send(Message::ByeBye).unwrap();
`$ b: U9 `) ` I }
" Y% ~2 n6 v& [" ~* b for w in self.workers {# o" E! x% C5 Z3 N
if let Some(t) = w.t.take() {
& W: F7 a0 Q% ~* o4 P t.join().unwrap();! p0 b! b$ A7 f; m- G4 d6 a/ h8 F* B
}3 o2 J, v; d1 \# S
}- l3 F& Q Q9 a5 o5 ~4 N4 m! q5 M1 U
}% d" x# l, {( d2 |5 Y! A
}* Z9 c" \# @5 I1 ^
$ @) @0 S. h4 P: \, q4 s( G
2 ]8 u% K* u3 \#[cfg(test)]$ U y. k. { S. {+ D6 z
mod tests {
" }$ Q! _5 B4 ]' L use super::*;. s b: \9 l N
#[test]
0 z4 C0 O0 c' T' c6 m7 G8 a2 f fn it_works() {
1 {2 e5 v: q$ ?$ U2 ^9 @2 m& }8 r let p = Pool::new(4);* P1 P; ^& o4 j- n: _
p.execute(|| println!("do new job1"));
; d% ?0 R4 l) V7 ?# K B p.execute(|| println!("do new job2"));- m& b8 v5 y0 e" a9 H6 d& B9 ~+ [
p.execute(|| println!("do new job3"));
6 Z8 f* L" w. a% o. X p.execute(|| println!("do new job4"));
" ]' {" v, ?% T; l& E4 a0 \# D }
& F6 L$ t( j" K! h* A; e}
+ e3 x5 U Z& W, W2 m8 a1 H</code></pre>
3 Z- x( M; w, U4 p! T3 g$ {8 ~( {" \& X2 I) Z
|
|