|
3 g5 E ^% F! O$ W$ Y+ K
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>1 r' Y& I$ k+ n1 O2 x3 u
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>4 ]0 I) J7 t! \7 J: A
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>7 s: r- h6 {/ E( ?6 h
<p>线程池Pool</p>
2 G2 e6 g5 b9 A) k ^4 C/ o; O3 @<pre><code>pub struct Pool {' Y e+ ?- f. c4 u$ z; o" ~% @
max_workers: usize, // 定义最大线程数
5 {( L5 [, S5 v, }}1 q( R) H9 s! \2 n- G! [1 g7 k
* ~5 s$ N4 a$ `) wimpl Pool {8 \& `% _/ }% s# R8 C/ G
fn new(max_workers: usize) -> Pool {} e. J9 b5 l. T2 d$ ?3 m% h) d
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
6 g( P9 G( `9 L* H9 O o) X4 G}
. |7 l! g- h5 W/ i5 L/ ^5 \
2 S8 D5 D# e4 Z) b</code></pre>
+ s; n7 w8 J& s6 u: Z; N8 K: }<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
. o- i0 \- }' p0 w$ T }1 S g<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>; o7 a+ \% b$ e4 ?" d: U
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>; _* `2 X( b$ x
<pre><code>struct Worker where
5 E$ j: H' c" p3 n. a{
" F O* g5 R# U- D q _id: usize, // worker 编号
4 |3 ?" [! v' k M* E" n}3 k& o& N i/ ]" x; z
</code></pre>
7 l# K3 `) ]" d; X<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
& n' o: m! q0 [8 H T, S; {7 ~把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
( O2 H: ]8 F2 g<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>& Y/ G/ p4 g8 }8 ^! u- S+ E
<p>Pool的完整定义</p>7 s- J% f$ M5 e
<pre><code>pub struct Pool {
1 w& T Y# s% l3 `) R' C& u9 o2 u workers: Vec<Worker>,
$ F. T7 m. H; t/ D c max_workers: usize,
/ O5 T5 M6 R8 T( n }& y# I sender: mpsc::Sender<Message>
3 U0 |- O: L+ [8 z3 w" O6 z: n}
2 E R( D# Y! q+ z# C- V</code></pre>2 S9 k7 W& j) _ P
<p>该是时候定义我们要发给Worker的消息Message了<br>0 |1 Q/ O$ T" x6 F: H
定义如下的枚举值</p>+ Q% L8 O, e! K' U- v
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;5 L7 b( ]5 n _: G0 n9 {+ I( \' q
enum Message {5 V4 a/ F( p7 D, a# Q
ByeBye,! K# S' r0 H _- x1 Q4 g. e0 Y+ v
NewJob(Job),% Q- e' a, q7 G4 J6 F, z
}. ?( R9 N; B, b. I0 }; Y' }. }
</code></pre>) t1 F! \4 |. w+ w" S" T; }; S
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>/ g5 Q) t P/ N# l3 C
<p>只剩下实现Worker和Pool的具体逻辑了。</p>- ^- c4 m. h# K7 s/ j
<p>Worker的实现</p>
' s. R+ r- o$ U<pre><code>impl Worker
1 Z/ F& z( _% d{3 R+ y: l( C( k4 F: A/ n
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {# j( N( a+ T3 l( a; b/ s+ O
let t = thread::spawn( move || {5 P( E6 Q r! x
loop {& S% ^5 A; b# [+ H. v% ?
let receiver = receiver.lock().unwrap();) B' ]! @6 C; e
let message= receiver.recv().unwrap();$ e" B) ?, J6 n& Q W
match message {) G. O" o# }/ `7 m$ ^% P" i
Message::NewJob(job) => {4 D& }2 w5 j$ Y
println!("do job from worker[{}]", id); f/ Q6 R; K T( _: i
job();: g6 K8 C" R5 F( x1 ?
},9 |; ~" W: }9 b" ]* S7 |0 o$ V
Message::ByeBye => {: m: k7 I# ?# a1 \& S5 T$ l
println!("ByeBye from worker[{}]", id);# w, d8 J# G, {& O) U& K
break% o1 U P* u2 R$ b
},( A9 L% v, B6 M$ o
} ' R6 Y6 ~" q- k# w
}
- y3 G1 V4 |% ]) G8 N) o });
' m# N6 I$ D1 g1 v2 t: p. i) F. s7 D: _* B, R1 V/ i
Worker {9 b. H' G* y0 ]7 K5 M/ p# M
_id: id,4 Y- l2 i8 | d
t: Some(t),( B( D: l' R# n) H
}% W) A `" e3 L
}
# d, H3 M" t# n/ f! S2 w}
3 z. [. P1 Z3 `7 g% y5 T</code></pre>
5 C; I n9 P. F- ^; ?4 s" C" I<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
: O1 \0 b& u1 O- a+ u: q' u但如果写成</p>$ t, y+ r8 G; ~
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
4 ~- c! P2 e5 |/ O};
- P( n# c5 A! w: U! I! v</code></pre>" M) [0 H' x+ X: u
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( N2 G$ S- J2 R* Wrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
0 N) ]3 C, W1 z3 `( \7 ~- P! K, s% V<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>% ]) U$ n8 N+ W3 M2 k
<pre><code>impl Drop for Pool {
9 H- n; X4 k) B* e) F7 X' ] fn drop(&mut self) {
; L5 _7 t# S1 T% k- g for _ in 0..self.max_workers {$ h U& I+ V I$ \1 Q, C
self.sender.send(Message::ByeBye).unwrap();9 ^3 t. |0 m2 s, `" S6 I2 E
}
* ?- {6 L- X% J) d$ G( f for w in self.workers.iter_mut() {! J: C$ T% M; q! h
if let Some(t) = w.t.take() {! W5 B- M& ?3 s3 Z, O! {, a1 V
t.join().unwrap();
1 J* B1 I. P. k: H% K2 g }" O- L. m6 p F/ c' y
}4 ?" \) a' i! I: R2 |
}1 P1 x9 O; h5 h% B; M
}
- J( n1 u& X a$ C9 B: n
) w5 X& g+ S1 I4 X4 k</code></pre>; E* {2 E. }' H7 ?0 ?) ^
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
( G8 b2 ^- J% a J" h8 v; ^% W$ V<pre><code>for w in self.workers.iter_mut() {
" ~: ~% J& ~( g if let Some(t) = w.t.take() {
5 b! | q0 x' L9 b self.sender.send(Message::ByeBye).unwrap();" @1 H5 i* X4 I! l6 K! x7 x7 v0 B( ^
t.join().unwrap();
7 S' P/ Q0 ?. R }. N a. T. k6 C l: v& x* D
}
2 R& d2 i( e& J! z8 m9 U
% h ?' u" B' k3 K</code></pre>$ v3 s$ p* x1 I$ A
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>$ o( u7 k) N* p! G! h
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 N7 z# v n" P' Q; D1 O+ Q$ B<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>7 q( @2 k6 X9 Y" ~7 x) u
<ol>
o& O" f( r* | a" Z<li>t.join 需要持有t的所有权</li>
& X {- n9 P g<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>' g9 L% e; {( Z+ s7 j: y# b) i
</ol>
+ N7 ?8 | A0 W<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 s9 j7 m% x( m$ B: g4 M, Z8 C
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
% s2 X& y$ o: V, S1 |4 k8 p<pre><code>struct Worker where7 K5 B$ ~" Q# ?8 [
{
5 i$ }' `, L8 v/ t8 B _id: usize,
8 F9 @# F* r" \8 ^% {) K5 _ t: Option<JoinHandle<()>>, t. \4 b1 w; v$ m- u0 g+ Y
}1 m- a V5 Y: H" Q" ~ D+ ]
</code></pre>& y" }7 ?4 R4 [, ~9 D4 ~9 ^; K
<h1 id="要点总结">要点总结</h1># g6 S6 I+ ], D" A/ L" {
<ul>9 E' @; H; U5 h- w! ?- k* j0 C
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
: `! R/ Z% P ?<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>9 Z0 S" S2 l0 L. z. M' A
</ul>
! u/ ~8 |% M) a, S9 ~6 ^<h1 id="完整代码">完整代码</h1>
y1 P: f d2 k2 M<pre><code>use std::thread::{self, JoinHandle};; L4 ^+ ~3 ], T7 @% h) X8 ?
use std::sync::{Arc, mpsc, Mutex};
, T$ z" J' C" p6 d% p" O2 ]
, K7 l7 e' C. k- _7 k7 j; V/ D* R
type Job = Box<dyn FnOnce() + 'static + Send>;
; S2 e0 |8 v+ }! Qenum Message {+ z5 L! o: b) p* i( O g/ \
ByeBye,) z; A6 g1 N& Q1 \( Q/ J
NewJob(Job),
! o' Z4 {% v3 o0 q3 _}( ^4 f' G) p* h5 z8 @; r
* {. E1 I5 a& w1 D9 d1 [2 K8 p6 ~ R3 gstruct Worker where
( F W& y8 k& K* g) g+ L{
/ M: H+ ^% j7 t; d$ V _id: usize,
8 F, {. e/ Y- j" ^ t: Option<JoinHandle<()>>,( v- G# p2 h0 u, i, b
}3 c; m& U& G$ L1 P
, c) O: D$ F; B; w/ q" ~2 kimpl Worker
4 _# E+ N# f8 b( C. N{( o6 y1 V* Y- `! q2 M5 O \. s
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
6 J$ O' N9 z3 E let t = thread::spawn( move || {
. R0 }+ L$ T. @4 ` loop {
4 O; y/ d2 V3 ?, q# P let message = receiver.lock().unwrap().recv().unwrap();
. ]( j1 o% A- C U match message {1 y+ {. ?! }+ b' ]- h9 ~- B1 _
Message::NewJob(job) => {; Z. k1 C8 F! u" S L/ d
println!("do job from worker[{}]", id);0 \& G" U; r0 m1 \ H9 L
job();
2 I/ u& i" d9 _) F& G: b },
+ c, \: C9 [6 {5 I* v A Message::ByeBye => {7 }, h1 B0 G- e
println!("ByeBye from worker[{}]", id);; A) @) }! U/ A/ q$ E) l' _
break
' m% |% H1 w! W% P, g. c4 y' [ },
# E$ s% {" g$ W } / {: e, j& M0 u5 R1 N0 `; u
}
6 F# v! w- ]: K, W/ ]! I; }+ }$ [ });
9 z$ k' m& n1 v- g$ Y6 a- y' F7 ?, G f6 A+ ^0 D
Worker {
5 `; q7 m, c" I% V- D* m4 @0 W _id: id,, Q/ [% g L, H! q+ v9 b
t: Some(t),
* M* ^2 b& V; _* n3 Z! l }
: _; e# c" u q }
) ]1 h7 H7 x3 k2 ~3 N/ m' @}9 t. H+ L- e' w# s) U6 n# Y
' N" J6 T C& E9 L" K! J* m
pub struct Pool {( o# S, C3 [$ K% f& P+ ~
workers: Vec<Worker>,
9 \& f3 f4 n X max_workers: usize,# }# [& z0 k9 h
sender: mpsc::Sender<Message>) Y- `6 g, ^1 e8 C# P2 A, P' }
}
! {& P5 x7 X l J, o+ ^ B8 P
5 c1 |/ F6 w, j0 B# Q' t, aimpl Pool where {
9 A) h3 o, O P- d# |4 X pub fn new(max_workers: usize) -> Pool {1 {) g4 I. R# M: J6 z- A. j4 E
if max_workers == 0 {; h: X+ K9 I2 b& j& v$ O
panic!("max_workers must be greater than zero!")
! g" F: l, a9 h3 Z f3 A }2 d( R0 H% q% y3 d( u) f \- a: j
let (tx, rx) = mpsc::channel();
; _6 a( _% O6 E* S: V+ l% {: Q# x }5 z4 f+ h j# c2 }, Q4 c3 D
let mut workers = Vec::with_capacity(max_workers);
" f( v7 ~9 ^2 G3 n T3 f let receiver = Arc::new(Mutex::new(rx));0 C* w* v$ K; L( I
for i in 0..max_workers {
2 v( J) e$ S( H2 d2 l) Q3 c workers.push(Worker::new(i, Arc::clone(&receiver)));; I. X/ t4 V( j8 c" C0 p* y- K
}+ J" u5 ^3 L p
/ z6 \5 j$ p | Pool { workers: workers, max_workers: max_workers, sender: tx }. U' g, j: `) V. f
}: F' e" y& y6 v3 G. g9 Q
+ z; F5 S0 Z. c% |' k2 i1 m
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
0 l4 Y# F. E( x8 H {
9 ?- ?+ W, d! B0 p3 f* t- S5 y+ P2 v, J g* i
let job = Message::NewJob(Box::new(f));( r- E4 g7 J* c& y
self.sender.send(job).unwrap();6 X0 L7 c5 l' F* E. E/ q3 x
}
+ W1 U5 P9 Y& [: t}
- e d$ C! J+ R b5 R5 M G) O# P$ i+ _* Q7 x$ Z
impl Drop for Pool {
( P' |4 k0 ^: j" H% ` fn drop(&mut self) {
; u* n+ D7 \3 Q) y ~, J for _ in 0..self.max_workers {
- A7 ]9 y f. j, A& C self.sender.send(Message::ByeBye).unwrap();: B: x; J2 c3 y; ~; K }
}3 W$ _# r$ d* E! J' A! i
for w in self.workers {+ p2 d s) L3 v' \1 F Z7 i9 M) n
if let Some(t) = w.t.take() {
/ c2 ]6 ?: i( Z x0 F+ _% R t.join().unwrap();) q4 ^3 p {2 V) Y h4 S
} I F: m+ Q. o3 a
}$ Y% L- {) R1 ?; p9 L
}
- {) ~. }5 N4 g, a" ]7 F; n}
% Z+ y) |' l% }% f1 ~ i7 U1 ]5 o( M0 |$ `" Q3 H2 ]
/ x X5 N+ k, q# b7 M; _6 J/ o
#[cfg(test)]. z( C" |$ g3 f: n5 N7 t+ ~
mod tests {: ]9 A, I+ _) u* ?
use super::*;5 K" J; x# _8 N4 S& N! d
#[test]
1 t5 [ Y% W( F& w. W' ^, \ fn it_works() {; g/ u8 x4 f8 z
let p = Pool::new(4);% m5 z' M" p. h# b1 f
p.execute(|| println!("do new job1"));
/ S$ s9 u9 @% k& ?7 X! I8 i' |( ^ p.execute(|| println!("do new job2"));$ V/ v) v* c \6 @. @ _# W
p.execute(|| println!("do new job3"));! l$ D- Z4 u7 A1 |8 b8 f
p.execute(|| println!("do new job4"));( a# O# i! F- [' F1 K3 l* m# h; I& `
}
1 S u) }6 m/ o: P5 y0 B, ]+ s}( P2 t( c2 Z+ O6 t( N% |* M6 c: Y/ H& L
</code></pre>
; }5 e; z6 ]; e& P
9 O0 e/ c& D& x7 X9 r+ v2 P |
|