飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14498|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7992

主题

8080

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26306
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
1 v6 i1 s6 d/ d8 X% k' ?& Y
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
5 m5 [6 Y# S+ S  O" z<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>! C- C$ F* h! E; W  w7 }
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>8 r; [+ H1 |: @* t  H5 x
<p>线程池Pool</p>: A2 |3 [7 T' p# V: m6 V
<pre><code>pub struct Pool {
4 W$ L( @' I! X  max_workers: usize, // 定义最大线程数* h0 `  N5 O/ V$ A( D8 i) O: T
}$ Q8 n9 |; ]9 k
7 c+ {5 U6 y6 w; y. f
impl Pool {
- m9 X9 i; T: w* U8 ?4 j' l  fn new(max_workers: usize) -&gt; Pool {}8 D2 ?2 j) z, ~. D" M
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}# O1 w0 a' g3 `3 U% ^6 y
}
% Q. D* f( P  K' {7 l% v: W  C
( v; Q5 g% ]: T( o! G/ ^</code></pre>" j; B4 P6 i% h5 V5 k' S# \5 l
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>$ ?! t& r: w0 @) a
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>. j, i- c; r- j9 s9 T
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
# z- r1 z  |" i1 v4 Q  ~<pre><code>struct Worker where
) ?/ J# e# \/ p" U{
/ [1 r" v3 [/ u% J* Z& X1 k    _id: usize, // worker 编号
  F) v/ }! ^" G" v  ~( F}
  m: i, U3 m  U$ h: h</code></pre>0 ^$ o  s5 e3 h
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ e- G& X# A6 u, d+ k
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>, c- I% @6 `/ H# Q# }: @
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>5 [4 L' G: w; {3 r8 m7 y7 H9 v
<p>Pool的完整定义</p>
3 o# d& s- [! W( i% o/ ~. v<pre><code>pub struct Pool {
/ v4 J; P: S8 `7 V- q& J5 \    workers: Vec&lt;Worker&gt;,' Y  Z+ R! \: H& J' H9 j
    max_workers: usize,0 Q, B: @, C8 D1 A% R! g" X$ d
    sender: mpsc::Sender&lt;Message&gt;, v4 p4 A! |" p' k8 G( q
}" `1 g6 n0 _7 L, a( `- `. S( U" t% L
</code></pre># }! C. a  L/ x/ \8 i9 K* E8 f
<p>该是时候定义我们要发给Worker的消息Message了<br>
4 C1 Y5 x4 L1 N+ s定义如下的枚举值</p>
/ [+ c5 j/ {/ f% S7 E<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
* G- F1 f0 I7 x- h! Eenum Message {6 K+ L) ~$ L6 I: G0 \; M1 @  r& _
    ByeBye,
- t( n& k7 L8 m$ X% c% w    NewJob(Job),
0 M+ d) B& E( z7 k7 W) y}" m7 \0 [+ }" _" G9 T1 o1 A
</code></pre>
- i9 l0 M- G& N6 H0 v) j+ q7 U<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>6 k6 I% U% B* {
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
- U) V& e/ }1 B9 \0 @8 R. X<p>Worker的实现</p>/ F! D6 N6 P' r) O/ P$ @. U
<pre><code>impl Worker+ @# U7 _9 d' ?  ?- m
{6 o& l  p( ]$ d4 x1 F
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
0 g7 }9 x8 N. s- K& B        let t = thread::spawn( move || {
4 Q% P6 }8 |, y            loop {" {1 V& @4 l5 q' s, h: o
                let receiver = receiver.lock().unwrap();+ {. z; w- f& {+ O
                let message=  receiver.recv().unwrap();
3 ~0 P' F6 T/ t5 C+ z- M                match message {4 e: O  [5 I+ B* I/ Q( J
                    Message::NewJob(job) =&gt; {
  x' z" H3 F  {4 e                        println!("do job from worker[{}]", id);9 W1 I; f" l2 H& y
                        job();' Z9 `* K6 i8 h: F, Q. i+ x+ X
                    },
% [' @4 |& b0 k8 V& Q6 ?- U2 Y                    Message::ByeBye =&gt; {5 |" T0 L, u1 w
                        println!("ByeBye from worker[{}]", id);% S2 z9 J  l& \7 L& P( F+ c  P' y: R
                        break; N& e' {( q: i5 }. B
                    },: e+ o2 Q: H- H$ @  B- L5 U. f( |
                }  
- r, M( q* V' e) f+ f  s& ~6 t            }% c7 `3 W0 [9 L* h' [
        });6 P' u5 S0 C  L& D/ z$ R  I
( c5 I, z; e2 F% w
        Worker {: c# x/ P/ D# h! {2 q* V" @3 l% }' T
            _id: id,
; n7 s% k$ E7 D' C% S% F/ ~            t: Some(t),
# \! n, T% H. K( ^9 Z! h) D& o- f7 y        }
! v1 E& {3 p8 h' }# i    }
( F9 u# T1 ?7 z% g$ m}! o) [$ x7 e2 R" Y5 P
</code></pre>
! Q8 j/ q4 E9 w* d; B9 f<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>) @# b; a7 c) E) t
但如果写成</p>
! h% y" D7 C7 u, c6 Q4 S8 M; j<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
. O7 w$ ], R  V6 {( x' U};
0 r; \" _8 m7 _+ G</code></pre>
: s/ x$ l( _+ r  {<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
* u# S$ H, R: Q2 L' t# i8 brust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>8 G! a+ |' M' f, d( z1 L) X6 m6 b
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
# f# m  [2 R7 a3 Z( l<pre><code>impl Drop for Pool {
- i; c' i. k+ D$ H& W0 a    fn drop(&amp;mut self) {6 \0 ^& M( p/ _# A& l
        for _ in 0..self.max_workers {
* D6 r% [; z3 ?+ i8 S            self.sender.send(Message::ByeBye).unwrap();
. k( S; s5 P* h        }4 P4 z3 d  M2 x) D. a
        for w in self.workers.iter_mut() {) s! J3 \0 B! p7 S& M
            if let Some(t) = w.t.take() {
! Y. F  K# O( P! z6 Z8 y7 i                t.join().unwrap();
* S4 k  M" d  q, L/ N( {8 s            }
& ?; `# j5 h: r# H& K        }+ F1 u8 ~% K! F, i& X
    }, f0 G0 b! R8 {; I1 a- R
}
% I% J1 e* @1 [9 M! e" `9 b
1 b: n6 ]6 p, `# d- k' V</code></pre>8 L3 J- ^1 J/ w1 v- ?
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 O# W% \$ _/ o) t
<pre><code>for w in self.workers.iter_mut() {, P) M! p3 ?, g) F# P
    if let Some(t) = w.t.take() {- b5 `8 T( K' M, s0 t
        self.sender.send(Message::ByeBye).unwrap();3 h* q7 L" }3 I; I7 g1 X, G
        t.join().unwrap();  N3 r2 b& F8 g% g2 A8 r- ?
    }* |1 g0 c9 T4 D0 h  B
}
1 ~# p! o$ P: E, ^) C2 O* }7 A& r3 P6 Q7 |2 x6 N5 |  i9 d
</code></pre>+ m4 `8 s& c4 k  K9 H' H% i0 \
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
, K% ~4 ^" h+ r; ~9 [- U我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
+ y3 [( \) P* D& _" b- k<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
3 r+ P: p. u0 a6 A6 t<ol>
+ a, |% N* h9 |<li>t.join 需要持有t的所有权</li>! q# o% a% p7 q  {, j- k* F' J- U
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>9 L  ], V5 b  j7 V2 u
</ol>) Z$ @4 Q" [; ~1 O6 Z0 _5 X
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>  g! a7 C( y1 C* U9 }4 ~4 R
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>3 _# w: L' h/ N7 ]3 d: Z, t- M
<pre><code>struct Worker where
: M* E& g( f2 P% D- r{
3 N, ^4 \$ `+ H5 i' s    _id: usize,
* g, M7 r$ r( A/ \5 r, Y    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
' _( m" e+ K- z+ W0 U+ t% P}: h+ y8 A# N/ y/ F9 o
</code></pre># r* t; [! X, Z' i
<h1 id="要点总结">要点总结</h1>
% C& s) U. k' o<ul>, ?9 E; _! D4 Z9 ^8 i
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
7 [8 V' X& d+ n6 M; M$ ~* w2 D4 @& h<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
4 f- h* M' @$ w# R</ul>9 t. V+ d1 F' _- Q+ u/ `
<h1 id="完整代码">完整代码</h1>
( a0 J3 f. i( A: |' S4 O9 l<pre><code>use std::thread::{self, JoinHandle};
9 z5 m  p5 E6 [$ y; {use std::sync::{Arc, mpsc, Mutex};% v0 b2 B# U! Z5 c
, D# y3 v' f9 ?2 J) o( [' F

1 @8 Q" n  s" a, Q/ Q4 ztype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;$ I' [8 t: J* i0 D
enum Message {
# O6 g* W/ {5 W- a$ [    ByeBye,/ I1 v: ~$ C" R" m+ X) ]
    NewJob(Job),& q: u3 O5 A1 ?& `; t
}) Z2 ]$ ~7 t% B2 O8 o- i' t+ `, j

7 |: @! F  l( S8 \9 Tstruct Worker where
  K; }0 g. a2 T+ @% p{  D2 _2 V: f. u8 J' C
    _id: usize,6 A( I( V* g% W6 f" J6 v' n0 @
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,. h( j3 `( X- V* \) W4 |
}
& X7 t; B- u/ P9 x3 L6 g, Z# t
8 o; V% ]3 f) r' `$ q- R/ Q% Kimpl Worker
4 \' L, ]5 k- X- u' S! {7 ?' v6 c{& d) e$ V, [/ q7 \9 D; B& x9 x3 V
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {( [6 d7 ]1 t7 W! q6 q- m& ~# g' h6 W9 Y
        let t = thread::spawn( move || {  w& R# A& e, B  N* G
            loop {+ E$ w6 R$ h. B3 @
                let message = receiver.lock().unwrap().recv().unwrap();( l, V6 n/ c" R/ E
                match message {
$ E6 w) {+ E( r9 Q5 q                    Message::NewJob(job) =&gt; {# ^  U& i( a' @5 D
                        println!("do job from worker[{}]", id);$ I2 x+ O0 Y0 M. K2 C; _! W
                        job();
+ s4 K4 U1 i3 B8 A" M                    },$ z0 _% N* O+ l: ~1 Z
                    Message::ByeBye =&gt; {! x5 ]) z7 m7 R8 C' F
                        println!("ByeBye from worker[{}]", id);; C: ]- e& i+ m4 O' k
                        break- T- q+ i$ e& a
                    },
& }/ G$ _0 z5 J* P' V# f                }  
8 U7 D- ?5 o$ ^2 Q1 p; C+ G            }
- u0 S, W# i. @& U8 O; [0 e7 t        });
2 E7 ?1 G; l, g; _0 w# G2 W( _) r) o3 V- ]! @
        Worker {
( ^0 ~( N6 Y: `/ l4 H5 s  C: d- \$ C9 p            _id: id,
. B6 f9 V! {' J' W+ d4 T            t: Some(t)," D8 _& A. B& m
        }" T# r$ f) O$ J: T: ^
    }
: _6 r* Y# F# Z$ {6 P. n}
4 ~3 P# H3 J: ]+ X/ f2 o$ X
- x# x% C. A4 P- j+ Qpub struct Pool {9 i3 [7 j# }! w! d
    workers: Vec&lt;Worker&gt;,
5 \$ ~/ n: u& c( r2 J% Q+ q    max_workers: usize,
, {9 i1 t- f& p4 m    sender: mpsc::Sender&lt;Message&gt;
) [9 w2 w6 D% h  f" P+ e, U+ N8 j}) s' W5 F$ L2 [, r! n

% [4 r$ w/ g+ j8 u$ O  L0 ~0 Kimpl Pool where {& b+ X6 t3 V& C# J+ ^
    pub fn new(max_workers: usize) -&gt; Pool {/ w6 d0 Q% h# J" i! @9 s" v+ O
        if max_workers == 0 {% [; F( R3 j! E/ z7 w
            panic!("max_workers must be greater than zero!")
$ ]1 e8 l. [: A0 x; D( I! g- M" O        }
, [% Y) K4 N* P: e' A+ A% G& e7 B        let (tx, rx) = mpsc::channel();
& E6 j+ y, u' y  t3 h! Q5 B
2 p/ O2 |  Z6 D0 g% m# H8 r        let mut workers = Vec::with_capacity(max_workers);# M* G: L# D/ a9 [
        let receiver = Arc::new(Mutex::new(rx));
5 K) N1 Q$ Y4 g1 ?0 H6 E        for i in 0..max_workers {
1 H. S: b  O. B9 B$ y5 g3 w) l, J; ^            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));* B8 ~6 d3 @9 u6 l
        }/ c: ?( p% D3 u1 r! X# c
: s$ ^; \9 [1 }& ~" h6 d* T0 `2 q7 _
        Pool { workers: workers, max_workers: max_workers, sender: tx }/ O. e  A% q: p& U( N- V+ s3 K, N
    }: s' _" [$ u, J0 d9 J
    ! G3 w' w, F2 I
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send5 e9 z" D3 f( u6 s/ e% e
    {
- s, m( R4 D5 k  s) E  Z  n
$ t" ^7 x5 v0 L5 E8 x) d0 C, d        let job = Message::NewJob(Box::new(f));
  i3 L) H# b- ~  G% Z        self.sender.send(job).unwrap();
8 C6 {- D( X; R% _: b" R    }6 v. l" j5 `! u% a9 w7 c( k
}
) n5 Q4 \% B1 G) N8 J* C8 D6 C0 q5 w* i  t1 e" b
impl Drop for Pool {* j2 l7 a0 V% J# e- h7 c; v/ x' X
    fn drop(&amp;mut self) {; G. S* Z% z+ ?
        for _ in 0..self.max_workers {! g3 d- k  I2 t( b( s8 {% {! B
            self.sender.send(Message::ByeBye).unwrap();
# p: ^  b3 k/ G2 S& a        }5 f! T( D9 v$ s
        for w in self.workers {
( B( y1 _: K0 b9 I7 z; ?, g% u            if let Some(t) = w.t.take() {0 ]& U/ |: |  i* S# c9 B
                t.join().unwrap();
/ v: P' t& j) U$ e1 }  O# R( Y            }
0 \( L5 B# v4 H: S        }
& p3 K6 k" o( L# z' j) d    }; o$ v  X* p2 Y* s" f
}
, N1 [# b; D) y3 @  x! z# ?2 T7 ~

8 q9 s! B! z6 A& i0 D#[cfg(test)]
& N7 k( M  ^8 v( Smod tests {# L, [, w% |" Y+ G/ G+ j# O
    use super::*;
$ O. n$ F5 }$ @0 @) `    #[test]- I+ _" b/ l0 w% {4 a  [6 s# F
    fn it_works() {" R3 t0 f$ B: j0 }( Q- M' g
        let p = Pool::new(4);7 c4 F. s$ T6 d4 C; u/ E
        p.execute(|| println!("do new job1"));! E) |" ?- m: J' B1 F
        p.execute(|| println!("do new job2"));" |7 Z  Y+ ]& `, u) L/ y) p
        p.execute(|| println!("do new job3"));
- d; F+ L6 m9 `5 @7 v* Q        p.execute(|| println!("do new job4"));
3 A+ f! ?" L4 S/ o! X+ C    }0 S$ k) t2 R) E  D5 E6 r, L# F$ t, K
}: `0 \6 U8 q3 v8 z
</code></pre>- c' U0 _" s# F4 X, s0 ?
' z- l- _: p. d7 N
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-29 05:15 , Processed in 0.066867 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表