飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14568|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8030

主题

8118

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26420
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
+ J* P4 O: s2 L: S. k
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>/ E$ u% Y4 ?2 s- P! y1 P1 O8 f
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
6 W/ d8 H% E2 z3 R<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
" w0 ]% B5 I+ d<p>线程池Pool</p>
) ?! g4 F, a' T& D0 ?+ C<pre><code>pub struct Pool {5 A1 A4 _  P) @$ E; j: o
  max_workers: usize, // 定义最大线程数( T) ~/ g3 h' f6 n
}
" ^6 u. ?7 g0 Y3 t( t1 a) \3 v5 c4 M4 X$ O9 w; f
impl Pool {
! n3 y8 _! N9 W( S  fn new(max_workers: usize) -&gt; Pool {}
! X+ R( g: H- P: y' P  h, v  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}; Y& @4 [3 B% y6 W7 o, y2 U
}
# X7 G/ q$ f; l* h0 b4 c1 b7 r. b& V; d+ n
</code></pre>1 \$ q7 x& p+ |- `
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>6 O/ ]9 E8 D& m; W0 p
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
) j& h9 [" q0 S& b( n可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
; y) W5 K' K- b& D<pre><code>struct Worker where
9 t6 F  R  z4 J) b{6 S: e$ `9 L3 s
    _id: usize, // worker 编号- R' K0 U' g/ O6 P
}5 u: v. C% Y* Z; u
</code></pre># I/ q. O8 ?0 B
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>3 Y2 ^3 ?* {1 S8 S# I
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
4 p( W6 b- Q1 A/ n, j; P<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
3 y! k% Z  Z" g) i* @- L<p>Pool的完整定义</p>4 b- K' q& T+ N8 h" d8 o
<pre><code>pub struct Pool {+ n9 D' B# P( L7 ~$ T0 x7 W6 f( _
    workers: Vec&lt;Worker&gt;,% H, `* R9 O$ l5 L
    max_workers: usize,
7 g) U+ w' L/ {/ _/ V9 q    sender: mpsc::Sender&lt;Message&gt;
' M: c5 J! d* `; C# X7 L. v3 y}; P" i/ w! r4 \( q7 O4 A$ B
</code></pre>
9 N  x( a3 _8 q+ b8 g<p>该是时候定义我们要发给Worker的消息Message了<br>; Y9 f' L4 W% E* S! A
定义如下的枚举值</p>
+ c8 D+ q, o0 H  k' n<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;$ a  b# I6 `3 C: K# W  Q! u
enum Message {; y$ X& I; @4 H! y0 v
    ByeBye,
# P% S! V( Q( m    NewJob(Job),
% f+ _: p' ]% z: ]6 v8 d: {! R}
; x9 r+ X: j3 i</code></pre>- h: I2 l5 A& G" x9 V+ N+ i
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
! X# I; [+ l! H: n/ I. p9 E<p>只剩下实现Worker和Pool的具体逻辑了。</p>) U/ ]3 o" I8 H8 }# x% \
<p>Worker的实现</p>( W. p: A# A1 [" w
<pre><code>impl Worker
9 y2 s" u- n7 K/ y/ e$ d{
* G  l9 p- E; o2 l+ E    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {$ h  e+ }: k3 d- P  v- K
        let t = thread::spawn( move || {
0 A" O# [. M! m. M" _            loop {7 r3 Y1 Z0 M1 a- w$ N
                let receiver = receiver.lock().unwrap();
( R! D6 p' ]1 h, x                let message=  receiver.recv().unwrap();
$ F0 n# v& U3 \4 u: E5 W; o$ o                match message {
0 X  `4 o+ n. V! z6 e                    Message::NewJob(job) =&gt; {# I1 o% l: w( p' J. R, c
                        println!("do job from worker[{}]", id);
, R- {, S0 W% X/ O9 h/ u+ N$ M# W                        job();
- e9 C  e. |* A  J: ~: k                    },
' b5 j9 K4 ?- ]) ]3 r                    Message::ByeBye =&gt; {
! b' ?9 i) v1 F# z- {; o                        println!("ByeBye from worker[{}]", id);" y4 ^2 N) ?. H; D0 V( s' T' n
                        break
+ ~* r4 @( x5 G/ O                    },
2 G$ ]" r6 O4 P  }. c5 i* a( }                }  $ B4 d  M  j7 w- s+ X. }. X
            }
+ c4 `8 Z8 Y) J/ U6 ~        });" |8 v4 y' m4 d0 I" c0 ^

( m' i- G1 n$ b+ c; z! x        Worker {0 x% T5 {  w( K+ Z) B
            _id: id,
2 o3 E" c: G) T* U            t: Some(t),3 {& R/ R( Q; s4 M
        }" p( z! v! l* q. ^) q
    }
1 g8 U" g/ [' I2 `}
( o3 i+ q- }) G) O4 _, o) k</code></pre>' P1 B) z* ^4 T
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
2 u0 F! z- k1 v但如果写成</p>
1 F0 w$ D4 r/ r<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
, }7 l2 Z: r* g; M. a7 P};
8 @  Y8 K7 m6 D9 [</code></pre>' g, K9 M+ @; D3 G- o* p" b- I
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
  k2 S( q, _$ w! D  |rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
+ i8 s6 B0 ]6 h* @! r<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>" t, e6 ^( X$ e6 B$ e- l
<pre><code>impl Drop for Pool {. D( U3 y  h" X3 D% ^  K
    fn drop(&amp;mut self) {/ O. n9 f6 ^8 d3 C  A) J1 E, p
        for _ in 0..self.max_workers {
: }0 O# m& h( V, d- w. B            self.sender.send(Message::ByeBye).unwrap();
- H; c8 w+ W$ _9 X        }( H. S9 r% m% d( M  \5 k' N
        for w in self.workers.iter_mut() {7 W9 B! |7 y( k* y: r
            if let Some(t) = w.t.take() {
5 M$ h' {- b% k& z+ d. @4 K, V, }- P                t.join().unwrap();
# Z, K0 f4 ?% i1 I& t            }. \/ h$ C* p- {, Y  F
        }1 c1 Z4 |6 C2 Q3 D: s
    }1 s( j% d9 D! f5 N0 [( Y( K
}$ ]) m' f& H- c

5 w3 U4 h. |" x( f9 Y</code></pre>6 Q) k% g2 G; @+ s
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
' p, t9 `, x; C4 _9 x0 V<pre><code>for w in self.workers.iter_mut() {) X3 O: o% n, ]! S4 s& o
    if let Some(t) = w.t.take() {. c2 p* W. t6 i& B
        self.sender.send(Message::ByeBye).unwrap();" q9 P. I( W% O
        t.join().unwrap();
6 `: O9 B- ~* h+ R0 V* @    }3 N2 G' A% P  {% S
}# a% N7 R1 L+ O* p2 B# U" g

4 P  L2 \* j6 P</code></pre>  g: }9 V- a9 s, k% T* ^0 T8 t7 r/ g
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
8 Y- ~6 X6 F6 L+ i9 g, Y. [0 Q我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
0 Y3 c* O; B5 Y2 n$ `<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
9 D# i  p9 ^; ^3 k3 y1 a6 ~<ol>
- X1 q, U* S, d' e7 w0 w) \- r<li>t.join 需要持有t的所有权</li>/ q0 f+ |8 p' |  L! ~5 ?
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>/ E% h0 @( c$ o$ i8 }; x
</ol>: l; T( p# N. o! L# x7 \
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>4 M% f! P7 e4 R4 x2 v
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>+ j1 b( O9 x" e1 [- K% b1 E% R
<pre><code>struct Worker where
+ B6 q, t0 J0 r/ A* B+ H( ]2 Y{2 A7 O, V; g1 R( q5 ^! l
    _id: usize,
  L- \1 B% Z. [7 c# ~+ ?1 V* w    t: Option&lt;JoinHandle&lt;()&gt;&gt;,5 r( M# o* s3 [
}
/ ?; ]$ F. o) E</code></pre>
% T! b3 I# k* A, q$ W1 D7 {<h1 id="要点总结">要点总结</h1>7 y9 u7 D5 M& g+ P7 k
<ul>  }# A& R! S% s3 ?* J1 o% J+ Y9 V
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>6 V3 X- e4 G3 c: E7 \& d, U) m6 c
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
2 F! l( j& S8 E) J3 F& @</ul>4 Q8 R; b$ Y1 I/ y- a; W  x  M0 v# Y
<h1 id="完整代码">完整代码</h1>0 Q: G7 v9 ~- m" L
<pre><code>use std::thread::{self, JoinHandle};
7 j) P1 P* U7 f' t3 X( Vuse std::sync::{Arc, mpsc, Mutex};
  Y# l/ b/ |9 Y4 Z7 E& u9 [" ^  P! ^$ ^

! Z6 _* N( V1 ]) A& M. qtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;, K. s( [. j: Q
enum Message {
8 i9 f4 z+ ^, I& y    ByeBye,; a  q$ A5 J+ X! Z- S' C
    NewJob(Job),' @. o0 J! x: J
}% E4 D4 w/ y4 R9 w
" [+ e( t/ ~7 r  x8 _7 l
struct Worker where
1 S+ q' s7 z; _! u{
2 ^0 [. p# ^) r, o4 ~    _id: usize,
7 ~' p4 g( f, @' H! N+ P  ^' h4 w  V    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
6 D. |4 R3 `. c$ @. @}: L: V( [* N/ S$ j% [
3 ?3 h+ z/ e6 [" b* d$ b
impl Worker
0 ]( a8 I: Y+ B0 J  r5 @; D% h{$ ~5 A0 V# l$ W  d; d* K3 ^$ F
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {1 @* o( e* L. r/ U* _
        let t = thread::spawn( move || {
2 @" ^( w0 y+ y0 f            loop {
/ B; N2 ]% h9 |; l. O/ i' b  h                let message = receiver.lock().unwrap().recv().unwrap();
* E/ M( c1 x0 w# o  P+ }7 u& C                match message {
: I- x' e" V8 m5 T0 x; x: h2 C                    Message::NewJob(job) =&gt; {
* S' `2 `- F! ]6 ]' O& `& Z5 z# r                        println!("do job from worker[{}]", id);
2 k. J% @. [/ y                        job();
1 n5 ?+ U7 ?' M# U                    },/ Z  g) O# a9 @5 I. }. S
                    Message::ByeBye =&gt; {
) ~( s$ x& e  F! A  ~4 r                        println!("ByeBye from worker[{}]", id);
) ]+ |: m9 Y! F& c: |                        break6 y% {( n, e9 |4 f, N/ W6 N
                    },1 }6 u- x5 j* J3 `
                }  
1 N$ `6 }1 \+ k' A! p% p            }/ J) E+ r& E, k) a/ J) F
        });0 g. K" y  E( l
- [; u6 {% H& N+ l" g
        Worker {
6 m' d1 p" `- a- h) }# E, ?            _id: id,
, d! o  `3 O2 p: n            t: Some(t),# o, _9 o. q( G& p- Q8 s0 V
        }6 J$ W: E" M5 S9 j2 x7 ]4 \
    }
! p# K3 |: e: E: S# g}
9 D" s, K; ^7 @. a% ?/ P
* n1 P, V, a/ m! h( Q1 ]* e6 Gpub struct Pool {
* f" F+ D8 Y9 b    workers: Vec&lt;Worker&gt;,
6 ^; T$ \7 Y+ T) e; O# C+ J2 O    max_workers: usize,. d$ T  o$ B+ [5 p  g! O2 r
    sender: mpsc::Sender&lt;Message&gt;, y+ N* ]5 c) ^
}
7 N# N8 P5 _* U* z9 i& F: K2 ^
impl Pool where {
/ m3 L4 e/ C1 z) [. N9 U    pub fn new(max_workers: usize) -&gt; Pool {9 r8 T5 n# e- a# R$ t* {3 J- k
        if max_workers == 0 {/ p7 y* V3 [8 `: w
            panic!("max_workers must be greater than zero!")
- [7 J) \+ ^+ y        }$ A  @( ?' Q1 f, U4 Q& C7 u
        let (tx, rx) = mpsc::channel();
7 Z1 R; ~" u# Z& T; ?' P
! c" K8 I' A4 L$ s5 C3 G        let mut workers = Vec::with_capacity(max_workers);
% f+ N" x$ p9 f" I0 @        let receiver = Arc::new(Mutex::new(rx));
. V4 o, I; ?" b. }        for i in 0..max_workers {+ d# c* V4 ?3 i* P" U0 K, m
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));- _; A; x; x: ~  ~: n4 ^
        }
: I$ k3 `. \, A: B/ W
  G% N; ]! b, b7 N        Pool { workers: workers, max_workers: max_workers, sender: tx }5 s! p8 y4 L! D: l
    }
' s- R: ]; G: \  J4 @  C4 W+ [    + J7 N7 J$ ?0 t3 t) w
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send$ T3 F2 u' W) `, f: Q; `. R9 e
    {: l, e% e/ m2 b' O. K4 Y- G1 X
7 w1 N7 c' f$ Q$ N! |: }8 v
        let job = Message::NewJob(Box::new(f));; I* ^# B3 l, ~0 M" {) x
        self.sender.send(job).unwrap();" t# f, t1 \  n, v1 ?/ N
    }
7 C# G6 v4 H4 n2 Q- K+ \8 S}
5 Y: Y  j9 k5 F5 y  C, l" F+ i
) ]$ m. w% |% R7 |, R9 vimpl Drop for Pool {
& m. b$ k! r8 A0 k    fn drop(&amp;mut self) {
+ Y- r5 i( W* h7 X3 Z6 O        for _ in 0..self.max_workers {
% j4 M. N) }6 j4 B& u            self.sender.send(Message::ByeBye).unwrap();) q, z- ~5 }0 m& r1 V4 d
        }
6 i& c. i3 g. n6 f9 a% L+ P        for w in self.workers {& D* O5 n/ x( N3 r
            if let Some(t) = w.t.take() {( J3 C  C5 f( H9 Z" [
                t.join().unwrap();
$ R+ q; Q0 p3 u2 s$ m- H            }9 a. G% t8 g2 f
        }
- s+ E* s4 T/ D3 ?4 Z  O9 }/ n' i$ D    }+ s( P: p, B9 ~: v/ r/ X  M
}( Q( s- O9 ~( }1 d

& T9 V4 g+ Y$ h$ H% b. l  A$ M/ H( a$ L
#[cfg(test)]
5 A5 y' w; y9 m1 q, K6 W7 zmod tests {3 K1 ~& X4 \' A
    use super::*;
4 O% \, `3 e; P; v6 v    #[test], `% R" M2 y9 H( E$ _" X
    fn it_works() {
$ Y) k2 N+ J, K1 ]* l        let p = Pool::new(4);! k3 `# m3 b  j
        p.execute(|| println!("do new job1"));
5 X: D. A/ C, D3 i7 o8 A        p.execute(|| println!("do new job2"));1 {7 G4 [" ]- F- Y2 W9 r1 e: y
        p.execute(|| println!("do new job3"));
- n+ V1 x# ~5 g5 \" c8 y/ N" L        p.execute(|| println!("do new job4"));* m: m: R, w0 b- A
    }* E# [9 Y, t; C3 N8 H5 l7 r* a
}
8 W6 g5 h# o, q1 B* E' n</code></pre>5 ^5 M$ Y7 m. o7 Q+ ?! o  P0 W
& D* F1 X" y$ z4 P# k- i" @* M6 S
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-2 13:12 , Processed in 0.068316 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表