飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14862|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8059

主题

8147

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26507
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
! n5 r& q# G7 g, k: R) ~
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>7 b; a2 X/ B" c5 w3 O4 C- {
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>9 Z+ f4 l; p9 I' _; v7 Z
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
4 Z) D0 q* v2 u1 P: g0 f# g, j" m<p>线程池Pool</p>
# ?7 i  M2 h* l2 I<pre><code>pub struct Pool {2 s  w9 u  C' ^# U6 Z* Z
  max_workers: usize, // 定义最大线程数
8 c# A& y  V+ X}$ h, Z+ _) r+ i7 Y0 ^. K

( M/ U6 a, r8 r' o$ H3 Simpl Pool {
: G1 r" u! x' h" @  fn new(max_workers: usize) -&gt; Pool {}
- ^& j' M7 v! @( y7 K$ q: u: u  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
- y4 _, _; i$ |' x: H0 v}+ [' z: Q$ U7 R! Z

5 A/ F  t+ s, h1 F# g2 J( i' w</code></pre>
" X+ |- R1 ~6 e6 E+ G% n5 Y+ B<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
/ x7 o; j2 E, m6 V7 E: E0 i4 e<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>0 r! `2 Y) I: V1 ]7 }
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& _" W+ J) V6 X2 v. @( t
<pre><code>struct Worker where5 d. y# O0 P2 H& g. i/ F; W
{
: U( _' D0 h1 v; t    _id: usize, // worker 编号
. q5 m9 j5 i. ~. l8 l/ F7 B}
/ z3 x: L) F% n2 S, }4 B# p</code></pre>" F- F3 N, H. `5 P) R3 \
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
8 C4 y' S; `( f- W1 F' W) U' }; m把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
4 |- R7 ?$ E/ g0 l' J<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
. a% w5 i( e' S8 K- f+ W<p>Pool的完整定义</p>
( g3 i+ u' a# X<pre><code>pub struct Pool {
% M, e; z: ^  E2 l( e/ }1 y  Y    workers: Vec&lt;Worker&gt;,
3 L" ], M  Y) ^    max_workers: usize,! H- s* ?: _) r+ l! \7 m8 K
    sender: mpsc::Sender&lt;Message&gt;
9 E+ [1 @6 j, G; w& n$ F9 O( r) e}  H0 ]$ P- J' b' h, Y& ^
</code></pre>
3 t4 Z' @; R& z2 y8 O8 g" H+ M<p>该是时候定义我们要发给Worker的消息Message了<br>
( `5 r0 {' l1 v) V2 x定义如下的枚举值</p>
% s5 X1 v* x9 n" U<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
8 i& @* m. U9 R, o' O: oenum Message {
; u& e# ~0 l. q$ q    ByeBye,
& p$ k8 ]  S* R0 q0 |+ y  V+ l    NewJob(Job),! `& b7 p. }# }1 u% r
}
% l# a% v1 A  o2 J3 Y$ B7 T</code></pre>
! h  H) h# k$ M7 z( R<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>' ?  [* x) m) p4 z+ z5 G1 h7 `
<p>只剩下实现Worker和Pool的具体逻辑了。</p>7 F6 P- i# ~/ R( f: U8 _& R
<p>Worker的实现</p>
" f$ f7 u; l# H: @<pre><code>impl Worker
5 _! |( v, l7 t+ z4 K+ E{: |2 w7 X, h. K
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {0 n' T7 C  {( b3 A# z
        let t = thread::spawn( move || {
( F0 D% E3 \4 \. O8 F/ s* v            loop {
& t8 L( ?5 c) o4 t% h! H0 h6 V; g                let receiver = receiver.lock().unwrap();8 c1 b# }% h+ c9 P1 F
                let message=  receiver.recv().unwrap();
. M7 x/ z% G/ K. C0 j. U; W% H                match message {
* C/ \6 V( p" |5 t3 K2 G                    Message::NewJob(job) =&gt; {; F+ ^0 J3 f4 p. ^2 s
                        println!("do job from worker[{}]", id);; p# R& e7 Y7 \7 @8 j
                        job();) Q5 W6 I, F3 w
                    },; A5 p- N. o0 u) G& I2 ~: W/ u) D* J4 P
                    Message::ByeBye =&gt; {
- |5 Q( y4 Q9 a" g8 m0 `                        println!("ByeBye from worker[{}]", id);, {% u8 I! M) d, i8 z5 B0 y
                        break
* `8 y5 u5 a8 Z) D6 [8 A                    },$ P6 H- Y0 }6 Q# G
                }  
8 A! k1 e7 w: [: S            }
9 _1 V2 ?2 W4 B9 F  [( o        });! _5 R9 b& F: b3 F5 c% F0 j* t
; Z) V- ~  I& [  K
        Worker {
6 e7 B- D5 W0 z$ Y# W2 }) [            _id: id,
9 f; S1 y; A" i            t: Some(t),
' E: i! ~% I" d& F        }
5 G' o8 B2 M: A" _8 d! t    }
' h0 g+ k! F* t! `: J/ z2 D  b  ^" c}) Q6 {8 j! b7 A6 _" v
</code></pre>( u3 D9 e3 V. m7 r+ K- h+ u
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>- N2 D  p/ m+ w4 A% W7 v- e
但如果写成</p># O7 ?6 b; j  F( i% L3 r" d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {& ^/ E* b7 x) R9 v- F3 K
};
8 c. L& E8 V) O0 J1 c" x- _</code></pre>
/ z2 q, }( q- R  h: {" {<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>8 r0 ^4 p: |4 W: F
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
. f8 y* r( u! a  Y- `<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>( @) ^  c/ A4 Q* a5 j1 r8 C# G( C
<pre><code>impl Drop for Pool {$ J# q& |; E  @, R6 s" `+ q
    fn drop(&amp;mut self) {0 e" n" j" q# n* B: q/ y& q# o
        for _ in 0..self.max_workers {/ o0 D+ q) E- [. T9 R+ U+ P
            self.sender.send(Message::ByeBye).unwrap();
& S6 @+ ]3 h. Y9 s        }
$ W" Q+ }( V! T$ r( r0 O        for w in self.workers.iter_mut() {- G: J# ^+ f4 @) G5 M+ Z
            if let Some(t) = w.t.take() {
) }; s3 d  x/ }) `$ n( z9 q                t.join().unwrap();4 u+ Y$ i- y& V: S$ C* R
            }
; l3 s( [4 s" h; O. |2 E        }
% \% U, m/ t; M) @; W  O, Q' \$ D    }
& {4 [. b1 d4 m  o}
8 T* M: O3 O& _) S2 R0 C$ k! @6 B# y1 |
</code></pre>
$ \. k6 @" p$ v" r- ]" Z% s<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
! M5 s6 ~0 h7 b4 d<pre><code>for w in self.workers.iter_mut() {) \7 Y: B# `0 g6 L
    if let Some(t) = w.t.take() {$ j" T" b. @6 g! u* U( ~
        self.sender.send(Message::ByeBye).unwrap();. a/ t! X6 l# c0 a5 Q( S
        t.join().unwrap();
& A1 B( T* h- J7 Q9 v    }" D# J6 U6 n7 A$ P! B* \
}
$ b) {, O3 Z! B1 o/ ?* g0 R% I/ y# {8 }) C$ \) G/ u! a' O
</code></pre>
' X- N& \+ B, q2 Z8 c- S# W<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>* Q" f9 K$ Y6 n/ Q- u
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
& H$ Z9 h- [) X5 g1 f<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
7 X1 \- U3 J' U% r  r<ol>! H4 e1 T# }% C7 H2 j6 d
<li>t.join 需要持有t的所有权</li>' M: c! Y+ r$ V' j
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
" C) m  g4 L$ |: G/ u4 R0 t6 Q</ol>
* U- [3 p/ c6 X3 b<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
: c) M/ F6 V+ Z换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
; g8 }. i. A3 F( c; o" h, r/ a/ @<pre><code>struct Worker where, w) @3 @" S& l. c: ^6 N
{+ {+ J* @3 s+ l# H* v/ L
    _id: usize,* M8 l$ |) s" t: K, S" Q; R- B
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
8 s# ?4 N  N- X1 e! s% ?}
0 _$ e8 C9 ?4 r</code></pre>
$ F! k9 M% u8 J) _% a8 S( P+ R& X$ l<h1 id="要点总结">要点总结</h1>
8 ~' U$ a1 C, ]' x<ul>3 j) I+ d' h) p7 ~7 ~8 @* L
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># ]; V) h$ s7 v6 O* v) K, x3 [
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
. B$ b2 d9 W. J! z% r0 V% L6 `, e</ul>
. F$ o# x9 J- d) d<h1 id="完整代码">完整代码</h1>1 A$ _7 `4 }6 l' a, p  U3 ~: b
<pre><code>use std::thread::{self, JoinHandle};8 n& b( k6 @$ |3 A& [; I( n
use std::sync::{Arc, mpsc, Mutex};
. G' Y7 ]' Y4 x  h/ @. g
2 b. y% H. u! \% ?( b, p+ @! t! I% ^& E+ |' G# A# B6 f' Z! m
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
3 c% ?5 @# f3 h& s* uenum Message {
  `8 q( ?2 f# F' f    ByeBye,$ A1 t6 H4 y$ k6 N
    NewJob(Job),
/ X1 B, ~8 S( m" S. o}
1 l$ N0 n) m5 `' `1 t
/ k6 U! Y( g. Q  s( g( v/ r# s( _struct Worker where
  T" ^4 V5 V% |, [) H$ I{
" H9 T3 f1 I) `! n1 R" s    _id: usize,
$ t7 t' |/ U# G# G* O    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
0 z. I+ ^$ z7 {" C4 N  m9 D}, Y1 G1 w; D7 \# [3 R+ a7 {% O) ]
5 O) c3 ?2 W! h: o
impl Worker
4 a  j1 p9 p  D( B8 V{4 e. X4 @% X8 u3 P3 I3 @
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {2 T9 y# S) n7 k0 X8 E/ P
        let t = thread::spawn( move || {$ u8 K4 B! Z6 \3 _6 Q/ ~+ V
            loop {! r( K. R, v# K# O4 y$ O
                let message = receiver.lock().unwrap().recv().unwrap();
6 e/ X& |4 v* P- f" t" R' v3 v                match message {3 K' q5 |, b- o! D7 t" \
                    Message::NewJob(job) =&gt; {
4 p1 f0 H  d& Z4 Y                        println!("do job from worker[{}]", id);" ]+ z/ X( u4 |! T, U# B
                        job();  t% w, M8 |0 m  f
                    },
- c; H# P7 N' p$ u                    Message::ByeBye =&gt; {0 V3 g0 }" d& x' s0 d7 U
                        println!("ByeBye from worker[{}]", id);  }8 w. o# q0 y* N# F. L( F
                        break
/ `8 K1 M4 A6 h                    },1 z' N6 a5 Y* W
                }  
7 e# e7 b  o! h            }
0 {$ \0 }3 ^  V+ r* G6 W0 U, Q        });
3 V5 G: B2 z9 o8 l3 b7 ]; g1 l/ U2 y3 G  j7 X5 g! W
        Worker {
! \( C3 u% \3 W, S) K# d4 s            _id: id," l' k" ~0 W" p! }0 ~4 x
            t: Some(t),& r: P3 \* [/ J
        }: i- m" \% j- S7 _0 ?# g3 M3 O
    }
% J. |% u; g" k( p5 S7 Z3 ?) b}
4 T4 f) N0 ]- Q! V
6 ^! \8 m' ?; }; Epub struct Pool {
0 w' ~  J  m/ F0 o( v* m' T    workers: Vec&lt;Worker&gt;,
  y3 P+ ^. S. O& U    max_workers: usize,$ ~+ E, g1 A+ T9 E
    sender: mpsc::Sender&lt;Message&gt;
1 \, L/ l0 a; A9 L}" C* X8 k% t+ [) m# A2 E5 O

( J# A9 k- z$ n& h. vimpl Pool where {
5 g" C& z% X* ]% I( u    pub fn new(max_workers: usize) -&gt; Pool {4 o& C: e6 w! Z) m( i1 Z
        if max_workers == 0 {+ @" h+ F! a' a6 @: O' o; M
            panic!("max_workers must be greater than zero!")& L- S, S8 Y5 y8 _" T. U3 q5 C% d; Q
        }& f2 R9 j* r1 u5 w$ @
        let (tx, rx) = mpsc::channel();
6 s- ]2 `5 ~  S% @6 e+ }$ J5 X  u2 S9 F
        let mut workers = Vec::with_capacity(max_workers);4 D/ D' K) y$ k- z4 a6 T$ u9 Y
        let receiver = Arc::new(Mutex::new(rx));
4 c( Q# O( y$ V) a        for i in 0..max_workers {
9 V3 }# D% }3 t0 }6 r            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));" @6 ]7 t1 Z( ?5 K" N
        }# k6 [$ @7 W; V- e- w, l
  o2 a; U! [/ C* Y, d) L% ~
        Pool { workers: workers, max_workers: max_workers, sender: tx }
5 r0 e" u3 a2 c# P! ~; N    }6 M" f6 d) _' C8 A
   
9 Z: d: k# \( {  R$ ~    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
. H  N  x& L3 v! l! Z    {! |; ^* y4 i0 l, k  ]
0 j% ~" x1 g1 U4 H0 E7 J4 k
        let job = Message::NewJob(Box::new(f));
8 n* ^6 K! p2 N; b        self.sender.send(job).unwrap();
2 `, b" t/ m. G    }: R& h( [4 Z. D8 {1 i9 D6 q
}
; _; M) M, Q  ?3 r
* H. u4 ?3 ?/ ^+ @' |6 o, T* g& vimpl Drop for Pool {1 |8 m9 o& r9 p9 H" e7 [6 u; r
    fn drop(&amp;mut self) {7 Z" b1 k2 P  j
        for _ in 0..self.max_workers {, E/ x# S. U8 _  p) [2 r) C! {
            self.sender.send(Message::ByeBye).unwrap();2 T7 ~: c9 f: J
        }  |* U. k% ~* d( u) s$ w8 `: V
        for w in self.workers {
% c; g) B; w! R3 X            if let Some(t) = w.t.take() {% t: V( R5 {" _$ h& ]5 O# U
                t.join().unwrap();. g2 h4 R2 x6 i
            }
3 S1 z" D. v4 G' L4 k  Y$ W        }& E: v; q  u* ]! H- C' R) \
    }
) L+ Y3 }8 s8 D5 ~; s2 ~}% w( y5 Q1 P6 m8 U( I  w  F

2 p! c; ~4 E+ J6 E
; P! ]: x% m( u#[cfg(test)]
' R" `8 w+ J' m8 hmod tests {- d2 \& O' @) _
    use super::*;
+ v% ?/ Q! A, y5 z7 F0 J2 D* G    #[test]* N4 I, a3 M8 ~* a# k4 O
    fn it_works() {( C, ^  L0 {4 a& X* D
        let p = Pool::new(4);2 n  J6 Z3 g1 ^! q5 L. B
        p.execute(|| println!("do new job1"));9 v) W. S! n1 Q6 ^1 t9 z/ f
        p.execute(|| println!("do new job2"));# l7 V( @) ^+ I6 _. p4 U$ y
        p.execute(|| println!("do new job3"));  g5 ~' F; A3 Q: x
        p.execute(|| println!("do new job4"));+ ~2 k1 s( Y& D" ~, a+ x  B8 W
    }
' w6 s# U* `7 a* R: t8 b* j}# C4 C& D% S' Y* w9 a
</code></pre>
5 w% r% p+ ]9 B1 w) U: |( H/ @
* J3 P; K) a1 W
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-15 00:17 , Processed in 0.064725 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表