飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 18293|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8920

主题

9008

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
29090
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
$ J# W" q0 {5 i. F# w7 N
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
# R. ], i- h$ ^$ u1 M7 T9 V<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>3 g1 G5 x7 s! ]8 {0 R
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>" T) O5 X: T, t' S2 l* \
<p>线程池Pool</p>
' K' G1 f# b# g+ ?1 F% L& A( T<pre><code>pub struct Pool {
& Y& h: t# v) H7 U# R( _  max_workers: usize, // 定义最大线程数
' Q+ G' m+ b$ u7 D, `1 S3 _% [}+ f" ~3 g, Q* l. N$ f* N* _. E, C

1 e( `! t2 w5 _# m* h1 pimpl Pool {* t& e( G, R; m
  fn new(max_workers: usize) -&gt; Pool {}) k: E$ K/ I% a9 K, _- x* y. L
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
2 I0 J" `+ w7 B! K3 y}
( i3 M$ l$ c6 J! D1 U" S# |  \+ b1 B% Y* ^0 C( B$ Y
</code></pre>
7 V/ l1 x6 P/ f7 D( l! [<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
" @8 Y! t+ I& _6 v" U+ ]" r& v8 B8 h<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
8 K- U) F0 `2 h3 o可以看作在一个线程里不断执行获取任务并执行的Worker。</p>8 Q& A" r" @) \' ?( l
<pre><code>struct Worker where# ^+ h! {( T, e* \8 {$ C
{+ _( e- L( p1 k' j2 R4 m" ^
    _id: usize, // worker 编号
  L. A. n, i, r5 s- O% ^0 {}
  O* ?: H+ M4 w5 l</code></pre>
& L$ u  p  e: B4 n* g  t+ O* ~<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>6 V" V. i6 R6 X3 B
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
7 o: `8 u, i+ h3 c<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
3 c. D. ?; F  ]/ V# ?- F2 L<p>Pool的完整定义</p>
) [" C& ]0 l4 A2 x( ]! ]<pre><code>pub struct Pool {0 j) w' i& k1 |
    workers: Vec&lt;Worker&gt;,
; k% a( E6 X6 K! x) ?' }    max_workers: usize,( o3 ~% b% s* b6 {, D8 G# Q3 l7 |5 Y4 x
    sender: mpsc::Sender&lt;Message&gt;  K/ F: k* a3 c
}
/ m7 A1 N& Q) h" x$ l6 U' F7 Y6 ~</code></pre>
) }0 L. C* D' l. x; b6 z<p>该是时候定义我们要发给Worker的消息Message了<br>
, q8 A5 e; K. N' [: V. Y/ \$ S6 u定义如下的枚举值</p>
7 ]' f6 j) ]! N6 {<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
1 _5 w  {3 {0 O- d) v1 n* \enum Message {
+ L/ K4 ?% B1 z0 a2 |    ByeBye,# f% J8 P# f3 @2 s
    NewJob(Job),
. G% y% ?# j3 `& B/ _3 c/ F- B$ ~; j}
; v% c; J# ^; \0 Y3 y. {0 [</code></pre>' l1 Q+ H7 O: \
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 Z: V1 y9 L7 ~<p>只剩下实现Worker和Pool的具体逻辑了。</p>
& Q! l) S1 @6 W% ^<p>Worker的实现</p>
4 ?! R# ?% {, V& F" b  ?<pre><code>impl Worker! f* A6 b# M  r2 V
{
7 H, j1 k% ?% ?$ B4 i7 E; G    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
9 A: [5 G0 p: s" [# Y4 y* L        let t = thread::spawn( move || {
1 }# Y* R- W" y$ K. Y6 Z  k, W            loop {
1 N4 K# j0 S+ ^8 d% {& H                let receiver = receiver.lock().unwrap();
  h( F0 l- y6 B+ v( k4 U4 G  C* {                let message=  receiver.recv().unwrap();
+ n# d1 D. M8 x% ?" ]- n6 ]! a                match message {3 \/ F- L# ~% g6 S& j# I
                    Message::NewJob(job) =&gt; {1 c8 Q" ~& w5 e8 s6 u. G: b/ P
                        println!("do job from worker[{}]", id);
6 L8 Z8 b: l8 N/ m  W! z) `3 ?; i                        job();
  T/ E1 C# U% v' q' n                    },  H' ]8 j2 |. ?, A3 z
                    Message::ByeBye =&gt; {" ^) q' h7 G2 j1 A. k/ p# d+ v
                        println!("ByeBye from worker[{}]", id);
; c: Z( U$ u- p! k' G2 u                        break
+ b, N" M* ?' q+ G  K- S/ ~7 M7 g                    },
& ^5 ?/ g1 }& [* R, U$ x                }  
( Y7 g( R- X% G+ y            }
6 _8 o) K. d! V3 H8 k! v        });
! N0 J7 l% W6 [7 G6 K8 k# G& G8 L' {, |5 Y- s5 x% \; g$ W
        Worker {$ g: ]7 f! }" V' q
            _id: id,8 h0 C& q6 r' @  d
            t: Some(t),
+ W; f8 i4 I& E8 q/ C- a! R        }8 L$ m/ {+ t7 ~. d# W
    }1 ]' l# p6 t- S
}
5 m- F0 e! s1 y4 E! X% m1 `</code></pre>
) B) B; r$ b  Y2 w" k' a<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>; H/ o" L1 H2 d. \
但如果写成</p>
' y- B3 ?5 M, X# D+ ]/ M3 R<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
* W* q8 D1 b$ D+ q};
( r2 s# S9 X, v3 a* |2 F' U& m: k1 S</code></pre>' T8 A, e# k. Y2 ]) f, L4 {/ @7 c
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
  Q6 v1 ~' W1 t/ s1 \& K% `rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
/ a4 O) P6 N% ~6 F+ W+ W: t<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ x+ e/ @) I0 a+ a( t
<pre><code>impl Drop for Pool {6 T% f- j7 T0 s2 \
    fn drop(&amp;mut self) {
6 f, R. f  \4 b        for _ in 0..self.max_workers {
" T0 R! L/ A; `& u3 ?+ Z            self.sender.send(Message::ByeBye).unwrap();
; G# o# U; k% h5 ^        }
1 ~$ T9 Z6 p" g4 T9 X" R        for w in self.workers.iter_mut() {
$ S3 X# q+ X0 q' S6 h            if let Some(t) = w.t.take() {% z- n+ c4 ]3 g' V
                t.join().unwrap();; u9 g" T5 U& t. ]
            }
* ^. i/ ~- S4 j5 a0 `' y1 c        }
+ \/ x/ T2 o8 }( \3 f4 U, Z    }, H# A8 [' `+ p* Y0 r
}
6 x+ l/ n6 T; Y# O; ]. W/ @: |  m' \) c: |7 @3 U& V
</code></pre>
! A) w9 l0 {1 s. H7 P$ L$ i<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
; r8 a& F: ?! _" P) u  L5 u<pre><code>for w in self.workers.iter_mut() {/ u- o6 v  _9 _( A& C0 w! T
    if let Some(t) = w.t.take() {' i! [; [) Y/ Z% j1 L  c  Y
        self.sender.send(Message::ByeBye).unwrap();0 r7 o6 D( F2 l: D; x! w' a. g
        t.join().unwrap();- F* ~; {) J3 p9 K0 l) ^
    }  A1 D9 B6 e( H  H( [' Q3 m. n6 [9 c2 w5 O
}
5 V9 k7 n9 d$ |* o/ s  e0 l1 A! L
: p# M' l2 ]  H) q</code></pre>
4 l* @' p$ u* s5 a' F" n4 u<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>% m5 s& h. Y$ Y
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; i) s" P: `: T; _) d- ]. F
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>, o3 u! ?: U4 {0 p3 u
<ol>% J1 R9 v& X5 M" I5 F* a* \* A
<li>t.join 需要持有t的所有权</li>& v9 \& \6 N& e1 k0 p0 Q3 p
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>; H0 T! Q: M% D. V' _
</ol>
0 r5 O0 H% i: S* P<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>. W) N! r* c& e* W6 U
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>: a- Z* f% @9 C1 h  q! _
<pre><code>struct Worker where
) h. r6 n/ ^7 E' _: E7 W- g% O{
* x7 A7 k( y3 e4 d- a* l0 z0 F! @    _id: usize,
' G3 [# a( h) K' y( G1 j    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
; X1 _, m9 _& m/ [}
/ a& H1 N$ W% k4 k' k+ X2 ]</code></pre>$ Z  k- D2 |4 h0 ?
<h1 id="要点总结">要点总结</h1>% a+ _7 _2 |/ d# R/ F9 J. j5 r/ J
<ul>
/ S. D, J& x! s2 V<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>' G# w% l5 F9 s6 C8 M2 ^1 L
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
2 V. P5 p: p/ P0 C8 l; b</ul>+ o7 F# W6 w/ J4 @1 n
<h1 id="完整代码">完整代码</h1>$ L( |1 g) f- a% |
<pre><code>use std::thread::{self, JoinHandle};0 ~, Q% _5 y7 z/ D
use std::sync::{Arc, mpsc, Mutex};
3 e1 C- n) f. n* w5 @0 m1 i  B9 C/ }) B
+ v+ T1 C. l. c7 M! y, z
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;0 @' b' k6 U" @
enum Message {$ R; ~9 d$ e7 R1 G9 N- M' t6 Z& n
    ByeBye,5 G5 Y' m  G0 e" l9 P" u
    NewJob(Job),
+ i! z0 f6 V1 Y. X# P- X}9 u' U. M3 }/ r, m( u* K7 M

& L1 z* Z+ e4 w9 K5 ?struct Worker where
# |/ O1 P$ t& a7 {. G) {$ q{
' L5 T8 |) X8 b% {    _id: usize,
- [+ R" `9 F: n# {; r" [    t: Option&lt;JoinHandle&lt;()&gt;&gt;,- e5 c. |0 y3 x; C; O: x
}
# J9 P/ p4 h7 r: q9 b' p3 j9 f+ O0 r
impl Worker) O. @+ H6 U. D( Y. E$ ~, k/ r4 K
{
: ^: x' b4 `9 h    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {& u9 ^  o. h8 S: w1 o
        let t = thread::spawn( move || {0 t8 k) M" G8 `* d
            loop {- c9 U. h- P! P2 G  Q1 M- {$ R" K: C
                let message = receiver.lock().unwrap().recv().unwrap();
: |. o8 v+ b% ~! J0 s* s                match message {
9 Z3 T' j7 \, I2 w                    Message::NewJob(job) =&gt; {
9 m, _; E0 Y' i! X" n                        println!("do job from worker[{}]", id);
- t$ i' \: K; _+ K& C; u6 V, s4 P                        job();
7 d& F( [0 l' y) U1 c% w# [" b6 R* `* m6 ^                    },
) f+ r$ {2 A# `2 a  x+ |' b                    Message::ByeBye =&gt; {
+ G, J6 `, v, B" a1 }" F                        println!("ByeBye from worker[{}]", id);2 ^! R' g* A# v+ q/ _2 D8 ]
                        break
+ _: X9 \5 z; O* W8 B                    },
8 L* c& f( e3 w* G) h                }  
5 l  [' H0 \6 I1 V9 h' R            }
* D3 ]2 L% E1 j: {* H5 d        });$ C# p. c4 C; J. D: r* I, i
  }) Z! q0 P5 n7 Q
        Worker {& ~6 o- Q* @; K
            _id: id,
) q! N9 ^; k3 l6 Z' [# N4 p            t: Some(t),% P1 M7 N* p, T5 G. a0 }1 c. Z
        }
+ }. m3 L$ L9 u0 d' x2 [  w    }2 |2 k# ^6 c  d$ |* G/ V
}
; M$ O3 w6 ?* k0 t6 e8 A* H0 e8 F
pub struct Pool {. ^$ z" ?1 x8 P5 ~9 j% B& S7 X6 \
    workers: Vec&lt;Worker&gt;,
. |6 c' P9 W: D% |/ i4 }4 L! W0 M    max_workers: usize,
: P6 w" A- H' l. ^, [! C; v    sender: mpsc::Sender&lt;Message&gt;9 X6 D" V* U# ]7 r% L1 `
}
: H7 T2 ^7 x! a( Z3 m+ i% W; Q1 h0 {7 [, u# V1 f. F% x# l9 \# `
impl Pool where {
8 \1 D* `$ M$ ~. `( I    pub fn new(max_workers: usize) -&gt; Pool {/ H7 I5 x# `! b& J9 d
        if max_workers == 0 {
. Y, O! G7 ^# q9 O; g6 L            panic!("max_workers must be greater than zero!")4 m9 q5 P9 f3 B- f  B. D. U6 F  R- O
        }
9 \8 \. z4 Y; k        let (tx, rx) = mpsc::channel();& v+ v* @: p1 ?& s4 Y

6 a2 E4 H4 I" A5 ?4 G1 o4 X  J        let mut workers = Vec::with_capacity(max_workers);
, N0 y- B- d* N+ Y        let receiver = Arc::new(Mutex::new(rx));! M+ _7 X) v1 l' _
        for i in 0..max_workers {
: _! q* p% y/ ~! J' Z            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
* o+ y: R, d" i# w' Y2 v        }$ O1 O, V* N6 @5 ]1 q$ X; B/ P

/ p. e7 S8 h; v8 D* m6 d        Pool { workers: workers, max_workers: max_workers, sender: tx }
$ |5 L; n/ ^7 s    }& s: ~7 M3 @$ G$ ^) S
    8 ]) k& M. t- _$ v: Z
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
( s; ~8 X  a: n/ F/ a    {
8 {2 M) L- ^+ m; ?& [
" G5 n! x7 }- X* B% j- k        let job = Message::NewJob(Box::new(f));
+ ]; C* W+ A$ k, |3 s6 c        self.sender.send(job).unwrap();
! a/ L$ H& r7 \. w5 U    }
6 i# \0 Q! y, E$ d$ X}- k+ C3 Y3 S$ a: r" v7 Z
6 B. b: x& h* X' n
impl Drop for Pool {
" n3 k1 g) w4 \6 I$ {    fn drop(&amp;mut self) {5 u3 _: _+ f+ i% d. |6 A
        for _ in 0..self.max_workers {
0 l* U2 _2 ~$ ?, O            self.sender.send(Message::ByeBye).unwrap();. u( w% H# l. K* K- N$ V5 M* T& n2 \
        }
; W- M* s+ `0 F: d* t4 A        for w in self.workers {' C4 i1 a5 q5 H' H1 C6 c8 o1 M
            if let Some(t) = w.t.take() {
! b5 Z  H1 s! ~, S8 P& O                t.join().unwrap();& o( _2 B! C. K) ~6 O# E0 J
            }
# p# y) ~* b* ]" t2 U. V        }8 z4 {  y  S% _* T
    }
9 }. k2 c* k* |- X/ A- x9 s4 `}6 w" s1 {2 C# c8 I" `4 |  B1 W5 @2 Z; G

; i" i# i' j- v
4 N; }1 \/ e% M9 H! `! `#[cfg(test)]8 `( ?) f( ~* {  Q5 L
mod tests {; P; I8 U0 {7 t& m3 j0 Y: u/ J
    use super::*;* v* R5 ~- N( v$ {5 \
    #[test]4 n; k% T$ q7 o, \& @. j" M
    fn it_works() {
$ f8 y- U/ z4 F3 W        let p = Pool::new(4);
4 T  E  ~* r5 T  j" j- F# b8 R        p.execute(|| println!("do new job1"));
3 L  K3 R0 J  k  j        p.execute(|| println!("do new job2"));
7 d- P  b! E0 z8 ]5 e        p.execute(|| println!("do new job3"));1 `, a9 K5 c  X& d6 ]2 x4 k
        p.execute(|| println!("do new job4"));
3 s( [6 I/ D' L1 V) b    }9 @1 F; ~9 n. Y0 r/ G* e
}3 @/ p) h9 C3 q- T; ]
</code></pre>( I; G+ ~0 c1 T" `/ m7 x

# h% v; D0 z# ?: l; d
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-7-5 00:25 , Processed in 0.073353 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表