飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14582|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8030

主题

8118

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26420
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
/ J. T; H/ r/ j0 _' B/ Q
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
1 u, O. R2 i. C<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
6 P: Y; p. l! W" _  n* O3 j+ r<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
  h$ @* T" T) v0 s7 g* v<p>线程池Pool</p>
& F2 v) z4 p! j7 @$ j8 |2 W<pre><code>pub struct Pool {& P# Y9 I1 D& ]0 J, \% q& C3 r$ v
  max_workers: usize, // 定义最大线程数
2 a: c! s% O! Y) r}
" G+ o3 O7 w2 _, V- A
' z% R- Y; g2 k+ `. d' Wimpl Pool {
$ K' T. ~) q" p  fn new(max_workers: usize) -&gt; Pool {}
0 |" z+ q8 X" J% V# _7 G  M% D0 A  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}- o6 h! w  n+ O
}
( P* l2 O" S7 Z& z: R# @7 H5 w1 ^+ R* K* y3 S. ]
</code></pre>
0 d0 ?- \" g' C<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# {- S# ]: t& [9 k. P% c4 y8 i<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ H/ Y- {0 K; L可以看作在一个线程里不断执行获取任务并执行的Worker。</p>* S0 {; Z  D- [- a, v5 x( K
<pre><code>struct Worker where3 P1 E7 U' j! F8 T6 |2 y$ x! |
{
0 n' p. l; n, X    _id: usize, // worker 编号
4 ^6 K+ o: I$ v: M/ V9 \}% q+ U" ?5 h: S  b1 Y7 h, @
</code></pre>
- q' y/ s1 Z2 J1 M1 g; G<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
! x) O! p9 h4 C% }0 j把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>1 G( @0 _5 d/ B) X' K4 H" [" K
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
% q& n% X3 v1 g  q# ]$ D# S. v<p>Pool的完整定义</p>
+ A* f' \7 A* X6 W) q) P<pre><code>pub struct Pool {. q" v3 Z4 ~9 @, K+ c
    workers: Vec&lt;Worker&gt;,3 J* E5 }. f8 l( y1 A1 |
    max_workers: usize,
& ]4 ?; i& H7 N) b9 K+ x2 b    sender: mpsc::Sender&lt;Message&gt;
0 g1 c3 M$ }; Z5 O}+ @( V& o- L$ i' T- l2 Q
</code></pre>
, A1 J* c$ X& h: G3 {  C' O<p>该是时候定义我们要发给Worker的消息Message了<br>
: q; h1 |5 K6 G$ N9 u定义如下的枚举值</p>
; y  A" T( \! U<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;; o+ ^+ f4 \4 {* z/ F! g
enum Message {
1 K6 z; O" [; k* U* Z    ByeBye,& l3 }( m: e, d1 D
    NewJob(Job),. U! z, q1 |' D0 p
}
: B0 J2 E& R$ M% k1 F. l1 F3 C0 r</code></pre>
9 ~) P1 v* e, }, D9 A. Y/ `9 p<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
2 j* i+ W1 j+ F$ a; D6 Z+ v; q<p>只剩下实现Worker和Pool的具体逻辑了。</p>* R8 x1 a' o( z2 k: {" ?& t# C
<p>Worker的实现</p>
# V" I; T" q+ Y) J3 d6 ^<pre><code>impl Worker
6 F: ~! h7 i: v4 v! Y6 D{6 ~# g" v* b  T+ J
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {; L) B2 K1 w6 B  T
        let t = thread::spawn( move || {
. f  d- C0 B$ m- l& ^            loop {
6 t4 K4 m! P, g% b9 i                let receiver = receiver.lock().unwrap();
5 y- h$ C* C% P3 I* r                let message=  receiver.recv().unwrap();
6 K1 r) ^' Q9 A) Y$ F$ T                match message {
# Q' j4 E/ W3 Z0 M" g                    Message::NewJob(job) =&gt; {& D4 K: O0 x6 Z  U) f6 B
                        println!("do job from worker[{}]", id);
* y! K$ Z/ O  x) s9 C  B                        job();5 B6 W% _9 F! y: C, T$ ~# C
                    },8 l5 S  k  p) L
                    Message::ByeBye =&gt; {
/ p5 Y3 u/ N8 W  w( l0 ?7 [+ _                        println!("ByeBye from worker[{}]", id);: d+ W# A8 w- p
                        break( s6 ]" I0 X3 R; o0 i) {$ L
                    },$ O; p7 h7 @+ X  t
                }  
" M2 c+ h6 _4 j& ~- l  k            }
/ a% C$ M, E. i7 I! B: g        });
  e& ?: L4 X; E, c2 M
1 h  u* e5 S8 d! b9 F# q6 B9 [7 l        Worker {- j9 M( W. H+ `6 d
            _id: id,
