飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14739|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8044

主题

8132

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26462
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

6 [* ]+ b5 `, j/ b+ E+ {<h1 id="如何实现一个线程池">如何实现一个线程池</h1>. m! D, `0 t' s  O$ W
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>* ?5 E/ l+ Y- W  Y
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
+ k. Z* v4 u: Q" C1 r2 O: }! s6 [<p>线程池Pool</p>- C' |% a% _9 j  e* C% {
<pre><code>pub struct Pool {
# J9 t) C3 m. @. R  max_workers: usize, // 定义最大线程数" c, A% u3 l3 N) l6 M+ k% o/ n7 O
}
8 a  V) r/ p% L' K# M9 b
9 {- c# y: ~+ F3 s/ R$ I1 Eimpl Pool {3 K! I$ W; A) [9 a' r* [, _$ q
  fn new(max_workers: usize) -&gt; Pool {}/ {. y; h* u5 {6 z1 {
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}! S$ e% B2 f; |7 T! `
}8 Y1 v* G/ ^! Z7 f9 l2 B# i
. L. g8 a' O) x/ t" t
</code></pre>+ `& _! @5 m5 \  e: [
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 I8 ^, p- f( @/ z# j4 j<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# Q; V1 \# y& S可以看作在一个线程里不断执行获取任务并执行的Worker。</p>9 ?2 q4 g7 [5 G
<pre><code>struct Worker where
7 n! ?0 u- {6 r% X  X& h{
' [6 e* |6 y' U    _id: usize, // worker 编号) I6 l5 v4 C  V  @4 j
}
; o4 t3 }1 V/ G+ d# r! k3 e</code></pre>
- @" @' u0 s' _2 y" K<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, i8 ~8 I) n: H, ?! \* z) r
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>% A# V$ {7 \9 U1 @( h3 f
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p># F& W, H8 o. x5 V) n, f
<p>Pool的完整定义</p>
% O& `) G3 j3 \; L0 r# N: B<pre><code>pub struct Pool {
( r% p# Z6 l3 x9 E, }3 E    workers: Vec&lt;Worker&gt;,! U$ v# {1 W5 L# M9 ^) v' F
    max_workers: usize,
) E& P1 X4 m; ]* Q    sender: mpsc::Sender&lt;Message&gt;
- i5 W, ~+ L0 [; @$ E}
2 e* ^4 ?1 U; X7 i8 w5 q</code></pre>
; ~# U7 ^/ v2 E" |* V/ Y<p>该是时候定义我们要发给Worker的消息Message了<br>7 I( K- s" W! f' n# M! k
定义如下的枚举值</p>
9 @7 ]2 r4 m$ A0 n9 V4 q7 Y( {! v<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
7 \7 W# g5 U6 Senum Message {  H. w6 w, p5 X; @
    ByeBye,
8 w/ V/ p8 s# g' K6 b    NewJob(Job),' g( \0 O, l  A7 z# K
}' |5 ?% q' J2 b
</code></pre>2 C  d+ m2 B$ f% F1 X( b! ?5 j* p; Z
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 f  O3 u& O' b% f. {4 f: e<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 F  S! N6 Q% O1 v1 O$ f<p>Worker的实现</p>, ]! K' E- ?3 M# G! V# O
<pre><code>impl Worker
. a# W1 h" O9 D& ^. y" H4 x) M2 X{7 E% `$ T; h( y& b  e& ?, [4 o
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
, v0 A" n" U" }$ `! X* @        let t = thread::spawn( move || {/ W8 i6 z* f+ ?9 ~+ E/ n: @
            loop {: m0 U: `1 Z" }% d0 E6 {8 R( ?7 T
                let receiver = receiver.lock().unwrap();
( I. D* A+ i% w. ]+ O                let message=  receiver.recv().unwrap();
% j3 w0 U$ B! u/ [                match message {
  r/ R& o; x: b; J9 [9 |! s                    Message::NewJob(job) =&gt; {& O) a  k) u" C" p6 c* L* N" `
                        println!("do job from worker[{}]", id);
. Q2 M7 n5 v2 j# a                        job();% y; W0 }# Y- h! X
                    },
" F8 Y) e" l( J! k+ T                    Message::ByeBye =&gt; {
. K8 y) s$ k" N- P1 p& s! X                        println!("ByeBye from worker[{}]", id);
. P/ ]) `# M6 Y+ y7 Q' Z1 l                        break
& `4 J. o% T6 Y8 R. [7 V& K. B                    },
4 k7 Q: V5 R' \                }  
0 C0 `# D2 c) N2 T            }! _! K8 `! G# B6 ]+ Q! t
        });: z, _2 |) }4 @' q4 l

