飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 13847|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7736

主题

7824

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
25538
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

0 r% Y8 z* @& F7 P, X<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' E& V4 E/ @+ b. {. t! B5 W
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>3 B8 u/ `: M; g1 m
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
6 U0 V8 }# }! m$ a<p>线程池Pool</p>
; H& y. ^# G6 x0 {# t* w<pre><code>pub struct Pool {- C+ L: C7 }1 P" Y% e1 Y( ?$ e5 _; F
  max_workers: usize, // 定义最大线程数9 c/ ~7 c# p" z
}
# U! d8 J7 k* L8 `
' Y1 v# {/ Q4 e( U3 m7 X, R3 _! fimpl Pool {
" y5 _0 ~0 ]* L) v, s( R$ k7 p  fn new(max_workers: usize) -&gt; Pool {}
# m7 o( d5 [; t' Y9 `6 x  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
  ^" T8 g* w' b* m' z" L/ e}+ q* y; A5 ^# `' T; X5 a
9 Y( Z1 |4 E7 C" d5 @
</code></pre>
6 {9 ^% p! `: q1 r3 I* _<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
/ T% u5 \, C$ a8 |5 U<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>5 E9 u/ |$ ~( u: {/ m. e' R
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>1 ?$ H) \# `8 [* ]
<pre><code>struct Worker where
7 s2 Y* ]6 e! S. P' Z{
( Y, Y4 @; H% a  N& P- G    _id: usize, // worker 编号
& p" I( |6 r- A8 O# f! W2 a}
' X5 w! ?5 r- Z2 M& G</code></pre>6 ]+ d, _6 `8 U. ]4 A
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
. g( y* t3 c. ^3 Z$ g把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
+ h6 o+ P- v) g/ k$ V- G<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>' q; ]6 z6 c5 L0 A7 O3 Y6 h+ r7 r. }
<p>Pool的完整定义</p>: y8 b% N5 _2 M
<pre><code>pub struct Pool {+ I, V8 ], @* C/ x: A' K
    workers: Vec&lt;Worker&gt;,# G* N% S' B* l; }( L7 a$ ]
    max_workers: usize,
, p# d8 V3 y2 l/ i2 r    sender: mpsc::Sender&lt;Message&gt;
6 c4 O7 g9 h6 Z+ e" \. t; z}
* y  `0 t+ B( B5 a4 h</code></pre>
( b. ?! b) v% H/ X: t! d  T<p>该是时候定义我们要发给Worker的消息Message了<br>5 P) i$ n& V- e5 M
定义如下的枚举值</p>
" T, q. i7 v; N0 f6 O6 a<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;9 T% u7 _' Z; W2 c
enum Message {
( P/ v* N( P5 }% }$ C# C+ H0 @5 s    ByeBye,
  E# U6 b# I2 z8 j2 a    NewJob(Job),+ {$ d0 r0 L4 W# ?/ J
}
, T7 ?( ~9 ~6 K- y& q5 k8 H' B$ }</code></pre>/ Y9 U, H& L: I- E
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>/ o; n* D/ j+ @0 D% |
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
7 M$ k, l  {5 H: ?& t! w5 `, O<p>Worker的实现</p>
2 K* \" j! o! C( Q; i5 J<pre><code>impl Worker6 n. T! S4 M8 L7 f5 ~5 F7 }! F
{
, ^# m6 {: E. ^+ G+ j0 P$ U    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
* y7 s# D# j  L" O# b3 S+ T        let t = thread::spawn( move || {
8 r. p/ j' {; F. Q9 E3 m& e+ y            loop {& x" s6 F6 _- d7 }
                let receiver = receiver.lock().unwrap();
1 k: R! k% _% ~) ~                let message=  receiver.recv().unwrap();* ?+ V+ a% M6 o+ w0 J# y6 t% ~
                match message {# a+ L5 b5 b& \7 z7 K( M, B
                    Message::NewJob(job) =&gt; {
1 c! G# r. b! S( J4 \$ Q7 d9 m. Y% X                        println!("do job from worker[{}]", id);
$ z! C5 _  D( T2 r) p2 x                        job();
6 P1 ?) ]$ N! H: C$ A                    },
2 @/ V- k0 f' w! g9 w  @7 H                    Message::ByeBye =&gt; {* C: h0 L# t) r* i- a. Z& Q: p1 l
                        println!("ByeBye from worker[{}]", id);- V% |$ C( i% D3 \2 C: [: m
                        break
$ h# [. b9 g/ b8 c* |  q; F                    },6 ~# O+ u! _4 t( s4 }
                }  8 b0 b5 v% ?) I
            }1 v" D1 Q: }5 r8 N/ I% ~
        });
