飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14747|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8044

主题

8132

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26462
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

7 J8 h" F$ j* J+ P: i$ }<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
5 @, _0 X# q# A% k<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
% M0 D- n; V  |. j6 Q6 Y<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>0 l4 R2 ~: B; z5 S6 B' x+ G
<p>线程池Pool</p>
1 f; Y* A+ X) w3 c  U9 N<pre><code>pub struct Pool {7 Y( n! z8 `4 A2 ~
  max_workers: usize, // 定义最大线程数
! @; I8 B- f' n, U/ A) Y}
1 U; O$ X% {8 \7 c# p  S; x. |: m: C" n. b
impl Pool {2 h% A7 k0 M7 _+ [  E6 W
  fn new(max_workers: usize) -&gt; Pool {}
, ?! P) F5 b! Q9 G4 p% _- Z  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}+ z( D$ ~" S8 U3 I: |
}# l) X1 R  E' B1 D* t( C! n& ?- }, V
2 W& C" e, Z6 {$ O. E* m$ Y
</code></pre>) i5 N  t: i. |7 z/ J7 h- D5 y
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
& m7 v+ M6 Z" Y<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: R' H* i# F/ K8 B( K. |& C( I可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
8 {# y9 G& Q" G% I! K<pre><code>struct Worker where( \! z7 C% s9 x6 L1 A7 }
{
& Y1 f- [6 p$ G$ s* Y    _id: usize, // worker 编号
* i1 S7 D4 G- m. V1 `}
# f& i6 w9 m2 T' `8 Q" R</code></pre>
, Y2 l/ R9 l' j# L$ d+ b<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>. `  b. v9 {: l0 K) y
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>" F  _7 ]( N* i& e) T3 j$ d
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
) s# a! Y4 c1 b0 E( z4 f$ Y5 E<p>Pool的完整定义</p>3 u% Q  C" W/ Q: j, F$ t
<pre><code>pub struct Pool {
( a/ z1 x; A; y% g3 i; o1 |  u    workers: Vec&lt;Worker&gt;,
0 V. ^! M3 h2 l. u" y, [( l    max_workers: usize,  Z' l6 ^+ y! |6 p( a. _+ J4 w
    sender: mpsc::Sender&lt;Message&gt;
  m8 _7 O$ \1 j# l6 o/ F+ }! R}
: z; H. L8 N8 T% ?! E7 o* c4 @2 Q</code></pre>5 }$ A: I* e+ t9 |( w& l
<p>该是时候定义我们要发给Worker的消息Message了<br>9 ^8 p/ p% `. O7 [/ U
定义如下的枚举值</p>
- Q7 B4 z, o# Q7 U<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;0 y/ r( J6 D: x
enum Message {" m) V& t" M% X5 `
    ByeBye,6 X& Z; W4 E+ ?, t
    NewJob(Job),+ Z2 @) ?. P. `8 K/ X, ^) e6 E
}& `6 j0 v1 r; t4 y
</code></pre>+ [+ u2 t" `7 j, C( m- b/ w
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
  P; p/ y% R' ?1 m( _2 o/ Y% p<p>只剩下实现Worker和Pool的具体逻辑了。</p>5 C5 }7 b& x# K  u
<p>Worker的实现</p># g6 U7 q* u$ `. t& D
<pre><code>impl Worker, ~4 |# ^& n' R4 r
{
$ ~2 h& F9 @  V6 R$ q' w7 F* H: N    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
' t, K8 e; `  A8 U% R* g        let t = thread::spawn( move || {
% ]% i, J* w  L7 L9 l7 N4 I8 c            loop {+ b8 O" G8 g3 D
                let receiver = receiver.lock().unwrap();4 k% E) B) S/ h; \
                let message=  receiver.recv().unwrap();3 r  b7 K1 t8 y8 A# z% b. z, R5 R
                match message {
* l7 Y5 T' S6 ?* F3 O                    Message::NewJob(job) =&gt; {9 p& ^" V: d) k( j  Y5 r! I3 X# v
                        println!("do job from worker[{}]", id);
' k, \0 P- A9 a( B0 d" r$ v& v                        job();8 V: |& t1 l% o
                    },
1 X' y4 T; |: V3 H                    Message::ByeBye =&gt; {+ H- ~3 E! Z4 B  ]9 h6 x
                        println!("ByeBye from worker[{}]", id);
2 J; W; y: o7 O9 T8 F; S                        break) \) o) {9 h# _4 k' f9 T$ D
                    },' \/ ]5 I3 J2 k4 O4 P- Q
                }  
) s' O5 ^  |" U- F+ O, t) Y+ y7 @            }" t' w5 s6 O1 d
        });
