|
|
( X: S' c% p8 C. x$ R<h1 id="如何实现一个线程池">如何实现一个线程池</h1>- G: Y. D: B5 M: P4 v: G
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
" O: \4 x4 \, A<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>' }8 R% E- o) X& l7 J. M5 X
<p>线程池Pool</p>1 n3 }& c/ J T. v( c I
<pre><code>pub struct Pool {
: T+ }- e' h5 F2 v4 b [ max_workers: usize, // 定义最大线程数& x% @- z1 _# z2 J6 u1 D! l2 a7 G
}
# d" `# Z' X, i2 G& Y' |/ g2 j8 @7 \
impl Pool {
5 g: i: A( `2 R fn new(max_workers: usize) -> Pool {}
* ^4 I a* v. B' _# V8 D# h P fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}) @( o! }5 r: D/ p8 `$ O* S( p
}! d* u& ]7 p2 e! k; G2 g9 k1 X+ W1 v
; z8 i# l- f6 B! P" K n9 g</code></pre>
6 H2 D" Z; }/ ?$ S<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
5 s8 [8 F0 H& X" F' X( l) H<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
' ?' t% S0 a/ M* y6 O可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& X; j( g: k1 U: X6 D. [; ~
<pre><code>struct Worker where) p) d9 w4 z! a: i+ `
{
% N5 `, A! D4 w4 Y! o! J _id: usize, // worker 编号; X6 n- h; ?. f$ q
}" M. q9 i# y$ Y1 ` s/ }! d
</code></pre>
* k9 {, x- y5 z: N0 q<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
' V% O$ S* W1 Z5 W5 I6 f把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p># [. h* z- D7 z0 c/ E
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
) x7 `" C9 Q' E [8 U) @<p>Pool的完整定义</p>
) B; _9 p1 s; p5 K. Z<pre><code>pub struct Pool {: e' M- S9 q$ L1 _( a; h
workers: Vec<Worker>,' [1 t* ^- q' q/ y' D- D5 T/ t
max_workers: usize,
% U; m9 y$ P1 j& G1 \- D sender: mpsc::Sender<Message>
% m& c1 \6 Y, F0 I. z* d}$ X' N8 m: S8 e+ f$ L3 U
</code></pre>2 C( l% d3 A) m9 Q0 P: l( d4 Y! V& w
<p>该是时候定义我们要发给Worker的消息Message了<br>+ A X H( F+ \( O
定义如下的枚举值</p>
- m& o, [1 Z5 \3 l: y K7 t<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;# S( [$ ~9 C* a9 P/ x
enum Message {8 G& G" K1 M* F- s @! d
ByeBye,6 w; R9 v, r) y4 V
NewJob(Job),
2 V. p& u0 {% U% u$ ~* ?% A}% Q# I! a- J' f, u1 C) {$ u
</code></pre>7 {( C5 Y( n* c
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>. X* k; t* k7 h0 a( F; F( \
<p>只剩下实现Worker和Pool的具体逻辑了。</p>+ I5 f8 ^7 Z, J2 d: O9 X
<p>Worker的实现</p>
: ^. T. _7 e* p- l5 V" }, u1 u2 b<pre><code>impl Worker/ K0 a; y( u: @
{
* o- |, Q3 i: m/ N: @ o5 F fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {9 C% @& \8 W$ q) g6 \9 I1 ^) E' i2 b) Z
let t = thread::spawn( move || {8 [* {, J2 ]% K n# T- O$ _
loop {5 J1 y1 ]3 D% z2 G# q5 y
let receiver = receiver.lock().unwrap();
+ P5 W) A/ S. b; G, r/ S9 s let message= receiver.recv().unwrap();
0 R8 L! A( d' R3 E5 U5 O match message {- N) x* ~4 ?+ B9 W& `# R
Message::NewJob(job) => {
% _2 v$ t- `5 x( w println!("do job from worker[{}]", id);& K$ R9 }# J K% B7 ~" c& ?
job();& T' e. S) z. o# I4 d0 T, @) m
},
3 I T8 U) l* _7 Y2 m* B% ^6 h Message::ByeBye => {
; W: p0 m7 Z" H! s1 J println!("ByeBye from worker[{}]", id);
8 q# @2 V! O" r2 o break' x7 f' I' B( {7 ^
},
) [. h2 j! S) ]( V( J }
* c. c: N/ l' f' w( }/ b8 K }
~0 `& P$ y. o) O });
7 u* T6 P2 d" n+ B- G; j" y6 f! B4 P m
Worker {$ ]! e6 o+ C' v& n+ ^$ c& R; A
_id: id,
- L2 F) U; I: o% R t: Some(t),2 E/ y( v$ o8 H0 S x6 x
}8 N; ~* m# h4 o) p- j1 v; S* k5 H
}; E/ N2 |+ {; j4 ^( P$ C7 N: V
}
) T+ s" n0 q* c* I6 C, f4 M</code></pre>5 E& Q( U) `/ c* w# ~7 z/ t% D5 ^7 J6 f' R3 u
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
7 b2 i5 m4 f& P& W# E3 K但如果写成</p>
: e/ [4 |2 |4 l5 p# m<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 v& ]4 n& p" y" ~
};
' p; |" c9 o& ^ h& w& [</code></pre>/ x( j% E) ?8 M3 g4 \
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( x! x; {1 l" V5 j( ^, orust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
( h" F- `3 m) c& J" ]5 e1 n* k" T<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
' M% g% G! E4 s) P: s7 X<pre><code>impl Drop for Pool {- f5 o" f. @! X: q
fn drop(&mut self) {; E) ^- H' N, m3 v* r
for _ in 0..self.max_workers {
1 b! }: a6 a. [3 Z c" s" ` self.sender.send(Message::ByeBye).unwrap();
; `+ y a7 s( ?8 O6 i5 l }0 V" g3 }5 C. a% K
for w in self.workers.iter_mut() {
1 t5 M3 n/ J% F+ e5 g% |- o1 ~ if let Some(t) = w.t.take() {0 e: y% @1 U0 F
t.join().unwrap();# O% C/ W1 ?2 u6 q' j) T
}6 d8 e9 m) d1 }. V4 ~
}
7 B3 V/ q5 u4 ]2 A }
: x4 Y B5 d0 A, |; {}
" `8 p; i/ a! ^8 [/ A {5 y) D) a! r4 ]0 N, y
</code></pre>7 q N$ [: k& A! V# w' ]
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
& i8 W9 k2 p( L; ]) H9 {# P<pre><code>for w in self.workers.iter_mut() {* X/ y, \6 b s. P" ]' L' G
if let Some(t) = w.t.take() {. P9 R& k9 T6 [ f+ T
self.sender.send(Message::ByeBye).unwrap();
' w6 z; v% r( B0 {% h: m. f t.join().unwrap();
0 d* b0 j; f4 Y6 X% } }# K! [% k6 T. i, U
}+ _! g) e2 Z( v4 ^5 `. `. k8 r2 O
) l3 C9 Y* q1 Y/ c0 i) e3 d</code></pre>/ E3 P9 [# Z% s, s
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
/ b. m7 V- G2 @9 q& t我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
X. g7 P+ j0 C [1 q/ u$ P' r0 l* I<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
7 B7 j( g7 I+ G7 q v- @# P- l [<ol>1 U, |2 A" e/ c$ D
<li>t.join 需要持有t的所有权</li>
5 ?/ B0 o) f- A9 D4 j<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>0 H- u3 ~: c# S7 a9 Z: h
</ol>
/ m5 U- F6 B# _3 B3 G<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br># D6 M* N& ?9 C: E
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 v) M0 v; ~ V<pre><code>struct Worker where
( M$ r' ~# L. A" `; ~9 S{+ F1 ~5 A1 |' P+ B" Q
_id: usize,
7 l G9 r9 D; \% O+ ^0 N! g t: Option<JoinHandle<()>>,
* Y3 {2 b* W5 x. J}
/ `3 v) ?- Q4 y</code></pre>; B, ]; l3 D( ?6 q9 f
<h1 id="要点总结">要点总结</h1>
& l# x& P6 Z P( B! f$ q<ul>
4 Y; j c+ V2 e8 ~4 E5 a7 Y<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>4 L7 E% a: P& M1 K. S
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>0 o$ ~# n) T; P1 _1 z% S' n
</ul>* d1 O' `+ ?* G5 D
<h1 id="完整代码">完整代码</h1>
: u! M- m, J& F! N<pre><code>use std::thread::{self, JoinHandle};' P+ P2 w8 C9 Y$ B1 N0 v7 l
use std::sync::{Arc, mpsc, Mutex};
/ L! ?, w" Y2 R7 Z3 j x
/ E$ y9 l* b) i" M5 {; v" a% B2 U5 v( u3 l3 \% ]& C) P
type Job = Box<dyn FnOnce() + 'static + Send>; L# S( A/ |. }9 G2 z) ?
enum Message {' V7 I3 X" }" {3 H5 s; H) A
ByeBye,/ o& A! D0 p @& T- Z
NewJob(Job),% r$ X% _6 a8 _5 _8 g
}
I: Y! B' R0 @- `3 O1 Q
( \; B8 }+ q% Cstruct Worker where
" t' B2 H$ b( `2 _) T( V( c{) m( I7 X8 j. f0 Y+ \1 ?6 h* ^
_id: usize,6 c# m6 f/ M9 v8 Y( Q2 Z* W$ [
t: Option<JoinHandle<()>>,' h+ E: u: w8 C+ ]- l8 \5 z9 Q3 x
}/ O; |% n& A' K) l" @
7 G& Q6 Z; I3 F9 r' {+ Z0 ~! O
impl Worker8 v8 d! Y+ K J2 U. K+ L
{
6 o7 s$ U* a6 o. y7 w( }; M4 y5 g: h fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {" \" L7 r& h6 `9 C% |
let t = thread::spawn( move || {+ K# I7 \' n; u! u1 o
loop {
; j" M- T7 k. \0 [0 a let message = receiver.lock().unwrap().recv().unwrap();, I) T: n$ b1 I
match message {
( J( e" h4 \3 a Message::NewJob(job) => {, c4 I4 N! t- x& ~; S- o
println!("do job from worker[{}]", id);& A1 @; p7 l7 Z+ Q- o: m1 V
job();
) S8 Q( ^. K5 V N a },& Z/ f) I2 _+ m6 K! Q5 u7 X
Message::ByeBye => {0 t" v3 W- |# L
println!("ByeBye from worker[{}]", id);
9 @' Q: w, F( ?, } J% e break* \1 g; x9 t7 i- ?
},' R# z& ]2 g# e* |
} 1 |9 y! t9 d, ?- Z
} R/ Q( M5 R# f7 H4 i" v
});$ k1 G8 h S& Y! t/ I, S
3 C' X% x x0 \3 Z' x# Z Worker {
: x( A. U8 {, {- W) U }9 a _id: id,
% A% M" G0 t9 ? t: Some(t),- |& F# W( A$ V
}
3 ^0 j2 m0 m. T: ] }
2 p' r! q7 @- \ |4 }( F* l}
3 w# J; [% R: K0 L1 n" [8 N9 u S
4 t2 l0 B" j$ g5 F! T8 K7 p$ m( D- hpub struct Pool {3 R' [! n7 O( V
workers: Vec<Worker>,' m: j* `2 Q2 y" k& J7 e
max_workers: usize,
9 H9 j* Z/ |) g sender: mpsc::Sender<Message>: n/ f; a6 D/ o
}
+ I5 _. E5 Z+ n" F3 D8 N* Y1 e5 e, N4 K& ?. v
impl Pool where {
5 q5 C7 d& {( `2 d8 ^ pub fn new(max_workers: usize) -> Pool {- C1 E3 w* O3 ~/ i4 S0 f. ?
if max_workers == 0 {
3 O+ s* L& h0 X5 G panic!("max_workers must be greater than zero!")& ?1 b+ v* {, [4 _% a9 `
}
1 J2 U8 q4 |- @ let (tx, rx) = mpsc::channel();
5 a) T) j1 r! X2 G0 q; u! u/ _! T0 q
let mut workers = Vec::with_capacity(max_workers);. P2 S5 u0 V+ Q8 Q1 J% ^! b/ H
let receiver = Arc::new(Mutex::new(rx));& l$ X7 Y' L) A& s% |7 j) J3 I
for i in 0..max_workers {
/ k- }: w3 c$ Q# S8 J7 O workers.push(Worker::new(i, Arc::clone(&receiver)));
- ^; b7 _/ B9 |# V }
% X2 H A' ?( g
! u6 P0 h2 }( L7 V _ Pool { workers: workers, max_workers: max_workers, sender: tx }
6 d& r6 {* K, ~" l! z L" ]: b }
5 k4 o, u% Q. N: s% B) W7 H) f6 ?
# c( T2 n: s1 F' t* s* Y7 O pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
8 p, `$ A) t9 z! R: \ {
2 U, U7 L4 ~- }; f7 f3 [; [+ r |0 G/ ?5 c% O
let job = Message::NewJob(Box::new(f));
6 }: Z* ^& b( B( e5 |7 t1 }- M" f+ X self.sender.send(job).unwrap();6 D: G7 |( ]7 r$ d5 j7 S; F+ G" f% e6 v
}
! ~# u( F; H! G- B}
% P) R: y' Q0 y/ F. `& |5 i
9 |( r0 c$ l+ f1 _" {impl Drop for Pool {
$ q% n n/ }2 P6 x0 o) e fn drop(&mut self) {% o. Y5 g0 E6 u/ r1 A5 I
for _ in 0..self.max_workers {
0 y& m" p9 \) i+ H self.sender.send(Message::ByeBye).unwrap();
7 z; V4 T2 W# P- M }
; h2 d: j" T% M) _, ~) e7 { for w in self.workers {& a5 `7 @, n; {% z7 i
if let Some(t) = w.t.take() {1 @$ `" k7 g) e+ ^# ]$ Y. B2 G
t.join().unwrap();
' Y! e, [- {' R7 ?3 E }
$ ^5 G+ N& P( C. T! M0 T }
6 ^- Y" V. F2 o6 }* K0 U2 [9 O }
4 k: [! H( S& R! |}- M/ n4 l( x' X# k# h
4 P3 P0 ~ l2 S# v$ y( A
8 t# ?4 t* t8 O* c4 ~#[cfg(test)]) ?/ @# W4 p5 X1 }! I
mod tests {
3 s9 t0 i$ X( G7 r( ]& G use super::*;
0 t/ I0 K' m* ^2 g( _ #[test]
0 y& R, q7 W+ W: F, R. e fn it_works() {
0 \' {/ _, d6 I! ~# ?) p, y' N1 Q let p = Pool::new(4);9 M8 f- g/ d/ ]% n W" T, s, a
p.execute(|| println!("do new job1"));2 O' E; Z0 x6 u6 X$ \, n
p.execute(|| println!("do new job2"));1 U5 b+ ] o: }5 \9 y# |! ]4 w
p.execute(|| println!("do new job3"));
. l) j, D3 X+ r/ L, {5 }" K p.execute(|| println!("do new job4"));* |1 X$ \& D G8 F! s# J+ T3 N
}
' j8 R3 S! k- `# K! f4 t}3 L$ j2 z; {! r) w P
</code></pre>/ i6 H, y! s8 y' O) U6 L9 L8 ^4 ~) H
0 [9 Z. `/ G+ P- g# S |
|