|
|
! Y$ l% j( x5 C3 T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>: B: } J" k" g
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
5 ?' Z, a4 P$ k H<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>/ V. }" p8 V6 f8 ^$ f/ e) w: h4 O
<p>线程池Pool</p>+ C8 d. U. J4 z8 q( a
<pre><code>pub struct Pool {. u* Y( c3 g8 r* m, f* M/ k( d
max_workers: usize, // 定义最大线程数
) g& O; i4 ~; S/ M5 U}' ~3 Y* y' A0 l6 ^
; M9 [9 v9 T& ?, Gimpl Pool {
2 H! Q+ U# E# \2 ~ fn new(max_workers: usize) -> Pool {}
F$ S" k% H; j" \ fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}+ O* `& ~( @) c1 V4 ?" o
}3 r4 z6 K; w, I1 `7 {
8 D: d- t7 |' w; d7 G- ?+ s) ]
</code></pre>
i( A- @9 r' s/ R<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
" P' o* L5 `0 Z( h<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>! r+ U) w, d( v! C5 ]
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
7 g, \* [& t' y* s<pre><code>struct Worker where
3 q0 @% Z x; Y8 f2 P& Z" U: t' J{
5 t% n4 q/ l* ] _id: usize, // worker 编号2 b) W/ ~ V g) D
}
# K& S' ^/ M: [! C2 K) F9 N/ `</code></pre>. O5 w7 [ T, `+ W
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
+ X g5 H: N, ^3 Q把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>7 g- S* _2 X; q% m& P( ]4 V
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>( \5 u$ F5 X [( ^
<p>Pool的完整定义</p>2 n; e2 L! v% Z7 F. q2 e8 v
<pre><code>pub struct Pool {
, W; ~! X1 w' J9 D& @ workers: Vec<Worker>,
' W; K. V3 F( r& F) E. | max_workers: usize,
! j- z1 U5 `- m$ W; M: ~ sender: mpsc::Sender<Message>. t; `5 j2 q8 D0 o @. L9 G: Z6 {
}
8 p0 o8 s0 z5 {</code></pre>. p" w& c8 o/ N3 P. ~9 `/ D
<p>该是时候定义我们要发给Worker的消息Message了<br>* q* P: l# D- P$ Z+ `. O
定义如下的枚举值</p>3 E9 N! i3 Y! x2 r4 C; p+ g: C+ e- W
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>; S) O6 y* ^7 w
enum Message {
6 l, w4 j& k8 _4 B% f0 Z ByeBye,
) K6 S# n0 s0 d0 C" o NewJob(Job),) \6 ^! W, ]6 Z! c
}3 v# ~- A5 S9 f6 M( B- u
</code></pre>/ U# V% c: m2 ^( u& G
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>: F! f9 }. K$ s. S- f. Y( g
<p>只剩下实现Worker和Pool的具体逻辑了。</p>. ~+ z3 ]8 Q* c/ G) Q$ w: E5 s" d
<p>Worker的实现</p>
; U* d& v" B- l% v- {' @* p' X<pre><code>impl Worker1 c( P2 j) l3 u. r9 Q
{/ ?6 b" p1 Y2 a, W
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
6 ]/ c, G+ h; h: N2 z) G let t = thread::spawn( move || {
7 t7 G, s' l5 Z# |+ @7 N7 X loop {9 A. E. e% b0 W' N3 s$ W
let receiver = receiver.lock().unwrap();5 }/ k, | g9 S
let message= receiver.recv().unwrap();4 J- q% d) S! w1 q4 ~" C0 k1 u
match message {* `3 E$ ?! H4 v3 a" l$ u
Message::NewJob(job) => {# b% D1 K/ I/ l2 m2 X. ?4 A
println!("do job from worker[{}]", id);) g$ W. H2 K8 W; x' p
job();$ r$ h7 c5 {, [: O8 U
},( O- J2 g# X% L5 n
Message::ByeBye => {5 C8 W: _/ t1 h c( k
println!("ByeBye from worker[{}]", id);
2 ? ?/ F" H! {* \( v break
; |$ h. x! O, ~/ a. g M },
& C! @: v0 F0 b1 P. s# j }
; ^8 ?$ g! t/ E2 H$ m, Q/ f; @! }2 J( m }# S/ i) b, g$ G% x. w- i5 i. P
});
8 t V: j0 V) Z/ {% i
- k7 Y3 j: r+ w* \1 V Worker {& t! h9 g" A& \" |+ n8 Z; @5 l
_id: id,! i- ~3 n/ u9 z3 D8 l# |
t: Some(t),; R, K+ P! Z- Q. J" M
}
0 n3 ?- b, O( |3 H# u5 K }
: b# R4 v; w8 R! ]9 |) B: r}9 R: v8 e( ^, ], V0 L6 A* N5 D
</code></pre>: l+ r9 E* k, \8 Y7 {! d- q" E
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>7 ^: h& [! F) l4 ]
但如果写成</p>
! ?6 O" d* y" V' z! l7 d, l$ }3 s' p<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
( A% y4 g" r( G+ }& ^};
1 l) l( Z$ L, g) M2 e8 k! K</code></pre>7 u/ g9 N& E4 N+ J8 Y
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
' }3 m& h4 M' N& ^7 v5 g3 C( Orust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>( H# @# \2 G" x
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>' a* R8 A( m& N+ ]% B! i( V
<pre><code>impl Drop for Pool {
! t9 B, X! s2 w2 L fn drop(&mut self) {3 |$ d c$ [( d2 I# ]7 G! n
for _ in 0..self.max_workers {7 }5 h: F' Z* Y: `
self.sender.send(Message::ByeBye).unwrap();; n% d7 z6 ], d0 ?
}! l! Q, C) R% b, w1 j0 I7 u
for w in self.workers.iter_mut() {' N) `# x- y5 G; v0 O' L) w' u
if let Some(t) = w.t.take() {0 q3 u, K& X+ v; D
t.join().unwrap();
3 T) ?' t. w2 u) s3 U }6 R }5 D1 S) f
}0 j& N* _$ r+ q9 Q
}) A! L' \( F2 K- A
}9 O) A9 c- \5 e8 E$ k# J
2 _- {- G1 N2 V6 `- d3 ? D3 C) |
</code></pre>( W V5 p0 Z% [( T6 X' R0 h- B
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
- E8 a2 ^- F( Q$ G<pre><code>for w in self.workers.iter_mut() {& P8 e& c% ^1 g1 |: M
if let Some(t) = w.t.take() {. a# u, l0 t9 v% M$ o- S
self.sender.send(Message::ByeBye).unwrap();' }9 I# H$ K" i) h3 t
t.join().unwrap();! v+ k# y( c2 ^. z% @7 f3 J
}
( W# a6 c+ F9 i* v}
5 _ P; D+ d8 d" q: O# S1 t
) ^5 ]7 Z, m/ |3 i% g, a3 e</code></pre>
& U) L, F2 C- f% `% C+ R% I3 O<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
R# |5 P7 P3 ~# r我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
8 L6 b/ p$ p; i% |5 F* \<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
W8 f* [/ e# Q9 S6 Y5 M3 i. l7 `<ol>
4 {2 [/ U& V: Z4 E<li>t.join 需要持有t的所有权</li>
% w: C; P! D _* C0 K' M<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>! {, G7 R0 k. V7 d) O H
</ol>
2 j) j/ K$ R' G) y' `# T1 _<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>" ]7 l$ \6 T+ N) M @7 e
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
- v! _# K0 p5 F- I<pre><code>struct Worker where9 |7 |3 e: Q0 |) ^5 F
{
3 g) E3 e* D# k* H6 \( b _id: usize,4 F9 c5 }+ B" B3 i0 S
t: Option<JoinHandle<()>>,
9 o6 B% P6 b7 P; S- e; |}' [, `) a+ `. D- }) R; j; Y
</code></pre>
% Q' [7 _- [3 O K<h1 id="要点总结">要点总结</h1>4 M5 G! W' O* G/ G$ f
<ul>
5 i4 m+ C+ ]0 p<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>3 ^- H, f2 d% ]8 F; y# E) R) g
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
% j4 X7 }) p/ q- b: _& j</ul>; a- z) d2 x I- h
<h1 id="完整代码">完整代码</h1>6 H% s" c `0 i7 _
<pre><code>use std::thread::{self, JoinHandle};8 A l3 Q( s! R' f2 U! s( j
use std::sync::{Arc, mpsc, Mutex};
& w6 ~. J- I. |( b* N
8 g% Z9 b) Q ]; U2 S$ C0 t3 G) ? I/ A
type Job = Box<dyn FnOnce() + 'static + Send>;$ |7 N- W' l5 S( I5 P
enum Message {
% d# I1 J; M P5 I+ ~! N( R2 x ByeBye," r- {5 @! i" B2 `4 r
NewJob(Job)," E/ Q+ Q2 c+ H/ [5 F- @( [
}0 o7 J/ b/ X' k- l+ e
' [; y' J& A+ k. Ystruct Worker where: @3 m: N8 t: \5 E- H1 e+ w; Z+ P
{& G/ J9 o" s2 o5 c2 P9 F
_id: usize,
1 M6 h8 E% h; j5 `# f! G* {7 W( z t: Option<JoinHandle<()>>,
$ z) V. M( `5 j1 g, c6 [; [! d* A} ~' E. E: K* y& z7 Y4 x. J
" @& Y1 g4 y0 x- _8 b# h0 O
impl Worker8 {9 }- s4 v& O4 J
{' K1 b# ^/ P& K, ~2 g+ E
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
: A" F: P$ f; K- ?! | let t = thread::spawn( move || {
( Q' B; D$ N9 [+ v loop {
1 J* h* K" K0 f% y4 o" R* e: m3 t1 Q let message = receiver.lock().unwrap().recv().unwrap();! a/ r6 c. e) Q; Q1 [0 W
match message {
8 t7 f/ y1 O% j1 D3 T& y" P Message::NewJob(job) => {' i3 c" }5 j+ x- _) t+ \2 F
println!("do job from worker[{}]", id);
1 Y P" h) d" z: Z1 O% r! Q job();5 k& }6 w; [- {; g
},0 \! ^6 J. t' z' q9 P$ s" A* t6 f
Message::ByeBye => {
! ^8 s2 X) }" ~6 F- A println!("ByeBye from worker[{}]", id);
' z" J; O9 {5 I6 W. y% i break/ u$ R; U6 j2 Q5 b
},
7 u% s1 I: U8 s4 q }
m- h/ C$ B x- o2 D6 k( j; ^0 l6 I }
) u+ }- i. S" k5 e3 r$ V2 l });. R) L! i# f; E# f
) F x+ ?# n$ @ ?2 C: W
Worker {* x; A7 r5 F1 ~! I1 y
_id: id,
* z& Z5 A% _$ ]$ E t: Some(t),+ }* K) N9 x) \ M& i
}
; u5 z2 \2 e/ u/ y; o' g, Y }8 X$ E2 Y( J0 f* m2 A0 L# z
}
3 n1 }7 I" N4 R; f) { V' d! ]6 ~4 Z* s6 L& ] L( J
pub struct Pool {
$ \+ L- j B% i0 t# l0 H workers: Vec<Worker>,
' I9 S4 h: ~1 J& a max_workers: usize,, {% |" [' A: Y+ o R
sender: mpsc::Sender<Message>
' N: z! {0 T; n: b$ F}
5 n! C6 g2 d- U) @$ ]( t* A: _
& f3 U4 E( u: I$ F* Z; Wimpl Pool where {0 m* u" o% e: \0 m" e, s
pub fn new(max_workers: usize) -> Pool {; {) C: v1 H6 k B- \0 m+ t2 p
if max_workers == 0 {; t& F, w5 T+ C2 q6 u
panic!("max_workers must be greater than zero!")
$ E% F$ [! o& { }7 ?2 t5 p1 z& N* l
let (tx, rx) = mpsc::channel();
! z5 v' p2 T1 c# ?/ ? [$ F8 g4 O6 o1 X+ e9 V
let mut workers = Vec::with_capacity(max_workers);; h B! O$ {7 O, L
let receiver = Arc::new(Mutex::new(rx));
7 `$ U0 I( W0 B for i in 0..max_workers {' U# Q% l0 Z' [4 f! F
workers.push(Worker::new(i, Arc::clone(&receiver)));& o2 f7 b4 d j. \5 p
}$ }) p. @' W, M7 { o6 z" H+ P4 w. @
- X+ J# g0 k; v6 T+ r5 m0 q9 ~" [
Pool { workers: workers, max_workers: max_workers, sender: tx }- g8 }8 ]/ y7 I" T- ?( |3 Q
}1 F9 z. X' W# a+ j7 E
]( N9 s$ U7 q: T ] t. L- U/ ] pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send1 P9 k6 g: G( Q8 f5 N9 T
{0 R1 ?4 a; o4 A/ d0 F
$ f# }0 V$ b# V' _ [9 N- C
let job = Message::NewJob(Box::new(f));
$ L+ l! |. ^) ]6 S f6 o self.sender.send(job).unwrap();: K3 c$ S0 o1 [9 H8 V
}
! B8 f# m+ s4 F}; X4 h: i, W, T1 f
% Y# N: A% z8 d7 T2 Wimpl Drop for Pool {& f& q& s) W3 Z- A0 z( l
fn drop(&mut self) {
8 w1 o1 q2 V9 S/ [) r/ n. f for _ in 0..self.max_workers {3 J4 S6 i* F' z, N6 P
self.sender.send(Message::ByeBye).unwrap(); q! N: Y1 m/ c7 l3 F& L r8 Y
}3 S+ t: P/ H' h& P
for w in self.workers {
. H W$ C! H$ Y' N: K: L: ~ if let Some(t) = w.t.take() {
1 F5 }& L; K; a t.join().unwrap();; b7 ?, u2 l7 T9 w
}# [8 _4 J: F" c) `* x
}4 P" ?5 t1 N2 K7 E$ c( a
}
9 X Y7 y8 [$ [( b}. s) l% V5 B7 `# b( ~
4 m! X I( m( R0 Q# y7 J U2 R6 ^ W( u5 E: o
#[cfg(test)]# `( O+ X d- F2 N" E/ n' e5 x
mod tests {
) n' t L% v3 k% A1 ~) u use super::*;
6 n* c H! _# o- Y0 Y& a; B' G #[test]
& X3 N0 i7 A0 v9 d3 j2 t fn it_works() {
6 A$ e' P# k: M% Y let p = Pool::new(4);
5 d) [! v/ V G' J p.execute(|| println!("do new job1"));" R. g: Q! N1 c6 e* j# g
p.execute(|| println!("do new job2"));
% r$ B) y0 \; n+ ~: N$ n p.execute(|| println!("do new job3"));
8 P7 G. `0 U# u# l p.execute(|| println!("do new job4"));
* ^& U1 y' i. H1 W! A3 F }9 X% R& N" x1 _ N) Q* @
}
4 p( Y, A3 @' |- [/ H</code></pre>
3 W/ y8 B- Y; P% ]
2 n* b, L' d; m. A' K; n% A |
|