4 w3 r) p' U3 |% k            t: Some(t),
% n, A; J- \2 N        }
& w! j6 A- H% u) d    }0 V9 o8 i! E: y8 O* p% }
}8 F$ D; K) H' g
</code></pre>& z5 ?3 u/ \1 u4 g" |
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
4 r( c% S2 _) ^! o( J但如果写成</p>
8 f. y3 N* {* ^<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {$ S: a+ H9 X/ ?. Q; A
};% _* j. O# h( F5 g
</code></pre>2 R6 H, B! U$ k, U. U; D# V" v
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- S. V9 g/ L5 J5 arust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
/ H# L! H# j  A# H/ ]<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
% I  z/ }8 K( ^9 F7 z<pre><code>impl Drop for Pool {
9 O( D( k" i5 }! h    fn drop(&amp;mut self) {
) @2 P% |6 r! y        for _ in 0..self.max_workers {0 s5 B2 e4 ]/ B0 B8 T* U4 L
            self.sender.send(Message::ByeBye).unwrap();
6 X+ Z9 Q) a; s1 w        }
+ {, l6 u9 o% l7 `        for w in self.workers.iter_mut() {  [0 ^9 T* ?) Z6 @
            if let Some(t) = w.t.take() {8 h: @' w2 W, u& h- p6 H1 j
                t.join().unwrap();, g6 p- C/ _- z5 m  v2 w
            }
: }! b$ i6 n# _$ O4 s  f, s7 L        }
; d" A- U6 [& e" R: L/ d    }
0 x$ V2 `9 K- V# T}' b* L0 Z8 A5 R5 X- N, d2 n
6 T5 i; [. s: U
</code></pre>
5 D+ E& ]6 g+ b; D6 ]" \<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>- Z' G0 }2 X) q$ d5 a4 \
<pre><code>for w in self.workers.iter_mut() {
- ?; A3 J; Z* _: k) f) I! O    if let Some(t) = w.t.take() {/ o1 W4 {6 u3 O# W1 d1 |
        self.sender.send(Message::ByeBye).unwrap();/ c" t. x7 }7 d: @& \0 \1 z9 l! L
        t.join().unwrap();% ?+ J- a6 b$ ~) S, ^
    }6 Y" R* x$ g+ n4 _1 b
}( ?& G$ N% D) W6 u9 U1 l& ?

