|
|
! n1 i" x2 o& J- U% u. [6 Y( {
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>: k) x' s4 z6 T! b6 s
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>! V; ?, y5 `, {' G* X3 }
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
$ @$ q: z# ?4 }. _. ^9 [<p>线程池Pool</p>
* _+ Q0 `5 O0 L u1 R<pre><code>pub struct Pool {
+ v6 O2 N7 h+ _ max_workers: usize, // 定义最大线程数
6 d( u5 s$ I1 ^; D2 D+ G}# a5 M) M5 ~6 _/ e) O5 S( ]7 w- }
& y1 [" h5 i- h) d" Timpl Pool {
+ k" g. C! [; ~* Y. j$ t- B/ |( P8 U7 L fn new(max_workers: usize) -> Pool {}) T' M$ _, z' {- C" G2 ]
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}: \$ {/ r9 R/ s/ |
}; t* d) g( p2 P/ y) W9 m* Z
8 x% m5 b& z9 {0 o
</code></pre>
2 O; L- w6 y; {: `% e& q D<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
: j% v7 U" o# P9 b H* b' M+ e<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: b% x! Z3 d) L* z/ |可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
\+ |; j8 o9 X! }<pre><code>struct Worker where/ [3 p' j& h4 u# i& x. n1 T6 W4 G
{* [( s& C2 x7 W. u" C& ?6 }" A
_id: usize, // worker 编号9 ?6 o4 Q3 ]! F U$ z
}3 ~2 K: G4 I3 R" ~
</code></pre>
& g* _0 W! d5 }7 [<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
: b( W1 _6 Z0 ^9 U3 P Q8 x把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
& |4 j4 g5 b$ D, x<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
+ G- l5 l* n! {. t<p>Pool的完整定义</p> {; X+ \+ N7 X v/ H8 ?% w
<pre><code>pub struct Pool {& ]4 J5 z A$ O& ?) l4 U
workers: Vec<Worker>,
( P0 z; v- ]# b! @ max_workers: usize,5 k+ p' V: B, @4 m% A5 b
sender: mpsc::Sender<Message>
9 Z2 x( i& `& ] `7 Q: h0 ?}
" ^/ k/ D& F2 i Y6 B/ Q0 T5 k</code></pre>1 n6 ^9 i/ i& a+ W' a. c
<p>该是时候定义我们要发给Worker的消息Message了<br>
! c* A+ C" U f4 o; G/ j定义如下的枚举值</p>
7 e. q% p: v* P6 t) D) N6 ~% @<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
o" _; L# n+ n: W9 [! Z5 E% Renum Message {
* }; ]9 R- C2 t$ q, L9 j( ?' P ByeBye,
O8 A; C9 c5 m- S: B NewJob(Job),. Q5 h) D8 [7 n" K0 l8 E( D- M0 ^
}7 |( Q, p' w. b# O+ c( e
</code></pre>6 C; }+ J$ z' O: S
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
( M9 I1 f9 T- |9 N# f' e. }1 t<p>只剩下实现Worker和Pool的具体逻辑了。</p>2 D( s# J1 _- V
<p>Worker的实现</p>0 W2 o' c5 o5 P E
<pre><code>impl Worker6 |0 N' ?" o: d+ ~; m; K+ a
{
- @& v" C" n$ H, s fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
& r: M$ g. M& \6 m/ h* V1 G& V. v let t = thread::spawn( move || {: G& g1 D- D. h( A
loop { H5 S1 E0 f+ ^- x
let receiver = receiver.lock().unwrap();1 Q5 H" x! ~6 W( r4 m5 M, k% j
let message= receiver.recv().unwrap();6 H! T9 R3 B: e$ e+ R8 @6 s
match message {5 b) a; P% n) f1 `7 u
Message::NewJob(job) => {9 I Q; V) G. U4 R G
println!("do job from worker[{}]", id);2 t& N8 B' j) \( N
job();5 Z% U% M" C7 X; E0 _; z
},8 u! T2 z% ^% V; ?
Message::ByeBye => {) g2 ^: m1 B7 _( Y' w# {& o$ R
println!("ByeBye from worker[{}]", id);
7 g% J( t( ?3 Z- h0 L break
, a8 G' V1 f& |; i },# L5 u6 }. M6 b9 `/ n
}
; ^! n2 ^% P8 @% R1 Z }
. ~; F5 ]' r3 f: L | });5 \3 f, ^# f4 p
o, J" w L# O* L y) Y2 x
Worker {" }: i+ c; Y0 V5 l# |0 R
_id: id,7 ~" G, v' L- `: G4 W, e- ~8 m" m8 M
t: Some(t),
5 r' }" t* y% ]; m' f }4 c; `! i, Q$ M
}
$ ?1 R% }& s0 z' p( c& K6 h}
0 S1 V$ U# u! H4 R% N% S</code></pre>6 A- ]% X, g5 A. t' o. u- v( H
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
6 F' ^$ \) c) t5 r% Y& m但如果写成</p>
+ R: J- u1 B$ g5 K# a) Y0 Z8 p5 s z<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
' g K/ z# D9 P) Y( b& B}; c* `# J, |; E J! r- ^$ Z6 e( T
</code></pre>
1 j. W$ h7 c1 n2 h4 x<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
" I. N( ]' L3 F9 l: Qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
Z1 p6 l1 W. a" m3 [- K1 G& }<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>! k2 D% w( h9 q! |: c' y; V o
<pre><code>impl Drop for Pool {/ x0 K+ t# d8 i s
fn drop(&mut self) {
, _3 ^7 C9 P/ W% {5 n4 h, _2 s for _ in 0..self.max_workers {
! ^8 b# W F! [) c, V) j self.sender.send(Message::ByeBye).unwrap();6 X2 l7 m0 V# x6 t% ^
}" J6 J& D( K$ p2 b- G1 m
for w in self.workers.iter_mut() {, G/ H5 o; c- v" Y- {9 i
if let Some(t) = w.t.take() {
9 k Z$ B& ^* X7 G0 F7 D* {& \ t.join().unwrap();
3 x' V" l9 N8 ?5 `* W' ~, T4 U8 U }) n. ?3 z) H6 Z% u+ C* Q
}, F/ `! Q1 S) x
}) ^. ?! Z* z) m4 r- x* \- w0 k6 L
}3 l1 R P- ?; w8 e9 }, m+ Z
6 }* Q& Y4 V& o9 E4 N5 s* [% P: F
</code></pre>
: y/ {. a& m% v! J) Y<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>7 P: K2 ~/ W$ V5 u( H, m
<pre><code>for w in self.workers.iter_mut() {9 A, [" c7 y9 [5 G
if let Some(t) = w.t.take() {
$ n" Q, ]) I8 U! X3 _3 H& w) C3 O self.sender.send(Message::ByeBye).unwrap();
8 U1 c. u3 g3 e) z t.join().unwrap();- p% n9 S9 Z; ?% p1 R% v: U
}# k& r( U1 f, B5 @& G
}
# z8 k: A3 @# v+ M1 ~
* S9 u1 @' }' D</code></pre>
3 H& w2 i: f9 M) O<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
6 C. k( V: F; d9 W我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
) K5 A, ?. y: Q2 `# W<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>/ w- |1 q: y3 A+ U/ t. G% i% R! p
<ol>
0 ~& F) t. x- t7 \<li>t.join 需要持有t的所有权</li>
W) m" r4 w, h( R' J8 K3 K<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 o4 S: Q" s. d
</ol>6 B+ z# m( B- l5 q7 y' A. m
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
" u4 u d" U( }# \( y& C换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
, s" Z! X2 h, p: f' B) v8 M" a<pre><code>struct Worker where
3 }3 i: m! d9 k) l. G# a( _{; s# p: t6 u8 d _* r
_id: usize,7 t3 f8 [4 e0 R: p; R ~
t: Option<JoinHandle<()>>,6 ]( J. B* d$ S" \6 L- u% s* }
}* \ Z2 L7 c" _% E
</code></pre>
' s/ O4 [9 h \$ O' r2 w3 j. p<h1 id="要点总结">要点总结</h1> z* ~ G4 k( `$ P1 c/ B' v
<ul>: a9 v* ^5 f' v) Y( N, z9 V) V
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
2 h8 \5 c2 C6 M ?<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
, U5 B) S7 ~" d5 D0 Z) E5 j- G</ul>
8 {+ M1 }# O" x# c# N7 ^! T5 V<h1 id="完整代码">完整代码</h1>
4 W7 K: Z" f4 I7 v. ~. {3 e+ X: {4 ]<pre><code>use std::thread::{self, JoinHandle};( N. X& } K% t- k
use std::sync::{Arc, mpsc, Mutex};6 j+ w5 H' i1 g; J
8 j) R3 p7 r# J8 {
6 b& L: Y- S, v% x7 G3 Etype Job = Box<dyn FnOnce() + 'static + Send>;
3 t' `% Z: b( X" u" Oenum Message {
# {9 n! C! b7 h ByeBye,
8 z9 w# q2 B) s$ t1 w$ f: A( s' e4 ~, G3 X NewJob(Job),% p1 O8 R: `" w# @+ r ~
}
6 Z# P& V. b( J% n! R2 X8 A4 t( y6 l& A1 X1 M: N3 g0 v: C: K' f
struct Worker where! f; l" Q' T0 \ g
{: _& V) J% r# V2 j
_id: usize,! M+ b8 L4 d! H' y
t: Option<JoinHandle<()>>,
8 h7 I E& X8 _5 w) R, I* a}! ?( U: k) t, M7 V1 Y
. T; M; z; e* k0 B! q. Q2 R, |3 g
impl Worker
6 W- o1 t; I, t/ v! V8 E{6 c2 Y% i) w- R5 z0 i* h
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
/ \+ b- z4 b# J- \% H4 d0 R9 i; ? let t = thread::spawn( move || {
& @& r9 s$ C5 {( ? w loop {! G: _, E8 W3 x8 d# Z; d6 M; }
let message = receiver.lock().unwrap().recv().unwrap();
$ C' z3 C; }0 F+ e0 J4 V6 r: q match message {
) b* T, Z. g! r; Y$ j$ H Message::NewJob(job) => {' ? A7 {- h' C
println!("do job from worker[{}]", id);
+ N: k3 `# n9 k5 I9 }+ t7 P job();) S! F1 B9 O7 ~: [- [8 t9 z' q
},+ D7 ~7 z$ ]) @- g. h
Message::ByeBye => {: |* U2 h( Z7 C- C- A
println!("ByeBye from worker[{}]", id);1 P# e5 Z3 d6 l
break; l: r0 w# k4 ]- S! C! o) c
},4 w, w' K) k+ M' c% t' d; h
}
0 S' Y% Y5 E Y ~% @6 ]2 n; \ }
8 A0 a. K; m( G6 ?( ? });
/ A- Z4 s1 D! K* Y, Z
+ r" G3 L; j/ X& C$ V% K Worker {5 f5 Z: `. K; r4 y
_id: id,
% d8 m& n5 a% _4 _% s+ u2 \ t: Some(t),7 v, W% a _/ B- ]$ L9 K
}
& c) F O9 F' @0 {! O }
8 W& Y8 Y( ^* f$ H}$ s: J4 f( F4 \. J$ S, O) N
7 W4 H5 }6 S2 t6 o5 Rpub struct Pool {8 v% t, f% g2 Z9 l5 c8 D
workers: Vec<Worker>,
( I/ O4 j6 R& j max_workers: usize,/ v: ?7 L5 M. V; x$ C: \
sender: mpsc::Sender<Message> Z: Z K7 f7 E- o/ `; ]$ {( W
}
- r; V; |, I* K4 [( Q5 J8 H2 O1 G- k- X+ y! Q1 Z' e4 J
impl Pool where {
& Y) s3 w. N/ S pub fn new(max_workers: usize) -> Pool {
# w3 s! @, p R3 g9 V$ k if max_workers == 0 {+ w2 t3 H& c" C
panic!("max_workers must be greater than zero!")
1 q) q, D0 ~, D# Q+ w ` }
1 k5 x& W6 L5 B' a2 V: s let (tx, rx) = mpsc::channel();
) j3 r( b) v5 M9 n$ @' T& ]( q) {1 h/ W. B" d
let mut workers = Vec::with_capacity(max_workers);
0 J, j ]- _( e' l' x0 H let receiver = Arc::new(Mutex::new(rx));
3 }6 _4 b9 E( ]! r Q6 E for i in 0..max_workers {
) d: \5 V' g2 T/ y% ]6 z workers.push(Worker::new(i, Arc::clone(&receiver)));
, N7 O. q( H/ t' u1 _5 b }
5 ?7 Z1 e) M2 l; w2 L* ?, N) k8 q* z j2 M5 E3 j
Pool { workers: workers, max_workers: max_workers, sender: tx }
/ U: m* _8 h, D( Q z6 e6 `0 c }' H; b* q f0 H" j' ~3 Z4 h
! `8 k, i0 s: `* y4 R1 n0 J pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
) i9 I+ J1 ?/ K- Z& R { ]$ ?. O" S& d' B7 k9 [
% s4 H. [0 d3 s6 ^5 |% { X5 z, k. ~& H let job = Message::NewJob(Box::new(f));2 c0 z @, I% `0 j6 A V/ w
self.sender.send(job).unwrap();" q/ ^$ ]$ z0 P9 P# g
}) M) l: d; R/ @3 S, H: Q& S
}# ^: }6 l3 N$ A: H) G! B$ B$ `6 x
* x6 M8 g' S" b/ A+ ?* a) X
impl Drop for Pool {/ o& }% d5 o: ^4 N7 G. y6 Q0 E
fn drop(&mut self) {/ \8 o% u- |% i; P7 r1 j" _
for _ in 0..self.max_workers {
! K# g Y4 [3 k2 t. S3 i7 S self.sender.send(Message::ByeBye).unwrap(); t4 S* d4 S, A; L! N. j
}
0 s9 k4 z3 S% C3 c/ {4 d for w in self.workers {
}1 C2 p7 o- [+ m if let Some(t) = w.t.take() {6 |8 e5 F1 U4 z- @: ?1 D
t.join().unwrap();
7 U3 n. |' n0 c; A }0 `! x% j( u* [$ M* F2 f9 M% K
}, y" J3 c0 o# ~9 \
}* Q: ]: H1 D6 ^0 ?7 l% z5 _, {! D$ X* A
}* ~9 |' Q# c6 [7 @! [
, k/ g# M4 k) H( v
* _- @# ]( ?5 z5 H
#[cfg(test)]
1 E4 `# T8 D: V* {mod tests {. d8 l3 J9 ?: U- G
use super::*;
4 ]4 {' q4 O5 \4 ~# ~/ u #[test]; ^2 P$ D3 O# r/ l9 l
fn it_works() {; u" I. J5 k8 \+ R( f9 T+ ^
let p = Pool::new(4);5 ?1 {/ I& J. Y
p.execute(|| println!("do new job1"));
/ P9 T) Z$ U, _ I7 o p.execute(|| println!("do new job2"));7 j: w9 s% {1 v& R) Z6 }
p.execute(|| println!("do new job3"));
% I2 ?6 Y e, S6 D r p.execute(|| println!("do new job4"));6 M, ?: o, ^" k4 ]9 L' O
}
) G `, v- [! A0 n}
+ T. F) F1 c' J& X</code></pre>0 ^ F" ~" k; O8 L+ \
0 i& d6 Q' z) ?* D8 @5 u6 s' \ |
|