飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14755|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8049

主题

8137

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26477
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
$ V& j' K, k0 e. h) b
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>% d5 a1 i) e0 S6 f% J- H
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
+ x. g) _1 S4 D4 r0 S0 g, R4 K<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
- Y; W( N* L/ l% k" H( \' O<p>线程池Pool</p>4 e0 ?. T. |/ k" ]5 M! g: Z, c; Z+ P
<pre><code>pub struct Pool {! C5 h! j5 C& w! |
  max_workers: usize, // 定义最大线程数
8 B5 C" L! v# B. e% _( E}! a& x; g6 S) n5 Y! s$ d4 h

# c4 `9 T6 K. w, A# Limpl Pool {$ O' @0 ?$ X' t, C! W. E/ T
  fn new(max_workers: usize) -&gt; Pool {}) U1 v# o1 o+ k/ V. ~
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}- o* `0 ?1 _+ G$ U  _
}
. R4 g* `' f4 Z
4 O4 R( ^2 h; D* p9 c$ m</code></pre>: D% I5 _- G5 N! _/ k
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
( D  ]( _7 L0 `2 U. E<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
" L4 F5 ?4 ^( J) F5 @0 Q- B可以看作在一个线程里不断执行获取任务并执行的Worker。</p>- p" }+ ~( O0 s- @; Z; C+ T. [  r
<pre><code>struct Worker where2 l0 u% V& @* i( o0 y$ }
{0 f& y$ m* K9 W
    _id: usize, // worker 编号
+ v5 M$ H; l; v0 D  h. [. V}) Y1 K" I3 s: u' P* x, }
</code></pre>
# m! p" _% B4 U; k  k<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ W) }0 e, G8 @" {! d( ?' T8 Z
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: e0 y7 X3 @& W9 }7 v% n! i* s<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>9 v: k0 s  c' l" R( B
<p>Pool的完整定义</p>8 n$ `1 S8 \/ t1 K( Q& V
<pre><code>pub struct Pool {1 R% J& I6 L0 b( q: A
    workers: Vec&lt;Worker&gt;,
, A7 I/ Z7 C( ?/ u$ J' u. {    max_workers: usize,4 w6 v! Y; j# ?9 r! L' [1 S/ {6 {
    sender: mpsc::Sender&lt;Message&gt;
/ k3 C+ F0 q1 L* P+ t+ F) s}: Q8 F5 J9 b" T) Y. m* W/ P5 |
</code></pre>
5 _) C! \3 o! ^  t. Y% O. r# G; z<p>该是时候定义我们要发给Worker的消息Message了<br>
. P6 G5 Z6 y. h, |定义如下的枚举值</p>, ~5 t' Y0 a7 h! A; @9 Z5 M
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;9 v0 W* B7 T$ J  X# J$ M' \. r
enum Message {8 F) s. i6 O& s. S! ~! P+ c) J
    ByeBye,
, n" G/ _6 w0 b' }    NewJob(Job),; b% z# Z8 a. P2 v" G
}
6 U, I* c0 ^) f+ X</code></pre>
4 o! H& Y# ~" b6 p0 G<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
2 K  m; x# n: y( s1 ^1 M' f( _, O<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! e% B! Y' @  C+ ]0 M  [1 {<p>Worker的实现</p>+ E$ ]$ x; }6 {# r
<pre><code>impl Worker
7 t" w0 Q5 r4 c: ?0 S# c. a' _9 S{
8 }- V3 k  x, Z  B2 W6 k" `    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
. ^2 c9 M) z% F6 @) ^% w' L0 N        let t = thread::spawn( move || {
3 S8 M% l' Q* V$ @2 f1 r            loop {; g, t1 u% o& \& B
                let receiver = receiver.lock().unwrap();+ d- ?& V1 f8 @6 I( f; I/ ~2 F
                let message=  receiver.recv().unwrap();
) r  Z+ x4 M! g                match message {
$ l( y( X" x6 i, O* B9 O% ?                    Message::NewJob(job) =&gt; {
0 E0 ~/ z8 Q; q. i- R2 W( \                        println!("do job from worker[{}]", id);. j' L! T$ g0 j8 {2 [
                        job();& A2 s. e, p' `$ H/ G- T. }
                    },/ ~8 m3 j6 s' k. u8 H
                    Message::ByeBye =&gt; {% }/ U; V& I$ [) b2 l
                        println!("ByeBye from worker[{}]", id);/ J6 c9 d7 z/ }$ a/ O3 ?
                        break. W( L. y% p1 B  w* b0 W
                    },- W: S4 p* A  S5 v
                }  
" Q4 |4 n8 }! y3 \' C  b            }, G6 O. e1 C; m; e- I% L) v
        });7 G3 O. I+ v6 H9 M

