飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15152|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8135

主题

8223

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26735
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

+ f+ c4 n* N  ~1 P0 F, t( {' L<h1 id="如何实现一个线程池">如何实现一个线程池</h1>8 O% m6 r% c- X" j
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
( [/ W# I# Z3 q3 H/ Y% R. v6 M1 ^9 a* G<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
+ ?( R1 Z! `: i1 {. m& v6 e<p>线程池Pool</p>
0 i+ l% K3 W% z- A: C3 ~<pre><code>pub struct Pool {' R* Q, m$ H& b7 l( z- ]
  max_workers: usize, // 定义最大线程数6 @; j1 W* O& j/ F6 S; `) }5 A
}% I, e  u1 V+ r: c8 B

: W; d0 s. D% M% X8 u+ yimpl Pool {
8 ]" }0 r9 _+ F" C- x  fn new(max_workers: usize) -&gt; Pool {}1 ?1 t" T1 f$ x5 z7 B5 Y# P1 b
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}: P+ \# C9 k$ A) O+ ?8 ?5 K/ p
}
# b" C7 b3 V! ]8 ]& D$ B% `5 U9 c6 a& w( u" `% j2 e+ j9 q9 x
</code></pre>
  z8 ]: Z5 I  \) X1 ]( j. D2 I<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
  `( ?/ `( s+ e+ Y<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
$ l' ]9 k: T! |1 x: U& K8 O5 K) _可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" ]2 _" q* y, g7 G3 B2 y
<pre><code>struct Worker where0 S: d! A% Q+ S5 q0 d0 |7 l" R; k* ^% A
{6 O! k3 R6 T3 o) Z" l1 J% o4 Q
    _id: usize, // worker 编号. f& E7 n" U. ]& Y
}, R' A# H6 c$ v: W! V
</code></pre>2 C; I" `7 X/ `+ ]3 Z* e
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
5 x0 F* R1 ~0 }$ l把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
: b) H! s5 A6 E, f' X<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>; {% o! p" k. F9 c6 F/ h
<p>Pool的完整定义</p>6 `5 [; u, I# U1 N% R
<pre><code>pub struct Pool {
0 U: a" [" h' c+ `    workers: Vec&lt;Worker&gt;,
7 L& E. y9 \9 k5 O* z& Y    max_workers: usize,* A1 a9 c5 H. i
    sender: mpsc::Sender&lt;Message&gt;2 t2 C5 b6 A) n' v5 t. R
}
+ g' E7 O! E: G3 t; h</code></pre>) p0 D& k1 q" S9 t
<p>该是时候定义我们要发给Worker的消息Message了<br>& o$ Y( s5 b2 D9 \/ X. F! z4 Y$ Z
定义如下的枚举值</p>% ~6 J6 Y5 l- S( H7 `& d
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
7 x% r7 p/ E1 `enum Message {
" l0 a4 z' {" R    ByeBye,/ K# f. f" |6 X! P
    NewJob(Job),9 O% O  H# }- f
}
2 E# q7 F" [) W6 M( U# d1 R  |</code></pre>
/ P( o! S4 X" G<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
1 y* H3 J, A* {: w8 M4 |' a<p>只剩下实现Worker和Pool的具体逻辑了。</p>9 p, M; i( H& {( U/ e! Y7 e: S
<p>Worker的实现</p>( w+ X/ E7 _6 ^$ t
<pre><code>impl Worker
8 s  o& @! S# L* X  {# b: G{
4 ]" t0 m1 |' G0 E) Q& C7 p    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
, B. V' ]# G. i2 {8 E$ Y3 C        let t = thread::spawn( move || {$ D3 w$ V. I6 r$ T6 c/ S4 P
            loop {
. _" y& b1 y( F* d; W7 h                let receiver = receiver.lock().unwrap();
3 T9 {4 P7 ?, j/ q                let message=  receiver.recv().unwrap();4 K: c/ j: l1 k. E
                match message {
/ n" T( z3 f: q                    Message::NewJob(job) =&gt; {# e- [- k) \7 M5 [9 g
                        println!("do job from worker[{}]", id);9 r" b3 `. @1 b
                        job();9 k* I6 h5 u: a6 I8 L0 W
                    },
2 V/ G8 j; g. l1 |' t1 \                    Message::ByeBye =&gt; {
& ^0 ?$ V+ \6 d. P                        println!("ByeBye from worker[{}]", id);
( B! V5 H8 i% e                        break3 d' H3 `( K! o5 M4 B; _2 S+ \( Q
                    },
1 q' g$ @. t6 E9 b5 g: N                }  4 X5 l3 @( ?8 X  A6 ~9 n' Y$ {
            }
+ ~4 C- w, Y2 ?1 @( P: d1 q        });
4 U( {6 K1 K5 t+ d
, [  N# }- z3 _: y        Worker {
3 j$ |; q; e$ ?  `5 l            _id: id,
, q, V. b  h5 n. u$ w4 C4 w% v            t: Some(t),  Y. _+ {% c" ?: i0 \% S
        }
% k5 l* E- a, I0 U5 o    }
$ ]# A) i" Y- S0 t}
* H& n( z* f0 L; j& K- c- W  \+ [</code></pre>
+ t; H# S/ ]: F<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>- G( t- W' O2 t
但如果写成</p>
! V; J& y& q% a<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {3 y  p& L! Q$ Q2 E9 F9 c; L, J
};
& W3 t; j8 T) C2 R3 _! Z& e</code></pre>
& L2 u* Q5 M" B* c( x<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
- V) e2 t% ]  M3 d: U! {rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
  U+ I% C* c6 b/ @<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>/ v$ f( g. k( U) J
<pre><code>impl Drop for Pool {+ E# k. B9 r% ?' k3 @# e( s
    fn drop(&amp;mut self) {
9 O" Y. V. d' W        for _ in 0..self.max_workers {5 e, v2 O& g* b2 w- Z. A# v
            self.sender.send(Message::ByeBye).unwrap();8 y( w7 S& o" l8 g
        }
# f. }$ n+ q$ m9 _( O2 A) a        for w in self.workers.iter_mut() {
1 p7 W( a* Z& J5 Y9 P5 K            if let Some(t) = w.t.take() {8 p3 W( o. i: b+ Q! [* z
                t.join().unwrap();" C1 H6 `8 g/ ?! c1 _
            }5 ?. {) u' M  d3 H* O1 V; j8 Q
        }/ p) q7 _$ x0 r6 p& h  s1 s
    }
5 `+ \2 l. S9 U. @}
7 _5 _1 H; S9 a: A7 n6 R' ~* k
- v/ R$ ?( b  W</code></pre>
: \* y- R& o7 L* ]* C<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>" j( v2 T5 L0 D0 M7 v1 M
<pre><code>for w in self.workers.iter_mut() {
# q+ B* p5 q" }    if let Some(t) = w.t.take() {6 n, o2 p) c; i5 {* i
        self.sender.send(Message::ByeBye).unwrap();8 E% m3 B6 V% P8 O) O6 R* C1 ]" k
        t.join().unwrap();
( T- b' _6 ?6 h. {5 {4 Z    }
3 x1 p$ {; ^2 \: }9 x& |% h}
, k% }! n& Z. W  D& P4 P& Q
6 S/ ^' N6 c# j. V- V) V7 H7 a+ _9 X</code></pre>
- r* t% s( }4 ]$ u) r, }. u0 L<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: U, d1 \( ]5 i# f) h
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>+ n/ X6 V9 w$ ]* Q3 _6 W$ h
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
- h5 F( j( R4 _% G5 W8 G<ol>
8 Q. a& B2 z, I! T3 P$ ~" K7 ]<li>t.join 需要持有t的所有权</li>
( ]( Y' o8 E& b% l& G) M<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
( y+ {: L, H7 m" D</ol>
4 B3 v( B( y/ y<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>) X5 y# `3 a! w3 i" n; P1 x
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>) w0 r; E$ C' }  P; `% q
<pre><code>struct Worker where- u& n! W3 v9 U' Q1 u7 j' W
{! T; A6 {) Y4 h% }
    _id: usize,& B/ i7 U8 P& C; U2 k, h  `7 u
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,0 q; r1 |, @7 b; ]' N% x
}
' c* M2 M8 s; y5 ?# d. Y</code></pre>0 b' H* a$ @+ y$ Q; ~# b7 M" {: ~
<h1 id="要点总结">要点总结</h1>
& }8 c5 A/ H+ M0 d/ @<ul>
8 k) A5 o* O7 ^' j# t<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>. A. {2 w9 `0 J9 Z. E& r
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
7 K& r- e! D* S+ `</ul>  ~4 o( a7 j. X! s" Y$ r
<h1 id="完整代码">完整代码</h1>& }, K9 b3 R/ U  ^  i
<pre><code>use std::thread::{self, JoinHandle};/ Y" u1 Q- A) I: q
use std::sync::{Arc, mpsc, Mutex};
1 Z$ M: z1 g) m" D. J8 V
& x) f2 B+ m( Z; v7 f/ f
& i* v4 ?( h# P' B, ?type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
" g: x2 s8 x$ p& Y4 s' Renum Message {% T' o3 S0 B* C, ?$ j( Z8 F% a
    ByeBye,2 c5 \" o  s  g$ @( M
    NewJob(Job),
2 S3 ~$ O; e* H5 O/ b}
. l  W2 V2 ~7 c4 ^* |$ G1 Y; q
) Q/ F) V: t& {( m" ^: jstruct Worker where6 T# O; @8 q3 X) P9 A
{
3 j; n0 y! V" z3 @/ H5 E8 q    _id: usize,
) ]) ^3 m. h% U    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
3 d& \: s: A, Q9 t# T3 q0 K: K}
% w( |5 m' V! E& ~9 Q! Z, s/ [  I1 M
impl Worker
0 e: @3 U! z* N. X3 b. c{& G* K# m4 N' Y1 d8 |4 E0 ^6 B
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {. F1 L; b2 K; _, {1 S
        let t = thread::spawn( move || {
4 `! Z* p4 P4 b4 ~            loop {
: J7 a2 v3 ~3 C4 [                let message = receiver.lock().unwrap().recv().unwrap();
0 ^8 m. H6 v8 u4 a2 Q                match message {
, A2 y9 {3 h, Z  t& L# v4 _: _                    Message::NewJob(job) =&gt; {1 O; c6 @4 q. x* |! ]( Y( o
                        println!("do job from worker[{}]", id);
' L2 w  Q6 ?6 c! `/ K/ l; Y                        job();% j' [0 J1 S( K
                    },
6 v+ {7 l, m( t+ H9 O                    Message::ByeBye =&gt; {
: }8 ^+ t; b' L. r) m% N/ M                        println!("ByeBye from worker[{}]", id);
2 T+ Y  [+ `6 B) e/ ]8 l- m' c                        break( w! ~1 n- c4 |1 M* H
                    },
1 H6 k2 U% H# m& E6 l                }  
- Z/ @1 A$ d: S  H. ^. r            }: R5 ^9 w, _7 G6 A
        });
3 ?: J4 w7 R! c. ?3 p% u; M1 \$ w
: k/ u$ S% v$ y1 ~# E        Worker {
: n. Y1 |. B5 J  X! B8 M            _id: id,+ e- o2 D" L0 q6 M7 |) e
            t: Some(t),
5 E. g4 C( m) L) Y3 E        }9 B7 R; J  q# z  T4 C4 ~* h. b0 b
    }+ _6 D/ C3 ~% G, H7 W3 r) l
}
) c; U5 {3 ^, A: I6 d% i6 b, o- @$ T
5 }. e4 |1 B! @  f# }pub struct Pool {
9 B* k: Z7 l6 @" g; V    workers: Vec&lt;Worker&gt;,
/ f6 Y8 U1 t+ ?, r    max_workers: usize,: X: W* x: c7 O/ y  v
    sender: mpsc::Sender&lt;Message&gt;  g! r2 [& F) E3 z* C/ W* |
}
/ P; l0 J8 Q$ @% D9 [7 d' L  Y% x5 ~! p+ j6 t# L
impl Pool where {
" ?# `9 O5 ~! A. C  _- @    pub fn new(max_workers: usize) -&gt; Pool {& x0 J- S$ T" }  J
        if max_workers == 0 {
* d# ]" m0 P/ m  L            panic!("max_workers must be greater than zero!")
5 N" R5 w: R0 B  H# W        }
$ a# W2 F9 X/ O4 ^; u  R3 Z! o        let (tx, rx) = mpsc::channel();: g$ T/ f/ I- M0 X
7 f* N8 o$ A) Z, [8 T; d0 I: v+ ?
        let mut workers = Vec::with_capacity(max_workers);$ R7 L6 J- |- R" _2 G/ f" ^8 X  K$ [
        let receiver = Arc::new(Mutex::new(rx));  B5 p7 A$ c: [; A
        for i in 0..max_workers {
" w$ |' N  q, ]0 |            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));: P2 w% i$ y2 w* Y) C1 d" d7 [
        }" ?3 a' y- L2 K, z( U: _

, B5 U' }+ f/ i6 {' b+ Q& U        Pool { workers: workers, max_workers: max_workers, sender: tx }- T' L' J! k8 x# z0 j9 L; B
    }; m9 E& W. Z9 i$ y
    9 ]* L; _" N* W( W* X
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
3 }6 F, s2 X4 A" V8 [    {% m: F: g' Z- w9 g+ n$ R. G

7 V6 ~& u5 W+ _; A1 r# J8 j- F        let job = Message::NewJob(Box::new(f));" D0 v$ I3 V. S! G) S1 ^( D
        self.sender.send(job).unwrap();
6 R3 X3 {+ Q1 |2 K: y, N% s    }6 [/ c* d4 M0 l$ S% V
}
- |" r# V% z4 ^7 a5 e
* E; H4 w; b! y/ {8 t7 ^  Dimpl Drop for Pool {
: }! T2 i2 }2 x% W, @    fn drop(&amp;mut self) {" ?; v( v2 ^# L+ G: i
        for _ in 0..self.max_workers {% u/ E7 |# H6 l
            self.sender.send(Message::ByeBye).unwrap();1 P- F7 _* p7 L1 G1 {( a0 e
        }
$ B/ V& b: I7 e( X- q        for w in self.workers {! `- C. F7 E+ R! e# r: T5 F. \; e3 v
            if let Some(t) = w.t.take() {# a% p  `! t! l# |7 B
                t.join().unwrap();# \6 C3 s7 C: k0 l
            }
$ Y5 _. y: x& H/ ~0 S8 `        }! C* h6 s/ d  O2 v$ m- X7 I, [
    }
& t. X8 c! x9 S0 n}6 Y! ^( `( F5 }
/ b& X& d' }& C5 V

- ?' D0 X" v) R& Z! ]. o3 D9 f1 i& _#[cfg(test)], E" \6 ?% K, A6 L* l7 e! p! m
mod tests {
& F* a" ^9 A$ G    use super::*;
% ~4 S8 n4 h, d! K1 D" ^% B& q    #[test]) `9 y+ d! k  K2 v9 H2 r( |( Y
    fn it_works() {2 l6 n3 i' w: \0 V
        let p = Pool::new(4);
$ |" l3 X& h- w8 T5 [/ D7 ?8 f        p.execute(|| println!("do new job1"));
/ i. M& H$ x: v& P, G. Q  i: f        p.execute(|| println!("do new job2"));  g) n8 X  U1 k% w  n0 U! a
        p.execute(|| println!("do new job3"));- r4 S- l/ {7 s6 m9 n) M$ y  V
        p.execute(|| println!("do new job4"));
, E0 g, i1 K$ u& d9 z5 T& z    }
2 s% [$ ^7 f/ x% _}0 ?* @& t' Q' C2 F* B
</code></pre># E( o0 U+ b4 _" k
2 F( y, d! S6 n0 V9 o# e7 J$ X
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-1-2 20:05 , Processed in 0.068551 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表