飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14606|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8034

主题

8122

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26432
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
6 E) U: z0 w5 y: W; P( w
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
' X+ C* K+ [3 Z2 M; d<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>* C3 ~5 [2 z" ~0 t: v) C
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>( R, |& Z1 z9 d1 t
<p>线程池Pool</p>1 s6 b4 C+ r" r
<pre><code>pub struct Pool {8 j: w6 o( k8 T. w3 H
  max_workers: usize, // 定义最大线程数
; t! K, C! K. [, ]* P2 s' w7 H}& [: U: H, E5 b
- ~0 u1 [9 U5 _! B( h$ R
impl Pool {
# T" E! e9 K5 Q9 ^) H' S  fn new(max_workers: usize) -&gt; Pool {}
' N- |" S: M. J- B8 `  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
! p. t/ e1 A* o/ ^9 ]  B}1 H2 \+ q1 w8 Z2 u
! `) x1 W6 d" c" h9 S: U# ?
</code></pre>& C. w7 o# J3 r8 @5 J0 e# v) R
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>9 }% A, F- t3 _, S3 G: ?8 x
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
$ X# n3 V& {7 }" {5 [可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
$ X0 P3 U3 A3 C# W2 [- U9 ^<pre><code>struct Worker where
# q; K5 b, ?( a# v4 m7 y{
* j+ m- t2 P/ l# _, g* `    _id: usize, // worker 编号7 C( [* O8 M8 l8 }3 ^6 L
}" w+ A. a* o* G( [0 _- K
</code></pre>
) F: O1 C+ d0 [& t<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
1 N+ O; O$ n0 F1 O1 {) P' G把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
, s1 u7 E& k) `* h<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
! O! s1 @/ o* @; ]* {7 _' u; m<p>Pool的完整定义</p>
3 W. k2 o! x) a. P<pre><code>pub struct Pool {
. z, d: V2 W+ ]! e/ h/ m8 \    workers: Vec&lt;Worker&gt;,
! \) ~' I6 U9 B; z+ R6 t. p    max_workers: usize,
4 V, ]3 G8 @6 B, T8 T    sender: mpsc::Sender&lt;Message&gt;+ |) V+ N, ]# K3 _* O* W1 Y
}1 O! k  T4 Z, \$ @, h
</code></pre>% M* I7 b$ S  H/ w' t
<p>该是时候定义我们要发给Worker的消息Message了<br>4 P) s  O3 f4 X3 F1 x- ^6 ?8 y) z* V
定义如下的枚举值</p>
* S# F9 i7 m$ P6 Q* i<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
* Y% \! V) a3 b7 }3 Nenum Message {+ Z/ ?' z/ b# s* l
    ByeBye,
. `8 M. k7 k" ~* @    NewJob(Job),
1 T; M) q: r3 N2 `" H1 y}
; b$ b& }4 I3 x1 H. g</code></pre>
6 t. |2 S! a' n/ Z+ y/ @- i0 E<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>$ t) o3 W' n# B! f
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
2 `1 X0 W7 v9 C! l' B3 x<p>Worker的实现</p>; ?% n3 S/ D( N; ?: w9 Q
<pre><code>impl Worker3 `0 |+ p8 U4 j
{, M7 Y5 s) o7 r$ L( M  o
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {/ k7 {5 I; A+ t  d
        let t = thread::spawn( move || {
3 e$ Y) B! O2 ~. l& I            loop {* Q: r# c; F  t2 E
                let receiver = receiver.lock().unwrap();
4 t! j& @; A* s                let message=  receiver.recv().unwrap();' u; B& }9 X: b9 I- W
                match message {
. \" q3 M; {5 b                    Message::NewJob(job) =&gt; {2 j2 ~% F0 d& }' b4 R
                        println!("do job from worker[{}]", id);
+ t. y2 e0 O& a* u& P( h  G  @                        job();
2 O4 I8 o, N0 a* Y/ ?                    },. }% Q' R" j/ g* {
                    Message::ByeBye =&gt; {% s+ o; ]( ?1 L
                        println!("ByeBye from worker[{}]", id);% C) Q8 P/ M; ]& B
                        break
( d; X5 P% D$ e, Z0 e1 ^; a2 I                    },
+ {0 `# C8 T: d6 Y. ~+ Z6 H                }  
* n  w0 v' d! n            }
1 |. j7 D) c, Q2 o7 u4 b5 ?        });
9 t. g7 I5 N* D! N
! _9 t: a7 _1 I9 t! a* ~1 M* c        Worker {
( J4 Y5 [- o, N, I            _id: id,
' w; e4 G( h9 v0 R- I            t: Some(t),
" l6 @/ R  e. i$ c* m/ T% \9 T' W        }
7 I3 q2 Q: F  z4 g% w    }0 e+ a8 W( f; o: a9 T+ z7 @+ [
}) ], s) _$ i6 @! }; B$ Q+ A1 x
</code></pre>. W  n4 h4 y7 u, V. ^
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
( A" b8 u; a  A但如果写成</p>
) O% O2 ^8 G: ?3 h<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {* q" D$ w, u5 l$ c( Z* M6 V0 v
};
! M: |8 P/ c- }3 f% A</code></pre>- ?' F. G, E0 t' N5 @* N
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( w+ l# P8 }% M* e: ~& R. qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
3 @5 ^  |4 Z  |7 a<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>, ~' \) ~4 N6 B" k7 s
<pre><code>impl Drop for Pool {& e; v! A" p2 G9 K$ l' E: n: ?
    fn drop(&amp;mut self) {  B: I; s+ Y  @" D0 K  b
        for _ in 0..self.max_workers {
2 m+ K5 s. N% p' x            self.sender.send(Message::ByeBye).unwrap();
$ g$ i0 w. h  o2 V" X% g        }
0 n! `: P! ^9 u: f/ j+ j        for w in self.workers.iter_mut() {" e5 Z$ P' }0 q( t; I& C
            if let Some(t) = w.t.take() {9 [/ \2 ]* b% F9 w
                t.join().unwrap();
5 Y) P8 F- J5 ~4 E  U4 b            }
, X2 z/ a8 |- H) C$ L        }; U. s! m4 c8 P
    }
! }3 q9 {6 }4 a* s}
# ^9 Z0 G6 v2 ?/ v, ]1 I% d
2 G3 b$ n6 v" c% n5 q! Q% K</code></pre>
. [9 I: O9 k9 H<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>4 h1 a, V+ q5 a. A7 b
<pre><code>for w in self.workers.iter_mut() {0 w% f$ o+ p0 d$ c5 _/ C% i
    if let Some(t) = w.t.take() {
, G5 {7 ?1 s% {2 J        self.sender.send(Message::ByeBye).unwrap();
- S5 u) q% f- y/ V6 F* C1 n6 z        t.join().unwrap();
- b" O7 z/ B( X    }! Z2 u4 p7 n7 `$ o! N
}
+ s& R9 L3 K6 w: C. |% _1 }9 F4 [" [! U6 i8 i  V1 s/ Z
</code></pre>
5 P; r) C! d" s* c6 Y  B<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
5 W6 t* p9 G% A1 \- O我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
4 F! h9 Q) D$ E" C4 E& B<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 B' m, B0 T3 @% f% W<ol>
5 R8 p" X6 Z. ^! f: _<li>t.join 需要持有t的所有权</li>3 c) F0 z7 }+ f
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
. z1 V) V* s) O  c0 G2 `</ol>
( ?  I+ {4 K2 C9 `# z<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>) n, N2 _+ ^/ ~. V+ |
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
( a1 M( q; l) K' z  c6 V8 H<pre><code>struct Worker where
4 \' i1 p3 i+ R$ d. D. q2 ~) F{8 E. c9 D. J+ _' X2 J% ?( Z& K
    _id: usize,
# D+ ~( t; r/ |  T! B: Q7 H6 I    t: Option&lt;JoinHandle&lt;()&gt;&gt;,7 Q, u3 `4 u8 U9 i+ f. u* f
}6 X: s+ H: X  U: h
</code></pre>
; p0 u; \6 G( v+ ?<h1 id="要点总结">要点总结</h1>1 s3 j9 B& }+ Y3 U4 o$ @5 K5 D
<ul>
: ^7 ~5 H; R9 c2 _( k- `<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># q5 \% B' S/ u+ g( ^4 `/ g
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
; G+ g$ v5 _& ^- c6 B% L5 x  o</ul>) V+ _2 ?) f/ @
<h1 id="完整代码">完整代码</h1>$ F. Y* Q: a% Q' j
<pre><code>use std::thread::{self, JoinHandle};1 i) y0 Y) V% V" K
use std::sync::{Arc, mpsc, Mutex};
' }8 ?% C7 a% q& }2 \( ?3 c' h; Z: k
# q& Y6 Q4 i' z6 `+ Z/ N
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
* g9 c1 G# h$ @8 ]8 ^enum Message {
( `. j. c" t" R. U, {( o    ByeBye,
7 z! A5 F6 H2 @    NewJob(Job),
  M$ o" a0 z- P/ |$ u7 l7 h}& C5 `5 \8 ]7 z% H! _# V

) t+ m) T2 m) N: \! U+ C0 \$ jstruct Worker where
% |2 _. o; A! p% V5 L$ h{
+ ?, v: ^( F1 N% i. l    _id: usize,
' ^7 p* v6 i8 B7 Y0 k    t: Option&lt;JoinHandle&lt;()&gt;&gt;,1 y; e" T" q5 i) \! R6 ^
}9 K$ K3 f2 L9 c) ]
" F# q9 r& ~; y% Y2 B. Y
impl Worker0 ^% y0 |% D9 W) c# `
{% \  Y: X4 L9 \% O  ~% j" }- w
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {% T5 `; d; e2 h5 E3 q
        let t = thread::spawn( move || {
* Q7 B: H9 z: ^. H8 h3 E            loop {6 I. a$ R9 d3 g, q3 X( P
                let message = receiver.lock().unwrap().recv().unwrap();% a& `3 X% K5 E
                match message {
  @1 y" U( y( N3 [5 E, Q; a1 n0 I                    Message::NewJob(job) =&gt; {4 o1 L/ b1 q, }' s
                        println!("do job from worker[{}]", id);
1 h0 t6 Y* b% q, }                        job();
. I* h  O1 j! u; c                    },8 V, y: c% ~7 G% a; H
                    Message::ByeBye =&gt; {
4 m  j& {/ Z+ g$ k$ p- d                        println!("ByeBye from worker[{}]", id);; C5 h( V( x. S
                        break2 c, c) C8 }# [' I( B2 s+ v
                    },7 W1 A+ X3 b% @. W( }' Q# V8 g- p* p
                }  
9 }& x( i0 E4 Z            }: [+ T$ s+ ^& w  Y) u: W9 l
        });
