|
|
6 m3 q" V8 G5 @: c
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 E1 Z& U! T4 d& o |
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
# N; s# E6 g' v7 R<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>3 M9 q+ l8 f Z' r+ J
<p>线程池Pool</p>- C8 ~! ^3 }7 t. r% U4 o3 r
<pre><code>pub struct Pool {
! n D7 p, l1 h1 E" A1 j max_workers: usize, // 定义最大线程数
% k0 S; g2 W( U# {$ B5 u% u0 L}
3 s0 R8 Q" ?+ |* [. f- i) i6 [; C/ t0 W; `, ?+ z
impl Pool {; l& T3 q* v% _# `
fn new(max_workers: usize) -> Pool {}
2 K. g; N: e) e' c x E fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
: r* [2 G+ d. N+ Q+ C}
- ~* }* w& ^: K: P6 M' H0 J% B* Q! V. Q7 [
</code></pre>
$ V2 P; @ Q# S6 u# h<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>4 e, U) M: f2 u
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>$ @7 H) R! i, y! s: ]2 R: \, L. A
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
0 A4 T: f! n/ a0 a9 D" U' O<pre><code>struct Worker where" Q( M, X$ q4 A; D$ _# q
{
; T+ l. G" a+ ?; V1 N! m1 _3 o _id: usize, // worker 编号5 k; X: w1 Q. b1 K: e* q* S1 }
}
7 X+ Z4 t; N- B, s/ E1 ^5 n</code></pre>
4 @' v3 U3 j' }- M5 l* K0 t<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>8 [3 M& f* ]: k+ y
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>& F$ z ^4 b$ {% I7 O3 s9 D C7 v+ a
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
$ ?" d; z# j1 c+ @<p>Pool的完整定义</p>
2 `4 K1 U" M. |4 d: A! d2 }<pre><code>pub struct Pool {
! @+ r/ k% H/ r workers: Vec<Worker>,
$ @5 g- g: A, V. Y$ K% ^ max_workers: usize,
1 n) b# N9 H9 H+ x; R+ W sender: mpsc::Sender<Message>
! T8 s! [0 l8 K9 Q2 i* l: O' N}
. w7 w$ b: R+ W. n</code></pre>$ o: d( M, q- y! L5 y! l/ H1 P6 M: I
<p>该是时候定义我们要发给Worker的消息Message了<br>
0 \7 I/ D$ t( m5 q/ ?& ~8 K, s定义如下的枚举值</p>2 f# R& d1 J1 @
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
) G& `( |8 J( y1 E! i$ |$ X( tenum Message {/ r- j5 ]: t7 }$ N1 [
ByeBye,/ M2 @" C ~+ d9 H8 j2 y! T
NewJob(Job),2 ?3 @( H0 J) U$ O6 ^5 n+ A
}
8 J! w' H6 a: k& }- s) w6 s</code></pre>) \5 z$ R0 t9 o! B4 Y# ]/ C
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
O% s4 w' u# i ^* w<p>只剩下实现Worker和Pool的具体逻辑了。</p>
7 k# v, k8 k. v( j1 i$ g<p>Worker的实现</p>5 P8 ^' {7 O! Q* h5 k, f
<pre><code>impl Worker- @1 r2 m$ e' A0 x# {7 n+ T
{
, N4 R1 S* I+ L fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
. Q9 o( `- X! V6 O" M/ } let t = thread::spawn( move || {0 r8 @# B9 g: a: p0 h) }
loop {' u2 g6 S: r, m; T7 G5 _
let receiver = receiver.lock().unwrap();+ Q/ H' n: m G. N1 p
let message= receiver.recv().unwrap();
! ~1 {% Y1 `) g# p4 D# g4 ~ match message {
+ l; j4 [' C. r8 L Message::NewJob(job) => {& ]- A+ R3 G. f) o4 S/ Q q8 I
println!("do job from worker[{}]", id);
2 X+ Y& y- _5 w! N job();3 D ~; m- ~- q ~0 G* z% \/ ]
},
& X) b5 |7 q- S' u Message::ByeBye => {
9 L# [1 j9 A1 s& r println!("ByeBye from worker[{}]", id);
# u. F# e* V9 R" W break' X* k, P: D u
},+ Y# I- e+ R0 X2 ^+ ]" R
} / ^+ v# o' n G' I+ P2 o& \. A
}6 z: [5 a5 N' z# V5 D
});
5 i4 W" V" [. ?1 R
! k$ j; r5 s( M Worker {+ k% `+ K" N) Z& d
_id: id,: E# G, i& Z, l5 x( u, f: K& Q
t: Some(t),3 O' x" A4 u$ H+ w( m5 M) t
}. r+ ^* G' u& o. k( ]
}
0 j" @/ _+ Y" T# l* p$ e! X/ f}/ _; i$ {8 _# @) w
</code></pre>
3 x0 d3 U7 Z, S4 ?8 ? w<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>0 N7 f" `3 t4 v1 a
但如果写成</p>
3 ? k# `0 {3 ?3 y<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {/ H4 K; D* F$ O# H- G( \
};
$ Y9 j. `; n6 p8 A* g# p</code></pre>
, A1 w6 V# N+ l* L<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>6 g. {% w& U) `2 j6 x$ L2 ]; J/ M
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>8 G% J$ ^7 G, |) C4 _
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>' w w4 k6 X* }; X+ Y: w
<pre><code>impl Drop for Pool {
; U+ u8 Z6 k7 q4 y% a k fn drop(&mut self) {) C; C" _. L9 e/ t. B* ]+ w
for _ in 0..self.max_workers {7 @* a$ ]( k' Z: t9 O* D4 [% L' \
self.sender.send(Message::ByeBye).unwrap();* q3 F% K* `2 \6 r, S/ K
}/ o7 z$ ]4 F+ v: {" d
for w in self.workers.iter_mut() {: `% U$ N# d/ m, W9 l$ s
if let Some(t) = w.t.take() {
u( ~+ D9 g7 H, y. G1 Z t.join().unwrap();
3 y7 J# H( f( w$ S }) @' X8 P# ^4 C! a0 ?
}
7 j b, r" \$ Q: m }
0 V1 O3 H& M8 r% J$ a T, z}3 U7 ?8 o9 u1 `5 P( i' ?
n, R d! a. _</code></pre>. V2 l0 ^* ^: g" v( ^
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>9 h1 Q% y( I, q* h
<pre><code>for w in self.workers.iter_mut() {9 D8 h H" Q& Q2 J& o4 P& ~1 q
if let Some(t) = w.t.take() {
; `0 R% s6 {8 ? { j self.sender.send(Message::ByeBye).unwrap();' S% J/ X; ?- v/ @. P8 }
t.join().unwrap();; q5 W7 V7 i' s
}
4 L2 O1 Z; P) \, Y+ e" {" r o}
, g! [* a# b+ O/ H/ q& b: l
6 h* E$ [4 X) D) i- G5 a4 V</code></pre>
( V, t2 }0 z$ S V: x* \<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>3 H5 G' b ], `
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
7 ^( X% I9 x( d9 m' U<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>0 P5 n$ p- @( Y# C8 b
<ol>, P3 X) T |. q7 q
<li>t.join 需要持有t的所有权</li>% G% o l: `& k, P" f! p
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>! w9 d: [) l- W4 O
</ol>' a* j+ Z7 N& o# D2 Q! Q
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
; N/ N' }# X) l ]换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
" U0 _1 l. G1 Y# a. A7 C<pre><code>struct Worker where
& T" H5 H, t* e+ Z{
/ Q7 U7 M( U2 `5 z$ c1 i0 J/ P _id: usize,8 c8 t6 B, g6 P. U% p- Z
t: Option<JoinHandle<()>>,
0 ~3 K( T- z- a+ [: t5 I6 J" Z}/ Y8 V( h* r9 L, s5 F3 t
</code></pre>% P3 l1 v( h) _% s
<h1 id="要点总结">要点总结</h1>' J( g6 l6 {8 W! O
<ul>, G! ?0 Q3 f4 D& `
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
7 H0 r8 ?; E) L' |1 e<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
( i* A, k( I# D: ~7 `% D& ^</ul>& _9 F" \/ c6 ?1 G- f' c
<h1 id="完整代码">完整代码</h1>% c, @5 _: @4 n: l9 s
<pre><code>use std::thread::{self, JoinHandle};! r) _" `7 t1 X. k5 t
use std::sync::{Arc, mpsc, Mutex};" _( r, F. a0 u1 o
+ K* D8 G# x9 k( T5 x3 Z0 u3 \
& m/ N0 X/ Y1 Z' M' Jtype Job = Box<dyn FnOnce() + 'static + Send>;; x0 k/ m& q: a- `# T5 }
enum Message {3 m1 A" `% B, ` @2 I( @
ByeBye,
3 Y( Y" l: g7 R; k NewJob(Job),
9 q7 \; o! J& O: X}
% R4 i; ~5 R! A- a& G8 n7 o5 a$ {" ~* ]/ X
struct Worker where
6 D& I8 A+ w: Z) d{
( X4 b! |7 U, \' c- X* |& { _id: usize,. T+ k7 s" R, X$ V+ j
t: Option<JoinHandle<()>>,
3 k% h% F' g, r1 b& G8 V: k}
0 k6 l4 }4 L3 G% @. w1 s- B/ j% g. c6 c1 c
impl Worker
7 T' y/ A5 S8 u) j/ [! c" T{
+ S- r9 Y: q; N1 [4 J fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {3 I N+ c7 m( ]3 b% A. c# Q: {
let t = thread::spawn( move || {
1 J# p+ {/ n9 y A* B& p! n! T loop {
( w3 |$ l$ n' U; g let message = receiver.lock().unwrap().recv().unwrap();
5 m% r! x! x! C; i match message {
7 C# e# R4 R7 D% w; ]( u Message::NewJob(job) => {+ N) j5 d* k, r$ o" @' P. {" H
println!("do job from worker[{}]", id);
+ o: f4 w$ b& X. X job();. e8 t) B4 H( G, s0 o. n
},
) [( s, T) b! Q# \- L Message::ByeBye => {: B2 y; q- k- D* s
println!("ByeBye from worker[{}]", id);6 D3 l N7 G% q. ]
break
! ^( i2 n2 j: H7 y2 n+ { },
% S5 w* x6 u$ i4 o/ \" l, f' S: B } % R1 m, g b2 g- g" _8 E
} V1 a$ @; o5 E7 P
});$ O9 g9 b8 t Z4 m" V$ N
+ Z- w6 u v% D5 C, X* X# } Worker {
0 d4 a1 q: \* `, l- ^: l _id: id,
: ~' a7 {& J/ ]/ b t: Some(t),* y# K9 i1 T% W- Z$ z+ Y
}/ G8 _/ m& n# k5 l+ n0 ^: h
}% T& ^% _7 R" e9 A( H0 H
}
" J5 b4 B4 i" ?: J* I1 ]+ m) a
! {+ T/ r5 \9 T; X1 `pub struct Pool {
% F5 S: |1 K' ]% h# V workers: Vec<Worker>,: `: e+ p2 z+ O8 S/ }4 r# H% b
max_workers: usize,7 j0 P6 X# f7 [- f' |! h5 @! u
sender: mpsc::Sender<Message>
4 h+ [9 H% @+ l4 L; ?}1 `- {8 U' b4 |1 U! {; z
1 A: h! q+ x0 U- \ ]impl Pool where {1 j" N6 Y. c: `) _; Q
pub fn new(max_workers: usize) -> Pool {% k2 j" j' A0 h* d
if max_workers == 0 {
% r& @6 z% Z v8 f. n. C6 k7 k panic!("max_workers must be greater than zero!")
$ |( P+ Y# t8 u# a' q4 D, H' W/ M/ a3 Q }7 B5 A* u8 `& b* K* M+ p
let (tx, rx) = mpsc::channel();
0 w; b6 r* h1 `! f* r7 _( } G/ e% Y: W8 V/ L9 ]$ N0 A
let mut workers = Vec::with_capacity(max_workers);
$ F. y' i- O9 ?: }+ { let receiver = Arc::new(Mutex::new(rx));/ x' R: v8 h# m8 k
for i in 0..max_workers {
7 g* |9 N3 C* u ? workers.push(Worker::new(i, Arc::clone(&receiver)));; \0 ]& I! ^' E( \. A! O
}
. I+ b7 i6 s+ }/ Z5 k& Y
% p+ V3 H) o# _: } Pool { workers: workers, max_workers: max_workers, sender: tx }
1 J6 Z# S; ]5 J" l! N }6 O3 Z; p" I1 ^& m
0 F! v3 O3 m! U# q% Q
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
4 u+ z! H4 }+ f; E2 b {$ f3 j! x [! H
: n' b1 E7 V! L' R let job = Message::NewJob(Box::new(f));: E* d2 V: o I
self.sender.send(job).unwrap();
5 s! L$ ]0 m# G }
. i* s4 b R% q0 i! Y4 k4 _8 I$ L}- O* z( M" i% o& t' j5 j
+ U4 D4 l6 ?& x1 d" T5 i
impl Drop for Pool {
3 l/ j) Y6 a. A# p# [& W* | fn drop(&mut self) {6 }% O! v2 _8 I3 M2 b4 F
for _ in 0..self.max_workers {
+ ^- \* w5 `, T# J, o+ Y" t. ~ self.sender.send(Message::ByeBye).unwrap();
+ k' W) O+ X& o$ Z }3 E4 t0 q- q! O. g
for w in self.workers {
4 y t- i" [& h9 Y) ?# ~ if let Some(t) = w.t.take() {7 J. h" a' x9 L8 b5 @2 r8 i
t.join().unwrap();
, Q6 _0 P0 ?5 Z* q' {- M- [; i }
, r k1 d. n2 A- W }
5 H% n- B0 a2 E8 k# G! Z5 Y }4 x5 l, t$ H; A3 ^& @. x- R' `* b
}& G6 r; M. z" n E
* D( y; [8 @8 @8 X
8 j. u; R+ c9 j#[cfg(test)]
& e0 q0 {% M9 N8 ?4 X! Wmod tests { P3 U m( _. P; X& P+ Z
use super::*;) O: d' |8 D) U x; J
#[test]5 f) h6 Z7 J. z, h2 j# T
fn it_works() {/ {; Y, ^% k! j
let p = Pool::new(4);
: _. ?$ k G9 M& t. @ p.execute(|| println!("do new job1"));7 O/ ]: j! I. ~
p.execute(|| println!("do new job2"));/ Y* d9 G/ o9 H0 y
p.execute(|| println!("do new job3"));
5 v' j3 f. j4 m2 z" L p.execute(|| println!("do new job4"));
0 _9 r$ {& N H* v O5 i# v }
4 t8 e2 R; Z& J5 N}
0 p' C: b/ ]6 \7 x g9 w</code></pre>
$ v% `$ Z; z$ U/ e# j- ]# o2 ~4 x( O% C
|
|