飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 12373|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7587

主题

7675

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
25091
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
/ l$ O* B7 O* j9 D! f
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>" c: u5 o; K2 |6 s
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
/ \; m" y! Q: p<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
1 i: _# e, [( K0 B3 a<p>线程池Pool</p>* s% u9 U# [( _1 h6 ]: I
<pre><code>pub struct Pool {" M; X. e$ I3 u# i8 j7 ]/ a
  max_workers: usize, // 定义最大线程数2 |/ X* d7 \6 W8 u
}5 C0 P' ]  T* f
8 E8 k, O+ z4 n2 R8 D  }3 C, g4 B
impl Pool {
$ E$ B2 T( @$ `" X  fn new(max_workers: usize) -&gt; Pool {}
0 v  \# Y) Z) r4 X  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
' S$ ?5 Q; e2 s* G}# K6 F* C. v) r, W' t. [
; n8 e6 A, x' C: ]
</code></pre>
. g/ N* d; `$ E+ f# u* O<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>1 ~6 V2 ~- ]1 C: f- e' L6 E
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>5 Z/ n' _$ H; M0 {4 J, S) w
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
- q0 S8 X# V% S, B<pre><code>struct Worker where
: \  l8 h7 ]7 W; g: B" s1 c6 c/ E{
7 }. o8 ]+ N+ q; f/ t- c    _id: usize, // worker 编号8 r. i$ z: v% x3 A
}
$ p0 n3 G1 b- p! c/ X</code></pre>
  }) j: m1 Y. x" M& o<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, G* `: z; C! R# s2 m  V
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
7 l4 }$ ?  q/ Q" O  X3 p* Z  C<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>  z% v' T) f& P4 L, R* C
<p>Pool的完整定义</p>
; Q: h; _( u8 J' D% V) s  f) o, d<pre><code>pub struct Pool {3 b" n+ o0 d, p7 k; ~! p
    workers: Vec&lt;Worker&gt;,
+ A/ o  z# P- m    max_workers: usize,
0 m0 o+ D0 q( W  m4 M' m6 S    sender: mpsc::Sender&lt;Message&gt;' e: u$ e% V" S1 a2 n& D
}& i+ U7 F9 S' }; T) }% G
</code></pre>+ x& W" q% ]  G+ y7 e6 I
<p>该是时候定义我们要发给Worker的消息Message了<br>6 [, [1 J: l4 l  s
定义如下的枚举值</p>
8 Y, v: K, K5 ]6 r: L+ T<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
1 K/ ?) x' c( i) D0 ienum Message {
' S0 d( \( I/ G/ A    ByeBye,
. R" Y( u/ l! k& b) U( Y6 P    NewJob(Job),
2 U2 X. k) W. ]& {}: [* i0 x# m2 s8 k$ b+ t( x
</code></pre>7 j) x' |* Z7 W3 S
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>$ J7 C7 W3 d  @* v
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
$ U/ c) J, U" s6 G8 x  f) N; F2 o<p>Worker的实现</p>( |6 Z$ S5 |% E- N' d5 c7 L# @( @
<pre><code>impl Worker9 B* f+ F  V8 `/ ?; P* [* p( o
{, @4 N2 x8 P7 v6 ?5 F
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
* W* U/ J% o! B+ n2 r6 p- g        let t = thread::spawn( move || {5 N, T( @& x0 ~8 G" X  |/ O: v
            loop {' _+ Y' M% Y9 Y9 y. X4 z* ^
                let receiver = receiver.lock().unwrap();' K4 Q% k1 V2 k1 n4 u" M! d. j; A
                let message=  receiver.recv().unwrap();
) w- S+ k  c' B6 K3 U( E( t, k                match message {
% N7 a9 G7 y5 B" O. d) Y                    Message::NewJob(job) =&gt; {' Y$ \3 h" L( F6 Q' ~# p
                        println!("do job from worker[{}]", id);
7 c( g* a/ b/ l0 r' N/ a8 A                        job();
! |, X$ z; z" l" u1 I& G# p$ _# c. O                    },
7 D- [% U4 M1 \2 S4 y' ?+ m4 I                    Message::ByeBye =&gt; {( z* v( K% q* v  Y/ E, k
                        println!("ByeBye from worker[{}]", id);/ p/ I; J: P  W- U( }$ [
                        break- A  l7 D- r) [$ z. s8 t1 T
                    },* V- o( b# r- b, x5 m, E
                }  
* d# p* `$ f* G% ~1 Z; W            }
9 e6 E5 P# x5 O% q% C        });# ]: u" J; O8 k. a! N& G  {
! w: W2 Q5 I5 B, f. X9 m
        Worker {
9 J- C& A  z9 k! U            _id: id,2 t' ^8 ^' Z& V) k" G+ H8 r2 v
            t: Some(t),  G1 j) c! b1 ?4 \5 W7 F
        }
