飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 5458|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

5344

主题

5432

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
18354
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
5 O+ J6 R* [1 Y; Q) {( j* d. M
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
9 d/ Y4 y  M4 B5 E6 S<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>. }6 e5 e  f# l1 `9 q, {0 c" ?
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
- d# Q  @( _: K# W' y" p6 A  K<p>线程池Pool</p>$ }" w1 N6 L4 z6 [7 J) q# s
<pre><code>pub struct Pool {
1 d  P* ^; s: P% _  max_workers: usize, // 定义最大线程数% X2 u" S) u7 I! O' E3 r1 M
}% ^0 E  b) J3 y* ^
2 m* t: P0 K, B) V" I( t
impl Pool {
- \, b7 Z: K# v8 a$ w  fn new(max_workers: usize) -&gt; Pool {}
& D6 X: p& F! U- b  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
0 w! N; u  X; H) \/ q}/ H: n  k7 B9 ?4 b3 u. \

$ z. `! ~* J( v; K9 b4 O</code></pre>( ~4 \- S8 X. q
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>, f- W& F6 H0 H" K' X  w
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>1 _. l9 _# D" ?7 q3 b6 E8 D
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
  p) ~( n' v- V$ v2 X<pre><code>struct Worker where" u6 p2 ~( M- r# `& z& g9 D
{+ h$ C$ L1 A, Z, \0 q3 Q9 X
    _id: usize, // worker 编号
1 E3 j& @9 S5 o. Z6 n}- v. n3 i: _9 t- k, T/ d0 L  W
</code></pre>4 L% O1 I- F7 `
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>% Y, X5 q- w& x$ t# F3 `3 ?; c
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>) w) G! [+ b' ?7 I  S& O( m! y2 a
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>1 B: D$ ?  w7 ~# p; K( k
<p>Pool的完整定义</p>
2 s6 j" T' Y- k) L: q+ l<pre><code>pub struct Pool {
- Y8 i. g0 S2 k" P( ?    workers: Vec&lt;Worker&gt;,
9 F% D4 K  F# D! ?/ L    max_workers: usize,+ z" h) ~( m. s# a, c" F' A
    sender: mpsc::Sender&lt;Message&gt;2 ]" [2 \* H& M9 u+ z# v. `
}3 b% E5 G/ s7 S/ J5 |
</code></pre>/ q1 @" G1 b! L( V, N( A
<p>该是时候定义我们要发给Worker的消息Message了<br>
. @& S# A' T4 z: f: L定义如下的枚举值</p>
/ m& x( x1 ]% m7 Z+ l1 r+ {% Z<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;& H) v* p* \/ P
enum Message {* l3 a1 `3 N" B9 d! X# `
    ByeBye,! D% a6 |- H  E2 P- ~3 [- W
    NewJob(Job),
! c7 K3 W) G" J! S6 w& n* s}7 A. t" L  V0 o. w6 m! }/ y! T% v# t
</code></pre>
5 Z/ B% [4 m, t6 r# h  z3 e. U) W<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>& }4 b: K% B1 e+ L& g  Y
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
- r" B" ^/ B0 |; p2 m3 q<p>Worker的实现</p>
" r7 p/ ?: e$ J4 R6 v' H<pre><code>impl Worker4 w+ a  X. A* {. p
{
4 c$ X, J6 y- p8 W5 K* P0 h    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {# L6 _% I* k. g
        let t = thread::spawn( move || {; d% C) l# j5 E# j/ i
            loop {7 d: y$ {( A5 x
                let receiver = receiver.lock().unwrap();
- ]8 I& I$ H. Q7 m                let message=  receiver.recv().unwrap();& u, Y; w: B! X9 A2 j
                match message {4 l/ t0 Z' K9 y8 s0 r3 r3 N
                    Message::NewJob(job) =&gt; {
. T7 g) b+ u3 Q$ b% y' _, W) I* n                        println!("do job from worker[{}]", id);0 ]) a* @) @/ A& z- b3 ^/ C
                        job();
& p( h- `8 }! E4 i: y5 z, M' B                    },
2 [, A: `( a3 q  r                    Message::ByeBye =&gt; {' }3 p: O. y0 @0 v6 b7 z, W
                        println!("ByeBye from worker[{}]", id);1 m0 C. F9 W% c$ B( A* h% k, |
                        break
0 a7 Q  b* K( f7 v: y                    },
- T6 }3 c# i5 O( b' u% [                }  
  ]* ~! r0 U1 T* ^' {; Z# {            }
; c" w6 I: _3 c/ H        });
# l! j+ N7 Y  o
, V( E6 ~- ^# |) S1 `6 a: X0 ^        Worker {% X, ^8 O+ K% @
            _id: id,
5 a2 m2 n0 G) M( i; u6 {            t: Some(t),
+ C1 V; _2 h0 {: F! Q4 r" B( ?        }* w, D/ |4 b) s* b3 f- r
    }
  A. w8 s; ^1 r/ ^) U1 E1 |}
6 I, u# m6 O8 i" q% o( U  ]</code></pre>
9 p3 O. @: e- n  a, V1 I<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>) ^+ M* j+ V5 b9 v3 C$ W, J
但如果写成</p>
* h6 u0 F" X) h/ L<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {5 U" U* P6 i( C" k+ u
};
0 K' x+ x7 A! `( n</code></pre>& l' U4 [# \4 C0 g
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>+ k' `. H& Q, B7 o) [( J8 O
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>8 w' K. @( x+ z- }3 j
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>8 x9 ^3 k6 A  s, \: h9 \; G
<pre><code>impl Drop for Pool {7 b7 C; K- o3 ~
    fn drop(&amp;mut self) {
5 X4 U6 q5 L6 W# ~! B' L7 g: n        for _ in 0..self.max_workers {/ i! `. C% v! z: y) v, H% R
            self.sender.send(Message::ByeBye).unwrap();4 C7 D# T# o0 Y. v1 Q+ S
        }7 f$ U1 o4 Z4 ]9 H
        for w in self.workers.iter_mut() {: E# G) K% T* g% |
            if let Some(t) = w.t.take() {/ G' Y+ [9 a" \& n" `( Y
                t.join().unwrap();0 Y' V9 ~8 c5 B( d( Z5 ~8 B/ @
            }( }- Q! s6 J6 P
        }; A0 k0 M3 [# q) M1 ?1 T2 W3 i
    }
  Q/ g/ }, I) ~3 X}
7 A8 D( y, c) u7 T# k5 U, `7 C
5 x' T$ K" }- x7 ]7 k& s' N4 Q</code></pre>
, n1 c3 e' r  O" }3 m+ q<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
! f. I) H* j" d5 P7 n+ K  l<pre><code>for w in self.workers.iter_mut() {
1 \. m0 y" X4 ~& _4 D7 ?' B6 I% h8 g    if let Some(t) = w.t.take() {8 g- [3 U' R; r) @3 ?
        self.sender.send(Message::ByeBye).unwrap();
0 o' K+ k8 F+ J2 J- @        t.join().unwrap();& h/ C  q7 G/ E" v3 h. D7 ~
    }/ ~/ r1 k1 }$ _/ a6 j
}
$ q, S8 J9 U& P* \' f
& X7 v' j. z5 W6 c</code></pre>
* B9 |$ r& V" p' P7 m, I<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
  Q- }5 e% p9 |" h3 g; {我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>( i* q7 k8 x, G) |
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
. F$ s6 w) `* M$ O2 D. j<ol>
4 b' V; W; ]  a4 }; I<li>t.join 需要持有t的所有权</li>0 Z) \2 E: x' Y2 D
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>7 V8 ]  S( r5 h7 ^/ ^- \3 z
</ol>  I# J1 L9 R( X. b) w. O
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
. K3 L' U1 {7 F- E/ v" N  x, Z9 l换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>- P7 C. f' m' Z. q
<pre><code>struct Worker where
" R  B( c" Q! ~7 h1 }5 h# l% X{
9 ]6 C; @% }3 R# a- t# o* A    _id: usize,* I, t% ?9 @0 W+ X+ W* b; @* m
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,: A" a! R/ E! s4 o! C2 t
}5 L: w9 R" w% f* P
</code></pre>; c1 X& M  J7 k1 M/ r. G- s
<h1 id="要点总结">要点总结</h1>. w& S0 o" b5 Q% P: @; Q  W, |2 ?
<ul>
1 [  ^' P2 `5 k; I- g, U9 h( W<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
% l( j9 ]& m! ?<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
( H/ H( ^0 d" x2 F</ul>
) T! ^) x" ^# h) _0 `( m<h1 id="完整代码">完整代码</h1>, Q/ Z7 y8 d, Q9 V/ D* r
<pre><code>use std::thread::{self, JoinHandle};+ ^4 t8 z4 Z3 x% Y( b
use std::sync::{Arc, mpsc, Mutex};
, x/ K- y* m, C( X8 v- o* \# t- [0 c) r5 Y6 ?
1 r5 S, T3 B: h- X, ~
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
8 u# w' Y0 s3 p& w3 y9 l2 ienum Message {5 c6 M' {, T5 G$ T8 Y2 }& G
    ByeBye,* ^8 \5 H. r( c# K5 `/ Q1 d* D7 z9 }' O
    NewJob(Job),
9 @% ^5 C/ h) t) ]! }+ `& ~2 B. O}
2 g( t, K: t0 E( p- |5 k" i# A1 p; j  M( I- G  l1 z7 o0 _
struct Worker where& k9 @& e# R- L0 l4 R
{2 H3 J9 z4 K5 R2 Y. O. o, e6 A* E6 b
    _id: usize,$ ]. |% x3 p2 n1 q9 o
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
' y7 m. M5 }. V- a. H}$ C) n: U1 `: `* z, s

  u4 L! c& p: b$ g5 z, {8 o6 Uimpl Worker4 [8 y( H2 e% a6 j
{/ _1 g# P7 Y# x# O! E: U7 |
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {# l) ?* p1 r5 b" ]6 Z
        let t = thread::spawn( move || {  M. J- \$ s* s" N; z5 P, D
            loop {
. j7 X8 H- ^, C% V# M3 H                let message = receiver.lock().unwrap().recv().unwrap();
+ j( E0 a6 @8 k$ W& [                match message {% u( r" ~2 \8 x  M; D3 }! d! ~
                    Message::NewJob(job) =&gt; {: g  ]. n, F; d2 Y
                        println!("do job from worker[{}]", id);
/ F3 v, `& d6 I                        job();
; w: T% p! t' u! g3 n                    },3 w$ d2 K- u6 m& }
                    Message::ByeBye =&gt; {
7 ~* _; i% Q( K! U                        println!("ByeBye from worker[{}]", id);
8 u# ?, Q' I3 X5 ?: N9 F3 C7 ]! k                        break2 ?# H+ S: e4 e0 o" l
                    },/ T' d: j( i0 t& [$ H0 q9 B
                }  
