|
|
4 q: y( _+ H$ S( n7 E) P
<h4 id="flink系列文章">Flink系列文章</h4> R# s3 ^' Q9 q3 s) a7 O
<ol>
) a' _* @7 [; y M<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>5 w; g V% M, D
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>5 S" f, S; b8 e) i1 S1 s( j% c2 V9 _
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>" s# B; Q- A" ]7 @1 K' d
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
3 y/ K6 Z! B \/ T/ |; n X<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>4 d I, {( U' W3 }& d2 t
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
5 ?5 `7 u" ~5 I; h% q) R( m<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
/ Z" J# ?5 s; F& I<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
- h; [# Z- l5 L+ N) Q' }! d<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>7 q3 a. A+ b8 I% j) E- x
</ol>
$ }* o$ p* V& U( H) T<blockquote>
S3 V4 g/ l% ?# h8 B<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
" ?% S& @) U2 R</blockquote>
! I o# \3 S( A) K- X! S( H<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>9 @0 s1 d; j# }
<h3 id="分流场景">分流场景</h3>
( M" l; l; u! z. ~9 X' _8 G% w<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>6 ~% _5 I/ B S- B' E( o
<h3 id="分流的方法">分流的方法</h3>
. C; z/ }4 _% `, g<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
% t$ @1 ]; i- t3 W3 [1 C K+ O1 |<h4 id="filter-分流">Filter 分流</h4>8 F8 J2 O0 `+ v4 I0 I
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
& W8 a! z/ p- V$ S, M8 V; h6 g<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
0 }2 k9 i. ~$ h& Y' X/ _# [<p>来看下面的例子:</p>
" F$ `" z& L1 Y0 M5 L<p>复制代码</p>
" [. x4 Q/ T# G( M! g/ N<pre><code class="language-java">public static void main(String[] args) throws Exception {/ Z" w% c3 S) x N
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();, w/ i, y- c5 s$ c6 Z! _$ b0 n
//获取数据源0 h3 H8 R5 c {8 R0 [. q& }. F8 X
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();0 |$ o# y. S9 |* A! M
data.add(new Tuple3<>(0,1,0));
/ Q1 O( {6 g& p4 { data.add(new Tuple3<>(0,1,1));4 l& g0 C* V3 q- I. h; ]2 @3 r
data.add(new Tuple3<>(0,2,2));
+ ]* H/ J0 D% ?" Z, f+ \ data.add(new Tuple3<>(0,1,3));
Q8 Y5 W, o9 ^% ^, { data.add(new Tuple3<>(1,2,5));% s& v9 ^" }2 L! F
data.add(new Tuple3<>(1,2,9));
+ F* l- Y" z7 [! M% J ], n data.add(new Tuple3<>(1,2,11));# Q! ?2 v/ O$ _7 e3 w
data.add(new Tuple3<>(1,2,13));
O; v. c# A M
9 n5 a0 D/ W0 q" `6 G DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
( q$ `/ `$ V k' g1 ~4 x% j) z+ I( u
; W: k9 d. |1 m& Q7 f8 \ N& M% N( B
1 ^8 j2 X! q, m% F4 n0 Q SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
# t8 y0 R* v4 V: A) E F0 I2 u. n, Q- R" A
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
3 N. ]% P) V1 K. `4 |( M+ B
) K: y+ s, P( p8 K) o' V) h
/ Z+ N- J- H$ @' K% Q$ `3 ]. a! Y+ G% d; l
zeroStream.print();
/ N. b2 R, C0 v3 X5 J1 M1 T! j5 e* o' `5 C7 b6 n) q. e) B( n/ |
oneStream.printToErr();- e2 N& a5 u4 ?" j6 P% C6 o1 ~. {
$ x" e( b0 ~0 {. \' \9 y. t
1 |% ~; B; d4 H9 m( z- ]* }; P; W5 e. b; R1 Z S6 G
: R1 t9 Y/ f1 A7 C3 Z% u2 ^9 G
! U9 ]6 w/ R: x. c8 {+ F! c7 _
//打印结果- o) a& J0 k( w) H H: b
- n5 \3 _* h: B) P& @! E String jobName = "user defined streaming source";
u+ F" j# \" u8 X
( T3 v$ Y, x- f: o! ]! _* h/ o env.execute(jobName);
5 S0 a& d% _5 Y6 e4 N/ n9 Y4 q: n7 X7 G$ d* g) A: Y
}
* V9 v/ [' u( u! F</code></pre>
0 x6 l3 C& `& H) l<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>. M" F, Y0 U% F% |+ s6 h) n! L
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>7 w# f0 x$ `! b" F
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
w) K8 {% a% m2 u<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>0 M/ ], c$ j6 e0 M; j0 X4 M
<h4 id="split-分流">Split 分流</h4>' e1 Z- A K) ?7 r
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
% Z. _4 q/ [5 B. W# `<p>我们来看下面的例子:</p>9 E# l+ E8 h. I9 z0 J
<p>复制代码</p>
3 F% Z4 b5 }* [3 F3 m<pre><code class="language-java">public static void main(String[] args) throws Exception {
: G0 R: o+ X3 o
3 f4 O; E5 ^. ~: Z! t3 z- ~ @# C8 H) P' |& I4 Y$ l
# Z: f1 M4 s/ L2 [' J# [9 m1 M
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();6 a" E* M4 y+ \2 v
; R/ p& \0 j0 k: L, x/ D7 L; T) F
//获取数据源
, M$ {, e0 F- V+ D% @3 X; D" Y% r& @8 l" U
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
8 M+ c I {: Z2 ? J; Y( `, i8 Z" V3 B* y. q R) {
data.add(new Tuple3<>(0,1,0));
$ w7 R$ y% u5 J9 [3 G2 ~
; }( v( b) `$ v! u# a. ? data.add(new Tuple3<>(0,1,1));
# P; ?! B# U/ m2 D
6 a6 P2 z; ~7 [+ p B; Q data.add(new Tuple3<>(0,2,2));) Y/ g" V& r) ^" {* L4 q
% n& ?2 X# Z. t" m$ }: f* ] data.add(new Tuple3<>(0,1,3));8 n) e1 u- r6 b9 U
; l9 }" m8 q$ R* ^- J/ O( S, c
data.add(new Tuple3<>(1,2,5));- F# u. M, z/ p
( z+ h/ ]8 O( s' P; ^ data.add(new Tuple3<>(1,2,9));4 C0 X9 ^: z8 U) W6 u, G# Y. {
4 j; d( Q+ y4 ]: [7 ~# I* u
data.add(new Tuple3<>(1,2,11));: E3 b7 p7 [) M W
$ Y$ `6 j3 G7 D. u0 Q( k
data.add(new Tuple3<>(1,2,13));" |1 ^* x8 K. S6 w+ ^5 d) G
6 @2 C+ x3 \2 k2 y) }* M" c
$ Q, Q/ m! V/ s$ K8 U; g6 V# O; l# o9 ]6 e/ w
& N+ q2 S& D# `- c$ _1 n P
; q+ @% _6 \ H0 H v' _ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
' _% _1 ^9 X& N4 g ~5 k$ S w: _: E% @* k0 ~
4 d v2 k1 O% i" }6 b
8 p+ [" |" l2 S+ b4 S9 v$ \! ?" A& i% v& P% C
$ h7 n& ^# G$ J6 V% ^ SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {7 r& Q: T" O" C& x
5 Z0 v# f3 J. X+ i# K" I4 [' H @Override7 x, o: m& q1 o" E7 G( e, ^
: o5 l, @5 H! L( N
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
4 P6 A/ P3 u1 `6 d2 a' q! \2 x; G, ]
0 ~+ T* V1 M- H2 J1 { List<String> tags = new ArrayList<>();
+ v3 M1 y; r {2 i- p: f' |5 u) f
if (value.f0 == 0) {2 N! ]( J) a8 ^3 ]
# i, S4 }, o- u4 p' N/ _
tags.add("zeroStream");
9 e8 U$ F. S* z9 s% t: ?/ ]
, |2 ~% Y/ W$ f- T+ @) I } else if (value.f0 == 1) {2 d( W; j8 y( u/ n$ v ~7 {
: s9 }- M. a! z) z3 B0 [
tags.add("oneStream");" \8 V; _" N7 I Y# k
( a) |, r$ ~: ^( } }
" |% N/ x+ }, \ X: I, H
. t& S# R. k) ?! @! } return tags;: G/ v- L" D$ ?) Z4 u' c: I$ w
( j$ {, S$ z! o }
8 b9 O7 r) P, p! F1 ~1 s% q5 D% w, i8 n r4 d7 H- L- I6 H2 ?. R& V
});4 S* I3 p% a* y
9 D: M3 _* P) C7 K/ k% F" c6 Y
2 z. [5 @5 A$ `
' b* ]' m: J( z2 z: O splitStream.select("zeroStream").print();
- e+ k; ~- G" }" P; ?8 Y* R- K! N5 W" H# K
splitStream.select("oneStream").printToErr();* p/ A7 ], G/ I, h) e7 s
, M5 s( x" B; T3 P9 Z& T
7 f# \2 g. d4 v+ X, v
6 Z. ?4 ?0 d8 u( f: S, ? //打印结果
& e( U8 }) c( }6 J: ^: A
9 a6 P- G, U; w e String jobName = "user defined streaming source";- ~6 n2 h8 f/ x9 N! M
$ M* f/ F% F, Y7 |8 Z
env.execute(jobName);) g5 {' |# j- A3 c4 [
! Q" m" @7 ^9 W3 ^0 i8 q+ a
}
/ l; F$ [* j, P- x; V</code></pre>, ]& J# @; r- i+ ~6 k
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>. l3 o2 r- b$ Z( {* m
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>2 m2 U/ t. j6 T8 S
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>! a" p# i* a2 h5 \8 a3 D
<p>复制代码</p>
0 J: Z7 ^% x* y& F" c<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
. w! P$ `! c+ g1 r+ @% F6 S</code></pre>
+ H1 V, T @2 ]0 M<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
: ]3 ~& H5 s% q<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
6 G! V) g/ q" v' X9 W<h4 id="sideoutput-分流">SideOutPut 分流</h4>" ^& b* E7 l! O$ V5 S
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
$ }4 f4 Q/ \& n3 r<ul>
# k: I# y; Q( I9 n# u% |<li>定义 OutputTag</li>% o C+ g" W }- O5 a$ t- s, k, Z9 T
<li>调用特定函数进行数据拆分
" R7 x2 h3 N; T<ul>
" b2 s3 h( _) j" b<li>ProcessFunction</li>6 z1 f6 q: p7 i% ~
<li>KeyedProcessFunction</li> ]5 q4 M$ {5 ^$ B- M! m
<li>CoProcessFunction</li>
" O% W& m: q9 w# G<li>KeyedCoProcessFunction</li>1 D9 ~9 g+ T B3 P( {- O. v& m2 s
<li>ProcessWindowFunction</li># ~! C( H" k: h* i1 |5 v
<li>ProcessAllWindowFunction</li>
. Z# X, Q9 |7 Q' T) B! {</ul>
/ C+ }) Z s, m4 d: Y</li>% `$ n% m! G8 \9 n% K/ U8 H
</ul>; |, N* r3 U" R- a% E/ {2 G
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>* y" f$ v, m* G, _
<p>复制代码</p> B7 l1 r6 y) ~4 V% S
<pre><code class="language-java">public static void main(String[] args) throws Exception {2 \7 {: E7 I' r9 [
/ P7 l9 z0 y! q$ [) i* g8 Y
4 T3 z: t3 W. h
2 q3 [/ O( `2 W4 B8 q* R/ ]) |+ }4 m StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* W$ T" S0 h6 J7 y3 N( K" e- K! W4 B0 B9 P ^
//获取数据源, G3 h7 m( I+ m. @* u/ L, g7 O
/ \, i+ t- h7 J! U5 Q7 g
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
) F$ e/ f2 k' D z
$ e2 V. Z) E; Q& [ data.add(new Tuple3<>(0,1,0));. q% |% Z+ G5 T5 Z* F* P
2 i' K; ?& S8 K/ C
data.add(new Tuple3<>(0,1,1));: ^- s6 U% Q/ @: F
2 D; I# _: F, l$ Q* i5 {9 o% m data.add(new Tuple3<>(0,2,2));' p* h; K! S* \ C% l
. h# |$ {, a& b6 d% z6 e data.add(new Tuple3<>(0,1,3));3 M$ U2 c7 B( @; m9 @
! l4 N' V6 _: V0 [
data.add(new Tuple3<>(1,2,5));
! @4 a9 L3 N |8 J+ `# V; x3 c5 f
1 I: G0 A" l9 }( x/ C R data.add(new Tuple3<>(1,2,9));
6 s1 O2 B; j% q; J0 i2 M. V5 r! x' X( I" A1 Y+ L# i% o
data.add(new Tuple3<>(1,2,11));
8 L$ y' K1 q0 [4 I ]
4 u d8 h% Z) [ \5 g data.add(new Tuple3<>(1,2,13));
- F% @* u# [# H5 v# t. m: v* q& _) J, \* f
0 o9 P2 ^: B Q5 u' ^, n% [
+ ^- M. n' {4 F
7 i# [, w7 a! G1 X
' X f! z- p) p% z8 `
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);6 |* Z4 @8 }' e7 i3 _- b
3 f' l3 D4 L% |. w# p
( n+ N( u, s3 I' W8 d
2 ^. R. K4 x& _2 s$ }2 p
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {}; c# Q ]' N, @/ x( `$ Y5 b
6 V/ e$ M7 x3 ^ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};0 Q: i( Y/ `' k O
' {+ a! i ]* p' p
! o# f% ? e# @* m/ f
. E4 I9 F% g9 z, ^9 D
# r0 l* s2 A) j
- A$ a1 k% m: [7 O9 j1 o7 `1 k SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {4 S; V( n( k0 [5 J
, m! b Q' \) e
@Override' Y+ V/ q( a+ [+ a0 I
: C: m$ k8 a" j
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {" v) \; Z& f/ k- x
+ ]& `- X. P* ^5 i: k. I5 D9 g! ?7 S: W# I$ ?1 K0 u
5 B: N5 a7 }& m$ D
if (value.f0 == 0) {) C5 F! ?# w+ F3 N) s
) i# d9 t$ n2 z8 d- F ctx.output(zeroStream, value);% q) A. G' k1 W1 Z
, b) r7 ^! Q$ T2 n w& X" x } else if (value.f0 == 1) {
" c, V7 }2 R9 Y, G' U
) b7 Z$ d: W! J- d: C P t ctx.output(oneStream, value);
2 F( N9 b4 f4 z1 E
% R$ P5 K* ]9 V. B( h* w }: }2 G: r; B6 V J' d
1 D' J/ |8 X6 A- O
}
$ H _3 k8 t" ^/ J# v" @+ m
) r, R1 _) E- R. z. | });$ P' K4 q |" C0 m t, {
2 m O% V3 U" ^% r! r' E
4 Y9 t, M8 i+ z$ e+ n
* H; ~7 t* J5 [' l1 I, J* o DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream); {+ C3 Z* A& Z8 |5 r1 m
9 J+ T2 ^) _% G+ x5 f
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);, h$ J2 ~2 o/ f. S
5 q% L! D) o: K+ u' F$ Y* S/ r4 L0 w y& M" _
5 l8 B0 u7 ~# a1 V( J l. Z! K
zeroSideOutput.print();
3 L& R. N' r3 M2 u) l2 s/ k7 s" d- o0 o* w" W& Y# u$ [
oneSideOutput.printToErr();
2 F6 S! ]7 J5 w& v# b
3 R0 D- D: L' p% }0 W' \5 c8 o. y) J9 T. j
; v, C* p5 {; n
- p" B* @& e7 A4 ?; S
" L X: o5 r1 b* S! B0 D2 q //打印结果
" [$ Y' W+ O: K
/ G+ ^, I( b6 P String jobName = "user defined streaming source";: w& L1 y" W5 e' j1 i0 o
) V2 S0 d4 r) Y) h5 E5 c o3 N1 s env.execute(jobName);
0 [0 ~- C: |' g! I( T8 V( x9 l4 ]& Y
) H5 |0 `/ H# g}
" Z' O6 o# G# i0 v. c' F) M9 o</code></pre>, F; \. v$ [6 z4 C3 R; I
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
3 b; C) q6 r! ]% {. `: M/ M+ F& F<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
$ b0 F) v- s9 n, {4 C<h3 id="总结">总结</h3>2 v' U) w7 Z1 X4 f1 B+ A& q2 u
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>0 G; v+ D3 E2 B. D) d. [
<blockquote>
( ?: S; F9 I2 u' w; L2 b) P<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
3 ^% G. r2 {9 z( s/ v</blockquote>
4 a+ F0 N( ~# v7 ?9 g) h% m, `* I' b$ P2 V; V5 \5 @
|
|