|
|
$ V1 {+ X- r0 {, v" h2 Y/ t1 w" m5 L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
1 g& {9 d6 Z9 C" ]& Z4 ]<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
& ~ e' J* W2 }, j5 A' f<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
9 Z+ ?2 m9 ~( f1 c! ^<p>线程池Pool</p>8 \6 V8 v* S! O* v3 b$ U$ ~! f
<pre><code>pub struct Pool { n7 [3 t* j; k, P9 b3 Z* P
max_workers: usize, // 定义最大线程数' h( y6 i q0 A5 e7 O0 s
}
1 t( j( I9 ^. ]- N) A
; E8 ?! W% q, H7 ~3 Kimpl Pool {
9 f1 Z. [! h* ]3 N) ? fn new(max_workers: usize) -> Pool {}
# E% A" ^! ~* C3 ?+ G$ D5 j1 T fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
8 E0 [- d" K2 t# D, c}
/ `" m& q# g+ r6 R6 B# g+ d, C+ R! ^1 N, a
</code></pre>% S) ^+ Q6 k. j# [& l7 D% l+ F N
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>- p4 ~2 i, v% F9 K. }) |# v1 T
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>) z5 D5 _( u4 w( c& A
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>5 U3 g F7 j0 K" g! C( ^" d
<pre><code>struct Worker where w' o; \* O- B& r! |. a! O8 k2 A
{
" n$ ^6 F( ]% i0 f7 Z( k _id: usize, // worker 编号
: m% O) g/ U$ V& M5 K: @4 T}& u, _3 @" }! i3 U$ a
</code></pre>* J7 g! h5 o7 Q" w3 v
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>% k& i7 {; H) C" T
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>) b9 i% T1 ^- S& }- n& x+ N
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
' L8 K( f1 Y; D) T1 }2 _<p>Pool的完整定义</p>
8 K, S u9 a* d7 ] W<pre><code>pub struct Pool {* ?3 R; Q2 D- t s6 L5 A. S
workers: Vec<Worker>,
2 A. |$ r9 L5 ~# p% C) C- Y max_workers: usize,; k6 W7 {, L2 }# m
sender: mpsc::Sender<Message>4 S! u4 w& K6 N7 R
}
3 a9 @9 M" v6 p! R$ x2 Q. X</code></pre>
$ `6 S5 V, p; l9 p<p>该是时候定义我们要发给Worker的消息Message了<br>
7 n$ Y% @; ~+ M: D0 b定义如下的枚举值</p>
( z1 g' w' [4 Q/ l0 _$ C' i1 [7 ]<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;& a& s4 `+ G) G( ~" g
enum Message {% W c i9 M# g% j$ x
ByeBye,
+ `& m- S; S8 z" e o# R) p NewJob(Job),+ }. r, D+ |3 e* k
} n# e9 [+ V7 g; ] \
</code></pre>4 _4 b5 k1 o- v5 o7 J
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>; E9 O O0 w9 r0 |2 [5 B
<p>只剩下实现Worker和Pool的具体逻辑了。</p>/ h. I. S7 E4 C1 X6 e0 ^9 o
<p>Worker的实现</p>
( f9 j9 c) n- Q3 t4 G+ |$ P8 F<pre><code>impl Worker$ i; Z1 `' v& Q/ c+ }& D: u
{
7 e- k% X# S3 A! H$ C+ g fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {* u6 S$ W* L- c0 l" C- n5 r) Z" @4 m
let t = thread::spawn( move || {8 @+ {( p# t! d, @- Q
loop {; [9 B" c! ?* t* P5 {) V: ~' p
let receiver = receiver.lock().unwrap();. h3 U- Z( v6 h
let message= receiver.recv().unwrap();
7 k$ h0 y0 f) d ~$ _8 y4 ~ match message {9 K3 `- i5 v5 v7 C
Message::NewJob(job) => {
" \; T/ w) J }% v) _9 S8 | println!("do job from worker[{}]", id);& k6 Q3 A0 \1 ^
job();4 Z4 X( b5 \1 u, @
},$ P8 T7 l4 b4 k4 a# U$ s0 K7 T# w
Message::ByeBye => {
8 i1 H/ X8 ^) a4 }5 w( ` println!("ByeBye from worker[{}]", id);
4 v5 I }; x2 g, c/ j% N* ] break
3 v: M5 M" A' \ m! J1 H },
: Z% Z- B& h8 u4 [5 \" y; Y }
8 t3 {# K# r/ E- K! b2 B* ?, [ }! b5 p- Z. B3 S
});
6 G- w2 w* j1 K
, \( C6 P, q0 P) F, T4 A Worker { ` e6 E% P8 z) A F" H7 x
_id: id,; o- g% u+ M2 `
t: Some(t)," k4 T3 i& c0 j& V
}
) R4 ~6 ?) |1 ^ }7 ~0 n. p# e; C) |+ m) Y, d
}7 y. N. T* T3 a( l+ P
</code></pre>* m0 B/ s& |. o# w0 Y( v: F5 ~& z
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
% E8 d; y0 ^+ r: Q: l$ O% S* c# H但如果写成</p>
5 @; C% H$ ?/ h6 L- a<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
$ J" ~4 Y6 t9 n. Q. _};- T. a% `( g) e) N1 r; \ }
</code></pre>
" J& {$ L. `3 G2 f/ o6 C<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
, U: w$ Y1 P6 R+ y* J' w) J$ t# Srust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>% E8 x% {/ z* z& N" V
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>0 }& j( P3 Z( j1 O# d+ v
<pre><code>impl Drop for Pool {- Z7 }& s8 F; c
fn drop(&mut self) {
; S( F( a9 A6 \$ e) S for _ in 0..self.max_workers {
# B# M% q. n1 u2 h( a' H self.sender.send(Message::ByeBye).unwrap();
$ a9 y w7 u" C# Q, h: I }
0 Z; L4 U- @6 i! h; k for w in self.workers.iter_mut() {! g/ e6 J5 ^8 t* v
if let Some(t) = w.t.take() {, P6 C9 h( G6 _6 O/ x7 _5 h4 c2 J! }: k
t.join().unwrap();
" b. Z( M/ h6 O4 G( B5 C7 Q8 E# k. J" a }, ~+ r) M- G7 [# q( b) S- _
}
, x+ j5 A: z0 l' p0 L }# @4 ^9 R; i) F5 J b& Q: y% l& d# ?
}7 B5 I/ u% I) \" e4 l1 B
4 r7 M" s s( R8 S" w) t
</code></pre>- s( Y. c( ~6 d0 e' V3 ^% D
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>( m$ U& b, _" R, ~* U J7 {! |: \1 e
<pre><code>for w in self.workers.iter_mut() {
3 ]' P8 {) l- x, { if let Some(t) = w.t.take() {" O" x5 u; o a! F
self.sender.send(Message::ByeBye).unwrap();% ~: \( y+ e8 c8 v0 J9 K7 a
t.join().unwrap();
# g/ O+ p" @! _, A0 [8 u3 _ }: L5 z+ P F2 L. `/ I$ t4 L, i
}
" D$ A* r( N6 I
) t2 p& D* W5 }4 e7 Z6 k7 u</code></pre>$ E0 I1 K- R0 z; N3 V
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>$ z! \1 Q# C/ |1 m6 g
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>, T" _' i$ O1 r0 W" w- \6 ]1 s- F
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
/ e' }/ @2 G+ y<ol>; `% X/ z+ G. A h e' ]
<li>t.join 需要持有t的所有权</li>
" Z9 c: v8 {9 ~" c# D; w<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>: k( j: P( e- b$ D) C' R' [
</ol>
- r9 q# Q0 i4 |1 p<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>7 s N5 W& f5 V% {
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>' L$ g1 L( Z5 c# k: H5 a+ h4 M
<pre><code>struct Worker where- W4 v2 Y f0 u2 n, s
{* \- R: H& v3 r% L
_id: usize,
4 k' P1 J4 K) W2 S" Q, }% }' X t: Option<JoinHandle<()>>,
! L! t$ A4 x2 J}
" P& d; W; Q& r+ ~" H1 P</code></pre>! G3 t( Z# n4 Z: z3 w) w
<h1 id="要点总结">要点总结</h1>
9 }+ C; D5 u) `! n3 ^5 G/ }# D& x<ul>3 p9 n1 t2 Q# q' ~* L
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
7 b) h4 n# Y* |0 F% [) U: Y<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>) A: y# \4 o: f* r+ f2 {3 r$ L/ Q; `
</ul>1 ?0 Z+ R3 X* G+ G: y) {
<h1 id="完整代码">完整代码</h1>
4 `3 S5 W7 l# q9 W; d: o# m<pre><code>use std::thread::{self, JoinHandle};
7 Q* w$ _" B* c7 m" S, H( I; `& Quse std::sync::{Arc, mpsc, Mutex};* O, ]; I% S9 @: R
+ g$ R0 S2 U- J1 c
' J0 ?+ E6 {3 a$ [: o8 {! |. {type Job = Box<dyn FnOnce() + 'static + Send>;
/ f2 Z/ O- L1 {- l+ Senum Message {
, N- C# a5 L9 l% }4 Q3 j; h ByeBye,
* _3 `2 O" q) A NewJob(Job),
( L4 l# _. k/ ], T6 o3 ]}1 P) V/ x: \; }4 u* a* t. |2 r
$ z$ `8 i) o9 v- d
struct Worker where! o- R% z" W: i, U/ D" H
{
" R% V4 T/ g `" } _id: usize,
7 `7 o7 ?+ \% V* X9 W/ }# C t: Option<JoinHandle<()>>,9 ^5 |: O+ r+ i5 F
}
- d, g" v+ b7 |' z6 X$ ?
9 w( ]4 ?$ A" D, b/ v' bimpl Worker8 x4 V8 j* G0 Y* }; V0 c8 |2 X: T( t
{
% J. P$ T; q3 c! j fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
/ O+ q6 s3 J) v, p$ m let t = thread::spawn( move || {: F+ t. o( }, J& Q% ~' f
loop {6 i" h& P7 }5 a- F! e' S% j$ ]
let message = receiver.lock().unwrap().recv().unwrap();0 F8 h3 \0 i. K5 _$ d% h' t
match message {
+ J1 v0 V$ h# G/ t8 B Message::NewJob(job) => {* T. _; ?! ?$ d9 F( @: Z y1 s
println!("do job from worker[{}]", id);
# B' e6 H" e* t/ Z" s4 y0 R# V job();
$ d( C0 `7 t1 C3 v },
+ T+ `$ b9 p% y: ] Message::ByeBye => {
5 u9 Q3 ]" B5 {9 ^- ` println!("ByeBye from worker[{}]", id);7 f$ H7 D: H3 M0 _2 n. w' W
break
, w$ Q- `0 H+ ^# F" @* k },: v; T+ d, B( U/ v# s1 }
} 0 i( m+ W; ~ T/ L5 a3 G
}8 c9 d; f& x3 E$ C2 Q( K/ g
});
3 d# j4 a- q9 ]5 `7 f) H- q- A2 J8 H
Worker {- J. e% F F, }( o* b* S
_id: id,! G% P' |# H' U: P P
t: Some(t),) O5 r9 e' ?( F1 B9 s
}6 n) ?4 w, ]; m3 ]! m+ X, b& r
}) Z9 J# k0 N+ b, B7 ~) m% F
}
0 y, h) K5 V( W7 _5 R" c4 c r0 P. G; h/ B5 z
pub struct Pool {
& i3 U4 V' B' w, H: j8 P# |& R workers: Vec<Worker>,- m) w3 A+ Y1 @! k4 @' h- t
max_workers: usize,
]% S9 P9 M% Y! o, x# w sender: mpsc::Sender<Message>
% o( u, K9 G! v+ J8 F% M9 ]* k}
- x; v$ o* g0 o5 l+ d0 N& l$ s. C9 }2 A9 z0 a& k" ~8 J
impl Pool where {
" I' z- |' Y. c1 c; ^ pub fn new(max_workers: usize) -> Pool {" e( K5 N: D; s2 _
if max_workers == 0 {
5 A# G, u$ C/ r7 f) D panic!("max_workers must be greater than zero!")
* {1 K# @+ ~1 L' p }; T6 j2 r' j! A9 g
let (tx, rx) = mpsc::channel();1 \& {" w$ l7 q2 W4 N
1 N2 z& `0 i5 N' t0 K! n* r& l
let mut workers = Vec::with_capacity(max_workers);; B/ M) ~% ~( f# D6 I& b! T% _
let receiver = Arc::new(Mutex::new(rx));
( c* G ?( U6 i8 B, g( r for i in 0..max_workers {) N0 B8 Q+ p# h0 _5 P0 {
workers.push(Worker::new(i, Arc::clone(&receiver)));
: {! a& x: D7 J2 I: d1 H' h }. G+ A4 W, `( O' V2 Q
! a4 t9 Z+ l; D, w
Pool { workers: workers, max_workers: max_workers, sender: tx }9 ?$ W0 ]& O) n. i
}. h- `( w, g4 A. l9 b3 r
3 e n3 v+ ^! l: v. m4 Q4 l- E/ O
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
7 r6 z; N2 ]" m# f" Y5 N) h* S9 t {+ p. l' V4 t0 m5 Q8 ?& k9 c) @. q
# G8 L0 Y+ K( U
let job = Message::NewJob(Box::new(f));! s7 }: ]' m k. | H1 x/ l
self.sender.send(job).unwrap();
7 P$ ^- q/ ?4 L# n& P2 t }
1 L" F: b/ M" e}
. s$ ]" B& _$ D% L8 J/ J
' c5 T2 S; u6 b# }impl Drop for Pool {3 m+ G0 q; h" W# |
fn drop(&mut self) {
. o6 ?, R0 [- J: `" |* B for _ in 0..self.max_workers {
4 h6 z" O! R7 ^) _) b# M+ M' a self.sender.send(Message::ByeBye).unwrap();
4 X1 x# |5 Y; A, E* L! S4 s/ Q }# T7 j0 `3 Y: B
for w in self.workers {
- \( w; ?$ Y" F( r if let Some(t) = w.t.take() {2 V1 a# s" U' C3 \: }
t.join().unwrap();/ X' p% t& G6 P; j6 G h; I
}* H9 F- W5 d i% }+ l8 ^0 X
}- h; d+ B" [8 d% A8 e/ S% n
}% N2 b+ w' \0 ^7 ~$ ` P! a
}; G4 p& h Z2 j, R
" F5 S7 J& w' V6 ^, B8 y; e$ H1 A0 F: e6 X4 h- t* v
#[cfg(test)]
& g' l. r5 u: B# Q5 ], D& q- X3 wmod tests {
, V4 W5 i) i z6 J. w8 a use super::*;
2 k: p9 V1 L! L! S( d/ q- Y6 n #[test]- E' k, G. k# Y) E8 F; t
fn it_works() {
B# R; b7 S4 w- y$ a \ let p = Pool::new(4);* k c9 a/ t: {# f1 X% d
p.execute(|| println!("do new job1"));4 o( }$ c, _0 Y g, Y
p.execute(|| println!("do new job2"));" E9 l' Z) R; t/ X- b7 }
p.execute(|| println!("do new job3"));6 s* P3 ]/ h- m+ V( A1 V% m
p.execute(|| println!("do new job4"));: a E7 K- s, f9 N5 B
}
& e9 T- K8 G% m6 N' G}
9 \7 Q( ^ D( K" F3 [</code></pre>9 U. M7 m/ J- p2 e
( ?9 D$ t1 \$ s
|
|