|
|
! n5 r& q# G7 g, k: R) ~
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 b; a2 X/ B" c5 w3 O4 C- {
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>9 Z+ f4 l; p9 I' _; v7 Z
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
4 Z) D0 q* v2 u1 P: g0 f# g, j" m<p>线程池Pool</p>
# ?7 i M2 h* l2 I<pre><code>pub struct Pool {2 s w9 u C' ^# U6 Z* Z
max_workers: usize, // 定义最大线程数
8 c# A& y V+ X}$ h, Z+ _) r+ i7 Y0 ^. K
( M/ U6 a, r8 r' o$ H3 Simpl Pool {
: G1 r" u! x' h" @ fn new(max_workers: usize) -> Pool {}
- ^& j' M7 v! @( y7 K$ q: u: u fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
- y4 _, _; i$ |' x: H0 v}+ [' z: Q$ U7 R! Z
5 A/ F t+ s, h1 F# g2 J( i' w</code></pre>
" X+ |- R1 ~6 e6 E+ G% n5 Y+ B<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
/ x7 o; j2 E, m6 V7 E: E0 i4 e<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>0 r! `2 Y) I: V1 ]7 }
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& _" W+ J) V6 X2 v. @( t
<pre><code>struct Worker where5 d. y# O0 P2 H& g. i/ F; W
{
: U( _' D0 h1 v; t _id: usize, // worker 编号
. q5 m9 j5 i. ~. l8 l/ F7 B}
/ z3 x: L) F% n2 S, }4 B# p</code></pre>" F- F3 N, H. `5 P) R3 \
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
8 C4 y' S; `( f- W1 F' W) U' }; m把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
4 |- R7 ?$ E/ g0 l' J<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
. a% w5 i( e' S8 K- f+ W<p>Pool的完整定义</p>
( g3 i+ u' a# X<pre><code>pub struct Pool {
% M, e; z: ^ E2 l( e/ }1 y Y workers: Vec<Worker>,
3 L" ], M Y) ^ max_workers: usize,! H- s* ?: _) r+ l! \7 m8 K
sender: mpsc::Sender<Message>
9 E+ [1 @6 j, G; w& n$ F9 O( r) e} H0 ]$ P- J' b' h, Y& ^
</code></pre>
3 t4 Z' @; R& z2 y8 O8 g" H+ M<p>该是时候定义我们要发给Worker的消息Message了<br>
( `5 r0 {' l1 v) V2 x定义如下的枚举值</p>
% s5 X1 v* x9 n" U<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
8 i& @* m. U9 R, o' O: oenum Message {
; u& e# ~0 l. q$ q ByeBye,
& p$ k8 ] S* R0 q0 |+ y V+ l NewJob(Job),! `& b7 p. }# }1 u% r
}
% l# a% v1 A o2 J3 Y$ B7 T</code></pre>
! h H) h# k$ M7 z( R<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>' ? [* x) m) p4 z+ z5 G1 h7 `
<p>只剩下实现Worker和Pool的具体逻辑了。</p>7 F6 P- i# ~/ R( f: U8 _& R
<p>Worker的实现</p>
" f$ f7 u; l# H: @<pre><code>impl Worker
5 _! |( v, l7 t+ z4 K+ E{: |2 w7 X, h. K
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {0 n' T7 C {( b3 A# z
let t = thread::spawn( move || {
( F0 D% E3 \4 \. O8 F/ s* v loop {
& t8 L( ?5 c) o4 t% h! H0 h6 V; g let receiver = receiver.lock().unwrap();8 c1 b# }% h+ c9 P1 F
let message= receiver.recv().unwrap();
. M7 x/ z% G/ K. C0 j. U; W% H match message {
* C/ \6 V( p" |5 t3 K2 G Message::NewJob(job) => {; F+ ^0 J3 f4 p. ^2 s
println!("do job from worker[{}]", id);; p# R& e7 Y7 \7 @8 j
job();) Q5 W6 I, F3 w
},; A5 p- N. o0 u) G& I2 ~: W/ u) D* J4 P
Message::ByeBye => {
- |5 Q( y4 Q9 a" g8 m0 ` println!("ByeBye from worker[{}]", id);, {% u8 I! M) d, i8 z5 B0 y
break
* `8 y5 u5 a8 Z) D6 [8 A },$ P6 H- Y0 }6 Q# G
}
8 A! k1 e7 w: [: S }
9 _1 V2 ?2 W4 B9 F [( o });! _5 R9 b& F: b3 F5 c% F0 j* t
; Z) V- ~ I& [ K
Worker {
6 e7 B- D5 W0 z$ Y# W2 }) [ _id: id,
9 f; S1 y; A" i t: Some(t),
' E: i! ~% I" d& F }
5 G' o8 B2 M: A" _8 d! t }
' h0 g+ k! F* t! `: J/ z2 D b ^" c}) Q6 {8 j! b7 A6 _" v
</code></pre>( u3 D9 e3 V. m7 r+ K- h+ u
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>- N2 D p/ m+ w4 A% W7 v- e
但如果写成</p># O7 ?6 b; j F( i% L3 r" d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {& ^/ E* b7 x) R9 v- F3 K
};
8 c. L& E8 V) O0 J1 c" x- _</code></pre>
/ z2 q, }( q- R h: {" {<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>8 r0 ^4 p: |4 W: F
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
. f8 y* r( u! a Y- `<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>( @) ^ c/ A4 Q* a5 j1 r8 C# G( C
<pre><code>impl Drop for Pool {$ J# q& |; E @, R6 s" `+ q
fn drop(&mut self) {0 e" n" j" q# n* B: q/ y& q# o
for _ in 0..self.max_workers {/ o0 D+ q) E- [. T9 R+ U+ P
self.sender.send(Message::ByeBye).unwrap();
& S6 @+ ]3 h. Y9 s }
$ W" Q+ }( V! T$ r( r0 O for w in self.workers.iter_mut() {- G: J# ^+ f4 @) G5 M+ Z
if let Some(t) = w.t.take() {
) }; s3 d x/ }) `$ n( z9 q t.join().unwrap();4 u+ Y$ i- y& V: S$ C* R
}
; l3 s( [4 s" h; O. |2 E }
% \% U, m/ t; M) @; W O, Q' \$ D }
& {4 [. b1 d4 m o}
8 T* M: O3 O& _) S2 R0 C$ k! @6 B# y1 |
</code></pre>
$ \. k6 @" p$ v" r- ]" Z% s<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
! M5 s6 ~0 h7 b4 d<pre><code>for w in self.workers.iter_mut() {) \7 Y: B# `0 g6 L
if let Some(t) = w.t.take() {$ j" T" b. @6 g! u* U( ~
self.sender.send(Message::ByeBye).unwrap();. a/ t! X6 l# c0 a5 Q( S
t.join().unwrap();
& A1 B( T* h- J7 Q9 v }" D# J6 U6 n7 A$ P! B* \
}
$ b) {, O3 Z! B1 o/ ?* g0 R% I/ y# {8 }) C$ \) G/ u! a' O
</code></pre>
' X- N& \+ B, q2 Z8 c- S# W<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>* Q" f9 K$ Y6 n/ Q- u
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
& H$ Z9 h- [) X5 g1 f<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
7 X1 \- U3 J' U% r r<ol>! H4 e1 T# }% C7 H2 j6 d
<li>t.join 需要持有t的所有权</li>' M: c! Y+ r$ V' j
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
" C) m g4 L$ |: G/ u4 R0 t6 Q</ol>
* U- [3 p/ c6 X3 b<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
: c) M/ F6 V+ Z换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; g8 }. i. A3 F( c; o" h, r/ a/ @<pre><code>struct Worker where, w) @3 @" S& l. c: ^6 N
{+ {+ J* @3 s+ l# H* v/ L
_id: usize,* M8 l$ |) s" t: K, S" Q; R- B
t: Option<JoinHandle<()>>,
8 s# ?4 N N- X1 e! s% ?}
0 _$ e8 C9 ?4 r</code></pre>
$ F! k9 M% u8 J) _% a8 S( P+ R& X$ l<h1 id="要点总结">要点总结</h1>
8 ~' U$ a1 C, ]' x<ul>3 j) I+ d' h) p7 ~7 ~8 @* L
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># ]; V) h$ s7 v6 O* v) K, x3 [
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
. B$ b2 d9 W. J! z% r0 V% L6 `, e</ul>
. F$ o# x9 J- d) d<h1 id="完整代码">完整代码</h1>1 A$ _7 `4 }6 l' a, p U3 ~: b
<pre><code>use std::thread::{self, JoinHandle};8 n& b( k6 @$ |3 A& [; I( n
use std::sync::{Arc, mpsc, Mutex};
. G' Y7 ]' Y4 x h/ @. g
2 b. y% H. u! \% ?( b, p+ @! t! I% ^& E+ |' G# A# B6 f' Z! m
type Job = Box<dyn FnOnce() + 'static + Send>;
3 c% ?5 @# f3 h& s* uenum Message {
`8 q( ?2 f# F' f ByeBye,$ A1 t6 H4 y$ k6 N
NewJob(Job),
/ X1 B, ~8 S( m" S. o}
1 l$ N0 n) m5 `' `1 t
/ k6 U! Y( g. Q s( g( v/ r# s( _struct Worker where
T" ^4 V5 V% |, [) H$ I{
" H9 T3 f1 I) `! n1 R" s _id: usize,
$ t7 t' |/ U# G# G* O t: Option<JoinHandle<()>>,
0 z. I+ ^$ z7 {" C4 N m9 D}, Y1 G1 w; D7 \# [3 R+ a7 {% O) ]
5 O) c3 ?2 W! h: o
impl Worker
4 a j1 p9 p D( B8 V{4 e. X4 @% X8 u3 P3 I3 @
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {2 T9 y# S) n7 k0 X8 E/ P
let t = thread::spawn( move || {$ u8 K4 B! Z6 \3 _6 Q/ ~+ V
loop {! r( K. R, v# K# O4 y$ O
let message = receiver.lock().unwrap().recv().unwrap();
6 e/ X& |4 v* P- f" t" R' v3 v match message {3 K' q5 |, b- o! D7 t" \
Message::NewJob(job) => {
4 p1 f0 H d& Z4 Y println!("do job from worker[{}]", id);" ]+ z/ X( u4 |! T, U# B
job(); t% w, M8 |0 m f
},
- c; H# P7 N' p$ u Message::ByeBye => {0 V3 g0 }" d& x' s0 d7 U
println!("ByeBye from worker[{}]", id); }8 w. o# q0 y* N# F. L( F
break
/ `8 K1 M4 A6 h },1 z' N6 a5 Y* W
}
7 e# e7 b o! h }
0 {$ \0 }3 ^ V+ r* G6 W0 U, Q });
3 V5 G: B2 z9 o8 l3 b7 ]; g1 l/ U2 y3 G j7 X5 g! W
Worker {
! \( C3 u% \3 W, S) K# d4 s _id: id," l' k" ~0 W" p! }0 ~4 x
t: Some(t),& r: P3 \* [/ J
}: i- m" \% j- S7 _0 ?# g3 M3 O
}
% J. |% u; g" k( p5 S7 Z3 ?) b}
4 T4 f) N0 ]- Q! V
6 ^! \8 m' ?; }; Epub struct Pool {
0 w' ~ J m/ F0 o( v* m' T workers: Vec<Worker>,
y3 P+ ^. S. O& U max_workers: usize,$ ~+ E, g1 A+ T9 E
sender: mpsc::Sender<Message>
1 \, L/ l0 a; A9 L}" C* X8 k% t+ [) m# A2 E5 O
( J# A9 k- z$ n& h. vimpl Pool where {
5 g" C& z% X* ]% I( u pub fn new(max_workers: usize) -> Pool {4 o& C: e6 w! Z) m( i1 Z
if max_workers == 0 {+ @" h+ F! a' a6 @: O' o; M
panic!("max_workers must be greater than zero!")& L- S, S8 Y5 y8 _" T. U3 q5 C% d; Q
}& f2 R9 j* r1 u5 w$ @
let (tx, rx) = mpsc::channel();
6 s- ]2 `5 ~ S% @6 e+ }$ J5 X u2 S9 F
let mut workers = Vec::with_capacity(max_workers);4 D/ D' K) y$ k- z4 a6 T$ u9 Y
let receiver = Arc::new(Mutex::new(rx));
4 c( Q# O( y$ V) a for i in 0..max_workers {
9 V3 }# D% }3 t0 }6 r workers.push(Worker::new(i, Arc::clone(&receiver)));" @6 ]7 t1 Z( ?5 K" N
}# k6 [$ @7 W; V- e- w, l
o2 a; U! [/ C* Y, d) L% ~
Pool { workers: workers, max_workers: max_workers, sender: tx }
5 r0 e" u3 a2 c# P! ~; N }6 M" f6 d) _' C8 A
9 Z: d: k# \( { R$ ~ pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
. H N x& L3 v! l! Z {! |; ^* y4 i0 l, k ]
0 j% ~" x1 g1 U4 H0 E7 J4 k
let job = Message::NewJob(Box::new(f));
8 n* ^6 K! p2 N; b self.sender.send(job).unwrap();
2 `, b" t/ m. G }: R& h( [4 Z. D8 {1 i9 D6 q
}
; _; M) M, Q ?3 r
* H. u4 ?3 ?/ ^+ @' |6 o, T* g& vimpl Drop for Pool {1 |8 m9 o& r9 p9 H" e7 [6 u; r
fn drop(&mut self) {7 Z" b1 k2 P j
for _ in 0..self.max_workers {, E/ x# S. U8 _ p) [2 r) C! {
self.sender.send(Message::ByeBye).unwrap();2 T7 ~: c9 f: J
} |* U. k% ~* d( u) s$ w8 `: V
for w in self.workers {
% c; g) B; w! R3 X if let Some(t) = w.t.take() {% t: V( R5 {" _$ h& ]5 O# U
t.join().unwrap();. g2 h4 R2 x6 i
}
3 S1 z" D. v4 G' L4 k Y$ W }& E: v; q u* ]! H- C' R) \
}
) L+ Y3 }8 s8 D5 ~; s2 ~}% w( y5 Q1 P6 m8 U( I w F
2 p! c; ~4 E+ J6 E
; P! ]: x% m( u#[cfg(test)]
' R" `8 w+ J' m8 hmod tests {- d2 \& O' @) _
use super::*;
+ v% ?/ Q! A, y5 z7 F0 J2 D* G #[test]* N4 I, a3 M8 ~* a# k4 O
fn it_works() {( C, ^ L0 {4 a& X* D
let p = Pool::new(4);2 n J6 Z3 g1 ^! q5 L. B
p.execute(|| println!("do new job1"));9 v) W. S! n1 Q6 ^1 t9 z/ f
p.execute(|| println!("do new job2"));# l7 V( @) ^+ I6 _. p4 U$ y
p.execute(|| println!("do new job3")); g5 ~' F; A3 Q: x
p.execute(|| println!("do new job4"));+ ~2 k1 s( Y& D" ~, a+ x B8 W
}
' w6 s# U* `7 a* R: t8 b* j}# C4 C& D% S' Y* w9 a
</code></pre>
5 w% r% p+ ]9 B1 w) U: |( H/ @
* J3 P; K) a1 W |
|