飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14720|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8042

主题

8130

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26456
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
0 D! a4 ^0 g, i* a' c( }
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
7 ]  V  d2 c2 K3 u6 `7 z<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>' {9 H: n* ~: k" |4 R' F+ e5 D
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
8 {- Z) b8 x: u4 J" s1 R# e<p>线程池Pool</p>$ u: z1 L$ h& p; v! {- n
<pre><code>pub struct Pool {
8 O8 w( }$ M0 c3 f  Q2 H  max_workers: usize, // 定义最大线程数
1 ~) ~* k1 U* K; }}
- ?% G# C! o8 Z: ]; I0 ^
& X1 q0 m3 H$ r* @impl Pool {
* S. u( Q! N* U8 F4 f. A! H  fn new(max_workers: usize) -&gt; Pool {}! W; O2 T* X* _, }
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}2 r4 y2 b5 X: ^6 [) I7 d' s. S
}
* a' f2 l. [+ b% \
% U% M( n$ a. U4 K' T& B- G</code></pre>
) s! f" k1 C3 X9 A* ~7 q<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>( n) ^: E8 p7 J
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>0 Z2 t1 [- w3 _
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>8 `& j4 r6 P6 k2 i  z% ?/ L
<pre><code>struct Worker where
4 @' y( }+ R2 N8 ], j{
& t- W" O( f" H2 a    _id: usize, // worker 编号6 B9 s+ O, ~" }$ D" `- {
}8 }* D7 s: [( Q1 j4 q
</code></pre>
' {& N& v' s+ w0 S7 t' {5 F9 h# K<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>* j/ ?5 T3 K) P
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>; Y9 @# b! }# G2 b' {* b7 I) `
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
2 V1 I4 u' |! ^' t  _' V4 S<p>Pool的完整定义</p>: `% z) Q, Y+ Q0 e
<pre><code>pub struct Pool {* s' p! D2 z9 N* i! M7 Z% ?) S/ {
    workers: Vec&lt;Worker&gt;,1 ~; R/ q2 O# s. O& A
    max_workers: usize,
" @; t5 ?; f$ b    sender: mpsc::Sender&lt;Message&gt;
& ~2 n. c$ g6 n! e- t) s+ }2 K& x}
4 Z- ~- l# P- J5 ]# [( Q</code></pre>& ~9 h, d+ X6 i+ x& x5 \9 ~
<p>该是时候定义我们要发给Worker的消息Message了<br>
3 P2 h1 v2 k* R7 U) C9 K* {& Q0 K定义如下的枚举值</p>
5 Z2 I$ u) _0 R<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;7 G8 i  E# X( h. D( n8 e  s
enum Message {" Y& r0 p5 y: y1 f( X
    ByeBye,
4 P: X0 Z. F, ?- ?. n4 J    NewJob(Job),
- d& G6 Y2 O. M5 a}: w1 u% l# i0 E5 E
</code></pre>- T( Y& Z; _, e% U" I$ l
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>, k( I; M( j! |6 s% h
<p>只剩下实现Worker和Pool的具体逻辑了。</p>% U, y- W  ?/ o1 K0 A/ s" J, C! G
<p>Worker的实现</p>+ w+ e/ m  h/ S& }* P4 T
<pre><code>impl Worker
5 \4 j+ t  z( n{
7 B4 J/ _* Q- y    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
  D( v; f: \9 z! s        let t = thread::spawn( move || {
& S# w8 b7 T/ a0 ^' r            loop {% K& u5 z/ w6 r+ a& u$ f
                let receiver = receiver.lock().unwrap();
7 h& b1 G, O% y" P                let message=  receiver.recv().unwrap();& q# `* a  D8 x7 X. k
                match message {
  I) f: V$ v1 E* B" y                    Message::NewJob(job) =&gt; {1 l7 W" ?9 }# t$ l  _/ A
                        println!("do job from worker[{}]", id);4 u, g% }  B" c% \+ b  s( l
                        job();- W$ N2 l& O% |- i
                    },
( y+ ]* N# {& o9 d# L) T: t                    Message::ByeBye =&gt; {
1 d* ^$ @, `( f0 V/ h                        println!("ByeBye from worker[{}]", id);
  e) A7 a9 ~5 f9 l1 m1 I: n                        break: q  ^) x7 {1 Q1 r" W
                    },/ k5 G3 ?2 F% g
                }  
: b& a# p8 C" T, r0 k( I' I( c            }
- a' X( g6 D. C8 F& J* H: C        });- \5 M  O5 g2 D4 @. ~8 a

( V, X4 ^2 s9 ^( J        Worker {4 y8 Y3 x: ?& m  Y2 V4 G* ]
            _id: id,
2 f" n' Z. v; ~# h+ A            t: Some(t),3 I( m! c, B- I2 A% V  `: W+ Z) ]# j
        }, F( B' P. x; U; U4 J* x/ D6 g
    }; R" X7 d6 F$ ]
}
$ C  x# U6 _8 ^. ^, n0 U</code></pre>
' S" L( l4 l& K6 ?7 g<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>2 a' ^9 i, o8 p# u
但如果写成</p>( m8 U6 Z: }0 X, F" u) S
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
$ ^% y6 |9 v+ l! Q) [2 ~( Z};  \! N: K( f: E; N( Z4 W0 F4 L
</code></pre>" A2 y- B6 J# S9 ]
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
# D! w  C* [$ ?5 ~/ Urust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
& j' X* [7 n3 f( W4 l) T<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
8 N4 }: R! h  N0 F" x  {. m<pre><code>impl Drop for Pool {* w6 e! z4 n' ^- W- [
    fn drop(&amp;mut self) {
6 ?3 d4 m+ c& `4 X        for _ in 0..self.max_workers {
* h* D4 [& ~. |1 L            self.sender.send(Message::ByeBye).unwrap();
# [2 ?5 {$ c( ]" a9 ?& ~' E        }; x6 R  C) I/ j; k! E5 Z2 W
        for w in self.workers.iter_mut() {- _' G* ^( {% W
            if let Some(t) = w.t.take() {2 _. [6 D1 s* Z/ n# f, C. `
                t.join().unwrap();+ j& N6 S4 e& j# T
            }2 f5 L) F1 y, S# {, R
        }
