|
|
9 O1 f& k5 T% c r# y9 X N<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
& @+ b* f# N6 ]4 \" ?# c<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>) L0 I0 M- z1 F% |4 q; A9 f
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
. Q3 X; M: f2 Q; H5 ^ r<p>线程池Pool</p>
# y9 w/ q/ a0 U<pre><code>pub struct Pool {
1 @7 p0 e2 e r max_workers: usize, // 定义最大线程数
% k0 e* C4 [+ u' P}( ?) Y2 ^ r" n( z
, i0 V+ e% j9 j8 k0 M* z6 l
impl Pool {/ {; }' {. P3 @2 U, z$ m* ~7 @
fn new(max_workers: usize) -> Pool {}
$ S; p) A% W' X# ?: X7 x7 X/ _ fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}& Y* G( l u7 E
}; @5 @% R! D! X. H, Y
" O4 C! S0 {7 U' f& ?</code></pre>
4 C7 F$ Y5 j9 I* Q<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
u, G4 T) P w' @6 W0 a<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: y% f: }1 E( E/ g# m+ i可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
/ p) e/ X+ k# y# }<pre><code>struct Worker where) Z5 [% q- i. Q& N" }! f
{/ `5 f1 A0 A; }
_id: usize, // worker 编号
+ k" Y+ y# R& @# {. H1 M# W- i, B}
) m( K: P+ {8 j7 \8 B</code></pre> k- H' ]8 x& Q& j9 D% M
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>( D# j e1 c" ]' n
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>4 }& J' k+ d/ i
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>* }4 N' p9 \ ~& T1 s7 o" V5 A) Q
<p>Pool的完整定义</p>
3 z. T: F/ e: S* F<pre><code>pub struct Pool {: s9 R u5 r$ |3 A) X3 m
workers: Vec<Worker>,1 T/ d6 c7 b; Y5 K$ G" [
max_workers: usize,
* K+ }7 b+ y, _/ [, e sender: mpsc::Sender<Message>
7 r1 \, o2 a T) a8 ^/ i}
. M& {! a* m0 b2 O! x</code></pre>
. S) Q8 Q6 k) d4 [$ H<p>该是时候定义我们要发给Worker的消息Message了<br>
7 R# c8 i. C t, Z% o1 N8 m定义如下的枚举值</p>
0 h2 t- _; h; k/ b+ a. P) f<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
/ G4 H$ R4 K! w4 P, J a! [3 j3 V$ P- yenum Message {
7 a" h! Y+ X0 m; H ByeBye,
3 t. ?$ u' }$ m Z9 Z1 M NewJob(Job),
0 @0 y# L1 y4 k, B! H/ Y}7 L! Y8 ~3 \1 f6 m, n0 U$ x7 _
</code></pre>& c1 i( \1 e8 S4 G
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p> N: O' l2 E! j2 e" Q3 [
<p>只剩下实现Worker和Pool的具体逻辑了。</p>4 J' W0 n ^" ]9 L
<p>Worker的实现</p>
: C L* f* b3 P<pre><code>impl Worker
6 {+ `. M! ^! V: j: h% o6 o{
& O: D! x% U0 N fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
( D! T, U" Z" V let t = thread::spawn( move || {
; N2 D& H* ?* e' O1 E7 ` loop {
5 J0 _- D0 E3 b4 m# C( d( E9 X let receiver = receiver.lock().unwrap();; e4 m7 L9 A, p- C. n
let message= receiver.recv().unwrap();
* ~# }7 c! N' E match message {; W9 H' T4 C1 }; Z
Message::NewJob(job) => {
) p ^/ P+ D5 f println!("do job from worker[{}]", id);- m, t; d. C: s: \, X# r: |
job();& r- ?# n8 k8 Z7 y# \- S6 v
},
: y3 O8 D7 t7 z5 b7 g- Q! R) v Message::ByeBye => { @$ n* x, e' c8 f' w* O' O
println!("ByeBye from worker[{}]", id);; Z- C( w" d$ b& g
break
0 N7 }; F- r) } },
o6 @; ~% L' r6 R+ t' [. k% a } 6 H7 Z; N" q! K6 Z, K
}
( Q3 s& V9 D: j8 i. f });
/ Q4 M1 F6 B6 u9 N: @5 T! c M0 k# V
Worker {
) h0 ^8 ?& o4 o8 u' M8 j0 d _id: id,
F2 r1 U, l$ [0 v5 L t: Some(t),, W% {9 W1 y, r, j8 x
}
" I" T: \' o3 h0 O7 k! J1 ^# o8 K }
# u0 N* U5 O; C3 M}6 t4 b. n! O: v. R& |/ X, o
</code></pre># R+ C8 ]. J; L, l$ |; k+ J
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
0 U, {: c8 `/ {但如果写成</p>
$ C+ r$ V8 O' h2 T: _# k) c<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 g0 i5 W7 x3 W* v9 Z; n
};8 V; p% R/ v, a
</code></pre>
6 j7 k6 O! K2 X; x<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
9 I! L7 ~3 @3 Z) yrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
Z) {! |1 K( D; `9 W6 S* s<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
& d3 Z. A. Z3 m0 @. n3 |* o6 @) Z6 @<pre><code>impl Drop for Pool {. W2 W6 h# {" E( o$ ]
fn drop(&mut self) {0 S( o0 e. [( ?
for _ in 0..self.max_workers {- Y5 T6 M" c" G- N3 |( w/ u$ x
self.sender.send(Message::ByeBye).unwrap();1 b9 H3 p$ }! |# z
}
9 d( t2 t, h" p. X for w in self.workers.iter_mut() {
5 p2 l* C& V6 D if let Some(t) = w.t.take() {) W- E$ w- H- o8 r3 q: @: _3 w8 h
t.join().unwrap();6 q2 `2 U- S- l$ Z o- Y/ P
}
0 l! P; w0 K$ ?/ \) ? }
1 E% ^" _% G" B3 u+ s! L* ^ }$ C$ h. E$ a5 _# w* g5 K2 Y( n
}7 u3 g5 ]0 e6 \1 n; _9 v
4 G* E: G1 l F" W/ \
</code></pre>* k5 P) H: ?7 v6 f# S+ E
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
4 D) K" n9 w8 f& E# N& h0 P<pre><code>for w in self.workers.iter_mut() {
: @& }: x6 H' k7 ^2 o/ h if let Some(t) = w.t.take() { n4 S2 |" b4 e* O+ b. y5 ]
self.sender.send(Message::ByeBye).unwrap();8 M0 h( x5 j. S& G
t.join().unwrap();
. w" ^( K. V: l* f7 o }
0 }0 \# ?9 F2 U}
# k! Z) I6 X" c& l' r: C5 z% ~) L* w- t
</code></pre>+ m$ z" y; j+ v( \. C- q/ M
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>9 L3 F( t4 k7 u" _) N, _1 j1 R
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 X8 w- V% g( d4 `7 U! K& ~<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 |' `: A, C! y! t4 @% U7 t# ^<ol>
$ V( c8 O7 r' A& _<li>t.join 需要持有t的所有权</li>3 z( n5 @+ E1 U2 a0 ?' O
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& a3 }$ N( ~5 A4 @. G' t: H</ol>
/ D/ E( U) N& m- {( c<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
% f# X7 o0 L, v- x. r) K. U换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
: u- F1 W9 d6 F. M B( w# X<pre><code>struct Worker where
: O9 o$ y: S. S( Y3 @$ P" \ N{; e) ^! R |% k' z
_id: usize,8 X9 `% ^+ z3 r2 P5 Z
t: Option<JoinHandle<()>>,* G4 d/ D4 C( G& V
}) p4 p- l7 d3 I& r) A
</code></pre>
( W: t$ Q# z+ g: Z) O1 \; U<h1 id="要点总结">要点总结</h1>
% c/ Y+ ^; O' V! i<ul>
8 p/ {+ v" p% M% z' D p<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>: ~7 z, A Q) _6 I5 M1 M2 l: q' ?
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
: t) z0 l: E9 C' U( W</ul>/ C/ [; @5 Z- c Y; e3 x. u
<h1 id="完整代码">完整代码</h1>( S0 S3 u v6 B; D
<pre><code>use std::thread::{self, JoinHandle};$ P" P: W% G$ Q' D
use std::sync::{Arc, mpsc, Mutex};! f5 ~! t+ i: M. R
2 r/ Z6 c* g8 Y6 a5 w1 U* T: A. [8 h! `/ @* w9 Y
type Job = Box<dyn FnOnce() + 'static + Send>;# {% S" s2 `1 z0 n
enum Message {
% S4 ?4 K8 \% r8 g8 F: A9 [4 v ByeBye,1 g, r# b" L! U
NewJob(Job),
) g6 ]" K# {8 k, ]( D} ?6 H! L% f q5 N# T
' p7 X: u0 A: ~* }# o! B. |
struct Worker where
5 Z' R/ F# u8 x: D' P+ [$ w{ {& ]+ Z: m0 v( \5 {
_id: usize,/ ?! j o# \; U# l
t: Option<JoinHandle<()>>,9 G$ I9 w2 t% O1 d' ~4 v9 n: |
}
* r5 j) E/ K% K/ b( i8 n' L+ R# L c
impl Worker
2 Z% Q% p1 o4 _5 G' j{: J1 G% y. S, u( w3 F# C M( {3 Q7 I
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {, F' c. s) S! Y9 ]' A I9 d: P
let t = thread::spawn( move || {
* f/ [( v/ f8 } loop {+ Q3 T! X5 ~% a9 w2 g; h
let message = receiver.lock().unwrap().recv().unwrap();$ w$ r$ w3 T5 W) O6 i6 }
match message {
, t+ I6 ~# t* t% t s% {5 Q Message::NewJob(job) => {9 Y6 s( s. Y! |7 q4 A
println!("do job from worker[{}]", id);! X$ F& b: I6 T2 G; n% O9 I
job();/ [' d' f- C1 v) T
},
- y- k& A6 N7 J3 o! Y Message::ByeBye => {3 b' y) e% ]+ G- e# _
println!("ByeBye from worker[{}]", id);7 S# q2 k1 q0 v7 U! n, [0 O: Y5 W2 N! G
break
S- ?" c" z1 {1 M; { },
' [4 s/ o& s2 K/ k* m- i } 7 |' |; r% T* v$ J" V1 S
}
4 r' \- b( L6 i1 S1 T });
( }$ |- u9 e! K. A [/ Z" Y& F& |2 z- m) W; m
Worker {
( p& T+ ^6 m/ p1 q$ X* {7 H; u. E" U _id: id,
1 E5 }+ }* \! z6 h- y3 K9 {2 B, u t: Some(t),) ]& u& p; v0 |
}
8 I! _7 }0 X& p( l9 @+ S) U }% O$ s, H6 L/ h2 M
}2 Y( A* P3 h, w
+ S1 h6 U# ]) j3 f& m; Apub struct Pool { v; u" w( e: e2 a' [. ?8 [* H
workers: Vec<Worker>,
3 N6 B! _" D) _ max_workers: usize,( K2 [. T. o7 j
sender: mpsc::Sender<Message>8 y0 r+ {7 ?9 Y' b9 i! r
}: F/ v# s6 x# M7 G' V
1 q6 m! h0 a. K- @5 _) ~: Y3 U: x
impl Pool where {
( O5 I! S# y' ?( u% S pub fn new(max_workers: usize) -> Pool {
! g# x2 G0 x4 \! U, q0 Y: f if max_workers == 0 {0 U: ^+ h8 m, E9 y3 R
panic!("max_workers must be greater than zero!")4 a) N! L7 H) H
}
: M" h- M5 w! W. T$ I6 r let (tx, rx) = mpsc::channel();
; M {3 S" ^; y' C8 y s7 Q; {- C2 x! T: `7 }* B0 F9 w; M
let mut workers = Vec::with_capacity(max_workers);+ o( l0 O$ J% C+ ~
let receiver = Arc::new(Mutex::new(rx));
; ~9 g( O) a: L5 X for i in 0..max_workers {
* m$ y# V# A' C+ _ workers.push(Worker::new(i, Arc::clone(&receiver)));
' N) b4 ^$ X8 Z( @& m: v }
' X! f/ K% H* M$ ~; c% G, f
% o( t; B, M" \9 P4 p0 O Pool { workers: workers, max_workers: max_workers, sender: tx }- x( o% j* w; e0 J: X
}7 b5 F& k5 Q7 Z* O) H, [
" s q5 ?- Y3 ~" i* w! T
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
. F2 t9 h5 n8 O% p; M; x {( R2 z! @* \3 S
5 J5 }; c& I# A6 h
let job = Message::NewJob(Box::new(f));
6 u/ z: o0 G: g. P4 @ self.sender.send(job).unwrap();
( u: v; l8 A) A$ p' | }: F, Y: y/ l& h
}
. E. ?. `5 {% u+ e! B# ~+ |" c/ J! H0 i/ D6 U
impl Drop for Pool {
/ |! l' P5 t* z3 n5 k/ Z! v fn drop(&mut self) {; v$ Y# g% O: c5 |1 Q! X8 Z; I; x
for _ in 0..self.max_workers {
9 }' s1 R% Y( }5 b. s' a self.sender.send(Message::ByeBye).unwrap();
( k& d3 y+ m& t& \ }
" U0 ~. e1 Z: k) y, Y" Q for w in self.workers {8 Y1 n" t% G6 S# P
if let Some(t) = w.t.take() {
: a) G' @4 B s8 _$ T t.join().unwrap();9 K6 `2 y& |5 P! L: ?
}+ k9 @ F/ p8 |) U5 r O
}
& `" i% `. Q5 G" b }
; L& S4 a9 h4 A- @+ ~) R6 u}0 d7 [* ^" N, e, X8 V
6 P9 F& n8 F) J! l9 @: z0 o9 m4 |1 O3 T _& r$ O6 l& i$ r
#[cfg(test)]
0 e" Y) Q) B4 D2 e! @/ u& `- G% Amod tests {$ k$ }* M9 W3 k! j2 a
use super::*;
) Z" W+ w- [# o2 H6 ]) n% N! e #[test]
e- v$ t( i: o3 D# i& g6 Y fn it_works() {8 S o; K5 y' @5 y7 G
let p = Pool::new(4);
* d+ h4 K% H+ {% ?0 F* U p.execute(|| println!("do new job1"));
9 H; J4 D/ `: A9 M* q6 A6 a p.execute(|| println!("do new job2"));7 p3 g7 @! a4 K" r9 D
p.execute(|| println!("do new job3"));
! \6 d3 c5 C. q& ?0 k4 q0 C$ ? p.execute(|| println!("do new job4"));" K' `: I4 `6 N: q& E% K! P X+ p
}/ ^# p5 \% W4 @3 ^$ v0 M
}
! ]; A% ^: k3 o7 r3 l- A2 h</code></pre>; K; ^1 b6 ^! {3 ~
9 C3 z% q8 u( P1 P |
|