|
|
5 [) ]7 |( I8 E$ m) e! g9 b
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
/ h/ i# K0 X, C$ {) r) y! a" ?<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>& C2 A3 O9 p! C# ~' D2 R
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>6 y) I) Q1 V' ^9 N) {0 z% @+ _- J
<p>线程池Pool</p>5 x3 t' z4 e" j) k6 n. |' j% k) n
<pre><code>pub struct Pool {
. G# Y$ U; G: M5 s% V7 Y max_workers: usize, // 定义最大线程数% V( S- N: r+ c# o& o: e, j: \
}
4 u, d: i) V* H+ c' c! W( ^# F% X+ _2 M- i6 n0 P' x9 Z
impl Pool {
9 N* L7 `$ a/ y' ~; f, x fn new(max_workers: usize) -> Pool {}
9 W2 X2 H/ I2 ]0 [0 q7 U fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}$ A( L6 _( c- }9 n9 W# F8 k' e
}
' {0 Y, ?7 H+ g8 U$ P4 z. S" k _0 ~8 M- ^1 O- ]
</code></pre>4 H9 q/ i4 |9 x" `; g7 H
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>; P2 C3 V4 x* X" E3 ~! D6 T
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: W# ~$ O) s% |可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
5 `( Q3 U6 O& t( ~, ~7 Q3 I<pre><code>struct Worker where" ? C8 U8 }" x& p: {" X8 i7 f
{
5 J6 R. t6 [5 O1 }, ]* C _id: usize, // worker 编号 S/ G8 v2 Q5 E! _) E- ~, L* m
}
' f/ D3 \0 b7 I</code></pre>
5 z8 }/ X$ r3 P+ i<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br> f. B) t& h" h
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>& Y# t4 ^. W" [- n2 v. y/ w8 H x
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
# W! c" g: ?- k) u/ b7 u! w' n<p>Pool的完整定义</p>
- ]* O& U2 B; I<pre><code>pub struct Pool {. A6 o- z. T `
workers: Vec<Worker>, _+ U+ E6 G$ V3 A/ @ r7 }! D( B) |
max_workers: usize,0 {0 z. c4 p- T2 i2 w% ?
sender: mpsc::Sender<Message>
. J/ o( V) i) J) B) }}& N0 M1 T5 l9 a! D9 D. c/ e; x) ^
</code></pre>0 ^! {2 C: n/ ~
<p>该是时候定义我们要发给Worker的消息Message了<br>
) T! z2 Q) s8 N& o- W( I定义如下的枚举值</p>8 S0 ~& C! O/ Y* H* P
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
% Z2 T B" e( X# k7 ~enum Message {
( D" U7 `3 d' k ByeBye,, ?0 U1 y# O4 U) G$ d2 h
NewJob(Job),
: ?' ]' ]! u# y* B}
) y) \( }( q! g2 ~# N</code></pre>9 e d: _8 w# g) E Z% v
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
& `5 F% h$ ]" L2 Y9 J$ R<p>只剩下实现Worker和Pool的具体逻辑了。</p>) H$ @) S$ f4 e" S3 [% H( x# d% R
<p>Worker的实现</p>1 t* y# [0 D3 o2 F3 h
<pre><code>impl Worker
( L U& q2 ?7 b8 l4 ]{
& j8 B0 L: M1 O: N B4 f fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {$ p- w+ }: v6 r% T1 {! K9 S) B
let t = thread::spawn( move || {6 x C) N! t' u
loop {" L- c$ V: x* c% y0 O: h
let receiver = receiver.lock().unwrap();
6 w5 t+ r. A& P# c/ K! R) c let message= receiver.recv().unwrap();+ U' U3 r5 @$ l3 o4 {( T8 a
match message {5 E# [4 c( R3 B5 E# F4 x0 Q
Message::NewJob(job) => {
6 E1 ^( i1 {5 @2 i5 h! \" }" M- }: Y println!("do job from worker[{}]", id);3 V! u+ Q% F4 B' l3 N! E# J
job();% T/ o& \3 L8 \! S
},1 H% S0 n( T2 `# A
Message::ByeBye => {( U( r: W, ^, u q/ F
println!("ByeBye from worker[{}]", id);+ H/ O! V9 _4 Q7 d
break. X4 ~7 I, h% h7 J! a O) K. ?
},
( w3 k7 r2 y7 b: U2 n V# ] }
! e& j9 w. m, J4 M8 q }9 O9 P" e" ?1 A$ k
});
2 R" j( ?- k* u0 I- B
0 {6 j# I0 ^4 r4 N6 O/ A8 V+ N) M% a Worker {
; m( G. g% I; F" [1 }: T _id: id,
. ]& o2 j; Y) h+ m7 @) }+ L- q% ^: t t: Some(t),
; J3 a |, {9 e1 D! B! Y0 I }+ s: g# C, K4 m) k% V; y
}
2 f) [' C; Z& c* h0 T6 T}
) L L5 D8 B+ F8 P9 L3 G4 A</code></pre>8 S+ P5 j* c1 i9 }6 K
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
2 M2 J& ?1 S7 ~1 c& B但如果写成</p>! U" }& K& j1 z+ X) O( E$ d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
+ Z4 Z+ |& C1 y+ F}; b; e/ y" v. q# R2 \
</code></pre>0 H% {: }9 r/ m* I$ \
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>; H9 B* C3 y% c3 m5 G
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
% m6 n/ a( N) K! U1 G# E3 q<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
$ A, D0 d4 z6 J) E2 m, F<pre><code>impl Drop for Pool {
- _0 i4 X0 ^$ ? fn drop(&mut self) {
2 U0 `5 C0 U1 e# \1 v; y for _ in 0..self.max_workers {
1 a9 b* ^: w% B9 n self.sender.send(Message::ByeBye).unwrap();
! d& G: ^1 t1 D6 E }% c/ [' {. k4 D8 K0 \- h! d
for w in self.workers.iter_mut() {
! n4 @3 A1 c* K% M4 ~2 o& l if let Some(t) = w.t.take() {
. ]4 r% d' R! j' T t.join().unwrap();
2 B. ~4 T$ Z/ w+ ~3 t( e0 a$ U1 d1 z }
5 E" p9 R8 u j% k+ o3 }; X9 F }
3 I) y! H6 t2 n3 | }
* \- ~8 r1 `. K0 W: \* G}! J t. g0 @; v) w" k
2 x. @( @8 i E( c# H- k</code></pre>8 T$ H- J8 D+ ~* f$ \0 Q
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p> ~$ q4 q" N% O
<pre><code>for w in self.workers.iter_mut() {; W N! R" X: L8 Y3 g- \ D% `
if let Some(t) = w.t.take() {2 |9 Y8 c* |# p/ i
self.sender.send(Message::ByeBye).unwrap();
6 V3 d2 r& {" J6 g# k4 L t.join().unwrap();7 _: a) e8 Y/ h
}
( P7 T5 Q. D1 S# ^5 M, C9 D7 q}$ @! [* m& q( r$ p3 ?0 ?& r
9 e( J& }9 s- Q</code></pre>
/ t) r; u3 f2 n9 L/ L<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>& m* F; [/ p$ @* b0 X# f, Y R: [
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>" f+ Q) K w T/ s
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>, _9 ?. {) ^8 T! n' G4 i
<ol>/ e8 U- K: N+ E% b% w- _) a+ l
<li>t.join 需要持有t的所有权</li>4 X+ y0 O- |- P8 [
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
" L. x6 q/ a8 j7 x</ol>
T4 f$ @+ _; L: ?: u<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>: l9 k T, i1 |1 Z4 I. o
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
) ]! m5 M# d- k! O! {# H! O<pre><code>struct Worker where
7 q, C$ I r9 o* C* K{8 D& x+ j" @2 J7 o4 c1 }
_id: usize,
5 R3 S4 |$ B6 D+ } t: Option<JoinHandle<()>>,3 ^ `( t4 T& D7 O7 I
}
9 z+ N/ F& ?( I0 ?; x( s- t</code></pre>% p# W+ @' X9 Q+ L& I) N. N: W
<h1 id="要点总结">要点总结</h1>' r* T! v1 b" n( M3 ]
<ul>
6 e( L( b3 ~/ g8 I7 @# ]& f4 o( v; o<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>. X# g3 M2 y( X4 l( g( _
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
! b+ S8 c2 ?6 R! P</ul>
5 W5 I7 \# ]1 Z/ \6 R: N<h1 id="完整代码">完整代码</h1>- \- k+ V- m. W8 Y' K0 k+ }
<pre><code>use std::thread::{self, JoinHandle};1 v2 N @0 H' g9 V3 |
use std::sync::{Arc, mpsc, Mutex}; n0 o; D. u' r! s# ]$ k) [* J+ J
# F$ o$ m' [. w& _4 k
0 S- h8 j9 g& Y3 Btype Job = Box<dyn FnOnce() + 'static + Send>;
* A8 O, g" K; T# t" v5 S; Nenum Message {
7 ~3 Z( F6 V7 o4 m0 B. b ByeBye,/ C3 d0 @# z2 u$ M ?
NewJob(Job),
. o( h( v3 R# S}
% `4 j+ e9 T7 M/ \) Q U/ N, I% L# e
struct Worker where
" @; ^5 t; ]; d3 c{# x+ b2 G( o8 M" o5 T) e' p
_id: usize,1 H$ s/ w7 m; o- J0 Y& ~' v$ I
t: Option<JoinHandle<()>>,
4 ?% R# Q( `7 [- \/ b5 @" V}
! j9 j6 z; h' V$ i, L( U' `! ^: n5 Q3 w1 Y3 u& z( @8 {- s
impl Worker0 [1 i9 v8 S* R# s
{
$ a, q( ]9 P* [- a, }1 S O- D fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {$ f# b7 J# z% y4 q U
let t = thread::spawn( move || {( V6 F7 U) e7 T6 J8 R- l
loop {
2 U- i, t1 d, y1 w( f& x5 L let message = receiver.lock().unwrap().recv().unwrap();
: K2 [6 \5 d, L |& W8 s match message {# F. L( ]2 z3 c3 Y$ C7 y
Message::NewJob(job) => {
, D7 b8 N( u- W+ c( P6 ?) a println!("do job from worker[{}]", id);
/ Q) o$ W& T/ m4 k. A- v5 X$ m job();5 Z! N" R6 d' l
},! `- E( d, j0 ]; e
Message::ByeBye => {
# K" R/ o* i' S, G2 ~ `# O println!("ByeBye from worker[{}]", id);/ j8 T5 x# J. c2 P9 Z4 i
break0 M9 W* Q4 {+ b7 t! h+ y
},* A& S) D# I# V6 i+ b
}
9 U: |4 |4 q9 G4 t; h8 Y" z5 h' n }. P1 @$ a( b* @. \5 Q0 `" J
});
2 L3 y' u& x! |' ?6 s# o+ x; b0 {9 o, k9 f* U/ Z: {
Worker {/ ]! y6 P: [2 b; x x, C/ y# ~
_id: id,$ L- C" X! G7 C% h, Q/ k
t: Some(t),* o1 R; x6 x* Z6 B9 G" U8 S/ A
}
[6 d# R3 R6 a4 m }
# R# n7 z) Y% o8 x, L! S}
# t v2 w2 d+ w% }+ i* q& D: Y+ T' ~- Q! s: k& Y
pub struct Pool {
; _) _, e+ z- b workers: Vec<Worker>,, B2 k: O7 `% ^) u; K' V, Z
max_workers: usize,
' S* f& Z' { R6 \ sender: mpsc::Sender<Message>
$ ?# _! F6 S3 O" y) k}
) d0 E5 i% R( b+ p6 M* P4 J, q$ F: N. I* x& ~; Z/ i: q
impl Pool where {
+ Z" a- [: E, _4 {3 Q- g pub fn new(max_workers: usize) -> Pool {, s3 N4 C3 {9 W# g
if max_workers == 0 {% c9 y$ z3 p" m: ^4 X5 \. h8 k
panic!("max_workers must be greater than zero!")
" k& U2 L0 Y ?" s; n% N* A2 \" F }
G0 y2 F- @- H2 n+ v/ t# ` let (tx, rx) = mpsc::channel();
0 k- ~ H( K! t3 \/ c* g% f( j( r$ ^
* d4 \- C9 p6 h. O1 s) U9 L. d let mut workers = Vec::with_capacity(max_workers);' J+ E0 d7 ?, f5 K7 j
let receiver = Arc::new(Mutex::new(rx));
/ y7 b2 _% T( x5 u5 P for i in 0..max_workers {- Z( f$ m3 V6 q& Q
workers.push(Worker::new(i, Arc::clone(&receiver)));2 P* y3 C- i/ e7 s* l5 z
}2 a5 S# b5 V( {( q- e! o* D
! |2 M6 ^- Z3 {- a( \- @ d
Pool { workers: workers, max_workers: max_workers, sender: tx }: L3 f+ h7 s) \4 R$ y/ _+ Q6 n
}
% v+ W" M7 k; V B
7 S7 n/ e6 a6 P# R) T pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
2 V4 A- Z4 L5 Q2 ~* B4 a4 E. x {4 l1 }) h/ E7 Z$ K& h3 h8 K& c# C
& c$ C i# u7 M, `6 \ let job = Message::NewJob(Box::new(f));3 i0 P4 @2 Z2 s) ?8 W
self.sender.send(job).unwrap();# j% b' ^' l( }0 L% P3 v
}
9 r* O) v+ U% s5 F- a8 {# c( g; `+ h}
+ U! ~% N! }* j( r& v' [! g* f& n# Q4 e4 x$ }
impl Drop for Pool {2 F3 j6 i- k4 y5 Z. {: _
fn drop(&mut self) {& m, B; W0 M) W1 V- W' y
for _ in 0..self.max_workers {) i1 {9 r9 c/ I
self.sender.send(Message::ByeBye).unwrap();
! u4 o/ }- j% ? j% G }
2 w- @4 l7 y9 ]4 |8 \1 ^3 b for w in self.workers {$ V8 g& k6 K8 o3 n
if let Some(t) = w.t.take() {
6 g5 d+ W- u& [' d/ W# c0 ^5 e t.join().unwrap();
5 k& U3 V5 Z2 G+ R, ]( k9 x, X }: v2 R4 V' d, g+ G
}& p' T2 h1 V( t' {( D
}
5 m' A/ W0 b+ i' j' y}" K# i0 s1 L3 M$ N( C7 h
/ J/ w( f* x' o1 h, ^
$ s& v% s& ?) p5 n( V
#[cfg(test)]
: ~! _9 t! \7 M, ^mod tests {) A1 ?: ^5 E4 H( i
use super::*;* B/ ? v6 ?% U0 d b) f. q$ G9 r8 B
#[test] _% e; s+ T: O1 [0 {* Q
fn it_works() {
; c' E6 S5 ?2 n1 p- L! B! Q9 g let p = Pool::new(4);- |! S0 J5 R1 n2 Y( @& n( N
p.execute(|| println!("do new job1"));
& l; q; r+ X) C( m+ {7 }0 f p.execute(|| println!("do new job2"));
8 R/ C! }, C" @+ X' Y; {" }0 k0 K p.execute(|| println!("do new job3"));6 d' z9 s w S9 _* W
p.execute(|| println!("do new job4"));6 D) o E! C& U+ K6 P; C, M6 u
}% i! H9 D Q0 N4 {( }
}
" r. T5 O) q6 a7 G* R: \9 Y</code></pre>
7 G* e. Z: W/ x3 J- Y; Y5 c$ X% q4 r- i4 l- x& w2 s
|
|