飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14918|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8087

主题

8175

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26591
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

$ K/ m$ Z. s4 j- {6 w' e$ [  C<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
1 y3 I/ U( \# g  D, a5 _) a<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
2 i- b: A) w& ~( [<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
" s% L  [' L" C" I* ~<p>线程池Pool</p>9 V6 q0 K' a" D- i: v2 }7 `
<pre><code>pub struct Pool {
+ t+ F1 u! ^* D- S: L- @6 D  max_workers: usize, // 定义最大线程数
9 a6 P0 _' t& B( Y: i" ~: B}
- X9 J* @7 r% d. }: r
7 a5 N$ {6 T! ~0 E4 simpl Pool {0 y6 D3 N# X4 K! t( i; x
  fn new(max_workers: usize) -&gt; Pool {}
; O  S0 ^7 H! J& f. w  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}! _: _# E$ x% \9 f
}
& t& Y9 w" g) {, K- S
  ]/ q( y2 G( A</code></pre>8 M$ }" r+ D! D% N+ p1 }
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>/ m2 a* ?/ _* S2 Y- n% T9 M! \; K
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
. d, ?  M, f+ a+ }9 Q% c可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
/ J( Q5 A, v% v9 h<pre><code>struct Worker where) }% v" l( ~# s0 F: [$ e# |
{
6 `5 @$ R4 |  c    _id: usize, // worker 编号4 y& N9 A7 K6 P8 S! X9 c
}
6 h; p7 _: T6 \3 P5 e</code></pre>' a: a7 C' Z& T6 N; v5 M1 n7 L6 J% @
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>3 C$ t, [. I! K$ K& l
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>3 K! {  C8 r  |# C( |! r
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
: D9 v8 m4 V* o* D6 v/ M  z<p>Pool的完整定义</p>
+ P+ H7 u) z. q0 C1 ~- i8 m9 ]9 Y<pre><code>pub struct Pool {+ E( V/ F9 N4 L
    workers: Vec&lt;Worker&gt;,
0 s3 j( p" l9 b  s    max_workers: usize,3 I3 d: e6 }3 Y' y  }' t
    sender: mpsc::Sender&lt;Message&gt;6 d9 V( \. o: g0 t7 J
}4 n& J: X3 y5 r8 W  A, \
</code></pre>
% w2 n9 z  G- P4 p, e9 P8 a0 Y<p>该是时候定义我们要发给Worker的消息Message了<br>
/ V2 }9 B7 ]& N4 G4 z0 _  I定义如下的枚举值</p>
* E( C7 H) W8 U3 ~: j5 v<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;- U+ o: ]' c8 ]8 Y+ ?3 B
enum Message {% z5 H( G$ v, @  N$ S; e# {
    ByeBye,
% N) m: O6 }1 |4 L    NewJob(Job),
; @$ I* |: m4 e: O* u0 Q+ _}
9 g: I- m4 Y9 y6 z: g1 V9 S</code></pre>
3 k1 x$ y$ X# K, ?( ^<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>9 L/ |- `; I$ p- q$ u
<p>只剩下实现Worker和Pool的具体逻辑了。</p>* o. b0 d% n7 \6 C* }
<p>Worker的实现</p>
) T) {; E9 m" a" C5 e4 `0 u& f9 F<pre><code>impl Worker& s8 H" h. p+ a" N
{. U. p. @; H" K9 S' e
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {& m% n: r3 U, f
        let t = thread::spawn( move || {
. D7 |  Q5 Q, J            loop {6 h& L: o' Z; y( G8 Z+ [4 k" N+ T' e
                let receiver = receiver.lock().unwrap();
- D! X. J% ]& c# w* C# C! H                let message=  receiver.recv().unwrap();
( i  }& X8 h- P0 ~8 W- J                match message {  v8 y" M1 F( {
                    Message::NewJob(job) =&gt; {2 H9 d& H1 `" D# Z
                        println!("do job from worker[{}]", id);
$ F) M# Y5 I; l; C. U' _4 r  d                        job();: a9 P* `! A! ^, V( G
                    },
