飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14810|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8057

主题

8145

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26501
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
6 F6 Z$ ^& s, m( T
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>$ }2 a3 K) e. u2 Y: @
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>' p# q  x3 j8 B! P) F
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
6 ?& w* B. _. n9 p" V<p>线程池Pool</p>
$ H# x: X& }2 \& w0 s% p<pre><code>pub struct Pool {
' a0 j% u0 X( ^5 ^  max_workers: usize, // 定义最大线程数
2 V. x  Z( W* r}
" h) z( b$ b, _# y3 m  |! z; r+ [5 z5 H& C' U9 o
impl Pool {4 |1 \' s- T; I: a3 d4 S8 V
  fn new(max_workers: usize) -&gt; Pool {}7 v; w' j! j1 G, ~; m! ~9 k
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
/ u$ M: y" u4 t  I- ]: ]% g" m}
$ u( B! i$ h2 K- [/ `& k9 T& O" z7 o% s0 [
</code></pre>
9 I2 {+ o6 L6 q8 A<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
8 y4 [9 M2 u' W<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
% ]4 H- }6 N5 k2 ~- e可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
! n- }% w5 S% Z* \, @/ h; F8 E<pre><code>struct Worker where
$ L, d) Z# V# p* e3 a, c9 }{
, Q: F3 u# z# t. ]    _id: usize, // worker 编号
$ E7 l. I8 L+ {}
) ~, O3 W) M0 P0 b4 j& Z3 u' [</code></pre>- k2 _  I! i6 g( v
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>9 q) }9 r2 i+ w! g
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
  N. T  x$ U) ^6 Y$ t, A<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>! e9 G& E3 z8 W+ f# r; U
<p>Pool的完整定义</p>
0 h) h+ s9 b4 ~! _& [8 y8 Y<pre><code>pub struct Pool {
$ Z, R8 b  k. |* v    workers: Vec&lt;Worker&gt;,
1 g0 I7 K2 c* l+ k, b    max_workers: usize,
+ R7 z3 w# J' Q, J    sender: mpsc::Sender&lt;Message&gt;
- ?  q' D: R% y/ ]}* p- h: j( [( z" s' N+ I% X" K. d) V
</code></pre>: k6 q! t0 x, |/ ~2 G8 K8 a* H5 \
<p>该是时候定义我们要发给Worker的消息Message了<br>
: z& y( ~( G- r; R% |3 J定义如下的枚举值</p>
; K7 \0 ^- J$ D2 z4 H+ I0 j<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;% V! |# \% w; I
enum Message {" A- L& C# D1 H+ j( n8 v1 r
    ByeBye,; g9 P2 i- f* o
    NewJob(Job),8 q+ l& q2 ?* {( i( E2 G
}
* E4 \( m4 b  Q4 S4 L) P</code></pre>- O: O$ b* k% C8 p: P% X
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>+ Z! F. N# K: `6 T1 H3 S
<p>只剩下实现Worker和Pool的具体逻辑了。</p>. ^# X" \6 D. `/ _
<p>Worker的实现</p>% T1 w3 e, t6 C
<pre><code>impl Worker
6 B$ Y2 _5 P: ~3 C{
# i6 G* {' p7 N/ n& {+ D    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {8 q, X$ X' T7 s( Y% l$ c
        let t = thread::spawn( move || {1 |0 \; M, V1 L: a, [3 [% l* m
            loop {) j) p& X2 d! [
                let receiver = receiver.lock().unwrap();1 s5 r5 P' k" z( }* T
                let message=  receiver.recv().unwrap();
9 L$ Y: |$ Y5 w) U8 l7 z- S6 j                match message {0 P+ S3 ]" C' T& \0 v$ u
                    Message::NewJob(job) =&gt; {, e* K- I  v2 ~9 ~
                        println!("do job from worker[{}]", id);" E, k0 x7 B0 o  C2 G3 Z# X
                        job();) V. V, d* \- J6 d. J5 z. ?
                    },
7 _; h8 r+ J. U$ F/ N8 X                    Message::ByeBye =&gt; {
4 q" Z, \" b& T                        println!("ByeBye from worker[{}]", id);
& {' u. N  H& l3 b5 I! p% [                        break
0 n( ?( H- h0 O8 _; d! N                    },5 Q7 Z. W6 E5 W4 |
                }  + j0 C8 o% O3 I7 C
            }# C7 F" E- `; t: k: l) r& y
        });
/ @/ t, v% a! ^$ V
% F- D" P3 i. i1 B/ E        Worker {
6 ^# [8 d& c% ?            _id: id,
4 G3 m, @1 [: U2 E' U7 U            t: Some(t),
, |5 K( k  T3 v$ l- u! y. }; Z' |        }
7 R. Z; l: n1 ^  @    }; t2 T$ s1 C1 f8 ~$ Q
}
" Y: H+ p- u2 ?5 N</code></pre>
5 u$ \! [# d( e, ~/ Z<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>( f' H: y. B+ B5 n! l/ y
但如果写成</p>
5 Z! X: M+ e- ?3 u  H<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
* O7 g( |# t. h( x1 h9 e/ o};
. k4 e; ?0 w9 v  A; X! O8 A</code></pre>
$ R& Z+ k$ R  j+ L4 `<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>7 D8 g' ^( a9 D% q5 b
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
5 d+ h3 y0 j# g0 E  N# F# Z<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
5 D! q( T8 A, b5 Q<pre><code>impl Drop for Pool {* `3 {% N1 K  _7 r, l$ x2 \9 V
    fn drop(&amp;mut self) {
! ~2 _0 N$ B2 p8 J        for _ in 0..self.max_workers {, |8 b% e# X$ ?, d" H. y
            self.sender.send(Message::ByeBye).unwrap();8 `3 i4 x' I7 B1 V/ s
        }7 M! o8 D/ w& n3 ^: [
        for w in self.workers.iter_mut() {
* v, A0 x. ]( Y            if let Some(t) = w.t.take() {  F2 a8 D: n4 Q8 l0 B
                t.join().unwrap();4 \6 e9 h5 V# |0 k8 G
            }" H. |0 G4 d: ^+ z2 t5 c  Y2 N  A
        }. {9 `7 \/ {7 f4 C
    }2 @" ?9 s9 L; @' G1 z5 B
}
5 s, k2 ^# c3 R- H
  d! I8 u- b& e& s- [</code></pre>: Z$ Z( L9 r. j1 x) ^: H  w
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
7 f: i) V5 s9 F: Q, `1 h<pre><code>for w in self.workers.iter_mut() {' m; f6 ?, n( j6 O5 T; @# [$ {
    if let Some(t) = w.t.take() {) \) P9 f+ p: L, y
        self.sender.send(Message::ByeBye).unwrap();; K, {9 B9 d; e/ c
        t.join().unwrap();
- k: S( o. ^  @; i- u* R    }% K5 j/ C, V- Y) H: m2 Y; I2 c/ Y
}3 \" d5 f# D3 ~: [

# F; ?8 f; v/ `+ q</code></pre>
4 l5 z4 x$ t' \7 x! H; {, a<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>0 h9 h; {  v+ w+ U+ h; x
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 Y0 H5 |7 y# h; g- Y: H
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>" d3 x9 c& c: Z
<ol>
* o+ q7 [% u) s6 {$ K<li>t.join 需要持有t的所有权</li>2 P. ]: U: N; p5 f9 k' O5 N
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
, l4 E, P4 x' g: [2 b</ol>' e( w3 p, _- N% \7 h) I
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, g3 {$ A4 b, o# R/ I  [8 v换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>" \0 g  m8 N) L- |
<pre><code>struct Worker where
/ e0 X/ w* |; p{
6 y4 x) _5 ^% i$ m, E0 N5 }1 }    _id: usize,  i) l, ?6 ?/ m3 k9 k/ F
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,) `$ D/ F) I* `& F. Q
}
5 z6 P  E' M7 G8 |5 M( h! u</code></pre>0 }/ b' S4 d9 {8 Y9 Y
<h1 id="要点总结">要点总结</h1>
- A" ~3 A! P' T<ul>
) D* x; F& @( }3 V- k% [2 w<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>  M0 X% U& q/ f! y, p8 T
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
; F: }) B/ w; d9 p, M7 N</ul>
7 e4 o) _: V4 `% D: Y) V8 j0 v<h1 id="完整代码">完整代码</h1>
5 Z( B& o0 B$ f7 T. I<pre><code>use std::thread::{self, JoinHandle};6 A! d8 ^! ?" f% v3 V. b
use std::sync::{Arc, mpsc, Mutex};, P* j: F8 C' t$ y

# F' x: k( j& @
* C# J) J, B( G& l- Qtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
; ~- J" d0 J% F4 {; W. P1 K8 s+ tenum Message {, x0 h4 l# v2 G
    ByeBye,' E& E, O- U" i9 }" r  h/ t$ C
    NewJob(Job),
. s% Z$ Z& t$ Z. z; _. q}
8 A3 V: X* e& W+ l. K2 K
) C! O9 O+ P+ W/ Zstruct Worker where
# \2 t8 ]. F* K% K- q! L3 P) w{
: F  f$ C  K/ {. u    _id: usize,% }- ?9 Z; k7 c
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
; _# E) [. O' {* |4 t% m}7 C. ~: K7 m: O. g4 J4 N
. H0 T1 p) B% d1 r
impl Worker, c% n  o5 o( T
{
$ ]" Q4 _: P/ f0 R    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
4 p# ?% Y. _- l$ F8 N+ [        let t = thread::spawn( move || {: W* h8 ~2 h/ U
            loop {
/ n  o' i9 n/ @% i0 \' ]                let message = receiver.lock().unwrap().recv().unwrap();3 Y1 \0 p- [7 @! r9 D/ F6 P. \
                match message {
- H5 y3 T( S/ S5 C. B                    Message::NewJob(job) =&gt; {$ L4 u; h: K! n% M/ X$ ^7 I
                        println!("do job from worker[{}]", id);
( p5 e9 b& b5 D/ ?; O  p/ {                        job();! A) g; |  H1 I; U
                    },
0 j1 \9 _# ?8 Z) J( ]                    Message::ByeBye =&gt; {
+ r6 H# V1 g  t/ E& I, Y                        println!("ByeBye from worker[{}]", id);
6 O3 h% n) M3 D/ ?, c8 t* V+ x4 V                        break; Q% x7 m1 Z4 }) q0 x+ i: s
                    },
+ e+ g. f# F" p4 \5 Z5 |$ ~                }  4 q, {# w, [3 d! d1 _
            }
/ A2 L& @7 ?  p9 D2 s4 t        });( l3 c# h8 h6 l! D: o

( [6 |" X+ z6 M: x; N        Worker {
$ w5 E/ A% I* M, x5 C5 f            _id: id,
) O: z$ B* h# V; y            t: Some(t),5 e3 ]. w5 ]6 i# J6 ~7 e
        }
* f- ?4 g5 k- ~5 ]4 y4 C. D7 \; k: L    }$ r' S2 D0 n$ S+ f) O5 j. ]# u
}$ r6 @8 J3 `8 [8 y9 t

+ k8 q- g; ?/ @# rpub struct Pool {* ^9 M. N: D) I
    workers: Vec&lt;Worker&gt;,
2 z0 u3 t9 l6 Y* `5 T( d    max_workers: usize,
* T! J1 X4 c+ E" \1 w& F    sender: mpsc::Sender&lt;Message&gt;; T2 W) ~4 C: N4 G  }
}
0 a' _8 U+ j: L) D+ D3 A0 N) ~# t7 i, L6 A! D' d" E
impl Pool where {
. J7 Z- M! C( n. W; J    pub fn new(max_workers: usize) -&gt; Pool {* G/ E# Z7 Y2 V5 [
        if max_workers == 0 {
+ S& w0 ]4 |! k% y( I            panic!("max_workers must be greater than zero!")% {1 L3 I& l( V  B' O
        }
0 P' Z1 \& R/ x% e2 U% R        let (tx, rx) = mpsc::channel();$ F7 z& L/ N) f5 t/ P- |: s  V
. w* I, ~. k, U8 {* V4 i
        let mut workers = Vec::with_capacity(max_workers);
0 b. q6 P' s3 d        let receiver = Arc::new(Mutex::new(rx));
8 ~: l2 b" D* E" O7 b3 y" Q, M        for i in 0..max_workers {
  j! C9 Y6 A+ w! Z, L            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));% D4 F$ u) \( y5 c9 ?4 U1 H8 p4 K- u, n
        }
& E! {% H! l5 V4 m  v( F- d8 ?5 Y4 [  }2 J  Q$ M* f
        Pool { workers: workers, max_workers: max_workers, sender: tx }
7 [! U) s7 u7 f2 W  O, x    }& x( X8 p; t. }$ N- |6 c, c
    + G+ {8 H! O7 q4 [
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
2 {3 c( I1 N& I    {" n: j5 I1 _" \
! m/ ]: \8 T" W  s' b9 a. S
        let job = Message::NewJob(Box::new(f));
, ~  z( Q4 z2 ^: q- M$ O1 \8 V  x4 c        self.sender.send(job).unwrap();
8 R/ A1 a1 T: V" D! T! C    }
4 A8 ~* u- O; w+ G/ V: a& |- c" _}. f  a; P+ P; g! ]( ]9 B  Q

& f" o: H& j* ^1 U4 Timpl Drop for Pool {
1 _. C. ^# O" y) d& g- _7 t$ c    fn drop(&amp;mut self) {
( F! y" e. l9 b        for _ in 0..self.max_workers {1 j; u( V. a9 g3 e6 g( o' |
            self.sender.send(Message::ByeBye).unwrap();# O" G5 c& S/ \7 A( N% C  M6 h, J& Z3 l
        }
6 Y4 `1 ?) ~* N0 L9 x, T        for w in self.workers {: P. B0 @& m4 [9 ^6 ]. {
            if let Some(t) = w.t.take() {6 C4 W) T/ P" a8 X! E( r& N1 `- Q
                t.join().unwrap();* o* D$ v1 P2 O: d+ L# _% y
            }
5 x) N- X1 w4 h        }! c# ^& a$ a* m! A7 Y% R
    }! C) T  l' p4 p4 a! ?
}& h; z9 Q. i3 `/ y# f
6 }: Z# g% A1 `: j( O6 A

; |8 h! ?& k1 i#[cfg(test)]
# I7 n' A, @  Omod tests {, H, A( R8 b8 n5 R8 T
    use super::*;
/ V& b- C% a+ ^1 q  I2 _% d    #[test]
8 [, E! }( V9 Q6 u4 l9 l    fn it_works() {
9 \" H: Z& l+ m: p: `        let p = Pool::new(4);
& v" d) F, B/ B: m* E* j& w% k" e) \; n        p.execute(|| println!("do new job1"));. k9 X7 H) j4 M! Y& X0 w# b1 y
        p.execute(|| println!("do new job2"));3 @% D/ T8 P" ?6 A$ [
        p.execute(|| println!("do new job3"));1 S' M9 a- d5 i0 N5 M- K
        p.execute(|| println!("do new job4"));
1 e/ K8 E8 g% j1 V: ~+ u, H) A' y    }
0 {" h3 }6 x7 u}$ I$ Q) |# ~8 q* `9 r/ o
</code></pre>/ _7 p/ t' Q+ c: z
* V, _  h: y" X6 d% \
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-12 23:01 , Processed in 0.065821 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表