|
|
3 W3 N! O' ]) e<h1 id="如何实现一个线程池">如何实现一个线程池</h1>- o% D0 ^2 |5 |! d9 ?
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
0 T2 X. }& F! h<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
. E; }! z: B# w; O& v<p>线程池Pool</p>
1 z' B( v# y1 B) S<pre><code>pub struct Pool {) M K" n. Z+ @3 ^! ]7 S; q, a9 B. D
max_workers: usize, // 定义最大线程数, U( ]2 g& _) {4 I" j
}8 r3 H# `- l" J, ?1 o# B
% \0 b3 H5 S" Z$ U/ g3 C/ I- R" O9 E( simpl Pool {
# x- B3 S- T+ m4 g2 h fn new(max_workers: usize) -> Pool {}: N' x9 k& u. V5 J
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}, k+ V" k+ m; W1 B/ v9 j
}* ^% z( ^+ X" [$ Z& _# v6 y1 d% {
1 L3 o+ i4 e, P4 U7 U: R
</code></pre>3 H$ z; U4 i. J: E7 o
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>2 W2 F! n: o1 m
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
7 n$ @( \ F4 C q* y. U可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
/ j! V) m; l4 s6 G/ k g9 ^<pre><code>struct Worker where/ p! ~. c# P7 a ^9 p
{
5 y5 r5 Q: \# A4 [* b) | _id: usize, // worker 编号( T; w3 f4 X, j
}5 ?3 {: y( b5 {1 a0 H s2 d
</code></pre>) f, R- n6 w7 E( u
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
) T( n' X. O* ~! Z把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>- `8 e. `# n. M. q0 ?
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
8 r! c% o# }2 |9 ]) o% H: m" P( b<p>Pool的完整定义</p>
0 b P5 e( M' Q/ E2 T( Q0 ?<pre><code>pub struct Pool {* x4 q+ q& I+ ~3 b! `
workers: Vec<Worker>,- U }' p9 F5 |7 }% F$ s- w
max_workers: usize,( g: H8 a! h3 z4 w) r
sender: mpsc::Sender<Message>
9 n; P5 m' Z' G4 b, N$ v. _}
* C$ S1 `4 M5 M- q9 R0 F2 {7 Q# h</code></pre>- r0 f. B7 R% Q# m1 d; ^
<p>该是时候定义我们要发给Worker的消息Message了<br>
# U! V" Y$ z+ d6 k定义如下的枚举值</p># |1 M+ g: y+ @$ q% ^
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;- u t/ ?. X9 p& m& B4 j1 f; b0 b
enum Message {# r% b' V" M7 g' n9 j7 ]
ByeBye,
" H3 s4 `5 I* P8 B NewJob(Job), `. \9 o1 t+ b+ X( u3 L2 h$ z# u, s
}
9 a$ ~& J. y6 I6 F" h; p</code></pre>; R# H8 ^6 y( @. v2 R/ ?( d7 {. R
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>/ Q' s' x+ |6 `
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 ^1 I( c6 T) |5 t8 u+ l& F1 t! U<p>Worker的实现</p>
! N; z: O O6 C% z' c4 g" t<pre><code>impl Worker
" t1 F8 S( @; ~0 K/ `{6 }# E. A( b, f- E& F7 r! f
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {. h5 U. ^4 k( D7 X
let t = thread::spawn( move || {3 Z# a' g* ^1 Q! z9 T
loop {$ @$ F7 O; B- i. b7 c5 l5 M! s
let receiver = receiver.lock().unwrap();: b; ]1 s; H6 r6 c% H+ \& H
let message= receiver.recv().unwrap();7 A3 z: o+ {* Y' [& b
match message {8 J. f: M* }: I
Message::NewJob(job) => {
7 w: b/ e. B3 ~5 T h println!("do job from worker[{}]", id);7 Z# ^* @6 ]+ C4 |) o- j
job();7 `5 o: h% S/ }. F- k. Y+ k
},3 G. H* t% N8 {& ^; E7 \
Message::ByeBye => {" m( [; g) U, A. g8 o
println!("ByeBye from worker[{}]", id);5 n# ^4 E p3 h9 @
break
3 G/ }( R0 Z) v$ f0 e7 e- P },
6 ~) f9 D5 A5 m- J } $ Q" g6 l$ c% w* p' k
}
( `; i# s1 h" i- f- k1 X% Q) C4 N });6 G4 Y# j5 m- q! c9 Y4 m# U. ?
4 c$ {9 T7 ]/ Z
Worker { }% Z( j; K2 E& I4 A0 ]5 j
_id: id,
. G: M$ P5 `$ P t: Some(t),
' |) b8 p) u4 ~. f' P% j4 n4 ^ }$ v) {, r8 |8 D1 a$ Y1 G& w
}
1 K# V9 _7 s4 A& F" i}
: s: F0 S4 t! C0 _* X9 j</code></pre>
: f- `8 s. h( s! o2 C& k<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>- j4 k6 P7 @. b+ P" E; }
但如果写成</p>
+ }' [* c! V( ~<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {# ^& Z: D$ E8 S" w& b5 `9 X5 f
};
3 i8 q- |, G4 \' t- x</code></pre>0 M! P9 ~2 [. r. I7 {4 G
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
# S# k. i- ?' Urust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
5 n2 Q5 z7 u! Y V) ]<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
" ^8 N \) k4 T& g3 l<pre><code>impl Drop for Pool {1 `1 z. O* C/ `$ Y7 O
fn drop(&mut self) {/ Q v# U1 B9 l" E% D0 q' X6 n1 c
for _ in 0..self.max_workers {
$ l4 W: ?7 S5 f- B! U% l2 \( | self.sender.send(Message::ByeBye).unwrap();$ H( A( `! ^( |3 D$ m* a- S; ~6 A
}
8 Q) \5 i; L/ I5 f for w in self.workers.iter_mut() {" f7 _9 H. m; h' t& Q
if let Some(t) = w.t.take() {
) k" M2 _+ \! f t.join().unwrap();
0 K4 u9 x/ M# A. k1 x8 j: \ }: M9 _/ t+ K: q* j/ z
}! S( x1 V- w) Q% m" _
}
5 [- z; F6 A7 m9 Q- w. m* k8 x4 u}
3 m- _, J0 ?. U
2 j; r, G+ ?2 N& \) K; I5 E</code></pre>
( ?" d( Q, p9 E% P, w, a<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. f" Z* U+ F1 t) q
<pre><code>for w in self.workers.iter_mut() {# F6 h8 l1 P% T6 M. {
if let Some(t) = w.t.take() {' ?/ L8 w) i( e N
self.sender.send(Message::ByeBye).unwrap();
' k2 E9 h+ d4 b; p8 @6 n/ Z t.join().unwrap();$ n2 t4 W+ o+ N
}7 d3 J8 `* a/ ?- |. Z8 r3 [+ H- c
}
7 s8 P7 T- ~- Q: G, ?0 x
. a+ p7 U2 F, J9 E- g$ ~</code></pre>
/ N7 Z- S/ K* z, x% H3 q/ L<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
0 E( j- C6 i' Z7 e% g2 l) q我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>2 p3 y+ d8 e2 Y
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
& Q% U+ x' k6 H; R' H<ol>
$ s; s' m! A9 c. ^: G! ~; K/ U) x- o<li>t.join 需要持有t的所有权</li>
: q( ^5 j h- o' O3 T<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
! C. R1 n: I& }+ n$ M</ol>
3 Q8 R; d* y' x- S, ]- B<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>4 v! Y' L! z4 S$ E
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
2 s R4 \6 Z2 b. u<pre><code>struct Worker where
# P2 }: ]. k$ j; k: h{/ T/ o, N" _* v7 C1 Y8 c
_id: usize,/ i' L" z- {: y, F# n: \
t: Option<JoinHandle<()>>,9 K$ a- }# ~+ q9 q
}, [5 t) W' g" @1 m3 a6 f
</code></pre>9 h9 d3 f& Y0 n# U' q5 A
<h1 id="要点总结">要点总结</h1>" ]1 m7 M+ w ]( v. J$ y
<ul>5 ^' E7 Q: W E& U' o1 d
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
) `/ b9 w1 T9 }+ U<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>5 Z; [- }: {/ d5 `* O0 M
</ul> O8 S- R' e! r+ ~( ]' n
<h1 id="完整代码">完整代码</h1>
0 q8 y' p0 l# [<pre><code>use std::thread::{self, JoinHandle};
. u) Z0 T l8 D+ l7 Luse std::sync::{Arc, mpsc, Mutex};: ] I& W# @5 M$ r7 q' I- f
* P/ G& U# j- D$ n! X" F6 d+ Y. C% t- ^2 S6 c1 m
type Job = Box<dyn FnOnce() + 'static + Send>;
* W6 f1 `. F) U9 W( ^0 r U% senum Message {
; K: V. Q$ M l4 Y+ }0 q6 Q4 }$ A ByeBye,3 K. o4 u% C! T7 B6 }
NewJob(Job),
7 J. A! H# T- }5 s/ o6 P5 J0 n}/ b( M F7 S- L+ Q0 m9 S; Q) ]
0 z* Y9 q% q# D: p/ Q
struct Worker where% x' e. b$ ~ S+ J
{
+ p' v3 U* ] @8 @" a+ j _id: usize,4 z6 n' V0 M) S& G
t: Option<JoinHandle<()>>,8 |" R3 f1 A! Q9 R+ a- r0 B
}- V% C5 t# m. g) R: r g/ `$ E
2 N. t9 M0 ?9 g K, i6 M$ C) e
impl Worker
1 x/ w1 m# s/ n0 |{" }5 h5 N T4 T4 H
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
) E9 B) S. o% e5 F2 @' N# v2 I1 r, ?: t let t = thread::spawn( move || { a0 c, @/ S( Z6 |
loop {8 \1 x9 F% g/ t7 a. h9 J# L- Z4 n
let message = receiver.lock().unwrap().recv().unwrap();7 ~. G: H) g, R1 G8 }$ s% P/ u
match message {% h" u% E+ U' f7 v
Message::NewJob(job) => {
, h% F0 n0 q a# Q; z println!("do job from worker[{}]", id);
$ G: ~ l9 Y/ w0 a8 K job();
3 z: p4 l, e7 o' {- p! M },
- y% o; R! Z# d7 w8 f Message::ByeBye => { x9 R% [, l1 {! P5 v
println!("ByeBye from worker[{}]", id);
1 d2 \( ~* [0 { break
. c* A; h( `- W4 M: V! T" T },
) t+ u: @3 g$ v) N) x4 L" k& C }
2 M/ X& x. N' t6 u* [ }
/ r- P+ s" r& C% T x. F });6 x( m1 `7 s2 N* V/ o
7 e/ J) N! f& _3 w0 e& x% r
Worker {
& P7 O/ z0 i, }2 Q5 j' m _id: id,
: m. t. ]1 c# m! V$ \1 S3 L" g t: Some(t),: x& K7 G: @, q4 _- ]5 s
}
& M7 Q: t y$ h5 p2 p" H3 b }; j7 v# L7 C' p, `7 g
}6 j4 `% B1 _3 d0 m0 u8 X& x& \7 g# z
- a6 L' i1 p* I! }& g8 l
pub struct Pool {
0 V' i' s6 C1 Y9 `: R: U- P workers: Vec<Worker>,
1 R3 c# o& a4 ?/ e max_workers: usize,& f |/ K/ W+ g/ {( W" W/ ]& \3 q
sender: mpsc::Sender<Message>
5 E. c: l u+ u" }$ G! S+ v5 U}$ X5 ~9 |6 y2 |1 K
- q# e' f' [6 y+ J$ o
impl Pool where {1 C) k d: y- g2 J4 h
pub fn new(max_workers: usize) -> Pool {
7 c, v5 s2 v! q& v4 z- k% p5 g if max_workers == 0 {
7 C1 F6 l" C2 O* \0 f* r& n panic!("max_workers must be greater than zero!")" S6 G4 F! c' y! u; k4 Q0 y
}* R7 X( i6 ]/ m4 Y, s" c/ J
let (tx, rx) = mpsc::channel();$ q4 e% _6 S" B% B) M
8 S6 V4 A& ~! ^. r: W6 S let mut workers = Vec::with_capacity(max_workers);
5 O8 I% T$ T/ Q6 F let receiver = Arc::new(Mutex::new(rx));2 V' n4 `6 A: J
for i in 0..max_workers {
* _0 h) w2 \, ` workers.push(Worker::new(i, Arc::clone(&receiver)));1 R+ T8 }1 C+ F4 |& V ~3 D
}
9 U! j) E8 q% s8 P. C- ?, n: l
/ P" ?6 k0 B, s7 R! M- A0 U Pool { workers: workers, max_workers: max_workers, sender: tx }
! `6 h1 k" K% B% O; K3 _$ a0 N# E }
2 Y' _5 g! i7 y c& ^* Q $ L( A' g+ i" y: v1 ^$ T; W
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send" K3 K! t2 L$ N/ g/ D$ b/ [0 G! U
{% p% L& G$ x$ E! U) c
- J9 ]3 V: ?9 u
let job = Message::NewJob(Box::new(f));
; N: j# J i+ d' Z self.sender.send(job).unwrap();
* I: U0 Y* A- f* z }% _& r2 |6 Q% L( k5 L, h
}
% P/ G" w$ I3 ]! S
* U$ I$ Y! e6 I+ fimpl Drop for Pool {
$ h8 ^" S1 p5 ~% | fn drop(&mut self) {
' T5 W8 }6 m F6 [+ p: k" K S for _ in 0..self.max_workers {1 g; e7 v8 r; K8 J) s3 Z
self.sender.send(Message::ByeBye).unwrap();
, p% V7 O t6 d+ ` }& ~0 z( x; ]- c
for w in self.workers {6 H( n1 V; w1 n$ c7 B6 E4 |' M
if let Some(t) = w.t.take() {
% p7 l2 J/ l$ ?- L t.join().unwrap();
$ \2 V1 L6 l9 x/ P9 Y8 p0 z/ ` }
* V: ?% c" q1 E( u1 ?: r }* [# P: S8 {) Y& z' _
}
& k- X% v: Q* y: |}
, h0 Q7 v, P1 h
/ x' t9 T* c% h& b7 i. P- J1 \/ A# V# f1 h
#[cfg(test)]4 ^( N( R4 m8 S N+ ~! _
mod tests {; n2 X) ^, C. }7 q" }+ V: z
use super::*;
) v1 Y3 ^/ z! s5 Z O! O #[test]
6 i; L! d: ~6 Q( q, M/ o2 L: ~' c fn it_works() {
" c. i4 Z/ m% g; { let p = Pool::new(4);
& @" B+ L- m) y p.execute(|| println!("do new job1"));
, e0 ^) p8 Z. }& w& k3 `! z$ u0 v. C" U6 n p.execute(|| println!("do new job2"));
- {* u5 Q {$ h1 o p.execute(|| println!("do new job3"));% B, a8 v/ |8 L3 ~
p.execute(|| println!("do new job4"));
1 Y) c* K4 w1 v9 b }
4 K* f# `6 K8 x- s" K}
3 q, w. X! K6 `3 a( ^/ p</code></pre>3 ^* o, s+ t" N! S% v7 ~$ l1 [
) M1 P6 @0 N6 D; ~- }+ Y; {1 h
|
|