飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 4513|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

4822

主题

4910

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
16784
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

7 {5 ~6 w; @& q& l" G* ?& U* R<h1 id="如何实现一个线程池">如何实现一个线程池</h1>+ z6 u1 l  p8 @
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>0 F" s2 J) i) u. B8 f7 {
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>% T) k- m2 j. b1 ]) V3 o& J5 j8 m
<p>线程池Pool</p>
9 Y4 }% d: a8 `9 K; V$ _' c2 `<pre><code>pub struct Pool {
" K7 C  k) ~7 o' @  max_workers: usize, // 定义最大线程数
0 z" R; L, N, C1 c8 V! O6 ]* ~}
, }* F; k7 a9 R% ~5 E/ y' f. q- u
impl Pool {
% W, H1 {+ }" R4 M. @4 s5 r" T  fn new(max_workers: usize) -&gt; Pool {}
' Q2 Y4 c( G% _7 U  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
% B5 }0 `5 v0 ]' N0 P3 U( }}4 D# m* L3 [7 u  W

- I" V: z+ M  ~6 b* W  F5 Y</code></pre>( _3 {! G' I& [: N6 D  b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 K1 c1 g2 T( l<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
. e; z! N4 ^' A* r$ E可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
( N1 @2 w2 o* Q3 l- F$ @<pre><code>struct Worker where& g  P% d" R, ~% u4 v- m
{& W$ \& U* S, I+ n$ V& D% l
    _id: usize, // worker 编号5 w2 C. o( ], l* P: v, D
}
+ S9 w) d  V7 [7 A# s4 A4 v</code></pre>7 b7 ^- v5 Z6 _( b2 |5 L; t
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
: Q, H! U. W! a9 n把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
2 G! M5 C* a( ?) c) ]  A<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
% [$ W, g' D6 V# B" p' D<p>Pool的完整定义</p>& W2 a7 R4 C6 W2 K5 {
<pre><code>pub struct Pool {
6 y# e- y( r' X  O9 w5 L" H6 T    workers: Vec&lt;Worker&gt;,2 a* `- q7 }; p0 b* H
    max_workers: usize,) S% F) s: \. b/ K$ M3 t/ E
    sender: mpsc::Sender&lt;Message&gt;$ `4 U  K# L. ]  T( ]5 M( [
}
& ~% Z& y' z6 @: P% C</code></pre>1 l5 }$ y7 b# F5 W/ Y& J/ Q: _
<p>该是时候定义我们要发给Worker的消息Message了<br>
4 ]+ M# J9 x4 P7 ^8 H9 ?- x定义如下的枚举值</p>& S+ u1 n& l$ F
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;0 g4 Z" O6 T9 G7 \: ~0 J
enum Message {
2 r+ T' ?  V: E9 e- p    ByeBye,# C/ m8 K, L3 P2 P! k4 [
    NewJob(Job),! K: w9 l. v8 A0 P0 h2 H8 I
}6 R5 _) a! }& }2 ]8 v
</code></pre>' E9 [/ N9 f5 @3 D. K
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>9 W. ?" n0 h( g: t8 g
<p>只剩下实现Worker和Pool的具体逻辑了。</p>4 c, n/ T1 z9 ~3 t
<p>Worker的实现</p>5 x5 o& A5 s, S" T) q. a, t6 W- d
<pre><code>impl Worker! E) \) U& k2 H2 _
{( ~. \& q; j; j' `' w' ^
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {5 O: n8 Z4 d. u; J! K
        let t = thread::spawn( move || {
9 ^5 N' y' N3 F" a# w* G# \! G            loop {
5 `+ _$ [' u5 E# z' Y) Y! B+ `                let receiver = receiver.lock().unwrap();
1 u! P* g; G: S* R                let message=  receiver.recv().unwrap();
( F7 r; }# ^$ O                match message {  V( G" ~( L$ C2 K8 S4 p# \
                    Message::NewJob(job) =&gt; {
" ~; N- ?( ]0 D" y/ }; c; w                        println!("do job from worker[{}]", id);
' v* ?$ {; ~. O& a5 H& s7 B# b% O1 M                        job();
. [2 z5 R$ y: g6 A* p                    },
2 _/ Z0 c, [) @# T7 q. r                    Message::ByeBye =&gt; {
- D5 t: L0 `: Z) u+ s& R2 C                        println!("ByeBye from worker[{}]", id);
" c; K$ ?5 X4 _                        break
9 L+ L" ]' d5 g& K. I1 G                    },/ ^6 G" v4 c# `7 p* a8 K* ~
                }  
/ ^. j# w. |. l            }& W9 [& ^. q0 }2 ~  i! T5 ?
        });
