飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15110|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8131

主题

8219

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26723
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
! Y$ l% j( x5 C3 T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>: B: }  J" k" g
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
5 ?' Z, a4 P$ k  H<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>/ V. }" p8 V6 f8 ^$ f/ e) w: h4 O
<p>线程池Pool</p>+ C8 d. U. J4 z8 q( a
<pre><code>pub struct Pool {. u* Y( c3 g8 r* m, f* M/ k( d
  max_workers: usize, // 定义最大线程数
) g& O; i4 ~; S/ M5 U}' ~3 Y* y' A0 l6 ^

; M9 [9 v9 T& ?, Gimpl Pool {
2 H! Q+ U# E# \2 ~  fn new(max_workers: usize) -&gt; Pool {}
  F$ S" k% H; j" \  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}+ O* `& ~( @) c1 V4 ?" o
}3 r4 z6 K; w, I1 `7 {
8 D: d- t7 |' w; d7 G- ?+ s) ]
</code></pre>
  i( A- @9 r' s/ R<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
" P' o* L5 `0 Z( h<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>! r+ U) w, d( v! C5 ]
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
7 g, \* [& t' y* s<pre><code>struct Worker where
3 q0 @% Z  x; Y8 f2 P& Z" U: t' J{
5 t% n4 q/ l* ]    _id: usize, // worker 编号2 b) W/ ~  V  g) D
}
# K& S' ^/ M: [! C2 K) F9 N/ `</code></pre>. O5 w7 [  T, `+ W
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
+ X  g5 H: N, ^3 Q把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>7 g- S* _2 X; q% m& P( ]4 V
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>( \5 u$ F5 X  [( ^
<p>Pool的完整定义</p>2 n; e2 L! v% Z7 F. q2 e8 v
<pre><code>pub struct Pool {
, W; ~! X1 w' J9 D& @    workers: Vec&lt;Worker&gt;,
' W; K. V3 F( r& F) E. |    max_workers: usize,
! j- z1 U5 `- m$ W; M: ~    sender: mpsc::Sender&lt;Message&gt;. t; `5 j2 q8 D0 o  @. L9 G: Z6 {
}
8 p0 o8 s0 z5 {</code></pre>. p" w& c8 o/ N3 P. ~9 `/ D
<p>该是时候定义我们要发给Worker的消息Message了<br>* q* P: l# D- P$ Z+ `. O
定义如下的枚举值</p>3 E9 N! i3 Y! x2 r4 C; p+ g: C+ e- W
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;  S) O6 y* ^7 w
enum Message {
6 l, w4 j& k8 _4 B% f0 Z    ByeBye,
) K6 S# n0 s0 d0 C" o    NewJob(Job),) \6 ^! W, ]6 Z! c
}3 v# ~- A5 S9 f6 M( B- u
</code></pre>/ U# V% c: m2 ^( u& G
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>: F! f9 }. K$ s. S- f. Y( g
<p>只剩下实现Worker和Pool的具体逻辑了。</p>. ~+ z3 ]8 Q* c/ G) Q$ w: E5 s" d
<p>Worker的实现</p>
; U* d& v" B- l% v- {' @* p' X<pre><code>impl Worker1 c( P2 j) l3 u. r9 Q
{/ ?6 b" p1 Y2 a, W
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
6 ]/ c, G+ h; h: N2 z) G        let t = thread::spawn( move || {
7 t7 G, s' l5 Z# |+ @7 N7 X            loop {9 A. E. e% b0 W' N3 s$ W
                let receiver = receiver.lock().unwrap();5 }/ k, |  g9 S
                let message=  receiver.recv().unwrap();4 J- q% d) S! w1 q4 ~" C0 k1 u
                match message {* `3 E$ ?! H4 v3 a" l$ u
                    Message::NewJob(job) =&gt; {# b% D1 K/ I/ l2 m2 X. ?4 A
                        println!("do job from worker[{}]", id);) g$ W. H2 K8 W; x' p
                        job();$ r$ h7 c5 {, [: O8 U
                    },( O- J2 g# X% L5 n
                    Message::ByeBye =&gt; {5 C8 W: _/ t1 h  c( k
                        println!("ByeBye from worker[{}]", id);
2 ?  ?/ F" H! {* \( v                        break
; |$ h. x! O, ~/ a. g  M                    },
& C! @: v0 F0 b1 P. s# j                }  
; ^8 ?$ g! t/ E2 H$ m, Q/ f; @! }2 J( m            }# S/ i) b, g$ G% x. w- i5 i. P
        });
8 t  V: j0 V) Z/ {% i
- k7 Y3 j: r+ w* \1 V        Worker {& t! h9 g" A& \" |+ n8 Z; @5 l
            _id: id,! i- ~3 n/ u9 z3 D8 l# |
            t: Some(t),; R, K+ P! Z- Q. J" M
        }
0 n3 ?- b, O( |3 H# u5 K    }
: b# R4 v; w8 R! ]9 |) B: r}9 R: v8 e( ^, ], V0 L6 A* N5 D
</code></pre>: l+ r9 E* k, \8 Y7 {! d- q" E
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>7 ^: h& [! F) l4 ]
但如果写成</p>
! ?6 O" d* y" V' z! l7 d, l$ }3 s' p<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
( A% y4 g" r( G+ }& ^};
1 l) l( Z$ L, g) M2 e8 k! K</code></pre>7 u/ g9 N& E4 N+ J8 Y
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
' }3 m& h4 M' N& ^7 v5 g3 C( Orust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>( H# @# \2 G" x
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>' a* R8 A( m& N+ ]% B! i( V
<pre><code>impl Drop for Pool {
! t9 B, X! s2 w2 L    fn drop(&amp;mut self) {3 |$ d  c$ [( d2 I# ]7 G! n
        for _ in 0..self.max_workers {7 }5 h: F' Z* Y: `
            self.sender.send(Message::ByeBye).unwrap();; n% d7 z6 ], d0 ?
        }! l! Q, C) R% b, w1 j0 I7 u
        for w in self.workers.iter_mut() {' N) `# x- y5 G; v0 O' L) w' u
            if let Some(t) = w.t.take() {0 q3 u, K& X+ v; D
                t.join().unwrap();
3 T) ?' t. w2 u) s3 U            }6 R  }5 D1 S) f
        }0 j& N* _$ r+ q9 Q
    }) A! L' \( F2 K- A
}9 O) A9 c- \5 e8 E$ k# J
2 _- {- G1 N2 V6 `- d3 ?  D3 C) |
</code></pre>( W  V5 p0 Z% [( T6 X' R0 h- B
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
- E8 a2 ^- F( Q$ G<pre><code>for w in self.workers.iter_mut() {& P8 e& c% ^1 g1 |: M
    if let Some(t) = w.t.take() {. a# u, l0 t9 v% M$ o- S
        self.sender.send(Message::ByeBye).unwrap();' }9 I# H$ K" i) h3 t
        t.join().unwrap();! v+ k# y( c2 ^. z% @7 f3 J
    }
( W# a6 c+ F9 i* v}
5 _  P; D+ d8 d" q: O# S1 t
) ^5 ]7 Z, m/ |3 i% g, a3 e</code></pre>
& U) L, F2 C- f% `% C+ R% I3 O<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
  R# |5 P7 P3 ~# r我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
8 L6 b/ p$ p; i% |5 F* \<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
  W8 f* [/ e# Q9 S6 Y5 M3 i. l7 `<ol>
4 {2 [/ U& V: Z4 E<li>t.join 需要持有t的所有权</li>
% w: C; P! D  _* C0 K' M<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>! {, G7 R0 k. V7 d) O  H
</ol>
2 j) j/ K$ R' G) y' `# T1 _<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>" ]7 l$ \6 T+ N) M  @7 e
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
- v! _# K0 p5 F- I<pre><code>struct Worker where9 |7 |3 e: Q0 |) ^5 F
{
3 g) E3 e* D# k* H6 \( b    _id: usize,4 F9 c5 }+ B" B3 i0 S
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
9 o6 B% P6 b7 P; S- e; |}' [, `) a+ `. D- }) R; j; Y
</code></pre>
% Q' [7 _- [3 O  K<h1 id="要点总结">要点总结</h1>4 M5 G! W' O* G/ G$ f
<ul>
5 i4 m+ C+ ]0 p<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>3 ^- H, f2 d% ]8 F; y# E) R) g
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
% j4 X7 }) p/ q- b: _& j</ul>; a- z) d2 x  I- h
<h1 id="完整代码">完整代码</h1>6 H% s" c  `0 i7 _
<pre><code>use std::thread::{self, JoinHandle};8 A  l3 Q( s! R' f2 U! s( j
use std::sync::{Arc, mpsc, Mutex};
& w6 ~. J- I. |( b* N
8 g% Z9 b) Q  ]; U2 S$ C0 t3 G) ?  I/ A
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;$ |7 N- W' l5 S( I5 P
enum Message {
% d# I1 J; M  P5 I+ ~! N( R2 x    ByeBye," r- {5 @! i" B2 `4 r
    NewJob(Job)," E/ Q+ Q2 c+ H/ [5 F- @( [
}0 o7 J/ b/ X' k- l+ e

' [; y' J& A+ k. Ystruct Worker where: @3 m: N8 t: \5 E- H1 e+ w; Z+ P
{& G/ J9 o" s2 o5 c2 P9 F
    _id: usize,
1 M6 h8 E% h; j5 `# f! G* {7 W( z    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
$ z) V. M( `5 j1 g, c6 [; [! d* A}  ~' E. E: K* y& z7 Y4 x. J
" @& Y1 g4 y0 x- _8 b# h0 O
impl Worker8 {9 }- s4 v& O4 J
{' K1 b# ^/ P& K, ~2 g+ E
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
: A" F: P$ f; K- ?! |        let t = thread::spawn( move || {
( Q' B; D$ N9 [+ v            loop {
1 J* h* K" K0 f% y4 o" R* e: m3 t1 Q                let message = receiver.lock().unwrap().recv().unwrap();! a/ r6 c. e) Q; Q1 [0 W
                match message {
8 t7 f/ y1 O% j1 D3 T& y" P                    Message::NewJob(job) =&gt; {' i3 c" }5 j+ x- _) t+ \2 F
                        println!("do job from worker[{}]", id);
1 Y  P" h) d" z: Z1 O% r! Q                        job();5 k& }6 w; [- {; g
                    },0 \! ^6 J. t' z' q9 P$ s" A* t6 f
                    Message::ByeBye =&gt; {
! ^8 s2 X) }" ~6 F- A                        println!("ByeBye from worker[{}]", id);
' z" J; O9 {5 I6 W. y% i                        break/ u$ R; U6 j2 Q5 b
                    },
7 u% s1 I: U8 s4 q                }  
  m- h/ C$ B  x- o2 D6 k( j; ^0 l6 I            }
) u+ }- i. S" k5 e3 r$ V2 l        });. R) L! i# f; E# f
) F  x+ ?# n$ @  ?2 C: W
        Worker {* x; A7 r5 F1 ~! I1 y
            _id: id,
* z& Z5 A% _$ ]$ E            t: Some(t),+ }* K) N9 x) \  M& i
        }
