|
|
6 E) U: z0 w5 y: W; P( w
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
' X+ C* K+ [3 Z2 M; d<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>* C3 ~5 [2 z" ~0 t: v) C
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>( R, |& Z1 z9 d1 t
<p>线程池Pool</p>1 s6 b4 C+ r" r
<pre><code>pub struct Pool {8 j: w6 o( k8 T. w3 H
max_workers: usize, // 定义最大线程数
; t! K, C! K. [, ]* P2 s' w7 H}& [: U: H, E5 b
- ~0 u1 [9 U5 _! B( h$ R
impl Pool {
# T" E! e9 K5 Q9 ^) H' S fn new(max_workers: usize) -> Pool {}
' N- |" S: M. J- B8 ` fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
! p. t/ e1 A* o/ ^9 ] B}1 H2 \+ q1 w8 Z2 u
! `) x1 W6 d" c" h9 S: U# ?
</code></pre>& C. w7 o# J3 r8 @5 J0 e# v) R
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>9 }% A, F- t3 _, S3 G: ?8 x
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
$ X# n3 V& {7 }" {5 [可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
$ X0 P3 U3 A3 C# W2 [- U9 ^<pre><code>struct Worker where
# q; K5 b, ?( a# v4 m7 y{
* j+ m- t2 P/ l# _, g* ` _id: usize, // worker 编号7 C( [* O8 M8 l8 }3 ^6 L
}" w+ A. a* o* G( [0 _- K
</code></pre>
) F: O1 C+ d0 [& t<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
1 N+ O; O$ n0 F1 O1 {) P' G把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
, s1 u7 E& k) `* h<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
! O! s1 @/ o* @; ]* {7 _' u; m<p>Pool的完整定义</p>
3 W. k2 o! x) a. P<pre><code>pub struct Pool {
. z, d: V2 W+ ]! e/ h/ m8 \ workers: Vec<Worker>,
! \) ~' I6 U9 B; z+ R6 t. p max_workers: usize,
4 V, ]3 G8 @6 B, T8 T sender: mpsc::Sender<Message>+ |) V+ N, ]# K3 _* O* W1 Y
}1 O! k T4 Z, \$ @, h
</code></pre>% M* I7 b$ S H/ w' t
<p>该是时候定义我们要发给Worker的消息Message了<br>4 P) s O3 f4 X3 F1 x- ^6 ?8 y) z* V
定义如下的枚举值</p>
* S# F9 i7 m$ P6 Q* i<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
* Y% \! V) a3 b7 }3 Nenum Message {+ Z/ ?' z/ b# s* l
ByeBye,
. `8 M. k7 k" ~* @ NewJob(Job),
1 T; M) q: r3 N2 `" H1 y}
; b$ b& }4 I3 x1 H. g</code></pre>
6 t. |2 S! a' n/ Z+ y/ @- i0 E<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>$ t) o3 W' n# B! f
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
2 `1 X0 W7 v9 C! l' B3 x<p>Worker的实现</p>; ?% n3 S/ D( N; ?: w9 Q
<pre><code>impl Worker3 `0 |+ p8 U4 j
{, M7 Y5 s) o7 r$ L( M o
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {/ k7 {5 I; A+ t d
let t = thread::spawn( move || {
3 e$ Y) B! O2 ~. l& I loop {* Q: r# c; F t2 E
let receiver = receiver.lock().unwrap();
4 t! j& @; A* s let message= receiver.recv().unwrap();' u; B& }9 X: b9 I- W
match message {
. \" q3 M; {5 b Message::NewJob(job) => {2 j2 ~% F0 d& }' b4 R
println!("do job from worker[{}]", id);
+ t. y2 e0 O& a* u& P( h G @ job();
2 O4 I8 o, N0 a* Y/ ? },. }% Q' R" j/ g* {
Message::ByeBye => {% s+ o; ]( ?1 L
println!("ByeBye from worker[{}]", id);% C) Q8 P/ M; ]& B
break
( d; X5 P% D$ e, Z0 e1 ^; a2 I },
+ {0 `# C8 T: d6 Y. ~+ Z6 H }
* n w0 v' d! n }
1 |. j7 D) c, Q2 o7 u4 b5 ? });
9 t. g7 I5 N* D! N
! _9 t: a7 _1 I9 t! a* ~1 M* c Worker {
( J4 Y5 [- o, N, I _id: id,
' w; e4 G( h9 v0 R- I t: Some(t),
" l6 @/ R e. i$ c* m/ T% \9 T' W }
7 I3 q2 Q: F z4 g% w }0 e+ a8 W( f; o: a9 T+ z7 @+ [
}) ], s) _$ i6 @! }; B$ Q+ A1 x
</code></pre>. W n4 h4 y7 u, V. ^
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
( A" b8 u; a A但如果写成</p>
) O% O2 ^8 G: ?3 h<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {* q" D$ w, u5 l$ c( Z* M6 V0 v
};
! M: |8 P/ c- }3 f% A</code></pre>- ?' F. G, E0 t' N5 @* N
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( w+ l# P8 }% M* e: ~& R. qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
3 @5 ^ |4 Z |7 a<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>, ~' \) ~4 N6 B" k7 s
<pre><code>impl Drop for Pool {& e; v! A" p2 G9 K$ l' E: n: ?
fn drop(&mut self) { B: I; s+ Y @" D0 K b
for _ in 0..self.max_workers {
2 m+ K5 s. N% p' x self.sender.send(Message::ByeBye).unwrap();
$ g$ i0 w. h o2 V" X% g }
0 n! `: P! ^9 u: f/ j+ j for w in self.workers.iter_mut() {" e5 Z$ P' }0 q( t; I& C
if let Some(t) = w.t.take() {9 [/ \2 ]* b% F9 w
t.join().unwrap();
5 Y) P8 F- J5 ~4 E U4 b }
, X2 z/ a8 |- H) C$ L }; U. s! m4 c8 P
}
! }3 q9 {6 }4 a* s}
# ^9 Z0 G6 v2 ?/ v, ]1 I% d
2 G3 b$ n6 v" c% n5 q! Q% K</code></pre>
. [9 I: O9 k9 H<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>4 h1 a, V+ q5 a. A7 b
<pre><code>for w in self.workers.iter_mut() {0 w% f$ o+ p0 d$ c5 _/ C% i
if let Some(t) = w.t.take() {
, G5 {7 ?1 s% {2 J self.sender.send(Message::ByeBye).unwrap();
- S5 u) q% f- y/ V6 F* C1 n6 z t.join().unwrap();
- b" O7 z/ B( X }! Z2 u4 p7 n7 `$ o! N
}
+ s& R9 L3 K6 w: C. |% _1 }9 F4 [" [! U6 i8 i V1 s/ Z
</code></pre>
5 P; r) C! d" s* c6 Y B<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
5 W6 t* p9 G% A1 \- O我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
4 F! h9 Q) D$ E" C4 E& B<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 B' m, B0 T3 @% f% W<ol>
5 R8 p" X6 Z. ^! f: _<li>t.join 需要持有t的所有权</li>3 c) F0 z7 }+ f
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
. z1 V) V* s) O c0 G2 `</ol>
( ? I+ {4 K2 C9 `# z<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>) n, N2 _+ ^/ ~. V+ |
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
( a1 M( q; l) K' z c6 V8 H<pre><code>struct Worker where
4 \' i1 p3 i+ R$ d. D. q2 ~) F{8 E. c9 D. J+ _' X2 J% ?( Z& K
_id: usize,
# D+ ~( t; r/ | T! B: Q7 H6 I t: Option<JoinHandle<()>>,7 Q, u3 `4 u8 U9 i+ f. u* f
}6 X: s+ H: X U: h
</code></pre>
; p0 u; \6 G( v+ ?<h1 id="要点总结">要点总结</h1>1 s3 j9 B& }+ Y3 U4 o$ @5 K5 D
<ul>
: ^7 ~5 H; R9 c2 _( k- `<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># q5 \% B' S/ u+ g( ^4 `/ g
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
; G+ g$ v5 _& ^- c6 B% L5 x o</ul>) V+ _2 ?) f/ @
<h1 id="完整代码">完整代码</h1>$ F. Y* Q: a% Q' j
<pre><code>use std::thread::{self, JoinHandle};1 i) y0 Y) V% V" K
use std::sync::{Arc, mpsc, Mutex};
' }8 ?% C7 a% q& }2 \( ?3 c' h; Z: k
# q& Y6 Q4 i' z6 `+ Z/ N
type Job = Box<dyn FnOnce() + 'static + Send>;
* g9 c1 G# h$ @8 ]8 ^enum Message {
( `. j. c" t" R. U, {( o ByeBye,
7 z! A5 F6 H2 @ NewJob(Job),
M$ o" a0 z- P/ |$ u7 l7 h}& C5 `5 \8 ]7 z% H! _# V
) t+ m) T2 m) N: \! U+ C0 \$ jstruct Worker where
% |2 _. o; A! p% V5 L$ h{
+ ?, v: ^( F1 N% i. l _id: usize,
' ^7 p* v6 i8 B7 Y0 k t: Option<JoinHandle<()>>,1 y; e" T" q5 i) \! R6 ^
}9 K$ K3 f2 L9 c) ]
" F# q9 r& ~; y% Y2 B. Y
impl Worker0 ^% y0 |% D9 W) c# `
{% \ Y: X4 L9 \% O ~% j" }- w
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {% T5 `; d; e2 h5 E3 q
let t = thread::spawn( move || {
* Q7 B: H9 z: ^. H8 h3 E loop {6 I. a$ R9 d3 g, q3 X( P
let message = receiver.lock().unwrap().recv().unwrap();% a& `3 X% K5 E
match message {
@1 y" U( y( N3 [5 E, Q; a1 n0 I Message::NewJob(job) => {4 o1 L/ b1 q, }' s
println!("do job from worker[{}]", id);
1 h0 t6 Y* b% q, } job();
. I* h O1 j! u; c },8 V, y: c% ~7 G% a; H
Message::ByeBye => {
4 m j& {/ Z+ g$ k$ p- d println!("ByeBye from worker[{}]", id);; C5 h( V( x. S
break2 c, c) C8 }# [' I( B2 s+ v
},7 W1 A+ X3 b% @. W( }' Q# V8 g- p* p
}
9 }& x( i0 E4 Z }: [+ T$ s+ ^& w Y) u: W9 l
});
- I: _) x; {; a# x! t
y1 F/ o2 h. L! _! g- x Worker {
8 \3 q# `1 V/ f _id: id,
8 i2 v7 K @5 F1 F" u: ~ t: Some(t),
6 Q4 T% p6 ~1 {; Q" V) q }* T) N- T" X6 R
}
2 {; ^8 Q: {' W8 x}
; z1 C2 d3 Z( f/ C' t1 x8 s, P7 e. u. w$ q
pub struct Pool {
I9 s8 z1 p4 s: G- J6 D* U workers: Vec<Worker>,
* K* |6 Y/ ~8 A3 F" f max_workers: usize,
0 X& Z; P+ \. |: f. i& r sender: mpsc::Sender<Message>
8 w3 j7 V1 u2 ^ u! D}! F) _" S+ A9 _
# t. u3 H4 n- h7 k+ u+ h8 @4 t
impl Pool where {
% D/ F$ a1 Q+ f2 d pub fn new(max_workers: usize) -> Pool {9 b- A$ d2 d! T% f
if max_workers == 0 {0 c% t6 f0 p: z$ F* x3 ], O1 z, `
panic!("max_workers must be greater than zero!")4 X' ~8 v$ F- }& y5 M
}* i2 P* o' @) d, K; L6 v
let (tx, rx) = mpsc::channel();% d: O$ w1 r# u+ g* K6 e3 P. E& \6 _2 E
; S! i2 J0 ?% `: Q- i. I let mut workers = Vec::with_capacity(max_workers);8 \1 d/ `' s+ }! @6 i) b' {+ _" x1 F. M
let receiver = Arc::new(Mutex::new(rx));+ Z$ n: O" x( S7 s1 N6 R8 ~9 N. H
for i in 0..max_workers {; z: @* b" }' A4 U
workers.push(Worker::new(i, Arc::clone(&receiver)));
& W8 U" }7 N& y Q. k! V( G B }
2 G6 \* u$ x* r1 ?: K, k6 }! o0 `7 s _4 W, L" z% o' A
Pool { workers: workers, max_workers: max_workers, sender: tx }; H! a. ^2 Z& W7 n2 l1 F1 x
} k' ?4 _; B$ I% c# r" Y
5 q1 a! m$ J- p: t7 E U pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
3 ~6 c/ R- ?* @ {1 Z% Z( }/ S3 H( ^( q
- c' M& D( N. V7 n let job = Message::NewJob(Box::new(f));5 n& v" c a' v0 O, W* k
self.sender.send(job).unwrap();
9 |/ C9 W$ N: p/ E }
6 R. y: U- y' x/ w3 C0 ^; y, a3 z}
: V2 f) T' f" W* A3 Y+ b8 b7 s
: i* [0 m. F6 E1 W# n8 G6 x5 Oimpl Drop for Pool {
( D c. `5 L- ?1 A: P$ N fn drop(&mut self) {: ], P1 N+ n. ]! L' ^0 n: w
for _ in 0..self.max_workers {3 M/ X% f. o! B2 y; \; e M7 a
self.sender.send(Message::ByeBye).unwrap();* x8 z& ?# l) O) L" L2 f( S& `
}& j3 \/ W1 v# y' X) u: R
for w in self.workers {
" e9 v) w- g9 A* D if let Some(t) = w.t.take() {6 z8 s; ^" {8 x3 {0 O6 I
t.join().unwrap();
" \! h/ \" s6 p( p' X) r }
3 C4 R/ g9 Z8 { A$ _0 l }! k$ g. h, M* ]5 Y1 B
}
( D# G' D4 s( H2 n# F# {4 C}
! A* }) F# {7 {3 j9 A% j3 g6 C; R, t6 T: R
5 Z! ]* a7 O4 m2 f* C% {* I#[cfg(test)]" Q) {7 J* B- E: S
mod tests {2 H! n2 y( R3 C2 K- `
use super::*;
4 @0 \2 }- U! m8 {/ r, O# k; T #[test]
& c Y, U8 a7 Y3 E, M& h$ X fn it_works() {9 ]" G0 ~/ B' v9 d8 t
let p = Pool::new(4);
2 u, l# G/ m' p- r0 ~ p.execute(|| println!("do new job1"));
# K. b/ g" D/ C( y2 R" C p.execute(|| println!("do new job2"));
, C# u8 n$ I$ A$ x! B. d+ r& Z p.execute(|| println!("do new job3"));
6 k5 k9 z8 B+ ]4 L3 m p.execute(|| println!("do new job4"));
/ ]+ Y [% U$ f6 h }
* |) Q3 \: V3 } ~: I8 G}- l/ B( |6 x8 _ S: V2 [* R
</code></pre>
+ z1 G/ I% V0 R; Q$ M" _8 h+ N8 U( F$ x
|
|