|
|
2 S6 P3 O0 k7 Q% L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
0 @. k; ^+ V6 ?* ~9 B5 U<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>$ i( d5 b9 H h
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
2 o/ O/ y( b# {$ |* `6 J5 S1 h4 X6 C" }<p>线程池Pool</p>
& u# u& F* `. v! y6 v! [<pre><code>pub struct Pool {
7 Y- F7 T( j; Y" t; ]: ]5 Y max_workers: usize, // 定义最大线程数
. W: Z6 A. {) D& X" |$ p/ i}
$ x, ?; H4 u7 y5 M( u6 n* v- U5 g P# h$ N
impl Pool {
5 J/ Y9 n }; F; o fn new(max_workers: usize) -> Pool {}
9 r% Y2 I2 c$ j K2 T3 h" y" C) f. r fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}- b+ c, Z4 W* R+ \- S; e3 K' g
}+ Y0 Y8 i0 c: i C
5 Q- a* b% R3 W* {" |. J" @</code></pre>9 E3 Q d) N7 x5 U9 _, L# Q3 {3 b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>0 E3 I u. e8 e _, D0 s/ e
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>' [0 p, s3 x- M. z: y1 z b. Y" ^
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
[! V8 O* k5 Y; U5 e<pre><code>struct Worker where0 B1 k9 [8 a1 b6 T* z* y
{* e7 d3 T8 U6 i* m C
_id: usize, // worker 编号
* H; q( }8 Z0 y# Z: Q}- p, t0 A$ ?- V: ~3 j$ R
</code></pre>( V; p# _( Z/ w4 @: c
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
* t1 m! w: S8 @' N% y把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
7 E$ g J; u6 R: X1 W* ?- S<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
- c9 \- I* g- S% W) a. C& I<p>Pool的完整定义</p>
$ \* x s3 | n. N! E<pre><code>pub struct Pool {# N) B7 ] J9 h. c+ j% o) d( r
workers: Vec<Worker>,! Z/ e2 A6 R! s
max_workers: usize,
, i9 B" k+ k$ e8 Q8 o5 s sender: mpsc::Sender<Message>8 |3 @4 g1 b q3 O* C6 _
}
6 @* A# [4 T8 W</code></pre>
2 T. ^3 ^0 g7 L# j1 H! x! E; `<p>该是时候定义我们要发给Worker的消息Message了<br>
! G R1 X' |1 R$ |% D定义如下的枚举值</p>
$ g4 B7 I I! ^) S- }2 s<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;( u' W8 n% @, o- ^3 o5 R! f
enum Message {
5 \6 n0 J l+ t# g" @/ r% w ByeBye,
- ^7 a2 Y: i( ]& I7 K# _ NewJob(Job),
( R$ i% O, D+ V+ J/ S' k* p}! q- O( n; t# e3 A3 b* v
</code></pre>
& \5 r$ Y3 }/ S4 G: z& e& v; W<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
! G+ y+ O @8 E; J. @* Y( ^- k% _<p>只剩下实现Worker和Pool的具体逻辑了。</p>
' s( g9 [* X& k<p>Worker的实现</p>9 H8 l* V3 p7 |/ s) ?
<pre><code>impl Worker
/ `( k0 R: I& T5 y, C n{; `6 r& Z3 |: c
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
2 K( Y2 Y; l' T8 w+ G3 ^ let t = thread::spawn( move || {
+ s! g0 d6 i: R6 R; g loop {- r' Z- n- T' a
let receiver = receiver.lock().unwrap();
6 U+ T+ K- z& G* `' x1 d' i3 P let message= receiver.recv().unwrap();9 P- {9 b( c! s! O6 l
match message {
( ]; B& e+ f( s- n8 J Message::NewJob(job) => {
" z! V1 J% W/ x: s println!("do job from worker[{}]", id);
0 B( r8 j" j. J4 ^9 H job();, B/ y& l. Z6 s- K" j
},* n/ I* O3 _- w+ N
Message::ByeBye => {% W0 ^' J8 r" j! K3 K; Z
println!("ByeBye from worker[{}]", id);
3 [7 b0 Y) ^) |8 U( l break
0 J; C: O( h" C+ d- X+ E },
3 A. w' [9 v' u! S& t } : ^) q9 W" U6 H8 \1 `* O1 D
}) g! y, z/ m* ~. p, |$ S
});& B& m/ p/ O+ w5 {! }# L
9 O$ Z% k1 ?1 N' I Worker {/ |, v) E# N( H4 {( k
_id: id,
* h( Z# Q" i, c. V6 ]# r7 u t: Some(t),! ?- [* N. b! B5 i4 I5 F
}
4 G4 y L0 g# {& C/ a5 ? }
; O" \- L! N4 w- p) |1 a}7 K( j1 g8 P" Y8 h5 t$ l
</code></pre>8 d) z {% {5 w9 ~5 N; }
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
5 w; t) r& |- w2 ], e! G但如果写成</p>% r; c, E, I$ l* _ z8 z3 S
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {. F( R/ }' _& }. ~# [
};& H0 h' N! U2 a' l' a! X1 }
</code></pre>
' }9 k% A; U: ~% v( E! P" I3 g% T<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>+ n( n4 h+ n' Y9 D v
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
% M# g% y6 o- n3 A5 G2 Q' E9 d9 O8 P<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>3 K0 O* P4 c+ f( d
<pre><code>impl Drop for Pool {
2 T3 r+ q: e5 Y- z* W3 g fn drop(&mut self) {
' V# O0 y5 \* k% W9 o! g2 {- w3 h for _ in 0..self.max_workers {+ O& o9 ]/ f4 O* e5 E3 W
self.sender.send(Message::ByeBye).unwrap();
% t4 Z% w1 m1 P* u( ^6 V. O }3 _. [& F6 }' `1 ^! I" t7 O
for w in self.workers.iter_mut() {
: L$ O& ]% R& r; U# b2 j$ I' x if let Some(t) = w.t.take() {: x: X& B4 J% y
t.join().unwrap();, V q8 H+ R# R$ a
}
& \' I" N0 r3 N4 |& d6 W7 C }" U, O. M: k- ? J8 F
}9 @& G' x: r+ V6 \, X3 c
}
, r3 k/ M" \$ g' Z6 K. c. }
: F3 ]3 l7 J, I8 B u9 L</code></pre>0 p7 P Y& r5 D9 @/ W3 G
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
; }# o! z5 _0 n% _, K4 F3 w<pre><code>for w in self.workers.iter_mut() {" |4 @" R- j+ [1 x2 g) f
if let Some(t) = w.t.take() {& w! D; e' B/ Y
self.sender.send(Message::ByeBye).unwrap();
r. Q% D1 i( Z! Q# m4 X' b$ `0 x+ [ t.join().unwrap();
; A& v9 U% z2 y, q( A }
& }( t! T" e* i4 x% v1 p/ n) U- O}
: f3 L; s; W& W+ I) W
+ {( V' z5 r. M4 E2 W</code></pre>
" s9 T. l8 [+ U* T) I8 A4 w<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>! p9 M6 p' ]& p- ]3 M9 _# r
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>3 @7 D& I5 N( R0 f B K4 d
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
3 q/ ]# r5 B( O# b. P( I<ol>
$ l1 G" G2 E9 f3 S3 P<li>t.join 需要持有t的所有权</li>
) B; K4 d$ k J<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>7 R: V4 |: S, W: q0 Y1 \
</ol>
& m) m7 }% x+ `9 a<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>; Y( r& u. k6 @: @9 L4 [' l
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>) x6 m$ _' U: h1 p J
<pre><code>struct Worker where: u; N7 ~' S4 L, J, b+ h% L0 L# a
{9 h+ k6 \2 h2 e4 w R0 P4 N
_id: usize,
+ X- d- L" p' X+ l7 d# m2 t1 i t: Option<JoinHandle<()>>,( C- G0 \5 f& i( ?; z
}
8 Z$ P9 }$ F5 m1 E. [$ g6 T</code></pre>8 M4 L% \0 S( `9 p3 Q
<h1 id="要点总结">要点总结</h1>% f+ I1 e1 A8 K: i% q9 S
<ul>9 i g4 u$ Q- k9 h7 m3 H) N
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
, F* @/ \1 ?+ K) { S! s: p- g<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
5 Z( `( a o* ^- D8 V</ul>
8 r' y% T. L6 ?$ b0 F<h1 id="完整代码">完整代码</h1>" j, \1 j. q) ?
<pre><code>use std::thread::{self, JoinHandle};. b7 _& C. n1 E
use std::sync::{Arc, mpsc, Mutex};
1 u8 {) {* z$ h9 Z; g" M& {" y2 I/ T5 N! z- A6 t7 l/ y) S
6 M0 Q9 b! {0 N3 `7 _1 c; [) D
type Job = Box<dyn FnOnce() + 'static + Send>;' t. o; D7 u# W+ R6 y- o, U
enum Message {
8 H6 F+ P# b i' l" B' Q' b; ~4 y ByeBye,, J- y; P3 F8 N; Q
NewJob(Job),
7 Q4 T8 n8 ]' q2 Y5 a/ c}
5 T- x) |/ o9 ~( v- {* S- \1 `! Z3 M: ]; y7 E
struct Worker where1 H3 u7 j$ o, |
{
$ A5 d9 @% V) F9 t _id: usize,
* ?: {+ d) J7 B& Z. B( o t: Option<JoinHandle<()>>,
0 F0 Q$ d1 u% p& q% ~, ?4 H}
3 L) h; ^+ Z7 P
1 s! o/ y3 j1 m* l7 Dimpl Worker
; [$ O6 {$ P( X/ l w/ D{, m) t/ k9 L, N1 U$ u& K% U
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
6 d2 Y% O% t4 D6 w) Y8 N1 ? let t = thread::spawn( move || {
5 q3 [( X+ I1 f: |" |% k loop {
2 X" k) B. V$ L% e- ~! c4 L1 W# J let message = receiver.lock().unwrap().recv().unwrap();
. z, p) i. O- m3 G* H2 i V: D4 j match message {- L# \4 z3 O* ~4 y! K9 o( t
Message::NewJob(job) => {1 z! z- |$ N( F' d9 D' @+ ~
println!("do job from worker[{}]", id); U# ^! x' q$ J& j5 M3 |$ U
job();9 a# \* d* X; D! A" Z) L
},1 @% h% {# c3 Y: x* v7 K
Message::ByeBye => {
, y5 T4 v' m4 i! H println!("ByeBye from worker[{}]", id);$ J! a1 M& {- F" J2 @6 B" b; j% C
break
( p7 Z% m" R% Y; V: a },9 e9 Z! f2 g3 Z' b
}
8 ?( | Z H. D! t: z$ d# N3 e) ? }' P/ g; P5 t6 j n) J, K
});5 H" f1 b8 @1 g; F2 T
; Y }: ?2 n" v. ?6 ^. ? n; C! s& S Worker {( ~ d4 N3 ?# @2 f1 ~2 H
_id: id,% ]6 {6 u! Q, Q; D' u: Y8 N/ u
t: Some(t),- {5 e& I0 N3 ]3 @
}3 c8 l* N: h+ x8 j% g* Q
}8 k; _# T1 Y( a- A/ j
}- W f7 C# G0 Y) G5 s
$ R$ S( r) ]/ d$ B
pub struct Pool {# Y1 Y+ k4 m+ ?5 a" H
workers: Vec<Worker>,
9 B; a& o- ]8 ] m max_workers: usize,
2 H" k& C; N2 _1 S8 a4 K8 ^ sender: mpsc::Sender<Message>
! b9 v& X2 J/ U. N" y; |}
8 F; W# P$ W, T9 t% A; q. b) |$ q }( D- k2 ]9 t1 l6 [5 k7 g3 h
impl Pool where {% t. F# |7 ~8 F- `
pub fn new(max_workers: usize) -> Pool {% h4 Q/ R: [/ d# [: m0 j6 d) X. J q8 z8 `
if max_workers == 0 {
. R8 f0 J% h& j: o7 A) |6 h panic!("max_workers must be greater than zero!")
1 R+ _# l& O+ T: B5 |8 y5 r% s }
( {1 {! x( z/ N! D$ w: j- |* q let (tx, rx) = mpsc::channel();5 B# j2 q6 ?: A ~* }6 Q
. v0 S% q h" i% @' I) }
let mut workers = Vec::with_capacity(max_workers);6 h: e- r n' B, v9 {* v
let receiver = Arc::new(Mutex::new(rx));9 Q: g* }. p9 o( B
for i in 0..max_workers {
9 ?. Y5 Y9 a+ G workers.push(Worker::new(i, Arc::clone(&receiver)));# T# [3 u* i: p
}" M1 L& B, Y$ y8 i# _4 [9 D0 k
+ N! C* x8 j- F6 {, W9 G# e& V( b Pool { workers: workers, max_workers: max_workers, sender: tx }
, [1 t0 N/ Q) g2 N( o J' i }! y" I# X9 @: j, f
) p+ k- D; e$ h( o# I3 k5 @
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send+ r! k( u% w+ x0 Q. K/ ?/ D
{- {5 Q A, T# e2 P1 |
; w% U/ Z" a; J1 o; q1 N
let job = Message::NewJob(Box::new(f));
. _' Q3 l& ^+ e. l5 Z self.sender.send(job).unwrap();
- x* K [7 _" {1 w8 ]/ V }
, O( o! {7 N$ O! I2 ^. g}
8 E7 h3 i% |: T. n+ X" v2 u' i- ?; g5 G5 W& a
impl Drop for Pool {/ |& n3 N, e7 z/ B5 `/ X
fn drop(&mut self) {
0 F1 \+ Q& D+ V8 s9 X% A for _ in 0..self.max_workers {
5 M. s# X" R- t4 J3 H) W* F self.sender.send(Message::ByeBye).unwrap();
0 q0 p/ G' c. C. h6 M8 n' C" K7 m }
5 ], }0 F w+ ?. T for w in self.workers {* {( v/ W% o- B# G% D
if let Some(t) = w.t.take() {1 [8 _7 l( B2 i+ _7 }& N; {
t.join().unwrap();
9 t! }: S1 D/ n5 b }, V0 h4 F. b7 A% U4 T( {9 f
} y5 d; J/ M; m5 ]
}; X+ I% i9 ^! d' z$ N# s
}1 ~9 x+ a+ v Y v1 N# X) O
/ r" M+ U7 o4 w* B
" A5 @! u; n" F0 @#[cfg(test)]
# ]2 [5 h6 G' w V, Lmod tests {0 z' u$ U# D# f, E5 _
use super::*;
& i; c+ G& P4 O# C; q' R1 c9 X #[test]2 t: i! m; x; {0 Q: N8 B4 a
fn it_works() {. A/ u% n1 d# j( x }3 j- p6 z0 M
let p = Pool::new(4);
0 F' [5 ], d" U9 e7 x4 Q) Q. e p.execute(|| println!("do new job1")); V2 s" {2 H. T( r' N
p.execute(|| println!("do new job2"));
2 D% k8 |% Y7 D# E p.execute(|| println!("do new job3"));8 B* D% |( C! P0 k: }
p.execute(|| println!("do new job4"));
* o# D; `0 l9 q+ `% o, l }' G: Z" Q& {3 R
}
' l' X0 M+ B2 _ O6 h</code></pre>! o. b% B: l4 I- v" O ?# z* d. p
3 |4 o$ [" l5 `$ O
|
|