飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 16790|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8590

主题

8678

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
28100
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

( X: S' c% p8 C. x$ R<h1 id="如何实现一个线程池">如何实现一个线程池</h1>- G: Y. D: B5 M: P4 v: G
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
" O: \4 x4 \, A<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>' }8 R% E- o) X& l7 J. M5 X
<p>线程池Pool</p>1 n3 }& c/ J  T. v( c  I
<pre><code>pub struct Pool {
: T+ }- e' h5 F2 v4 b  [  max_workers: usize, // 定义最大线程数& x% @- z1 _# z2 J6 u1 D! l2 a7 G
}
# d" `# Z' X, i2 G& Y' |/ g2 j8 @7 \
impl Pool {
5 g: i: A( `2 R  fn new(max_workers: usize) -&gt; Pool {}
* ^4 I  a* v. B' _# V8 D# h  P  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}) @( o! }5 r: D/ p8 `$ O* S( p
}! d* u& ]7 p2 e! k; G2 g9 k1 X+ W1 v

; z8 i# l- f6 B! P" K  n9 g</code></pre>
6 H2 D" Z; }/ ?$ S<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
5 s8 [8 F0 H& X" F' X( l) H<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
' ?' t% S0 a/ M* y6 O可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& X; j( g: k1 U: X6 D. [; ~
<pre><code>struct Worker where) p) d9 w4 z! a: i+ `
{
% N5 `, A! D4 w4 Y! o! J    _id: usize, // worker 编号; X6 n- h; ?. f$ q
}" M. q9 i# y$ Y1 `  s/ }! d
</code></pre>
* k9 {, x- y5 z: N0 q<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
' V% O$ S* W1 Z5 W5 I6 f把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p># [. h* z- D7 z0 c/ E
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
) x7 `" C9 Q' E  [8 U) @<p>Pool的完整定义</p>
) B; _9 p1 s; p5 K. Z<pre><code>pub struct Pool {: e' M- S9 q$ L1 _( a; h
    workers: Vec&lt;Worker&gt;,' [1 t* ^- q' q/ y' D- D5 T/ t
    max_workers: usize,
% U; m9 y$ P1 j& G1 \- D    sender: mpsc::Sender&lt;Message&gt;
% m& c1 \6 Y, F0 I. z* d}$ X' N8 m: S8 e+ f$ L3 U
</code></pre>2 C( l% d3 A) m9 Q0 P: l( d4 Y! V& w
<p>该是时候定义我们要发给Worker的消息Message了<br>+ A  X  H( F+ \( O
定义如下的枚举值</p>
- m& o, [1 Z5 \3 l: y  K7 t<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;# S( [$ ~9 C* a9 P/ x
enum Message {8 G& G" K1 M* F- s  @! d
    ByeBye,6 w; R9 v, r) y4 V
    NewJob(Job),
2 V. p& u0 {% U% u$ ~* ?% A}% Q# I! a- J' f, u1 C) {$ u
</code></pre>7 {( C5 Y( n* c
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>. X* k; t* k7 h0 a( F; F( \
<p>只剩下实现Worker和Pool的具体逻辑了。</p>+ I5 f8 ^7 Z, J2 d: O9 X
<p>Worker的实现</p>
: ^. T. _7 e* p- l5 V" }, u1 u2 b<pre><code>impl Worker/ K0 a; y( u: @
{
* o- |, Q3 i: m/ N: @  o5 F    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {9 C% @& \8 W$ q) g6 \9 I1 ^) E' i2 b) Z
        let t = thread::spawn( move || {8 [* {, J2 ]% K  n# T- O$ _
            loop {5 J1 y1 ]3 D% z2 G# q5 y
                let receiver = receiver.lock().unwrap();
+ P5 W) A/ S. b; G, r/ S9 s                let message=  receiver.recv().unwrap();
0 R8 L! A( d' R3 E5 U5 O                match message {- N) x* ~4 ?+ B9 W& `# R
                    Message::NewJob(job) =&gt; {
% _2 v$ t- `5 x( w                        println!("do job from worker[{}]", id);& K$ R9 }# J  K% B7 ~" c& ?
                        job();& T' e. S) z. o# I4 d0 T, @) m
                    },
3 I  T8 U) l* _7 Y2 m* B% ^6 h                    Message::ByeBye =&gt; {
; W: p0 m7 Z" H! s1 J                        println!("ByeBye from worker[{}]", id);
8 q# @2 V! O" r2 o                        break' x7 f' I' B( {7 ^
                    },
) [. h2 j! S) ]( V( J                }  
* c. c: N/ l' f' w( }/ b8 K            }
  ~0 `& P$ y. o) O        });
