飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14624|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8034

主题

8122

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26432
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

3 r! j6 |/ m. L9 Y' U6 x<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' k5 N! f2 P) k) a. }. ~7 S4 L2 w
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>4 p8 [% N" P7 C8 |5 \
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
7 Q# C7 x& a+ ?$ ~1 _+ b$ @# S<p>线程池Pool</p>
) {8 [- ]5 R9 K( v) l4 t<pre><code>pub struct Pool {
& F3 I/ h9 y8 b+ P  max_workers: usize, // 定义最大线程数- B7 p" j& `# R8 X  [
}- @- C6 a2 n! Z$ w  O& o
3 Y0 t: u0 [+ O$ |+ @' p9 ~
impl Pool {8 ^% V* U" I6 y4 w8 `& p! `. \' N9 C% l
  fn new(max_workers: usize) -&gt; Pool {}
) M0 |* t* K8 `" B% f# S; m  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
. R, u% W- X) j5 ~4 t% \5 P}
& f2 ]. r' s8 O2 D8 W& ]
$ A4 N9 R9 m  K; b</code></pre>' K' j" E" X: [4 L% u( g( S9 y) }
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>! f+ w' y# M  F- d. [1 R; y3 U( S: Q
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
6 J" R( M8 I4 \+ ^* v( T可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
, J& [  A" O' w) f9 ]* c/ ?<pre><code>struct Worker where
% @  O6 ~0 D9 X! o. g{3 i0 R1 q0 k1 Q( g$ V3 F: o
    _id: usize, // worker 编号
' Q& s$ e3 W, W3 F( ^4 e5 p7 g' X}- E& T) o0 k% l" H
</code></pre>
: U0 s3 p" j( N9 P" x2 G<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
$ W) D: q# V- H) D把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
  @6 r% l. V% L- T$ d<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>0 D( Z1 X" O1 \- p! C
<p>Pool的完整定义</p>2 T9 e8 v" j, l' ~/ D7 X4 |9 I
<pre><code>pub struct Pool {5 Q3 [! F$ m& A: E1 `9 Z
    workers: Vec&lt;Worker&gt;,
" c1 F* W. m6 h; z    max_workers: usize," A6 o; c- L# U) M
    sender: mpsc::Sender&lt;Message&gt;
& |; e5 q6 u. m8 o" z5 m* F! P}* w5 T) Q& Y0 y' Q7 A! I
</code></pre>6 B7 G% Q/ ]% V" G2 B4 S
<p>该是时候定义我们要发给Worker的消息Message了<br>+ Q1 t4 n4 W6 _! \
定义如下的枚举值</p>2 ?* @- x) e7 w& z( X$ W
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
$ ~6 W( D8 G$ G1 G, Renum Message {
2 T3 D/ {$ {- i, y9 l$ g    ByeBye,: c/ V6 j. v. d( L) }# j3 d$ |& g
    NewJob(Job),
, X' A; r% d/ `7 h5 r( A6 O}8 d0 F4 X/ E2 l  ?2 U4 S8 G
</code></pre>1 @8 d% U. I7 v
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>* }/ l, ^% X9 H5 G, b; L, p5 M" X
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
1 |- p3 O# }+ B! D<p>Worker的实现</p>
: ~4 P0 G) |( r& `' M7 ~<pre><code>impl Worker
  a  p0 g$ |5 R0 ~{* x* X* Z' m: f! m
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {% W# \0 u- h5 V* I! l( _# f
        let t = thread::spawn( move || {  ?/ I* E5 b1 a& }
            loop {/ u) \& E, U7 e' R! M9 y3 H3 t
                let receiver = receiver.lock().unwrap();1 N7 s' z0 F9 d' ]9 C/ ~& u- b
                let message=  receiver.recv().unwrap();6 g6 Q# S+ @2 l) n0 c+ _& P
                match message {
7 W' e+ `- S: [* J/ f                    Message::NewJob(job) =&gt; {( _7 g9 v0 p! P4 e0 M
                        println!("do job from worker[{}]", id);
% x; ^; M  e; A2 i" g+ i                        job();' e$ K: f+ d- M; o  Q
                    },
- J! \- {0 h  w+ i9 a) @( m                    Message::ByeBye =&gt; {, [0 Y8 Y9 `+ s; \$ Q" R* @6 G
                        println!("ByeBye from worker[{}]", id);/ P, Y6 q: J; U: B/ n1 N
                        break
4 H4 d+ e! E7 p( t6 z  _4 U9 i# f                    },5 a1 v* h8 Q# j& d
                }  ; u( w- j! R) ~2 h0 j1 y! |
            }
& O4 e+ \- v6 r/ @! ]# T        });
! N3 g, I  T& Q7 Z7 C( i. ~8 {  `9 d6 V+ E" b2 {% V/ X
        Worker {
& ~0 @' x, ~; }! d' @            _id: id,; \2 W3 E6 @. l% L4 k7 t! c
            t: Some(t),
3 A0 U+ w3 D6 l/ H! d2 l. y        }
8 \; Y' ?; ]1 y$ F    }: T! C1 z+ F/ l! E
}
2 @# w* c" ]+ Y- @( M</code></pre>/ w& ]+ B3 H5 s" a( I% `% d
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
* Z# R, Z. W2 S* `* I6 @1 J但如果写成</p>& `+ K7 O/ P4 c/ }4 R% d
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
' [4 P( B. B1 E5 F3 x. D};1 P1 j* h: i6 D, ^8 `7 R" ^- T
</code></pre>; ], F! G: c' ^% y
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( H1 Z) u0 N- ]7 b- Zrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
. Y  ^; K, Q6 j/ t) l  o3 z: ?8 e! _& Q<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
6 c1 R# Z5 \9 t% ]0 `& R" B<pre><code>impl Drop for Pool {& v1 R3 r2 X# N# q7 `8 X
    fn drop(&amp;mut self) {
0 T  o; t* [6 [1 `7 g& w, h        for _ in 0..self.max_workers {
$ Y/ x# r4 R# a. i            self.sender.send(Message::ByeBye).unwrap();
/ h5 {! }" p/ C: `' ~  \        }
( G- e) `  ~' Y" e7 U        for w in self.workers.iter_mut() {" z, C- J$ h6 q1 f0 L5 D2 j" U  E
            if let Some(t) = w.t.take() {% n! I, N7 D/ ?* b; [* b% E! v
                t.join().unwrap();0 d; z' R+ c) i( b9 h
            }, a5 s) v2 |. g) z( w1 ?. Q  F; Z
        }
) b/ V( X6 P7 I" }* v1 B    }3 n3 P0 n, i/ h5 M( O' x9 L
}
8 w- B. ~  F+ \( g/ [2 z  u# n. {8 P+ n5 W: u4 C
</code></pre>  L& U& P) p9 @5 m
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>4 d& p7 P+ {' @1 H
<pre><code>for w in self.workers.iter_mut() {9 P' v" |, _+ S. H9 z( p. m! [
    if let Some(t) = w.t.take() {
/ `4 P3 Q' d$ G& M2 ?5 @: n        self.sender.send(Message::ByeBye).unwrap();
6 K; Y9 _/ t- ^+ h        t.join().unwrap();
8 X3 ?8 D$ \* l* I5 X9 V    }
+ P4 T, ]/ I. F( z; }8 t0 J}* }9 p: }3 z4 m5 L

' ]; p6 W: Q7 C& X</code></pre>
5 n. ?% T" s! B<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>& i3 h& q  s/ Y- ^- W& s
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p># j+ \" F7 j; U: J  u7 S; t1 Z9 T2 m
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
" [7 u' |+ F; S. n<ol>
. c. ^: @4 |3 @) B; S<li>t.join 需要持有t的所有权</li>) r2 S. w2 g4 i4 Y6 H
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>: V. m6 F5 o# N" h7 ?- _0 l  K
</ol>7 Z7 ~/ }3 y* o/ b1 R7 A) i
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
6 V9 F: o6 d3 k换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
5 X, s$ f* p) G8 m1 I<pre><code>struct Worker where( Q, g, c- Y! N# g! ~6 H- m
{
; g3 F8 B/ A: D    _id: usize,
4 Z, ^4 Z7 W8 U; p% m    t: Option&lt;JoinHandle&lt;()&gt;&gt;,, F0 p4 B# `. I3 n
}
1 K2 b. Y7 |) v! h( K- G5 x% ]</code></pre>" q: q4 P+ e: H3 J4 U0 g; ^, u* o
<h1 id="要点总结">要点总结</h1>+ ?. y- s! @* x4 ^) {" s
<ul>8 F' g+ P: h" F  I' V
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
9 ?& G. X! x: h: y<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
0 d1 {5 e+ ?7 [7 Z5 N/ U</ul>
8 q3 |# u; k' i% M, P<h1 id="完整代码">完整代码</h1>  q% ]8 ~3 D" \
<pre><code>use std::thread::{self, JoinHandle};
/ K/ S9 K  U6 [use std::sync::{Arc, mpsc, Mutex};
, F/ V1 @. D! r3 Q% X/ v+ ~# l  Y; d0 n
% n3 ~7 B1 a) \6 H3 V; S
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
2 J& `. Y) V( e) m- Genum Message {9 h- [+ s+ X8 b# ]
    ByeBye,# H3 b' R3 ^; v& R
    NewJob(Job),0 @5 H" [; i+ q% W3 a& `+ M) h, ~
}
7 v% f8 z. T7 o6 A& j8 d( u$ u* P2 P$ \# F
struct Worker where. Z, h9 c+ f2 q" n: D. I
{- u- @5 n/ y0 D+ B- h) _
    _id: usize,, X  M' l  s3 k# y( c
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
8 }2 n: c  O6 N' j}
5 k' Q& k- g/ y+ ?0 ?; Q
5 w9 z& T9 g& Z+ H' |" Eimpl Worker
# M+ h9 ]4 n4 w- ~! J$ ~. s{
6 t) r* L4 g6 m8 X5 z9 B7 v    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {7 d- b5 m4 ^: r- w* g
        let t = thread::spawn( move || {
7 d9 g: r8 N1 t6 ?7 y            loop {
6 S( ^% z- _, B' F' v! M: h5 W. {                let message = receiver.lock().unwrap().recv().unwrap();
; O+ b/ Q8 Z+ @( G+ Y$ {                match message {
4 i7 {* L' J9 t/ e7 q& F3 y& J                    Message::NewJob(job) =&gt; {' {0 @! C  ^, `3 S7 U! [
                        println!("do job from worker[{}]", id);
2 B. j  p; _5 S                        job();
; q, h0 J9 E/ [                    },
+ w+ @! b5 G! b4 K5 A* R  U% P                    Message::ByeBye =&gt; {2 R- ^7 h6 j1 d. U7 C: [
                        println!("ByeBye from worker[{}]", id);
2 A- h3 s' z/ M- w& |! Y+ y                        break
8 i2 C$ T' b2 `# M- }( l8 t+ _                    }," V5 B- K% W! c
                }  
* j$ V$ }* V* ]$ g+ `* D8 n            }, J, p# m% ~3 L' q( l% @" @
        });
, D* A* s" w8 ~" z, m; V' l, v6 I  }# ^; D" y0 }+ X6 J; {& d, Z. s
        Worker {
5 Y4 Y  z4 m& y8 L            _id: id,
8 G& ~. t4 B( f  `- h+ Q4 l  _  d            t: Some(t),1 E9 M: Y" ~9 x3 p
        }" x7 i) S  l5 b0 G
    }
$ \2 v; y4 Q$ v" m$ }* h8 {& ~}
" @  I8 d7 f& w1 ^9 P0 C! Y# V2 |* ?# C
pub struct Pool {  H) k; ?/ q/ |1 |
    workers: Vec&lt;Worker&gt;,
: A6 z$ h, u/ l3 k8 e  T    max_workers: usize,
: Y6 A" \5 V' s/ E& X$ K    sender: mpsc::Sender&lt;Message&gt;. P8 \/ m- [7 C6 z" n" b7 S7 F$ h" R
}
  m4 k* p2 D9 E2 Z" E; h
7 \- i9 Y( L* i1 z& eimpl Pool where {; g. h. X7 ^$ G) _
    pub fn new(max_workers: usize) -&gt; Pool {
8 G: U1 j% ~! d' d6 o        if max_workers == 0 {9 H* d, x5 U0 a
            panic!("max_workers must be greater than zero!")6 S, D6 [: U. b# j
        }
) b7 k8 J) P- u        let (tx, rx) = mpsc::channel();$ y& u8 m: |/ X, s/ d

2 E  q# V: _9 `6 X        let mut workers = Vec::with_capacity(max_workers);* o6 A/ C/ R, [
        let receiver = Arc::new(Mutex::new(rx));" S0 U- j2 l1 w. g6 @& o$ ~( a
        for i in 0..max_workers {
9 u' `7 |9 o& W7 q. o            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));: y/ \: w) M3 Z6 T, [) m+ r( {
        }
/ b( d% N" J' t
, J- C. S1 o3 e3 w        Pool { workers: workers, max_workers: max_workers, sender: tx }
5 ?4 L3 ?$ g2 T' l: x7 C    }4 @9 \" R# ~5 Z7 Z* Z" |
    / m- B- C! F7 [  H  [! i+ J1 Z
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
! ~+ i, |% s( p) b3 Q    {
* G7 ~( ^' u1 F+ |8 a
! i! L& s" e" C4 {9 h( Y. P        let job = Message::NewJob(Box::new(f));) g& [/ l8 a, m% n* O* |4 W7 H
        self.sender.send(job).unwrap();- e9 |' ^  C6 w6 G" b- o3 D
    }) h, p" r6 O: T
}
1 j! Z2 D1 ?) \: j; S, E7 ]& C7 F, |: X' V3 \* o: |
impl Drop for Pool {
$ r: u" D; V  o) z* X    fn drop(&amp;mut self) {. z' p- P' J" m* L+ A5 Q  W6 {# o
        for _ in 0..self.max_workers {
7 O5 W/ y0 }6 u$ j, |            self.sender.send(Message::ByeBye).unwrap();
) W6 Z' e  p9 f5 F9 S2 d& b        }
0 C* K+ k! _9 D7 |# V        for w in self.workers {, N8 p6 U/ G% S  s
            if let Some(t) = w.t.take() {& A: O2 N; I$ F5 O$ S3 i$ a* V
                t.join().unwrap();2 T$ {/ m; l9 h: e% Y) R
            }
9 q( y9 T3 p( }8 \5 {* D        }
4 T6 W0 r2 w+ c, U, J7 [% e$ @$ {    }1 x) k3 T( `, W1 S1 B
}
6 r# I: F$ e* C# Z" x3 O6 `6 p& C" ~& q4 J$ X* }7 s7 H" d/ ?- N9 ]
9 L/ d3 ]+ J& a; P
#[cfg(test)]% t- w% W1 D  f% ?" I
mod tests {
; v+ M& x" D$ Y, M. h" {+ R    use super::*;
* T) A- B( z# F  i3 x* X( t: n9 B* a    #[test]
' m. F2 K  w5 @' Z    fn it_works() {
& F! c: B3 ?; R5 P( ^        let p = Pool::new(4);& B  o7 S* {* b  `
        p.execute(|| println!("do new job1"));4 M, X2 E7 \# o/ l* s  A. b; y
        p.execute(|| println!("do new job2"));
3 S/ N% H# x3 R  B: R        p.execute(|| println!("do new job3"));
0 n$ ~7 J7 R& s  d  V' G. ~( d        p.execute(|| println!("do new job4"));
! F% N9 }3 w# `    }% p2 b9 o! W" z3 p' s* _  @; A
}
' f! e/ }* l- d9 ?2 x$ N) S</code></pre>5 e- @, Z( V; ^% p+ B
5 q) R6 Q# T5 n6 o/ j% G' c
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-5 07:43 , Processed in 0.063343 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表