飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14396|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7953

主题

8041

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26189
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

* A6 r4 A$ N; V6 I4 T+ T4 F$ q, @<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
( x: T' ~/ I6 Y' m4 f7 l: y/ [<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p># h& ^5 d( ~8 W3 y3 T4 u$ t
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
9 p6 A. Y, n1 u9 c( V<p>线程池Pool</p>. O! K- O) Z6 U' v' z, T
<pre><code>pub struct Pool {
4 X( @% W: S6 X/ H, A; U( }  N  max_workers: usize, // 定义最大线程数; W0 M+ {  }2 g% `( g
}
  W- P: Y: N, g* P
( Z5 `& ^* h: S, V( U6 D7 F! Dimpl Pool {
1 o8 `* V: T) ^1 J5 z3 ^  T/ j3 @  fn new(max_workers: usize) -&gt; Pool {}
# r0 L/ K  U2 |/ B. W# i8 |  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}( L4 J. ]+ H  _2 i2 j$ e
}
" h$ R% T9 w1 v4 C' J4 v% P7 P$ a- y
</code></pre>
3 K! ]8 @* l  i<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
4 R2 R( G; ~" h  u# M<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
2 a8 ~3 b3 e. M  d. j可以看作在一个线程里不断执行获取任务并执行的Worker。</p>) C1 Q! d" j% N; h$ q- z2 Y, P
<pre><code>struct Worker where
' s+ I0 F' L7 u! Z! U{
# F0 ?; c* _/ s, p" {5 e; q    _id: usize, // worker 编号+ v4 g! p: y. g  i, E+ T
}
% u5 S7 [* Y# {0 m7 U" l2 }, h</code></pre>
. ~8 u/ K6 S" a3 O1 x<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>3 N; w* a$ {* l7 m" n
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>5 H2 R' j0 l0 ~# S* R  z
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
% n* I# k0 u$ ~$ E8 J<p>Pool的完整定义</p>. n2 J2 k6 v" K: B- `1 q% d8 b2 D
<pre><code>pub struct Pool {
5 g( F' M! H+ A0 u% o: X    workers: Vec&lt;Worker&gt;,
% ?& x9 c; U2 P7 Q    max_workers: usize,
$ ]  o$ Q0 [$ c    sender: mpsc::Sender&lt;Message&gt;
: l0 R  ?4 |# c8 M% t}
1 ^3 p1 f! s4 f4 g& ^8 q</code></pre>
) t9 ?9 ?+ I) n<p>该是时候定义我们要发给Worker的消息Message了<br>
/ ?1 w" i( O* ~定义如下的枚举值</p>- w0 r% P- c" P
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
) M% q9 [# a' [enum Message {5 B5 A; Z4 }; J! D0 g) a; C- h4 |
    ByeBye,7 l- `% a. j  Z- Z: o
    NewJob(Job),& ]: Q& i! V+ @% z3 Z
}
# }5 [  ~5 I4 T8 i/ j( a</code></pre>& Z* s5 r1 u7 u2 E5 Q- T  O/ [
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>8 s0 m& ]' [8 P
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
& Q9 N) J4 {5 i- F<p>Worker的实现</p>$ e( M" z& H" a: I. f
<pre><code>impl Worker
" y+ s' y/ m. x! `) Z{
" H- e8 }/ _! o* i6 A1 m    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {, |: ^/ G2 S9 V* k/ J: E
        let t = thread::spawn( move || {
* c; s+ p5 n. h/ [5 X1 C            loop {
- w; _5 S+ u2 f  o% I/ [% J8 B                let receiver = receiver.lock().unwrap();
1 r2 p, x3 B% I! T3 Q0 u                let message=  receiver.recv().unwrap();
' t4 q" h7 @1 R% V                match message {
1 q; t# X4 L, M; z) h                    Message::NewJob(job) =&gt; {
: T; f  y7 L5 X1 s                        println!("do job from worker[{}]", id);
  G- R# m8 [% `9 N# K                        job();
7 N" c/ n3 S! R% f1 X7 x3 z: z                    },
2 h/ F, e- N* H# r2 {8 v                    Message::ByeBye =&gt; {
/ I. W1 k! w9 R! y                        println!("ByeBye from worker[{}]", id);
7 F& U1 d$ t( ?0 I) O. x                        break
# S- ]9 c! x1 }5 v* N                    },
% ~, `6 c( \8 }2 s2 p# ]5 G+ z                }  
/ ?8 q/ ?* g# `* I0 r  |            }. ]& y$ z+ m1 {
        });
! [- r' p) N1 }( I5 h
0 w+ [: r" \5 ^9 g        Worker {
! d/ t1 }( ~/ z" H+ ^) u            _id: id,5 D7 G4 ~1 v: }  X$ N; \. C# n
            t: Some(t),
  A: H' e- q  z' E) z( {        }4 `9 |8 f5 n0 R8 }
    }
. b( K9 v6 }2 p- m  w}0 b( Y. u4 ?% i5 e# {
</code></pre>
; J) O1 T, G" q* E! r<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>$ O; S! i/ Y6 O7 Y# T0 i; r& v
但如果写成</p>
2 o* ]  ?( e% J1 F: v' k<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
/ W& j" ?) M+ A$ Q};" U( B/ {" H) _9 t" _- g
</code></pre>
8 P0 r% }& r9 t+ w! T. r<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
4 T. a4 g8 n' W6 Rrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
) N# v2 Q! A. {<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>3 C; V; N- t0 q7 [$ r1 F, t
<pre><code>impl Drop for Pool {
0 V$ v9 x4 X9 e    fn drop(&amp;mut self) {
3 \- [3 M, i+ K- D5 X' w7 q        for _ in 0..self.max_workers {* |$ Y9 ?0 P; f3 p4 a. m
            self.sender.send(Message::ByeBye).unwrap();& I2 ]# }+ d$ e3 j
        }
, @; J5 _% a' @  {, F        for w in self.workers.iter_mut() {# B& j2 a5 l' [9 Q! H4 c
            if let Some(t) = w.t.take() {
( @$ x" R0 X% P% S0 {/ s                t.join().unwrap();
6 q! y7 ~! g6 e0 k" }& _            }8 Z* ]% }( l0 e0 d$ O
        }
6 k7 O& h3 ~+ I/ _+ L6 J    }
4 m$ R8 M  x: Y, w, h: q}
/ r2 w" w$ ?8 Q, U2 @5 x( E) X& Z8 O3 h! M# g1 I& M
</code></pre>
; ~# l/ _% D, B# a2 l# x<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>) R! |4 e) h) ~# Q" v
<pre><code>for w in self.workers.iter_mut() {; D4 f$ J- Z7 j
    if let Some(t) = w.t.take() {7 l- ]/ Y+ F$ ?. n: U1 x
        self.sender.send(Message::ByeBye).unwrap();
; q% H  B0 X  b: Z* i        t.join().unwrap();
: |3 Z: G* P0 F    }
+ O# x: |5 t! ]& @4 A}
$ T( }6 L; _& D. v: S
' ~% @% k; r  k$ n</code></pre>+ ?+ J. b- l# B/ E
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>1 v' f/ [: C7 R" }- ]) J
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
1 x# Y8 e% V% N: z! I  D<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>8 s/ ]( u1 B! j/ s
<ol>
! i6 D! [! ?+ n( H4 b<li>t.join 需要持有t的所有权</li>
; B+ F, _" N. j( ^& k" X- h) H<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
: T1 H# ~9 s7 T) j% }) R2 u</ol>% t7 ~5 ^- `) y; M- f
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
$ `* M& O, p0 s3 A5 L( h! c: p1 H换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>* B1 p5 j/ K9 N  ]' w* c% W
<pre><code>struct Worker where
  Q$ O/ s5 ]  D/ a; ?{% b3 [1 J5 V' T- h3 B6 B) E
    _id: usize,
% U- B" \- s  r( L( l    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
. t$ A1 O+ B  k# T$ w: t}
* b& }0 Y. o4 _1 S</code></pre>
4 U; E3 \( H9 m8 H8 q6 Y) t<h1 id="要点总结">要点总结</h1>
5 C& p1 v/ j+ O% v3 F5 E<ul>
! K, p% @* J+ T& G) K5 Z<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
' ~+ N- |5 P5 _. }<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
$ L/ y' a- V# n+ `/ i</ul>
: |: Y* v- Z5 s& o<h1 id="完整代码">完整代码</h1>
) N6 |2 \( b' @7 \& B$ C5 Y- u<pre><code>use std::thread::{self, JoinHandle};8 Q5 R3 F* [2 f: z5 i
use std::sync::{Arc, mpsc, Mutex};% F, \# U2 D2 D* z8 c* X

! O( O, U7 [/ l" S9 ~* i1 Z# |( s; B- H
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
- U  |3 x; Z) A! h; N( Menum Message {5 q! ~, @; Z/ k
    ByeBye,& ?2 w, J" T7 i) ?
    NewJob(Job),: ~! g0 _2 C/ b! p+ Q0 b( }) W# X
}  Q; I0 O7 \, {
. `+ f0 m4 D* I; p
struct Worker where
% I- o# Y4 l: V3 I8 B" a{
0 R1 J- k$ \, L9 w+ ^# ?    _id: usize,
3 T1 q' {3 H1 A) z9 c# j; i    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
' \/ R1 @: h! \}) I! v5 N- c! K! J* D0 s" x

1 {- U4 O) k: S: C  L) B* Aimpl Worker
. k' e3 N0 L' H  v0 T{& _7 f; D/ e, T, Y  M$ d# i4 A
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
# _* d5 ]/ j- f5 e$ L: x8 N        let t = thread::spawn( move || {
/ o4 L  k. w% o# T6 l$ N- l8 s            loop {
; m: Y$ z/ b6 v  c( k' g1 [8 @                let message = receiver.lock().unwrap().recv().unwrap();
1 }  w& r8 B/ Q% w6 g                match message {# ]- h( ^7 `4 V: t! _
                    Message::NewJob(job) =&gt; {' {$ x4 b% d' X3 T0 Y; q' P$ i/ I) e
                        println!("do job from worker[{}]", id);
8 B* G1 Q- M) t; V                        job();
8 v4 e2 R% V/ H3 W+ _9 c( Y- h                    },, I: G* k! j4 @+ u' L4 Y; P5 M0 j7 Q. F# L
                    Message::ByeBye =&gt; {
5 h, h( f0 r* i* ?3 m. n/ u$ A                        println!("ByeBye from worker[{}]", id);
$ C: e$ ^: Q0 \+ G! f% q                        break0 r5 }" T' |; \' a
                    },
: V% @3 z9 `1 v/ c% c/ q                }  6 A, w/ F5 x! u* y. v
            }
7 s7 j; n& t1 J        });
4 D1 D9 L- \, n! @: \
2 H2 E" u3 l, m/ J9 I        Worker {( `! Q! G! m6 W3 G9 K# [3 W
            _id: id,
9 b; G0 T  m4 p# a) Y( _            t: Some(t),4 y( w. ?1 K5 q5 V  }' l, j
        }
* h# U; J0 ^# J) n    }
( F( P) D3 c) @}) f; y/ `9 Q0 w; K
2 M% G9 O0 O1 C" E% ^
pub struct Pool {
# b/ J1 N- ?- ^) e3 B2 Q    workers: Vec&lt;Worker&gt;,
3 J$ h, c" y, y! |. b/ ^% S. `* z    max_workers: usize,
$ V1 e, T+ l' a    sender: mpsc::Sender&lt;Message&gt;9 W, O3 j/ a! S8 j
}
) L; M% [4 A4 W- d( q4 c1 K3 k
7 q& s: ~2 M+ B& \/ Oimpl Pool where {6 `( j% s; ~$ L2 ~* `5 A' V
    pub fn new(max_workers: usize) -&gt; Pool {
) E* @( j& j9 Q# \5 _* W        if max_workers == 0 {) \- h! [8 V, z7 u, K
            panic!("max_workers must be greater than zero!")
9 o1 G* B: F6 I- t+ n        }2 L# l5 t4 e7 V7 J: O7 Q
        let (tx, rx) = mpsc::channel();
: k7 x3 t: s0 ~9 y+ J( l$ s9 \/ S1 o$ z2 a
        let mut workers = Vec::with_capacity(max_workers);! G4 ^2 }& C5 e" @4 k" i
        let receiver = Arc::new(Mutex::new(rx));
) }. h% ^! O: {        for i in 0..max_workers {, R) ?3 j  F7 J9 y  @" E; V
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
5 B4 s+ |: Q  `/ p" ]$ Q) q        }
: Y) ~8 p+ `! Z: O' v3 T4 ]1 l0 n4 i$ m( E) ]
        Pool { workers: workers, max_workers: max_workers, sender: tx }
' U0 F6 s6 Y* B+ ]: J$ f! N    }7 u# K5 J) R3 S1 C+ U
    0 s, H/ d% }% g! K! _" X( b$ L
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
, p9 D: F) h4 B1 }4 w/ [+ _- E/ _    {
' R7 n9 M: K, f6 t7 U" o) M2 u0 P# y( `
        let job = Message::NewJob(Box::new(f));
: ~: i( g2 I- U- M8 _- S        self.sender.send(job).unwrap();% A. e- ~% ^0 T9 X* a
    }
, E) n# p7 U6 F  a8 d}! H! Z  F$ Z" w- K3 V* m

( v4 x7 I# B. h- h# Gimpl Drop for Pool {
" W6 J' S# g$ j/ j    fn drop(&amp;mut self) {  n+ u- C- n% s: n
        for _ in 0..self.max_workers {# S/ P* Z5 E  ~3 r" j
            self.sender.send(Message::ByeBye).unwrap();# [% X6 X8 \8 S  v, p
        }
. x% W# Z0 T' I1 e; `7 q        for w in self.workers {6 a. K7 Y* W9 u0 v! f* a5 n1 o
            if let Some(t) = w.t.take() {
" t* T+ g, ?: H  U( _2 @: M& Y* C                t.join().unwrap();  k3 |6 _/ x- [0 J& M, r# E
            }0 y" C) r4 J2 r: {# d3 u
        }
( ~6 R% W& G, o) r' W    }
; X% q0 Z8 h4 {}6 k' ]  T$ r& K5 }- P

7 K* _* c$ j; u1 ]
) J  F, d9 m6 o7 C( C% K! i3 @6 Y#[cfg(test)]. K! U3 J/ |7 y
mod tests {
3 c! x  Q' E# H    use super::*;/ }. _5 g6 s6 S1 Q
    #[test]
$ a: ?1 `4 C- ?- L, @5 o( a- }    fn it_works() {* D; Z+ V7 \" A! c0 Y) H
        let p = Pool::new(4);
/ z1 s3 Y6 e: I5 T        p.execute(|| println!("do new job1"));
! q8 R) o8 t2 }) s. C9 Y        p.execute(|| println!("do new job2"));* r0 A$ R( f( J/ Y' u
        p.execute(|| println!("do new job3"));2 F& H* X/ q7 q4 Z, d
        p.execute(|| println!("do new job4"));
# B; z& ~) Y1 B" ~    }' s! H) r. s* W7 i
}
) r5 L% K( v  r7 v$ u$ b</code></pre>  [" \& \6 r: G7 j8 E5 j1 {

, Q& p3 W7 n+ r' _
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-24 14:39 , Processed in 0.067677 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表