飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14890|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8062

主题

8150

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26516
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
5 X7 m" ]7 `! m! \- e
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>* J% o0 V5 @/ D" y
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>/ {3 p8 s& S- ]4 u1 |
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>9 G$ {1 [5 S4 ?: h
<p>线程池Pool</p>" _% t/ e# c. B: ~
<pre><code>pub struct Pool {
4 y- g+ x' k& J% A  max_workers: usize, // 定义最大线程数* X+ I- K0 e2 n! f2 z5 O9 k; k2 \
}
  R' T, [3 \" s# c: b! ~5 Z- W  x
4 D: V  q/ B; ^0 T. ?8 }impl Pool {+ I. W. G9 E/ b# @: T6 z
  fn new(max_workers: usize) -&gt; Pool {}
  Q9 ^! S4 |6 n$ E1 F  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
- k- j$ G* z# l+ u* b. a2 K' `( w}# [' P0 ^3 P' G7 r+ E( v. z

2 u# w  S4 v: J/ Z6 F2 ?& v</code></pre># p) @7 u# H5 D/ J
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
7 O, z# M1 D7 R* _4 y3 u<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
' Q  k1 E5 G! _可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% X2 j6 y5 ?: _; ~3 u; z<pre><code>struct Worker where& @5 t: b0 O1 g% M$ Z% m0 {; d
{0 Y0 h; b$ }. i8 M& v" J, x% B  [
    _id: usize, // worker 编号
8 |; E; d& Y4 _}
( @. i. E6 L  c</code></pre>
/ A' v( m; b, W9 T$ Z<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>4 @: h/ f3 Q/ [" N6 R8 g0 P
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>/ j8 m) q2 A) }( Z' P: A
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>9 p, b' }- y( a9 f
<p>Pool的完整定义</p>
& d6 f3 c2 ~! s6 }$ v<pre><code>pub struct Pool {5 K- R! W$ z: |$ Y& e$ P+ M3 m7 U
    workers: Vec&lt;Worker&gt;,
* X3 U- R% C3 \/ g5 J1 ]    max_workers: usize,/ m6 H9 h- ?7 I0 D1 t& O
    sender: mpsc::Sender&lt;Message&gt;
