|
|
& _5 N5 f6 h# } B2 M- n<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 j- x& b3 O* W. j0 t+ {7 w8 \% w
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
( K9 Q. Z. J' _+ i$ h<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>9 c0 Q2 I. `0 Z+ D
<p>线程池Pool</p>
; G# v& _/ J1 d$ ?$ m: C O<pre><code>pub struct Pool {
+ H* [5 E+ `9 ^& m+ O+ _' c1 l max_workers: usize, // 定义最大线程数9 x& n7 I, [4 B3 v8 U
}2 B& L0 `& [4 s$ z+ z
: b8 M4 t6 a% y- ^' q5 Dimpl Pool {. q" v/ C& ^* Q" B9 h: v
fn new(max_workers: usize) -> Pool {}1 q4 Y3 l3 a- \
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
$ x$ g! h$ F) {) o& G8 c4 j}
0 @* z* r, K% e! s1 ^$ y' F/ `2 |0 N# o( T( g: E& L" M5 g3 Y, }
</code></pre>) g, X/ N0 F* y0 I
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>3 v, H G0 e: R( h
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
, i) F0 z9 b l可以看作在一个线程里不断执行获取任务并执行的Worker。</p> R6 o+ P. ?% N, J b& |8 Y
<pre><code>struct Worker where+ m# Z$ t. ?1 ~3 t( [/ C0 O
{- w$ n! ~* Z# O" n$ q" t
_id: usize, // worker 编号
% c3 _) \- O, p( @% e7 k7 _}
) U9 a+ D3 b3 J6 o9 A</code></pre>8 x4 X" i$ a9 {/ }
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>& k3 z) C2 V" T( W
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
( t% G8 S/ |0 c% h4 G<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
8 K, R+ B4 i% P: m# O<p>Pool的完整定义</p>
6 `# v: V" r" Q' G7 U<pre><code>pub struct Pool {& g P3 q/ w/ x. j7 T% b% ^
workers: Vec<Worker>,
3 L8 `' T: U4 | max_workers: usize,
" e, C' y! a$ L6 w( M9 z/ ?6 v sender: mpsc::Sender<Message>: x. N A3 G c4 l1 V X8 l
}
4 q! ^% s' f* W) H' E</code></pre>' \1 A2 U! Y6 W6 U+ c
<p>该是时候定义我们要发给Worker的消息Message了<br>0 {- j4 P, _( q9 e$ X
定义如下的枚举值</p>9 {" d% H1 I3 v8 g
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;) i% K4 A" m- a2 ^' B
enum Message {1 R4 z6 J0 n" O, u' c' I4 Y- ?
ByeBye,, M a* D/ K# W
NewJob(Job),) T0 J: \& v8 _0 j, d @9 b9 _+ z
}! o: O' r6 k, U% G% J6 s
</code></pre>5 [5 F! [) A; h- y
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
* j7 j; @; U" |: n, O6 F<p>只剩下实现Worker和Pool的具体逻辑了。</p>
" F+ p( S. L! e# m4 H. y9 b<p>Worker的实现</p>
# r- Z- T5 _0 n2 y<pre><code>impl Worker- Z5 F& m* q% a& c- p+ F
{+ Y) B2 s1 x/ L" _
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {' n2 Z& e! n& x' G$ a
let t = thread::spawn( move || {2 E" _2 [8 n; I/ l5 `
loop {
, x# Y8 K5 H7 P( L let receiver = receiver.lock().unwrap();$ r+ I9 e2 P" ~
let message= receiver.recv().unwrap();
9 f, l2 l4 x8 t! n. a# @& @* n match message {
9 j9 m1 G1 G$ ^' a: k5 C Message::NewJob(job) => {* V+ ]6 T/ w2 Q7 n+ F" B
println!("do job from worker[{}]", id);# P" u' _2 p: z7 v: N
job();
, U. z) J+ L5 a5 D1 l4 l" p" U },
% I/ I" S. X& }* L Message::ByeBye => {8 J6 \% R6 E, @) z/ z, M+ Y
println!("ByeBye from worker[{}]", id);
& A+ S9 E1 r* A break
- l0 W- @- m, k( W {" O },) V" S2 J. K5 l F
} ( [0 `9 q9 n4 O2 g. c; V# u4 T
}/ u4 y+ D8 U7 G5 h
});4 N# v% B, g ` ^, D% F
0 a8 ]& c9 _& }2 { Worker {6 k9 K# x- {0 G$ k9 ^4 q
_id: id,/ {. H& g/ M Z5 Q4 |5 H2 L) y
t: Some(t),! r1 J1 w6 p7 k( Y, p1 x3 z, O: F
}6 I- o2 A- J- {9 o$ t4 ]# l
}
. H4 z7 S$ o& r}2 C9 N$ c+ L9 I3 }6 s9 |
</code></pre>
- b( O" L+ }) _; C8 m* W. @<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
9 g% _3 \* P5 C6 T5 v但如果写成</p>' _3 @* h1 U- v" b! _+ Z
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {/ f# W6 z: D8 d& N* D
};
6 K7 U/ R( S8 p</code></pre>
* P- W( T0 m. `8 E& Z! @7 j<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>) ^1 ^ w$ d8 V4 c2 @% D9 N
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>/ [- z! i6 V: m
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
) }4 Y6 U8 L" H<pre><code>impl Drop for Pool {
7 v! d. G- w' h/ I) I* T5 U- q fn drop(&mut self) {8 i3 @7 _' {; {% ^
for _ in 0..self.max_workers {3 |) k* r/ a$ T: ? ?# k0 v/ p
self.sender.send(Message::ByeBye).unwrap();
0 K' r. R0 \5 g" n4 W }5 {3 R% H1 L0 z! |8 U+ U8 z
for w in self.workers.iter_mut() {4 d# a$ E3 I) P6 o4 |3 U3 W
if let Some(t) = w.t.take() {
q4 }5 H' R' b2 H3 @ t.join().unwrap();
8 L6 ~' c2 K+ g( s0 T3 l }( y7 E3 P. x& M% |
}4 }' Y' z3 ?) R) K
}5 w# _- q; C- u9 T
}7 H5 Q9 W$ p1 s3 F8 N7 i0 p
8 I% @( d7 p% X; O+ Q) j$ H' `</code></pre>
3 K% O4 r0 K! s0 q9 j<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>, q; }' `( m& M* N8 Z
<pre><code>for w in self.workers.iter_mut() {
) f+ o4 d8 z3 Y! d O3 } if let Some(t) = w.t.take() {
- k5 j! M# ~1 l& [3 l w' p. n, s# d self.sender.send(Message::ByeBye).unwrap();
% B' E/ T! C& z, u t.join().unwrap();
A" h1 k0 k5 E$ p9 d3 r) t% S8 J/ q }5 R! B3 V V! h+ X$ W
}
0 [7 k* Q9 T+ M: u6 }9 e6 Y, \0 ]
# X; j9 p7 P) b9 L: ~( }+ T$ W</code></pre>
" x* p ^- M5 V" ^& V<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>) J, H! w3 l2 T' w+ Z& P
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
- @# j5 y T5 X- f4 c6 C<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
9 z* U( y: G- |" K- b9 h! {<ol>5 q' a# b; Q" A H7 ?7 {
<li>t.join 需要持有t的所有权</li>, z- D- o( O2 x: ^; I; j
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>9 ]: t* h/ A5 D6 P* V1 a+ h
</ol>0 w+ @* {& H u+ ~) t2 U' x
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>6 `0 x, x5 G5 q
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>. e4 i+ a$ P; s3 d
<pre><code>struct Worker where
4 M' ]4 c! T% b( A+ T! |! i$ [{+ h5 g" t L8 V% n
_id: usize,
, Y( i7 y" C) u t: Option<JoinHandle<()>>,
( {% C9 O4 e! J: G2 G}2 c$ ^) E4 j# ^5 B/ D
</code></pre>7 m' V9 L5 A5 L1 O
<h1 id="要点总结">要点总结</h1>( z: d3 r% n+ o5 U
<ul>
% |# b+ ?( b V5 `4 I<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
h1 c5 ^5 n' p% h<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>3 z% x! c4 D% x) k, j+ M; a
</ul>
`9 J. [$ k1 m3 i<h1 id="完整代码">完整代码</h1>7 a5 C4 J o. m# k
<pre><code>use std::thread::{self, JoinHandle};& Q0 d5 h5 V" S4 z2 y: g
use std::sync::{Arc, mpsc, Mutex};
4 V4 ?. n0 ]# X( Y" c) k
2 M; V6 v) ^: d3 ?* H; w9 {2 `; t8 E7 F. {8 o5 A
type Job = Box<dyn FnOnce() + 'static + Send>;5 y2 S( P% Z$ j" v( x5 y
enum Message {
+ S/ ^0 M/ s s" z# w ByeBye,: P# {/ a8 s, G9 ~, J
NewJob(Job),
5 \1 @1 Q, `' u3 C( h}# e2 h7 |9 j$ D; ~+ B1 @
7 d- I8 u0 E2 |6 Vstruct Worker where2 e3 ?$ [- @! p, H2 A% t' a
{3 ]2 d. X* |7 s f6 w& {
_id: usize,2 O7 ~& D( n; J# G- R+ G+ I
t: Option<JoinHandle<()>>,
2 i( l$ o4 N0 C7 k9 [}, F. R: a0 r3 C8 V( S/ G
D; N" D" k8 O
impl Worker9 U* \& o4 Q3 d; Q+ X7 [; r: i
{
. C8 A, K1 F" Z, I$ S/ y. P fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
. d) P# i$ o, t let t = thread::spawn( move || {! u- k9 t) ~$ J3 R; G' ` h
loop {4 T: {* [0 @* y( F) L
let message = receiver.lock().unwrap().recv().unwrap();
# ~* R" d1 Q: N+ }+ F match message {% |( s; \. X6 t7 A ?. T! T/ P. r) g
Message::NewJob(job) => {
w# j3 {9 E2 P( Y println!("do job from worker[{}]", id);6 l& w4 `: {% \: p/ L
job();
9 }6 N) o# M- q3 D% J9 r },
, Y3 Y6 m5 ]5 [4 i Message::ByeBye => {
) Y' L% X* i: }- s5 ?6 B println!("ByeBye from worker[{}]", id);! k" a2 s$ q' e& j* H
break. @3 V9 Q8 J& g+ {0 [
},
/ O% o( E- ^5 _ }
p$ X! n2 T& r1 I& u/ z5 I8 Y8 }/ n4 H }6 q. m) R% C" H
});
! k9 G' G' b* P0 M9 x* q) h) h4 x6 |, l* F( P" t
Worker {5 y8 s5 V( Z5 s& j$ N
_id: id,5 t2 C; L& z' k6 M' k
t: Some(t),; i0 T: X. A/ t; K4 J/ \
}
) V) z5 Y5 _# e L, O$ f }3 C- ?- I3 j+ p% R# _
}
& z8 M7 r5 V. E' K+ N, X- X/ }6 D) {7 E8 H m* C3 Y+ R
pub struct Pool {2 H$ T& ^2 U' b
workers: Vec<Worker>,6 G- `) w1 l3 _. @$ I+ r/ | Y
max_workers: usize,
; ]: U% @& h# ?/ ]* e sender: mpsc::Sender<Message>
6 a# i8 `; n/ C/ P# A}9 C! v/ t1 a" o1 A$ S
& M3 q6 H4 R) P5 E+ w
impl Pool where {
* x9 Y1 w @' g9 F7 h, E- c- [ U: u* g pub fn new(max_workers: usize) -> Pool {5 `9 I- B a% o$ ]9 b9 \# U
if max_workers == 0 {4 b- H( t5 r3 G' A" o
panic!("max_workers must be greater than zero!")
m" H6 K- u5 J3 O& T& t! e8 h( o }
# H/ h$ ?3 S9 [( m" x. l, d let (tx, rx) = mpsc::channel();
8 J% ^0 `4 T' S+ L5 }1 E0 j+ s/ A Z7 m% C# v( Z
let mut workers = Vec::with_capacity(max_workers);
/ ]2 n+ y( y1 x let receiver = Arc::new(Mutex::new(rx));" K R/ {* l# v' E/ ?
for i in 0..max_workers {
( _$ _4 x. Y! |3 W+ K% u- @9 G; } workers.push(Worker::new(i, Arc::clone(&receiver)));
# K! ~- k* @; Q }
/ z' I- f S! `, }5 _) w, j3 V" c- @5 i' _9 J/ |+ l# `
Pool { workers: workers, max_workers: max_workers, sender: tx }" g) X9 U/ L: d+ Y
}
9 z6 T6 K1 |8 d/ g! U- Q
' Z! O9 K6 h. f9 U2 Q+ N. s pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
1 |' y Q' f) g; U {$ @& u _0 Y3 g' Z1 g6 K
( l& {' [4 u, x6 A. `5 |% v* ^
let job = Message::NewJob(Box::new(f));
8 p8 m b9 M d, R self.sender.send(job).unwrap();8 S$ p" r p- ?6 l/ ~; j
}
& Z& T0 t; g# M: u6 e$ d( M, H}, }; i+ T" Y5 a
. T! X8 k# b2 Gimpl Drop for Pool {
* |# P, J* t8 U% A4 X2 a6 q fn drop(&mut self) {+ T3 o* s: A& q. X
for _ in 0..self.max_workers {& }/ T1 R: W6 d% e% K w
self.sender.send(Message::ByeBye).unwrap();5 B4 Y3 Y, \# V' c; m- y" [' j+ ?9 A
}
6 w5 T% l/ _$ |8 x' U for w in self.workers {- z1 V( @. y1 x6 c; B/ x- E
if let Some(t) = w.t.take() {
' }) Y2 J3 `5 T% q t.join().unwrap();6 f) B5 e* m& ?5 b
}
, o' J: ?, I- {7 g7 t" T, w+ X }
* [4 B7 F$ C C- w. i) ~ }
$ k. [' l4 d6 a}
2 h/ X2 c/ ?1 K; [* y) z6 z& e) Y$ n8 u
* p! i2 j6 \, U& S7 Y! R
#[cfg(test)]
. m$ b. e, i( j& i# ~; gmod tests {" K* r7 _# W! d' f4 t
use super::*;& |5 s, k9 y; ~0 a: n5 ]3 }8 r: Q: Q
#[test]
5 J' M& ~3 O9 V# T fn it_works() {
4 b; `8 A" @% c% J" @ let p = Pool::new(4);; ^1 R4 ~, ]2 v1 {9 e; g0 z
p.execute(|| println!("do new job1"));
7 Q2 P0 Q5 |$ k" X. Q p.execute(|| println!("do new job2"));1 W6 V, q2 h. b z; s$ w) r
p.execute(|| println!("do new job3"));1 V! R+ H- B5 j* Q% N: U
p.execute(|| println!("do new job4"));
# x7 v, K; c( B }! m4 D% l$ `7 G, y3 ^
}
9 i+ n2 h- f. w6 A0 h4 J( i- Z- v</code></pre>
9 v9 T# W, o& M/ K7 S* n
! _% E3 y9 M' O: G7 d$ e5 ~ |
|