飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 5457|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

5344

主题

5432

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
18354
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

+ c2 Y' B9 K3 c5 F<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' r* O$ O7 I* h$ K, h
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
8 m  @. P- |# J, t* j7 f  g! j<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>; l9 ~) A3 b0 c
<p>线程池Pool</p>
" I& Q" e+ V" M<pre><code>pub struct Pool {% r& P8 M2 J' W! E5 s, y; v, o7 S
  max_workers: usize, // 定义最大线程数
+ ~) p/ W4 [0 }8 q' d2 T1 v0 T4 M}4 X/ h7 c7 p/ }+ g/ c

' r1 r( B/ Q$ [9 b' D/ c9 {impl Pool {3 {' r) [" S$ p  r. J
  fn new(max_workers: usize) -&gt; Pool {}3 T4 S% P& q) P2 y8 V
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}1 i; q5 |7 b4 O. ~% X' ]! e* k
}4 e) k3 Z8 w: _- z
2 C1 ~2 s# q) a3 \
</code></pre>
( Z. H" @# w( F9 z# x( r! S1 ?# j<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 ?/ v! c, X* q- }7 b" D<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>0 X0 h1 W2 f$ s/ c+ o7 Q9 e
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>4 G( |* I; p, ~) V% A* K
<pre><code>struct Worker where
1 k# D2 i4 J) Z3 D5 o{) |' Z# @4 T' A" U0 f0 J5 U5 u
    _id: usize, // worker 编号
# s1 _1 f5 w+ i}
  H" P- N9 X' d  J+ l# j</code></pre>
- o6 G" M3 V  t- S( y! j, f# [<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>! z: j' y. A9 \% c% Y2 J! w
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
# A4 X) x" G% z8 J5 @<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
9 o) K  ~$ x1 x' t# w; g<p>Pool的完整定义</p>
/ m; D  Y5 B7 \3 `; e<pre><code>pub struct Pool {
* U4 L9 u" h% |2 n1 O    workers: Vec&lt;Worker&gt;,- y) e4 j; d$ |: h
    max_workers: usize,
2 }! H& I" g; P4 I: |, d0 |9 W  H2 u. l    sender: mpsc::Sender&lt;Message&gt;
) ?  L3 _" l9 C( j}$ ~0 |; }, \# Q! Y; B" r. D/ J
</code></pre>. \; C/ ^! J% o4 K& i" J
<p>该是时候定义我们要发给Worker的消息Message了<br>! D! |( h. T4 Q2 N  d. y. Z, i7 i
定义如下的枚举值</p>
5 h% h5 ]- p1 b. |& c<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
' `: @5 X) g0 B$ M" Wenum Message {7 Q4 v: |9 A0 ^( F  y  T2 X3 B/ i
    ByeBye,
