飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 7063|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

5945

主题

6033

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
20159
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
4 Y& y$ T( g2 E
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>5 ^. B7 K! U, Y: i# z3 ^7 }( P# e
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
1 @; m/ V# L7 a$ ^<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
$ a7 @6 n- m3 }1 X) X; R  H<p>线程池Pool</p>
& i1 I3 B' p9 H7 E8 y' f<pre><code>pub struct Pool {
* c! ~" b$ l  |! v; D  max_workers: usize, // 定义最大线程数5 `; a8 j) }; N! b! q
}$ ~4 h  w7 b# `! h& J

' W: H1 I5 V( simpl Pool {$ l. z6 g6 T2 r: t
  fn new(max_workers: usize) -&gt; Pool {}
6 l2 n* t$ Q4 |$ O4 ?  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}3 y4 b* p# p: }+ C( _" K
}
% J$ H6 a  S1 I9 Y6 x) ?0 @- M; j6 ^# Q- v' D; B* A
</code></pre>3 ?1 a3 C/ E! ]7 w7 v# H
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
6 Q, g1 Q4 _3 U; h0 z<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>* f3 P. |2 N* k$ m' R; m
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>- B' U: d9 V9 |0 h! s2 D
<pre><code>struct Worker where
* {* M* ^) e) g; d" l) q& b2 ]: L{
6 \& i& {9 B. J8 h( J! u1 G* ?    _id: usize, // worker 编号
; ?- X3 `9 t/ C0 H3 U6 ?8 @}7 B0 g- ?7 G2 _( \+ H5 K
</code></pre>
$ j$ s/ D1 r3 w1 V: V0 K<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>: O8 g: ]9 w. I; ~; E
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>- `4 C" g2 Q6 D1 x. i, s: s* c
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>4 f. E7 t% F% A# S- w1 W( O
<p>Pool的完整定义</p>
; w/ V1 W; s8 l% V1 e; W8 f<pre><code>pub struct Pool {
/ R; _8 ]6 m" T$ F/ l2 R- E: c/ H    workers: Vec&lt;Worker&gt;,& Z3 P; a" \2 t% a4 X- z  m% j  a
    max_workers: usize,
3 v$ G# {) n( B" l    sender: mpsc::Sender&lt;Message&gt;# j9 v2 ~2 D  H7 A1 z% H6 P8 S
}8 I: g( S% K3 k' A4 `9 ^% N4 g2 D
</code></pre>
& k7 t# x: k! ?, d" A/ F2 u) w( j<p>该是时候定义我们要发给Worker的消息Message了<br>
6 z2 i  K/ I+ f定义如下的枚举值</p>
# Q2 \5 G7 ^& A1 O<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;3 S( a$ e( E" T2 S/ F
enum Message {+ h! \* O0 i" [* s- C
    ByeBye,
5 w4 @/ H. f. R8 r% w' ?( ~4 u" R' |    NewJob(Job),8 @0 x+ ]% K+ p. q" J- W
}) ^; _8 |/ q' v
</code></pre>: a& T2 B4 |  `2 n
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 r- p, ?% c3 G4 {7 b: l9 i2 w<p>只剩下实现Worker和Pool的具体逻辑了。</p>1 A* t# R& z% \5 A1 ~& F
<p>Worker的实现</p>7 S1 f/ T2 p) N2 w% A- f( [
<pre><code>impl Worker
" G. [1 m* }& S2 m8 Z5 n{' h% ?6 P4 J( i
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 ]8 m) s5 ]8 }0 |/ ~" ]* ~        let t = thread::spawn( move || {! Y3 x- o5 z9 J! X+ ^
            loop {
. d" y+ _. L& u1 @                let receiver = receiver.lock().unwrap();6 i- e& M5 e% p: s5 `) |
                let message=  receiver.recv().unwrap();! }! E" u' Q/ J% R: |
                match message {
8 }* s2 U- X+ v6 A1 E! Q                    Message::NewJob(job) =&gt; {
7 b, Y7 H1 V) S) t8 y0 p9 T# V& D                        println!("do job from worker[{}]", id);
; H8 M, Y, a) I' b9 R/ L                        job();
" F1 v% ], ~' l                    },
& @8 v$ V# ~4 }9 L% g                    Message::ByeBye =&gt; {2 R" `0 M( k( P; h' T( H' a
                        println!("ByeBye from worker[{}]", id);
