飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14527|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7995

主题

8083

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26315
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
& h! ?. M" b# s+ T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>2 ]$ T+ l9 l1 ^
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>+ U. C5 R# P& P9 L/ A
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>) S, N% G) y+ V9 A2 i$ d3 O
<p>线程池Pool</p>
) L7 K! s, k# ]<pre><code>pub struct Pool {& E- Z% S; @: p6 t
  max_workers: usize, // 定义最大线程数1 F6 X7 o2 U1 |/ z1 k5 W
}8 b% i- _, @+ F. K, y' v# \

: g8 o5 s1 E# ~4 C) Y. l! Eimpl Pool {
- v  c: Q6 H; i! k1 t6 o  fn new(max_workers: usize) -&gt; Pool {}5 g) G0 F6 k  ]2 u: j/ E) a% P: L
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
$ X( q( }% H7 `7 J1 @9 B8 h}
1 \* r, o" g& h" z+ K
8 m9 ]) w0 e$ Y! Y) q0 w* R, O</code></pre>' t0 Z3 V7 t0 O
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>0 q1 I1 P+ k, R) J% j0 x6 U
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
! O  N9 \5 s, y/ f: k. X6 w% F5 C& W- I可以看作在一个线程里不断执行获取任务并执行的Worker。</p>) L: M* U+ Q/ a! X# @1 Y' Z0 I
<pre><code>struct Worker where" ?; t8 g- p, O) l% W7 e6 p' l: _
{: f+ I0 v6 E6 A4 k8 e/ L, h
    _id: usize, // worker 编号
: a) Y  u" y/ m% ?* |# L}
/ g' L+ l$ B# V: x0 B% h</code></pre>) E9 R% j; A" G
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>/ k+ l) @# k5 g9 z7 e- n, Q9 `$ u3 |
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
1 {. V2 S0 Y) Z4 d- \0 b1 b<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
! f( K! V: _+ ]! B" Z<p>Pool的完整定义</p>
  n5 j+ m3 h  H2 o7 a<pre><code>pub struct Pool {9 S8 [7 }8 d. B) c2 W. x" B
    workers: Vec&lt;Worker&gt;,- X' b- x; s; G7 A
    max_workers: usize,# t' g% i6 T+ t- D
    sender: mpsc::Sender&lt;Message&gt;3 ~/ c9 i6 b2 ?- q& s3 z4 m
}
8 }: _* U" J. d# i- U, F) e- Q</code></pre>) r$ b$ i- h9 ^9 {
<p>该是时候定义我们要发给Worker的消息Message了<br>4 }' v7 N6 `' \' m. T. P1 g
定义如下的枚举值</p>
' D* a0 `$ R- J9 `<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;7 w5 }" T: B+ y2 I; ^0 L
enum Message {
# i) }0 |# O- [: N+ \3 B    ByeBye,
& H' X6 ]. K7 A$ z- k    NewJob(Job),* C% k% r( w9 E" r% [$ h
}- H( y0 v0 w, ?% ?
</code></pre>2 H' _) ^' `3 X' L/ M
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>. D1 e9 H7 E# M4 M! u
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ }8 _  H1 y" N* H<p>Worker的实现</p>
: j2 r1 ?; @" L6 I. q1 n<pre><code>impl Worker  Y- @  M. ^- Z; K
{
8 r5 H  \/ u* s, @- N% {) y- r    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {% l3 W5 O7 O& _- m( ^  _0 O
        let t = thread::spawn( move || {
/ ?0 t2 ~3 F* j1 V  g" }5 R0 a+ I            loop {9 j8 |6 H; S+ O) \  _
                let receiver = receiver.lock().unwrap();
( x: i9 N' Y* u7 b                let message=  receiver.recv().unwrap();
7 A9 h. I# r( D# Q" O                match message {
  J6 P3 l5 L* y# Z( h                    Message::NewJob(job) =&gt; {4 f' \6 p$ x) U
                        println!("do job from worker[{}]", id);
, f, d; U5 W+ }0 B( g                        job();' _3 G4 [: z. _& I, V% F
                    },
2 @. F% f- E/ A. O- u- l. B! Z$ w. K                    Message::ByeBye =&gt; {
8 h2 E, v+ N6 E7 x0 B                        println!("ByeBye from worker[{}]", id);
5 f# q" p/ K# b$ Y  k, _. Q1 T                        break
, X- h# V2 O" H+ c                    },! R+ F6 y$ Z. q
                }  
6 N( X* ]! T0 @            }$ r* V. Y3 l: \; {( h/ R& i' ^% C
        });1 S: ~5 r( y4 L# f6 n/ Y6 F# k! C
. b- {0 J2 f; i$ j
        Worker {$ n% l% b1 \4 B4 V) M0 D9 p
            _id: id,7 \: x2 Z+ r% v9 ~. Z
            t: Some(t),% G/ P% T+ P: t( Y  _
        }- _' r& _0 f/ P$ L6 I/ w7 L
    }/ B# ?: [+ V' ^4 V- S
}. U; N9 ~' r/ U5 r9 a; C. B3 O3 @
</code></pre>
$ v* d2 m5 @" p6 N/ Y4 A* R<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>" i# G5 C. l1 E
但如果写成</p>
5 `" E) m; a! @3 _  Z9 e0 ?<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {7 W8 Z6 _; M4 z) _% Y& o3 h
};
9 L. }; D6 X' e  r' u8 w0 h</code></pre>
% O2 f- a1 s5 X, B<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>4 ?: z) d5 J" S/ n
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
2 d' E$ I$ t- P% D/ }5 |<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>( z0 V2 m; B9 Q2 W7 o8 J  s4 A
<pre><code>impl Drop for Pool {
- s+ s+ k" A+ k4 E4 |3 `, ?$ s- m    fn drop(&amp;mut self) {
: Y! B6 U6 e$ {/ t( h        for _ in 0..self.max_workers {, G6 A) p9 [6 s8 ]0 E
            self.sender.send(Message::ByeBye).unwrap();
" G' K* e+ f5 w) L% @) I! E        }
1 W; l5 M9 \7 V        for w in self.workers.iter_mut() {
1 ]- G5 ]9 C1 E  B) @5 o7 h4 N; w            if let Some(t) = w.t.take() {1 z! I' e: N5 M
                t.join().unwrap();3 z2 O5 [- |- O5 z" X( q+ Z
            }  m7 ]& |: W/ T, Z- P% q- I: B$ b
        }! @7 n. c2 y4 _8 c+ J) B% U- s
    }5 Z! |. p0 f; Y8 m* u
}
# N/ m8 E- F) W7 ^0 m+ {. j( }# Z: Y8 j# e
</code></pre>, O" S4 U5 L- y, `- W
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>6 p0 K0 N" P" l: ?1 q0 D
<pre><code>for w in self.workers.iter_mut() {
- R" I" j) J) [3 c# N7 c    if let Some(t) = w.t.take() {
/ e$ ^8 G6 a9 E5 x        self.sender.send(Message::ByeBye).unwrap();
% W/ M7 ^5 j1 n. Y        t.join().unwrap();4 G3 _9 Z( d2 y% A5 L. K
    }
& q% k; P! M$ \" }! _% [. D% {}
4 Z1 r# `" ~2 z( B1 ~3 w
( o6 A% H/ J$ B. |9 f, f) u9 w</code></pre>% |& r! j. N0 X  a& g
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>" L' ~& G; R# Z. Y9 K
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>7 J# {! T' k+ S* n/ z
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>2 h# o6 \. d3 P2 Y/ F# C: A. f
<ol>
, t5 {6 Z' P0 e+ x: M% F: g<li>t.join 需要持有t的所有权</li>
' b# a# R/ X1 O- U<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
! M& r% r5 G1 C</ol>. o# C' A- j& {* u
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
: d# |6 d/ C2 M换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>9 {" ?+ N0 D# H) s9 |: _% E
<pre><code>struct Worker where
. F$ o( `2 X% W$ g, P{5 X/ \% L1 I+ v* }) \9 O9 ]. Z1 s& w, }& Z
    _id: usize,$ E, |( C) k+ z8 A
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,* T% J7 {; w* I+ j/ F2 E! Z
}  `/ j- f. u/ e3 A# r
</code></pre>
: {' ]" p# Z# \7 u% H6 j<h1 id="要点总结">要点总结</h1>
4 F; h4 S  I% m5 c4 B' q4 {. J<ul>
4 \/ f% ]* {" y<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>! T" O- J0 V! `8 Q4 v" F8 Z0 t
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>( i6 D4 Z0 F% R9 d+ A
</ul>
" u4 f4 Q/ {) {7 `; J+ ^<h1 id="完整代码">完整代码</h1>
/ Z* r7 H# O; j6 {% w0 P<pre><code>use std::thread::{self, JoinHandle};
& s) p" n6 G; f* buse std::sync::{Arc, mpsc, Mutex};
4 h1 e5 r% z; b/ q# D. D5 e
# }/ c2 i  z6 S) Y; Q$ y5 X& ^9 c9 T& N+ _% w
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;; F. O0 Y3 P: Z5 ?7 Z* g
enum Message {
8 [! K& x+ I2 I7 c    ByeBye,* m) A" E) A) q! f
    NewJob(Job),# L/ d3 _9 V# W  b$ r- W- B; I* v
}
# E* ~9 D4 \) B3 [; q1 O( H
& s# m1 o. O- }: B  w: z7 nstruct Worker where2 V9 w% u8 v' b9 I5 q% z1 E9 z  S
{  H  {( U. M, d7 V1 d
    _id: usize,
5 h* V+ {! w. O) j2 R, W+ Z( b5 T" M    t: Option&lt;JoinHandle&lt;()&gt;&gt;,8 Y# N- M6 g" ]
}
8 t4 n3 ?3 ~) J) u# H  F8 b) S2 u, a
impl Worker4 `) Q: _5 L0 L' n" B: k! P) D
{- `" y  M: D3 n0 w: ]( o2 K8 e
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
/ `/ M0 h5 o. Q. f( x- R" Z" Q* U        let t = thread::spawn( move || {
6 p1 A: u. u/ m8 `8 V            loop {) A4 G8 F& K$ ^  _  [  k
                let message = receiver.lock().unwrap().recv().unwrap();3 b; z# @. B- `
                match message {4 P/ @4 [, [, d& V% j
                    Message::NewJob(job) =&gt; {' _& u8 M) d, l7 U
                        println!("do job from worker[{}]", id);
+ r6 A7 s( G. f. [" |                        job();# I  `. a% z5 t& `* o
                    },# {, d7 R' S. ?
                    Message::ByeBye =&gt; {" q' w  f' E7 J' \
                        println!("ByeBye from worker[{}]", id);9 \9 \2 P5 f4 y- u, ~! F
                        break
, }  w5 l3 b6 M5 m6 Q                    },
" p- {. l( W" I# V2 _  D9 I0 S                }  . v& ?6 x2 {5 t# k& L% N$ I
            }& d! I+ E9 w0 i% w% Z7 I6 B* d- B6 U) k
        });8 U% g( g; V  q: \$ Y3 @' Y
0 G7 y& c: K2 }( g# V) [
        Worker {
5 U. q& U* G* z" I! b, B            _id: id,# c; v, c$ |/ D$ c* K
            t: Some(t),7 a" @$ S- ~8 i2 s
        }. Q4 {+ @4 L2 E# A
    }
1 n0 o' m0 L9 n& X* S% d# d}& f% ]8 g+ u* ]/ x4 |# v
, s5 ^5 t# a( y: x
pub struct Pool {
* n: Y% @, b! q2 ?    workers: Vec&lt;Worker&gt;,) S, q% G: M' V0 f, K
    max_workers: usize,5 j* Z% ~% e4 z! g% O8 b& E) d' k
    sender: mpsc::Sender&lt;Message&gt;
" s  T9 a7 S! v, c' k- J& w% W  ^}$ Y8 e4 Q$ h- [, `: `2 j

# Z( o4 `! [* G$ jimpl Pool where {
: {# x  Q, f( s6 d5 ~  L    pub fn new(max_workers: usize) -&gt; Pool {
2 h' m7 \; b) l5 i; }        if max_workers == 0 {) r3 W6 H5 \& F4 C) Y& d1 t0 y5 f
            panic!("max_workers must be greater than zero!")+ Q; u. U' W( Y2 {$ V- h
        }
7 R- V8 k7 U" X8 ]' I        let (tx, rx) = mpsc::channel();+ {2 f  d# C, d( j$ T
% P! ^: r" }4 ^* `
        let mut workers = Vec::with_capacity(max_workers);
; d# q5 j8 x/ z2 L2 p1 s        let receiver = Arc::new(Mutex::new(rx));
& N/ g9 ~* N1 U- K        for i in 0..max_workers {+ w* x+ g5 ?' @% k$ i2 O. U# m
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
1 w. m2 ?+ _# B8 Q) }/ D        }1 L# @( d- D4 w' e) _7 t

. ^1 Z# _3 \! C  T        Pool { workers: workers, max_workers: max_workers, sender: tx }. w. f3 m# u) y* `4 g! }  I: m
    }/ V* E6 ~4 A) d7 A2 h( m
    7 s$ |# r' q) z
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send+ N* ~% M: H2 {! d, K
    {; R3 p- j' d* K1 w+ M5 T6 w

3 C* t* t0 `' D6 \7 l        let job = Message::NewJob(Box::new(f));6 T( U4 i' D- d
        self.sender.send(job).unwrap();
7 C3 {7 e4 r/ e6 L    }
! E9 c; u* y6 }% n& z# b}5 n0 M; `) g; V2 J

# f6 t9 Z2 u5 g+ X( U) Mimpl Drop for Pool {
# o9 s3 x' X  V' F, Z    fn drop(&amp;mut self) {
4 K* a' L& p$ d- C! l) e+ R- G8 E        for _ in 0..self.max_workers {, R* G; P0 T& Z6 S. u- K
            self.sender.send(Message::ByeBye).unwrap();
! T" h) m; p. w3 d, O        }8 \$ D1 U+ o3 w9 ~: E0 ~* u) I
        for w in self.workers {+ D7 U: V4 {% l" Y. P" [+ W  ~- X
            if let Some(t) = w.t.take() {
6 F4 ~: k  o4 _                t.join().unwrap();
* N" L# T0 h' L9 A            }, T4 t4 T0 i$ w4 F7 _$ ~; f7 G  Q
        }
6 T) y) m. f* q7 N# P" ?    }
, f: F& i. K8 P}6 V4 |, ^5 _% E4 z0 C7 Z$ T

' [' m/ [! A3 ^( V1 v9 E8 W( Y
" e6 A5 n2 T( O8 X6 S#[cfg(test)]
8 \( M7 Q; P' l; @mod tests {: i: ]% m& o/ x! g8 o( _9 m
    use super::*;+ |) y2 `! O7 k) S4 Z, Z2 {0 t7 t
    #[test]" }! b* N" {# m/ {4 x
    fn it_works() {
9 ?" W! s0 w2 ?) m! @$ s3 c: \        let p = Pool::new(4);
# D7 K" T- l  L6 h$ l: n        p.execute(|| println!("do new job1"));- j( O9 B1 @+ Q" S& \+ Z
        p.execute(|| println!("do new job2"));
0 u, a3 ?$ Q4 _0 M" f' a        p.execute(|| println!("do new job3"));0 ~7 n4 {7 D+ y, Q1 B
        p.execute(|| println!("do new job4"));
( j6 g. V3 l' }& C/ i    }
- @' I7 T. A  T8 b+ j5 i}' L5 q) x5 `6 g
</code></pre>
$ ]; v1 G  G! C4 I
* [  k  J! \! H5 w: Y6 T4 p
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-1 00:37 , Processed in 0.068597 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表