7 u* T6 P2 d" n+ B- G; j" y6 f! B4 P  m
        Worker {$ ]! e6 o+ C' v& n+ ^$ c& R; A
            _id: id,
- L2 F) U; I: o% R            t: Some(t),2 E/ y( v$ o8 H0 S  x6 x
        }8 N; ~* m# h4 o) p- j1 v; S* k5 H
    }; E/ N2 |+ {; j4 ^( P$ C7 N: V
}
) T+ s" n0 q* c* I6 C, f4 M</code></pre>5 E& Q( U) `/ c* w# ~7 z/ t% D5 ^7 J6 f' R3 u
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
7 b2 i5 m4 f& P& W# E3 K但如果写成</p>
: e/ [4 |2 |4 l5 p# m<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 v& ]4 n& p" y" ~
};
' p; |" c9 o& ^  h& w& [</code></pre>/ x( j% E) ?8 M3 g4 \
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( x! x; {1 l" V5 j( ^, orust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
( h" F- `3 m) c& J" ]5 e1 n* k" T<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
' M% g% G! E4 s) P: s7 X<pre><code>impl Drop for Pool {- f5 o" f. @! X: q
    fn drop(&amp;mut self) {; E) ^- H' N, m3 v* r
        for _ in 0..self.max_workers {
1 b! }: a6 a. [3 Z  c" s" `            self.sender.send(Message::ByeBye).unwrap();
; `+ y  a7 s( ?8 O6 i5 l        }0 V" g3 }5 C. a% K
        for w in self.workers.iter_mut() {
1 t5 M3 n/ J% F+ e5 g% |- o1 ~            if let Some(t) = w.t.take() {0 e: y% @1 U0 F
                t.join().unwrap();# O% C/ W1 ?2 u6 q' j) T
            }6 d8 e9 m) d1 }. V4 ~
        }
7 B3 V/ q5 u4 ]2 A    }
: x4 Y  B5 d0 A, |; {}
" `8 p; i/ a! ^8 [/ A  {5 y) D) a! r4 ]0 N, y
</code></pre>7 q  N$ [: k& A! V# w' ]
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
& i8 W9 k2 p( L; ]) H9 {# P<pre><code>for w in self.workers.iter_mut() {* X/ y, \6 b  s. P" ]' L' G
    if let Some(t) = w.t.take() {. P9 R& k9 T6 [  f+ T
        self.sender.send(Message::ByeBye).unwrap();
' w6 z; v% r( B0 {% h: m. f        t.join().unwrap();
0 d* b0 j; f4 Y6 X% }    }# K! [% k6 T. i, U
}+ _! g) e2 Z( v4 ^5 `. `. k8 r2 O

) l3 C9 Y* q1 Y/ c0 i) e3 d</code></pre>/ E3 P9 [# Z% s, s
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
/ b. m7 V- G2 @9 q& t我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
  X. g7 P+ j0 C  [1 q/ u$ P' r0 l* I<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
7 B7 j( g7 I+ G7 q  v- @# P- l  [<ol>1 U, |2 A" e/ c$ D
<li>t.join 需要持有t的所有权</li>
5 ?/ B0 o) f- A9 D4 j<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>0 H- u3 ~: c# S7 a9 Z: h
</ol>
/ m5 U- F6 B# _3 B3 G<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br># D6 M* N& ?9 C: E
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 v) M0 v; ~  V<pre><code>struct Worker where
( M$ r' ~# L. A" `; ~9 S{+ F1 ~5 A1 |' P+ B" Q
    _id: usize,
7 l  G9 r9 D; \% O+ ^0 N! g    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
* Y3 {2 b* W5 x. J}
/ `3 v) ?- Q4 y</code></pre>; B, ]; l3 D( ?6 q9 f
<h1 id="要点总结">要点总结</h1>
& l# x& P6 Z  P( B! f$ q<ul>
4 Y; j  c+ V2 e8 ~4 E5 a7 Y<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>4 L7 E% a: P& M1 K. S
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>0 o$ ~# n) T; P1 _1 z% S' n
</ul>* d1 O' `+ ?* G5 D
<h1 id="完整代码">完整代码</h1>
: u! M- m, J& F! N<pre><code>use std::thread::{self, JoinHandle};' P+ P2 w8 C9 Y$ B1 N0 v7 l
use std::sync::{Arc, mpsc, Mutex};
/ L! ?, w" Y2 R7 Z3 j  x
/ E$ y9 l* b) i" M5 {; v" a% B2 U5 v( u3 l3 \% ]& C) P
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;  L# S( A/ |. }9 G2 z) ?
enum Message {' V7 I3 X" }" {3 H5 s; H) A
    ByeBye,/ o& A! D0 p  @& T- Z
    NewJob(Job),% r$ X% _6 a8 _5 _8 g
}
  I: Y! B' R0 @- `3 O1 Q
