|
|
; g; y& s& d' d" }9 E<h4 id="flink系列文章">Flink系列文章</h4>
- \# H0 o: }7 ^- M) k<ol>& {& ?8 `/ A( K4 P
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>+ \: K+ X: h. v7 H" s; N
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li># h: G9 j# w' V% X" ?6 H8 F' `$ b1 p
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
! [2 Z2 m! E0 ^) q9 u<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
5 b, x% `0 d6 T! \3 i" O9 ^<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>, \# B! k; c8 u6 p
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>* B1 y+ I0 J/ I
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>- c9 ^& a" T; x* s* A
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>& l" h# ^* G9 a8 b9 c
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
- W1 O! l# I7 R. a" \ `</ol>
( u5 ~8 ^1 r+ m, K: i2 H1 J$ M<blockquote>
& N ]' e0 _! k, o<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
$ p- Y8 N/ p* f- K# m) e</blockquote>
0 [5 d2 M3 N; [" p5 O2 |3 T<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
1 S- W8 u& F* z<h3 id="分流场景">分流场景</h3>6 j8 L0 N7 a# M0 ]4 P" Y
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>: N) [) p( P8 E. y1 {
<h3 id="分流的方法">分流的方法</h3>
! w: }! J0 V% V# T: A8 v+ p<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
! E* ?' U* }3 _/ v9 ]6 ?6 c<h4 id="filter-分流">Filter 分流</h4>
- g/ j* y% a M) O<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>5 A0 G& b$ k/ ], B1 f+ q8 F) Q
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>$ }0 K6 o3 K E
<p>来看下面的例子:</p>- S9 F( a" {# T# |* g0 k6 e4 g0 X5 X
<p>复制代码</p>
3 M: o4 q, `0 a D<pre><code class="language-java">public static void main(String[] args) throws Exception {2 o- [+ \$ P$ W( k- ?/ b
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
7 e5 c5 H. ^) E4 Q; y //获取数据源. T. I- I0 m1 v+ Z
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();& G5 v% @$ l& ~: u8 f
data.add(new Tuple3<>(0,1,0));1 ]: P, L2 A& u5 [
data.add(new Tuple3<>(0,1,1));2 |2 d: E, R0 H* z! j
data.add(new Tuple3<>(0,2,2));
% t c# B$ ~& o) g1 w data.add(new Tuple3<>(0,1,3));
( X/ Y+ \' f7 S- b0 R5 i8 T2 d5 _9 Z data.add(new Tuple3<>(1,2,5));; L7 S8 R% S6 }" ]7 E, Z
data.add(new Tuple3<>(1,2,9));4 S* A& ^ x1 X: |& ~5 X* ~- n1 P
data.add(new Tuple3<>(1,2,11));
% |' `' ?1 u5 w( I) q data.add(new Tuple3<>(1,2,13));
/ n% V3 X" _5 Y) E+ |7 P. K
1 U: N1 w4 D) i DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
$ M" v. s8 i/ r8 Y
# `' j2 h j6 u+ [$ |! x, a6 \0 n% m& q
8 y' K4 E! _3 @- O- C" y
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);2 j! \1 k4 C1 @" c+ a# |' W
4 Z: \6 S( u. N) ~* a
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);& `9 O8 i3 K; T6 j7 D
% f7 i' J) m d0 ~9 y" c
; W4 j6 k4 `- I2 G- M) b3 D- ]8 i; o9 l4 I* j* |0 `: a
zeroStream.print();" g( T. v4 y8 s
# \) y: A" P# k- F& c4 B
oneStream.printToErr();
/ W0 n4 `2 P$ t! X- m9 M' d j1 F
5 |* ^5 G* o) |1 ~$ r
3 H+ i+ o/ ]! @8 h4 a3 K2 E2 a' l
3 {! [9 W n3 w% H% ?4 E$ \0 I& T) M; j
8 X1 e) Y- P. o, j
//打印结果# s# F( ]% v) H7 d$ t# j- I+ I4 @
- I& `" A( G2 N& k: o String jobName = "user defined streaming source";( H! f7 @+ J8 u
* p8 a& R! Z ]" W) p env.execute(jobName);
% u9 _; k0 I. x
& l0 d5 y2 B+ K6 H$ y}
% _# c- S/ ]' \7 a6 F! y6 [0 T</code></pre>
4 S) {; l& z0 @ D5 z1 G<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
$ g0 T+ G9 k+ U x3 d( U8 b1 K<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
; h1 R! y* e! X+ u% h# M<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>. o U1 Z* B4 n$ l
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
' M& S- m$ s. {) J5 c- }<h4 id="split-分流">Split 分流</h4>; \: T6 f3 A8 B) `1 Q
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
7 I; g* z/ K7 T<p>我们来看下面的例子:</p>' @# x- c* q& r; W- V% a* n, T2 t; A
<p>复制代码</p>! N, s/ l% g" O# U A) Q; ~" R
<pre><code class="language-java">public static void main(String[] args) throws Exception {/ v/ b I* {) k$ p
/ D U( e) R' B7 u; W
$ c; b% [# N& h8 O" Z2 s
, @; Q* Z$ k2 k( E4 F9 S StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();' v0 G& r1 G2 r ]8 `3 ^
9 A( T5 R6 n) }" ?5 F9 z' o //获取数据源
5 d) q- x0 v8 Y
- `5 Y4 y& K3 ]1 N3 f: p+ ]3 t; R+ g List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();1 y. J; {; D: X. B- X N
0 v9 ^: v# |) {% h6 E: o data.add(new Tuple3<>(0,1,0));
3 c% z. R+ p" r; f9 Z( X! L+ @4 s7 k% S) D( G g% X+ U
data.add(new Tuple3<>(0,1,1));
2 L6 d. y. j$ {* [4 U1 ^6 i! c% u4 R
data.add(new Tuple3<>(0,2,2));, O- F6 Z) A. L
: E$ R; d R: T4 g& S( O data.add(new Tuple3<>(0,1,3));3 r5 Q! z( E- T$ Q, h' ^
/ @4 i- Y. J* e' W7 ~) e
data.add(new Tuple3<>(1,2,5));4 f- W. v3 x' b3 o8 e$ { U
! P5 d# n" n3 F# H' d, [ data.add(new Tuple3<>(1,2,9));
& l# ]3 @* D! D* Y: l2 S C: x1 ^. d, i
data.add(new Tuple3<>(1,2,11));
0 |) s; t# m0 U( S7 W2 I. G3 O2 [ @+ B4 X
data.add(new Tuple3<>(1,2,13));
3 d4 u& I; D/ H" K& ~$ W+ V; m) m) v
: Q- h" a& ~& P- ^. j; z/ c4 d- y7 U2 l" p: Q; {" C% |
* W# W" c7 B( {5 j( P3 Z; a% I
m, e5 X7 P' g3 N
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
* ~- S0 S. N( J( W, U$ e. K ]; M* [
: b" c. y% y( x& Q) x3 P6 c# \1 h
5 e7 H/ {& N9 @' C( m" c
; H' y. a$ Z; a, F4 u) o" F* `
: n2 N8 S4 k6 D" A* h: D SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
4 Y; o. V& ], q9 O. a# a d& D9 l, _4 A0 a, o, |0 r1 ~- ^! T
@Override- D5 F4 k) x* h* N0 ]
# } U) O$ |& A$ |" w5 j3 r
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {4 h8 t" L* t# x4 T h
0 Y/ r/ {0 Z- [$ B1 ^1 b: Q List<String> tags = new ArrayList<>();4 }- D5 U! [# Z0 E+ ` e
& T0 J/ C' q9 j- h% q" A if (value.f0 == 0) {
" x# A, c; Y z: i3 { a4 \! K) n7 z5 i9 x: n% t! S
tags.add("zeroStream");/ T+ y1 _1 @, W8 G
. x6 T- v! `( R% I) Q
} else if (value.f0 == 1) {+ v" Y8 O! \9 p' z: W7 ^0 _
4 l _$ E/ |7 ~0 z, ?/ @! X% a5 ~0 ?
tags.add("oneStream");. V& n) U" h* I9 W, w+ I
6 b$ H% w5 J/ e3 c4 S& v }7 u9 y( `) ]; r
6 `1 U( p, i# `2 U return tags;
; p: r' l- G* L! |( Z
0 z9 L. ^1 \# O }
, T2 r u5 d q. |' i; F
: D" S4 s; z* l$ | });
" V$ _" J6 y8 j2 }5 A" `4 w7 I6 n; y
9 x; r, x6 ~, V$ q* m& V' V9 I
; k% M( {( e& K( Y* L/ Y' z
splitStream.select("zeroStream").print();, l* G9 x" ^# n+ m" v3 K+ I
: K6 r- A0 ] @8 h4 {
splitStream.select("oneStream").printToErr();
( J) C2 D: p& ]2 W: v2 R, i5 v' r' Y- C1 k
9 ` ~7 a1 j8 S! I$ |) m/ C
* U4 B- V, o, Y$ h4 @
//打印结果9 o' N B3 |/ L9 N) _
5 C# M% B8 L+ V' f4 @
String jobName = "user defined streaming source";2 P5 E$ M; i6 w: R8 ^
6 \; G( X- q8 q% e) p: W
env.execute(jobName);6 P, w$ Z W3 y( s- c. h3 z
5 h4 S6 e; s' u/ X; r. ]7 t3 x}
" y7 g8 d6 W- S) i. W</code></pre>) k7 ?2 Z# S3 _
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>. b$ D+ ?1 R$ |' H" ~, B" X
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>, x# B4 z7 h4 S9 [) h
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
: z: q) \6 u$ F# ~- b( A1 ?<p>复制代码</p>
* @1 A8 o6 p. L" |4 x7 T0 E( g<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.8 t" A5 w- ]3 u4 {6 D- j H
</code></pre>
8 W; R1 h1 F2 L2 X+ R' }0 u- h& `<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
/ ]3 ^5 n e4 C6 _<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>& |! l7 l0 {: m; z; k: `
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
5 v0 L$ A z9 |+ P/ n/ P9 f/ `<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>$ L2 @/ ~' e* s+ u
<ul>. m7 J0 T s+ i* ? z
<li>定义 OutputTag</li>
) L7 Z$ t2 u( S7 s- U: o4 ?<li>调用特定函数进行数据拆分7 T" a5 N5 [: l+ }) ?$ N
<ul>
* U+ |( \- L2 s- W) s<li>ProcessFunction</li>
+ F6 z3 I' L* d2 r<li>KeyedProcessFunction</li>7 O% o; f2 [- k; T2 S- {0 C
<li>CoProcessFunction</li>
1 u( F3 y$ L, u- X- B6 ~<li>KeyedCoProcessFunction</li>8 L q, b/ ^" L! ~9 M
<li>ProcessWindowFunction</li>
8 b# d% m" f4 U<li>ProcessAllWindowFunction</li>1 c1 O& p4 X! ~0 L
</ul>
6 F7 ?: A" w( d a6 ?</li>
- J1 l2 v: n* ?9 h</ul>
: n# B p& k: o$ o& X<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>9 b' E9 K x9 ?( V8 u
<p>复制代码</p>
3 z" F) @* ?: R. H5 P<pre><code class="language-java">public static void main(String[] args) throws Exception {0 e0 v( P6 J" r/ `% T4 K" y
& B. v! [# l X5 `7 w3 i
; B6 O. n+ X W& M d9 j
* h4 J Z, `" U1 F p( l StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
; t5 F: [0 b) Y* q/ v
8 k, b' G7 X8 g' D( j* Q6 i //获取数据源
0 h7 P {' m) G n# ~$ x! L4 O& w) N, [& P7 {0 I
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
( x) ]+ _0 L3 v7 t9 P$ l9 D9 Z& W5 F2 b+ A$ s
data.add(new Tuple3<>(0,1,0));
9 l7 Q \! g" q; P) A. Y& h& S" X' [: ~5 |- u9 x( L# T
data.add(new Tuple3<>(0,1,1));
; J" K. {5 n9 {% V: X7 e" M# c: y. T1 I0 H
data.add(new Tuple3<>(0,2,2));
2 S0 g' e T& z* E* M2 [4 w7 C) i* f5 e2 E! p
data.add(new Tuple3<>(0,1,3));
6 _% _) w0 X7 k! ^! J1 n! w
& q3 j4 H! _# q: D. W data.add(new Tuple3<>(1,2,5));
, p! h8 H- p8 F; y
* M/ D/ Y9 R& q" w+ |2 m/ F$ Q# k data.add(new Tuple3<>(1,2,9));
5 F5 M+ @2 p) X: K# p! k) Z. f+ k) q+ {! l- G" t( V
data.add(new Tuple3<>(1,2,11));9 d, k! f: _8 T. Q- e0 v
6 O, q7 N# U. U7 [2 y7 f6 Z
data.add(new Tuple3<>(1,2,13));
8 [7 B. l8 w5 W) L6 C3 \& N2 M8 E2 s7 j1 a9 v
! c; m- m% w* f$ [
8 q8 f5 t4 n8 l2 y
6 p2 x$ P3 V, u" i: p
5 B& U* [8 G! c- j- j
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);% q' g( y2 A% `7 Q4 }7 Z% t$ i
5 Y2 _# }! v% V* s; F9 s* S# Q( o
# R$ y4 l( G" v8 D8 p
! w; c4 J2 M5 @6 w6 F y g5 I" ?0 O! [2 | OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};9 B+ v# L% L! V1 j
u8 w+ f: l: j# N OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
2 M6 T2 c3 l! z6 j) U/ B$ E7 m+ p
2 F7 P2 K; ^$ U5 W, {! @0 H' N2 J5 A% H
- x5 r) X9 ^& H' ^2 s1 n3 P( y$ v
) p& f6 L; Y- o5 R/ e2 q2 F+ b( H, R6 Q
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {& W% k$ z3 C) V( C
/ u$ d# m8 X- ^- t. n2 z! M8 f( [# N
@Override
5 G1 U& P& P$ X# f/ U; d- {, A, y+ ]1 {: G# P' a
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {0 i: |1 \. f2 k* Q& E$ w
# ?' Y% K8 O& o, H* x
8 Z6 F+ E+ f1 l" l" T
8 r- X4 s1 M- C& ]3 C! f1 W if (value.f0 == 0) {2 U0 ]2 y3 y* P- X/ R+ ]
/ i0 ~/ u% O0 I3 _' d$ r& d! Q7 ~* u
ctx.output(zeroStream, value);
: m8 C; Z" z+ b+ f, ^; R) {: P0 Z, z/ w: U3 H5 w, B
} else if (value.f0 == 1) {% }- c- W" ~7 C9 ^7 b
1 @9 [% l! P& ]$ ?5 F4 u ctx.output(oneStream, value);
3 Y; j; S& K! E( @1 @6 y7 Q! ?4 t1 }' o0 ?
}- m) p& u( V4 \6 w9 e9 r# m
/ `! j! ~" r) F2 F/ `! w
}
9 u6 g( f$ l6 D. _9 j! \* N7 K6 g/ d. |/ W
});# v2 m1 K/ ~# i+ P) v' O
. E: S* q: P. ~/ J! k( B
# w9 O- `1 c( [. m
+ ~ R1 _* w0 m% G) M DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);7 S: h Q7 k, C ]
) ] d/ U, y/ D: V2 h1 H; n
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);1 O' b: ^, m' c
" R1 a! V4 H! J; `( q7 Z7 B6 J" _
6 O/ b# y H/ h( _- d& l! Q/ y/ L
zeroSideOutput.print();
) U5 I8 Y& `5 S$ ~* @9 z
6 K( M/ I8 y! M% r8 B% { oneSideOutput.printToErr();
- Y+ U0 l# Q& T& H8 D( i" A: }
6 R, Z/ B t, F1 v! m G" y4 E" w% B! T+ x) h# D! S
3 V& e" \! y- L% U) u. O7 {1 i
+ \2 u. W/ U8 P% V+ v) n
% G8 W t" S! ^3 g, r" c8 @7 r //打印结果, H7 y! l, Z! W0 e4 G4 b6 Y
" P+ H: j+ W7 D2 N1 _1 a: ?8 F String jobName = "user defined streaming source";
. \0 ^+ k$ D' N/ K$ N# [- \! u9 | ?' P' h
env.execute(jobName);# K% m- l. b* r$ W
, m- \, G9 [6 {
}
" r/ ^& u2 i3 d; e d1 ~- `! b( L</code></pre>9 a3 v( E# j( e; e% R2 @; n# r
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
+ U, U8 e' A" l2 Y) K<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>- s8 {7 r' ^7 x u! m5 p( [8 h, T
<h3 id="总结">总结</h3>& T4 ]- g7 H+ g: Q6 Z% X" o
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
+ ] } L! P( N7 b( f5 J<blockquote>
' g$ J, L* z7 \6 S<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
3 u. L$ r/ p# w</blockquote>( f7 `1 o; e' @2 a# V* \, a
+ g/ P) h4 G1 [$ C6 M" {7 l# i
|
|