飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 8249|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6379

主题

6467

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
21461
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
) c1 y4 L0 @4 z- l
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
+ M) ~7 D0 j3 F& o6 `  B<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>+ _5 _; o; d* Y6 R% ~# ~* z8 j
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
* Q3 d1 ]9 y# a% l<p>线程池Pool</p>
* c1 r8 o) F* e6 e, u1 x- s<pre><code>pub struct Pool {
; p7 M* [: F% ~4 o6 J  max_workers: usize, // 定义最大线程数
7 z  X; v0 n$ n: x( c# J  f}
' J; u2 c5 L: p" p& C7 W5 V( P* l: a* k" x. X( y
impl Pool {1 |) m( N; h! ?! z, F
  fn new(max_workers: usize) -&gt; Pool {}& C% n" E6 o, `& Q- X
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}( h2 {3 y  p; C/ b8 a) \
}8 m/ F) G/ }+ v( @

: l) G& u: C4 M8 z. C* M</code></pre>
. y+ q& }  ], w8 T<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>2 c3 n1 M6 y2 d+ m' p0 y
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
' x" M5 o9 U* A# ^1 D. k2 p% a可以看作在一个线程里不断执行获取任务并执行的Worker。</p>1 B7 l2 [0 h1 ^: T. W( @+ \
<pre><code>struct Worker where
4 ^; B; Q& q' \" U7 |{
# i7 i: U6 q" X6 P5 Y/ [& A    _id: usize, // worker 编号
# U" S) v5 l8 V7 j: A}
7 A8 j9 f3 o6 O8 c/ x5 O' M' M</code></pre>; ]1 n9 C0 a: ^6 x& A, o9 ^6 w
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>3 F% n8 n7 \; s( N: C) Y# e7 B
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
6 e, @" C: q: |- s$ D<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
3 r# Q1 Z- D* O5 ^<p>Pool的完整定义</p>7 z0 j, I; |  V6 x5 J  ]
<pre><code>pub struct Pool {
4 Z0 H- {# r+ v+ t9 R, I    workers: Vec&lt;Worker&gt;,
6 {2 A0 p) K9 k5 a* U; v    max_workers: usize,/ Q( O9 D( m! s+ I; f( `
    sender: mpsc::Sender&lt;Message&gt;
  z: K9 T% b+ n) L3 Q# v+ ]4 q}
9 N, y) i, |$ ~% ?( W; [3 z0 V</code></pre>
8 y" \5 ~: b/ y5 ~4 D- O" ~. K<p>该是时候定义我们要发给Worker的消息Message了<br>
% F, I: w7 m% j  O; s) e定义如下的枚举值</p>
7 l+ M* ]! X7 T9 d9 J6 m! e& D2 _<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
/ P5 H% O( v* P0 `% Menum Message {
/ v% B) f6 b5 K. U    ByeBye,/ `9 C9 s* B1 Q( I4 F+ E+ F, B
    NewJob(Job),6 H; e# V3 K0 ]- g7 P: j
}# a* `! X4 R" N3 Y. d
</code></pre>
. h9 d! c) e4 X' k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
. g4 _# j3 ?, J6 D<p>只剩下实现Worker和Pool的具体逻辑了。</p>  R* E9 G$ h" i) y7 o$ r& o+ z  B
<p>Worker的实现</p>
. T7 }- C' j. ^* U- A$ c% z5 g<pre><code>impl Worker
0 t5 t* E1 U; F; B2 q, `{
: Y3 Q1 H0 w6 ]4 u6 n    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
; j7 Q( U4 H; l        let t = thread::spawn( move || {7 i8 a1 d) Z2 N8 O) B3 l- b
            loop {
5 {3 [$ J! a! H' ^5 \                let receiver = receiver.lock().unwrap();( G/ A2 u3 `6 D1 m7 n" U7 t* \
                let message=  receiver.recv().unwrap();- T# Z. |. J" f3 e/ I0 G4 o' D
                match message {& O1 X* h! L% w8 a
                    Message::NewJob(job) =&gt; {
3 j/ d2 Q% {5 B+ ]6 ~2 s                        println!("do job from worker[{}]", id);  f$ o5 o7 S* I' l+ @
                        job();
2 g- [1 ^# ?, t) h2 P+ t                    },7 _9 y$ ^2 e' X, |9 N$ [7 |& l% C
                    Message::ByeBye =&gt; {# o' S# e; d0 A" z2 y3 A; f
                        println!("ByeBye from worker[{}]", id);3 A# u3 n! t- K) N
                        break
" o3 T1 a2 a4 ]2 D/ l( N: m                    },
0 Q- }- V1 t# E% j9 f                }  % Y: P5 x! z' U) Z& K
            }
1 k3 X5 v' b: t5 N7 q        });1 n( X3 t, j0 B' T; W* }

% p. H# j/ J5 n1 k/ a, `, b/ x        Worker {
* o0 j- d8 K; e- h; {# n+ L  t$ D            _id: id,4 s! [0 K5 Q$ L: o
            t: Some(t),7 `4 c8 J0 A! u
        }
0 A' ^9 `! @6 {$ I% {$ m5 Z    }
# P$ g9 z8 j) v( Y- l, V* N}
& x+ F6 s4 {& }3 }</code></pre>$ K; E4 q$ i/ c, j
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
& b+ W% H( u: _2 a但如果写成</p>* v9 [8 E0 S6 J& O4 o4 |
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 u2 M) W2 U3 X! v5 w};% L# `( o8 f) T& r# D
</code></pre>* J( V) {; t& F- f% c" A
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>3 o& G; d6 D) M! H1 I/ s5 v! r! M
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>2 N# g2 y" R3 p) Q
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
3 E3 ?8 k0 X+ q, R<pre><code>impl Drop for Pool {
+ A9 q, |. Y" W1 N% J    fn drop(&amp;mut self) {3 b& v4 a3 j8 ?/ K) P
        for _ in 0..self.max_workers {% r8 s- B' q: R' Q0 R
            self.sender.send(Message::ByeBye).unwrap();( m/ r5 c9 d4 U" F! f6 T' M3 H6 b
        }' @8 @9 N4 M# i# N4 A' w
        for w in self.workers.iter_mut() {, U: w3 O7 d/ ]7 i( b
            if let Some(t) = w.t.take() {
8 t# K. r. [& s# K% ]* u                t.join().unwrap();. S( ?5 G6 Q. [, k* N: _0 b7 W
            }* e7 E/ e1 A- v2 D7 g: y
        }1 m6 {/ P( Z$ A/ C, ^4 L* j2 K
    }% M7 M3 y: L; _! Z
}
) G5 Y) s7 \9 G2 e( i0 S. V3 E) o( ^$ R* o9 i) r
</code></pre>
9 I) z* V8 u3 ~6 `: p<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
! r& d  P+ `9 y4 {2 B1 z<pre><code>for w in self.workers.iter_mut() {
; q, O8 ~5 ?& l+ t* L, I& E    if let Some(t) = w.t.take() {! o8 X" [4 f$ s" R6 ]
        self.sender.send(Message::ByeBye).unwrap();5 Y) w+ B) }" L" K4 ?- R. `5 v
        t.join().unwrap();
. \( V0 m# _4 K' q: z; V    }( k7 v$ C* O7 y! z! J3 J
}
: }+ D' n6 D' U% y3 @- D6 l; u# h. P# ~, r& z) \
</code></pre>
* m$ l0 ~+ M  B8 [# r, [! Z<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>, @3 s! |: y' W
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>0 s8 P" S9 [5 |% f9 U+ C# M% H
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
, n# m$ E8 u6 p' ^+ \<ol>
3 r5 e3 D& A* D; \* C! U: E- B<li>t.join 需要持有t的所有权</li>
' x: {1 F  ?+ Q2 l<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" M5 z& \9 F( m" F8 O# m9 f) R% A
</ol>
8 F( n6 X( U0 w$ N7 F/ v<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
) ]5 R5 d& Y+ ]: S. \- [) h换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>, V  }, o, C3 j$ Z; [0 T
<pre><code>struct Worker where
1 i+ M" g7 R4 }+ r& ?{
* ]/ H4 }& l8 o3 |7 _; y    _id: usize,
# Y. R0 ~7 a8 e. w    t: Option&lt;JoinHandle&lt;()&gt;&gt;,6 r  I$ Q+ ?: V" U
}: }; q! S9 ?6 W7 }+ D
</code></pre>0 x& n/ @) O- V
<h1 id="要点总结">要点总结</h1>
  u6 g9 l& v, X$ K, o1 G: e/ ?$ f<ul>
4 _4 k) X8 E  I! d; Z3 ?<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>5 s, S: Z9 a/ R0 O2 J
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
3 E: h8 R# |) o% T) Y9 V% s  z7 j! a</ul>
# K& _; {3 F1 E, f<h1 id="完整代码">完整代码</h1>7 F' |4 h" Z* G
<pre><code>use std::thread::{self, JoinHandle};5 p, }/ K( M2 j+ |
use std::sync::{Arc, mpsc, Mutex};% c" `  K! p2 ^5 s& ?7 x  V

& S0 D" _" Y6 X8 {9 w! D
$ Z1 }1 c/ Y: ~' ]type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( X7 ]9 |3 D4 ^  |' w+ {
enum Message {
% }2 W( k3 g0 Q    ByeBye,
/ x0 W/ Y: i" B7 \7 _2 y2 [8 r9 i% N    NewJob(Job),
3 c# m  j# D( ^$ x}4 c8 T5 p" {. C% F; V6 s
+ ^4 g0 ~6 g6 E. p
struct Worker where  j/ Z, W) z8 |' b8 ?9 m' P6 F
{
8 A$ u% a. E* i4 b6 K    _id: usize,8 W5 R" W0 Y; k, K
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
4 S3 D1 }( J8 {+ v}9 w! s3 y; z. M# s# I6 f# T3 ]

6 a- C$ _% k) f* v" z! q  W& Pimpl Worker1 d0 c( C; Y" ]4 u7 ]! J9 ~+ s
{+ r- f- p& e. ^$ t& t+ _
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {! v; l9 f% e' @3 c! n4 Q
        let t = thread::spawn( move || {
1 E7 ^: S& _9 H  d            loop {1 H  n  E8 Z# f
                let message = receiver.lock().unwrap().recv().unwrap();; a7 f( V+ }  ^$ `% V! K
                match message {
+ u+ m) b( p  E. x, b0 T                    Message::NewJob(job) =&gt; {* s( B+ \( y# c3 H
                        println!("do job from worker[{}]", id);
0 D9 x; H+ b  _8 }                        job();" a: b4 i" \6 j9 V. c' p
                    },
) @4 F3 G$ x& X6 u                    Message::ByeBye =&gt; {
# {6 N3 T& \7 ~) j- t3 G" F                        println!("ByeBye from worker[{}]", id);
0 z: [( q7 A7 H  k) E% U                        break
0 L1 [  k8 C+ C9 S- S                    },* b( x( O0 ^0 R3 B  G4 K
                }  + R! A9 J) H! k  Q( O5 l5 s
            }
7 }$ F2 q# W' |        });
* D0 f* q( }# V3 j+ H* H4 U- b8 A/ b9 \5 f5 d3 p
        Worker {- Z$ D. t6 y" L4 }
            _id: id,2 O* o, z# n: m" i
            t: Some(t),
  b; Q& f% U* ^8 \! O) y        }
6 U' M, s& N. r5 e4 y- x/ J5 L. N; {    }
1 I+ J! Y' l  _1 D' t( j, \0 B$ y2 S}) s0 M5 j" l  B% p( h  `

1 o7 A. e+ A$ [pub struct Pool {/ u0 p  {  f, ~& ]' ]2 [
    workers: Vec&lt;Worker&gt;,& m7 `8 _% q2 M' w& D0 D+ M  f0 X
    max_workers: usize,* ?" G3 d# q& K+ |% v
    sender: mpsc::Sender&lt;Message&gt;
( q" ^- M- E& ^6 t' L}2 f) b0 J) U* ^1 c
9 r& t* P5 H  c$ U
impl Pool where {+ D$ A% `1 S9 }7 I" ]: E$ i. [* @
    pub fn new(max_workers: usize) -&gt; Pool {* v9 z5 ?- |5 H# _! {
        if max_workers == 0 {
) I5 f8 V, b, }! l* U+ h, l$ f8 H5 ?" n            panic!("max_workers must be greater than zero!")4 D4 P9 ~# N0 K
        }: R/ K# [5 g# L5 L/ U  k
        let (tx, rx) = mpsc::channel();6 X  T* `  q, |% |
7 L; e& }% s6 W0 j  z7 N
        let mut workers = Vec::with_capacity(max_workers);* |. ?2 M0 D/ Q$ \/ E
        let receiver = Arc::new(Mutex::new(rx));
+ Q- o1 D2 Y7 p        for i in 0..max_workers {
. a: b+ V* o5 G+ _            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
# f6 u6 J: e0 }9 N' \5 ~  q        }
' f) B9 W$ Q) v1 u
3 p$ V# ^" [. ]# t6 x! Z. f        Pool { workers: workers, max_workers: max_workers, sender: tx }
4 _$ ?$ G1 l+ M# N/ ^( f* I  [0 v# m- Y' @    }5 D$ w4 k, D9 u1 Q/ h
    : x, r9 |1 D, E( l4 Y$ {! B
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 R: x5 J% c+ s! [7 J0 y
    {( }% \4 y& {0 }1 @. `9 s: ?8 v
( Y- E" {9 W) o( C
        let job = Message::NewJob(Box::new(f));
+ A" q( b1 W0 K1 S9 C  G) g        self.sender.send(job).unwrap();
: e4 r: T$ w9 S! E    }9 X: P, D0 }: n+ k# }: o
}
0 z/ |. o0 n3 V5 q7 T+ K7 i& B( U; c4 h2 }
impl Drop for Pool {
) ]* W. b# Y- h7 r! i: V6 h    fn drop(&amp;mut self) {$ T5 e! Y  x8 d% E' s
        for _ in 0..self.max_workers {
  T/ i0 Z9 U9 F6 w6 O+ @- L8 @# k6 c            self.sender.send(Message::ByeBye).unwrap();
6 Y4 D3 h% q0 @0 T1 m        }. W8 T" m8 r9 V# @$ P9 U9 w
        for w in self.workers {. s) W' ]( f7 s4 ~) E5 M- E) Q2 _
            if let Some(t) = w.t.take() {" j; X& n' ]( U; ]" a' K1 H' r$ D/ Y- `
                t.join().unwrap();1 w2 D0 \* H. d
            }1 r2 _$ X0 ~" f! @6 k! J
        }8 i, Y0 A. \0 R+ g
    }6 W1 l" S9 @+ j
}' H- V# T0 I9 y" a4 J  h
! t( ~, }0 \9 N: }

3 Y. m1 P3 e+ A1 m1 w#[cfg(test)]
1 J. k, z$ e8 bmod tests {
) r: W1 x  S; _6 l# u3 [    use super::*;2 F1 _/ v1 b; }: G, `
    #[test]; M8 s+ m' g) h- \* S
    fn it_works() {
' g8 L- g- P4 X3 e        let p = Pool::new(4);1 w! `. V5 U% C- z8 Y6 s" ^
        p.execute(|| println!("do new job1"));  h3 r) L7 ?+ m
        p.execute(|| println!("do new job2"));7 w& K9 v8 N2 e8 H9 [, Y
        p.execute(|| println!("do new job3"));
+ T- B: l+ f2 @        p.execute(|| println!("do new job4"));) d6 R0 |9 ]+ g
    }
: s8 ?* }, I* U/ Y2 J, d}
1 `9 |$ j) g- U! R( I</code></pre>3 m# [; j4 j9 J( ]- H) v
# c7 u) p8 s+ j; V: h6 t: q
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-4-17 14:53 , Processed in 0.062877 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表