|
|
# @# S8 l/ {* x9 N0 v4 ?
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
6 E- A7 Y/ L! G<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
; ?' s2 p+ v2 y<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
Z, c# a& Q4 \, W7 J6 `<p>线程池Pool</p>1 _) J1 [( C6 C% t
<pre><code>pub struct Pool {7 l8 q- x$ m( E; P8 [: H& C
max_workers: usize, // 定义最大线程数
& v8 A9 n' }( ?/ Q) s}- X9 r' ~- l) M- @# n+ v# G0 @
% o: x: u+ S0 p% ~' o: U" H
impl Pool {
8 [3 B0 h9 P& K6 x9 [ fn new(max_workers: usize) -> Pool {}. s( t& V) Q# U: Y* o
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}6 w+ \( y+ D' t/ d/ x" z
}
4 `; V/ g8 n3 a: Q5 ^6 ^- ^
4 c" M+ f; }3 P$ K9 F</code></pre>
, {6 N) v$ o- x5 K9 j<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
9 J/ s5 L* P/ v# w9 p; J$ F# V<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
) S( A. \4 h/ b可以看作在一个线程里不断执行获取任务并执行的Worker。</p>* U C! x1 {; ?$ B) {
<pre><code>struct Worker where
" D5 c+ ]) S p$ c{8 b, b; C& |& r# d0 _! X
_id: usize, // worker 编号- s! e& d$ v# ?' f
}
! K4 E C }" Q</code></pre>
3 u, H( u/ H+ n, w$ p E<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>/ i' e7 Q( Q$ Z# X0 v
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
3 Q l. K$ ]1 Q6 Q<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
/ P* ?- \; H+ J4 p8 w<p>Pool的完整定义</p>
; s* E' v1 x$ X6 W; p<pre><code>pub struct Pool {
8 y. B; h/ f+ u8 o. P; O workers: Vec<Worker>,2 l, h: G- D# |) }6 F1 [
max_workers: usize,
2 n- O9 M3 r* V2 ]+ {& G: k4 p+ e sender: mpsc::Sender<Message>
( }0 t+ H1 k: Q: ]) n}2 Z; Y1 Z* v9 [/ `: \
</code></pre>
* B+ k. s& ^* e; J) n x- q* e9 v; V<p>该是时候定义我们要发给Worker的消息Message了<br>
* o r* _9 Z+ [- a# c% \# L, J: r定义如下的枚举值</p>
# O/ |2 Z* U w/ y3 ~' V; i<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;% j$ z; T% M( o) y2 ^" R
enum Message {
8 D, P( {) F, m5 o6 j$ `1 [$ x1 g ByeBye,
' x$ v: b3 P" `; W NewJob(Job),
5 D9 @7 X: o( |* b}( C4 P, Q4 N7 D" q
</code></pre>6 e! u' {( v6 N! M- E* I
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
0 ]# u t! S! k<p>只剩下实现Worker和Pool的具体逻辑了。</p>
. }: n( G5 p) t9 H& H" ^<p>Worker的实现</p>& H, o; t- ]. Z8 `9 `' z
<pre><code>impl Worker
$ F0 l4 B8 K6 R Y! c3 ~7 N{( q9 u* R1 V3 w6 t: j+ S$ c; m
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {: ]% w. t8 f/ A) M9 E
let t = thread::spawn( move || {! Q8 [2 K3 X O# Q+ ]
loop {; o( e% i b. v. z ] @3 P' |
let receiver = receiver.lock().unwrap();9 b* g$ @) j# p9 i2 |3 X
let message= receiver.recv().unwrap();
1 O% |' Q: V- [+ i$ O match message {
7 i7 ^1 m) w3 {! `! t0 S Message::NewJob(job) => {. F; |5 P* i! Z5 `: L8 |
println!("do job from worker[{}]", id);
/ ~( I' s% o$ N! c; a job();; E3 \0 l. y% v$ t- ]2 c4 v
},
( o9 b& a% N' W1 @. ] g Message::ByeBye => {
9 t: _6 z8 q+ a" b) O6 t println!("ByeBye from worker[{}]", id);
. }' W. T& u' g break! p* \+ |) u/ A! d/ k6 r9 q5 F6 y
},- D$ s: z1 p: g& L t
} ! ~' I! [. _7 Y( I, P
}
1 s% f& N) G6 e+ \ });1 a0 C! Y' R9 Z, Z& \
4 j' t2 E; S1 N Worker {; Q" X3 C3 P. r- Y
_id: id,
% P5 u* f8 b- ]$ U# z t: Some(t),8 t+ V+ `' Y$ v
}4 y4 O- {5 u6 t2 ~
}
, a* p0 E, y$ f# V& j( N}) f* @+ p# f6 s# U; _
</code></pre>
) A- [, v @, i* q. t- K% x, I<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
' |1 i) @% V5 E1 T5 A但如果写成</p>
9 |1 P: o+ `3 b: @! G; N<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
& P B/ O- r/ s# o" a. w};8 c; u3 j# S& X0 g7 p% Y( y8 n
</code></pre>
: O6 Q& |# b' H! n<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
) @( {. c* k# T0 `rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
# j7 K7 N. m4 Y& E8 [# L2 C<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>$ u7 C4 n5 {; m/ ?- I. Z* F
<pre><code>impl Drop for Pool {! v1 n( G' ~0 j
fn drop(&mut self) {8 I: n8 M! i2 |; ^- t
for _ in 0..self.max_workers {
+ W" u4 |% p( u ] self.sender.send(Message::ByeBye).unwrap();
6 d8 L9 V% E1 @9 [8 f }) Y: u" V) O4 D, [! x: j5 ^- |
for w in self.workers.iter_mut() {
- s" |: C6 Y) {! M" u4 F$ p! P if let Some(t) = w.t.take() {
+ o( _% z, X8 c" _ t.join().unwrap();# n0 T% K5 Y+ |
}
0 `: Q4 A- D+ k8 ]: o }
2 O8 d5 [2 c" p/ } }9 G( s& q1 j% |$ [
}" Z0 w, q% d$ t; h8 o6 V+ C1 ]% @
1 }3 T1 I l6 v
</code></pre>. N9 J5 j% }$ {4 q
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>% n3 H" t$ `/ ]/ ?
<pre><code>for w in self.workers.iter_mut() {3 U$ t$ x4 y4 \( U( h( _
if let Some(t) = w.t.take() {
2 N+ u( B% ?, _+ o0 x/ T4 v1 P self.sender.send(Message::ByeBye).unwrap();
5 f3 L5 L) @1 W2 z* k8 r* t8 J t.join().unwrap();
- y! f+ e( d# q) r }" z. ?, s/ @+ B3 V; f
}
' Q2 r) ^) W- p! G( k5 n3 p* J% L/ J; ~; p8 X$ v6 i
</code></pre>: l& c* U& E% X4 ^7 g" h- A5 m+ p9 e
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
3 t5 F1 X, c/ ?/ m8 G我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
% u# [$ F9 y$ e/ A3 w/ V<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
# X+ g+ l! w) L0 k* P<ol>* w/ ]: N1 K, s V& }9 @/ n
<li>t.join 需要持有t的所有权</li>- o5 z# Z$ x! c4 W4 r, @ U s2 P9 W1 _
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
, u: w Y; j7 P, X. b' C3 n</ol>9 P9 `$ s! Y$ Q: h
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>5 d5 ~( j5 F+ |7 d( A3 p) m6 N5 `
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
7 P7 ^( y6 _9 I; `( `8 \<pre><code>struct Worker where/ w& f9 t! L" |% d. j f
{
9 t9 a$ M& l2 \; h) O% w" ?9 w, { _id: usize,% J/ G6 y# A- z; o. ~1 Z# x. Z
t: Option<JoinHandle<()>>,: K6 I5 x! s1 @* z1 ?+ j- K. u& l9 V% j
}
6 a( I7 r" k, }$ z</code></pre>
' E$ w* d7 B) T7 h5 ~<h1 id="要点总结">要点总结</h1>
0 `( a2 T! P7 d* D<ul>
- P% W" E7 Q! b, ?5 P6 R7 F4 K<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>+ i9 ~% V- R2 u& ^# {5 c% d
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>: c( ]7 E2 I6 l2 s, H1 M- D
</ul>
7 Z3 B( d) w5 B1 y<h1 id="完整代码">完整代码</h1>+ X, Z; N# F5 ^% L, d
<pre><code>use std::thread::{self, JoinHandle};
0 u) Q+ C" T+ o9 z0 n$ x* u, duse std::sync::{Arc, mpsc, Mutex};
+ g, N0 p) t8 G% g; [, f
6 c. N% ?4 d& B5 d* b) h8 k
- W0 }" y- G+ s8 y3 d" dtype Job = Box<dyn FnOnce() + 'static + Send>;
3 M( H g9 N2 j0 j6 @2 X2 O; r+ x: Nenum Message {
5 a, e. m W. W' g ByeBye,& |: g' T Y3 Y
NewJob(Job),) f+ ]6 M2 l' }0 t! r1 A. t/ `
}* D( g' `; w8 g# s. t; d9 t, h1 t/ `
1 e+ C. W3 a; m. R. I6 A; pstruct Worker where( a4 W, Y. q; w' ]* l
{
' X) a# a* Y* [+ g+ G3 T _id: usize,6 j% q7 ]1 S1 P) q
t: Option<JoinHandle<()>>,
, ^# H4 M; ^3 f0 r$ e5 N}
3 q+ s* E3 ~# ?' d# S9 D$ `! N6 ^, |1 q! s
impl Worker7 b2 N! E& H& D% }1 Y3 R- @5 X. @
{
! e- E2 q5 X/ P4 p9 H9 g3 F2 m2 N fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
& R4 J3 @8 c& E5 z% q$ K' {: k let t = thread::spawn( move || {
6 k) ]+ J+ ~9 Y loop {
3 e3 N) Q8 D- [) W let message = receiver.lock().unwrap().recv().unwrap();: r2 O7 r! f0 J. m5 g
match message {7 c7 }0 b2 r1 l+ T# N
Message::NewJob(job) => {" L) c1 J$ X# V
println!("do job from worker[{}]", id);
9 Y6 A2 r6 T% A job(); b& {$ b6 u" K; s6 @$ ?8 M1 j2 Z
},
9 G, N3 o- e( P2 r+ j Message::ByeBye => {) T' z8 q* u! S: n! E
println!("ByeBye from worker[{}]", id);' ^ Q" M3 P W8 e3 N
break
" ]1 z D6 K# @$ {. W },( a/ [ Y8 G/ ]. k2 x
} p: B m4 c H, e, Y2 a9 t/ }
}$ K/ s3 p/ N5 G. p
});
% @5 x4 u' f; K3 q/ z
2 \ r3 e! j0 | Worker {
( a w P/ U; q$ O( a# E) M _id: id,8 [1 z8 w6 J$ J3 h0 v/ W/ H
t: Some(t),8 b3 {% ~$ Q* j2 X8 v2 L! ^
}
+ c: Z8 `$ r9 H, T8 i) \( L }
: {6 d b" N2 V( k. Y}5 B% x6 x4 q+ {" C2 c+ l' ?) }
- T! i1 r" |9 j1 Y" s2 z- k
pub struct Pool {
! F G8 c: ?4 K4 ^, k workers: Vec<Worker>,6 D: K: i! c# R8 g
max_workers: usize,
' A+ e& h4 A; [& N' U1 F sender: mpsc::Sender<Message> P! \# g* K9 r# z+ `" A- a
}
! ^- u! {& w5 J0 B* j, @% x3 b$ ]$ Y; j L2 A( Y
impl Pool where {! m9 X/ l6 `4 w2 D# L
pub fn new(max_workers: usize) -> Pool {
! P- x* b4 Y* j! n- u' f" g$ q if max_workers == 0 {: X4 C2 m9 ~/ T
panic!("max_workers must be greater than zero!")
" C) U& U% r( y- E. e' z0 Z }
4 Z8 D9 X( S$ y- [' N( P- F5 ] let (tx, rx) = mpsc::channel();
2 n& v u1 b- g, G( e6 n$ R9 a; [/ n4 g7 t3 k
let mut workers = Vec::with_capacity(max_workers);
8 n { l5 Z9 p/ J let receiver = Arc::new(Mutex::new(rx));
4 s! Y9 I4 c: f! _. i for i in 0..max_workers {, k$ H8 Q% b" l7 @- X6 }6 p
workers.push(Worker::new(i, Arc::clone(&receiver)));; a- Q; r4 v9 m! c- m5 `5 B" W0 D
}
7 q# P2 F9 f9 N3 b! l4 e1 f
1 w) e# c% A2 D0 s1 i& z* i Pool { workers: workers, max_workers: max_workers, sender: tx }
* S7 L4 k7 y* a6 x A0 y6 c9 d }1 o& d& E' f' d. T
1 q3 N6 K7 q" h8 S$ Z
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send o+ d: v5 ?) q1 ~
{
. k( R8 N' j& H! K$ P
3 U! W/ @: ?, h: z3 d) }- s let job = Message::NewJob(Box::new(f));
3 L8 A# ]1 P4 ]& L5 H, f self.sender.send(job).unwrap();2 }% }+ S+ R g5 y& I
}8 b/ ^- }8 t8 o' k/ ]. w) ^/ m9 u
}
* L7 h* ]6 _ E
: n1 m7 b7 j1 o7 a8 b+ G1 Aimpl Drop for Pool {
+ H0 G$ E! K! m' u1 r fn drop(&mut self) {- T3 J- \ f( }2 w% e, X9 R
for _ in 0..self.max_workers {
7 Q {1 M; c- p. f6 g! x) r self.sender.send(Message::ByeBye).unwrap();
# }0 @. M N! Y7 u& w) A3 B }
3 r7 }7 |/ l/ d* x- g for w in self.workers {
* K8 O9 l5 o7 @; g% @5 w if let Some(t) = w.t.take() {# t; C3 b& a! ]
t.join().unwrap();
1 x3 Y9 U$ x# b) d0 \( p/ P! R; j }
" N# n+ H! ~( g9 L6 ^! c }7 r8 @3 j, V" N& ^% l- i e6 y
}
1 V$ \" ~: I( M: l) \( ]}5 h. n- i% J0 h/ k* d! A; h
( k4 I. f$ Y4 j" B
+ z# c1 B) ^8 H2 a" G& D/ u' m4 G#[cfg(test)]: U: X# H/ d z6 q" J( d$ c
mod tests {( P+ s& G5 S6 [6 @& c. p
use super::*;
: u. b( ]2 \. t3 j2 e7 |6 d, B1 ?+ \, o #[test]( \2 K( Y* `6 G* p% K$ q
fn it_works() {
`' f1 r; V, z& d$ b let p = Pool::new(4);
2 \% `; @; f+ {, d p.execute(|| println!("do new job1"));% e+ i9 c m& k8 ^4 D! v
p.execute(|| println!("do new job2"));4 {$ g" n8 N O' u1 u2 [# O
p.execute(|| println!("do new job3")); f6 h5 Y/ [: q+ u; l! _) d9 ]$ ^
p.execute(|| println!("do new job4"));
1 r) |0 X# s/ y! z2 A0 L! A0 ] }
) Z+ s. E7 L) w- U v- P} R/ q n! X* d: j! {- G+ B8 Z
</code></pre>
3 q& f4 L$ T- b6 x4 b& h/ e% D/ @; [! Y& B3 E/ V
|
|