飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14730|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8042

主题

8130

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26456
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
( L/ ^+ R' _# |$ x+ G( u' r
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>) {# u( b2 i7 h
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>) H2 P! J5 v( S- F9 g. Z9 ?& Z
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
" I0 O8 ]; W: A7 j9 H& ?<p>线程池Pool</p>0 ?& j: E: i. B; ~7 Q
<pre><code>pub struct Pool {
2 h, A8 F1 f( y6 H  max_workers: usize, // 定义最大线程数9 S; G: l; O; G, G, k1 j
}# s" \: s* ]1 N9 D/ p2 ~4 W' Z/ }

+ q2 C* U* X! w$ c3 S3 S) ]- Mimpl Pool {& X3 l, [: C8 g& b5 [( {
  fn new(max_workers: usize) -&gt; Pool {}6 r' c5 M) A/ K: x2 F
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}8 C% k# g: j6 o  Y' X
}
% v, l  H, z9 t/ ?& p
& m5 p% J, n/ a- c. j0 j3 L</code></pre>( \9 x$ Y. J5 V  I/ b( Q
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>. V$ O8 H. {6 m
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
& G" g' c$ j7 F3 G  o" `2 S- N% _' ?/ o" ~可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% @* z8 _# @- _( o& g<pre><code>struct Worker where+ j2 [1 {% ?7 q6 X# `
{
; C& D2 n1 Z0 d; o  U* W2 J4 c    _id: usize, // worker 编号! i" ]- h7 C- c6 ~3 m$ K! P0 Y- Q; z
}
) g3 M; o2 y  x8 o. h</code></pre>& N" S! y$ c. b" z) \, k9 [
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
! e6 U, G6 Z+ @把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>" C' D2 l" @9 v
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>4 e% Y" V& g. Q1 @3 D
<p>Pool的完整定义</p>. ^/ }! D$ ]" X5 l/ l' J8 Z
<pre><code>pub struct Pool {( a- K6 g/ |/ ], ~+ }' \8 z& S8 P% {5 x
    workers: Vec&lt;Worker&gt;,
) r2 q/ B/ }6 c6 b( `    max_workers: usize,% W8 w- L! n4 v# S; a+ @7 }
    sender: mpsc::Sender&lt;Message&gt;1 Q* {! B7 i9 {0 C0 X) T2 [
}% |% n3 B' P5 a9 G0 p. b3 q" ^
</code></pre>
% T+ ^9 C- ^" O( U# l9 p" y<p>该是时候定义我们要发给Worker的消息Message了<br>
. r) _: q3 X; S! _定义如下的枚举值</p>& D. u: s  f. G' X
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;9 l+ h" @- W, l5 Q5 e  N( i
enum Message {
# k  V* z" m% p- A    ByeBye," e- p! v- {& @, u
    NewJob(Job),  O! M" }2 U8 \1 p
}; v  Z' W6 \) l  i3 I
</code></pre>
0 h) h" b- i4 z" l' T<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>' g) T% D9 k9 T
<p>只剩下实现Worker和Pool的具体逻辑了。</p>: e- y0 @& [* Y% O4 F/ G* G
<p>Worker的实现</p>; U* U3 |! C( z5 `9 j
<pre><code>impl Worker
  @; ~; \9 I; r$ W{
' ~3 v* k5 R& Y+ n2 n    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {5 h# `; Z' R; G! l. Q% {9 q
        let t = thread::spawn( move || {: W, W% g  L0 D# D
            loop {8 U3 ^! I+ a) |* D) i* h# `, o8 }
                let receiver = receiver.lock().unwrap();
1 V2 W$ u7 N/ C                let message=  receiver.recv().unwrap();! B- q# @$ O( x  ~
                match message {* A4 ?! h, o) n' J/ T/ X( d9 u
                    Message::NewJob(job) =&gt; {
1 `7 ]. [: I( |1 A  I) Z) l8 c                        println!("do job from worker[{}]", id);* E: N" ]& B- n: k1 o. Q
                        job();8 P/ t# h7 X/ t4 m7 `/ ]  T
                    },2 ]3 ]1 O# a' i! |8 f. l
                    Message::ByeBye =&gt; {
1 H) y* |/ w4 s5 H4 Z" }# L4 C; ~! I                        println!("ByeBye from worker[{}]", id);
0 j  h1 n5 @* P% N) L" l7 ^                        break. Y6 d+ e' A' k$ }) U
                    },* ?/ v# S' T) T+ J4 @9 l
                }  
