|
|
- O0 ]* s) I" Y- w5 N
<h4 id="flink系列文章">Flink系列文章</h4>
$ k; [) m* H; `- d! |6 m<ol>+ c9 y' T4 N' \
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
, V* ~& ]/ @$ _0 N7 z<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
4 B0 l" a5 F) W7 F |0 q' u) _<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
& m. `3 N3 q, E' `+ ^<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>0 |5 j, c! g$ f' C' A8 z
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>, t$ D! t/ |. {2 y+ ?' }
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
- N7 A- U4 K S<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>+ l2 y5 k: j; H. r
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>; B; _9 Z# t+ Z: T3 |6 x
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
( F" k: H) s4 L8 H</ol>% E0 ?3 `1 F. Q6 V/ T7 p- [
<blockquote>
, J2 g* o) `! i6 N) j- D<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
+ A7 p: s/ f; \- U5 `</blockquote>
1 S4 s. e7 b. I/ _9 u% p' [7 ~<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
1 ?) |: e2 ~$ O0 v7 N<h3 id="分流场景">分流场景</h3>
$ }% V$ U$ N# d" d<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
9 ^5 W' ^" z q9 H: x& z<h3 id="分流的方法">分流的方法</h3>
- {8 u- E& M4 w! }5 C$ ?<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
1 e. b; V+ B: a( o<h4 id="filter-分流">Filter 分流</h4>
: S2 X! ]6 U$ F3 ?<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>9 Y8 W; h' M8 _, }1 B2 H
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
: ~; k" Y* D. ?/ Y<p>来看下面的例子:</p>5 w% J" G" n$ M# r i X' z* X3 g
<p>复制代码</p>8 G- d+ R) n- ^/ m2 H, V# }* Z) u* l
<pre><code class="language-java">public static void main(String[] args) throws Exception {2 h3 |0 L. y* M$ b* R0 m1 q
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();* r- u" o& M* w( V/ N6 ~
//获取数据源
2 Q" w& \3 A2 _+ ~/ {! R) D List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
. j9 h- X I, P3 v' ]' w data.add(new Tuple3<>(0,1,0));
( `5 v& t# c7 |9 c, F# E data.add(new Tuple3<>(0,1,1));$ J5 ~2 ~, ^. l2 A! Z0 G2 A' F
data.add(new Tuple3<>(0,2,2));1 l! l9 w6 e) q! X
data.add(new Tuple3<>(0,1,3));
4 V8 Q7 ^/ x5 y data.add(new Tuple3<>(1,2,5));
" k. R4 m7 A; {$ y data.add(new Tuple3<>(1,2,9));
4 O9 T0 }" J0 ~7 R# e0 H data.add(new Tuple3<>(1,2,11));$ V" G; ]# W9 M/ P! B O9 y
data.add(new Tuple3<>(1,2,13));4 B, a. Q' ?; d% E! R' c8 Y/ @; ]
, R2 T w% `' b9 ~( r ^4 v. s DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
- o8 [' ~$ o. B' d- o4 ]
* a2 \3 u$ b" f9 D& p9 o9 `7 l" i6 R" x
5 p+ @2 v/ `" S P! x SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
& I# m/ ?2 j6 K1 a
9 z" V: D* P+ t SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
4 D' o& f9 [7 l0 x" `7 u4 t4 y' z2 D' E, v4 E
, C) ~) P. ]4 p; ?
8 s- h6 [, {% k [/ O) i+ k* i zeroStream.print();
4 P$ i& T) r. J: S- U: G% p2 e$ J+ s: t3 f$ @% O7 W
oneStream.printToErr();
+ b6 D$ x" _, u- h I j
( {: n5 a m5 @3 f% z/ `! k2 h# H T6 p4 o' Z
) N2 V. w6 E; e) m7 k) D( H& B
2 T. i4 I9 R F) e% f
/ i+ ^" D2 B, c* |
//打印结果+ y5 D: Q9 p- @5 t6 w5 B
+ j+ f) o' H2 h' H String jobName = "user defined streaming source";
7 z1 D) m% P3 X0 O& N+ y7 B) W. Y% e* ?+ \
env.execute(jobName); Z( u2 r- k: r8 X" ?' Q
& e/ O6 o+ _4 N( u) K}2 s; z* q t4 j) n
</code></pre>) N* b6 p+ O) E& U5 |8 g' J9 b
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
2 e- O) `5 d6 {6 b( Q8 k$ ^/ @1 E<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
4 ^ x& F8 J5 G1 B0 L<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>* P8 j; q7 x$ Q9 @/ N$ \1 E, X
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
7 _3 _# m& `1 E: k3 L1 Y, J<h4 id="split-分流">Split 分流</h4>6 F% e; J! K1 `6 I
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>/ Y& Y6 q% q+ Z; h8 d
<p>我们来看下面的例子:</p>
: Z X f2 c. ^/ K<p>复制代码</p>
; w/ l8 x2 [ g<pre><code class="language-java">public static void main(String[] args) throws Exception {
( T) I/ T% |3 s$ \. Y
0 e9 ^. H3 ^! F7 } J7 c0 N! c! j/ y- h7 i. h
6 `& Y. K, E7 u/ r! N* @' k StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
! l9 j2 \ X/ M7 H$ t2 K+ F% b% f, H
//获取数据源
; N9 m% x* F" ~- n/ f8 C7 @( M5 D: m) ~# f6 J9 u4 }* j3 O1 c9 |
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
* \. Q8 x! E# x( U( z' C8 c
0 ~; x( }" n2 b) [1 W( l data.add(new Tuple3<>(0,1,0));& Z$ l: w8 h$ T: F0 ?
) g. M7 i! s @) [2 O8 o8 U
data.add(new Tuple3<>(0,1,1));
4 s2 C L2 {9 x" X. L. u& u* N4 \
7 I2 m6 m& o5 J3 n; F: A: |5 u data.add(new Tuple3<>(0,2,2));
9 [+ g/ L0 `+ j+ R" F3 ~ _7 q
# `% ~8 w: s7 B8 O: x8 [' } data.add(new Tuple3<>(0,1,3));
1 x4 V+ ^4 T3 y$ P7 {! y, h3 S% w# v3 U
data.add(new Tuple3<>(1,2,5));, g, _: x5 W" w% a% N; s* j
) a ^) m: D% W' P& P3 C data.add(new Tuple3<>(1,2,9));
/ |9 {0 j) @1 a' p Z
: y9 H! r7 P$ \: f' w& h8 i/ p data.add(new Tuple3<>(1,2,11)); F4 j& ~- i. l. g
# U" i, L) \( l9 W) {0 z B& d
data.add(new Tuple3<>(1,2,13));4 W5 A0 b5 K# r2 n
7 U5 `9 W" T' K! H3 L- r9 }( n
5 v# [1 X3 r6 v) i" D( ]2 R# ~0 Q
* c- U* e* z \
" o' E& F+ W& R. c+ y- z5 I2 v( ^' R" }9 e5 \3 B8 z7 x: |0 J
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
O& z, a2 M0 I7 o$ V; s8 j) _4 ~2 n4 @$ z7 G4 R+ T* X
( v# Y/ D3 Q' q$ ]$ n+ q) N# y
% h8 |. _# S( s( B( ^2 |+ F9 T6 U8 ]# ]/ H5 S
; j2 I2 ^; Y1 D, N! B9 X2 J SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {" d% a$ F, A) J. m! [6 Y
6 U9 ~5 c3 t0 p3 D& }1 y' e+ D4 _. @+ v @Override
' a8 L" V0 g5 U4 }1 a. Y- E% l1 _3 e5 d3 j( l. V
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
/ m9 w! ~) B5 D5 D- \" w1 Y# v u ` k5 x% X1 E
List<String> tags = new ArrayList<>();
( V: {2 {/ k- g1 F, }1 ]) v$ s8 i) a. P5 F8 o
if (value.f0 == 0) {& r2 e$ y( G( l/ _7 S3 I
3 F' c9 j; v5 \3 Z+ Z
tags.add("zeroStream");8 ], _+ n5 {5 u9 g+ R/ E7 H* E0 A e
* } ~/ |+ M& e* n) S } else if (value.f0 == 1) {
: i& a' s B: U
; P+ @. D D4 v$ P tags.add("oneStream");+ N' o9 f$ p6 W! A% ]1 S7 a* Z
p S* U4 }3 |" D0 U' o
}
% V6 l- U, A' z
3 ~+ h+ @. F u# N. X; o return tags;& y, V' H' |( T
, t% x; \" J% O }
2 ?) b' y. l9 n! I& i/ K& D/ v3 ?8 T! W+ \5 _
});
5 ~/ w& X h$ Y2 I' p& o+ T3 j9 a" X2 V! k A$ p
- E% x+ U7 K) i% \( b& i
. c& O8 _. W! g, U7 Z2 Z. V splitStream.select("zeroStream").print();7 B3 h6 X8 g- W. ~! I# ?
# N) k+ w0 w% z& W2 u/ d splitStream.select("oneStream").printToErr();
' K* t/ _4 B7 T
: k. @0 r& [. P8 o$ C" _1 w
3 S/ O7 n4 g% E9 `$ o' }9 X& K; S+ \
//打印结果) v; M- o x! z+ _9 Y
( \( R3 k2 l4 x. \0 L
String jobName = "user defined streaming source";
* r' ]- F M- i
8 b5 [7 k# P6 H env.execute(jobName);( D% X+ l b3 A/ _! g
! A0 ~( ` r% {, v$ S
}2 y! G4 p6 c3 F4 ^ q
</code></pre>
* f7 p; J% t+ h7 v1 F- s# H<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>0 S( o+ f' _! l
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>4 b3 C. A$ L3 w3 Z9 c
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
. {- U6 b6 o0 ~ Y% t) A) Q; U<p>复制代码</p>
1 ~: P: M+ f6 L6 n<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.4 Z) Q$ Z2 l# I f% ~# u( O
</code></pre>2 w8 T7 H9 W- ]
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>8 H2 P! z, r" d* T7 I0 G" R, z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>& z! m) h$ W9 O* Y7 A2 @; B3 z
<h4 id="sideoutput-分流">SideOutPut 分流</h4>+ o5 ?) k8 h$ ]
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>" b4 f, J& C5 {
<ul>$ K! x4 H6 s& p* Y/ M
<li>定义 OutputTag</li>2 n5 o; D+ F; Y, T
<li>调用特定函数进行数据拆分
/ j) n3 i) H$ t4 [1 u; w( @+ ^. N/ Q<ul>
% S6 s9 z" u" S& s<li>ProcessFunction</li> F; ^7 {' m& Y3 c0 f
<li>KeyedProcessFunction</li>4 N( ]/ e( O+ L, a2 `, m1 p
<li>CoProcessFunction</li>8 d$ `2 H; ~# c& j9 ~
<li>KeyedCoProcessFunction</li>
3 p( u3 u, w! q C) p; m4 }<li>ProcessWindowFunction</li>
3 S+ c- P* I3 y<li>ProcessAllWindowFunction</li>
* j7 G; z. c; y$ \* d* n% m</ul>
% \6 H! f* B& m/ g7 F</li>
8 @. i1 c/ ~% t0 H8 M9 L# x</ul>/ p/ R6 H! S$ d: V3 \
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
6 c! G6 d/ }5 ^ ?2 R<p>复制代码</p>
! ~1 a) Y% f7 E# z" Q1 g9 R<pre><code class="language-java">public static void main(String[] args) throws Exception {
( W) E8 s7 H# \! s9 ]4 n) [
. m$ M. o) q4 V0 b! R4 ?: |6 t8 S3 f
$ c; r3 `# Y) V$ B4 X& o* u; U
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
0 ]% b1 J, G0 ^) @4 ~7 C. \/ K+ H+ a8 M
//获取数据源
1 K+ K- Y/ ^0 K+ ^- Z
, T: a6 H$ I4 ^5 I# N! M. q List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();7 n0 [5 T b2 x, a% f* [. p
9 N2 ?0 z! W/ Z% N- b" u
data.add(new Tuple3<>(0,1,0));' y' j! l g [
( W& m0 Q; `5 t& H* @/ _
data.add(new Tuple3<>(0,1,1));
# o. X. Y! D% I4 a& F! a# t& @7 b* A$ e, v" T* W! f
data.add(new Tuple3<>(0,2,2));/ I3 e; Y* W ] x' U1 i
8 `6 d* Q: [# {) y, l3 o$ d
data.add(new Tuple3<>(0,1,3));( A& {1 F! X: l) \+ u, w
2 l& L3 ~- y* m' U! _
data.add(new Tuple3<>(1,2,5));
; _: U8 L+ G8 s/ v1 q' d( q ?6 q, S( }# s& J% [ |' l
data.add(new Tuple3<>(1,2,9));6 M! V2 N c! E& ~5 x% W8 e$ G
/ m+ e+ w: \& R, d$ |* N6 M& q data.add(new Tuple3<>(1,2,11));) Z- J& x9 b1 E$ o. D
6 Q- D+ @: U2 G2 S8 K: V! T
data.add(new Tuple3<>(1,2,13));
& z# @. {: L* Y9 ^/ l; N
" ?- Y% X) q% `
" c) i) h3 O5 s' @+ r" L, z* }
: {, r0 ]( D A' P( @, ~7 d$ s
6 ]& l, c6 v3 W. n3 c. J; r DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
2 o$ K% o5 `9 n! V0 C2 v9 b( @6 M; g1 k
7 r* i8 n1 e+ A" _" @3 Q/ E
& l2 z1 i) p2 m3 y6 r3 o8 r' g OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};) Q' i4 I" U) [3 V; d9 V6 G0 Y
) J5 L( `" g; ]1 }) z% e7 ?
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};1 o* Q8 s% P( I' n+ m
' o# t% R! G! j- C$ C/ ~' _# l# O+ L( ~5 ?0 ?5 A# @$ V+ f
1 Z6 E7 ~, O9 t; F! u+ I
; R1 l3 O6 l/ S3 `) c
; [& h( l% r$ Z' S$ a4 \ SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
0 n& k9 C6 C3 y3 g- q- M2 B$ o7 G
# b- |2 E/ R8 n1 F( F7 g+ E: L @Override4 a# }7 t+ A- N" n6 W& j2 K
7 x6 P5 @1 \7 \" i6 ]9 a3 R
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
K4 e5 W( \1 n2 y! @
) Z" Q% O- I' Z; A; H# d" j/ e+ q& g0 C1 D% ^ V" ]
3 |# Q" I) i/ }$ Q- i l3 k
if (value.f0 == 0) {
* c$ L1 _4 H9 b3 p. Y
" Y$ S g" A3 p# B+ u ctx.output(zeroStream, value);
& l' o; s9 V# x% u
7 H" E( k5 b w9 e* n } else if (value.f0 == 1) {
9 h# v* t- @0 @& Y
' v0 Q6 |5 G9 H" @+ l5 C ctx.output(oneStream, value);& `4 Y+ ^" z+ B) ^
0 B6 s0 K2 M* Q- H3 u9 g+ I6 o }
/ |# y. b' `& B, [- d3 d* w1 |
- @( w/ S6 {/ ^ }
" X7 H G# _4 D; o8 q! O. \1 b) D3 ]0 v& G
});
4 {( E# Y7 k1 C& H% h' s! s& f) v' i& X& @
0 S. @ J$ @! ~# n/ q( x6 n
+ b2 Q) e2 `7 Q5 Q8 V4 b* @# [ DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);- e( M. o g7 I) d; y) B
! ?3 w! {4 t8 A9 `" m# m. W* m DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
, u5 l7 ~4 M9 n2 j. n1 c) w1 C, Z& N/ \: G5 s
; N) V8 ~3 G2 F6 z
2 B8 J' [$ L" S! } zeroSideOutput.print();7 J. ]- q; }7 v7 ]4 Q
8 l, w x( e' |5 L0 A. S
oneSideOutput.printToErr();
! m3 G8 H* T) E) _. I9 l2 [# P c5 Z, K% F9 f |, }
8 ~- L4 r% n# r, ?. W
1 C6 U& V, J5 j5 U, _, l! j7 F% y- m$ I+ ^. X+ q# s
6 x, k9 l' v v- M G# ~ ~ //打印结果
; I6 {+ K& {" N0 G* u, s4 N5 d
5 e- \; ^/ l5 U% M* D, ?4 \ String jobName = "user defined streaming source";( T+ r, a2 d* n5 L# _
/ b9 g3 W* F0 w" \: G8 _
env.execute(jobName);
) l# g: p1 \& `0 Z3 ?. v8 M3 ~* s4 Q# C$ u* W ~- O) p* F6 F
}
& D0 i, w3 u d5 m. P7 C</code></pre>
7 [# V& @4 q1 M<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
' }% l- i; `: ^% h<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
: ]- O @7 E z, _<h3 id="总结">总结</h3>
X. B! S; A. H7 _<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
7 s4 `! G, l( v1 W<blockquote>% l) h) t8 ? I
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>5 i. p# G: h3 y1 X0 c+ J
</blockquote>; c( e7 G! z6 h. s3 ` B
0 m" H& r' G/ |: L6 H- _+ ^ N |
|