|
% l' |! ?% \+ f4 f
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
' D3 p+ X Q( N9 d8 {) N8 y* f' b<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
$ X" a1 { \( R$ j) y5 e, J+ E; I<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>+ W1 c: |. X8 y
<p>线程池Pool</p>% a i, I' X- \/ A6 }4 u
<pre><code>pub struct Pool {
- w& O2 R! d7 q3 `" z: q5 m+ {- o max_workers: usize, // 定义最大线程数" D; A+ C" w3 D3 \$ j
}
) [' _5 a* F% J5 \+ w2 I5 s( L, s* t8 U) f0 \! R
impl Pool {
) e6 ^" z# g" M3 s fn new(max_workers: usize) -> Pool {}
( V/ R- \6 G1 X9 ]7 `+ N( c fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}, Y: z& @6 }1 R4 [4 _$ F
}
+ Q) G8 X+ `# w# y: K# b# n# W0 b3 z8 h% }
</code></pre>
4 j, p" [; T1 o0 J; a+ q# w<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
6 ?8 @4 e/ g- j/ r2 M<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>% i6 l6 V' [( C. V$ d }2 K& E f
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
; ]/ |5 L8 m/ t<pre><code>struct Worker where
8 C# c7 T! r( C{
# [" m! \1 i: o0 Z; F. ^ _id: usize, // worker 编号- z$ _1 p7 I1 i! T
}- |7 [1 }; t7 v
</code></pre>
3 g1 y1 E* T% Q3 {* ]' ]9 b2 Y: h<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>% k. S2 h) h1 r' A) a W
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>4 Y1 m7 q- H9 p. e2 [' e
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>- b4 v0 r: v4 K+ j# z* m3 Q6 s
<p>Pool的完整定义</p>: b9 v8 g6 ~. \! M4 \
<pre><code>pub struct Pool {
, ~- S( I8 X' X* e+ ~, w; e! q5 y6 B workers: Vec<Worker>,
& U) k2 b4 T# J& N8 \1 C; z2 P6 i max_workers: usize,
# v1 X. a( q" o' q sender: mpsc::Sender<Message>5 T" T* M. }% J6 [8 V+ R8 }9 Y/ C
}
4 L) R" L6 H( a; c7 I</code></pre>; z8 q' e1 A- j- E1 X# |% C+ V
<p>该是时候定义我们要发给Worker的消息Message了<br>' P% m1 U2 G3 M" b) w: [) x4 x' _4 z
定义如下的枚举值</p>
) v2 X9 w. L& Q2 b6 p<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;! g% Y2 y1 P) J' I
enum Message { {; W1 s. |0 E- w
ByeBye,$ t+ O: ~8 E8 e/ ~
NewJob(Job),- d9 J& T" |) O" k& N
}
# c. p- `8 [# ~9 F4 f) ~( P</code></pre>
! W& }0 G% {* t<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
- M# Q U6 t, @3 d9 y( Y2 E m<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! |( m) N0 r$ w* \3 N7 @) L<p>Worker的实现</p>+ y% e% p& v5 x# r
<pre><code>impl Worker
$ r1 q" ?$ v1 ]8 U/ A0 A{0 M$ {" C3 F% d' \- g
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {4 z( b, Y' d% ^- B6 j* v
let t = thread::spawn( move || {, \. s: u/ l$ [ j+ [
loop {
; C2 b! r9 O% a5 J/ R- ? let receiver = receiver.lock().unwrap();: D( E3 U6 f& z7 L* C& i3 I1 J
let message= receiver.recv().unwrap();0 f) W9 S% L+ Z9 h
match message {$ Y* P. d% f$ B/ w1 W
Message::NewJob(job) => {
7 c6 f3 ]) h3 N* b$ }+ T println!("do job from worker[{}]", id);
5 S6 b0 \% N( T$ T9 K; y job();
- v; k5 O a! z" Q },4 E9 s6 a6 Z* Y9 [, j' a
Message::ByeBye => {
. x: X& |! w+ h F: S0 K println!("ByeBye from worker[{}]", id);
" x3 z( i; j( U; {) P$ V# B+ D break2 [/ t8 p$ P, L4 I/ P& s5 p
},
8 |2 w: t8 s: [ }
9 q. d7 ^; c( P; X' r }0 a$ ~, w# s0 L( h$ g- Y4 ^
});* E# I; g% ]& D/ i0 l
1 b3 v& b: [& A# ?6 `
Worker {
1 i# `" Z/ c6 z a6 x' | _id: id,
0 `/ o/ j2 Y/ t+ J6 S+ I. @2 z t: Some(t),
% z: H, L2 D* @# a }
. y! @# `* F L# l }. d, X& l2 z/ H' H( ~
}; z0 D1 u' l- V* g
</code></pre>
' @0 }* f$ F4 m% o+ B W/ ], M2 x<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>8 g/ M8 t/ j5 n( J# A4 r1 O6 t
但如果写成</p>
. f" E; o" g. h6 G<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {2 I( V8 k: `* P/ k
};: x# W2 R& J' R; V0 i) X; b
</code></pre>
B/ ^2 R6 \9 |, O; \- w: d$ Y( B<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>3 I& E) h" x6 x* w" L3 k5 T3 i g
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>7 c. E& a' g' I0 c* d3 w0 d6 {, h
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>' T9 ^9 S, O2 U
<pre><code>impl Drop for Pool {* W% E, s; a: e+ y% o" J& r5 R
fn drop(&mut self) {
: _, Q/ m% s! x. f for _ in 0..self.max_workers {
j A% B: ?1 Y1 b9 U self.sender.send(Message::ByeBye).unwrap();
+ b) q* J/ T- |6 f" N+ ] }
3 x `% H8 g$ L+ @9 A" D for w in self.workers.iter_mut() {9 I Z- e$ ^* c$ N
if let Some(t) = w.t.take() {* [/ p' \8 i4 [ A. D6 m/ W, u
t.join().unwrap();# C. o! f5 A. x3 r& c
}2 {7 s! {0 z5 @& @# [ {" q1 o
}
/ s+ b& X; s4 [' P }
% [+ W1 I6 d5 j; N6 }}
# w" [3 {% B& V) h* ]6 I: V& r. a, ?: f; d ?
</code></pre>2 z- j; o! \+ K- ]9 e; y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>1 P5 l1 z. O' `- O
<pre><code>for w in self.workers.iter_mut() {+ u3 Z. [; C. n. R0 z
if let Some(t) = w.t.take() {
9 E8 a* X# X5 a0 \9 ]* { self.sender.send(Message::ByeBye).unwrap();( e- U5 U7 I$ w+ E3 G D& w# M& q
t.join().unwrap();! k9 ^0 O7 T! A3 Z6 a
}5 o, }/ }& h' W% B
}2 p2 ` ~1 I% }
. B3 ]! j4 N% ~( g" s* Z% W</code></pre>
4 T* Y, q! Q8 k% |, X7 Z! `$ r<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
% L4 @* ~" _" |1 ]1 X4 k我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>! W+ p6 b) f1 s9 s& e0 p
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
# N+ P' b7 n! T1 ]4 O; G+ o3 l$ [<ol>" z# y4 \$ F5 P% W' F4 j. F
<li>t.join 需要持有t的所有权</li>
2 I; g' V8 \ |" S* F<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& [" ~, y8 j) M</ol># q# ^7 U: ?9 q! v" J/ f
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br># o+ j P4 D. `5 R7 K0 B
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ j# g6 C, F( X
<pre><code>struct Worker where) J. L6 m i$ e3 t5 @
{) b" B5 j& m4 x4 J" ?
_id: usize,5 A4 p5 {- i. A8 f- h: F* g
t: Option<JoinHandle<()>>,. \- T5 T2 Y6 }% ?9 i( Q2 c8 N, [
}
' O& W! H" F5 ^7 g2 N) F+ I</code></pre>
% U* ]7 Q1 `+ v9 p# j<h1 id="要点总结">要点总结</h1>3 F {4 y1 O. I8 |5 w, \! ^: q
<ul>* M$ c9 I6 q. {- Z$ K1 D9 [
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>1 S% \) L/ t) ^& v5 M8 M o
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>8 D9 Z* l6 t* a; x3 f
</ul>
5 F- ~4 n' r: j% m* ?% q8 d( M<h1 id="完整代码">完整代码</h1>) r+ H# V/ M7 C' B+ C- R1 q
<pre><code>use std::thread::{self, JoinHandle};
! w; H. P# V5 v; X8 e yuse std::sync::{Arc, mpsc, Mutex};- k! l2 E8 c3 Q4 A' e' \
1 A! M# ]0 i4 V5 F7 F% A
. V6 s, e/ J f+ x2 p* ~% ^type Job = Box<dyn FnOnce() + 'static + Send>;1 b% E I& Z* K' J- R4 c) m
enum Message {
4 g; T, i7 t. o( k* ?2 y2 l ByeBye,$ ^5 j5 x% p6 }# J
NewJob(Job),
) G3 }2 P4 t, |' |, [ C1 M0 H" Y}
: S/ N' f/ E! \/ N! o: d8 a; \& v. R) J+ C+ j* U- Q8 u g
struct Worker where
% E3 |: B4 v) P' \* S9 j& @! p{7 Q1 M$ z$ K9 B% v
_id: usize,1 _ K" h! D/ r- m5 O
t: Option<JoinHandle<()>>,
) a. I' \9 c% N! T# O3 ^: D( W}3 ^. k) q* n% q* o8 M
# ?1 F5 F1 G3 o- a/ }impl Worker5 J% y9 |: O5 s! y: R8 ]0 T
{
3 H& F* Z( D4 p' W% N; A1 O fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
4 F4 i+ _4 h& u. W# J" F let t = thread::spawn( move || {
# U4 O6 o' C- |- g* o- r loop {( E$ q7 T+ R+ f: u
let message = receiver.lock().unwrap().recv().unwrap();
3 f0 k9 O7 Z' |# k' I/ P match message {9 G1 L. i9 K/ W' a2 O
Message::NewJob(job) => {
2 J4 z, g. q" |) c; h& C ?* ^$ S5 x6 ? println!("do job from worker[{}]", id);, v$ b7 D% R Y( J8 R# N0 S
job();
1 i) ?* y9 k9 b/ s3 `9 j" \7 s- u },
8 t- p' R# D. _) @3 c Message::ByeBye => {
/ G0 i6 c4 _: V+ ~ println!("ByeBye from worker[{}]", id);
6 t! j3 N% [5 \$ H" G w/ a break& o4 m5 C# Z! P! a6 i
},
9 W3 v: P$ ]$ L: t } $ V/ G; f- v2 m$ O
}
( \3 w1 [" G1 s- ^0 g });
; K, A$ G. J9 S! R$ w
/ x2 F1 v( i* u( o6 m+ U9 e Worker {) G; w+ [4 ~3 j
_id: id,7 Z$ w7 k8 p& [$ |
t: Some(t),
# m0 m9 U2 } q: X) n9 t9 ^0 W; P }
0 y ], c, Y9 j }3 H" j" F$ O2 y0 i
}
# ]9 [0 v. v# d v8 \ y7 s# i9 ]0 v0 E4 Y1 E' ?2 [
pub struct Pool {+ d' T9 [ R/ v9 N2 x' }0 a
workers: Vec<Worker>,% ~. l9 ^8 X; V+ U6 {
max_workers: usize,) ]2 t' A7 @) h4 \
sender: mpsc::Sender<Message>
q( m7 k6 z0 u7 `2 m; e}
/ }6 v% H! p% z7 F& B& _& W3 O& j7 {2 F- a, T
impl Pool where {6 P6 p8 q- y0 b: X
pub fn new(max_workers: usize) -> Pool {3 e" ?5 e& p. G1 K) Z7 B; \0 a
if max_workers == 0 {
7 I5 W/ P3 ^* b1 r3 p panic!("max_workers must be greater than zero!")
5 _6 x) N# }. o0 Q0 T }
) ^9 P) [3 F( J% o2 l/ X8 | let (tx, rx) = mpsc::channel();9 ]* A1 J; s9 g$ X
$ ?$ |) o! h! \' q' r let mut workers = Vec::with_capacity(max_workers);
6 s6 m+ x& q, T& H( W- ] let receiver = Arc::new(Mutex::new(rx));! c X0 i. k" N* X2 ~
for i in 0..max_workers {% h, t+ B0 }3 J. q6 z
workers.push(Worker::new(i, Arc::clone(&receiver)));
$ z6 i6 F, a4 l% y }/ G% O! ~& ?( m5 n0 O" J
) v* T% Y" R* @ G+ q6 S Pool { workers: workers, max_workers: max_workers, sender: tx }: T$ e4 V u A7 x3 n
}! W8 O) e9 ]1 o3 S, P7 q
) |. i; r& P! E {4 @7 I pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
5 ], T. x$ _, V6 Q+ a {
* w) d* j& P5 a7 y6 I
& {1 X: [, [; U0 u/ g$ u. d let job = Message::NewJob(Box::new(f));9 |. R; d) I$ D4 v3 c
self.sender.send(job).unwrap();
( e1 y# W! L- }. {6 o }7 Q0 A6 m. ]+ ~, h, q
}7 F& A3 k8 q& P5 O! Q; h5 O6 [- P
: s) d0 G5 s( |7 _3 m( r+ fimpl Drop for Pool {, C! m% m7 s2 q
fn drop(&mut self) {* c% z2 J" l0 ~5 v2 d0 d: m
for _ in 0..self.max_workers {
6 N Y" M% O) R: ]" [/ f self.sender.send(Message::ByeBye).unwrap();
; T/ ]' ?& ^6 N0 D! L3 q }1 j! U' V% y; n, e/ [
for w in self.workers {
0 N4 _1 u. G4 v' M9 j if let Some(t) = w.t.take() {8 \$ p0 }7 {) W
t.join().unwrap();! g: N( N4 k: J- M
}; y. L2 j' N8 e9 L9 N6 }1 }
}9 s2 h" h( f9 @$ v# c* X- F
}
2 h* o" O+ {* h}( x3 S. d c6 q$ R
! V5 N K* H9 S1 g4 {- V
9 z1 O+ B. p/ c+ `$ G8 S/ l: G
#[cfg(test)]2 N0 `" F# X6 d: A# a& l
mod tests {5 R# f& @. Z% j& V
use super::*;
& `" ]: E, b- I' d- N& W! ^7 _ #[test]% X9 r) _5 q. g/ Z) B& {
fn it_works() { m4 F7 {( H% D; T" G
let p = Pool::new(4);
: D! ~8 K8 \2 I1 j. W- k0 s6 E& b p.execute(|| println!("do new job1"));/ ?; M$ @2 G; n; B: }+ p0 `+ G
p.execute(|| println!("do new job2"));4 ~) j. M0 B4 G! s( p4 l
p.execute(|| println!("do new job3"));
Q6 M: j3 Z0 m! X% D3 D, A: Y p.execute(|| println!("do new job4"));3 y- j- T/ R" { H: _8 x: t9 l
}! G7 y2 N' E) E( P& w# h
}
1 p5 r. M6 j5 ], r! W0 y</code></pre># H) W! \! O) s/ a+ E2 b
7 t( r; b+ m: r! G |
|