|
|
5 X7 m" ]7 `! m! \- e
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>* J% o0 V5 @/ D" y
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>/ {3 p8 s& S- ]4 u1 |
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>9 G$ {1 [5 S4 ?: h
<p>线程池Pool</p>" _% t/ e# c. B: ~
<pre><code>pub struct Pool {
4 y- g+ x' k& J% A max_workers: usize, // 定义最大线程数* X+ I- K0 e2 n! f2 z5 O9 k; k2 \
}
R' T, [3 \" s# c: b! ~5 Z- W x
4 D: V q/ B; ^0 T. ?8 }impl Pool {+ I. W. G9 E/ b# @: T6 z
fn new(max_workers: usize) -> Pool {}
Q9 ^! S4 |6 n$ E1 F fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
- k- j$ G* z# l+ u* b. a2 K' `( w}# [' P0 ^3 P' G7 r+ E( v. z
2 u# w S4 v: J/ Z6 F2 ?& v</code></pre># p) @7 u# H5 D/ J
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
7 O, z# M1 D7 R* _4 y3 u<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
' Q k1 E5 G! _可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% X2 j6 y5 ?: _; ~3 u; z<pre><code>struct Worker where& @5 t: b0 O1 g% M$ Z% m0 {; d
{0 Y0 h; b$ }. i8 M& v" J, x% B [
_id: usize, // worker 编号
8 |; E; d& Y4 _}
( @. i. E6 L c</code></pre>
/ A' v( m; b, W9 T$ Z<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>4 @: h/ f3 Q/ [" N6 R8 g0 P
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>/ j8 m) q2 A) }( Z' P: A
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>9 p, b' }- y( a9 f
<p>Pool的完整定义</p>
& d6 f3 c2 ~! s6 }$ v<pre><code>pub struct Pool {5 K- R! W$ z: |$ Y& e$ P+ M3 m7 U
workers: Vec<Worker>,
* X3 U- R% C3 \/ g5 J1 ] max_workers: usize,/ m6 H9 h- ?7 I0 D1 t& O
sender: mpsc::Sender<Message>
0 G2 ^( U( Z- X, O3 h U}' h% ]7 v8 r. A: t. L8 }8 O5 W$ ]. I0 K
</code></pre>
/ k: W" j0 ^. d4 I9 u" B) ? C2 m<p>该是时候定义我们要发给Worker的消息Message了<br>/ d- p6 @# z+ _0 b. S4 ?* j
定义如下的枚举值</p>
9 }- V5 K# v, j0 y" I6 F! b<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
3 o0 Y" z! i. j [, E% `7 ]enum Message {. v# I- _( t) I; E4 ?
ByeBye," D# F2 o( j1 @$ @' v
NewJob(Job),
) S, k$ ^4 s# W8 A$ e, F- y& U. z8 d}
( p( ^$ R8 g- B+ Z</code></pre> ^8 e$ w& n' K# i; M
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
( C5 Q6 ]2 M% D9 R# `( }# Y6 b<p>只剩下实现Worker和Pool的具体逻辑了。</p>) ~. P5 ~, J8 m- e- b
<p>Worker的实现</p>
' m- c& ?$ F# r( l1 R) W: ?9 y/ n<pre><code>impl Worker, S& t& L/ I) n1 X1 Q' X9 B
{8 g6 a( `& J, [5 L$ e8 J
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
" O# P& w: W$ }% M$ `2 _# n let t = thread::spawn( move || {
. V9 C( Z! w" P1 W8 {# E; J# D$ { loop {
3 v8 b3 [/ O" h. d# D let receiver = receiver.lock().unwrap();4 w4 `. p8 h. M' q; r2 d
let message= receiver.recv().unwrap();
4 d/ z4 z* E/ U: ]3 D6 w match message {
8 U9 ^- F3 x/ \: a' S" L) s3 }' ? Message::NewJob(job) => { W8 u1 i; k, l, e4 ~% _. {9 y% l1 \
println!("do job from worker[{}]", id);
& z7 b4 w8 f' y$ |) G job();7 ~+ d4 U" i8 ~2 C" ^0 T
},
/ s7 i# E8 x Z' M% c Message::ByeBye => {* h) d/ g |7 x" L$ A* |0 w* n3 S- v
println!("ByeBye from worker[{}]", id);
8 a4 ?6 i' ?4 j* W" k4 g break
2 u8 u! A% ~2 c3 m: z4 t },
9 H" S1 ~2 a2 U }
7 ^+ S) P4 [) y1 d, w }% _1 @6 T$ \' p7 k# @* e) j9 v3 ^
});* [2 f, J0 h. W/ L
2 Y& T% F7 H4 O0 |* |/ R
Worker {
6 T7 N& S: r9 ~6 g2 [ _id: id,
6 u) ~( e$ s) J4 m) X t: Some(t),
; l* f: E+ W% R4 ` }7 p% O5 ]+ E x5 k7 @" i1 l
}
* {/ h4 e8 [. U7 q' H {}7 h% A& |8 |- S! ^) g+ U
</code></pre>
5 k0 N7 |5 k5 V; [<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>) D/ ^3 j% Z( \+ c3 M
但如果写成</p>6 ~) @. V, }1 z y5 ]
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
z2 E) j& h' n) l- z};: u$ G7 i/ {% S1 \
</code></pre>
& T* m0 \ k1 t<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, u- F3 c- Q9 U- P
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
9 P% f! l, u/ N, v' E; n<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 k% F9 o5 e0 H<pre><code>impl Drop for Pool {8 n: k$ Z2 v9 |+ z
fn drop(&mut self) {
3 _1 R7 b2 n8 I h" C for _ in 0..self.max_workers {! X8 T& I" ~8 M' P6 `0 ~
self.sender.send(Message::ByeBye).unwrap();% r7 H* N3 i# t8 L0 C6 g; G1 u
}
% e \' U. m8 e* e/ _# d# k" B for w in self.workers.iter_mut() {, ~5 J+ P' K8 }' b- U0 T+ t3 d
if let Some(t) = w.t.take() {
- b0 R8 v) E7 I t.join().unwrap();9 w! v' ~$ Z" x! P+ R* a' g
}
/ w6 h) Y5 S, W0 B2 {# l5 W }1 y4 o9 n% o! D- U- A* F) J
}: o3 i, X( x8 Y) T; _; E/ p
}7 w6 @. O3 w: o* b! |
2 a$ r* J- X: l; b9 B o& C$ k
</code></pre>
6 Q, X2 K. o* N8 P: R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>4 [5 I) }7 m" k+ i& W4 b3 Z( l
<pre><code>for w in self.workers.iter_mut() {
) F c: c' D: A& n7 X$ \1 V/ g; n if let Some(t) = w.t.take() {
/ p$ f+ T# E3 C- o4 `8 E self.sender.send(Message::ByeBye).unwrap();6 P1 y2 J9 E6 l4 Q/ g
t.join().unwrap();
- p% r& E! m' T* Z9 ]) |, a }
& g" {- K+ o; a' \3 {8 B, W4 ^}$ X0 d+ Q2 G9 H
3 W* [) y% C9 n' u) s0 T</code></pre>
1 t! n# y5 |/ y% D4 s& z6 A<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
8 C% x# |4 }5 J; `我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; e( e! t- {1 {0 k
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
0 }1 V& H( K% P- h0 e& N8 d( k<ol>
, s3 z) F8 T, L2 R<li>t.join 需要持有t的所有权</li>
" X4 k$ U5 Z5 a<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
2 _' f0 R, J: c) R. I</ol>+ s; z" y7 e: K3 @- T! f: i
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>5 |; F4 K% r5 v2 l. }2 z4 z5 A$ e
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
) ^, [& k. A. x! p; }9 X5 a& {7 b<pre><code>struct Worker where8 ^/ c; G- E: ]' ?8 H
{6 w& [! v) r$ d* Q k
_id: usize,+ o) B; N3 d! l& N6 O
t: Option<JoinHandle<()>>,
5 z# R7 e" V( w8 `4 b' g, N0 v) C) X}
- g1 t: L% ? N4 j; Q</code></pre>4 t- Q$ e; ]8 ?' H( `
<h1 id="要点总结">要点总结</h1> `, j: s* W+ \, y( P" F
<ul>" e. Q$ x ^1 A3 u' v
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>4 N7 j+ l Z! p$ N5 @
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>. ?7 r, k$ d$ R( h1 T4 Y
</ul>
+ y' T' }& _/ q* m$ C<h1 id="完整代码">完整代码</h1>
1 j9 ]8 a( F2 c, e2 \" q<pre><code>use std::thread::{self, JoinHandle};% h$ o5 A. X! g g. \
use std::sync::{Arc, mpsc, Mutex};5 c3 X$ F7 W O( } O' F
4 |: Y' k% U; P; t( I6 p, l
. m! F( W8 s( z; T
type Job = Box<dyn FnOnce() + 'static + Send>;' z2 i5 Y/ V: F! K& X. M4 `% V
enum Message {
5 ?; w$ i- o" Q+ Y; t ByeBye,3 z, ]5 e* c f* g' p! o6 S1 x0 a
NewJob(Job),7 j4 k) R( d* s _/ B1 `
}
. Z9 F" ^% Z2 k
- e- o( B, E1 b) M. M/ rstruct Worker where
0 f1 K3 g K3 J{) ?7 y+ i# {$ p: W! R% m) h
_id: usize,6 J: S3 o* [! k3 Q: h
t: Option<JoinHandle<()>>,
# r1 H8 T/ O/ l: I! H, h* T2 L S}
, G q4 C' S; }% Y" ~ H' s8 O: p! g0 D% G. |/ P
impl Worker% }2 ~! K$ O4 c: l
{
+ z/ D0 o+ s |! F4 w/ L) j fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
' h$ l) Z X p# | let t = thread::spawn( move || {5 ]" J+ z1 o* k9 f. }
loop {( I2 z2 o; d. n: i
let message = receiver.lock().unwrap().recv().unwrap();
7 C$ b( x& {% O6 J1 u" J1 ?: k match message {
7 N" n/ n* c+ w1 s0 `8 y" q Message::NewJob(job) => {
: ^6 Y$ N# J$ u+ i7 c" g println!("do job from worker[{}]", id);+ V! K$ m0 O- \# M9 m4 k
job();
1 }3 n& S1 W0 g! E$ d. @6 P. G },
8 v% b* \5 r2 } Message::ByeBye => {: h, [1 {$ R$ S. X2 Y A: a
println!("ByeBye from worker[{}]", id);
, h% f4 ]* S+ ~5 m. ^9 k break
. z; n% X, J( W8 a- N2 |7 O },7 {$ P/ N& d& ?& _! ]
}
2 K6 l, W; i- t" H Y; \% ^) S }
7 ^! v! K4 T% @/ L& N* G2 e) `5 i });' A. m0 ]( p3 [. s! D7 l
% Y; v1 E, @6 J
Worker {1 ~: G/ ~) q" `; L
_id: id,
2 o5 l% R+ ^5 i, P$ Z8 a t: Some(t),
0 g% \ H8 ?3 T& V- B: r }2 S1 v; h) O$ S# J4 L
}
6 ?: f/ _/ a' _! Z}3 J8 C# l9 s" J+ _! r4 g
! h2 P5 z$ r8 n+ y5 ^9 Z" Ipub struct Pool {" X# F9 {( o4 t ?- C. w
workers: Vec<Worker>,
/ P# L* L. q5 t2 l6 G9 Y max_workers: usize,; s! k( F1 }* u% J- f# Y( L
sender: mpsc::Sender<Message>; ~! [ V$ Z/ A) j% m, j
}! K$ T# O ?8 X4 I6 |$ c
6 y# W* i' j; q; K. _2 Zimpl Pool where {
0 C. }! i, d, C q+ m5 i6 v) h pub fn new(max_workers: usize) -> Pool {
. Y/ U& a! y+ ]( C) H9 J* _ if max_workers == 0 {' [0 U S$ H* c) E
panic!("max_workers must be greater than zero!")8 K* X" i4 H7 i' B/ B! O8 J# O* U
}* h4 L* C$ R% B; Z3 i1 y: h
let (tx, rx) = mpsc::channel();: ] b l9 T; t+ R! \7 R4 K9 H$ i3 ~4 n
( q$ m' u9 b! M. G, Z
let mut workers = Vec::with_capacity(max_workers);" H8 `& e3 a8 L& }& R8 V
let receiver = Arc::new(Mutex::new(rx));1 s& v2 B; k% J# k
for i in 0..max_workers {$ Q$ _: L0 m6 `
workers.push(Worker::new(i, Arc::clone(&receiver)));
& J; m* h O3 m# v; ]) R, `" V }, _8 A m5 M* A; b8 |
' I3 m; L) v: Q/ C4 Y& J' P
Pool { workers: workers, max_workers: max_workers, sender: tx }) d& @3 [$ o/ _% c% q" a
}
* @. W& m8 V2 a$ s5 f( V + m, Q& `3 ~7 x. W' e( k- x
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
- R- N( A, J3 @/ w! Y* }* y* A {- [0 {% K4 i6 j6 [8 T
+ _/ B3 ~2 o1 g# i0 E/ H let job = Message::NewJob(Box::new(f));
0 S) p# P# k' o: K) ]+ t self.sender.send(job).unwrap();
8 ~4 l- d" A, O' B! p$ e }
& ~) W" S6 T( w$ W- B! @}
. R, p. |* [( E! o+ e" W2 H
# w: c- ^3 @2 x* C/ b; S8 {* Qimpl Drop for Pool {
+ y' m8 j7 b, p2 a! q0 e4 P fn drop(&mut self) {
: }# `" c9 E& C1 h for _ in 0..self.max_workers {/ ~8 x- k: `5 B2 Q: A
self.sender.send(Message::ByeBye).unwrap();
2 n" l V4 L" h/ y5 I$ m }, J8 G& q; A$ u
for w in self.workers {
9 e" g# K x1 G( p! q if let Some(t) = w.t.take() {
9 I2 o4 v% B- n1 ^5 P t.join().unwrap();0 z: b T* ~9 u) v( V
} c' N4 ~/ k) h, d q) t
}" i! t+ c0 A) L7 m
}5 @, L" q7 a; \. X/ [
}
5 U- u7 l3 `% y- w' R4 c( {% D. E% H3 E
/ W! Z# p- t/ C#[cfg(test)]! c- v0 U6 Q6 a% y( w e
mod tests {
' {3 Z0 |) y# _, k+ p* c7 P use super::*;
5 D' H. R" V2 }0 G( i; W0 |4 m #[test] Y# I+ N& x# ]# F$ i
fn it_works() {7 R6 _7 D/ U& ]4 b
let p = Pool::new(4);
- h( [- Y4 Q. y$ j5 C p.execute(|| println!("do new job1"));
8 i1 w. ~9 E# M9 @0 o p.execute(|| println!("do new job2"));
4 [8 A/ w9 M a/ C9 @2 v p.execute(|| println!("do new job3"));
. A; I5 A7 |5 f* b) _ p.execute(|| println!("do new job4"));7 J8 U( x8 n# S! H8 n! e
}& N' G$ ?8 P/ x
}9 G0 {5 \1 w4 i# g& k
</code></pre>
: m8 q. O2 v- U! P5 f" Q8 W4 q- w% K0 [* z/ E7 f+ i
|
|