|
|
0 r% Y8 z* @& F7 P, X<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' E& V4 E/ @+ b. {. t! B5 W
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>3 B8 u/ `: M; g1 m
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
6 U0 V8 }# }! m$ a<p>线程池Pool</p>
; H& y. ^# G6 x0 {# t* w<pre><code>pub struct Pool {- C+ L: C7 }1 P" Y% e1 Y( ?$ e5 _; F
max_workers: usize, // 定义最大线程数9 c/ ~7 c# p" z
}
# U! d8 J7 k* L8 `
' Y1 v# {/ Q4 e( U3 m7 X, R3 _! fimpl Pool {
" y5 _0 ~0 ]* L) v, s( R$ k7 p fn new(max_workers: usize) -> Pool {}
# m7 o( d5 [; t' Y9 `6 x fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
^" T8 g* w' b* m' z" L/ e}+ q* y; A5 ^# `' T; X5 a
9 Y( Z1 |4 E7 C" d5 @
</code></pre>
6 {9 ^% p! `: q1 r3 I* _<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
/ T% u5 \, C$ a8 |5 U<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>5 E9 u/ |$ ~( u: {/ m. e' R
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>1 ?$ H) \# `8 [* ]
<pre><code>struct Worker where
7 s2 Y* ]6 e! S. P' Z{
( Y, Y4 @; H% a N& P- G _id: usize, // worker 编号
& p" I( |6 r- A8 O# f! W2 a}
' X5 w! ?5 r- Z2 M& G</code></pre>6 ]+ d, _6 `8 U. ]4 A
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
. g( y* t3 c. ^3 Z$ g把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
+ h6 o+ P- v) g/ k$ V- G<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>' q; ]6 z6 c5 L0 A7 O3 Y6 h+ r7 r. }
<p>Pool的完整定义</p>: y8 b% N5 _2 M
<pre><code>pub struct Pool {+ I, V8 ], @* C/ x: A' K
workers: Vec<Worker>,# G* N% S' B* l; }( L7 a$ ]
max_workers: usize,
, p# d8 V3 y2 l/ i2 r sender: mpsc::Sender<Message>
6 c4 O7 g9 h6 Z+ e" \. t; z}
* y `0 t+ B( B5 a4 h</code></pre>
( b. ?! b) v% H/ X: t! d T<p>该是时候定义我们要发给Worker的消息Message了<br>5 P) i$ n& V- e5 M
定义如下的枚举值</p>
" T, q. i7 v; N0 f6 O6 a<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;9 T% u7 _' Z; W2 c
enum Message {
( P/ v* N( P5 }% }$ C# C+ H0 @5 s ByeBye,
E# U6 b# I2 z8 j2 a NewJob(Job),+ {$ d0 r0 L4 W# ?/ J
}
, T7 ?( ~9 ~6 K- y& q5 k8 H' B$ }</code></pre>/ Y9 U, H& L: I- E
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>/ o; n* D/ j+ @0 D% |
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
7 M$ k, l {5 H: ?& t! w5 `, O<p>Worker的实现</p>
2 K* \" j! o! C( Q; i5 J<pre><code>impl Worker6 n. T! S4 M8 L7 f5 ~5 F7 }! F
{
, ^# m6 {: E. ^+ G+ j0 P$ U fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
* y7 s# D# j L" O# b3 S+ T let t = thread::spawn( move || {
8 r. p/ j' {; F. Q9 E3 m& e+ y loop {& x" s6 F6 _- d7 }
let receiver = receiver.lock().unwrap();
1 k: R! k% _% ~) ~ let message= receiver.recv().unwrap();* ?+ V+ a% M6 o+ w0 J# y6 t% ~
match message {# a+ L5 b5 b& \7 z7 K( M, B
Message::NewJob(job) => {
1 c! G# r. b! S( J4 \$ Q7 d9 m. Y% X println!("do job from worker[{}]", id);
$ z! C5 _ D( T2 r) p2 x job();
6 P1 ?) ]$ N! H: C$ A },
2 @/ V- k0 f' w! g9 w @7 H Message::ByeBye => {* C: h0 L# t) r* i- a. Z& Q: p1 l
println!("ByeBye from worker[{}]", id);- V% |$ C( i% D3 \2 C: [: m
break
$ h# [. b9 g/ b8 c* | q; F },6 ~# O+ u! _4 t( s4 }
} 8 b0 b5 v% ?) I
}1 v" D1 Q: }5 r8 N/ I% ~
});
( B! l3 x! r9 D- b- w" } |
7 a# \1 s2 m6 Y5 P5 n Worker {
+ |+ |( X0 c: \8 } A# Y _id: id,5 X v2 v* |( E) o, N
t: Some(t),$ u$ {1 p& I& t5 z
}
, X- [: f) E0 L( w1 L; @ }& A0 Q( m, b8 `1 }
}
9 P+ q' j+ k4 m K</code></pre>5 F, R0 u- B8 {) @8 r' F
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ a' H e7 [! o R但如果写成</p>$ v9 U1 U5 D* V K5 O" g1 E# {, a
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {9 S0 w$ u* P$ ~+ @
};& Q; \) ^" ]5 X+ z$ S
</code></pre>
7 `1 G1 s6 {7 y4 L# \7 ]& n<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>* ^4 J4 Z: n: A9 ~( `! H% V
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
3 x9 k6 t) H' @5 ?<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ t. B& x9 B. h1 ]% K- Z8 ~: p
<pre><code>impl Drop for Pool {
- K+ v" V% Q: K c& x fn drop(&mut self) {7 l% ~3 f7 A* F! ?
for _ in 0..self.max_workers {" t( @# k/ F: S1 R" A0 H
self.sender.send(Message::ByeBye).unwrap();- r6 T( ~3 d; q0 |9 V# S# c2 b
}4 a0 h3 s/ @! r3 m r; b, W
for w in self.workers.iter_mut() {
2 f6 R5 H; ^4 u if let Some(t) = w.t.take() {
. Z1 R: `1 ^0 X t.join().unwrap(); l% e; b ?2 Q7 }
}& V& q, k/ |; @4 V% W/ ~/ m
}. U: ~2 V# ~( ?" r( W% [0 q
}
( L }* k% |( t7 K}
" j4 i! H' U' o* r$ p5 t
; z- L* x1 e% W: c</code></pre>
) B u' W7 d3 a" N: M$ s* ~<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>/ P# G% ?. R- m1 {7 t$ K+ Y
<pre><code>for w in self.workers.iter_mut() {5 l) o! E- O# P, N q0 x
if let Some(t) = w.t.take() {# P! k5 O4 s( ^7 I
self.sender.send(Message::ByeBye).unwrap();2 v5 a6 {4 K2 M& j
t.join().unwrap();4 v: g" |& `& s/ y: R$ ]
}8 U O# `5 ^8 \8 V
}; |% ]7 H6 Z' ^& m8 q6 r$ S
& I+ e. a2 V" ]8 ?9 x</code></pre>
+ ^0 J! ], f8 v2 E<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>! B; R9 z* O& m& W4 K
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; v% j2 i* E8 ]
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>2 B% \" ~/ _% f( a- c
<ol>9 C& D2 f1 s: D( I+ R& Y
<li>t.join 需要持有t的所有权</li>
6 `1 U3 U; w5 q<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
; [, [0 c% \- X3 a7 | X</ol>, W# k2 ^% a3 t0 {: V
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>6 U- m3 H. F: l2 {% ~/ b
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; E+ F( s5 G, w$ v( r<pre><code>struct Worker where
* x/ U; q9 A. M6 M3 |{6 D2 i* Q5 H( i X* u) Q
_id: usize," n% d5 p5 J: K$ P; v5 j
t: Option<JoinHandle<()>>,8 R: }7 A# [4 ~9 N& s( M
}
& i* j" o( r7 X1 O+ Q# l</code></pre>
. X4 d" R- d# q+ x \9 X2 d<h1 id="要点总结">要点总结</h1># z: f2 L; F; |: I
<ul>* ]1 I1 C2 s1 \9 L3 m% W
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
O$ u2 L/ L- Y I( J' g<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
/ n1 k8 Z$ n4 |8 }</ul>! _/ O. V! ?1 `& u! g* N% i, @
<h1 id="完整代码">完整代码</h1>3 Z( G6 D8 K" g4 O
<pre><code>use std::thread::{self, JoinHandle};. E2 u+ @" N8 d. ^8 E
use std::sync::{Arc, mpsc, Mutex};
1 d, q9 x8 B& P7 y' n/ j
/ u8 v, |8 e% }1 w3 G' W
9 z. K H6 k& s4 G* z/ Jtype Job = Box<dyn FnOnce() + 'static + Send>;( h0 @& ~/ d$ T, z' l
enum Message {% z+ v4 ^* @2 E9 }
ByeBye,
: p/ s/ s' C9 J* r- R$ L NewJob(Job),
+ b: V: `% T5 w}
1 u, j, S- H% b( Z: b
# ^- ?! B) J0 x: y6 r Rstruct Worker where: d Q8 I$ z6 T1 {, T; R
{
6 |, ^& U5 C5 c/ M: A/ P! l/ ?4 R _id: usize,
" c; A- l, _/ a" z8 F: a t: Option<JoinHandle<()>>,
7 O$ x4 E4 T+ C6 V. y; T/ N" {}: n' C! V1 @) e E+ i6 R! u
: w- q8 l# N E8 f- r5 ^3 nimpl Worker1 e$ Y) C, ]2 o& s
{. @1 v" R W, M5 b
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {7 _+ X7 a8 k T1 J, p" U
let t = thread::spawn( move || {
& G$ l9 t9 d, K# | loop {8 a( o+ [& f" I/ i
let message = receiver.lock().unwrap().recv().unwrap(); X3 u+ y: B3 L* u) Q
match message {* W$ Y: ~/ B" H# H4 q4 c2 m6 d
Message::NewJob(job) => {
- t- c j" t2 y7 I, {& { println!("do job from worker[{}]", id);
1 x/ v3 _- l' D; o job();' t8 |# m2 ~% ^ V2 r- u
},0 t; d) I9 k$ _7 V* k. F6 T8 U
Message::ByeBye => {; V1 n; E' I+ ^0 |& t" i
println!("ByeBye from worker[{}]", id);
4 ~$ [/ _5 I( D& T) f break
/ H L+ @$ `$ |8 g' e4 O$ W2 M8 i },: V- |+ e& A- I
}
6 l) M$ o. |1 } J9 T }
) J9 h; y6 X N9 O6 }! ^% t' P });
$ \9 i4 z% G3 N# G/ @+ E! _, {& h2 Q# n2 m
Worker {
8 c9 h: j" l, M _id: id,# j- B( c2 G) i4 Y/ z1 [4 g% C+ U
t: Some(t),* E* D) L# C- r2 X8 M: G$ E0 o" k
}
/ u- j: C( ?, z/ o }+ s H4 O, c: S
}3 G- a' T6 B) i3 g7 |+ p4 ]
/ C: o3 d. c4 F( k
pub struct Pool {
' {0 Y, q0 z, `; d* x; z t& L5 R workers: Vec<Worker>,
! H- k8 U, {5 z+ J3 J6 ?' E max_workers: usize,
/ Q3 [" j& ]3 u# j9 C sender: mpsc::Sender<Message>& B0 G3 M6 r, h) I! w- x* S
}
! d6 r8 y$ }9 r5 Q
3 d) U( N9 k# g( Cimpl Pool where {
0 s6 I. I5 y1 g! E" h" T pub fn new(max_workers: usize) -> Pool {
% U/ H G; `& b3 M* q if max_workers == 0 {( b9 y; v R7 u/ [5 W
panic!("max_workers must be greater than zero!")
4 L, F) L; S2 l( O9 y( H }3 w' j+ P* R8 a4 ]5 _1 _
let (tx, rx) = mpsc::channel();& K, Y6 s- Q& V9 h. A
% p& \# W$ G L* H/ ~) U
let mut workers = Vec::with_capacity(max_workers);6 G- g4 r% P& n3 A% t
let receiver = Arc::new(Mutex::new(rx));- V- a) J5 v3 D
for i in 0..max_workers {/ c( |/ @1 M9 m+ {
workers.push(Worker::new(i, Arc::clone(&receiver)));: l- n# t2 p" x, ]% n3 A
}4 Z" c0 ?5 M! F6 |0 {
' V/ M& h+ [" R# f/ ? Pool { workers: workers, max_workers: max_workers, sender: tx }
1 E- I5 ~0 a9 D: N }6 V5 |4 P" z: n9 H
+ K2 Z. q% v! G: O: h pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send/ O- G( z7 I( i9 {" o+ ^
{1 b, c( x9 t8 U2 U
* u0 j! k) ]' U( c% O5 [5 ` let job = Message::NewJob(Box::new(f));
) i3 C# v' R" A8 J# o0 s y self.sender.send(job).unwrap();+ T$ w3 R3 _, L% c( z
}# }8 w1 t# P l5 b) F
}
. f9 w. Z$ Q- D1 @8 H: K7 w6 K* s) i1 X6 P5 q0 S/ l5 E9 q
impl Drop for Pool {
& p$ R0 i* g3 @ fn drop(&mut self) {2 n6 {3 T- k5 e" K& U8 \
for _ in 0..self.max_workers {
5 D" k/ ~! D8 b$ Z# b- A self.sender.send(Message::ByeBye).unwrap();
. ^; m, Y# i; L: a, C+ h }3 B1 `7 \5 R& F. O k* L! K
for w in self.workers {' U" |$ L2 K( M b1 O
if let Some(t) = w.t.take() {
% l: U8 ^& Q8 D- D. d4 T2 Y t.join().unwrap();
2 g6 j) B7 F+ G! A% @& Q' L+ c }
# B+ @" _9 ^: D2 Q2 ^ }% B, H4 I8 o9 m( D
}6 `2 @2 G$ U. m1 {
}
! q q7 H5 a8 g( }- R
1 m3 g! b$ H) {. D' A* q9 y9 c8 B
1 m- t3 ^' \9 {#[cfg(test)]7 F' H! @0 U* w1 _. k) t) R4 m: I
mod tests {2 D( ^; u% e0 S9 D9 K4 O
use super::*;0 e2 N' W2 i( e& K0 [5 b
#[test]
6 ]4 n1 a M: E1 N fn it_works() {( ~; Y- R8 c- Z4 [& t' Q* A. @
let p = Pool::new(4);
& o2 B+ Q- R5 j p.execute(|| println!("do new job1"));
) l' z: o( _0 L p.execute(|| println!("do new job2"));
; q9 E. O1 J, t4 N; r3 R p.execute(|| println!("do new job3"));$ }& P; {& v% d' U8 F
p.execute(|| println!("do new job4"));
8 X; v* A2 ]+ V2 Q } y; { @+ Q3 i5 X' D
}
' I# z; |* r& o. i% ^+ M1 s</code></pre>
# R% \0 o- w! {) L% _- F& H
1 I* M) ]/ v1 j, b5 T. L. { |
|