|
|
6 [* ]+ b5 `, j/ b+ E+ {<h1 id="如何实现一个线程池">如何实现一个线程池</h1>. m! D, `0 t' s O$ W
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>* ?5 E/ l+ Y- W Y
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
+ k. Z* v4 u: Q" C1 r2 O: }! s6 [<p>线程池Pool</p>- C' |% a% _9 j e* C% {
<pre><code>pub struct Pool {
# J9 t) C3 m. @. R max_workers: usize, // 定义最大线程数" c, A% u3 l3 N) l6 M+ k% o/ n7 O
}
8 a V) r/ p% L' K# M9 b
9 {- c# y: ~+ F3 s/ R$ I1 Eimpl Pool {3 K! I$ W; A) [9 a' r* [, _$ q
fn new(max_workers: usize) -> Pool {}/ {. y; h* u5 {6 z1 {
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}! S$ e% B2 f; |7 T! `
}8 Y1 v* G/ ^! Z7 f9 l2 B# i
. L. g8 a' O) x/ t" t
</code></pre>+ `& _! @5 m5 \ e: [
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 I8 ^, p- f( @/ z# j4 j<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# Q; V1 \# y& S可以看作在一个线程里不断执行获取任务并执行的Worker。</p>9 ?2 q4 g7 [5 G
<pre><code>struct Worker where
7 n! ?0 u- {6 r% X X& h{
' [6 e* |6 y' U _id: usize, // worker 编号) I6 l5 v4 C V @4 j
}
; o4 t3 }1 V/ G+ d# r! k3 e</code></pre>
- @" @' u0 s' _2 y" K<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, i8 ~8 I) n: H, ?! \* z) r
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% A# V$ {7 \9 U1 @( h3 f
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p># F& W, H8 o. x5 V) n, f
<p>Pool的完整定义</p>
% O& `) G3 j3 \; L0 r# N: B<pre><code>pub struct Pool {
( r% p# Z6 l3 x9 E, }3 E workers: Vec<Worker>,! U$ v# {1 W5 L# M9 ^) v' F
max_workers: usize,
) E& P1 X4 m; ]* Q sender: mpsc::Sender<Message>
- i5 W, ~+ L0 [; @$ E}
2 e* ^4 ?1 U; X7 i8 w5 q</code></pre>
; ~# U7 ^/ v2 E" |* V/ Y<p>该是时候定义我们要发给Worker的消息Message了<br>7 I( K- s" W! f' n# M! k
定义如下的枚举值</p>
9 @7 ]2 r4 m$ A0 n9 V4 q7 Y( {! v<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
7 \7 W# g5 U6 Senum Message { H. w6 w, p5 X; @
ByeBye,
8 w/ V/ p8 s# g' K6 b NewJob(Job),' g( \0 O, l A7 z# K
}' |5 ?% q' J2 b
</code></pre>2 C d+ m2 B$ f% F1 X( b! ?5 j* p; Z
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 f O3 u& O' b% f. {4 f: e<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 F S! N6 Q% O1 v1 O$ f<p>Worker的实现</p>, ]! K' E- ?3 M# G! V# O
<pre><code>impl Worker
. a# W1 h" O9 D& ^. y" H4 x) M2 X{7 E% `$ T; h( y& b e& ?, [4 o
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
, v0 A" n" U" }$ `! X* @ let t = thread::spawn( move || {/ W8 i6 z* f+ ?9 ~+ E/ n: @
loop {: m0 U: `1 Z" }% d0 E6 {8 R( ?7 T
let receiver = receiver.lock().unwrap();
( I. D* A+ i% w. ]+ O let message= receiver.recv().unwrap();
% j3 w0 U$ B! u/ [ match message {
r/ R& o; x: b; J9 [9 |! s Message::NewJob(job) => {& O) a k) u" C" p6 c* L* N" `
println!("do job from worker[{}]", id);
. Q2 M7 n5 v2 j# a job();% y; W0 }# Y- h! X
},
" F8 Y) e" l( J! k+ T Message::ByeBye => {
. K8 y) s$ k" N- P1 p& s! X println!("ByeBye from worker[{}]", id);
. P/ ]) `# M6 Y+ y7 Q' Z1 l break
& `4 J. o% T6 Y8 R. [7 V& K. B },
4 k7 Q: V5 R' \ }
0 C0 `# D2 c) N2 T }! _! K8 `! G# B6 ]+ Q! t
});: z, _2 |) }4 @' q4 l
! F* G; A% L5 k/ x Worker {4 u4 t; D& K6 S$ {! ~
_id: id,' J S7 h( x6 y- m$ n* @' l( f) `* x
t: Some(t),
9 z: u! ]* R$ t h9 N3 o! x w: Z }' Z- b6 q! s3 M" W: G" q$ M
}% \% z r2 F0 |" P
}' c3 L; d0 t+ z- M
</code></pre>
4 H+ r) l( n9 {<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
8 s% D5 T, _/ r b9 R0 j但如果写成</p>& G1 }1 w0 @( m8 ]) u
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 o& E N6 {# G: s
};
7 X: F F; M! O Q5 A/ C</code></pre>
! A- H) b, J+ Q$ z! h<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
% p @9 F3 b3 }/ Qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>$ H2 T s+ l& P& G8 K, V
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
! P9 E5 i4 w/ { z) M, }<pre><code>impl Drop for Pool {
$ i6 Z* b9 s; i" G$ O fn drop(&mut self) {
0 x& W' _ Q% m L for _ in 0..self.max_workers {
9 R; K/ e: y, L' E: U' `7 I self.sender.send(Message::ByeBye).unwrap();
3 G5 U& ]7 M, R$ f+ q" Y/ _ }
( s- ]9 x' H$ s1 y for w in self.workers.iter_mut() {
1 b7 j8 J/ c- x1 Y5 x! [& z) h3 B if let Some(t) = w.t.take() {3 X. U2 n0 K; _- D% C) T
t.join().unwrap();
4 Y1 b: @/ G' X( z- e, K8 ] |. ] }
2 _$ n4 y5 q/ ]$ Y0 e8 a% x }
" |1 ^* P, [3 i) ? }1 y# M+ h r' ]; {
}
, j$ o9 ^. f: L5 | u; P$ W" u8 u" I/ C' U
</code></pre>) ?* R/ Z; ? K) y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
/ y* U% {6 l+ i5 K' H% y<pre><code>for w in self.workers.iter_mut() {
* n! Y0 u1 o R7 {% t" m$ _ if let Some(t) = w.t.take() {
) O+ f/ c+ D7 [/ V9 m self.sender.send(Message::ByeBye).unwrap();
2 E9 X* Z/ P& @ t.join().unwrap();
; q% t9 ]# n" t }- d y @( |6 @/ W v7 e! t
}
& R8 Z K) I2 }* N3 W- E$ d! b+ ]
9 L9 f5 G% c% T; K( P+ C</code></pre>
- \) i% F% _6 @<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>) q; G; @$ U* a1 W0 ]
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
: F* X% C# p' K. B# n<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>+ N9 u* n! Y5 S- ~
<ol>
' Y" g) L( _& Q<li>t.join 需要持有t的所有权</li>
( K" B; i) x: k<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li> `9 A7 G+ ~( G/ ?" ^9 t! l
</ol>+ C- k$ i8 }3 _% p- T
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
" Z* w8 w/ [6 U4 v6 H+ B* e8 X0 ?换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>: h8 w' A) w9 r9 j) _
<pre><code>struct Worker where/ V' _6 ]$ m$ ?" @2 o
{. X8 s; L& g4 w9 n
_id: usize,( |9 x3 ~8 l+ c
t: Option<JoinHandle<()>>,/ w0 u! D0 ]3 v/ O/ x. r6 N4 A$ J
}
( s7 B: U4 ]* [2 }5 |% ~: ^7 P/ a, D</code></pre>
( Q- Z* _- q: Y3 G<h1 id="要点总结">要点总结</h1>. {8 J3 n( Y! v) b2 D7 N
<ul>
: Y; x/ v# U0 |3 W: Y1 J& r; G9 B: V<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
5 B: J( n+ Y% g- I/ o3 ]<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>% ]) b+ y% ^& `# d7 A5 W
</ul>
: T$ ^% ?* d; A* V) m<h1 id="完整代码">完整代码</h1>
( s3 s8 T) a, v1 l; Z0 \; K<pre><code>use std::thread::{self, JoinHandle};
" E+ V* a$ d( @4 H1 Z4 ouse std::sync::{Arc, mpsc, Mutex};9 R" B% ]* E; j6 y
3 R7 D( m% u+ X! T! x7 \
# s- `! ?) S! R5 `) z9 U% R5 ltype Job = Box<dyn FnOnce() + 'static + Send>;
. V4 d5 j L& x3 ?) Xenum Message {9 l& O# w Z+ P2 S& Q0 [
ByeBye,. C1 _) ]1 y/ y" h
NewJob(Job),8 m8 O8 ~; }7 E, T
}) c* S. l. O0 c1 N8 A- \0 [
0 {' p- S: n3 w9 L0 a- {
struct Worker where
; G" t* K( k* I9 m# b7 a1 v2 ^2 d v{
! d4 ^7 J* _: U% ^2 V* ] _id: usize,: ^/ s7 l( b% B9 J/ M
t: Option<JoinHandle<()>>,
5 y: E" l! n+ M/ F, J- C( ~" f}3 w/ Y0 N" i- p1 ^2 t% L
" T0 `0 @1 {# |4 Cimpl Worker1 I i; ~& h6 M- F5 k. Q/ A
{/ S# Y" Y+ Q8 t- _# c8 n
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
3 f. o0 s3 i! m let t = thread::spawn( move || {* m, [+ P( ~6 @! u, F4 Q0 [( I; r
loop {
: d) J& k. N9 C5 l let message = receiver.lock().unwrap().recv().unwrap();
) [( S( [2 q# ~& w6 G- m match message {0 V- G- G7 R. X
Message::NewJob(job) => {
, i: w1 _4 m# U! f: c println!("do job from worker[{}]", id);
' [' ]: A) e3 R# V% q job();) `9 ?2 e+ q2 c9 t
}, j J( F& K) q$ T
Message::ByeBye => {4 D5 G1 z4 m# t% M
println!("ByeBye from worker[{}]", id);6 ~6 g2 |1 v5 p+ U2 e
break
+ F$ H6 ]2 Q" D+ `+ i& b },1 ]8 W/ y5 Q) N' Y9 N
} ! {: I9 S8 F8 G: B* E
}
* \2 {$ e: Y2 ^# k1 K! e });
* G8 |' y' C9 g, f) r: |' A: _5 `4 R" C: Y- O3 ^, _( o$ [% A; b
Worker {
# c; v! Y5 }/ x, D _id: id,' {6 V5 T4 }1 a6 G
t: Some(t),9 ^8 w! H( ?+ b+ W
}
& C3 U c# H' t- p9 [! S }" z" Q f& h3 y7 ?$ b3 V4 c
}
, T e1 C! r/ x" n$ I+ m8 P+ G' U
0 V1 ~( J+ J1 j0 O3 ^pub struct Pool {; m# l5 @' I) l2 ^$ E
workers: Vec<Worker>," Z+ i* }3 y5 @5 w' n2 _
max_workers: usize,& ], Q2 R4 J" ~8 d
sender: mpsc::Sender<Message>; ?/ N+ J" } V* F( ?! x1 L; ^! K/ t/ S
}6 |' W% A9 F/ T* {3 O! Q
" r' i$ |4 A1 W1 F: e+ R4 h- Iimpl Pool where {5 x4 K+ f1 l Q
pub fn new(max_workers: usize) -> Pool {) R$ x% |) D! q% |! l
if max_workers == 0 {
+ b# T, O# Q5 `; A panic!("max_workers must be greater than zero!")( b; F9 Y2 V4 W! b
}
# l5 y" b2 f% @5 ^5 v let (tx, rx) = mpsc::channel();7 o. d, a$ D+ f" C
: r& m1 O% D0 m
let mut workers = Vec::with_capacity(max_workers);
& y$ M; q: q& ?5 u: g5 s- [ let receiver = Arc::new(Mutex::new(rx));
% N+ l- t; W. H6 n1 b4 v: e H% A for i in 0..max_workers {
! {; Y8 x( T- e) j3 T, x workers.push(Worker::new(i, Arc::clone(&receiver)));
8 Y# b# E* F! K! T6 K( N6 ~0 G }
% `! r& x' J. Y
% Z- Z3 Q/ X, @ Pool { workers: workers, max_workers: max_workers, sender: tx }9 o1 B" u- L* x4 E4 G, [: E0 x
}
# P% R2 y- w" Z( u9 u
7 E p' y( _/ ?7 s9 p pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send! `2 G- |7 [0 q( b X& x/ l0 e
{
2 o$ Q5 A' w7 ^8 Z2 N/ V+ b5 l* |" V3 A
let job = Message::NewJob(Box::new(f));
# S/ [! ]* ^* Y3 X6 y self.sender.send(job).unwrap();' Y( d- p J h
}1 J6 \' T4 K S6 ~0 c& l
}
7 i4 _8 l' j! G4 d R9 c
6 {* Z! H6 q; O/ p" Rimpl Drop for Pool {
& A, B0 a9 @2 Y9 t m fn drop(&mut self) {
0 R* T& r5 g! R& T% r- f for _ in 0..self.max_workers {
9 j4 k% z- }) C, n self.sender.send(Message::ByeBye).unwrap();4 \, t7 v6 \1 u3 c8 { C
}8 q5 k( {& k R& h# E: ~
for w in self.workers {. W! C& s" a' s* u4 R/ K* a
if let Some(t) = w.t.take() {2 o/ d m6 \' z. q" N
t.join().unwrap();/ J* g5 r% K# O) a7 \
}0 G7 ~' e% |6 F D3 O
}
( h. m3 \. U/ ^6 ]- x/ s1 @* V }' E5 b9 Q: Z' f* a2 I% f+ Q
}+ [/ K& G- i$ v6 q4 [" L% i; @
! P7 G9 u. C) m5 _6 ~3 A+ I$ E9 J: |% E, U) W; b
#[cfg(test)]1 U5 O! o% s2 p
mod tests {# H: d+ W3 A8 L* n( \. X. o
use super::*;, r5 t/ I1 M1 m9 m
#[test]" F6 s% ]- _0 @' A' [9 L/ @# M+ l
fn it_works() {
4 G1 G# }3 y U2 m( X/ | let p = Pool::new(4);$ u0 d4 j* |/ i( O* }! N
p.execute(|| println!("do new job1"));8 j( |2 Q6 h7 K7 ^) b
p.execute(|| println!("do new job2"));
z! w" j! B" x p.execute(|| println!("do new job3"));
% `& k6 w4 f% }1 @) a p.execute(|| println!("do new job4"));
1 V8 `" M; ^. z6 Z5 t! h& G }: z5 d3 I: X0 z0 \
}5 _3 u. Q' o6 |& i' q
</code></pre>
. b# K1 ]7 H2 e8 `; K) g/ S M4 }- [3 y
|
|