飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14605|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8032

主题

8120

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26426
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
1 r  J9 E9 ~" N8 o# {( t) a2 S
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>! f' I+ _& u3 v: f1 b
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
* D  h9 k$ S4 ?, b<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
0 _8 h5 Y6 q7 j, n7 J& K+ s0 t<p>线程池Pool</p>) Q2 g3 |/ ]% P! m, e! k
<pre><code>pub struct Pool {
! K. |; K' e( F  max_workers: usize, // 定义最大线程数2 K. l- ]( ?; P8 v  \2 M$ O
}
/ n  c2 u; q( x2 r' M; e4 `8 I: C" U: z$ G8 v3 e2 z) X1 q' Z
impl Pool {
. ]4 h4 [1 i; M+ c& D6 Q! c  fn new(max_workers: usize) -&gt; Pool {}/ y7 z. G$ s/ x( A2 S
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}. y8 f9 w" y, {6 t
}) c; N8 p7 J, @2 [

% S" w$ d& O6 i0 @0 M+ ~</code></pre>
0 J- T% S; q3 b7 A3 H<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 D/ y4 n# B5 c% ?8 S7 t" T+ M<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>% h2 G5 B. W7 F1 b4 l# J3 y
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>/ G" Z  l' r/ Z8 ^, _
<pre><code>struct Worker where- S# _" Y7 F5 f9 A
{* E) ]+ s  f( `: _0 h* h" Q
    _id: usize, // worker 编号' h3 R+ g( z3 s) D
}
6 h, ~  e( b; x. L3 V</code></pre>' C2 i' V( b3 P8 o
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, \7 O  B$ k" l# c' d' n
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
8 T% @1 Z5 b- h! D# D<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>! w; j. R% {8 v, T6 d5 W6 ?% y
<p>Pool的完整定义</p>
) M" j5 n& n+ z9 @0 {$ \, J<pre><code>pub struct Pool {3 _$ X5 Q. G1 C9 H& |' z
    workers: Vec&lt;Worker&gt;,& Y: Q9 h0 w, o' o8 Z5 ]
    max_workers: usize,
: a4 t$ ~$ ?% }3 ~" z    sender: mpsc::Sender&lt;Message&gt;
; L1 Y* ~; U! a: A" w# ^5 w5 ?! o1 [}0 J1 d0 U$ m- b% L6 G
</code></pre>
3 _! j0 B6 p( [+ Y, l1 |4 g<p>该是时候定义我们要发给Worker的消息Message了<br>
' i; Q' O3 s: k' ?定义如下的枚举值</p>& G$ L* y. W4 h
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;* T  V/ D( ?$ p; M
enum Message {
: Y! I- g7 D; L# m+ J    ByeBye,
* W+ M% {0 h6 b3 F/ y/ r) K    NewJob(Job),
# X0 [5 M, ^* |}4 ^* H' }( L/ x# p8 w6 Y
</code></pre>5 w/ @  `7 w% ?# a' Q
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
# G2 _5 q$ x0 O& d4 m<p>只剩下实现Worker和Pool的具体逻辑了。</p>+ J3 b7 k2 u2 \0 f' P
<p>Worker的实现</p>/ `$ h! E: C, ?# s5 y
<pre><code>impl Worker
( h1 Z* K% m5 w5 i- M3 e{3 n5 J: ?9 [" X
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
4 F' N1 B7 ]6 t& R: I$ c6 v        let t = thread::spawn( move || {
: M  H  o* o! S4 k# T            loop {# C. b; z% ^" w
                let receiver = receiver.lock().unwrap();
3 U* y; w7 r' n* \                let message=  receiver.recv().unwrap();, q1 Y' h- i0 |# n) f. j8 a$ q
                match message {
: [. s' e; c5 J0 M0 n& y                    Message::NewJob(job) =&gt; {
  T* B9 g% \3 z2 l; a! N5 c8 R" R4 u                        println!("do job from worker[{}]", id);$ P* r% |# P3 R+ s/ _2 P
                        job();
3 [7 U, `; k6 c7 u4 }1 r                    },' b9 z5 }8 q6 C( x% x3 h
                    Message::ByeBye =&gt; {
" r0 \* M  U  {, z0 r# S! N) k                        println!("ByeBye from worker[{}]", id);
8 i3 ~$ z1 i1 O' y* |5 }                        break8 U7 d" L; @' g; S! B. q
                    },
