|
|
0 u9 x. z1 r" o& J' O8 ~, v
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
; t- L" F! r# m5 C<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>/ ?& i/ O7 |" Q# T& V" R: ?1 ^
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>* M/ i$ x# V6 V6 k
<p>线程池Pool</p>
3 |1 _# `1 U. a<pre><code>pub struct Pool {& F2 ^: [* @: i
max_workers: usize, // 定义最大线程数8 F( v/ ]) a3 @0 c2 [5 X
}4 n( @/ Z$ P: q( U) b) v- i
& ?5 V) M# H! K; o1 d8 S Z9 x! o
impl Pool {/ I" |+ h% A2 I# ?# c
fn new(max_workers: usize) -> Pool {}2 R7 Y9 ]" K2 |! q, Q
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}/ D q' D4 N# C' w/ s
}) P3 V4 J1 q& o0 P/ X, F
. F' i4 o" o4 S1 v" e</code></pre># G0 f0 M( ]1 R: K8 F
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
& B. k1 X0 ~9 B+ n. M8 s% e. I<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# B, X9 ]5 |1 h$ W$ W可以看作在一个线程里不断执行获取任务并执行的Worker。</p>3 Z B9 u3 s% e) D. \0 h
<pre><code>struct Worker where* ?6 E W2 X+ R+ Y! ~ J% j( h
{6 D1 x# Z2 c3 g% [
_id: usize, // worker 编号' n- s4 N+ E+ i- u8 C- N
}
. U( ?- v4 G3 l# R2 c6 x</code></pre>
# X" a; a* b" }- d<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% Q/ ^) r" X' y V5 l4 {把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>$ X" |1 a! H7 {9 h) n% K- D" E
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>* M% C0 B) V& I7 M G9 S5 h9 @
<p>Pool的完整定义</p>
& b) x$ M* R4 P& N<pre><code>pub struct Pool {7 V8 U, }9 S3 a( r+ ?% p# m! [
workers: Vec<Worker>,8 H! e8 g0 f+ Y2 W, S4 \! o p
max_workers: usize,
1 e# J5 I0 r- L2 @: j/ i3 { sender: mpsc::Sender<Message>1 }' a5 N- ~9 F$ U0 _& e
}
3 h* T0 n& T& W4 e( k/ {% Q: v: r</code></pre>1 U! O: e6 {; m& F3 ~+ B
<p>该是时候定义我们要发给Worker的消息Message了<br>
. v! u2 x( G/ `定义如下的枚举值</p>
: w. F" W6 L$ C8 j<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
u, ~# g" \' H# senum Message {
' H! K- T" F7 Z5 w0 D Y9 Z" b ByeBye,
: N" {, Z9 c5 F3 P% ^ NewJob(Job),
7 z7 M. [0 B/ [) _2 I}# X/ H+ S, |9 @5 D+ r5 I( _4 [
</code></pre>8 M& a$ x6 ^9 }" a, Z9 V) P* B
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>5 M2 R5 v) Q0 R9 }
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ z0 r- p% Q; p. O% n<p>Worker的实现</p>( x5 T m' d2 k6 U9 r1 s m
<pre><code>impl Worker+ A4 g; ]8 D* u: a# v' K' ?( a- m
{( P+ H% i; G; d$ Q' ~7 P
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {2 q* l$ x/ r& O
let t = thread::spawn( move || {& k& T P( I9 Q/ w
loop {" L# j) `# j! O5 p% o( t; t
let receiver = receiver.lock().unwrap();9 q, Q$ X. q) ?3 W0 B Z
let message= receiver.recv().unwrap();
! D& G, |# A) i3 @- r match message {
$ Q; |$ y7 `; n- X6 a Message::NewJob(job) => {6 l5 }! O3 Y; n) V
println!("do job from worker[{}]", id);# a2 o' d7 ^- j5 Y$ {1 s2 C8 r9 n8 `8 e
job();5 R1 c0 m2 n( D! q4 b4 X
}," J5 l8 X+ d' }: m& w
Message::ByeBye => {
- d2 s2 Y4 g3 z8 q c$ `; i# W println!("ByeBye from worker[{}]", id);
}3 f. M/ M6 X; R break
8 m1 F2 C( }$ O' U/ m2 D! v3 M: o; J },- L2 r B, X! v3 ?' W, _& R
} ! i5 N" ?$ c% p5 I' R. B8 l2 Q0 h
}
! S/ q7 z/ g: g2 D });0 X K' X6 e2 g' J/ h
# r2 S. j! q0 S% l3 \; W Worker {
1 X" [7 }1 n- g _id: id,
; ^ W" t9 g* V. a u t: Some(t),
% ? U9 U* Q: n! |; ~ }
7 c& K5 Z: c& m* O }" H& y8 @) @9 Q B( @# Y: U. j
}
6 a3 f9 ]5 e. C* i5 B; B</code></pre>$ Y9 _1 k9 R6 }" v8 v
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
7 S5 M, M |$ }8 q; Z但如果写成</p>
% W" R% H$ p, c o! u( W! Q r5 u" S<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {2 t# N- u. C- _2 x" m
};
! [' m; y; a9 U8 D! L% x8 O</code></pre>
) O! i& {2 b* ?- B" P3 @<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>5 `0 M# F2 a# i" i
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
8 `4 @) L: h% z4 w3 Q' O& x8 ~<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
+ k/ Z6 J" E$ R; }3 A! Y<pre><code>impl Drop for Pool {
( o/ E* ]& z; X( ` fn drop(&mut self) {% o' Q# K4 `! t
for _ in 0..self.max_workers {, O+ T5 C( |# t. k
self.sender.send(Message::ByeBye).unwrap();
) r4 x- h3 v% f L. M }
6 u* b) D/ ^. d9 H1 s P for w in self.workers.iter_mut() {
[9 z+ G1 g' Z( p0 K if let Some(t) = w.t.take() {
/ \$ v6 J' M4 j0 ]3 ?: y t.join().unwrap();5 X l4 p( X1 l. W
}) f: B5 D, |# E! l, f N
}
0 m9 d, }( V3 O+ S* G% \6 o }1 v( q2 `9 T5 I; ]6 a6 {
}
0 r& a2 m- I- g7 q, o% H8 w7 t" K3 R. M* \, [: h
</code></pre>! M7 _2 A Q/ F1 K" f# ~
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>0 |6 `2 c! Z( ]- V
<pre><code>for w in self.workers.iter_mut() {5 E3 y; L( y( g( q" X
if let Some(t) = w.t.take() {
7 K4 H8 l% e9 i% H g9 P7 ` self.sender.send(Message::ByeBye).unwrap();6 R& p& w$ S4 P6 p9 P- m8 k
t.join().unwrap();$ v# q6 m( C) Q4 B
}
" F7 n) z# p; Y0 {+ F}
$ \1 F3 E. {) y. H% u
1 ?- ^) F9 _$ S" ?' a& I</code></pre>
" Q# d2 p9 u! O1 o) o: j: B- E; ]: a<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
' c) D& Y( l3 S$ h我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 |2 D- c; m( ^4 c4 A8 l; Z
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
O/ s" f; ?8 @7 a<ol>) V* K( V7 M% a! R$ h0 }" x1 k
<li>t.join 需要持有t的所有权</li>& Z( z. n! `7 Y( g: C
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>, ]- W) s% U% }2 j
</ol># N8 j$ x' i7 ?: v+ w3 A6 f5 v
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
6 w. ~' S, q3 {( j9 s; u4 ]% S2 C2 x换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>+ k, U* {# ]9 M3 F d9 Z
<pre><code>struct Worker where
& a( W2 K' ~. G+ ]{/ z3 A- k0 P: |8 B% B
_id: usize,
& o+ r' v5 x; E8 g t: Option<JoinHandle<()>>,! I# ~) E0 I2 Q2 h8 d
}
6 E: {" w, S. Q$ K</code></pre>
4 ~, ~4 ]- n( g<h1 id="要点总结">要点总结</h1>6 u2 w- f' o6 Q- V0 `
<ul>5 B8 s! r% a# C
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
+ Z9 W- v7 {2 S* G; K4 k<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>* S9 r$ b6 D$ L3 H9 B
</ul>
' T9 K' O6 P; [* y( B+ q<h1 id="完整代码">完整代码</h1>
# D" r8 x. i" k<pre><code>use std::thread::{self, JoinHandle};6 ~* Y+ Y9 g* S b
use std::sync::{Arc, mpsc, Mutex};1 x# g& ?- t/ R- Q$ k5 Y
- K, M3 y8 `3 q6 R. N
$ J, t$ w4 z; X4 ]0 d5 i! Y5 U! ?type Job = Box<dyn FnOnce() + 'static + Send>;
% C- E" U0 X" t- W, U4 Uenum Message {
+ J8 u. D" I c/ P2 v( q: g ByeBye,
' D9 a. v6 O9 z, e/ P* R NewJob(Job),8 S6 w( h, i; c. E
}
* W5 [9 D2 d# p+ G/ \ H
& `5 |/ r: @! u& w4 h, }struct Worker where* h1 j6 L- {) J+ `( f
{
2 T6 U* x V5 M k9 W _id: usize,
. A5 e& ]. L* U- g8 C t: Option<JoinHandle<()>>,5 U4 _; b0 _# K$ Q: o4 }
}
, m% m4 H- m, f2 N, `5 m1 \: E0 _' D
impl Worker
+ _! m2 J5 n3 ^7 U0 p+ K; C1 J7 |{
) f5 Y5 ^6 I1 b, d6 g, h fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
& D( W3 p0 ]4 i let t = thread::spawn( move || {
i! L& C* d0 E$ R loop {0 R1 a1 }" h/ @" L E9 f
let message = receiver.lock().unwrap().recv().unwrap();0 C) @6 N" C2 M! P
match message {! E7 d3 F5 T0 Y% a# J7 T
Message::NewJob(job) => {
9 D" N3 }6 x8 Q3 X! R println!("do job from worker[{}]", id);. u# ?5 w7 e% V: H) L5 p; K
job();3 ]. ?! y( `/ r/ T
},
o. z* }; B* J3 c G Message::ByeBye => {
! v/ B, S, q" g1 Z9 V/ T& H println!("ByeBye from worker[{}]", id);
+ O1 H4 Y& w. E break
( `. g$ a/ M, Z/ Q0 p q },3 m: s: K* }& S1 A4 i
}
: p E, _( c5 v l6 C }
$ l" C7 H% x( L w });, g/ z1 i+ n! k
3 _5 x2 j: g8 _. {2 a3 P5 ]
Worker {' P1 g4 O7 L6 g y, O
_id: id,3 I* h# L" L. l$ a; ~
t: Some(t),! A5 g5 j. b3 a" q! L9 e9 n
}5 q0 e# d+ c I& {5 S: g
}
* a' B) n: u0 `4 }}
. y) H' m! c4 ]* p' p9 W) }% m( O1 W4 M0 ?/ d
pub struct Pool {
- i* J' c' p3 w6 y workers: Vec<Worker>,' B9 r3 s1 |$ I6 b
max_workers: usize,
/ J7 ~) o, l. | sender: mpsc::Sender<Message>
% {1 _4 h; e# T( s E5 k}3 S( q0 b9 N; ]# u" \
. @- h3 G; s3 M- N% C* j( [impl Pool where {7 h2 T4 e: X$ N0 D7 s, J! x- F( Q) Y
pub fn new(max_workers: usize) -> Pool {+ H, P, O$ B/ s8 u
if max_workers == 0 {4 p; X9 [ g; [' t" O. j9 }. |
panic!("max_workers must be greater than zero!")
2 ~8 x3 S+ G# q$ [; u- J }6 r: M. Q" _& n. `; i4 k' Y- a2 [
let (tx, rx) = mpsc::channel();
7 B8 v" g( ^* c+ D
/ ?* `5 B4 t9 f$ [+ p! j let mut workers = Vec::with_capacity(max_workers);, ?& x) Z/ \( F; a3 [1 y* R
let receiver = Arc::new(Mutex::new(rx));
0 |5 d$ T$ N1 n for i in 0..max_workers {
9 p) H7 y, i6 f' V+ Q. ~ workers.push(Worker::new(i, Arc::clone(&receiver)));4 z1 w/ e8 `5 s K6 q, F! x" i! Y/ \
}/ P! f) i' b' M" m$ F/ N
% ?# q. o' x9 J* _# }# d7 C: ]
Pool { workers: workers, max_workers: max_workers, sender: tx }
0 A% r$ ]) n- E0 Y: k }
) a3 I* N t ?# P# M 5 G6 i: T" w+ c1 p
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
7 k# {( C1 n, y. d- z( f {
2 R' U7 b( e% Z+ V2 p
; [& E+ J7 u+ ^0 C& L let job = Message::NewJob(Box::new(f));6 c$ y4 Z- Q2 W
self.sender.send(job).unwrap();
3 e% b& P; M. C- w3 }2 L }
! i0 V3 u5 _" n}9 c3 x+ w/ ^+ S/ `. P) f
+ E5 y3 K6 o R' n. Y. ?2 D
impl Drop for Pool {
' w& n4 G5 U* x% F# @3 `: H0 U3 F fn drop(&mut self) {" A; b' `: a H! y! a5 u* _9 Y
for _ in 0..self.max_workers {
, ^# e' ~) _0 ~( B self.sender.send(Message::ByeBye).unwrap();
8 E6 | Y- U: W8 d }& i- P: J1 a" V% O
for w in self.workers {
; F1 \9 w' C5 ?' {, }7 ^. N) [ if let Some(t) = w.t.take() {
# s3 {7 O" X. z" y/ U) Q t.join().unwrap();
- [* I8 Q" B3 d% R# r }
7 g% p& W, {$ A f( ^' { }
E8 x9 g7 U+ h; q4 a3 O) w8 B4 P }6 Z3 P( q* H0 ]! Q: ?# b1 {5 C
}
5 H9 I1 h* ]0 Q) k( k+ R. B; Z; L5 Z
1 Z4 }1 _* N0 \% \#[cfg(test)]
& X% ?1 @) b, Vmod tests {
* t/ u1 p9 d' I; e/ [& z use super::*;
2 n2 P O* A$ e" i+ Y #[test]6 U9 c& Y4 ?, [8 B
fn it_works() {
) F, ~! ]) C0 \7 n d: k- g% g let p = Pool::new(4);2 f: p1 e- G) v# l/ T
p.execute(|| println!("do new job1"));
0 @ B, G- {) s- t p.execute(|| println!("do new job2"));, _' m) Y R: e# i5 C5 _
p.execute(|| println!("do new job3"));( d0 p$ f, I6 x6 l) u& i
p.execute(|| println!("do new job4"));) s d8 ]) m& ~
}
4 G# f3 K L# q1 @}. n+ o/ N7 ~, M8 d, ?' B( K
</code></pre>
( D- K \% o* S7 Q9 Z: q# M$ F+ I4 F: J* _# A+ }$ t9 E
|
|