|
|
+ k) I' X/ u& F<h1 id="如何实现一个线程池">如何实现一个线程池</h1># W. O2 K7 m( g3 @6 i+ E
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
( o. m0 P9 X% ]3 N* M<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>7 R0 q2 y" H* }9 n, ^
<p>线程池Pool</p>
3 ~: M$ k/ a, z* y& E<pre><code>pub struct Pool {
. C0 Z9 E3 @1 K: I5 @ max_workers: usize, // 定义最大线程数
7 k, V6 d' b7 p2 Q: C}6 h6 _- ~, Y+ K) n- h, G8 m
& v& m* H5 l1 h
impl Pool {
4 d( n' i0 L7 S; Q. i' L fn new(max_workers: usize) -> Pool {}
" x+ O& z1 K. j; V) F4 K fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}8 n! g2 _8 D7 `+ t [" E( v6 ^
}
: ]; ?1 P, G* Y4 m; K4 V8 r" ?, p! |: d5 D0 [8 q% e, z
</code></pre>: u5 ^- h: m; l
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>. V5 o' p$ P! J( J6 M0 g
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
, d6 b! x9 H8 G2 f8 j可以看作在一个线程里不断执行获取任务并执行的Worker。</p>9 @6 c- B5 a8 f+ C% a2 `
<pre><code>struct Worker where
l# D$ N$ \, K+ k) v{
' O; a' f& P" ^* t9 W1 q _id: usize, // worker 编号% N( U) X( g+ Z
}4 T5 U- T6 y6 Y' O
</code></pre>
: V, ^: q2 h4 ]<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ U1 i. P% \! }, F& v
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>- K; n1 B# E. Q9 ^; S/ r
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
g9 E6 z+ {% | P<p>Pool的完整定义</p>
3 ~9 c7 ~0 Q/ T, R7 L<pre><code>pub struct Pool {% k+ f2 E9 A' ]& @) Y# Z1 T. ?2 q' x6 |
workers: Vec<Worker>,
g) y/ @; U- ~7 j! P: d+ ?. k! u max_workers: usize,9 S d4 s9 H! X- h1 h* g' T' P
sender: mpsc::Sender<Message>
. G3 ?, Q: W/ l, v* F5 J}
4 {( o+ a6 ^2 @# i, p; n9 y</code></pre>
: q3 S$ @; t( T4 [3 T<p>该是时候定义我们要发给Worker的消息Message了<br>; d/ V" U9 I: f- F/ Z& B
定义如下的枚举值</p>
L6 |+ s" Q- w; a0 N- }9 v0 R9 E2 Z# R<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;3 S1 ?$ ] c. F5 ^% ?8 b, S
enum Message {, S1 S6 @# s: a9 w2 d2 O
ByeBye,
5 t$ W* I8 c. T0 d NewJob(Job),
2 d# `! ]( p5 |+ y1 j1 E4 c8 p}
3 j$ ~' D( P4 A. k</code></pre>
9 [1 M) i4 j& g<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
v J: r3 F4 ]$ R9 k1 b<p>只剩下实现Worker和Pool的具体逻辑了。</p>: o4 q4 C7 e& P) c% Q1 F: q
<p>Worker的实现</p>
5 R4 f K* w; q' Y& R<pre><code>impl Worker
! ?9 t" k Q' d- f/ ^8 p) U5 W{$ s7 S* V/ b: W! C
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {, J; Z6 L8 G' C$ a3 x* X! U' }- X2 i
let t = thread::spawn( move || {
. G) ^5 {4 P- I/ V& R0 q loop {5 k$ |2 P9 v2 C1 P7 i$ z
let receiver = receiver.lock().unwrap();
2 W5 w% p# \( \( H* L+ |' j; z let message= receiver.recv().unwrap();) x- w$ L; }, d G9 X
match message {( I, j7 o. l7 R
Message::NewJob(job) => {
- d4 g* `! u& ]6 f$ N println!("do job from worker[{}]", id);
/ U- Y6 M" ~4 W7 G% U job();3 K+ C( P* M- ?+ U% _
},6 k' M$ H7 m& h1 R6 e/ o$ e" p5 a
Message::ByeBye => {
) e, ^; p7 G; \$ V! j println!("ByeBye from worker[{}]", id);
/ A6 z# C7 ]8 D+ ~' S. p break: L5 F# E0 A6 X1 a; j" V
},
8 W3 x) \6 @4 D) f/ f4 y1 D' N } 0 c" I- D" A( S, J
}
5 J3 H# A$ ]+ Y& @ });
9 n V( \1 m0 z2 z6 d
% q1 `& c2 p( l) P, M Worker {
/ K* I9 a* s/ X5 `( b _id: id,
1 v$ _- \$ s6 T' P; j t: Some(t),( n7 ~- J4 I& Q3 g: Z
}& N" C% k# z$ a, a2 c' \
}
( K6 ~0 j% d6 R9 l7 A, \}% |6 g) }5 F, O6 x' D2 [
</code></pre>' C# Y! W1 U! |( h
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>& _ d6 v. L( R
但如果写成</p>
' z: h/ `$ M' S6 X8 ]! T<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
( J, p8 m- _5 @};3 |3 s1 P8 G3 o2 h' L
</code></pre>
; ?/ N5 g* X# }% A* N<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>9 J+ L$ B" D7 h# V3 p; ^9 W) J
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>& b- O& m/ a* h$ y
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
3 k8 w. k& B* R* U2 g<pre><code>impl Drop for Pool {
/ f) Y* O+ j" z3 e7 I) j fn drop(&mut self) {! c. K9 @+ }4 o/ z2 C& _2 ^
for _ in 0..self.max_workers {' w% n1 z1 X. C: `! V
self.sender.send(Message::ByeBye).unwrap();" j4 |# o& S4 x
}
3 D8 r U8 V* X5 D1 Q: Z* v for w in self.workers.iter_mut() {; O8 M8 V7 h7 a) k& N8 B
if let Some(t) = w.t.take() {
( Y* s* A, D/ M* S t.join().unwrap();
" {1 Z' ], W7 x: W- i }
. f6 b% w f; m5 k! t }9 w7 d( m# C# T3 F
}
M4 T% }- {" W0 _- l1 F) e0 X$ R}
/ { O- {) J( [6 g0 R& N' `- U+ k) D, |1 `
</code></pre>
7 O9 N* ~! t5 n& A" E1 J8 B<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
+ z' l; V! M4 r4 h Q. V4 T! u<pre><code>for w in self.workers.iter_mut() {
! b; e, ^% H* k% }1 } if let Some(t) = w.t.take() {
! E- b$ }- j* @ self.sender.send(Message::ByeBye).unwrap();! ^$ a% p6 m9 S! a; V" @
t.join().unwrap();0 \. T+ w8 [' g: l4 C) G; K& x
}
' O2 E4 t! ~1 s7 c} i6 _. U' z$ F" s
' |* j, l9 k: x8 w
</code></pre>
# c: J# u9 I# v: ]& e4 H2 F- e<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>8 }$ {8 N" o) @8 f* ]1 D
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>& C* J7 u7 @! M
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
. O. i3 n$ }- L+ l<ol>$ S7 [/ P& ^* A' I! |
<li>t.join 需要持有t的所有权</li> [% K; Q; i$ J9 R$ @9 X
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>$ @& e d+ Z2 i! o: S% I" A4 V
</ol>
- p# H1 G" i! i0 S' W8 G: H<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
( \5 x1 |/ i: J9 x4 U& e换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>+ Q t9 ^3 U7 O( x! [0 `: ^
<pre><code>struct Worker where' q# Y0 }/ F6 [. R/ Y5 X9 v
{
& N% F4 F- w7 Z6 D5 U: j+ O _id: usize,& |" a( G1 |, ?+ h. s$ k! x& |
t: Option<JoinHandle<()>>,# l; i- N3 [. k4 \; k; s; G2 T6 R
}
2 S* H8 J' v i6 t; n</code></pre>
( g, \9 t0 c# L0 B2 |<h1 id="要点总结">要点总结</h1>5 K' J7 U r6 |
<ul>
7 K5 X# n0 j$ I) J5 v5 i<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
9 y% b5 ~3 M' E7 s' e<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>& O+ L" n$ {% R
</ul>- E+ U& Y+ r- g9 T
<h1 id="完整代码">完整代码</h1>
( v7 d m) d" G9 b6 P4 T<pre><code>use std::thread::{self, JoinHandle};
5 R+ h( F) Y" c3 e- p5 quse std::sync::{Arc, mpsc, Mutex};8 f- t. C; A- X" {
p) m5 M% t1 C$ n7 `
/ p* U/ a* ~, s: c5 m+ r
type Job = Box<dyn FnOnce() + 'static + Send>;) R: D! ^; |8 T: p" ^ J; B5 \
enum Message {
5 t- p8 `. k) }$ \% ]% P ByeBye,
5 |3 h: p/ i6 h) {# ~+ R# v NewJob(Job),7 G2 m8 [* z* j1 Y5 j
}
% M6 ?) ~: z) e4 z3 \
# D' n O. ?9 I( t3 W: D8 l+ B* t# Astruct Worker where/ E+ {9 t8 g+ o
{) F. z7 W5 X; v9 d5 S7 X& O7 |
_id: usize,
; Y! p% P$ K$ m! L7 ? t: Option<JoinHandle<()>>,* H% J% W% R9 t
}1 y% I# v6 y+ J$ O% u
e! g6 f. [& ^5 a' N1 g0 c6 uimpl Worker
# ~( d+ k: [& I; D7 |{
! Z! d6 k6 d- J; E. k& q0 v0 I fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
4 Y- d5 p6 L" J' y( f* P4 K/ ? let t = thread::spawn( move || {4 |( T4 I$ t% g- V* U: O; A' u
loop {
7 z% Q4 _7 z- g let message = receiver.lock().unwrap().recv().unwrap();
9 s" b1 ]8 Y$ o/ Y1 q- ^" w& } match message {
3 b0 V/ H6 C- ` Message::NewJob(job) => {
5 M! \2 C0 s& C ]1 e. L println!("do job from worker[{}]", id);
2 p! l# o! K5 o s0 F7 ~ job();1 \' g" [6 D% v: ^' Q" C
},
0 U7 t% P% y: r Message::ByeBye => {
+ B% \4 r2 E; U& K3 Q; b9 m println!("ByeBye from worker[{}]", id);9 U% z: |- _5 S9 E' `0 L3 L
break
9 p' k* y( d* } },
" \8 w o R k } - L% b) T* t' E
}+ ^5 ` X( ?( S
});
' B( I6 J8 f% N3 c' l
" M5 Q! _( Q& V4 F$ G' N+ w# m: [ Worker {
* L: E* |) E& q7 M5 j X7 b( [2 i; W _id: id,
: k' D% X6 |2 `$ [. N t: Some(t),, o( {! Z" g* a
}
6 P& I" ]0 j$ }) M- j6 S: ^% T$ t }- F" E) a2 @* u1 C8 J$ H$ T( m; k. y4 i
}: B4 u# ~, V2 [3 C' \2 y
: F% p s/ y( [$ b3 _
pub struct Pool {5 L9 Z Y+ W4 ^
workers: Vec<Worker>,
X( g" n* {4 P6 d: h max_workers: usize,
9 \* D# D. {! v ]: R, k1 a sender: mpsc::Sender<Message>" Y' ^% N! |6 N2 R! n1 \. e
}
! d% c0 n; a* \
0 P# }( @! F4 W- L8 j* O9 G( @impl Pool where {
6 G# F, x* m6 ]! K- l pub fn new(max_workers: usize) -> Pool {
5 S/ Z7 ~6 h, U& T if max_workers == 0 {: [# D2 b& U( |
panic!("max_workers must be greater than zero!")
8 @3 A5 H9 V7 t$ ? }/ F/ y$ D+ t% \
let (tx, rx) = mpsc::channel();% L, \! s5 K4 q y
8 k2 Q' J/ ^- |# I let mut workers = Vec::with_capacity(max_workers);" E3 x8 }. `( e5 P0 W
let receiver = Arc::new(Mutex::new(rx));
3 e s0 y; e. F$ R! y4 c6 W for i in 0..max_workers {
& a, V1 h7 t; C8 x8 o workers.push(Worker::new(i, Arc::clone(&receiver)));
0 J1 M! H/ C7 T, H. P0 ` }
+ l$ E$ O7 x; `( h- H# Q, t/ t+ N0 V, m7 c; K
Pool { workers: workers, max_workers: max_workers, sender: tx }
7 g& `. w% {" X8 g }
6 B- ~: {! T' A/ L/ h : G# Y- y- S! v f: W
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send* h0 _% x3 L6 W/ @5 q
{
4 f) c) i+ e" y& ], p9 y9 ^
) V& Z* L: O* f* t, o+ ]2 }! } let job = Message::NewJob(Box::new(f));
& k( v0 t' j( k8 ?- X% w/ c5 g6 h self.sender.send(job).unwrap();6 `+ ?6 X4 O8 v1 L# |
}
# G! B u9 P/ i6 x}
" o" Q6 q' H8 O9 H8 N9 K) j" T
9 Q$ T. V# ~$ |- [% |& Y# Qimpl Drop for Pool {6 k. t! ]1 V* j% p3 `; \" R. } _
fn drop(&mut self) {
, w5 W# E( |6 r) b$ H for _ in 0..self.max_workers {5 R; Q$ y$ R( ^+ y0 Z+ S( l
self.sender.send(Message::ByeBye).unwrap();0 f/ V0 |& n$ Z, o
} D- u" i4 N: p" u; v0 N6 e
for w in self.workers {4 S7 s- G# V0 d; V; Z/ { j8 U0 m
if let Some(t) = w.t.take() {8 \; t+ l% @) L6 H
t.join().unwrap();
4 v+ o4 O0 `. P! l# G- A4 e, F }
8 @& ~# g. T0 V3 z }+ K( S+ e8 q- Q+ G- [
}
$ i- N$ g6 X# c b/ @4 ?7 U/ U1 D! O}
3 s# A: Y: W1 @
+ p- E( o7 D3 f( ~/ d2 V% W( F7 P. e( L
#[cfg(test)]! L9 m& `8 @2 x1 X3 c4 t+ j" s' }
mod tests {
9 k8 g& D6 o' u$ a# e* o: P! i4 j use super::*;
% ~1 e; h" p x) A #[test]
$ h* i; \5 ~5 H$ j% J# M+ S4 K& a fn it_works() {( q8 _' H4 @+ L/ s6 D$ P
let p = Pool::new(4);
! ^: H3 k8 A S/ i' b- i p.execute(|| println!("do new job1"));; s4 g" H' \- E2 S" y
p.execute(|| println!("do new job2"));
) q; q- m& M- z* C6 W: x: p p.execute(|| println!("do new job3"));; L: M/ B% ]+ t8 |$ o5 g$ ^
p.execute(|| println!("do new job4"));
3 f: [, E. n, d6 Q% y; s }
. g: v8 B6 n8 t* o* i}3 G$ k2 H1 Z: ~. @
</code></pre>' W' m* m4 h7 s0 T9 e
4 q! ?, Y5 ^4 Y8 _5 }
|
|