飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14653|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8038

主题

8126

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26444
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
4 y+ I  {8 `" I1 k, o5 `9 H% J$ j
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>% \3 M: w/ o+ B" V7 L6 V% P, g& _
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>) T- w/ u. l, Q5 p
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
& S: B! v& V& D( n8 A<p>线程池Pool</p>, Y) H" k. I" |. [/ p; _7 J' h
<pre><code>pub struct Pool {
: c: t/ v1 s, y  max_workers: usize, // 定义最大线程数7 a# m% q# n3 m' W9 ?- y* m
}: S+ Y0 A) ~" f& G3 T3 S) M+ f# d
4 c5 P/ D2 B6 A: d" _. a2 ~
impl Pool {7 X' M6 w) K9 L. h. m, y, v- c6 @
  fn new(max_workers: usize) -&gt; Pool {}; v) x& M$ Z& R: ~7 n! B" B% ]
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
3 p1 t: y. e! b2 t+ j, \}
7 a7 j3 M2 P( \" G4 Q4 c* P  K: }) S& S# ]( ^, v9 g, c$ P
</code></pre>2 M* U3 K$ t+ R( a8 a, J
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
  L- N# ]) d% x7 e# a<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
* h9 K7 n+ W& n& P( g: g可以看作在一个线程里不断执行获取任务并执行的Worker。</p>! y# Z/ Q* j' w; U# f
<pre><code>struct Worker where$ K! S5 |8 ~5 {
{, w) A- ~% l( s/ g  n* ~7 ~7 r
    _id: usize, // worker 编号
. b. G4 [+ W0 ?8 r- b0 U}; i7 G  G3 Y+ R
</code></pre>
# E0 |( C& q1 q, Z7 D<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>/ [6 p; q' c- D; R
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>" F0 I5 t) B  R; l( f
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
" A. i4 b3 Z3 \# F0 H+ v' Y<p>Pool的完整定义</p>( a$ @2 s7 f; `" d8 J
<pre><code>pub struct Pool {  [  z, ^" M& t  y  l
    workers: Vec&lt;Worker&gt;,! ~6 e) {( k4 r- M
    max_workers: usize,
( y6 l$ }, ^, M6 z    sender: mpsc::Sender&lt;Message&gt;
# ^9 o0 |  r2 W6 ?* j, V6 g( w8 f}
/ E: O6 D, m0 m/ |$ v</code></pre>
0 j3 z) d2 b) \$ Y4 w3 e- B<p>该是时候定义我们要发给Worker的消息Message了<br>
, N; T3 z5 _! f/ S# I定义如下的枚举值</p>7 s( ^" a2 |; U
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;. c1 @1 i( @, _. Q7 ^
enum Message {
# f( n. G9 f9 _5 _) A% M    ByeBye,
1 O& `2 N6 g( u- s- E0 }$ {: E3 s" P# X    NewJob(Job),
# u* {! C* ]" ]! t+ c# i! d}
+ w) J6 t, S6 w! s* `</code></pre>
  \& i% \! G" T<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- R( S5 ^$ K) W1 q: f% @
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ E1 r8 |+ d7 D3 {. E9 N<p>Worker的实现</p>6 m3 @& _. M/ m+ L8 w
<pre><code>impl Worker
8 I9 K& X( p: ^1 l3 {/ d{9 G" N0 Z3 U, ]9 P: i; M
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
. y, t9 F: ?6 E1 w        let t = thread::spawn( move || {
) t5 \1 R5 u8 `            loop {& @' y" z' }& t0 h' _) I/ }
                let receiver = receiver.lock().unwrap();
# W7 ~7 S. E8 L                let message=  receiver.recv().unwrap();& G9 ^; t' g) K. S$ `/ t* q% U: A
                match message {
' g9 H: s) S  k1 a5 w4 s                    Message::NewJob(job) =&gt; {
5 I  u9 c% y- R. w7 \5 S" u+ h! ]                        println!("do job from worker[{}]", id);
3 p& z9 h. ~5 w' b                        job();
. Z# c0 n/ {6 z, m; i  y7 `6 U                    },# S/ |* Z) |% X' s) ^2 `
                    Message::ByeBye =&gt; {
0 W9 c( d* K  k* v! K  d( s* ?: x: J                        println!("ByeBye from worker[{}]", id);- [: ]0 [! X: Y
                        break
# A$ [( e# E/ ]- |' q                    },7 E# G: \2 ~  X, R# z' m- Z
                }  ' i6 p" t* K* U, R; ^
            }
2 x& e$ e  c# C/ B! V- _        });3 k% ~5 f$ G' C1 }7 V* d

" I9 d# F! h+ h1 m9 ~        Worker {9 F4 v, J. R5 Q1 ^) ~
            _id: id,
  U8 Q6 ]3 `4 h  Z            t: Some(t),& o5 e! U7 f! E9 @% z8 X
        }9 Z5 L% {. Z) Q
    }6 ~+ U# w/ f1 \; d1 S5 q
}/ ?6 W; {. `- k. f0 P" Q' j% W5 E/ l
</code></pre>7 Y$ B4 V/ @( d0 Q2 O& ?
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
# s5 N% s) s% z6 K( R但如果写成</p>
& V  v& |$ h: f3 T, {<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
4 Z" x; ?" I0 y2 x4 ^1 ]( w2 U2 U};- S  S" T/ P5 S) X0 P. ^3 b
</code></pre>
7 e; J$ c2 l0 k( V; N<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, Q& s: I7 g# M* }4 s( d& d: }( M
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
" H) ]' J  d* v+ c<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
; y' y2 A8 L: [5 ^. Z+ l* V<pre><code>impl Drop for Pool {2 p+ V  k& `  K; G
    fn drop(&amp;mut self) {
; a) F$ s  o2 f        for _ in 0..self.max_workers {
+ Z5 r! }' k6 r  q            self.sender.send(Message::ByeBye).unwrap();  {8 _, g  n8 H* }- Q, v
        }" D; J; U) p8 y5 d; x
        for w in self.workers.iter_mut() {. m+ D, t  O, @+ R1 N0 H, H
            if let Some(t) = w.t.take() {0 y3 h) E& m$ h( B& Y$ c3 C7 H
                t.join().unwrap();( Y; R0 e/ x) [% K
            }
3 D% {# e5 w% `5 k/ n        }
% b  ]0 i: p! e1 `/ g# v    }
7 B' Z$ h: k' B' b: p}/ c! M3 ^" s7 W9 @
3 }/ g# R% o) [5 G
</code></pre>9 Y1 c6 W, h4 e  B$ l0 C0 j9 E
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
1 `: k- u% i* u4 P( |% Z' J<pre><code>for w in self.workers.iter_mut() {
) D# L. h* b6 l  _2 S2 |$ _% H    if let Some(t) = w.t.take() {
* K2 O0 G/ _2 H        self.sender.send(Message::ByeBye).unwrap();
6 B' c2 a& |! m, Y- ?: x# ?3 J! ]6 c        t.join().unwrap();% S+ m, `4 p% M2 [. Q- c: k/ d
    }9 f3 z9 A; k5 n( s9 k
}- R- Q8 X6 F! m. G) E
; r+ d/ c' d$ \, v/ r6 E- h3 A' X
</code></pre>
/ i( H, U) j: c5 |1 A; D( a4 F8 P<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>4 \9 ^  _4 {  E0 Y( G
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
  o8 g/ t, C5 P4 s' k2 A<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>& D/ D8 w2 c3 V1 h, \/ T- g) V* s
<ol>
2 _/ F& |/ ?. c6 O) K<li>t.join 需要持有t的所有权</li>0 u" s, g& `" w6 \* t
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
( q) V2 |( e, a0 t</ol>
3 \+ T+ r0 p% V5 I- ~<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>5 x. h8 g$ j# {
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>/ k7 M; |/ x6 `. d+ V0 @
<pre><code>struct Worker where
/ ^- |2 d+ @0 K. s9 [' `8 U2 u( s{* u7 f% j- i; e8 H5 `6 d
    _id: usize," l: W' ~6 J: t
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,1 |6 @) d2 N- |
}6 U# t& B5 ~( h% h) D3 R
</code></pre>
3 m# W1 [' Y; y4 s5 a6 V/ L7 r" H<h1 id="要点总结">要点总结</h1>4 G" e+ H+ `6 f. \
<ul>3 k2 |6 T! c1 J1 W6 |8 P4 E
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
, i6 @6 d4 o4 E7 Y' x7 t<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>! H( D( \" r% _/ ^
</ul>
+ G6 f% e' V0 K2 Y<h1 id="完整代码">完整代码</h1>
; ?) b* X+ o0 n6 y& Z# p  o<pre><code>use std::thread::{self, JoinHandle};6 Q0 L0 E8 f# [7 P6 k
use std::sync::{Arc, mpsc, Mutex};. {) T  ^9 ]6 R

6 H% G2 ?% D8 ?2 r  _; H2 }" R5 Z0 G! Y; H  X6 n
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;" w* U- W1 D6 x$ H) y: x
enum Message {: }; x3 ]8 S  a8 @
    ByeBye,
/ K  L: \5 [; b( i7 u. _    NewJob(Job),
2 x6 K  q7 ]; w  v" N* k/ a}2 K; R* V5 V/ o3 ]& G
$ B$ {: F( Z: a( m# B: Z! B! t( N) Q
struct Worker where
) F* G. r- B3 O! r{8 L8 O) f( l. D
    _id: usize,
, b9 t: [; c- R( }& S6 {& W+ r; U    t: Option&lt;JoinHandle&lt;()&gt;&gt;," \/ D4 ~! m' ?
}' G9 o8 S6 B9 p
& i3 A$ `2 x( a5 ?. Y
impl Worker
; {6 Z( U( }1 m. O; b; S{/ f; R& b4 J# c% z' m/ W
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
) g% e6 @7 M- W: Q' [! z1 ^        let t = thread::spawn( move || {8 ]8 @! C; ]$ ?3 J& K
            loop {
  m0 y1 }0 p5 F                let message = receiver.lock().unwrap().recv().unwrap();/ A, O+ j2 C' Y% o, W$ X8 P
                match message {& E$ i4 u1 o3 y9 a% }- x# x
                    Message::NewJob(job) =&gt; {
4 ^  D! r0 u. a  K* J7 z2 Q% b                        println!("do job from worker[{}]", id);: o3 T: a! q$ m4 x* E0 X
                        job();' a) U* g# U9 G
                    },2 s0 S5 z. X/ O  ~- E' I
                    Message::ByeBye =&gt; {
2 j: u* y( y5 N0 d                        println!("ByeBye from worker[{}]", id);1 s9 G5 o6 j  W; r7 _4 u
                        break
: ~$ C" i& x8 }" l5 L; }6 q                    },
! U: {1 r8 z$ r$ S* n, @; O                }  , X7 ?8 c& _7 R$ s
            }+ s7 I. k. C3 v$ t- q- v4 b
        });
% k0 S+ H) q" I. I: U4 {, m: O# s' ^
        Worker {9 ~2 g1 C3 ], \: ]2 h* l' b
            _id: id,
( }" M( q& O" O            t: Some(t),
2 d9 A& w- P0 L2 m. @, b, M        }9 d4 F, L4 Z( i" H# D$ X- a
    }
, L7 ?1 a6 i9 Y  ?) b}2 }, W0 n7 T7 [9 _
, n9 r( b2 p' R* [+ s4 Q
pub struct Pool {  I1 b1 S( U' X$ U, |: L- Z$ u
    workers: Vec&lt;Worker&gt;,, R& k& Y' C! z6 A" M
    max_workers: usize,3 \( S' T1 T7 \: U
    sender: mpsc::Sender&lt;Message&gt;5 c. P( e0 m4 G& C) L) r# p
}. R7 ]' g1 P4 t4 o% u' j

6 ^- z1 b# E( [. U% I% a) x$ X! i" Gimpl Pool where {
& p; \" k5 s9 v, m    pub fn new(max_workers: usize) -&gt; Pool {
8 A, O. P0 X; m& M4 [: O- ]) @        if max_workers == 0 {. r9 w) o, Y$ Z: y2 E- |
            panic!("max_workers must be greater than zero!")
) Z7 Q$ v0 h3 v4 X2 P3 r        }
. ]* M( h1 S# m$ ^        let (tx, rx) = mpsc::channel();9 X( b* n# y2 {$ `
& W  G9 O6 ?4 N2 Q+ |# u
        let mut workers = Vec::with_capacity(max_workers);
" t# K( x" }& ]5 S        let receiver = Arc::new(Mutex::new(rx));
. O- G+ n" y: @) H6 z3 d        for i in 0..max_workers {
/ I; u8 z; x3 P            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));( _! p  ~1 I% I. @1 o* m$ B
        }, p: }: ^2 D. B3 U4 [+ T

* F7 l' X' a* ]; s        Pool { workers: workers, max_workers: max_workers, sender: tx }* c% w+ F! V- ?" e4 V
    }$ L- X  b* B% j/ C' U1 F
   
- K  _/ F( d7 b    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
, w( S& r& U1 c    {2 U1 D: E6 V8 E3 s1 p% _
6 M4 r. B5 d/ }
        let job = Message::NewJob(Box::new(f));: h* p" y8 ~3 z0 Y! \+ B" B; f& H
        self.sender.send(job).unwrap();
  b" z+ ~4 H% a* E6 r2 U    }
4 Q, B- z' I1 R$ p4 f}
8 Z. k3 ?& g5 N, P% C3 Q; f" i2 p0 S! l* {( b- e
impl Drop for Pool {+ B. ]9 T2 E/ j& z3 |; m% w
    fn drop(&amp;mut self) {
7 E- _- F5 L- v+ }# x2 l        for _ in 0..self.max_workers {4 n# C, J4 ?3 a7 e5 g& m+ h  Q4 d
            self.sender.send(Message::ByeBye).unwrap();, e4 \% X9 u( J  c; U. J
        }
0 T9 y: t2 J& D2 C; V+ L/ e5 I        for w in self.workers {
2 u% j5 y% t3 l, e2 f$ \            if let Some(t) = w.t.take() {  V% G: M7 Q- Y7 s
                t.join().unwrap();; K) w3 @0 W) K8 L
            }9 O$ [/ A0 n# E* W
        }
6 y, k1 ]; v3 |+ n9 A7 _' `, U2 _. B    }
" w! f: c8 e8 @4 K}
8 G" e+ b1 Q4 [; S1 A/ ^% J0 Y; T: J; J. B/ U+ G
9 m0 F/ P( @- y  i+ |) G& {" z
#[cfg(test)]
* X; F% x$ ]3 S" Y# rmod tests {
/ z, X, I, @" y1 Z    use super::*;
) f. V9 ^; l: r0 O1 f) v/ a/ {    #[test]
. w+ n# n- m2 Q+ H0 P6 K2 O    fn it_works() {
, @, |) a$ ^* O1 Y0 A' ~7 l        let p = Pool::new(4);1 S, t* G" @; W5 R) U
        p.execute(|| println!("do new job1"));
- J8 E% W5 \! R        p.execute(|| println!("do new job2"));
" c0 @& t. O& Q- T% Y        p.execute(|| println!("do new job3"));' l: t1 y0 I9 Z1 v
        p.execute(|| println!("do new job4"));
9 b' G& t& K) v    }  d- x- s6 `4 n) w, S  J) c  w
}! Q0 `7 {) M: a* D' }; f$ e3 r: X
</code></pre>: p) z# |! u" ?# N& L

7 h: z+ e. F3 T+ H; ^' w" z
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-6 08:53 , Processed in 0.066648 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表