|
|
0 i h W& a8 F6 V$ c5 D
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>: p5 c( g8 t$ K3 K: v( X
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
! n5 z/ {! L. [% q1 b1 g<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>% X4 [# z: r+ N( T* j7 F
<p>线程池Pool</p>
3 i4 H: `+ I: ?<pre><code>pub struct Pool {) v) @! a0 w) ~: e$ i
max_workers: usize, // 定义最大线程数
2 l3 z# e W. v/ E5 K}2 H* J5 j6 Q, [* t0 H
+ @( D' H9 Z9 B" E& }& O0 kimpl Pool {
2 V+ R8 p" V; m fn new(max_workers: usize) -> Pool {}
& j: Z) q7 o: p0 ~ fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}/ I& R/ q1 u1 ^. V2 [" ~- C% I
}
# Z# x( {: U: g+ l, c; j0 @3 {7 R9 `) P6 Y( W' f n
</code></pre>
8 }8 {/ ~- I: S: x<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>' G/ d6 a6 t; g. [! G
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>" H) {) ]+ b2 Z; o4 t
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% i2 H" R& i( ~3 {<pre><code>struct Worker where% G2 m& h1 G& f" B8 L6 E
{
( G! F9 |$ L) }; w5 m( M' Z- i- C* i _id: usize, // worker 编号
1 g' o* z4 f' S, y& e% e: L}
# U4 U6 q2 Z a( K) n' J# ^</code></pre>' v- `/ D$ B3 C: n3 h9 F
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
- d E3 {$ v! r" ] s把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>' j% G% k* K: {$ p$ M3 u1 v
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>& n/ p- F+ B, x5 g0 O, S
<p>Pool的完整定义</p>/ f% P' P: k' z/ i* q
<pre><code>pub struct Pool {, ?2 s6 g) X% Z8 f8 E
workers: Vec<Worker>,0 s9 {8 P1 J' M8 ~
max_workers: usize,/ o1 e4 ^9 Q2 f( |7 y3 @
sender: mpsc::Sender<Message>
, C2 {5 t( ?( {: ]; ~}7 {( l' a. ]: P, E0 E
</code></pre>& M# | X" X2 H( t" E# }8 {: O
<p>该是时候定义我们要发给Worker的消息Message了<br>
8 G- i3 [& e6 c4 a定义如下的枚举值</p>
6 g9 d" L" g7 X2 A4 r<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;0 w3 p9 n3 v% ?; O4 E: k
enum Message {
! b: h' V& l$ k4 Z ByeBye,0 j) i4 O: L9 e9 R: @6 y6 e
NewJob(Job),
/ O8 x7 O. g5 O& R3 a# s+ }% ?}( z- v) p/ ~7 I
</code></pre>
& k' F9 z" s* |; u1 E<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p># w% x; ]) E" x3 {4 f
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
+ C. k" S1 I j1 I( n+ p7 M<p>Worker的实现</p>
1 ~3 f/ X% m2 A& ~" Z* R$ u<pre><code>impl Worker
/ l7 a' C8 ?4 ?' u{
( p" Z0 j* k) L$ e fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
$ o# M8 r: c1 K0 U+ @ let t = thread::spawn( move || {$ t9 p6 Q9 J; B! Z3 e
loop {
$ S6 P3 c+ g, u6 B7 e: y; ~ let receiver = receiver.lock().unwrap();: a8 }4 W, _, m1 A, r# s: q7 w
let message= receiver.recv().unwrap();! P S4 E8 U! \7 c5 Q: E
match message {
% f, U( r8 D: v4 Q5 p1 I V Message::NewJob(job) => {
/ O" n" q: U5 l" g println!("do job from worker[{}]", id);7 g( ~' |) i c. G: T/ f) ^
job();( t, ]. `6 z6 [5 @- A" a) O
},* F, u' W/ A# z+ D
Message::ByeBye => {
. c# d9 q6 v$ n/ R3 E& h println!("ByeBye from worker[{}]", id);5 w2 o; E7 O/ ~7 }* X! l
break
$ s. X0 a. D* T) Y! Z; E },
' N& {# C9 y {9 r7 |# R) l% O! D }
9 l' O# ^6 H( @ { }' v6 }# J4 B; b& u5 I/ @# X9 o( v( R
});
' r& E/ q9 N; _9 J$ Q( ~5 M8 j" B" |8 F. b- |7 l. G0 q5 S5 u# \+ u
Worker {
% \5 `; s. }% ~0 a P* G3 X8 x _id: id,! T& s% U3 N& K6 e8 M7 g
t: Some(t),+ ?1 T( e+ u$ [ t
}
# H9 |9 ~1 i2 Z$ E2 p* z }! E3 x2 N( G0 D& h
}
# c4 ~- |" q; L; I f9 l2 C# S</code></pre>8 O6 W( d3 E4 }, V# a/ `5 S
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>: g& V+ p. G8 v* B6 k. ~
但如果写成</p>
2 b3 e. S6 {" N<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {+ C# L) v9 ]( [ i/ N
};4 p, A" x. f. {# O! S5 @( ~2 h
</code></pre>
7 ~% b- |; V `<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
3 {' Z% `1 U4 t% ?1 grust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
/ _9 H5 Z7 h6 d2 X$ F% n+ Z5 T+ r% ~<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
. D" B @/ z" D<pre><code>impl Drop for Pool {. ?: e. p2 t# u8 I' ^
fn drop(&mut self) {8 R6 \' h" _, J. q, k
for _ in 0..self.max_workers {
* ], A) H3 w$ c self.sender.send(Message::ByeBye).unwrap();0 ]! r+ ~7 ?0 n; O1 i
}
% X1 [- |5 S8 k* K2 |* C( N for w in self.workers.iter_mut() {
1 g9 ~$ M: G* Y3 \- ?) s( E$ O8 _ if let Some(t) = w.t.take() {
, \. Q- {( H$ I/ Z% A, S t.join().unwrap();
; N1 b# x. ?9 h5 Z2 J }' p; g& f) t) D; f& F1 e
}
3 t0 u8 i1 |7 ~5 E1 X5 H }
4 z5 D0 f4 w0 P7 G5 j}
& j: J5 r- x2 I% j3 C# ?' Y: M8 y/ M' E6 x
</code></pre>
3 \+ T/ g) @% _# o/ C" |2 ^' W! J<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
: a4 T4 ^8 M, p<pre><code>for w in self.workers.iter_mut() {
9 `- ] j% R; g; F5 g# C- | if let Some(t) = w.t.take() {# ]( `: O. J9 K
self.sender.send(Message::ByeBye).unwrap();, q" [3 L# W. V
t.join().unwrap();
5 C8 V) e, J# @/ l6 W7 Y+ G }5 S/ |& E% l3 v Z6 ?
}
9 K; `1 u# n9 i; F( m$ i; J5 F
</code></pre>
! s: m+ k) p3 U2 [. `& s<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
9 u7 C9 G) [* `4 n7 o# R' g我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; D" O. a8 R4 Y
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>/ @7 p+ h5 `9 b7 [) n- G% I
<ol>+ a Q# k! x, `6 @
<li>t.join 需要持有t的所有权</li>
q5 p" W% S& k( X( E<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>4 W4 Z5 y" o0 k8 X
</ol>* R0 q& T8 p' h- o( R# R8 z
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br># |9 G2 A# k# @: [
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>* l2 P* a3 d& T/ L& D p
<pre><code>struct Worker where
* f# a* [! a* H1 A{
1 H7 m" {! u" h _id: usize,
. z% H, I" t9 x/ _$ ^ t: Option<JoinHandle<()>>,
5 y( C' X5 c7 U) H}
+ J9 C0 m$ W& y</code></pre>2 }9 T) z9 N; F5 S
<h1 id="要点总结">要点总结</h1>
* \, |- s0 W7 }" c! |* ]: ~" X, d1 X<ul> @8 ^. J, B w0 B7 M& a z
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>1 d7 `. v9 L+ u4 U% i
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
( f- y3 _2 O* @" G, W</ul>8 \- z, q' I; k u
<h1 id="完整代码">完整代码</h1>2 L5 M9 X0 j& ~, q7 u) _4 h5 p i
<pre><code>use std::thread::{self, JoinHandle};
& v: S+ m5 p6 O* ]( k$ v! Z) b, v Suse std::sync::{Arc, mpsc, Mutex};: d1 ?2 u* D8 B: ]5 G; |% X9 Y! I
9 X. { Q D- H" T+ }) D/ u3 s/ V& z
+ y1 h' x4 v* c3 z; t0 @type Job = Box<dyn FnOnce() + 'static + Send>;
' j* A( j2 c4 L- Renum Message {
1 j K% Z# V4 g8 f8 x& f ByeBye,( b2 l1 A5 y( K' K4 g# }
NewJob(Job),
* h5 w- {) |- ]}, @1 E& j: ~7 h, O" ?
8 U2 `% k% h& u
struct Worker where
) r9 U: k. {; R5 F' o: y{
9 w9 E/ `7 v C4 ?' y1 O( G _id: usize,+ k) D, W5 h; ~8 g! S$ ^
t: Option<JoinHandle<()>>,$ w, \9 C. X* ~& n/ d
}+ V- G& d' \: T5 V+ {% R
" ~& E. G( ]" b4 I6 a% T: l, h* W
impl Worker& r( P6 b' D5 J6 b3 s% ^3 N0 A
{2 A/ F3 J. y; g8 G- |8 ?' L1 B
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
% o. x/ V' O- k+ V; N( V- f let t = thread::spawn( move || {
" z! r$ M+ `& Z loop {6 y1 `8 J4 M# n! _8 O0 G; H
let message = receiver.lock().unwrap().recv().unwrap();" `1 s6 x- P1 }% c* p* M9 ?$ d1 @
match message {3 O2 j! V3 \' s! m# Z" c. l* N+ i
Message::NewJob(job) => {
* ]# u# r J2 f: J% I1 D6 W6 n println!("do job from worker[{}]", id);, c/ {6 B7 m/ F5 [$ h
job();/ v; ~/ w2 ] m; j# Z" o
},
' A5 e; M+ y% c. f+ b+ n5 N) \% I Message::ByeBye => {4 P w3 }9 @( \% }$ E. {- m6 m
println!("ByeBye from worker[{}]", id);
V* F/ h* I V break
5 \! }5 s; c' N7 _ },, w, f; ~! q) Q2 z( y0 i- n& @
}
$ t5 M6 I7 S& j3 r( ^2 L, V }
3 Q: X3 K$ t! `& D });
5 l* N8 X1 f6 }5 X: u* P- |7 N
4 _* z' r% r# c) p- q Worker {
# q: V* i8 [1 m/ z _id: id,: Y' n2 K& R8 T" W0 Z6 I
t: Some(t),5 s# ~/ M6 [2 c3 A
}
# b( x( f6 `4 N }
+ Y, W. m; q: o- B* K}
g3 ]% Y9 E! \1 D$ O( L. K8 a3 @; P- I- S7 Y6 R$ b3 a. X1 J) n
pub struct Pool {; q" J* \) f) x" k+ {6 R
workers: Vec<Worker>,
; m% s5 l7 x( S8 H8 ` max_workers: usize,
# m3 b; i; i1 V! B; f3 L sender: mpsc::Sender<Message>3 h+ m) j- h+ o# W. r& j2 \; w
}0 N5 J V& T1 u) J' ~" s) X
R R- A6 d) t4 [; b
impl Pool where {) H5 i" C# D. S6 A$ m, g
pub fn new(max_workers: usize) -> Pool {/ M& R' Q7 e$ J$ x2 G
if max_workers == 0 {, E& X7 a! _3 B
panic!("max_workers must be greater than zero!")
' y8 i4 u9 q7 p; ^- ~ }& v% _+ J* L# @2 X: b7 C
let (tx, rx) = mpsc::channel();
) R6 L2 }- G3 |2 @5 v! B2 U+ F" M1 g0 f. n/ }0 U L, T5 I
let mut workers = Vec::with_capacity(max_workers);: l' l! s8 \6 a5 ^2 h% V h
let receiver = Arc::new(Mutex::new(rx));
1 E0 w1 r) X% v% w# u+ M( P for i in 0..max_workers {6 H% |. u9 e/ k2 X
workers.push(Worker::new(i, Arc::clone(&receiver)));
/ h6 M& M* d& p+ b3 D }- G% E3 L' U$ t
, i( W: `. A# E' h( u; A' _
Pool { workers: workers, max_workers: max_workers, sender: tx }
0 e4 b, z8 a+ ~; s- c* T }' V$ Z8 F( @4 d0 l* N
! L8 ?) |8 C" v
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send5 D* W# ^9 d3 R6 P: l
{* s2 f4 D; k1 [8 |
0 k2 M e( D( b, z7 X
let job = Message::NewJob(Box::new(f));9 ]8 L& z# O" k3 |; Z
self.sender.send(job).unwrap();
" k3 \* {5 S5 G& C }: Z3 A) o# a4 i9 ]/ v/ m1 `1 t
}
w3 G7 Q1 x) H- ~& g5 [6 G
/ l) X7 o# Y: H; W" V) n7 timpl Drop for Pool {" [+ M5 N2 A8 E0 h7 m' u3 V1 `7 D2 h
fn drop(&mut self) {
! m0 W. \& j9 @5 p2 N9 V) e for _ in 0..self.max_workers {
1 {# n. O. x: Z9 w self.sender.send(Message::ByeBye).unwrap();5 H3 D/ N0 v; S. U. L0 `8 R* a
}& s4 r8 t* ]7 C6 x; k( k- g9 ~
for w in self.workers {) v2 z; j4 v# O9 F, P
if let Some(t) = w.t.take() {
/ k T! d/ W& D/ x, ? t.join().unwrap();
1 D2 r% V) `% [, h. K9 H) a8 d }
4 s- `: P' Y; n0 ` }
) f8 f) t' N V5 z* M. L }
7 o5 Y6 w, o/ t}3 v; h+ g4 F3 [" u/ {0 v
1 B! M7 t- V" ?) \& Q
3 ?$ j9 ?/ j5 m# T1 A. l* Y ]
#[cfg(test)], I6 p& e( C6 G$ @$ m( ?: ]
mod tests {
4 C' n) }) Q/ t- @ use super::*;
0 V3 D# d, J: u/ s& G! c1 `& V- { #[test]0 J9 }! s8 M. m7 U% u4 s2 A0 J% @
fn it_works() {
8 O& ]6 U% G5 a$ Y! Y$ ?6 C let p = Pool::new(4);
% m d6 v J+ n p.execute(|| println!("do new job1"));
! ~% d( q+ p8 M2 j+ A p.execute(|| println!("do new job2"));1 Y# j! S- I. `2 |, x8 j
p.execute(|| println!("do new job3"));# U. E0 S9 S; T& g
p.execute(|| println!("do new job4"));
8 k5 I; L2 n: d" d+ u2 @+ G; ~ }0 N2 N/ P7 z6 c& M" V" L8 n' i' a7 e
}
4 J' @" z( G1 s% z4 ^# n</code></pre>5 v& x& G; A, z* X$ |
+ k+ \, w# c* S7 I( ?$ D2 |9 u
|
|