飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 9852|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6831

主题

6919

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
22823
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
1 }: R7 O* D  \& R" D% n$ D
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>+ ~. \# B/ ]: I0 v0 R, L0 [# b. _! `
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>6 G1 i7 _- v: c( {
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>0 [5 U! g+ i% S8 e5 Y8 ^. F' v3 S: H
<p>线程池Pool</p>
- l4 h  X  \2 `3 `<pre><code>pub struct Pool {  v( {; Y* W+ U" Z
  max_workers: usize, // 定义最大线程数' X2 ]; K- K' d. z+ y" u
}
6 o# |; @4 Q% c5 A/ B
  \0 T( r* u7 |  N2 fimpl Pool {3 G9 M4 l2 ]6 ]3 e  l; s
  fn new(max_workers: usize) -&gt; Pool {}
- M  i) o1 R9 m  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}& k9 t5 L0 W* `1 C2 u
}, [* S7 i9 V# `

# E& S' g+ d# s( a  M; t</code></pre>/ T- V( T$ e2 u; o* I  D) I) j
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# P2 g, w. I, f$ {5 O<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
7 p* r1 ?( }' B+ F: q6 w+ K可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
3 @0 M9 ]# p+ e2 G<pre><code>struct Worker where2 o; [4 \; I- ~) G+ b; z+ ?; [
{
, s& X2 v/ ^4 ~: P. _    _id: usize, // worker 编号
& r! Y' P$ C9 V1 a  T- h* i. t9 [}
0 N' c; Y3 A7 J# i" p& _</code></pre>
! K8 O) h1 M4 G1 f' m; R<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br># `  _0 Q( U+ g& \+ _4 P% V+ U  v
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>) L" M. ^4 H/ k" \
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>. y: l* s2 ?- W! M
<p>Pool的完整定义</p>0 g. B% N1 E* K3 x
<pre><code>pub struct Pool {
( s  d3 u! w6 d0 E6 x* z* A    workers: Vec&lt;Worker&gt;,% B" t. _; c& f9 Z) J$ _
    max_workers: usize,
  p/ S1 R0 C' U    sender: mpsc::Sender&lt;Message&gt;
: K' b% D6 U$ v) E5 B0 T}
7 J% I+ `' J% V+ {* ?4 [8 n$ D</code></pre>
3 k8 U) g9 f0 x! T) r0 \# s8 j<p>该是时候定义我们要发给Worker的消息Message了<br>% C1 F; \* J+ q8 J
定义如下的枚举值</p>( y( B5 m; P1 }! G
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;# n# Z2 P) a# h$ |4 R
enum Message {
( E" S8 R" @* r9 D8 h    ByeBye,
' D" R4 Y9 [2 q; g    NewJob(Job),2 q6 ^8 C8 x- a0 _
}
) h. B/ }  N( Y2 J. w4 C</code></pre>3 n6 A& Z6 ~7 u7 r2 x, P" G2 P
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>6 h+ t' z: i! t. U0 I0 e2 s1 x. c6 H+ R) J
<p>只剩下实现Worker和Pool的具体逻辑了。</p>. L' |9 C9 ?& @1 f* V
<p>Worker的实现</p>
! Z3 r7 R- E8 U: w. V9 X: ?! {+ D<pre><code>impl Worker& d5 t6 W1 ^- M6 I6 N% E/ [, m
{% g$ m& A- y6 I  @
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
! `! \6 I8 N( G! I" n6 p        let t = thread::spawn( move || {
6 z' m( }6 ~5 j2 a8 \5 N8 p3 W& b+ C            loop {1 j! ~9 m- H( H* y, M! G
                let receiver = receiver.lock().unwrap();
4 |. F" t# b1 k* G                let message=  receiver.recv().unwrap();
: S7 x6 M0 P7 y- D- i5 V1 s, U6 }! `% }                match message {2 u) w- X" _0 F$ {& ]- T
                    Message::NewJob(job) =&gt; {
& K, d0 Y. b: |& G- M# \                        println!("do job from worker[{}]", id);4 U3 Q2 z$ _9 w; t' K5 z
                        job();9 W: N- |) d* ~  ~9 z0 ~6 B
                    },: O: C9 [7 X/ t, O+ w. n$ ]; K5 e
                    Message::ByeBye =&gt; {; d; u4 g) r2 Q) X
                        println!("ByeBye from worker[{}]", id);
- x. K+ P7 A- o                        break
+ M1 [4 `0 [  h" G! c% D" P& l5 o                    },1 t; o0 h/ P; y% F& Q
                }  
4 j/ z/ S2 m1 `3 T% [            }
4 O3 c+ [5 U/ t3 a! X5 Y        });" b! H' R6 M, f2 T: M

3 }5 @' e  o. w- S        Worker {8 @" k  y! V# z! }
            _id: id,3 D9 _1 f6 }/ i. j. W! u0 n
            t: Some(t),
2 D9 r( N) \5 ?( `0 e' X        }9 R; W$ e2 r/ o0 ]) q
    }
, v' c: M' C8 H7 {& A}+ }5 F4 u( H5 n+ N/ ^
</code></pre>; O; l. u- Y% N" P
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>9 V& }* [' G( T  i+ M8 W& s/ m% ^4 i
但如果写成</p>
. q$ @- Y8 q- w1 j& x<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
+ G' a: Y5 z& g) k1 w: J};* [' h3 M% l- H7 T; ?9 z0 o
</code></pre>8 k3 v$ p0 I' V( u& X4 I
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
5 F7 v' q+ j8 o! @! E- Crust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>9 d* M) t% Y0 ^
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
8 V' W1 v* y) t# R* j<pre><code>impl Drop for Pool {
- q5 n- z6 f/ J3 y0 V6 K% M" _, R    fn drop(&amp;mut self) {. v/ |3 n* |' u6 ^2 c/ D0 ^( v( K2 B
        for _ in 0..self.max_workers {/ h! B( N4 P8 @' J: X& c2 ?6 v7 Z
            self.sender.send(Message::ByeBye).unwrap();1 P4 m5 ]+ D& f1 Y2 V2 j
        }
* f* d' ^% |7 K% G        for w in self.workers.iter_mut() {
3 x- c) h( A( @" S/ V% J6 o0 j7 K            if let Some(t) = w.t.take() {+ u- ?3 r+ ]) Q" b2 l0 @
                t.join().unwrap();3 J& ]: @4 q$ n  h; v4 T
            }
