飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14694|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8042

主题

8130

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26456
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
  Z! _- b: d3 U  r  c$ f9 J9 M
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>. a9 V# }6 m8 D6 m2 Y5 D
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>6 r- S9 q2 ?9 H
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
' Z" t9 x- b$ k; l; [<p>线程池Pool</p>
0 y* e" z% [- |5 y, W1 m, v6 M<pre><code>pub struct Pool {
- m  }% Z- O. A0 O! E5 Y# L# h  max_workers: usize, // 定义最大线程数6 S5 a+ W- ~; Y  p
}
4 h/ m" u9 i* J, Z4 g% }* M* B: Z  V6 ]  S
impl Pool {. v3 S- u+ k2 \5 ^9 R8 K
  fn new(max_workers: usize) -&gt; Pool {}1 J5 d2 J& ^2 D1 V! P2 }5 z
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
1 ~9 n: O) j5 ~( Q6 B# _3 n0 h}
/ e& M; }3 S- v( p
8 V( e) M6 p# x/ t</code></pre>
7 j4 c. t7 d, |$ v7 u; ^+ B3 I<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>1 W  X* f4 _$ f9 n
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
2 I7 K9 \7 K5 x8 @5 {' e9 M可以看作在一个线程里不断执行获取任务并执行的Worker。</p>' U+ W5 z2 ~0 L: x3 V
<pre><code>struct Worker where
0 Y2 }) E5 u9 w" p+ G{% _% k" r! S3 p
    _id: usize, // worker 编号
$ g) B! C2 S* v; Q; |' f}( b, \: P$ D; ^0 @5 L, E% K% G/ O
</code></pre>
6 Y/ b: o5 w. J<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
# `* B( Y8 n6 _( n8 M/ z; h把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>. p1 w$ V% [) a. M/ \8 l% Q* k
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
1 y2 i" n% b& b+ R2 E<p>Pool的完整定义</p>7 z) b" M% u# ?4 R( ~0 x, j+ J  w+ S
<pre><code>pub struct Pool {
4 a4 R% N( x# H2 k8 `    workers: Vec&lt;Worker&gt;,
) m& F$ \# N) ?) M1 W    max_workers: usize,6 u  K" A" I, V" |" _5 P
    sender: mpsc::Sender&lt;Message&gt;; h* ~1 x" Z; u% D, ^0 z4 j
}
, |- P5 N2 S2 }0 b( R6 G+ t$ |</code></pre>
( X. y* R- q* b- A% ]<p>该是时候定义我们要发给Worker的消息Message了<br>, @) Y2 s9 w, `& Q
定义如下的枚举值</p>
  O0 x1 k& J( `0 p<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
, T8 U2 B9 W7 yenum Message {
  e" E9 _/ w1 o4 I$ e; i    ByeBye,
% U8 C$ K. t3 [1 l. P    NewJob(Job),0 t% M. p  C3 I: f
}
: z6 U( q) X: x+ r2 O1 _/ z1 N</code></pre>8 P  f; S3 W: h+ B  M  k" I* g
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
% {( G8 f& F% ^- g7 y<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ v" o7 H4 q% g9 d0 F8 b5 }( c<p>Worker的实现</p>
) F& R2 [' H6 A7 Q8 C7 D5 o<pre><code>impl Worker% [9 X9 m% n) z( P/ t
{
1 N+ P" _5 B& _. q3 ^: ?, |    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
. @1 Y& z' M& t5 q7 ~        let t = thread::spawn( move || {
6 S5 o0 K0 T7 W$ [, P            loop {, X& u) m+ Z" U1 T2 n" ]
                let receiver = receiver.lock().unwrap();
3 R3 d5 ~9 c  g                let message=  receiver.recv().unwrap();
5 z* u* S5 }7 ~  b+ ]" ^8 K, P                match message {
3 Y8 c2 M3 U4 J0 \8 P7 I" K                    Message::NewJob(job) =&gt; {
6 ?3 o  e3 U/ T: y- a                        println!("do job from worker[{}]", id);
" }0 J, r! f$ T7 B# V' E                        job();' D! ?) `. Y# b! [- ]
                    },
, g' F: o, J4 ]2 z" a                    Message::ByeBye =&gt; {
# k1 O! e, n$ y! T8 w# B                        println!("ByeBye from worker[{}]", id);
* U4 U. E9 L' G                        break
  p5 G9 T9 B; O' k" B2 N                    },) S/ g& s  Y/ a+ |$ g! v" j
                }    n" j+ y0 A- v+ e8 e7 `
            }
