飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14523|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7995

主题

8083

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26315
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

$ {6 H- l8 y8 o) q& E, f<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
" ^0 {( ~' y4 d% r% n<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>! T2 \( N$ s% G- H
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
8 i" Y! c" [; e2 T6 V, C<p>线程池Pool</p>
4 i. B+ b/ s8 i8 c<pre><code>pub struct Pool {
) z% {! G; U7 u  max_workers: usize, // 定义最大线程数* J* w" K2 ?# H; e0 N
}
7 r  F' }2 F* ~4 k; G
! a: [! d2 x6 W/ q, rimpl Pool {
4 |# `6 L, j& V  fn new(max_workers: usize) -&gt; Pool {}/ _$ w3 m6 b& @$ n" W; Z( H! j
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
: I7 i. R  v; ]* g  K) \}% j9 h' }- \$ M( y; \7 z6 J
- R9 u0 G" v2 M  Z1 ?- Q' T
</code></pre>. E$ L1 ?" ~. u# |+ ?& q6 r
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
+ B7 ]* ~& B0 G<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>9 {3 n- _9 x! h4 U. Y
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>( _6 @1 c: q2 a  l7 D
<pre><code>struct Worker where
6 Z$ F5 t' u5 U; W; b0 v# ?' b) Z{
2 B) `( R7 E) u1 `    _id: usize, // worker 编号
) W. ?; }% \  a9 E' m8 R. ^2 }}! Q) N6 s. V: \0 d% {5 ]
</code></pre>% ?  b9 K4 C2 o+ q
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>) ^8 i) R* D3 c) W1 j
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>  J6 D/ V/ _7 [0 ~* m( n; e9 ^
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
6 E& I4 G' J3 D, N$ S4 o  y6 F<p>Pool的完整定义</p>
1 b* G/ C8 Y' e<pre><code>pub struct Pool {% o7 E6 F( B- \4 r
    workers: Vec&lt;Worker&gt;,
" O) |. w7 f: b4 g; o    max_workers: usize,0 I: L. `. n: ^  I. T
    sender: mpsc::Sender&lt;Message&gt;9 s6 L0 q# @; T
}6 j; ]5 }3 U& g8 b" [0 _7 r
</code></pre>4 y7 [9 w: v8 ^' B
<p>该是时候定义我们要发给Worker的消息Message了<br># b' }7 ~# c9 j! R9 _" E
定义如下的枚举值</p>* k: [! k2 f6 F9 M
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;: z* m5 h# a; ^. A
enum Message {& V4 t. T) f; u. j( w1 x$ a* `
    ByeBye,
6 @, t$ `( P& X# y7 Z' S$ D    NewJob(Job),
5 F( T6 j3 q  E' J  P3 b# @}/ `- j5 o. h! L  r
</code></pre>
0 y- X% n$ _  Z# l' O( k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
: F3 @+ ^0 b. ^& c4 S( v6 K% V<p>只剩下实现Worker和Pool的具体逻辑了。</p>( A. \; N( P4 Q2 l8 n& g2 R2 D
<p>Worker的实现</p>1 F( @% o& p5 |1 `
<pre><code>impl Worker
' j8 A, B" i5 a! M4 K{0 N( I. {1 b9 \& B
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
) z' H5 a* Y' _( h        let t = thread::spawn( move || {3 ^* F5 _! k" m2 T* E
            loop {
8 H$ i: r. H3 B9 `0 b6 x  L5 |. \                let receiver = receiver.lock().unwrap();
0 q( N1 f2 ~9 f( x                let message=  receiver.recv().unwrap();% ^5 @* J: w# }9 g( ~7 O
                match message {
, W* l8 q+ ]: Q8 n& f                    Message::NewJob(job) =&gt; {
) G' d) M& @$ Q4 T) K' L8 h                        println!("do job from worker[{}]", id);* a% r% g1 P$ F, A" u6 g
                        job();
1 ]& u; }( Q1 }- s# b5 ]                    },# D  }2 C3 v5 D5 Q( W3 ?) L
                    Message::ByeBye =&gt; {5 k7 \/ Y% ~; N# N" C
                        println!("ByeBye from worker[{}]", id);
0 R% [6 z) o8 m8 `# n                        break
0 ^/ ~, L( M6 k                    },1 F* A5 m5 }1 Q% V
                }  
# ]  G8 A4 i. t- K3 S4 y% ^            }# I3 N4 ~5 A/ G- J& H
        });
3 S( Y/ ]  {8 q  U
+ V9 y+ n8 S; O        Worker {. q5 W( s+ z8 {8 k6 L  |% z
            _id: id,
+ b. c5 _  K2 G% ?% u, w) Z            t: Some(t),
  M4 e5 z! B8 }6 B" {( P3 u) }) F        }
) X: }3 _% A2 O/ T    }/ V2 S! h! s- V1 \7 C# \
}- C& q3 W- T! P2 w# h
</code></pre>
3 }& c9 t  y3 ]% T# F/ J<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>% G* F6 o- Z- A* \: T
但如果写成</p>  S! X# g% c7 I7 ~
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {$ \; g& o5 O, }8 K$ P, B, K
};
. m7 m2 h/ ?) H</code></pre>" e+ w2 A3 `* d. |1 B$ V) B
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" I' \/ F3 B+ s' q7 [
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>, W) |9 _: p5 r- H9 A
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
" c" b7 A8 @8 W- o/ U: }: Z" H<pre><code>impl Drop for Pool {
+ H9 L) f5 ]$ a# t* r3 x; D    fn drop(&amp;mut self) {
5 |  I; s5 ^7 d+ V/ @        for _ in 0..self.max_workers {
, j8 h2 D8 E2 H            self.sender.send(Message::ByeBye).unwrap();
+ i; A7 k) B9 F6 m0 b- q# e1 Q' e        }( N8 B! |+ z; f/ [4 E; \' o: G
        for w in self.workers.iter_mut() {
/ p/ B+ n7 i7 m0 g            if let Some(t) = w.t.take() {8 ]; S9 Q9 r& l
                t.join().unwrap();: N1 r' b3 H" ]/ u( U
            }
4 F6 s; y  C7 V  Z6 s        }
  T' z0 y; Y- i- O9 L6 ~5 y8 g    }8 @: x% I0 Z& s; U6 U
}. H/ K4 J) g  M, e
: U" d: {1 y2 D5 @5 a6 O' N
</code></pre>2 s; i& W+ g% w  J( C& s3 i+ q
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>; X, p" G5 V# H: F7 m4 _
<pre><code>for w in self.workers.iter_mut() {
' d) z" g0 h4 x+ x2 G    if let Some(t) = w.t.take() {
# ^0 a) ~3 ~; u        self.sender.send(Message::ByeBye).unwrap();
' t7 A$ o- z7 _2 w2 K        t.join().unwrap();
4 }: z) T; U  n: G: x& V    }
7 }+ Q- ?; d* ]$ T9 G}7 v8 V* c* C$ E/ i; o  j1 _