' N' t% Z0 c: {' h% T+ M        }- x! n/ D5 |' ~
    }
, @% g# d: R2 k5 ]* h' C+ W}) ~. P" r. f( p& f- ?

& P1 u2 g2 j* V3 D3 Z% T- T</code></pre>
+ m& `. [9 J; I. [9 L( D; e  Q3 k$ U<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>% q. d6 J- ]# _7 O/ ]" {. R
<pre><code>for w in self.workers.iter_mut() {7 _" V  E! ?/ Y5 X7 ?
    if let Some(t) = w.t.take() {
6 U# T0 }! f! z5 C; s7 f& Q( n8 Q( W% @        self.sender.send(Message::ByeBye).unwrap();
) M3 h+ K9 ^" Z9 A: S        t.join().unwrap();( x7 j7 ~) `+ ^/ s* T+ R: _/ A1 V% L
    }: _6 ?$ F7 f7 ]# W+ z2 J6 `
}
; a4 A5 E6 @0 y0 Z- p9 h* Z: g- F0 p5 O4 G1 i8 L) b. y0 ]
</code></pre>
! E* s  f! M& p<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>, v  x& A6 x3 W6 Z! S
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 A6 o# Y9 A  {- _* l8 c$ l0 n2 C+ t
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>9 U* L8 S6 L# `2 O* p9 f
<ol>7 f/ Q: B) S- D( b5 i8 v3 E) i$ C
<li>t.join 需要持有t的所有权</li>6 g7 b! z: }! M" x. s8 C. A' U' N
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
. a* N9 U' O1 Q) w( [& q</ol>4 ?2 i% H: L. {; ?% g6 T
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 ^, Q) v9 y4 k" V3 z
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
1 t) W0 p; P. g5 {# J! g7 H! Q* g<pre><code>struct Worker where
' |7 D. E8 q( V! Z{
% f5 S. N; X4 A7 ]8 i5 v; Q    _id: usize,
7 H4 h/ Q. u$ [- ^) s    t: Option&lt;JoinHandle&lt;()&gt;&gt;,2 ^4 ?0 B7 y1 F
}
$ M0 U; h! d# T</code></pre>5 X* a& y2 D0 ]+ |& K% K3 b
<h1 id="要点总结">要点总结</h1>4 f, q4 N& f0 Q
<ul>
- U$ @& w: @8 q<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>  h) S7 G1 M( z/ M7 W
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
% A2 M7 A$ V7 j7 A; H  E- l8 O- i  P</ul>0 R, w' m" X0 h
<h1 id="完整代码">完整代码</h1>5 Q5 j1 E- _) a4 D) U, u% y) Q8 ~
<pre><code>use std::thread::{self, JoinHandle};
. m* _$ j. B+ ^+ e; }7 yuse std::sync::{Arc, mpsc, Mutex};4 E4 U4 }$ c  C( Z4 |
1 G( @5 e$ o2 Y$ T- o6 M0 ^# K5 I

. j' W6 b* K* ~( E5 b' _2 \type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
0 Q* `; X7 a! _* Kenum Message {1 Y. f$ ~$ e  M8 F+ Q1 B. l5 K
    ByeBye,) u( v( H( P& o; H- E
    NewJob(Job),
; v  ]1 W+ t- `* F/ z+ G  @4 Q" q- I}3 G1 y9 [1 p8 B
7 Q5 n5 @: m) O( K% V9 y, X- U
struct Worker where
. d& F% _; R- d6 p& D{
/ n9 p/ Q! y; R2 S: X    _id: usize,2 T; l0 C  I3 q
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
& \( Q) q7 h' [}: b9 e/ z- Q. Z6 t" b

# P  Y% x% j( \& }+ ]impl Worker2 y; o, s! T9 F9 u- u* R5 P0 v
{
# m: \% k# N0 \: G    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {. @/ h* m# L$ r8 u' C. |5 p. K
        let t = thread::spawn( move || {* D7 ~% |: }* t) G, c' F4 C$ `
            loop {$ c- }1 \% M0 l, o' y7 }4 U
                let message = receiver.lock().unwrap().recv().unwrap();
. }4 S) R4 h# F+ m# V! D9 [                match message {) p+ C; y) T7 V' J; q" [9 H5 B* U
                    Message::NewJob(job) =&gt; {
$ x! S. Y! M( N! m1 m6 `                        println!("do job from worker[{}]", id);, K9 K( K. O0 D% R0 \
                        job();: {6 B6 D5 w) V, A. D' x
                    },. f# ~+ T1 u; |
                    Message::ByeBye =&gt; {
4 g* I8 z( P+ x1 L1 D7 F2 a/ r                        println!("ByeBye from worker[{}]", id);
7 b- ?% M( F, `/ a                        break
% f' d# t9 X  x, _' \                    },4 I( {7 I) j7 U6 t! j5 i
                }  
/ v7 o3 T  C# ?9 G8 Q, \# F% O            }
3 R7 F% b8 N! h! p        });
+ y; U/ K1 ^" Q7 n& }/ }8 C! l" t; X  V! |" H% g5 b5 L, y' ]
        Worker {" }/ y* S* ?- r# Q
            _id: id,) P) D2 F, E) z0 K
            t: Some(t),! X1 O3 b# X' R: Y: _2 l. J
        }