- ~4 U, @  v8 Q. ^    }
+ O$ k$ n+ p7 U& d7 V}! c% ?7 K6 ~, U/ P/ D: B
</code></pre>) `# \" E5 n+ B/ B% s3 O
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>+ d! P; g/ S6 Q0 T& U6 }: X2 D8 Z
但如果写成</p>; r9 w. W' e' T! u4 j* r3 u% X
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
3 S/ L" g0 {$ g, C/ b& |};
+ y  g7 O8 t4 W9 D0 W</code></pre>7 L/ t* F% N  l$ p8 C" ~
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>  S9 t8 r# g. n- R1 H( t( B8 H
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>5 x8 p4 I7 A% z& r, U/ p6 \
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>+ I4 n9 G! ~( A5 N
<pre><code>impl Drop for Pool {+ i2 k6 A& g* \. I
    fn drop(&amp;mut self) {4 `% c( P7 F, w' p4 j3 z
        for _ in 0..self.max_workers {# p& h3 H8 D: Z, @  @" `/ z
            self.sender.send(Message::ByeBye).unwrap();
' u0 F$ k0 S; @        }
, F+ i. X' i! ^! p  X4 q        for w in self.workers.iter_mut() {. X2 A& |2 N) g6 g
            if let Some(t) = w.t.take() {
1 `2 ?- Z. L/ G% q1 U0 ~                t.join().unwrap();" p: z% }7 o$ p
            }
( h! `5 Y! c5 s3 N: a        }- E6 [9 m7 E( Y
    }
# {% t; B3 u0 d; t2 Z" |}6 }& O7 G, y; u, G+ Q! ~1 O9 c
3 A# R1 x% l# z+ S  l/ Z' P2 Z
</code></pre>! Z0 k- t9 l+ H2 _! [: t8 i0 y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>" ^, u" k& B  S& A, P
<pre><code>for w in self.workers.iter_mut() {
6 E% Z& m' U+ a0 t" e    if let Some(t) = w.t.take() {# `* q$ s7 K/ R6 X5 l  }6 @- a
        self.sender.send(Message::ByeBye).unwrap();
& {' n7 T# Z+ E6 p+ N) }        t.join().unwrap();
, M( U/ F& o4 `, ^# T  l    }
9 w& c( q7 T6 q3 L1 q  {. u}
9 y4 P0 B: v5 o  k; F; s# T! s+ Z, b1 K; R4 M
</code></pre>& {+ m& u# v0 c* \1 }+ t4 h8 j
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
5 }. ^1 z) D% C- _# J我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
# W1 ~$ X) n; o! s- [' `2 Q<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>$ M2 b* s1 t! r3 T. X* @
<ol>
7 N2 x7 }  ]: i# M' Y( c<li>t.join 需要持有t的所有权</li>$ Y# P- ^& z; S3 Q+ T7 T7 l* z
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
, c) f9 W0 k) w+ K- o</ol>$ E2 q) F: F. \( C$ w6 n
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>2 A, ~  G9 e$ c
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
0 M6 `' m  n2 w: [5 ?0 j& I<pre><code>struct Worker where
, z7 s& N+ V7 U) R{, T( ~1 A0 F: [
    _id: usize,2 S# l! l0 A1 c& W9 |0 l
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,5 [$ T# ?0 m) C, Q( T# G6 ^/ U4 x
}
( ?  ^) o7 p' y9 x; h  t! V- r7 }</code></pre>& N' h2 k. x) i6 q( N8 L) m
<h1 id="要点总结">要点总结</h1>4 q. m  e0 i$ O3 J* v  A
<ul>
/ S0 {. q$ M2 ~6 M, p4 i& x6 N<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>) E2 N6 a- g7 Z/ Z# a
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
1 X9 O0 X% o2 s- n/ E</ul>
1 p( u7 G( \6 J' o8 E<h1 id="完整代码">完整代码</h1>" d  v/ R% n" m9 H7 E+ N
<pre><code>use std::thread::{self, JoinHandle};
9 g1 _. w$ l4 v' c0 ?! A; Iuse std::sync::{Arc, mpsc, Mutex};8 l% E4 C8 L' }2 v
) E+ w6 Z4 E; ^2 d( I4 Z& ]. S

1 w/ D4 i8 [+ F5 h; {. M0 |type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;0 x; P- W# q( F
enum Message {, G4 @4 x1 q# M" {3 s
    ByeBye,$ h5 Q& ?9 x$ f
    NewJob(Job),! N3 W, e9 y" F6 l' Z+ P, I4 Z
}; a* M, u! S, o% ?

% {4 H3 t9 o2 w4 y# t. Zstruct Worker where
0 |7 K# d  e5 T  q{  J* ?/ \& z( s% Z
    _id: usize,
- i' d) R8 q$ N7 `, m) _; Z5 l    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
* j- H6 m; b, U4 H) i( D}
" y: h' J1 N& E) Q) k3 Y/ j& `6 u
9 |0 e: h' H# M) x4 J$ cimpl Worker
" A- d/ r4 B! |5 ?; `) Z; _  f: w{* G. T2 n2 E8 z" R5 D5 O
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {, g4 s- Y  k9 V7 a# U
        let t = thread::spawn( move || {
- A' |( |% m/ T5 B+ ]/ \            loop {6 W3 V! D3 ?8 ~" r( b
                let message = receiver.lock().unwrap().recv().unwrap();
( a" C% p) ~/ u) x5 O2 e                match message {
9 Q  Z. V% N3 J6 B                    Message::NewJob(job) =&gt; {
0 I) M; o5 d7 Z9 U                        println!("do job from worker[{}]", id);
% q* Y2 [: c: w, n                        job();" H# R' g+ c( i. v3 i; P  L
                    },* ?8 J  [# _  V9 y$ k
                    Message::ByeBye =&gt; {
/ v' v" T; u$ l7 t2 \  }: E. R                        println!("ByeBye from worker[{}]", id);
1 e, |9 P6 b7 F/ A4 Z                        break6 j6 s$ B4 r8 _& g0 X
                    },
$ r2 N7 ?0 k( q" c+ W, ?                }  % C9 l2 T" A3 {, ]  T: w! t
            }
8 g' u' O( s' Y8 `% E- j  D$ f- i0 ?        });
! F+ a8 Z! A' ]) F, e$ K* B0 v$ T4 q4 t0 }
        Worker {
( L7 x6 q7 y8 u+ Z7 p2 ~. S            _id: id,$ \! {7 l& m* P* F; ?* k2 c
            t: Some(t),
7 U: Q7 A5 a& ]( G. @6 ~2 {+ U& j        }* X" |* Y  X3 K) r' X
    }
9 J/ I. A4 ^# G8 Q# r5 E$ {) t/ I}- P) v8 C9 Y1 _$ n/ O  i
% g+ \6 s! A( h6 o
pub struct Pool {2 [6 @0 P; R' _  Z8 w
    workers: Vec&lt;Worker&gt;,
! Q8 _* ]- p7 C    max_workers: usize,
: F/ J) |+ g4 P9 C    sender: mpsc::Sender&lt;Message&gt;
4 w( ~  R' c1 H' {}
5 J5 H7 z: X, J0 |% s9 @" _6 b# y0 l! S1 |8 I
impl Pool where {, D$ G) N2 l% @
    pub fn new(max_workers: usize) -&gt; Pool {
0 G0 f- E$ c) O! N        if max_workers == 0 {
' _; k3 a) {. \8 z6 o# X2 J            panic!("max_workers must be greater than zero!")+ G6 C! |3 b( y: h
        }6 }3 Z/ b, E- V
        let (tx, rx) = mpsc::channel();
! W) j2 e( c  w  L; y$ @% d/ Q0 @8 }+ k, k8 s, v
        let mut workers = Vec::with_capacity(max_workers);4 o/ \. J' V. z& K
        let receiver = Arc::new(Mutex::new(rx));, i# K# {/ b( }0 Q
        for i in 0..max_workers {
/ T1 @( p$ I$ d2 t( d3 r" q            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));7 M6 J, H; A( |% }
        }& B6 u5 q4 \4 M+ @& o$ {6 X

! Z* T, `# |4 X* W2 q8 }        Pool { workers: workers, max_workers: max_workers, sender: tx }
) b% I9 n, A- s( [& }    }0 t! d' r4 _3 M  E9 K; P, ]. z! l& ?
    1 P7 y$ g  Y1 @( ]. Y
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send& b/ `4 c  \/ s6 X3 P9 T
    {! m  j' ]4 W  U0 C( k4 |* J$ [: x7 Z

1 b/ ?& l; v, T0 w& N- m        let job = Message::NewJob(Box::new(f));
$ ~3 i/ b5 W) g3 ]2 z# |        self.sender.send(job).unwrap();4 w8 B, t; @& H& K! X. O8 P2 o
    }4 {! ~/ r; ~( I  j% W
}
; b) o: j5 J$ W; j  ~% d  f
  G# o3 ]5 A+ cimpl Drop for Pool {
2 }( V7 t$ G6 _. O4 i7 a    fn drop(&amp;mut self) {' R" \3 m" b( ]4 ]% x  e0 V
        for _ in 0..self.max_workers {7 \( L1 Y8 M; S& ?
            self.sender.send(Message::ByeBye).unwrap();
; c1 V, @  P+ w7 `6 b        }, \; T7 n0 y7 A6 D
        for w in self.workers {# `$ q: ^/ b& g7 M  T. B, s+ U0 Z6 I
            if let Some(t) = w.t.take() {/ y  |) y) _8 K8 n' k& X
                t.join().unwrap();
4 u( Z: s! {$ R" o            }8 X. [2 R( z3 F0 @2 b* E
        }
( ]; j& k$ p0 A& M5 o    }
8 `, y  U" {6 F7 y  M' B2 ~- M}) @7 T' Z8 `' @- p  X* T# C

; C: L$ P8 n: T2 a: _" R# [% ]4 h# i+ d- h
#[cfg(test)]9 c/ N9 c' E0 Z: t
mod tests {( r2 i8 z+ n& d5 b
    use super::*;
1 _! X$ O# E- x' L1 i8 n# H8 R    #[test]
/ ^: ^/ U' c' \: t# f: J    fn it_works() {4 w7 v3 p5 B5 M! q7 ?1 U
        let p = Pool::new(4);; E7 o. }; p1 i3 s  m* H" J
        p.execute(|| println!("do new job1"));9 @0 }7 d" \' N. `" U
        p.execute(|| println!("do new job2"));
0 S0 n2 \4 V( x1 \$ |( ^4 l        p.execute(|| println!("do new job3"));
( g$ L# V4 ], I        p.execute(|| println!("do new job4"));
+ C$ z+ v3 F! `6 u    }
* H) X" A" ?4 T! n+ U}& |! ?* O6 H8 q% d, g
</code></pre>
0 i. Z$ @+ S" f6 q" X7 v3 R/ H& ~% M  X6 P& y  ?
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-9-19 07:52 , Processed in 0.069679 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表