( q: K: Z: \, E# m</code></pre>8 N# ^$ e5 y* E7 o
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>  z1 N5 y1 s& @4 G* W; v) ~
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>) i9 m* L4 b* Q) T# ?/ Z3 T0 b$ H. b
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>9 d5 S2 E! L1 _$ o% D
<ol>+ p. M7 x) \3 @0 v4 C0 c+ e
<li>t.join 需要持有t的所有权</li>/ {$ h" v( f4 v* n
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>1 q* ]8 t8 |9 h9 C$ B5 i; D, y- m
</ol>
+ i# w+ u7 R7 q5 t0 t8 k9 M- X<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
9 F- g* S- \: M, _6 f- Z换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
! q! m- c% C# S/ C<pre><code>struct Worker where. @  J% W$ i$ B  D  ^  N9 t
{
) d% e2 X3 R8 \- Y$ f; w    _id: usize,
; v8 u# [* n( g$ a. p  n    t: Option&lt;JoinHandle&lt;()&gt;&gt;,/ e* z1 u- G# G2 Z
}( {) F. G, _9 i# y8 B# W
</code></pre>
+ {' M. @6 k+ g! C4 Y<h1 id="要点总结">要点总结</h1>. b' u( Q5 j' z& I4 @" _# d
<ul>4 N( a; k7 P+ ?' C- z- ~
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>- d' ?4 l: X2 Z, n5 J1 X% W" P
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>2 ]2 z5 e( _0 O0 P6 D
</ul>  b2 C7 _, j2 @- I7 L0 D0 x
<h1 id="完整代码">完整代码</h1>: S( v3 n0 @8 i# k& r
<pre><code>use std::thread::{self, JoinHandle};
9 ~9 U* t0 k3 E  U; guse std::sync::{Arc, mpsc, Mutex};
7 H2 Y  G- ~& C0 o! R  x
% p0 w6 [$ b, O  ?# G0 X! e% v* [" G: z+ m
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
6 p5 d9 ~+ T; zenum Message {
% w, A' K9 A# m- R    ByeBye,
  R/ m) W; @+ u7 N9 ^/ ]; p, u  E; u$ ?    NewJob(Job),3 g9 I3 L) ?( |! W) m
}! n. y6 J: K: h" S

4 V/ Y+ ^( m: K5 P1 O# k  [struct Worker where! |8 i2 x8 J! P4 q
{
% e: b6 Z; V# O    _id: usize,
- k2 j4 M1 P* L$ q2 K    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
4 l. A$ f/ a! h. ^}
) h; I6 H- ]: y- {8 Y0 J* \5 N( X9 X% J* d) I) E: |- \' m( E7 \0 \& }
impl Worker2 _# Z, C9 V/ M# e! s4 J
{
8 [2 O& j! b* G9 \! X    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
  m- n( V) o- ?        let t = thread::spawn( move || {
0 V/ z8 o* @# {            loop {
  j) N" P; h% E6 E& R, X+ X) N  v                let message = receiver.lock().unwrap().recv().unwrap();
. w  i' a8 v6 u& F                match message {5 h, f, O/ @  f- M5 Z
                    Message::NewJob(job) =&gt; {& i) ~! z4 a. W5 E
                        println!("do job from worker[{}]", id);1 p8 K4 J$ l. W: S6 a
                        job();
( d6 j) P& x; R5 C6 ~" F                    },2 h. B. b! `. F) A4 C+ F
                    Message::ByeBye =&gt; {
/ u2 d: P  ?' D                        println!("ByeBye from worker[{}]", id);/ e6 p9 \& w' h* P1 B
                        break
4 w! U  T4 R0 V! {4 {1 D                    },
) a; B7 L' N7 i- X                }  
3 }! u  f% {! k2 [            }
  {6 H/ w) }9 i* @2 r* C+ P        });
: `9 ]' w9 _6 R+ W! {9 G/ S7 N$ c. z/ i& R
        Worker {1 y3 B7 M2 T* O$ w* D
            _id: id,
, M& M7 ~. B2 {( c+ w- j            t: Some(t),
- Q* a+ U6 _8 v6 S/ Y        }
8 e. Q! H$ L2 `1 b; a8 z2 ^    }
. c0 p/ F" p( z}* d2 R7 f% `2 _0 ?) Z4 _6 o

2 }( A9 M6 C; p- Epub struct Pool {( W9 q4 V  @, p( W0 g3 p# a+ [
    workers: Vec&lt;Worker&gt;,
3 o; b- e  L; M+ a    max_workers: usize,( y* f; X: T2 {2 w% ~4 r9 Y0 p
    sender: mpsc::Sender&lt;Message&gt;% u: S! z  A3 [. s8 \/ T6 H9 Q
}
* C/ L. l! U* w& u0 r6 }* r( v' W' w1 X. S5 R$ F2 K
impl Pool where {
3 `' X& t( a' Z2 P" k* O, D8 N5 `, K    pub fn new(max_workers: usize) -&gt; Pool {* D/ O( p) F/ [
        if max_workers == 0 {3 S% g/ y& Y1 W/ s
            panic!("max_workers must be greater than zero!")$ f/ g4 Q# Q( o( R
        }
( b+ ]% f( _7 H        let (tx, rx) = mpsc::channel();/ ~  K- S! R- X* |
  X' t6 d9 h2 G9 `4 [' c, O% ?
        let mut workers = Vec::with_capacity(max_workers);) H+ F3 H' H  T
        let receiver = Arc::new(Mutex::new(rx));0 b1 h7 [9 n8 c! s$ Q8 \
        for i in 0..max_workers {
6 c4 J, n4 K6 d( n, ^            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
/ R, G/ b8 T, M1 I        }
3 [7 j4 j& U  w2 `0 }
3 P+ ^+ i! \9 J+ R4 ~9 F: l        Pool { workers: workers, max_workers: max_workers, sender: tx }1 W, Z/ d3 Z2 ?/ h! `5 ~: a8 L9 N
    }
9 d5 |2 U+ P8 {& N9 Z9 T; v6 n    + Q- M% w2 ?$ o- [( u# J
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
5 ?% G+ v9 |1 B, g$ Z* p    {" A8 s' Q0 `6 b; z9 d& B
' m- C' ~# P9 A5 }# ^; M
        let job = Message::NewJob(Box::new(f));- x4 }: g! M% d/ W
        self.sender.send(job).unwrap();6 N  x* T% A2 v) N1 Y3 k
    }1 X1 \( w: l* T6 f7 W
}
6 T& I+ o# Z" x% M9 G$ @( L
7 t% |+ R/ K+ @) Simpl Drop for Pool {
0 v7 D6 @+ o3 N* O& K    fn drop(&amp;mut self) {: g: s; l% I) k/ L1 R- w, i+ c! Y
        for _ in 0..self.max_workers {
6 i! S% l. H6 P* ?0 S: a            self.sender.send(Message::ByeBye).unwrap();
. Y8 o% D+ Y3 I8 K        }
; i3 ^, t+ u  u0 Q        for w in self.workers {
! |& h/ n( Z/ H# X) Q. }            if let Some(t) = w.t.take() {8 q# k8 J2 c% d" |: g" F" ~
                t.join().unwrap();+ B% I1 S1 R* z
            }
6 M- ^' s6 w7 W  Z& V7 b3 l        }
: P! \4 m/ c" V" n+ o+ c    }
( M3 i# p6 O6 |* R5 M}
0 ]8 u3 r) Z$ p& z+ T, c# I, p
# ~6 C5 t5 a0 m1 ^$ K0 e% G3 ?+ M1 W) a2 I2 g
#[cfg(test)]
/ o2 _) h  _4 _mod tests {
- Z9 y* d  R6 n4 p    use super::*;3 ^/ c+ O2 W+ _% b$ `% c
    #[test]
* m. v. L  Q% s0 j9 ~    fn it_works() {8 J! p8 _& g+ r2 ?/ N# I  O
        let p = Pool::new(4);
% p1 m* X/ A% z2 Z; n        p.execute(|| println!("do new job1"));* z8 l! B6 t/ T/ E5 v5 L! @) \
        p.execute(|| println!("do new job2"));, U7 ]. A+ G7 V6 k8 V
        p.execute(|| println!("do new job3"));& V) h& F: Z* u
        p.execute(|| println!("do new job4"));
3 D/ D$ P1 Z) L. {5 j9 e    }
  l/ M, Z/ D: E4 W2 s+ _( S+ ^0 V}
7 M* m: z3 a, Y, F- a</code></pre>1 W% ~& i$ ~% d$ p( f( O
: u! i) v# q/ H1 _" D! n
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-30 18:25 , Processed in 0.072988 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表