飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14666|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8040

主题

8128

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26450
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
# e+ M( ^+ g# t8 o
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>1 \$ B* C! v" V4 X
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
3 l# W0 p/ `3 E, w, v; a<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>) T! m: t! c) b
<p>线程池Pool</p>6 q1 e- c- N4 E7 w* @
<pre><code>pub struct Pool {
/ z! @1 |3 L+ G% I( \0 b" G' c  max_workers: usize, // 定义最大线程数$ B; C1 x$ o. I/ V& @0 p& Q& k
}
# `* Q* r) t' i# w) Z# G! H  y
1 P; W" p; J: C2 E7 d; Rimpl Pool {
- ?8 A4 c5 X# ?4 }4 P  fn new(max_workers: usize) -&gt; Pool {}9 a+ I. Z0 n; ^- o. z' t2 M+ |
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}  k% n+ Y! ^; F7 k4 d- T2 D
}
' `, \3 [: q* y. `4 ?* g) X, u! I# J, Q2 e( c0 a: ]6 S! n+ K* B. j7 K; O
</code></pre>
) a' f4 I& C  h7 }1 |<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
& X! M) U8 I1 y3 F: F) ?5 }<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
3 `  P' s8 b0 K7 d' U' H可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
9 o/ f8 y, A, l<pre><code>struct Worker where+ B6 V2 o/ Q. J4 r  W7 r' w
{
7 q, j9 t: X8 K$ b+ \; ~) C* S7 y    _id: usize, // worker 编号
, D0 H" w9 _& \0 _}
7 U2 n* l; z, T</code></pre>
1 p) a# C9 C5 Z<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br># r3 Z( }  ?# U# W8 ?; c
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>3 d5 k, j9 B+ J
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
& `8 v2 B0 _! a; x4 ^<p>Pool的完整定义</p>
5 a# Z/ X% @, H' V<pre><code>pub struct Pool {
6 P* W; x/ l1 Q/ b) [  w; }    workers: Vec&lt;Worker&gt;,  x; b5 u. y6 O2 `6 M
    max_workers: usize,: Z" r, P- o' E/ F* O, F% t
    sender: mpsc::Sender&lt;Message&gt;
! }: R# W: c! I$ s. S; d}
* X( m" k( F$ H4 ~: o</code></pre>7 f( s' |) `, f! c# ?
<p>该是时候定义我们要发给Worker的消息Message了<br>
# Z( L* U1 K2 H. P. L! ~定义如下的枚举值</p>! @  @$ Y7 I% L" @  K- o
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;  n9 z. l+ I% `0 E0 |
enum Message {
. N5 I3 W& [( @/ b    ByeBye,6 q! o2 S5 S6 p! ^/ Q1 Z
    NewJob(Job),
6 F0 g- D; H0 s}, |# F+ ~; e0 A8 Q
</code></pre>9 _6 ]3 Y7 I# q, c
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
7 T8 `! V4 S4 C- D$ Y1 M<p>只剩下实现Worker和Pool的具体逻辑了。</p>7 R  ?' e, J) B* K  t; Z, t
<p>Worker的实现</p>* [% {, i3 ?% _" d0 W
<pre><code>impl Worker
: \7 y+ H% @8 v) [' `{
$ [! ~9 s; q+ b( O    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
8 n/ o4 G& j; s/ t* _( m( p3 b8 N, ], L        let t = thread::spawn( move || {
) w* Y. O& }4 u1 O, U& g- Q# `            loop {
7 y' r' x& O4 D9 C                let receiver = receiver.lock().unwrap();/ l# s! k7 @3 P: X' `
                let message=  receiver.recv().unwrap();+ B. I8 B% C9 d  d# I* x
                match message {
2 G0 u0 I: q( F' E% w) w9 U                    Message::NewJob(job) =&gt; {
4 |4 h: S; g8 B( ?/ D                        println!("do job from worker[{}]", id);
0 P7 s" L+ v' h: m! A                        job();
* W$ r5 Q6 o5 a( T/ F; T3 N                    },
  k% w% |& @/ c3 k                    Message::ByeBye =&gt; {* M9 C3 v6 z+ F7 v/ v' i9 G, @
                        println!("ByeBye from worker[{}]", id);
; }% i8 \3 S1 P3 I% v$ z                        break8 u( W: Z. c2 @% {/ r
                    },) M; d0 D- e$ ^# k+ T: ~
                }  
$ q/ f* u0 N) @( f' G% _, e            }; p* H* x( r: n' T$ G
        });
  I3 G0 U" O/ j3 n% N
8 F9 k4 V& ^) J; q, Y! T0 ]        Worker {8 i# B+ S+ o1 k- f
            _id: id,8 c# j6 K" f- H4 r; j9 D: c
            t: Some(t),8 o5 H5 E! i4 e
        }. C& u  q- W0 u8 P
    }0 F/ e  l) p$ L: w, r
}6 X, L7 [& y  Q; [& A+ j
</code></pre>1 H) k# }; N! j6 P$ e' Y
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>5 B- E# A4 n0 G6 u" m
但如果写成</p>
" {3 `, ^$ t' G6 I9 J- k: n<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {; v3 Z' q; s0 y
};! N, m0 `$ ?, ]9 c/ _. P
</code></pre>8 ]+ p, [. @& W: s0 ^
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>: P* l5 X' q. }) N$ m
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
2 @& u9 D; ~& a<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>% f  `1 s2 J/ U2 ?) h. S5 v
<pre><code>impl Drop for Pool {
0 I! w# Y( e5 k( Z% N    fn drop(&amp;mut self) {
7 m+ A) D3 J1 U4 j* d        for _ in 0..self.max_workers {; t! S: C% `/ T. V9 [8 Q
            self.sender.send(Message::ByeBye).unwrap();
& B* e. v8 A' m( l9 F8 X! Y        }
/ o  p& `8 N1 Z* G5 Q1 n/ n+ c6 Q        for w in self.workers.iter_mut() {
# D* h: o6 o3 V            if let Some(t) = w.t.take() {
- j& H$ ]% Y! a+ X                t.join().unwrap();0 G4 d, y& u/ ?) @
            }