6 q& L  i( m2 l! C, L</code></pre>
7 ~5 z5 \. m- x% P) R<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
: {; Z% e( f) H' f我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>: O; u3 d9 f2 n' ]
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>% u& i5 D2 }6 X2 ?, K
<ol>" u& G5 ]6 `0 Z1 i0 C% N1 |
<li>t.join 需要持有t的所有权</li>
3 M1 o/ D# x2 ?8 p+ G5 d. _% Q+ j<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>5 k6 T. V4 J5 v! f; M
</ol>
- h5 f- X5 A' K3 q& D2 k, }/ a  g<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>% s; |7 c: e1 a6 A+ g
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; X; }, u( J5 u6 z<pre><code>struct Worker where" t; V" e- y7 Z
{
9 Y" o' |$ {7 i8 J) E9 y2 K    _id: usize,/ p2 R: e4 I( {4 G6 m
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,8 X7 c: {: t0 X# U9 \. s: b
}7 g- G' Z, _, }# B" r
</code></pre>
4 e; w/ S+ o( j- d" o; g& G<h1 id="要点总结">要点总结</h1># U  m' c. W8 R9 @" f( q6 M
<ul># I/ l6 L* I& z% I! K5 v
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>  |& w2 _" r  H+ D$ P, W
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>+ H7 ?! F: `) v' l0 q
</ul>+ _/ b# k( {5 t9 u: |# W
<h1 id="完整代码">完整代码</h1>
' n" l. o, D7 R# J2 `6 F# o( ?* W<pre><code>use std::thread::{self, JoinHandle};3 T7 j* j0 _3 Y) X- q9 D+ q) a
use std::sync::{Arc, mpsc, Mutex};# u- m3 g- ?6 Q! R4 `; e! h
( d9 R0 [4 i( K) L7 y% p
8 A8 X' f8 A% c/ M( S+ ?
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
9 `) w8 T, [5 T8 [) zenum Message {4 X) E/ N+ |  V
    ByeBye,& H6 s4 l* z; U, p1 |
    NewJob(Job),4 u. A( U8 S: j3 ]1 h& J
}
2 @+ h- O7 J- ~: z: C
, q( e5 U5 Z& b- |9 d& l' W0 d9 \& K) Istruct Worker where# @9 P# J3 b4 a- x
{% v+ R, z) }# C) ?; Z
    _id: usize,
7 e% P2 J8 C1 f    t: Option&lt;JoinHandle&lt;()&gt;&gt;,% P! j3 _' T6 }  F/ j
}
2 e) ~/ [" `, G) v  x/ e& O/ ?; b3 {
impl Worker( d# H3 _  O* J  u$ i0 U  n& S1 q
{
* ?( h; p8 Z7 E/ `9 g    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {' M/ w7 ~/ @: W' {- y3 c, f4 j6 S" E
        let t = thread::spawn( move || {
! f. z% T: z; L, W            loop {- t6 `5 I7 d6 G- v% u. B/ I' v
                let message = receiver.lock().unwrap().recv().unwrap();; ]( N# |' q7 w1 I& V
                match message {5 v( Q% \  }6 v) N7 i
                    Message::NewJob(job) =&gt; {: ?6 h/ _# n% O" I) n
                        println!("do job from worker[{}]", id);. o% W, s6 Y. N2 r
                        job();
5 H$ T% G" N5 Q, f( L, a                    },8 }2 j$ x: W1 p" o1 h  J  w
                    Message::ByeBye =&gt; {7 p9 U9 f6 P) i. E
                        println!("ByeBye from worker[{}]", id);
: g5 |4 j1 H9 N$ ?+ Q) L                        break7 p( l8 J8 s+ w4 E9 s  U
                    },
- n2 F7 w% k8 S: F2 q* v                }  
, Y/ Z6 }* z0 C, N8 Y* x            }9 Y9 p! ~" d+ u( _$ R8 N
        });6 ?# x8 b6 V- z5 Z2 G. \8 I3 K) k
3 q' q( {* W: P/ y
        Worker {7 ~, M& L2 |! W/ j5 f! c
            _id: id,
) Y% f0 y7 f9 ?. t            t: Some(t),
8 @, }4 t3 n! x5 J- D$ `/ Z        }
6 v8 m. A" R- S( u    }
( ^3 w( q1 v- \: r' i}
# g; G7 d, n! k9 q' I2 ?
( B( ], M& o- a' ~  Mpub struct Pool {  j( |' m- I0 L5 n
    workers: Vec&lt;Worker&gt;,
. d7 u' M; \' \$ R: \    max_workers: usize,
& I! [9 F* d' L0 C  |& b    sender: mpsc::Sender&lt;Message&gt;% W- |2 |5 W/ U" b$ M2 u$ E4 [, v
}" u0 |% C, E$ C7 x
. C% b* [$ ^) l" \
impl Pool where {
5 u5 n8 V: b0 c; N5 N  G    pub fn new(max_workers: usize) -&gt; Pool {3 }3 k. O5 L$ V" Z& u* C, w- h6 D+ \
        if max_workers == 0 {
/ T: A" H: l3 g0 r) o* ]            panic!("max_workers must be greater than zero!")& m7 p- A. h2 s# q! @6 [2 [
        }7 u5 Z. k! n' ]" [# q9 _
        let (tx, rx) = mpsc::channel();
8 ?; \/ v; c- L* K8 [2 p, L4 @) S% f: O3 p
        let mut workers = Vec::with_capacity(max_workers);
% q9 d/ C! O) A1 E, }/ e        let receiver = Arc::new(Mutex::new(rx));
7 h, ?# _+ `* Q& B0 Y- D        for i in 0..max_workers {
* i# l5 Y" F7 |            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
- c- A0 D7 Z$ ?1 l( O* g        }
7 t( X6 F" U; u5 z2 s1 ]) f5 O4 d! M* O& s
        Pool { workers: workers, max_workers: max_workers, sender: tx }
8 Z7 @/ x5 v4 y' ~    }
5 z2 j- N4 L( S4 v6 l4 C1 z9 N% Y    # ?/ \# a% U7 I
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
+ W- X) j" G) ?7 E% x- B    {
# ~. R, |. q& q% f0 ~2 i+ l5 F* ?7 k4 |8 D5 X* s
        let job = Message::NewJob(Box::new(f));
+ |6 c4 \& A8 c# K6 E        self.sender.send(job).unwrap();0 Y  A: Y; J6 G2 [* Y4 n9 ]; C
    }
" [" B0 Z5 V! h# I) F$ O+ K, B, w}' ?5 f) o, a, r9 u" K5 [

9 ^! Q: W3 H; L0 ]( Qimpl Drop for Pool {
) n4 v7 @* m' _( J+ t    fn drop(&amp;mut self) {8 W( }$ j1 J0 i' b9 e7 ?9 ]" B2 \
        for _ in 0..self.max_workers {
: X5 K' ~8 ^  _3 y, b6 P; x            self.sender.send(Message::ByeBye).unwrap();* |, l" I- i# J* a: K* g
        }) p" K6 S" t0 p9 z
        for w in self.workers {  f% r" H; V; @+ `
            if let Some(t) = w.t.take() {' i1 q1 h0 E- Q& r2 ?& b# f, {% y% o6 ?7 N
                t.join().unwrap();4 l8 B  J6 C1 g3 s
            }9 Y  u8 D; ~4 Y
        }8 I6 Z, h7 B5 u5 R* a/ g
    }