: o9 w2 L- r4 @4 }! l/ ]1 B) m" ]            }
: V' [. M/ q; Q6 J. c( k        });  j% `% W) y$ |$ d1 \4 c; }6 H) B: S

* ~  t# c. n6 _) V8 h        Worker {8 m0 x# [+ S: w; R/ W
            _id: id,% h9 D0 F4 |7 v& {, Q" V
            t: Some(t),6 Q% n1 Z2 H! F% e
        }
; ]1 Z1 h/ O3 N    }' v* E5 K6 T: N4 G3 c
}
( F, C) v, @& O5 x0 W; I8 X</code></pre>
; M7 X% I* z. m# y# ^  m( S<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>8 |0 |1 U2 H7 ^) q4 `; C8 F
但如果写成</p>7 v) U4 r9 V/ v4 j( L8 b7 R
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
% s$ u4 T. ^) p};
  q1 i3 p& w) J$ S6 T+ [! U</code></pre>
% G- u. W% I  d) `! u$ @5 v<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>. L6 S( p" Z; I$ ^: ^7 y9 J; d% E( `! C
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
7 _  C4 f: ]3 d/ a0 j1 |* A; R<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>1 a' m2 A: x9 E2 v
<pre><code>impl Drop for Pool {
! L, |# d5 ~3 u! |- `+ k    fn drop(&amp;mut self) {- d5 b/ b0 B0 t( Q* c
        for _ in 0..self.max_workers {
3 I. z/ H% g! |/ T            self.sender.send(Message::ByeBye).unwrap();
0 L1 S! n) f1 y: r        }, q+ W1 O) s% K* l# J! r$ Z  u( C. f
        for w in self.workers.iter_mut() {6 V$ r& k  V- D4 N
            if let Some(t) = w.t.take() {# v8 ?. d1 e! R/ F9 U; F% l
                t.join().unwrap();
$ i) c& k* c  o, g, w2 d3 |            }4 J4 Z& o  T& O, j" V5 O3 g
        }
