飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14778|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8053

主题

8141

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26489
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

0 s) ~, Y3 j9 [0 ]8 A' X- B<h1 id="如何实现一个线程池">如何实现一个线程池</h1>% b7 t( q. W6 c" _6 K
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
; ]/ R; _/ Q9 n1 H: [, @<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>: m; [9 \' _% }7 G: }9 \* Y0 b
<p>线程池Pool</p>
0 u5 ~* w; q2 ^0 M$ I4 I# j4 ~<pre><code>pub struct Pool {4 {0 I( L* A, i, |6 A* L: d
  max_workers: usize, // 定义最大线程数  g& |5 ^+ q2 T& O
}, [5 [- w6 I3 {' Q
6 H$ O2 a' i' t7 D
impl Pool {
9 [1 m3 l( w* i, T0 T- m  Q  fn new(max_workers: usize) -&gt; Pool {}
! X* C; K0 V: B  t" l  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}7 y+ d& e$ f( U( o7 g# l, I) ~  \
}
5 ^2 ^( }9 k  Q: j8 A/ m; E1 m2 H$ x: c$ W9 ?) T
</code></pre>
4 F% W& M( Y0 E" }<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 }+ n$ o- d' [4 _0 \/ b) ?( d* M) L<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
- B# R8 S" n; }0 N' e. p& }4 K可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" u& D  F1 s% V: G5 |% P7 J; R
<pre><code>struct Worker where
) w0 _& \% X0 X' b{
1 a; x# Z) S2 `$ z! v- v  w    _id: usize, // worker 编号( S- a/ R$ B/ o
}4 ?2 C% r$ y" ]/ {
</code></pre>+ a- E1 K, Z* I7 k% C
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>6 |, s6 q, M5 n8 h
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: G: ~& K9 ^9 x: B! T* Q<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
0 y; G' r* ~3 M# X<p>Pool的完整定义</p>
* \6 w( U0 Y- Q0 I<pre><code>pub struct Pool {
3 b# x$ ?5 W3 B, f/ N" f) N    workers: Vec&lt;Worker&gt;,$ V- a- Y7 k4 z: M, e/ h1 x* N1 q
    max_workers: usize,* t, H+ y: E; P" q
    sender: mpsc::Sender&lt;Message&gt;
" M, Q; _; C5 X/ H/ p! `}9 y6 S& ?5 n/ t# N" i
</code></pre>
. a' Q7 Z# x: u9 n<p>该是时候定义我们要发给Worker的消息Message了<br>
# O6 F0 m/ v" D* T: Q定义如下的枚举值</p>
$ v9 C9 s& e# o  u( z5 O6 @<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;. t2 S! s7 b  x3 I; z% ]9 O1 ]
enum Message {8 w8 p$ l: p4 z& C8 j4 }. r( h
    ByeBye,8 J% v5 @3 P3 ?1 S$ Z: Q3 C
    NewJob(Job),! r- E2 L1 `5 r9 A" n
}
# {4 q- p* e+ F) U! m$ `2 d</code></pre>
3 c9 C- V1 `* t! u8 k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
* f# w9 K8 x# S) l: I<p>只剩下实现Worker和Pool的具体逻辑了。</p>0 i) ]9 P$ `; Z  c
<p>Worker的实现</p>* i7 B9 e8 ]; ?( ]5 C
<pre><code>impl Worker* F/ k& S* ^. u( Q4 t3 y# }$ ~- m
{% o; S* O5 T0 w% d3 {1 ]: F
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {5 P3 d+ {# \2 h
        let t = thread::spawn( move || {1 M. ~' }; H* V0 A; B& w' n
            loop {
5 t5 c; v. |5 U0 r                let receiver = receiver.lock().unwrap();5 t. [( [7 {( U7 d
                let message=  receiver.recv().unwrap();7 L5 p/ ^- D% m) q! v$ m5 _; |
                match message {; B$ p. B8 p3 `0 |- T  @
                    Message::NewJob(job) =&gt; {7 i' H9 j; L, o# [# l5 E- I  I
                        println!("do job from worker[{}]", id);8 f# g7 D! \4 W8 F' o" _0 m
                        job();' r8 D& g. ~3 C: M8 H/ b# |! N4 B( J
                    },' Q6 `4 I6 Q# w4 t6 a- e
                    Message::ByeBye =&gt; {
9 ^) E6 X% o( ^% i                        println!("ByeBye from worker[{}]", id);3 f7 N" P: H# J/ O' l5 N5 a+ [
                        break
5 p  y$ h' o2 A4 Z5 }! e; D( w                    },
$ x0 c& @% l6 \& B5 b% v                }  ( A% S/ \+ m5 t3 F
            }. p1 o0 B. o. k& I+ y0 [! w
        });
( h, c$ E, a& ~, S  J/ p/ A9 J" \6 i, K+ o" O. U3 n
        Worker {
! ~0 |: F1 @* ]4 t  `* \            _id: id,
+ u$ p  k7 G" y* H. H  u            t: Some(t),
& c' {* {% P3 m7 @        }) {* r: F2 R; F2 Q( G5 _
    }
: N7 C  Z% t# l: p}
! }1 I9 k+ c! ?3 c0 W</code></pre># E2 _! d# K/ l0 v: W
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
/ y. n* v% H( \. K& ~7 F: c! P但如果写成</p>
9 `( _' G9 a4 h<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
; Q5 n& q0 S) B3 N};! K! _+ r* L7 ?
</code></pre>7 U4 q, Q; q5 W
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>7 |7 U: R1 i& |) {$ b7 p
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p># y3 N* r, R  r  j8 v# m2 v
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>" G  ?& H  s, T5 L) l* z
<pre><code>impl Drop for Pool {* J+ X- I. d2 J. i
    fn drop(&amp;mut self) {; D$ m; O2 I: ^! a
        for _ in 0..self.max_workers {
, v. s9 Z: ]7 w$ \' H8 E# W& p5 ^' [+ H            self.sender.send(Message::ByeBye).unwrap();8 K4 W; B3 y  @; j- G7 {% g
        }
. T4 |/ U+ ^" X5 Q8 P        for w in self.workers.iter_mut() {
9 A# B7 K$ t/ `$ C: Y9 _            if let Some(t) = w.t.take() {
$ @+ \/ w( g5 E( h2 y                t.join().unwrap();
* Z1 q- [1 }& D1 G+ x0 [7 C4 n2 n9 t) {            }' t4 l7 k; A4 x
        }
( _7 _( @8 f4 X5 P4 O    }1 F0 h, C0 W. H) _4 x/ a' p4 w
}
: E4 T$ m  f6 c8 A; P
4 }; s4 s3 c4 a; ^</code></pre>5 m9 W" @7 s0 h' o0 y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
% h# l$ H& I% C% l. F1 h<pre><code>for w in self.workers.iter_mut() {; v) G: D5 z2 ^2 i7 R2 m
    if let Some(t) = w.t.take() {2 B  E# @# z$ M4 ~- u) S! w
        self.sender.send(Message::ByeBye).unwrap();
" j) C" W" ^9 I5 U9 h& J        t.join().unwrap();
' w/ M$ `3 I0 v5 t+ r/ J, _    }! `! l( g$ f( R+ B. w
}5 s( g, C8 h5 M' i+ [/ y

  e  n1 {% t5 @& h/ ^</code></pre>& ?5 C6 H0 V% s
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
4 V  J* ?4 j% j: i我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
! T8 c( R( `7 g" v<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>+ v8 L; y* g9 }* z% S5 X5 f
<ol>
7 h' [3 u9 o& {! j<li>t.join 需要持有t的所有权</li>! Q4 a8 d! s* |1 V9 C0 J# }+ K
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" c& e: z, V* ]5 _
</ol>
# t! ~! ?3 C# l) t) _, @<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
$ ~$ R& S& Q+ r* |+ U换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ W& k: L5 h# R
<pre><code>struct Worker where
$ g+ {2 h, R( O, u/ U{
* T$ y1 f" C& D, p5 v    _id: usize,  c( C' Y7 Z/ a, ?, \- v6 `/ d
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,( y* ?1 S% w) j+ S. ~" i, J: f
}4 x2 `6 N8 e2 v
</code></pre>
3 L0 S# E6 k2 S; {0 I+ G  `<h1 id="要点总结">要点总结</h1>7 ~; U; f: R; ]* C% k- A6 L0 W
<ul>
2 J7 M# S; ^8 j' G0 Y6 C0 R, `<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>& x* a' \% ?7 b) e! S+ ?& @
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>0 V; f& ?4 o: }" H0 C
</ul>: V* L0 L6 Y# v+ J
<h1 id="完整代码">完整代码</h1>2 s, @2 J. Y) Q# K% ?1 C
<pre><code>use std::thread::{self, JoinHandle};
+ {+ J2 ]6 u0 G0 G5 Q. nuse std::sync::{Arc, mpsc, Mutex};
7 R3 e0 \7 u1 }- {- v0 a% J, I' V8 j* P
% D/ I. v$ n6 c& \3 w/ \8 X
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;) H( `3 _  E' x! h5 G: x: x- Q
enum Message {- b# a6 \" n7 z% Q2 ]
    ByeBye,( G* f$ |8 ]7 {: y) ]7 o% R# ]# u! p
    NewJob(Job),
. _6 q+ \9 l2 w& U+ g3 `5 }}  ?% F  H' t8 g5 m: I- x- W: s/ R
5 g% z( d+ B: P; ^+ M2 h* {8 Y
struct Worker where. o8 \8 |# M: ?. \4 D/ z6 [+ I
{0 C" m, ?  R# ], K2 r2 A& ~6 _, O
    _id: usize,
5 r, f, k0 C6 V6 @2 d: C    t: Option&lt;JoinHandle&lt;()&gt;&gt;,4 R+ T( X! P' A
}
1 a. u; l  N* e( g; q" u' K" u; h$ m) ~/ @9 A. T" X: {
impl Worker
9 Y' H5 a4 ^+ W& c  B  D# @; c{+ q# F7 `9 }3 y6 |7 ~7 s
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
1 q8 {, r" d2 y" g; {        let t = thread::spawn( move || {# m  j( A6 z# w7 b$ r9 Q
            loop {3 }* A# l8 g, m6 w3 M
                let message = receiver.lock().unwrap().recv().unwrap();4 H. K( L1 i1 j8 N
                match message {
! W: `/ @4 a2 ]( J* {* ~, c                    Message::NewJob(job) =&gt; {# A0 w  J3 G6 s) W& c1 a
                        println!("do job from worker[{}]", id);
* N# M3 H0 L" N( \                        job();
, r( C( F3 V* D; b# j/ j                    },
( ~9 i+ }! l$ T: t$ Y* x                    Message::ByeBye =&gt; {$ l  U" W- y) D
                        println!("ByeBye from worker[{}]", id);) C: H9 @7 Q  T! b+ p% H8 ^
                        break
) s0 g6 }$ n% d* d: q# q                    },' _& G& F; d+ _; ?' {4 d  U, a6 v
                }  2 \& `+ R; c  g2 X& q0 s  a
            }
" K2 I/ u2 l: g- U5 Y- A9 R* W2 o        });
- g( M1 C9 k; d! b- o- P2 X3 e# F. o( R$ F3 c
        Worker {
9 ]* E0 @; a' d* E- c* s            _id: id,& b9 W% p9 j2 b  [' g; G: p4 p
            t: Some(t),
- _" b  x( ~+ w4 }. @        }
- H( G/ _; R, ?4 c. D    }2 ?7 u- e: V4 D/ `" |& u4 T! f) N* c
}
; R& e# o2 x, j; ]" g7 W! {8 ~- {7 S% _& e% s4 |/ f
pub struct Pool {% y5 @9 M4 u# P0 `
    workers: Vec&lt;Worker&gt;,6 ~: j& F& I" L7 Q( N- W0 c6 `
    max_workers: usize,
2 t1 [' |2 H4 O' l( b7 |    sender: mpsc::Sender&lt;Message&gt;
  q6 m7 R, P  Q) g}% f: t9 ?% }# U4 R: w% Q% T: x
! ~% h2 a: B* T* C5 y; D- K5 V
impl Pool where {
) q3 t3 r  t- R) V# m    pub fn new(max_workers: usize) -&gt; Pool {
, A5 M$ J# x$ Y- Z& O/ n        if max_workers == 0 {
7 O9 \2 `9 N* v            panic!("max_workers must be greater than zero!")
3 d& T* y3 j! d, [        }& O6 |3 G& U# j; g
        let (tx, rx) = mpsc::channel();
6 B+ ?4 v5 _0 W, I
. Q8 U+ J! |4 V        let mut workers = Vec::with_capacity(max_workers);
0 Y9 K3 v, f* ]# g        let receiver = Arc::new(Mutex::new(rx));4 l9 i3 I4 H& `: m5 v
        for i in 0..max_workers {$ R3 ]  D& E6 b% J& ?5 H; X
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
! k# |- a& G. S4 R; h' I9 }        }& }; _1 G- b% c- j+ U# {  p

; w4 h* r7 v( Q9 D  S7 d        Pool { workers: workers, max_workers: max_workers, sender: tx }
6 W, E9 p/ Q  t  v1 M    }
! J) x$ ^  N* p# d    + h/ h( ]) K% A; d$ p# b
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
# w) W' |  y. q* ~7 `3 x2 ^    {
8 o$ @6 [% J5 ]' `
8 t) |. R% J- [9 C% ?        let job = Message::NewJob(Box::new(f));
, y2 c, g) G2 ]- q7 s! i2 N        self.sender.send(job).unwrap();3 B( t0 P& h& C5 D: E' K# K
    }4 a: s" Z4 n6 T; X, V* O
}) @1 F7 i4 V& y, q4 e+ H. [
( W. \2 B+ t0 J2 U9 U4 H: h
impl Drop for Pool {" h+ {2 A  [+ Z$ S
    fn drop(&amp;mut self) {
; G9 O& v0 f+ Q4 e, M        for _ in 0..self.max_workers {! c/ H# ~  @2 T. M
            self.sender.send(Message::ByeBye).unwrap();
! E- w# i2 y) @( S        }2 U' J% W( z# a+ W0 J
        for w in self.workers {
8 o0 Y( r$ ^; w( T" W. @            if let Some(t) = w.t.take() {
3 d: Q, o  {1 [! c                t.join().unwrap();( @  m1 g4 g( _, t
            }
" {+ m; r! L( c+ j% Y5 w) C* w0 H) Z        }
* Z' N/ H7 q; N3 T9 }    }
+ `" {* U; v) [0 p5 k" }  o! v}* p# r# E1 L+ `9 ]' J1 Q
6 H8 N2 Z0 I& ]7 x

* m- S1 I( u+ g" u; x8 }#[cfg(test)]
  \" |2 J# U/ @) M& h" N4 T$ Zmod tests {4 l: Y$ |4 `+ y- F+ R, ~& Y; r' i
    use super::*;% ?0 i) ?6 ]- x4 S: g& b/ |1 q
    #[test]
8 v1 w8 ?+ k1 ?5 G2 ~3 N' J3 M    fn it_works() {  G8 o$ O. z& r, [2 R6 s
        let p = Pool::new(4);
& k* E! _5 Z* c! t( U) x        p.execute(|| println!("do new job1"));
3 r7 Q+ @( ^& Y3 e  ~8 p" d3 B9 M+ c        p.execute(|| println!("do new job2"));
* p6 w9 N( M5 ^4 X  \' r        p.execute(|| println!("do new job3"));
2 _4 y6 S# s1 A7 r        p.execute(|| println!("do new job4"));
9 H: b- ~& N6 r2 u6 Z2 A3 Y+ o4 ?    }
; S* \5 s4 Y- n! z1 C& Z5 B+ c}0 P6 N5 X7 D4 z9 M
</code></pre>* t4 U! ]) f0 d; U( M
7 v/ S. x7 m8 l) _
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-12 02:36 , Processed in 0.063032 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表