|
3 A, Y2 C8 X1 p: {( G
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
) Z/ [/ V. g! K$ p) C. z<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
5 r( |9 c( o1 g- x, t6 G6 h<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>$ D8 h: ~5 `% F' q. I3 i" q
<p>线程池Pool</p>
9 X* G- L, d( @) ~8 c$ m<pre><code>pub struct Pool {
2 X' y% y7 z( K/ ], _7 t max_workers: usize, // 定义最大线程数
' t+ |' T0 W8 f5 J) g' b. c}, o0 } F2 I) T. p- C$ Y c i- k
* c) Q4 m4 s) V2 V+ Rimpl Pool {
/ p6 H1 B2 I* ^- R' [ fn new(max_workers: usize) -> Pool {}
$ \- q# l4 Y3 ~0 }5 u/ M fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}' S3 h! o& G# i- m
}
7 ~# L+ l1 ^, D5 B1 Z6 z% W
0 i+ }$ F0 r. l* ~, u6 U</code></pre>1 D4 r& u( I* `: V% U C2 J
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>; U A* L0 u( d6 ?4 k0 L
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>" M; T: u! r9 X. l# u: F
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>9 j$ I; E* m5 Y2 p) U8 l
<pre><code>struct Worker where! k8 j3 Q0 T+ T" a) ^
{
( n( D* H4 A4 E1 P _id: usize, // worker 编号, h6 O( a; O, O K. M. K& E' Q
}+ a- X4 r0 W: N2 ?6 [" t
</code></pre>" f# K/ D1 F: p, H/ A
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>4 i& G6 K7 d' O- X8 Q! B' {' ~! @
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
6 G- S# X s( o+ p8 @) \& Y3 Y<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
4 B6 G* @0 W- ?. L<p>Pool的完整定义</p>
# N) L9 Q7 D* ^* o5 Q2 Y3 z<pre><code>pub struct Pool {
2 K8 q+ [+ b1 x0 d4 ? workers: Vec<Worker>,
8 J$ r( V: A; f' d4 \4 X max_workers: usize,. u% ~- f; q/ j f: c- ]
sender: mpsc::Sender<Message>1 P" [, I. v1 D2 ?
}
: J% b! J Q8 ^6 o7 V</code></pre>$ R0 |2 T; D$ ]9 u4 u
<p>该是时候定义我们要发给Worker的消息Message了<br>( X# D1 w, h0 t t' X: _
定义如下的枚举值</p>$ ~; `, y6 F5 y8 |; H4 k' Z! r
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
! |) L3 s- e- y. ~2 henum Message {
; {& y( a _; J8 O! |* l% l ByeBye,# D$ W. Y- C) M3 v- M/ P4 [
NewJob(Job),' y( @/ L9 W! O8 T4 d" x! h
}
& | |- t* k* K# S9 k5 S' H" W</code></pre>
/ a( ~1 ? V! L) M8 t7 r; {<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- ^" T1 @+ Z4 ^2 j* f6 o* D0 U
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
$ @/ M- ] f, _( ~) x2 l! U<p>Worker的实现</p>3 L4 v. O+ C0 y; O
<pre><code>impl Worker5 R5 Z4 D! l9 i
{
7 R0 x& u8 W" V" E$ O2 P fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {0 Y$ t6 J* O' ?6 G$ \, u8 X: T3 V
let t = thread::spawn( move || {
1 N- U9 I* i( L2 O3 _& n7 H6 | loop {
. q. d6 V6 ]4 `: R" J let receiver = receiver.lock().unwrap();0 v3 e+ n0 h# D
let message= receiver.recv().unwrap();
2 q, Z+ p3 H9 k+ A. {8 y- q match message {
9 f% Y, w5 h9 e. d3 R4 M' l9 z$ a Message::NewJob(job) => {
: p3 G) _( m$ P0 ^' r: \* Q" m( {( f println!("do job from worker[{}]", id);5 H' ?. X8 }# h5 [: b0 U! j
job();
3 E$ L+ S$ W- O; `3 Z },
* y/ l7 @ u# J4 ]. A Message::ByeBye => {
4 t9 X, ]$ r: I6 |1 j( o! n; @ println!("ByeBye from worker[{}]", id);
% k6 x- T2 o" t: ~ break
# z" n" d/ K; m# p/ n1 T },
g) J- S' a' G/ ` } 9 G+ @' U6 I' W9 y7 z; L
}& h3 J% d8 h3 x1 P" x( f6 u
});7 }* W1 d& V7 |. b0 i7 ^$ O
; c u2 I! e, }3 l4 d2 M! L
Worker {
. i* M, p! N, U _id: id,/ U6 w4 Z2 q5 N4 Q
t: Some(t),
, {; n; j& A5 S }2 s6 Y( I; q5 \! W f/ k' z
}
; H* O# c1 |% }' }3 {}7 M8 Q# c6 A7 ]: _% b
</code></pre>* Q5 K2 [. l! l- Y
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>: j+ a6 g- K! n1 K
但如果写成</p>: n) R: m# ]: x# z% T8 n* A8 v* |, @
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {% V6 ^7 k, i4 `9 o* V( ~% B$ _
};
0 H) m( ?) d& z2 S9 F( u0 h0 `</code></pre>
: ]3 n1 O/ I% D<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, T' z3 i, _% u4 C! H1 A8 s
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>* k$ z! P3 O' a1 D) s: i5 G
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
/ i8 ]+ Z- p* j9 \) p1 o( E3 v! {<pre><code>impl Drop for Pool {5 U" B' p+ H' R! X) G! Z4 B; ^# g
fn drop(&mut self) {
" t, ^0 ]0 I B' ~# S for _ in 0..self.max_workers {
, s5 W$ y2 T% q U self.sender.send(Message::ByeBye).unwrap();
3 E$ Y- v [8 X- [* h" _ }
: S! V: d5 v- b for w in self.workers.iter_mut() {. I5 u" w0 E( o7 m. T
if let Some(t) = w.t.take() {
. ?7 I- d4 O6 Y" G& Z8 `; P t.join().unwrap();% G9 x! A$ F! n$ f3 e. i0 z
}
6 r( M" d" E6 T( X } l3 u, E+ c7 T
}
, C/ ^# T3 i$ X2 c+ {& f/ y}5 e% N7 R. O9 a/ S3 V! g) X" e$ G0 x
- ?/ ]! R, s( A</code></pre>$ ?! ~' V* ?% {6 |/ V6 C8 i
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
# Z) ^; N; Z% d; | e' Z5 m<pre><code>for w in self.workers.iter_mut() { C3 c/ G; H6 W* v! ?& i0 r8 ^
if let Some(t) = w.t.take() {, z) n' s' }! g! o" k" x3 Y
self.sender.send(Message::ByeBye).unwrap();: }6 F& u3 e1 F0 U, T! |: e
t.join().unwrap();) y6 A+ a0 a# ^3 Z. s. Y# H9 ~
}
6 @, k' \0 _4 M+ i1 P' N}; U4 [2 t3 c, A P) L
5 w" T% T" S+ Q& x# l
</code></pre>
4 N! O* c0 M( Y, @4 g* n5 n<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: H6 D8 L& e* [# |- ?
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
- W4 J, W) k$ M( w<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>0 e" X. m& w4 A1 \6 b/ I
<ol>
$ Q& c) \5 |$ A<li>t.join 需要持有t的所有权</li>1 Y) U0 H; P& Q z
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>& ?6 t/ q: V d) A
</ol>& k5 [$ A; D2 D( H- y1 v; M8 r+ v/ @6 w
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
4 ^6 W2 o# N3 l, s% r, l' A3 B换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ ]& E. Q, A& D6 N% n$ f2 R6 M
<pre><code>struct Worker where
' q& }$ D! K/ w! L8 D{, m) j: l5 q. j5 I8 ^4 p9 w V# a7 b
_id: usize,
( ~0 f( m! _) S( t8 ?. k9 S t: Option<JoinHandle<()>>,
, f' w2 A6 V/ N7 d}. X s6 ^" s$ f0 k, r. I0 g3 S
</code></pre>2 w9 e- B4 @8 Q8 y* e) ?& R
<h1 id="要点总结">要点总结</h1>% G/ `0 i4 ]3 D8 ?( x9 \
<ul>5 a- K* ]3 P2 V. i$ u+ _. C
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
' r9 w j7 Q8 f9 ?* _* A' f<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
/ j, b2 w$ Z3 d. v* y. N1 f</ul>
7 _9 S, k. G- ^) K8 y1 ?+ Z<h1 id="完整代码">完整代码</h1>
$ T: d& ~: b) I5 \( E<pre><code>use std::thread::{self, JoinHandle};
: C: i1 n7 n, t7 Q# F4 E8 iuse std::sync::{Arc, mpsc, Mutex};
: h" ~3 J4 I& k$ \# V/ S7 s* R8 d2 V, ?( C
. K" W# p% b) v
type Job = Box<dyn FnOnce() + 'static + Send>;
. w8 |4 t9 v aenum Message {% ?6 y" r" {* e+ T8 {; k
ByeBye,. Y1 h/ L7 p! k$ L4 x3 a
NewJob(Job), d) @/ c) \ T7 ~& i, e. g4 ~) r3 C
}3 Y f* j5 P+ @! t& e
& P. t7 u% {, _- m) Lstruct Worker where
3 A3 L. v; }9 j- O2 k% s{3 r/ F9 I$ D- a# n3 k0 z0 u7 T4 D u
_id: usize,- z" a9 ]3 U* x1 N* ]0 M/ R
t: Option<JoinHandle<()>>,+ O% g) w5 H/ r' j4 h1 M$ C
}
) A6 U0 S& }& A" {6 ~8 f6 w( Y- m1 A1 j1 y9 y d
impl Worker
. c+ r! R# b* J{
' y V' V$ _) X7 j' Q s- W fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {8 w) e4 l x9 a2 h6 i" Q
let t = thread::spawn( move || {
3 c8 n: P1 O' e: C; Z& T @ loop {4 ?# K* J2 |3 V* h& q
let message = receiver.lock().unwrap().recv().unwrap();' X0 v7 e/ H" m: `
match message {- `8 k0 o x3 m( e; { @+ B* _
Message::NewJob(job) => {, t& m$ x( H8 }& v
println!("do job from worker[{}]", id);
, z/ [" Q4 w" r6 f job(); r5 }" v* [4 L0 m: M- M
},! v: t' m- a& W. N
Message::ByeBye => {
7 o4 ^( A! j9 G) h7 j6 c0 g5 x3 O println!("ByeBye from worker[{}]", id);' U, ~9 k. _9 H$ \0 j, W, b1 e
break. g' O$ u( F; S8 m/ s
},$ N. z! N5 R/ G; d* z
}
6 T: |! H5 g$ T% D7 h5 _ }4 b% X6 e5 I- B9 K% [+ x
});
3 I- t- K, w# s7 N1 S* T3 |1 t( H
Worker {! W2 e+ l$ n! J- O( ]" b
_id: id,
6 y; ]2 Y4 i+ W) f, { t: Some(t),2 @9 c' U' z- f
}: Q1 g( [, x" u& o0 R6 Z( J
}
$ O+ M4 N( @" l" E+ Y' I* u" c}) I d( r6 H& k1 G6 r2 T9 Z
1 `! G$ o1 j# F1 bpub struct Pool {! X: v# V* N9 {
workers: Vec<Worker>,2 R/ T* W: ^4 _7 ^; o
max_workers: usize, O2 v. t3 A1 V0 `0 `" C
sender: mpsc::Sender<Message>' u' L. _+ M: f3 I* O
}
+ s9 Z3 `; ]& y+ z
* | J/ _* v! T$ O' \impl Pool where {
0 C( s0 b7 V+ H6 `0 y pub fn new(max_workers: usize) -> Pool {" [; E, _5 A; B! K4 R% G# ~8 \
if max_workers == 0 {
9 A8 Y2 w0 B' E; V( _/ m; [5 C panic!("max_workers must be greater than zero!")
3 |) T, H; n3 v. C' V }3 t, k( y) q7 M% }9 z1 i
let (tx, rx) = mpsc::channel();
2 B2 z5 E6 f5 B7 M
/ F' T1 H9 d- \ let mut workers = Vec::with_capacity(max_workers);
" {2 v# j7 F/ K5 f let receiver = Arc::new(Mutex::new(rx));
( z* ^) D& r; _8 W- ` for i in 0..max_workers {9 U4 ~7 V1 i8 K y6 _
workers.push(Worker::new(i, Arc::clone(&receiver)));9 Z: m6 l1 o9 n. ~: Z- y1 g% ^
}
+ I0 V7 \, O- c- d* O4 Y: m+ p9 }0 w" K- D4 D" U2 d
Pool { workers: workers, max_workers: max_workers, sender: tx } } ~1 r; }0 W1 J$ q! C+ N9 S# ?9 ?
}
8 K! S n4 u/ l8 p 9 ` c" H$ |0 E
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send0 A9 \1 t* i) q0 ^
{
% }, z6 z3 g6 J8 s# T/ t E$ s+ P/ ^+ N' m$ l ]! H
let job = Message::NewJob(Box::new(f));
C% @. a- W' s3 F% h8 }3 l" p self.sender.send(job).unwrap();
: s+ B, X4 P3 L7 v1 ?- n8 A) Y! l0 k D" D }4 u9 m% n9 m; r# Q$ ~
}2 ^! ~+ [& Q# w
, P4 B# X2 Q2 B6 ^* Cimpl Drop for Pool {& H- w5 @- y7 F \4 D. ?- j
fn drop(&mut self) {
; H5 q/ q, x/ i1 A1 R for _ in 0..self.max_workers {: ?5 W6 J& x @; N
self.sender.send(Message::ByeBye).unwrap();0 A8 ?8 M, n C- y! k- m' V
}3 [' F1 c* W0 G( r
for w in self.workers {/ G5 A1 Y& K& `$ e' F$ B
if let Some(t) = w.t.take() {
2 Q' C' g- b3 B7 o8 ] t.join().unwrap();& }, B" ?3 f! r- S
}
/ t/ E+ G* m, X+ }0 D }# B# r: B6 r; K6 C) v- d% a
}& Q; o1 G! \0 r" X, `
}
: p, Q. v& a% N5 ]0 n0 } @; B1 y/ l+ o# u
: N- F5 H$ E+ V5 U, g1 X9 c) k- _#[cfg(test)]
4 |' e( H4 a4 O% Nmod tests {$ v1 ^' J" k* p9 J* l2 y2 N
use super::*;
5 e( I& T/ @0 R. J/ S: ~ #[test]- l* ^: r. K) z& _, L$ f A' O" n
fn it_works() {
0 k1 P. q. Y- s" r' A* B let p = Pool::new(4);
. w" w7 D @6 Y0 B p.execute(|| println!("do new job1"));
1 q6 V5 p2 x/ F+ m2 J$ Z+ n. R p.execute(|| println!("do new job2"));
+ }( {- X1 z7 y! E3 u1 u, { p.execute(|| println!("do new job3"));( x8 @* M% j# D! ~5 g3 r6 m# ?
p.execute(|| println!("do new job4"));( {! c4 W6 W' ?; e
}' l$ O: L8 R2 S9 d' M
}
* g, T% W. `8 v0 w* x</code></pre>( L0 T6 y& x6 P% X- D, ~0 b- E! y
" O/ g: ^ P7 G, [3 \0 r- ` |
|