- I: _) x; {; a# x! t
  y1 F/ o2 h. L! _! g- x        Worker {
8 \3 q# `1 V/ f            _id: id,
8 i2 v7 K  @5 F1 F" u: ~            t: Some(t),
6 Q4 T% p6 ~1 {; Q" V) q        }* T) N- T" X6 R
    }
2 {; ^8 Q: {' W8 x}
; z1 C2 d3 Z( f/ C' t1 x8 s, P7 e. u. w$ q
pub struct Pool {
  I9 s8 z1 p4 s: G- J6 D* U    workers: Vec&lt;Worker&gt;,
* K* |6 Y/ ~8 A3 F" f    max_workers: usize,
0 X& Z; P+ \. |: f. i& r    sender: mpsc::Sender&lt;Message&gt;
8 w3 j7 V1 u2 ^  u! D}! F) _" S+ A9 _
# t. u3 H4 n- h7 k+ u+ h8 @4 t
impl Pool where {
% D/ F$ a1 Q+ f2 d    pub fn new(max_workers: usize) -&gt; Pool {9 b- A$ d2 d! T% f
        if max_workers == 0 {0 c% t6 f0 p: z$ F* x3 ], O1 z, `
            panic!("max_workers must be greater than zero!")4 X' ~8 v$ F- }& y5 M
        }* i2 P* o' @) d, K; L6 v
        let (tx, rx) = mpsc::channel();% d: O$ w1 r# u+ g* K6 e3 P. E& \6 _2 E

; S! i2 J0 ?% `: Q- i. I        let mut workers = Vec::with_capacity(max_workers);8 \1 d/ `' s+ }! @6 i) b' {+ _" x1 F. M
        let receiver = Arc::new(Mutex::new(rx));+ Z$ n: O" x( S7 s1 N6 R8 ~9 N. H
        for i in 0..max_workers {; z: @* b" }' A4 U
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
& W8 U" }7 N& y  Q. k! V( G  B        }
2 G6 \* u$ x* r1 ?: K, k6 }! o0 `7 s  _4 W, L" z% o' A
        Pool { workers: workers, max_workers: max_workers, sender: tx }; H! a. ^2 Z& W7 n2 l1 F1 x
    }  k' ?4 _; B$ I% c# r" Y
   
5 q1 a! m$ J- p: t7 E  U    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
3 ~6 c/ R- ?* @    {1 Z% Z( }/ S3 H( ^( q

- c' M& D( N. V7 n        let job = Message::NewJob(Box::new(f));5 n& v" c  a' v0 O, W* k
        self.sender.send(job).unwrap();
9 |/ C9 W$ N: p/ E    }
6 R. y: U- y' x/ w3 C0 ^; y, a3 z}
: V2 f) T' f" W* A3 Y+ b8 b7 s
: i* [0 m. F6 E1 W# n8 G6 x5 Oimpl Drop for Pool {
( D  c. `5 L- ?1 A: P$ N    fn drop(&amp;mut self) {: ], P1 N+ n. ]! L' ^0 n: w
        for _ in 0..self.max_workers {3 M/ X% f. o! B2 y; \; e  M7 a
            self.sender.send(Message::ByeBye).unwrap();* x8 z& ?# l) O) L" L2 f( S& `
        }& j3 \/ W1 v# y' X) u: R
        for w in self.workers {
" e9 v) w- g9 A* D            if let Some(t) = w.t.take() {6 z8 s; ^" {8 x3 {0 O6 I
                t.join().unwrap();
" \! h/ \" s6 p( p' X) r            }
3 C4 R/ g9 Z8 {  A$ _0 l        }! k$ g. h, M* ]5 Y1 B
    }
( D# G' D4 s( H2 n# F# {4 C}
! A* }) F# {7 {3 j9 A% j3 g6 C; R, t6 T: R

5 Z! ]* a7 O4 m2 f* C% {* I#[cfg(test)]" Q) {7 J* B- E: S
mod tests {2 H! n2 y( R3 C2 K- `
    use super::*;
4 @0 \2 }- U! m8 {/ r, O# k; T    #[test]
& c  Y, U8 a7 Y3 E, M& h$ X    fn it_works() {9 ]" G0 ~/ B' v9 d8 t
        let p = Pool::new(4);
2 u, l# G/ m' p- r0 ~        p.execute(|| println!("do new job1"));
# K. b/ g" D/ C( y2 R" C        p.execute(|| println!("do new job2"));
, C# u8 n$ I$ A$ x! B. d+ r& Z        p.execute(|| println!("do new job3"));
6 k5 k9 z8 B+ ]4 L3 m        p.execute(|| println!("do new job4"));
/ ]+ Y  [% U$ f6 h    }
* |) Q3 \: V3 }  ~: I8 G}- l/ B( |6 x8 _  S: V2 [* R
</code></pre>
+ z1 G/ I% V0 R; Q$ M" _8 h+ N8 U( F$ x
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-4 17:00 , Processed in 0.074294 second(s), 31 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表