飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 12376|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7587

主题

7675

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
25091
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
' K% r; D; o! u1 }! \4 E
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
% n4 n  l7 s6 M8 p6 S1 t2 |+ Q7 J<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>5 G# R2 M3 [( W! t
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>' X# Q8 q% S- N7 F
<p>线程池Pool</p>
) x% i7 D) ~. ^: ]8 E<pre><code>pub struct Pool {
; |+ k8 q7 R) i+ H( h6 O  max_workers: usize, // 定义最大线程数) u0 T9 {, Q7 J: n3 e8 E
}
# H; @& V' C" N6 k) e/ w9 v
# q! e* [' z' |$ e% `+ Y) m( G0 mimpl Pool {
" c! S4 _2 K7 T" Y- g  fn new(max_workers: usize) -&gt; Pool {}1 f( S' x- b# X6 _* O$ [$ p
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}' Q( z8 S6 P1 r0 k" l
}8 J) C/ G6 n8 U( h

' B3 d) O3 {6 a/ C8 U2 S" s+ {</code></pre>7 O1 @0 W! F  ?$ [9 r) L2 c, B0 ~% D- X
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>- G9 h. @+ h  p- K3 C! O
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>2 [" q  U' D2 I6 r; C6 u' s
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
) R$ F+ G  A4 z4 G6 v- N& g' \. i<pre><code>struct Worker where% t. S+ |% L5 b" I9 d7 U! D
{
! x$ U" A# Y, P  L    _id: usize, // worker 编号
' V1 p: {$ E, ~( _% [( L  e}
( f- t3 \# q) ^$ O+ D</code></pre>  T) N+ t: W5 J/ ~
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
$ Q* X) |  T0 t, c把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>! ?# ^- v; h; t# J! P/ z
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>0 |! ]1 O0 u& j) Q+ \% u! H
<p>Pool的完整定义</p>
/ [! q3 e! h6 v+ p7 c% B<pre><code>pub struct Pool {
5 k, I+ o6 J$ k& `    workers: Vec&lt;Worker&gt;,
3 X, o" \9 P4 X  [    max_workers: usize,
: f5 B# C0 \5 I    sender: mpsc::Sender&lt;Message&gt;( x6 U! d; H9 o* ^! }) \# D9 m2 k6 p
}
1 n# g% I5 ^% h! X( e* T$ i</code></pre>
  x* C2 ~; w- U) A0 W$ _/ M: H<p>该是时候定义我们要发给Worker的消息Message了<br>& P# l2 ^  l; Y& P8 n
定义如下的枚举值</p>& ~8 h+ U7 ^6 v) h0 P- T- {0 R
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
4 y9 I+ R  T, ]) B  W8 S4 @+ Eenum Message {
1 D$ V! K3 X7 h1 z: {4 g: l9 [    ByeBye,
$ k4 ~4 [9 a6 l2 q* W0 Z    NewJob(Job),
. @5 F7 h1 F( f) ?/ P& d8 Y, d, f4 z}. f+ O3 S# s5 u$ T8 s- c- v
</code></pre>( v4 N$ t/ a* }6 @' s$ K
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>: H0 f! J- @+ {3 d2 }$ h% f) ^
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
. f# O& ~5 _8 J( R& a% a3 E<p>Worker的实现</p>
- Y9 M: H4 t$ X* Y2 e% [<pre><code>impl Worker* ]& l: U/ P& l' U! h& l1 g
{
  }4 N# c; P0 E    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
) F9 m& N! C" R8 F        let t = thread::spawn( move || {
( S; P$ s2 ^5 q  a# R/ Z. }            loop {
0 s6 U- B! M- ]0 r  U5 b# C* W                let receiver = receiver.lock().unwrap();1 U4 i4 Q9 b/ I6 W
                let message=  receiver.recv().unwrap();  W$ m5 e2 h7 O3 b3 m
                match message {  i; P! v4 E$ v/ |: z! s$ C
                    Message::NewJob(job) =&gt; {
7 W! i# }: G1 N$ N. S                        println!("do job from worker[{}]", id);
" S8 l+ i3 N/ L                        job();8 }7 I% R; k9 o: z% C& i
                    },* ]: }3 z& ?- j; a; Y# E/ P% g
                    Message::ByeBye =&gt; {& n/ |: }2 n7 L8 k# P9 {, J
                        println!("ByeBye from worker[{}]", id);- r1 p7 p* A# b! Q  N% ~
                        break
) Z8 o* u' U. j  o; D& X% r                    },+ p/ I5 D6 `# c- }; Y& f; B- g
                }  0 d/ z2 @) k6 ^$ O, u' c' N6 O
            }. W* l" E- U& E3 x3 y
        });
