飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14711|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8042

主题

8130

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26456
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

1 H# J  c8 q, \. N2 _* t<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
; |  R0 M: t! G) {0 i/ z<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>7 h& U" `5 N( i1 a7 b6 k2 C
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>8 l6 _1 V( K6 {- t: w1 \
<p>线程池Pool</p>. S3 `- m, ]% u( R' q( t/ j9 X
<pre><code>pub struct Pool {/ I: O3 y+ X7 {) |
  max_workers: usize, // 定义最大线程数
: H0 }- T, B+ J; }" y' \# C}
0 M6 @5 R9 J& y+ ^# @) ?& l7 p' N; S! u9 L5 f
impl Pool {6 f* o) ?9 u+ J2 a- ~
  fn new(max_workers: usize) -&gt; Pool {}( |+ ]( g3 `+ Y2 L$ y: h2 e7 ?
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
4 Q7 |/ K( E9 v3 r}" l3 ?' l( S7 l8 W

( ?# F% A" c6 T1 Z</code></pre>6 {% X. q% `1 O; @. X* u
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 _* T" b  R5 }& \" v4 c4 x<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>4 X8 q0 t* Q" q2 c# p  U
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
# W- u1 E4 W( Q9 w<pre><code>struct Worker where5 `6 c6 W$ K' O$ m) C6 G
{
8 F( s6 j. j8 B( i    _id: usize, // worker 编号$ S' e- v2 I# {0 n( ~( x
}1 `, D$ P" K8 @$ M( _! r
</code></pre>: B. X5 |* p% Y" v" Z1 d& t
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>5 p! B, P! ?1 D9 c' {
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>, v4 ~& B6 ?1 T% e8 Q8 c9 a9 e( P
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>/ S" m; A  Z2 u# }' R7 o  F6 e
<p>Pool的完整定义</p>
$ |# W( y' k  C% K8 |<pre><code>pub struct Pool {
" v: Q1 J: W# u* A" L  y    workers: Vec&lt;Worker&gt;,! b9 J3 P2 R8 o0 C
    max_workers: usize,
) X$ h# o7 f1 W0 L  s1 O# X    sender: mpsc::Sender&lt;Message&gt;
2 I. M7 D, K6 i' H/ @( B. `# ^}
3 {6 i8 t# u- K" Q2 Z</code></pre>
1 E0 B' m2 }2 _3 `4 ?& V<p>该是时候定义我们要发给Worker的消息Message了<br>
4 s) ]2 b0 U; a% T# d; J定义如下的枚举值</p>9 P5 W0 r: Y# {$ X, x
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
* B' c/ @8 Z  a' K' w6 b+ I# M/ Jenum Message {2 [& P! Z! M! l$ Y2 n. v" Y& H
    ByeBye,
; X/ X5 o( g) d5 r& |5 }6 R" r% W$ v    NewJob(Job),
/ R& t, t+ ?7 o% W+ a1 H}
: v, }/ \- D. {+ w</code></pre>( s' O" ^5 k4 n; v2 B# Z
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>" O9 H3 i3 z, A; c* r
<p>只剩下实现Worker和Pool的具体逻辑了。</p>6 B8 |( C4 b' p4 Z& A3 e, K
<p>Worker的实现</p>
, [1 T- i- o0 w) c; y<pre><code>impl Worker
' v) U0 p- \' R0 W. Q% a{
& j* _0 j/ N: v0 ^, g; Q    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
0 K2 ]2 R' y; [2 Z# U$ D        let t = thread::spawn( move || {
, b9 s0 v* P, ?* z: r# v5 u* r  ~            loop {
5 S( B1 Z1 H* l. m                let receiver = receiver.lock().unwrap();
  b# i. w2 }0 x1 x4 ^: c% @                let message=  receiver.recv().unwrap();
  M, K# R& ?5 p! y2 W4 l4 W* h                match message {
! [1 [2 G$ p+ c2 B7 B. Z                    Message::NewJob(job) =&gt; {
  F- v, A+ f6 _7 O5 v! h+ ~( A$ {* R5 {/ k                        println!("do job from worker[{}]", id);: k' }! h' \9 ?1 X5 l' k
                        job();2 y& M( u6 A: `
                    },
. t- V1 p. Z+ }' T4 D  }" F+ Z                    Message::ByeBye =&gt; {& k' W8 _  I& P! M8 V* K+ X
                        println!("ByeBye from worker[{}]", id);9 a' m  S( ~- Y$ p% C$ ^! |
                        break
+ u* P- P+ S. J/ A0 |" r                    },7 b% H- x$ A$ u2 ?7 X- D% {
                }  
: }# V! L: x( W' N            }- ?; `! {/ O: v8 V. I
        });9 }1 _9 N+ l, d

# Q: |% M1 `' C$ \' _, q: O+ X        Worker {% H8 T; Q% f" J! R2 m- u
            _id: id,
3 \/ r6 a& h/ n" @: {            t: Some(t),# U, T7 o: W% q3 ?, H
        }% l. g8 b( J9 j* r
    }6 {3 z/ p9 h  ^7 d
}
4 o2 v. p0 o6 D; J</code></pre>
( U3 O5 C5 @+ `4 Y<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>! i' j  a  G1 i! C; X
但如果写成</p>; ^" m- ?- l1 j; ^; T+ N9 b
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
* m/ x* `& T8 W$ I};
, S3 E' B$ [# @+ H</code></pre>% k7 H. x3 h, e4 C
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br># ]2 x) h) h5 n: A# m  M
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
( h" }: l( q  i+ r* z- ]1 g% T; @<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>- J; \) ^) b" G$ ~8 P
<pre><code>impl Drop for Pool {1 e  w% g* T1 \' e2 K" a* }* c3 i
    fn drop(&amp;mut self) {
9 q9 H2 e5 K; E# r1 O7 m( C5 d        for _ in 0..self.max_workers {) k4 K( t- T0 _# d1 E. w& h
            self.sender.send(Message::ByeBye).unwrap();
: S0 W8 B# k9 O1 Y        }
- |' h; c  {7 y8 ?, F  Y        for w in self.workers.iter_mut() {1 f' _" k) j9 K
            if let Some(t) = w.t.take() {
  D7 K. K, P1 V( g+ u) Y$ L) V                t.join().unwrap();/ S9 X, w  d. C; [9 |. [
            }6 |! q8 Y! y* l6 [3 Q* D
        }
' M9 F/ W, H. t4 }    }3 |2 [' _3 h! j
}
, X0 D; ]/ E& O
: e7 X! d+ n8 V+ X: x</code></pre>
5 u8 ]4 _1 q3 P/ f: x$ @<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p># n$ z! Y4 }; |
<pre><code>for w in self.workers.iter_mut() {* q& z* u' q2 F" S6 o
    if let Some(t) = w.t.take() {
& Z4 z* w( ^' Z4 ]' w        self.sender.send(Message::ByeBye).unwrap();
! S3 W9 a- U! ]5 M6 t        t.join().unwrap();
; `" p: `) ^4 w) k4 `( T    }
" h$ Z+ V) ?  \5 j4 J5 ]}/ K; ^/ ]; l6 P- I) V. W

+ Z: W% K7 i9 n8 T, B4 H0 K</code></pre>9 c5 P' w# v; k9 B
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>5 D) _2 D' x; X7 L7 d$ s
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>6 q2 p4 z7 Z5 c  m1 j3 s
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
6 N+ i2 n7 [& T. p<ol>
  w& ?* o, C& X4 D3 {  w( S, n<li>t.join 需要持有t的所有权</li>
& N- _! q" l, `8 v- l<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 z! T. m# f' \6 k! o3 \# ~. ?
</ol>
; ?; i0 C* Y. s8 H! d& U<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>/ `% Q! C( y  A
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
" {1 a1 u9 a2 X# y9 y' a* x* }<pre><code>struct Worker where
3 h% _) W" G! n# A{+ q% O9 T/ w7 Y) c
    _id: usize,
6 {- A5 R% C% C7 T    t: Option&lt;JoinHandle&lt;()&gt;&gt;,( w6 F) H! O) J- a6 d
}
+ m8 N1 C/ `# T* r3 t# _</code></pre>
, B8 A* R6 c7 k" ?<h1 id="要点总结">要点总结</h1>0 _6 u9 y! ]! a- `8 Y9 q  k
<ul>7 b2 z# Q+ B) X4 I
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
% ~+ c4 f! q% s, p% w% M: T' v<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>' J0 D! i- d+ ~  v7 L5 v
</ul>8 a. J% @' K) A  R
<h1 id="完整代码">完整代码</h1>* g$ I& B1 W. [+ S" E
<pre><code>use std::thread::{self, JoinHandle};9 V% Q; s& h3 Q4 ^/ o, H& Q& E, z
use std::sync::{Arc, mpsc, Mutex};
5 ?* i* \7 b2 v* Q7 J# W3 ~% J
7 H. Z4 [6 \  b1 M1 C
! H& C/ \# }1 Q$ T" s. f9 etype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;5 ?* x4 G7 V; W; u( ?" Z  f" z
enum Message {
9 {3 I7 L6 Q) Z$ W4 A    ByeBye,
8 f8 I7 m: l% t' b+ t! @    NewJob(Job),2 ?$ D! `" K0 m# O4 w  G
}
7 b5 x. b9 R7 r. h
8 m! Z& d- w3 D! E! j" Pstruct Worker where- x! b5 i; c* A6 p
{
! d$ \5 t; u( Y' H  x' Y* y    _id: usize,8 ]$ j/ E6 T7 J. A) [' _2 e' @
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
; M+ X" F0 U; M, c6 n+ }}
8 u9 f: n, W" ^5 e
! N7 J$ n/ v( c- I& M! d* Yimpl Worker
% r2 {: {4 k( s  t/ I% x{
) j# ~/ f$ M1 I; O& _" T    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {4 U0 M, k# ^; E8 ~/ W5 R0 X
        let t = thread::spawn( move || {. H0 c. A" C. {: M* L+ Z8 f4 ^
            loop {. B6 i+ @' |0 L$ {  D
                let message = receiver.lock().unwrap().recv().unwrap();
* P7 E4 Q, Z+ t3 F2 V: H! J7 L; t4 b& I                match message {
' e$ r/ R% B# t% l. n2 F" v                    Message::NewJob(job) =&gt; {
; p* D6 a0 k. H                        println!("do job from worker[{}]", id);
: Z/ l: C0 L+ Y- {3 Z, F                        job();
$ J3 s. t% t0 T, X3 _                    },
8 z5 t6 d) @) s9 _! h                    Message::ByeBye =&gt; {/ e( h6 Y2 r, {- l3 W
                        println!("ByeBye from worker[{}]", id);: S; j2 F( _/ p) P7 W6 p+ ?
                        break$ p# `& w, f* G( K5 N0 ^
                    },1 i' F/ |2 y. N3 z' ]6 l
                }  
- v5 K3 r' Q3 R7 D            }+ K8 k3 e7 c. f! p8 N7 M' e+ F
        });' Q- s' g- b8 Q5 Z4 ?- B' ~
8 A+ s6 q" K: E
        Worker {8 [' ?; W  I* r8 y0 h
            _id: id,
# f; g% S0 x) K/ o            t: Some(t),8 T+ M% C  ^3 ^0 K( U& Z9 F
        }( K' \5 [1 H" c& @! }2 H8 i. A, P$ l
    }
. a7 O: _9 v. R}
7 T) B2 X% Z! w8 p: N$ U
" J7 t& h% O2 T/ N0 q) e/ ?pub struct Pool {1 n6 n4 l& Y: f" Z. ?6 d* X4 z
    workers: Vec&lt;Worker&gt;,0 o% }! [8 b2 ]: S/ I" ?. J
    max_workers: usize,; \# w+ q; h" ^9 s) X
    sender: mpsc::Sender&lt;Message&gt;' `# m  p2 _! r" k
}/ F( o' \6 q, r6 @

8 |$ u4 W# O) j! w  n* S. g0 J+ rimpl Pool where {( J7 f% C$ O! u2 @. Z/ }
    pub fn new(max_workers: usize) -&gt; Pool {$ q6 _! k- E4 G: M. p
        if max_workers == 0 {+ a& Q' m& M: k* n3 ?6 |, }0 r
            panic!("max_workers must be greater than zero!")
; |+ J7 ^+ t3 w) x2 E. ]% G        }
2 B& Z  L4 |- z' J& ^        let (tx, rx) = mpsc::channel();2 K& I# i3 O4 z- U% {4 T

) S; {6 }6 R/ }4 |% u        let mut workers = Vec::with_capacity(max_workers);8 t: U  |+ v3 }. A% y, ]
        let receiver = Arc::new(Mutex::new(rx));6 T6 K$ Y+ [: o3 w. `7 [
        for i in 0..max_workers {4 W9 E. t7 [, p1 x. L* ]
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));# {0 p9 S9 t! s7 R" X
        }
# ~1 m& C2 L% g2 I  \1 M9 q; g# J. r5 c0 ?
        Pool { workers: workers, max_workers: max_workers, sender: tx }
9 m) D9 n& U: A) \/ E4 I3 ^# _    }
6 j4 ]- Q2 w* [% D; e    ( \8 ^- D) h! x2 j! V# y
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send; g  y3 U3 I9 N% o- \4 _8 j: H3 L) x
    {
& z' ]# k* N0 |3 L9 f- p2 P8 ^* _  {0 C+ @
        let job = Message::NewJob(Box::new(f));
5 K, I( ?) j1 i& K; e        self.sender.send(job).unwrap();! X3 p0 i( i; Q) l4 p) U
    }$ I- e7 w  }7 O7 e8 _* h8 W& x
}" S8 i# K: ]+ ]2 `/ T

5 F, W9 E2 V# t5 a0 Zimpl Drop for Pool {
2 U5 G# ~/ G7 r  ]1 G5 Y% }7 k    fn drop(&amp;mut self) {
+ X6 t' a7 p4 N& q* Q; w5 A: `5 T4 ?        for _ in 0..self.max_workers {
2 ~4 [# e8 b8 j8 l            self.sender.send(Message::ByeBye).unwrap();
9 ^" b- b. B9 s) t        }, S& g/ H! K$ _/ Y/ _
        for w in self.workers {: [7 m5 z1 ?8 T, W
            if let Some(t) = w.t.take() {
' U: D1 u9 l1 {+ x                t.join().unwrap();% W8 f& K( P$ Z# Z* q" S
            }- u- @7 f/ d: ?2 d! ~9 k
        }
0 y- w  o6 }9 z; S# Y0 W    }. D) ?& ?1 t& q0 n! r5 d. `+ i
}
# a1 ~6 y& P& G- ^) Z! h1 E# `

. c2 M1 f, ~( i* f7 L. K2 _- g2 d#[cfg(test)]* h9 v. B3 ~: Y. M& R: N# J5 B: e! _
mod tests {1 R* D; o7 b  @" `+ r
    use super::*;
5 C1 {8 B+ N  _# @    #[test]
" k* K6 B3 i7 L: |5 {3 Z  J    fn it_works() {
$ ^) }: r  x1 X7 n. S( v        let p = Pool::new(4);
: |& x, L' S8 A8 J        p.execute(|| println!("do new job1"));
  P$ ]+ A% e  u! q! E3 u7 r        p.execute(|| println!("do new job2"));
" V4 U9 b) i$ ]/ k& k4 R        p.execute(|| println!("do new job3"));
4 |2 O* o) ^& Q7 Z/ i; n' k" Q/ q        p.execute(|| println!("do new job4"));
$ `8 S& a* U# h) W6 `6 X6 F% I" \    }
4 ]5 X2 L1 E9 S- _7 Z9 N7 O: q, i}% B( N+ F. j) P  m
</code></pre>9 O, h8 Z6 `. U( n4 l. D  E

9 u7 v  V/ ^+ W4 t2 E, a$ w$ A
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-8 19:31 , Processed in 0.070590 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表