: I  X4 s8 T# H    }
! k! S1 r: E  N}( @) c* [0 q0 A& f+ k: {3 V

4 A# L. q& o. a$ b</code></pre>
& H% v* l3 `9 d% P; N<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
, e# u) }4 ^1 s6 J$ E1 K) |<pre><code>for w in self.workers.iter_mut() {
9 P% V  ~8 G% X% l9 N' F5 g    if let Some(t) = w.t.take() {5 ?) o* \/ X5 z( ^
        self.sender.send(Message::ByeBye).unwrap();
) [& }% H5 C' Y% L! \        t.join().unwrap();/ h* e0 _% i7 V
    }  @+ W) Y' v4 |- j0 O: V* O* v
}
4 X) B5 P4 k/ O5 M6 q9 E% i* M7 w) l
</code></pre>
, F/ h8 l# \3 x, ?# p<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>$ v0 W3 Y9 c2 Q' L0 U
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
* v" s$ w/ T8 d. ]) g2 p9 {<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
3 ~5 k8 a- q: J4 R. W' |7 }<ol>. D" |( F* _! j4 P) U4 i( ?
<li>t.join 需要持有t的所有权</li># ]" X: {+ G% X! s* E; V1 ]$ m
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li># t  a' E4 B2 _$ i% Z' y
</ol>  S+ s7 E$ d  ^
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>1 c# A' |: S+ i8 c+ y  g& ^$ I
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>3 g3 i/ E, S) ^* ^% h
<pre><code>struct Worker where
2 u5 D9 {2 ^# u$ d1 W7 j7 B{
! f5 Y% l8 {/ @    _id: usize,/ C& Z8 z' b2 C
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,* u$ P' C# m3 l) Y
}
. R% t0 {& f6 S4 `' F1 E  Y( ~) O- R</code></pre>& I2 ]  p7 p& u; \" S9 L- A; r7 F
<h1 id="要点总结">要点总结</h1>: \+ ^2 H' L$ k8 Y5 I
<ul>5 j) f: o$ p. G" D4 l
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
' y7 v0 s% C; E3 H6 n3 b1 O<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
7 |% M$ U8 ~% S. u+ x; |! ^5 y</ul>* I& X; s  H8 G8 i, K/ _/ ]
<h1 id="完整代码">完整代码</h1>6 e' O9 \* |8 {% a: r2 M
<pre><code>use std::thread::{self, JoinHandle};
/ V: y* }* w9 G1 F- x  huse std::sync::{Arc, mpsc, Mutex};
4 p6 ^" t* @5 g$ U: v
. H; m" L: h. d! N& b# Z8 `- D2 V1 H
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;: Z6 z" W* A) `7 p
enum Message {
. Z( U- l- }6 [0 x! w    ByeBye,  `+ q! t) g: e2 H; j* g
    NewJob(Job),9 @* S. W( i* S! B
}, [/ o; S+ X$ s# C6 O
* l5 `  N8 Y, ?6 Y% o
struct Worker where
9 g+ y7 Y+ n) `" Z3 Q7 t{3 j2 S. i0 {/ [
    _id: usize,9 a* e/ _; t$ }8 u" u7 }5 w4 Y
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,& D* L) v- M- A% b4 X- U  n
}( C8 `( C3 ?; h% b2 W. Y' O3 F
. J6 i& \" P4 L: B, F$ b: _& _7 ~1 n
impl Worker/ ~' k2 H2 J& _6 c2 m+ f
{' U7 T+ k: w+ j; z3 J5 z( w: [
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
& w, W: g: k: B" J% p        let t = thread::spawn( move || {0 S& [- M' q3 `; p
            loop {& m# g% @& ?3 S2 h$ ^
                let message = receiver.lock().unwrap().recv().unwrap();
6 k% M8 ^% y& w$ ^# r                match message {' w( \! `; P  s! ~
                    Message::NewJob(job) =&gt; {  Z* t8 U, k' R6 O* Q
                        println!("do job from worker[{}]", id);
- G, {* A% n0 E9 w% t                        job();
5 @3 Z- Q6 B& k9 h" [7 }                    },
0 e  a, N+ K5 T/ D2 m6 Z! y, @  R                    Message::ByeBye =&gt; {
. j' R( G# n4 A  _; d                        println!("ByeBye from worker[{}]", id);6 J0 @0 ?7 g3 |, ~& @, G: W
                        break' l& F. n! c9 r7 Y! I% L8 Z  m: ~
                    },
8 M/ {; _" s! v5 A# U# b+ M7 S                }    D; Q2 j% g4 g. ^3 ~/ P
            }
! \4 L! c" }3 G% z# @        });6 ~4 R0 B- n$ [; i; O6 c

/ N2 F! y) ~: M0 i4 f        Worker {. X* Z  I3 O4 Q4 @. _* P: n. q" \
            _id: id,
. v, v8 e" a7 v4 B            t: Some(t),
6 H; ^; t; N& V/ `# z1 X5 G        }
1 I$ o% f9 \" b+ v( G  o    }0 D3 Q5 \" }( C; G& p
}( m  T/ I5 ~" r
1 @5 ]9 j; R( D, G1 N7 a
pub struct Pool {
/ L1 R1 X) `8 R1 C) \    workers: Vec&lt;Worker&gt;,
  x: r- Z- t- S1 e! d% ~0 r$ z7 _    max_workers: usize,
+ |2 m& r" O9 n6 u4 r1 N. ^& }  G    sender: mpsc::Sender&lt;Message&gt;
6 V! h  i4 x" {$ i" w" }7 r}% Y' ]8 d8 f' N( s
7 O9 ~! V: g- A% ^* V- y2 N
impl Pool where {
" {% l2 x- u" f% k0 }    pub fn new(max_workers: usize) -&gt; Pool {" W; p5 o0 k; }+ h1 ]- r$ b, _
        if max_workers == 0 {
% C: z: \! k. [) A            panic!("max_workers must be greater than zero!")
4 i. ?4 p3 a. f8 D* \9 @        }: b' L5 E( i! Z$ V
        let (tx, rx) = mpsc::channel();
5 J# a5 z2 K# A* w/ [2 Q$ D, {. z( a3 _
        let mut workers = Vec::with_capacity(max_workers);4 s# N% A- r; b( t3 x& r# X' L$ @  y
        let receiver = Arc::new(Mutex::new(rx));  {: W3 y; M3 d' Q1 Q
        for i in 0..max_workers {
; b+ c( p- N8 i  g            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
6 Q$ b! G# U2 R4 b* v8 A& U        }* ^; Y/ _! P- O0 u9 Q0 R
/ B+ Y/ u5 X8 C8 N) D
        Pool { workers: workers, max_workers: max_workers, sender: tx }# ~( a) D- k6 R) j0 Z# J
    }3 h4 }/ E7 C& e: g+ B# a* q
   
% X& d: D0 m) k    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send9 Z1 z* |9 o% l- o9 K2 y
    {
" O6 W& r- M) }. F9 ], H5 G# z3 M$ Z' n2 `1 v
        let job = Message::NewJob(Box::new(f));
0 U! A& u7 g7 T) S: R( j/ V5 `        self.sender.send(job).unwrap();( p2 o2 z/ |& O1 C- f2 S6 V5 ^
    }/ ^- O5 A5 v6 s7 y4 L& t: ]
}  D" }' U+ H: N
5 X5 K" V$ F* U5 G6 U
impl Drop for Pool {
; O/ g- S) d: M    fn drop(&amp;mut self) {1 e! {9 d3 i' x) L% R: k
        for _ in 0..self.max_workers {
9 h' O& I  t3 m2 B2 s$ A+ M            self.sender.send(Message::ByeBye).unwrap();
, ~! O! r0 u) L" a        }
, a5 g  Y+ ?. ^/ o( {% d8 e        for w in self.workers {! n* n) |; G3 t" {5 m9 i& b
            if let Some(t) = w.t.take() {
  d' c1 Z( h; A: t                t.join().unwrap();& J: ^- h5 P  k
            }
8 {: ^5 q* o1 ?1 D        }% d- s" ~2 ]- ?' r! O
    }( M" A! i% _9 z* x
}
2 w0 R/ [3 a$ s. ?& _7 u
; c% @! z! R! d1 w% A' Q$ `! w& t- J
#[cfg(test)]% ]! u* w" G6 K3 Y; ^/ f5 C4 d- _
mod tests {
5 L% S; W+ |* m# y( L    use super::*;
0 r+ |7 `: H8 s" {% e4 n0 |    #[test]: c, z3 p. f$ Y* I/ l
    fn it_works() {
  ^: R# A1 H3 w        let p = Pool::new(4);( N$ |6 j& k2 l  s5 V. F
        p.execute(|| println!("do new job1"));
6 [- A  X- b3 B/ S/ B- d        p.execute(|| println!("do new job2"));' Y% p4 `) c5 x# W
        p.execute(|| println!("do new job3"));
5 M6 \8 j7 ?. C0 y        p.execute(|| println!("do new job4"));
! x4 @( v2 ]+ Q/ w; n    }
. `6 H4 L( m/ m0 O; _7 U! J. f}# Z1 Y4 D, m+ n* b+ c) Z( ?2 M
</code></pre>
& o& R  n+ r# X; C2 ~+ Z4 W0 w3 W4 W! b& I* y5 F" g4 I. z
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-9 11:19 , Processed in 0.066904 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表