|
|
$ V& j' K, k0 e. h) b
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>% d5 a1 i) e0 S6 f% J- H
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
+ x. g) _1 S4 D4 r0 S0 g, R4 K<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
- Y; W( N* L/ l% k" H( \' O<p>线程池Pool</p>4 e0 ?. T. |/ k" ]5 M! g: Z, c; Z+ P
<pre><code>pub struct Pool {! C5 h! j5 C& w! |
max_workers: usize, // 定义最大线程数
8 B5 C" L! v# B. e% _( E}! a& x; g6 S) n5 Y! s$ d4 h
# c4 `9 T6 K. w, A# Limpl Pool {$ O' @0 ?$ X' t, C! W. E/ T
fn new(max_workers: usize) -> Pool {}) U1 v# o1 o+ k/ V. ~
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}- o* `0 ?1 _+ G$ U _
}
. R4 g* `' f4 Z
4 O4 R( ^2 h; D* p9 c$ m</code></pre>: D% I5 _- G5 N! _/ k
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
( D ]( _7 L0 `2 U. E<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
" L4 F5 ?4 ^( J) F5 @0 Q- B可以看作在一个线程里不断执行获取任务并执行的Worker。</p>- p" }+ ~( O0 s- @; Z; C+ T. [ r
<pre><code>struct Worker where2 l0 u% V& @* i( o0 y$ }
{0 f& y$ m* K9 W
_id: usize, // worker 编号
+ v5 M$ H; l; v0 D h. [. V}) Y1 K" I3 s: u' P* x, }
</code></pre>
# m! p" _% B4 U; k k<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ W) }0 e, G8 @" {! d( ?' T8 Z
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: e0 y7 X3 @& W9 }7 v% n! i* s<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>9 v: k0 s c' l" R( B
<p>Pool的完整定义</p>8 n$ `1 S8 \/ t1 K( Q& V
<pre><code>pub struct Pool {1 R% J& I6 L0 b( q: A
workers: Vec<Worker>,
, A7 I/ Z7 C( ?/ u$ J' u. { max_workers: usize,4 w6 v! Y; j# ?9 r! L' [1 S/ {6 {
sender: mpsc::Sender<Message>
/ k3 C+ F0 q1 L* P+ t+ F) s}: Q8 F5 J9 b" T) Y. m* W/ P5 |
</code></pre>
5 _) C! \3 o! ^ t. Y% O. r# G; z<p>该是时候定义我们要发给Worker的消息Message了<br>
. P6 G5 Z6 y. h, |定义如下的枚举值</p>, ~5 t' Y0 a7 h! A; @9 Z5 M
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;9 v0 W* B7 T$ J X# J$ M' \. r
enum Message {8 F) s. i6 O& s. S! ~! P+ c) J
ByeBye,
, n" G/ _6 w0 b' } NewJob(Job),; b% z# Z8 a. P2 v" G
}
6 U, I* c0 ^) f+ X</code></pre>
4 o! H& Y# ~" b6 p0 G<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
2 K m; x# n: y( s1 ^1 M' f( _, O<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! e% B! Y' @ C+ ]0 M [1 {<p>Worker的实现</p>+ E$ ]$ x; }6 {# r
<pre><code>impl Worker
7 t" w0 Q5 r4 c: ?0 S# c. a' _9 S{
8 }- V3 k x, Z B2 W6 k" ` fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
. ^2 c9 M) z% F6 @) ^% w' L0 N let t = thread::spawn( move || {
3 S8 M% l' Q* V$ @2 f1 r loop {; g, t1 u% o& \& B
let receiver = receiver.lock().unwrap();+ d- ?& V1 f8 @6 I( f; I/ ~2 F
let message= receiver.recv().unwrap();
) r Z+ x4 M! g match message {
$ l( y( X" x6 i, O* B9 O% ? Message::NewJob(job) => {
0 E0 ~/ z8 Q; q. i- R2 W( \ println!("do job from worker[{}]", id);. j' L! T$ g0 j8 {2 [
job();& A2 s. e, p' `$ H/ G- T. }
},/ ~8 m3 j6 s' k. u8 H
Message::ByeBye => {% }/ U; V& I$ [) b2 l
println!("ByeBye from worker[{}]", id);/ J6 c9 d7 z/ }$ a/ O3 ?
break. W( L. y% p1 B w* b0 W
},- W: S4 p* A S5 v
}
" Q4 |4 n8 }! y3 \' C b }, G6 O. e1 C; m; e- I% L) v
});7 G3 O. I+ v6 H9 M
" V, e1 Y- R# r) j, {/ U ` Worker { t! H- C* \5 l P ]
_id: id,) }2 Q4 G3 t2 A+ P: |, U
t: Some(t),3 p% }2 b# F/ D
}
5 p0 s7 i4 V5 O- { }' l# O6 `/ f# [0 W2 y- c
}" h7 V U9 P% |6 ]2 @* y
</code></pre>8 \! |: q* ~0 O. T9 `4 p5 x
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
5 R) c) D# \; Z8 D7 T% N但如果写成</p>" I" I0 _1 D, t! ], q; ]
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 i+ ~/ ~" h8 M B3 e. Y! U};
3 ]: K- v( r6 @/ a6 g4 T</code></pre>
+ E+ a5 O) X/ P1 \- A8 J; K% q<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>/ @ n9 _( E# O
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
& B& f" z+ q/ Q3 Q1 } x<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>* Y3 l5 {6 d4 q7 h! B9 P
<pre><code>impl Drop for Pool {
! ~6 H O# @/ P$ Y0 K fn drop(&mut self) {( z+ K X P! v$ H0 E$ H: _
for _ in 0..self.max_workers {
# |* u4 ^7 [3 R8 U6 ~, w7 \. A self.sender.send(Message::ByeBye).unwrap();7 \8 f9 i2 U( x8 ]% J
}
% _/ L3 W( v" g* J. ]( C) y for w in self.workers.iter_mut() {
* r0 ~- ]/ n. a6 r if let Some(t) = w.t.take() {4 A z# m& ^7 E& _
t.join().unwrap();5 N- L" A+ i0 ]& ?$ o/ c
}
4 g8 f0 I& H( N2 n3 L }* `( R2 j) E+ g9 h
}
: X/ X! ]+ E* g/ v}2 N5 T, w6 c$ d+ M+ s' e
# E4 n9 S) J: f Q0 z6 v- i) m, g</code></pre>/ S) s. K$ e( j
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
& c8 z# s$ s) t# j1 {# [<pre><code>for w in self.workers.iter_mut() {
) v. u0 n+ U! ]8 Z3 M if let Some(t) = w.t.take() {; Z7 v) c' j. }( p# T
self.sender.send(Message::ByeBye).unwrap();
& e. ]" e5 r! S1 k) f t.join().unwrap();
( Z: |7 R) N& u j0 m2 k( P+ r e }
/ Z* y( |! |+ X5 Y7 Q4 A$ M}. B8 C# c6 C2 \: H- f- G
0 _. \3 q7 N9 w) d</code></pre>5 L" E. h$ x$ Y
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
# `9 Z! t2 a/ W. A1 A+ }; b/ a我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 T4 q- g0 i' ^$ c; a4 d<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 u6 ^$ c @6 {<ol>
7 `1 z, m. ?) S, P# v r, Y- k( X<li>t.join 需要持有t的所有权</li>- L2 `6 i$ q; b8 a0 I' X$ q/ N
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
; }7 L( E# H! i9 D+ T8 y</ol>" z7 U9 [) u: I/ i8 z
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>' y( J4 Z" m1 C
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 c& H4 Q$ j. F4 o<pre><code>struct Worker where ~: y+ X+ O! h5 B* p
{7 g" g2 ~( t4 O& f7 ]" ~
_id: usize,; x! a7 n5 o& {8 ^' K& D3 M
t: Option<JoinHandle<()>>,. g6 b% |- I4 r% Q. T* z8 l* A
}0 k; V- Z( q! G; L! A. {
</code></pre>
, ?! E7 l3 s/ ?<h1 id="要点总结">要点总结</h1>
, K* v, L: m' ]# M# ~, F<ul>
+ b$ V y: s" ?/ o<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>: M) F" t( j0 s# w$ o7 H
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>$ \" G0 J' b, A" n8 [; E: X
</ul>: S# b) L. G% X6 D& T8 z
<h1 id="完整代码">完整代码</h1>7 ^ o6 h1 L( E$ N# m) P# ? g
<pre><code>use std::thread::{self, JoinHandle};
* _5 d4 `, K6 Ouse std::sync::{Arc, mpsc, Mutex};7 _- E$ \5 F! K) N: X
. o9 @/ X0 y5 p$ @1 A& Y$ G. t
, r B f3 S' T- l. F: ?1 z
type Job = Box<dyn FnOnce() + 'static + Send>;
& G5 ^7 c4 x9 B _ Q. b( Benum Message {
6 d; a0 f% a9 ]9 \7 O3 W% G ByeBye,2 ^8 x0 Q2 J7 b
NewJob(Job),
9 m' b$ _) J `( M/ T1 ?}
% X% I. G9 U: T! d& {1 C9 d
" t1 [5 z3 K% R- i$ nstruct Worker where* ]; R6 n: B. M) o+ p
{
! Z: Y; q3 c' @ _id: usize,/ o6 B1 }2 V7 W% m
t: Option<JoinHandle<()>>,
a8 X9 N0 r4 x* \4 \! n2 V Y2 J}
R# B' S' w4 t- M! y/ y# H5 V. g" r; D
impl Worker
; n5 U' V- w: k% P% `{
- q, g6 z9 N5 F8 F8 T8 n0 X7 c fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
_; a9 c) U% c$ v- K, u let t = thread::spawn( move || {
2 Q- A7 B+ n1 A4 W/ n loop {
$ V* k" F/ f* m7 z+ r. B3 i* b let message = receiver.lock().unwrap().recv().unwrap();
- l3 R* o9 g0 ~ z: b% [ match message {. `; Y( Q2 ]* `: A0 F B
Message::NewJob(job) => {
- R6 @( J# P$ z- i8 g println!("do job from worker[{}]", id);
# ]% f1 c7 I, D0 i! W" s job();
% q9 }: K3 ~3 H! t, U% L: i$ g/ u8 R3 [ },
0 ^- L/ X b+ O Message::ByeBye => {
' W" |" Y/ z) g8 s0 E println!("ByeBye from worker[{}]", id);
1 U8 h" U5 R1 i7 b$ s break0 h. b$ ^* e6 s0 d% s t/ a
},( J4 a5 `0 f9 f* j/ G. R2 `
} 8 l8 U4 ~/ A a; L' _( d+ q
}
/ R9 z1 G) n4 g( q4 X });+ j2 c2 F+ S: L8 e! F
3 O! H2 B3 F- r
Worker {/ I i7 u* M0 ^- O% w: ^- x
_id: id,, f. Y. B. M: B9 `
t: Some(t),, Z6 k. @. h8 I8 Z& ]
}; b* v: p& ~( d8 ]! b
}
5 y8 D/ x0 T3 d4 q" h. O}
! m( R+ U* r9 _. Z
" R+ d" L, o& gpub struct Pool {
# r0 X+ k+ c# C workers: Vec<Worker>,6 x6 K$ c6 v. t0 ?" K& j1 U
max_workers: usize,+ I O; ^) v9 p5 ~9 q: R1 k
sender: mpsc::Sender<Message>
9 I3 }8 D. ^* k2 x}& b) d g$ S7 e: n' v: k& d: J: q0 S
, g6 h1 \% P+ |- f3 l" K/ t4 `impl Pool where {" R9 W; Q6 x3 p2 G a
pub fn new(max_workers: usize) -> Pool {
4 U- n t% [/ A! s. y) r6 ?9 z if max_workers == 0 {
" W. P. l T. K) X1 u1 V% `- W panic!("max_workers must be greater than zero!")5 O2 U3 p; u3 v4 @
}
; [2 T2 x2 ]" h1 Y, s4 P9 h1 i let (tx, rx) = mpsc::channel();
5 \5 G" f3 W3 |# g
% g- e4 O$ D7 g) g; K1 ? let mut workers = Vec::with_capacity(max_workers);9 F; R3 K7 Y& E7 R" [" ]: a0 r
let receiver = Arc::new(Mutex::new(rx));0 b# ?. N/ u3 {2 r$ P3 K3 h' {
for i in 0..max_workers {/ Z2 T9 I) Z3 B% L
workers.push(Worker::new(i, Arc::clone(&receiver)));* C* I3 c3 B g' ]# V3 |/ a
} m A% {# H6 W" D! `3 _5 X
4 w. N6 K9 C$ `5 n9 j2 T) @
Pool { workers: workers, max_workers: max_workers, sender: tx }5 [4 Q# Z+ o! ]4 u
}. N" l" T$ X4 c$ l
9 c5 }7 Y/ h- `& y8 m8 p pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send$ B% F6 [0 n! c5 B2 M# R
{
3 x6 v: _ K+ {! j! Z8 n- |! D+ l
' c0 f6 M0 @+ A `; x( z7 A0 w% q0 r) ] let job = Message::NewJob(Box::new(f));$ x/ w! X* H0 C/ A) T( ^+ r( ]# T8 i
self.sender.send(job).unwrap();5 g$ d7 ]' Z$ q. r* }
}
# K5 S& r9 Q2 j3 ~9 M! `}
4 b" Y1 J2 t. {4 D
" [' j! g6 G$ Rimpl Drop for Pool {
4 ]- d& A) V. \( J7 m) r fn drop(&mut self) {6 W$ d8 I+ K3 ^+ d, `/ }2 O4 U$ n
for _ in 0..self.max_workers {2 c$ M$ {% J/ a, v3 f
self.sender.send(Message::ByeBye).unwrap();/ ]( S. p1 U7 w6 z; `6 L
}
- w" a7 p9 M5 m$ X' F for w in self.workers {
+ T6 j# X, n6 ^- V' z if let Some(t) = w.t.take() {
4 @+ k: O J' B! A t.join().unwrap();
# r# y" f9 A7 w" ] }
2 N) p, y4 M. ]9 }# D6 f }
; p) x7 Z i# `4 K& h$ B }) R" ~( A& Z+ V6 Y# A
}
* X% q" x6 P5 u
; N, X a6 u$ D+ O' |6 w
6 K' E( a0 |! X, A#[cfg(test)]- w/ P3 l' Y5 G9 B
mod tests {
) q: a! p$ F: n1 N use super::*;
; M; y; f/ I+ N #[test]
# P8 U1 `$ Z, Q* P; j" l fn it_works() {
% b9 e8 s* J: W let p = Pool::new(4);
0 P# ? T/ \" Z4 i p.execute(|| println!("do new job1"));
4 G8 A: f7 D8 ]2 z9 E8 p p.execute(|| println!("do new job2"));
7 G, ^8 B7 C6 N' ? p.execute(|| println!("do new job3"));" V) y0 u, b- D; @! u
p.execute(|| println!("do new job4"));( D; L3 X0 O. h! O. J, E1 @
}
* c) V; a7 H$ F; e+ s6 i2 f: t}% n; u: S/ e9 J/ S% m. v& R
</code></pre>
5 A0 v# a( W$ Y' w4 J
5 D6 i/ z1 I# N( K( ]' i |
|