" V, e1 Y- R# r) j, {/ U  `        Worker {  t! H- C* \5 l  P  ]
            _id: id,) }2 Q4 G3 t2 A+ P: |, U
            t: Some(t),3 p% }2 b# F/ D
        }
5 p0 s7 i4 V5 O- {    }' l# O6 `/ f# [0 W2 y- c
}" h7 V  U9 P% |6 ]2 @* y
</code></pre>8 \! |: q* ~0 O. T9 `4 p5 x
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
5 R) c) D# \; Z8 D7 T% N但如果写成</p>" I" I0 _1 D, t! ], q; ]
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 i+ ~/ ~" h8 M  B3 e. Y! U};
3 ]: K- v( r6 @/ a6 g4 T</code></pre>
+ E+ a5 O) X/ P1 \- A8 J; K% q<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>/ @  n9 _( E# O
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
& B& f" z+ q/ Q3 Q1 }  x<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>* Y3 l5 {6 d4 q7 h! B9 P
<pre><code>impl Drop for Pool {
! ~6 H  O# @/ P$ Y0 K    fn drop(&amp;mut self) {( z+ K  X  P! v$ H0 E$ H: _
        for _ in 0..self.max_workers {
# |* u4 ^7 [3 R8 U6 ~, w7 \. A            self.sender.send(Message::ByeBye).unwrap();7 \8 f9 i2 U( x8 ]% J
        }
% _/ L3 W( v" g* J. ]( C) y        for w in self.workers.iter_mut() {
* r0 ~- ]/ n. a6 r            if let Some(t) = w.t.take() {4 A  z# m& ^7 E& _
                t.join().unwrap();5 N- L" A+ i0 ]& ?$ o/ c
            }
4 g8 f0 I& H( N2 n3 L        }* `( R2 j) E+ g9 h
    }
: X/ X! ]+ E* g/ v}2 N5 T, w6 c$ d+ M+ s' e

# E4 n9 S) J: f  Q0 z6 v- i) m, g</code></pre>/ S) s. K$ e( j
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
& c8 z# s$ s) t# j1 {# [<pre><code>for w in self.workers.iter_mut() {
) v. u0 n+ U! ]8 Z3 M    if let Some(t) = w.t.take() {; Z7 v) c' j. }( p# T
        self.sender.send(Message::ByeBye).unwrap();
& e. ]" e5 r! S1 k) f        t.join().unwrap();
( Z: |7 R) N& u  j0 m2 k( P+ r  e    }
/ Z* y( |! |+ X5 Y7 Q4 A$ M}. B8 C# c6 C2 \: H- f- G