! F* G; A% L5 k/ x        Worker {4 u4 t; D& K6 S$ {! ~
            _id: id,' J  S7 h( x6 y- m$ n* @' l( f) `* x
            t: Some(t),
9 z: u! ]* R$ t  h9 N3 o! x  w: Z        }' Z- b6 q! s3 M" W: G" q$ M
    }% \% z  r2 F0 |" P
}' c3 L; d0 t+ z- M
</code></pre>
4 H+ r) l( n9 {<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
8 s% D5 T, _/ r  b9 R0 j但如果写成</p>& G1 }1 w0 @( m8 ]) u
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 o& E  N6 {# G: s
};
7 X: F  F; M! O  Q5 A/ C</code></pre>
! A- H) b, J+ Q$ z! h<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
% p  @9 F3 b3 }/ Qrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>$ H2 T  s+ l& P& G8 K, V
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
! P9 E5 i4 w/ {  z) M, }<pre><code>impl Drop for Pool {
$ i6 Z* b9 s; i" G$ O    fn drop(&amp;mut self) {
0 x& W' _  Q% m  L        for _ in 0..self.max_workers {
9 R; K/ e: y, L' E: U' `7 I            self.sender.send(Message::ByeBye).unwrap();
3 G5 U& ]7 M, R$ f+ q" Y/ _        }
( s- ]9 x' H$ s1 y        for w in self.workers.iter_mut() {
1 b7 j8 J/ c- x1 Y5 x! [& z) h3 B            if let Some(t) = w.t.take() {3 X. U2 n0 K; _- D% C) T
                t.join().unwrap();
4 Y1 b: @/ G' X( z- e, K8 ]  |. ]            }
2 _$ n4 y5 q/ ]$ Y0 e8 a% x        }
" |1 ^* P, [3 i) ?    }1 y# M+ h  r' ]; {
}
, j$ o9 ^. f: L5 |  u; P$ W" u8 u" I/ C' U
</code></pre>) ?* R/ Z; ?  K) y
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
/ y* U% {6 l+ i5 K' H% y<pre><code>for w in self.workers.iter_mut() {
* n! Y0 u1 o  R7 {% t" m$ _    if let Some(t) = w.t.take() {
) O+ f/ c+ D7 [/ V9 m        self.sender.send(Message::ByeBye).unwrap();
2 E9 X* Z/ P& @        t.join().unwrap();
; q% t9 ]# n" t    }- d  y  @( |6 @/ W  v7 e! t
}
& R8 Z  K) I2 }* N3 W- E$ d! b+ ]
9 L9 f5 G% c% T; K( P+ C</code></pre>
- \) i% F% _6 @<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>) q; G; @$ U* a1 W0 ]
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
: F* X% C# p' K. B# n<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>+ N9 u* n! Y5 S- ~
<ol>
' Y" g) L( _& Q<li>t.join 需要持有t的所有权</li>
( K" B; i) x: k<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>  `9 A7 G+ ~( G/ ?" ^9 t! l
</ol>+ C- k$ i8 }3 _% p- T
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
" Z* w8 w/ [6 U4 v6 H+ B* e8 X0 ?换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>: h8 w' A) w9 r9 j) _
<pre><code>struct Worker where/ V' _6 ]$ m$ ?" @2 o
{. X8 s; L& g4 w9 n
    _id: usize,( |9 x3 ~8 l+ c
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,/ w0 u! D0 ]3 v/ O/ x. r6 N4 A$ J
}
( s7 B: U4 ]* [2 }5 |% ~: ^7 P/ a, D</code></pre>
( Q- Z* _- q: Y3 G<h1 id="要点总结">要点总结</h1>. {8 J3 n( Y! v) b2 D7 N
<ul>
: Y; x/ v# U0 |3 W: Y1 J& r; G9 B: V<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
5 B: J( n+ Y% g- I/ o3 ]<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>% ]) b+ y% ^& `# d7 A5 W
</ul>
: T$ ^% ?* d; A* V) m<h1 id="完整代码">完整代码</h1>
( s3 s8 T) a, v1 l; Z0 \; K<pre><code>use std::thread::{self, JoinHandle};
" E+ V* a$ d( @4 H1 Z4 ouse std::sync::{Arc, mpsc, Mutex};9 R" B% ]* E; j6 y

3 R7 D( m% u+ X! T! x7 \
# s- `! ?) S! R5 `) z9 U% R5 ltype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
. V4 d5 j  L& x3 ?) Xenum Message {9 l& O# w  Z+ P2 S& Q0 [
    ByeBye,. C1 _) ]1 y/ y" h
    NewJob(Job),8 m8 O8 ~; }7 E, T
}) c* S. l. O0 c1 N8 A- \0 [
0 {' p- S: n3 w9 L0 a- {
struct Worker where
; G" t* K( k* I9 m# b7 a1 v2 ^2 d  v{
! d4 ^7 J* _: U% ^2 V* ]    _id: usize,: ^/ s7 l( b% B9 J/ M
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
5 y: E" l! n+ M/ F, J- C( ~" f}3 w/ Y0 N" i- p1 ^2 t% L

" T0 `0 @1 {# |4 Cimpl Worker1 I  i; ~& h6 M- F5 k. Q/ A
{/ S# Y" Y+ Q8 t- _# c8 n
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
3 f. o0 s3 i! m        let t = thread::spawn( move || {* m, [+ P( ~6 @! u, F4 Q0 [( I; r
            loop {
: d) J& k. N9 C5 l                let message = receiver.lock().unwrap().recv().unwrap();
) [( S( [2 q# ~& w6 G- m                match message {0 V- G- G7 R. X
                    Message::NewJob(job) =&gt; {
, i: w1 _4 m# U! f: c                        println!("do job from worker[{}]", id);
' [' ]: A) e3 R# V% q                        job();) `9 ?2 e+ q2 c9 t
                    },  j  J( F& K) q$ T
                    Message::ByeBye =&gt; {4 D5 G1 z4 m# t% M
                        println!("ByeBye from worker[{}]", id);6 ~6 g2 |1 v5 p+ U2 e
                        break
+ F$ H6 ]2 Q" D+ `+ i& b                    },1 ]8 W/ y5 Q) N' Y9 N
                }  ! {: I9 S8 F8 G: B* E
            }
* \2 {$ e: Y2 ^# k1 K! e        });
* G8 |' y' C9 g, f) r: |' A: _5 `4 R" C: Y- O3 ^, _( o$ [% A; b
        Worker {
# c; v! Y5 }/ x, D            _id: id,' {6 V5 T4 }1 a6 G
            t: Some(t),9 ^8 w! H( ?+ b+ W
        }
& C3 U  c# H' t- p9 [! S    }" z" Q  f& h3 y7 ?$ b3 V4 c
}
, T  e1 C! r/ x" n$ I+ m8 P+ G' U
0 V1 ~( J+ J1 j0 O3 ^pub struct Pool {; m# l5 @' I) l2 ^$ E
    workers: Vec&lt;Worker&gt;," Z+ i* }3 y5 @5 w' n2 _
    max_workers: usize,& ], Q2 R4 J" ~8 d
    sender: mpsc::Sender&lt;Message&gt;; ?/ N+ J" }  V* F( ?! x1 L; ^! K/ t/ S
}6 |' W% A9 F/ T* {3 O! Q

" r' i$ |4 A1 W1 F: e+ R4 h- Iimpl Pool where {5 x4 K+ f1 l  Q
    pub fn new(max_workers: usize) -&gt; Pool {) R$ x% |) D! q% |! l
        if max_workers == 0 {
+ b# T, O# Q5 `; A            panic!("max_workers must be greater than zero!")( b; F9 Y2 V4 W! b
        }
# l5 y" b2 f% @5 ^5 v        let (tx, rx) = mpsc::channel();7 o. d, a$ D+ f" C
: r& m1 O% D0 m
        let mut workers = Vec::with_capacity(max_workers);
& y$ M; q: q& ?5 u: g5 s- [        let receiver = Arc::new(Mutex::new(rx));
% N+ l- t; W. H6 n1 b4 v: e  H% A        for i in 0..max_workers {
! {; Y8 x( T- e) j3 T, x            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
8 Y# b# E* F! K! T6 K( N6 ~0 G        }
% `! r& x' J. Y
% Z- Z3 Q/ X, @        Pool { workers: workers, max_workers: max_workers, sender: tx }9 o1 B" u- L* x4 E4 G, [: E0 x
    }
# P% R2 y- w" Z( u9 u   
7 E  p' y( _/ ?7 s9 p    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send! `2 G- |7 [0 q( b  X& x/ l0 e
    {
2 o$ Q5 A' w7 ^8 Z2 N/ V+ b5 l* |" V3 A
        let job = Message::NewJob(Box::new(f));
# S/ [! ]* ^* Y3 X6 y        self.sender.send(job).unwrap();' Y( d- p  J  h
    }1 J6 \' T4 K  S6 ~0 c& l
}
7 i4 _8 l' j! G4 d  R9 c
6 {* Z! H6 q; O/ p" Rimpl Drop for Pool {
& A, B0 a9 @2 Y9 t  m    fn drop(&amp;mut self) {
0 R* T& r5 g! R& T% r- f        for _ in 0..self.max_workers {
9 j4 k% z- }) C, n            self.sender.send(Message::ByeBye).unwrap();4 \, t7 v6 \1 u3 c8 {  C
        }8 q5 k( {& k  R& h# E: ~
        for w in self.workers {. W! C& s" a' s* u4 R/ K* a
            if let Some(t) = w.t.take() {2 o/ d  m6 \' z. q" N
                t.join().unwrap();/ J* g5 r% K# O) a7 \
            }0 G7 ~' e% |6 F  D3 O
        }
( h. m3 \. U/ ^6 ]- x/ s1 @* V    }' E5 b9 Q: Z' f* a2 I% f+ Q
}+ [/ K& G- i$ v6 q4 [" L% i; @

! P7 G9 u. C) m5 _6 ~3 A+ I$ E9 J: |% E, U) W; b
#[cfg(test)]1 U5 O! o% s2 p
mod tests {# H: d+ W3 A8 L* n( \. X. o
    use super::*;, r5 t/ I1 M1 m9 m
    #[test]" F6 s% ]- _0 @' A' [9 L/ @# M+ l
    fn it_works() {
4 G1 G# }3 y  U2 m( X/ |        let p = Pool::new(4);$ u0 d4 j* |/ i( O* }! N
        p.execute(|| println!("do new job1"));8 j( |2 Q6 h7 K7 ^) b
        p.execute(|| println!("do new job2"));
  z! w" j! B" x        p.execute(|| println!("do new job3"));
% `& k6 w4 f% }1 @) a        p.execute(|| println!("do new job4"));
1 V8 `" M; ^. z6 Z5 t! h& G    }: z5 d3 I: X0 z0 \
}5 _3 u. Q' o6 |& i' q
</code></pre>
. b# K1 ]7 H2 e8 `; K) g/ S  M4 }- [3 y
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-9 17:35 , Processed in 0.066963 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表