3 J) j" X- w0 z6 p) Q$ k0 ~    }0 I$ i# y# d- H5 O& e/ x. b0 k4 e1 I
}) {0 q8 C' j+ _% W

( C) b/ D9 l8 Z1 N) Y# q. `: a* {& upub struct Pool {5 a( p0 i* N2 q: x2 m
    workers: Vec&lt;Worker&gt;,0 i4 x0 y& C$ N# k4 g- T8 h. g
    max_workers: usize,/ ^3 O3 {& |. s8 M. N9 T
    sender: mpsc::Sender&lt;Message&gt;
' @: u! R6 X5 p}- ~( }8 _5 q0 v# p

6 H4 c9 H3 f4 {% I; i, B% Fimpl Pool where {
. s  c- A5 J$ }, n    pub fn new(max_workers: usize) -&gt; Pool {- G. G$ e+ B- J. f9 N
        if max_workers == 0 {8 E9 n8 M8 g, R! W4 J
            panic!("max_workers must be greater than zero!")
3 |% S3 `+ [4 Q1 |        }. D: e8 p* C% f) T  i' d
        let (tx, rx) = mpsc::channel();
* Z# U, |1 t2 F4 [: _" I1 M4 I6 Q3 l
4 Z: q& f5 \/ h! A7 e" g% g        let mut workers = Vec::with_capacity(max_workers);
) T. e1 a& l8 }( h* l2 r        let receiver = Arc::new(Mutex::new(rx));+ s9 ]! J* B( ~1 L7 r  H4 e* H
        for i in 0..max_workers {
, X* c2 a4 C+ c            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));' R( I" F+ x* T, B
        }7 Z  ^# [3 a: A& @# D# J, j

0 c4 ?' p6 P+ l        Pool { workers: workers, max_workers: max_workers, sender: tx }$ Y: l  ~6 \) i: J
    }
" b/ |$ H* I$ J   
, M" a+ [9 s4 M2 P6 x7 B9 i    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
4 q9 ?( d: h& B/ L    {9 Y1 _. V4 N1 }/ S# A
1 `0 M+ `" I# P! I4 C; m  Y3 X
        let job = Message::NewJob(Box::new(f));
, Z; k! P1 E7 Z        self.sender.send(job).unwrap();% s8 i' @/ d* w/ k  B" }: S! [
    }- N$ [# t: ~/ d* M" P
}" V7 m4 X1 |+ u

, a  w9 A( q- m0 Vimpl Drop for Pool {- `/ f8 N5 z8 a/ j$ s5 q
    fn drop(&amp;mut self) {% D+ v% I8 b0 l4 B) A
        for _ in 0..self.max_workers {
6 ]) Y  O+ e4 A/ H2 ~- p- h            self.sender.send(Message::ByeBye).unwrap();
) p) G. X4 ]# h  z. |5 V        }
6 b' }. J+ p% i+ i$ o9 C        for w in self.workers {1 q5 k8 P3 V% m% M. ?2 f3 L' Y/ S
            if let Some(t) = w.t.take() {
1 D% w/ a  y" i                t.join().unwrap();
- |' V4 k0 e$ C+ ^            }
! h4 `* V6 F5 w5 o# a; I        }
2 F, [% E9 u$ b* a# L    }3 r; l+ Z& u' w. g" N" |
}
0 s. t8 a: g, I# S8 S  r) W) u- D6 `+ q: S) l7 z( r. i$ U

. `0 x0 X* E1 P/ C#[cfg(test)]- K) w1 F& a* A5 U* C2 L, J9 O9 i' S
mod tests {* A# T; M& `4 B+ G
    use super::*;
# B( F6 B( s0 @9 s    #[test]
9 [% @# u& l6 G" d& z2 Q    fn it_works() {% p; w& e0 v: u+ ]- }
        let p = Pool::new(4);
$ j! z8 W) J# ^        p.execute(|| println!("do new job1"));
0 n% I2 H9 u. z5 l        p.execute(|| println!("do new job2"));$ B/ y" I6 x6 j9 Y
        p.execute(|| println!("do new job3"));% _2 G) R/ U; C, |2 E
        p.execute(|| println!("do new job4"));
. e* p* V% H- ]    }; X! a' ?" O- f7 U4 ?( o# ?$ |
}; f+ R3 v# F0 N: e- }
</code></pre>
+ j3 v* o, }, _0 [. Z+ j1 u1 l/ p
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-7-1 11:23 , Processed in 0.404046 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表