( I, ^; c! O( p% R/ j! E4 m0 N        }6 \3 {9 o& o1 o" ?( }
    }' K0 S3 C# n$ a  s* m; B; ?8 H
}( f0 H  ^3 Q1 {6 U# A  b
) z2 f( L0 i+ o: ?6 D8 f6 q
</code></pre>
$ S5 g& o  S4 T$ [4 {1 [<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
/ ^1 d& c  A1 P$ ?& c: w$ T<pre><code>for w in self.workers.iter_mut() {( y& k% l: L8 I
    if let Some(t) = w.t.take() {
6 Y/ @. _2 R3 Z' d        self.sender.send(Message::ByeBye).unwrap();) }: M4 c# A* c5 G8 z
        t.join().unwrap();
2 z% E2 _6 v" }( W( r    }
% c% h+ i1 ^- a: S- C7 I}5 }3 v4 s) s9 G3 c& P

3 ^/ S& Y4 z! |) e' P</code></pre>: t2 [! h; x# N  }0 F
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
: E. Y/ e4 K! e; r我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
2 u8 U9 B% E5 l<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>* j/ ]& A! z# g
<ol>
4 c5 k# T! U# b$ A5 H, H<li>t.join 需要持有t的所有权</li>1 O4 E- M, r' e& q- u7 p4 M
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>+ h; k- P6 }1 k1 W0 h
</ol>
' c( L, O# X2 L! K' H3 T, c<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
% k4 x$ L- s% I, i8 P& _/ E1 \1 _换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
# X" [$ x, j( i% B<pre><code>struct Worker where3 G+ W* D3 }: y  V
{
; f  v5 ~* A' T0 a/ ]% v9 V    _id: usize,2 y0 w  {4 E1 x3 {1 K  X: d# `$ ]
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,- M7 g3 ~, ]0 B; W
}$ Z) _; O; C3 x6 {4 M+ k3 [
</code></pre>
9 z# K5 X) @6 f1 f8 W, G9 Z( U9 ^+ J6 D<h1 id="要点总结">要点总结</h1>
, s; J: K" c) ?6 k) k+ \<ul>
0 y1 a2 m& ?( b+ ]+ Z1 h- _<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
5 c! d3 Z- |; |& k  f& j! p<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>1 @% P( E* m7 L/ ], c4 R6 m( J* [
</ul>; [. Y5 ]! r. V: z: W
<h1 id="完整代码">完整代码</h1>
) u, c$ t' I1 v, s- S% D; n<pre><code>use std::thread::{self, JoinHandle};
- l8 P, \) o1 i) [9 i( p" y# Nuse std::sync::{Arc, mpsc, Mutex};
, ^" k. b# f2 t, y  s! `/ N: g1 i: U% l* s7 V( p+ L
5 {4 O# r+ J' Y# ]( B! i
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;0 G- G2 d' j) w4 G3 N" w
enum Message {2 v3 D4 i4 N* x
    ByeBye,
+ f, s5 e# N8 u. _    NewJob(Job),, e* |, H/ H9 Y% G1 W* c& T
}: k8 \! H1 b$ M* ]

8 w0 ]) E# \. B& hstruct Worker where
4 X2 y7 A, X0 E& ~{
2 z7 @8 F4 O+ q6 s    _id: usize,
6 E5 Z9 I0 f- }* o    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
% c9 K1 k6 P2 q}; s0 Y+ T, K: i( n3 x7 X

8 h8 j& q" H0 d" ^3 L2 v8 x  p& Jimpl Worker
( ]6 V4 O! o7 ]8 M6 B! J7 ?+ N{. u; U, V1 S7 b8 q8 G6 d
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {- Q3 }  Y, x# _- t
        let t = thread::spawn( move || {
+ O& f8 S) m% n/ D2 f            loop {
5 f" d0 U( B( ^0 @                let message = receiver.lock().unwrap().recv().unwrap();
: q, b( y& V. h2 K$ s6 G8 d1 g                match message {
/ A7 A# j3 f$ Y9 M                    Message::NewJob(job) =&gt; {
3 S& M/ b4 k, o, d$ R/ i, ?) z. {( ]                        println!("do job from worker[{}]", id);: G: I" s) P6 K
                        job();
1 w# H. b6 B- t+ ^                    },, @4 v3 a# {' Q2 G7 I5 H
                    Message::ByeBye =&gt; {
- ~3 ^  T% ~$ `/ d. O9 e                        println!("ByeBye from worker[{}]", id);
# s( n% A1 |; `7 j# \                        break* d; {' F5 H& {2 \( _$ Z2 Z) k& r
                    },
( s6 L7 L9 q/ k+ z0 b7 [# r5 P8 }7 D                }  
! J, y# c( w* q            }' s! Z2 ]" H$ ?; c; j. E6 |* n
        });
$ [! u" x7 S# R' `/ O
3 `; |( Q( y/ K        Worker {
/ e7 M& m+ b- G' l. \! e& V) O  t& Z            _id: id,  v# I. a) \4 S$ l; k4 ?! Q
            t: Some(t),# C" l. m% L3 ^( y/ i
        }
" M: g( X* |/ Q! h    }% P& X, w7 ^9 p4 |. B: s9 f& g1 U7 o
}
3 c/ h& p; W& a/ O! G, b6 a* F* W
+ V- h% Y+ F$ A5 S2 g( V' epub struct Pool {5 M2 D, v- {* v' b
    workers: Vec&lt;Worker&gt;,
# `: B9 ^9 m$ z/ @$ v    max_workers: usize,, m9 a# e$ l, A$ ^* G
    sender: mpsc::Sender&lt;Message&gt;7 o" B$ W9 V9 p  v! g
}7 y. o9 m8 Y+ C* M

7 u* H8 I  B8 ~" H: W7 t/ W1 w; _impl Pool where {
8 h# N% H. t1 Q4 a  V* }( b) E    pub fn new(max_workers: usize) -&gt; Pool {8 p! O8 z, c! v7 T8 ?9 Z8 ]3 o, L- A
        if max_workers == 0 {: x" {- O' E0 f) _1 _! g
            panic!("max_workers must be greater than zero!")/ b/ N  R% t6 }+ ~7 |
        }; F" W+ u4 @' ?2 C6 i8 d4 ?
        let (tx, rx) = mpsc::channel();  W9 j4 u% w, N" z

. R; J/ ~% Z1 |+ z        let mut workers = Vec::with_capacity(max_workers);
/ M" x9 j! B8 P        let receiver = Arc::new(Mutex::new(rx));
5 X, L' G% e0 g" W, N        for i in 0..max_workers {  {( ]+ Z1 f: W2 c+ ^
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
4 L' C6 s( y" Y( K1 `/ A8 D        }
# l! }" A( s) L! r
" }, A) _) S! q: M+ y2 L        Pool { workers: workers, max_workers: max_workers, sender: tx }' V2 o" w/ x; X, ?0 ^3 K
    }) B4 c2 S$ g. q! Q
   
8 n( A  x9 [; A    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
2 N9 w  f" p5 t    {
9 m2 i; v) s" b+ R" H+ P% w' d$ x% J( i! C& z& E# e
        let job = Message::NewJob(Box::new(f));1 o! V! u  _, `9 K. o  C. H" A
        self.sender.send(job).unwrap();
; A  \! l3 z" p8 S& u    }
/ z( T3 C& U* m/ I) N$ ]5 Q8 o2 s& u}
. |, Z" P9 @; W; c" m
, d9 |  Q! R/ ?* I; k0 zimpl Drop for Pool {8 }% f5 N: }$ _5 l5 D5 s9 P
    fn drop(&amp;mut self) {9 U8 U3 C1 O3 s+ @6 d; d  c
        for _ in 0..self.max_workers {
" r9 N. g$ e" M7 k            self.sender.send(Message::ByeBye).unwrap();
% O! K7 L9 F! Q1 r1 X. v! b7 e        }
% |7 b+ h* m$ U        for w in self.workers {. \, F* T& P. [; e! D& l9 ?
            if let Some(t) = w.t.take() {
7 p, b2 x3 i) d* I. n                t.join().unwrap();, ^* o# X  V7 a! G/ Z( l3 U+ S0 i
            }
2 k9 s1 o5 j- m5 C' ]9 \/ p        }# ?, @# j. X2 c  r* N/ a
    }# a( h& g- U* H% L" F2 T- q  w
}4 c8 C9 Q  c' u, A; p" h
% i. b; Y/ W6 J3 y2 o: v

7 F4 K$ e2 q. i( I. ]/ ^#[cfg(test)]6 A" S2 y% Z; o# h
mod tests {
7 t6 Y. w0 ^! R    use super::*;5 ]2 P, ]2 A+ }2 c
    #[test]- c& \6 B2 I* s- c7 S
    fn it_works() {
, g9 t+ U8 B2 u# b+ a4 B        let p = Pool::new(4);; f. O8 ]) a' U1 L! C7 c, |
        p.execute(|| println!("do new job1"));
& q) a- {; \* G. b: y        p.execute(|| println!("do new job2"));
' D. u0 [8 W3 X1 @) A        p.execute(|| println!("do new job3"));
1 Q- P! Q. ]6 E" f7 d        p.execute(|| println!("do new job4"));
( c4 \" @8 a  u, p+ h' o    }
$ H/ z1 [( y$ f6 Y, g# P9 }+ `  ^}! o5 d( ?4 N; Z+ j0 @, g
</code></pre>
6 {) Y0 a- ~6 q0 t/ c
1 c- `6 P5 S6 Y% i( }6 A) @6 L
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-6 17:46 , Processed in 0.066648 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表