0 G2 ^( U( Z- X, O3 h  U}' h% ]7 v8 r. A: t. L8 }8 O5 W$ ]. I0 K
</code></pre>
/ k: W" j0 ^. d4 I9 u" B) ?  C2 m<p>该是时候定义我们要发给Worker的消息Message了<br>/ d- p6 @# z+ _0 b. S4 ?* j
定义如下的枚举值</p>
9 }- V5 K# v, j0 y" I6 F! b<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
3 o0 Y" z! i. j  [, E% `7 ]enum Message {. v# I- _( t) I; E4 ?
    ByeBye," D# F2 o( j1 @$ @' v
    NewJob(Job),
) S, k$ ^4 s# W8 A$ e, F- y& U. z8 d}
( p( ^$ R8 g- B+ Z</code></pre>  ^8 e$ w& n' K# i; M
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
( C5 Q6 ]2 M% D9 R# `( }# Y6 b<p>只剩下实现Worker和Pool的具体逻辑了。</p>) ~. P5 ~, J8 m- e- b
<p>Worker的实现</p>
' m- c& ?$ F# r( l1 R) W: ?9 y/ n<pre><code>impl Worker, S& t& L/ I) n1 X1 Q' X9 B
{8 g6 a( `& J, [5 L$ e8 J
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
" O# P& w: W$ }% M$ `2 _# n        let t = thread::spawn( move || {
. V9 C( Z! w" P1 W8 {# E; J# D$ {            loop {
3 v8 b3 [/ O" h. d# D                let receiver = receiver.lock().unwrap();4 w4 `. p8 h. M' q; r2 d
                let message=  receiver.recv().unwrap();
4 d/ z4 z* E/ U: ]3 D6 w                match message {
8 U9 ^- F3 x/ \: a' S" L) s3 }' ?                    Message::NewJob(job) =&gt; {  W8 u1 i; k, l, e4 ~% _. {9 y% l1 \
                        println!("do job from worker[{}]", id);
& z7 b4 w8 f' y$ |) G                        job();7 ~+ d4 U" i8 ~2 C" ^0 T
                    },
/ s7 i# E8 x  Z' M% c                    Message::ByeBye =&gt; {* h) d/ g  |7 x" L$ A* |0 w* n3 S- v
                        println!("ByeBye from worker[{}]", id);
8 a4 ?6 i' ?4 j* W" k4 g                        break
2 u8 u! A% ~2 c3 m: z4 t                    },
9 H" S1 ~2 a2 U                }  
7 ^+ S) P4 [) y1 d, w            }% _1 @6 T$ \' p7 k# @* e) j9 v3 ^
        });* [2 f, J0 h. W/ L
2 Y& T% F7 H4 O0 |* |/ R
        Worker {
6 T7 N& S: r9 ~6 g2 [            _id: id,
6 u) ~( e$ s) J4 m) X            t: Some(t),
; l* f: E+ W% R4 `        }7 p% O5 ]+ E  x5 k7 @" i1 l
    }
* {/ h4 e8 [. U7 q' H  {}7 h% A& |8 |- S! ^) g+ U
</code></pre>
5 k0 N7 |5 k5 V; [<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>) D/ ^3 j% Z( \+ c3 M
但如果写成</p>6 ~) @. V, }1 z  y5 ]
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
  z2 E) j& h' n) l- z};: u$ G7 i/ {% S1 \
</code></pre>
& T* m0 \  k1 t<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, u- F3 c- Q9 U- P
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
9 P% f! l, u/ N, v' E; n<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 k% F9 o5 e0 H<pre><code>impl Drop for Pool {8 n: k$ Z2 v9 |+ z
    fn drop(&amp;mut self) {
3 _1 R7 b2 n8 I  h" C        for _ in 0..self.max_workers {! X8 T& I" ~8 M' P6 `0 ~
            self.sender.send(Message::ByeBye).unwrap();% r7 H* N3 i# t8 L0 C6 g; G1 u
        }
% e  \' U. m8 e* e/ _# d# k" B        for w in self.workers.iter_mut() {, ~5 J+ P' K8 }' b- U0 T+ t3 d
            if let Some(t) = w.t.take() {
- b0 R8 v) E7 I                t.join().unwrap();9 w! v' ~$ Z" x! P+ R* a' g
            }
/ w6 h) Y5 S, W0 B2 {# l5 W        }1 y4 o9 n% o! D- U- A* F) J
    }: o3 i, X( x8 Y) T; _; E/ p
}7 w6 @. O3 w: o* b! |
2 a$ r* J- X: l; b9 B  o& C$ k
</code></pre>
6 Q, X2 K. o* N8 P: R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>4 [5 I) }7 m" k+ i& W4 b3 Z( l
<pre><code>for w in self.workers.iter_mut() {
) F  c: c' D: A& n7 X$ \1 V/ g; n    if let Some(t) = w.t.take() {
/ p$ f+ T# E3 C- o4 `8 E        self.sender.send(Message::ByeBye).unwrap();6 P1 y2 J9 E6 l4 Q/ g
        t.join().unwrap();
- p% r& E! m' T* Z9 ]) |, a    }
& g" {- K+ o; a' \3 {8 B, W4 ^}$ X0 d+ Q2 G9 H

