|
- }3 E$ W% i6 e" I" a" M7 W<h4 id="flink系列文章">Flink系列文章</h4>
: b9 A0 u! Q1 B<ol>% F& V, P# ^# ^9 Y9 I
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
5 ]) x3 b6 {# U4 h5 o# p, @7 I<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>+ k7 \6 Y5 h/ j) [" B* I7 I
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>) C: h1 I6 M5 v3 R2 e* `9 F
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>: t) B/ y( G k) m
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
/ S0 W7 ~( B* e& {0 x, f<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
8 R t$ ~4 `3 @<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>; L- l: y; i2 ~
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
+ e4 G/ z7 v, t+ [ w<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
2 L& K% {9 H7 C/ [. X5 ^3 y</ol>$ M1 [& f& [. V
<blockquote>
8 f, ]* v0 U& T! }7 ^<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>4 w4 o. m" r6 l: l# t# {5 @; J- ?
</blockquote>3 h# |: h' E7 b4 Q$ q/ E& d
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
# X& T& p* r. }) A& K6 D) L<h3 id="分流场景">分流场景</h3>; n/ V4 x, Q% O
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
! z8 p: p R; v( f1 V# E' e<h3 id="分流的方法">分流的方法</h3>
" o6 K2 {2 R7 p9 ]6 y& j<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>9 L; n( q* a# |
<h4 id="filter-分流">Filter 分流</h4>
3 ?8 P9 ?* e# j7 i8 M T<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
% C$ `8 S: Y7 \+ T9 }& P2 [<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>" R& P' X; o6 v- ]
<p>来看下面的例子:</p>7 k+ G' x2 G. p5 X
<p>复制代码</p>
, P! w3 U# J" W+ y' \; i: i<pre><code class="language-java">public static void main(String[] args) throws Exception {
: N* |- @+ t/ Q) @( w StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();9 V4 w# m! S' ~/ ^8 G: S1 }3 y
//获取数据源$ t. x I0 [0 o- G% h5 j) \; m
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();3 V1 N: @' p% P4 _& X" [
data.add(new Tuple3<>(0,1,0));* c, T; n, f$ _, v* F
data.add(new Tuple3<>(0,1,1));
- G$ L4 }7 M+ j+ |/ `6 O4 N data.add(new Tuple3<>(0,2,2));
! @1 c# c& O: D% M" a( X data.add(new Tuple3<>(0,1,3));; [2 Q: ^' e. Q1 j3 K3 |
data.add(new Tuple3<>(1,2,5));( b5 U. r5 q! X
data.add(new Tuple3<>(1,2,9));
) f7 i% {2 I& w* R; l7 Y6 i; R data.add(new Tuple3<>(1,2,11));
4 s( ~" d" H7 ^3 m data.add(new Tuple3<>(1,2,13));
2 D0 @4 ~4 W" [. F2 b4 ^5 y0 A9 ~2 P+ m
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
( V1 A/ \% x' [3 x/ O: a. V/ M w. S# q
1 r9 \9 {4 H( ~! c1 F( b+ {" M/ L0 o+ U5 H& S2 m* L* h
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
5 Y" X( E: A) D5 Z- v' Y5 [
/ F" v; |7 r! \6 C4 p+ D9 b' Q( H SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
- ~4 T; E. ?- z
, E2 ]8 m% v0 R* h. n; b: c7 M3 d* u# C1 h6 d
1 n0 L# d) E D# ~. S! c, w' V
zeroStream.print(); p4 N% b3 R$ e" N5 _$ w
; |. ]% g4 b0 S) k+ a' N& ` l oneStream.printToErr();& _( P, H' i# n0 w' R
, [0 t0 x) O! y' Z3 G
4 p# u5 b0 v' C7 @8 [: t; w/ @4 V) N$ Q( A' Z) C7 ~) N6 Z& {9 }
0 S8 ^# v' c6 e' ]4 l
! g7 A; V$ m, K //打印结果5 g! n- A% N8 R9 n5 u
+ _8 i; J) j. |' H/ s# x; _
String jobName = "user defined streaming source";
+ e8 R2 t& K3 h) e% J" ]
" A; ~) P3 N, Z env.execute(jobName);
, e6 o: D. p7 m% \# W& \4 c# ^! A4 z9 k$ I* U$ a Y6 F7 m: S
}$ x6 y9 m! y% M ]# Z6 s1 p& l
</code></pre>
$ L. K( k" p8 y, z/ s+ C4 m* V$ }<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>0 H1 @5 r: g% E
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>/ a% d8 T7 e/ `0 X
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>9 X8 S; G( R% e) Z% j2 v$ W
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>7 U; t; W" G8 g
<h4 id="split-分流">Split 分流</h4>) K+ d: A0 P e- @% Z" F
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
/ u \7 r5 s9 f; ?7 e<p>我们来看下面的例子:</p>
2 W0 k/ m5 l; u+ L+ \. q<p>复制代码</p>
2 {! ?0 e3 j7 Z6 ~- N) |<pre><code class="language-java">public static void main(String[] args) throws Exception {3 {1 S& J1 R: m3 {' O8 K/ @
" G) m+ }" Z7 S6 B2 y! h0 ^& T, E* z5 e+ C) x
, Z0 m0 H8 x3 S2 x1 R& Y" _. G StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();0 g5 ~$ x8 `4 _+ D9 a
. A6 ~# Y- o" J( M8 E
//获取数据源
7 J9 o6 q8 Z4 t' w# y& U9 R* N
6 q% S; f% E7 M* S# G List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();+ |2 y( L. f- W% h$ N
& G( v7 b \, }; O6 S: J& x* I data.add(new Tuple3<>(0,1,0));
$ h( f1 \, M% o5 b' _
( R/ z7 ~$ W" G. ~4 _5 k4 b) r data.add(new Tuple3<>(0,1,1));
- ^4 N; j z. }; J$ @5 z3 o/ Q* C6 [& |4 s+ B: j, g [+ s
data.add(new Tuple3<>(0,2,2));( G% Z' q# C. U5 e; R& ~) K
! E7 r+ ]6 ]+ B( U5 v
data.add(new Tuple3<>(0,1,3));
1 W" D& k: {8 x$ @% f* |
. x& V3 e% O7 {4 S. q data.add(new Tuple3<>(1,2,5));( Z9 L$ K+ [( }7 `& \. Q
9 E; O; i7 u- R/ ~5 ^' O' s6 o
data.add(new Tuple3<>(1,2,9));
. i4 y6 B" M3 i1 I3 i G, \1 U2 Q4 A0 y: w/ U2 J
data.add(new Tuple3<>(1,2,11));
1 f! u7 x) u L: x7 L# C9 j
, T% Y: u: i6 _& ~ data.add(new Tuple3<>(1,2,13));
. [! a5 x% y" F' _$ L( i. U; G' |/ j$ r0 R' ^
+ A. `8 B2 R$ m) _0 w+ T" C" |/ K! C2 ~; P/ V- X
9 e. b3 j- C* T, G5 q, {
% J! C: Q, Z! ]' P! L( { DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ V5 u& X; j& _% _- m" J- P
2 G/ ~' Y. E a1 `6 n6 ~% v& D3 S
+ ?# U) r- A9 ^4 Q4 h8 k3 r) h; R/ k! Z& ^0 C
Y' c5 Z+ U* Z" a
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
- p( G' T7 ~1 @7 V7 e8 z0 N; i! z/ w- J* @
@Override
/ b# O ^& i; _6 Z; h* i4 i& g1 j4 L- m$ p/ G
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
; x8 P8 }! i3 ]$ U/ g, W! E$ U4 \. \; [3 c
List<String> tags = new ArrayList<>();
. c* ]/ \: p1 Z7 {; M7 l! ^* U0 S! ^- {# H7 C* [. q' j
if (value.f0 == 0) {1 h+ B3 f4 s+ s
; d, M. S" H. b" K4 H
tags.add("zeroStream");0 O/ c4 N6 }) ~
/ y$ B0 _3 ~3 c4 n
} else if (value.f0 == 1) {
) _: t( [- h. ?3 t8 j. h9 L( i% ` ]. n7 ]/ O3 \. O6 q
tags.add("oneStream");
, T% l( ]+ J) a9 L3 g, j7 s( |; D1 C! ~; M7 A6 W8 B+ Q
}( J1 u0 e) O) A2 |* Z
; p) d5 J$ c: j9 A( }3 f return tags;
( k$ J0 m. v+ b7 F9 m: U3 Y% W9 r" w: C5 ?5 H
}" C, u* r/ s, m p. x
U0 r( A+ J3 ?7 i: F1 p, x
});4 q/ ~: X+ `; N( s% O- {: M* ]' E
% ]: u4 q6 i) g( n* e6 q8 M8 u. w# w! e9 _2 l* D
( F' N. H) Z4 P+ K0 l splitStream.select("zeroStream").print();, Y) ?3 d: m* R% z- y
! H6 c9 w8 @8 T6 ~/ s, _ splitStream.select("oneStream").printToErr();
& V; u) i1 ~. G! l% C
: Y1 |9 c; D& q4 N, {3 }- G. `, w# a) q- D
3 h* |$ X$ l2 M8 n& [
//打印结果) R$ j% [1 K- G% S+ u# g
7 y2 W, o; S6 J# |5 [4 b1 ]; Q String jobName = "user defined streaming source";$ Q& l# u0 f: Z9 L" G- X6 C+ h
: n* r. K& w( \ env.execute(jobName);. h% r( e5 \' ~% [, `) `
6 z; `* u" s! b& d9 h7 K% B}$ a# n/ A. A7 ]2 D4 `- p4 F% {" A
</code></pre>- Z0 ]: Y' T F
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
5 T+ t3 ?* H# q \0 I3 W<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
, J @# t7 T0 l* U/ _8 ?: C4 C<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>- ]7 C( @' ?! h3 m) H- d7 Q R
<p>复制代码</p>2 L3 j: b q& C/ c& R. p9 P
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.6 i# y# R9 b* p# @' s
</code></pre>8 O$ M6 q6 w2 @
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>7 f4 |0 D" g# R P
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
2 H0 m( {( P7 h+ Y<h4 id="sideoutput-分流">SideOutPut 分流</h4>
* C L9 h/ l. S4 V, \3 i0 a<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
3 f% z5 v4 q I<ul>
- Z3 S/ g3 U+ E- z* Q D4 `<li>定义 OutputTag</li>8 L2 D/ j' {! T
<li>调用特定函数进行数据拆分. B2 h4 H5 f; v" K1 T
<ul>
8 x4 S+ B% V, b& Y/ e0 _5 |/ v<li>ProcessFunction</li>
3 m" g' o+ W5 _0 P/ g. P<li>KeyedProcessFunction</li>, M' g! G0 q, X
<li>CoProcessFunction</li>
7 K$ i& \3 l( D* l% v<li>KeyedCoProcessFunction</li>0 q. ?( C: h7 S: f
<li>ProcessWindowFunction</li>& L) y+ R1 X8 C# x
<li>ProcessAllWindowFunction</li>
7 l7 e! U% J Q7 p/ I7 f# {- f</ul>
2 | b4 O. ]$ s% [5 A- C</li>
# E( R/ @4 A8 z! C1 _2 O</ul>
2 | a: Q R0 f4 a0 X$ x, `<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
" K8 [* P! h1 g* l* G" ]<p>复制代码</p>) o% _4 K+ o, M" w
<pre><code class="language-java">public static void main(String[] args) throws Exception {9 O4 M* j# ^( C7 Z
, P+ ^) T- c* `7 f9 V1 @5 Y5 _& B! x3 `9 w$ j3 J5 _1 f
1 B3 c6 G2 @- Q5 a StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3 M6 v5 H- @& I: t0 W8 u: [4 T
) u6 J* N9 T8 }! m$ M8 j //获取数据源$ e* D* k* u# D- ?5 ?
3 J# U& Q, y9 w% a9 Y* C List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
* x1 V- P2 E( p4 D+ s( X0 ?
! M' I2 }) |$ o( j' H( f4 h1 W) Q data.add(new Tuple3<>(0,1,0));3 B) c7 _% e0 B2 [& r1 L
: k5 g# \8 O; F data.add(new Tuple3<>(0,1,1));
& }+ u& s& q$ e* w
8 ~% `, }0 |" G b: [9 ?% J! O data.add(new Tuple3<>(0,2,2));
. Y% u) W, r1 y0 s! ~" Z% X$ ]5 ?/ o
data.add(new Tuple3<>(0,1,3));
- n+ j% f2 M7 s( r# u6 U% [" }' t( E. _+ t( B
data.add(new Tuple3<>(1,2,5));. _4 g- j; ^8 W2 P
) ]$ }5 c% ]$ \5 [: a data.add(new Tuple3<>(1,2,9));
+ ~0 W; N1 _7 `$ \& d2 C
1 }, _+ v) J. x" s# ] data.add(new Tuple3<>(1,2,11));
! I% A* d7 W' V! V2 L' e0 ^; W! `7 H B8 |, m
data.add(new Tuple3<>(1,2,13));
& e: w# ? _9 ]8 k; ?" X1 |
& G/ p. Z' V( i( u, h6 x3 A9 @" [4 @3 [! b
' u1 n! n6 {! \0 y( p
* h% l* F D Y, A/ U( N+ u" u6 v% ^7 ~; ]
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);! l& r: C6 X! ?) n' s
7 t6 H6 x. l5 | q" J2 d
, |6 l' i% W3 i* `3 V' V( k$ Q3 B4 T2 h
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
) I! z7 W" g, Q
8 t* b: l( j! v$ s' x1 Y OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
4 k% y. C/ m' ^8 g6 `9 z9 \% {
" ?9 m( W, M0 ~( B1 G5 Q* b+ V! X4 }# Y0 d. I
p0 h# z" U/ T1 J; T+ f
8 f! L& `7 a" l N& x7 u
9 e5 \' x6 w* l& o: j; @
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {6 q7 V$ M- Z7 L% S- }/ |; r% e
/ k8 H* r% U7 ?6 q% p @Override% P) \2 E, z% _/ m5 J' U4 }
7 B) b/ s- D1 |- R public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+ c: Y2 c1 S8 O6 @2 D% K
% A- \7 v! [# v7 k3 \9 j; L
2 j, j2 x4 w) w' V! b
) }8 t. i, i8 p& Z if (value.f0 == 0) {. B& z0 H+ q7 a
\6 p; g9 v: g2 V
ctx.output(zeroStream, value);
1 H3 r) ?) E, ~; }3 v3 u+ H. T0 q9 r- [; P& G' ^; E& s/ W
} else if (value.f0 == 1) {) G: k9 x9 e2 q, ?
, z+ Q2 C2 P) j" \) B8 D" T
ctx.output(oneStream, value);
6 D y$ o/ Z' j: t" R$ P O! ]; W0 X( n6 l; p
}8 @- m T0 B8 M) O" [
1 `& ^6 \' ^9 W Z1 C0 i, e# W9 x }
) \) w, K( T1 r8 U3 I/ U' ^' i8 W* I2 `& r& v
});
3 Y) I6 L: V+ \* H
" y! ]" P5 s5 U" ~8 R" B& }! S. `7 P: a2 l' r' F$ j, @
. Q1 |' e" G5 _
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);+ u: _8 M" { B+ Z
* n7 r5 n5 s2 E DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);- l$ J% ]" f! w7 c, N
1 M* w* u$ B& ?6 [$ S
& p! b* F# G; W+ @2 O. b7 ~
' W! w ?7 [1 V$ h! A% H( { zeroSideOutput.print();1 |/ j% O4 h% \$ y
# E* n+ h! `% e1 `0 ]/ L
oneSideOutput.printToErr();. i5 Y5 j n& }+ a4 Y, A! L
& G8 ^. C! B, q- m+ W! j4 T; y+ @
4 j; I8 K4 |( I- \
C% N! ` j. L# e( r' ]8 h6 ?
+ K" R9 `* A5 r: e! b3 e: E" [4 T& C7 a //打印结果
. A* a7 H( k6 y2 J6 I( z! G7 N" G% c E2 z! ]0 ?0 ^
String jobName = "user defined streaming source";
' i2 ]/ l2 b7 w Z6 U2 g5 r0 b% g2 W1 A* Z6 j4 |. o
env.execute(jobName);
' A% R& x" F4 @/ ]
8 t6 t9 T3 o4 n8 o/ H2 z3 V3 I}8 Y# g" s& t' `$ d8 l9 V
</code></pre>$ L5 r* n( r: n. c: ^& h) x" A
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
$ S. g" O: B" R. p<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>$ Y+ b5 O0 F5 g5 ~, t; X, H
<h3 id="总结">总结</h3>1 P6 W4 m n2 L% Q0 ^: A
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>' q3 f# H3 S) W+ g/ ]0 y
<blockquote>
& F, v1 s0 `6 k! A+ Z" z<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>1 N3 z0 I' q9 C4 e+ H! i" { c
</blockquote>* j! B5 i- W! a' e8 K: R! ~
/ A! U4 g! z$ C |
|