飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 9841|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6831

主题

6919

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
22823
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
3 A, Y2 C8 X1 p: {( G
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
) Z/ [/ V. g! K$ p) C. z<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
5 r( |9 c( o1 g- x, t6 G6 h<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>$ D8 h: ~5 `% F' q. I3 i" q
<p>线程池Pool</p>
9 X* G- L, d( @) ~8 c$ m<pre><code>pub struct Pool {
2 X' y% y7 z( K/ ], _7 t  max_workers: usize, // 定义最大线程数
' t+ |' T0 W8 f5 J) g' b. c}, o0 }  F2 I) T. p- C$ Y  c  i- k

* c) Q4 m4 s) V2 V+ Rimpl Pool {
/ p6 H1 B2 I* ^- R' [  fn new(max_workers: usize) -&gt; Pool {}
$ \- q# l4 Y3 ~0 }5 u/ M  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}' S3 h! o& G# i- m
}
7 ~# L+ l1 ^, D5 B1 Z6 z% W
0 i+ }$ F0 r. l* ~, u6 U</code></pre>1 D4 r& u( I* `: V% U  C2 J
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>; U  A* L0 u( d6 ?4 k0 L
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>" M; T: u! r9 X. l# u: F
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>9 j$ I; E* m5 Y2 p) U8 l
<pre><code>struct Worker where! k8 j3 Q0 T+ T" a) ^
{
( n( D* H4 A4 E1 P    _id: usize, // worker 编号, h6 O( a; O, O  K. M. K& E' Q
}+ a- X4 r0 W: N2 ?6 [" t
</code></pre>" f# K/ D1 F: p, H/ A
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>4 i& G6 K7 d' O- X8 Q! B' {' ~! @
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
6 G- S# X  s( o+ p8 @) \& Y3 Y<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
4 B6 G* @0 W- ?. L<p>Pool的完整定义</p>
# N) L9 Q7 D* ^* o5 Q2 Y3 z<pre><code>pub struct Pool {
2 K8 q+ [+ b1 x0 d4 ?    workers: Vec&lt;Worker&gt;,
8 J$ r( V: A; f' d4 \4 X    max_workers: usize,. u% ~- f; q/ j  f: c- ]
    sender: mpsc::Sender&lt;Message&gt;1 P" [, I. v1 D2 ?
}
: J% b! J  Q8 ^6 o7 V</code></pre>$ R0 |2 T; D$ ]9 u4 u
<p>该是时候定义我们要发给Worker的消息Message了<br>( X# D1 w, h0 t  t' X: _
定义如下的枚举值</p>$ ~; `, y6 F5 y8 |; H4 k' Z! r
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
! |) L3 s- e- y. ~2 henum Message {
; {& y( a  _; J8 O! |* l% l    ByeBye,# D$ W. Y- C) M3 v- M/ P4 [
    NewJob(Job),' y( @/ L9 W! O8 T4 d" x! h
}
& |  |- t* k* K# S9 k5 S' H" W</code></pre>
/ a( ~1 ?  V! L) M8 t7 r; {<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- ^" T1 @+ Z4 ^2 j* f6 o* D0 U
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
$ @/ M- ]  f, _( ~) x2 l! U<p>Worker的实现</p>3 L4 v. O+ C0 y; O
<pre><code>impl Worker5 R5 Z4 D! l9 i
{
7 R0 x& u8 W" V" E$ O2 P    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {0 Y$ t6 J* O' ?6 G$ \, u8 X: T3 V
        let t = thread::spawn( move || {
1 N- U9 I* i( L2 O3 _& n7 H6 |            loop {
. q. d6 V6 ]4 `: R" J                let receiver = receiver.lock().unwrap();0 v3 e+ n0 h# D
                let message=  receiver.recv().unwrap();
2 q, Z+ p3 H9 k+ A. {8 y- q                match message {
9 f% Y, w5 h9 e. d3 R4 M' l9 z$ a                    Message::NewJob(job) =&gt; {
: p3 G) _( m$ P0 ^' r: \* Q" m( {( f                        println!("do job from worker[{}]", id);5 H' ?. X8 }# h5 [: b0 U! j
                        job();
3 E$ L+ S$ W- O; `3 Z                    },
* y/ l7 @  u# J4 ]. A                    Message::ByeBye =&gt; {
4 t9 X, ]$ r: I6 |1 j( o! n; @                        println!("ByeBye from worker[{}]", id);
% k6 x- T2 o" t: ~                        break
# z" n" d/ K; m# p/ n1 T                    },
  g) J- S' a' G/ `                }  9 G+ @' U6 I' W9 y7 z; L
            }& h3 J% d8 h3 x1 P" x( f6 u
        });7 }* W1 d& V7 |. b0 i7 ^$ O
; c  u2 I! e, }3 l4 d2 M! L
        Worker {
. i* M, p! N, U            _id: id,/ U6 w4 Z2 q5 N4 Q
            t: Some(t),
, {; n; j& A5 S        }2 s6 Y( I; q5 \! W  f/ k' z
    }
; H* O# c1 |% }' }3 {}7 M8 Q# c6 A7 ]: _% b
</code></pre>* Q5 K2 [. l! l- Y
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>: j+ a6 g- K! n1 K
但如果写成</p>: n) R: m# ]: x# z% T8 n* A8 v* |, @
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {% V6 ^7 k, i4 `9 o* V( ~% B$ _
};
0 H) m( ?) d& z2 S9 F( u0 h0 `</code></pre>
: ]3 n1 O/ I% D<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, T' z3 i, _% u4 C! H1 A8 s
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>* k$ z! P3 O' a1 D) s: i5 G
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
/ i8 ]+ Z- p* j9 \) p1 o( E3 v! {<pre><code>impl Drop for Pool {5 U" B' p+ H' R! X) G! Z4 B; ^# g
    fn drop(&amp;mut self) {
" t, ^0 ]0 I  B' ~# S        for _ in 0..self.max_workers {
, s5 W$ y2 T% q  U            self.sender.send(Message::ByeBye).unwrap();
3 E$ Y- v  [8 X- [* h" _        }
: S! V: d5 v- b        for w in self.workers.iter_mut() {. I5 u" w0 E( o7 m. T
            if let Some(t) = w.t.take() {
. ?7 I- d4 O6 Y" G& Z8 `; P                t.join().unwrap();% G9 x! A$ F! n$ f3 e. i0 z
            }
6 r( M" d" E6 T( X        }  l3 u, E+ c7 T
    }
, C/ ^# T3 i$ X2 c+ {& f/ y}5 e% N7 R. O9 a/ S3 V! g) X" e$ G0 x

- ?/ ]! R, s( A</code></pre>$ ?! ~' V* ?% {6 |/ V6 C8 i
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
# Z) ^; N; Z% d; |  e' Z5 m<pre><code>for w in self.workers.iter_mut() {  C3 c/ G; H6 W* v! ?& i0 r8 ^
    if let Some(t) = w.t.take() {, z) n' s' }! g! o" k" x3 Y
        self.sender.send(Message::ByeBye).unwrap();: }6 F& u3 e1 F0 U, T! |: e
        t.join().unwrap();) y6 A+ a0 a# ^3 Z. s. Y# H9 ~
    }
6 @, k' \0 _4 M+ i1 P' N}; U4 [2 t3 c, A  P) L
5 w" T% T" S+ Q& x# l
</code></pre>
4 N! O* c0 M( Y, @4 g* n5 n<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: H6 D8 L& e* [# |- ?
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
- W4 J, W) k$ M( w<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>0 e" X. m& w4 A1 \6 b/ I
<ol>
$ Q& c) \5 |$ A<li>t.join 需要持有t的所有权</li>1 Y) U0 H; P& Q  z
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>& ?6 t/ q: V  d) A
</ol>& k5 [$ A; D2 D( H- y1 v; M8 r+ v/ @6 w
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
4 ^6 W2 o# N3 l, s% r, l' A3 B换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ ]& E. Q, A& D6 N% n$ f2 R6 M
<pre><code>struct Worker where
' q& }$ D! K/ w! L8 D{, m) j: l5 q. j5 I8 ^4 p9 w  V# a7 b
    _id: usize,
( ~0 f( m! _) S( t8 ?. k9 S    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
, f' w2 A6 V/ N7 d}. X  s6 ^" s$ f0 k, r. I0 g3 S
</code></pre>2 w9 e- B4 @8 Q8 y* e) ?& R
<h1 id="要点总结">要点总结</h1>% G/ `0 i4 ]3 D8 ?( x9 \
<ul>5 a- K* ]3 P2 V. i$ u+ _. C
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
' r9 w  j7 Q8 f9 ?* _* A' f<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
/ j, b2 w$ Z3 d. v* y. N1 f</ul>
7 _9 S, k. G- ^) K8 y1 ?+ Z<h1 id="完整代码">完整代码</h1>
$ T: d& ~: b) I5 \( E<pre><code>use std::thread::{self, JoinHandle};
: C: i1 n7 n, t7 Q# F4 E8 iuse std::sync::{Arc, mpsc, Mutex};
: h" ~3 J4 I& k$ \# V/ S7 s* R8 d2 V, ?( C
. K" W# p% b) v
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
. w8 |4 t9 v  aenum Message {% ?6 y" r" {* e+ T8 {; k
    ByeBye,. Y1 h/ L7 p! k$ L4 x3 a
    NewJob(Job),  d) @/ c) \  T7 ~& i, e. g4 ~) r3 C
}3 Y  f* j5 P+ @! t& e

& P. t7 u% {, _- m) Lstruct Worker where
3 A3 L. v; }9 j- O2 k% s{3 r/ F9 I$ D- a# n3 k0 z0 u7 T4 D  u
    _id: usize,- z" a9 ]3 U* x1 N* ]0 M/ R
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,+ O% g) w5 H/ r' j4 h1 M$ C
}
) A6 U0 S& }& A" {6 ~8 f6 w( Y- m1 A1 j1 y9 y  d
impl Worker
. c+ r! R# b* J{
' y  V' V$ _) X7 j' Q  s- W    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {8 w) e4 l  x9 a2 h6 i" Q
        let t = thread::spawn( move || {
3 c8 n: P1 O' e: C; Z& T  @            loop {4 ?# K* J2 |3 V* h& q
                let message = receiver.lock().unwrap().recv().unwrap();' X0 v7 e/ H" m: `
                match message {- `8 k0 o  x3 m( e; {  @+ B* _
                    Message::NewJob(job) =&gt; {, t& m$ x( H8 }& v
                        println!("do job from worker[{}]", id);
, z/ [" Q4 w" r6 f                        job();  r5 }" v* [4 L0 m: M- M
                    },! v: t' m- a& W. N
                    Message::ByeBye =&gt; {
7 o4 ^( A! j9 G) h7 j6 c0 g5 x3 O                        println!("ByeBye from worker[{}]", id);' U, ~9 k. _9 H$ \0 j, W, b1 e
                        break. g' O$ u( F; S8 m/ s
                    },$ N. z! N5 R/ G; d* z
                }  
6 T: |! H5 g$ T% D7 h5 _            }4 b% X6 e5 I- B9 K% [+ x
        });
3 I- t- K, w# s7 N1 S* T3 |1 t( H
        Worker {! W2 e+ l$ n! J- O( ]" b
            _id: id,
6 y; ]2 Y4 i+ W) f, {            t: Some(t),2 @9 c' U' z- f
        }: Q1 g( [, x" u& o0 R6 Z( J
    }
$ O+ M4 N( @" l" E+ Y' I* u" c}) I  d( r6 H& k1 G6 r2 T9 Z

1 `! G$ o1 j# F1 bpub struct Pool {! X: v# V* N9 {
    workers: Vec&lt;Worker&gt;,2 R/ T* W: ^4 _7 ^; o
    max_workers: usize,  O2 v. t3 A1 V0 `0 `" C
    sender: mpsc::Sender&lt;Message&gt;' u' L. _+ M: f3 I* O
}
+ s9 Z3 `; ]& y+ z
* |  J/ _* v! T$ O' \impl Pool where {
0 C( s0 b7 V+ H6 `0 y    pub fn new(max_workers: usize) -&gt; Pool {" [; E, _5 A; B! K4 R% G# ~8 \
        if max_workers == 0 {
9 A8 Y2 w0 B' E; V( _/ m; [5 C            panic!("max_workers must be greater than zero!")
3 |) T, H; n3 v. C' V        }3 t, k( y) q7 M% }9 z1 i
        let (tx, rx) = mpsc::channel();
2 B2 z5 E6 f5 B7 M
/ F' T1 H9 d- \        let mut workers = Vec::with_capacity(max_workers);
" {2 v# j7 F/ K5 f        let receiver = Arc::new(Mutex::new(rx));
( z* ^) D& r; _8 W- `        for i in 0..max_workers {9 U4 ~7 V1 i8 K  y6 _
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));9 Z: m6 l1 o9 n. ~: Z- y1 g% ^
        }
+ I0 V7 \, O- c- d* O4 Y: m+ p9 }0 w" K- D4 D" U2 d
        Pool { workers: workers, max_workers: max_workers, sender: tx }  }  ~1 r; }0 W1 J$ q! C+ N9 S# ?9 ?
    }
8 K! S  n4 u/ l8 p    9 `  c" H$ |0 E
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send0 A9 \1 t* i) q0 ^
    {
% }, z6 z3 g6 J8 s# T/ t  E$ s+ P/ ^+ N' m$ l  ]! H
        let job = Message::NewJob(Box::new(f));
  C% @. a- W' s3 F% h8 }3 l" p        self.sender.send(job).unwrap();
: s+ B, X4 P3 L7 v1 ?- n8 A) Y! l0 k  D" D    }4 u9 m% n9 m; r# Q$ ~
}2 ^! ~+ [& Q# w

, P4 B# X2 Q2 B6 ^* Cimpl Drop for Pool {& H- w5 @- y7 F  \4 D. ?- j
    fn drop(&amp;mut self) {
; H5 q/ q, x/ i1 A1 R        for _ in 0..self.max_workers {: ?5 W6 J& x  @; N
            self.sender.send(Message::ByeBye).unwrap();0 A8 ?8 M, n  C- y! k- m' V
        }3 [' F1 c* W0 G( r
        for w in self.workers {/ G5 A1 Y& K& `$ e' F$ B
            if let Some(t) = w.t.take() {
2 Q' C' g- b3 B7 o8 ]                t.join().unwrap();& }, B" ?3 f! r- S
            }
/ t/ E+ G* m, X+ }0 D        }# B# r: B6 r; K6 C) v- d% a
    }& Q; o1 G! \0 r" X, `
}
: p, Q. v& a% N5 ]0 n0 }  @; B1 y/ l+ o# u

: N- F5 H$ E+ V5 U, g1 X9 c) k- _#[cfg(test)]
4 |' e( H4 a4 O% Nmod tests {$ v1 ^' J" k* p9 J* l2 y2 N
    use super::*;
5 e( I& T/ @0 R. J/ S: ~    #[test]- l* ^: r. K) z& _, L$ f  A' O" n
    fn it_works() {
0 k1 P. q. Y- s" r' A* B        let p = Pool::new(4);
. w" w7 D  @6 Y0 B        p.execute(|| println!("do new job1"));
1 q6 V5 p2 x/ F+ m2 J$ Z+ n. R        p.execute(|| println!("do new job2"));
+ }( {- X1 z7 y! E3 u1 u, {        p.execute(|| println!("do new job3"));( x8 @* M% j# D! ~5 g3 r6 m# ?
        p.execute(|| println!("do new job4"));( {! c4 W6 W' ?; e
    }' l$ O: L8 R2 S9 d' M
}
* g, T% W. `8 v0 w* x</code></pre>( L0 T6 y& x6 P% X- D, ~0 b- E! y

" O/ g: ^  P7 G, [3 \0 r- `
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-7-1 05:38 , Processed in 0.061294 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表