# i* P& M; c1 q                }  
; X3 \0 W6 |! d( h4 g) z+ F            }
/ ?" l6 R* ~) A, L, @1 ~3 v        });+ w8 u( V, O* c9 z3 B' X- L
) k3 g- i  ?9 K! d0 M  @
        Worker {
5 K+ A9 @, |% a; V            _id: id,& y$ L( a: ^( }+ I) q
            t: Some(t),
$ L, g* k6 Z! j3 x7 V# t        }: T9 e, }, Z" i% O8 b
    }. r6 V. ]! _& a& L& e
}+ D& I. C- _2 H# f6 `& [! _
</code></pre>* {  ]1 t0 K4 o2 D' R- Y- d
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
2 R! R3 l" J! t" M  e0 a. X: [) F但如果写成</p>
) _: z) O* n! W0 r1 ~<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
! o( o( d" L; N; H7 Y2 i$ [* A};
4 [0 [# w- l- F: f. f9 z</code></pre>% Q/ `$ I! Z( `3 Y3 ~4 G! w1 m
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>! [- @5 F7 e" G$ p1 Z7 x
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
1 w  i8 _, o" b<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ M$ x5 ]* ~$ }+ z  j+ m; t. D
<pre><code>impl Drop for Pool {# j1 s& |3 T# C' e2 B
    fn drop(&amp;mut self) {; n: F% U; O' w, E# j
        for _ in 0..self.max_workers {" E% I3 p8 F) ^2 E
            self.sender.send(Message::ByeBye).unwrap();
/ a2 @& Z, K  `) ?" D) z5 D& w' E        }* c( A! `5 T! K: |3 c2 `& O
        for w in self.workers.iter_mut() {1 f# W1 X1 I2 g
            if let Some(t) = w.t.take() {' B" T+ @: S$ h9 {# n, h  g
                t.join().unwrap();( l( ~' p. s2 |) n' A3 c+ H
            }4 o; l* j$ l2 Z2 c# m6 a
        }4 v2 i2 L+ W: n* q
    }' e5 S% M1 R/ w& Z8 y
}
" r$ Q1 o$ u& w. K& r2 f
2 D6 i- @$ n6 S/ o' t, s$ v</code></pre>
9 W0 [: |+ h. U" T<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>, A: E0 X; p! e
<pre><code>for w in self.workers.iter_mut() {5 ~1 W, R/ J) M
    if let Some(t) = w.t.take() {
3 N- J0 K8 R% @4 b* r        self.sender.send(Message::ByeBye).unwrap();
8 O4 c' u% \6 K) W3 K2 {4 M        t.join().unwrap();
( v5 x2 L9 ]6 n- h: v% {    }
1 v0 ^! N/ `. M}" n4 K* {5 i' ]; L
0 ~4 p/ y5 ~- V3 l1 Q7 p
</code></pre>
; a& ^3 L8 W9 I4 J6 Y8 Y" }+ V<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
$ E: R, z3 Q$ h/ h% U5 y6 v我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
3 y% Y. h+ _" O1 b<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
1 B  n, e6 A; ~2 \<ol>  U" w: v( t8 {  h: k" o
<li>t.join 需要持有t的所有权</li>/ J9 {. L! }* W; B0 k1 U
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
5 h, p, R( ]. h  N/ d  H# e% v</ol>
" }& ]$ K" g' Q4 ?<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 r" x: ^3 Q  k9 Q5 F
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
4 Q( d$ `* _% L* v2 k<pre><code>struct Worker where
5 w) o0 l# q* A5 |. X# R{
" K1 O& C: C8 P% i    _id: usize,
2 f" I( ^" Y: ~, U* G    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
1 V  C; B; m2 t  S4 Z  A$ f* g( }}8 ]6 Z% B8 ?7 n6 |. ~! b
</code></pre>
, l% r# p" I# ]8 q: S<h1 id="要点总结">要点总结</h1>
' {2 s8 a  K: N9 J<ul>
9 s( X5 d8 h- P4 j7 {<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>6 U9 _' k# ~7 y
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
6 ^0 n$ ]& j$ b0 e4 z* e' ?5 H</ul>0 w  J  s, E- W4 z; F' w  Z/ _
<h1 id="完整代码">完整代码</h1>( g5 b$ V. M/ s9 d  [' m; b# S
<pre><code>use std::thread::{self, JoinHandle};
* l4 @% `. q0 u" Ouse std::sync::{Arc, mpsc, Mutex};: T$ [9 g/ ]6 u
: `. Q/ [% e! N' v+ b% v7 I) L) o: e
- q. D* V6 ^- |: D8 V2 R
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
" D6 e5 ^2 Z* S% ^, z& `1 kenum Message {4 H" @  l& r0 ^, ?, a2 \
    ByeBye,
! S. B1 T6 ^, n    NewJob(Job),- i% s' Y0 m" Q8 t+ i; c( {1 r$ \- A7 P1 `
}1 ]8 q8 \: Q/ N) W4 D4 I
8 N2 ~, f1 }& E$ |$ e3 z% X( W4 P
struct Worker where$ d' G& R" \8 B
{3 B. G  O; T2 I$ f8 }
    _id: usize,. x  f. ]. Y. n/ Y# f, F
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
7 E$ Q4 j6 j; E# c' `  i}
9 D1 H/ b5 j" w( ^$ y7 X& X& C2 j, s0 g: e) ]' s( }0 z) i! G2 ~$ x2 u+ ^
impl Worker; _3 S& z. b  k6 z4 j' v3 c. h
{5 h  h- N; s" b5 U" e* k# H4 y
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 t0 o7 P9 O1 a        let t = thread::spawn( move || {( z* @7 j7 n! O/ W- \! C! m
            loop {0 {" @4 t0 S( f( [7 }6 ~
                let message = receiver.lock().unwrap().recv().unwrap();& y5 v2 `  P: x5 d7 @1 W+ P
                match message {
  Q' N* z2 `* i  S: L  N; a                    Message::NewJob(job) =&gt; {
+ q9 T$ ?( e# J; @/ D. G0 o  F% W                        println!("do job from worker[{}]", id);* C" m7 B& Q" [, `) n5 k- J
                        job();+ n- |) D+ K* {& ]) M; `
                    },( R+ P+ K9 y) o# }' F  X& F7 ?+ v, w
                    Message::ByeBye =&gt; {4 o1 [& m/ ]+ ?( ?* y
                        println!("ByeBye from worker[{}]", id);' P( x/ s( l& g' L$ F! m7 Y
                        break/ x) c2 r( ~9 b$ N
                    },
+ {  a4 V4 f+ H( B' y% {/ F                }  ; D7 w) }3 y- R9 i
            }
0 r* u# f* ^  y% O1 T        });
! Z" j2 Q  x1 a. E
4 b3 T& \" V* }        Worker {( D$ D9 q) L' p
            _id: id,9 W3 \3 t$ l0 O( P
            t: Some(t),* H2 q3 A9 @  z$ A  m- |
        }& D# F' A; h9 K( B% s' K) }
    }5 C0 X0 {9 q% Q+ ~
}
9 ^& @' q) q& f% U2 ]% X  U
; i9 T# f3 s9 Z4 T1 N0 dpub struct Pool {' J* C& s* q# c6 U
    workers: Vec&lt;Worker&gt;,. ?3 Y  a" W1 _8 n, B  t
    max_workers: usize,! W9 e8 w- d* B) i: r/ h# E
    sender: mpsc::Sender&lt;Message&gt;- K" Z' {9 U0 O' f
}
9 H$ X; c- M. d. K" M) w
$ ?5 Y0 S7 L, i, _* W5 Gimpl Pool where {
9 S0 C' e- [3 ?& x1 K2 G    pub fn new(max_workers: usize) -&gt; Pool {  I3 A8 A3 S$ Q( Q4 e2 ?
        if max_workers == 0 {
5 V. ]# F9 |$ j! b) [            panic!("max_workers must be greater than zero!")
8 Y/ D1 l! a4 |2 B* A        }
! _4 t! M, a  H" |        let (tx, rx) = mpsc::channel();) e( _" n* e. r. o. X( j# r

( D; ^  ], ]! {/ }4 L        let mut workers = Vec::with_capacity(max_workers);% i' K5 j1 a) z1 {6 f
        let receiver = Arc::new(Mutex::new(rx));# ]- z, ?* t  a" ?& D+ u" f9 y$ V
        for i in 0..max_workers {
4 O9 @! s) O# T            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));* y% g2 b# ~& `! M2 x" b9 e+ t. W
        }3 Z* D+ e5 C6 r: S. R! ?; C
- F2 |# t) c! {) g
        Pool { workers: workers, max_workers: max_workers, sender: tx }3 f' r" X$ B7 W
    }7 x. C. c. J7 `; g$ N
    1 b7 j9 o4 u( f7 E! b6 p( ]( j
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send! m) \0 F4 Z3 V! {
    {
2 L6 M# T6 @( W1 b3 h' K/ U4 `- O. D, U* ?
        let job = Message::NewJob(Box::new(f));
5 {+ Q. m! v% b) v- P; e% {$ w        self.sender.send(job).unwrap();3 r* P# L7 c' [' ~- t
    }# w  J; H; R$ S) A' X
}# u. z. J$ v0 Z, E) {

7 l5 ^: `5 C/ d% Yimpl Drop for Pool {
5 z8 s: U6 w8 r8 x! Q    fn drop(&amp;mut self) {8 i3 `/ ~2 l* g5 A9 D
        for _ in 0..self.max_workers {
; q& l0 s  v1 @            self.sender.send(Message::ByeBye).unwrap();
: o$ J! ?" ^# _5 B* _        }
- `5 Z; I  j6 H        for w in self.workers {
; l0 F3 \  {5 M            if let Some(t) = w.t.take() {
( ?3 s" I0 B& W7 \- ]- p( O                t.join().unwrap();8 l; T  C, a5 C8 A$ k0 a% s* |" ^
            }
5 [% }: C; [2 Q+ Z3 Y3 f        }
7 N6 t  |3 c7 b" A2 E    }5 \2 V* D& a+ H, L
}1 _, n+ x$ F# i# L0 O9 q

1 K7 w+ b1 ~: q1 d) j% ^) d* ], Z8 X8 Y" M. v
#[cfg(test)]
" U( a" Q, {( C" r0 ~$ tmod tests {
6 z8 O+ l/ r: Q* D+ K    use super::*;
- E7 V- T4 N0 n9 }/ t7 ]0 A    #[test]4 _# {) W( P9 i$ Q; G. F
    fn it_works() {: [3 W7 F1 f' _$ k1 A  r! c5 ?9 b
        let p = Pool::new(4);
9 w9 k& E7 |0 D. J5 ?; z+ H# U9 ~        p.execute(|| println!("do new job1"));# ~: O  y& F/ `, t
        p.execute(|| println!("do new job2"));0 {4 X' Q! j4 D; Y7 n/ Y& R' ^
        p.execute(|| println!("do new job3"));0 b7 [' J4 t# Y2 g4 H  k6 Y0 k
        p.execute(|| println!("do new job4"));" I+ ^8 `" s" E# D' ]8 B
    }8 r, v3 I* x2 q5 s! s
}" H; l* c! j) R
</code></pre>
5 D: @- W; i. U6 g$ o; h% }
- R: H: n% e/ u* f( ~8 n6 N; s, J
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-4 09:00 , Processed in 0.062124 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表