|
|
+ z5 A) {" @! Y, n2 t! W<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
9 y1 g: x' A5 `$ C$ y<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
7 @7 ^9 b" G/ }+ d1 o# J<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
: Q y# T2 b! W; t. m<p>线程池Pool</p>
" H! N" R& M7 D: ]# g+ \( a<pre><code>pub struct Pool {
1 H; \. x! v! j4 [8 W4 } max_workers: usize, // 定义最大线程数. X3 ?$ Y& i1 [3 q8 P( W
}5 K; B, {* B3 {. z& ~' J% T4 K8 r; V6 g
& Q7 o. ?5 O* H4 i6 himpl Pool {
; d9 U: w H9 \) A7 O/ q fn new(max_workers: usize) -> Pool {}1 m% `/ f6 y, [; L( i! m7 p+ ~
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
4 [3 ^, l# F1 S1 Z- U" ^}0 F+ q' }+ V+ l5 T
4 c+ b, p5 m6 U: H+ [4 S' l
</code></pre>: _' `0 q( ]4 B' H
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
3 ^( ~1 B) i1 ^ o N2 e<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# Y( I, V- n, l4 Y5 P9 P* I" k可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
" @# `4 M- q+ A; _) x) A) I+ s<pre><code>struct Worker where) w, ?( {: i7 I' I
{- |5 L* B4 l* o+ N5 f7 [- e/ S/ f
_id: usize, // worker 编号
# _4 L- W5 r! T2 e}
$ F" R' K7 b. q7 b</code></pre>- l# f5 C" c: N2 c, f$ A
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>1 U0 H3 B- q! N
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
1 t) j# }7 H+ \( N8 _' O; H<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>& l& Z1 x; l/ c- _( V
<p>Pool的完整定义</p>8 G* Q6 B) p7 `1 s% r
<pre><code>pub struct Pool {& q# k" H" Y, M
workers: Vec<Worker>,
' b6 p. c# E; `5 A5 u) J max_workers: usize,
+ z4 v! I* y _& A3 n2 g2 ~( f sender: mpsc::Sender<Message>
, [) {: I1 a A3 U}& y6 @: ?/ t: c5 ^/ m6 p
</code></pre>9 @5 y I% r( B: T/ f1 q+ E/ v
<p>该是时候定义我们要发给Worker的消息Message了<br>
9 Y8 C5 V* T8 P$ Z/ |! s' i; I# o定义如下的枚举值</p>
' B3 M" i+ l9 f/ w) w/ S* T<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
$ I+ F& j: `6 Denum Message {
; r1 D+ z: E9 d0 N* _ ByeBye," R3 z2 }0 o! Y( q
NewJob(Job),
1 N' E0 ~# I. @5 Y$ D N}+ @: V& {4 T b: I
</code></pre>1 K, d. t2 L" G( [$ j4 _, }
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>, `/ I$ j# i; D0 S# e+ o2 h. T8 w
<p>只剩下实现Worker和Pool的具体逻辑了。</p>1 T& F# X5 `5 C2 l; u% I8 l
<p>Worker的实现</p>
# x0 i0 m2 b2 i6 r$ y8 N, s<pre><code>impl Worker
# Z3 [9 P+ }, c: v5 z7 ]$ C+ C: n' }{8 U+ r4 _2 W G( V" c9 a
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {8 H l) n+ Y1 F( z2 e
let t = thread::spawn( move || {# V9 W: {( Y8 h1 b
loop {
& l1 P7 D/ H% p1 \: m4 a: d let receiver = receiver.lock().unwrap();
! M6 f5 r" Y1 d+ {/ B/ O+ W9 G let message= receiver.recv().unwrap();
9 @1 o! H9 X- k7 V- i( @9 t0 ^ match message {
( x% K$ e3 u/ P* N Message::NewJob(job) => {
C8 p5 I6 i" d+ F println!("do job from worker[{}]", id);
4 k6 Q% w4 T! u* D3 @! X8 K job();2 j4 @6 S2 r: k+ i* W6 a4 n t }
},$ x' f0 T$ v2 Z" C/ m1 ~ q
Message::ByeBye => {
( ]/ X- |, `6 j println!("ByeBye from worker[{}]", id);
0 t9 E8 B" d+ Y. G break
% t) T Z: {$ e. b, T) q },
( s9 {7 B. `* L0 G }
. l0 A) s c+ y& a- G2 d' l# {+ M }2 @% f L5 h/ a! [1 Z- k/ c, {
});
5 v& c& [% w* M* z% G; R8 o, N( s/ A! I6 Z
Worker {
6 S: d/ {% o" r: n. E! C _id: id,* L$ ^1 }+ f" ^: S
t: Some(t),) p6 z+ X: T. ~' R+ w6 l
}: h+ P+ n6 Q+ ]* T0 A! Z9 p
}
" F6 U V! @( Y/ x E}
I7 \9 k5 |6 J' s$ Q2 H9 Q) W</code></pre>- ]* s | G# H0 h
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
6 f1 w7 {6 m" o. L) U% D但如果写成</p>
; c1 P$ R) z7 I$ L7 e<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
, r# f2 X# d9 X/ C" A! ]7 d};% N" B3 ~* c2 I4 r
</code></pre>* p' ~2 z! e/ g3 M1 N7 ]) R
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
% r$ _+ u$ J# S% ]9 ~" @rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>: i. p! G! t$ h
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>$ I- B: G! g \
<pre><code>impl Drop for Pool {3 Z4 N/ c* b5 \$ S0 k/ Z
fn drop(&mut self) {
9 n& [ m9 t# H) `2 G for _ in 0..self.max_workers {
8 @- m [% j) V n self.sender.send(Message::ByeBye).unwrap();
+ v3 {' F" C# N/ F* @ }, x0 x, m& Q" |/ }+ R8 o" d* X E
for w in self.workers.iter_mut() {2 ~: R) r m) I9 U" D
if let Some(t) = w.t.take() {6 J7 ?; v' f$ w# s
t.join().unwrap();* [( B7 j1 X& z9 Z" T
}
( N2 m% r# E! \: n6 R/ a; D& M2 \: ] }
" Z4 h1 F. r( [2 M/ _4 p/ M }, ~" |* u8 ^/ b; V1 V$ E0 d
}1 C8 [6 l& A0 l# H0 y( D7 r1 L
( w* r) Q* i; h3 L7 P& w</code></pre>
0 r6 X' t+ `4 M L. g<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
- e( v, m' z: b, N4 N1 d<pre><code>for w in self.workers.iter_mut() {
1 X# d+ b! F6 K1 Y if let Some(t) = w.t.take() {
( u$ m3 h- ~! ~4 e t u7 e3 D self.sender.send(Message::ByeBye).unwrap();# Q/ ~0 e, \5 ]- {" y
t.join().unwrap();
# r' A* K1 | ^1 G+ C- e }+ B) a$ k1 B8 _& i& n
}4 F7 s F! ]0 {
' a$ h/ U$ J! }3 T$ k</code></pre>+ V. }* _, x) c! H* U/ P8 V
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
6 q8 ], U6 w& {, G我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
0 p, G) a. I- D5 N+ i<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
2 n1 ]% H3 p! r9 Y* W- x" U<ol>" t; |0 d% A+ C& k6 x1 f9 S1 S1 D: e
<li>t.join 需要持有t的所有权</li>
8 }, d+ z0 g2 |$ o3 H<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
6 i# ~8 O+ v6 m</ol>
7 f# e7 B) ~7 S. n9 P1 @<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
4 z; i1 G2 }3 ?/ o* s" U, ?8 q/ C换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
6 d: D% ]- S7 V9 ?* j4 Y<pre><code>struct Worker where
: U1 v9 \/ d& A0 e3 e+ ^3 @2 t{. U4 }6 h! H( w: r
_id: usize,5 e; v5 I) T$ e: A
t: Option<JoinHandle<()>>,
5 y$ c$ r3 A' q) f2 M}
+ f, F* c/ E' t3 W+ v3 U: _</code></pre>
: x* Q# ?* m# h# F, T% c* C<h1 id="要点总结">要点总结</h1>" ]1 J9 h" o1 k$ C1 l% F
<ul>
6 G# N+ F7 c8 o+ h( g# U* H<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># f: X! j) ~) q* W1 p, U+ D
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>5 n$ Y& S1 z6 @2 @; X; {5 C
</ul>
- i+ P7 c3 o" Q9 M& |! k<h1 id="完整代码">完整代码</h1>
7 Y( k) Z9 e! K# N I5 `<pre><code>use std::thread::{self, JoinHandle};
( Y6 f* Z" _. e0 H3 }) U3 r% O3 Euse std::sync::{Arc, mpsc, Mutex};
( t& V* L/ |6 E+ Y- U2 P3 ~; ?2 m; ~* K7 R
7 w( q0 R+ t) h- |& f7 Rtype Job = Box<dyn FnOnce() + 'static + Send>;
4 W! K2 R4 n$ t/ ?* J4 x O7 genum Message {) ]* h- w! y& J: ]; _
ByeBye,1 p- i( u2 S0 P4 N' H% Y# d$ R# J
NewJob(Job),) a2 ?% r) D- `" s
}: B4 M. s. t( f# x5 Q% E- q: ^' o
) U+ _8 E1 d9 n, r" u3 V7 ]
struct Worker where3 f/ C% V+ n. y2 j
{- R& I3 S- D$ Q2 v
_id: usize,
- t& p6 ^3 ^% ]+ t. } t: Option<JoinHandle<()>>,
' m- |/ r; Y- H$ v7 m3 d" P}
& G5 n: E3 R+ `/ `1 Q4 \2 @- _
; d6 ?9 t9 |, w# Kimpl Worker( H8 W# Z, {5 H" H
{# j) N& d3 @& i, l/ B! N g
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {; r, c4 s7 Y3 v; V
let t = thread::spawn( move || {
& r+ V" \: }- j loop {6 y# e/ h; w5 U/ W# H+ u
let message = receiver.lock().unwrap().recv().unwrap();
( \/ _ h) G; ?1 L% @) } match message {
5 d9 l; C6 N% o* k Message::NewJob(job) => {
" e& T) L, U8 L println!("do job from worker[{}]", id);8 w7 `* f; u7 Y( T0 a
job();" `: m) c! F4 |8 A. C1 @3 n. F
},9 | u# X) B k Z5 L* b2 i
Message::ByeBye => {
2 [* S. G' O8 k0 p2 Q( I5 b println!("ByeBye from worker[{}]", id);! e! S: {3 Y$ ]/ i% V
break
; {3 u" b) I7 R- A* ~9 U },
j7 [6 i! W6 {# |- m N; g }
' L3 z1 j$ v* {7 F+ U }
) [2 N [( D0 F" r8 Q6 H- n });4 R& E. R$ X8 Z( i. |% ?6 K
) |1 i+ o0 W8 o; r: B# Z& J Worker {
3 B, a. M1 `; c( ? _id: id,
. l; l8 O, C P6 X t: Some(t),8 |6 K2 X9 ~% L0 _0 Y5 Z; Z
}* w( N8 [; J) ~: O' U( p
} @# f. H! |& [4 i- f
}5 a) t" I; d( b- b3 z
& i8 W6 ?6 n5 v3 v" W$ \6 Z5 H
pub struct Pool {
2 y# L+ x7 H" V workers: Vec<Worker>,% B5 T5 g3 q0 I$ C$ a; c7 S
max_workers: usize,* v; H# n( ?* W, Q7 }$ h
sender: mpsc::Sender<Message>7 S6 I: m% R0 I0 A7 X! |
}8 y! ~# _1 J4 V2 O4 Y) M2 K
4 A( N- u/ w% D; }( J! B
impl Pool where {1 n6 _" W5 h5 `$ b
pub fn new(max_workers: usize) -> Pool {" l" {1 `& A# W2 L8 Z
if max_workers == 0 {
6 E9 w8 A' C+ }4 ` panic!("max_workers must be greater than zero!")
- S' C8 C, E7 I; ~; \ }# y# R4 i, Q2 K$ k0 ^6 {4 u5 O1 {
let (tx, rx) = mpsc::channel();& q$ `& V' O& W/ v- D% E+ ^
% P5 i0 r' h* ?( W! y! c let mut workers = Vec::with_capacity(max_workers);
9 B+ s; ]% Q/ ^0 l* b+ s7 Z let receiver = Arc::new(Mutex::new(rx));" a& Z% q8 l( a9 ^
for i in 0..max_workers {6 @6 R' u1 J" O4 J2 k
workers.push(Worker::new(i, Arc::clone(&receiver)));
$ I) B/ f" C2 Q* q- ~1 ~ }
4 Q( l% e, @# G/ i! o1 Y2 W5 }; J7 {4 D$ H( u3 H' J* Z0 L
Pool { workers: workers, max_workers: max_workers, sender: tx }
7 [' [4 i! }3 i$ r. C }
! u m3 J: D( F& I
1 g+ X' |; @6 r( \" ]) U; Z/ d; n6 F pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
5 C" ]2 ]0 G0 |1 q0 N& W {/ Q" H* }) ?! A
' q2 o/ i; ~# o0 Y6 L3 k9 W
let job = Message::NewJob(Box::new(f));+ D% ^5 f5 w; _( v/ B- N
self.sender.send(job).unwrap();
6 O8 y" @% n6 U5 l7 [1 f }6 B4 E4 `. l7 E
}- V! k$ g- Z7 M8 b3 p
+ c2 W; M% k( R; B9 ~$ P& ?, ^* Ximpl Drop for Pool {
' B+ I8 \* ?% S' o fn drop(&mut self) {9 r/ R6 {# i# ]7 _+ L
for _ in 0..self.max_workers {
' x) _3 X- M3 x( k: F( T6 s self.sender.send(Message::ByeBye).unwrap();- T0 }5 r+ a9 z8 H. Z/ F
}9 x2 y. a9 l7 U( z$ U
for w in self.workers {
4 b% S% ?* c& }4 Y2 j% ]+ b9 C if let Some(t) = w.t.take() {! K0 H4 @% E* h! K7 O
t.join().unwrap();
# s \1 A. ]& V* V! r }
! \2 a, ~" B. Y5 I6 j( y5 D5 q* Y }: `# y6 w% M# ^& S U2 C% T, l
}
4 J6 k7 J! p q/ w. P}
0 x1 x2 P0 n1 _1 U
! Y+ u7 \" O( V+ _
# Z/ Z" L; O1 M' Y# e#[cfg(test)]
' n- ]- H/ J1 M9 w. l: R# C6 {mod tests {
' Y/ k0 P" z7 K$ N use super::*;5 J. p, X8 h! t5 M+ g
#[test]
3 ~& e# U+ T2 G. |. w; R fn it_works() {* ]( y+ z" ]+ l
let p = Pool::new(4);+ t' S' o4 O% R# n, c2 T$ [3 `
p.execute(|| println!("do new job1"));
3 s# v' e; Z9 c7 F p.execute(|| println!("do new job2"));; X6 h6 R, Q7 ]3 P7 _
p.execute(|| println!("do new job3"));
' E/ `8 I7 I! n1 R, M0 v p.execute(|| println!("do new job4"));% e/ h! f: f" z# N5 J; W
}& H# n L5 x' M
}
7 }* E, A& S- Y1 C6 n. ^, X) e</code></pre>
2 K& u- R$ _5 w1 }
5 M5 q5 K" u0 \7 h4 d! M5 ]2 D |
|