飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14492|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7992

主题

8080

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26306
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

! o' t& |  ~6 r% \% K: x<h1 id="如何实现一个线程池">如何实现一个线程池</h1>, G; z( }1 |( e) K; X& e8 I
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
+ j8 E' P& Y5 ?<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>7 U  e. X. @0 \) a/ d: f
<p>线程池Pool</p>
  Q: O5 A, j1 ~. i4 c<pre><code>pub struct Pool {
, c# m, U+ s7 Q: b" m" P& v  max_workers: usize, // 定义最大线程数; g4 w" U1 B6 f# |6 O) _
}
5 f* z- H- a7 h# p1 v  L) A
* U, ]* W3 N1 K, simpl Pool {* j/ P7 F& V! z% D1 u  K; R
  fn new(max_workers: usize) -&gt; Pool {}% u7 f. ~  Q* v- t
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
; A+ ^4 u- C- d' Z2 L, I3 H}) Y0 b+ W( F& v# q
, z0 a3 M- _' p: V) _5 F: c
</code></pre>: L( V; C2 G; i' i+ x4 j  E
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
% w8 o$ e6 |1 G1 {6 ^9 y<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
! W) Z/ e* d$ Y6 v+ X可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
* H  z0 `$ y6 h6 M5 P<pre><code>struct Worker where
, j# h+ i5 L* D) ?0 H7 N{
/ J7 \5 O! N  _+ L' n& u' u    _id: usize, // worker 编号' b) K6 U5 y' e! D* Y; \; t- Z
}
- e. G- ^4 G8 O3 g2 V2 @# [5 Y4 M2 F</code></pre>/ ?6 w' G) A3 l; y
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ d6 t. {6 s5 M5 P* x! A9 ^
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
* n0 r# M8 E" u<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>" n: t! T, C* H1 }- i! v5 |4 k
<p>Pool的完整定义</p>2 T( ]! l+ X3 Z- F7 w" \
<pre><code>pub struct Pool {4 B/ C) J* Z2 C2 _( Z! ^& [
    workers: Vec&lt;Worker&gt;,9 v  a/ C9 x5 ^, y- B5 y
    max_workers: usize,
" o: k; x9 }; V; w% C% _    sender: mpsc::Sender&lt;Message&gt;
/ b( V9 |4 k! {  L}
/ M/ k# T8 u( I6 E0 m, J0 `  l</code></pre>
1 D3 O4 }' D, z; p0 A( h<p>该是时候定义我们要发给Worker的消息Message了<br>
7 |- J' \( o( {% z' f定义如下的枚举值</p># W* ]9 h! R* t7 i
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
1 E( w! D1 c1 \  @- ]enum Message {
& ]6 l9 o* m! q% D& A/ ]0 Z    ByeBye,6 H$ K/ C3 g% [0 X% P
    NewJob(Job),
6 @0 N4 \+ a1 `& A0 j}% T8 B( |2 w! l; M* d
</code></pre>  {2 m0 T5 G* H! C) L
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>& h9 X3 A5 v& \7 q# A2 y& u
<p>只剩下实现Worker和Pool的具体逻辑了。</p>" V% t$ F# N8 a4 ?
<p>Worker的实现</p>& ]1 ], x- N, d6 O
<pre><code>impl Worker
  M+ L5 Q0 Q* p* R{
& l9 c8 f( ^; D* |! a, k    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
% ~, n4 r$ A# k+ }5 K. W) m        let t = thread::spawn( move || {. G. M  i) m6 w* }; j. k* H9 W
            loop {
* M9 J5 g8 E' v                let receiver = receiver.lock().unwrap();, K( K- Z8 ~9 ~0 N: ^
                let message=  receiver.recv().unwrap();
6 \/ w' w4 f) T3 q/ f/ n! Z/ c  q                match message {
3 \# M- v) Y  B) y* o                    Message::NewJob(job) =&gt; {; m9 t9 ~3 \7 Q" g5 @/ y
                        println!("do job from worker[{}]", id);1 f- E/ {( K0 w0 k2 `: d3 s# _
                        job();
6 S- _* j/ p" p* u                    },
# Z" v" b( @% Z2 |6 d                    Message::ByeBye =&gt; {( C2 u$ x. G# \" M1 }5 Z
                        println!("ByeBye from worker[{}]", id);1 u, Y( B) l/ m$ K
                        break
8 E. A; [; |- D- W1 [/ e8 b6 O                    },
2 R5 W4 i, Q6 k" C8 B4 ?( r                }  9 k3 ^' E: @( |) D' J) u
            }
2 q9 @$ l& _2 b0 `" d% m4 `        });
4 p- M3 Z) {- }+ T$ v* \# N1 O; j0 z- K6 M1 s4 j4 m) n* s
        Worker {
. C7 z5 ]' g6 ]* e            _id: id,# H. d) L# P' S& ?4 n  \
            t: Some(t),