$ x2 @5 K% l- N$ r$ E2 O) x. C8 `  P/ P, T% g. C
        Worker {
4 `% X" f5 ?/ p! A. B8 N* l            _id: id,
. f6 P5 d# U' E  E  b+ [, g            t: Some(t),
  y+ I, K; P- y        }
! U+ z# b  F% h9 Z" N7 ~% }5 o    }
. I  S' [* P, |  j9 m% a: ]6 y}4 g6 E$ ]6 [. Q- I" p$ H2 s
</code></pre>  `2 e) p: m2 ]) ~9 Z
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
; `0 O4 Q' k$ X: y3 @+ b% \: I# B: ~但如果写成</p>
7 e8 _3 M' P. Y( f* l. [' O<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {- P6 b/ O* O2 K
};0 I7 F. C) v, `2 d
</code></pre>
4 E' H7 \% Y! c4 }% a<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
7 V+ G: N8 h1 R, W4 @) M# zrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>; u: k& b5 C: K2 |* _
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>. f0 o) F) M' Q# ]- e& a# T
<pre><code>impl Drop for Pool {
  `1 T+ v! ~4 ?9 p% b9 ^* H    fn drop(&amp;mut self) {9 g  U# L% A  }% T& [6 G% q: T
        for _ in 0..self.max_workers {
5 _4 V, H" V% |0 Y/ E            self.sender.send(Message::ByeBye).unwrap();$ J3 X, U3 o& R0 {
        }2 N4 s( V/ F- d- q# z6 e
        for w in self.workers.iter_mut() {; |, ?2 Q( M1 E+ B9 c
            if let Some(t) = w.t.take() {
! U' M! l0 t% F                t.join().unwrap();" L# N5 e6 n1 g9 q8 P$ n
            }
# \! p. h* ~% l. I+ |        }
5 v. q$ o; @1 p    }6 D7 d$ B) Y7 y" d
}
3 v6 q' [4 V1 ~8 _* `. k# c# M( a9 s/ Z0 q
</code></pre>
% y7 f5 ^  \. w3 S; B  l- h<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. C. e6 V, T. m  `; \
<pre><code>for w in self.workers.iter_mut() {
+ r: s* e7 F$ U" Q$ M: n4 c( C' O    if let Some(t) = w.t.take() {; b- D" k4 G! V2 }6 O8 i& x
        self.sender.send(Message::ByeBye).unwrap();
" [/ y; d7 A& F! ~" n' T        t.join().unwrap();
4 T# b+ [1 F8 m1 ~    }
  W- I' ]- r. q7 h, C& ?}
+ f# ?: F4 `3 S
- A( q* P7 o/ C</code></pre>
& q5 Q5 O- Y& }! a% k5 ~8 |( R<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
5 r  e' @. @2 L" F6 {  x, V我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; X6 _, I( d& k6 B4 A6 v& ?
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>, B. j) g3 {0 v$ q, X2 E
<ol>
' f/ ^  l: c. n. c( [<li>t.join 需要持有t的所有权</li>% }$ k8 H! z" V1 m3 I8 V$ F/ m
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
9 R" P# x) W7 o" c) C</ol>
. V: M! t& q- {. h, \<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, H% g, N. A+ C8 P3 ?/ W换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
- ~' P( }, z9 f+ U* U' c5 Q<pre><code>struct Worker where
9 [$ `3 D- K$ g$ |7 u{4 B% v; m, |1 }" X
    _id: usize,; Z7 E! o3 e# C0 ?6 G
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,! _0 _' a' q' V; h. }
}
8 }7 V9 C2 `! U4 N( s* _7 N</code></pre>! ~, d, ?* M4 g. e1 W. {/ F
<h1 id="要点总结">要点总结</h1>
" v8 b6 E- G# |<ul>, d6 m) s4 j& `. Z
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
4 _9 s& A) i" x' t: S8 C<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
3 E$ ~, I  _7 x* G: `</ul>
6 g: r4 `. y$ u" @<h1 id="完整代码">完整代码</h1>" S  A9 R" o. k( D2 J: m
<pre><code>use std::thread::{self, JoinHandle};
8 a0 h% @2 ]& ^1 i9 Duse std::sync::{Arc, mpsc, Mutex};/ J3 m' M' ?$ N! U
- x9 ?: z- o4 a+ t( I

. i2 ^8 D7 I# _5 v6 Dtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;% C6 [# n7 B, d, V1 e
enum Message {% a' i0 a8 V* N9 b: F
    ByeBye,
) k4 l* A" f% n- n: t/ v  s    NewJob(Job),5 T, w/ C, k4 s7 Z+ _# j; _, |; _
}
8 O( {3 Z  ~5 h1 z2 h
- R- M2 p7 G8 ~, T& G: t* a4 }* _" \- _struct Worker where5 b: s5 n6 O  l1 G. `
{
& K8 P0 H$ q; G    _id: usize,& V8 v2 [. V2 {4 J% ^8 w& \+ h# e
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,9 q' A3 P. Y' w. Y
}8 R7 G7 V! O. E$ |' }4 S8 U

! q& X! b3 b, r4 Gimpl Worker
- D* B" ~* m8 C{
0 B$ u# G- n: |! t9 x: K* x9 l    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {, e" ~* E8 h! q' ]2 E
        let t = thread::spawn( move || {
5 k% O$ @5 p$ L) h. W0 h3 p% V            loop {% m: @" f4 A( T1 V, A- v  k
                let message = receiver.lock().unwrap().recv().unwrap();6 S+ f0 E' C1 b
                match message {9 z# J! n% {/ L2 v" \
                    Message::NewJob(job) =&gt; {) X% k2 z- h2 q2 I( X0 A
                        println!("do job from worker[{}]", id);' o2 w% c, c+ D9 g% ~. n- U
                        job();
# G0 {4 g8 v1 _4 U* V6 ?! P8 c                    },5 r" W( W6 f9 t
                    Message::ByeBye =&gt; {
5 Q0 K$ y0 A1 Y& c" T6 b                        println!("ByeBye from worker[{}]", id);2 E* X6 e3 c9 P0 U* j2 G& E
                        break( V9 _; g$ G  ~* I  k8 c
                    },* o: m  ]# A6 B
                }  
9 i4 n, t8 h4 W- l6 G            }
+ b6 ^, ?- x$ x$ k4 I" i        });9 y, h/ B! D' j9 `" T, U6 k- q

* W9 Z. w7 {& A1 v2 [        Worker {; z" L7 b9 D  x
            _id: id,3 C4 e$ g+ `# d, ~" |
            t: Some(t),. N' j$ I2 X# N! @9 l  \
        }$ H1 R' v0 {+ t7 @
    }& f. f; n5 [3 u0 \/ w" y& [' i
}
9 t( e% o) c$ m& w/ h2 X& D" W  u1 {1 L0 L" T- O; B
pub struct Pool {1 T  {, Z6 T  h
    workers: Vec&lt;Worker&gt;,4 c* S: F! E- l0 N: ~3 O* R" {1 o
    max_workers: usize,2 I. T4 c: M5 v$ t/ f  e
    sender: mpsc::Sender&lt;Message&gt;
( S% M8 N2 z0 Q& N: |}* t, q$ h/ K' g

& ]# M  U  n% pimpl Pool where {
5 j% L: G( O! e% Z8 N* T    pub fn new(max_workers: usize) -&gt; Pool {& t: @5 U- d5 D$ g' ~( O  Y4 |
        if max_workers == 0 {0 r! c! u. v6 r2 A% x
            panic!("max_workers must be greater than zero!")
9 B) h, u+ k4 P  M0 A        }
: X1 l. i7 i2 g, a        let (tx, rx) = mpsc::channel();+ n/ R& X1 ~3 J" t" E- a9 j
3 ?% c8 Y/ p4 N  B+ |# L$ t/ |4 ^
        let mut workers = Vec::with_capacity(max_workers);  o! n/ \8 [9 [/ V" e3 t' e
        let receiver = Arc::new(Mutex::new(rx));
' `, m: F7 I' Z6 f0 W        for i in 0..max_workers {. N9 I0 @5 S3 o. [) h2 j7 p  L6 ?
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
+ g1 p% e4 \1 x3 H7 i% G- H        }$ V: v+ h0 m9 v, D0 @

8 B3 c' F6 ?+ r. L        Pool { workers: workers, max_workers: max_workers, sender: tx }1 S' R9 ^' T. t3 g0 X
    }8 z: o4 S$ J/ p* e8 M
   
, Y6 \% e; I. |  K    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
' a9 j7 b* p- _7 X& j) i    {
* Y+ g% y% Y! }5 i1 r) R2 w0 t7 @8 z( d5 Y% s+ J) h: P
        let job = Message::NewJob(Box::new(f));
# k* c2 N9 R) ]/ x/ _( @- f; ?        self.sender.send(job).unwrap();; a: t2 I" z( C4 y
    }
8 S1 l  V5 x' n* y+ t" m}  j5 `$ n+ P9 _

3 J# ^5 ]! A$ @1 I) `! g5 [7 Cimpl Drop for Pool {
. q7 r1 z6 |. G' Q5 I) m, {' m    fn drop(&amp;mut self) {
3 B8 a9 ^7 w/ L- r& D        for _ in 0..self.max_workers {
. z3 h/ |% ]# N4 p$ v8 d            self.sender.send(Message::ByeBye).unwrap();4 e- M% _, ~0 r
        }
" k& T' r% e+ c0 U* R, x5 A        for w in self.workers {
- A4 ?; B) ?9 y            if let Some(t) = w.t.take() {4 \+ i+ h7 Q- {. P5 M  S9 H
                t.join().unwrap();3 P3 c/ j1 C2 P' ?) p. l: v  ]! P8 [
            }# X/ t+ {* |! X
        }  I: H0 T" k. U- B, e8 X
    }
, S% J3 R, S. i! n$ q}
5 P& @& b2 l, N1 t' M
! d6 \9 Q) J2 i; N" C: B0 z1 Q' I5 u% d0 i
#[cfg(test)], j1 [. s- e2 Y  p. v
mod tests {! d: i3 C2 x0 b2 C- o
    use super::*;( [( Z: t( w/ Q: k
    #[test]
- F  M+ ~6 r2 X+ _    fn it_works() {) C0 G1 t" E0 O) h
        let p = Pool::new(4);
* j7 B; e, g& m- q8 S% i        p.execute(|| println!("do new job1"));
: T1 e7 @. T: x& ?        p.execute(|| println!("do new job2"));& b2 Y& M3 S7 F- N0 S
        p.execute(|| println!("do new job3"));
- H) z, i3 t9 e! A        p.execute(|| println!("do new job4"));) q" i+ Z6 k  K; c9 y- }5 _
    }( `9 `/ t; X+ j/ m& @8 O& [
}" Z' b" c! d: s3 `: `* Z/ J: D
</code></pre>
2 H" D& l" A6 D' J
/ T( n, ?/ {  t: }4 z2 L6 S6 \
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2024-9-20 00:04 , Processed in 0.063179 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表