飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14824|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8058

主题

8146

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26504
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

  O% D8 v: w7 L8 W8 B4 t( D7 c8 v3 Y<h1 id="如何实现一个线程池">如何实现一个线程池</h1>  r* G" i( a5 ~  B
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
- Q- @/ N' _0 V9 l0 ?/ i/ Y<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
  a6 Q! H3 C- `+ ^- ]- t<p>线程池Pool</p>
' j7 R! A# e, J' I<pre><code>pub struct Pool {# n  }4 @6 a4 F$ W6 |/ ]' _! }
  max_workers: usize, // 定义最大线程数
' Y& w" R! r% S( F7 Q. C}' R4 z* c' M) A, }8 H- C
0 _  j3 P( s' @
impl Pool {
2 `- X( I$ N: F4 Y  fn new(max_workers: usize) -&gt; Pool {}$ v2 L4 {  V0 k5 v6 Q4 n$ @
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}8 {; s/ u( Q5 G9 K6 o0 y  l$ f
}
6 B3 M0 |' E$ y$ ^: g# n# W1 |2 ~- U: J8 ]/ ?  p" G
</code></pre>! _1 y4 C! e  |5 C5 e+ b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>8 S' e) z' e1 \8 S0 j
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>$ G6 Y+ L1 `/ x0 {
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>& J3 {4 L, O/ N: V
<pre><code>struct Worker where7 ?# X  Q0 @1 X6 n8 g. V
{2 f. r2 z$ ~3 v, ?" m! }+ Q
    _id: usize, // worker 编号( T4 O! `+ {; q9 w0 b  @! v. g
}
7 j2 e3 }( H- ~5 A8 [# s</code></pre>% ^6 H& R. M) w! d0 O$ w
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% ^2 Z& M( {/ z% B( q$ a- a6 A4 ~把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
$ I: Z7 B7 r! K1 I7 e* l) M<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>/ X8 e- S" i2 A/ O0 C2 d
<p>Pool的完整定义</p>- G( h) l4 K5 Q9 C6 H" f
<pre><code>pub struct Pool {
, V( O& O# K. X7 P5 C: a    workers: Vec&lt;Worker&gt;,5 b$ D6 O  @/ s% e# G; v6 P
    max_workers: usize,( H6 r! g) d) f8 x' J
    sender: mpsc::Sender&lt;Message&gt;
3 k8 k" R8 l6 j$ {0 J  `6 _$ }5 y}
/ `# ?) x' a# Q: G# L</code></pre>/ ~' p. M$ r) j( u9 \' Q) e
<p>该是时候定义我们要发给Worker的消息Message了<br>: V& q! Y7 U/ i3 |+ ^. B
定义如下的枚举值</p>
2 Z# A% i% e, q0 e# u8 h- U* j! r<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;1 X9 G! ?" u% H
enum Message {
; {4 n$ [; Q( d# ?! P2 k: X) N- I    ByeBye,2 ?! l9 h/ K& i; N
    NewJob(Job),
1 j" q; ]% [4 v" ]( r; T}  g4 x; K+ m2 B. i- C- Z/ Y' j
</code></pre>1 I2 C( \3 P4 k0 L3 T0 o
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p># G0 V# G& `: G3 R
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
- D+ `- b+ R! d4 O<p>Worker的实现</p>
( `5 S7 G7 M" @$ g; _<pre><code>impl Worker
  }! I& n! W8 ^; |5 E" w{5 s$ P( {! G- ^8 v
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {/ ?, P6 j$ T# X6 ]
        let t = thread::spawn( move || {
+ B1 o0 U4 D1 A: e  j9 d" Z            loop {
4 q. h. s1 X$ q2 H& M                let receiver = receiver.lock().unwrap();
" b4 X% X' g1 b* K1 m# M                let message=  receiver.recv().unwrap();
: A' Q7 W' q  E$ o                match message {
9 F' S; b, c# h                    Message::NewJob(job) =&gt; {: w7 g% s4 A3 T0 P
                        println!("do job from worker[{}]", id);
2 X( j% S- x: r' ]5 q) C; M                        job();
3 y, Q- n, U6 k* w" l; N                    },
5 [: O- N! N1 @! h+ Y- A7 Q  B                    Message::ByeBye =&gt; {
: _7 ^5 g' r0 G; k' w' ^                        println!("ByeBye from worker[{}]", id);
4 i, `  Z$ N  ~6 M* x9 r" j$ c                        break; D( ^, d: ?: ~0 \$ k: ~9 @0 J
                    },1 \$ z- v/ b* b) f* J; Q/ n
                }  3 H- T- H; R( p' y
            }3 [! I- p% O+ U; l& S+ j, L2 @
        });0 q  Z, {1 ]* i, e7 X4 x3 f
- l6 j& ^  ]' ~% }: o0 r/ {
        Worker {
3 g# \6 w5 d. ?' l# V) e- F            _id: id,0 S  B9 Z: V4 u5 p+ o# b, ~
            t: Some(t),
* F% _) }% V) [1 n/ d7 k, p        }9 ^" f" P/ F9 X2 d) A: f  J
    }
/ L& Y3 w6 D: |- b}+ v# s! E" k% I
</code></pre>; p' M  f- W. S. T  @8 J# A
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
3 L+ Q* G* j7 S' Q( |% b! j但如果写成</p>* z8 a# S/ G2 ?' _1 g" M  y& t  r! m
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {7 @4 x; v5 a. e% I0 Q/ @$ h0 z
};# E: H0 p6 I% e2 P
</code></pre>
, d& f2 ]2 e1 V& \8 U! H4 ?7 r8 ~<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" w5 ^# Z) A1 }5 ^4 Z5 K6 D
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>; c" x+ K3 t) h# o' v$ S$ S
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
" b" k. W. J, w( r  }: s- |0 L<pre><code>impl Drop for Pool {2 Z8 Y- J; s" L7 k4 O$ i
    fn drop(&amp;mut self) {; c/ a# a* r2 M6 p% J' |# J
        for _ in 0..self.max_workers {
7 ~4 H6 e% [0 H7 e5 F            self.sender.send(Message::ByeBye).unwrap();9 B1 h( ], K' N/ C% j3 }; y; a
        }: ?  m1 B- j: F% u+ x7 |$ H
        for w in self.workers.iter_mut() {$ U, N& {, A, I6 I$ \
            if let Some(t) = w.t.take() {; A4 K+ q7 V3 B* p8 ?5 F) F5 U- u
                t.join().unwrap();
+ `5 B- o6 b* ~. ?            }
9 e3 J  L) i1 X& u( v1 }        }/ P- w. @  Z) d: T* ?
    }& L5 y' W' L" j6 z0 I. }- P" }
}
! n! p2 B. _) }" x3 K; E3 r! }' k6 \# d5 g8 r2 o0 h2 ~4 `
</code></pre>& _0 [  p; D( m
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p># b. K+ O% v* F: v$ r% V* w
<pre><code>for w in self.workers.iter_mut() {9 z- a& y4 U/ y7 W# p7 p
    if let Some(t) = w.t.take() {7 u9 Q! L2 X! ]7 p# f& d9 M! P* Y
        self.sender.send(Message::ByeBye).unwrap();
3 r) S0 N% B. ]2 a, ^        t.join().unwrap();3 b/ ~( R! u( u9 k
    }, y: s4 L! }& \+ e+ x2 `/ Q
}( ~5 \( H- \) B3 z6 M
. K, X: W4 n% R. y. p, L  f
</code></pre>
$ X" h% R, z( n* G! C1 H6 T# B, r$ [; k<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>  D8 R) ]2 r+ A
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p># Z$ ^2 f! |9 U9 q* X
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>$ z. ^# v: C: J$ b8 s
<ol>
4 N' d; F0 ]$ X$ Z  D5 @<li>t.join 需要持有t的所有权</li>
7 q8 u$ z  ]/ O. w- m" Q' Y<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
% Y% ~# v& k5 |: _, b</ol>
& |! `7 {$ g' @7 J& K+ K<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
( d) E0 e6 m) L/ U换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>$ l6 O0 |) j- E
<pre><code>struct Worker where% h1 P  {) |( f
{
3 l% t& ^6 |2 G5 ~6 b3 u2 h$ I    _id: usize,9 ^* M$ C6 y1 c
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
: c1 R4 t: ^4 q% X' W}& X) `& \- ?+ q2 g* X
</code></pre>
0 I/ Q8 g5 b1 K! d7 `<h1 id="要点总结">要点总结</h1>' S2 z7 T% y0 _. p3 I5 t! o
<ul>6 F, r" f" R! h4 y, f. h
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>5 ?! U+ [! ^! X9 |$ u+ x8 }) P
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>: x- X+ N% _9 s; _+ a' C) Y
</ul>
1 P, R' X/ @' {$ B+ u<h1 id="完整代码">完整代码</h1>
* t) U- V7 Z9 e+ k5 v) `( W" @<pre><code>use std::thread::{self, JoinHandle};
. M! Z$ I7 p$ [4 Quse std::sync::{Arc, mpsc, Mutex};
) z; o! Q. m: D8 E8 T2 h% d
2 l7 K# V" t! Q: d1 q, \
* S1 @7 k/ F+ R! b2 Ttype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
# |( E; L+ e0 W5 Venum Message {
- _1 j* J. t/ d    ByeBye,
" A% w) G" c2 r. J  T" Q/ q+ n    NewJob(Job),$ F1 t5 _( [. u" ?4 y. ^9 D: }5 R
}
5 `0 Z# e. e. y
% R, V/ f$ @$ F$ P/ _" gstruct Worker where
6 j' H' c( z+ P# i& T{! C$ b1 {1 ~1 [8 s& W
    _id: usize,3 A2 B  e$ `% ?0 G. Q
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
! P4 I6 I! d$ a}
5 z4 c0 a. j2 y# |/ `1 U
% b6 b2 K( ?& Z% u' W  jimpl Worker3 Y5 V, I& \9 g! I' c) Y
{7 k, R: C% C, P7 ]! X$ _- E6 i7 _8 u
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {" Z! _; v0 l3 L
        let t = thread::spawn( move || {
. J7 h# F6 |1 g4 f! H            loop {0 z' ^7 j8 u; S3 T1 [2 m
                let message = receiver.lock().unwrap().recv().unwrap();! g! q/ q  v# Q) G+ |, y
                match message {; g$ e" E, |5 t
                    Message::NewJob(job) =&gt; {  ^7 k" `* r  h; f! v% b9 R1 d
                        println!("do job from worker[{}]", id);/ X* H0 [; g8 [* Z: ]& c
                        job();1 t5 _4 G1 b+ `0 n* `
                    },
! B" w8 p  |. }$ v- l! J                    Message::ByeBye =&gt; {
1 {: S( O3 E6 H) C1 a                        println!("ByeBye from worker[{}]", id);; s/ t/ ]; l# R/ x( i: d7 T
                        break
4 f5 a: Y  T: u# y1 L  C4 C                    },
9 E, R1 ]3 M" K4 E5 H                }  , l1 }9 }; D( X
            }, |& A9 N. v1 U: l
        });
, A, [4 \4 f# @& l& @
0 n5 c/ i6 Y' |+ X        Worker {. a0 @3 @; @) f- @6 N: k$ F/ Y
            _id: id,
/ v% O3 c. a# i" }+ k# T- r            t: Some(t),
( ]0 C3 |% R; {5 h' j" Y' S; c+ b        }
8 K& g/ Q" j# O9 A6 q$ ~) l0 s6 [    }1 O; W# r# C! z% E& \
}7 U: q3 V! ~; ]7 M9 d4 j$ D

6 D6 U" a' G' B/ \  upub struct Pool {: [2 o) r; c: J; z
    workers: Vec&lt;Worker&gt;,
  F/ a" Y! X; A# M  j! t" m' H+ @    max_workers: usize,6 \& C: c! r! v% {% e/ k
    sender: mpsc::Sender&lt;Message&gt;
* S( U, y1 _( p' B) i: A5 d}
2 y* t! J1 G; \0 ^% B
2 A2 \0 j& B3 C9 |: _impl Pool where {
. ^  `# X4 p! s. V' e4 l    pub fn new(max_workers: usize) -&gt; Pool {
' S% V1 k' {( J8 V5 I        if max_workers == 0 {: N: y8 z, C$ t% ?/ b& K5 w' i7 O
            panic!("max_workers must be greater than zero!")2 }  c5 A, B! ]  D1 ~1 r
        }
. ?) G! N( @. T/ G' F3 r        let (tx, rx) = mpsc::channel();
/ U" G& I! y0 G1 a. e9 @3 X+ J1 i: ^3 e( P: s' L
        let mut workers = Vec::with_capacity(max_workers);
6 F8 ^: ~, v. h# b6 B$ \! u; p        let receiver = Arc::new(Mutex::new(rx));
3 D! t+ b( y5 v5 S0 S  |' }        for i in 0..max_workers {6 I* T' {7 [4 N" V8 j! ?
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
5 f9 J) X; w6 q# Z$ o5 v# L        }
2 T* O5 n1 M; F+ Y- t# b+ a+ ~5 u
        Pool { workers: workers, max_workers: max_workers, sender: tx }
& a: G, R8 r3 `    }; N1 w# O6 q& K9 D% _3 z
    % S5 K3 l6 N6 u5 q* m* e; c
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send/ J9 F; |* `. v7 y8 i. a3 r4 x
    {
$ J3 G( w  D& o, }2 ^# {( s% ]8 d- }7 t/ u* L, q* d
        let job = Message::NewJob(Box::new(f));  ?5 v2 ^& E* T# i1 @0 t
        self.sender.send(job).unwrap();
/ ^4 r8 D- s" Y    }  G1 B& r; j& E2 }$ \( |, c$ ]
}
( Q' L" e# M2 Z% f/ a' K3 [
6 y9 D$ {6 ?. O" Y1 ]impl Drop for Pool {
/ Z/ d" P7 O: o$ M. P- T) S1 H    fn drop(&amp;mut self) {
% k  O$ t" |, _        for _ in 0..self.max_workers {
6 W5 t- a$ u2 q- t: Z% j; l6 q  ?+ f! }            self.sender.send(Message::ByeBye).unwrap();1 `" [5 [9 v6 }' Q
        }
5 x$ O2 k- F! C) ^* e; ~7 c9 D        for w in self.workers {3 K% Z- z  G+ q: Z6 G
            if let Some(t) = w.t.take() {2 t  p8 U9 {6 k. X; r: f
                t.join().unwrap();
0 J; Q: U* _; m0 f/ d/ G! V            }7 O5 ~4 o. N" Z
        }
* \8 v' N$ W/ X# z/ P' Q" Z' {    }
+ D" I- x: E3 ^  ]8 S7 v5 [0 e}, }1 |! V5 ^. r, Q! N

% a/ O3 i( J: w; T1 u  a
; G1 F4 n( [% G1 }#[cfg(test)]
7 r1 h! M& o7 B5 ]mod tests {
8 U' \, v* L) @' a    use super::*;
3 |9 \3 d; s+ g4 `5 G( u& T    #[test]5 i. i' I* ?& V3 l  M, s
    fn it_works() {
: k+ I/ [5 m0 d! p+ B) I* q        let p = Pool::new(4);, Z% x/ {: W6 H% V. l: p1 X' U/ J2 _
        p.execute(|| println!("do new job1"));
+ j: A2 c) L8 f5 J$ J' K        p.execute(|| println!("do new job2"));
8 u  u( d+ [- v8 C+ A. A        p.execute(|| println!("do new job3"));
' K1 V1 L+ ]/ O7 \1 i# x        p.execute(|| println!("do new job4"));
' B/ p, {% g% n6 G7 E+ i    }
  O" b7 I/ G  W& S}/ K( ]4 a: _7 v# `
</code></pre>& f$ _& x7 E& W

# M) N/ H7 k5 P# A
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-13 19:13 , Processed in 0.081543 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表