0 _. \3 q7 N9 w) d</code></pre>5 L" E. h$ x$ Y
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
# `9 Z! t2 a/ W. A1 A+ }; b/ a我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 T4 q- g0 i' ^$ c; a4 d<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 u6 ^$ c  @6 {<ol>
7 `1 z, m. ?) S, P# v  r, Y- k( X<li>t.join 需要持有t的所有权</li>- L2 `6 i$ q; b8 a0 I' X$ q/ N
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
; }7 L( E# H! i9 D+ T8 y</ol>" z7 U9 [) u: I/ i8 z
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>' y( J4 Z" m1 C
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 c& H4 Q$ j. F4 o<pre><code>struct Worker where  ~: y+ X+ O! h5 B* p
{7 g" g2 ~( t4 O& f7 ]" ~
    _id: usize,; x! a7 n5 o& {8 ^' K& D3 M
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,. g6 b% |- I4 r% Q. T* z8 l* A
}0 k; V- Z( q! G; L! A. {
</code></pre>
, ?! E7 l3 s/ ?<h1 id="要点总结">要点总结</h1>
, K* v, L: m' ]# M# ~, F<ul>
+ b$ V  y: s" ?/ o<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>: M) F" t( j0 s# w$ o7 H
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>$ \" G0 J' b, A" n8 [; E: X
</ul>: S# b) L. G% X6 D& T8 z
<h1 id="完整代码">完整代码</h1>7 ^  o6 h1 L( E$ N# m) P# ?  g
<pre><code>use std::thread::{self, JoinHandle};
* _5 d4 `, K6 Ouse std::sync::{Arc, mpsc, Mutex};7 _- E$ \5 F! K) N: X
. o9 @/ X0 y5 p$ @1 A& Y$ G. t
, r  B  f3 S' T- l. F: ?1 z
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
& G5 ^7 c4 x9 B  _  Q. b( Benum Message {
6 d; a0 f% a9 ]9 \7 O3 W% G    ByeBye,2 ^8 x0 Q2 J7 b
    NewJob(Job),
9 m' b$ _) J  `( M/ T1 ?}
% X% I. G9 U: T! d& {1 C9 d
" t1 [5 z3 K% R- i$ nstruct Worker where* ]; R6 n: B. M) o+ p
{
! Z: Y; q3 c' @    _id: usize,/ o6 B1 }2 V7 W% m
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
  a8 X9 N0 r4 x* \4 \! n2 V  Y2 J}
  R# B' S' w4 t- M! y/ y# H5 V. g" r; D
impl Worker
; n5 U' V- w: k% P% `{
- q, g6 z9 N5 F8 F8 T8 n0 X7 c    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
  _; a9 c) U% c$ v- K, u        let t = thread::spawn( move || {
2 Q- A7 B+ n1 A4 W/ n            loop {
$ V* k" F/ f* m7 z+ r. B3 i* b                let message = receiver.lock().unwrap().recv().unwrap();
- l3 R* o9 g0 ~  z: b% [                match message {. `; Y( Q2 ]* `: A0 F  B
                    Message::NewJob(job) =&gt; {
- R6 @( J# P$ z- i8 g                        println!("do job from worker[{}]", id);
# ]% f1 c7 I, D0 i! W" s                        job();
% q9 }: K3 ~3 H! t, U% L: i$ g/ u8 R3 [                    },
0 ^- L/ X  b+ O                    Message::ByeBye =&gt; {
' W" |" Y/ z) g8 s0 E                        println!("ByeBye from worker[{}]", id);
1 U8 h" U5 R1 i7 b$ s                        break0 h. b$ ^* e6 s0 d% s  t/ a
                    },( J4 a5 `0 f9 f* j/ G. R2 `
                }  8 l8 U4 ~/ A  a; L' _( d+ q
            }
/ R9 z1 G) n4 g( q4 X        });+ j2 c2 F+ S: L8 e! F
3 O! H2 B3 F- r
        Worker {/ I  i7 u* M0 ^- O% w: ^- x
            _id: id,, f. Y. B. M: B9 `
            t: Some(t),, Z6 k. @. h8 I8 Z& ]
        }; b* v: p& ~( d8 ]! b
    }
5 y8 D/ x0 T3 d4 q" h. O}
! m( R+ U* r9 _. Z
" R+ d" L, o& gpub struct Pool {
# r0 X+ k+ c# C    workers: Vec&lt;Worker&gt;,6 x6 K$ c6 v. t0 ?" K& j1 U
    max_workers: usize,+ I  O; ^) v9 p5 ~9 q: R1 k
    sender: mpsc::Sender&lt;Message&gt;
9 I3 }8 D. ^* k2 x}& b) d  g$ S7 e: n' v: k& d: J: q0 S

, g6 h1 \% P+ |- f3 l" K/ t4 `impl Pool where {" R9 W; Q6 x3 p2 G  a
    pub fn new(max_workers: usize) -&gt; Pool {
4 U- n  t% [/ A! s. y) r6 ?9 z        if max_workers == 0 {
" W. P. l  T. K) X1 u1 V% `- W            panic!("max_workers must be greater than zero!")5 O2 U3 p; u3 v4 @
        }
; [2 T2 x2 ]" h1 Y, s4 P9 h1 i        let (tx, rx) = mpsc::channel();
5 \5 G" f3 W3 |# g
% g- e4 O$ D7 g) g; K1 ?        let mut workers = Vec::with_capacity(max_workers);9 F; R3 K7 Y& E7 R" [" ]: a0 r
        let receiver = Arc::new(Mutex::new(rx));0 b# ?. N/ u3 {2 r$ P3 K3 h' {
        for i in 0..max_workers {/ Z2 T9 I) Z3 B% L
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));* C* I3 c3 B  g' ]# V3 |/ a
        }  m  A% {# H6 W" D! `3 _5 X
4 w. N6 K9 C$ `5 n9 j2 T) @
        Pool { workers: workers, max_workers: max_workers, sender: tx }5 [4 Q# Z+ o! ]4 u
    }. N" l" T$ X4 c$ l
   
9 c5 }7 Y/ h- `& y8 m8 p    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send$ B% F6 [0 n! c5 B2 M# R
    {
3 x6 v: _  K+ {! j! Z8 n- |! D+ l
' c0 f6 M0 @+ A  `; x( z7 A0 w% q0 r) ]        let job = Message::NewJob(Box::new(f));$ x/ w! X* H0 C/ A) T( ^+ r( ]# T8 i
        self.sender.send(job).unwrap();5 g$ d7 ]' Z$ q. r* }
    }
# K5 S& r9 Q2 j3 ~9 M! `}
4 b" Y1 J2 t. {4 D
" [' j! g6 G$ Rimpl Drop for Pool {
4 ]- d& A) V. \( J7 m) r    fn drop(&amp;mut self) {6 W$ d8 I+ K3 ^+ d, `/ }2 O4 U$ n
        for _ in 0..self.max_workers {2 c$ M$ {% J/ a, v3 f
            self.sender.send(Message::ByeBye).unwrap();/ ]( S. p1 U7 w6 z; `6 L
        }
- w" a7 p9 M5 m$ X' F        for w in self.workers {
+ T6 j# X, n6 ^- V' z            if let Some(t) = w.t.take() {
4 @+ k: O  J' B! A                t.join().unwrap();
# r# y" f9 A7 w" ]            }
2 N) p, y4 M. ]9 }# D6 f        }
; p) x7 Z  i# `4 K& h$ B    }) R" ~( A& Z+ V6 Y# A
}
* X% q" x6 P5 u
; N, X  a6 u$ D+ O' |6 w
6 K' E( a0 |! X, A#[cfg(test)]- w/ P3 l' Y5 G9 B
mod tests {
) q: a! p$ F: n1 N    use super::*;
; M; y; f/ I+ N    #[test]
# P8 U1 `$ Z, Q* P; j" l    fn it_works() {
% b9 e8 s* J: W        let p = Pool::new(4);
0 P# ?  T/ \" Z4 i        p.execute(|| println!("do new job1"));
4 G8 A: f7 D8 ]2 z9 E8 p        p.execute(|| println!("do new job2"));
7 G, ^8 B7 C6 N' ?        p.execute(|| println!("do new job3"));" V) y0 u, b- D; @! u
        p.execute(|| println!("do new job4"));( D; L3 X0 O. h! O. J, E1 @
    }
* c) V; a7 H$ F; e+ s6 i2 f: t}% n; u: S/ e9 J/ S% m. v& R
</code></pre>
5 A0 v# a( W$ Y' w4 J
5 D6 i/ z1 I# N( K( ]' i
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-10 22:16 , Processed in 0.119390 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表