飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14544|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8019

主题

8107

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26387
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

4 f; p& ~& ^6 C2 P- p4 j' N<h1 id="如何实现一个线程池">如何实现一个线程池</h1>' t! L  A  z8 i. C, f( q* C
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
' ?3 ~# R1 a) t! {; X! _5 W<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>; u7 U' X- b  k1 R) m
<p>线程池Pool</p>
5 l0 O( d6 d4 }! r3 d# u& P<pre><code>pub struct Pool {
0 b& o" _. D1 Y; e+ Z0 G  max_workers: usize, // 定义最大线程数
& T. y- d0 h# ?6 m6 l}
6 w% v) c9 T5 T/ M' o
2 s  w8 \. S7 ~7 ]9 r3 s& ~impl Pool {, ~' e( }* ?' r. I3 ?
  fn new(max_workers: usize) -&gt; Pool {}7 r& F5 Q7 O. B( s2 @
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
4 S0 O1 k# j4 g( `  k' p- p}
* m8 V4 m9 `5 [8 ?) t/ m* E" e4 C; c4 k0 f- I' K
</code></pre>
; e( {7 ?! S- K3 g# f+ P<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
( A' M0 _/ ~& y0 F! q<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
/ ]1 x; c& L: |+ V9 S可以看作在一个线程里不断执行获取任务并执行的Worker。</p>  W; N. q& h$ @* m9 C
<pre><code>struct Worker where
* v, W7 l4 W' d/ u2 \! t5 ]{
# L8 z! s6 n( ^( ?5 O    _id: usize, // worker 编号( N! G0 k& B) _. d2 w8 X( h
}
2 i$ S8 l  U& A) e</code></pre>
3 m8 A/ j' U# Z0 E. G<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
( m" c# ]/ e8 ^把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% }' T5 V& y; a5 F: N( B
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>0 N" ]' B+ M% v7 v# ~0 T
<p>Pool的完整定义</p>
* K+ a) J4 {  W+ v<pre><code>pub struct Pool {
9 X! t) g" J$ G5 w, u& V# k9 {# b    workers: Vec&lt;Worker&gt;,4 C% C* x0 d7 M2 p! \
    max_workers: usize,3 j3 G8 W# N4 I5 R% s6 i, T
    sender: mpsc::Sender&lt;Message&gt;
1 T1 A& |5 S+ Q& s+ u9 ^}8 m0 @$ P) M2 @* j
</code></pre>
4 _! P- ?3 }9 m0 Y/ O, ]<p>该是时候定义我们要发给Worker的消息Message了<br>
+ X7 w3 o! @) `! {- U  D! H2 k1 N定义如下的枚举值</p>
2 q1 U- y2 k. m) A$ B6 {/ S<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
! n  M( p: J, L9 Z) M6 `& J9 J5 s6 Benum Message {
3 k/ W. f; A% a1 Y: {' z# p9 E    ByeBye,
+ O+ o) s5 f8 m) o, R9 M( r    NewJob(Job),# G& m- x. n2 ^9 G* _
}
# d1 g" N1 {; @5 {</code></pre>6 j" ^0 {: \% a- Q2 l  h$ P* D& \9 @
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
: }+ p- a: S9 [' O4 L: K<p>只剩下实现Worker和Pool的具体逻辑了。</p>
2 t' T& ?  \2 W0 r4 D<p>Worker的实现</p>, q+ I& _& a! P9 C' T
<pre><code>impl Worker7 k- d/ |' e  A
{4 N/ B, w' \  e' o# E
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {* \8 j! _! f1 l6 c1 h
        let t = thread::spawn( move || {
2 Z: i  l0 U- i: H. B/ e9 F! d            loop {
3 ^, G7 S! _* d" q. y                let receiver = receiver.lock().unwrap();9 y  D) J0 f/ K) {
                let message=  receiver.recv().unwrap();# F$ F4 ]6 A5 }( l- v+ n% |2 ]" Y) l
                match message {- l+ m) J/ r5 e
                    Message::NewJob(job) =&gt; {$ A6 x" ]1 \% _" Q" p
                        println!("do job from worker[{}]", id);5 r' ]8 S1 W) q2 q
                        job();( F2 p( N/ l4 m. |
                    },& t1 k3 ?7 v# j& r
                    Message::ByeBye =&gt; {
% E  T) I5 I% a, W7 ]% E& I                        println!("ByeBye from worker[{}]", id);
: o$ T8 O$ q' A0 K) e) A+ [/ a2 i                        break3 H' S8 Z3 z& V& n. G7 }
                    },
+ V1 Q5 o) I; y% U+ U                }  / `6 p& C( f0 W0 u9 m+ E
            }
2 ?! ?8 i! x( @; H! T        });
, e; w3 L. ~7 n$ ~- y- a6 y! l+ u
        Worker {
- w) m9 G8 J! e. q( P, v            _id: id,/ u$ i. {4 D; v1 R
            t: Some(t),
6 z2 H4 D* A% H# j        }
  I" g% _3 k/ Z# M& N    }
( d: s, M0 ^( x  B}. _8 `2 b; `3 z  j5 x
</code></pre>3 b0 k8 q9 q8 G6 e" k) G
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
2 @, E8 L, r0 V& y6 r7 V但如果写成</p>; g3 h& d6 O, S! v: ]; k
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
2 k. M1 k1 V( F! ?};
' G% Y( @7 u- }, r- {</code></pre>
6 \. L' X1 d3 b3 V; A7 T<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
' N0 E0 O  ?; d: {' J/ M# Yrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
* b0 M1 W" a7 X, q. A<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>  ]' {  Y4 y2 o! p# |
<pre><code>impl Drop for Pool {7 Q. F5 ~* L4 ]3 S0 m4 C7 Z* v
    fn drop(&amp;mut self) {+ W  G2 }; D; @+ c
        for _ in 0..self.max_workers {2 n- @' M3 G3 \1 \  L5 @% D6 |
            self.sender.send(Message::ByeBye).unwrap();
: P3 V+ R) r! E+ D        }6 z' n+ E8 s7 @6 R. y) ^  W/ {
        for w in self.workers.iter_mut() {
+ ~5 o# \. H7 Z' g            if let Some(t) = w.t.take() {) M, F9 w/ F6 o% x. E; K
                t.join().unwrap();
5 P+ s8 d; N' b# _+ _  I+ {  V            }
9 `& @. a8 l) K        }1 G5 R3 [# y& T  v" q. C- x
    }, E  v( k$ |# C: {& L1 Q
}
& o, |7 Q$ B. `* p; i& I; v
2 N  F- U9 G- Z5 M1 c) M</code></pre>% h# y4 E' n7 T% ~$ i% U
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>9 K$ u" }& z2 x9 X
<pre><code>for w in self.workers.iter_mut() {
- t. H1 S. }: l$ n    if let Some(t) = w.t.take() {# A9 e  V& l. P* Z8 A
        self.sender.send(Message::ByeBye).unwrap();
: W* n  q4 @- ]6 a; `        t.join().unwrap();
$ g* i6 `  S. [! }: v% ]/ p" C    }4 _, ]/ d1 |" u+ a
}3 F" G1 n5 V  h
3 h" e% n& D( I2 s' {! [
</code></pre>
( |" h. p& u9 g% h. z9 O8 w1 m<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
" d6 O9 o) p& V2 V* D我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>$ C/ L) b& w+ }+ i# l2 v5 Z4 }
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>6 u2 a: F7 `$ L6 E' k  |5 U
<ol>
) ~8 P' m1 V1 V$ d3 d. Z& a<li>t.join 需要持有t的所有权</li>0 h( v3 t3 c5 U9 {7 m# R
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 L" D# D  o  ?% V3 @  u
</ol>- f+ G; B' V' s& y0 i) M* z8 H
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>1 V. ?( o* O' r! M
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>/ f# I$ S1 V+ \5 y; D, f
<pre><code>struct Worker where# J! @3 h+ T  T# M$ s
{
! e+ P/ F" v( u/ Y# ?% x    _id: usize,) j8 p; M  ]: s. L- M' i
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,0 m  c+ p( f* Y4 P* p1 G# F
}
3 P9 z* ~4 V( v& E9 h! K; d</code></pre>8 s" U4 }9 R) L2 [0 `( B2 d
<h1 id="要点总结">要点总结</h1>
$ \8 A8 I6 t# ?$ @( t! N<ul>
' l# d$ I2 ~- |& c<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>3 H5 y: W! s" `/ T' p2 k
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
5 p: q3 m4 L; D6 T8 d</ul>
0 C4 g8 K* ~. r1 |* U<h1 id="完整代码">完整代码</h1>. H  `5 v6 {+ ]+ o; C+ l
<pre><code>use std::thread::{self, JoinHandle};- {) b$ \& ^9 I: P, C5 B
use std::sync::{Arc, mpsc, Mutex};, l2 @/ X8 q2 i, E
- ^0 Y* N" S/ B6 Q. o0 U

  C; Y# i# j4 `8 [type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
/ h5 ?1 I( [' c: _* Y  benum Message {$ D7 o5 @5 U( ?! E  W$ s: m3 j
    ByeBye,
8 B6 Q$ ]/ p$ ]) I: |    NewJob(Job),
: U6 _1 }2 |+ t2 W, \4 T$ Z}! R" C9 D/ m6 W
2 B" W7 t$ I  _# e
struct Worker where+ A- P5 X0 c7 @& i5 I( j0 A/ N
{
& _& r  @" e: f4 L) Z# t7 D    _id: usize,' x5 P3 E0 S8 i2 c
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,3 ]$ G( v4 z, h
}
& Z) u' o) n+ l( L' v! D2 ^
/ t# B' b# ^% h. aimpl Worker/ v! ?7 D, ^! @7 `9 P
{
( L: {- Q' ]! G: f( t. [' A5 O( w4 V    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {6 ^+ ]' P* e; b# h1 d' T1 ]  i
        let t = thread::spawn( move || {$ `7 ~; z* s' K* f2 z
            loop {
" b, L' c! ]* G* k                let message = receiver.lock().unwrap().recv().unwrap();8 n- K% t( ~& M6 D7 G, b2 H
                match message {
8 j; a2 K7 r/ F                    Message::NewJob(job) =&gt; {% p, ^* C' j8 C$ k5 m0 r  Z4 ?2 B
                        println!("do job from worker[{}]", id);
" H4 X; k8 n' Z                        job();5 \, t$ }" O* }! E/ N
                    },
3 n# h0 L, W  O/ d                    Message::ByeBye =&gt; {
8 O2 ~5 B7 S; j/ t! m1 A5 d8 D                        println!("ByeBye from worker[{}]", id);
8 T( K8 A2 F+ {" ^* j9 a                        break
) _6 q0 t6 C3 P2 H                    },7 c: Z1 I5 j% f2 X7 w2 c
                }  
) ?8 u, ?% c/ f' \/ l            }5 ~8 a7 ^- g; v4 E/ |; f+ A
        });6 F; d; L* y: R) U3 b

5 w- Z" m( G( c) a- N5 ~7 ^5 {        Worker {
% ~; F% [  P: v- N) ], ?  P- b8 ~            _id: id,
+ f& W; F5 b8 |2 Y% l. v            t: Some(t),
, S6 u# a9 T) G  b: _        }/ t& X' e* @' |* G( b0 U
    }- W! v7 @2 w6 [$ d* P) \; [
}2 ^& g) s  a' Y, H$ Y7 f8 I
5 H. \7 Q6 Z! T2 m' k0 O
pub struct Pool {
6 N* K% ^# }' M/ ]- w/ w+ I/ e    workers: Vec&lt;Worker&gt;,. ~/ V" ^1 y( ?/ Q' `- O5 u1 ~0 _
    max_workers: usize,9 q: \; x, {4 M4 L
    sender: mpsc::Sender&lt;Message&gt;7 r! J3 s# h) u
}
& e9 ]6 O: u- n: P0 Z/ h5 W0 P
8 Y& ~6 G1 l. H. j9 qimpl Pool where {
% _: q/ r1 c% E) S    pub fn new(max_workers: usize) -&gt; Pool {2 v2 P6 P6 J9 E$ C" S9 s" }* G
        if max_workers == 0 {
6 M( f# \) H6 G6 b4 I            panic!("max_workers must be greater than zero!"); Z% v1 X0 F! k4 ?
        }
9 C+ H0 I& c6 E$ T; ?        let (tx, rx) = mpsc::channel();- ?' U+ j2 x: x. i
' |2 D; m  ^1 a
        let mut workers = Vec::with_capacity(max_workers);
6 [( z' d5 c; F: T8 b6 c7 B        let receiver = Arc::new(Mutex::new(rx));0 Q4 _' R0 W) t( U- l& G
        for i in 0..max_workers {: K3 E% m# T: z8 u8 P+ y
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));, F! t7 z- V( r, g7 x) [+ N
        }2 O2 z% X5 X& M# ^1 f
* [* f" g+ l" o% x3 G6 _% U
        Pool { workers: workers, max_workers: max_workers, sender: tx }
2 e# N/ I2 [" |    }
5 I4 D) D+ Y1 p3 r: C1 s7 I   
& M" C# r/ e9 H0 U6 Q$ j5 r6 w1 w+ g    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
4 X, @- L: X$ a4 q2 S/ V    {
1 G8 P+ o1 T) z: m; p! K: w8 x6 h9 V& u" i% {
        let job = Message::NewJob(Box::new(f));
5 v  F9 H1 j7 v/ r2 I3 i+ Z/ e        self.sender.send(job).unwrap();4 j' Z7 I0 r; n8 Z6 L- {
    }" c; a3 a5 s* V7 }3 W' g
}8 d8 P; p: q3 l- S; _

2 L' U( d; @! M$ l4 ~1 F: gimpl Drop for Pool {! x# c. x3 v6 A# H8 c
    fn drop(&amp;mut self) {
. ]/ y$ R/ F0 [0 _0 M6 ?+ U% @        for _ in 0..self.max_workers {
( U0 i" D/ q1 Z            self.sender.send(Message::ByeBye).unwrap();- e3 J4 t6 T8 t' h
        }+ L/ D) k3 p9 Z3 ~
        for w in self.workers {
- n: u  x8 F, O            if let Some(t) = w.t.take() {* |4 d. j* C% Y5 V) h5 s
                t.join().unwrap();
& q; _  m. P8 `0 n            }+ i! b3 Q) T+ H% B9 r9 A+ s
        }
/ z( E: z, ?) b    }, g6 z! f( [' g' z# K
}
  A* x4 C% N9 z, ~% R7 u; [
% G- X4 y6 j/ b4 |- [6 a8 b
* w/ \( r- L8 |% Z5 i% w5 ~#[cfg(test)]9 Y$ I6 x) `5 D% C, F4 y
mod tests {
6 w6 S8 m7 N8 H6 F% J    use super::*;" H7 P3 Q6 ^7 C% l. [2 U# k
    #[test]  f" h1 `0 P2 I4 B5 K" w3 q* Y  p' y
    fn it_works() {) o% G1 [# `, U, D
        let p = Pool::new(4);0 i$ @# x' Y( x- _5 n6 w
        p.execute(|| println!("do new job1"));
: u$ D3 p1 s7 X. v  U5 [0 U' ~1 u3 S        p.execute(|| println!("do new job2"));( y- ~( T. c. k' f( G3 p1 E
        p.execute(|| println!("do new job3"));
& ?4 B3 |9 B3 ^5 \( M  |        p.execute(|| println!("do new job4"));
, n% a  x: \0 ?+ r    }
$ ^- {0 J0 L- e' l# |3 \( s$ Q}' a  V7 l) {' A4 e1 S( `$ x% @
</code></pre>0 [' L; q; G" `

1 Z2 O/ w0 w: m) r7 N6 y
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-1 16:56 , Processed in 0.063522 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表