飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14633|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8038

主题

8126

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26444
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
9 t8 u1 d. g2 a
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
+ X2 ^& n+ ~% ?( J6 b<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
& t; o: W  b  [- n6 r" \. W<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
3 P& F2 [' V0 ~5 Q8 h% Q0 M8 l<p>线程池Pool</p>1 g! p/ @3 d% T, @3 o$ S
<pre><code>pub struct Pool {$ m# H6 [% {# N" i
  max_workers: usize, // 定义最大线程数
! \: g# x4 T* F6 p; ~}
/ X9 F" K. f/ S. f/ h) w
- n( N- ]# y! |: nimpl Pool {
; S. d1 }+ ?: a" w  f" S$ s0 m  fn new(max_workers: usize) -&gt; Pool {}
) a7 o, W1 @. [$ T  [9 a  w3 [5 [  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
2 ~% |$ N8 ?5 F9 f' W4 ]}1 t4 |* j: L. T* D" ^
: D0 w1 i/ @% V) Z$ S
</code></pre>2 Q+ |6 p5 b$ c- u. }2 M$ c0 e
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>" V  U% N3 w# i6 y' Z
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: X4 Z6 M; s# f# z& l可以看作在一个线程里不断执行获取任务并执行的Worker。</p>3 F3 x, [5 B( N2 f% S+ _
<pre><code>struct Worker where1 R; G- T- z$ U# `  i
{  Z# m1 q: F- o# g& n
    _id: usize, // worker 编号
. O% Q9 N& H* s" F6 k2 ?6 Z}7 B0 I, c) g. @7 d: k* b2 i9 m
</code></pre>4 j4 D) N  e9 H0 l" _
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>2 |9 a5 c% Z* E; h! @
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>( T% w9 n& L  W4 V
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>: o1 Y' V. |1 v  c$ C
<p>Pool的完整定义</p>4 P# c$ W$ L+ n1 S) S6 C' ]1 }4 j2 O
<pre><code>pub struct Pool {+ N4 m$ h$ s" K
    workers: Vec&lt;Worker&gt;,
' a' i9 q, M" D' {/ x    max_workers: usize,. c/ C2 U! g, G1 W0 H, L
    sender: mpsc::Sender&lt;Message&gt;
. k3 E# i# B" H2 \$ Y. ?( f}/ m& h! Z7 K+ o* T
</code></pre>/ W: g4 M1 T: h. N% ~% K
<p>该是时候定义我们要发给Worker的消息Message了<br>
" E' W5 a' H+ p! m' o8 l) P! }  C定义如下的枚举值</p>
; l, Y, S  [! W+ t& R<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;9 f5 D# K. F8 X# g
enum Message {3 X: h4 K# |* r( d( v
    ByeBye,
9 N. U. d# I* `5 q) E    NewJob(Job),
, _$ v5 _* A. n}
" ]1 M+ t! G% d) v: t8 H</code></pre>4 m' `0 s+ |* x- R0 e" b, ^; w
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>  S. K+ D8 j0 k) H; p' G
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 w! z' }' k9 F0 M% N5 U$ z1 r<p>Worker的实现</p>5 u+ q- g( S' n. J7 x9 A
<pre><code>impl Worker
7 U2 t4 v" B$ V( z& F+ Z{
# }( D  O0 C* X1 L" @! v5 w    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
$ ~5 v5 N1 i4 A9 l, T6 W  c$ {        let t = thread::spawn( move || {
. m, Y; g+ n' [- k" [            loop {' b3 [/ _1 a  q* C2 ?8 B+ Q8 t; i
                let receiver = receiver.lock().unwrap();5 b! {# X" a! C. q0 d
                let message=  receiver.recv().unwrap();+ m( `; C( ~% i$ W* Y+ P
                match message {
5 ^; w) @# i: g( |% J7 z" Y                    Message::NewJob(job) =&gt; {
" G5 ^3 b$ ~( A" ~                        println!("do job from worker[{}]", id);
# X" D) G. u3 v" ?, [1 R! [8 r                        job();- D+ H7 h3 H( a0 r* h; O
                    },
5 Z) Y/ ]8 B/ n0 E  Z                    Message::ByeBye =&gt; {1 M% i4 h1 K3 M2 D
                        println!("ByeBye from worker[{}]", id);2 H) c: D/ G1 `+ C9 j
                        break
5 t, T: U0 f- N& a0 a& @                    },! g* z- h  Q4 H+ Z# W
                }  ( q2 z5 Q/ O! H- t4 o  I
            }* ~' f9 O! Z% C
        });
4 M$ F! R& l7 m- k  K& G% w+ ^, F+ C7 l! h+ p: ?# J3 k0 v' A
        Worker {. o% S0 |0 m, M5 d1 u/ o
            _id: id,
& x8 S' ~/ A1 b- _! ]- `) S            t: Some(t),
! v8 E- k- Y/ t3 P' V4 b$ v        }3 o! M8 c* o" W
    }
' ~* _, v" M  h- }- y1 u}
) e" D# E! M+ D/ R0 r% z9 |2 J</code></pre>9 n5 n; ^; H2 H+ u: ^8 X
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
0 L$ e" T, Q" i! K$ v) l  p$ H但如果写成</p>+ M$ [. G8 u3 s
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
2 a- `' D6 I# R8 }  Y; e7 O};
8 ~: I+ {; g2 K$ w" J' R4 ^</code></pre>+ d; ?  A$ a' k, O4 |( _5 Y$ T- @
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>7 V! w# M0 \; {6 m5 }  d; K
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>9 g' k; f# L" R0 j
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>6 V# i* I0 m' M3 n. u- t2 c
<pre><code>impl Drop for Pool {% A0 A0 t! ?" s; |1 a
    fn drop(&amp;mut self) {
& W+ T) p" {1 x3 X' f        for _ in 0..self.max_workers {
5 S8 D4 q9 f6 ?* z) t            self.sender.send(Message::ByeBye).unwrap();7 d0 p$ n. j1 M  f9 g/ G
        }3 V2 \8 u3 K' ^5 [, |
        for w in self.workers.iter_mut() {
5 Z* N  |7 P$ Z6 f* z            if let Some(t) = w.t.take() {- h  s: W' J6 M% J5 z& f/ n( p( X
                t.join().unwrap();) M. A$ h, b! r" ^) k
            }
* ]9 H: E5 s& J        }
) V; h8 D/ V3 B' b    }6 y8 |3 |* f. a# v$ Q( v8 w- T
}
; F8 x+ @" i) v& X) ?' C: d+ J/ }7 C6 R6 W. h; O6 v+ v
</code></pre>7 L) `# Q" }# X7 d' j6 f7 W
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
* e! i& c6 ~) P# @- [! _; ~<pre><code>for w in self.workers.iter_mut() {6 ^7 j3 O& A4 N" l' x" e0 l1 \
    if let Some(t) = w.t.take() {
# R8 |3 X" L2 O( k        self.sender.send(Message::ByeBye).unwrap();
  e/ p' C/ h% N* F6 r        t.join().unwrap();4 S: h  |+ g: s$ a. \9 f
    }9 N- f' Y/ T$ e: s: T9 W
}& j9 A5 p" @2 M/ o1 h& H

/ t3 s0 Y! g5 x& E</code></pre>, D! z' D' Q1 K# @5 o1 s7 Y# X7 w
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
6 o% c  m$ ^1 T; z& H( k我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
$ E1 {3 H4 _( K3 ~<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>, z# [8 d. h- n9 ?4 G
<ol>/ F5 T4 \# P9 Z
<li>t.join 需要持有t的所有权</li>! `4 I$ F, I! E6 P( ~7 H( {0 m
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>- h  C" Z0 j) |8 M8 Z" C
</ol>0 W7 \/ M! U: ~! c
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
/ Q9 J0 Z8 K& _) F* c, t$ f2 N换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>8 V9 j3 Z+ {5 S0 l5 c9 `7 A
<pre><code>struct Worker where
, i9 b; A- J5 s+ F' s{( ?5 M- ^% N% H8 U+ I
    _id: usize,6 ?7 q2 b3 u3 c  ~7 B
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
2 F# g+ v9 W% C2 @+ L7 f% |! N}2 ~! z3 W% T8 D% \1 n
</code></pre>
( A+ o2 i. O5 k9 {" i. [<h1 id="要点总结">要点总结</h1>6 ^3 X, r. j* M7 U, N' _  a
<ul>4 Z' F, F; p6 E- E% f
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
7 \+ h# H8 e1 d8 g  {8 R: ^; K<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>! g" ?% h: s1 h/ a- C" R) |
</ul>* Q2 ^$ E8 _7 u9 `
<h1 id="完整代码">完整代码</h1>) C4 K3 d' ^3 U- C/ ?: K
<pre><code>use std::thread::{self, JoinHandle};
  R3 O6 x" l2 z) K8 D0 _0 }use std::sync::{Arc, mpsc, Mutex};. {7 m7 ^* I6 {+ E9 C# j; r% L3 ~7 X
4 ?0 i3 _* n) V) b/ q* c( g
- I+ K& |1 ~  N/ ~7 ?  {
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
! Y. ^' ]" J; C3 y! j, `enum Message {0 N- {4 k; z' l& X- j4 f% d
    ByeBye,
8 V! P9 C1 s2 c1 i+ J5 V5 t    NewJob(Job),
0 P9 a: M' y- N4 [$ |# g}7 g  z+ D0 L2 l1 R- Z$ g$ p/ p
. H$ C$ }# e+ z& b
struct Worker where
1 q" L/ ^. ^/ E1 w- M/ N3 v{% s, t( \( q9 w7 @5 k1 i
    _id: usize,
# D. h! y- `& N/ s& ^    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
4 d& h# B) N% T! w/ d9 D* w# o}
7 c  N& d+ v+ b. [$ x/ L! Y
" f9 L0 n) c7 M% W6 u5 zimpl Worker
5 [9 z  f9 J$ n( @+ }5 @{# X/ h% y3 Y' t  J5 `, c; [+ e3 z
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
! w% Y7 Z% N9 \, b        let t = thread::spawn( move || {8 B+ X+ H. M* d
            loop {
( X& V2 r+ v' U. ]! [$ Q) R                let message = receiver.lock().unwrap().recv().unwrap();4 A5 |& J( `  M5 B( @; q  L* {6 K( B
                match message {0 e1 R% B1 Y' a
                    Message::NewJob(job) =&gt; {( y7 L( j# e& q$ g* D; z
                        println!("do job from worker[{}]", id);
+ _% p: g) S- X9 p& D9 c                        job();& F7 R6 Y- o8 X: ?& u
                    },
0 f. U2 Z8 k9 \  h6 m/ K: Q* p                    Message::ByeBye =&gt; {
3 b& W! Q# M1 F, L  D9 n                        println!("ByeBye from worker[{}]", id);
% Y  h) p$ ^7 U* N: c8 @4 s- y$ F                        break
' }* k9 G1 q$ Q2 k( `' J                    },
/ e7 F! s$ \$ P4 L# f                }  
7 [' {. s' h% W' _- G% w1 A            }
& G: b9 c1 ^1 a2 n' {        });
0 M- w6 L& a. z& n" H1 [. o% Q9 j+ R1 D  y
        Worker {
; E0 {6 q+ }" R( Z$ {2 g) k            _id: id,
) b0 O9 |; V/ A            t: Some(t),+ f+ Y0 L; n5 }; s/ X3 C% b$ f
        }
. [* H1 C" V3 |+ S7 U% X9 e    }
5 k; Q4 _" }  F6 w% H, e}8 m: c! P; V. a( i8 w

# W8 H) {& h6 ?" upub struct Pool {/ k7 n6 a0 h$ W5 ~, ?/ N0 O1 L
    workers: Vec&lt;Worker&gt;,
* s9 O6 O* D( f  }    max_workers: usize,! Z6 v& r1 [, P, g
    sender: mpsc::Sender&lt;Message&gt;
: A( Z7 r# t3 k. C/ }. Y8 O}
8 l: Y  y& m& Y: U( \$ |! ~, D- o$ k( r9 k
impl Pool where {
. w* R6 B5 y6 d    pub fn new(max_workers: usize) -&gt; Pool {+ K  D6 U6 I9 ?" O" R; I" `) a
        if max_workers == 0 {
$ F, ~" c9 J& d* T  `% y" i. a            panic!("max_workers must be greater than zero!")2 s/ `! A9 a) I
        }; j9 }9 E* ^5 k9 s7 b& }
        let (tx, rx) = mpsc::channel();9 \1 P% p9 L, ]9 G% q
; J- d8 l: N) V6 S. [8 o
        let mut workers = Vec::with_capacity(max_workers);
, l- B" P3 W& l: P7 t% t+ u- F        let receiver = Arc::new(Mutex::new(rx));
0 z' P, R# n% j- K! X* _0 X7 d        for i in 0..max_workers {" [# t4 Z" x3 r9 V0 ]2 _) `
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));- L2 A2 ^0 M: q' {- v3 l) W# O
        }' e, G3 U7 F+ n
4 E0 O  V4 ?2 C' O, L+ o
        Pool { workers: workers, max_workers: max_workers, sender: tx }
4 m3 j$ U/ M& l1 P3 r    }. ~' V: S# I, n: k
   
% ~1 A# |4 m3 R' t; f! Z/ |    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
7 y/ p, L6 h3 h: {  t    {
! r1 h* t/ c% f& t$ E2 z& @+ k, U: o5 H5 y
        let job = Message::NewJob(Box::new(f));! c7 N( J4 K( q9 c3 v! j1 @
        self.sender.send(job).unwrap();9 `; x9 \4 x6 Z& f1 a
    }
# `: O$ Q8 y. t}6 A/ s( i% R. l6 B: C2 L, H* z) U
( p$ C- u! w4 G
impl Drop for Pool {
& B. q0 n4 K% X! [( q    fn drop(&amp;mut self) {  u8 _/ `3 W( k5 z; d
        for _ in 0..self.max_workers {5 }4 g7 m" {, Q0 n( z- l
            self.sender.send(Message::ByeBye).unwrap();
2 U. v% m' i! w4 {- a2 ]        }
) X) E, T1 J; l! F        for w in self.workers {
" S. K- n% E% K# o  s            if let Some(t) = w.t.take() {
6 G9 c# ~/ h4 r                t.join().unwrap();
) f  A) ?% G+ h1 F            }
7 p4 e4 ?) H4 D0 A        }
' z$ K8 F, O$ Y1 C$ p' z1 ?    }0 F7 z2 Q! z# o1 n0 p
}9 P. P5 s' C& F, j( N. T
5 ^% J4 j6 e, r& M3 ~

+ q, P$ S1 s; F: ?# N) ?$ |* h6 j#[cfg(test)]
, J9 E0 _4 n9 s- w( h% wmod tests {
/ D) x' X: G! F3 O    use super::*;) v) r5 l, Q" |3 h2 O: N
    #[test]
; `( M1 S& L1 s3 g    fn it_works() {/ U5 J% k  ?8 l% r0 ^9 Z
        let p = Pool::new(4);+ A( d! U6 n% U/ C9 X, A
        p.execute(|| println!("do new job1"));% A: a2 v/ O4 C1 G- d
        p.execute(|| println!("do new job2"));) C% b+ _1 x1 h! z* m
        p.execute(|| println!("do new job3"));
) ~2 X' a( _2 `5 s+ V        p.execute(|| println!("do new job4"));1 X& Q7 ]1 j2 n8 }) t( X0 X
    }/ Z4 Z" i7 ?4 B, \& D, v
}; L; b+ M6 S+ j4 }* J7 q, I
</code></pre>$ g2 x3 r$ }1 C7 h2 R

( F; w9 D" R9 |, L
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-6 00:17 , Processed in 0.063830 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表