4 S0 N3 P! k' f. U
* R' W: j8 ^& D/ l( H        Worker {5 X% F4 |. j( d* Y: j+ m1 G0 p3 S
            _id: id,
# W$ E8 y4 X9 R% v" k% R            t: Some(t),7 i, a. C- V/ [
        }
9 L" b& [# d) u* R: e2 b0 Z8 V    }
% L" A0 |( M; e4 R) k( S}* f' p8 b' r& N
</code></pre>
7 I# j; Y; W, L6 m  {& a6 X<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>" A6 F% a* V+ h2 Z7 i
但如果写成</p>( L/ ?8 Y4 ]9 h# n% S& p! n! z: X
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {# w/ j! D4 q+ U1 u$ o/ N1 }
};
) U3 c  q, f8 y" O4 x</code></pre>
5 V( c0 `2 o1 u' B- r* ?<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>3 A- c1 n1 @7 E2 Q' a0 F7 X
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
# k8 J& l# P  a; j9 B; O<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>. v$ K: C: M; A- _
<pre><code>impl Drop for Pool {
2 l' S) s) m; q- J5 N    fn drop(&amp;mut self) {* P( ?* ~4 G3 @- q
        for _ in 0..self.max_workers {$ d2 L7 T  {% ^* p( d( b8 X
            self.sender.send(Message::ByeBye).unwrap();5 p4 H* m2 v% g' n
        }% r7 p' M" @3 J9 N" h
        for w in self.workers.iter_mut() {! N* A& d5 D1 Z( S- h
            if let Some(t) = w.t.take() {: R; f0 D" [- R8 L$ t
                t.join().unwrap();& r- Y$ [  x! F; A  }" @4 E2 ^2 |
            }
" j" K5 M7 q2 X9 K2 ~7 g1 u        }
2 i% s( B8 @! v  D( ~( b$ Q    }" y4 S+ w0 g% w. i+ @; m* h. g- ^; L
}
6 U: z" ^, ~/ _7 t9 H6 E3 J  b9 R! a( i8 O' f
</code></pre>
, z3 G2 d4 S* @5 p2 g) R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. q( W1 L0 M: _
<pre><code>for w in self.workers.iter_mut() {# R: s* }" }$ z3 l1 E' E: G3 c
    if let Some(t) = w.t.take() {) ]' e$ B5 d; ]4 i4 ^
        self.sender.send(Message::ByeBye).unwrap();9 c1 k! }4 Y; Y, B
        t.join().unwrap();
; Z6 _0 J- g& d; N0 c    }
+ o( {% Q& Y, @+ a}% o0 C$ U6 W, j! r
0 [& p- @6 Q! j" d( E* C! \9 T
</code></pre>
3 q$ J" S# ^- n0 J<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
7 b" R  S' z# P* W8 e4 G我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
; _6 t2 i& W/ @5 p1 t<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>! U  q/ G/ _2 o) h
<ol>
+ X$ \" _/ w: x7 A. K$ W<li>t.join 需要持有t的所有权</li>
0 Y' B* `3 O3 \" h( @/ M! a<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 V/ ~) U/ F! l$ x2 S. u
</ol>, Z' d7 t+ o6 W4 u" `
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>- w3 Q% n" g5 }; Z# w
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
3 z5 U8 [. u) O: p+ D+ `<pre><code>struct Worker where" f( z! v! ?' r( n+ B& b- x
{
& x" t" I  P: w, f    _id: usize,; {5 x  [  X- y: N; V
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
  E( P: o3 l7 N: a, [. K}
& F0 G( x' F6 k9 d</code></pre>0 |3 q2 y, ~; ?7 e# H4 g
<h1 id="要点总结">要点总结</h1>$ _9 |9 q3 v/ p( z
<ul>
9 U! @- `$ t- j2 a2 _, ~<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
# V! e: C% ^2 j# I5 F! [6 u<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
' ]6 q1 d1 [0 U  |</ul>
! s9 ]6 c2 v; {% z( Y$ q<h1 id="完整代码">完整代码</h1>  m3 j# U  a2 P- u
<pre><code>use std::thread::{self, JoinHandle};
+ J' s. N0 Y( f' U# Guse std::sync::{Arc, mpsc, Mutex};
7 {% \6 m- G# U
! _/ Y2 j4 I. V3 |8 s/ X0 o8 I; M! [4 J- |- ?# N4 n
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;+ ~  f! y+ q4 l+ U
enum Message {5 s& h( h: x' |& u6 y
    ByeBye,& _$ J  L: I6 ^1 ]' d" Z& c( w- s
    NewJob(Job),
6 g0 k& j' \1 k7 G5 I% j2 ~5 ~}
& P8 @) d7 N# _6 m2 R1 p
% k1 _% ?- _( ]' u- C: @' Mstruct Worker where/ q& [' P! w1 u+ ~/ L# D/ f& O
{4 I6 @! @+ f! x) u5 j0 x( y& v- z
    _id: usize,
, U  {& ~* y& d% p) A' u9 b    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
. D4 n8 a4 [( ~* x; E5 k- Y}
5 y$ `* r5 g; r5 n" e- g- _, K; C' I
impl Worker% v- x6 J$ s9 @
{
3 j' c+ f$ a9 u2 Y0 y    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
" U+ i' A$ G8 N2 Q4 g) c        let t = thread::spawn( move || {
" Y& }5 u4 ~7 |2 O, H            loop {
1 Z. @& b' @' h5 q4 ~                let message = receiver.lock().unwrap().recv().unwrap();
6 g5 s3 `& ~  q1 T5 a! r5 [                match message {
3 V' m. R: G3 v# O, ^. t9 C& o2 e                    Message::NewJob(job) =&gt; {
6 H2 N7 O' M5 `) @/ O                        println!("do job from worker[{}]", id);
6 L6 T5 R& P, ^' u                        job();
0 o  ~9 k& b3 K8 M7 _/ j                    },: u' L/ K0 ]3 r8 W) o1 }! @
                    Message::ByeBye =&gt; {
- N) a  a3 G; }% A4 q/ X                        println!("ByeBye from worker[{}]", id);! a1 s# ^$ w1 P1 @4 G
                        break! o7 V) }3 u4 u3 x' {  ?
                    },
: t" a/ b( o8 @, u$ U1 H7 k! m                }  
7 U  f2 F$ A! t0 W' m            }& Q% R/ d+ v6 Q, d; ~
        });" [: k6 G* y/ z4 s6 t+ T
2 q6 M9 {5 N- [; e* _
        Worker {: N6 q. N7 l% ^5 U( d  [
            _id: id,
; @  d7 J' }% J2 |0 Z! R) L            t: Some(t),# E7 S5 M9 H0 N
        }" }" E; j0 b, _& }9 ^/ ~6 [. j
    }
$ z' Q& h% V, n( A' T# C" Y}
+ K1 B6 B( B, v! g; O5 M9 ~! A6 J& ^
5 r  k4 z4 \, t! ]pub struct Pool {
! B# n) `6 u. T' F6 B, b/ `5 N6 ^    workers: Vec&lt;Worker&gt;,
: L! n7 m1 B4 S: Q( g0 E    max_workers: usize,, _! l: b' Q* P3 H6 ]
    sender: mpsc::Sender&lt;Message&gt;  c  h9 l) U. m) t
}
7 d( a7 J, M. \# F8 e
1 s4 Q4 [& e& {  p0 p/ {9 H$ Q2 aimpl Pool where {
% A8 @0 m' v* x1 k3 }; m8 e) O    pub fn new(max_workers: usize) -&gt; Pool {
8 S  [' X' \9 q        if max_workers == 0 {
2 g/ q! S) E# _) D+ p7 V+ i            panic!("max_workers must be greater than zero!")
, t( V- y1 F# J. p& }        }
, J# d# P$ e) h4 j) Y/ @' k) \0 L! x        let (tx, rx) = mpsc::channel();, \7 R5 ~. u/ ^: n& P. b2 m( G

/ C' z% }4 K/ C0 p8 L. r& {: ^        let mut workers = Vec::with_capacity(max_workers);
6 W( i7 y  s7 k        let receiver = Arc::new(Mutex::new(rx));6 g* z5 H( y( M$ X0 w
        for i in 0..max_workers {
0 z5 I5 R! K5 V9 @& b2 Y            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));9 c& j1 M. u) u6 b% o
        }
. V+ K* {9 x  k) x. h; a
' R6 N# V, m' \9 R        Pool { workers: workers, max_workers: max_workers, sender: tx }
5 e' C2 J6 x" u1 f5 ?5 Q. A9 E    }* E, a  ~2 V9 j/ n: \! f' A$ a
    2 l- V0 v3 ~9 @: g' b- F+ \
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
$ H+ w! B3 b- H; p' R9 m    {, _# f" x, m* Q# r: q. ^
/ d3 r% O0 K4 y# ?1 i# T! i# F5 @
        let job = Message::NewJob(Box::new(f));& A& J9 o* {7 t% s, d8 v( M" t* R* T
        self.sender.send(job).unwrap();
* O7 A! \% A. J& J0 b( N    }8 \9 C  v! A  x. K2 G
}
5 o' J; X5 O- ?( r. x
& ~0 E, I1 O1 Oimpl Drop for Pool {0 h9 H& t6 m6 W; W: Z+ X
    fn drop(&amp;mut self) {6 Z$ h* w2 a1 u/ ~
        for _ in 0..self.max_workers {& R8 F$ z6 h* C
            self.sender.send(Message::ByeBye).unwrap();
4 t" w( ^2 l$ x. j/ v) V! F( P) O        }
0 O# j: o9 f- M: R' i. R1 m" J        for w in self.workers {
0 |; n: V. b( l' Z0 W            if let Some(t) = w.t.take() {
) f3 p1 c! b0 O; r8 H1 I                t.join().unwrap();
+ C. d7 ]) q. e0 z2 K) Y            }
' P/ J$ x* |8 ?$ C# F        }
& U$ a& v7 T4 Z: K/ m* ^    }
3 C0 S( w1 H3 s! a# D}5 o$ F  u! ?% G
% r* I8 g  p2 x- i' t- E4 s

- i# |0 F( k8 f, d% n5 v$ X#[cfg(test)]
8 C" f- R# n; M: [9 Z- rmod tests {6 n& F0 ^0 S& d: G- g* _
    use super::*;
5 Z# n, q0 ?9 B, A9 ^6 a    #[test]
) k8 U( F* d+ k! O    fn it_works() {8 e! a( s- e0 {  J. \% l7 O
        let p = Pool::new(4);
/ r  A4 |- D0 V! Q        p.execute(|| println!("do new job1"));
5 V# A, F/ M8 U9 p" a        p.execute(|| println!("do new job2"));" m0 V4 V7 Y& t, b$ q
        p.execute(|| println!("do new job3"));
( M9 T3 f3 [) x: K* B        p.execute(|| println!("do new job4"));! j1 h4 H7 ~$ X
    }/ h# k# Y) ?( e! L4 z8 ?) L
}
2 k4 S6 p6 D9 r# ?6 w+ s3 w7 r</code></pre>
* N+ Y* `$ }4 Y8 |4 Q7 }3 S8 ]2 w
; X8 Q  \& R; w# D9 X" {
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-9-19 09:31 , Processed in 0.070805 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表