; u5 z2 \2 e/ u/ y; o' g, Y    }8 X$ E2 Y( J0 f* m2 A0 L# z
}
3 n1 }7 I" N4 R; f) {  V' d! ]6 ~4 Z* s6 L& ]  L( J
pub struct Pool {
$ \+ L- j  B% i0 t# l0 H    workers: Vec&lt;Worker&gt;,
' I9 S4 h: ~1 J& a    max_workers: usize,, {% |" [' A: Y+ o  R
    sender: mpsc::Sender&lt;Message&gt;
' N: z! {0 T; n: b$ F}
5 n! C6 g2 d- U) @$ ]( t* A: _
& f3 U4 E( u: I$ F* Z; Wimpl Pool where {0 m* u" o% e: \0 m" e, s
    pub fn new(max_workers: usize) -&gt; Pool {; {) C: v1 H6 k  B- \0 m+ t2 p
        if max_workers == 0 {; t& F, w5 T+ C2 q6 u
            panic!("max_workers must be greater than zero!")
$ E% F$ [! o& {        }7 ?2 t5 p1 z& N* l
        let (tx, rx) = mpsc::channel();
! z5 v' p2 T1 c# ?/ ?  [$ F8 g4 O6 o1 X+ e9 V
        let mut workers = Vec::with_capacity(max_workers);; h  B! O$ {7 O, L
        let receiver = Arc::new(Mutex::new(rx));
7 `$ U0 I( W0 B        for i in 0..max_workers {' U# Q% l0 Z' [4 f! F
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));& o2 f7 b4 d  j. \5 p
        }$ }) p. @' W, M7 {  o6 z" H+ P4 w. @
- X+ J# g0 k; v6 T+ r5 m0 q9 ~" [
        Pool { workers: workers, max_workers: max_workers, sender: tx }- g8 }8 ]/ y7 I" T- ?( |3 Q
    }1 F9 z. X' W# a+ j7 E
   
  ]( N9 s$ U7 q: T  ]  t. L- U/ ]    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send1 P9 k6 g: G( Q8 f5 N9 T
    {0 R1 ?4 a; o4 A/ d0 F
$ f# }0 V$ b# V' _  [9 N- C
        let job = Message::NewJob(Box::new(f));
$ L+ l! |. ^) ]6 S  f6 o        self.sender.send(job).unwrap();: K3 c$ S0 o1 [9 H8 V
    }
! B8 f# m+ s4 F}; X4 h: i, W, T1 f

% Y# N: A% z8 d7 T2 Wimpl Drop for Pool {& f& q& s) W3 Z- A0 z( l
    fn drop(&amp;mut self) {
8 w1 o1 q2 V9 S/ [) r/ n. f        for _ in 0..self.max_workers {3 J4 S6 i* F' z, N6 P
            self.sender.send(Message::ByeBye).unwrap();  q! N: Y1 m/ c7 l3 F& L  r8 Y
        }3 S+ t: P/ H' h& P
        for w in self.workers {
. H  W$ C! H$ Y' N: K: L: ~            if let Some(t) = w.t.take() {
1 F5 }& L; K; a                t.join().unwrap();; b7 ?, u2 l7 T9 w
            }# [8 _4 J: F" c) `* x
        }4 P" ?5 t1 N2 K7 E$ c( a
    }
9 X  Y7 y8 [$ [( b}. s) l% V5 B7 `# b( ~

4 m! X  I( m( R0 Q# y7 J  U2 R6 ^  W( u5 E: o
#[cfg(test)]# `( O+ X  d- F2 N" E/ n' e5 x
mod tests {
) n' t  L% v3 k% A1 ~) u    use super::*;
6 n* c  H! _# o- Y0 Y& a; B' G    #[test]
& X3 N0 i7 A0 v9 d3 j2 t    fn it_works() {
6 A$ e' P# k: M% Y        let p = Pool::new(4);
5 d) [! v/ V  G' J        p.execute(|| println!("do new job1"));" R. g: Q! N1 c6 e* j# g
        p.execute(|| println!("do new job2"));
% r$ B) y0 \; n+ ~: N$ n        p.execute(|| println!("do new job3"));
8 P7 G. `0 U# u# l        p.execute(|| println!("do new job4"));
* ^& U1 y' i. H1 W! A3 F    }9 X% R& N" x1 _  N) Q* @
}
4 p( Y, A3 @' |- [/ H</code></pre>
3 W/ y8 B- Y; P% ]
2 n* b, L' d; m. A' K; n% A
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-27 06:44 , Processed in 0.064132 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表