* `( i( f3 G+ M% u0 W                    Message::ByeBye =&gt; {
/ p2 t* Y) R) Y& W; ?6 s                        println!("ByeBye from worker[{}]", id);! H5 |- u( O7 l& }! E9 v
                        break( ~5 _. w7 k2 O! a) A) Y
                    },7 k- |! J8 k9 R1 W0 A4 l' R
                }  
4 }6 R9 P- y# L- u& h* c! d- e" n            }2 r* E4 M+ c) M; O0 o
        });8 }, ^& G& K! ]; P. r

9 c2 w5 T" t# h  H        Worker {  p6 p0 j" b' H7 W$ H* b
            _id: id,
7 {& {; x; a: }            t: Some(t),% F. S) e4 ~* i
        }
: {4 p9 ^) ^# o  u3 m3 B- ^    }
7 a! m* ~7 m3 x$ e: O}
" K# r9 r2 c8 X! A, `: h</code></pre>
4 z. H( F$ }% M<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>  B1 v5 Y% g9 K, N7 V" Y! w
但如果写成</p>
$ o2 S' z4 C1 ]( z6 j0 _0 x<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {. d* i4 H2 m% @# g0 k2 i9 N$ D
};5 L. A' s8 b' i3 g! i
</code></pre>
6 H: u1 S9 B( B+ S* A" v0 K<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
, k3 O% i( n9 b  L' M# rrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>: i$ C# B- X1 R- N7 w) w' B
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>* O& k/ b5 V+ o: t3 S/ C
<pre><code>impl Drop for Pool {3 F) m& X' h% w6 @! v
    fn drop(&amp;mut self) {
7 Y6 G" _0 g# u" H        for _ in 0..self.max_workers {
5 f# _$ ]* T/ `& q' g            self.sender.send(Message::ByeBye).unwrap();
& y/ U8 U- M% r. s2 x        }
7 W! `  l- o: B( @+ t  G# l        for w in self.workers.iter_mut() {
0 X4 O/ j# E% T5 w% A) \0 a            if let Some(t) = w.t.take() {
4 S$ v1 P% g! ~                t.join().unwrap();- s( ]5 d) H$ ?$ Z+ c6 H6 `
            }
2 p! [+ H* Y* [1 a$ n        }4 A, v7 R; ^3 G
    }. G- @. d3 m9 {) S
}( a1 }% Q4 ^( @9 u3 X0 l

9 R$ z4 p* ]6 V+ W1 s6 P( [</code></pre>
0 w- P! G0 T. V4 Y8 b1 O9 b<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
( J. F# {, R! o, A% D! w- C<pre><code>for w in self.workers.iter_mut() {
! r5 f1 Y  S( f$ ^3 X    if let Some(t) = w.t.take() {
# g) p$ S+ H5 g, e" @* Z1 L        self.sender.send(Message::ByeBye).unwrap();' F; m' o' ?0 H8 M- e
        t.join().unwrap();* d) H' l" `. z( I9 N
    }
0 N! z* L) U# M4 q/ U/ Y0 Y}( Q% e1 ?6 U1 k; H
% T5 U' J' v5 o, Z+ ^
</code></pre>
! `& x) n, S2 n5 q! W<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
9 W$ q! l. y# i  J1 P: P4 @2 n9 V我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
# U4 h! X: V: a! d. _& l<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p># s) x0 O3 S2 w" g& k" m, o
<ol># ~0 U  p  E* i  ?: |, `, Q* {& ^
<li>t.join 需要持有t的所有权</li>
4 K+ o1 P; B' ]( @$ ]/ L<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
0 a) [2 o# x# Q. E</ol>( S/ B5 l+ p% W" `# W% }6 x4 U
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>" G" u* d: D# D! a2 {  \
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>  Q6 U5 Y9 @; ]" {, x* q$ t
<pre><code>struct Worker where
! a0 L1 S: V0 u/ J{
. K* V$ O4 C1 I& n; w0 {    _id: usize,
* [; d# C: Q# j7 ^* g2 [) ^    t: Option&lt;JoinHandle&lt;()&gt;&gt;,2 O, K6 ]+ n2 d; |3 x" M; H
}
6 }5 R/ ^1 r! O$ p  i& F</code></pre>
. w" f! d; S! h+ V<h1 id="要点总结">要点总结</h1>
$ t% G6 u: F7 A& `9 w( m<ul>6 b1 C1 w" M4 A, m
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
8 }7 Z5 {# F- V, W<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
( Q  R+ q1 P( x% z1 p# J- k5 L</ul>( N: p6 Q. s3 c1 w* j' l
<h1 id="完整代码">完整代码</h1>0 U7 ]; P; X. b
<pre><code>use std::thread::{self, JoinHandle};
0 v, q- t8 p; e4 r9 C+ C. Puse std::sync::{Arc, mpsc, Mutex};% A3 N" ]7 q5 {4 K$ D4 k

/ C7 R9 X* Q* q6 g; S/ O
* t) i; A& }! ^7 b1 K) \type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
( O% @! K$ r0 T8 O) p$ Tenum Message {% W- g" ?& P, w5 g9 {# D
    ByeBye,
) X* Q  v' X. s  S  o    NewJob(Job),
% t. A) d/ U1 P& O! Z}: S( Q. P' }5 W. C
9 K+ P7 M0 f: W* w* L2 E9 u0 D
struct Worker where7 |; }, j  e, W# a' c" J9 Y' g
{) D5 h+ _8 g1 N+ n. P; V
    _id: usize,
6 G% U0 {  N3 u' M1 ~! Q* c    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
: s. _% ^7 \! A! }9 q3 @  j6 v}7 _' C4 c( I$ l9 |* T* j3 r! T

$ j* R4 ^: [) Y. Ximpl Worker
' F/ B1 S! m/ |( D8 L/ e/ Y2 d! G3 [# A{. c: G3 i: f+ Y1 Z% ]7 D
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {) b) K; o+ |7 t; y1 m
        let t = thread::spawn( move || {9 o' ?; j) I4 d7 G$ x& ^4 o% s$ Q8 t
            loop {
6 M5 @% w& {) }1 X$ V6 v                let message = receiver.lock().unwrap().recv().unwrap();
" n" t9 h& [6 e5 t( G9 U3 F                match message {1 D4 O, p1 x$ J4 _2 e: d
                    Message::NewJob(job) =&gt; {
6 O" ]7 E/ Y. {, ~                        println!("do job from worker[{}]", id);1 T+ g0 y2 n, E! E& O9 r& G) K- t  h
                        job();: y+ |) P- W, t# H7 A0 `5 L3 H
                    },
6 B* ^& N/ }9 y# Q7 p                    Message::ByeBye =&gt; {* B% O6 e$ j- O5 `+ i; a: l
                        println!("ByeBye from worker[{}]", id);
/ M3 ?2 o3 g; A4 m# U                        break! M# A+ v" ]4 y4 g+ m
                    },
. r7 f" J' V, A- |                }  
7 F% J& W6 s% F* _- B# W- y% a            }
+ g, [0 T6 d6 Z        });. T* S4 x% R7 N3 x) ^

