|
|
4 f; p& ~& ^6 C2 P- p4 j' N<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' t! L A z8 i. C, f( q* C
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
' ?3 ~# R1 a) t! {; X! _5 W<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>; u7 U' X- b k1 R) m
<p>线程池Pool</p>
5 l0 O( d6 d4 }! r3 d# u& P<pre><code>pub struct Pool {
0 b& o" _. D1 Y; e+ Z0 G max_workers: usize, // 定义最大线程数
& T. y- d0 h# ?6 m6 l}
6 w% v) c9 T5 T/ M' o
2 s w8 \. S7 ~7 ]9 r3 s& ~impl Pool {, ~' e( }* ?' r. I3 ?
fn new(max_workers: usize) -> Pool {}7 r& F5 Q7 O. B( s2 @
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
4 S0 O1 k# j4 g( ` k' p- p}
* m8 V4 m9 `5 [8 ?) t/ m* E" e4 C; c4 k0 f- I' K
</code></pre>
; e( {7 ?! S- K3 g# f+ P<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
( A' M0 _/ ~& y0 F! q<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ ]1 x; c& L: |+ V9 S可以看作在一个线程里不断执行获取任务并执行的Worker。</p> W; N. q& h$ @* m9 C
<pre><code>struct Worker where
* v, W7 l4 W' d/ u2 \! t5 ]{
# L8 z! s6 n( ^( ?5 O _id: usize, // worker 编号( N! G0 k& B) _. d2 w8 X( h
}
2 i$ S8 l U& A) e</code></pre>
3 m8 A/ j' U# Z0 E. G<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
( m" c# ]/ e8 ^把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% }' T5 V& y; a5 F: N( B
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>0 N" ]' B+ M% v7 v# ~0 T
<p>Pool的完整定义</p>
* K+ a) J4 { W+ v<pre><code>pub struct Pool {
9 X! t) g" J$ G5 w, u& V# k9 {# b workers: Vec<Worker>,4 C% C* x0 d7 M2 p! \
max_workers: usize,3 j3 G8 W# N4 I5 R% s6 i, T
sender: mpsc::Sender<Message>
1 T1 A& |5 S+ Q& s+ u9 ^}8 m0 @$ P) M2 @* j
</code></pre>
4 _! P- ?3 }9 m0 Y/ O, ]<p>该是时候定义我们要发给Worker的消息Message了<br>
+ X7 w3 o! @) `! {- U D! H2 k1 N定义如下的枚举值</p>
2 q1 U- y2 k. m) A$ B6 {/ S<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
! n M( p: J, L9 Z) M6 `& J9 J5 s6 Benum Message {
3 k/ W. f; A% a1 Y: {' z# p9 E ByeBye,
+ O+ o) s5 f8 m) o, R9 M( r NewJob(Job),# G& m- x. n2 ^9 G* _
}
# d1 g" N1 {; @5 {</code></pre>6 j" ^0 {: \% a- Q2 l h$ P* D& \9 @
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
: }+ p- a: S9 [' O4 L: K<p>只剩下实现Worker和Pool的具体逻辑了。</p>
2 t' T& ? \2 W0 r4 D<p>Worker的实现</p>, q+ I& _& a! P9 C' T
<pre><code>impl Worker7 k- d/ |' e A
{4 N/ B, w' \ e' o# E
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {* \8 j! _! f1 l6 c1 h
let t = thread::spawn( move || {
2 Z: i l0 U- i: H. B/ e9 F! d loop {
3 ^, G7 S! _* d" q. y let receiver = receiver.lock().unwrap();9 y D) J0 f/ K) {
let message= receiver.recv().unwrap();# F$ F4 ]6 A5 }( l- v+ n% |2 ]" Y) l
match message {- l+ m) J/ r5 e
Message::NewJob(job) => {$ A6 x" ]1 \% _" Q" p
println!("do job from worker[{}]", id);5 r' ]8 S1 W) q2 q
job();( F2 p( N/ l4 m. |
},& t1 k3 ?7 v# j& r
Message::ByeBye => {
% E T) I5 I% a, W7 ]% E& I println!("ByeBye from worker[{}]", id);
: o$ T8 O$ q' A0 K) e) A+ [/ a2 i break3 H' S8 Z3 z& V& n. G7 }
},
+ V1 Q5 o) I; y% U+ U } / `6 p& C( f0 W0 u9 m+ E
}
2 ?! ?8 i! x( @; H! T });
, e; w3 L. ~7 n$ ~- y- a6 y! l+ u
Worker {
- w) m9 G8 J! e. q( P, v _id: id,/ u$ i. {4 D; v1 R
t: Some(t),
6 z2 H4 D* A% H# j }
I" g% _3 k/ Z# M& N }
( d: s, M0 ^( x B}. _8 `2 b; `3 z j5 x
</code></pre>3 b0 k8 q9 q8 G6 e" k) G
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
2 @, E8 L, r0 V& y6 r7 V但如果写成</p>; g3 h& d6 O, S! v: ]; k
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
2 k. M1 k1 V( F! ?};
' G% Y( @7 u- }, r- {</code></pre>
6 \. L' X1 d3 b3 V; A7 T<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
' N0 E0 O ?; d: {' J/ M# Yrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
* b0 M1 W" a7 X, q. A<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p> ]' { Y4 y2 o! p# |
<pre><code>impl Drop for Pool {7 Q. F5 ~* L4 ]3 S0 m4 C7 Z* v
fn drop(&mut self) {+ W G2 }; D; @+ c
for _ in 0..self.max_workers {2 n- @' M3 G3 \1 \ L5 @% D6 |
self.sender.send(Message::ByeBye).unwrap();
: P3 V+ R) r! E+ D }6 z' n+ E8 s7 @6 R. y) ^ W/ {
for w in self.workers.iter_mut() {
+ ~5 o# \. H7 Z' g if let Some(t) = w.t.take() {) M, F9 w/ F6 o% x. E; K
t.join().unwrap();
5 P+ s8 d; N' b# _+ _ I+ { V }
9 `& @. a8 l) K }1 G5 R3 [# y& T v" q. C- x
}, E v( k$ |# C: {& L1 Q
}
& o, |7 Q$ B. `* p; i& I; v
2 N F- U9 G- Z5 M1 c) M</code></pre>% h# y4 E' n7 T% ~$ i% U
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>9 K$ u" }& z2 x9 X
<pre><code>for w in self.workers.iter_mut() {
- t. H1 S. }: l$ n if let Some(t) = w.t.take() {# A9 e V& l. P* Z8 A
self.sender.send(Message::ByeBye).unwrap();
: W* n q4 @- ]6 a; ` t.join().unwrap();
$ g* i6 ` S. [! }: v% ]/ p" C }4 _, ]/ d1 |" u+ a
}3 F" G1 n5 V h
3 h" e% n& D( I2 s' {! [
</code></pre>
( |" h. p& u9 g% h. z9 O8 w1 m<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
" d6 O9 o) p& V2 V* D我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>$ C/ L) b& w+ }+ i# l2 v5 Z4 }
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>6 u2 a: F7 `$ L6 E' k |5 U
<ol>
) ~8 P' m1 V1 V$ d3 d. Z& a<li>t.join 需要持有t的所有权</li>0 h( v3 t3 c5 U9 {7 m# R
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 L" D# D o ?% V3 @ u
</ol>- f+ G; B' V' s& y0 i) M* z8 H
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>1 V. ?( o* O' r! M
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>/ f# I$ S1 V+ \5 y; D, f
<pre><code>struct Worker where# J! @3 h+ T T# M$ s
{
! e+ P/ F" v( u/ Y# ?% x _id: usize,) j8 p; M ]: s. L- M' i
t: Option<JoinHandle<()>>,0 m c+ p( f* Y4 P* p1 G# F
}
3 P9 z* ~4 V( v& E9 h! K; d</code></pre>8 s" U4 }9 R) L2 [0 `( B2 d
<h1 id="要点总结">要点总结</h1>
$ \8 A8 I6 t# ?$ @( t! N<ul>
' l# d$ I2 ~- |& c<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>3 H5 y: W! s" `/ T' p2 k
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
5 p: q3 m4 L; D6 T8 d</ul>
0 C4 g8 K* ~. r1 |* U<h1 id="完整代码">完整代码</h1>. H `5 v6 {+ ]+ o; C+ l
<pre><code>use std::thread::{self, JoinHandle};- {) b$ \& ^9 I: P, C5 B
use std::sync::{Arc, mpsc, Mutex};, l2 @/ X8 q2 i, E
- ^0 Y* N" S/ B6 Q. o0 U
C; Y# i# j4 `8 [type Job = Box<dyn FnOnce() + 'static + Send>;
/ h5 ?1 I( [' c: _* Y benum Message {$ D7 o5 @5 U( ?! E W$ s: m3 j
ByeBye,
8 B6 Q$ ]/ p$ ]) I: | NewJob(Job),
: U6 _1 }2 |+ t2 W, \4 T$ Z}! R" C9 D/ m6 W
2 B" W7 t$ I _# e
struct Worker where+ A- P5 X0 c7 @& i5 I( j0 A/ N
{
& _& r @" e: f4 L) Z# t7 D _id: usize,' x5 P3 E0 S8 i2 c
t: Option<JoinHandle<()>>,3 ]$ G( v4 z, h
}
& Z) u' o) n+ l( L' v! D2 ^
/ t# B' b# ^% h. aimpl Worker/ v! ?7 D, ^! @7 `9 P
{
( L: {- Q' ]! G: f( t. [' A5 O( w4 V fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {6 ^+ ]' P* e; b# h1 d' T1 ] i
let t = thread::spawn( move || {$ `7 ~; z* s' K* f2 z
loop {
" b, L' c! ]* G* k let message = receiver.lock().unwrap().recv().unwrap();8 n- K% t( ~& M6 D7 G, b2 H
match message {
8 j; a2 K7 r/ F Message::NewJob(job) => {% p, ^* C' j8 C$ k5 m0 r Z4 ?2 B
println!("do job from worker[{}]", id);
" H4 X; k8 n' Z job();5 \, t$ }" O* }! E/ N
},
3 n# h0 L, W O/ d Message::ByeBye => {
8 O2 ~5 B7 S; j/ t! m1 A5 d8 D println!("ByeBye from worker[{}]", id);
8 T( K8 A2 F+ {" ^* j9 a break
) _6 q0 t6 C3 P2 H },7 c: Z1 I5 j% f2 X7 w2 c
}
) ?8 u, ?% c/ f' \/ l }5 ~8 a7 ^- g; v4 E/ |; f+ A
});6 F; d; L* y: R) U3 b
5 w- Z" m( G( c) a- N5 ~7 ^5 { Worker {
% ~; F% [ P: v- N) ], ? P- b8 ~ _id: id,
+ f& W; F5 b8 |2 Y% l. v t: Some(t),
, S6 u# a9 T) G b: _ }/ t& X' e* @' |* G( b0 U
}- W! v7 @2 w6 [$ d* P) \; [
}2 ^& g) s a' Y, H$ Y7 f8 I
5 H. \7 Q6 Z! T2 m' k0 O
pub struct Pool {
6 N* K% ^# }' M/ ]- w/ w+ I/ e workers: Vec<Worker>,. ~/ V" ^1 y( ?/ Q' `- O5 u1 ~0 _
max_workers: usize,9 q: \; x, {4 M4 L
sender: mpsc::Sender<Message>7 r! J3 s# h) u
}
& e9 ]6 O: u- n: P0 Z/ h5 W0 P
8 Y& ~6 G1 l. H. j9 qimpl Pool where {
% _: q/ r1 c% E) S pub fn new(max_workers: usize) -> Pool {2 v2 P6 P6 J9 E$ C" S9 s" }* G
if max_workers == 0 {
6 M( f# \) H6 G6 b4 I panic!("max_workers must be greater than zero!"); Z% v1 X0 F! k4 ?
}
9 C+ H0 I& c6 E$ T; ? let (tx, rx) = mpsc::channel();- ?' U+ j2 x: x. i
' |2 D; m ^1 a
let mut workers = Vec::with_capacity(max_workers);
6 [( z' d5 c; F: T8 b6 c7 B let receiver = Arc::new(Mutex::new(rx));0 Q4 _' R0 W) t( U- l& G
for i in 0..max_workers {: K3 E% m# T: z8 u8 P+ y
workers.push(Worker::new(i, Arc::clone(&receiver)));, F! t7 z- V( r, g7 x) [+ N
}2 O2 z% X5 X& M# ^1 f
* [* f" g+ l" o% x3 G6 _% U
Pool { workers: workers, max_workers: max_workers, sender: tx }
2 e# N/ I2 [" | }
5 I4 D) D+ Y1 p3 r: C1 s7 I
& M" C# r/ e9 H0 U6 Q$ j5 r6 w1 w+ g pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
4 X, @- L: X$ a4 q2 S/ V {
1 G8 P+ o1 T) z: m; p! K: w8 x6 h9 V& u" i% {
let job = Message::NewJob(Box::new(f));
5 v F9 H1 j7 v/ r2 I3 i+ Z/ e self.sender.send(job).unwrap();4 j' Z7 I0 r; n8 Z6 L- {
}" c; a3 a5 s* V7 }3 W' g
}8 d8 P; p: q3 l- S; _
2 L' U( d; @! M$ l4 ~1 F: gimpl Drop for Pool {! x# c. x3 v6 A# H8 c
fn drop(&mut self) {
. ]/ y$ R/ F0 [0 _0 M6 ?+ U% @ for _ in 0..self.max_workers {
( U0 i" D/ q1 Z self.sender.send(Message::ByeBye).unwrap();- e3 J4 t6 T8 t' h
}+ L/ D) k3 p9 Z3 ~
for w in self.workers {
- n: u x8 F, O if let Some(t) = w.t.take() {* |4 d. j* C% Y5 V) h5 s
t.join().unwrap();
& q; _ m. P8 `0 n }+ i! b3 Q) T+ H% B9 r9 A+ s
}
/ z( E: z, ?) b }, g6 z! f( [' g' z# K
}
A* x4 C% N9 z, ~% R7 u; [
% G- X4 y6 j/ b4 |- [6 a8 b
* w/ \( r- L8 |% Z5 i% w5 ~#[cfg(test)]9 Y$ I6 x) `5 D% C, F4 y
mod tests {
6 w6 S8 m7 N8 H6 F% J use super::*;" H7 P3 Q6 ^7 C% l. [2 U# k
#[test] f" h1 `0 P2 I4 B5 K" w3 q* Y p' y
fn it_works() {) o% G1 [# `, U, D
let p = Pool::new(4);0 i$ @# x' Y( x- _5 n6 w
p.execute(|| println!("do new job1"));
: u$ D3 p1 s7 X. v U5 [0 U' ~1 u3 S p.execute(|| println!("do new job2"));( y- ~( T. c. k' f( G3 p1 E
p.execute(|| println!("do new job3"));
& ?4 B3 |9 B3 ^5 \( M | p.execute(|| println!("do new job4"));
, n% a x: \0 ?+ r }
$ ^- {0 J0 L- e' l# |3 \( s$ Q}' a V7 l) {' A4 e1 S( `$ x% @
</code></pre>0 [' L; q; G" `
1 Z2 O/ w0 w: m) r7 N6 y |
|