飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14856|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8059

主题

8147

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26507
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
6 Z- c- o7 u( Q* m" Y& k( O' Z
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>. }3 O0 k" {/ z4 G' w
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>- M6 W# r. r7 s2 C) r
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>1 o9 ]$ t* ^0 k6 d8 t# z; E
<p>线程池Pool</p>' a+ t0 I8 b( T2 s" `3 ?+ w* Y
<pre><code>pub struct Pool {$ k  u" |9 X7 V8 }4 u& a/ k
  max_workers: usize, // 定义最大线程数; H' S% ?  h  r" i* @+ s
}
1 P* F' ~7 I+ Z0 D
. n1 ^' c7 S: J# p' i! v& oimpl Pool {& w) f" Q) {1 O) w& J/ p- k: w
  fn new(max_workers: usize) -&gt; Pool {}; l4 h( y0 {8 f) l- _3 k
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
" R$ D9 T& K) V) I6 W4 M}0 B4 h' L6 S& W( W3 t: }

, w- I4 O7 Y( ~. G9 F- b</code></pre>1 a" X9 }' |' G1 E+ X1 x. [$ R0 L4 a
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
0 L3 i- E* k! Z; M+ u8 V" y<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: |6 C0 n& u8 m4 o) S' |% l可以看作在一个线程里不断执行获取任务并执行的Worker。</p>1 {" B- U( ^; k- R" {
<pre><code>struct Worker where, A0 `5 m. x5 n( q1 B" T4 t
{. S8 N$ p  v; [+ y( L
    _id: usize, // worker 编号
" K2 b% h) A8 f) ^}
% R) q% Y) R1 T</code></pre>- W. R- ^( V) L1 w
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
+ U2 E' P5 o# {6 i) Y" C) d- Z把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
( C$ l( q  m- w4 s+ d3 f7 P<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>2 X- u/ `1 g- G
<p>Pool的完整定义</p>( h6 y6 Z( h0 Q2 y9 O% t+ ?& O" L& J
<pre><code>pub struct Pool {
+ K. \/ N# ]2 S3 P: G3 a" \- p    workers: Vec&lt;Worker&gt;,+ Z4 \6 E8 g# T' y  T5 M
    max_workers: usize,
6 g2 t5 ^/ x  O, V) n: s    sender: mpsc::Sender&lt;Message&gt;
1 M* P8 w! N! |. f}. u2 \2 v+ j# B
</code></pre>
0 o0 Y7 O' R( [) B1 d( D) b1 N<p>该是时候定义我们要发给Worker的消息Message了<br>
" n7 }8 F" z, J8 J1 E2 l7 ?) o4 k  t定义如下的枚举值</p>- h, k$ p5 ~: D3 E- y
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;  E8 m# s/ C0 |$ Y0 y( a. F
enum Message {8 t" F( ~" O1 ?1 H6 E4 ?3 m% P" c
    ByeBye,% M# U$ k) _  m9 _: L. t& e/ O2 y
    NewJob(Job),
% X  u- y8 L. [8 a1 u! P}
% S; r, Y2 h4 ~  ?</code></pre>
# ]9 Z$ R$ D% [6 `( Z) ]0 v& X<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
& A; E0 P0 p8 c5 x1 h" b8 G) {<p>只剩下实现Worker和Pool的具体逻辑了。</p>$ `& ~) q/ e' X$ a0 b7 x
<p>Worker的实现</p>
0 ^( O/ z% H# O' G- N* {2 E) K<pre><code>impl Worker: i; i' S+ `4 F& R/ U
{2 a( ]% j( T1 ^% R3 O, t/ E
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {. E0 Q$ V) b2 ?; F
        let t = thread::spawn( move || {  N8 r& c; _! X# C1 Q
            loop {! n! N0 |7 g1 ]9 N0 w6 d+ J! o; p) F
                let receiver = receiver.lock().unwrap();2 t+ e% B; ^" k9 b% q# h
                let message=  receiver.recv().unwrap();, d- J, M" l3 p
                match message {* S1 ?; G, }) R6 j3 U  U& w
                    Message::NewJob(job) =&gt; {
2 X8 ~; C- ?& L                        println!("do job from worker[{}]", id);- u1 O& f' e& {1 R6 \; V) ^7 t
                        job();8 Q6 F  I) e, _
                    },$ c# G! C" G6 B5 `) I6 l9 f
                    Message::ByeBye =&gt; {
  V; Z$ Z% I5 P8 I1 @                        println!("ByeBye from worker[{}]", id);
5 d% F" q  E$ |5 E4 p                        break
" d5 X5 S1 b( j7 N                    },$ |; o) F3 t# u3 U( O  @# T
                }  0 Y# o5 [( \- Q' U4 a$ [
            }
2 z  Y+ A0 S' J  o        });. f" M- `; K) ]  @* j0 W

, U2 |- L% U8 `9 M. T% q$ ^6 Y        Worker {
4 s/ M$ j* Z; X7 U: s            _id: id,
/ U& b! r+ N- t( J# c            t: Some(t),+ M' f3 R' n: q. c* I. s0 @8 P
        }
+ T7 R1 J. q" ?% ]) C6 B1 v+ R: ^6 Z    }
% \# o2 ^) t' T: S1 o}
. l7 V0 G6 c% L& F. E</code></pre>
$ c4 K) b6 g- x0 i<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>( U) J8 O7 f3 w5 ]2 L. D8 D9 U
但如果写成</p>8 I7 E4 @5 G3 ?0 L( K0 x
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {! V( e/ t( O  |5 Z4 S1 d7 P0 U
};( P! z9 C0 L3 W8 b* M2 I/ c; Y
</code></pre>
% V; {4 |( S5 A3 Q* F3 q) T. E<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
% j, @- b  [" O9 `$ Wrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>5 |+ i0 x2 w  m) F7 |4 \, @0 s
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
* J# u8 c3 `" l4 w<pre><code>impl Drop for Pool {
/ a: D+ V1 V1 E    fn drop(&amp;mut self) {( N8 Z/ @) f: r& i1 N
        for _ in 0..self.max_workers {* j' ^7 \  G. x8 D
            self.sender.send(Message::ByeBye).unwrap();
! d# e, W5 ~: E        }
, ^7 _1 k) d2 i$ q7 i- n        for w in self.workers.iter_mut() {0 f: ~. O" b% x1 f- B8 W
            if let Some(t) = w.t.take() {4 u0 f5 @* x, U6 [* d8 G, R1 D
                t.join().unwrap();
( J$ N9 d6 N  y' A# l            }
" B$ O+ x7 D  E4 Z$ T        }' p) f1 t4 L5 M" L5 C
    }/ K) Z$ u# _' M; M6 r% e$ z# x
}
6 I7 x$ v* p' G0 w; O  r  x& `2 }9 f3 z% z7 t
</code></pre>
* y- e  A" ]& s/ R<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>+ a# ^3 W3 P+ Q3 }3 W
<pre><code>for w in self.workers.iter_mut() {; p: }1 L6 Q* G
    if let Some(t) = w.t.take() {
3 n# h( e3 B/ N7 x1 l. {        self.sender.send(Message::ByeBye).unwrap();
1 S* d- {2 X  c+ |3 N        t.join().unwrap();
+ `0 Y# L, \% m: G- B; Z    }9 e# A) _0 |: V; d* ~0 `
}
8 v" R) J0 P7 l: R- a
# z2 {" z! f; T) ]</code></pre>6 i  I8 a0 D$ O$ m$ y$ m5 j5 T
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
7 `/ y& G8 [2 x! W# U' s6 U# a我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>7 ]( H# i& _/ |* R- f- S
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
0 i9 P/ F$ P- `2 I, v<ol>
$ }+ q/ d5 s" u<li>t.join 需要持有t的所有权</li>
. y& D0 \" W- ^* F. t. R0 l<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>1 T" v5 n4 D$ @4 H2 z1 o
</ol>
. O( D4 n: `+ R& K( v$ r1 ^" [<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>* ^7 W3 ^) ?& C
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>6 Y% _  i" z1 f) @  C# j
<pre><code>struct Worker where
: E* ^0 n6 ~5 K{
5 J; T+ Z3 `' d9 v    _id: usize,! o3 ~% L& o- a
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,3 Q& G7 ^4 ]# R# [: L" i) @- ]) Z/ m
}
" t0 f& {) b- @% }: ~' O, H0 \5 N</code></pre>* f0 \/ W) p; M- d5 `
<h1 id="要点总结">要点总结</h1>7 O; D2 a+ @1 `6 [: `4 {3 O
<ul>
9 e1 N8 h$ q2 c; K: R<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>' G5 m4 V  i, o7 M( o
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
- S0 G' G# w/ ~</ul>
( v: u8 k. }0 ^<h1 id="完整代码">完整代码</h1>
0 G: m* J+ e% `: B; d8 G<pre><code>use std::thread::{self, JoinHandle};$ F+ O  }) _; e0 q- B3 m
use std::sync::{Arc, mpsc, Mutex};' P1 I, u. |: v; O+ v1 x) }; u. w$ z5 L
5 p/ \/ N7 T  \0 d9 H. O, l' p0 [
7 p! }5 O; m9 p( f
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;" {' i, \+ v, f  s+ L
enum Message {
  S+ f( E* X/ r, V1 K* p    ByeBye,
! H9 J8 F/ X* g6 Q    NewJob(Job),( y  u5 c5 E+ Q! J) @) E
}; W; a- ]/ U' V9 i3 C* s
) q- j3 r, B9 F
struct Worker where
; y1 Q9 `. Z# X2 Y8 h- z{
7 c+ |2 o) C7 s1 D9 ^  C# ~$ t1 ^    _id: usize,
1 a  y5 D. {8 U( a9 t6 U1 o* @/ y1 P    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
" i; X# a, q6 q5 r7 v}
! w7 s$ l% I: _/ X: z: }3 d
- T, G( N% B! b3 c3 r% Himpl Worker
, i5 O( F) l. L: _{
* Z4 J# H: s8 v, l    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
' ~( p9 V, h9 ~  W; X' ~! t, l! L        let t = thread::spawn( move || {
1 k: I) j' R  z4 E% e            loop {" @8 E9 w6 F7 ?1 D; Z$ J2 k7 U
                let message = receiver.lock().unwrap().recv().unwrap();
9 Z+ F2 l1 Z6 u# V                match message {
! y9 {$ U+ ]! h' z6 M+ ^6 }* [& h                    Message::NewJob(job) =&gt; {
' v  i8 G3 n  G9 t9 n  T! ^5 \                        println!("do job from worker[{}]", id);6 A# y7 B; b8 Z% k% b2 g# _9 L3 X
                        job();
& u' n6 x9 D+ b1 O                    },! T' _, b+ o6 A
                    Message::ByeBye =&gt; {3 Q- A+ B( b4 r5 V. H. r9 I
                        println!("ByeBye from worker[{}]", id);, Q* G3 s* O8 A' D
                        break
( W, v) f- m( g                    },
& v& f$ ~9 S( j' y) s% n% b$ |  }                }  " g: ?' h) N! N" U& d! Y
            }7 Y( k9 t% m* ]4 r! z! o) }2 j( _$ T
        });
1 Y! ~; \! t$ Y1 l6 e0 K* B
+ @3 @# z' X4 Q+ E) W6 b: P        Worker {
7 M1 P, E% _8 c/ R, g            _id: id,
6 H7 Q4 H' V/ m1 e            t: Some(t),
4 @' \1 x. g7 p% a2 C        }0 b3 N4 K2 M) _. Z2 @) h
    }5 p# V* J! _: t) u
}+ w$ Y7 b* K2 a- C" P* k* @2 e

0 F3 @! `% R( l2 r( j# N; [5 Kpub struct Pool {9 T# r% o6 E5 [3 I9 @, @, p
    workers: Vec&lt;Worker&gt;,
  E# o5 a# ?# ]. \2 E4 v: A- P    max_workers: usize,8 ^" n( F3 r8 T- ^" z) f2 \
    sender: mpsc::Sender&lt;Message&gt;$ Z, C% F( }% }# s8 v& V, A, Q
}0 @. ^0 ]& k) a' B! \* }$ F, n

: z+ [1 p; N" X. z. H. @impl Pool where {" b4 l  c! H7 N* n2 b
    pub fn new(max_workers: usize) -&gt; Pool {* y( C  i7 l1 I
        if max_workers == 0 {1 e5 n0 V+ O3 M) x0 j
            panic!("max_workers must be greater than zero!")
1 h! \0 u& l! H# E: ]        }
8 U9 [0 F; i6 ~: ^) S' z1 v- |* X! R        let (tx, rx) = mpsc::channel();
1 A4 A( Q! P; R2 }7 d! s4 e" n4 }/ i0 A
        let mut workers = Vec::with_capacity(max_workers);
* H0 ?2 a( F8 J        let receiver = Arc::new(Mutex::new(rx));; S1 D1 p9 V! _7 j9 Q+ W. }
        for i in 0..max_workers {
5 v" }  Z* n$ e* y            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
, T- K" s6 u5 j: c        }
4 L  D- M* ]6 C9 Z4 U5 i- T5 M
/ _' M! x; ?6 q8 V3 J3 s        Pool { workers: workers, max_workers: max_workers, sender: tx }
  I2 y* K$ P/ ^' l* O- w    }
; p$ ?' g/ x" B  Y! I( W7 e   
" h" D4 u3 t# L    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 g9 {$ |+ W/ T* J$ i9 d0 `5 I# H
    {
7 a. n/ T- I5 p% D3 u
9 O, A  ?# {3 \+ r        let job = Message::NewJob(Box::new(f));
5 T# A6 A' f# T3 O        self.sender.send(job).unwrap();7 }. e2 t5 r4 {7 P, G' }9 `) x
    }6 o9 V) g' V4 t. q
}
1 j2 A* \% _0 l2 @% A: o! h- `' F8 u1 @. b5 f# B* g
impl Drop for Pool {
0 l0 a& i0 c& t* }    fn drop(&amp;mut self) {
) o7 I! i4 i3 ^1 c        for _ in 0..self.max_workers {
" ^3 A2 |/ l2 _. N9 ~9 ^8 P+ l* X            self.sender.send(Message::ByeBye).unwrap();
0 H' j7 ~0 I! m. @, e        }$ t( U- `+ o: x3 E8 H4 z
        for w in self.workers {
. h1 w% [1 M, f( \  ]+ a            if let Some(t) = w.t.take() {( Z/ P" {7 F6 a) g; U6 G" ?
                t.join().unwrap();
% p3 R1 i3 N- ]; D0 `            }
+ J6 Y5 t5 g# t: N        }; l; Z9 q( a+ O
    }
# f% I# Q0 y9 K* A9 s4 u/ z, l0 k8 C" Y}
2 n0 k# ~7 D$ H7 \! K- c* A1 s# V! I0 E6 X/ u) z

6 W6 [6 Q: ?  L  C, }#[cfg(test)], M: s! k- F# ~/ @( \8 `& i
mod tests {
. t3 `: m: t# ~" D    use super::*;
% b7 G( v/ r/ K* `8 T+ x    #[test]8 o! O- ?- ?# Z& w3 |  P7 M
    fn it_works() {
! f% n8 o5 Q- s- J8 V        let p = Pool::new(4);
+ X' v' a! h$ w6 S& S. Q2 x        p.execute(|| println!("do new job1"));* H% F! V, r  L# e8 O
        p.execute(|| println!("do new job2"));
5 r9 a0 {& H( k0 E. ]( E        p.execute(|| println!("do new job3"));
& h2 ^8 q7 q- O        p.execute(|| println!("do new job4"));
# _; K% X* ]9 S* |7 R9 k! \    }9 l8 g# ^3 ~. G  X4 m
}, A* ?& j+ i' e+ l: {. C
</code></pre>
* t- V5 U- P7 ^
4 y" E  I4 k6 M: ~+ Q
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-14 17:34 , Processed in 0.116832 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表