飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15024|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8117

主题

8205

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26681
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
6 ]3 V) Y  `3 ^% ^2 z- y4 z. h7 E
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
3 {& A  [  \. C# J* s<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p># W$ y3 q" h' ~/ {. _- `
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>5 N9 i& M" n, _3 B5 \
<p>线程池Pool</p>
8 o7 @; a+ ~6 |5 `$ L! E3 y. [<pre><code>pub struct Pool {3 U' D, X7 E3 v5 m4 }
  max_workers: usize, // 定义最大线程数
& Y+ [& s9 m+ a. [4 s, O: _}
+ |- _9 ]9 e# P+ Y  \( r8 E0 o" `7 `( i" k
impl Pool {
; {! }3 s; }0 j9 e  fn new(max_workers: usize) -&gt; Pool {}
: v' C! V: W  c; B  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}- Y& c" i' j5 ^  p. G6 `4 @/ c
}
; M( m. w2 U0 \# X# v0 @- z! L
0 O# L5 i9 \& S# ^' F) p8 ^</code></pre>
5 u$ M! m. [5 b3 f/ ~$ y<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>2 ~* O' V* |8 W3 }8 n+ l
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ r, O  S7 q" `; M% ?6 Q7 }' z# T可以看作在一个线程里不断执行获取任务并执行的Worker。</p>: d. ~  f" C1 f
<pre><code>struct Worker where
# Z/ Z7 Y- \; _8 q: t{
. B6 }* M6 ~" N& k, `/ h    _id: usize, // worker 编号
0 S  _$ A8 ^( x8 }* d6 I}; t2 G+ H$ a/ Q6 k9 |
</code></pre>4 u3 O3 D0 [( W- s* e
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>/ \4 M2 N5 O; t( G! V
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
& k# ]2 }0 }; I5 w) c<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>: P, a7 b: F  b
<p>Pool的完整定义</p>+ O' B1 ?+ i+ h; h
<pre><code>pub struct Pool {
3 C2 b0 M# ?: U% J! {    workers: Vec&lt;Worker&gt;,
3 s) R( t) D7 W6 u% v3 Q; \  T    max_workers: usize,
0 a, R! v2 H9 ]% o( t% W    sender: mpsc::Sender&lt;Message&gt;
4 W, z$ c& b' C1 L* m}
$ R, o% M$ j$ y</code></pre>; n) P" J; y2 q8 r9 Z* ~
<p>该是时候定义我们要发给Worker的消息Message了<br>
- z- k# q9 S$ _. O; d定义如下的枚举值</p>
( f3 L' I$ H& ?8 |4 g, s( Y# A<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;1 _* a3 w9 V* h7 n. Y: n
enum Message {' {6 p- Z) W" @& ?
    ByeBye,8 }2 T/ \5 T4 f, a! E! D* `1 A
    NewJob(Job),. B, A5 R( Q% s- u, J/ G+ m( h
}
& L' K, ?. N$ [& p- {( H6 Q</code></pre>
, }( q' ~/ M$ w5 D5 P<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
& u: s6 b( X& z+ a( v1 _% b1 s<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! I; n8 X; Y$ g1 }/ c- D9 f# [<p>Worker的实现</p>1 h( P/ _8 k+ m+ i+ X, v. H: ~
<pre><code>impl Worker
) [( Q# R- E! f/ E/ L{7 G; z) y$ Y" I+ `
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
# B7 `8 B* j; M9 O) w        let t = thread::spawn( move || {6 l$ Y  `3 q7 ~6 r' b* s5 \- p$ Z
            loop {
! k; L% x( U0 j6 n                let receiver = receiver.lock().unwrap();: }" N, V& @& P4 W; E1 h- ]; M4 ^, o- U9 k
                let message=  receiver.recv().unwrap();# `/ q3 a. m% x; a
                match message {" w$ `! g8 l1 o5 {
                    Message::NewJob(job) =&gt; {4 a# D" }, B- B2 o+ Q# Z9 d
                        println!("do job from worker[{}]", id);6 O* H# Y6 |/ N3 C2 q5 n
                        job();
# n" f+ g! R( J5 o                    },' Z9 ~6 k4 @6 m: Z& k8 P0 L
                    Message::ByeBye =&gt; {
- h6 H3 J6 M: `                        println!("ByeBye from worker[{}]", id);# f  F: V- z4 z! `3 F* A0 f
                        break
0 A/ O9 J3 V$ f/ v- |/ N- [" j  P                    },% O  u% d5 y$ S  ~
                }  
' v$ X- M% ]! a/ n& E: \  u            }$ }' {* r! r2 P
        });
+ h: E+ b& h1 S& L7 G: A
( ?( g( j6 I: N        Worker {: ?$ m7 `5 B6 w& L: i: B8 T1 v
            _id: id,
( k$ K6 _: g* H            t: Some(t),
7 N1 J& n( `$ I        }7 D' e9 a8 l! \' }1 _
    }
/ z( ]3 W* h: _}7 d2 J% \1 B" [% c9 N* P( u2 N
</code></pre>
% q: {! F% I4 X<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>1 L+ L. n. C7 D6 U- z7 m. g2 Z& `
但如果写成</p>: Q8 `( u7 l% ^! g6 B2 C8 z4 d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {3 Z: s( m, y8 I: n; {( ^
};
- {! p# {( J, J. g</code></pre>
3 u, a+ w/ Z2 ?) j" M" L; u<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
  t' w& A: L6 ?# [9 frust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>, m) ?) [1 O6 ]# _2 t  u' j3 z
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
2 f8 \5 u- d: q) ?( A: }<pre><code>impl Drop for Pool {( ]- i9 m$ W$ d! M  {. \
    fn drop(&amp;mut self) {; o6 j, @* K; h+ S% o
        for _ in 0..self.max_workers {
7 ]/ r& ~5 i+ I" m: l$ S4 R            self.sender.send(Message::ByeBye).unwrap();$ R/ x1 W$ i3 E" `
        }
6 s8 q7 X0 M3 I# }        for w in self.workers.iter_mut() {- `( G6 x& X; q- q! K1 j4 H5 k5 O
            if let Some(t) = w.t.take() {: C) D3 j+ c. \# A: p
                t.join().unwrap();
: G9 l3 c  t. [: I            }6 x2 g: D5 O$ T: ?5 P( N" W
        }
- ~1 o% [9 P, e. ]* a    }
' z6 p1 Q, @1 I% Z, u}
. U  Z8 l! i! ^* ?% [  u  F0 l1 P6 \' a2 s0 L
</code></pre>
$ Q  @+ p4 B- g/ V8 i<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
8 G& V. e; Z3 Z/ c<pre><code>for w in self.workers.iter_mut() {- I! g6 m8 \5 n: H& E' c6 g
    if let Some(t) = w.t.take() {
# k/ B3 s' A2 {        self.sender.send(Message::ByeBye).unwrap();! Z$ j7 p1 z2 t- k7 Y7 i3 d
        t.join().unwrap();
' V( M; X  j- f' s. j+ N2 e    }% t# M2 m0 e5 b1 \* r( X
}
: F# e! z  |9 _  G2 r/ y( n# M1 f, @8 f2 s
</code></pre># S# f) k) `+ w1 Q
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
! v5 [$ i+ a* I我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
' G# I" ]7 r7 }% x* D1 N$ ]! T<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
& M  e% g0 @& q- ?, X<ol>$ M/ R5 k7 y; U0 k1 O3 v
<li>t.join 需要持有t的所有权</li>) R5 T" K6 O4 |( I6 p. ^, ?
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>4 B5 h) z% y( O  e( ^
</ol>
! h" c) G6 c2 |0 {7 ?<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
9 h3 Y% R# H, E换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
' f& S+ G8 G9 s<pre><code>struct Worker where; B& x! X+ P) c. H" |9 J
{4 ]  P. `: k) S: u
    _id: usize,+ S( U8 Z6 c9 P9 U  A5 x+ N8 T
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
! O/ i0 R/ u$ Y" P$ P. Q8 C* ~}7 I% G' o. w  {
</code></pre>
# o. V2 p3 G0 Y$ P) W; S& Q0 p: e. \<h1 id="要点总结">要点总结</h1>
* G8 P/ v8 R8 [# L1 P2 U) M<ul>; \$ @6 `" P& \
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
2 @$ m, B) m# r5 j: V2 D<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
6 d, q9 a$ w$ I</ul>, _  ^0 i. }  a3 n* Y1 V/ f
<h1 id="完整代码">完整代码</h1>
3 f( ^+ }: Z* e* q& o+ S" J& V<pre><code>use std::thread::{self, JoinHandle};) e( [% L; C2 T4 v# o) s6 P5 i. c
use std::sync::{Arc, mpsc, Mutex};! x/ @0 ?1 c% D/ e2 L

+ t2 F4 s( c, e* V! P6 D: j! n
$ G  H) F! R+ N8 s' Atype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( Q+ k+ W$ {& t; o7 w: \7 d# X& i
enum Message {8 F& ?0 B, ~% U/ E; {! u) c; Z4 }
    ByeBye,0 h5 u/ \7 w4 F2 J
    NewJob(Job),4 n& x# r  _' E! [3 T
}. q( S/ z4 h# o/ S9 y6 ?

! m( k6 j0 g6 ^struct Worker where- m/ X8 J) x7 b9 `) ~
{
$ Q4 r7 j9 a: I9 k- G# ?7 w    _id: usize,' H. u. ?+ L- U8 `
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
3 B$ j+ |$ B: M}3 ~( D3 e0 h- n7 m0 a* }% j9 }

- M' i( g9 Y+ m/ `- Eimpl Worker% s* d8 L: k& Z. E$ K% f
{
9 D: Y/ o' m0 @/ |7 w    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
: ^9 W2 M1 |  b' Y        let t = thread::spawn( move || {
# y" i6 g: z. c7 p            loop {0 }! ?& M7 v9 D; o0 j
                let message = receiver.lock().unwrap().recv().unwrap();6 i7 M7 k, m0 L2 B4 m3 [2 O
                match message {
. ]% f; R$ J8 ^: O                    Message::NewJob(job) =&gt; {
, R, r* C+ V( A$ z0 m+ Z. @5 ?                        println!("do job from worker[{}]", id);
( G7 @( O( j* v* W( H9 a' {                        job();
& Z8 Y7 G* x! x) @2 k5 ^                    },
4 r  i2 {4 M0 _; _! d9 @; Y. I& o                    Message::ByeBye =&gt; {
$ V+ l% Z- }3 H- M                        println!("ByeBye from worker[{}]", id);9 U7 E: b+ g1 O& t
                        break0 f! v' W9 m! r
                    },
. k  g6 \& `" Q7 i3 U$ S                }  ; D! J: i, E  m2 W2 e
            }0 i( n6 k$ Q; Y- W& ?& b
        });1 _- z6 x& C/ c+ u

7 l! [6 k( ]& ^5 [* @        Worker {
* P% O7 u0 J; C8 T5 L% V            _id: id,5 w4 F3 G' g- }: N+ `1 ~$ O
            t: Some(t),- I7 l" P$ t2 Y% F4 ^
        }: [  V0 Z* z+ [/ x4 l- a8 M
    }9 p- U4 M6 s2 ~) R0 b  F- h% I: D1 _
}
( l6 V5 _1 G5 ]0 f' I5 g# [* G6 X1 V  K$ j
pub struct Pool {
4 N7 g, o" j# M8 S    workers: Vec&lt;Worker&gt;,
- p9 R# h7 j) B% H& N& `    max_workers: usize,
2 _5 P0 F/ s' U- r( ^    sender: mpsc::Sender&lt;Message&gt;
2 t& G- t5 L8 G- E1 g}6 X4 C6 E& r4 k& `% }

+ i* O+ D# {% h& X% @  @impl Pool where {( _$ a# h. J( U9 K/ E
    pub fn new(max_workers: usize) -&gt; Pool {
5 t* `' y9 g6 R3 b        if max_workers == 0 {
; F& e6 ]9 J8 T+ {; u5 I2 v0 T$ {            panic!("max_workers must be greater than zero!")
4 ]! A, O' n/ ~4 K" y3 v0 `        }
( }# ?9 n! j8 Y        let (tx, rx) = mpsc::channel();
7 `# L; e+ G7 k8 l- v) {. o* _
: _- x: j) Q% D6 H  [6 r        let mut workers = Vec::with_capacity(max_workers);) X  K1 S% P1 q; F8 L; |
        let receiver = Arc::new(Mutex::new(rx));! o7 T0 l5 X. n( G- S9 v3 h! R& s
        for i in 0..max_workers {
6 Q2 e3 C6 T% l& a- K* o            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));* S6 R  B2 `* _+ G* Q0 {
        }1 z/ \# v3 w1 i, W2 N

1 d- b/ X* s# u- R, ?        Pool { workers: workers, max_workers: max_workers, sender: tx }
. k7 Q/ P/ x' Q5 p    }
# K3 Y/ e5 Q. f0 J2 @- c    6 Q+ J' H. B; f( c1 `7 w
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 w9 u% }; @0 t) \) M
    {
5 T; s. o7 C; D5 j3 H' Q) r
) [! ~* o2 J% \" Z. O        let job = Message::NewJob(Box::new(f));% w, x1 M: h( I5 M
        self.sender.send(job).unwrap();& I/ \: ?) B+ N7 b
    }
) A$ K& k. K/ R}  ~, ~) M* N  T0 y, N  m) ~8 |, E

: T6 @$ J9 b$ t4 f0 _: ^impl Drop for Pool {
6 n4 F' S0 P" E0 |0 [- n$ ]+ v    fn drop(&amp;mut self) {
3 @: n1 j# G* D        for _ in 0..self.max_workers {( W. K7 T8 ^; L) D" j4 n
            self.sender.send(Message::ByeBye).unwrap();
  `$ b: U9 `) `  I        }
" Y% ~2 n6 v& [" ~* b        for w in self.workers {# o" E! x% C5 Z3 N
            if let Some(t) = w.t.take() {
& W: F7 a0 Q% ~* o4 P                t.join().unwrap();! p0 b! b$ A7 f; m- G4 d6 a/ h8 F* B
            }3 o2 J, v; d1 \# S
        }- l3 F& Q  Q9 a5 o5 ~4 N4 m! q5 M1 U
    }% d" x# l, {( d2 |5 Y! A
}* Z9 c" \# @5 I1 ^

$ @) @0 S. h4 P: \, q4 s( G
2 ]8 u% K* u3 \#[cfg(test)]$ U  y. k. {  S. {+ D6 z
mod tests {
" }$ Q! _5 B4 ]' L    use super::*;. s  b: \9 l  N
    #[test]
0 z4 C0 O0 c' T' c6 m7 G8 a2 f    fn it_works() {
1 {2 e5 v: q$ ?$ U2 ^9 @2 m& }8 r        let p = Pool::new(4);* P1 P; ^& o4 j- n: _
        p.execute(|| println!("do new job1"));
; d% ?0 R4 l) V7 ?# K  B        p.execute(|| println!("do new job2"));- m& b8 v5 y0 e" a9 H6 d& B9 ~+ [
        p.execute(|| println!("do new job3"));
6 Z8 f* L" w. a% o. X        p.execute(|| println!("do new job4"));
" ]' {" v, ?% T; l& E4 a0 \# D    }
& F6 L$ t( j" K! h* A; e}
+ e3 x5 U  Z& W, W2 m8 a1 H</code></pre>
3 Z- x( M; w, U4 p! T3 g$ {8 ~( {" \& X2 I) Z
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-22 22:04 , Processed in 0.190987 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表