3 W* [) y% C9 n' u) s0 T</code></pre>
1 t! n# y5 |/ y% D4 s& z6 A<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
8 C% x# |4 }5 J; `我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>; e( e! t- {1 {0 k
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
0 }1 V& H( K% P- h0 e& N8 d( k<ol>
, s3 z) F8 T, L2 R<li>t.join 需要持有t的所有权</li>
" X4 k$ U5 Z5 a<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
2 _' f0 R, J: c) R. I</ol>+ s; z" y7 e: K3 @- T! f: i
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>5 |; F4 K% r5 v2 l. }2 z4 z5 A$ e
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
) ^, [& k. A. x! p; }9 X5 a& {7 b<pre><code>struct Worker where8 ^/ c; G- E: ]' ?8 H
{6 w& [! v) r$ d* Q  k
    _id: usize,+ o) B; N3 d! l& N6 O
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
5 z# R7 e" V( w8 `4 b' g, N0 v) C) X}
- g1 t: L% ?  N4 j; Q</code></pre>4 t- Q$ e; ]8 ?' H( `
<h1 id="要点总结">要点总结</h1>  `, j: s* W+ \, y( P" F
<ul>" e. Q$ x  ^1 A3 u' v
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>4 N7 j+ l  Z! p$ N5 @
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>. ?7 r, k$ d$ R( h1 T4 Y
</ul>
+ y' T' }& _/ q* m$ C<h1 id="完整代码">完整代码</h1>
1 j9 ]8 a( F2 c, e2 \" q<pre><code>use std::thread::{self, JoinHandle};% h$ o5 A. X! g  g. \
use std::sync::{Arc, mpsc, Mutex};5 c3 X$ F7 W  O( }  O' F
4 |: Y' k% U; P; t( I6 p, l
. m! F( W8 s( z; T
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;' z2 i5 Y/ V: F! K& X. M4 `% V
enum Message {
5 ?; w$ i- o" Q+ Y; t    ByeBye,3 z, ]5 e* c  f* g' p! o6 S1 x0 a
    NewJob(Job),7 j4 k) R( d* s  _/ B1 `
}
. Z9 F" ^% Z2 k
- e- o( B, E1 b) M. M/ rstruct Worker where
0 f1 K3 g  K3 J{) ?7 y+ i# {$ p: W! R% m) h
    _id: usize,6 J: S3 o* [! k3 Q: h
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
# r1 H8 T/ O/ l: I! H, h* T2 L  S}
, G  q4 C' S; }% Y" ~  H' s8 O: p! g0 D% G. |/ P
impl Worker% }2 ~! K$ O4 c: l
{
+ z/ D0 o+ s  |! F4 w/ L) j    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
' h$ l) Z  X  p# |        let t = thread::spawn( move || {5 ]" J+ z1 o* k9 f. }
            loop {( I2 z2 o; d. n: i
                let message = receiver.lock().unwrap().recv().unwrap();
7 C$ b( x& {% O6 J1 u" J1 ?: k                match message {
7 N" n/ n* c+ w1 s0 `8 y" q                    Message::NewJob(job) =&gt; {
: ^6 Y$ N# J$ u+ i7 c" g                        println!("do job from worker[{}]", id);+ V! K$ m0 O- \# M9 m4 k
                        job();
1 }3 n& S1 W0 g! E$ d. @6 P. G                    },
8 v% b* \5 r2 }                    Message::ByeBye =&gt; {: h, [1 {$ R$ S. X2 Y  A: a
                        println!("ByeBye from worker[{}]", id);
, h% f4 ]* S+ ~5 m. ^9 k                        break
. z; n% X, J( W8 a- N2 |7 O                    },7 {$ P/ N& d& ?& _! ]
                }  
2 K6 l, W; i- t" H  Y; \% ^) S            }
7 ^! v! K4 T% @/ L& N* G2 e) `5 i        });' A. m0 ]( p3 [. s! D7 l
% Y; v1 E, @6 J
        Worker {1 ~: G/ ~) q" `; L
            _id: id,
2 o5 l% R+ ^5 i, P$ Z8 a            t: Some(t),
0 g% \  H8 ?3 T& V- B: r        }2 S1 v; h) O$ S# J4 L
    }
6 ?: f/ _/ a' _! Z}3 J8 C# l9 s" J+ _! r4 g

! h2 P5 z$ r8 n+ y5 ^9 Z" Ipub struct Pool {" X# F9 {( o4 t  ?- C. w
    workers: Vec&lt;Worker&gt;,
/ P# L* L. q5 t2 l6 G9 Y    max_workers: usize,; s! k( F1 }* u% J- f# Y( L
    sender: mpsc::Sender&lt;Message&gt;; ~! [  V$ Z/ A) j% m, j
}! K$ T# O  ?8 X4 I6 |$ c