. B( ~  s+ N. d    NewJob(Job),* p$ e  N1 [; j: O) n4 k* u, t- B9 {3 v
}
3 I) s6 |; f1 A) X: g4 L8 N* u</code></pre>- q( |) I1 i- [9 E
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
# K+ B( a% B) a0 l: `<p>只剩下实现Worker和Pool的具体逻辑了。</p>
& l; G  E% |# c9 j- k7 p. t/ w<p>Worker的实现</p>6 H0 g: {$ y; R
<pre><code>impl Worker
* ~/ z# a2 B9 g) _{' m1 v9 L3 [5 }
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {9 v+ Y: V  S) ^0 O7 {+ V$ U- h
        let t = thread::spawn( move || {
) U4 ^" r( I: e3 E. S' v; C0 h            loop {9 l6 m, X& x  u: j
                let receiver = receiver.lock().unwrap();% b. j8 r* P) X- B8 h
                let message=  receiver.recv().unwrap();) y9 M( @7 f/ s/ Q% m' U# u
                match message {. d0 T5 E' v; r  o* E1 r
                    Message::NewJob(job) =&gt; {- A6 \$ u- h+ \% H4 f+ f
                        println!("do job from worker[{}]", id);# b: o* Z. q( a, S3 `4 B- r
                        job();
' F0 @4 I9 ^# p& T3 X& B- I; C                    },2 `0 C9 I! W4 G9 ?+ D/ ~
                    Message::ByeBye =&gt; {0 n/ w5 t7 t, ^! O
                        println!("ByeBye from worker[{}]", id);( T. o" ?* H, z$ i* a
                        break
2 x) }3 H$ }7 s) k$ z. `                    },
, a( M& [4 Y8 R5 A4 J$ U                }  - S6 r5 l6 d0 A; R
            }
/ ]. U# u$ |( X6 J+ W# i0 `" ~7 D        });" x. g5 g/ e. C

0 X  u7 }$ e' u, H; ?& A( R7 z        Worker {
. f- n" [8 S4 l' Z, x            _id: id,
- O7 \/ q& \1 w; `4 i            t: Some(t),# J3 `; `1 f9 ^- j2 w
        }9 [' Y5 o8 H! a( [4 r% n2 t) a& ]
    }4 f; a  |% b: i/ |( e
}
- E3 V( o2 e, y0 A  b  a</code></pre>
8 L1 [" F' p, d' [<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>+ ^# m6 W/ q( ]" F! M
但如果写成</p>
* m2 Z# b2 U: s, Z" |& A<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {' B9 a) a8 i  [5 u- l3 {0 c0 j$ x& h
};4 Z( J# {9 u: N5 P. W7 s0 c; h- G
</code></pre>, G! `% D' V2 Z8 `
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
8 h" P& [$ ~9 {- Prust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>5 R' [3 N3 K" s8 q: |3 p
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
- v( e, E: b; X<pre><code>impl Drop for Pool {
, J4 j1 X. T' W( _/ ^% A    fn drop(&amp;mut self) {
$ _; T2 r: `3 e& D        for _ in 0..self.max_workers {  a  G; r  i' Z3 M/ D
            self.sender.send(Message::ByeBye).unwrap();
! B7 E# P# [/ j6 z6 o        }
' O* ^, Z8 d, C2 y        for w in self.workers.iter_mut() {) N/ ?3 @2 y" J0 a/ g, K
            if let Some(t) = w.t.take() {: b8 B9 b1 t( S' l
                t.join().unwrap();/ _/ V$ y% U% `) F0 S1 @
            }
; ]6 _( U- P  x8 A3 V: |% w% ]        }
# l5 M( B/ X) {+ u1 d4 G* p    }# Q6 H9 I% @# U) T0 w7 A, x& w. M
}+ T+ }2 ~1 t; Q) k5 P

