|
* K+ I3 d3 J% j# k: P! ?& S+ `+ q
<h4 id="flink系列文章">Flink系列文章</h4>& @( p& [; G5 q4 D% u" j6 c
<ol>. R- P$ I: J, k5 ~" t
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
$ B, J% W e! b8 h: s v<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>' B/ t( w: M5 q5 D! L; [
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>; m4 V% Z+ R* h6 X2 j# J$ k
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
7 t* c. v* A' W3 Y<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
- H; R$ O# M7 w8 C% ^5 y" X" ]* K<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
9 ^4 v- e5 U L1 Q3 S' e( `<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
& o1 `. t D( T o& |) i2 R# L. c<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>. @' |7 [ C# i5 S+ h: ?1 @ T' n
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>( p7 h0 l0 h- w A0 C) E- Y4 q: H
</ol>" q* f" M" T3 }: O6 `+ b& I
<blockquote>
' Z/ N" Q+ c0 }; p) ]" R& m' V6 E<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>% Y. I5 }8 A! J7 e' M; ~. m
</blockquote>5 t+ P1 ~ F$ l$ S
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
2 H3 l' f' C0 |0 |7 V' i( {; K<h3 id="分流场景">分流场景</h3>
) D* i- I( F4 m( h6 n<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
$ R/ A" u5 e: l<h3 id="分流的方法">分流的方法</h3> s+ T$ L( v0 S g3 K
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
* m' s2 H" P/ Q" U<h4 id="filter-分流">Filter 分流</h4> L! B E: z# J# D
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>% S6 n3 h1 i% ?, p* g" B: ~% w) Q4 p
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
* O" L- N/ p5 ^- ?$ d. E3 ^<p>来看下面的例子:</p>
1 z. T+ c, s$ R<p>复制代码</p>) W) d% b% E' h- Y5 K+ A& Z" M
<pre><code class="language-java">public static void main(String[] args) throws Exception {
, b8 K- e' Z3 c1 a: s. v4 ^8 Q StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
9 y2 { u) I) d( o. F1 S //获取数据源
7 U. V. O% f: Q2 |* f- Y List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
, g: x2 I% N c# B6 e& H data.add(new Tuple3<>(0,1,0));
0 B( r7 C [2 D+ j5 p9 X data.add(new Tuple3<>(0,1,1));
/ E% L" o# @8 g+ g; F2 x data.add(new Tuple3<>(0,2,2));8 }4 b E% N- u
data.add(new Tuple3<>(0,1,3));: ^+ B8 t5 i$ b) N7 b( X' l- J6 D+ q
data.add(new Tuple3<>(1,2,5));
* ]) `! l4 F5 w7 [ data.add(new Tuple3<>(1,2,9));% o' ]' H, Z1 a9 M$ D! j
data.add(new Tuple3<>(1,2,11));
( M4 D8 N2 U6 a* B data.add(new Tuple3<>(1,2,13));
/ {. K$ G6 _2 m/ `8 N4 }6 V
+ S3 `. M' T E+ f3 s7 j" R/ c) ^ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);7 p; F- D. U) I$ |1 O: k7 _1 c
3 j1 Q4 F5 I/ B7 L8 P# _; \6 u% X6 u# x6 b
3 C0 V" @1 b0 m: P6 ` SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
0 u2 s1 v/ c) B/ i" o2 O( |
! Q" O, `! c% Z- p0 m SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);% q9 x3 W. B# Z3 C* C' ^; \
: d+ U# J- Q% b2 n, t& U
4 \. h/ ~8 O4 u. h F5 R$ F) @/ _4 F' y$ W$ u
zeroStream.print();3 d8 G4 [. s, i, y
2 \! O6 U+ y4 R! `/ T$ B& W/ s
oneStream.printToErr();
, M6 _* N2 |% \# d1 ]0 }8 z, ~+ s$ W0 z
V1 S$ q& T }/ q0 V
8 L/ O6 ~9 ]+ w/ }3 w) h, W$ H+ Y/ k' J+ w
R" V! n. t* u, C; p" X) m //打印结果
* H8 Z" S8 I$ v5 l. h0 m4 D, B3 Y; [9 m/ p q s- q
String jobName = "user defined streaming source";& N& g" s2 l" w" u8 }1 R
) i$ J/ t. E4 Q
env.execute(jobName);
, Q2 |. l0 Q/ S& ~( ^0 B A; D. T; n8 i; D% u$ X/ l
}
2 m5 U: C) e4 Q0 {. P</code></pre>
* y2 r, d- O' ~# F Q, k" W0 {7 R<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>0 ]; K8 ?0 k @5 E6 ]# d& n
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
3 Y. z) y: P0 ^- x<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
: P# w/ S" d$ i6 J% d7 T9 ?: V5 l8 r<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
D6 r+ W# g2 T& _<h4 id="split-分流">Split 分流</h4>
# ~* L' d1 g+ t, S$ F `9 O v/ y<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
! z) J* V1 U. O; J8 @3 ~<p>我们来看下面的例子:</p>
2 r7 b) X' f; c$ Z2 A1 E<p>复制代码</p>
5 c- S) K+ |( a( t* `* v2 u& O<pre><code class="language-java">public static void main(String[] args) throws Exception {
$ G2 }# @" y- I( `' K
. O9 n' ~6 d$ B" Z- m+ H2 c9 i1 t' ~9 }( R$ U: ]1 R3 ^' ]/ Z1 I
* ^) F. X/ J. H3 H StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();' {2 D7 M, H3 ?8 }
$ J( w( ?$ u1 j$ ~9 P# a0 I2 R
//获取数据源1 X7 F8 b$ ]/ W+ q* l4 p9 b
7 {$ _9 y. J: ~) L7 G
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();3 f& Z$ q& P) |* B
7 I3 O/ K4 T4 Q Q% o1 U
data.add(new Tuple3<>(0,1,0));7 \% G: U! m" u& [$ N, }* w+ ?
% S2 f/ X. W1 g4 j5 F6 E
data.add(new Tuple3<>(0,1,1));3 s: W: ~3 u, l
8 `$ U2 S5 ^ u/ V# L
data.add(new Tuple3<>(0,2,2));
7 C! u/ R9 G# V$ C' G) \
# p9 b9 V, L7 @: O data.add(new Tuple3<>(0,1,3));
% q3 v+ w* p% l `$ a9 [
' R+ }4 d) u( l. Z2 J/ h, {# z& F data.add(new Tuple3<>(1,2,5));* U' Q3 |. }. C5 {" B
8 i! r% ?' s \* W4 t data.add(new Tuple3<>(1,2,9));
" u8 j Y7 Y* S) N& a# ~ @/ j2 s' @, d) [" \ b$ d
data.add(new Tuple3<>(1,2,11));% A# Q) f) B' ~
5 a: _& |3 |! W' f# q# c' [ ^
data.add(new Tuple3<>(1,2,13));
% Z4 [* G& v- D6 G+ N8 ]+ o& n: J4 b, K0 p9 f- h
( e# W# |! L7 q; z* r0 P" D
6 H4 N* _. A; t2 `: u" J/ A* W
; D& y. O1 U: U2 I5 B* P t+ X; S3 c, l8 X, G
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);) R( t6 p$ f) M' F) L
5 c ^6 O9 r. @% U
( J8 s( o3 n( T. y" W1 P, G+ S1 ~# c. @8 l) y. @) V
0 q- y% n! y0 g2 R
3 n8 u d& O6 o/ U, \ c0 Q SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {6 ~8 q! e- z6 v4 w+ z2 z
4 X8 q# e# z' H+ O+ w& ]
@Override, g, x4 I. h/ W: |5 A
4 y, \% q, Y6 B6 i
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
' |. t' O& E( x; T- S+ W; l; F
' Y5 w9 W/ p: I0 } I- _ List<String> tags = new ArrayList<>();
& X4 }" ^0 B" _2 _# k$ h# G5 L7 k
+ z) N* g L: Y if (value.f0 == 0) {
( L. p: H( T% i- l- u) Z, E. q) I/ B$ h4 t$ a
tags.add("zeroStream");4 a1 A/ G! a+ S6 R( C9 j% I
! z Y4 c) @2 X7 X) D! O, k; H' t
} else if (value.f0 == 1) {
8 E0 G5 S* G8 z0 {. k% f$ h
) y D, p ]/ i2 v/ L- M tags.add("oneStream");
- }) b9 O+ E: y$ Y& X
/ g- a: C1 C) x8 L% m. H: [ }
2 \$ _: n" z' H7 [' t/ J$ ~) \ B n E( K& O/ [% s
return tags;# |/ W0 Y9 a. _+ c% O+ C* x
+ r& h! Q0 u6 z9 _7 D }( X$ W* q. O2 F) n5 g1 q
2 _3 y/ I& U# s4 G& g });
# e0 p; f/ N( R8 }5 W. }/ V& N" H9 Q! M8 ]" ?3 @* [* S8 l
; W( R j9 N; J8 ]
2 _' V' j7 M" y$ U B- ] splitStream.select("zeroStream").print();7 t# Z5 i! r( D2 o# T( ~
- z8 e8 n# ]) A5 x. q# @+ j% p! z
splitStream.select("oneStream").printToErr();# i- T* F6 \2 }) m( d: H# _
i1 b, k- m8 a. V" D
4 i% X& h" M! s0 P
% o U2 \6 f4 S //打印结果
6 p; u1 A+ h6 a" i/ k( K$ Y
# _! E' t6 L1 U& v7 O String jobName = "user defined streaming source";' P" Z& K1 K; }
2 g! M1 `$ c# K# a% _
env.execute(jobName);
; O$ m4 t0 N; w, P
, f% l: J8 s9 r1 k3 b3 D+ H( `; \}$ ?7 J# u/ Y9 O `, J9 Z. Z
</code></pre>
2 V( f7 e( p g% s7 d+ P<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>9 c5 Z2 i; k, V
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
, c3 ]: I9 q3 C ]& H; a5 P<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>. J; [7 \ w3 p7 B" @& S
<p>复制代码</p>! c+ T+ G9 s5 i( Y) ]! P
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.( D) e4 v( l7 d5 J
</code></pre>- v8 X& |* f( s7 ~9 N% M
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>2 u: |9 h& \5 c. n& r8 \5 ?
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
1 y3 {% s1 n; w) R1 g+ W<h4 id="sideoutput-分流">SideOutPut 分流</h4>7 a: _* s' |& p
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
3 o2 M/ W; l$ Q9 A ^<ul>
* c, D& t! y" H0 c3 i5 t<li>定义 OutputTag</li>
" J. n. W9 z6 k* s<li>调用特定函数进行数据拆分5 x& P" z/ e% u j
<ul>
4 J8 e; `7 c( o2 k+ s& }<li>ProcessFunction</li>
! E2 C0 l- f* w<li>KeyedProcessFunction</li>
7 w) g$ i. g E0 @* @" F<li>CoProcessFunction</li>
& b2 [4 w! G# s& G. O5 v4 W<li>KeyedCoProcessFunction</li>8 k6 m M9 w. h3 @
<li>ProcessWindowFunction</li>% v: z8 \" B! q
<li>ProcessAllWindowFunction</li>2 v0 j. P* V; Y$ M
</ul>
/ `2 n; B& i" I& C</li>1 ], I8 j" Z- Q
</ul>7 y, x) t* `' t( _: G5 o
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
& f6 H3 y; `9 T$ q4 @& X" H<p>复制代码</p>6 j6 n" S9 k2 g7 P, X' @
<pre><code class="language-java">public static void main(String[] args) throws Exception {
9 A1 {& I$ d$ j, L
# C$ a" Q$ F. [& Q
6 k+ n w0 j/ s3 ~) j! L( m* G* Y" }( ^3 M# U8 _1 n$ \% F1 H
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();5 I3 @( M7 k% u% C
- t% e9 B2 [ |3 t+ @+ P
//获取数据源
9 A: ?# C) X& Q4 M% H8 u2 ?. E6 E9 @4 K
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();9 |4 ~: X% z# p* p3 p7 c7 X) Z) E
) d$ y- w X: T9 c6 ~0 J4 ] data.add(new Tuple3<>(0,1,0));# C; S5 Y3 a5 o3 X
# B5 ~. m" @, {4 v
data.add(new Tuple3<>(0,1,1));
7 `9 @6 n7 Z/ e4 d2 I4 V! `( u) F1 r `; \
data.add(new Tuple3<>(0,2,2));
; B( ~9 G+ _, k+ s
% ^9 M& d0 v, i: { data.add(new Tuple3<>(0,1,3));
; E7 {* U% I: {) t* u4 L; E/ \& ]8 s8 @; j
data.add(new Tuple3<>(1,2,5));
7 o3 h, k& {$ M5 p D. o3 {) W2 j7 `0 l$ i- V8 L
data.add(new Tuple3<>(1,2,9));9 c, F$ h$ a; @/ X2 X
9 M3 c/ Q; c8 t data.add(new Tuple3<>(1,2,11));
5 Q2 N1 y, M' e; b: S
/ y7 `1 @6 a: L6 T. a# \2 G data.add(new Tuple3<>(1,2,13));% s2 v. a* C8 j6 Q" l4 ?6 i/ ?& _
* B9 r3 M: ?6 c. b4 K; W' w
( L+ o* [" |' j: [" }) C
7 [8 `* C f5 J5 ?1 S4 N
( p- X0 \4 q0 H0 ?2 ?0 \
* y4 {3 @7 b: q6 v3 [
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);! c2 ^ U# @ Z/ z: }
- e! f. a, i% e+ P! X
" ?3 T- v; P' i1 g9 Q! B1 A
+ _' V+ Y# T( [0 i1 Y0 @* r3 ? OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
" ?7 W! E; m8 F9 N
) m* a0 [4 B0 G4 Q. x8 r0 @ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
# f' J5 O8 B L8 T2 c) S/ |. }" I3 |( R% w7 D7 y7 B+ `7 E
8 D) Q8 O0 L+ T8 M
. w5 Y/ F8 K' [, J/ F @
$ y: y* S# y: P3 J) _0 M7 J
! Q. K5 q g3 R) L( {
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {" B$ k. K8 }; v7 e
; V+ {7 w$ I8 e% j8 B
@Override" I+ V/ A3 K* L
( o P* K0 B: }8 W0 [" u. q% z# y
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
# W- u. ^9 Y9 o% F" _2 j' n& u
" X7 @ t2 R. d. G
, W! y* s* l3 O
( C) D" z: z8 Z7 y$ {( k+ n4 S if (value.f0 == 0) {
. G% A: j' O& K( b6 X
. ~" ^( E5 W# p$ g8 p+ n/ c ctx.output(zeroStream, value);9 f& o( b, v* k p3 m& R$ `- w" ]
N7 o$ ]) c4 x( k } else if (value.f0 == 1) {) ?. ]$ e6 f$ f1 `/ U
) J2 i6 h/ A! M- A
ctx.output(oneStream, value);8 v$ v6 {. |6 T6 a
: W+ _! e7 l" I }
: \9 @. ? d: P0 a: N2 T/ O: k' Q% p" A3 c0 Z% O8 o
}
+ S1 g3 D8 p, M9 Y: z
# C8 N! e! U, T; r2 p$ {8 r3 ` });9 Q0 `* S7 m! I$ @. Y! E
+ Y1 A. m$ X* B, A
4 a0 ?) @7 j5 ^' _/ A! k! M4 [/ D- ?/ W
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);; U/ L% m9 r) m" T0 I$ I# L
+ g$ m* m# |7 k# S( m
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);: n5 M2 c) F) f! }
: F$ G0 P, h$ K: s$ L0 I% y
2 k4 g- T! x5 g$ P8 [% x# m
3 N+ ~) f0 J& ?1 X* p7 N$ f
zeroSideOutput.print();
) k# u6 P0 O7 C" m. M' T
7 e( k- r: b5 Y; q/ q: i- I( L% g7 I oneSideOutput.printToErr();5 r. {5 n; }2 Y0 r4 Z
( V: B4 d3 O( S1 S& \$ e
( P7 W w; z5 l% N7 X4 u6 I6 }
$ F0 j1 k9 w f6 N$ Y
# k6 ~3 i' r5 x( R; n8 H8 ?& y7 }0 x1 `7 |/ V3 Z, W5 g6 N' V. U
//打印结果
\6 H& U5 ^. `9 w! U1 }0 u
3 e9 {+ s; Z. \ d# H String jobName = "user defined streaming source";
$ B- x5 s8 [9 I! d- x6 |2 z
1 j8 Y: s2 O4 W3 V/ D env.execute(jobName);
5 V1 T; ~; o% T) d- B/ C* ?3 f; @
}# F/ }8 E1 W+ l0 v# G; I9 Z0 w
</code></pre>% b. _) M# |: q6 S
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
; s6 U* ]4 D" o; w<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>! `: E6 y) f7 E
<h3 id="总结">总结</h3>
5 t& v" B5 E" K: [. o, n$ n, U<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>! q) K& n% S% ^. i
<blockquote>
( i5 f& }1 c2 r; w9 _/ I; T<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
8 |: S9 y. c5 T9 { L/ t</blockquote>6 [4 V6 h0 m& m) p& {% ]
2 m2 [' I* F# ], q4 x |
|