|
|
) k2 J5 O$ @7 b& Z+ S<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
' k" k; w# t* r. Q' u/ K9 x g<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
% M" g; Y3 F& Q) a& O/ u<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
1 Y c, E# i" t3 ?<p>线程池Pool</p>3 O( \2 ^1 p8 O5 d. V+ I
<pre><code>pub struct Pool {, t1 E4 [, t- G2 I: d
max_workers: usize, // 定义最大线程数6 j' T% y/ Z. f. }/ N$ ^$ R
}% n$ L, R$ t* h% L# p/ }. ~
: O6 i8 w/ Q1 ?0 l$ Rimpl Pool {, f% o% T: e9 |# ^5 ~7 b
fn new(max_workers: usize) -> Pool {}! i, v+ h2 U" R: w
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
' p8 b0 v4 ~) @- x( n& J# s( y}3 e3 p' H; F& S/ Q4 V
% ^) k; B$ g9 Q) x+ F4 ~3 r/ f1 u</code></pre>5 l0 h: w$ |* k! G: f) a
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
4 o7 u( ?' H- b D4 W<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>% B7 n/ j+ |! [& k
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>: a; V; f4 v) V G
<pre><code>struct Worker where# l, K6 ?& L: J/ H% P
{) A" e( V' j, x
_id: usize, // worker 编号: x* s$ W- _! t5 `6 o! f
}
2 X9 X5 e4 {( \! n</code></pre>
/ k' c; {+ Q5 [3 t( D8 F% d- P<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>2 Z+ G: M* G# v# [
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>. y9 _7 U- V2 ^4 K f* @4 U
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>( A+ Q, O' k) `- z
<p>Pool的完整定义</p>
3 X# Z. n# l' u4 D: |( I<pre><code>pub struct Pool {' n( a5 |- F* s k, r: N
workers: Vec<Worker>,8 D- v7 R0 ?* q. C9 r- `
max_workers: usize,
# X: L! Y) R7 j% B sender: mpsc::Sender<Message>9 k6 z4 h" Q O
}+ R/ ` g& N$ s* U
</code></pre>' ^' Y: Q+ u- \' _; l7 N4 w
<p>该是时候定义我们要发给Worker的消息Message了<br>
8 H( a. |7 ~3 X" P5 J' a8 U定义如下的枚举值</p>! X7 c. ~( z& r, A, P5 s3 B% j& P
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
" _6 K l2 }) G2 q: _4 Q, w! {enum Message {
2 i- X. X3 ~0 \ ByeBye,
* H8 [# @- X5 [+ Z. @ NewJob(Job),; T, a9 S9 o8 c. o+ Y
}
5 s) W/ @5 }: i( T3 \</code></pre>
& Q1 Y9 v2 }9 e4 E0 j<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
) v4 c6 e; y/ K7 j8 E( @<p>只剩下实现Worker和Pool的具体逻辑了。</p>* I8 ?: u) b8 N; R
<p>Worker的实现</p>5 W6 U# N( L. c g" k$ L! _
<pre><code>impl Worker( e8 Z$ J, Y! \$ |4 M3 z0 L
{& {9 j( E# Z, J4 D
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {5 [4 s. \7 R, X- {
let t = thread::spawn( move || {
5 I. j0 f0 S7 \/ U7 F# d% ~! i loop {
$ G9 P/ v; L6 T let receiver = receiver.lock().unwrap();
; M7 f) c. K, h% R let message= receiver.recv().unwrap();: |% r9 c! A( N& x! ~
match message {
! B" s) e W" Y& D Message::NewJob(job) => {; D( B' k1 U7 u; E, o; ?
println!("do job from worker[{}]", id);
" _$ Q8 }8 G* U* ~7 Q job();
4 S4 [/ s' N1 ?7 b) I) ^9 ^1 K. ^ },
9 @4 }, D& W5 {# j, [& A7 B Message::ByeBye => {
: _: Q `0 S2 g: J8 B& X println!("ByeBye from worker[{}]", id);" ~' f) L) a2 t) T" m
break
* |- Z% i( {: s. j0 ~ ` },. o3 W( c$ d$ D1 T/ [+ ?3 a
} + O, y9 @' l" ]: A
}- `3 b( q; K/ l* S8 X8 m
});
8 Y# K0 W: w) B- w
! Z' e, K, F7 m- J) Y8 q6 u Worker {
$ F! n! b: t9 w( \3 _* c$ @1 U _id: id,3 W q4 E5 z. J" P+ h1 C6 v8 T
t: Some(t),% _, m8 t* t9 y/ V' ?: O: ]
}# ]# { H# ^- x2 Q) R& E& c
}
+ d( B' x7 `. y% W* t) Q' R) R9 ]}
/ Q( m0 k( K. I- L) c& Q, B</code></pre>
6 Z2 N7 E( t7 k# e( \0 B<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
, x3 m0 }. z+ _! G! x但如果写成</p>
5 p2 A4 w# a5 G& |2 F Q, g<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
# M2 f' L. p, Z};, S2 v, A/ z3 Q8 J2 S& p
</code></pre>: p1 n1 m/ m$ P* L- l
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
$ p) ~* D r/ E) R" o0 W5 Y* h6 g- }rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
2 i: Z. l: R: s1 }<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>6 n" J( d6 d. f" C" d) \! ?& ^
<pre><code>impl Drop for Pool {
. l# ~% k2 C% k) f, `0 y1 ~ fn drop(&mut self) {
- w/ i, d. }- c i' {9 u! P; R# w, G for _ in 0..self.max_workers {
( e8 l, I* `- _4 Y" K self.sender.send(Message::ByeBye).unwrap();
1 o c* l1 S" @3 ^ }
4 ]5 p5 Q' x5 B, U3 n" F4 P for w in self.workers.iter_mut() {! b3 E4 }# q6 e1 A$ P
if let Some(t) = w.t.take() {
; o0 S% `$ k3 ^7 v5 ^ t.join().unwrap();# N/ j- y. U$ |/ W
}
5 _* Z1 L U' R) r7 \5 C* F; [- W5 W }) k7 ?) w' d ], `
}
8 ]) z& y- \+ ?7 U! g' l8 O9 {& f r}" [7 Z0 K+ Y# T% d
! R( c+ Y, K, R% N</code></pre>. O- H" z* h, A
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
+ R; R, t5 X' [% B" W<pre><code>for w in self.workers.iter_mut() {: b3 m# H" s! z+ W9 v: D
if let Some(t) = w.t.take() {; e( t) L/ h _* f! ~* A
self.sender.send(Message::ByeBye).unwrap();% b5 d' e. S1 M( U$ E
t.join().unwrap();4 B& T2 }7 O: K: A; B# A
}/ p! p, E. T( O! m$ U
}( S* D# i$ a, f7 l# K6 @
1 [, b$ l* v) L5 ^" a
</code></pre>
! h7 U: C4 j3 `4 P5 d1 _% I: u7 L4 C<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
" k2 m% z8 i% o5 t5 q$ J我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>$ ]' v/ \0 h. K' O0 f7 @
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
- A" i8 c% @: U. Z<ol>1 c$ o. e2 e9 a
<li>t.join 需要持有t的所有权</li>
5 G) t! Q# ^5 P% K<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
' ^! }) p. Z5 t& E8 y# Z</ol>8 z5 J! I- a- N" ~$ A5 N
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>! T$ l$ X0 `. E$ f9 }8 W, t
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>- A- P9 K( {3 D' c6 D6 i
<pre><code>struct Worker where
, R7 b/ f8 ]/ n j{- h" m3 x. V( T5 q, a5 @+ B+ s* T
_id: usize,
6 V" o. w% F8 I1 S& D t: Option<JoinHandle<()>>,
1 } F. Q% H2 p}
( f5 a8 ?8 d, C, `</code></pre>0 @: \ y% Z$ ?+ R
<h1 id="要点总结">要点总结</h1>
; @4 I6 B0 J# ]) p+ v<ul>/ ]5 X$ K2 x9 v
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
t+ }8 E" y6 [; |' Z( d6 V<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li> k3 D$ } q3 B9 p
</ul>
* ]8 q) `1 A$ c) h: w+ Z<h1 id="完整代码">完整代码</h1>. J* Y1 a$ J1 i: c; s
<pre><code>use std::thread::{self, JoinHandle};, K4 [% s4 z- ]$ d0 ]" U
use std::sync::{Arc, mpsc, Mutex};6 w+ c% K! j( T% t+ F2 e3 j! | D
. G, a/ }3 l* e" k
9 _& k) o, G3 ?; o' B+ etype Job = Box<dyn FnOnce() + 'static + Send>;
! K& f, L3 ]. Z7 r- u; l1 p+ ?9 aenum Message {
: m; ~& v# n2 x. c ByeBye,) V: X& B$ i* x5 r J2 Y, ~) s+ x% o
NewJob(Job),
% t9 s2 R, L" g7 c( V9 R5 g}2 k4 {4 J( ^5 ^2 \& ^6 I9 J" e; |" \
$ \, `2 N; F2 @
struct Worker where
% R; R9 j5 \0 [& _5 Z. O$ D6 _& {{0 ^. ?2 d# @$ i- w( P
_id: usize,% \* B+ v6 o/ \; s- r
t: Option<JoinHandle<()>>,
2 e1 C! W+ j" Y& `( z}8 w; {4 K# B7 I8 c% L4 V' D7 t( c
. Q, o8 v9 k% {4 b6 m8 d) c. ^" Eimpl Worker" z+ i4 O* g* B' y; s" w/ H/ `- u( K
{8 K3 Y# k: A6 e- Q4 G" R9 y1 L0 B
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
6 m" i8 x- B! } let t = thread::spawn( move || {
2 q$ p1 l( x$ I* z loop {
5 E" ~3 R- f( C/ K4 K. E: J let message = receiver.lock().unwrap().recv().unwrap();
1 d! ^, j7 \/ y0 k match message {6 ~6 b9 ]# F; x
Message::NewJob(job) => {2 V) n' s& o0 E2 \7 E; _- |5 ~
println!("do job from worker[{}]", id);- d; K6 r8 K) c2 ?- B6 v
job();# [1 j* |7 J/ U/ h& ^8 g2 R* g
},, Y. s* H' F* K: m- n" P/ n
Message::ByeBye => {* J% O4 u, N ^: b( U- H2 L+ {
println!("ByeBye from worker[{}]", id);
1 c: n9 L7 J, f7 d! ? h4 ] break9 j$ @2 n6 O0 m: n- Q0 k
}," c$ c3 E7 U" |: d- l+ E1 X6 Z* M# A
} + g! f6 s: |3 g. O) j
}6 e! ~/ D2 @' V R1 T/ E% v4 `) F- b
});5 f6 _! S" U: x
( k' }+ X( f% J: J3 K Worker {
3 u! H8 Z6 E8 l _id: id,
7 B$ I |+ G9 i6 f t: Some(t),
9 S+ x8 \ {) R* \ }$ `! L6 I1 b, ] V( V
}
9 W* t- V4 Z! b}. r/ {# m0 K. O( q, O* L V% ~8 X$ x
6 Q* }8 t! s6 I, E( h- y o# G
pub struct Pool {
L6 H/ d. J( F5 W; @ workers: Vec<Worker>,8 l+ H; c$ X$ T
max_workers: usize,* H8 i% B# ]' P
sender: mpsc::Sender<Message>& U' g3 v! ?1 j- Y3 u6 F/ b+ y
}% d- t% }2 L% i6 t9 |' R
m3 o/ w* z8 himpl Pool where {5 g- T6 N( ]% _: ?% H% w$ y8 y
pub fn new(max_workers: usize) -> Pool {0 G8 M5 [) D. l# ~
if max_workers == 0 {+ h5 R& b3 I8 Q1 Z! z
panic!("max_workers must be greater than zero!")3 B% u2 ^ h* |' e
}4 i: R7 G2 v7 r% h5 L" b
let (tx, rx) = mpsc::channel();
4 Y' w6 y* [7 W5 r9 H1 n
/ z+ v3 |" Q! l: [0 G let mut workers = Vec::with_capacity(max_workers);
! T+ y, S' j3 a7 d0 _) _/ p let receiver = Arc::new(Mutex::new(rx));
R; \3 a# p4 R! R* A1 d for i in 0..max_workers {1 U7 h" E F& _+ x$ s/ k# ?- A
workers.push(Worker::new(i, Arc::clone(&receiver)));& i1 s5 x7 W. f# A
}
8 L, _; a) Y* \% x7 c0 ]
5 w9 A6 E6 M2 F5 U6 L Pool { workers: workers, max_workers: max_workers, sender: tx }
" X8 n4 A% a/ y6 Y }7 t+ t, _- E* v* }3 |
1 \9 {. X8 G# q; x+ Z8 r8 p pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send+ t& t/ N3 J0 Z$ i" A# C
{
+ i/ Z4 n% p3 k. C5 V6 F: v
, c2 u ]0 m2 }& T% x% O. X let job = Message::NewJob(Box::new(f));
N: ^2 |7 B2 h* n* V* H7 @" L+ |1 { self.sender.send(job).unwrap();
: \2 X; i% P8 `- V3 y# I* O& [. s }
) g# w" [" j8 [) W9 ]- W# y4 O}
+ l5 Z: n0 F9 l% Z- W
4 N ^4 v8 w, eimpl Drop for Pool {; K6 l$ q( h! i" y- t0 h2 p6 h* m6 l
fn drop(&mut self) { ~$ i# ~; r. ]2 z* `
for _ in 0..self.max_workers {
- F/ w6 c# L% S& M self.sender.send(Message::ByeBye).unwrap();
& W% q# b. I7 s9 a- L9 y; g1 X }) _; n) }$ w0 c! T) }6 q
for w in self.workers {) f2 a* C, W' U& w: h: o% u
if let Some(t) = w.t.take() {
- q" `5 X. h" q# s% U3 N. D t.join().unwrap();& `9 O% ] w9 ^: E( ~" [7 l
}9 u3 Z! {0 L$ j6 x6 M
}
' `! Z/ L4 _$ ~ }% p: {3 u. K1 t" A
}
* @# U C/ z9 d/ N
$ l' w" E3 S. H
# k! U6 s X5 u: N/ H/ K6 [#[cfg(test)]
' B/ {3 `9 U; n. i/ Z* _ ^% imod tests {
0 M( r- _- k0 x3 x, y use super::*;
7 C3 E: n7 e( F! p; X #[test]7 p& Z3 |7 h9 ?7 q) f6 J
fn it_works() {
2 T i5 P5 D$ A% X! g let p = Pool::new(4);5 E) {- Y/ B9 ^) ]$ ]( Z; g. h. H
p.execute(|| println!("do new job1"));
: X1 w4 d" n" n$ m/ @ p.execute(|| println!("do new job2"));8 k- l/ r" F% G" o) p) o/ S
p.execute(|| println!("do new job3"));$ f8 B5 {* i( n1 Z6 |( e
p.execute(|| println!("do new job4"));
) f8 g( y. e5 l1 t9 I& O }
1 y' _- O+ w; v! ?! V}2 u! C* T6 z, e
</code></pre>( D( j* g7 O; F# u; u2 o' k9 a
6 c5 f- V" k6 m, ?
|
|