|
|
; A1 V" S# @) M- T$ S9 X5 j% h8 R
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 O0 |7 q, l. y5 r5 P+ l
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
* l/ V" x( U* R' ?' ?; |( z<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>5 Y$ [; m4 c; `
<p>线程池Pool</p>) L# @7 o% F- U J9 ~0 w
<pre><code>pub struct Pool {
" E" F* ]$ P1 ], f$ K& t5 N max_workers: usize, // 定义最大线程数* l7 ^! D' u5 {
}1 x) v2 ]& a" }" G8 [$ A% B
$ {& ^7 q) b9 N5 P8 b1 D& \
impl Pool {) ]& }9 F5 H: L3 q/ U
fn new(max_workers: usize) -> Pool {}2 G; U9 L$ s) t, A: @" _5 E
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
1 F$ h* K r# R6 W3 [}
9 G3 L2 x& V6 g$ l0 \ E, ?/ z1 ?0 l/ P6 Q7 j/ ^
</code></pre>0 A3 u9 c3 K( ` b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
& V! j& d; B0 Q$ B<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br> D( d6 ~) Q4 L7 ~
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
8 T2 l. A: }' T0 C8 `7 k7 _3 `9 j<pre><code>struct Worker where+ J$ x9 ]+ h7 x! x, y( f* C Y
{
, V! J" R) h2 M _id: usize, // worker 编号. h: A( @8 ?. X" P l m# z
}3 U v4 n- l( T# t( }8 f# r* Y0 ^
</code></pre>
8 F0 t3 f6 I) H<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>7 j" p# Q, }) P' ^) @5 I
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
a! }# q+ W1 g9 T" E, K<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>& S, D/ B( q% P& `) E7 l# b
<p>Pool的完整定义</p>2 Y; [- C8 }9 q+ A6 L8 P8 _9 g% B0 s
<pre><code>pub struct Pool {3 e% f8 B/ {! |! j7 Q+ H1 M2 L5 Y
workers: Vec<Worker>,
% r3 A& m' @& z; W1 V2 b# }) K max_workers: usize,
# e! p, f- x9 {- Y& K! w sender: mpsc::Sender<Message>" P% I- _# u9 V Z- U
}
# @4 O- q' Y$ u3 x. l7 E g</code></pre>
( x3 a% d7 [6 c( @. j) F: ?<p>该是时候定义我们要发给Worker的消息Message了<br>4 f. h1 i' }; r/ R" M) J5 d( ~
定义如下的枚举值</p>% ^$ P( c$ c/ V; H8 D: r
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
1 m% i& c3 b2 Fenum Message {
3 H# Y& J9 o& r/ u ByeBye,
' v2 }# D( m% w" M NewJob(Job),
7 U* Z+ W* c2 G& ~* t1 V0 b}
( @5 V4 ]9 z3 ]</code></pre>
8 Z4 d' S5 }4 Y<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
2 W* W# N4 o0 H: ?' Z4 z<p>只剩下实现Worker和Pool的具体逻辑了。</p>
3 X% L J% A g1 ~" v6 |9 {7 M* K<p>Worker的实现</p>
. w3 j, o2 `; C, g; P1 W+ [: @; U9 q<pre><code>impl Worker
# s# s; K$ \4 k# Q) m M{! s2 b( V- x" O# W u; ^# q7 m
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {$ b5 f) W @4 l4 O+ `9 l) c, E
let t = thread::spawn( move || {# d/ ?- J0 t! ]& w2 S- H
loop {
, [/ u, r) d' @+ X, B4 a7 `" P2 U let receiver = receiver.lock().unwrap();/ R) Q# T# L# B4 _3 [
let message= receiver.recv().unwrap();5 }7 f3 X8 O. n
match message {
+ U* @7 n! o1 |1 P) p" o Message::NewJob(job) => {) e2 \9 ]! U N* d, j. a5 D% ~
println!("do job from worker[{}]", id);- B0 C# W- L; L- a
job();. O+ R7 _: P# M$ \6 a, e
},! ^. [" s8 K$ j4 O G2 [* ~
Message::ByeBye => {& W: \2 I6 D2 y* Z
println!("ByeBye from worker[{}]", id);* r) {! V5 |# L& R+ N& X" i
break/ c; F3 ?9 R4 c+ E" `8 k
},% b2 y8 C/ ^; g3 ]: C+ E7 [
}
* x9 @ B! X6 ^/ J- P( O0 @ }
8 W2 S3 ?: P" C) ?; e- ]. n });
( f ?" }$ o. e/ y% Q3 g/ B" Q1 {# A9 ]6 \
Worker {% h) V$ I3 m5 k( f, z1 V
_id: id,; Y0 Y3 l G' j: f: U$ E5 a' C
t: Some(t),) v4 h9 R7 @7 `/ ^7 A7 _
}
: }9 b5 z; B i, J' H' W* _4 f }
" {& r' a A2 \. B}
2 T2 g: x% y8 `* ]- M* i</code></pre>: W# m5 c5 k5 q/ c5 m, l
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
7 `; Q& u! _; W( Y但如果写成</p>
7 U4 Z/ | B9 Q# m4 W! g8 M: Y4 y<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {. K8 g% y2 T1 r% O1 U
};
) f, \( m+ j2 G# k5 k% E2 r</code></pre># l Q; r6 S2 t3 E
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
0 m, [: \8 N6 }2 A5 |8 Mrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>! d5 v( I$ ?# y
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
) s: p {3 k, ^6 u# a) \1 |0 I: V<pre><code>impl Drop for Pool {9 {7 i5 M! Q6 l
fn drop(&mut self) {7 w' F7 {* u+ y v. r
for _ in 0..self.max_workers {
# y: L. A& |; p9 v8 k( s7 Q self.sender.send(Message::ByeBye).unwrap();
5 n- y5 E0 m- P' `+ L }
' @6 i: S& f7 ], ~7 B9 L6 Q, _) S: n/ v for w in self.workers.iter_mut() {
1 V- v+ G j# N. p1 s if let Some(t) = w.t.take() {* E& b8 t, D$ h3 Q7 G
t.join().unwrap();
7 T# D( r+ Y% [- o* x, n, I+ z' T }
7 p" p# x$ E8 d3 u( B0 j }8 X5 @+ N% W5 ^% s& a0 U
}+ ^5 p3 i; R" ?. f( U
}
# P/ P1 X0 {+ ]6 a; J: q- y* Q. @/ f, X( H
</code></pre>% y$ p+ r8 Z" J; C
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>$ T+ p# G* U& P1 Y# ~
<pre><code>for w in self.workers.iter_mut() {5 l8 H* X8 \0 T
if let Some(t) = w.t.take() {5 |7 c* v9 ~7 k; }; d% L8 J
self.sender.send(Message::ByeBye).unwrap();
+ r) l8 D0 q6 I$ {. z t.join().unwrap();
7 ~ j- L& T5 b' N( {- h) X. Z& ? }
# A9 I" V& l* t- f, V# u}2 p8 Q; U* U6 L5 ~: t ~
' p* b% \2 N: j6 M* K</code></pre>
J X. l% x& V* f( E<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>. g: t3 I) n% r$ h9 t/ V
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>2 G; c2 ^1 t+ Q" K6 B( ^
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
. y( f! B) v7 z8 _0 z2 c<ol>/ [8 w! [0 E; }2 }1 e" u4 G
<li>t.join 需要持有t的所有权</li>
. J3 R: m. l# ]& t# G<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
s3 t' A [+ a9 t5 i [; ^; F n</ol>
5 d( ]3 @% m. {8 Z# \7 i7 _; Z<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>% Q, w% h1 n9 [- u4 b6 P9 X1 X
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
( x3 F& h' Z7 G; v<pre><code>struct Worker where& r( K% ] _5 Q3 S$ L
{) l; g, y* Z5 z q
_id: usize,7 k1 i1 t7 v! X: e4 G
t: Option<JoinHandle<()>>,
" Z9 u* G! a3 r5 F. J% J! g+ c}
/ H, O2 a, C$ d' \8 l</code></pre>
3 k5 x+ f% V5 M5 z6 r<h1 id="要点总结">要点总结</h1> V4 f5 n' V( H
<ul>
- N4 z; C: ^6 c7 R# W _. G<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
; ~) t) P$ e+ Y7 B8 y# ?) l3 c) S<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
& e3 I" H/ d) V3 u- `</ul>* Y8 k" e3 Q5 }9 c# Y1 G
<h1 id="完整代码">完整代码</h1>
- ]6 W( G$ S [& |+ O5 D+ j<pre><code>use std::thread::{self, JoinHandle};$ i" {8 f9 z4 _1 i
use std::sync::{Arc, mpsc, Mutex};1 _9 j% j- N) k7 `3 \9 C0 P
- g# L2 @( p+ w+ o4 a. o3 z& i
: ?) P9 q1 w( K; F; u- }* B& utype Job = Box<dyn FnOnce() + 'static + Send>;
: R5 `1 a( y: ~enum Message {
, d( r |1 S/ l$ s ByeBye,
1 d; X# U& p5 j; d; i NewJob(Job),
( U/ J3 ]! p6 W9 o/ f) h2 y}$ o* X6 a& m: X( X; v% \
6 }: _8 ^* z; ?0 U' wstruct Worker where
4 v" i: C' m/ _ ]{
# j, g1 _7 u2 _9 z8 ] _id: usize,
* M. C. Z5 D+ _" u* O% R2 M: n5 s t: Option<JoinHandle<()>>,& W% w! \. l0 d/ ?! P
}) f, l* s& y$ {* F$ Y
6 M. ^: p4 Z6 }4 v: z. W
impl Worker0 M- d9 |! N3 I' C" ]. k. F7 k
{" S i! h- p- e! I. ?2 Y
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {/ o/ o, D, z, p; Y* }4 h% r; u
let t = thread::spawn( move || {
3 o& o3 R' R7 ^. X/ J: t loop {
4 L) Z) s+ { s; d4 m5 O let message = receiver.lock().unwrap().recv().unwrap();
' R/ v" ^2 C% d d( \( T% ` match message {* P+ Z1 i6 E) ^; n+ H
Message::NewJob(job) => {
3 e! C) W6 A' a6 N- |: ?, y. l println!("do job from worker[{}]", id);) i& _2 T$ z2 c0 p+ ~
job();
/ d l( A" s) k; ^6 |: x6 t },
, @- i$ k9 g" W. R" A' F$ u2 q0 W* U Message::ByeBye => {
P L/ o0 q) J# ^: H0 f; K/ O1 j println!("ByeBye from worker[{}]", id);5 P8 Z# D; ~% A$ u6 s' {1 B; F# K
break
' B- m8 r' e- _6 O3 l2 Q },
9 ]" ?) i- k: q- V; }0 ` }
8 c7 g- Y; o) n- R- ` }& Z! h& o% ?9 ` i+ B) F9 X
});4 F5 O2 \1 ~. r" N( M
3 t [" D) p2 w( Z( u# n
Worker {" W$ }) p$ |* s; b# t9 L6 b
_id: id,
5 K1 ]# l7 I( A- w. [0 ] t: Some(t),& o, M ?+ o- o1 }- ^
}
: }/ R# D& c. f4 H }
# M; p3 F# B2 k0 p* ?* V}, n. z3 I7 l# Q$ Z
; a4 l% U- U4 B9 {% A/ W3 T
pub struct Pool {
) x) H b' {* q* P workers: Vec<Worker>,0 ^' m/ r4 t# N6 L" m- c
max_workers: usize,
8 G; y4 a' J. S, K3 G+ y* X sender: mpsc::Sender<Message>
5 O8 b: @0 r4 U F" U}
& K$ v+ F! L% b0 R1 t' N0 {2 o
a& }% t* G. Nimpl Pool where {
8 I- D: y4 F1 f( T& ]9 z pub fn new(max_workers: usize) -> Pool {7 L( L+ d8 |5 a/ ^! h. _
if max_workers == 0 {
) u6 Z8 f! u" h$ s: F( q) K panic!("max_workers must be greater than zero!")2 g4 l2 W$ p8 q$ F
}3 A% c6 v. H! {8 @/ s
let (tx, rx) = mpsc::channel();* @- J- E9 w1 g
9 E: Z1 q# m) k- n let mut workers = Vec::with_capacity(max_workers);
4 n4 a" _( w7 T4 b, z4 i let receiver = Arc::new(Mutex::new(rx));7 X4 l6 ~; y6 I* X) D
for i in 0..max_workers {
7 n) J! s( ]; B! \' v workers.push(Worker::new(i, Arc::clone(&receiver)));& {" G# j5 t1 Y- w6 o
}$ C0 D+ f6 a9 S+ j; d- G
0 Q# b# \) F6 H
Pool { workers: workers, max_workers: max_workers, sender: tx }
5 V J' j1 [8 \4 A& F! N5 ` }% T, B. g' n$ Y A# C
$ i, s; ]! c* w& h7 b
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
. k. [# j3 C1 I$ w/ x7 A; H8 d3 g {1 ^$ ]) c; r* n
- V7 J, @' W/ Y; r let job = Message::NewJob(Box::new(f));
/ E: w" K6 C( m5 B self.sender.send(job).unwrap();% P% f- c' }( h: G ?" Q7 }
}7 d" x% d! d2 [) z+ o
}4 o8 a9 s( C; f- S2 Q
9 N. ~% b8 X7 r9 U. G0 w1 o4 h
impl Drop for Pool {$ A$ _ b, G3 A. T
fn drop(&mut self) {: E6 ]# @, x0 _3 g
for _ in 0..self.max_workers {
- n7 G8 H. M$ ] self.sender.send(Message::ByeBye).unwrap();# Q4 g4 s$ ^/ g9 t: F
}) p, s7 G' z9 g
for w in self.workers {& T' i+ z! {3 z2 X$ `- D
if let Some(t) = w.t.take() {
$ t5 ?3 T7 z9 G0 [- g* A$ y t.join().unwrap();
. S5 z$ U, X/ p2 ?7 t3 O& ` }
3 [5 w4 r4 C4 C- M. d* O/ l }! ?7 Z6 u5 s+ B, \- i" b" d
}
, W- X _5 o/ \/ \! M& L% G4 c}
: q+ e6 z" s+ J2 \1 o" Y/ I
7 M, b0 P2 w ^. j% r$ I* m# l q4 \
#[cfg(test)]
9 C/ x' d7 n1 W5 m! Tmod tests {
* S8 q) l+ e3 R$ u( s use super::*;6 q+ o4 Z6 v* ~ t2 Z: K/ d; L
#[test]
0 C1 X; y9 E/ _/ { fn it_works() {3 w& i x% X/ ^( \# ?4 P
let p = Pool::new(4);$ U* F* ?' q! P" T% R: D
p.execute(|| println!("do new job1"));. R, i2 N2 j) o% p1 M
p.execute(|| println!("do new job2"));
1 n/ e c1 q1 b4 \1 W ]9 a& l3 j p.execute(|| println!("do new job3"));
$ U4 L! E) ]/ c0 @ p.execute(|| println!("do new job4"));8 J( w9 r6 q- G8 b7 Y
}7 e5 s" X8 @& Q2 l0 d! y* O
}
5 B0 S5 ?8 R7 X: ]% o. D0 H</code></pre>
1 y4 M* c& `3 C
. i+ G+ h; e& f; ~- m. y# q |
|