|
|
/ J. T; H/ r/ j0 _' B/ Q
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
1 u, O. R2 i. C<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
6 P: Y; p. l! W" _ n* O3 j+ r<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
h$ @* T" T) v0 s7 g* v<p>线程池Pool</p>
& F2 v) z4 p! j7 @$ j8 |2 W<pre><code>pub struct Pool {& P# Y9 I1 D& ]0 J, \% q& C3 r$ v
max_workers: usize, // 定义最大线程数
2 a: c! s% O! Y) r}
" G+ o3 O7 w2 _, V- A
' z% R- Y; g2 k+ `. d' Wimpl Pool {
$ K' T. ~) q" p fn new(max_workers: usize) -> Pool {}
0 |" z+ q8 X" J% V# _7 G M% D0 A fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}- o6 h! w n+ O
}
( P* l2 O" S7 Z& z: R# @7 H5 w1 ^+ R* K* y3 S. ]
</code></pre>
0 d0 ?- \" g' C<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# {- S# ]: t& [9 k. P% c4 y8 i<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ H/ Y- {0 K; L可以看作在一个线程里不断执行获取任务并执行的Worker。</p>* S0 {; Z D- [- a, v5 x( K
<pre><code>struct Worker where3 P1 E7 U' j! F8 T6 |2 y$ x! |
{
0 n' p. l; n, X _id: usize, // worker 编号
4 ^6 K+ o: I$ v: M/ V9 \}% q+ U" ?5 h: S b1 Y7 h, @
</code></pre>
- q' y/ s1 Z2 J1 M1 g; G<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
! x) O! p9 h4 C% }0 j把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>1 G( @0 _5 d/ B) X' K4 H" [" K
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
% q& n% X3 v1 g q# ]$ D# S. v<p>Pool的完整定义</p>
+ A* f' \7 A* X6 W) q) P<pre><code>pub struct Pool {. q" v3 Z4 ~9 @, K+ c
workers: Vec<Worker>,3 J* E5 }. f8 l( y1 A1 |
max_workers: usize,
& ]4 ?; i& H7 N) b9 K+ x2 b sender: mpsc::Sender<Message>
0 g1 c3 M$ }; Z5 O}+ @( V& o- L$ i' T- l2 Q
</code></pre>
, A1 J* c$ X& h: G3 { C' O<p>该是时候定义我们要发给Worker的消息Message了<br>
: q; h1 |5 K6 G$ N9 u定义如下的枚举值</p>
; y A" T( \! U<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;; o+ ^+ f4 \4 {* z/ F! g
enum Message {
1 K6 z; O" [; k* U* Z ByeBye,& l3 }( m: e, d1 D
NewJob(Job),. U! z, q1 |' D0 p
}
: B0 J2 E& R$ M% k1 F. l1 F3 C0 r</code></pre>
9 ~) P1 v* e, }, D9 A. Y/ `9 p<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
2 j* i+ W1 j+ F$ a; D6 Z+ v; q<p>只剩下实现Worker和Pool的具体逻辑了。</p>* R8 x1 a' o( z2 k: {" ?& t# C
<p>Worker的实现</p>
# V" I; T" q+ Y) J3 d6 ^<pre><code>impl Worker
6 F: ~! h7 i: v4 v! Y6 D{6 ~# g" v* b T+ J
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {; L) B2 K1 w6 B T
let t = thread::spawn( move || {
. f d- C0 B$ m- l& ^ loop {
6 t4 K4 m! P, g% b9 i let receiver = receiver.lock().unwrap();
5 y- h$ C* C% P3 I* r let message= receiver.recv().unwrap();
6 K1 r) ^' Q9 A) Y$ F$ T match message {
# Q' j4 E/ W3 Z0 M" g Message::NewJob(job) => {& D4 K: O0 x6 Z U) f6 B
println!("do job from worker[{}]", id);
* y! K$ Z/ O x) s9 C B job();5 B6 W% _9 F! y: C, T$ ~# C
},8 l5 S k p) L
Message::ByeBye => {
/ p5 Y3 u/ N8 W w( l0 ?7 [+ _ println!("ByeBye from worker[{}]", id);: d+ W# A8 w- p
break( s6 ]" I0 X3 R; o0 i) {$ L
},$ O; p7 h7 @+ X t
}
" M2 c+ h6 _4 j& ~- l k }
/ a% C$ M, E. i7 I! B: g });
e& ?: L4 X; E, c2 M
1 h u* e5 S8 d! b9 F# q6 B9 [7 l Worker {- j9 M( W. H+ `6 d
_id: id,
4 w3 r) p' U3 |% k t: Some(t),
% n, A; J- \2 N }
& w! j6 A- H% u) d }0 V9 o8 i! E: y8 O* p% }
}8 F$ D; K) H' g
</code></pre>& z5 ?3 u/ \1 u4 g" |
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
4 r( c% S2 _) ^! o( J但如果写成</p>
8 f. y3 N* {* ^<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {$ S: a+ H9 X/ ?. Q; A
};% _* j. O# h( F5 g
</code></pre>2 R6 H, B! U$ k, U. U; D# V" v
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- S. V9 g/ L5 J5 arust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
/ H# L! H# j A# H/ ]<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
% I z/ }8 K( ^9 F7 z<pre><code>impl Drop for Pool {
9 O( D( k" i5 }! h fn drop(&mut self) {
) @2 P% |6 r! y for _ in 0..self.max_workers {0 s5 B2 e4 ]/ B0 B8 T* U4 L
self.sender.send(Message::ByeBye).unwrap();
6 X+ Z9 Q) a; s1 w }
+ {, l6 u9 o% l7 ` for w in self.workers.iter_mut() { [0 ^9 T* ?) Z6 @
if let Some(t) = w.t.take() {8 h: @' w2 W, u& h- p6 H1 j
t.join().unwrap();, g6 p- C/ _- z5 m v2 w
}
: }! b$ i6 n# _$ O4 s f, s7 L }
; d" A- U6 [& e" R: L/ d }
0 x$ V2 `9 K- V# T}' b* L0 Z8 A5 R5 X- N, d2 n
6 T5 i; [. s: U
</code></pre>
5 D+ E& ]6 g+ b; D6 ]" \<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>- Z' G0 }2 X) q$ d5 a4 \
<pre><code>for w in self.workers.iter_mut() {
- ?; A3 J; Z* _: k) f) I! O if let Some(t) = w.t.take() {/ o1 W4 {6 u3 O# W1 d1 |
self.sender.send(Message::ByeBye).unwrap();/ c" t. x7 }7 d: @& \0 \1 z9 l! L
t.join().unwrap();% ?+ J- a6 b$ ~) S, ^
}6 Y" R* x$ g+ n4 _1 b
}( ?& G$ N% D) W6 u9 U1 l& ?
6 q& L i( m2 l! C, L</code></pre>
7 ~5 z5 \. m- x% P) R<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
: {; Z% e( f) H' f我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>: O; u3 d9 f2 n' ]
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>% u& i5 D2 }6 X2 ?, K
<ol>" u& G5 ]6 `0 Z1 i0 C% N1 |
<li>t.join 需要持有t的所有权</li>
3 M1 o/ D# x2 ?8 p+ G5 d. _% Q+ j<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>5 k6 T. V4 J5 v! f; M
</ol>
- h5 f- X5 A' K3 q& D2 k, }/ a g<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>% s; |7 c: e1 a6 A+ g
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; X; }, u( J5 u6 z<pre><code>struct Worker where" t; V" e- y7 Z
{
9 Y" o' |$ {7 i8 J) E9 y2 K _id: usize,/ p2 R: e4 I( {4 G6 m
t: Option<JoinHandle<()>>,8 X7 c: {: t0 X# U9 \. s: b
}7 g- G' Z, _, }# B" r
</code></pre>
4 e; w/ S+ o( j- d" o; g& G<h1 id="要点总结">要点总结</h1># U m' c. W8 R9 @" f( q6 M
<ul># I/ l6 L* I& z% I! K5 v
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li> |& w2 _" r H+ D$ P, W
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>+ H7 ?! F: `) v' l0 q
</ul>+ _/ b# k( {5 t9 u: |# W
<h1 id="完整代码">完整代码</h1>
' n" l. o, D7 R# J2 `6 F# o( ?* W<pre><code>use std::thread::{self, JoinHandle};3 T7 j* j0 _3 Y) X- q9 D+ q) a
use std::sync::{Arc, mpsc, Mutex};# u- m3 g- ?6 Q! R4 `; e! h
( d9 R0 [4 i( K) L7 y% p
8 A8 X' f8 A% c/ M( S+ ?
type Job = Box<dyn FnOnce() + 'static + Send>;
9 `) w8 T, [5 T8 [) zenum Message {4 X) E/ N+ | V
ByeBye,& H6 s4 l* z; U, p1 |
NewJob(Job),4 u. A( U8 S: j3 ]1 h& J
}
2 @+ h- O7 J- ~: z: C
, q( e5 U5 Z& b- |9 d& l' W0 d9 \& K) Istruct Worker where# @9 P# J3 b4 a- x
{% v+ R, z) }# C) ?; Z
_id: usize,
7 e% P2 J8 C1 f t: Option<JoinHandle<()>>,% P! j3 _' T6 } F/ j
}
2 e) ~/ [" `, G) v x/ e& O/ ?; b3 {
impl Worker( d# H3 _ O* J u$ i0 U n& S1 q
{
* ?( h; p8 Z7 E/ `9 g fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {' M/ w7 ~/ @: W' {- y3 c, f4 j6 S" E
let t = thread::spawn( move || {
! f. z% T: z; L, W loop {- t6 `5 I7 d6 G- v% u. B/ I' v
let message = receiver.lock().unwrap().recv().unwrap();; ]( N# |' q7 w1 I& V
match message {5 v( Q% \ }6 v) N7 i
Message::NewJob(job) => {: ?6 h/ _# n% O" I) n
println!("do job from worker[{}]", id);. o% W, s6 Y. N2 r
job();
5 H$ T% G" N5 Q, f( L, a },8 }2 j$ x: W1 p" o1 h J w
Message::ByeBye => {7 p9 U9 f6 P) i. E
println!("ByeBye from worker[{}]", id);
: g5 |4 j1 H9 N$ ?+ Q) L break7 p( l8 J8 s+ w4 E9 s U
},
- n2 F7 w% k8 S: F2 q* v }
, Y/ Z6 }* z0 C, N8 Y* x }9 Y9 p! ~" d+ u( _$ R8 N
});6 ?# x8 b6 V- z5 Z2 G. \8 I3 K) k
3 q' q( {* W: P/ y
Worker {7 ~, M& L2 |! W/ j5 f! c
_id: id,
) Y% f0 y7 f9 ?. t t: Some(t),
8 @, }4 t3 n! x5 J- D$ `/ Z }
6 v8 m. A" R- S( u }
( ^3 w( q1 v- \: r' i}
# g; G7 d, n! k9 q' I2 ?
( B( ], M& o- a' ~ Mpub struct Pool { j( |' m- I0 L5 n
workers: Vec<Worker>,
. d7 u' M; \' \$ R: \ max_workers: usize,
& I! [9 F* d' L0 C |& b sender: mpsc::Sender<Message>% W- |2 |5 W/ U" b$ M2 u$ E4 [, v
}" u0 |% C, E$ C7 x
. C% b* [$ ^) l" \
impl Pool where {
5 u5 n8 V: b0 c; N5 N G pub fn new(max_workers: usize) -> Pool {3 }3 k. O5 L$ V" Z& u* C, w- h6 D+ \
if max_workers == 0 {
/ T: A" H: l3 g0 r) o* ] panic!("max_workers must be greater than zero!")& m7 p- A. h2 s# q! @6 [2 [
}7 u5 Z. k! n' ]" [# q9 _
let (tx, rx) = mpsc::channel();
8 ?; \/ v; c- L* K8 [2 p, L4 @) S% f: O3 p
let mut workers = Vec::with_capacity(max_workers);
% q9 d/ C! O) A1 E, }/ e let receiver = Arc::new(Mutex::new(rx));
7 h, ?# _+ `* Q& B0 Y- D for i in 0..max_workers {
* i# l5 Y" F7 | workers.push(Worker::new(i, Arc::clone(&receiver)));
- c- A0 D7 Z$ ?1 l( O* g }
7 t( X6 F" U; u5 z2 s1 ]) f5 O4 d! M* O& s
Pool { workers: workers, max_workers: max_workers, sender: tx }
8 Z7 @/ x5 v4 y' ~ }
5 z2 j- N4 L( S4 v6 l4 C1 z9 N% Y # ?/ \# a% U7 I
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
+ W- X) j" G) ?7 E% x- B {
# ~. R, |. q& q% f0 ~2 i+ l5 F* ?7 k4 |8 D5 X* s
let job = Message::NewJob(Box::new(f));
+ |6 c4 \& A8 c# K6 E self.sender.send(job).unwrap();0 Y A: Y; J6 G2 [* Y4 n9 ]; C
}
" [" B0 Z5 V! h# I) F$ O+ K, B, w}' ?5 f) o, a, r9 u" K5 [
9 ^! Q: W3 H; L0 ]( Qimpl Drop for Pool {
) n4 v7 @* m' _( J+ t fn drop(&mut self) {8 W( }$ j1 J0 i' b9 e7 ?9 ]" B2 \
for _ in 0..self.max_workers {
: X5 K' ~8 ^ _3 y, b6 P; x self.sender.send(Message::ByeBye).unwrap();* |, l" I- i# J* a: K* g
}) p" K6 S" t0 p9 z
for w in self.workers { f% r" H; V; @+ `
if let Some(t) = w.t.take() {' i1 q1 h0 E- Q& r2 ?& b# f, {% y% o6 ?7 N
t.join().unwrap();4 l8 B J6 C1 g3 s
}9 Y u8 D; ~4 Y
}8 I6 Z, h7 B5 u5 R* a/ g
}
0 C6 r6 H* b2 f/ z# E}
0 S4 m. c' \+ { d' V0 l4 W- r! D5 u) C0 m0 E4 @ j6 i9 G8 X- v
' y# G" |3 p# k: L#[cfg(test)]' {- Z$ Q% o* E. J$ O' _- H
mod tests {
3 C; f2 j5 O6 ]% n6 a: g use super::*;
+ A8 s" T' U9 {6 |/ V+ z #[test]% j- R/ j8 Z5 I0 ]0 n6 N
fn it_works() { _0 B* n1 C- W/ B5 S/ [
let p = Pool::new(4);% ]: J0 j2 [0 A* i3 o
p.execute(|| println!("do new job1"));
: A. ^& S+ W3 H; _) R$ Z p.execute(|| println!("do new job2"));
: e, t0 H' L& {1 o& t p.execute(|| println!("do new job3"));8 W4 w5 n0 P/ E: d' C6 O
p.execute(|| println!("do new job4"));" S5 {! I: t: E
}: i8 J2 L. @2 M' _
}1 v6 c; _0 S" Z& [* |9 t; X
</code></pre>7 W! c! g& } _9 ~) `3 `( g
. M2 K3 M; c- a2 L
|
|