|
|
, Y9 Q; P" F" v$ o<h1 id="如何实现一个线程池">如何实现一个线程池</h1> ~ l$ i) @* C6 a. A
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>/ l- g i4 T# M, }" K2 J8 \' `/ N
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p> m! n* @3 w/ ~7 L
<p>线程池Pool</p>9 N8 W" h5 O6 q- \$ x$ t/ K$ ^
<pre><code>pub struct Pool {
! S( C6 B s: J max_workers: usize, // 定义最大线程数
9 K/ r8 T+ E8 l% ]4 c u}6 M9 l/ {$ B) Y% }: f# W
1 h( c" D; o& Q- C$ V% ~& x/ M
impl Pool {
4 y5 k" K. n4 G$ r1 o0 [! C! @7 Z fn new(max_workers: usize) -> Pool {}) W8 g6 f0 h' {
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
* q$ T$ V9 |2 h}
0 e5 Q* Y* f2 v4 c* D: n# @3 x5 S1 L3 [( F; p6 I/ N; m
</code></pre>
, r/ L8 y( D5 ?5 P2 D- ?<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>+ R$ w- A0 o3 k n& p
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
. g, h5 f; J6 ?可以看作在一个线程里不断执行获取任务并执行的Worker。</p>- Q. f* ]7 T' t# n5 M+ [6 Z
<pre><code>struct Worker where% M+ E K9 ?- ^# Q) K% C1 z
{- r' Y$ E6 s- x
_id: usize, // worker 编号
+ H( w& n5 \6 {0 K+ w5 T# L}: t9 r9 i' F- R% O# M% L7 d1 w. k! [6 P
</code></pre>: }4 c* m% F8 b
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
. v, _7 u2 e0 s7 p, G+ ]4 a把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
) }7 Y8 u0 X _: v) d$ C<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>' g7 M# c% q4 v5 t/ |5 v
<p>Pool的完整定义</p>; S& f0 @/ N, v% i g1 g
<pre><code>pub struct Pool {
- w/ H( D) V% v! P workers: Vec<Worker>,
" U1 u9 r& @/ f' c0 b F8 f max_workers: usize,
! c2 G( m9 T' ?3 m6 H sender: mpsc::Sender<Message>- ~0 f, C1 Z! S
}
p5 `& M4 C- ^6 G0 }" `* w, W</code></pre>4 G! o' k& k' B
<p>该是时候定义我们要发给Worker的消息Message了<br>9 [' s% O } k8 t+ v
定义如下的枚举值</p>
8 W* C, F' Z! N& s4 h<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
5 E6 L' I; O' ^, `' u& menum Message {: g% ], L8 l* `& R1 C) F5 G& z+ t$ T1 ?
ByeBye,
; B8 S0 K: n% `7 L NewJob(Job),; K' L9 P, M0 _8 W6 `( N
}
- i* E! h8 H; Q) j6 N</code></pre>
( l. e6 p! g% ]7 P/ _<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>2 S0 E' O9 I E- h' [0 X" G
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
0 U) {+ }- F' u- U; e<p>Worker的实现</p>, A3 ^/ \1 G5 C! [9 H" k+ v
<pre><code>impl Worker
3 K! T8 A2 x2 c. p' j- ]6 D{! j4 N. u9 b4 y/ y$ i* X1 {5 o
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
- r4 r4 t& S" t e let t = thread::spawn( move || {, ^6 a9 o* O6 |! m+ N: x0 H
loop {& @9 `( E4 w2 [0 N: q
let receiver = receiver.lock().unwrap();
+ R7 R- c5 g- i F" t+ o let message= receiver.recv().unwrap();
0 c4 w3 f# j/ q, M/ W7 N2 S match message {$ t* l; C; R2 ?+ s9 V
Message::NewJob(job) => {
5 W8 p, l$ H' @8 ` println!("do job from worker[{}]", id);; H7 v# n1 ?& N) } r* q
job();& F$ y. @) X7 J- l& Y6 S, O
},9 x( p, _% m0 g. c1 t/ F8 z
Message::ByeBye => {
7 M3 V5 `% k2 J4 D8 ? println!("ByeBye from worker[{}]", id);8 l/ [ e, l; b. ?
break
$ Y& a- s- Z0 r },6 u+ q/ m1 P1 V" F2 M5 }0 ^$ g
} . J2 k' I9 P" g% V( l
}
" o; N6 i7 V, n* b3 z });) B- q0 P `1 y) A7 o# P
5 j9 x; n) ]- @7 H! V5 \; @7 G
Worker {4 S% @# z( J2 A8 V
_id: id,
' @8 N* {. |: ]' _ t: Some(t), z, a; W: R& E' ^% v. |
}+ [0 h. z/ o3 ^9 r' {
}
# K. T7 h/ W) w( h2 Q9 |$ b+ I: d}0 Y+ ~' [6 f5 n% ^
</code></pre>0 g4 Q6 R* p2 D( r: @) V; f7 D$ B
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>" y& i% U' R3 I8 j% q5 \0 K
但如果写成</p>+ k0 w( j T1 M$ t6 r. A5 _5 q
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
" \- d7 @; d" o9 c8 b};
: X) w3 ^. v% ?6 ~+ R</code></pre>
8 y x% }% Q* c<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
r; `% s0 A' n4 _! E; _6 Vrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
* U4 \! t+ M% D! z( t, V% \, H. m. [<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
5 ?5 c% d! O0 u6 w6 F<pre><code>impl Drop for Pool {3 q3 c( E, U0 ~7 z! e
fn drop(&mut self) {/ [9 S* M6 T$ u) Y" G
for _ in 0..self.max_workers {
3 R. A" D6 j+ R& B9 E" A5 d% A3 d self.sender.send(Message::ByeBye).unwrap();
8 ]* l; W. D- ~( L& u/ L# ] }& X9 i& Z% D0 N6 F6 a, R
for w in self.workers.iter_mut() {
) A8 t) y8 @! m& w. g/ q3 v% \7 r5 c if let Some(t) = w.t.take() {
3 }% x$ N( b3 e t.join().unwrap();3 r$ u" }0 n! `6 W# k$ d( b
}
* h2 c* ^9 l& q3 {' B. c ]7 g* B4 ? }
+ E* I1 K3 F$ m# W5 D! l }& o4 M* U+ e! c9 D1 C5 f
}
7 I; Z5 [' `, h5 T# e8 O9 U# O
: [8 I! ^" {2 I</code></pre>5 Y4 J6 c' I# r& |
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>* a' ]( @ e! ?0 O2 `8 F5 N
<pre><code>for w in self.workers.iter_mut() {
+ U, `! a- f" Z* r: k( A: `% ]' H4 d if let Some(t) = w.t.take() {
( @( x! s. B2 Q' D self.sender.send(Message::ByeBye).unwrap();
" o& _3 O. @) |+ s: G t.join().unwrap();* ^; b* a! v4 F# @
}% P" i1 i Y! T3 b3 {% c
}4 K R9 M" F: Q9 F0 g3 d/ v1 _3 u
) P# q% \8 q0 U( o; z</code></pre>
4 e2 ]% @# P0 |9 `4 ~5 y+ F5 M<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
- {: \- r7 W6 ]9 A我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
8 Q- b2 C9 m6 u: v<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
. K5 c% N7 w/ B1 G y<ol>
$ l" Q' s% y" ?& x+ q, J) n<li>t.join 需要持有t的所有权</li>
0 Y8 q) |* I% t, N: o2 a; b<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& O, }) F3 c, ?4 U# V7 f/ c, V n</ol>
2 O( e+ u+ J4 d/ j* R1 }5 K6 t<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
" u- j' h5 a1 E& z! y! V6 U- {" B, j换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
' a+ E: _1 ]: U; F+ q<pre><code>struct Worker where; C8 C# _$ W. s b: ?4 \
{
e! m, F) f0 K3 F3 Z" Y% ~7 r/ y _id: usize,
) t: O0 `5 T" F) m# e7 g t: Option<JoinHandle<()>>,
* M; N) s4 t# I2 l% q1 k}
& Z2 g X- J. t. ~# Z; ?" ^</code></pre>& d! s" R+ N, Z
<h1 id="要点总结">要点总结</h1>
/ S+ t0 I6 p! ` g& f9 b! t<ul>
% s/ C5 D) l( J8 \- ?" R' Z5 L& M$ @<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
3 b4 g: Q( ?1 m! ~<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>3 n' o2 e, n* W% T, U
</ul>' b% @$ x5 Q9 S- X2 y" [3 e: M0 |+ t
<h1 id="完整代码">完整代码</h1>4 t4 R" m. I" S3 X
<pre><code>use std::thread::{self, JoinHandle};: @5 Y' X. V. K. o0 ^
use std::sync::{Arc, mpsc, Mutex};% b* D* K3 Q- p5 e2 Z! u$ h
* C. _: \1 `$ c0 r8 e9 }9 ~5 q3 t# R: ~8 X$ g# s E
type Job = Box<dyn FnOnce() + 'static + Send>;# i9 f- \( I' I5 z% U0 g
enum Message {
8 U/ d" v: H6 g; V5 n/ A6 p ByeBye,
+ ^2 ]" S/ W3 d. e" c2 t$ H. t NewJob(Job),
" k* R# i& y( D& B. R& n+ T}
, _/ i' F7 _8 p3 \0 B) O0 B
' m) q9 D/ h5 z# |# u8 estruct Worker where
- D$ o+ B2 j" H* K6 c{% G/ w! c* h- o; G
_id: usize,$ ?' t- B: I* M7 Y" n
t: Option<JoinHandle<()>>,
. R ^0 K* I8 n) X; K+ N, B; }}$ m8 T) D2 @5 r r
: x3 p2 V$ _5 ~1 b8 m: v* b
impl Worker
" D& D/ x6 J' ~1 g* _{
1 Y. Y8 }6 u3 m1 X fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {3 D# N. I$ Y, v* b3 ] V" e
let t = thread::spawn( move || {/ `$ I0 K* X, J/ C3 R
loop {) f2 C1 Z) O' n# E. V$ C
let message = receiver.lock().unwrap().recv().unwrap(); {2 D( U9 g& D8 c
match message {7 R3 _- `9 ?) D. c
Message::NewJob(job) => {
# \: @& ? {% J2 p: | println!("do job from worker[{}]", id);
( }. X5 F, k& t job();
, F, F8 |0 @; n4 g$ R },
; X w( o: Q- ^- U" T Message::ByeBye => {
7 M+ i5 A& n/ ?) q% t9 w. ? println!("ByeBye from worker[{}]", id);8 u* O0 v, V9 S4 n/ d# |1 @
break) S3 w$ J2 u2 q
},. E! D: }& Y3 ?6 U
}
5 F4 j6 G; o$ y }7 D# Z: T8 n3 H7 a2 X1 F: x- |0 B
});
) o/ L1 Z5 h3 S% n5 M
* v6 q3 X! e S- |( @) U6 N2 U Worker {
& z. ~3 y$ ^$ D$ ^0 B3 m5 e _id: id,: b( I0 o5 C! H* ?; b" |; v2 s1 |8 i
t: Some(t),
9 B. g( o7 i+ \ }* ?; ^( J, X2 K$ R$ L% |
}
% w8 E* N4 ]8 X* w, b+ q, v1 q- G}; K, o M# Y* c8 Q0 i6 ?/ }$ J
8 Y1 N( H/ a/ Tpub struct Pool {2 y( T% @6 ~6 r( m
workers: Vec<Worker>,& U. O% L# i* K# _( k4 R
max_workers: usize,2 i$ ^5 E Y. r* V5 r7 E a
sender: mpsc::Sender<Message>
S9 l) T e/ ?4 j" f}
% j+ Q" I6 j% V' n; M( |6 M9 H5 ~0 \1 ]# P) A
impl Pool where { E3 t* C; S5 M! R" V) m) o
pub fn new(max_workers: usize) -> Pool {
; F- G- N8 C- a3 E$ ^ g8 f1 _ if max_workers == 0 {
& J7 _' e5 Q" Z) {: ~ panic!("max_workers must be greater than zero!")
, Q) e: D* c3 \9 Y6 S3 l% g) I" m }* A+ h7 n7 w4 X& M3 Q1 e2 h; [
let (tx, rx) = mpsc::channel();
1 e3 c7 H2 X) l5 R
6 Y2 ^4 f2 e7 T' O2 T let mut workers = Vec::with_capacity(max_workers);& G8 q' B2 [' N/ H) t1 w% T- k
let receiver = Arc::new(Mutex::new(rx));8 s& s2 N9 B/ F0 [, h1 d2 y1 Y5 c
for i in 0..max_workers {3 J5 t% |, J: F9 ]9 l+ U9 p6 X
workers.push(Worker::new(i, Arc::clone(&receiver)));
9 \0 v2 W% Z9 y* g T& T* \ p7 c% D/ F }; [4 M6 |( y" |/ e5 G
6 a+ P# J f ^" a2 X* Q Pool { workers: workers, max_workers: max_workers, sender: tx }
7 ~. C: ?# ] y) ]- T( E ^. f }" f; \' ~& i5 i# Z5 E! n h7 |
0 _& e% r& N6 \/ R @) {- N
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
& r3 ?2 k3 v- ]+ U4 Y; e {
( F( W7 T q+ R/ p d. w+ f0 z; u# M: `
let job = Message::NewJob(Box::new(f));
7 ^3 M: z) J5 ?- T self.sender.send(job).unwrap();
! U+ p: W% Y- N& @+ X9 H s" ^ }
! n: [5 Y( b8 |' @% {3 {6 W}
$ A, p8 B$ l; `
9 x. S) ?% P: a2 Nimpl Drop for Pool {. u8 i8 X# s& a( T0 t$ D3 Z
fn drop(&mut self) {
# f3 P1 V) O7 n. Z+ M6 p3 a) G& k for _ in 0..self.max_workers {9 H; i. x5 }5 j/ G# b& l
self.sender.send(Message::ByeBye).unwrap();
9 u5 y4 g0 M/ W7 G }
8 v' d' M% m+ J& E+ M, b- C7 m for w in self.workers {
2 k: v0 l% @; B if let Some(t) = w.t.take() {
4 o& \: Z* I( _( g! F4 N0 d t.join().unwrap();
6 V0 G, j8 B" P* Y, K" d }
1 j/ U: x; {( C$ Z0 U }
$ L' v+ c" ~* T' \5 L% x* R }" W0 h- ^( U* U
}
1 P, E6 b& v! d
3 ^6 v }/ Z2 p
( ~4 k% |8 t; o8 l C! J& G! q) m5 g#[cfg(test)]
) ?3 X2 k2 _/ ?mod tests {4 {# R+ ], ]) F
use super::*;6 g0 v2 J, N* _1 \* z7 k0 N" L
#[test]) Y8 \' e F9 [& y: N( d
fn it_works() {% u _5 M7 W N5 ]4 T! d
let p = Pool::new(4);' l$ C, a: A, ]7 l/ j
p.execute(|| println!("do new job1"));% ~( f9 {+ T: Y- O' J
p.execute(|| println!("do new job2"));5 t: r! m$ M# g+ d* q8 Q( K
p.execute(|| println!("do new job3"));0 v# L$ j1 B" f; F
p.execute(|| println!("do new job4"));
+ L' j4 \, n6 @6 I }$ Z- c& _ d3 S3 g: @. _2 K( L
}
/ c6 M+ g1 `/ }* r</code></pre>- p( y; C9 B" L1 h
: U, Y- a$ @0 J
|
|