|
|
O% D8 v: w7 L8 W8 B4 t( D7 c8 v3 Y<h1 id="如何实现一个线程池">如何实现一个线程池</h1> r* G" i( a5 ~ B
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
- Q- @/ N' _0 V9 l0 ?/ i/ Y<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
a6 Q! H3 C- `+ ^- ]- t<p>线程池Pool</p>
' j7 R! A# e, J' I<pre><code>pub struct Pool {# n }4 @6 a4 F$ W6 |/ ]' _! }
max_workers: usize, // 定义最大线程数
' Y& w" R! r% S( F7 Q. C}' R4 z* c' M) A, }8 H- C
0 _ j3 P( s' @
impl Pool {
2 `- X( I$ N: F4 Y fn new(max_workers: usize) -> Pool {}$ v2 L4 { V0 k5 v6 Q4 n$ @
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}8 {; s/ u( Q5 G9 K6 o0 y l$ f
}
6 B3 M0 |' E$ y$ ^: g# n# W1 |2 ~- U: J8 ]/ ? p" G
</code></pre>! _1 y4 C! e |5 C5 e+ b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>8 S' e) z' e1 \8 S0 j
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>$ G6 Y+ L1 `/ x0 {
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& J3 {4 L, O/ N: V
<pre><code>struct Worker where7 ?# X Q0 @1 X6 n8 g. V
{2 f. r2 z$ ~3 v, ?" m! }+ Q
_id: usize, // worker 编号( T4 O! `+ {; q9 w0 b @! v. g
}
7 j2 e3 }( H- ~5 A8 [# s</code></pre>% ^6 H& R. M) w! d0 O$ w
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% ^2 Z& M( {/ z% B( q$ a- a6 A4 ~把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
$ I: Z7 B7 r! K1 I7 e* l) M<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>/ X8 e- S" i2 A/ O0 C2 d
<p>Pool的完整定义</p>- G( h) l4 K5 Q9 C6 H" f
<pre><code>pub struct Pool {
, V( O& O# K. X7 P5 C: a workers: Vec<Worker>,5 b$ D6 O @/ s% e# G; v6 P
max_workers: usize,( H6 r! g) d) f8 x' J
sender: mpsc::Sender<Message>
3 k8 k" R8 l6 j$ {0 J `6 _$ }5 y}
/ `# ?) x' a# Q: G# L</code></pre>/ ~' p. M$ r) j( u9 \' Q) e
<p>该是时候定义我们要发给Worker的消息Message了<br>: V& q! Y7 U/ i3 |+ ^. B
定义如下的枚举值</p>
2 Z# A% i% e, q0 e# u8 h- U* j! r<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;1 X9 G! ?" u% H
enum Message {
; {4 n$ [; Q( d# ?! P2 k: X) N- I ByeBye,2 ?! l9 h/ K& i; N
NewJob(Job),
1 j" q; ]% [4 v" ]( r; T} g4 x; K+ m2 B. i- C- Z/ Y' j
</code></pre>1 I2 C( \3 P4 k0 L3 T0 o
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p># G0 V# G& `: G3 R
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
- D+ `- b+ R! d4 O<p>Worker的实现</p>
( `5 S7 G7 M" @$ g; _<pre><code>impl Worker
}! I& n! W8 ^; |5 E" w{5 s$ P( {! G- ^8 v
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {/ ?, P6 j$ T# X6 ]
let t = thread::spawn( move || {
+ B1 o0 U4 D1 A: e j9 d" Z loop {
4 q. h. s1 X$ q2 H& M let receiver = receiver.lock().unwrap();
" b4 X% X' g1 b* K1 m# M let message= receiver.recv().unwrap();
: A' Q7 W' q E$ o match message {
9 F' S; b, c# h Message::NewJob(job) => {: w7 g% s4 A3 T0 P
println!("do job from worker[{}]", id);
2 X( j% S- x: r' ]5 q) C; M job();
3 y, Q- n, U6 k* w" l; N },
5 [: O- N! N1 @! h+ Y- A7 Q B Message::ByeBye => {
: _7 ^5 g' r0 G; k' w' ^ println!("ByeBye from worker[{}]", id);
4 i, ` Z$ N ~6 M* x9 r" j$ c break; D( ^, d: ?: ~0 \$ k: ~9 @0 J
},1 \$ z- v/ b* b) f* J; Q/ n
} 3 H- T- H; R( p' y
}3 [! I- p% O+ U; l& S+ j, L2 @
});0 q Z, {1 ]* i, e7 X4 x3 f
- l6 j& ^ ]' ~% }: o0 r/ {
Worker {
3 g# \6 w5 d. ?' l# V) e- F _id: id,0 S B9 Z: V4 u5 p+ o# b, ~
t: Some(t),
* F% _) }% V) [1 n/ d7 k, p }9 ^" f" P/ F9 X2 d) A: f J
}
/ L& Y3 w6 D: |- b}+ v# s! E" k% I
</code></pre>; p' M f- W. S. T @8 J# A
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
3 L+ Q* G* j7 S' Q( |% b! j但如果写成</p>* z8 a# S/ G2 ?' _1 g" M y& t r! m
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {7 @4 x; v5 a. e% I0 Q/ @$ h0 z
};# E: H0 p6 I% e2 P
</code></pre>
, d& f2 ]2 e1 V& \8 U! H4 ?7 r8 ~<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" w5 ^# Z) A1 }5 ^4 Z5 K6 D
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>; c" x+ K3 t) h# o' v$ S$ S
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
" b" k. W. J, w( r }: s- |0 L<pre><code>impl Drop for Pool {2 Z8 Y- J; s" L7 k4 O$ i
fn drop(&mut self) {; c/ a# a* r2 M6 p% J' |# J
for _ in 0..self.max_workers {
7 ~4 H6 e% [0 H7 e5 F self.sender.send(Message::ByeBye).unwrap();9 B1 h( ], K' N/ C% j3 }; y; a
}: ? m1 B- j: F% u+ x7 |$ H
for w in self.workers.iter_mut() {$ U, N& {, A, I6 I$ \
if let Some(t) = w.t.take() {; A4 K+ q7 V3 B* p8 ?5 F) F5 U- u
t.join().unwrap();
+ `5 B- o6 b* ~. ? }
9 e3 J L) i1 X& u( v1 } }/ P- w. @ Z) d: T* ?
}& L5 y' W' L" j6 z0 I. }- P" }
}
! n! p2 B. _) }" x3 K; E3 r! }' k6 \# d5 g8 r2 o0 h2 ~4 `
</code></pre>& _0 [ p; D( m
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p># b. K+ O% v* F: v$ r% V* w
<pre><code>for w in self.workers.iter_mut() {9 z- a& y4 U/ y7 W# p7 p
if let Some(t) = w.t.take() {7 u9 Q! L2 X! ]7 p# f& d9 M! P* Y
self.sender.send(Message::ByeBye).unwrap();
3 r) S0 N% B. ]2 a, ^ t.join().unwrap();3 b/ ~( R! u( u9 k
}, y: s4 L! }& \+ e+ x2 `/ Q
}( ~5 \( H- \) B3 z6 M
. K, X: W4 n% R. y. p, L f
</code></pre>
$ X" h% R, z( n* G! C1 H6 T# B, r$ [; k<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br> D8 R) ]2 r+ A
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p># Z$ ^2 f! |9 U9 q* X
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>$ z. ^# v: C: J$ b8 s
<ol>
4 N' d; F0 ]$ X$ Z D5 @<li>t.join 需要持有t的所有权</li>
7 q8 u$ z ]/ O. w- m" Q' Y<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
% Y% ~# v& k5 |: _, b</ol>
& |! `7 {$ g' @7 J& K+ K<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
( d) E0 e6 m) L/ U换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ l6 O0 |) j- E
<pre><code>struct Worker where% h1 P {) |( f
{
3 l% t& ^6 |2 G5 ~6 b3 u2 h$ I _id: usize,9 ^* M$ C6 y1 c
t: Option<JoinHandle<()>>,
: c1 R4 t: ^4 q% X' W}& X) `& \- ?+ q2 g* X
</code></pre>
0 I/ Q8 g5 b1 K! d7 `<h1 id="要点总结">要点总结</h1>' S2 z7 T% y0 _. p3 I5 t! o
<ul>6 F, r" f" R! h4 y, f. h
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>5 ?! U+ [! ^! X9 |$ u+ x8 }) P
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>: x- X+ N% _9 s; _+ a' C) Y
</ul>
1 P, R' X/ @' {$ B+ u<h1 id="完整代码">完整代码</h1>
* t) U- V7 Z9 e+ k5 v) `( W" @<pre><code>use std::thread::{self, JoinHandle};
. M! Z$ I7 p$ [4 Quse std::sync::{Arc, mpsc, Mutex};
) z; o! Q. m: D8 E8 T2 h% d
2 l7 K# V" t! Q: d1 q, \
* S1 @7 k/ F+ R! b2 Ttype Job = Box<dyn FnOnce() + 'static + Send>;
# |( E; L+ e0 W5 Venum Message {
- _1 j* J. t/ d ByeBye,
" A% w) G" c2 r. J T" Q/ q+ n NewJob(Job),$ F1 t5 _( [. u" ?4 y. ^9 D: }5 R
}
5 `0 Z# e. e. y
% R, V/ f$ @$ F$ P/ _" gstruct Worker where
6 j' H' c( z+ P# i& T{! C$ b1 {1 ~1 [8 s& W
_id: usize,3 A2 B e$ `% ?0 G. Q
t: Option<JoinHandle<()>>,
! P4 I6 I! d$ a}
5 z4 c0 a. j2 y# |/ `1 U
% b6 b2 K( ?& Z% u' W jimpl Worker3 Y5 V, I& \9 g! I' c) Y
{7 k, R: C% C, P7 ]! X$ _- E6 i7 _8 u
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {" Z! _; v0 l3 L
let t = thread::spawn( move || {
. J7 h# F6 |1 g4 f! H loop {0 z' ^7 j8 u; S3 T1 [2 m
let message = receiver.lock().unwrap().recv().unwrap();! g! q/ q v# Q) G+ |, y
match message {; g$ e" E, |5 t
Message::NewJob(job) => { ^7 k" `* r h; f! v% b9 R1 d
println!("do job from worker[{}]", id);/ X* H0 [; g8 [* Z: ]& c
job();1 t5 _4 G1 b+ `0 n* `
},
! B" w8 p |. }$ v- l! J Message::ByeBye => {
1 {: S( O3 E6 H) C1 a println!("ByeBye from worker[{}]", id);; s/ t/ ]; l# R/ x( i: d7 T
break
4 f5 a: Y T: u# y1 L C4 C },
9 E, R1 ]3 M" K4 E5 H } , l1 }9 }; D( X
}, |& A9 N. v1 U: l
});
, A, [4 \4 f# @& l& @
0 n5 c/ i6 Y' |+ X Worker {. a0 @3 @; @) f- @6 N: k$ F/ Y
_id: id,
/ v% O3 c. a# i" }+ k# T- r t: Some(t),
( ]0 C3 |% R; {5 h' j" Y' S; c+ b }
8 K& g/ Q" j# O9 A6 q$ ~) l0 s6 [ }1 O; W# r# C! z% E& \
}7 U: q3 V! ~; ]7 M9 d4 j$ D
6 D6 U" a' G' B/ \ upub struct Pool {: [2 o) r; c: J; z
workers: Vec<Worker>,
F/ a" Y! X; A# M j! t" m' H+ @ max_workers: usize,6 \& C: c! r! v% {% e/ k
sender: mpsc::Sender<Message>
* S( U, y1 _( p' B) i: A5 d}
2 y* t! J1 G; \0 ^% B
2 A2 \0 j& B3 C9 |: _impl Pool where {
. ^ `# X4 p! s. V' e4 l pub fn new(max_workers: usize) -> Pool {
' S% V1 k' {( J8 V5 I if max_workers == 0 {: N: y8 z, C$ t% ?/ b& K5 w' i7 O
panic!("max_workers must be greater than zero!")2 } c5 A, B! ] D1 ~1 r
}
. ?) G! N( @. T/ G' F3 r let (tx, rx) = mpsc::channel();
/ U" G& I! y0 G1 a. e9 @3 X+ J1 i: ^3 e( P: s' L
let mut workers = Vec::with_capacity(max_workers);
6 F8 ^: ~, v. h# b6 B$ \! u; p let receiver = Arc::new(Mutex::new(rx));
3 D! t+ b( y5 v5 S0 S |' } for i in 0..max_workers {6 I* T' {7 [4 N" V8 j! ?
workers.push(Worker::new(i, Arc::clone(&receiver)));
5 f9 J) X; w6 q# Z$ o5 v# L }
2 T* O5 n1 M; F+ Y- t# b+ a+ ~5 u
Pool { workers: workers, max_workers: max_workers, sender: tx }
& a: G, R8 r3 ` }; N1 w# O6 q& K9 D% _3 z
% S5 K3 l6 N6 u5 q* m* e; c
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send/ J9 F; |* `. v7 y8 i. a3 r4 x
{
$ J3 G( w D& o, }2 ^# {( s% ]8 d- }7 t/ u* L, q* d
let job = Message::NewJob(Box::new(f)); ?5 v2 ^& E* T# i1 @0 t
self.sender.send(job).unwrap();
/ ^4 r8 D- s" Y } G1 B& r; j& E2 }$ \( |, c$ ]
}
( Q' L" e# M2 Z% f/ a' K3 [
6 y9 D$ {6 ?. O" Y1 ]impl Drop for Pool {
/ Z/ d" P7 O: o$ M. P- T) S1 H fn drop(&mut self) {
% k O$ t" |, _ for _ in 0..self.max_workers {
6 W5 t- a$ u2 q- t: Z% j; l6 q ?+ f! } self.sender.send(Message::ByeBye).unwrap();1 `" [5 [9 v6 }' Q
}
5 x$ O2 k- F! C) ^* e; ~7 c9 D for w in self.workers {3 K% Z- z G+ q: Z6 G
if let Some(t) = w.t.take() {2 t p8 U9 {6 k. X; r: f
t.join().unwrap();
0 J; Q: U* _; m0 f/ d/ G! V }7 O5 ~4 o. N" Z
}
* \8 v' N$ W/ X# z/ P' Q" Z' { }
+ D" I- x: E3 ^ ]8 S7 v5 [0 e}, }1 |! V5 ^. r, Q! N
% a/ O3 i( J: w; T1 u a
; G1 F4 n( [% G1 }#[cfg(test)]
7 r1 h! M& o7 B5 ]mod tests {
8 U' \, v* L) @' a use super::*;
3 |9 \3 d; s+ g4 `5 G( u& T #[test]5 i. i' I* ?& V3 l M, s
fn it_works() {
: k+ I/ [5 m0 d! p+ B) I* q let p = Pool::new(4);, Z% x/ {: W6 H% V. l: p1 X' U/ J2 _
p.execute(|| println!("do new job1"));
+ j: A2 c) L8 f5 J$ J' K p.execute(|| println!("do new job2"));
8 u u( d+ [- v8 C+ A. A p.execute(|| println!("do new job3"));
' K1 V1 L+ ]/ O7 \1 i# x p.execute(|| println!("do new job4"));
' B/ p, {% g% n6 G7 E+ i }
O" b7 I/ G W& S}/ K( ]4 a: _7 v# `
</code></pre>& f$ _& x7 E& W
# M) N/ H7 k5 P# A |
|