" g, ?1 [2 C) Q6 H) Y# H3 Y                        break2 o7 E3 p# I, T! _
                    },
6 @+ S  k2 F+ Z  u- L                }  
1 ^5 F2 B& r* [            }$ |, T) o  q, c7 w9 j8 T' y
        });# r' D. |( F; l

, I% [( y- y' J        Worker {; o/ ^8 L) j- V  y" z
            _id: id,
* m* R0 B  Q7 b5 s$ P4 h# [$ |, T+ [9 ]            t: Some(t),6 \" m( W2 s: i3 {8 q& J0 i
        }
3 R5 w: T* m1 z) \3 h5 M    }, c6 k9 z- f% u% r! V
}
; p7 `# n' Q6 h. \) h; ]3 S</code></pre>( z7 O. u( x1 z, r+ P5 P
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
* T: s9 s1 x- s- O3 f: [但如果写成</p># x6 T2 d& G+ L' @& T1 ^& V
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
6 N" `% l" ]9 d6 m1 V) B( s: u4 V};# N6 ^) [4 X/ |( \- C2 m) h
</code></pre>/ {5 \% [: O9 G0 o
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- N' b3 B- J' p+ C) K. urust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>+ U" n& O4 Z- X& `. J. v2 K0 |( F  e$ e
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
! x  f- s# o% E% \  }<pre><code>impl Drop for Pool {+ ^/ A8 w9 }/ b5 [0 b3 Y' I
    fn drop(&amp;mut self) {
. c: c; M' A! @+ h; m        for _ in 0..self.max_workers {7 ?3 C- x* X( |7 u4 z5 k
            self.sender.send(Message::ByeBye).unwrap();
; d# a( Z  y: r# A% v; g/ S% h' q        }+ y+ X$ ^; N- c3 _4 |" P# a
        for w in self.workers.iter_mut() {+ Z/ G/ n- U/ ~( k
            if let Some(t) = w.t.take() {' n* w1 c6 w3 Y' b: t! M# Q
                t.join().unwrap();% ~. A7 \2 t) E. m8 D, e, X
            }4 t2 z, ~8 ]8 u$ V# x& k4 l
        }
' D: O+ `) H* S7 b( b- i    }
) \6 @' d3 m* r( f+ U  n0 P% `/ L. M}+ \1 t3 f' f5 X

% ]) X% q7 X1 d5 O9 Q</code></pre>9 b( ]" f, T0 d; @
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
' o! v* y$ R! h7 h# k<pre><code>for w in self.workers.iter_mut() {
0 D7 h. a) f# x" [    if let Some(t) = w.t.take() {
0 ?! p: L$ r/ B        self.sender.send(Message::ByeBye).unwrap();9 c' J' z- c. T+ C
        t.join().unwrap();) m" {# ~* k6 M9 d7 F5 Q! T! g2 p; F
    }
