|
|
+ f+ c4 n* N ~1 P0 F, t( {' L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>8 O% m6 r% c- X" j
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
( [/ W# I# Z3 q3 H/ Y% R. v6 M1 ^9 a* G<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
+ ?( R1 Z! `: i1 {. m& v6 e<p>线程池Pool</p>
0 i+ l% K3 W% z- A: C3 ~<pre><code>pub struct Pool {' R* Q, m$ H& b7 l( z- ]
max_workers: usize, // 定义最大线程数6 @; j1 W* O& j/ F6 S; `) }5 A
}% I, e u1 V+ r: c8 B
: W; d0 s. D% M% X8 u+ yimpl Pool {
8 ]" }0 r9 _+ F" C- x fn new(max_workers: usize) -> Pool {}1 ?1 t" T1 f$ x5 z7 B5 Y# P1 b
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}: P+ \# C9 k$ A) O+ ?8 ?5 K/ p
}
# b" C7 b3 V! ]8 ]& D$ B% `5 U9 c6 a& w( u" `% j2 e+ j9 q9 x
</code></pre>
z8 ]: Z5 I \) X1 ]( j. D2 I<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
`( ?/ `( s+ e+ Y<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
$ l' ]9 k: T! |1 x: U& K8 O5 K) _可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" ]2 _" q* y, g7 G3 B2 y
<pre><code>struct Worker where0 S: d! A% Q+ S5 q0 d0 |7 l" R; k* ^% A
{6 O! k3 R6 T3 o) Z" l1 J% o4 Q
_id: usize, // worker 编号. f& E7 n" U. ]& Y
}, R' A# H6 c$ v: W! V
</code></pre>2 C; I" `7 X/ `+ ]3 Z* e
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
5 x0 F* R1 ~0 }$ l把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: b) H! s5 A6 E, f' X<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>; {% o! p" k. F9 c6 F/ h
<p>Pool的完整定义</p>6 `5 [; u, I# U1 N% R
<pre><code>pub struct Pool {
0 U: a" [" h' c+ ` workers: Vec<Worker>,
7 L& E. y9 \9 k5 O* z& Y max_workers: usize,* A1 a9 c5 H. i
sender: mpsc::Sender<Message>2 t2 C5 b6 A) n' v5 t. R
}
+ g' E7 O! E: G3 t; h</code></pre>) p0 D& k1 q" S9 t
<p>该是时候定义我们要发给Worker的消息Message了<br>& o$ Y( s5 b2 D9 \/ X. F! z4 Y$ Z
定义如下的枚举值</p>% ~6 J6 Y5 l- S( H7 `& d
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
7 x% r7 p/ E1 `enum Message {
" l0 a4 z' {" R ByeBye,/ K# f. f" |6 X! P
NewJob(Job),9 O% O H# }- f
}
2 E# q7 F" [) W6 M( U# d1 R |</code></pre>
/ P( o! S4 X" G<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 y* H3 J, A* {: w8 M4 |' a<p>只剩下实现Worker和Pool的具体逻辑了。</p>9 p, M; i( H& {( U/ e! Y7 e: S
<p>Worker的实现</p>( w+ X/ E7 _6 ^$ t
<pre><code>impl Worker
8 s o& @! S# L* X {# b: G{
4 ]" t0 m1 |' G0 E) Q& C7 p fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
, B. V' ]# G. i2 {8 E$ Y3 C let t = thread::spawn( move || {$ D3 w$ V. I6 r$ T6 c/ S4 P
loop {
. _" y& b1 y( F* d; W7 h let receiver = receiver.lock().unwrap();
3 T9 {4 P7 ?, j/ q let message= receiver.recv().unwrap();4 K: c/ j: l1 k. E
match message {
/ n" T( z3 f: q Message::NewJob(job) => {# e- [- k) \7 M5 [9 g
println!("do job from worker[{}]", id);9 r" b3 `. @1 b
job();9 k* I6 h5 u: a6 I8 L0 W
},
2 V/ G8 j; g. l1 |' t1 \ Message::ByeBye => {
& ^0 ?$ V+ \6 d. P println!("ByeBye from worker[{}]", id);
( B! V5 H8 i% e break3 d' H3 `( K! o5 M4 B; _2 S+ \( Q
},
1 q' g$ @. t6 E9 b5 g: N } 4 X5 l3 @( ?8 X A6 ~9 n' Y$ {
}
+ ~4 C- w, Y2 ?1 @( P: d1 q });
4 U( {6 K1 K5 t+ d
, [ N# }- z3 _: y Worker {
3 j$ |; q; e$ ? `5 l _id: id,
, q, V. b h5 n. u$ w4 C4 w% v t: Some(t), Y. _+ {% c" ?: i0 \% S
}
% k5 l* E- a, I0 U5 o }
$ ]# A) i" Y- S0 t}
* H& n( z* f0 L; j& K- c- W \+ [</code></pre>
+ t; H# S/ ]: F<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>- G( t- W' O2 t
但如果写成</p>
! V; J& y& q% a<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {3 y p& L! Q$ Q2 E9 F9 c; L, J
};
& W3 t; j8 T) C2 R3 _! Z& e</code></pre>
& L2 u* Q5 M" B* c( x<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- V) e2 t% ] M3 d: U! {rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
U+ I% C* c6 b/ @<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>/ v$ f( g. k( U) J
<pre><code>impl Drop for Pool {+ E# k. B9 r% ?' k3 @# e( s
fn drop(&mut self) {
9 O" Y. V. d' W for _ in 0..self.max_workers {5 e, v2 O& g* b2 w- Z. A# v
self.sender.send(Message::ByeBye).unwrap();8 y( w7 S& o" l8 g
}
# f. }$ n+ q$ m9 _( O2 A) a for w in self.workers.iter_mut() {
1 p7 W( a* Z& J5 Y9 P5 K if let Some(t) = w.t.take() {8 p3 W( o. i: b+ Q! [* z
t.join().unwrap();" C1 H6 `8 g/ ?! c1 _
}5 ?. {) u' M d3 H* O1 V; j8 Q
}/ p) q7 _$ x0 r6 p& h s1 s
}
5 `+ \2 l. S9 U. @}
7 _5 _1 H; S9 a: A7 n6 R' ~* k
- v/ R$ ?( b W</code></pre>
: \* y- R& o7 L* ]* C<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>" j( v2 T5 L0 D0 M7 v1 M
<pre><code>for w in self.workers.iter_mut() {
# q+ B* p5 q" } if let Some(t) = w.t.take() {6 n, o2 p) c; i5 {* i
self.sender.send(Message::ByeBye).unwrap();8 E% m3 B6 V% P8 O) O6 R* C1 ]" k
t.join().unwrap();
( T- b' _6 ?6 h. {5 {4 Z }
3 x1 p$ {; ^2 \: }9 x& |% h}
, k% }! n& Z. W D& P4 P& Q
6 S/ ^' N6 c# j. V- V) V7 H7 a+ _9 X</code></pre>
- r* t% s( }4 ]$ u) r, }. u0 L<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: U, d1 \( ]5 i# f) h
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>+ n/ X6 V9 w$ ]* Q3 _6 W$ h
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
- h5 F( j( R4 _% G5 W8 G<ol>
8 Q. a& B2 z, I! T3 P$ ~" K7 ]<li>t.join 需要持有t的所有权</li>
( ]( Y' o8 E& b% l& G) M<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
( y+ {: L, H7 m" D</ol>
4 B3 v( B( y/ y<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>) X5 y# `3 a! w3 i" n; P1 x
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>) w0 r; E$ C' } P; `% q
<pre><code>struct Worker where- u& n! W3 v9 U' Q1 u7 j' W
{! T; A6 {) Y4 h% }
_id: usize,& B/ i7 U8 P& C; U2 k, h `7 u
t: Option<JoinHandle<()>>,0 q; r1 |, @7 b; ]' N% x
}
' c* M2 M8 s; y5 ?# d. Y</code></pre>0 b' H* a$ @+ y$ Q; ~# b7 M" {: ~
<h1 id="要点总结">要点总结</h1>
& }8 c5 A/ H+ M0 d/ @<ul>
8 k) A5 o* O7 ^' j# t<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>. A. {2 w9 `0 J9 Z. E& r
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
7 K& r- e! D* S+ `</ul> ~4 o( a7 j. X! s" Y$ r
<h1 id="完整代码">完整代码</h1>& }, K9 b3 R/ U ^ i
<pre><code>use std::thread::{self, JoinHandle};/ Y" u1 Q- A) I: q
use std::sync::{Arc, mpsc, Mutex};
1 Z$ M: z1 g) m" D. J8 V
& x) f2 B+ m( Z; v7 f/ f
& i* v4 ?( h# P' B, ?type Job = Box<dyn FnOnce() + 'static + Send>;
" g: x2 s8 x$ p& Y4 s' Renum Message {% T' o3 S0 B* C, ?$ j( Z8 F% a
ByeBye,2 c5 \" o s g$ @( M
NewJob(Job),
2 S3 ~$ O; e* H5 O/ b}
. l W2 V2 ~7 c4 ^* |$ G1 Y; q
) Q/ F) V: t& {( m" ^: jstruct Worker where6 T# O; @8 q3 X) P9 A
{
3 j; n0 y! V" z3 @/ H5 E8 q _id: usize,
) ]) ^3 m. h% U t: Option<JoinHandle<()>>,
3 d& \: s: A, Q9 t# T3 q0 K: K}
% w( |5 m' V! E& ~9 Q! Z, s/ [ I1 M
impl Worker
0 e: @3 U! z* N. X3 b. c{& G* K# m4 N' Y1 d8 |4 E0 ^6 B
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {. F1 L; b2 K; _, {1 S
let t = thread::spawn( move || {
4 `! Z* p4 P4 b4 ~ loop {
: J7 a2 v3 ~3 C4 [ let message = receiver.lock().unwrap().recv().unwrap();
0 ^8 m. H6 v8 u4 a2 Q match message {
, A2 y9 {3 h, Z t& L# v4 _: _ Message::NewJob(job) => {1 O; c6 @4 q. x* |! ]( Y( o
println!("do job from worker[{}]", id);
' L2 w Q6 ?6 c! `/ K/ l; Y job();% j' [0 J1 S( K
},
6 v+ {7 l, m( t+ H9 O Message::ByeBye => {
: }8 ^+ t; b' L. r) m% N/ M println!("ByeBye from worker[{}]", id);
2 T+ Y [+ `6 B) e/ ]8 l- m' c break( w! ~1 n- c4 |1 M* H
},
1 H6 k2 U% H# m& E6 l }
- Z/ @1 A$ d: S H. ^. r }: R5 ^9 w, _7 G6 A
});
3 ?: J4 w7 R! c. ?3 p% u; M1 \$ w
: k/ u$ S% v$ y1 ~# E Worker {
: n. Y1 |. B5 J X! B8 M _id: id,+ e- o2 D" L0 q6 M7 |) e
t: Some(t),
5 E. g4 C( m) L) Y3 E }9 B7 R; J q# z T4 C4 ~* h. b0 b
}+ _6 D/ C3 ~% G, H7 W3 r) l
}
) c; U5 {3 ^, A: I6 d% i6 b, o- @$ T
5 }. e4 |1 B! @ f# }pub struct Pool {
9 B* k: Z7 l6 @" g; V workers: Vec<Worker>,
/ f6 Y8 U1 t+ ?, r max_workers: usize,: X: W* x: c7 O/ y v
sender: mpsc::Sender<Message> g! r2 [& F) E3 z* C/ W* |
}
/ P; l0 J8 Q$ @% D9 [7 d' L Y% x5 ~! p+ j6 t# L
impl Pool where {
" ?# `9 O5 ~! A. C _- @ pub fn new(max_workers: usize) -> Pool {& x0 J- S$ T" } J
if max_workers == 0 {
* d# ]" m0 P/ m L panic!("max_workers must be greater than zero!")
5 N" R5 w: R0 B H# W }
$ a# W2 F9 X/ O4 ^; u R3 Z! o let (tx, rx) = mpsc::channel();: g$ T/ f/ I- M0 X
7 f* N8 o$ A) Z, [8 T; d0 I: v+ ?
let mut workers = Vec::with_capacity(max_workers);$ R7 L6 J- |- R" _2 G/ f" ^8 X K$ [
let receiver = Arc::new(Mutex::new(rx)); B5 p7 A$ c: [; A
for i in 0..max_workers {
" w$ |' N q, ]0 | workers.push(Worker::new(i, Arc::clone(&receiver)));: P2 w% i$ y2 w* Y) C1 d" d7 [
}" ?3 a' y- L2 K, z( U: _
, B5 U' }+ f/ i6 {' b+ Q& U Pool { workers: workers, max_workers: max_workers, sender: tx }- T' L' J! k8 x# z0 j9 L; B
}; m9 E& W. Z9 i$ y
9 ]* L; _" N* W( W* X
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
3 }6 F, s2 X4 A" V8 [ {% m: F: g' Z- w9 g+ n$ R. G
7 V6 ~& u5 W+ _; A1 r# J8 j- F let job = Message::NewJob(Box::new(f));" D0 v$ I3 V. S! G) S1 ^( D
self.sender.send(job).unwrap();
6 R3 X3 {+ Q1 |2 K: y, N% s }6 [/ c* d4 M0 l$ S% V
}
- |" r# V% z4 ^7 a5 e
* E; H4 w; b! y/ {8 t7 ^ Dimpl Drop for Pool {
: }! T2 i2 }2 x% W, @ fn drop(&mut self) {" ?; v( v2 ^# L+ G: i
for _ in 0..self.max_workers {% u/ E7 |# H6 l
self.sender.send(Message::ByeBye).unwrap();1 P- F7 _* p7 L1 G1 {( a0 e
}
$ B/ V& b: I7 e( X- q for w in self.workers {! `- C. F7 E+ R! e# r: T5 F. \; e3 v
if let Some(t) = w.t.take() {# a% p `! t! l# |7 B
t.join().unwrap();# \6 C3 s7 C: k0 l
}
$ Y5 _. y: x& H/ ~0 S8 ` }! C* h6 s/ d O2 v$ m- X7 I, [
}
& t. X8 c! x9 S0 n}6 Y! ^( `( F5 }
/ b& X& d' }& C5 V
- ?' D0 X" v) R& Z! ]. o3 D9 f1 i& _#[cfg(test)], E" \6 ?% K, A6 L* l7 e! p! m
mod tests {
& F* a" ^9 A$ G use super::*;
% ~4 S8 n4 h, d! K1 D" ^% B& q #[test]) `9 y+ d! k K2 v9 H2 r( |( Y
fn it_works() {2 l6 n3 i' w: \0 V
let p = Pool::new(4);
$ |" l3 X& h- w8 T5 [/ D7 ?8 f p.execute(|| println!("do new job1"));
/ i. M& H$ x: v& P, G. Q i: f p.execute(|| println!("do new job2")); g) n8 X U1 k% w n0 U! a
p.execute(|| println!("do new job3"));- r4 S- l/ {7 s6 m9 n) M$ y V
p.execute(|| println!("do new job4"));
, E0 g, i1 K$ u& d9 z5 T& z }
2 s% [$ ^7 f/ x% _}0 ?* @& t' Q' C2 F* B
</code></pre># E( o0 U+ b4 _" k
2 F( y, d! S6 n0 V9 o# e7 J$ X
|
|