|
|
+ N6 J( y3 V. T<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
9 J! P2 ~4 X7 d<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
. G" M3 c; J5 e! N<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>8 V8 ~8 L' c s& p5 R+ T
<p>线程池Pool</p>
5 H: p% T" Y" Y5 `) I( g/ Z<pre><code>pub struct Pool {
6 {' j1 F5 l" L& |, U) W! I max_workers: usize, // 定义最大线程数) Z9 A; X: c: ~- Z
}% |' t# d" R9 }* I. g, e. w
* d& [, C9 t5 ]8 L- [3 h- O4 b
impl Pool {
; U9 g6 M! h; v" ] L( E fn new(max_workers: usize) -> Pool {}* s) r" b) t1 v
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}/ v7 z( ^2 v4 X, u* E/ D
}8 ^6 l4 H# t6 ? o( T1 f
$ B. ]# D) L. W! z) o2 n
</code></pre>
% T6 w/ I& L& N8 ?/ q1 h, _<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
* G5 b, W1 i$ t7 @8 D3 A( P2 T<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>5 C i% y) x: y
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
: [- \( j/ }# M, m<pre><code>struct Worker where/ D' O0 H% t2 C6 i4 r; L$ u/ M
{
# s. D/ E$ |. N- Q+ L _id: usize, // worker 编号
2 e" J l! e8 U2 q, F S}9 H+ W" O0 d7 q* X" g
</code></pre>
% _& b" b4 s f4 B<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
3 Z0 j) j/ {- \" F# s0 ]把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
2 x% R' }8 C( N; S<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
y8 A% G3 I* i9 E, D, c8 S<p>Pool的完整定义</p>
& I& T6 y( C" p8 B6 s<pre><code>pub struct Pool {6 x- D/ i. c4 P0 ]9 P5 i- [
workers: Vec<Worker>,8 h: @: V* A, z2 f( c% L! h6 |
max_workers: usize,; ~. E! [, [2 ?7 I% W3 b
sender: mpsc::Sender<Message>
, ?- |# d6 k* G, c}
* n3 M& m" q) W; G</code></pre>
; \+ E8 G6 K# }. T; q' X% Z<p>该是时候定义我们要发给Worker的消息Message了<br>8 Y& t" f# a: z2 Z* e, A% G
定义如下的枚举值</p>
$ k/ _& A0 n& Z0 {2 O# S, v; P+ d<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
0 d6 M& R3 d ~enum Message {
( }9 \, J% z9 D7 r* _; ^3 R* Y# u ByeBye,5 a/ A: V4 r2 b- u# \) u, r
NewJob(Job),
" g4 W, c/ ^2 i0 J9 T+ J" m) S0 e}
5 n0 R) a0 u4 ]& R</code></pre>, Q& F1 c" y: w) c6 Q
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>* P L; A4 |0 s+ L
<p>只剩下实现Worker和Pool的具体逻辑了。</p>4 p; g1 i' Q4 I6 x: g$ n* Q6 T
<p>Worker的实现</p>
- [5 I! e. H+ v6 [9 ^. X% F; J<pre><code>impl Worker
9 U. N4 ^/ }. } y{
0 | U9 {3 q; ?- o; [, n fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
! Y& ?7 _7 V( z& D% e! Z let t = thread::spawn( move || {2 A8 U3 g# V' M9 H
loop {2 u n! Q6 O! S3 {7 K
let receiver = receiver.lock().unwrap();, p- D' s% c# G% H' q4 Z9 G7 [" K. V
let message= receiver.recv().unwrap();; |' ?2 e9 y& B d ^; v& d; \! J
match message {% ^* D+ r% J# s% @5 K
Message::NewJob(job) => {
- z# i; t+ ^# g. p println!("do job from worker[{}]", id);
' y1 l2 [4 M& B2 j" { job();
; H) [- f5 v$ i3 e! ^2 j },
- p/ z3 z; r7 b5 W# V3 m Message::ByeBye => {6 g6 y+ ^* w1 |( J% b1 L
println!("ByeBye from worker[{}]", id);
$ Z2 e9 e3 q1 S7 D! M break
9 q$ C9 R$ J1 @1 l3 N: |# d3 Q },
, Q+ ~( Y5 p7 ]' Q }
* Q/ y! i) m3 L }# m5 v' q" V1 n$ c3 q- s) l8 n
});
7 s9 a* g( F6 Q4 }- s0 W6 ]; r' V, ~- e/ x, M! h9 |- g1 g
Worker {
6 {! A) A/ X1 _; q' z' c1 G _id: id,
+ X" Z/ P3 ~( d; Q/ B t: Some(t),
0 p+ `) f' X+ ]4 \# z" P+ ?& w }
: E. N) n, t( f' I1 M9 ?- [ }
9 s0 Y/ Y; Y+ y1 Z5 Q8 r6 Q, d}
& h% |, o. _/ n1 G/ o8 o( Q- f</code></pre>
( m# s- f4 f8 y2 @<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>: y6 h7 v6 ^2 T$ m
但如果写成</p>
4 z% O% `3 p; _1 A2 P<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {$ H. K" c* U, W: J5 T3 d0 c
};
5 ? `1 W2 \6 k+ M2 v6 m</code></pre>3 J) ^# Z6 c. e8 X8 z
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
% h6 I Q' W& prust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>, V. M% _. j) p5 ^
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
/ s6 D3 o( ~- L" f( ]<pre><code>impl Drop for Pool {+ o$ T. m, B1 g% {
fn drop(&mut self) {( ?3 t2 r1 n& s2 S- T
for _ in 0..self.max_workers {6 r3 ~ K# j4 T3 n4 @
self.sender.send(Message::ByeBye).unwrap();" ?2 n+ h' f- P% q* k( Y
}6 w D2 |0 Z3 `1 q
for w in self.workers.iter_mut() {
9 T/ l0 C6 G+ U; a/ e @- x if let Some(t) = w.t.take() {
( `5 K, g" B6 t3 Q5 L t.join().unwrap();
- L' V4 } q& Z) X' J$ A }
7 D4 D3 U* u; {+ Z! W e. Y }
' _! d0 { y7 m( B5 R }
4 t }. a- q. Q}
; V6 l5 X/ L; L( B8 c/ y1 Y
* {/ {* |- n- v5 ]</code></pre>0 C+ a' m& A' }# T u
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 d0 A- A0 e1 v2 f
<pre><code>for w in self.workers.iter_mut() {; Z, L3 s" m# U6 `$ g
if let Some(t) = w.t.take() {
! w, H8 B0 @: H8 _2 {. E self.sender.send(Message::ByeBye).unwrap();
$ ~" Q" l3 ?8 D/ h% I3 f8 w t.join().unwrap();; ^+ p; ~4 ~' u' D+ N7 j
}2 u. C* m) R. d' Y
}
9 p, v$ v; @( j6 P. V' T. v7 c
" j$ O4 R, F' u. y</code></pre>
* @4 \; {1 {' t9 N# g<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
9 w( ]- K9 O( p9 M& R" u我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>. Q) i9 E( Z' V3 L7 A' I: _
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>0 M3 c0 I1 Y1 ~' X" p
<ol>
# `4 k& e& Q9 p% K<li>t.join 需要持有t的所有权</li>5 Y2 F; q" j! C8 K x" D
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
1 H: g! T4 E) N+ r% _0 P. W</ol>
8 w* }1 Y2 s9 z1 G. |<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
& G; @9 k7 ^4 W, Z6 N换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>2 U. V9 X6 D3 q6 V8 |; ^
<pre><code>struct Worker where6 U" v" i+ @/ h2 A1 J, J5 ~$ [
{2 w K5 `: e: t \
_id: usize,
/ X: [% p, `+ d/ z B, a: Q* c1 Z t: Option<JoinHandle<()>>,4 r6 D2 O$ S3 J3 d: l
}; u _; j6 @6 e& b3 P
</code></pre>
5 |6 V7 [ D7 d: ?/ S) m% f+ w8 E<h1 id="要点总结">要点总结</h1>
! z7 c) L5 n. C# _- S* M6 `<ul>
5 N% B2 B3 P/ {) Q<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>. D6 m7 k' |! D1 p$ N
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>+ W& W/ L T1 ^# L8 h9 L
</ul>
( W: _( C* b; q1 O<h1 id="完整代码">完整代码</h1>
# ~" `$ d* Z. p2 `- ]1 ~; m<pre><code>use std::thread::{self, JoinHandle};0 c; ?& v+ Q& b7 e, x
use std::sync::{Arc, mpsc, Mutex};( h% m8 t1 n1 ?, C; l1 y
7 N& j8 t8 v! r+ B- D5 F& k P9 W
3 e `: [+ ^3 \
type Job = Box<dyn FnOnce() + 'static + Send>;
8 l: o# w# H* C* \1 N: o/ M! Renum Message {
! [/ O0 w9 S6 i6 J4 U n ByeBye,2 q0 e2 |0 e" i. d- d0 W
NewJob(Job),/ t+ j8 W9 q' }; m( V
}
: v. | G0 Y- B& }
- b0 a" [! H3 o9 o! O5 E2 i9 ]struct Worker where0 \5 I6 v7 a0 R0 `8 [, n. i
{
p: E4 Z3 Q/ J) s% h9 _ _id: usize,
7 L4 r5 A4 f' m+ l3 [6 @( G f t: Option<JoinHandle<()>>,$ h( b K! }' N/ @+ d3 V
}
& ]6 c& s' q1 u' p' X- `# [9 Z6 I/ r# N$ ]
impl Worker* Z" o0 o" q. p: y% i' s8 T
{/ T/ H* Y; \$ T% W" V
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
7 P9 Z2 G8 J9 [9 D q1 o$ w$ ] let t = thread::spawn( move || {
% X2 x5 _& k5 f Z+ L+ k loop {
9 L/ a- t9 }5 k v+ H let message = receiver.lock().unwrap().recv().unwrap();% m+ o4 U. n# |& R \- T
match message {4 d l. V& y' u2 d, ]0 x" A7 G
Message::NewJob(job) => {
/ @0 {' f7 X7 r println!("do job from worker[{}]", id);4 r, e [7 i3 M
job();% b D1 Y7 v1 {$ @5 q
},3 q- g, B7 J- L5 a
Message::ByeBye => {# p' ]- H1 s! h7 |1 p/ [( O0 m# u
println!("ByeBye from worker[{}]", id);! I; z/ \8 m; v6 ?; g2 G! I+ u; W
break
0 y5 M% r/ \0 T; c },
3 ]; L9 j( v- D! Y1 F8 j) O5 Q0 q } ! j4 L- G4 k; S8 a5 c
}0 V- e! ]3 _* l7 @ W9 g7 q/ w- v6 b5 i) _
});
5 k' h5 ?0 ]" n5 U X- e7 m7 G! z
; B* I2 D3 v, d- [3 {: C& o/ ^ Worker {3 l j Q2 e* j8 g
_id: id,8 D" u3 Q' w+ G# w& s+ R+ L
t: Some(t),# g: Z1 _: b6 ?& X; r; r
}
6 s* a2 |. M" s9 N }
* V2 E/ C7 Q) v3 i8 Q; b) H, c} r9 w4 ~6 m# ]# J3 c
; q1 U6 r0 L5 U/ d2 s
pub struct Pool {
5 r; N. ~1 f. T: `/ e2 e workers: Vec<Worker>,) K; M" D" v: ~$ N6 z
max_workers: usize,5 D# w9 i- A# w# b' |- o
sender: mpsc::Sender<Message>8 _" I! f/ O1 |2 {
}! X1 @$ @* h2 Z# `
! c {0 n6 e, k+ \+ yimpl Pool where { M: l2 v, s/ Q& z2 |
pub fn new(max_workers: usize) -> Pool {
) R) Z! |& N9 R: ^4 S if max_workers == 0 {
8 d2 G1 K* T8 m: `" O panic!("max_workers must be greater than zero!")+ \5 t5 K" [. ^# {; o
}& ^5 H! j" N$ G: Z0 h3 D' L: E
let (tx, rx) = mpsc::channel();& F+ t0 G. h. ]3 z6 v( [
5 l2 F4 y6 j* j5 t- n h# o
let mut workers = Vec::with_capacity(max_workers);
* r& d* V& j' l' W6 q, w, F let receiver = Arc::new(Mutex::new(rx));
9 X4 q* m; ~* u- K8 P for i in 0..max_workers {
2 {& o- w7 o, F* V! I& M, b workers.push(Worker::new(i, Arc::clone(&receiver)));% D# s; H0 ~. ]6 r4 T+ `
}( |5 @# Q& {: x- ]
# @$ j- R- Y/ l& ~ Pool { workers: workers, max_workers: max_workers, sender: tx }
' i1 Y/ n) ?8 ^- E y }* l# `3 F o$ w1 G) V2 J' [( X8 U
$ {% v8 ^& l9 Z
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send" Z! C( c: z# h1 d, S; V( V# N& I
{* ~8 G: D. j0 ^) _+ r' C6 b
0 ~ b$ e$ f. ~5 ]! I$ b" E5 M let job = Message::NewJob(Box::new(f));
) f8 R% B/ p R& V8 w: O( n self.sender.send(job).unwrap();
) X& W5 N2 b& G3 D }& b+ O7 `9 b! O8 v
}
5 b8 o/ E5 D# N! o, v" O6 H+ q+ Z8 l$ }8 q
impl Drop for Pool {
' x& Y- n+ g- s% U. s) {9 } fn drop(&mut self) {/ _& ^- ~6 X7 Z; M2 C% \
for _ in 0..self.max_workers {
- X1 R1 R8 S8 Q+ t- n& b" T self.sender.send(Message::ByeBye).unwrap();0 N8 N0 a" A; Y3 x& d/ U, C& S
}
. h8 r9 G: E, @ for w in self.workers {1 j, _! i, W8 R. d
if let Some(t) = w.t.take() {
- q7 g w2 l* E t.join().unwrap();
( W: N% [# R" I* ]* j }
* r# C$ y& F. F: \! ] }
* L# j+ {! X H* h; y# V! d }8 W! `/ u6 m$ ^- Z7 b* K
}( A! q4 ^+ U% B0 u% B [
/ z7 `( x3 w3 C* X3 t, @
, q( s5 i6 \3 o+ S$ }#[cfg(test)] ^. |, x& {, r& \2 f+ P) k# y
mod tests {
, c: |/ M4 X* p( Z& P7 S# D use super::*;
9 _0 N& z, X8 ^, F" B- e #[test]0 K7 z( I7 W6 j1 P1 G
fn it_works() {6 Q& T& X8 ~$ r' c% |
let p = Pool::new(4);9 O, l( C4 }- e( c8 }
p.execute(|| println!("do new job1"));
. f% B! g9 F2 E( ?1 ^ p.execute(|| println!("do new job2"));. k4 [9 q4 ^& R4 X8 t
p.execute(|| println!("do new job3"));
* x0 f6 k$ [ q9 y1 Z p.execute(|| println!("do new job4"));
7 m9 F6 H4 M) Z, U( k/ V1 Z) {! K$ l }, ~0 {/ M$ B( I8 }4 W% `
}
4 {. I# W, A( t1 ~% \</code></pre>
) g! o( q, V( K8 H* [5 y3 k; W8 B1 t; y+ q6 [& O
|
|