" {4 {+ e8 J! a, f}; k/ U2 N4 e! a* D3 r
- V4 P% }9 ?, f0 h1 J% v
</code></pre>
' t% i+ Y3 g* z. F+ V$ B3 c<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
! B# t" H2 W4 V; E' U3 c2 w! m5 b我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 w- Z# D9 a6 }* W- u. v" B<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
8 p1 q$ c1 W# r" d& I0 D<ol>6 o$ v1 C' J' I$ m' Q! T/ ^9 i& t
<li>t.join 需要持有t的所有权</li>8 [; g0 G1 z9 M5 `
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>$ ?" B. }' @1 ~* t
</ol>
$ G( i: v' V  k- z) W<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
; U1 h7 s0 @1 V  J1 w3 j) m( R换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>- V4 V; t1 t6 q/ h0 h) _
<pre><code>struct Worker where
' c# h( I) T, t6 l6 K( k* d# u; D{
: ?/ C* }* O( C5 z    _id: usize,) ^2 \5 G2 [1 ~: u
    t: Option&lt;JoinHandle&lt;()&gt;&gt;," L" ]8 h* b, w* \8 ^
}
. @/ Q! {4 [6 W- T! U: R! \</code></pre>7 L  Z# B5 X7 K, d
<h1 id="要点总结">要点总结</h1>& p7 E' [" A4 u; Y) E; Q
<ul>
$ b3 f8 h* i& L3 ^4 ?  e<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>  ~' ^; G  s% C) X) q" o: H! [
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
0 z0 y$ H7 A. X; U$ \* ]8 }</ul>& _" n8 V% J! o# b
<h1 id="完整代码">完整代码</h1># _+ d9 z8 `, j* t' V
<pre><code>use std::thread::{self, JoinHandle};1 s6 b8 e  t' {" _, l' x
use std::sync::{Arc, mpsc, Mutex};* h" x2 k6 e3 _2 V2 b

1 G- U6 x9 d1 z5 R7 ]7 `3 q3 `
' _5 h% L, T  |, a3 y7 d  s2 {- G) ztype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
8 U- S( q% {3 T% benum Message {
" i# k; r' z* q9 o    ByeBye,
+ q0 k/ S5 G% j7 F4 m! U" q7 f3 B$ A    NewJob(Job),
& s2 P  S, t: P; z}
: Q: C' T$ ?2 D
4 y& a, ^+ [' {struct Worker where
5 e/ F- E$ ~3 C( W4 w; Z{
$ Z0 j) ^7 Z4 u, j# `    _id: usize,' l! k  u4 Q: d3 d9 E
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
" C& @' D1 |; C% p9 i}
9 j( H5 J3 l) B2 p" I& I1 O- h5 N! Q6 a. \
impl Worker
, Z5 f1 R( ~( P% b# E( [{
+ |3 `1 N1 |1 C: o    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {3 b" \" \. A4 C8 m* z
        let t = thread::spawn( move || {0 k; C2 T9 _. e; r
            loop {
. o! T$ b% c$ R( I/ R                let message = receiver.lock().unwrap().recv().unwrap();
/ D; Q% t/ C( [) Z$ k* x6 S* p                match message {
4 X* x( `. Z9 r& Z% l                    Message::NewJob(job) =&gt; {
! F4 B2 L/ @  L/ \0 D                        println!("do job from worker[{}]", id);
! l) \0 p: ^* n' `9 Q9 m                        job();2 `: I8 a6 O! J6 |+ V% ?, D+ z
                    },
. v: k2 t6 R6 d' [" L5 X                    Message::ByeBye =&gt; {
, ]! V0 q; a  U" n                        println!("ByeBye from worker[{}]", id);
- X* }# d% a* }2 [! K                        break
4 ~  j# ?, ]# d. f; y                    },
) B4 t8 R8 U+ W8 g7 Z8 K! ~                }  
9 w# q4 s; r( W0 O            }8 q8 ?- |0 o' e; f- l$ C7 o8 b4 \
        });
* k& {8 X6 h; s2 z0 p' t# Z% d3 w9 V& N- G2 V: P- z# b
        Worker {
" C/ _: I/ ]5 @- T5 U2 N+ ?            _id: id,
( s1 E9 @: L2 b0 T1 m            t: Some(t),
% u: Y  |! k: V3 d. \        }; g1 U6 n1 I  P3 A, ^& G8 ?( q
    }
- w6 q3 ^* h1 j}
, E3 @  G$ @1 l8 q: d9 `2 R9 Y' P! b( E* f3 U- J  Q% @+ P0 L
pub struct Pool {) ]6 N  b( ]3 _8 C' d/ Q
    workers: Vec&lt;Worker&gt;,
1 k* \& V) K. W. L% G    max_workers: usize,6 q: V- s: Q; }
    sender: mpsc::Sender&lt;Message&gt;& ?) A) T! u( d1 ]5 m) u
}
3 \5 ^% u( G3 |' {" L% U- f
  y8 ^2 l/ d  m* j4 L- A( [impl Pool where {
* y9 `" z2 D) B9 `$ n' ]    pub fn new(max_workers: usize) -&gt; Pool {
" ~( w8 C( u# O- i% {' K        if max_workers == 0 {5 ]$ i$ F5 c- J$ e, m2 K% ^
            panic!("max_workers must be greater than zero!"); v, @7 D7 f) B) J6 R
        }4 v& d" C( c0 x( U0 y% U
        let (tx, rx) = mpsc::channel();