( B! l3 x! r9 D- b- w" }  |
7 a# \1 s2 m6 Y5 P5 n        Worker {
+ |+ |( X0 c: \8 }  A# Y            _id: id,5 X  v2 v* |( E) o, N
            t: Some(t),$ u$ {1 p& I& t5 z
        }
, X- [: f) E0 L( w1 L; @    }& A0 Q( m, b8 `1 }
}
9 P+ q' j+ k4 m  K</code></pre>5 F, R0 u- B8 {) @8 r' F
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ a' H  e7 [! o  R但如果写成</p>$ v9 U1 U5 D* V  K5 O" g1 E# {, a
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {9 S0 w$ u* P$ ~+ @
};& Q; \) ^" ]5 X+ z$ S
</code></pre>
7 `1 G1 s6 {7 y4 L# \7 ]& n<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>* ^4 J4 Z: n: A9 ~( `! H% V
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
3 x9 k6 t) H' @5 ?<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ t. B& x9 B. h1 ]% K- Z8 ~: p
<pre><code>impl Drop for Pool {
- K+ v" V% Q: K  c& x    fn drop(&amp;mut self) {7 l% ~3 f7 A* F! ?
        for _ in 0..self.max_workers {" t( @# k/ F: S1 R" A0 H
            self.sender.send(Message::ByeBye).unwrap();- r6 T( ~3 d; q0 |9 V# S# c2 b
        }4 a0 h3 s/ @! r3 m  r; b, W
        for w in self.workers.iter_mut() {
2 f6 R5 H; ^4 u            if let Some(t) = w.t.take() {
. Z1 R: `1 ^0 X                t.join().unwrap();  l% e; b  ?2 Q7 }
            }& V& q, k/ |; @4 V% W/ ~/ m
        }. U: ~2 V# ~( ?" r( W% [0 q
    }
( L  }* k% |( t7 K}
" j4 i! H' U' o* r$ p5 t
; z- L* x1 e% W: c</code></pre>
) B  u' W7 d3 a" N: M$ s* ~<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>/ P# G% ?. R- m1 {7 t$ K+ Y
<pre><code>for w in self.workers.iter_mut() {5 l) o! E- O# P, N  q0 x
    if let Some(t) = w.t.take() {# P! k5 O4 s( ^7 I
        self.sender.send(Message::ByeBye).unwrap();2 v5 a6 {4 K2 M& j
        t.join().unwrap();4 v: g" |& `& s/ y: R$ ]
    }8 U  O# `5 ^8 \8 V
}; |% ]7 H6 Z' ^& m8 q6 r$ S

& I+ e. a2 V" ]8 ?9 x</code></pre>
+ ^0 J! ], f8 v2 E<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>! B; R9 z* O& m& W4 K
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; v% j2 i* E8 ]
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>2 B% \" ~/ _% f( a- c
<ol>9 C& D2 f1 s: D( I+ R& Y
<li>t.join 需要持有t的所有权</li>
6 `1 U3 U; w5 q<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
; [, [0 c% \- X3 a7 |  X</ol>, W# k2 ^% a3 t0 {: V
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>6 U- m3 H. F: l2 {% ~/ b
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; E+ F( s5 G, w$ v( r<pre><code>struct Worker where
* x/ U; q9 A. M6 M3 |{6 D2 i* Q5 H( i  X* u) Q
    _id: usize," n% d5 p5 J: K$ P; v5 j
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,8 R: }7 A# [4 ~9 N& s( M
}
& i* j" o( r7 X1 O+ Q# l</code></pre>
. X4 d" R- d# q+ x  \9 X2 d<h1 id="要点总结">要点总结</h1># z: f2 L; F; |: I
<ul>* ]1 I1 C2 s1 \9 L3 m% W
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
  O$ u2 L/ L- Y  I( J' g<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
/ n1 k8 Z$ n4 |8 }</ul>! _/ O. V! ?1 `& u! g* N% i, @
<h1 id="完整代码">完整代码</h1>3 Z( G6 D8 K" g4 O
<pre><code>use std::thread::{self, JoinHandle};. E2 u+ @" N8 d. ^8 E
use std::sync::{Arc, mpsc, Mutex};
1 d, q9 x8 B& P7 y' n/ j
/ u8 v, |8 e% }1 w3 G' W
9 z. K  H6 k& s4 G* z/ Jtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( h0 @& ~/ d$ T, z' l
enum Message {% z+ v4 ^* @2 E9 }
    ByeBye,
: p/ s/ s' C9 J* r- R$ L    NewJob(Job),
+ b: V: `% T5 w}
1 u, j, S- H% b( Z: b
# ^- ?! B) J0 x: y6 r  Rstruct Worker where: d  Q8 I$ z6 T1 {, T; R
{
6 |, ^& U5 C5 c/ M: A/ P! l/ ?4 R    _id: usize,
" c; A- l, _/ a" z8 F: a    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
7 O$ x4 E4 T+ C6 V. y; T/ N" {}: n' C! V1 @) e  E+ i6 R! u

: w- q8 l# N  E8 f- r5 ^3 nimpl Worker1 e$ Y) C, ]2 o& s
{. @1 v" R  W, M5 b
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {7 _+ X7 a8 k  T1 J, p" U
        let t = thread::spawn( move || {
& G$ l9 t9 d, K# |            loop {8 a( o+ [& f" I/ i
                let message = receiver.lock().unwrap().recv().unwrap();  X3 u+ y: B3 L* u) Q
                match message {* W$ Y: ~/ B" H# H4 q4 c2 m6 d
                    Message::NewJob(job) =&gt; {
- t- c  j" t2 y7 I, {& {                        println!("do job from worker[{}]", id);
1 x/ v3 _- l' D; o                        job();' t8 |# m2 ~% ^  V2 r- u
                    },0 t; d) I9 k$ _7 V* k. F6 T8 U
                    Message::ByeBye =&gt; {; V1 n; E' I+ ^0 |& t" i
                        println!("ByeBye from worker[{}]", id);
4 ~$ [/ _5 I( D& T) f                        break
/ H  L+ @$ `$ |8 g' e4 O$ W2 M8 i                    },: V- |+ e& A- I
                }  
6 l) M$ o. |1 }  J9 T            }
) J9 h; y6 X  N9 O6 }! ^% t' P        });
$ \9 i4 z% G3 N# G/ @+ E! _, {& h2 Q# n2 m
        Worker {
8 c9 h: j" l, M            _id: id,# j- B( c2 G) i4 Y/ z1 [4 g% C+ U
            t: Some(t),* E* D) L# C- r2 X8 M: G$ E0 o" k
        }