8 r2 M9 S) J/ F            }
6 s+ v8 F& I1 {9 l* |' _        });, k/ f! {" X* h% F9 |- T: P* d
, s" d! @/ j" W" `& n  D# d
        Worker {
  z. b, @% m9 ?4 g& M# J4 A            _id: id,
) C8 b. C3 G/ v            t: Some(t),
2 d' V; O+ D7 q. b        }7 Z  W: F/ T$ E( |7 J, t2 k/ T0 Z
    }
; t( D! m, c$ T}8 i& U* X4 h$ V/ s) w$ i+ m

: W+ l8 I2 i, Y* ?pub struct Pool {$ ?# J- X  f$ `% `. [
    workers: Vec&lt;Worker&gt;,  y9 X: O3 L  c7 H( y
    max_workers: usize,- G- U+ @- G6 W& s9 T' }( E. c
    sender: mpsc::Sender&lt;Message&gt;
# I) a& Y- P; E! G: g' ?}
; ^9 m: \) {4 @! U. R2 m
0 I0 s/ `9 J8 Limpl Pool where {
/ D$ \; E6 H/ i    pub fn new(max_workers: usize) -&gt; Pool {; l  E- a' O, z
        if max_workers == 0 {8 \9 @: d. W5 q* {3 Q1 j
            panic!("max_workers must be greater than zero!")
4 K& v3 L% V; \0 W9 l, q% N        }
! y% R# Z! a- C/ N        let (tx, rx) = mpsc::channel();
# a. ]% W5 J" H0 V  @! w% {1 P! U  G' m8 s' u( x/ Y0 E' v
        let mut workers = Vec::with_capacity(max_workers);' a4 R) w- O, Q& r# F3 K
        let receiver = Arc::new(Mutex::new(rx));2 W3 k# ~5 V4 X- o# E. p
        for i in 0..max_workers {: Y5 A% ^8 A, {6 b) o& C
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));  R2 y" d# ^. r4 a5 g3 j
        }
5 @. q+ r7 V+ _: ~; X0 y& Y8 U' x  I6 \
        Pool { workers: workers, max_workers: max_workers, sender: tx }
  p. v9 M5 A, Q0 l8 Q9 O    }2 n, Q* j6 J. `/ Y6 ^
    8 V2 r# B" u+ N) l, Z
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
  o  e: x, _3 u3 [! H! S( B    {( K6 F/ _2 h3 |$ X

( d' V$ i) e: ?/ e1 b" t        let job = Message::NewJob(Box::new(f));$ g4 }- L. H& H3 H2 C
        self.sender.send(job).unwrap();
0 L- j+ C" X5 s9 D* ]6 X    }4 m4 f3 v- R6 c
}7 e8 C+ A6 c+ F0 K) j0 E0 V% H# f

) k$ G  [3 O' F0 q5 x1 {& mimpl Drop for Pool {( ?+ G  s% H" E- u. d% Z
    fn drop(&amp;mut self) {
% ^; e# @) ~. ^* [        for _ in 0..self.max_workers {3 P' n9 `& [% s& }. D
            self.sender.send(Message::ByeBye).unwrap();
5 I# q4 M) _/ O        }
  H' r* G  z* O8 @4 f        for w in self.workers {. @7 J, @0 H, n" V" I# ?: H+ d% e
            if let Some(t) = w.t.take() {
" `6 {* W' x& L! N; ~  A                t.join().unwrap();
5 y  G1 L# t3 b' R( D4 I            }
2 g2 H) G/ \4 l( m+ S2 t: S4 [        }
$ _# Y6 i5 L4 ]1 n" Q9 a    }/ V/ s# ]# S6 G- t
}
% a% x8 {2 J. u* u
" z3 q  \7 {4 G) U8 A% Z% V& D) Q0 D) L
#[cfg(test)]( r! n0 E+ g2 y* g- S6 X; t  q
mod tests {
- R7 G# p+ V9 o0 F& ]    use super::*;) P- f: n* C9 J- Z7 y' n
    #[test]
$ n! J$ b2 r. b    fn it_works() {
- u6 N  s( x7 T( \3 s        let p = Pool::new(4);4 o; S" }2 |! J3 i9 ]
        p.execute(|| println!("do new job1"));8 _7 {  \2 I2 B8 x/ N" \
        p.execute(|| println!("do new job2"));: {% v; z) q9 I2 J) {
        p.execute(|| println!("do new job3"));
  g% G# r) c" p8 c        p.execute(|| println!("do new job4"));
! e2 O9 V# x1 A" z' l+ \' C    }2 c0 Z( _# B+ W0 H
}5 T* s2 B1 B5 p
</code></pre>
' J6 W6 V+ o4 `0 j) `$ w4 P3 a; k+ q+ Q. Z) H6 L+ c& B: {% S( J) I' }
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2024-11-24 03:04 , Processed in 0.067322 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表