|
|
, A* i$ V! l& t3 N. }* f<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
$ z5 F5 d& M7 E" `6 N! F& m; @<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
; H( ~! [7 J" @+ e* a<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
8 w2 m3 f' o3 B" Z- Y" L<p>线程池Pool</p>
; n _8 o: f: B$ `5 b% M4 {$ E<pre><code>pub struct Pool {/ Z$ [8 e! f' Y" n. L
max_workers: usize, // 定义最大线程数& x: u. s. O/ a: `' O8 D) e9 f: F, I/ D
}+ t3 X- u, V/ ~1 _6 d
0 C1 @2 w3 A0 J% u Y! ^1 @- v
impl Pool {# N- ]0 y6 y) q) ~& f
fn new(max_workers: usize) -> Pool {}; i; e5 d( }9 N' Y- r
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}1 u+ F7 R' _* k8 h0 Y7 @7 L. _4 |
}
) s" W6 n( k1 M6 j$ Q
6 F5 _5 C. a$ r. C" B$ P</code></pre>
# x) X! R) y/ w `<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# k2 Z* l9 S! B! n8 C# z<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>* `2 v0 v. P) J/ v# `/ y1 t
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
8 [, o1 v, Z( d3 ], P9 k6 H<pre><code>struct Worker where
2 D5 V$ O$ s" v; S{
5 c# G# H/ Y$ G _id: usize, // worker 编号8 Q, c( |1 q' v3 n% e1 L5 ^
}6 K& w9 \0 [' U5 \
</code></pre>$ \' I5 U6 a I6 Q7 i
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, Z% K) o* M5 U( i
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
0 n2 T C& l/ l- f s1 N; G<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>( J9 c( R: D' R3 U- j
<p>Pool的完整定义</p>0 G/ m3 o0 G! ?& [& f6 M" z
<pre><code>pub struct Pool {( Y9 Q: u; e/ r' B1 j$ F
workers: Vec<Worker>,' p x) l' _; s5 R/ w
max_workers: usize,' G# F6 g1 g6 L. Q7 D$ u$ D
sender: mpsc::Sender<Message>
8 k3 f6 Z5 b) k$ w! A9 u}- o' u1 F6 w! U
</code></pre>, W8 U% T( T1 J" x+ A/ R
<p>该是时候定义我们要发给Worker的消息Message了<br>
+ Z9 T4 F+ a% |3 X N定义如下的枚举值</p>; B) p( A- Z4 _: c2 I8 C
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;/ s- \1 w2 L! C% C$ ]
enum Message {9 g" o+ h0 g! T2 W
ByeBye,9 U4 L: x) W( F7 |
NewJob(Job),
& J6 r% }% _/ g( G8 E( h; L1 r}, Q p% y2 W) Q$ \, Y: i3 h; i
</code></pre> N* B* k% C2 x# P2 \0 D
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>: n& Z3 q3 V1 C0 U
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 L4 @# f: [* p9 R5 o! \# ~<p>Worker的实现</p>4 S8 q8 O. U% f6 _* j2 Z/ `
<pre><code>impl Worker( J- K) g. |$ {; x" j1 `
{; m2 a" h" M! r' ^
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {3 q) N$ O. J- Q m1 G
let t = thread::spawn( move || {
: N8 v7 |2 g( u0 _( Y8 ]( M2 A loop {; K# y2 H$ L8 x
let receiver = receiver.lock().unwrap();
# R8 I) t. f5 r& v+ a let message= receiver.recv().unwrap();, i) \6 N0 K/ l- [0 F
match message {$ a1 Y, [- c$ c
Message::NewJob(job) => {
- x. f0 y2 l$ e% g0 } println!("do job from worker[{}]", id);
$ }% P. b! ?/ A( H2 J job();2 D& ~ n% N" z4 |) g8 x( |- a& \
},
" U q/ x/ Y2 F x' | Message::ByeBye => {
$ I9 r% f! M& q, [7 s println!("ByeBye from worker[{}]", id);( K; i* a1 | L' b; c
break; W+ I: i) u, S2 S+ u1 }+ S
},
) W/ E4 `* B% f" H x }
% C) v+ G) P) \% g, \7 X& j3 ?8 y! d }3 a ]7 [. l/ E7 H7 V& Y
});
; c+ T* `8 y. W X. [# Q9 \4 r/ h/ A9 `+ I& {
Worker {
% T+ ^ c5 Y* O, P1 z( i _id: id,
5 u1 `( Y# R6 f* m1 l+ N t: Some(t),- Q9 Z+ {$ ^) x5 ~
}
9 M1 F0 P* Z3 B3 A }
# I5 M5 A# Q1 V; A4 f& S8 u, R}% l+ j& K5 \$ S# S) F! N7 }
</code></pre>* m: y; x/ }' ]" W: {6 r' s
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>* w& U2 {( \* q
但如果写成</p>
0 q! _4 G% M* _7 h: T" H! O<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {# G2 y" ]! F8 J/ V' H( v* |
};. y' \: R/ k+ N- q$ V1 X" b
</code></pre>& l7 i& K8 A) a7 Y" v. O
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
& S: h2 }, j% I4 ]" x6 K9 qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
' D3 |" ]/ j R4 U6 B<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
# I; S. ?# T7 G( x* F0 T: k<pre><code>impl Drop for Pool {! \1 u9 K w2 P( ]( v; o, {% c5 R
fn drop(&mut self) {
1 t4 s* g4 c# d for _ in 0..self.max_workers {
& V- D7 U3 W* `* c7 } self.sender.send(Message::ByeBye).unwrap();; m4 j; z2 N2 P* d" j- P& K( _ A
}
1 K5 x* f" I4 S: @* X6 i for w in self.workers.iter_mut() { V. K" M8 b3 J- s V
if let Some(t) = w.t.take() {+ n) B0 A+ I* j( x3 F1 U7 c& _ W
t.join().unwrap();6 [" [* U( f# a% x
}5 A8 t( ]/ @4 o; A4 Q7 W
}$ Q9 @( B9 A# T! \
}
$ j2 V( W, s/ G) I}
* z! D8 G. o& q( r, y
' k8 g3 W3 ^' d% B</code></pre>
' h* a* `" C4 g" D" [2 E<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>7 u1 [( ^& w6 L7 I5 m
<pre><code>for w in self.workers.iter_mut() {7 L, N% P1 \$ E/ P
if let Some(t) = w.t.take() {6 Y- I ~- I* }
self.sender.send(Message::ByeBye).unwrap();
0 z, D, O; f; Q( a( o8 |4 P t.join().unwrap();
& R7 ]+ b8 q* n' W8 @" C6 i; P }0 z: v% e) X& Q
}
% Q5 ]( u) _! ?% D o) @
- o( p0 \3 o# H+ F+ X</code></pre>5 \: _" X. J( @ F" ?$ C
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
( |) Z% E! }4 @, B: a5 N我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>) Q$ t! R( N. d$ d+ F& J- }
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
+ L) L+ V5 G W/ k+ V3 F/ D<ol>
0 S0 b2 Y2 x* [) @( t<li>t.join 需要持有t的所有权</li>* k! Y8 k2 h) \' B# e
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>/ Z9 x2 Y0 }- S2 k$ z! H G! s
</ol>
T' b9 d+ [8 {; t0 Y<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>. M2 t( h9 e# J# G+ \6 @' F
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>! w0 o- N- q2 E" N! N3 R
<pre><code>struct Worker where
8 V- c' ?# i% S2 v7 G; d& ~4 C+ V! g0 r- M{
. h- i- i( c7 {2 r8 e! h _id: usize,0 p: n+ p( Q3 y. e, N! `
t: Option<JoinHandle<()>>,8 v% U5 |2 V) b% p! h! M/ ^8 t
}
; s4 B7 ?$ k/ C1 k: F4 ?* q</code></pre>+ X# U. j3 Y( G
<h1 id="要点总结">要点总结</h1>4 Z- w8 h( s9 ?8 q9 n0 e6 [
<ul>
# }( n7 |& A8 |; H2 C; x- a<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>9 V$ B) G' I9 @( h; K
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>' z$ }6 w3 l2 u
</ul>
. C# r6 q6 ^# N) o' F) U# l- s<h1 id="完整代码">完整代码</h1>
& \+ T( R+ y3 M- }<pre><code>use std::thread::{self, JoinHandle};4 q y" m. [8 u. o) n4 E
use std::sync::{Arc, mpsc, Mutex};
& p' \2 T$ o1 r7 y, M: @! [5 _, @ D) u
6 a5 m( U2 ^1 d& p* ^% Ctype Job = Box<dyn FnOnce() + 'static + Send>;
) m. G8 o% x+ i% b: y3 Uenum Message {4 n- n. D9 f! N: K8 `
ByeBye,
* ` ]1 m# M$ w& g) ~ NewJob(Job),# P8 m- b# m' w d& w
}! k& M6 W4 _8 k* P! o( v( T
/ f7 u( q" J" x, y% ^9 ~$ U/ m
struct Worker where) W/ ]7 }0 ~( Q
{9 Z W0 u5 S; n/ @* ~
_id: usize,
, N+ v& T% G$ {9 [' G( h4 B7 P t: Option<JoinHandle<()>>,
" W3 f! R9 c' U% j}: T( `# h- P: j5 e
$ r$ L* W8 v: N# A7 F2 f$ h( J, D
impl Worker$ f% Y( [. v. s( z$ `9 B! S) j
{
3 Q5 |( T& a9 i/ t3 \+ E; j/ g. c+ h' L fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {4 G: K0 p4 E9 G1 T
let t = thread::spawn( move || {# u8 n& H4 H9 O: k" t8 w
loop {
: \$ O' C# k/ J4 W+ t0 G. _ let message = receiver.lock().unwrap().recv().unwrap();
) E7 j' y: ~* x( t5 }0 p match message {
; V6 t0 A. {' ?3 K* ~/ t Message::NewJob(job) => {! ^- Z' G3 z! u0 j( r: v* _; d
println!("do job from worker[{}]", id);
: ]" P. m3 U) m3 N job();
3 j0 D" K7 l$ a },9 T: n8 G5 J3 ^6 M/ t) V5 w5 Z2 U; o
Message::ByeBye => {+ n( s$ I3 P: I1 }2 a0 r4 m K
println!("ByeBye from worker[{}]", id);
. J6 P; i9 c4 ]6 c4 I break/ b5 n" E" q& K; D
},6 A& V# n# B ?4 u* I6 @
} & X/ r' O- i; J* z
}
& \8 F! r& B9 ?- L });9 S q3 K4 }9 [+ b' K/ H- t; U
5 ~9 X' J7 T. X* n) ]
Worker {
+ ~# N: c3 B3 Y0 J; @# i _id: id,. d0 m& h5 u) V. K6 x2 K
t: Some(t),
6 j `; i6 f% ? z+ t) \ }
6 n" I: \9 ]; f, Q8 _0 f }. }1 K/ M1 A% I5 J3 o, R. ~" u
}
: k6 y. _+ _/ W- e
7 [ K: x w+ Y/ C- F# upub struct Pool {) V& X7 ?: E, R
workers: Vec<Worker>,5 \' w2 y, S3 s0 f+ C {0 y6 A+ _4 Q
max_workers: usize,
# b H( I7 n: M3 Y6 B sender: mpsc::Sender<Message>
' Q/ n/ D0 s7 j+ |& i5 w o, L}
( c" F5 V0 x# n- V9 l& }
/ L' K% {3 X7 C) u7 t; V" }impl Pool where {# W- z4 ^# m6 G: D' }
pub fn new(max_workers: usize) -> Pool {. W! T# \$ ~* F9 P! f
if max_workers == 0 {/ Q, A7 I2 b$ P* ~
panic!("max_workers must be greater than zero!")( [0 C9 p0 L* p; |9 u' p% _; |
}2 l+ k9 ]2 r$ d: G; w& c) q. u# d
let (tx, rx) = mpsc::channel();; Q: j0 E' y3 U3 x
6 w5 v" v: r: c" g0 O) Y let mut workers = Vec::with_capacity(max_workers);1 v2 U1 [) _- {
let receiver = Arc::new(Mutex::new(rx));: G3 E+ P/ z. {$ E3 {5 L
for i in 0..max_workers {
- ~* L, b: |' g3 h0 y; D6 \5 { workers.push(Worker::new(i, Arc::clone(&receiver)));+ u* |) i; x5 y! f- l! D
}1 V8 c) F# A6 E
2 m9 U. \% J |# y2 U Pool { workers: workers, max_workers: max_workers, sender: tx }6 p' w3 W' r7 ~2 A" m6 S
}
& _/ i& o6 A* F0 H. @) { M: F/ ] 6 j! M" x( h* Q
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send% _+ w% A8 e7 l4 a! h9 k$ e5 N
{
: G0 U; d8 _ ^/ g/ ~$ D" K2 o4 v" t3 L! B% h+ A" `
let job = Message::NewJob(Box::new(f));/ S# K3 K+ N5 E, S* P3 J
self.sender.send(job).unwrap();; O0 |! l: V9 C+ o1 _: L2 b3 P
}
. Y! N% l z7 S, ?}
' T& N" k; S# F) }3 S6 g8 |0 g& K
impl Drop for Pool {
/ V" V1 B0 r4 K2 G fn drop(&mut self) { L' |5 P* p+ U d
for _ in 0..self.max_workers {; A/ {7 v) y& f. N8 ~
self.sender.send(Message::ByeBye).unwrap();
{* v* J/ Z2 \# D: E0 N }1 w* F T: p/ _/ V; b
for w in self.workers {% K# Y& O- l$ C# H# Y
if let Some(t) = w.t.take() {
2 a: Q& U, ]% ] ]+ y t.join().unwrap();7 U& U& [. v) \( {" s
}8 i( m, @$ }0 j" o
}" @, a% i9 Y: \0 ?' c' z# J ]" e, G
}! I* s8 Y' ^- ^: T/ G% x% F6 u
}
4 a" Q$ f3 @0 ?, R6 b8 L) L6 D
- ?6 M+ j7 G4 V) g, ^( Y
$ D# i d/ Q7 r8 }+ U! I5 n+ t5 X#[cfg(test)]
6 h: W; U# }+ R* C; hmod tests {
: T6 l2 l6 {2 P+ t' I; ?( n1 @ use super::*;
$ U0 _/ e! ^3 ^7 T; I4 ] u8 S #[test]
- {7 M5 [$ E( O6 n4 H9 _% _6 I f! P fn it_works() {
- P; h0 V. p7 C0 B4 }% {4 K let p = Pool::new(4);! ]: G6 Q) T% V5 \8 R
p.execute(|| println!("do new job1"));
3 l4 ]7 @! w/ w4 k# d p.execute(|| println!("do new job2"));
' c+ Y4 H z! N& p/ v/ r p.execute(|| println!("do new job3"));
1 ?; q6 x7 c, @* l r- h p.execute(|| println!("do new job4"));
1 S8 k' [' b( L; e, A# z$ N } ?7 Q0 R* c$ T. O# u
}
" D/ }9 g$ _6 o, ^$ X4 U</code></pre>& G9 R$ H+ x, l
}5 ]7 l$ N3 {( X |
|