/ u- j: C( ?, z/ o    }+ s  H4 O, c: S
}3 G- a' T6 B) i3 g7 |+ p4 ]
/ C: o3 d. c4 F( k
pub struct Pool {
' {0 Y, q0 z, `; d* x; z  t& L5 R    workers: Vec&lt;Worker&gt;,
! H- k8 U, {5 z+ J3 J6 ?' E    max_workers: usize,
/ Q3 [" j& ]3 u# j9 C    sender: mpsc::Sender&lt;Message&gt;& B0 G3 M6 r, h) I! w- x* S
}
! d6 r8 y$ }9 r5 Q
3 d) U( N9 k# g( Cimpl Pool where {
0 s6 I. I5 y1 g! E" h" T    pub fn new(max_workers: usize) -&gt; Pool {
% U/ H  G; `& b3 M* q        if max_workers == 0 {( b9 y; v  R7 u/ [5 W
            panic!("max_workers must be greater than zero!")
4 L, F) L; S2 l( O9 y( H        }3 w' j+ P* R8 a4 ]5 _1 _
        let (tx, rx) = mpsc::channel();& K, Y6 s- Q& V9 h. A
% p& \# W$ G  L* H/ ~) U
        let mut workers = Vec::with_capacity(max_workers);6 G- g4 r% P& n3 A% t
        let receiver = Arc::new(Mutex::new(rx));- V- a) J5 v3 D
        for i in 0..max_workers {/ c( |/ @1 M9 m+ {
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));: l- n# t2 p" x, ]% n3 A
        }4 Z" c0 ?5 M! F6 |0 {

' V/ M& h+ [" R# f/ ?        Pool { workers: workers, max_workers: max_workers, sender: tx }
1 E- I5 ~0 a9 D: N    }6 V5 |4 P" z: n9 H
   
+ K2 Z. q% v! G: O: h    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send/ O- G( z7 I( i9 {" o+ ^
    {1 b, c( x9 t8 U2 U

* u0 j! k) ]' U( c% O5 [5 `        let job = Message::NewJob(Box::new(f));
) i3 C# v' R" A8 J# o0 s  y        self.sender.send(job).unwrap();+ T$ w3 R3 _, L% c( z
    }# }8 w1 t# P  l5 b) F
}
. f9 w. Z$ Q- D1 @8 H: K7 w6 K* s) i1 X6 P5 q0 S/ l5 E9 q
impl Drop for Pool {
& p$ R0 i* g3 @    fn drop(&amp;mut self) {2 n6 {3 T- k5 e" K& U8 \
        for _ in 0..self.max_workers {
5 D" k/ ~! D8 b$ Z# b- A            self.sender.send(Message::ByeBye).unwrap();
. ^; m, Y# i; L: a, C+ h        }3 B1 `7 \5 R& F. O  k* L! K
        for w in self.workers {' U" |$ L2 K( M  b1 O
            if let Some(t) = w.t.take() {
% l: U8 ^& Q8 D- D. d4 T2 Y                t.join().unwrap();
2 g6 j) B7 F+ G! A% @& Q' L+ c            }
# B+ @" _9 ^: D2 Q2 ^        }% B, H4 I8 o9 m( D
    }6 `2 @2 G$ U. m1 {
}
! q  q7 H5 a8 g( }- R
1 m3 g! b$ H) {. D' A* q9 y9 c8 B
1 m- t3 ^' \9 {#[cfg(test)]7 F' H! @0 U* w1 _. k) t) R4 m: I
mod tests {2 D( ^; u% e0 S9 D9 K4 O
    use super::*;0 e2 N' W2 i( e& K0 [5 b
    #[test]
6 ]4 n1 a  M: E1 N    fn it_works() {( ~; Y- R8 c- Z4 [& t' Q* A. @
        let p = Pool::new(4);
& o2 B+ Q- R5 j        p.execute(|| println!("do new job1"));
) l' z: o( _0 L        p.execute(|| println!("do new job2"));
; q9 E. O1 J, t4 N; r3 R        p.execute(|| println!("do new job3"));$ }& P; {& v% d' U8 F
        p.execute(|| println!("do new job4"));
8 X; v* A2 ]+ V2 Q    }  y; {  @+ Q3 i5 X' D
}
' I# z; |* r& o. i% ^+ M1 s</code></pre>
# R% \0 o- w! {) L% _- F& H
1 I* M) ]/ v1 j, b5 T. L. {
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-3 12:57 , Processed in 0.067201 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表