0 C6 r6 H* b2 f/ z# E}
0 S4 m. c' \+ {  d' V0 l4 W- r! D5 u) C0 m0 E4 @  j6 i9 G8 X- v

' y# G" |3 p# k: L#[cfg(test)]' {- Z$ Q% o* E. J$ O' _- H
mod tests {
3 C; f2 j5 O6 ]% n6 a: g    use super::*;
+ A8 s" T' U9 {6 |/ V+ z    #[test]% j- R/ j8 Z5 I0 ]0 n6 N
    fn it_works() {  _0 B* n1 C- W/ B5 S/ [
        let p = Pool::new(4);% ]: J0 j2 [0 A* i3 o
        p.execute(|| println!("do new job1"));
: A. ^& S+ W3 H; _) R$ Z        p.execute(|| println!("do new job2"));
: e, t0 H' L& {1 o& t        p.execute(|| println!("do new job3"));8 W4 w5 n0 P/ E: d' C6 O
        p.execute(|| println!("do new job4"));" S5 {! I: t: E
    }: i8 J2 L. @2 M' _
}1 v6 c; _0 S" Z& [* |9 t; X
</code></pre>7 W! c! g& }  _9 ~) `3 `( g
. M2 K3 M; c- a2 L
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-2 21:15 , Processed in 0.064566 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表