|
|
, p* A8 R1 s9 y; J6 F<h1 id="如何实现一个线程池">如何实现一个线程池</h1>( V, {+ ^5 B8 X$ i3 F1 l2 ?/ p
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>; z: U% j. K% R9 c) Z+ H
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
# L1 V& V4 M2 a# c e9 M9 Y4 d! G' n<p>线程池Pool</p>+ Y& ^4 F3 |$ p3 M/ i L, L& D2 O
<pre><code>pub struct Pool {
' F/ F; J- f% H% {* | max_workers: usize, // 定义最大线程数7 U. h5 ^8 w4 `9 N" A, x7 p
}) m/ c/ ^4 K W# ~7 _
0 F6 O: x4 q$ S$ S" {impl Pool {: i; w I5 ?6 D- Z- P4 b: i
fn new(max_workers: usize) -> Pool {}7 j3 O" x4 |" Y2 O7 {
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
3 e1 ^" f! r4 Q& Y5 [' ]1 R}2 B# H6 a8 \$ p/ @5 J8 p) X" ~2 M& ~
2 [" n" e6 T y' O9 _</code></pre> s' H2 r3 T U9 C# ^: L
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
9 ?3 V& Y7 G3 C# A5 y/ O<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>1 t' Z1 \* Y6 f* s7 `
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>( N1 Y, ]7 y* u6 p, w7 y3 }
<pre><code>struct Worker where
$ |% z# e! C% m& V{7 x. ~5 l( x; h8 B
_id: usize, // worker 编号
! Q' I+ X; A: c+ C6 V: y}
" [( j3 f; u! j: z. ]$ b: i</code></pre>1 ]. i- I" X. O' |. V' D
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
: ` ^7 a+ ?3 D. U8 `' i把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
- p8 y, Z; n n( S2 U<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
5 ^( A& n& Q, J<p>Pool的完整定义</p>
$ l# c1 _9 j% z: P9 z<pre><code>pub struct Pool {
4 N$ {0 k, P1 x/ o) D. w1 y workers: Vec<Worker>,
3 Y X* @* w Z$ a! c6 I9 m max_workers: usize,
% [: v& l) A9 b! ?; s/ A& R sender: mpsc::Sender<Message>* P/ Z1 H/ Y& `* ]9 {$ K
}
- ?' ]5 g; X' u4 _# @: H</code></pre>
I9 V4 H/ f- T4 g<p>该是时候定义我们要发给Worker的消息Message了<br>
7 c9 i( F% B: h% l# i# Q: ~. \% V定义如下的枚举值</p>* X2 Z3 Q) a, M& C
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;) C1 h/ h8 g" Y* l, O4 `4 l
enum Message {
$ _' @8 V& f( n$ t6 s/ T, F5 S ByeBye,1 h+ q* V9 I- X% O9 @ h. { _! J* B
NewJob(Job),
: x$ H. }& c" ~1 d' V}+ j+ w8 W. t* U! q- y+ ?6 a' f0 I
</code></pre>4 r" |3 V; Q; @3 ?# E$ v
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>3 G+ m7 j& ]0 g( d: Y
<p>只剩下实现Worker和Pool的具体逻辑了。</p>( t! ]) p6 i5 m" e4 o
<p>Worker的实现</p>; d: }3 F7 |1 M: |5 Q# a, W4 X
<pre><code>impl Worker9 V: s9 w: |3 [
{
G2 p; D* F6 \. ^5 J fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
# @# T0 J/ f# w. k* c let t = thread::spawn( move || {: o' G* h' y3 o' J* K. C
loop {
4 B0 l, B) b4 G. l" p let receiver = receiver.lock().unwrap();
5 d3 ]- e' G- I. n# ` let message= receiver.recv().unwrap();
- A# J, D: w X7 |; M, b! E( ^ match message {; V4 i5 u% ^; |( X$ y
Message::NewJob(job) => {1 R; t8 g+ f0 ^+ T) O
println!("do job from worker[{}]", id);' M; a: q, t+ w& L
job();
0 P' X U* R2 F }, |( [( o( A$ [
Message::ByeBye => {" g) e- m' L4 V' g$ G1 g# e
println!("ByeBye from worker[{}]", id);
, m- W% F7 ~% D2 [! s y break5 T7 y& d" Y2 s' h. j
},5 j0 v; }; i5 E/ U4 @ o+ g
}
% j- a/ [( b+ e- Z0 y: o }8 {( \& V) {% |6 j$ l! U8 A" r
});. d) E$ y/ {" ~" F, l& _/ }
3 o$ z$ C) ]7 | Worker {# m8 l% A- b, Y1 o: `
_id: id,
! l; I5 Y) y* r' A5 Y t: Some(t),
. j2 }8 g5 S# h3 Q. ~ }
* B' G! r# A3 C } z/ n+ r8 ~# Q7 ]( g
}
0 U. u, Y0 a, |+ ?& e; G</code></pre>
, j/ ~' w8 M* e0 o& j3 H<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
- ]( y4 H) `3 E! j* O8 |但如果写成</p>) K& }+ K9 r' G' x- D! s
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {6 r1 u2 ~) @9 C6 r0 {' m% ~
};6 t: C1 v+ d) j2 n& E& j% j K* q
</code></pre>
! \. a9 X6 `/ J' V& z<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
I* P" f& W. V' a( m* J# M, jrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
* L( }: W6 N8 @; Q% z<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>3 j& v P7 T/ N( Z$ t" R% ^; P3 E, b
<pre><code>impl Drop for Pool {' y6 j8 o) d) T" b
fn drop(&mut self) {
1 O, `3 g2 a% H for _ in 0..self.max_workers {! K$ Q, Y& G0 U' s2 H }
self.sender.send(Message::ByeBye).unwrap();
4 `$ i8 h' p7 U2 q. Y! N' u }
9 f, k4 n4 T- x4 M x for w in self.workers.iter_mut() {
6 k8 m* r% o0 i* k& M8 _* z6 B& ~ if let Some(t) = w.t.take() {$ H' G u$ O8 v. i6 ]
t.join().unwrap();! H8 O9 F$ k7 L+ ?8 p
}/ Y: T5 w+ y) O
}" L$ U% h/ [: R9 w. M
}: t' Z9 T+ l% ~. S
}0 R6 A) _: L; E# R& ^
& K7 P+ c- @6 `! K k; J
</code></pre>
2 d( \- a4 o8 \1 l( W) t! t6 l l, S<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
/ l- ~5 K h$ h6 G<pre><code>for w in self.workers.iter_mut() {! S* o B6 F! O" Q# s, r% ^; v+ a9 U, U
if let Some(t) = w.t.take() {# N% ?( ~& V# W" ^5 G% D
self.sender.send(Message::ByeBye).unwrap();
2 V+ W4 q. F; w t.join().unwrap();. \$ b" f* p0 F+ N- b- h3 a; [
}8 P8 T$ V0 d n' u( Z# N; ~
}1 z, M% m r! u
9 S; X# P- t( V; k$ f, E2 i
</code></pre>$ P8 h- v/ f. L
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>3 e( L7 m8 a1 h0 X; L% N, s* E
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>0 ?; x$ Z$ X% I0 `/ G% e# I
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
: |; z" q8 W, }1 F: P9 t<ol>% V' A/ [5 |* m( |; [" Z
<li>t.join 需要持有t的所有权</li>+ y& E! `( N% I+ c6 W+ B
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>; G0 V$ O0 |* H0 v" I
</ol>+ ^ P7 K$ h( x, B) u
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>' L, b6 d9 I; y' O' q
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>9 \1 N9 H5 G$ Q$ h, A- N( a
<pre><code>struct Worker where
8 q" h* K# D. Z; a{6 m! A0 j# Y8 M8 O% G
_id: usize,
1 v9 R& v& X b6 K' m t: Option<JoinHandle<()>>,, i* f% F9 M, M3 d, O
}
# ~1 O$ b2 @" l6 c* W4 \7 N3 J</code></pre>
- Z/ r* G$ \+ H ]$ X, [7 ?<h1 id="要点总结">要点总结</h1>
4 c, R6 W0 n J<ul>! R, Q& ?( [/ ~9 X) r2 w
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
" P; r# Q2 [4 [5 Q. [# M7 o<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
1 s! E) A! r- |* |3 o w</ul>
/ }7 y5 v4 L. k- g/ F<h1 id="完整代码">完整代码</h1>
! M$ r) {; q; b; R* G7 E9 [" C<pre><code>use std::thread::{self, JoinHandle};
8 {1 u! F5 \$ S9 w, suse std::sync::{Arc, mpsc, Mutex};
$ t7 P' {* S+ {0 I8 W) R4 b& Z( Y" _: @8 b0 M
2 A, l; F8 a9 s+ W- o
type Job = Box<dyn FnOnce() + 'static + Send>;# A% L8 m4 t4 S6 C
enum Message {
" J, ~# g7 r# K2 j+ A ByeBye,3 [3 q3 z: z" x2 |6 h+ k: b
NewJob(Job),1 ^5 t- p$ j e+ S
}" z# X, A# _. S; ^: \4 J
) }( }1 W: B9 Z2 P' n# K/ ?+ ~+ sstruct Worker where
) ?7 T# `' d& \3 Q7 L! E' p{
- C& d' ^9 m4 _; ?+ Y p% ~$ N _id: usize,
0 Z* Q9 Q% R! t/ r7 d- @ t: Option<JoinHandle<()>>,! w2 w1 y1 _% d! ~, H v
}
+ A6 z9 K8 |4 Z" S/ w9 k( |( Z
impl Worker
4 s9 {6 M* k' [; D, p |# V1 C) A5 c{+ e# H2 V$ B- i' f
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
" a$ K# V0 h/ H8 m0 Y+ ] let t = thread::spawn( move || {% z. d2 J( f: o* I. m
loop {0 L8 P6 c. ]# I: `' @1 P! K
let message = receiver.lock().unwrap().recv().unwrap();
$ R. m ~5 a! c0 U match message {
/ J$ S3 k5 j% p: n6 _! C Message::NewJob(job) => {1 q8 F6 W+ V6 ?) g; n
println!("do job from worker[{}]", id);$ A: L+ B6 }6 I, ]/ _# s
job();* }6 h! G8 r M- K8 O7 F+ x9 w: h
},8 }$ [9 `: m9 J1 `" E3 G
Message::ByeBye => {) a: x: M; B C+ p/ X3 m
println!("ByeBye from worker[{}]", id);
% I' K, M' J$ D+ r4 x break
" X% C8 G9 ^8 p },/ k8 i) M. _8 x' T7 Q& N! z9 A( u
}
/ j) }" M( d7 j }
3 _- H. u3 G; c });( y$ a* A$ Z/ q1 m$ q( E* N
; y% h4 }/ z$ B3 S2 ~ T9 \( B
Worker {+ A9 q! j1 ~9 B% P
_id: id,
) S* f# {. P6 `& G% m t: Some(t),: h2 I+ o6 h5 J4 K7 G
}7 _ x. b( Z- l* J
}& i; b0 q8 ]: {8 Y: |7 I; s$ ~+ {/ |
} X/ L3 z2 E9 G L7 I3 z0 a$ l
9 x C- ~; X$ K! Xpub struct Pool {
( ~/ L/ A& q9 m workers: Vec<Worker>,: g0 r4 `/ S9 B
max_workers: usize,& c1 `3 K8 A' T" v5 c/ h' B2 w
sender: mpsc::Sender<Message>' O! I* P5 c/ z2 g+ j
} J' A" N/ i) j/ l5 M% D
4 @. A( i, I0 B$ o% M0 ximpl Pool where {6 \4 ], d% _! B( O3 j
pub fn new(max_workers: usize) -> Pool {! t+ r a. P8 W# ^, F: r
if max_workers == 0 {
+ N+ h: s' q- Q; p panic!("max_workers must be greater than zero!")
' k. K& p |" ]: o }
J3 p) [1 _8 P8 I$ K3 `0 I" b let (tx, rx) = mpsc::channel();# D& A( ?4 Y" I! l% b d
4 n% c$ d0 X/ Z. d5 V m
let mut workers = Vec::with_capacity(max_workers);& J1 _6 C! |. [. N+ n+ c
let receiver = Arc::new(Mutex::new(rx));6 ?; C; e8 T- T3 E8 T
for i in 0..max_workers {
0 i2 A+ ?+ y7 W" ^8 Q workers.push(Worker::new(i, Arc::clone(&receiver)));' B2 C) D; \ B% d/ H" g
}
! x# A+ l% J; ?! o
1 B( @4 C) q; s# ^5 E' @, @2 [ \) t Pool { workers: workers, max_workers: max_workers, sender: tx }
2 I3 c+ E0 m3 T! X }, @/ p( e' r9 U) x
7 w% D, f \! W9 R1 p" O
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send/ G2 M: n* D1 O! j
{
; I6 F( y8 e1 b9 ]* H1 G
6 ?' E8 q' o; Z3 O let job = Message::NewJob(Box::new(f));0 P8 i+ s5 R; E6 B U6 T4 j9 y
self.sender.send(job).unwrap();/ C# D( \9 H' f
}% t8 }2 U* Q7 K p
}
9 x) a5 W. |* M( J& Y; w$ G, N: l6 D& |. I- m) R
impl Drop for Pool {
( q2 ~% ^# F4 B fn drop(&mut self) {' x" t3 I& v$ c/ @& E
for _ in 0..self.max_workers {
2 W, l$ t$ h: E self.sender.send(Message::ByeBye).unwrap();* {; y& q. i/ |1 N
}& F! C) X6 l2 E. y! [4 k o
for w in self.workers {8 c& i& f+ _/ @3 _$ V, n
if let Some(t) = w.t.take() {, _+ f7 @9 J% [
t.join().unwrap();
3 W. r& b+ h: H: @; X8 ]" K! ~ }
0 ?+ C: R: S6 }7 r }, h+ \$ u) f, ^4 Z- J' u0 J
}/ T% ]4 X+ D. @
}
1 p" U: `+ F- n0 W! [* ^- B4 H: n0 |6 ^: R+ I7 w3 ^% S
2 D" f" p: J! [8 l9 a+ b#[cfg(test)]
+ ~; ^5 `4 \; r9 N7 Zmod tests {0 r+ c; b' a! a: X p: m3 ~
use super::*;" A7 o* k( [8 C' Y' E. W: {* o" Q
#[test]
$ `7 Y: O! F/ b: Y& R% q( w; @ fn it_works() {
0 [7 }7 b0 F U+ Z# i8 n let p = Pool::new(4);
, w+ X# p* Z1 y: O3 d$ p% t: J p.execute(|| println!("do new job1"));
+ b$ Q* E2 x! L/ Q p.execute(|| println!("do new job2"));* G. Q z/ _% g: M
p.execute(|| println!("do new job3"));
9 J! ]2 [! B( L' X p.execute(|| println!("do new job4"));: Z+ l& G/ @: w1 d/ W# V
} ?2 F0 M1 J+ N( W6 y, H
}
& `6 z! h$ x) N+ B</code></pre>
% f& [2 A/ W1 g4 A2 h& @( B/ K$ r6 {+ E$ v" ]7 \/ l8 N
|
|