|
|
# O. j& h# i# Y
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>+ n" G! g1 V# e1 u* O
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>$ F$ [; m& \. J# k' _4 N0 ?7 E
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
9 w$ P. T+ s3 C( ~' C; d- X<p>线程池Pool</p>
d* s8 j( Q' v" e+ a<pre><code>pub struct Pool {
/ h& q/ a6 h, D1 v7 P$ @ max_workers: usize, // 定义最大线程数
7 a+ S- R) y; @0 \}( N/ c2 M4 x; V0 i
9 Q8 [/ h9 C& i/ s* z) ]impl Pool {
4 C0 ~7 I3 w( c) L5 |8 H fn new(max_workers: usize) -> Pool {}2 ^ \7 Y) k. U, [( V. F& x
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
( y& \- S" h f" x. E' u0 r! |& ~7 I}$ x* [: m. L, [3 s k6 H% m! M
# h% L6 I/ E. ?) b7 R! U
</code></pre>4 ]2 x8 s; h% S8 e e/ d% t
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
: B d1 P5 r3 x; R! O% N<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
. p& O" O4 V" G5 q; A* @可以看作在一个线程里不断执行获取任务并执行的Worker。</p>8 ~4 M( Z* g3 u7 Q( ^
<pre><code>struct Worker where
\# P3 {8 Y1 W1 P{% n7 d: ]: W0 N& @. ]9 I
_id: usize, // worker 编号
& h" h& f* P1 d: i5 o$ b- }+ y* j}
" q' g$ X% w8 l. q; O</code></pre>& r' p. \) |- H Z& }* m. P% v
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
' n) k* C0 w. p8 s4 C; y$ O8 F把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
$ y7 @. O& O$ Y$ P; ]' x<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>' J; |6 J4 ?) X5 W/ O: \6 o: E
<p>Pool的完整定义</p>3 w& Q0 u9 Z9 ?4 U( W9 e
<pre><code>pub struct Pool {# x6 \: F. F% K* ]" o6 n
workers: Vec<Worker>,. `; z5 V' D% k) q
max_workers: usize,- g0 L+ T; Q- r% E
sender: mpsc::Sender<Message>
0 J* w. x' E; L8 r+ l* r$ O}; }3 d2 ]6 L, r8 v: V: q
</code></pre>" T0 E P$ h% P6 o2 x
<p>该是时候定义我们要发给Worker的消息Message了<br>) x; Z; U+ K6 [8 g% a* z6 M u
定义如下的枚举值</p>
. w( W: I- J! @, m1 k J<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;8 x$ o: I" c: V$ U: g9 }
enum Message {7 d$ r1 Y0 c& r7 `( J
ByeBye,
& S+ ~4 @9 d1 @' K% V# E NewJob(Job),
3 e6 H/ N& B: `1 d) O}2 B9 n+ x; ]4 m& \$ b9 {- X3 G
</code></pre>
& m7 L1 }" T( c4 i% i6 p t1 ~5 t0 f! i<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
" {6 y8 g9 c. _" [) S( y) E+ D<p>只剩下实现Worker和Pool的具体逻辑了。</p>( C4 S/ w3 N) S% M" K
<p>Worker的实现</p>3 b* x' [- F+ T! T% x( i- a
<pre><code>impl Worker) i& n6 h4 K0 P* e- t6 t
{' j0 j" Q+ P. p' I3 P( b
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {* N2 }/ h& P, q' k& J- s" Y
let t = thread::spawn( move || {
/ ^& a) S8 q- }& `: }+ m$ J9 r F loop {5 e) i. W2 l) s, ^1 o8 K
let receiver = receiver.lock().unwrap();' T2 |0 f: f4 @5 r1 j. W; P' S4 ?
let message= receiver.recv().unwrap();
# T' _7 R3 n t4 m: s match message {: W3 B" b( [1 N- A& C( X4 |
Message::NewJob(job) => {$ c3 o( J8 ~% i* j
println!("do job from worker[{}]", id);& K" Z. @. e$ w: P1 Z
job();8 u% I* `$ W0 f
},7 H, ~9 u' @& \- p7 S
Message::ByeBye => {
5 B; B; ]+ V: z+ a/ a println!("ByeBye from worker[{}]", id);6 c) T% q0 c2 _) G6 L( B" K: C2 ?
break. o9 @ i M* w
},+ W$ ^9 q7 R" I' S" j/ I, E
}
, ~8 s( n2 k1 F }
- M1 [; `) B: x4 N h2 S });$ L" P* |) F6 K& X1 h6 U
5 s5 X+ U" ]1 Z4 n: K; Y Worker {+ z; d1 {6 O$ I$ F9 n! b P
_id: id,6 u G- p1 i( d6 K# r
t: Some(t),& [9 V$ W- X3 }
}
# B; Y( h: e" q/ m( M* L }3 d" C' m1 P, x. f9 o9 U S1 k
}4 ?# L% [4 F" ?4 w" Y; s- V" e5 |0 k) w
</code></pre>" P' C5 ^( ?* T* ?+ e8 Z% C
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
: t8 q# u. m' ~9 K6 ?但如果写成</p>
0 Y9 T$ t/ ?: h! [% C<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
1 N; S+ t4 A0 J! Q: p};
$ B* [; F! |+ V4 N) b</code></pre>9 `8 @( m7 M4 _& y% ^
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
, w* k3 k! V- R! i) j( j4 prust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>+ i- ~. Q/ [8 f9 d* K; ?/ q
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
+ {( ?, r2 a" }: p<pre><code>impl Drop for Pool {
* B, U$ ?( x: V( ]7 l fn drop(&mut self) {
1 Q' Q+ q6 ^: }; q for _ in 0..self.max_workers {2 y7 F% `$ l* A( | U; j" `
self.sender.send(Message::ByeBye).unwrap();
1 U4 M( B3 Y: b* N, F& W$ o } t% G4 [" ~( i" j' b4 O
for w in self.workers.iter_mut() {1 P% X5 P6 R$ |0 ?1 c s
if let Some(t) = w.t.take() {
# {2 I0 n9 \4 J$ C. D t.join().unwrap();
' l* ~+ ~$ M+ f- X3 ~% }% y }
( x" D1 e6 Y+ ^$ ~; T- K0 R }
8 k# A) B+ W1 _( a# n7 ? }1 E1 {6 ?6 ]- S/ @( S' }
}1 W8 w, N$ C$ D! V$ Z B
1 K+ r- ]0 i! b& ]</code></pre>2 `* j$ X, i8 i8 a2 m
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>( S. p9 F2 w& k, k- x4 b6 I# e& I
<pre><code>for w in self.workers.iter_mut() {+ z7 I9 G7 r- D6 \
if let Some(t) = w.t.take() {$ G2 z; d- N. w% M F) O& i+ T
self.sender.send(Message::ByeBye).unwrap();0 `5 l* P8 U1 C/ `" D0 W
t.join().unwrap();
. w& ~& w, p J6 C$ D) p% Q }7 |2 v* ~' d) ?' _: O
}* L a) o. V) G1 D; c6 y7 f5 U" g, u# @
* S' k% [6 `% a# ]6 P/ b: K</code></pre>
! x1 |& {7 T( S- ?# V x<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
: c, V6 C' _ s: R: k" R" L% _我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>, j5 T5 ?$ C! ~3 Z* S7 ~6 E
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>+ m) ^$ ]! @' y$ a& q7 }) J
<ol>: L, Z8 f+ w0 I. w5 ?
<li>t.join 需要持有t的所有权</li>: `: \+ ]/ S7 o
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
: F, I6 v4 H: A2 o</ol>! n/ z" E& p4 w+ t% M8 j
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>2 ^3 p5 q" f( k& F4 m) t2 v" v n7 q$ u
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
* j# r5 f# q6 Q0 P8 x<pre><code>struct Worker where; \9 i6 i' f; e0 @* W4 @
{. V3 W' s+ \5 s
_id: usize,
! z2 T- ]) B! k# n2 j" { t: Option<JoinHandle<()>>,
7 v: M1 x6 E% w}
4 p' I. @) x l; \: n3 }</code></pre>
" a0 q* h% x2 _) V$ G' N<h1 id="要点总结">要点总结</h1>% ? S p) | d
<ul>
8 `0 \7 r5 D' `: V& R<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
8 J+ v+ d! t8 ^3 n% i+ F$ ~<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
& Q) ~* K7 N( C" V9 ~9 B3 w</ul>
" M- a8 p, H# l K# G<h1 id="完整代码">完整代码</h1>$ x S) n; \" i0 l# a' }
<pre><code>use std::thread::{self, JoinHandle};1 r- ^' z+ y8 P" {% O9 w5 i0 b! i
use std::sync::{Arc, mpsc, Mutex};3 V% j4 Q7 M A2 T" x- }
. K( p. o+ C& c- `% I; G
?5 k1 S- ?+ Btype Job = Box<dyn FnOnce() + 'static + Send>;$ O% x \* s/ v o6 B* i
enum Message {
8 F; q: p/ l) V% D* Z% q8 | ByeBye,4 x2 h) q [5 c) r9 }* f! Z7 r
NewJob(Job), E8 L' {+ @) @5 `- o. `1 W' N
}9 M; a1 @# ~$ r2 t) E: v
7 t9 G" U1 C0 v5 f! Z5 W' [struct Worker where
% l0 d8 m1 u5 t* [1 d) Z8 y/ a{
' o! d* o- D/ b5 m! @ _id: usize,
' k2 [3 n- X; M3 y8 U t: Option<JoinHandle<()>>,
. F" I/ i% B* Y/ A9 ]}4 H# }, C, f1 j
$ p; {* t5 L, e/ `. v, }9 b
impl Worker; m/ z: E" f! v. @1 Y+ e' G
{
2 H: F/ F/ f* j6 L) U+ f0 q fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {; d6 l( a% m' L$ e O$ g6 k
let t = thread::spawn( move || {0 a l- `9 n/ g7 x9 d/ O- ^
loop {+ X9 P7 Z( p$ Z4 ^5 k2 ]
let message = receiver.lock().unwrap().recv().unwrap();
2 v. |0 Y& U8 f2 ?: S. ] match message {+ L. h# S+ x+ M
Message::NewJob(job) => {
8 y8 X, ?# \# A println!("do job from worker[{}]", id);
& |1 ^: @6 T; @2 F job();
$ C( M: M. ]; ^, q },
3 W8 Z8 K- `8 K! w Message::ByeBye => {+ r5 t# T& A. X* X! S. R# {5 u" V
println!("ByeBye from worker[{}]", id);
; e7 w8 G5 A) }$ r9 t8 Q$ i break6 B9 O- ^# t0 @) \0 s# F
},) |4 ~1 k( C" R( i& A. e
}
7 F( w. z1 n7 J0 o0 L }) f" I/ ?! |8 I. \7 r( X
});
6 R3 W, l" @$ d$ R* t! Y8 w
1 c) G' v7 m* `4 A6 F2 l Worker {, _6 H: E5 X+ Y% f; O* t: w' S
_id: id,
9 C _9 I7 W m) z t: Some(t),8 N x0 j. N% p7 n& k* s5 F0 T
}) C% F' {; s' g# P% p* w- G
}
. ^ f J& l( I5 n3 Y& C9 ^: B/ U# I}7 D6 s0 ^# t2 a4 P
; z( E) e; K2 @' l
pub struct Pool {& g/ ^0 H) v- e! F
workers: Vec<Worker>,& z4 m% A! d0 s, V4 E5 k6 n. S1 w
max_workers: usize,
! \" \6 ~0 c2 ^ P8 v sender: mpsc::Sender<Message>5 _7 n5 o2 B% ?/ X- J. G
}' _; N n$ [% h
* x# p" [+ h2 _
impl Pool where {" ?( ?1 H& P" l, ~& S( ^; |
pub fn new(max_workers: usize) -> Pool {( q. m/ L$ W/ }# z
if max_workers == 0 {7 h5 Y: q4 e6 T6 z
panic!("max_workers must be greater than zero!")0 P$ D3 c2 y$ L; q" ]& }
}
! n3 b5 z3 V6 m+ i9 V let (tx, rx) = mpsc::channel();
/ E$ ~9 I& h9 D y( g5 f2 B
) m: e! t! `( p# K8 v) a let mut workers = Vec::with_capacity(max_workers);) d. Y4 }% y0 f, R* J
let receiver = Arc::new(Mutex::new(rx));) k$ K4 i+ `5 B% q: M( S
for i in 0..max_workers {7 w6 v/ w8 W, u7 t+ _
workers.push(Worker::new(i, Arc::clone(&receiver)));7 V# y Z0 `/ J$ f- q. [/ ]* ]
}" z% T# ?5 o( B9 u/ @
8 h. L8 j u/ \+ M& }: O* Z, _3 Z
Pool { workers: workers, max_workers: max_workers, sender: tx }: k+ b) H7 T; R; F9 {2 N
}4 X* p1 }3 b2 E+ a5 M0 N* y
, ~2 n: b! z C; N' w pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send; d$ }& j' |8 j2 l, b
{. T t( K- L- j# {0 s; C
3 b# n, `" F( v5 u
let job = Message::NewJob(Box::new(f));
( O+ s' S0 v) f5 p/ l self.sender.send(job).unwrap();
) I! l" V9 _1 N& L: u: X }
9 Z! g; H; V9 ]4 k2 U/ b: ?}
: K" m0 n% `7 w+ G5 x5 Q; Y# p7 ~
impl Drop for Pool {
8 X, q' V3 o& e9 [+ x3 c fn drop(&mut self) {
) K/ Q# v( q5 }$ c0 ]0 D for _ in 0..self.max_workers {
% U+ o$ a1 H: a self.sender.send(Message::ByeBye).unwrap();- ?+ E5 b Y2 h* D
}7 m0 k: \ q) L! w9 h; @" P/ E
for w in self.workers {. G0 o5 J$ q4 j, j4 k2 P
if let Some(t) = w.t.take() {% B& {5 ?: X# b# x, N6 Q
t.join().unwrap();9 @5 L6 A2 h+ J- U7 c' u
}% Q8 @' i8 t; ~+ j6 M- K. [% |6 w
}! k5 ]. U: f8 R, Q9 u
}! L- P1 C; O0 G
}
) k4 o/ T4 J6 [5 P& Q$ W7 k5 l1 @. d% c1 S3 h0 v4 K
. W! Y1 R3 ?3 G! ]# l#[cfg(test)]
) e6 w; j+ S# b: t! U+ x8 Omod tests {
# p9 C! d8 t/ g" q" Y) \2 _ use super::*;
: |+ s& {: H6 O* U #[test]1 q6 C, m1 t1 H( Y
fn it_works() {
- \- h0 L( I0 u! n) y$ j let p = Pool::new(4);
- M |1 d- L6 r% y8 W p.execute(|| println!("do new job1"));! @7 ]0 o$ F0 j9 x4 x
p.execute(|| println!("do new job2"));0 m0 p4 }8 M# K7 \. `1 s6 J
p.execute(|| println!("do new job3"));9 |& v. [2 N' o3 u3 n6 I
p.execute(|| println!("do new job4"));
: |! L' l1 u2 r: Y0 J+ Z) @* { }- u @/ h5 k$ o
}
. n3 f! Q& f/ [ L2 S* i4 a, t</code></pre>, o9 E5 M- L" I+ A2 B o
7 V9 e5 r( z' F! ^7 W |
|