6 u- l) w' L' C8 M$ f6 i0 ]- ^4 t& Q- v) a, j. m5 o
        Worker {  b, j' C5 x3 t$ M- ~0 R: ^, g
            _id: id,
/ z+ v3 N) R* J# h. a  F            t: Some(t),
( K( p. J' i5 R" m+ f$ P1 N        }
" g; g( l8 o7 d    }
( o9 M# i/ t4 J8 |8 K) d( q}
% c& \1 h7 \4 b2 L4 ]</code></pre>! o/ I+ L  U& c5 P4 `0 T
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
% \* \: ?7 x" [. `# V7 b3 |) H" h但如果写成</p>
0 ^" R9 M7 Q' m8 N+ {& M" w# }5 w! J, Y<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
- q4 M! r( R; q: x! J1 w2 r};
1 w9 n* D% c5 T) W; C, Y4 {</code></pre>% i6 O% Q" H) {" S$ ^( G- z
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
, W% r8 E: s& \  C1 b  Xrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>7 h/ n' g1 {+ M
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>7 l. m( K  c' o2 q& e. W" M
<pre><code>impl Drop for Pool {6 L4 p- ~6 S# C; }8 {2 z0 F6 w
    fn drop(&amp;mut self) {8 F0 K: M+ ?& i8 C
        for _ in 0..self.max_workers {
* Q' b3 D$ M3 [- s  u" N( E            self.sender.send(Message::ByeBye).unwrap();- ^0 a" `5 i0 F! T+ X- b7 J4 U
        }3 [8 o4 U4 \- h' @8 @6 g
        for w in self.workers.iter_mut() {1 f0 J9 s1 Y8 A1 }* D
            if let Some(t) = w.t.take() {
% b) Z2 k& s- u2 T  x+ y* g( G                t.join().unwrap();
4 o( V3 P. q, j% P) Q1 a            }% `, U! G$ g$ C4 d) N: P9 R4 c) `
        }; P0 ?: D8 [1 [# G. @9 n8 X$ h0 E
    }, g) Z% ?$ N0 X2 V  S5 G% s4 I
}
- f$ z" [  O2 @3 F# x. Y
! q1 v/ {1 ?8 z8 R' O8 K# K</code></pre>
6 T  b# p1 G4 W) L6 m+ V8 u( {& L7 M  R$ ?<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
+ q& @* V5 E; R" R! a<pre><code>for w in self.workers.iter_mut() {2 r" f5 }! l1 G. C
    if let Some(t) = w.t.take() {
: b5 R& |- B" f4 ]- v& Z        self.sender.send(Message::ByeBye).unwrap();5 r+ ^: l7 e" A  m) L/ C+ G# k
        t.join().unwrap();3 I4 `' y; N1 i/ M# X
    }$ p3 f( i% e0 R0 g* _
}6 k; O! r8 D- d  F+ n7 s
& O* I( j9 [3 D7 E
</code></pre>/ k, U& k7 {) B5 F$ t/ m+ h
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
0 Y0 C4 y" j' |* k/ b$ @( g! B我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>* Q  R( e! y+ n* Q4 t! I, b& _
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
1 D. _- W6 R* ?/ D* `3 a<ol>
: p' M% G& b( ~' ~: A<li>t.join 需要持有t的所有权</li>
( X2 m2 B) p1 T% V<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
3 P' Y+ r0 {* p+ M</ol>
1 l& k- W) L8 k- @3 \/ f<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
5 D3 Z( @9 t- }9 I, P换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
1 c' S1 G1 [0 o2 Z<pre><code>struct Worker where
* ^5 U$ a# W9 i9 T{: K8 I+ N/ u% [! N* j
    _id: usize,
; N+ @. m6 ^% g+ g    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
8 U0 L0 n0 \" r& W}# E% \1 Z9 E5 [; @
</code></pre>3 R# D; Q0 @* C7 n$ _% O0 D
<h1 id="要点总结">要点总结</h1>  v3 U' n' @2 b* `# I: ^
<ul>  C/ }, A: s* J% k
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
8 k0 X. H1 e. N. C7 p( ^5 K<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
% M4 W3 U+ N7 q</ul>
* S4 p+ V2 L- k! \8 g6 ~. b6 x<h1 id="完整代码">完整代码</h1>$ s3 P0 C% H' i& @& B# y
<pre><code>use std::thread::{self, JoinHandle};5 L3 Q% W! ?! c& w3 N) L
use std::sync::{Arc, mpsc, Mutex};) @( m/ |, D  {8 `- z

6 I( q# \' R/ w7 \5 |3 ], k9 J7 u
4 W+ Y  A, F- h6 u# n" }type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;. e+ h1 Y( Z- W) U/ `" [$ C
enum Message {
0 k- I1 X4 r& W0 ?    ByeBye,
. O- \: H- _0 e- r4 t    NewJob(Job),5 p0 f* @4 }5 K2 j
}
2 @) f  o, s. d7 K7 z- B4 R1 t! y) ?
struct Worker where
/ H9 y* M  N3 m3 M1 B{
# s1 m  p" e3 }5 n& z+ B    _id: usize,
0 ]7 k" a$ L6 n' q; {; X: D% ]    t: Option&lt;JoinHandle&lt;()&gt;&gt;,! q; ?$ M0 C2 }# d+ @4 P1 |+ R- Y" i3 f
}
4 Z& W9 I; ]3 ^+ ?  o; H, \$ n0 m. e1 q8 `# @- I
impl Worker
3 y% m" D5 Y% R9 w{
! ?! D1 J8 o% j+ `2 v    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
$ r* ~) S0 N6 X        let t = thread::spawn( move || {
) ?; x0 V1 b, [/ B! e; {            loop {, {+ y/ U& \8 ]" ?' q$ j6 S
                let message = receiver.lock().unwrap().recv().unwrap();: f5 F* N! F7 i' u. G5 y2 x
                match message {7 P1 n: W  h) n
                    Message::NewJob(job) =&gt; {
+ S2 z/ p( `) T# y! w9 H! B                        println!("do job from worker[{}]", id);% ?( B; N$ a# N/ c2 H! P7 Q3 Y
                        job();
" g' G- |! U) @" R  Q, Q6 U, N. h                    },+ X8 o. Q( _/ k. |* ]  L$ }$ O' K7 a
                    Message::ByeBye =&gt; {; D% z/ R" Z( z2 B& n* W
                        println!("ByeBye from worker[{}]", id);
' E: i! \% y6 h& d  g$ v                        break
4 _9 \+ x- _- M9 y. i                    },
* \# g: s. F! w5 `                }  ' {, j/ e  r3 ^3 A' ^% J
            }
1 ^6 {, D: d, d: C# x0 P        });
8 o) U. W. S+ X
8 C7 s/ v! e1 \" P        Worker {
6 D" U  ]5 ^  h* ?/ G1 a0 h, b            _id: id,/ W! S5 T9 @, W$ k
            t: Some(t),. r% O% ]/ Y7 N3 j' c' i+ g4 c! w
        }8 O& w+ T* {. ]$ f$ l3 j
    }
' U2 Z  U: X  S! ]) a: l) d0 _}+ Q, v5 _' d+ L( W

# c9 H: w, `# k- Y) ^2 @( \1 r  W$ Gpub struct Pool {
' W1 M! [7 @9 V- W  h! A, v9 j    workers: Vec&lt;Worker&gt;,
- g2 |) z+ @8 M3 N0 K! \1 c4 Q$ U7 d    max_workers: usize,
+ X+ ^2 o+ Z5 d3 d0 \/ B, W    sender: mpsc::Sender&lt;Message&gt;% p5 t  I9 v# g4 f% v
}3 V2 Z3 h' v+ [! J5 D3 o2 j
# q$ s' m8 \6 K! W9 g
impl Pool where {/ [' G8 B1 w7 p# r" M3 |3 t9 B
    pub fn new(max_workers: usize) -&gt; Pool {4 ?1 J, P9 c- ]7 z5 i# u( S4 g
        if max_workers == 0 {6 Z! t6 J+ o6 @) H
            panic!("max_workers must be greater than zero!")
: L* y, t2 Y8 H        }8 @! i, Z( {# T+ ^' @
        let (tx, rx) = mpsc::channel();
2 i1 _' R3 L/ n& F
& V7 i) R1 [- H. S; Y/ ?        let mut workers = Vec::with_capacity(max_workers);
6 r% k. a% T* a3 P0 g        let receiver = Arc::new(Mutex::new(rx));
4 q1 t% M3 _8 n, p        for i in 0..max_workers {5 _) M, B* J4 I/ a2 \" J
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));: n( g3 f$ b7 j5 y9 O" z2 m
        }
6 F3 s( a2 d$ o6 s$ f" [- {
4 T3 {( |: H* `+ x7 }. d% H- A        Pool { workers: workers, max_workers: max_workers, sender: tx }
! n' `  H6 L+ d( a    }
! y1 u3 ]. g) [  D# L+ @- J   
" r, b/ r- h4 D    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send$ a, t9 ^0 k2 l% @; u- e
    {
6 i3 |; w+ C3 z$ [
" d8 y# A) l8 u' k0 p        let job = Message::NewJob(Box::new(f));
9 W% C- T0 Q7 R; }5 w/ ]/ [' Z* W        self.sender.send(job).unwrap();
7 c1 o! ~* O0 X1 w/ W: p# M2 J    }
' W2 u* V+ {, N0 `, a6 A. \}. W$ f$ s+ P* v
$ A% V# }& ]4 j1 e! J
impl Drop for Pool {9 ?7 b6 s1 Y" ~
    fn drop(&amp;mut self) {
0 K) A0 T* {1 J7 ]        for _ in 0..self.max_workers {  c! t& q$ j0 x: m& `1 @; x
            self.sender.send(Message::ByeBye).unwrap();' B2 a9 m& A% g3 z
        }
# _" S3 y3 s7 q2 V2 a$ k! }% `. K        for w in self.workers {3 j; e& a0 I  o- @: Y7 A
            if let Some(t) = w.t.take() {
9 g5 l: ]: ~- f9 D+ i                t.join().unwrap();5 p' V5 I( W+ E; ~% E' ]
            }
) L( C- z. I& r" Y        }# K  V! V" i, t: x) A" g8 [
    }" E# i! j( U# F
}
2 _6 t9 g8 Q4 m# g# f8 t+ y. T! ^
& u2 x+ k1 |, o0 @7 L
' \, v7 Y* L: w& a1 }+ ^( Q#[cfg(test)]# `& A  @# k9 C6 t3 I/ h
mod tests {
. g) ~, d5 Q+ c! @) M7 r" q    use super::*;7 j# T: y) D! E- X/ w4 L
    #[test]
5 A1 l* k4 F1 m8 U' b    fn it_works() {# A7 e- m$ }7 W
        let p = Pool::new(4);
; ~6 e* e2 I  @$ `7 a        p.execute(|| println!("do new job1"));
' l- G' A# B+ F" Y2 P0 C        p.execute(|| println!("do new job2"));  e9 Q0 r9 j4 @4 G! K- N
        p.execute(|| println!("do new job3"));
+ @% m8 j. d: f8 R        p.execute(|| println!("do new job4"));
6 p/ A7 i; X; ^; X7 V2 c    }
: G: B! Q; Q7 ?: T}* R* B: u6 q0 ^  z$ u
</code></pre>$ b% {% {9 z( O+ B

% s' H1 O$ I3 a  c( X
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-10 09:45 , Processed in 0.069763 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表