, {' ~7 [5 {5 ^& r6 ]$ i1 F! S        Worker {5 y! k- ]) j8 O) Z! h) w( C6 @) _
            _id: id,
% @; X7 X+ y; e+ D            t: Some(t),3 G1 H6 B* }1 c% `6 d
        }6 b. c0 v2 n3 l# Y% F8 P' r5 Q
    }3 Y: I8 F! J% O) G: T
}; f6 Z% i: X1 T/ A* `, q% Q2 p
9 f/ n5 Z% `6 j! E
pub struct Pool {
' W2 x3 a: A& a+ u0 y7 f+ @7 e% Z    workers: Vec&lt;Worker&gt;,# q3 n: X) O, W& @3 `7 ^6 O2 C9 e
    max_workers: usize,
, h, i) `* \: ?: j  u1 O    sender: mpsc::Sender&lt;Message&gt;$ V# h( |# T; P  x: U: d
}
* v& ?9 z' Z) f% S/ u
; y& X1 ]: ?' |  ?0 kimpl Pool where {0 K* V% V+ t1 T& A7 a4 f
    pub fn new(max_workers: usize) -&gt; Pool {/ v) O$ k0 \4 u0 K
        if max_workers == 0 {" v, g6 {# \& [2 y! [
            panic!("max_workers must be greater than zero!"): A# a& `: h! ^$ G6 j
        }, d0 f) C$ l) B) c5 i; l0 J! g
        let (tx, rx) = mpsc::channel();) R. ^( x/ E5 ]7 x+ z9 u
