飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14597|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8032

主题

8120

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26426
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

2 S6 P3 O0 k7 Q% L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
0 @. k; ^+ V6 ?* ~9 B5 U<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>$ i( d5 b9 H  h
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
2 o/ O/ y( b# {$ |* `6 J5 S1 h4 X6 C" }<p>线程池Pool</p>
& u# u& F* `. v! y6 v! [<pre><code>pub struct Pool {
7 Y- F7 T( j; Y" t; ]: ]5 Y  max_workers: usize, // 定义最大线程数
. W: Z6 A. {) D& X" |$ p/ i}
$ x, ?; H4 u7 y5 M( u6 n* v- U5 g  P# h$ N
impl Pool {
5 J/ Y9 n  }; F; o  fn new(max_workers: usize) -&gt; Pool {}
9 r% Y2 I2 c$ j  K2 T3 h" y" C) f. r  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}- b+ c, Z4 W* R+ \- S; e3 K' g
}+ Y0 Y8 i0 c: i  C

5 Q- a* b% R3 W* {" |. J" @</code></pre>9 E3 Q  d) N7 x5 U9 _, L# Q3 {3 b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>0 E3 I  u. e8 e  _, D0 s/ e
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>' [0 p, s3 x- M. z: y1 z  b. Y" ^
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
  [! V8 O* k5 Y; U5 e<pre><code>struct Worker where0 B1 k9 [8 a1 b6 T* z* y
{* e7 d3 T8 U6 i* m  C
    _id: usize, // worker 编号
* H; q( }8 Z0 y# Z: Q}- p, t0 A$ ?- V: ~3 j$ R
</code></pre>( V; p# _( Z/ w4 @: c
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
* t1 m! w: S8 @' N% y把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
7 E$ g  J; u6 R: X1 W* ?- S<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
- c9 \- I* g- S% W) a. C& I<p>Pool的完整定义</p>
$ \* x  s3 |  n. N! E<pre><code>pub struct Pool {# N) B7 ]  J9 h. c+ j% o) d( r
    workers: Vec&lt;Worker&gt;,! Z/ e2 A6 R! s
    max_workers: usize,
, i9 B" k+ k$ e8 Q8 o5 s    sender: mpsc::Sender&lt;Message&gt;8 |3 @4 g1 b  q3 O* C6 _
}
6 @* A# [4 T8 W</code></pre>
2 T. ^3 ^0 g7 L# j1 H! x! E; `<p>该是时候定义我们要发给Worker的消息Message了<br>
! G  R1 X' |1 R$ |% D定义如下的枚举值</p>
$ g4 B7 I  I! ^) S- }2 s<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( u' W8 n% @, o- ^3 o5 R! f
enum Message {
5 \6 n0 J  l+ t# g" @/ r% w    ByeBye,
- ^7 a2 Y: i( ]& I7 K# _    NewJob(Job),
( R$ i% O, D+ V+ J/ S' k* p}! q- O( n; t# e3 A3 b* v
</code></pre>
& \5 r$ Y3 }/ S4 G: z& e& v; W<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
! G+ y+ O  @8 E; J. @* Y( ^- k% _<p>只剩下实现Worker和Pool的具体逻辑了。</p>
' s( g9 [* X& k<p>Worker的实现</p>9 H8 l* V3 p7 |/ s) ?
<pre><code>impl Worker
/ `( k0 R: I& T5 y, C  n{; `6 r& Z3 |: c
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
2 K( Y2 Y; l' T8 w+ G3 ^        let t = thread::spawn( move || {
+ s! g0 d6 i: R6 R; g            loop {- r' Z- n- T' a
                let receiver = receiver.lock().unwrap();
6 U+ T+ K- z& G* `' x1 d' i3 P                let message=  receiver.recv().unwrap();9 P- {9 b( c! s! O6 l
                match message {
( ]; B& e+ f( s- n8 J                    Message::NewJob(job) =&gt; {
" z! V1 J% W/ x: s                        println!("do job from worker[{}]", id);
0 B( r8 j" j. J4 ^9 H                        job();, B/ y& l. Z6 s- K" j
                    },* n/ I* O3 _- w+ N
                    Message::ByeBye =&gt; {% W0 ^' J8 r" j! K3 K; Z
                        println!("ByeBye from worker[{}]", id);
3 [7 b0 Y) ^) |8 U( l                        break
0 J; C: O( h" C+ d- X+ E                    },
3 A. w' [9 v' u! S& t                }  : ^) q9 W" U6 H8 \1 `* O1 D
            }) g! y, z/ m* ~. p, |$ S
        });& B& m/ p/ O+ w5 {! }# L

9 O$ Z% k1 ?1 N' I        Worker {/ |, v) E# N( H4 {( k
            _id: id,
* h( Z# Q" i, c. V6 ]# r7 u            t: Some(t),! ?- [* N. b! B5 i4 I5 F
        }
4 G4 y  L0 g# {& C/ a5 ?    }
; O" \- L! N4 w- p) |1 a}7 K( j1 g8 P" Y8 h5 t$ l
</code></pre>8 d) z  {% {5 w9 ~5 N; }
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
5 w; t) r& |- w2 ], e! G但如果写成</p>% r; c, E, I$ l* _  z8 z3 S
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {. F( R/ }' _& }. ~# [
};& H0 h' N! U2 a' l' a! X1 }
</code></pre>
' }9 k% A; U: ~% v( E! P" I3 g% T<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>+ n( n4 h+ n' Y9 D  v
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
% M# g% y6 o- n3 A5 G2 Q' E9 d9 O8 P<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>3 K0 O* P4 c+ f( d
<pre><code>impl Drop for Pool {
2 T3 r+ q: e5 Y- z* W3 g    fn drop(&amp;mut self) {
' V# O0 y5 \* k% W9 o! g2 {- w3 h        for _ in 0..self.max_workers {+ O& o9 ]/ f4 O* e5 E3 W
            self.sender.send(Message::ByeBye).unwrap();
% t4 Z% w1 m1 P* u( ^6 V. O        }3 _. [& F6 }' `1 ^! I" t7 O
        for w in self.workers.iter_mut() {
: L$ O& ]% R& r; U# b2 j$ I' x            if let Some(t) = w.t.take() {: x: X& B4 J% y
                t.join().unwrap();, V  q8 H+ R# R$ a
            }
& \' I" N0 r3 N4 |& d6 W7 C        }" U, O. M: k- ?  J8 F
    }9 @& G' x: r+ V6 \, X3 c
}
, r3 k/ M" \$ g' Z6 K. c. }
: F3 ]3 l7 J, I8 B  u9 L</code></pre>0 p7 P  Y& r5 D9 @/ W3 G
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
; }# o! z5 _0 n% _, K4 F3 w<pre><code>for w in self.workers.iter_mut() {" |4 @" R- j+ [1 x2 g) f
    if let Some(t) = w.t.take() {& w! D; e' B/ Y
        self.sender.send(Message::ByeBye).unwrap();
  r. Q% D1 i( Z! Q# m4 X' b$ `0 x+ [        t.join().unwrap();
; A& v9 U% z2 y, q( A    }
& }( t! T" e* i4 x% v1 p/ n) U- O}
: f3 L; s; W& W+ I) W
+ {( V' z5 r. M4 E2 W</code></pre>
" s9 T. l8 [+ U* T) I8 A4 w<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>! p9 M6 p' ]& p- ]3 M9 _# r
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>3 @7 D& I5 N( R0 f  B  K4 d
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
3 q/ ]# r5 B( O# b. P( I<ol>
$ l1 G" G2 E9 f3 S3 P<li>t.join 需要持有t的所有权</li>
) B; K4 d$ k  J<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>7 R: V4 |: S, W: q0 Y1 \
</ol>
& m) m7 }% x+ `9 a<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>; Y( r& u. k6 @: @9 L4 [' l
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>) x6 m$ _' U: h1 p  J
<pre><code>struct Worker where: u; N7 ~' S4 L, J, b+ h% L0 L# a
{9 h+ k6 \2 h2 e4 w  R0 P4 N
    _id: usize,
+ X- d- L" p' X+ l7 d# m2 t1 i    t: Option&lt;JoinHandle&lt;()&gt;&gt;,( C- G0 \5 f& i( ?; z
}
8 Z$ P9 }$ F5 m1 E. [$ g6 T</code></pre>8 M4 L% \0 S( `9 p3 Q
<h1 id="要点总结">要点总结</h1>% f+ I1 e1 A8 K: i% q9 S
<ul>9 i  g4 u$ Q- k9 h7 m3 H) N
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
, F* @/ \1 ?+ K) {  S! s: p- g<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
5 Z( `( a  o* ^- D8 V</ul>
8 r' y% T. L6 ?$ b0 F<h1 id="完整代码">完整代码</h1>" j, \1 j. q) ?
<pre><code>use std::thread::{self, JoinHandle};. b7 _& C. n1 E
use std::sync::{Arc, mpsc, Mutex};
1 u8 {) {* z$ h9 Z; g" M& {" y2 I/ T5 N! z- A6 t7 l/ y) S
6 M0 Q9 b! {0 N3 `7 _1 c; [) D
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;' t. o; D7 u# W+ R6 y- o, U
enum Message {
8 H6 F+ P# b  i' l" B' Q' b; ~4 y    ByeBye,, J- y; P3 F8 N; Q
    NewJob(Job),
7 Q4 T8 n8 ]' q2 Y5 a/ c}
5 T- x) |/ o9 ~( v- {* S- \1 `! Z3 M: ]; y7 E
struct Worker where1 H3 u7 j$ o, |
{
$ A5 d9 @% V) F9 t    _id: usize,
* ?: {+ d) J7 B& Z. B( o    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
0 F0 Q$ d1 u% p& q% ~, ?4 H}
3 L) h; ^+ Z7 P
1 s! o/ y3 j1 m* l7 Dimpl Worker
; [$ O6 {$ P( X/ l  w/ D{, m) t/ k9 L, N1 U$ u& K% U
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
6 d2 Y% O% t4 D6 w) Y8 N1 ?        let t = thread::spawn( move || {
5 q3 [( X+ I1 f: |" |% k            loop {
2 X" k) B. V$ L% e- ~! c4 L1 W# J                let message = receiver.lock().unwrap().recv().unwrap();
. z, p) i. O- m3 G* H2 i  V: D4 j                match message {- L# \4 z3 O* ~4 y! K9 o( t
                    Message::NewJob(job) =&gt; {1 z! z- |$ N( F' d9 D' @+ ~
                        println!("do job from worker[{}]", id);  U# ^! x' q$ J& j5 M3 |$ U
                        job();9 a# \* d* X; D! A" Z) L
                    },1 @% h% {# c3 Y: x* v7 K
                    Message::ByeBye =&gt; {
, y5 T4 v' m4 i! H                        println!("ByeBye from worker[{}]", id);$ J! a1 M& {- F" J2 @6 B" b; j% C
                        break
( p7 Z% m" R% Y; V: a                    },9 e9 Z! f2 g3 Z' b
                }  
8 ?( |  Z  H. D! t: z$ d# N3 e) ?            }' P/ g; P5 t6 j  n) J, K
        });5 H" f1 b8 @1 g; F2 T

; Y  }: ?2 n" v. ?6 ^. ?  n; C! s& S        Worker {( ~  d4 N3 ?# @2 f1 ~2 H
            _id: id,% ]6 {6 u! Q, Q; D' u: Y8 N/ u
            t: Some(t),- {5 e& I0 N3 ]3 @
        }3 c8 l* N: h+ x8 j% g* Q
    }8 k; _# T1 Y( a- A/ j
}- W  f7 C# G0 Y) G5 s
$ R$ S( r) ]/ d$ B
pub struct Pool {# Y1 Y+ k4 m+ ?5 a" H
    workers: Vec&lt;Worker&gt;,
9 B; a& o- ]8 ]  m    max_workers: usize,
2 H" k& C; N2 _1 S8 a4 K8 ^    sender: mpsc::Sender&lt;Message&gt;
! b9 v& X2 J/ U. N" y; |}
8 F; W# P$ W, T9 t% A; q. b) |$ q  }( D- k2 ]9 t1 l6 [5 k7 g3 h
impl Pool where {% t. F# |7 ~8 F- `
    pub fn new(max_workers: usize) -&gt; Pool {% h4 Q/ R: [/ d# [: m0 j6 d) X. J  q8 z8 `
        if max_workers == 0 {
. R8 f0 J% h& j: o7 A) |6 h            panic!("max_workers must be greater than zero!")
1 R+ _# l& O+ T: B5 |8 y5 r% s        }
( {1 {! x( z/ N! D$ w: j- |* q        let (tx, rx) = mpsc::channel();5 B# j2 q6 ?: A  ~* }6 Q
. v0 S% q  h" i% @' I) }
        let mut workers = Vec::with_capacity(max_workers);6 h: e- r  n' B, v9 {* v
        let receiver = Arc::new(Mutex::new(rx));9 Q: g* }. p9 o( B
        for i in 0..max_workers {
9 ?. Y5 Y9 a+ G            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));# T# [3 u* i: p
        }" M1 L& B, Y$ y8 i# _4 [9 D0 k

+ N! C* x8 j- F6 {, W9 G# e& V( b        Pool { workers: workers, max_workers: max_workers, sender: tx }
, [1 t0 N/ Q) g2 N( o  J' i    }! y" I# X9 @: j, f
    ) p+ k- D; e$ h( o# I3 k5 @
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send+ r! k( u% w+ x0 Q. K/ ?/ D
    {- {5 Q  A, T# e2 P1 |
; w% U/ Z" a; J1 o; q1 N
        let job = Message::NewJob(Box::new(f));
. _' Q3 l& ^+ e. l5 Z        self.sender.send(job).unwrap();
- x* K  [7 _" {1 w8 ]/ V    }
, O( o! {7 N$ O! I2 ^. g}
8 E7 h3 i% |: T. n+ X" v2 u' i- ?; g5 G5 W& a
impl Drop for Pool {/ |& n3 N, e7 z/ B5 `/ X
    fn drop(&amp;mut self) {
0 F1 \+ Q& D+ V8 s9 X% A        for _ in 0..self.max_workers {
5 M. s# X" R- t4 J3 H) W* F            self.sender.send(Message::ByeBye).unwrap();
0 q0 p/ G' c. C. h6 M8 n' C" K7 m        }
5 ], }0 F  w+ ?. T        for w in self.workers {* {( v/ W% o- B# G% D
            if let Some(t) = w.t.take() {1 [8 _7 l( B2 i+ _7 }& N; {
                t.join().unwrap();
9 t! }: S1 D/ n5 b            }, V0 h4 F. b7 A% U4 T( {9 f
        }  y5 d; J/ M; m5 ]
    }; X+ I% i9 ^! d' z$ N# s
}1 ~9 x+ a+ v  Y  v1 N# X) O

/ r" M+ U7 o4 w* B
" A5 @! u; n" F0 @#[cfg(test)]
# ]2 [5 h6 G' w  V, Lmod tests {0 z' u$ U# D# f, E5 _
    use super::*;
& i; c+ G& P4 O# C; q' R1 c9 X    #[test]2 t: i! m; x; {0 Q: N8 B4 a
    fn it_works() {. A/ u% n1 d# j( x  }3 j- p6 z0 M
        let p = Pool::new(4);
0 F' [5 ], d" U9 e7 x4 Q) Q. e        p.execute(|| println!("do new job1"));  V2 s" {2 H. T( r' N
        p.execute(|| println!("do new job2"));
2 D% k8 |% Y7 D# E        p.execute(|| println!("do new job3"));8 B* D% |( C! P0 k: }
        p.execute(|| println!("do new job4"));
* o# D; `0 l9 q+ `% o, l    }' G: Z" Q& {3 R
}
' l' X0 M+ B2 _  O6 h</code></pre>! o. b% B: l4 I- v" O  ?# z* d. p
3 |4 o$ [" l5 `$ O
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-3 17:06 , Processed in 0.064480 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表