. w1 [4 ^+ \: o  b) e' Y) M' `    }
% ^/ P1 y2 H, q3 u}+ ~# Q2 Y% [' H+ N2 D5 m

0 x! b, _1 M  I# `' S</code></pre>/ y6 R* _) D, M3 y/ x* H+ Q' x0 I6 ]
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
2 z! [7 p0 k3 ^<pre><code>for w in self.workers.iter_mut() {
# H0 y8 @' y! S6 W+ R) u! {9 |    if let Some(t) = w.t.take() {
# y. `) N9 |, e- C6 O        self.sender.send(Message::ByeBye).unwrap();* j5 Y$ c2 f8 m) ^
        t.join().unwrap();
, |: |9 K5 K  H4 T6 |# ?    }$ H; c6 b) ^. {' W
}7 ~2 Y7 p# d  \

7 B( m# [3 O, W( k, E3 c* N' U1 {</code></pre>4 Z- `% c/ X0 @1 I
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
; G- z; n/ j/ L6 q我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
3 x7 `) R1 W( Y8 ^* a. Q" }$ p<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>- w1 ~! v7 \+ n: P
<ol>! T+ v% h! Q4 z" [, k( @! E7 V$ [* G
<li>t.join 需要持有t的所有权</li>8 Z: y. n+ Z+ w! n' p
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>- E6 f! J* W5 J, {+ L
</ol>
) _" x1 A8 q4 E* P' @6 g6 d<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
2 W. d' F, K& D' t" P: e% ~9 I换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
* w' u# c+ _& T, h2 x  k0 Y+ ?<pre><code>struct Worker where
+ q0 {2 F8 Y2 ?. @! j, I{5 Z% h3 W8 _+ ~; z  O
    _id: usize,
, Y( E/ P  A* O6 A( j& l    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
0 w  @% d. o& j% J  F1 w! W+ p}" U, g& H8 M& o, j
</code></pre>
: k5 B' A* ]3 t/ a<h1 id="要点总结">要点总结</h1>
2 Q! u' X5 W7 l: t6 Y% U3 f<ul>
) L4 y9 C0 u; U3 t<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
3 j3 |& T. c" e4 O3 ^, p<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>; q8 T& `% o& @- E  D% I7 D& n
</ul>
* L: g8 Y1 m7 ?- j" j. ^9 p5 {<h1 id="完整代码">完整代码</h1>
2 V0 H( C0 V/ K<pre><code>use std::thread::{self, JoinHandle};
' X& s/ v5 V% O* _! j, buse std::sync::{Arc, mpsc, Mutex};
$ T7 c& J0 S5 P' U7 A0 ~
- R- ^# S  M" z& l& \5 I" X2 J, s! T- W. Y: {- i: C, d! `" |2 W
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
( b% K/ ^# }# V3 D$ t, [- genum Message {
7 a1 A' t2 }$ f& _0 l# ~    ByeBye,( d9 H2 s6 j0 h2 s7 K9 ~
    NewJob(Job),
: a, E# x% t! A+ c}
+ g. Q, t) l8 i3 L
: ~3 }" ?7 @( E6 |% t& Sstruct Worker where) O6 G# C! u2 L7 q
{
  T+ s& t( _% o8 R% l+ i" z    _id: usize,8 ]/ o  ]) w2 K+ Q2 I
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,& N& P* w( T; L( e, _
}
, A) Y* h7 z1 S4 C+ ~& H
' f2 M& T. N4 X7 a' yimpl Worker
7 _1 l: z7 K' k! }{/ ]4 ~# ?2 y2 s: U1 i4 V% {9 N# C4 g
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
  G% p5 L1 Q! q( I8 `+ V; K7 Z0 k' x        let t = thread::spawn( move || {! V: e+ @- Z1 d+ C: T
            loop {" E8 o1 w4 t" I9 m$ ]8 r
                let message = receiver.lock().unwrap().recv().unwrap();
$ L7 ^6 e$ P' V5 Y0 q1 g( T                match message {
. ^3 i1 V+ S4 z. a& |                    Message::NewJob(job) =&gt; {/ `7 r! i5 Q1 y) F7 v& M
                        println!("do job from worker[{}]", id);" u0 @9 S$ T, X
                        job();- c5 B3 I: r* U" Q
                    },6 g4 r- s; C' P4 X) u3 @5 o3 V; T
                    Message::ByeBye =&gt; {* _& R- O, b, Y. X' K) f- ?; h  j
                        println!("ByeBye from worker[{}]", id);
1 h0 c5 z/ [6 {$ Z) t                        break
4 J, v- k+ m8 G6 S# M/ ^7 ^                    },
: l* r) j1 k$ r                }  
9 c$ k9 ]/ e* Q/ e$ \# I. a% k0 k6 k            }, O# X$ [" W0 f
        });
