|
|
+ {8 r* b# u% C. {, [, T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
* F, m# }" Q o: u# h: e0 F<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
' W1 {! N4 B6 X4 A% m3 u<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>8 h5 b- c' u+ i! q( U
<p>线程池Pool</p>
4 R1 i0 B/ \' `* i) F6 e+ y- ]. [<pre><code>pub struct Pool {
: k9 J2 Q }% w$ f" H! s0 ]: U max_workers: usize, // 定义最大线程数3 a5 G: O6 g/ x! i; p3 M
}
; g5 R; ?6 W y2 X. F" {4 {3 w0 ^& M) Z( I' r3 ?. D0 v
impl Pool {
0 W* ?- w9 Q% d5 R+ {/ v fn new(max_workers: usize) -> Pool {}& q( D+ C6 K1 X; f( }$ {
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}" g& m/ W/ Y1 E+ ?# f# |
}4 q G# p; P0 G s" P w
& ]- Z2 L6 @9 t( O6 n</code></pre>
' H% J1 F& [4 S" d a( Q2 f% ]<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
* X( V2 M! K f1 m2 _* B<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
; x' E2 @' [8 \可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
" J" U7 D6 B5 x- g2 O+ [1 t- O8 _+ F<pre><code>struct Worker where4 r Y5 b1 W1 [% {. p
{5 I+ ~! G0 D+ s: a) l( {
_id: usize, // worker 编号
( @' Y( S1 t. D9 a* l* h}
' j1 ^7 n; r6 t e( |2 x7 W</code></pre>
9 r4 n6 Y: k" c$ G6 R- F<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>5 T! }7 A1 G) ~7 M2 B( I
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>; K& t. t T2 r' u
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>* i7 R2 Y6 F, f2 H7 q
<p>Pool的完整定义</p>7 p, O1 @' ]/ {
<pre><code>pub struct Pool {
8 C$ X1 t' v5 C( n! y$ n1 S workers: Vec<Worker>,
9 o; M4 Q4 q; M2 @ max_workers: usize,7 N( E. K6 ?( ^1 b f; G3 @
sender: mpsc::Sender<Message>! B3 x) T; J" j' B! K& `
}: w# x& _7 p8 F* z5 c1 p
</code></pre>
0 z+ F$ B# d7 ~5 l& S) [<p>该是时候定义我们要发给Worker的消息Message了<br>+ B3 P7 Q1 M# l9 X/ L' E1 Y
定义如下的枚举值</p>+ p' L9 W% s% ]
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>; _& N: p n( a/ E, j
enum Message {5 }/ A2 z- P0 s
ByeBye,
2 T' L a, q. \% o NewJob(Job),* k9 t& ]4 X2 ?& E) N
}
9 R; i/ P. `) ~. q, M/ A! e</code></pre>
. Q, |' c4 ?9 R$ ]/ x' ]+ R<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 o j. \4 T0 H<p>只剩下实现Worker和Pool的具体逻辑了。</p>
$ i& o5 v& o0 h6 w<p>Worker的实现</p>
8 K% f: w- k3 R8 T) h) B<pre><code>impl Worker
" U' A! `* k7 C9 K! {{
6 ]) y$ c/ e/ U fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {8 |. o4 L/ L1 w- Z' {1 n
let t = thread::spawn( move || {
( n9 _6 O5 W+ m7 R loop {
; S2 q1 G) i# ?5 ]8 q. t let receiver = receiver.lock().unwrap();
8 W7 Z3 o2 ?, a9 S0 A3 I& C let message= receiver.recv().unwrap();
4 s0 N2 r* S* E8 w0 m( ~3 e match message {
' B1 K1 ]0 s. |1 _# R+ M! c Message::NewJob(job) => {
; g- z+ e6 A/ c) M: R; K! l0 n: Q println!("do job from worker[{}]", id);
, z/ |1 B ^) q( ~) y5 s5 |, z! k job();9 s u9 `; Q" ^. b
},
" G4 i- a1 e- O) ]! g- k2 H Message::ByeBye => {# n% O; y$ m1 O2 o- W
println!("ByeBye from worker[{}]", id);* h- l" f6 C: s+ C7 c
break; d3 @. B" ?8 k: V
},
; F( l: A" x* o4 ~+ M1 Y+ ` } : |- C( h" r6 f
}" w- x; u/ C2 Q0 G
});+ t4 ], K) h0 X8 v {7 l
, z- q/ }9 c7 \8 o Worker {7 ^' e/ z" V- V3 h, f! \
_id: id,
5 \2 K) S, D7 z. x+ j t: Some(t),
* N" ~6 }- i6 g7 A }$ M5 D, s1 o3 L* r
}
. N* }7 o7 U. O' [3 | ~/ s. r, o}3 r) h/ P0 G. Z& U% ~5 R
</code></pre>
& V+ ~# F7 Z* u9 @<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>5 X# H8 U6 F% ^' d K2 s4 S6 g4 z
但如果写成</p>
% s y1 l+ N# h' S7 M$ N<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {7 y5 d% N; o7 v& p5 D* X P
};& ^5 r5 l4 `* \% M7 X$ L
</code></pre>( f4 P# Z$ Y$ C3 ]/ l" v6 y
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
) a! w4 W: Q4 V+ ~: l6 s7 _8 ^rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
9 x- K% W! c- W' H8 R0 M<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
- }2 Y: P: Y! z<pre><code>impl Drop for Pool {
, c$ t7 r* V3 g3 q! i; y: x fn drop(&mut self) {
! ~9 U- }, d) T1 @2 y* i for _ in 0..self.max_workers {
: H; f2 X, H! g self.sender.send(Message::ByeBye).unwrap();
0 H5 p% U' E8 [2 V; O }
8 {% _1 j9 o. I- G& S for w in self.workers.iter_mut() {
3 c1 H0 v; h5 q" H+ o if let Some(t) = w.t.take() {( a9 m/ o6 e& N6 N: K
t.join().unwrap();2 { u# P. I3 X2 ?# ]
}' A( k n2 ]9 e6 W7 C
}
* f+ G# ?( }8 ^ }
- M# Z5 K7 s% e0 U; a* K& x}& Y9 ]2 e0 d4 u
0 i% C( E* f6 U2 A
</code></pre>5 Q, o! v0 p! i- }8 ]1 n5 S
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
" d% M2 _3 |' v4 Z) M7 p" x C$ s<pre><code>for w in self.workers.iter_mut() {
3 j" A% m# G1 n. h2 F0 t' f' b if let Some(t) = w.t.take() {* b& j0 N0 W& N. A% ~; f
self.sender.send(Message::ByeBye).unwrap();* j7 y$ k( Z, f: o9 } O
t.join().unwrap();$ a+ x4 h! d( O8 x% ?. ]8 B
}
# ^" u8 q3 B8 E1 _# M1 d h}
9 @* z; {/ c6 a1 l1 f/ X
, {/ b: t. v8 j8 _</code></pre>
8 k( S2 b" L; U* F2 l& `/ @0 Q<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>0 D- N# q# ]8 [3 @% p1 F) r
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>4 u3 c( ^7 ?9 k
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
% m6 U8 O! d( m1 @1 O# g ~% l1 Q<ol>
- V# x: A u# E, g! B; d, U, E<li>t.join 需要持有t的所有权</li>
& l B8 \, b; }# [<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
5 p' z' M% G) R/ V</ol>
" ~# o5 B0 H& d+ D% I& L1 `' f$ u s<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
- R, i% D5 q/ O* W换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>3 o. X; j! |" w1 n
<pre><code>struct Worker where
, v# l( }4 Z6 M4 B l: r, X1 V{# a3 g9 X* p2 |6 q2 Y! U. V
_id: usize,
. ~; m2 Q& M$ w1 h) T( r8 m/ @ t: Option<JoinHandle<()>>,
& V) C( ^2 {- V( \7 r}
' w: Q$ i( H, Q+ [: e. u</code></pre>: e1 M' }# b! C
<h1 id="要点总结">要点总结</h1>+ Q& B4 [4 m! I# F9 C
<ul>" t. \( q e( g; d3 H
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>: g5 x9 q5 G- e, i) R L( h# \9 Q7 {
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
+ X; } a7 u; S( J. J</ul>
! F; Z1 _3 `' C( |<h1 id="完整代码">完整代码</h1>
2 T; O) N# p* x" y7 m8 V<pre><code>use std::thread::{self, JoinHandle};
" [ h3 K& V- Y: q! juse std::sync::{Arc, mpsc, Mutex};( [8 A/ s" ]+ a0 V2 u; O$ U" D
' A5 N1 \3 o/ H( M; M
2 U6 K+ ?. l3 S' a& jtype Job = Box<dyn FnOnce() + 'static + Send>;) n0 T/ L+ ]* X) C9 ~
enum Message {
- m+ }5 ~4 m9 B; P7 E& Q ByeBye,. k) Z+ d+ S# A$ d$ L
NewJob(Job),- Z6 W" b5 f. m6 D. |& }- h
}
3 j" O2 e) s, U! `, |$ K2 ]; c1 E' c0 W) t
struct Worker where0 X5 D* h Y* k# T
{# e4 D- r% i" W- G
_id: usize,) L2 X: u1 S2 c6 x8 p
t: Option<JoinHandle<()>>,
$ g/ T- d9 F& p6 k}
) K% Y. [ N8 A% ^0 D
" n7 Z! X2 T5 n+ ]; e+ Himpl Worker
' y( h9 h! t' X{6 o+ H3 \% @0 A& S/ {0 [
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
3 {5 P, \; S8 [1 M+ X% j" i& P let t = thread::spawn( move || {
u q, Y' M% r5 g6 m. `, R: G3 ? loop {
7 J6 t) Y! y+ i& S* y" \8 v6 R let message = receiver.lock().unwrap().recv().unwrap();3 o6 j) n# G7 ?4 E2 m( F" @ T+ z
match message {
$ `- c+ B8 y4 R2 e Message::NewJob(job) => {( Y( N6 d# \* l3 L/ z, n i
println!("do job from worker[{}]", id);
$ P! S) a1 k& P" |' E+ T job();( L2 B/ [, w+ c3 \, @( E, D
}," d3 w/ Y! F2 i- e7 m: }
Message::ByeBye => {
n0 W+ }/ D/ y9 b x+ E' Q9 v* M println!("ByeBye from worker[{}]", id);
' _- |4 Y0 Y( }/ R; f' a! } \ N break
. i* y- |" U& W& b },3 C- O# n: k7 e d# w1 Z( F. _
} 7 |" @+ E0 l9 B" v; Z+ }/ M
}% @" ^: ~1 @- G! z
});' {& K! ~4 A8 q/ S+ R
^! T2 i4 d$ \- x4 H: {/ K1 N& F Worker {. U3 b! E: o: S: H1 H
_id: id,
' u0 |! H& j! g. `1 i t: Some(t),% O1 r/ B5 i8 I J# I# b3 T
} A- m$ ^/ {9 L
}
& G2 ?% J- x* Y% g: u0 D0 ^+ |5 s8 ?/ z}8 c4 s( z0 B3 e# a+ \, D
8 N7 ^9 z2 l7 w3 i; c; c
pub struct Pool {
, i$ G! p6 |. s$ s% G workers: Vec<Worker>,
; T8 c% Q4 [5 x( I& P# @ max_workers: usize,
X1 A" t2 P x) u sender: mpsc::Sender<Message>
( t7 c. i9 C7 W6 r- q2 M# l}
! C R: q; b% T+ J, v1 c) l8 `8 Y G7 c, p
impl Pool where {
' I7 o y* B4 C: K pub fn new(max_workers: usize) -> Pool {% {/ J. M9 {! k& L7 K I' p
if max_workers == 0 {
, u4 }9 T$ k% P$ ?. F panic!("max_workers must be greater than zero!")3 O; w% Q3 L; t- d1 o
}: ?+ s- i! p0 x
let (tx, rx) = mpsc::channel();. g* X6 n5 Y5 ~
6 \) S7 }( l2 m3 p: f6 M
let mut workers = Vec::with_capacity(max_workers);: ~% ?/ r6 ~; O: ?; S# z
let receiver = Arc::new(Mutex::new(rx));3 x( r# p0 T7 e' \/ k
for i in 0..max_workers {
3 M B1 I8 V3 F% W* ?# [1 w workers.push(Worker::new(i, Arc::clone(&receiver)));
8 f/ ~3 I' b- L; j7 ? }
# @% W& x4 e: w3 ]2 t- ^0 |: I# y' @- ~% s/ w9 L( D
Pool { workers: workers, max_workers: max_workers, sender: tx }; d! q5 E1 D0 R8 |+ p6 y- [
}1 Y: p: c8 M6 q+ c
s& B. x+ V; ] s) Y
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send6 v0 S( h! l( V5 B8 R; R
{! V( R# ]8 @) A8 O0 n* F
% I' B% R% ~3 R$ p let job = Message::NewJob(Box::new(f));
3 S7 @; q# b: x! l' w ] self.sender.send(job).unwrap();
& [% n* N) m. D0 c }
" ?' ~" k, a" X% z6 ~" ?}
; E9 @: q: S- B U0 A1 ^+ S9 ~( W i/ ?1 c: \6 C! o* n
impl Drop for Pool {
1 i- k9 i7 ^6 W# l8 m fn drop(&mut self) {
2 p5 K( r' }$ z6 j for _ in 0..self.max_workers {/ H8 B @" R E! P
self.sender.send(Message::ByeBye).unwrap();" ?! O% c1 ]$ t8 U
}
( D$ }% e$ b x. G" M for w in self.workers {
+ e: i `* o4 k4 Q if let Some(t) = w.t.take() {
: h. o& w# s$ G! _" x& P& n t.join().unwrap();+ c0 R8 W& n8 S2 j
}2 L' l2 I4 n; _# j/ ?
}8 m$ B) f! x: |+ \
}
" ~2 E9 e: L- j+ t) f2 _}
/ M4 m9 c, D) m# g- o9 |2 A
7 B5 _- [5 H% H
% o3 v0 H" J7 d! F#[cfg(test)]
3 E! C X9 g1 fmod tests {
+ B% k2 u0 f4 v) H7 _- D, Q use super::*;. o3 H4 t2 l* }
#[test]7 ]" r9 l+ x" h5 M( V/ r/ o4 \
fn it_works() {; b( c9 @ ], ~" s N
let p = Pool::new(4);
0 N: }* n- o* u1 F( m6 |0 m p.execute(|| println!("do new job1"));2 p8 M. M' s, d- w
p.execute(|| println!("do new job2"));0 ^+ k, O) I+ l- @: h) |" \
p.execute(|| println!("do new job3"));" P+ x6 f; V: w: L% O9 i. V) C' q3 K
p.execute(|| println!("do new job4"));
8 [2 X, N0 b! Y3 [7 I9 Q }+ }# ]9 k& L- \ O7 p( G& w3 Q8 \8 c& j& X O
}
. x0 x5 j7 g* q</code></pre>
* e- J; x( P" e8 u5 o0 y& }- E$ k2 X: o% w% B. _& |- p) y5 b
|
|