! J: C2 u! {5 y# J3 J) R+ j</code></pre>4 x( l, S8 [( r* z. ]
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
" b' q8 y) G/ ?: N: g( g<pre><code>for w in self.workers.iter_mut() {
$ L: E( ^, F% I1 g, {- n    if let Some(t) = w.t.take() {
9 S! `0 ?- l8 a        self.sender.send(Message::ByeBye).unwrap();
- V$ T) e4 [  h3 D1 P* \        t.join().unwrap();! T" F& K5 `; E1 l
    }
5 L# A$ R' n, K1 E}
1 \' O& m" d, G2 G/ s6 b7 O
/ k5 }% K* ~1 u3 f/ L2 X- {$ W$ g</code></pre>, x( D9 B5 u1 ^/ t" {) d; S, \2 X
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
2 v% `) K4 z; K; F& K/ W/ k我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>( n4 _1 K' F8 W/ R5 Z
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
" R! f  d4 j/ T- S) O  `) g" q<ol>
* [) M) b5 C$ Z+ [* ^( m<li>t.join 需要持有t的所有权</li>% r6 i% m; |- Q6 ?. d/ e& @
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>! l1 f7 b6 F% X' |* n
</ol>
% i& F* b9 G/ p4 L3 l5 j- h- ~<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>/ S# D) y% Y, O2 K3 D
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
' D* L0 p2 h' v" J<pre><code>struct Worker where
8 g6 M! ?# L3 }9 q{
% E2 _  S  Z- z7 ?2 ]    _id: usize,
: X' U- N. N  F' D" N4 v0 S    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
$ Q2 Q% ]+ Z3 @9 t( ]9 H' c0 W}
4 H' l4 a9 j8 A& c9 m' o</code></pre>
- o9 S* C1 Q, o' S<h1 id="要点总结">要点总结</h1>
2 R% Z  ?0 [* f4 S3 V; I9 ~<ul>9 T0 g$ w$ X2 {1 Z: E) y, h3 W1 k3 Z
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>1 i6 g% ?- `: W& h3 }9 E
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
4 s1 A9 Q- _4 L- b  k1 q</ul>) [  n- }; M- {+ L
<h1 id="完整代码">完整代码</h1>
7 K" _' G; _  i. W& G/ p<pre><code>use std::thread::{self, JoinHandle};
7 v1 ?& w' z1 h& W) q7 @use std::sync::{Arc, mpsc, Mutex};: o3 z' ]( c1 }. s

2 d2 x3 y7 F  i1 _9 {/ V! }* `2 W) H1 _4 r
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;: t# [' ^4 S1 n- {1 Y! z3 Q
enum Message {
$ ?3 x0 W9 w; t    ByeBye,
% G( F5 X  T' s9 U7 D    NewJob(Job),( _& h( g# H; Y: {" d
}0 g( `# N7 a' t& `, h

9 C6 H" B& T+ c2 M- c. t2 ustruct Worker where
/ ^: g" I: e' U8 |9 y) {+ [9 q{
6 }$ \* e& ^3 ~1 Z    _id: usize,
9 [$ X' A( K0 |3 N    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
9 M/ D6 S1 W. E: Z3 u) g}
; f, o* ]" {) ]/ ?+ ~' C5 Y& |3 c' c: Q" O6 |; {7 p% h5 |
impl Worker! q/ f2 l' P$ ~$ ~
{
& J) J  k' D& `: \' x: P. Z    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {( r1 h; I2 e( Z) j8 Z6 b$ }  w9 L! r
        let t = thread::spawn( move || {
/ N* a* E6 k6 Z0 N; D/ o! k            loop {# S: n$ `* c1 S, E  V
                let message = receiver.lock().unwrap().recv().unwrap();( H! ]$ y1 p$ H3 |6 h2 r
                match message {
& k+ c( L6 D8 S; P) N                    Message::NewJob(job) =&gt; {
% B2 ^# R) ?5 J2 V                        println!("do job from worker[{}]", id);
+ n& E* n5 t* ]                        job();- h/ a7 y) J& }* v. J4 H7 S
                    },3 f/ Z; p, |# B$ y. I  Y' }
                    Message::ByeBye =&gt; {4 a, B- h& g+ a+ w/ L9 Q1 B9 C
                        println!("ByeBye from worker[{}]", id);& t- h4 ?1 ?6 l
                        break1 w* W. O' q0 O6 x: o3 L
                    },( h6 [- l  c6 Z4 O- d* V5 K
                }  3 `0 U: I. n* u& d
            }* \: o% g! a- f) D
        });4 J* t" s& s' }( O' s
/ X3 p" p9 x+ j2 X0 h' y
        Worker {
+ k, i) K' W/ p6 f! g/ c2 n/ G9 v            _id: id,
  p0 D1 h* C5 E            t: Some(t),
9 v( _! x$ d! E/ V2 |/ q) W* H  W" ~        }2 s' p' T& M* a: U8 O8 Q* K
    }0 T/ L6 D3 ]' S8 w
}$ h6 N, ?& V7 ]+ p; y2 H
$ q" Q( o: {: o8 g! ]
pub struct Pool {4 Z4 s, g. y" X& g
    workers: Vec&lt;Worker&gt;,
2 y2 q2 }' A6 f* n3 f% F  H    max_workers: usize,
5 ?; D$ F: H4 b9 x: X    sender: mpsc::Sender&lt;Message&gt;- J6 T* A! W" \( M: x2 `& `* o
}$ w8 d0 P" L5 i( ]+ [

; Z5 P/ w4 b9 o7 X) j" Vimpl Pool where {3 d4 I7 K$ C+ p8 ^! \; J0 T
    pub fn new(max_workers: usize) -&gt; Pool {
$ a! M6 k; Z7 B+ y3 c        if max_workers == 0 {2 ?4 x* _  H% U1 ^, i$ e
            panic!("max_workers must be greater than zero!")
- Q% e! Z! q) T+ L        }
* q2 t" d0 i3 K+ c6 I0 j# X8 e4 l( X        let (tx, rx) = mpsc::channel();
& X% H" Z% [* w" |' o4 o: L1 I* c: H' v
        let mut workers = Vec::with_capacity(max_workers);
% x+ \0 N7 x! w1 g5 u' v        let receiver = Arc::new(Mutex::new(rx));
2 P1 _7 z, m, k        for i in 0..max_workers {
" `4 C9 q# h0 {( k$ J, P1 S5 P0 H9 n- d            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));# T: t8 F3 J4 }
        }
4 j! B. b& o8 V' s) V" [* w" Y2 V
        Pool { workers: workers, max_workers: max_workers, sender: tx }
7 S" f" X. g0 c: j7 j    }6 E1 [* @0 Q6 e( o8 i3 c* F& p
   
4 t4 B8 X/ q( K* L0 c    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send( d& D- U5 @) e! Z4 T$ [
    {
1 ~6 U8 d$ X- A/ y1 U$ w4 v$ o9 M& v. G9 D, n
        let job = Message::NewJob(Box::new(f));
! P# b- a! C  _6 m# `        self.sender.send(job).unwrap();* @! i8 e  g( w1 T0 R1 k
    }% v0 o7 \* {: S2 Z+ }, d7 \
}( V( }6 r) M5 E* E' a7 U( T$ k  C
- E& W- @7 |1 E
impl Drop for Pool {
7 X3 M5 B: j1 }( S3 T( L2 W" T    fn drop(&amp;mut self) {/ V! H6 T4 e4 f$ f
        for _ in 0..self.max_workers {
7 x8 q# l: i: z2 v            self.sender.send(Message::ByeBye).unwrap();3 i, @# f3 b- U7 Y
        }" s5 p& b0 u! h& F# q; [% U
        for w in self.workers {
/ w  f5 Z6 a% X. M5 O0 n            if let Some(t) = w.t.take() {. ?8 C; U+ D. o) M- @1 g4 s5 i
                t.join().unwrap();8 b0 B4 m5 A" Y0 U
            }
7 H  u  R) I# P7 E        }
, j: j! f( v: E4 ~( B0 Y. p3 Y    }
( h/ |, O7 l1 j4 O/ {}
' }9 }- ^; b0 h8 K9 [9 B' }+ ?3 p( m, X$ {" j- J7 C  S# ?

# e8 T6 E0 A, T8 Y0 D#[cfg(test)]
6 r# V8 i3 g" {' fmod tests {
' }' Y& o- p9 @% H    use super::*;# z, a1 x$ L) D6 x% L' b- w/ q
    #[test]
( k- }8 a; F' Q" b" p. j/ L  F    fn it_works() {$ |. @9 |/ \) p
        let p = Pool::new(4);
3 c4 {' m- x& J6 |/ y. _$ F        p.execute(|| println!("do new job1"));
% v+ b! {6 e* w0 V( d0 p/ c6 A        p.execute(|| println!("do new job2"));
3 b5 \6 @; U5 X8 I, {! @6 R        p.execute(|| println!("do new job3"));
4 s- }( ~$ T! X$ ^" j3 t        p.execute(|| println!("do new job4"));
3 c1 `: b! \5 u* i  K. \7 i    }
5 G7 y5 J! a* m# z: ?: J}' }' g1 Q/ w% E  s1 g( v& |. i
</code></pre>
( N1 }* M% r/ F+ q2 c, }
! [& U5 `" s. {' e+ Z
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2024-11-24 02:28 , Processed in 0.065840 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表