飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14955|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8114

主题

8202

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26672
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
0 e+ X$ l9 l, F. c' k$ Q
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>+ D( B8 o: i7 R: E+ Y1 C
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
) N1 b* f5 ]' c% i# }2 r<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
. _( j/ F/ |, r3 y" p6 i2 t' c; r<p>线程池Pool</p># U" a9 N5 c. Z' x( Y
<pre><code>pub struct Pool {6 r4 U/ W  J" S( i
  max_workers: usize, // 定义最大线程数/ H% e$ E* }. C3 [$ ~
}
' c5 l/ \$ U' V* h) N, d- B$ Z4 ~& M) L" L
impl Pool {
5 h$ Q/ t" n3 b8 v; `  fn new(max_workers: usize) -&gt; Pool {}
# |0 F. C) N& p7 s  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
2 V! `, M* h( u& r  D}
# j+ w& E3 i2 H
7 d& |5 z3 L. `8 V</code></pre>
: p' Y6 t% C. p! u4 O' x/ G<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
! \6 C! x! N2 e% m' v% B<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>$ Y* F9 F5 e6 q' r7 C2 M7 L/ P
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
- v0 L; Y; b5 T* |3 e/ ]/ H<pre><code>struct Worker where) L" U4 e, d+ K0 m: f* p" {
{
! ~# u  Z+ R0 f( A! B$ {! N    _id: usize, // worker 编号
" F( [# z& e0 _2 `4 D% ^# a}
: W' N- W2 G  I</code></pre>
9 Q. o+ K0 W/ d$ p' l  J<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br># ?' |7 a; L8 c$ k8 J# p' J- P
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
$ [# Q& _! l! a6 y<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
; w, |- f; X  k% w8 L; m8 N<p>Pool的完整定义</p>
0 u2 A4 _- S( h+ p/ u<pre><code>pub struct Pool {
7 i$ B/ X& {  t8 ]) D8 k! v5 O$ p6 Q    workers: Vec&lt;Worker&gt;," z. v. n% }+ e, M6 T" E
    max_workers: usize,
- S  K7 J8 @  B" K' Z, V+ B" r    sender: mpsc::Sender&lt;Message&gt;
: L& q* u4 s9 E5 ?9 R}
$ n9 M  @3 N. `: M2 ?# K  L</code></pre>
2 x: Z! g7 J/ C9 g<p>该是时候定义我们要发给Worker的消息Message了<br>' O* V; H# z, {) e( T$ Y
定义如下的枚举值</p>& ~# B" K3 J' A  g) ^9 Y( Z2 l5 A
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
+ o. ]' L5 m9 m/ Lenum Message {0 e( C0 k% a; x1 `) X
    ByeBye,7 r, m$ u* \* h9 s
    NewJob(Job),2 B1 X& C8 B' L/ S. b  |
}
% _6 i5 ?0 _2 @+ m</code></pre>1 f; y+ g7 b; t* X; z
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
$ S% d- w7 g/ h<p>只剩下实现Worker和Pool的具体逻辑了。</p># Y0 L: u4 N. A9 v3 B. j) p/ ^: ]
<p>Worker的实现</p>
3 f6 n+ ^- E8 f$ o8 m. L& [<pre><code>impl Worker
) U* k/ [. e1 L1 t* k- e8 Q{  E- @. t2 l: D; X3 k* g' F3 ]7 i& u
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
! L/ j8 B& u9 d4 A        let t = thread::spawn( move || {
( i& h$ y' h6 g            loop {
# y; f# ^( F% d                let receiver = receiver.lock().unwrap();8 g7 Q% z2 G* q/ O. q
                let message=  receiver.recv().unwrap();
. Q* }% Z) G3 o                match message {0 S# T. N2 ^! V  }( ~: O4 I7 q
                    Message::NewJob(job) =&gt; {
  n: M5 d7 M- ~: s; `3 I                        println!("do job from worker[{}]", id);
. p5 m1 D! d7 h                        job();. K- I5 b& G  y  r
                    },- G+ h! ?( u  r
                    Message::ByeBye =&gt; {, h; F: S4 c! c* E) W
                        println!("ByeBye from worker[{}]", id);
7 J- V" I7 G% l& a! f+ ^5 ?/ Q                        break
/ G0 ^1 q3 l* A- h                    },
, d8 ?; o6 U: r7 D- C6 I6 T, q. S                }  
6 O) K& q! a+ L1 p, b2 Q" `+ X7 r            }
8 D( K! e$ [; h        });
( G) i4 e1 g. v
7 }  V% w2 U: K1 d' v! k( u        Worker {, Z# U; ~, g5 v: n
            _id: id,6 {6 L, b2 @( t$ |! F3 C5 B( a
            t: Some(t),* k! U# b% r& G
        }, |, O+ ], D( E0 T. |. Y5 l
    }0 u/ y7 P# _  D
}
$ D8 C" Z3 n) Q( ]- E) C</code></pre>
: r% U( y0 Y" e% ]! S/ v9 G, i- Y<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>4 p7 V; v: s: b0 E4 `: k
但如果写成</p>6 `9 j% `5 y2 E* T3 w
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {2 j& V* \/ Q1 S+ X
};
" j8 c8 l# Y/ T0 d! t</code></pre>3 e- i  N' o" z  z( p% V, n
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
6 j1 z. }* O1 b+ krust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>, _+ @3 ~; d* S
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>0 G" ?# @8 |4 R: O
<pre><code>impl Drop for Pool {+ Y6 d7 x) S1 Q! G8 Y4 ^. V
    fn drop(&amp;mut self) {( l, n3 z, D" [9 _) i
        for _ in 0..self.max_workers {, A7 \* }: L; Q; C# o
            self.sender.send(Message::ByeBye).unwrap();
. M' y0 v: s, N        }# C0 S( V3 Q1 q4 J
        for w in self.workers.iter_mut() {
2 l8 \# u8 w0 }5 ?            if let Some(t) = w.t.take() {
' ^9 \- e  F; }                t.join().unwrap();
* d# F) M- i6 ?  X9 a/ z6 B            }# ?1 p" k. c% t' B0 R" p
        }& k9 n( w9 Z: r) g
    }
( {# [# V1 c6 a- g, o& d- l}
5 X9 V8 a0 V3 z5 R5 S3 E' E
6 p* Z" ]! r) O& V</code></pre>
6 E8 ^5 Z8 H5 Y9 a8 ?<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>/ f# D. L1 K% q* h9 Z- F
<pre><code>for w in self.workers.iter_mut() {- I4 R4 a  Q# c
    if let Some(t) = w.t.take() {
7 j  f4 R: e  p' Y! w/ s        self.sender.send(Message::ByeBye).unwrap();) i: V. J8 D3 o7 p6 \8 s
        t.join().unwrap();8 Z0 f" n8 `/ r: H
    }3 e" ~8 d3 |" N  q2 T* N% G
}% O; w4 t" q( a
- h; _2 T, }- S) W0 B! ^
</code></pre>/ \1 ]3 C- ^& X7 h+ i. ?# U) ^
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
  R/ A* n3 q7 q0 g  p; d8 B我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
$ m; l$ g& {- v) \9 f# u& Z<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>5 x1 `, q9 Q* b
<ol>
8 d- ~; c: g+ E, _/ e/ C9 @<li>t.join 需要持有t的所有权</li>
1 r1 N5 k1 F, d! r  ^<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" u8 x$ I8 \: y# E; }* j) ~5 r
</ol>
) q+ D$ F, v7 v+ Z- m, _/ K- r<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
! k- D1 m2 G6 v  Y换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>  S! u, @# h  B- p
<pre><code>struct Worker where! ^2 u  \& g0 g- ?+ N
{, T8 s( X1 P! `# M  J
    _id: usize,
/ }1 V" U' z4 E/ W/ [! b  [    t: Option&lt;JoinHandle&lt;()&gt;&gt;,' `9 ?+ z% M' K6 B( r
}
  B4 B3 W& A+ _" T2 G. T9 d2 J</code></pre>
" N, k* R& U0 y3 L8 T8 P<h1 id="要点总结">要点总结</h1>
+ |& M$ f( _! t" O8 ]<ul>
7 z) J( O( P: I1 d0 ~, F6 i" K$ e<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>5 W+ T: n; H3 [$ f+ q, }& R
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
* w  X, T* @2 e; Z</ul>
( T& s( t9 L! q  [, L+ r<h1 id="完整代码">完整代码</h1>
3 p; V/ o, K( b. p+ J, `- n' z<pre><code>use std::thread::{self, JoinHandle};3 Y  y+ ~  V- y
use std::sync::{Arc, mpsc, Mutex};
1 m% f4 c4 Z9 p3 z4 N
8 m$ b& O: ?8 P# `4 i+ z8 R$ n' t) k! d8 i/ ~  S. S, V
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
( y# a9 e0 _. K( Kenum Message {) v4 o" b, u, @1 K* z* B
    ByeBye,
  y0 `! t5 F( O0 e7 y    NewJob(Job)," u/ [+ P0 d. s% h; r" N* B+ e
}
7 A/ g) m- Q# a0 T- |- p
, ?) I/ S  w* w: mstruct Worker where) s8 d; V% E8 m' F
{
" _; }( G" r, z4 q" t' M4 D+ \    _id: usize,/ H( H$ N" T  j: y9 {% u
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
+ |. R* s$ T) ^" N  K' ^! f}& R, |6 g' j* W5 T6 T( h
: `1 q9 d6 ]& \5 u- N! X7 M; Q
impl Worker" ]  ~2 H) v4 F8 _( @: S8 p
{" R4 d: g( L  J, m" K& M. X8 d
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {+ }( J7 f, M+ @! Y: Y7 d
        let t = thread::spawn( move || {# o  I& h% h5 Q3 }" S3 V7 V
            loop {
/ u9 O. V; l. S+ t2 _; c" ?                let message = receiver.lock().unwrap().recv().unwrap();4 B3 m4 c! b. n' y; t. U5 y
                match message {
/ }0 M! I& k" }: q                    Message::NewJob(job) =&gt; {
2 Z' y- E: _* L! z+ u* ]                        println!("do job from worker[{}]", id);
- \* F; A& l- T. Q                        job();: s; m9 J& Q2 l9 n! W. @
                    },* c$ m! [/ V1 x* M3 N( J9 l% I- f  I
                    Message::ByeBye =&gt; {
3 C, L- M: t/ u8 R, V; g$ v6 ^                        println!("ByeBye from worker[{}]", id);. d8 j9 h. m) F: y8 P0 `3 M  {
                        break
2 \: ^9 J2 ~3 f7 Y5 k% n                    },
! k1 t4 Z( h$ s0 u' h1 R$ j5 ]                }  
+ u$ x9 O! r- Z" O) f1 @& J% j            }
3 A# r0 i9 T; L9 \, r  @$ N        });
1 G& `* j' |; j  p9 t
* ^0 d( G: O# n& p4 @2 p2 O        Worker {) u+ S& D2 C+ l
            _id: id,
: S- \9 j: V! Y* g- k7 e5 R            t: Some(t),
. _' g  X3 e6 @- T        }1 U7 B( E- f8 ]5 o. K
    }8 u$ h4 g# w5 Q) u1 ?; D* ?9 [$ W- e
}5 i$ [0 r+ u7 q" [0 ^1 O# R
) u4 r& |" Q" ~/ T# I: ?
pub struct Pool {
8 x, T6 o% C$ h# O    workers: Vec&lt;Worker&gt;,
1 N3 Y! b+ W% Q' y& J  s) V    max_workers: usize,- b: Q; f3 x& f$ ?* S
    sender: mpsc::Sender&lt;Message&gt;
. u2 ?; ?0 Y) m9 C+ t6 `% ?( Y( \}
0 `  P7 ?/ Y* v6 X1 B: `+ w# s
& h' o) P6 O3 {, A. x3 V: oimpl Pool where {- l- b: e$ C/ |& g6 {3 C& o& N1 n
    pub fn new(max_workers: usize) -&gt; Pool {
9 k! N4 V# m. ]" L8 S        if max_workers == 0 {
- j) T. v  Z1 `; w2 s- p4 M            panic!("max_workers must be greater than zero!")
" j# M! f0 i% N/ D        }
. K, d9 a. e* |$ R; k; @  V        let (tx, rx) = mpsc::channel();
! V- e9 p% S1 K: O1 u) a
# h4 y, _  s$ E        let mut workers = Vec::with_capacity(max_workers);
# Z$ P" ~0 o& _9 P) q" B        let receiver = Arc::new(Mutex::new(rx));/ W+ P; {/ y, T- c
        for i in 0..max_workers {. }* g+ c: _  ?6 y$ s, @. n
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
- z3 p, H6 y6 d/ f( @. a        }2 b6 m1 n& o. a/ u" @( e
; F2 J& i* _; \- y4 T
        Pool { workers: workers, max_workers: max_workers, sender: tx }
1 T5 S1 c: {0 O( i    }: A! j' j: g# a% w' ]: u
   
0 p6 ~" W" @, L1 k0 A$ a    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
& J, k1 G$ j3 _6 v    {( C/ v$ d/ b" g9 ]  |  Y
8 l3 ]1 z( A/ X  [
        let job = Message::NewJob(Box::new(f));
& x3 {  a; W7 I, ?) O        self.sender.send(job).unwrap();
( G8 O- b9 q$ S0 h' W    }/ V+ O( J! [3 N
}7 G9 S5 A/ [' K
& v" _- \! l  {  @& `3 N
impl Drop for Pool {' @' d' D& C+ R" ~$ C
    fn drop(&amp;mut self) {
7 j) v! l' m7 y2 F2 e' F        for _ in 0..self.max_workers {
: P- ^2 V$ s1 |            self.sender.send(Message::ByeBye).unwrap();& y. @* \* Y; {& s0 m8 P: B8 m
        }
* ^1 }) m) @* `1 [# n9 X( \+ Z9 v' ~        for w in self.workers {7 u3 e) m" j4 I* R! p; b
            if let Some(t) = w.t.take() {  h  \" e+ c- K+ }0 m' C, k0 H
                t.join().unwrap();
# C* E! s) \8 C/ s4 |            }
2 N7 F; V( I; @7 v        }0 p" M. h; v$ c+ s* j
    }( I# A& [: g; L5 f8 F1 Q! L# ?
}8 ^2 L' t: q, Y' I% P& h
! u/ I1 _' p: \: F6 ~' v6 X( i0 h; \5 l

5 A1 H; X5 r/ _- h" _#[cfg(test)]
( O" W) |# |8 F, r$ W) C' M5 \mod tests {5 U9 Z4 L) Y5 Z" n6 d! k
    use super::*;
8 Q" o" @4 m; R    #[test]
  X* M" l  n* U3 h5 ~    fn it_works() {: |$ y. e+ ?2 e* W
        let p = Pool::new(4);
- p# d: _/ N" _( y& c" j        p.execute(|| println!("do new job1"));. H4 s. y5 C) G2 j$ R& K' }6 ~
        p.execute(|| println!("do new job2"));
8 Z" c: g5 Y. x& h+ n        p.execute(|| println!("do new job3"));" p% G- x% v, F2 Y  G$ I/ V% H
        p.execute(|| println!("do new job4"));
, L6 D  V8 m  j9 k    }% \/ H, G" J, Y
}
6 U  K; c; j  z- q</code></pre>
* r6 d+ j: `& ?: r2 A! r3 y+ R; V  \' I$ d8 W& Q. O9 ?! Z. c
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-19 20:53 , Processed in 0.066249 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表