|
|
0 R" W9 s# o+ `6 C6 \<h1 id="如何实现一个线程池">如何实现一个线程池</h1>0 j/ n9 Z- i7 q3 y* O3 Y
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>5 f9 N3 R" A) B; H& x7 x* e
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>7 W* i) b% @. X/ m v9 [8 K- L4 n
<p>线程池Pool</p>9 J6 h6 W5 z6 T- u1 I0 p
<pre><code>pub struct Pool {
1 i) Z+ N) Z7 J" w0 B max_workers: usize, // 定义最大线程数% @: A* K* m! `) d" F! L0 o
}
$ |# Z' X3 t) R1 A
: z8 j( P% D7 E- e5 q& z, Limpl Pool {9 _' I$ \0 k+ C) V8 X3 v
fn new(max_workers: usize) -> Pool {}& X2 m0 l; X- L3 M4 ?* [. q
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
# k! }# _% N# f1 N# T" f}; r& r/ d/ J8 _
# K+ _4 `4 _3 D6 ^+ t5 w
</code></pre>
% }! G) @) b9 P; V, R% G L6 ?<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
$ M! r9 W! i4 J5 t# O<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>% J& P, M4 X$ x8 q- _ c
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>8 H$ W* {( u9 T% U: O
<pre><code>struct Worker where( Y; }" f) T9 ^- N9 W7 V8 @$ R% x
{
7 [8 O; @# Q6 {5 R _id: usize, // worker 编号8 l# T% ]8 h0 L2 i% P
}3 s, I i* D/ j8 j+ q1 @$ Q: }$ o
</code></pre>
/ A, ?/ Y V' k* Y z) L, P<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>( @+ x+ Z$ t( m. l# w
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>* `5 s; q9 C2 i' C* p# J8 t
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>- ]6 Q# @, L }* ~+ x) F! R& v3 s
<p>Pool的完整定义</p>
; k5 ?& m8 B% H0 Y5 F1 Z, ~<pre><code>pub struct Pool {
5 d( s# b3 V* F* c& F workers: Vec<Worker>,: P) A+ A0 O! k2 x
max_workers: usize,
7 G0 H3 V( l' Q p- @7 a sender: mpsc::Sender<Message>
! [% X1 u/ k' h6 h; {( V}; @$ D/ h2 J3 A% K' F* ^
</code></pre>
6 M) l r: s7 i- j<p>该是时候定义我们要发给Worker的消息Message了<br>
% x4 ?3 L( S+ b3 }) { O- e5 x, U定义如下的枚举值</p>
_1 ?8 |9 x+ H/ h- m<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;7 n# Y3 k8 D( K$ P7 Z" P3 k" k# `
enum Message {
2 R G. E. p+ p ByeBye,
) a2 Y2 i' c9 S# B1 P1 p, [" h+ u NewJob(Job), ], f' G7 d' T$ l! j9 J
}) I- s- O) J# T% {, G) \
</code></pre>2 c2 e; l* M W2 a3 [0 E: S n. o
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>" I9 M& g8 k( D7 j0 c& w* I0 d* L) F' N
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
: c8 i) i, v+ h/ S" {( z<p>Worker的实现</p>' i5 y# ~4 J6 S: c% x* C. t
<pre><code>impl Worker' b- U% E* x: D- S- l$ ~! Y$ X
{
' }. O0 d$ @2 U9 D' _: ~ fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
0 ?" M# ~9 T( Y7 O1 A0 K+ e8 s let t = thread::spawn( move || {( j0 T, I0 p) m) P) b! t7 C
loop {1 g) j, H# p% L" N" d& q* g
let receiver = receiver.lock().unwrap();
8 b% i, W) O, T0 ?, n/ H* L: C7 t0 [ let message= receiver.recv().unwrap();5 P( h, e+ i9 V- F
match message {
& H' P+ o. e/ {3 y8 \/ o Message::NewJob(job) => {. \9 ? L! w8 R5 r: U: c
println!("do job from worker[{}]", id);
" k* ~5 A, \3 Z: D3 i" w6 p job();7 J8 I$ E [ X" ]) y- Z5 w
},9 S( u- C( n) b* U6 I
Message::ByeBye => {
8 `# [6 { H: _# a, z, [: J1 s4 H5 m println!("ByeBye from worker[{}]", id);
3 @% d, E/ \9 Z5 U% ~+ z- D' D break( s7 J! I+ y# B' z6 v: c
},
7 B; t7 K- H" [4 ^ }
4 i! t: Y! w; F; Q }
+ K# q* Z" G' ]- D });4 }: K! P* l- U1 n! I! K g: i
0 B7 E/ l+ l0 D4 D: D
Worker {
0 {7 {" H$ f5 k$ o+ u1 B _id: id,
) o c. q/ R! Q* X/ S l t: Some(t),
. x1 B5 J% A- {. J8 C+ Y8 U) n9 B }/ N0 g& j! C$ M* Z3 p! M
}
# f7 }3 I$ N. v2 o9 z/ @}4 m+ o7 v: ]4 {9 m
</code></pre>) O' ]9 J, Z; T
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
) d* ~0 R* h1 M& }. k( H0 p! u但如果写成</p>
, n; A% {5 r/ T<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {. D; P1 X6 ?, Z/ u
};
# s+ a% S& E# ~1 {</code></pre>
# R* A# } H" L% B/ ~/ V2 N, v! q<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
* ^; h F3 `* V5 _- L$ crust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
; r' l7 M! G0 A7 p<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
$ d/ a" ]) T4 k. o+ y) ?<pre><code>impl Drop for Pool {3 T/ [. C$ |% J( z5 T! K
fn drop(&mut self) {
$ k( ?( |( Q$ |) t. ? J for _ in 0..self.max_workers {/ t5 D% s1 S9 \: p U& X/ h) p3 Q) G
self.sender.send(Message::ByeBye).unwrap();
/ m* I' e. z% b9 C) R% p2 @ }
* F, U' `0 L3 l for w in self.workers.iter_mut() {) u0 x1 G8 N4 g2 |) I2 F
if let Some(t) = w.t.take() {
9 \, _0 M0 m- c( Z# C7 t6 Z& I0 R t.join().unwrap();
3 j7 h3 D1 H3 ~( @( Z }( N9 H6 H; {, I# o4 v ~0 n
}
. A4 Q1 h5 t* ?9 x3 A$ j }
1 X6 n: o! {9 H0 F7 R8 u: t}
1 p6 J3 q x0 L' p8 v0 M0 J' K& \7 Y, x" s
</code></pre>( b! [9 T/ N# M3 }* T3 d0 c
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
6 z7 l" x1 d' g6 ^9 O6 V8 Q- d3 V7 v<pre><code>for w in self.workers.iter_mut() {; c; y3 y# l4 H2 d1 A! w
if let Some(t) = w.t.take() {
1 X; D- y: _6 p* `7 N self.sender.send(Message::ByeBye).unwrap();
k4 W/ U. X* t. ?' u t.join().unwrap();( d3 C' B+ G, l9 R; m
}
3 N0 o7 x! m# w- n}
. X b5 j& V5 _5 K. [3 _ H3 M4 S* y; i; k9 H
</code></pre>& F* ~ H1 E0 G, z& V
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
+ ~, \2 @& r6 D; ~" @我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
& X9 g4 c4 V6 `5 |$ O<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 R( }8 l7 e! H* k% p0 I9 {9 B# x, C& r<ol>& g4 Y3 W* Z8 b" N6 o
<li>t.join 需要持有t的所有权</li>2 F1 ]; Q. g1 }" L2 @5 ~
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>) R ~, `0 S2 b% I) X- U
</ol>% \+ L6 M5 G: d& M
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 H2 u0 z/ C, G9 a) v8 r0 b6 [
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
" L* F* p3 w- [, D2 V<pre><code>struct Worker where
6 G! e$ M: F! V' {{
- W6 ~2 U. p( h& L _id: usize,3 o& e% e( s7 i# o1 v/ \5 W5 a
t: Option<JoinHandle<()>>,
w9 z% N3 H+ i$ A( ` Q}
& k( O a$ W2 f</code></pre>/ B6 W1 ^5 }# j1 E5 v) Z
<h1 id="要点总结">要点总结</h1>' c* o- a3 [8 [* j
<ul>
7 M a+ u" s$ |' f- e<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li> O& L) k* V$ ^3 a5 }
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
; j9 g$ C: ~% ~. [</ul>- {& U- S0 B6 @% G" A5 g
<h1 id="完整代码">完整代码</h1>
: X( o* N8 F' y- P% H# @. q<pre><code>use std::thread::{self, JoinHandle};
8 V6 i. d( _/ _; J% o6 ~use std::sync::{Arc, mpsc, Mutex};
( G# v2 g5 P2 i, E9 t" y, X
9 b, Q/ o! G0 V2 [/ b$ @9 |1 m; E5 D% L( B. q5 a1 T. E/ S4 W3 v
type Job = Box<dyn FnOnce() + 'static + Send>;! N0 P) b7 e; Z& a) P
enum Message {
' l2 L D" ?2 a/ L; {: G ByeBye,
5 `. V4 f% F7 `9 w/ ~ NewJob(Job),
- F& w0 ~9 L* C A" b}
$ @. x7 ~7 x5 I/ H3 v: z/ A! Z
. @: i" a: f. h2 F5 Nstruct Worker where; y* m& O. Z2 V8 B8 O
{+ e. z7 {, _6 ^* ?' Z1 J/ j
_id: usize,
6 g2 B+ o0 R7 W$ a' N t: Option<JoinHandle<()>>,
3 l: h1 B. `& X. ^+ r" w* _" T}3 S. P8 A4 g- {
- ]) Z* m) q) U l, Cimpl Worker9 _# q. V3 k' i: ?/ i, f
{" \0 `. J6 } U2 w
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {$ K1 u9 V7 q2 v4 }
let t = thread::spawn( move || {
1 y* c( N/ K3 E) I& \* q5 q8 K/ P0 j loop {
, J) ?; P% n% {- m5 {- M let message = receiver.lock().unwrap().recv().unwrap();' Y$ d% I; N) Q4 m
match message {1 A0 V, _: R3 Q% j
Message::NewJob(job) => {6 {/ k$ Y# }5 V t
println!("do job from worker[{}]", id);
& K1 b' X+ n4 U4 j) I job();
5 l9 U* V& E8 l },
% u7 a2 ^3 z) b2 ^; | Message::ByeBye => {
$ ^, A# T, k, t3 \8 e println!("ByeBye from worker[{}]", id); G. @, A: I7 Z& q" M
break
3 @, M5 q& C- x% b" b# w% W },
5 @* ^5 i* r! m1 i5 K1 p }
7 @0 T7 R. s! {& T2 D' d8 U$ f }5 e$ B3 ?5 v/ Z: o& W8 |, R+ R
});
2 }: v6 F& m( L0 W
5 e- I R8 \$ s; Y2 R( X% G Worker {) L2 h7 p1 `' k3 h5 U( J
_id: id,
3 U: i' }9 e7 e+ e0 u8 n/ U; ?$ j t: Some(t),+ r+ C4 H! }9 y8 ]6 n, \: ^
}
' U8 D+ @" j1 _! U }
6 A# Q& H7 _' H}
) a. D; E+ j. h9 O
8 c- @- }5 A8 c5 r+ opub struct Pool {
2 ~3 K) e! u& W workers: Vec<Worker>,0 R4 _* r, J6 M* z g. o5 m
max_workers: usize,
7 @- K b; b% V* `- N- x sender: mpsc::Sender<Message>) _) v! c% q( @3 X1 K6 S
}( U4 O! N: g0 y! r# a- Q+ S+ y
7 S* y" e1 x+ ?" R8 A! n
impl Pool where {$ {9 I/ a# `0 T: i
pub fn new(max_workers: usize) -> Pool {
f& j& P$ s g0 P1 z, n if max_workers == 0 {
# b' D) K, {5 Q$ a1 r9 ? panic!("max_workers must be greater than zero!")
/ D( w4 m& ~/ |% U5 o |1 Q- m }9 i' a) s) X+ L% I4 n& |
let (tx, rx) = mpsc::channel();
. c/ R. d( s' K5 E- h C' e, m( S% d! s0 S5 Y! C9 A
let mut workers = Vec::with_capacity(max_workers);
" b" s! |, `' _/ u3 D, x let receiver = Arc::new(Mutex::new(rx));
4 T+ O& I) q# _( u. i+ G for i in 0..max_workers {% L; D# _3 R8 n* s( ]- _
workers.push(Worker::new(i, Arc::clone(&receiver)));
9 L& U, b* x" L- j4 C1 F9 D/ ] }
% o, V& F' _ R
5 p# L) e# v: s Pool { workers: workers, max_workers: max_workers, sender: tx }
6 {/ X* E0 k5 m4 N6 G! a }/ z9 h5 w3 |* x( Y. }
0 I, J. |# Z! H7 j/ U2 w
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
! P. x. Y) n' E: N, {. L: s+ @ {7 s' e3 p1 i! ?' l: v8 p
6 V0 v" x* s8 Q3 o7 j. K/ a' i
let job = Message::NewJob(Box::new(f));
# l/ m5 w# P9 c" ^ self.sender.send(job).unwrap();
3 j# @- L" h u8 o0 O }
! x) p) D0 ?% k0 p3 K6 Q9 n}$ \. a2 Y/ a& ^) v4 u0 |: {
) ^" R8 |, r2 L+ X" c) B5 m
impl Drop for Pool {! O6 _/ \. [9 o f' ?9 O
fn drop(&mut self) {1 A: m b/ }9 \4 i( n
for _ in 0..self.max_workers {4 K5 G' F, z L1 X& p' D- W
self.sender.send(Message::ByeBye).unwrap();; D) i) V: e! d4 Q! P; _
}
; ]( [7 Z9 @2 D4 c! u for w in self.workers {
3 H# a& T4 |* B" j/ Q# w' S if let Some(t) = w.t.take() {4 U/ F @7 f7 B
t.join().unwrap();1 O4 X! E$ Y1 g6 Q" k: J
}1 P/ L5 }3 ^: q' t9 f
}
& R& P% j0 i4 _; h5 } }' k# m) \6 C2 J: K- {3 S
}) J. J: I" `, q
" o* K& y8 |* H" B/ ]
9 R% A/ y+ O, k4 K2 H/ n
#[cfg(test)], T& Z! s* `3 o5 |9 L- d
mod tests {) h6 i" L) ?( Z/ f* c9 @: P
use super::*;
' S" m @/ ?- a/ }9 a# P0 m% A #[test]
2 ?7 e5 P t. f% h* Q$ A: L fn it_works() {4 o; k9 I9 t' E
let p = Pool::new(4);
+ t! G- d( V& O/ V! P( [ p.execute(|| println!("do new job1"));
1 Z) }$ b+ T8 A p.execute(|| println!("do new job2"));
: t8 L- J8 m% n1 W" R p.execute(|| println!("do new job3"));& x8 I. p9 _) }; A1 Q s
p.execute(|| println!("do new job4"));5 ]- _* c/ L G1 t
}
9 s# s Y8 z# o9 j1 Q}5 _: l2 t5 E& B `
</code></pre>
7 w6 A) H P3 M( s p4 W) ]& W1 l; S
|
|