& U6 X, r5 a/ E1 a: I1 v% g
$ L) R2 W3 @4 ~1 x% p' |- t- Y2 a$ U        let mut workers = Vec::with_capacity(max_workers);4 ^. n/ c% f1 ^; t
        let receiver = Arc::new(Mutex::new(rx));
0 C: x- N& `. d+ C; k1 r' m5 b        for i in 0..max_workers {' ]6 o$ Y( J0 k
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));. E$ j; n, }8 W0 M) K
        }
" [7 a/ Y3 a0 I6 E. m+ L& w* A) f- R: f( ~3 ~$ z
        Pool { workers: workers, max_workers: max_workers, sender: tx }+ X' V- ?. K5 P3 `; F& R
    }
% M8 ?' B! t3 Y- A. H  J* H   
& V: D$ n9 k3 E6 Y+ M( O/ U    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
$ q- @: m& }: d# ~8 c2 ?    {
# M& h. V% W5 v; G5 n- c0 ^- n3 E! b" J/ {4 R  f
        let job = Message::NewJob(Box::new(f));
3 n- A) _' [) J3 q& j3 M6 C        self.sender.send(job).unwrap();) P, X  [# o; k/ p1 ?8 h
    }- l+ M  t% C, X
}- D, X2 m$ ?5 }% v, i
; J5 c3 T" I/ b
impl Drop for Pool {
* B- C$ P$ v) c& H+ d    fn drop(&amp;mut self) {
) f# x* u6 E/ ~- K        for _ in 0..self.max_workers {4 b1 H3 F" @5 a' z3 u, s1 t3 u, M
            self.sender.send(Message::ByeBye).unwrap();
2 J7 o) O! i( H# [  y4 X* I        }
) r) W, d& @9 H  T& t        for w in self.workers {
9 G2 K9 i9 j. l            if let Some(t) = w.t.take() {
: ~; H: w0 S6 ]3 n" _3 v                t.join().unwrap();# c. M1 }/ l: C, T3 A
            }
* ?: l/ k$ S% Z, E        }2 y' j7 |9 }+ u  X* N
    }
2 m/ S! |1 j' g* _. u% [}
: T1 o# F* h' U9 _3 ?* k( ^! x4 e. G

% A! `4 M9 d) @9 B: f3 }#[cfg(test)]( u1 v$ B3 b( Y5 L! I
mod tests {5 t* s2 p/ ?% m
    use super::*;3 H2 Y9 g! g) s8 E# n
    #[test]
1 Y) \" D. u" I) ?  |& R. b    fn it_works() {1 v% o1 L9 G; ]3 p" ]$ m
        let p = Pool::new(4);( Z/ Z" y* k4 c8 X  p- N
        p.execute(|| println!("do new job1"));, o# w6 n+ c  Y3 D- n
        p.execute(|| println!("do new job2"));& u" a( F: q8 o
        p.execute(|| println!("do new job3"));
" L  Y1 u- p# P4 g; X        p.execute(|| println!("do new job4"));+ j2 K1 m; K/ ]; X2 x
    }
2 X# j1 Y& I* l' u1 }}- c5 n5 z3 p9 i1 S" @, L" E
</code></pre>+ m) P6 P  S7 P; ]8 t
4 b7 i, l+ e& {9 k) C0 F: m
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-2-24 15:14 , Processed in 0.076083 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表