|
|
E1 W( ^) o4 q& t; O/ ]9 E5 X" }
<h4 id="flink系列文章">Flink系列文章</h4>2 k" g j e# C; P1 [* K
<ol>( ~; i! d4 g: C% p4 R
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
% n6 h. ` ?1 u$ s% A7 h<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>; t1 O/ U& z6 q. {2 F
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>% Z# G! e, S' w5 j
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>8 I3 t/ s; O) t5 K4 B* v
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li> z' h9 U* }" C: i0 W$ d( u! K
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
: ?$ c$ l! X5 s9 I# m2 O<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>3 G) e- T3 ~5 T6 l
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>1 i. }5 [# P! u) x# ]! r
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>" }+ o" I' u' v% } U4 o" n
</ol>9 h: [; J& B% J V9 A4 x$ V! T
<blockquote>7 T5 M! A/ w, ` y4 F2 F+ o
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>& f: }0 ^8 R3 r. r. O4 o
</blockquote>1 E6 ~8 T( \: j3 A. ]4 M
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>& B+ K2 ~8 P4 n' E
<h3 id="分流场景">分流场景</h3>1 w8 k) k, P) a" `% g/ O
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
1 | e- a: ~6 }. h- I<h3 id="分流的方法">分流的方法</h3>
6 I0 }7 I, y# g% Y; E) J5 d$ ?( N<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
7 Q8 E9 X* n! |/ Y% k1 V<h4 id="filter-分流">Filter 分流</h4>
0 k3 s1 s9 c' ~) F! C<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
: |1 @- y( n# s [/ h5 y+ m<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
3 X- ?# \3 k6 Y& M# E. j6 c<p>来看下面的例子:</p>
6 U/ ?" i+ C/ w) x" U, E2 H8 S. o<p>复制代码</p>
, v' _: ~3 M9 k<pre><code class="language-java">public static void main(String[] args) throws Exception {2 B3 F. Z2 ?! k' R. U: b
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
; R8 o Z* m6 t+ i9 e: y //获取数据源
6 d" T- C# E1 ~. z7 g List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();* ?' k! k1 Z0 t& V/ Y
data.add(new Tuple3<>(0,1,0));" b, h9 W) O) e0 }. ]; }
data.add(new Tuple3<>(0,1,1));
/ V& x$ t3 ?9 V& z, _/ B data.add(new Tuple3<>(0,2,2));
/ I2 N# h) y5 J" ]3 ?6 u* j4 D: O" p data.add(new Tuple3<>(0,1,3));
' T9 r, Z9 u: v data.add(new Tuple3<>(1,2,5));
2 {7 ^8 s( s% b data.add(new Tuple3<>(1,2,9));* u+ M. E1 K( T( ?: ] i
data.add(new Tuple3<>(1,2,11));
) W% W7 ?+ Z* R! \ data.add(new Tuple3<>(1,2,13));. X) e; s& M) c, c" M4 Y4 y
& _0 ]0 }* q' N" ~1 y, l f# G
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
+ Q O; r% Q/ a: D0 [# x9 G: {. o5 O! v4 } @# J9 a
0 B: W5 T8 O( y7 V
; O7 h4 F# U4 {1 ?1 r. F, l
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);- j1 J1 o. O' R6 W! v, o( h9 V0 ~. y; |
: i9 H0 B. t6 d, K0 C, K6 ` SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
* X5 W) S- `8 b( n2 X% F: K; j
4 D! w4 w. F4 B
* t6 s d, _% h! B9 f4 a" g0 u( p1 {+ L5 ` E
zeroStream.print();; g7 O6 ]' R) t+ s
1 U) ] f# q* C# Q oneStream.printToErr();% Y# _% @: o, n( N7 n2 u
9 h0 |3 _* h8 L. v; Q+ r) y# [& t
; g) S" _$ p% y
7 Q: i3 l4 [/ p: P( W
, P' G2 g+ Z5 j- K( T
# m1 d6 H+ c' S* \ //打印结果
4 y" i( H, m1 q# c% y* V# w6 x* y$ V1 p( ?5 f3 s% w! f& [0 P
String jobName = "user defined streaming source";
8 j7 H) e* }8 K/ M9 v( k3 Z& ~) e/ ]: s8 S3 s( B) ~! B
env.execute(jobName);9 W9 W$ Y+ h, `* J1 n' a- ]+ r
- ]8 F3 `8 h5 H}
5 M+ ~+ m, ]$ C# C" H- ?+ O: ]3 ?. c</code></pre>) Q+ d3 m: _. o1 a3 F
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>) c" g( T$ x: h p# U, }, `$ N( n
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>' @' ~% q, y- \. [1 L
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>' E8 X. Y7 g9 P7 ^! M& l
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>/ G1 D# [+ ]3 D( G8 R0 @. w
<h4 id="split-分流">Split 分流</h4>
& ~6 R: e( b( r( ` G5 n<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>2 [9 m5 O# N8 a
<p>我们来看下面的例子:</p>
6 B+ M# ^. p( ~" K! l<p>复制代码</p>. F. Q& z7 e" U; e5 _1 v
<pre><code class="language-java">public static void main(String[] args) throws Exception {
" l. h) v- s5 J I5 N) v+ Y m" z2 U L5 V0 T! W
% G. U+ w% L- ]4 [) m$ b
5 i: o& @9 y5 l8 y StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/ s" j1 e' G g7 u
7 i: q! \: [0 {: Y
//获取数据源
% n7 O8 X8 l. n1 ^9 U/ L3 q
) X! B4 j8 @8 Y _7 N) J List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
* V. a+ N7 I# r$ W
; U! a" t. x3 t6 V1 f |1 t, V* } data.add(new Tuple3<>(0,1,0));
& e8 `1 z% J' }2 [/ r- _
! |: n/ D" Q: ` data.add(new Tuple3<>(0,1,1));
/ X% P3 U6 u/ f8 {* w" q( H& a& M4 A8 l8 D
data.add(new Tuple3<>(0,2,2));
' O2 q2 d$ x# h5 C6 [$ U5 C7 I M) F6 w6 |8 @
data.add(new Tuple3<>(0,1,3));( {: E1 I) S- |! _. h; m4 n, i
, {1 U. y7 u& B9 s' L1 l data.add(new Tuple3<>(1,2,5));
9 P9 V3 m3 c. k* W* V* m
+ \+ [+ F7 Q9 d3 e, I# ~ data.add(new Tuple3<>(1,2,9));- n* t$ f- V3 n" W6 m8 p5 X
, N0 \" b1 \) Y8 j0 w- ]6 B2 m
data.add(new Tuple3<>(1,2,11));
, a* I$ t- \9 T! d8 H6 ~+ _ [3 T3 A; Y
data.add(new Tuple3<>(1,2,13));
. \2 m/ k: g. @1 A4 Q2 N( {7 O0 A+ F5 Y! m
5 |6 Y! K( j" s( ]' ^. c! D
4 q, R' p2 {% a% \; F0 X: J5 p- K
: u. @( S9 y2 g- S; P: m+ l1 X
- x4 b: [' Q. A! a1 [3 }( q" y DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
) l2 ~ S# T# ~8 V( n8 K6 O4 y1 ^" O5 ?' P1 F( T0 B+ i7 m
: J- o M$ d1 b+ U
: b! e* h. c3 q: q/ }4 q) R1 w2 S8 t! ?/ Q' e3 y8 n$ a
; |. p9 {4 l8 }, y N- f G/ P9 h SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {$ L: a1 N! Z, f9 f
3 a# D0 N+ K- R0 c4 N @Override6 z6 O. ?3 _8 B% W2 r' H
: |6 p4 J, }2 ^8 H5 R! L* I" q- |# R public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) { F4 d- v/ a4 \; t
9 t$ x6 `8 F& O& `' f
List<String> tags = new ArrayList<>();
% l. p/ i" B7 L' p; W4 j& W# K f5 i
if (value.f0 == 0) {
2 `! C& O- q" S" J1 P% o# P& U0 [( w B. e% F
tags.add("zeroStream");% G5 E/ T$ U, k0 E: N
( ^4 F. ~. d( c } else if (value.f0 == 1) {: Z6 X- o! O0 J4 J/ q5 _
& J1 }/ M$ ~# {; F% {
tags.add("oneStream");7 @( z( s% T( ^: ~. [% |
3 [+ i& G4 q- v6 a }
$ { U. C& E) m) m6 P% z' g: g
" i! r7 c2 N w2 j return tags;
p( T5 B: _2 E& z' I5 f8 e u8 Q# X4 [3 P% c
}
' w. Z9 p/ ~* P- i+ q, b2 X( C, r% N2 p3 i2 }9 S
});9 T0 e! i: G% T1 V
5 @/ @: u% O/ P4 Z3 H( q' ]7 U
8 `' a+ m0 C' I3 S( I
$ c1 X# Q: x _9 `5 |! v, y
splitStream.select("zeroStream").print();3 B8 m4 u3 ]& T; J5 B* a) F* _
- e2 e. U" ]+ i+ _ splitStream.select("oneStream").printToErr();9 \* _ l7 P1 h1 B
, f, F6 ~5 A. x) P8 P6 s' U+ q [# f* A: C$ u/ q
; G% G4 s7 ?5 r, ]3 i; M //打印结果, Y- D! Y j. c* d+ z, S2 [
5 x: t" {6 g) v, W' i, J String jobName = "user defined streaming source";
) x" X& [) F/ Z& z% f" [# s& I6 `+ N% p& r
env.execute(jobName);
; g. }5 \+ ]0 n1 G' x+ e9 F) O, W* `6 ~
}
/ w, x P }! R6 P) L. y</code></pre>+ g; o( ]# M; ]9 j' d6 L6 e2 u+ Y
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>' _( k. f% B( A! v3 |9 P
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>* s' E/ P" i' p( \
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>+ P+ T& d6 {* E; U+ T6 y
<p>复制代码</p>- ^8 b5 q6 p/ ^" P6 s3 ]4 g
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.* O* ]& `, y. w/ G8 F8 P
</code></pre>/ p7 {! j7 R9 E
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>- D$ O" x. Y( B3 h, S( |
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
7 V# \% e$ e6 j7 T! N% {- G<h4 id="sideoutput-分流">SideOutPut 分流</h4>
. Z. Z, d' Y" X7 [; W; Y% @<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>' V2 k9 V% h6 X
<ul>
: N* V+ U6 {' ]& K5 A$ O<li>定义 OutputTag</li>
1 L: V) |3 ]9 A- W5 g2 ]( O* `<li>调用特定函数进行数据拆分
/ K o. U. ~2 N" I<ul>
- M; d' X6 v! A6 F6 I<li>ProcessFunction</li>; e/ M7 X4 }2 G- H. g! I
<li>KeyedProcessFunction</li>
: t: J& u) E2 d0 {2 ~<li>CoProcessFunction</li>% {2 }& T) X: D' V6 ^+ N; `" l% b& o
<li>KeyedCoProcessFunction</li>
- d- Y- m8 {; _. T |, l<li>ProcessWindowFunction</li>! t# }, s3 F8 o
<li>ProcessAllWindowFunction</li>
! P. q0 Q# F9 V: e1 a- E</ul>% q9 S7 j+ @4 S5 m: _! U
</li>
1 x9 N; K# M1 a& D</ul>
" N- [- x7 X# @- q, K<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>( E( M( [9 o+ L D
<p>复制代码</p>
, f6 X1 M: @3 F<pre><code class="language-java">public static void main(String[] args) throws Exception {
) Q0 h" b6 g, Z) ?' g, }, n$ |; ` ~5 x% i5 u
2 \. R, {9 Q5 Q: V- \3 o
" B. } x7 R/ F0 \; a7 m0 G StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8 l- |. K; d. v3 `0 v0 W
& ]2 w& n: a+ D3 z //获取数据源/ k4 Z s' H- ^8 u
& \+ x* a$ L2 Z List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();$ Z4 `/ l5 T* G. c
! `( Z' U+ x& h* _
data.add(new Tuple3<>(0,1,0));
. I N- ^6 [, K2 R; a4 t
[+ A8 f( S- I. s5 b) f! d data.add(new Tuple3<>(0,1,1));* O5 L* T3 _/ ?! ^1 p4 U
/ D, Z! }9 c0 k$ _1 |4 F; ^
data.add(new Tuple3<>(0,2,2));
6 L1 Z! a4 d/ t' `; r3 Y+ N* o* A! m6 |0 x
data.add(new Tuple3<>(0,1,3));& S1 @5 i8 {6 g) h1 w- y* i
1 n5 O* w/ y4 H! s; Z8 [ data.add(new Tuple3<>(1,2,5));: ?: z8 P+ u/ @% b3 C7 u3 J8 U
1 f! U8 ?: W1 { data.add(new Tuple3<>(1,2,9));+ _, ]! j B9 U. l
2 d' m) ]/ M3 P. {. W- V
data.add(new Tuple3<>(1,2,11));/ y+ Z& V* o& v: G5 a$ z1 K
J" f; h* b4 U2 @: }# Y
data.add(new Tuple3<>(1,2,13));
& o% M* f/ z- o5 A n
! P6 o% J# T6 a& k$ W
' c- ^9 T5 U0 q# ` u7 }2 V+ w4 F6 z) m; b l$ i; R6 G
- f8 e7 p% P2 |
7 a: Y( f" Q& r! k0 s" o+ B; ~ |# r# ] DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);2 Q/ `' Z( R9 h$ m. b% v8 W
5 v( h# A2 N# u8 U
# b# ^2 Z+ T/ `% e
8 B( j4 A% j% e6 T. N5 q
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};- |& m# J+ X) y1 o
: F! z! B2 v& u& F" p4 l% ` OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};& q% N, x; `& M" F q
; ^4 g( A5 r2 Q9 x
0 |8 G! O% I2 O" H7 b4 T9 _$ c
: N% L; u" d0 B7 g' [
$ ?% o. M. j$ F# p8 G+ }% s' Q+ C4 W& A' g. A- \$ v; ]( X: j
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
) w/ p3 G6 n ^) @- d- A! Y+ |* A; f, [% ~5 P9 N2 o' R+ ~. h6 j" v
@Override4 d- H! l% Y& S( Y; v
4 `" V; F. {4 g7 S; \
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
" E2 k @9 v5 j2 c7 O" l0 I0 K, q- Z$ g, F' @7 O4 A0 P
: @/ K6 p' i1 L6 R4 u9 H
4 c( j5 `& E- ^) o1 }, F8 a if (value.f0 == 0) {$ d0 G4 @8 i# F" ^0 }+ J( e
1 V( e( X2 Q0 n& |7 v H
ctx.output(zeroStream, value);
3 i, W- P I& j6 i: {" N! ]* ~+ X3 J
} else if (value.f0 == 1) {
0 R4 q! V0 D% c
. C4 R4 g6 @- M ctx.output(oneStream, value);
; m9 ?$ A, M, m' K" ]/ w) K
U' o0 t5 G. C- j6 k; \$ y }- `9 \( B$ {$ x8 m7 O
2 T1 b0 M, h, a+ m0 j/ `
}
, t/ X. i0 O5 K+ b1 P6 Q; o
5 _0 x) m& J/ w' ? });: l# E% m7 C5 w* y" u/ w
$ m ^7 M, u3 V
& ]0 V s" p; T" [- ]8 k; q9 ]
7 r8 e" Y1 i5 U+ Q% W1 h2 G; [6 J DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);( N# u7 R M" h' ^
. K5 h2 M" s4 a" J$ s, {6 n
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);. ]+ S; T* q/ x+ n
9 Z( t/ y+ C/ S P- |2 U) Z
# L f- S5 P c- ~7 E! ^& z/ s* n) G }7 L
zeroSideOutput.print();
+ O, l- l& n8 ?
/ a. O5 ?+ v2 b5 N; l oneSideOutput.printToErr();
+ y5 F ?/ K. U1 F2 m# [9 i2 ^) q$ M* e% O
+ R1 X- Y# X/ I0 \, t% p1 P: e% q% {' [
: P U/ t5 W$ _! s3 D" S: M6 x0 P# p7 P! J- f4 w5 e) k
//打印结果
& X7 e3 Y( S* g7 c& \* A7 T1 f# F% k8 l* W7 v
String jobName = "user defined streaming source";/ }& | ~& @3 @" Z
0 x/ x2 J* Q4 j3 D/ y$ C. f$ o# F env.execute(jobName);0 |5 \5 @2 \, W& M0 ~$ f1 b: ~3 N
" w" n( E# p s. \}
* [8 P- t7 j( _2 z</code></pre>. v& H/ N- m% E& @* T$ L
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
/ F: u3 @# e2 m) e<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
6 U1 F+ \9 W/ N7 E<h3 id="总结">总结</h3>: I+ T5 ~, U# [: ]
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
" b1 Q; w: U! o% {5 I; |3 b<blockquote>
9 F7 J# \6 x! q6 h& c" y( }7 A! |<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
6 J3 A% T t. `4 ?: y</blockquote>6 A# z; M5 b! R2 I
3 W0 l' `. \; X& ?1 w0 @ |
|