/ [6 o& }# w" J  H1 W
        let mut workers = Vec::with_capacity(max_workers);
7 b1 O  l/ `. N7 E% |8 O' d        let receiver = Arc::new(Mutex::new(rx));
+ [, N- n! X/ l8 Q- a        for i in 0..max_workers {
1 W, z) {. R! w8 y$ [# z+ ~! Q            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));3 p$ c$ k& ]& Q5 L: p9 t3 R
        }
2 ~2 j0 a$ k/ n6 t3 n
$ @3 F/ a/ i" k) g- P+ F* f. Z        Pool { workers: workers, max_workers: max_workers, sender: tx }
, a- m4 z" c! q( h, f    }
' M* @0 P. ~5 ~   
2 ^* q- i& k4 k. u3 z% t    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send* ]9 y2 W: ~5 N& W. N
    {
! j5 R* V; ~. O7 Q  U' I; z4 n' }& n  m, s3 V+ q
        let job = Message::NewJob(Box::new(f));5 E8 p* Q# c' H7 P; C/ e+ n
        self.sender.send(job).unwrap();- U3 v; Y6 w% n% {7 G
    }& C( X8 Y8 }( K4 d* V
}1 ^5 r' s0 a! A1 ]3 H4 G6 n

* e" J: O- K# Y+ Q' v% zimpl Drop for Pool {
( f. C" a* g9 G" x0 c! O& L: a    fn drop(&amp;mut self) {- o1 \$ D& ^6 r% Q0 W6 r! M1 c
        for _ in 0..self.max_workers {) _. n1 |5 E6 s1 Y9 S" D
            self.sender.send(Message::ByeBye).unwrap();9 N% {0 n6 t5 `
        }! G. L0 l; E. s. s5 H
        for w in self.workers {! o6 h9 `" g) Y% S* ^8 }3 B
            if let Some(t) = w.t.take() {
1 Y" s: Q* M3 ~7 s                t.join().unwrap();
  a4 ?8 M/ u# O            }+ |' _- j. }8 T+ ^7 B: y% b& H% z
        }8 X/ m4 F5 i* [( v7 H. C3 d9 i
    }- m/ t* ^( w, [% f
}
# x3 O' C# Y5 o) [  h, j5 Z/ s  o2 H/ G! f1 ]% ?

# w5 f4 M9 H% V# Y% c0 R1 r#[cfg(test)]7 Z% O8 |1 t6 h  F8 U
mod tests {9 `4 X2 b  h  r+ s
    use super::*;
1 `  S. W% l- x$ \3 z9 e4 O    #[test]2 w8 j( l: X0 o! h! v
    fn it_works() {
9 E" X/ Z/ @8 t1 L- S        let p = Pool::new(4);
" m7 ]2 y- c/ G9 n$ ~( q% b        p.execute(|| println!("do new job1"));; f8 I( N! \9 R  `( m
        p.execute(|| println!("do new job2"));: N; X! U1 h0 \6 U! e7 w! C! Y
        p.execute(|| println!("do new job3"));
5 N1 _2 {5 G% G6 f        p.execute(|| println!("do new job4"));
; A6 |5 S. p* A, i    }) l" g& M% u% j6 M7 X& b) b
}
" L2 z! Z4 Z" A3 X, e, H; z1 `</code></pre>6 z5 x7 Y1 z0 z0 u

, G! K1 r! A: N8 x
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-18 00:44 , Processed in 0.067506 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表