|
|
0 s) ~, Y3 j9 [0 ]8 A' X- B<h1 id="如何实现一个线程池">如何实现一个线程池</h1>% b7 t( q. W6 c" _6 K
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
; ]/ R; _/ Q9 n1 H: [, @<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>: m; [9 \' _% }7 G: }9 \* Y0 b
<p>线程池Pool</p>
0 u5 ~* w; q2 ^0 M$ I4 I# j4 ~<pre><code>pub struct Pool {4 {0 I( L* A, i, |6 A* L: d
max_workers: usize, // 定义最大线程数 g& |5 ^+ q2 T& O
}, [5 [- w6 I3 {' Q
6 H$ O2 a' i' t7 D
impl Pool {
9 [1 m3 l( w* i, T0 T- m Q fn new(max_workers: usize) -> Pool {}
! X* C; K0 V: B t" l fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}7 y+ d& e$ f( U( o7 g# l, I) ~ \
}
5 ^2 ^( }9 k Q: j8 A/ m; E1 m2 H$ x: c$ W9 ?) T
</code></pre>
4 F% W& M( Y0 E" }<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 }+ n$ o- d' [4 _0 \/ b) ?( d* M) L<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
- B# R8 S" n; }0 N' e. p& }4 K可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" u& D F1 s% V: G5 |% P7 J; R
<pre><code>struct Worker where
) w0 _& \% X0 X' b{
1 a; x# Z) S2 `$ z! v- v w _id: usize, // worker 编号( S- a/ R$ B/ o
}4 ?2 C% r$ y" ]/ {
</code></pre>+ a- E1 K, Z* I7 k% C
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>6 |, s6 q, M5 n8 h
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: G: ~& K9 ^9 x: B! T* Q<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
0 y; G' r* ~3 M# X<p>Pool的完整定义</p>
* \6 w( U0 Y- Q0 I<pre><code>pub struct Pool {
3 b# x$ ?5 W3 B, f/ N" f) N workers: Vec<Worker>,$ V- a- Y7 k4 z: M, e/ h1 x* N1 q
max_workers: usize,* t, H+ y: E; P" q
sender: mpsc::Sender<Message>
" M, Q; _; C5 X/ H/ p! `}9 y6 S& ?5 n/ t# N" i
</code></pre>
. a' Q7 Z# x: u9 n<p>该是时候定义我们要发给Worker的消息Message了<br>
# O6 F0 m/ v" D* T: Q定义如下的枚举值</p>
$ v9 C9 s& e# o u( z5 O6 @<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;. t2 S! s7 b x3 I; z% ]9 O1 ]
enum Message {8 w8 p$ l: p4 z& C8 j4 }. r( h
ByeBye,8 J% v5 @3 P3 ?1 S$ Z: Q3 C
NewJob(Job),! r- E2 L1 `5 r9 A" n
}
# {4 q- p* e+ F) U! m$ `2 d</code></pre>
3 c9 C- V1 `* t! u8 k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
* f# w9 K8 x# S) l: I<p>只剩下实现Worker和Pool的具体逻辑了。</p>0 i) ]9 P$ `; Z c
<p>Worker的实现</p>* i7 B9 e8 ]; ?( ]5 C
<pre><code>impl Worker* F/ k& S* ^. u( Q4 t3 y# }$ ~- m
{% o; S* O5 T0 w% d3 {1 ]: F
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {5 P3 d+ {# \2 h
let t = thread::spawn( move || {1 M. ~' }; H* V0 A; B& w' n
loop {
5 t5 c; v. |5 U0 r let receiver = receiver.lock().unwrap();5 t. [( [7 {( U7 d
let message= receiver.recv().unwrap();7 L5 p/ ^- D% m) q! v$ m5 _; |
match message {; B$ p. B8 p3 `0 |- T @
Message::NewJob(job) => {7 i' H9 j; L, o# [# l5 E- I I
println!("do job from worker[{}]", id);8 f# g7 D! \4 W8 F' o" _0 m
job();' r8 D& g. ~3 C: M8 H/ b# |! N4 B( J
},' Q6 `4 I6 Q# w4 t6 a- e
Message::ByeBye => {
9 ^) E6 X% o( ^% i println!("ByeBye from worker[{}]", id);3 f7 N" P: H# J/ O' l5 N5 a+ [
break
5 p y$ h' o2 A4 Z5 }! e; D( w },
$ x0 c& @% l6 \& B5 b% v } ( A% S/ \+ m5 t3 F
}. p1 o0 B. o. k& I+ y0 [! w
});
( h, c$ E, a& ~, S J/ p/ A9 J" \6 i, K+ o" O. U3 n
Worker {
! ~0 |: F1 @* ]4 t `* \ _id: id,
+ u$ p k7 G" y* H. H u t: Some(t),
& c' {* {% P3 m7 @ }) {* r: F2 R; F2 Q( G5 _
}
: N7 C Z% t# l: p}
! }1 I9 k+ c! ?3 c0 W</code></pre># E2 _! d# K/ l0 v: W
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ y. n* v% H( \. K& ~7 F: c! P但如果写成</p>
9 `( _' G9 a4 h<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
; Q5 n& q0 S) B3 N};! K! _+ r* L7 ?
</code></pre>7 U4 q, Q; q5 W
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>7 |7 U: R1 i& |) {$ b7 p
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p># y3 N* r, R r j8 v# m2 v
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>" G ?& H s, T5 L) l* z
<pre><code>impl Drop for Pool {* J+ X- I. d2 J. i
fn drop(&mut self) {; D$ m; O2 I: ^! a
for _ in 0..self.max_workers {
, v. s9 Z: ]7 w$ \' H8 E# W& p5 ^' [+ H self.sender.send(Message::ByeBye).unwrap();8 K4 W; B3 y @; j- G7 {% g
}
. T4 |/ U+ ^" X5 Q8 P for w in self.workers.iter_mut() {
9 A# B7 K$ t/ `$ C: Y9 _ if let Some(t) = w.t.take() {
$ @+ \/ w( g5 E( h2 y t.join().unwrap();
* Z1 q- [1 }& D1 G+ x0 [7 C4 n2 n9 t) { }' t4 l7 k; A4 x
}
( _7 _( @8 f4 X5 P4 O }1 F0 h, C0 W. H) _4 x/ a' p4 w
}
: E4 T$ m f6 c8 A; P
4 }; s4 s3 c4 a; ^</code></pre>5 m9 W" @7 s0 h' o0 y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
% h# l$ H& I% C% l. F1 h<pre><code>for w in self.workers.iter_mut() {; v) G: D5 z2 ^2 i7 R2 m
if let Some(t) = w.t.take() {2 B E# @# z$ M4 ~- u) S! w
self.sender.send(Message::ByeBye).unwrap();
" j) C" W" ^9 I5 U9 h& J t.join().unwrap();
' w/ M$ `3 I0 v5 t+ r/ J, _ }! `! l( g$ f( R+ B. w
}5 s( g, C8 h5 M' i+ [/ y
e n1 {% t5 @& h/ ^</code></pre>& ?5 C6 H0 V% s
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
4 V J* ?4 j% j: i我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
! T8 c( R( `7 g" v<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>+ v8 L; y* g9 }* z% S5 X5 f
<ol>
7 h' [3 u9 o& {! j<li>t.join 需要持有t的所有权</li>! Q4 a8 d! s* |1 V9 C0 J# }+ K
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" c& e: z, V* ]5 _
</ol>
# t! ~! ?3 C# l) t) _, @<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
$ ~$ R& S& Q+ r* |+ U换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ W& k: L5 h# R
<pre><code>struct Worker where
$ g+ {2 h, R( O, u/ U{
* T$ y1 f" C& D, p5 v _id: usize, c( C' Y7 Z/ a, ?, \- v6 `/ d
t: Option<JoinHandle<()>>,( y* ?1 S% w) j+ S. ~" i, J: f
}4 x2 `6 N8 e2 v
</code></pre>
3 L0 S# E6 k2 S; {0 I+ G `<h1 id="要点总结">要点总结</h1>7 ~; U; f: R; ]* C% k- A6 L0 W
<ul>
2 J7 M# S; ^8 j' G0 Y6 C0 R, `<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>& x* a' \% ?7 b) e! S+ ?& @
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>0 V; f& ?4 o: }" H0 C
</ul>: V* L0 L6 Y# v+ J
<h1 id="完整代码">完整代码</h1>2 s, @2 J. Y) Q# K% ?1 C
<pre><code>use std::thread::{self, JoinHandle};
+ {+ J2 ]6 u0 G0 G5 Q. nuse std::sync::{Arc, mpsc, Mutex};
7 R3 e0 \7 u1 }- {- v0 a% J, I' V8 j* P
% D/ I. v$ n6 c& \3 w/ \8 X
type Job = Box<dyn FnOnce() + 'static + Send>;) H( `3 _ E' x! h5 G: x: x- Q
enum Message {- b# a6 \" n7 z% Q2 ]
ByeBye,( G* f$ |8 ]7 {: y) ]7 o% R# ]# u! p
NewJob(Job),
. _6 q+ \9 l2 w& U+ g3 `5 }} ?% F H' t8 g5 m: I- x- W: s/ R
5 g% z( d+ B: P; ^+ M2 h* {8 Y
struct Worker where. o8 \8 |# M: ?. \4 D/ z6 [+ I
{0 C" m, ? R# ], K2 r2 A& ~6 _, O
_id: usize,
5 r, f, k0 C6 V6 @2 d: C t: Option<JoinHandle<()>>,4 R+ T( X! P' A
}
1 a. u; l N* e( g; q" u' K" u; h$ m) ~/ @9 A. T" X: {
impl Worker
9 Y' H5 a4 ^+ W& c B D# @; c{+ q# F7 `9 }3 y6 |7 ~7 s
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
1 q8 {, r" d2 y" g; { let t = thread::spawn( move || {# m j( A6 z# w7 b$ r9 Q
loop {3 }* A# l8 g, m6 w3 M
let message = receiver.lock().unwrap().recv().unwrap();4 H. K( L1 i1 j8 N
match message {
! W: `/ @4 a2 ]( J* {* ~, c Message::NewJob(job) => {# A0 w J3 G6 s) W& c1 a
println!("do job from worker[{}]", id);
* N# M3 H0 L" N( \ job();
, r( C( F3 V* D; b# j/ j },
( ~9 i+ }! l$ T: t$ Y* x Message::ByeBye => {$ l U" W- y) D
println!("ByeBye from worker[{}]", id);) C: H9 @7 Q T! b+ p% H8 ^
break
) s0 g6 }$ n% d* d: q# q },' _& G& F; d+ _; ?' {4 d U, a6 v
} 2 \& `+ R; c g2 X& q0 s a
}
" K2 I/ u2 l: g- U5 Y- A9 R* W2 o });
- g( M1 C9 k; d! b- o- P2 X3 e# F. o( R$ F3 c
Worker {
9 ]* E0 @; a' d* E- c* s _id: id,& b9 W% p9 j2 b [' g; G: p4 p
t: Some(t),
- _" b x( ~+ w4 }. @ }
- H( G/ _; R, ?4 c. D }2 ?7 u- e: V4 D/ `" |& u4 T! f) N* c
}
; R& e# o2 x, j; ]" g7 W! {8 ~- {7 S% _& e% s4 |/ f
pub struct Pool {% y5 @9 M4 u# P0 `
workers: Vec<Worker>,6 ~: j& F& I" L7 Q( N- W0 c6 `
max_workers: usize,
2 t1 [' |2 H4 O' l( b7 | sender: mpsc::Sender<Message>
q6 m7 R, P Q) g}% f: t9 ?% }# U4 R: w% Q% T: x
! ~% h2 a: B* T* C5 y; D- K5 V
impl Pool where {
) q3 t3 r t- R) V# m pub fn new(max_workers: usize) -> Pool {
, A5 M$ J# x$ Y- Z& O/ n if max_workers == 0 {
7 O9 \2 `9 N* v panic!("max_workers must be greater than zero!")
3 d& T* y3 j! d, [ }& O6 |3 G& U# j; g
let (tx, rx) = mpsc::channel();
6 B+ ?4 v5 _0 W, I
. Q8 U+ J! |4 V let mut workers = Vec::with_capacity(max_workers);
0 Y9 K3 v, f* ]# g let receiver = Arc::new(Mutex::new(rx));4 l9 i3 I4 H& `: m5 v
for i in 0..max_workers {$ R3 ] D& E6 b% J& ?5 H; X
workers.push(Worker::new(i, Arc::clone(&receiver)));
! k# |- a& G. S4 R; h' I9 } }& }; _1 G- b% c- j+ U# { p
; w4 h* r7 v( Q9 D S7 d Pool { workers: workers, max_workers: max_workers, sender: tx }
6 W, E9 p/ Q t v1 M }
! J) x$ ^ N* p# d + h/ h( ]) K% A; d$ p# b
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
# w) W' | y. q* ~7 `3 x2 ^ {
8 o$ @6 [% J5 ]' `
8 t) |. R% J- [9 C% ? let job = Message::NewJob(Box::new(f));
, y2 c, g) G2 ]- q7 s! i2 N self.sender.send(job).unwrap();3 B( t0 P& h& C5 D: E' K# K
}4 a: s" Z4 n6 T; X, V* O
}) @1 F7 i4 V& y, q4 e+ H. [
( W. \2 B+ t0 J2 U9 U4 H: h
impl Drop for Pool {" h+ {2 A [+ Z$ S
fn drop(&mut self) {
; G9 O& v0 f+ Q4 e, M for _ in 0..self.max_workers {! c/ H# ~ @2 T. M
self.sender.send(Message::ByeBye).unwrap();
! E- w# i2 y) @( S }2 U' J% W( z# a+ W0 J
for w in self.workers {
8 o0 Y( r$ ^; w( T" W. @ if let Some(t) = w.t.take() {
3 d: Q, o {1 [! c t.join().unwrap();( @ m1 g4 g( _, t
}
" {+ m; r! L( c+ j% Y5 w) C* w0 H) Z }
* Z' N/ H7 q; N3 T9 } }
+ `" {* U; v) [0 p5 k" } o! v}* p# r# E1 L+ `9 ]' J1 Q
6 H8 N2 Z0 I& ]7 x
* m- S1 I( u+ g" u; x8 }#[cfg(test)]
\" |2 J# U/ @) M& h" N4 T$ Zmod tests {4 l: Y$ |4 `+ y- F+ R, ~& Y; r' i
use super::*;% ?0 i) ?6 ]- x4 S: g& b/ |1 q
#[test]
8 v1 w8 ?+ k1 ?5 G2 ~3 N' J3 M fn it_works() { G8 o$ O. z& r, [2 R6 s
let p = Pool::new(4);
& k* E! _5 Z* c! t( U) x p.execute(|| println!("do new job1"));
3 r7 Q+ @( ^& Y3 e ~8 p" d3 B9 M+ c p.execute(|| println!("do new job2"));
* p6 w9 N( M5 ^4 X \' r p.execute(|| println!("do new job3"));
2 _4 y6 S# s1 A7 r p.execute(|| println!("do new job4"));
9 H: b- ~& N6 r2 u6 Z2 A3 Y+ o4 ? }
; S* \5 s4 Y- n! z1 C& Z5 B+ c}0 P6 N5 X7 D4 z9 M
</code></pre>* t4 U! ]) f0 d; U( M
7 v/ S. x7 m8 l) _
|
|