( \; B8 }+ q% Cstruct Worker where
" t' B2 H$ b( `2 _) T( V( c{) m( I7 X8 j. f0 Y+ \1 ?6 h* ^
    _id: usize,6 c# m6 f/ M9 v8 Y( Q2 Z* W$ [
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,' h+ E: u: w8 C+ ]- l8 \5 z9 Q3 x
}/ O; |% n& A' K) l" @
7 G& Q6 Z; I3 F9 r' {+ Z0 ~! O
impl Worker8 v8 d! Y+ K  J2 U. K+ L
{
6 o7 s$ U* a6 o. y7 w( }; M4 y5 g: h    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {" \" L7 r& h6 `9 C% |
        let t = thread::spawn( move || {+ K# I7 \' n; u! u1 o
            loop {
; j" M- T7 k. \0 [0 a                let message = receiver.lock().unwrap().recv().unwrap();, I) T: n$ b1 I
                match message {
( J( e" h4 \3 a                    Message::NewJob(job) =&gt; {, c4 I4 N! t- x& ~; S- o
                        println!("do job from worker[{}]", id);& A1 @; p7 l7 Z+ Q- o: m1 V
                        job();
) S8 Q( ^. K5 V  N  a                    },& Z/ f) I2 _+ m6 K! Q5 u7 X
                    Message::ByeBye =&gt; {0 t" v3 W- |# L
                        println!("ByeBye from worker[{}]", id);
9 @' Q: w, F( ?, }  J% e                        break* \1 g; x9 t7 i- ?
                    },' R# z& ]2 g# e* |
                }  1 |9 y! t9 d, ?- Z
            }  R/ Q( M5 R# f7 H4 i" v
        });$ k1 G8 h  S& Y! t/ I, S

3 C' X% x  x0 \3 Z' x# Z        Worker {
: x( A. U8 {, {- W) U  }9 a            _id: id,
% A% M" G0 t9 ?            t: Some(t),- |& F# W( A$ V
        }
3 ^0 j2 m0 m. T: ]    }
2 p' r! q7 @- \  |4 }( F* l}
3 w# J; [% R: K0 L1 n" [8 N9 u  S
4 t2 l0 B" j$ g5 F! T8 K7 p$ m( D- hpub struct Pool {3 R' [! n7 O( V
    workers: Vec&lt;Worker&gt;,' m: j* `2 Q2 y" k& J7 e
    max_workers: usize,
9 H9 j* Z/ |) g    sender: mpsc::Sender&lt;Message&gt;: n/ f; a6 D/ o
}
+ I5 _. E5 Z+ n" F3 D8 N* Y1 e5 e, N4 K& ?. v
impl Pool where {
5 q5 C7 d& {( `2 d8 ^    pub fn new(max_workers: usize) -&gt; Pool {- C1 E3 w* O3 ~/ i4 S0 f. ?
        if max_workers == 0 {
3 O+ s* L& h0 X5 G            panic!("max_workers must be greater than zero!")& ?1 b+ v* {, [4 _% a9 `
        }
1 J2 U8 q4 |- @        let (tx, rx) = mpsc::channel();
5 a) T) j1 r! X2 G0 q; u! u/ _! T0 q
        let mut workers = Vec::with_capacity(max_workers);. P2 S5 u0 V+ Q8 Q1 J% ^! b/ H
        let receiver = Arc::new(Mutex::new(rx));& l$ X7 Y' L) A& s% |7 j) J3 I
        for i in 0..max_workers {
/ k- }: w3 c$ Q# S8 J7 O            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
- ^; b7 _/ B9 |# V        }
% X2 H  A' ?( g
! u6 P0 h2 }( L7 V  _        Pool { workers: workers, max_workers: max_workers, sender: tx }
6 d& r6 {* K, ~" l! z  L" ]: b    }
5 k4 o, u% Q. N: s% B) W7 H) f6 ?   
# c( T2 n: s1 F' t* s* Y7 O    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
8 p, `$ A) t9 z! R: \    {
2 U, U7 L4 ~- }; f7 f3 [; [+ r  |0 G/ ?5 c% O
        let job = Message::NewJob(Box::new(f));
6 }: Z* ^& b( B( e5 |7 t1 }- M" f+ X        self.sender.send(job).unwrap();6 D: G7 |( ]7 r$ d5 j7 S; F+ G" f% e6 v
    }
! ~# u( F; H! G- B}
% P) R: y' Q0 y/ F. `& |5 i
9 |( r0 c$ l+ f1 _" {impl Drop for Pool {
$ q% n  n/ }2 P6 x0 o) e    fn drop(&amp;mut self) {% o. Y5 g0 E6 u/ r1 A5 I
        for _ in 0..self.max_workers {
0 y& m" p9 \) i+ H            self.sender.send(Message::ByeBye).unwrap();
7 z; V4 T2 W# P- M        }
; h2 d: j" T% M) _, ~) e7 {        for w in self.workers {& a5 `7 @, n; {% z7 i
            if let Some(t) = w.t.take() {1 @$ `" k7 g) e+ ^# ]$ Y. B2 G
                t.join().unwrap();
' Y! e, [- {' R7 ?3 E            }
$ ^5 G+ N& P( C. T! M0 T        }
6 ^- Y" V. F2 o6 }* K0 U2 [9 O    }
4 k: [! H( S& R! |}- M/ n4 l( x' X# k# h
4 P3 P0 ~  l2 S# v$ y( A

8 t# ?4 t* t8 O* c4 ~#[cfg(test)]) ?/ @# W4 p5 X1 }! I
mod tests {
3 s9 t0 i$ X( G7 r( ]& G    use super::*;
0 t/ I0 K' m* ^2 g( _    #[test]
0 y& R, q7 W+ W: F, R. e    fn it_works() {
0 \' {/ _, d6 I! ~# ?) p, y' N1 Q        let p = Pool::new(4);9 M8 f- g/ d/ ]% n  W" T, s, a
        p.execute(|| println!("do new job1"));2 O' E; Z0 x6 u6 X$ \, n
        p.execute(|| println!("do new job2"));1 U5 b+ ]  o: }5 \9 y# |! ]4 w
        p.execute(|| println!("do new job3"));
. l) j, D3 X+ r/ L, {5 }" K        p.execute(|| println!("do new job4"));* |1 X$ \& D  G8 F! s# J+ T3 N
    }
' j8 R3 S! k- `# K! f4 t}3 L$ j2 z; {! r) w  P
</code></pre>/ i6 H, y! s8 y' O) U6 L9 L8 ^4 ~) H

0 [9 Z. `/ G+ P- g# S
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-4-30 09:43 , Processed in 0.109618 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表