7 j9 W9 ~: y3 b8 O/ S        });
2 \8 W* A2 ^( V( Z4 U
8 ^( {! C4 I* n3 W$ `        Worker {
( Q# n) _0 D; F- k/ U            _id: id,8 p  s5 I- K8 @. ^
            t: Some(t),: q. a5 `4 k. |4 `
        }
8 Q1 ?6 N- s; _! ]: s    }7 S, \! K& I1 g( h& T. y
}& h9 ^; z' w6 k6 ]
</code></pre>$ a, B% Q. `" ~9 ^) i  }" f, [
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
. z& R, f" l3 Q; L但如果写成</p>6 F. y0 `2 Z. ?/ X
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {8 `' ?% W! H$ q
};/ |8 z  D! y4 ?2 q
</code></pre>  o1 |% ]8 I' x, H0 u  G/ V
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
/ ]1 S' @' x( k  ]0 E5 q0 l9 qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>6 S6 f* ?7 Q6 f
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
0 R3 Y0 I6 G; p<pre><code>impl Drop for Pool {
( o1 f4 C/ a; W9 ]& D4 S3 J6 f    fn drop(&amp;mut self) {
0 v% r# l" N' F2 K0 g8 \& c. a, c        for _ in 0..self.max_workers {
/ |( p4 _0 S' L7 s+ w6 m/ \% o            self.sender.send(Message::ByeBye).unwrap();
' T1 E, k# M) E' ^' V        }
' J) y0 I8 S3 Y6 `        for w in self.workers.iter_mut() {8 Z' V: {/ {: |+ M
            if let Some(t) = w.t.take() {3 \5 K( d" i( J  W
                t.join().unwrap();' b! O. d1 L9 E. |" k
            }. t  @4 d+ l% J4 G1 k4 G
        }7 G  G/ c; b$ ^5 N  J
    }
" G/ j8 F/ v+ r}/ ?2 W+ p" k8 e  d) j% o

; ~( }+ T, u; M8 t7 p, X! ~</code></pre>
- o' `1 Y% X: o( p<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
! q3 X1 I1 J7 B8 y! t' U* B) T<pre><code>for w in self.workers.iter_mut() {6 ]; u' ?6 _4 E' p# r( K* {$ X4 x  x5 {
    if let Some(t) = w.t.take() {
; W8 M& z6 g) d! j; h        self.sender.send(Message::ByeBye).unwrap();
! Y, ^1 D  R0 g1 r* Q# i5 F        t.join().unwrap();
* a% A3 g! t" _0 M. ]4 E& I1 A    }9 ^0 S  e" P5 ^, d6 x
}
5 U$ @2 N; w" }) }) A
' J3 {; G1 R+ W2 s. N" H" A</code></pre>
9 U3 c2 j  `9 V: M" z- C<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>5 I3 M& |. R) s7 q, J- C
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>* `( ?7 C2 c, ?9 q: ~* y+ t7 a! y8 s
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
5 F' N) x6 ?1 t, V/ W( R; F<ol>& B5 o$ R* a& M$ ?2 I$ O% g0 d. i
<li>t.join 需要持有t的所有权</li>
4 O9 d2 c* t5 x<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
9 d2 U! [2 O; y; ~3 p# a4 D/ |</ol>6 S$ c4 i- x( f* o5 \' Q
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
& M3 {) Y1 u2 I, h! j# n换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>! B( p: m8 j- d" H# M
<pre><code>struct Worker where
* r) M) e. }! ]" X+ L2 m: z! r{
& ~3 N- ]; |) u/ J; B5 _    _id: usize,
9 [* `4 d& ?, y4 J$ Z* l    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
% H  ]+ y$ v7 q2 K! [7 c: t1 L}
6 d' L9 }( T" a</code></pre>
' p( k5 l# S' s1 C7 a! F# Y<h1 id="要点总结">要点总结</h1>
# L* Q  H' m1 r" F9 L  M<ul>
+ k1 k: j, c- u* O1 S<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>- _1 \6 Q5 d" G2 d4 _2 t
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>) Q; v6 _) k- }' I  I. W
</ul>
; l6 _$ B& B3 H( t1 l/ U<h1 id="完整代码">完整代码</h1>- p) i. A, u) H  i+ R
<pre><code>use std::thread::{self, JoinHandle};
: y5 f* R- y8 u1 v9 }9 wuse std::sync::{Arc, mpsc, Mutex};
% o0 f0 C- ~, c8 ]  u1 n& a& Z% m( M. h: \
3 R1 w. z; m' ~: O
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;3 |9 ^+ b9 R' f! X  T$ J
enum Message {5 q5 t4 @7 B- y' w+ L8 u' h. Z8 k
    ByeBye,0 t; E  [* W! o! p; v; H; _' D
    NewJob(Job),
/ t' R; _+ p" f3 Y}
1 F# {, [% l, d2 b# `: l: o3 |) I! P( R7 S( `
struct Worker where
5 ?% _5 j7 G  s8 q" a: }{- ^7 j, a* c# g" `& z; T* l5 p* G
    _id: usize,7 j/ ]7 {5 A  u0 [' j1 a6 G
    t: Option&lt;JoinHandle&lt;()&gt;&gt;," q: X, O8 z0 Y' [) T- S
}6 m7 v. ^' R; f; M& s! o

/ B6 ^& ]3 }. \6 Q  H, k/ q) oimpl Worker
& J- O  d7 Z; d" \{* R1 }( S) W  _! _
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
! u: Y: w. K  T4 U$ h0 I8 _6 Q        let t = thread::spawn( move || {3 \1 k/ ~5 t' E  y
            loop {' T3 L8 |- S, O
                let message = receiver.lock().unwrap().recv().unwrap();! t, ?3 d7 X5 G0 t' Q$ k9 N8 u
                match message {) g' O8 b* t6 e" [
                    Message::NewJob(job) =&gt; {" D" z; U. ~1 D
                        println!("do job from worker[{}]", id);
  M5 f5 _' b5 ~. O* q+ R                        job();- j& ~9 S1 q( d" I/ ~0 d. {
                    },8 \  w: Q) m( D+ ~3 U' L
                    Message::ByeBye =&gt; {2 l# ]0 h; [% T# L/ N/ [' z4 r
                        println!("ByeBye from worker[{}]", id);6 {4 Z/ O. s, E- }. ~* N
                        break
  B& G2 t9 ]5 y' Y5 Q: P% l                    },
6 A9 W* U8 K8 L; L                }  
' E4 w  i# S4 n0 R            }) _1 m* Y6 c$ d5 S8 ?3 k  d
        });$ C8 w0 E, `8 F
8 g( S% u8 n5 G& ?7 m) s
        Worker {
& O( U3 l( f; V; J. v5 V: P$ R            _id: id,' M- g2 V( [5 D/ G4 D
            t: Some(t),4 T0 ]. v# L! M: H" @
        }
4 O2 S% U/ n9 H$ U& F, b3 I    }5 C7 r* J% g% }8 q; ]4 @1 _
}
. ?5 w6 m5 O0 I0 n# G8 a) _5 }, _& C/ E5 T7 o4 q# f& T2 y! s
pub struct Pool {
: M, V+ S) S3 {    workers: Vec&lt;Worker&gt;,6 N4 U2 X7 S& s/ M& Q3 b
    max_workers: usize,, D- O7 ~% w5 Y- s; s) ~
    sender: mpsc::Sender&lt;Message&gt;
6 I& O& r' I+ x  V+ v}& ?6 x! U0 U' }" C% G: U) o
5 h$ |' _. w+ a0 h
impl Pool where {% N! y% ~0 `1 Q
    pub fn new(max_workers: usize) -&gt; Pool {
$ P+ N% b% e* d        if max_workers == 0 {& B4 J1 S' s. S) e8 k( o
            panic!("max_workers must be greater than zero!")
( I! H/ h! X; j9 u        }* p( r/ h+ o# M: `8 J
        let (tx, rx) = mpsc::channel();; ~( I0 ^& t5 d) P

' ?2 o  W: b% L# p- L5 w        let mut workers = Vec::with_capacity(max_workers);0 _* q7 R' T. R' y5 a. x  f
        let receiver = Arc::new(Mutex::new(rx));
7 e! g$ F3 K, [& ?9 b        for i in 0..max_workers {$ |2 Y' y; \8 w( |9 L
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
; I8 G( t' }/ }; l0 {0 I        }
! x* ?3 x. _2 J, ]5 e$ l- Z! r( j+ m* w0 O% ?' e' c
        Pool { workers: workers, max_workers: max_workers, sender: tx }
$ U+ ?, Z5 k' B. a1 [# j    }
- D/ P! D# F1 b8 f; r" ?" g$ D    ; |# Z7 b; U' h0 d' C
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send2 E, k- n% n) R! n* s$ w% S
    {
& o+ f- q) h; x
) I! ?( |4 X0 x: x# W7 i) t        let job = Message::NewJob(Box::new(f));* E0 P2 v& m9 d7 z0 U0 L1 R
        self.sender.send(job).unwrap();' M' M$ f4 y! c- S& f8 X7 n
    }
6 D) f2 r* Z1 a}
' N" F; g. G% N5 G; s9 ^
; a# M' Q- u& o. s, a" X- Pimpl Drop for Pool {; t0 h9 G; ?4 f" z# }
    fn drop(&amp;mut self) {: _; y) Z, ~% @$ y& v
        for _ in 0..self.max_workers {* M+ g3 U, U3 b, t) k. s+ O; v
            self.sender.send(Message::ByeBye).unwrap();
% ^" d3 M* {9 }* F        }
2 F  v4 w5 u; t/ c7 n# n+ |1 V        for w in self.workers {
- b" ?* u& Y' |  J: I            if let Some(t) = w.t.take() {
- n( E. [8 O4 J& y3 s) U                t.join().unwrap();
, q4 t. ?' j; U/ `# {. T0 g+ O            }9 T* o; D1 q. m( A# D) ^6 E
        }
+ r7 l5 t$ x7 B# b* C/ m! ^# E    }# S2 r- Q& D, U) h7 k# L
}
9 w, K/ \$ \  s' f
+ W- M5 C0 P3 b8 w* }' |0 k8 \8 W1 b3 ?" I$ W
#[cfg(test)]3 p- S7 L! p/ p/ Q$ |2 c
mod tests {: z: z) v" E, L5 \" n
    use super::*;0 S/ w8 Y6 A7 S- b3 _8 q/ ~. i
    #[test]
2 w; T# Q4 M! J# S7 `( G, f    fn it_works() {
5 S/ H2 J2 h+ i) a: o1 M& N        let p = Pool::new(4);, N& x/ Y5 j/ M1 v/ p  }! o+ U5 E" {& q
        p.execute(|| println!("do new job1"));0 A' m: A  m. z4 N0 K* p
        p.execute(|| println!("do new job2"));
3 ]! Z( Z+ H, F, |        p.execute(|| println!("do new job3"));
0 P, U# L& V- l4 n8 D        p.execute(|| println!("do new job4"));& Q9 c/ c9 o' J8 M7 S9 _
    }
: {  K3 P4 c2 J: r! t/ G! i}
& z5 Q9 n6 e" [' \5 W# j</code></pre>
3 X7 O9 P0 j. f9 z0 b% L" g* q# R' U% H% i2 H2 F
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-7 21:41 , Processed in 0.062665 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表