|
|
. z6 ]0 ] T+ u. Z$ e: w<h1 id="如何实现一个线程池">如何实现一个线程池</h1>! |: `6 i* D- [- n% N
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>, P! Y6 s; O. h) \0 I
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>0 `+ G N' n- }: m
<p>线程池Pool</p>. p9 m9 \4 P* N2 F6 P9 P7 \
<pre><code>pub struct Pool {" W3 \( Z( q1 V. Q) i0 `$ `2 |, Z
max_workers: usize, // 定义最大线程数 p: T0 k3 c2 F+ `4 F
}
8 V9 W" v$ x7 R I3 s8 P( F/ h ] y* C/ @# s' B
impl Pool {
" p4 r+ Y$ \+ C, r fn new(max_workers: usize) -> Pool {}
$ f" G! d+ G1 g- K1 e3 K0 d7 X fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}) P+ Q1 W/ b' r9 x4 a: O
}
|( z; H: C) P" \' }- _6 `% l2 ^' l( |1 L# W
</code></pre>7 X5 B! `) G$ A9 X A$ C4 Q
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>8 Z& o u+ o( R) V) f. G& j1 f& m" F
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# J# X& H; X& h' i- v. P2 I6 m可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
0 N' U1 w+ b/ G8 V; e0 O; J, y0 X<pre><code>struct Worker where
, j* f1 y. [0 j* w4 a7 s* X{3 m7 m/ g5 D! d: G7 d k& }
_id: usize, // worker 编号
5 G0 n" y& j* c: }}
$ h6 i3 P/ }' K, W$ o</code></pre>" ~; Z3 u$ V9 o) _
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
* C& q( w8 s S把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% i# A) |4 V$ A8 w7 Z4 e9 M3 p: J
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>8 [) L/ }4 h' }+ X% Z% X
<p>Pool的完整定义</p>
. M$ p% i: @ a( X7 T% c/ z<pre><code>pub struct Pool {! J0 ]7 z0 Z/ D( e
workers: Vec<Worker>,
1 A, p/ q/ |6 ~/ ^* }% M+ S max_workers: usize,: G6 \; F! y6 [' C8 ?
sender: mpsc::Sender<Message>7 n( E. j- N) j0 E
}! ]0 M8 {6 f. X0 s* r& q0 F& K
</code></pre>
$ V: b/ Q# Q r( {* ]7 t0 f) P<p>该是时候定义我们要发给Worker的消息Message了<br>
0 g7 Q! f# k$ f+ Q0 u定义如下的枚举值</p>& e8 N! I; u+ a8 S
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;# I4 Y& o: M" R
enum Message {* Z- Z% U/ b& O8 A/ T
ByeBye,/ [# ]! |8 w7 t6 z/ j
NewJob(Job),. a2 n$ S% q2 |2 V
}* n$ p- y8 W3 @+ ], A2 K9 l
</code></pre>5 M7 I/ f1 [5 J( ?6 R
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>3 B7 f# T: ~& H: \- ]0 @
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
# P/ X4 X8 R3 p+ o<p>Worker的实现</p>
* {' u% H# C9 l4 Y<pre><code>impl Worker/ Q) T) F# ~% @5 s" o }$ S2 Y
{1 n- s/ j' J4 V! A3 O
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {8 |: _1 J* V4 j
let t = thread::spawn( move || {
( i0 w7 O0 S) C loop {
5 \2 ~. A5 i* S G {, S let receiver = receiver.lock().unwrap();
# o4 I, k8 `3 L let message= receiver.recv().unwrap();
( a# L7 Q4 r- P/ R4 J; A match message { Y6 ?5 b; p' C" [, u$ ]7 C; o# n8 e; u
Message::NewJob(job) => {" }1 T" a6 o1 h( b5 L) U) [
println!("do job from worker[{}]", id);- o8 V9 j6 S) r
job();
& Q8 z7 _) x+ b; ]+ ~$ z },
; y2 K9 r+ d) F7 L; ` O Message::ByeBye => {$ o; k9 V0 @1 R6 T& u
println!("ByeBye from worker[{}]", id);
/ h1 c& s4 | p" i+ w3 v; e break" W: m$ w+ _9 z# p6 m/ z2 j: [+ U; s
},. }4 R6 R: }0 {. r5 j: M
} 9 L9 Q, D1 o- F' W/ s
}
% B* M1 \% q" x: [' B1 X( ^ });6 P- ^$ G+ @* D5 a$ D
$ E: Y$ V3 t& n3 y+ ?: J
Worker {/ b# d' M- t4 q) X7 e
_id: id,
5 ~8 G( s9 @( a/ C5 @" L t: Some(t),' h; B6 D$ B* g
}
% s/ t1 R/ A0 _% u }$ S0 G; g+ q: q& u
}0 z/ U B$ g3 _; k, _
</code></pre>
c+ h( B; | {# X6 f% E+ G9 s<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ H0 [3 v5 G Y8 V, M$ R8 k但如果写成</p>
: P. n% Y* \' W8 v$ e( _1 F W<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {, q+ A @+ v. b
};: y+ @# z& O& O' f1 j0 g
</code></pre>' j5 u9 k1 u6 h- n" Q
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" }2 \' s' F% K: r
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
# v e" F5 v# N4 ?/ y" X<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
6 W$ n( t. ]: a( C# l% g! K2 |<pre><code>impl Drop for Pool {; f, x4 V t' d; D; i3 M- K
fn drop(&mut self) {: _0 J" ~2 o, o t$ u
for _ in 0..self.max_workers {
) B' T' p! L, T0 V) e1 i$ f4 B self.sender.send(Message::ByeBye).unwrap();
7 D" m- y, H6 {7 ~ }1 K3 A* J9 R$ |
for w in self.workers.iter_mut() {" h( Y( R0 i; z4 F
if let Some(t) = w.t.take() {4 J- ~3 w" U3 G! V4 a5 F" ~
t.join().unwrap();
3 \# [$ [8 E b- _9 s' F/ M& r }# e. k5 u& J& I0 q/ l* y @
}, j9 [# |: e" ^9 J* j3 x; B1 m
}
* G% l( \( ?5 |2 E s}
; G1 L7 N7 h9 |& O* _) A" M6 t% n. r# t/ l; y# }* y4 @3 e. o
</code></pre>
7 R( Z1 B2 x$ R( u* m4 A# R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
+ K0 D: \. D B; O3 [& R<pre><code>for w in self.workers.iter_mut() {% V: S7 o$ \9 z! N
if let Some(t) = w.t.take() {9 i+ K& w/ q3 z- d
self.sender.send(Message::ByeBye).unwrap();
& [' D5 A' D- A6 m. w% b t.join().unwrap();) h+ z- Q/ T# b, ^; [+ Z4 ]. _
}
" U# c" G7 P3 ?0 O; z! A; `4 m}
+ x/ `8 S: K" l! n! V7 A/ G# |. T( b
</code></pre>
$ @2 E& K8 q# o6 z<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
% m: M+ @, |$ |! s0 P* M4 Y我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
8 ~2 ~' K1 J2 U5 f! J1 Y<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
s( f7 ~" { j# P5 d<ol>2 @9 i0 P" u; w5 ^; f' Z8 _) y
<li>t.join 需要持有t的所有权</li> C: f% E2 h' O. K; ^
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li># W3 ?0 D) I/ k) d) x7 J; _
</ol>/ [+ E8 j) i* h( F
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
0 a. `# d) u8 L1 h" q; g换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>4 V7 [$ T7 M( E9 N
<pre><code>struct Worker where* W! V* ~ H7 s' i
{* L. U# c' a+ Y
_id: usize,
6 j" [$ H7 E; @) k# P t: Option<JoinHandle<()>>,, U/ _3 B ~( p4 R: X7 G
}
5 M2 x, b% w) |- ~4 H</code></pre>
0 _7 [! w) J! y- ]<h1 id="要点总结">要点总结</h1> @4 d: z$ f$ C- T
<ul>
0 F' i G' H5 j$ F" H<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
# x( l8 W" G& G, J' o<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
0 e4 V$ p3 w: U/ j; h N: o! m</ul>/ n1 E0 z; z! G$ W& S, P S( e& p6 V8 g
<h1 id="完整代码">完整代码</h1>
! v; y6 Z2 z# }<pre><code>use std::thread::{self, JoinHandle};
/ k, r2 K, E( F6 puse std::sync::{Arc, mpsc, Mutex};
8 C0 Y* D- R$ H7 f& i! o( b% m- B
5 f. a2 T: H& D. U
" }" L' Z1 ~) X* M0 ?1 Ttype Job = Box<dyn FnOnce() + 'static + Send>;
8 F% Y: R8 d6 Z d5 o7 Ienum Message {# e7 x' x9 z' D
ByeBye,: {# @2 d/ n' F
NewJob(Job),5 y/ d/ D& ` N
}# ^6 P3 H2 \% J/ l) I% R
- H2 _) q* Z" ~1 _4 G
struct Worker where
, A W& L- T2 R. T2 k( z% m8 `8 O{" n' L+ o3 c1 Z. N" Y
_id: usize,
7 ]8 k D4 N) S t: Option<JoinHandle<()>>,) {, P- o/ W! v, f* U/ R
}
0 f/ g- r8 V9 M
7 D: t( z/ j% U/ Aimpl Worker
& _$ a" ` q* }/ o+ ], R{3 U# w% s$ s' s% N4 ]; J) x
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {3 c4 Q, S# \. [* z" A5 |
let t = thread::spawn( move || {) L. D5 U; b4 ^# F
loop {
/ x6 _- ^ Z$ l5 {/ | let message = receiver.lock().unwrap().recv().unwrap();9 t& g/ n1 u& i/ l; y
match message {( J1 D7 h: j: |
Message::NewJob(job) => {
6 s' ?0 i3 {* r2 y1 a7 O println!("do job from worker[{}]", id);
% n4 t O" `5 U( X; v job();3 [0 g9 V. o/ K T$ r1 r
},
& J3 i s$ n9 z4 w" A8 z( f2 U ~ Message::ByeBye => {
# C, `" Q0 t% J M; D println!("ByeBye from worker[{}]", id);) e5 U7 e& }0 E% p) j; q- A
break
7 o, O8 ^# M- m: B. K) E1 r/ c },
9 p: s7 {3 O7 c# d, u3 r7 T0 C h }
9 V" G4 p2 [# `% }' j4 F }5 T3 }' }2 h1 T! ]. m" U: k( ], S7 n e8 ^
});
+ k, [/ s0 Y, V E: L3 n% |
& }, H; i% c; H5 r$ i% r Worker {0 ^; _5 s8 |9 j
_id: id,
: Q! R1 r$ O2 y6 t* P7 t" b3 } t: Some(t),) ~; m# S3 {8 [7 S3 \
}
7 s3 X' u) E M, p+ E } R4 u3 }; U( i7 u9 P9 b4 O
}
; {0 d6 z% e2 R4 y; o: U; G& U' H: L) a! H1 {( Q0 j
pub struct Pool {
& P2 t. P, c/ R7 Y workers: Vec<Worker>,
0 k9 {+ ^/ w# z3 G9 W5 T max_workers: usize,
- q z6 c" i& l" ^ sender: mpsc::Sender<Message>
% @$ o7 i. G( S}
- g) b: I0 t/ Z& u- \
7 J4 T6 a$ ^9 ?/ \8 Nimpl Pool where {! a8 g; v `6 A
pub fn new(max_workers: usize) -> Pool {$ g. X) f# U- j. }7 C
if max_workers == 0 {2 b! @8 r" S. S0 `% o
panic!("max_workers must be greater than zero!")
, x1 z- e; \& b2 F }
' ] G6 e( ~0 a; ?1 `: X5 j let (tx, rx) = mpsc::channel();
2 F: g% _( L- v, f
) o% R! b9 x; W" b8 Z let mut workers = Vec::with_capacity(max_workers);
1 F% j/ L5 e- `/ I9 ~! T7 r4 k let receiver = Arc::new(Mutex::new(rx));
5 K, n5 v% j9 S! w# C! x for i in 0..max_workers {
" o2 ^4 w- c& s. w# J: j! ~' n, l workers.push(Worker::new(i, Arc::clone(&receiver)));
$ Q; H( r0 t9 s' b; O }
+ U- i9 U( T6 _' }# C& ~6 f3 o! E7 I6 n! e# O
Pool { workers: workers, max_workers: max_workers, sender: tx }
2 p" v4 f/ [4 y7 C! N8 P1 q }! ^& i+ {! d N# D
8 s6 n. }6 ^; s9 |2 U1 S
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send1 b0 _/ n& b8 z( n
{, t4 ]& _# ?" Q5 N; L
0 }, ?/ r8 T& m! y* D let job = Message::NewJob(Box::new(f));9 e$ z6 y+ G: i2 S5 I
self.sender.send(job).unwrap();: Q. Q$ Z+ B8 C8 b* M+ j
}
' U9 ^8 P, c+ t: X" c' c. F9 \}
) A& I: c- G( w( E" Z/ v( J$ P$ ^' O. X, S$ C- T1 X8 }. |7 b
impl Drop for Pool {# s2 Y1 z I; Y! {+ f! {; }
fn drop(&mut self) {2 i& c% u+ p2 `4 W3 F
for _ in 0..self.max_workers {: g% M* v$ y, `
self.sender.send(Message::ByeBye).unwrap();
5 C! l! d2 M6 u3 @- _ }9 w N8 C9 r) _) ~6 y3 [
for w in self.workers {
a+ w) K, A7 K% Z8 N- Y if let Some(t) = w.t.take() {
{) b. B# I/ h& F3 ?- P; {# m t.join().unwrap();
1 t5 m& X& H, K, |" s }
/ U6 w9 }/ n }" c7 ~8 H }
; D h5 F5 \6 v$ J }
, [. c# S1 |1 B! ?}
6 J$ q( x3 ?4 y% |! b/ P8 @' R8 y! e0 _
0 }7 l! ], y3 C( X: W
#[cfg(test)]5 _0 l" O" D* V6 ^* p' I$ ~
mod tests {
* d& ~+ V" @ g- R4 l+ a# C/ N use super::*;
* v2 s% X3 M2 ]$ A #[test]
) C$ U8 z0 {/ E& h. H9 e: } fn it_works() {& d0 Q, D P' s* G X4 ?2 S) ^/ b
let p = Pool::new(4);" c4 d1 B8 y) q5 O: y) U+ e7 Y
p.execute(|| println!("do new job1"));2 W4 L2 \3 R" A1 K# B+ U
p.execute(|| println!("do new job2"));
3 d% l( X/ a C2 e, u p.execute(|| println!("do new job3"));- g6 e' y( ]+ X$ Z0 t
p.execute(|| println!("do new job4"));
3 U: x# M; }" H }
* J) A4 t) P2 R0 ?! Z" ?; M9 S}( q+ o( ]- K( K- n7 q9 o
</code></pre># p0 i H) \/ {. O
3 J5 w; H1 T& _* Y1 \3 u5 A0 l |
|