飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14761|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8053

主题

8141

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26489
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

" V4 A' L) X3 D* L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
$ x+ k* z- b! `& @% R<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
) y2 Q9 Q4 L/ L% M" k% k0 S<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
( W% }3 [2 p1 Q, q9 ?  _<p>线程池Pool</p>, u: [" K3 {& {8 n$ J' z
<pre><code>pub struct Pool {/ @! a1 K; M% n" k
  max_workers: usize, // 定义最大线程数
6 ~! E% h+ u" a! j}
3 B9 p3 o. d: r5 x8 g& b% c- j7 ?. X
impl Pool {
: b8 b2 q/ F7 b1 L  fn new(max_workers: usize) -&gt; Pool {}
. i" o9 }6 u  Q) G  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
, V, g% ?5 ^3 M+ P) d( ~' O! ?}/ ]: ]* A/ I1 G7 V% K3 H0 I
) [! |5 B8 ?; w5 b3 \2 j: {
</code></pre>" S$ c9 U7 d; D5 h4 l
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
% \- f. E, x2 p+ `6 {' {<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>/ Y5 l: |' N  }& C! F6 b
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>8 \: h5 Y& B* b! X5 c* m% U
<pre><code>struct Worker where: {! U  m! ^1 i* l7 M
{
- H; {  p3 d# L) I0 e# `    _id: usize, // worker 编号$ P# {1 E0 b5 F! S5 @+ f
}1 l8 Z: l/ V6 [" X
</code></pre>
1 V0 e3 B7 O. e9 o$ l( l7 b1 I- k<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
6 s6 E* ~, b' Z. ~) C3 o# a把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% v( k4 `; E. h' @7 R9 G2 _
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>% g0 k8 N9 ^: U" ~# Y7 R
<p>Pool的完整定义</p>
! }4 Y; A, P& J/ O. s9 _<pre><code>pub struct Pool {
# z+ R3 y! K8 o4 V8 I    workers: Vec&lt;Worker&gt;,
; _* E. ?+ N7 S, R6 ?    max_workers: usize,) x& ^6 L9 F9 ~  A  d
    sender: mpsc::Sender&lt;Message&gt;( l( J( C: ^% k$ i4 n9 R, e
}
+ e2 j5 Y/ G; z7 g& ~6 t1 _$ P</code></pre>0 _, q4 i% b6 x& I( p0 E
<p>该是时候定义我们要发给Worker的消息Message了<br>6 s" U7 E+ J) r* a- I
定义如下的枚举值</p>
( M* ]# G9 o+ T<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;$ U# u- A/ {/ p1 B1 u
enum Message {
2 x& @' d, O9 @7 z# f0 \    ByeBye,: @2 x4 O8 d1 w% F. s6 t. Z) K
    NewJob(Job),0 @# r; H7 j: d0 N2 S# i
}
9 O: x( t- ^" V+ Z% L( i: E4 M9 q. K</code></pre>
  v! U/ V% k% K<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>! ~/ v" B' m. W6 ]
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ c& |9 ]) S- U; T; {2 \9 S<p>Worker的实现</p>, Z) E7 k, Z" [0 ~. Y5 {" k
<pre><code>impl Worker0 |# `9 Y7 n" I6 C$ L
{8 s( L+ W2 k% j! d
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
( A3 C; B# W+ D2 l0 Y0 C        let t = thread::spawn( move || {
1 @$ E4 Q( ^: H+ b& v8 K            loop {
+ u$ w8 n; j0 |- J) s: u/ B                let receiver = receiver.lock().unwrap();
# u) E# B& U3 }2 c8 r. ^: \                let message=  receiver.recv().unwrap();+ w, D" Q) a3 t5 a9 G" v3 u& M
                match message {
2 s: N4 T% I* E* [6 |( ], B1 Z                    Message::NewJob(job) =&gt; {
* l6 B# h! k/ N# k3 \4 d7 K5 m                        println!("do job from worker[{}]", id);
% \, @2 d9 _- W6 K1 o                        job();
4 F1 d4 B8 T/ ]                    },' j  K2 T4 \1 H/ n) C/ h+ w
                    Message::ByeBye =&gt; {
9 i' q/ A0 ]0 p+ U- b& M                        println!("ByeBye from worker[{}]", id);9 L/ \' h% _3 _! n6 ], Z; ^2 A
                        break
* O" W0 l  k5 r* t& p$ ]$ E. W' n1 N                    },: X* ~% v# i. V# A- |4 W) g* E
                }  , k+ E* V: `8 q, K, o% e4 _2 j
            }
8 F( b) w( d2 b  w* y  @        });9 r4 t: W+ g- p  E
5 L9 N! M; o6 x1 p, F
        Worker {/ n8 w' p6 z5 c0 a# U0 f$ ^
            _id: id,3 p4 y7 s4 J  S9 k; x
            t: Some(t),& P% X$ l' f2 E* f: e' t
        }' v2 ?( G# E* \, ?& ]0 u! J* O
    }
3 K* t, |) e% i4 V2 `}  W7 P# F0 D0 N2 v3 m% S' r
</code></pre>5 Y& Q( B7 W6 ?% n9 I
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>" ~1 W3 u% _  x( Y, z# m9 X
但如果写成</p>
  f* M% M$ ~4 J! E1 O, ?5 Y: q<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {4 P' [* ^+ E- o5 o/ c
};
* R# V* \0 h) B; O$ Y</code></pre>
) O7 C6 M: Q) m<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>* }+ @7 g, z( T. W$ ~
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>* f- j$ f! Q' q$ Y& Z& S
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>& c6 M) b. O+ Z  J
<pre><code>impl Drop for Pool {8 N0 S$ j; z/ _/ c* R
    fn drop(&amp;mut self) {
$ V3 L( b2 p8 ?5 g8 U, \, _        for _ in 0..self.max_workers {: z( N6 d7 p1 R. {
            self.sender.send(Message::ByeBye).unwrap();
' H9 @* s( x" e. y        }' Z; s( z- I* @& L- t0 j
        for w in self.workers.iter_mut() {
% r/ C' Y( ^2 G7 O            if let Some(t) = w.t.take() {
7 e; K: c/ F3 G                t.join().unwrap();
+ o( _0 f) L+ f7 V            }
! X9 x2 F% d3 J  @& w        }
( W4 l, t5 F) Z$ X& n, |6 G    }
: @$ V) ~. c4 v( W}" l+ ~' U/ Q" I( ^; q4 g

. J. ]( j+ Q" N2 S2 A- d</code></pre>
; R8 K9 u* M( d9 l. _<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
8 D8 ]: j: d- O+ @6 b8 {<pre><code>for w in self.workers.iter_mut() {( w. M0 G& {- Y. W9 v6 J
    if let Some(t) = w.t.take() {- n/ W7 _- X6 U* _
        self.sender.send(Message::ByeBye).unwrap();
7 Y2 J5 D. k  p. V/ c) A  [        t.join().unwrap();" ~; O% P% z. T4 d& |! h6 Z
    }. H6 I/ c3 w) \
}
% z( T. i2 |- D" V% B/ g/ ~1 ~: z, h3 m/ D1 S7 O" Z
</code></pre>
6 `6 W  a$ R2 N# }  A<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>4 {1 x5 J  J0 e# a# t
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>0 ~: ^# }9 O7 F9 C) C0 o& ]
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
/ a" E- T6 p5 |' S0 g/ |8 t/ n<ol>
- g  \) ~( q4 P- ?( K# s: X<li>t.join 需要持有t的所有权</li>$ z) i8 d) j1 J* U2 o' |2 y' l
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
# x! N5 j+ J1 S5 q6 r( M</ol>8 j" j$ u+ i% c" m  Y) \
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
; m# w; z! A' d0 F8 |换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 t0 d3 n$ P$ R$ a& \<pre><code>struct Worker where
* z5 c+ p: O/ V' b+ b2 K{3 s& ?& i( z/ X, W2 H
    _id: usize,2 T1 {' g9 C2 m& X0 @" N/ c8 V
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,, X+ C$ b4 I) H+ A# m: {2 T7 e
}
3 n; q$ j+ C6 Q- ]$ [  o- a</code></pre>+ T6 o, y) Z: B. y8 M$ k
<h1 id="要点总结">要点总结</h1>" ]" ^: m2 {9 `0 Z
<ul>
* y5 E: P! Z8 m9 l<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>2 l* b0 i- v. E* e4 Q
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
( o/ X8 w% e9 K$ Z& F6 m</ul>) m, t# t# M. U  p$ A
<h1 id="完整代码">完整代码</h1>
4 L4 u1 D7 n. N+ O% q<pre><code>use std::thread::{self, JoinHandle};
% X/ |5 ]& X" S6 {2 l% W2 u6 `use std::sync::{Arc, mpsc, Mutex};
% _- s0 ^9 P7 d, Z+ H
* W8 y6 J% c: ?3 u/ e% t) b
/ @9 h# G; ?* J5 _. o& |, F- Mtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;1 N, E1 C0 w  n$ G
enum Message {5 a. T  B6 ~6 y5 e7 I# h* N
    ByeBye,+ l2 h/ U0 o) e6 Z& e- V+ a
    NewJob(Job),
0 Y6 y( ^2 b' W4 O}
9 @7 m2 h# _' D
/ c# m* k, d' ?8 t* L# qstruct Worker where1 p# V: _6 D& V/ ?% [. g$ N
{
) k- {, E% U6 \1 `    _id: usize,/ |! V4 V* q3 Y
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,+ E1 Z: ^8 n' ^
}
( i( R; Q. G  G: F( w4 X( ]  L, O- f
5 y! Y1 x( l* d- w1 A* `impl Worker+ z  ~' U: N0 d0 M( i6 L5 `0 ^
{
. x7 E$ }; `/ i    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {7 e! g! d* t& w& D
        let t = thread::spawn( move || {
1 ]7 Z# W/ J* d: K/ m            loop {8 N/ n; N# E& i7 h/ |
                let message = receiver.lock().unwrap().recv().unwrap();
& t7 t0 u- |" a: X- ^3 y9 k                match message {6 p; D2 F; K6 d6 y
                    Message::NewJob(job) =&gt; {* ]8 j' D( `5 v3 V7 {
                        println!("do job from worker[{}]", id);9 X. n9 C% E4 f" c' o9 t3 u
                        job();
  P* h8 y8 o1 @: A                    },$ A6 T& t" P+ y& C" C' _# {
                    Message::ByeBye =&gt; {) h4 P6 n  ^+ M  A2 }  T
                        println!("ByeBye from worker[{}]", id);
8 r7 [0 i6 i$ \7 H                        break
; D+ \% l& H% t9 f" t                    },
4 D: S5 \/ P0 H3 [2 o% |8 r; e                }  9 p- ^1 c) C1 s4 g' A  j$ m
            }
# h8 H) c$ F3 @" X, z# X1 z( B        });# A8 r0 ]; o3 l6 A* |

+ Z$ O5 L. l3 J9 [        Worker {1 r) o% q: @/ v
            _id: id,, O' e; f( }7 m$ N' l
            t: Some(t)," b& Y" r; C- O: E* R6 O* `: e
        }, ^* {  Z; ?& G. `) Y
    }
' ~7 Y' Y3 o+ T5 ?' `}
+ {( V. ~3 Y: g) n8 }8 v  m  v
pub struct Pool {3 k/ u3 F# V- e1 U$ d: }# ^! Q7 u
    workers: Vec&lt;Worker&gt;,) d0 K4 o8 s6 b! A! n
    max_workers: usize,
8 \% e1 u* ~- a5 }/ l  M8 e! w* A) l    sender: mpsc::Sender&lt;Message&gt;( m5 }2 ~' u6 s3 I4 c
}" J. s4 b) c/ s" N7 I0 `

8 w* S- P) {) L7 v  R& K! k) ximpl Pool where {, T0 s+ V1 W7 J. a7 F9 @
    pub fn new(max_workers: usize) -&gt; Pool {" T5 }2 y5 A2 t8 c6 ?3 [% L# r) a3 z
        if max_workers == 0 {
( d/ m5 T0 \1 I+ [  j2 r            panic!("max_workers must be greater than zero!")
2 X5 b2 G1 {8 }' i1 a8 u        }1 R1 U* c4 e, Z  C: Y
        let (tx, rx) = mpsc::channel();  P4 c9 Z8 P" W" L$ I
* b$ x6 z  i: e2 a& d, W1 I
        let mut workers = Vec::with_capacity(max_workers);
7 H1 w) c# E/ d  w        let receiver = Arc::new(Mutex::new(rx));
  M' X" Y. j- G- l6 a        for i in 0..max_workers {
: `% y1 Q7 x+ d- ?& W            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));/ c6 ?6 q6 W) T. W) R
        }
- i) E6 W5 n$ v+ _; Z( k$ f% E8 S6 c, w: N1 C0 s0 ?
        Pool { workers: workers, max_workers: max_workers, sender: tx }* w( X% x4 n/ F( c0 T  g8 x% N/ A
    }& |. }- l1 n) j7 ]
   
  l5 F0 ?7 f4 @/ i, z' `    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
& z8 E2 U' N# O6 {4 }& f    {, c0 [9 V, d" L/ W7 C7 O

! m. C) [7 a- |        let job = Message::NewJob(Box::new(f));3 |$ Y! q2 |5 ^7 {2 q
        self.sender.send(job).unwrap();
! R- J6 h- K- @, I" ^; v1 @    }
7 o: z: H2 K" L' A7 `}
" M1 [/ e$ V! A2 t. _& Q* e  _
- k1 W4 u; ~$ H, }1 B- ^impl Drop for Pool {
) I5 I8 }, Q; |/ S2 m    fn drop(&amp;mut self) {; z! c+ b# k' w3 A& j- q! u9 D% s
        for _ in 0..self.max_workers {
/ v* G. A* n+ H            self.sender.send(Message::ByeBye).unwrap();* l3 D5 p2 Y1 W
        }% j" ^) T) R% D7 D
        for w in self.workers {+ g2 }7 G: \# \
            if let Some(t) = w.t.take() {
$ _- U+ l% i* O; f/ `3 c  }                t.join().unwrap();7 u2 _3 r+ a) m' I' {
            }
2 C  P4 y3 J+ Y0 \4 e, B        }
  n* y- w/ g) {' W. Q1 [  S5 Q    }) f6 e: ]4 B: B+ e* @
}- ~3 G; A- Z0 o/ @

, Y! q1 Y2 l! \: I+ }# ]" r5 C2 t
#[cfg(test)]
/ E* ~" v3 e* D" H1 Lmod tests {
5 \3 n' m0 o+ j4 l  E" y: i    use super::*;
, }" N  X' h$ X$ _* d$ a    #[test]- Y" \3 L9 F+ L# p
    fn it_works() {
, |8 c4 ^  l5 R2 N% ]" N% J        let p = Pool::new(4);6 l2 h# X0 f2 Z7 g8 t# N& }; L
        p.execute(|| println!("do new job1"));
2 d) g( _  @6 b! {' d        p.execute(|| println!("do new job2"));* M- ~9 Y# }) ]/ A  I$ P
        p.execute(|| println!("do new job3"));* X2 E; U/ \2 w4 B( m4 {
        p.execute(|| println!("do new job4"));
: Y9 L6 U1 ^; |    }9 X9 Y3 x/ X+ K) G; q  T# q+ W7 C
}7 u8 h- C! m3 B, [+ P5 T
</code></pre>8 R9 I8 v* _" Q1 F9 i; F( U

3 S9 L2 d5 x5 w  x6 s1 u7 s3 I
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-11 13:16 , Processed in 0.063725 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表