飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14511|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7994

主题

8082

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26312
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

. z6 ]0 ]  T+ u. Z$ e: w<h1 id="如何实现一个线程池">如何实现一个线程池</h1>! |: `6 i* D- [- n% N
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>, P! Y6 s; O. h) \0 I
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>0 `+ G  N' n- }: m
<p>线程池Pool</p>. p9 m9 \4 P* N2 F6 P9 P7 \
<pre><code>pub struct Pool {" W3 \( Z( q1 V. Q) i0 `$ `2 |, Z
  max_workers: usize, // 定义最大线程数  p: T0 k3 c2 F+ `4 F
}
8 V9 W" v$ x7 R  I3 s8 P( F/ h  ]  y* C/ @# s' B
impl Pool {
" p4 r+ Y$ \+ C, r  fn new(max_workers: usize) -&gt; Pool {}
$ f" G! d+ G1 g- K1 e3 K0 d7 X  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}) P+ Q1 W/ b' r9 x4 a: O
}
  |( z; H: C) P" \' }- _6 `% l2 ^' l( |1 L# W
</code></pre>7 X5 B! `) G$ A9 X  A$ C4 Q
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>8 Z& o  u+ o( R) V) f. G& j1 f& m" F
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# J# X& H; X& h' i- v. P2 I6 m可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
0 N' U1 w+ b/ G8 V; e0 O; J, y0 X<pre><code>struct Worker where
, j* f1 y. [0 j* w4 a7 s* X{3 m7 m/ g5 D! d: G7 d  k& }
    _id: usize, // worker 编号
5 G0 n" y& j* c: }}
$ h6 i3 P/ }' K, W$ o</code></pre>" ~; Z3 u$ V9 o) _
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
* C& q( w8 s  S把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% i# A) |4 V$ A8 w7 Z4 e9 M3 p: J
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>8 [) L/ }4 h' }+ X% Z% X
<p>Pool的完整定义</p>
. M$ p% i: @  a( X7 T% c/ z<pre><code>pub struct Pool {! J0 ]7 z0 Z/ D( e
    workers: Vec&lt;Worker&gt;,
1 A, p/ q/ |6 ~/ ^* }% M+ S    max_workers: usize,: G6 \; F! y6 [' C8 ?
    sender: mpsc::Sender&lt;Message&gt;7 n( E. j- N) j0 E
}! ]0 M8 {6 f. X0 s* r& q0 F& K
</code></pre>
$ V: b/ Q# Q  r( {* ]7 t0 f) P<p>该是时候定义我们要发给Worker的消息Message了<br>
0 g7 Q! f# k$ f+ Q0 u定义如下的枚举值</p>& e8 N! I; u+ a8 S
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;# I4 Y& o: M" R
enum Message {* Z- Z% U/ b& O8 A/ T
    ByeBye,/ [# ]! |8 w7 t6 z/ j
    NewJob(Job),. a2 n$ S% q2 |2 V
}* n$ p- y8 W3 @+ ], A2 K9 l
</code></pre>5 M7 I/ f1 [5 J( ?6 R
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>3 B7 f# T: ~& H: \- ]0 @
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
# P/ X4 X8 R3 p+ o<p>Worker的实现</p>
* {' u% H# C9 l4 Y<pre><code>impl Worker/ Q) T) F# ~% @5 s" o  }$ S2 Y
{1 n- s/ j' J4 V! A3 O
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {8 |: _1 J* V4 j
        let t = thread::spawn( move || {
( i0 w7 O0 S) C            loop {
5 \2 ~. A5 i* S  G  {, S                let receiver = receiver.lock().unwrap();
# o4 I, k8 `3 L                let message=  receiver.recv().unwrap();
( a# L7 Q4 r- P/ R4 J; A                match message {  Y6 ?5 b; p' C" [, u$ ]7 C; o# n8 e; u
                    Message::NewJob(job) =&gt; {" }1 T" a6 o1 h( b5 L) U) [
                        println!("do job from worker[{}]", id);- o8 V9 j6 S) r
                        job();
& Q8 z7 _) x+ b; ]+ ~$ z                    },
; y2 K9 r+ d) F7 L; `  O                    Message::ByeBye =&gt; {$ o; k9 V0 @1 R6 T& u
                        println!("ByeBye from worker[{}]", id);
/ h1 c& s4 |  p" i+ w3 v; e                        break" W: m$ w+ _9 z# p6 m/ z2 j: [+ U; s
                    },. }4 R6 R: }0 {. r5 j: M
                }  9 L9 Q, D1 o- F' W/ s
            }
% B* M1 \% q" x: [' B1 X( ^        });6 P- ^$ G+ @* D5 a$ D
$ E: Y$ V3 t& n3 y+ ?: J
        Worker {/ b# d' M- t4 q) X7 e
            _id: id,
5 ~8 G( s9 @( a/ C5 @" L            t: Some(t),' h; B6 D$ B* g
        }
% s/ t1 R/ A0 _% u    }$ S0 G; g+ q: q& u
}0 z/ U  B$ g3 _; k, _
</code></pre>
  c+ h( B; |  {# X6 f% E+ G9 s<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ H0 [3 v5 G  Y8 V, M$ R8 k但如果写成</p>
: P. n% Y* \' W8 v$ e( _1 F  W<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {, q+ A  @+ v. b
};: y+ @# z& O& O' f1 j0 g
</code></pre>' j5 u9 k1 u6 h- n" Q
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" }2 \' s' F% K: r
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
# v  e" F5 v# N4 ?/ y" X<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
6 W$ n( t. ]: a( C# l% g! K2 |<pre><code>impl Drop for Pool {; f, x4 V  t' d; D; i3 M- K
    fn drop(&amp;mut self) {: _0 J" ~2 o, o  t$ u
        for _ in 0..self.max_workers {
) B' T' p! L, T0 V) e1 i$ f4 B            self.sender.send(Message::ByeBye).unwrap();
7 D" m- y, H6 {7 ~        }1 K3 A* J9 R$ |
        for w in self.workers.iter_mut() {" h( Y( R0 i; z4 F
            if let Some(t) = w.t.take() {4 J- ~3 w" U3 G! V4 a5 F" ~
                t.join().unwrap();
3 \# [$ [8 E  b- _9 s' F/ M& r            }# e. k5 u& J& I0 q/ l* y  @
        }, j9 [# |: e" ^9 J* j3 x; B1 m
    }
* G% l( \( ?5 |2 E  s}
; G1 L7 N7 h9 |& O* _) A" M6 t% n. r# t/ l; y# }* y4 @3 e. o
</code></pre>
7 R( Z1 B2 x$ R( u* m4 A# R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
+ K0 D: \. D  B; O3 [& R<pre><code>for w in self.workers.iter_mut() {% V: S7 o$ \9 z! N
    if let Some(t) = w.t.take() {9 i+ K& w/ q3 z- d
        self.sender.send(Message::ByeBye).unwrap();
& [' D5 A' D- A6 m. w% b        t.join().unwrap();) h+ z- Q/ T# b, ^; [+ Z4 ]. _
    }
" U# c" G7 P3 ?0 O; z! A; `4 m}
+ x/ `8 S: K" l! n! V7 A/ G# |. T( b
</code></pre>
$ @2 E& K8 q# o6 z<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
% m: M+ @, |$ |! s0 P* M4 Y我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
8 ~2 ~' K1 J2 U5 f! J1 Y<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
  s( f7 ~" {  j# P5 d<ol>2 @9 i0 P" u; w5 ^; f' Z8 _) y
<li>t.join 需要持有t的所有权</li>  C: f% E2 h' O. K; ^
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li># W3 ?0 D) I/ k) d) x7 J; _
</ol>/ [+ E8 j) i* h( F
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
0 a. `# d) u8 L1 h" q; g换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>4 V7 [$ T7 M( E9 N
<pre><code>struct Worker where* W! V* ~  H7 s' i
{* L. U# c' a+ Y
    _id: usize,
6 j" [$ H7 E; @) k# P    t: Option&lt;JoinHandle&lt;()&gt;&gt;,, U/ _3 B  ~( p4 R: X7 G
}
5 M2 x, b% w) |- ~4 H</code></pre>
0 _7 [! w) J! y- ]<h1 id="要点总结">要点总结</h1>  @4 d: z$ f$ C- T
<ul>
0 F' i  G' H5 j$ F" H<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
# x( l8 W" G& G, J' o<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
0 e4 V$ p3 w: U/ j; h  N: o! m</ul>/ n1 E0 z; z! G$ W& S, P  S( e& p6 V8 g
<h1 id="完整代码">完整代码</h1>
! v; y6 Z2 z# }<pre><code>use std::thread::{self, JoinHandle};
/ k, r2 K, E( F6 puse std::sync::{Arc, mpsc, Mutex};
8 C0 Y* D- R$ H7 f& i! o( b% m- B
5 f. a2 T: H& D. U
" }" L' Z1 ~) X* M0 ?1 Ttype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
8 F% Y: R8 d6 Z  d5 o7 Ienum Message {# e7 x' x9 z' D
    ByeBye,: {# @2 d/ n' F
    NewJob(Job),5 y/ d/ D& `  N
}# ^6 P3 H2 \% J/ l) I% R
- H2 _) q* Z" ~1 _4 G
struct Worker where
, A  W& L- T2 R. T2 k( z% m8 `8 O{" n' L+ o3 c1 Z. N" Y
    _id: usize,
7 ]8 k  D4 N) S    t: Option&lt;JoinHandle&lt;()&gt;&gt;,) {, P- o/ W! v, f* U/ R
}
0 f/ g- r8 V9 M
7 D: t( z/ j% U/ Aimpl Worker
& _$ a" `  q* }/ o+ ], R{3 U# w% s$ s' s% N4 ]; J) x
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {3 c4 Q, S# \. [* z" A5 |
        let t = thread::spawn( move || {) L. D5 U; b4 ^# F
            loop {
/ x6 _- ^  Z$ l5 {/ |                let message = receiver.lock().unwrap().recv().unwrap();9 t& g/ n1 u& i/ l; y
                match message {( J1 D7 h: j: |
                    Message::NewJob(job) =&gt; {
6 s' ?0 i3 {* r2 y1 a7 O                        println!("do job from worker[{}]", id);
% n4 t  O" `5 U( X; v                        job();3 [0 g9 V. o/ K  T$ r1 r
                    },
& J3 i  s$ n9 z4 w" A8 z( f2 U  ~                    Message::ByeBye =&gt; {
# C, `" Q0 t% J  M; D                        println!("ByeBye from worker[{}]", id);) e5 U7 e& }0 E% p) j; q- A
                        break
7 o, O8 ^# M- m: B. K) E1 r/ c                    },
9 p: s7 {3 O7 c# d, u3 r7 T0 C  h                }  
9 V" G4 p2 [# `% }' j4 F            }5 T3 }' }2 h1 T! ]. m" U: k( ], S7 n  e8 ^
        });
+ k, [/ s0 Y, V  E: L3 n% |
& }, H; i% c; H5 r$ i% r        Worker {0 ^; _5 s8 |9 j
            _id: id,
: Q! R1 r$ O2 y6 t* P7 t" b3 }            t: Some(t),) ~; m# S3 {8 [7 S3 \
        }
7 s3 X' u) E  M, p+ E    }  R4 u3 }; U( i7 u9 P9 b4 O
}
; {0 d6 z% e2 R4 y; o: U; G& U' H: L) a! H1 {( Q0 j
pub struct Pool {
& P2 t. P, c/ R7 Y    workers: Vec&lt;Worker&gt;,
0 k9 {+ ^/ w# z3 G9 W5 T    max_workers: usize,
- q  z6 c" i& l" ^    sender: mpsc::Sender&lt;Message&gt;
% @$ o7 i. G( S}
- g) b: I0 t/ Z& u- \
7 J4 T6 a$ ^9 ?/ \8 Nimpl Pool where {! a8 g; v  `6 A
    pub fn new(max_workers: usize) -&gt; Pool {$ g. X) f# U- j. }7 C
        if max_workers == 0 {2 b! @8 r" S. S0 `% o
            panic!("max_workers must be greater than zero!")
, x1 z- e; \& b2 F        }
' ]  G6 e( ~0 a; ?1 `: X5 j        let (tx, rx) = mpsc::channel();
2 F: g% _( L- v, f
) o% R! b9 x; W" b8 Z        let mut workers = Vec::with_capacity(max_workers);
1 F% j/ L5 e- `/ I9 ~! T7 r4 k        let receiver = Arc::new(Mutex::new(rx));
5 K, n5 v% j9 S! w# C! x        for i in 0..max_workers {
" o2 ^4 w- c& s. w# J: j! ~' n, l            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
$ Q; H( r0 t9 s' b; O        }
+ U- i9 U( T6 _' }# C& ~6 f3 o! E7 I6 n! e# O
        Pool { workers: workers, max_workers: max_workers, sender: tx }
2 p" v4 f/ [4 y7 C! N8 P1 q    }! ^& i+ {! d  N# D
    8 s6 n. }6 ^; s9 |2 U1 S
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send1 b0 _/ n& b8 z( n
    {, t4 ]& _# ?" Q5 N; L

0 }, ?/ r8 T& m! y* D        let job = Message::NewJob(Box::new(f));9 e$ z6 y+ G: i2 S5 I
        self.sender.send(job).unwrap();: Q. Q$ Z+ B8 C8 b* M+ j
    }
' U9 ^8 P, c+ t: X" c' c. F9 \}
) A& I: c- G( w( E" Z/ v( J$ P$ ^' O. X, S$ C- T1 X8 }. |7 b
impl Drop for Pool {# s2 Y1 z  I; Y! {+ f! {; }
    fn drop(&amp;mut self) {2 i& c% u+ p2 `4 W3 F
        for _ in 0..self.max_workers {: g% M* v$ y, `
            self.sender.send(Message::ByeBye).unwrap();
5 C! l! d2 M6 u3 @- _        }9 w  N8 C9 r) _) ~6 y3 [
        for w in self.workers {
  a+ w) K, A7 K% Z8 N- Y            if let Some(t) = w.t.take() {
  {) b. B# I/ h& F3 ?- P; {# m                t.join().unwrap();
1 t5 m& X& H, K, |" s            }
/ U6 w9 }/ n  }" c7 ~8 H        }
; D  h5 F5 \6 v$ J    }
, [. c# S1 |1 B! ?}
6 J$ q( x3 ?4 y% |! b/ P8 @' R8 y! e0 _
0 }7 l! ], y3 C( X: W
#[cfg(test)]5 _0 l" O" D* V6 ^* p' I$ ~
mod tests {
* d& ~+ V" @  g- R4 l+ a# C/ N    use super::*;
* v2 s% X3 M2 ]$ A    #[test]
) C$ U8 z0 {/ E& h. H9 e: }    fn it_works() {& d0 Q, D  P' s* G  X4 ?2 S) ^/ b
        let p = Pool::new(4);" c4 d1 B8 y) q5 O: y) U+ e7 Y
        p.execute(|| println!("do new job1"));2 W4 L2 \3 R" A1 K# B+ U
        p.execute(|| println!("do new job2"));
3 d% l( X/ a  C2 e, u        p.execute(|| println!("do new job3"));- g6 e' y( ]+ X$ Z0 t
        p.execute(|| println!("do new job4"));
3 U: x# M; }" H    }
* J) A4 t) P2 R0 ?! Z" ?; M9 S}( q+ o( ]- K( K- n7 q9 o
</code></pre># p0 i  H) \/ {. O

3 J5 w; H1 T& _* Y1 \3 u5 A0 l
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-30 05:38 , Processed in 0.067629 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表