飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 6448|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

5744

主题

5832

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
19556
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
% l' |! ?% \+ f4 f
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
' D3 p+ X  Q( N9 d8 {) N8 y* f' b<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
$ X" a1 {  \( R$ j) y5 e, J+ E; I<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>+ W1 c: |. X8 y
<p>线程池Pool</p>% a  i, I' X- \/ A6 }4 u
<pre><code>pub struct Pool {
- w& O2 R! d7 q3 `" z: q5 m+ {- o  max_workers: usize, // 定义最大线程数" D; A+ C" w3 D3 \$ j
}
) [' _5 a* F% J5 \+ w2 I5 s( L, s* t8 U) f0 \! R
impl Pool {
) e6 ^" z# g" M3 s  fn new(max_workers: usize) -&gt; Pool {}
( V/ R- \6 G1 X9 ]7 `+ N( c  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}, Y: z& @6 }1 R4 [4 _$ F
}
+ Q) G8 X+ `# w# y: K# b# n# W0 b3 z8 h% }
</code></pre>
4 j, p" [; T1 o0 J; a+ q# w<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
6 ?8 @4 e/ g- j/ r2 M<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>% i6 l6 V' [( C. V$ d  }2 K& E  f
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
; ]/ |5 L8 m/ t<pre><code>struct Worker where
8 C# c7 T! r( C{
# [" m! \1 i: o0 Z; F. ^    _id: usize, // worker 编号- z$ _1 p7 I1 i! T
}- |7 [1 }; t7 v
</code></pre>
3 g1 y1 E* T% Q3 {* ]' ]9 b2 Y: h<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>% k. S2 h) h1 r' A) a  W
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>4 Y1 m7 q- H9 p. e2 [' e
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>- b4 v0 r: v4 K+ j# z* m3 Q6 s
<p>Pool的完整定义</p>: b9 v8 g6 ~. \! M4 \
<pre><code>pub struct Pool {
, ~- S( I8 X' X* e+ ~, w; e! q5 y6 B    workers: Vec&lt;Worker&gt;,
& U) k2 b4 T# J& N8 \1 C; z2 P6 i    max_workers: usize,
# v1 X. a( q" o' q    sender: mpsc::Sender&lt;Message&gt;5 T" T* M. }% J6 [8 V+ R8 }9 Y/ C
}
4 L) R" L6 H( a; c7 I</code></pre>; z8 q' e1 A- j- E1 X# |% C+ V
<p>该是时候定义我们要发给Worker的消息Message了<br>' P% m1 U2 G3 M" b) w: [) x4 x' _4 z
定义如下的枚举值</p>
) v2 X9 w. L& Q2 b6 p<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;! g% Y2 y1 P) J' I
enum Message {  {; W1 s. |0 E- w
    ByeBye,$ t+ O: ~8 E8 e/ ~
    NewJob(Job),- d9 J& T" |) O" k& N
}
# c. p- `8 [# ~9 F4 f) ~( P</code></pre>
! W& }0 G% {* t<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
- M# Q  U6 t, @3 d9 y( Y2 E  m<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! |( m) N0 r$ w* \3 N7 @) L<p>Worker的实现</p>+ y% e% p& v5 x# r
<pre><code>impl Worker
$ r1 q" ?$ v1 ]8 U/ A0 A{0 M$ {" C3 F% d' \- g
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {4 z( b, Y' d% ^- B6 j* v
        let t = thread::spawn( move || {, \. s: u/ l$ [  j+ [
            loop {
; C2 b! r9 O% a5 J/ R- ?                let receiver = receiver.lock().unwrap();: D( E3 U6 f& z7 L* C& i3 I1 J
                let message=  receiver.recv().unwrap();0 f) W9 S% L+ Z9 h
                match message {$ Y* P. d% f$ B/ w1 W
                    Message::NewJob(job) =&gt; {
7 c6 f3 ]) h3 N* b$ }+ T                        println!("do job from worker[{}]", id);
5 S6 b0 \% N( T$ T9 K; y                        job();
- v; k5 O  a! z" Q                    },4 E9 s6 a6 Z* Y9 [, j' a
                    Message::ByeBye =&gt; {
. x: X& |! w+ h  F: S0 K                        println!("ByeBye from worker[{}]", id);
" x3 z( i; j( U; {) P$ V# B+ D                        break2 [/ t8 p$ P, L4 I/ P& s5 p
                    },
8 |2 w: t8 s: [                }  
9 q. d7 ^; c( P; X' r            }0 a$ ~, w# s0 L( h$ g- Y4 ^
        });* E# I; g% ]& D/ i0 l
1 b3 v& b: [& A# ?6 `
        Worker {
1 i# `" Z/ c6 z  a6 x' |            _id: id,
0 `/ o/ j2 Y/ t+ J6 S+ I. @2 z            t: Some(t),
% z: H, L2 D* @# a        }
. y! @# `* F  L# l    }. d, X& l2 z/ H' H( ~
}; z0 D1 u' l- V* g
</code></pre>
' @0 }* f$ F4 m% o+ B  W/ ], M2 x<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>8 g/ M8 t/ j5 n( J# A4 r1 O6 t
但如果写成</p>
. f" E; o" g. h6 G<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {2 I( V8 k: `* P/ k
};: x# W2 R& J' R; V0 i) X; b
</code></pre>
  B/ ^2 R6 \9 |, O; \- w: d$ Y( B<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>3 I& E) h" x6 x* w" L3 k5 T3 i  g
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>7 c. E& a' g' I0 c* d3 w0 d6 {, h
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>' T9 ^9 S, O2 U
<pre><code>impl Drop for Pool {* W% E, s; a: e+ y% o" J& r5 R
    fn drop(&amp;mut self) {
: _, Q/ m% s! x. f        for _ in 0..self.max_workers {
  j  A% B: ?1 Y1 b9 U            self.sender.send(Message::ByeBye).unwrap();
+ b) q* J/ T- |6 f" N+ ]        }
3 x  `% H8 g$ L+ @9 A" D        for w in self.workers.iter_mut() {9 I  Z- e$ ^* c$ N
            if let Some(t) = w.t.take() {* [/ p' \8 i4 [  A. D6 m/ W, u
                t.join().unwrap();# C. o! f5 A. x3 r& c
            }2 {7 s! {0 z5 @& @# [  {" q1 o
        }
/ s+ b& X; s4 [' P    }
% [+ W1 I6 d5 j; N6 }}
# w" [3 {% B& V) h* ]6 I: V& r. a, ?: f; d  ?
</code></pre>2 z- j; o! \+ K- ]9 e; y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>1 P5 l1 z. O' `- O
<pre><code>for w in self.workers.iter_mut() {+ u3 Z. [; C. n. R0 z
    if let Some(t) = w.t.take() {
9 E8 a* X# X5 a0 \9 ]* {        self.sender.send(Message::ByeBye).unwrap();( e- U5 U7 I$ w+ E3 G  D& w# M& q
        t.join().unwrap();! k9 ^0 O7 T! A3 Z6 a
    }5 o, }/ }& h' W% B
}2 p2 `  ~1 I% }

. B3 ]! j4 N% ~( g" s* Z% W</code></pre>
4 T* Y, q! Q8 k% |, X7 Z! `$ r<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
% L4 @* ~" _" |1 ]1 X4 k我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>! W+ p6 b) f1 s9 s& e0 p
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
# N+ P' b7 n! T1 ]4 O; G+ o3 l$ [<ol>" z# y4 \$ F5 P% W' F4 j. F
<li>t.join 需要持有t的所有权</li>
2 I; g' V8 \  |" S* F<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& [" ~, y8 j) M</ol># q# ^7 U: ?9 q! v" J/ f
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br># o+ j  P4 D. `5 R7 K0 B
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ j# g6 C, F( X
<pre><code>struct Worker where) J. L6 m  i$ e3 t5 @
{) b" B5 j& m4 x4 J" ?
    _id: usize,5 A4 p5 {- i. A8 f- h: F* g
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,. \- T5 T2 Y6 }% ?9 i( Q2 c8 N, [
}
' O& W! H" F5 ^7 g2 N) F+ I</code></pre>
% U* ]7 Q1 `+ v9 p# j<h1 id="要点总结">要点总结</h1>3 F  {4 y1 O. I8 |5 w, \! ^: q
<ul>* M$ c9 I6 q. {- Z$ K1 D9 [
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>1 S% \) L/ t) ^& v5 M8 M  o
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>8 D9 Z* l6 t* a; x3 f
</ul>
5 F- ~4 n' r: j% m* ?% q8 d( M<h1 id="完整代码">完整代码</h1>) r+ H# V/ M7 C' B+ C- R1 q
<pre><code>use std::thread::{self, JoinHandle};
! w; H. P# V5 v; X8 e  yuse std::sync::{Arc, mpsc, Mutex};- k! l2 E8 c3 Q4 A' e' \
1 A! M# ]0 i4 V5 F7 F% A

. V6 s, e/ J  f+ x2 p* ~% ^type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;1 b% E  I& Z* K' J- R4 c) m
enum Message {
4 g; T, i7 t. o( k* ?2 y2 l    ByeBye,$ ^5 j5 x% p6 }# J
    NewJob(Job),
) G3 }2 P4 t, |' |, [  C1 M0 H" Y}
: S/ N' f/ E! \/ N! o: d8 a; \& v. R) J+ C+ j* U- Q8 u  g
struct Worker where
% E3 |: B4 v) P' \* S9 j& @! p{7 Q1 M$ z$ K9 B% v
    _id: usize,1 _  K" h! D/ r- m5 O
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
) a. I' \9 c% N! T# O3 ^: D( W}3 ^. k) q* n% q* o8 M

# ?1 F5 F1 G3 o- a/ }impl Worker5 J% y9 |: O5 s! y: R8 ]0 T
{
3 H& F* Z( D4 p' W% N; A1 O    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
4 F4 i+ _4 h& u. W# J" F        let t = thread::spawn( move || {
# U4 O6 o' C- |- g* o- r            loop {( E$ q7 T+ R+ f: u
                let message = receiver.lock().unwrap().recv().unwrap();
3 f0 k9 O7 Z' |# k' I/ P                match message {9 G1 L. i9 K/ W' a2 O
                    Message::NewJob(job) =&gt; {
2 J4 z, g. q" |) c; h& C  ?* ^$ S5 x6 ?                        println!("do job from worker[{}]", id);, v$ b7 D% R  Y( J8 R# N0 S
                        job();
1 i) ?* y9 k9 b/ s3 `9 j" \7 s- u                    },
8 t- p' R# D. _) @3 c                    Message::ByeBye =&gt; {
/ G0 i6 c4 _: V+ ~                        println!("ByeBye from worker[{}]", id);
6 t! j3 N% [5 \$ H" G  w/ a                        break& o4 m5 C# Z! P! a6 i
                    },
9 W3 v: P$ ]$ L: t                }  $ V/ G; f- v2 m$ O
            }
( \3 w1 [" G1 s- ^0 g        });
; K, A$ G. J9 S! R$ w
/ x2 F1 v( i* u( o6 m+ U9 e        Worker {) G; w+ [4 ~3 j
            _id: id,7 Z$ w7 k8 p& [$ |
            t: Some(t),
# m0 m9 U2 }  q: X) n9 t9 ^0 W; P        }
0 y  ], c, Y9 j    }3 H" j" F$ O2 y0 i
}
# ]9 [0 v. v# d  v8 \  y7 s# i9 ]0 v0 E4 Y1 E' ?2 [
pub struct Pool {+ d' T9 [  R/ v9 N2 x' }0 a
    workers: Vec&lt;Worker&gt;,% ~. l9 ^8 X; V+ U6 {
    max_workers: usize,) ]2 t' A7 @) h4 \
    sender: mpsc::Sender&lt;Message&gt;
  q( m7 k6 z0 u7 `2 m; e}
/ }6 v% H! p% z7 F& B& _& W3 O& j7 {2 F- a, T
impl Pool where {6 P6 p8 q- y0 b: X
    pub fn new(max_workers: usize) -&gt; Pool {3 e" ?5 e& p. G1 K) Z7 B; \0 a
        if max_workers == 0 {
7 I5 W/ P3 ^* b1 r3 p            panic!("max_workers must be greater than zero!")
5 _6 x) N# }. o0 Q0 T        }
) ^9 P) [3 F( J% o2 l/ X8 |        let (tx, rx) = mpsc::channel();9 ]* A1 J; s9 g$ X

$ ?$ |) o! h! \' q' r        let mut workers = Vec::with_capacity(max_workers);
6 s6 m+ x& q, T& H( W- ]        let receiver = Arc::new(Mutex::new(rx));! c  X0 i. k" N* X2 ~
        for i in 0..max_workers {% h, t+ B0 }3 J. q6 z
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
$ z6 i6 F, a4 l% y        }/ G% O! ~& ?( m5 n0 O" J

) v* T% Y" R* @  G+ q6 S        Pool { workers: workers, max_workers: max_workers, sender: tx }: T$ e4 V  u  A7 x3 n
    }! W8 O) e9 ]1 o3 S, P7 q
   
) |. i; r& P! E  {4 @7 I    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
5 ], T. x$ _, V6 Q+ a    {
* w) d* j& P5 a7 y6 I
& {1 X: [, [; U0 u/ g$ u. d        let job = Message::NewJob(Box::new(f));9 |. R; d) I$ D4 v3 c
        self.sender.send(job).unwrap();
( e1 y# W! L- }. {6 o    }7 Q0 A6 m. ]+ ~, h, q
}7 F& A3 k8 q& P5 O! Q; h5 O6 [- P

: s) d0 G5 s( |7 _3 m( r+ fimpl Drop for Pool {, C! m% m7 s2 q
    fn drop(&amp;mut self) {* c% z2 J" l0 ~5 v2 d0 d: m
        for _ in 0..self.max_workers {
6 N  Y" M% O) R: ]" [/ f            self.sender.send(Message::ByeBye).unwrap();
; T/ ]' ?& ^6 N0 D! L3 q        }1 j! U' V% y; n, e/ [
        for w in self.workers {
0 N4 _1 u. G4 v' M9 j            if let Some(t) = w.t.take() {8 \$ p0 }7 {) W
                t.join().unwrap();! g: N( N4 k: J- M
            }; y. L2 j' N8 e9 L9 N6 }1 }
        }9 s2 h" h( f9 @$ v# c* X- F
    }
2 h* o" O+ {* h}( x3 S. d  c6 q$ R
! V5 N  K* H9 S1 g4 {- V
9 z1 O+ B. p/ c+ `$ G8 S/ l: G
#[cfg(test)]2 N0 `" F# X6 d: A# a& l
mod tests {5 R# f& @. Z% j& V
    use super::*;
& `" ]: E, b- I' d- N& W! ^7 _    #[test]% X9 r) _5 q. g/ Z) B& {
    fn it_works() {  m4 F7 {( H% D; T" G
        let p = Pool::new(4);
: D! ~8 K8 \2 I1 j. W- k0 s6 E& b        p.execute(|| println!("do new job1"));/ ?; M$ @2 G; n; B: }+ p0 `+ G
        p.execute(|| println!("do new job2"));4 ~) j. M0 B4 G! s( p4 l
        p.execute(|| println!("do new job3"));
  Q6 M: j3 Z0 m! X% D3 D, A: Y        p.execute(|| println!("do new job4"));3 y- j- T/ R" {  H: _8 x: t9 l
    }! G7 y2 N' E) E( P& w# h
}
1 p5 r. M6 j5 ], r! W0 y</code></pre># H) W! \! O) s/ a+ E2 b

7 t( r; b+ m: r! G
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-1-25 12:03 , Processed in 0.069314 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表