6 y# W* i' j; q; K. _2 Zimpl Pool where {
0 C. }! i, d, C  q+ m5 i6 v) h    pub fn new(max_workers: usize) -&gt; Pool {
. Y/ U& a! y+ ]( C) H9 J* _        if max_workers == 0 {' [0 U  S$ H* c) E
            panic!("max_workers must be greater than zero!")8 K* X" i4 H7 i' B/ B! O8 J# O* U
        }* h4 L* C$ R% B; Z3 i1 y: h
        let (tx, rx) = mpsc::channel();: ]  b  l9 T; t+ R! \7 R4 K9 H$ i3 ~4 n
( q$ m' u9 b! M. G, Z
        let mut workers = Vec::with_capacity(max_workers);" H8 `& e3 a8 L& }& R8 V
        let receiver = Arc::new(Mutex::new(rx));1 s& v2 B; k% J# k
        for i in 0..max_workers {$ Q$ _: L0 m6 `
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
& J; m* h  O3 m# v; ]) R, `" V        }, _8 A  m5 M* A; b8 |
' I3 m; L) v: Q/ C4 Y& J' P
        Pool { workers: workers, max_workers: max_workers, sender: tx }) d& @3 [$ o/ _% c% q" a
    }
* @. W& m8 V2 a$ s5 f( V    + m, Q& `3 ~7 x. W' e( k- x
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
- R- N( A, J3 @/ w! Y* }* y* A    {- [0 {% K4 i6 j6 [8 T

+ _/ B3 ~2 o1 g# i0 E/ H        let job = Message::NewJob(Box::new(f));
0 S) p# P# k' o: K) ]+ t        self.sender.send(job).unwrap();
8 ~4 l- d" A, O' B! p$ e    }
& ~) W" S6 T( w$ W- B! @}
. R, p. |* [( E! o+ e" W2 H
# w: c- ^3 @2 x* C/ b; S8 {* Qimpl Drop for Pool {
+ y' m8 j7 b, p2 a! q0 e4 P    fn drop(&amp;mut self) {
: }# `" c9 E& C1 h        for _ in 0..self.max_workers {/ ~8 x- k: `5 B2 Q: A
            self.sender.send(Message::ByeBye).unwrap();
2 n" l  V4 L" h/ y5 I$ m        }, J8 G& q; A$ u
        for w in self.workers {
9 e" g# K  x1 G( p! q            if let Some(t) = w.t.take() {
9 I2 o4 v% B- n1 ^5 P                t.join().unwrap();0 z: b  T* ~9 u) v( V
            }  c' N4 ~/ k) h, d  q) t
        }" i! t+ c0 A) L7 m
    }5 @, L" q7 a; \. X/ [
}
5 U- u7 l3 `% y- w' R4 c( {% D. E% H3 E

/ W! Z# p- t/ C#[cfg(test)]! c- v0 U6 Q6 a% y( w  e
mod tests {
' {3 Z0 |) y# _, k+ p* c7 P    use super::*;
5 D' H. R" V2 }0 G( i; W0 |4 m    #[test]  Y# I+ N& x# ]# F$ i
    fn it_works() {7 R6 _7 D/ U& ]4 b
        let p = Pool::new(4);
- h( [- Y4 Q. y$ j5 C        p.execute(|| println!("do new job1"));
8 i1 w. ~9 E# M9 @0 o        p.execute(|| println!("do new job2"));
4 [8 A/ w9 M  a/ C9 @2 v        p.execute(|| println!("do new job3"));
. A; I5 A7 |5 f* b) _        p.execute(|| println!("do new job4"));7 J8 U( x8 n# S! H8 n! e
    }& N' G$ ?8 P/ x
}9 G0 {5 \1 w4 i# g& k
</code></pre>
: m8 q. O2 v- U! P5 f" Q8 W4 q- w% K0 [* z/ E7 f+ i
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-16 13:31 , Processed in 0.073059 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表