' B' U* N3 t) }: h& W* g6 e* J        }
1 l, {! m4 [6 I- b+ k# ~$ T    }
( z* T2 {( w. i1 ?1 O}
9 i# K  C7 ^. i+ R0 s</code></pre>
+ N) T' l9 S3 \, F5 x$ p<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
' n; P4 X! Q2 Y, M; T  H但如果写成</p>5 r% ^/ Q4 X: e* b8 J, v/ E2 a9 i
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {8 p2 A: j2 v5 z* ~+ a( v7 B; U
};5 ?6 Y- z! O6 [
</code></pre>7 {5 B2 B; M* g* v- K
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>4 t/ S* S% j; p! ~* \: t
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
% V+ _! _! M! S* H9 m+ O" j! Q<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
3 O  [8 s  S0 F  R6 _2 [+ B<pre><code>impl Drop for Pool {. O1 T$ N1 i2 l, y, p) ~  s
    fn drop(&amp;mut self) {
$ J' d" T; W5 G        for _ in 0..self.max_workers {/ g. D4 j9 p0 u3 U# ~
            self.sender.send(Message::ByeBye).unwrap();: q  h! d. B+ n
        }
+ y# J* F  b0 |+ e( L( P        for w in self.workers.iter_mut() {
# \8 x2 }0 Z" }            if let Some(t) = w.t.take() {
; i. M! M9 E* m, a                t.join().unwrap();7 I0 g5 ~! \3 M0 m# J
            }. G5 F: ?% K- }
        }4 A2 V! ^, `5 s
    }
8 k: l* A6 B* K2 F9 y}% c% \4 T# {! f. t. N. q" S" n) S. x

! D3 J5 r" P8 A</code></pre>8 G  _) r4 A$ }
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
( s6 l/ \: O# B& A: H2 g+ c6 O<pre><code>for w in self.workers.iter_mut() {
3 f4 S. ~7 v. @    if let Some(t) = w.t.take() {
! e7 p% I! U* H" m6 @/ K, y: }        self.sender.send(Message::ByeBye).unwrap();
  I2 t8 Z- m, m9 ~2 z        t.join().unwrap();4 a  d+ O8 D( A% h
    }
: ^  {1 a+ ?4 p! l. W4 a}
' j; s& k* f- X; I( C% K
4 L2 l; }: ]0 [( }4 ^( u8 W</code></pre>; j. B7 b% s8 r2 B
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
0 @) l# l9 d+ C) N我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
6 P; \8 `) s  ^: _3 \<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
% P/ J5 ^" b+ J: L* p$ P<ol>
% i7 s: i; Z1 m0 c; h# u& h7 D<li>t.join 需要持有t的所有权</li>' N# _5 ~. X" g) |
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
8 D  o1 a2 f% L7 _</ol>
7 b9 R4 A. ~& L4 w- o<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>2 N- ^4 B  R# ~# j; ?9 p. b& ]
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>4 e, a& H" y; d; l. B% X1 m" c
<pre><code>struct Worker where1 p8 l% }; {  m0 R/ |
{* w" c; l' ^* V0 }4 S7 U
    _id: usize,5 c& t  L' R3 r
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,0 ]8 m: e$ Z9 y  c" i: f
}& l1 u8 I" X: l" B. E3 b; U
</code></pre>0 Q! u  o; ?! u9 m' o# X1 O
<h1 id="要点总结">要点总结</h1>
0 X% w( ?# [$ z- F6 j: H: E<ul>% K4 g* l" ~, T3 Q; N4 M( M- }
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>; I+ r6 ~1 h/ a' r( g
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
+ V" [- l, m: F9 K) |9 D, F  k3 C8 {</ul>
; ~' @2 t3 V; A<h1 id="完整代码">完整代码</h1>
/ g1 k& k8 O+ ~1 ~. ^<pre><code>use std::thread::{self, JoinHandle};6 d9 G3 T! ]2 L- _) W$ B( @
use std::sync::{Arc, mpsc, Mutex};
0 F+ W/ @: X: i6 t3 d& a; ]8 g4 B8 u2 d3 @; b' l9 K" l4 o- f* o
+ a' G; c2 k5 z, U. c( ]
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
( i7 D1 s* N. ]; {& ?  Denum Message {. m; s* M& I% P7 P
    ByeBye,
8 ~$ |7 k+ y+ e5 k# U9 I9 Z    NewJob(Job),
; h' Q1 O7 S& R2 X- K}8 T6 ]$ V* s8 v; s% Q* E. U7 u
- w# w4 g& k; A! W+ `0 K% s, p) b
struct Worker where: ~/ K+ y  o: d) U
{
; v( F. X* x: r  o: M  [2 r    _id: usize,
* ?6 z3 I) ]7 f" _    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
3 d5 p/ @3 S. g% M  m6 y}
# s8 l( V; b* h% V4 t! ~4 [! F* A
impl Worker
$ [& R3 y& K& g& b" [{
! ]/ `& o0 L& q  z    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
& p! \: d9 p! Z: ~" `) Z. i- o2 z        let t = thread::spawn( move || {
% D: V. l6 C( t# h0 Z            loop {7 g2 b7 J% h" E: I1 K1 B  s6 B, C: D
                let message = receiver.lock().unwrap().recv().unwrap();& ?, ~  f% t3 o' e; |) j
                match message {4 \4 j+ K) a/ {/ C
                    Message::NewJob(job) =&gt; {( L" k9 o- B# x
                        println!("do job from worker[{}]", id);& Z' p- U* e! y
                        job();4 {: d7 k2 A: _& o
                    },% v5 P% N& w8 s, _( g$ }/ `& H- Y  @
                    Message::ByeBye =&gt; {' v) u$ i( T* `! r, }- ^
                        println!("ByeBye from worker[{}]", id);
0 d6 j+ G6 ^! J, _. a$ B- G; _                        break. D; |7 V- `! \! z* W% w- I
                    },
8 R# O% ?$ N: N5 B" e! h6 a                }  
# S  I- _: Z7 F/ T            }- }3 M; P. X0 ^, \
        });* F# C- l1 I: N8 ]3 C: A

7 q: _# |1 O/ o+ c) h' O        Worker {: M+ w0 j' i9 ?; {+ v" }% G) M9 x
            _id: id,
! E# q& H: d4 z7 s' r& l7 {            t: Some(t),
- P- {9 p3 ^4 @* C* M        }0 f% p, q+ u3 Y" B  G
    }5 Y6 p; E/ |: r. i8 u
}
' c& J% ?% [' s% `5 A
0 Z% X. x- I4 Z- N7 {" d+ L& Cpub struct Pool {
8 m& r2 B* W% M8 K; ^    workers: Vec&lt;Worker&gt;,
" }- C; d* G2 s. {; N+ ~5 o5 z    max_workers: usize,  p- S3 `* [. G# U
    sender: mpsc::Sender&lt;Message&gt;
) c, Y! `+ Z3 _}# V5 r/ y* n$ T, E9 t5 Q; f3 B
- t& j# f7 m/ \6 ^% T
impl Pool where {
8 y. ~; z; r  y* \    pub fn new(max_workers: usize) -&gt; Pool {% {8 K8 B1 h- }! N! E- i& Y( \
        if max_workers == 0 {
& m. Z5 r3 U/ X; R( m/ [' p            panic!("max_workers must be greater than zero!")
) Q2 K) a1 [: V2 d* \        }9 V9 h* q' O- C) @
        let (tx, rx) = mpsc::channel();
# r1 r# L8 \# s# i& d( O' \4 x
3 S- r5 y( P! o% n        let mut workers = Vec::with_capacity(max_workers);
* X9 @# K! W' @# L        let receiver = Arc::new(Mutex::new(rx));
7 U0 u/ e; C; i# u# o$ l) b        for i in 0..max_workers {8 d* h5 F, M4 `& G
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));* Z  }( c& S" X! e
        }
" z' v* r& j: s5 @3 P/ ~* f
6 m/ z8 W1 A9 t! d! u; L        Pool { workers: workers, max_workers: max_workers, sender: tx }
0 A8 k8 [) g9 A1 ]% b; S0 I" _    }
; j; S/ ^. U! a* m! `+ b      W1 a. c- Z) {4 q/ g
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send, T0 H& d/ d5 o9 N6 ?2 F
    {# M* N+ x& i; f
& g# ]) c; G* Y! q$ a
        let job = Message::NewJob(Box::new(f));  b1 T+ D1 Q. x; H6 _. I. U2 }. [
        self.sender.send(job).unwrap();
$ }" p$ X. b$ R8 r7 W. K! F    }  ]& I6 D4 U* n& Q$ ^7 i: _6 l
}
' K) Q1 G, Y# v
( t. p# j( j9 I: ^, Mimpl Drop for Pool {) k) n- O8 Z) Z3 u1 j
    fn drop(&amp;mut self) {
+ j" N4 U, a' x9 b4 ]9 l( B        for _ in 0..self.max_workers {, v' a2 j2 C" K' _0 I( T' x4 G
            self.sender.send(Message::ByeBye).unwrap();4 C9 E. e) ]% ~
        }) z; ^' M# y/ d0 m" h) H6 R" g
        for w in self.workers {
3 Y, \& Y# q4 c8 T  u* ~0 T9 q            if let Some(t) = w.t.take() {
0 d  l# e: C1 u2 w, P                t.join().unwrap();
0 w& v5 a1 v0 {, U! f            }, d+ ?% h* y8 S8 j9 A
        }  A# q7 \1 G* H7 K9 a% O1 }  c
    }
) |% H  l1 X" K6 W' W9 X}
, S) h3 V) i4 t% z. m/ X4 x8 Z0 C) a* Q4 t2 ?* [: H

' f- z7 ?2 |' X* |: D, X: G#[cfg(test)]
; d9 t9 G2 D+ N; n9 n6 R; ^+ [mod tests {
2 e/ p: i4 Z5 D& \    use super::*;
, D7 a3 R8 r0 j, Z5 m: n$ s    #[test]
" d. V8 f6 p) v+ W! X    fn it_works() {
* ^* I* ^: |: Z9 n        let p = Pool::new(4);
; u- H' H9 i4 o0 s0 X3 `/ p        p.execute(|| println!("do new job1"));
; G* N" G1 I0 y$ S        p.execute(|| println!("do new job2"));6 ?' n) U  l$ N" D% k- m6 h/ W
        p.execute(|| println!("do new job3"));
' m  W1 w! K+ ~0 z: f' g" j        p.execute(|| println!("do new job4"));
" {' p: V5 E7 l& _. q2 _' ^* \    }; m$ V; u# R) Y4 }! h! {3 L0 I
}2 I* |" ]) X1 a. c
</code></pre>: r1 J% M6 V; [- V9 u8 `9 H( c5 v

6 z4 u8 h  s- g% O) Z
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-28 16:59 , Processed in 0.122654 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表