" M8 _  k; f+ \1 U5 [) Y  H# a
6 [' s7 }, U+ |        Worker {3 O" x8 Q+ V' a9 H$ u# Q# Y
            _id: id,' C) o) G3 H: B  N
            t: Some(t),
6 p9 x2 F9 j" X4 n. h# \7 \        }
: }. u# A* C# b0 X5 `1 v, V    }5 w, b$ E/ ]# z% ~& Y
}* R4 F+ I3 v/ E  u: u, A$ t
+ B. \7 {, D# }" z) f
pub struct Pool {
! `2 B# y9 J* j4 n2 c    workers: Vec&lt;Worker&gt;,1 B% x8 ^. _6 N  t; [& J$ a
    max_workers: usize,8 y8 b5 n, L4 w. U  |. o( b
    sender: mpsc::Sender&lt;Message&gt;( w, K5 S4 e0 F8 H% E  o
}$ f9 k: \7 ^' X* L" k- z
7 ]4 F* ~# k4 G: f! y( Q
impl Pool where {
, H$ L) }- S: \! Z    pub fn new(max_workers: usize) -&gt; Pool {. h* }/ X+ X1 m) ^  i% W
        if max_workers == 0 {
, O7 T4 w4 Q/ O; i5 j; ?* R            panic!("max_workers must be greater than zero!")7 F+ A# y/ D: {, _. F
        }: ]  Z9 B5 l/ y  I
        let (tx, rx) = mpsc::channel();
6 J# N/ r9 b; T  c' C" F$ C
4 d0 L6 z. }: s: z0 ]        let mut workers = Vec::with_capacity(max_workers);# i, c$ g9 \; r, E
        let receiver = Arc::new(Mutex::new(rx));$ J5 m4 {4 n4 d" t9 a
        for i in 0..max_workers {
2 |$ X& p: _. d- Y2 X            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
" e' M1 _% ]' j        }
% ?5 w  u* @' Y) u% e
' Q; o% s7 o8 t1 k& v3 X$ x        Pool { workers: workers, max_workers: max_workers, sender: tx }7 }2 w" }0 ?  D* Y& [  F4 @5 Q+ I
    }/ m1 u/ f" D+ ?0 G. k+ [5 X! k- K$ C
    2 @9 `" }2 [" s1 U9 G
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
+ n$ w2 w: p: l6 h    {: K! c6 t6 C, {1 d; Q4 `6 Q

1 r$ V# R2 E% N        let job = Message::NewJob(Box::new(f));9 m3 V6 V+ I) _( }: Z( _
        self.sender.send(job).unwrap();( Y9 n# z' o/ E! L% H
    }  f4 m. C, g# W8 E* I
}% z+ \  B, Z/ {# x5 k

9 h" ~8 f( g; x. b3 H% C* N8 k2 ]impl Drop for Pool {$ g( _% i2 P. T* [1 `/ _3 r: z
    fn drop(&amp;mut self) {
7 f4 L, s1 i5 v! e        for _ in 0..self.max_workers {
, ^+ b$ y. s' Q$ S/ t& B            self.sender.send(Message::ByeBye).unwrap();
  \( {# T! ^5 b        }! ~: v* {6 f6 z) c2 R
        for w in self.workers {$ o! {5 ]3 [+ l# v& p* v$ y
            if let Some(t) = w.t.take() {
$ J6 @" O: w  A* {                t.join().unwrap();1 e$ ^, h& h* l! x6 l4 L, I
            }7 U( S/ t4 t! d: U2 f' j
        }, N0 f5 p1 z' Q2 V4 e
    }+ e9 b1 }# Y8 q) w5 S- g
}
2 U% G9 l: ~% E& P! i1 R/ t* z: ?* h' N# ]1 I
$ D; x& E3 h$ U$ s
#[cfg(test)]4 N* U, W% `  ?2 p7 ^1 c7 K. G
mod tests {
# C: a) H4 S: G- m1 I    use super::*;$ {4 X% x' j1 b7 d" w9 z6 N4 C( O
    #[test]! D  ~0 b; V8 I* @7 a, m. S
    fn it_works() {
/ a! B- Y6 y# ]! v0 z0 O' v        let p = Pool::new(4);* t8 i, K9 V0 q2 h, y; ?' G/ r* g
        p.execute(|| println!("do new job1"));+ o+ x: f2 S9 v+ @3 ]
        p.execute(|| println!("do new job2"));
. B9 D: X" B- `3 F  Y3 `. R+ K        p.execute(|| println!("do new job3"));3 v, O0 B+ k1 I0 T% F- {- q
        p.execute(|| println!("do new job4"));9 J- M- g) w2 z6 K  P$ r
    }! j% v$ z$ |- _" Z8 Q! b1 O- G! c9 n
}
( L- l/ O3 _" t" N; M$ s$ t; x</code></pre>) b7 b, A6 j; C' L; a! V2 M& ]

% h. _+ }" o5 s
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-9 02:54 , Processed in 0.067155 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表