|
|
! D Y! _8 O, F; c
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' b, w2 ?% T$ I! s$ a! f
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
I7 W" U3 I* p; z+ Q3 S<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>" i- ~' e# a' {7 o5 F( p
<p>线程池Pool</p>5 @- m& [: {3 Y2 H& O) W
<pre><code>pub struct Pool {6 t! ]5 c6 `- Z3 n6 U7 R
max_workers: usize, // 定义最大线程数$ [1 l! ]3 N( _" W) W5 @9 b
}
' z6 R7 t) l+ Q8 U! {& t. y% q& @% }: c( G0 z$ e4 `
impl Pool {& [$ \" O: _6 D' ]
fn new(max_workers: usize) -> Pool {}1 N* `$ f8 r) v$ n% b" P
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}# k/ v' j: D' Q4 j5 K ]1 ^/ Z
}: o/ }) }! \: C% y; D: h0 j. ?1 o) h
) ^3 g I& ^+ X' G</code></pre>
. G; P5 _$ f8 z3 C0 e<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
$ R, Y4 @& k4 w- k<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
, D( b1 D& Y4 p可以看作在一个线程里不断执行获取任务并执行的Worker。</p>4 |4 } O$ E6 c! Y$ Z! F. [
<pre><code>struct Worker where* q+ |) A2 t1 q5 \: n# F
{3 n; A$ n: y' R' X% d
_id: usize, // worker 编号
/ r! R' g5 x* u}
. K4 k, |0 W) J7 `3 c1 i</code></pre>
( b! S" m) s; ?* U( k<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
2 t; b4 S4 X) k( @& R把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
+ O5 f' r! X- L, R! a8 o1 B2 T<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
' X1 l+ t- V9 S i' H/ i<p>Pool的完整定义</p>7 q y! @! ]0 _* z
<pre><code>pub struct Pool {
% O6 A/ ?9 E4 `; J* F6 Z/ z' @ workers: Vec<Worker>,1 S* E$ X! s, H% I
max_workers: usize,4 s! z/ N; o6 _" y$ t
sender: mpsc::Sender<Message>; N9 R2 {" M! r9 b& `- h9 b
}3 b) c. L. n- C8 [! I/ p
</code></pre>! c0 |' g5 W2 _, Z3 U3 Y
<p>该是时候定义我们要发给Worker的消息Message了<br>
g4 {3 o3 c$ c6 h4 x定义如下的枚举值</p>9 z# L4 i- z' p3 n/ P
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;9 j9 p, K/ ?9 x9 b7 Q9 K
enum Message {" k% M6 _! r# P
ByeBye,
' B9 G- g+ m' p5 e# _ NewJob(Job),4 I) s" w+ [; |" f3 F
}, |1 a/ ~- x- H& {( m9 x. s' O
</code></pre>
* H9 J/ I; _3 \1 @2 X3 D<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>* s0 i( N& l, Y
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
, A5 b: R4 o6 ^& D* v# R; O<p>Worker的实现</p>6 O4 g$ A) M: R! p
<pre><code>impl Worker
7 o) ]5 ~) A$ a! `; {{& X+ {; P4 z( K e# ~( f, W
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker { y" b2 a) Y/ E" Q% \( J9 Z
let t = thread::spawn( move || { K% u6 Y2 j% E% p. Y3 }7 K$ x9 ~
loop {
+ V1 @+ E2 _$ f let receiver = receiver.lock().unwrap();
/ t2 f* W& {$ {" z$ f& O let message= receiver.recv().unwrap();( H) b* v/ ^: j9 N* s* O7 h& [
match message {3 A3 ^" ~* k. p
Message::NewJob(job) => {
/ c5 }! n( d! P7 w- m println!("do job from worker[{}]", id);
" G3 V F* `. h% C* [4 W job();6 r1 s0 E; o5 N. M5 |) d& j$ o
},% {! E% d, p' |3 w# F+ I
Message::ByeBye => {
- o; I+ P* Y# n- P println!("ByeBye from worker[{}]", id);' ^/ l+ j( a* b# X3 H# |
break/ m# G; M0 }2 ?5 _3 v) R% a
},& ]6 d5 q) r0 a4 q
}
+ I, {* M6 @0 k7 l T K; x2 A }5 I. E/ O# {: Y, K$ _- K
});6 v" x4 U/ D7 E1 U/ i
0 u7 O6 x" {4 f# |' y @4 a* Z6 x
Worker {
( t% n z8 h3 _- I* r _id: id,4 `! q) w/ ~6 [0 m. ?$ P
t: Some(t),* z1 A% l" i+ i$ A$ z, X6 r# N! j
}% m2 L0 ^; O9 k' i0 a. `
}* b$ M* f, W! z' O) b# w
}- B0 i6 z$ X) [: x% W$ V" ^& f
</code></pre>
% c" r# H C6 @# b- s' r$ Y9 p<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ G I, _; ]: a' d, \% \0 }3 k" U7 p但如果写成</p>
! y4 A1 U) w* ?. o* Z2 r<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
" S0 ]# d) m/ H. O( t% _0 \};$ I; }4 o$ m/ J2 A3 k8 V
</code></pre>
& g6 d: Y3 z- ]( R8 ^<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br># m* J! O! u4 }+ b) k7 x
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>" m% j0 ?, @% Q3 X9 ~
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
. j6 |. |. ], k& D<pre><code>impl Drop for Pool {) P- T2 D6 t/ L# d; C& w! u( d
fn drop(&mut self) {
. n1 |+ b ]; W' \# e1 h9 }; W for _ in 0..self.max_workers {9 c: R# ]! e! F
self.sender.send(Message::ByeBye).unwrap();3 b) L) | X" E) k- Y
}# \! B& {2 _. L
for w in self.workers.iter_mut() {
2 Y9 W7 b. h- {8 l if let Some(t) = w.t.take() {
2 ]6 ~0 {/ s& @% ^3 J) G9 b t.join().unwrap();
' m' Q8 C0 N2 p& ? }" B: o0 i2 e3 Z* z7 `! x
}
m: y, Z) W$ T# h7 ?& `- P9 r# T }- N$ ?) X5 U U5 Q
}
: l7 d9 S8 |4 z( Z" I4 \) k
! o- G( h T5 V8 w5 B/ V0 P</code></pre>% A+ l& g* s: a1 e3 }
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
7 i, X$ g. s+ {0 u7 R<pre><code>for w in self.workers.iter_mut() {% X" Y: e$ k5 Z7 S! V3 x8 a2 a
if let Some(t) = w.t.take() {
6 n0 ]8 A9 I: \6 D4 n B self.sender.send(Message::ByeBye).unwrap();% r2 |1 T8 B/ \9 `
t.join().unwrap();
/ U1 [& y( Z5 \' z0 Q }
! ]- u) T7 {7 s4 h2 V}
6 h+ {( s' R& p
4 G ?! l' I/ C B6 _</code></pre>7 M- ]5 H( n6 u" t9 o9 f
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>2 N, e" {& B# k) V" G& H
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
' \5 ^4 e& _+ o1 W, j( }+ J<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>" t' G/ f F8 @$ L- z. S) ~
<ol> m4 y. x! C- ~6 T
<li>t.join 需要持有t的所有权</li>
0 m7 C6 k! \9 V3 O5 K3 a<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>& Y- L9 b( T& w8 D- j6 k
</ol>, w! ~+ r' I3 j3 v2 t" E5 g" b
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>% y1 Z4 B: Y, T
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
3 X2 ^+ h# U& u" ]! L+ @4 E: D# L<pre><code>struct Worker where
' R) C/ g _+ G) e9 b{
: g6 K# _' l1 r0 i _id: usize,
8 L" n7 a0 K- D8 N" j t: Option<JoinHandle<()>>,# H7 V# m2 Z5 a% _2 a9 `, z. O
}
4 G) a& Z5 S) S0 Q" H. w</code></pre>
- j5 K h) a1 V, M8 _<h1 id="要点总结">要点总结</h1>
`8 A- h. I+ k& y' y<ul>
% l* {- |0 J1 R! `" M<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
6 V1 s7 x4 f- L# {; e<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>/ Q+ w1 e" z3 f ` t k
</ul># H5 h8 H, p0 x5 h1 T+ m: v% V
<h1 id="完整代码">完整代码</h1>
9 M% M/ M; g4 s% Q: r<pre><code>use std::thread::{self, JoinHandle};
. M# ?8 i( S8 d& }use std::sync::{Arc, mpsc, Mutex};9 i3 b- L# v9 h
$ y, G1 z+ c* O& V V( x# W
( |: ^' r* H) }; q8 f
type Job = Box<dyn FnOnce() + 'static + Send>;
' h3 k; O6 z, M( m! ], venum Message {
. d" r3 _5 C, X0 r( ? ByeBye,
1 n3 f& Y+ Q' m: g4 I NewJob(Job),& f; r: N8 h# C/ h0 T% m" p; \
}
9 n' j( Y! v6 O5 B f
# H9 t! k+ X* m3 b1 Kstruct Worker where _1 a/ _' X0 ]1 ^
{
' _) S4 G8 u0 `8 L _id: usize,
$ f4 E- s/ T$ t4 } t: Option<JoinHandle<()>>,
s; X7 |# z1 _9 O3 j9 o/ y}
G- r& [ b! W* u) t+ D1 x* e& ?4 A1 `; d5 ?$ E5 S/ P
impl Worker
" v0 v V" V- r% C/ M{
$ V4 a2 T4 w3 H- _ fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {9 ?4 G R+ N2 F- R4 x0 o
let t = thread::spawn( move || {
6 F! P) \, }" b2 K! ]! H t loop {% U- p- ?) Z6 X* j7 ]- n8 k
let message = receiver.lock().unwrap().recv().unwrap();- T: d) }6 R& F5 Q: ?- `/ L
match message {1 {* D2 O! B2 s" B6 Z& w8 i: A
Message::NewJob(job) => {
( H& G" f! F. A; S: Y% X9 { println!("do job from worker[{}]", id);
. P1 t/ u$ e, w7 }: |/ I job();, v y1 k( R5 R, n' \
},4 u9 H) K. i1 N! o6 W) ^! D ^
Message::ByeBye => {
/ U: J5 v. H: N* v& K4 ]3 w( h println!("ByeBye from worker[{}]", id);. g& S4 e0 `6 J6 E
break
# w) B( h2 P, E* T1 H" ~) l" s },& I% x! X- k2 b0 Y
} * E$ d/ q, Z4 M. B( m! i- S
}7 Z- X6 L5 }; Q8 O3 E' i
});# }& @/ ?; A% @! {: b
* j) x; D& G: n: {+ v* K- ^
Worker {" @1 ]1 G' J/ ]1 U7 c6 L; [
_id: id,$ @ \. _: i9 Z; R
t: Some(t),
! j6 N- r3 N4 b/ S1 U; o/ B, Y" O }
' I; G% _9 P! X, x/ S1 l }
& `! Y0 w$ g/ W" [6 q n. H}
% o- J/ m* s" R/ o b% s2 b/ C, y
+ a2 l; w: F. x, Tpub struct Pool {3 p1 M. o: x1 ^1 a* C2 P
workers: Vec<Worker>,
4 f& t/ x% y7 e- D max_workers: usize,
! S5 z2 m2 q0 k/ _ sender: mpsc::Sender<Message>! |! c: h! }$ L+ B
}1 ?; v2 ^( V0 b1 c' m) j+ [* O
. X! m6 a* ~7 i) e1 l: U
impl Pool where {9 j# y5 e- e U8 G
pub fn new(max_workers: usize) -> Pool {1 Y. {4 d' S' [+ L' |
if max_workers == 0 {
" h2 T& Z7 |9 T: X D panic!("max_workers must be greater than zero!")
+ s& _& F2 u" q$ F! g }
: ~0 z& D9 E0 M0 w. d let (tx, rx) = mpsc::channel();
% y! x! D6 o2 J4 r, {. v$ m) V6 x2 X- x2 _/ D- _4 b/ _
let mut workers = Vec::with_capacity(max_workers);6 m R* f. e5 m5 j2 B1 n! d; e
let receiver = Arc::new(Mutex::new(rx));
( O. x7 [! H7 K) r for i in 0..max_workers {+ V' x1 Q' c- }9 x F6 z7 w
workers.push(Worker::new(i, Arc::clone(&receiver)));
. z: F- f- [$ X, ^1 {4 o }% G1 F, O! u* x/ C; f( I
& y$ I1 V5 H# ?9 S
Pool { workers: workers, max_workers: max_workers, sender: tx }. f' B( h8 ^$ K4 L% I. D( F
}
% {: ~$ w) @* h" L# o9 i4 F
6 `. g. J% X( v4 @ K pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
: P9 r, }% ~8 R6 v' n0 H# _9 c- ] {3 j1 e' Z! G/ A6 z* s
4 \! }2 m7 j* f# e6 `: k3 ^ let job = Message::NewJob(Box::new(f));
% O' ^% G7 k, y1 [ self.sender.send(job).unwrap();
! a3 K+ H) F0 p) h4 O }& M' N8 ~; ?" W% g m
}
3 }6 l+ G1 {: ~! ~# p, F* _* @. T. s4 d0 ~; F9 }; i# @
impl Drop for Pool {
! x! _+ e7 ?; C2 C, J fn drop(&mut self) {
7 o2 Q0 w0 D5 k, U. D/ O; y for _ in 0..self.max_workers {- F- J$ o, E) I3 s6 R
self.sender.send(Message::ByeBye).unwrap();! L8 h8 I3 B L$ `; K, P
}
2 }/ d4 w# b A for w in self.workers {
7 X; `5 B4 }" i, W6 W if let Some(t) = w.t.take() {
9 x8 P7 ~( [- N$ d: r6 v+ E; k, ^* @ t.join().unwrap();8 }$ C" i( J3 R, l
}
7 G( ~# p: D* Y8 P }
, y F' F! D$ p. c/ z+ n6 h }! _# m6 v3 s& ]! e/ v% P# ^: Y' w
}; U! ?! }3 t9 z% ]+ {" G! c6 }. G
5 J/ F5 H; g5 G9 \: c
3 H: _. {, d* q( j#[cfg(test)], {. d2 F7 \+ V8 H. @
mod tests {& j( { D" D' c, `
use super::*; _* |1 U" I. q0 Q1 \6 w: O, Q
#[test]
6 w, |- |. ]' p' p3 P fn it_works() {' C9 ^ n4 u; y2 \' h, c f2 Y
let p = Pool::new(4);$ {" b _/ {/ H+ Q N4 w% T8 b
p.execute(|| println!("do new job1"));9 d' [% G1 a5 }. [; y. H
p.execute(|| println!("do new job2"));! O, @4 Q- U5 S$ R. q1 |
p.execute(|| println!("do new job3"));' F- y, j* d2 i& d5 p
p.execute(|| println!("do new job4")); N& o( i. J0 v& [+ ]$ X
}
% t7 g% D+ v! [1 W}
0 S: S/ V3 [: T1 V7 }</code></pre>
4 @$ {; H8 s" g! q! S: n
$ m1 _0 F: c8 l7 o2 D8 d1 ] |
|