|
|
& ^, |( X$ [4 v$ [9 z
<h4 id="flink系列文章">Flink系列文章</h4>
. h3 x8 E5 o" n<ol>4 J0 {, u; n) x% N' ~' [; w
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
" |% l- t, [$ k8 r I/ Q' x' N3 l @<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
6 _' r+ o' f( z2 R<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>5 Z) {& c1 D, W! i8 }
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>7 A. x3 c( M& @4 I& ~+ | p
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>2 V" P; @2 h- e/ Y% \# Y3 b
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>: j+ w+ Y. S& r
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
2 t) S1 o& s1 j r _- o1 ]' w<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
* m: g) u7 K' q4 W0 G+ j/ n<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>1 ^( |8 _% N' ^% p7 F
</ol>: u& m9 x# o5 o$ H
<blockquote>
6 C5 X+ F# X; C" L8 ~1 J" R5 J<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>4 v3 ]& C1 _2 b8 a7 g+ f: q: t
</blockquote>
8 ^, A* {! [( O- `6 k, o# z& ~0 ~$ _<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>0 s4 q/ Y' a8 U A3 z
<h3 id="分流场景">分流场景</h3>+ p% ~% _7 ?2 Y/ S& T
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>8 a; s! u S6 v2 }4 M
<h3 id="分流的方法">分流的方法</h3>
! W! o+ s1 @: ?+ f# H& m<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
5 J! H& V7 i# c4 W<h4 id="filter-分流">Filter 分流</h4>
9 k! O% L7 D+ j# Y$ G/ t5 N1 l<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>2 h9 F2 L4 a5 l/ l4 {$ c
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
! P/ N" \! m6 N+ }3 l& B2 l<p>来看下面的例子:</p>6 P0 V0 ^9 R2 n" J" E8 T
<p>复制代码</p>5 ?! I% Y) e* @+ Z' U
<pre><code class="language-java">public static void main(String[] args) throws Exception {
0 f% A" [! x9 ]2 u; |6 B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
' B, ?$ Q1 _4 u, r, i8 z n/ n //获取数据源$ ~4 e7 X2 j* M8 ~( J; N& }
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
/ W) F) p8 O( E9 Y& |! F data.add(new Tuple3<>(0,1,0));7 M& W. K8 L: h& S% N+ ^
data.add(new Tuple3<>(0,1,1));$ F0 }4 w& Z8 m# X1 B. X4 Q
data.add(new Tuple3<>(0,2,2));
3 e) l; J, ]$ Z, B2 Y& ^$ J data.add(new Tuple3<>(0,1,3));% k" u! q _+ ^; `3 _% s
data.add(new Tuple3<>(1,2,5));
M0 H/ T' x# n9 t; O+ g6 N data.add(new Tuple3<>(1,2,9));
& o# T6 X, [1 i data.add(new Tuple3<>(1,2,11));
$ G. `3 F, ]: |: z3 `9 U8 O data.add(new Tuple3<>(1,2,13));/ D! n: A$ J/ M/ a+ S+ u/ U
, x( X+ {; v9 D2 l+ _' @' e
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
3 ^3 ]% ?9 g% F2 n# K" O: `) f+ I1 Y( V0 E
* {( u9 c' Z ]9 T
5 M& X/ A, Q: P6 E( j
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
9 J5 S$ ~& |* W) A% h; L+ }& R b- E
+ a3 o& g( `1 c' W# e( X5 Y i/ \ SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
6 P8 r8 l7 I! l+ n5 r* c6 w# G) U7 Z) m" \. a, p. }: o
: j* K) d4 N, Y& T
5 Y! N' o6 Q6 d( C' f zeroStream.print();/ N* y+ e, r2 X Z6 i; m& u: H
5 L" O7 p x9 Y, l
oneStream.printToErr();2 D( A9 \$ u. {0 z, r- h8 m6 v
0 K$ ]' s7 I; T0 x5 b: i
0 b, k" e( E( P4 }; n4 H
6 t' o* Y; T. S+ P7 v0 a/ a9 P2 v
( |: m: v. R" m H2 J8 Z) y2 r. ~! b
//打印结果
: b3 U+ G- Z% r* O; B, j5 |
& S+ k+ P' ^9 e. | String jobName = "user defined streaming source";
9 r7 H: D- ~ m8 l, Z
. l- b: i# d0 N/ u; T env.execute(jobName);5 _) d f2 R1 c2 ^7 g
6 v# S J$ u1 y _! t4 l
}& S: f) V1 |( I7 E
</code></pre>$ c( @' W% U4 P# b8 i% u
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
" {* ~, M0 c/ Q. k<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p> I4 w! X& X! t7 ?, t
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>( u' D% W5 y5 H5 s' N0 @/ P) o
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>3 o7 J- _: l3 e9 |
<h4 id="split-分流">Split 分流</h4>7 K5 t8 H8 u. | E, P) B& M
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>5 {- c1 h# e% `
<p>我们来看下面的例子:</p>
- v+ r$ _" m: X# M# k, @5 f [<p>复制代码</p>
S/ U' e# ]& A<pre><code class="language-java">public static void main(String[] args) throws Exception {
{! T/ t4 x& R3 k6 h5 C& g4 t) W) Q4 D$ f8 G% a
& Y, h5 d8 t, a" d8 T
4 h A% X( B3 \' R1 w- j6 H StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/ s, s5 ?! O! J5 ]7 N( m5 n1 r/ ~7 e5 j* w, l# h
//获取数据源$ o7 W" ]; K" N; S( M% t% c
) K4 ]& Y! F. w List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();! x- M& M* S0 ]9 G; K
' O( W5 f e% E; i data.add(new Tuple3<>(0,1,0));
5 P3 u. J5 O! j$ |# L$ W2 t# T! o6 {' R0 ~+ d b
data.add(new Tuple3<>(0,1,1));$ ~2 D& I3 Z# n$ O
6 h6 l9 F% `0 S3 l1 n2 {$ ?0 {
data.add(new Tuple3<>(0,2,2));6 e; m2 t" ]: Y$ b
4 \. W4 k* N3 G5 t5 d3 x8 [ data.add(new Tuple3<>(0,1,3));5 k* p, D [, f5 y1 R
2 V0 c" R- x5 w. \- v. @
data.add(new Tuple3<>(1,2,5));% \' K; u, I8 I! n" c3 ]8 }+ \
% C3 N8 V. P; X* d* k; J
data.add(new Tuple3<>(1,2,9));
: e: `, N+ V9 u: y% D, U
( h" w& U5 V& X8 g0 L0 C$ f data.add(new Tuple3<>(1,2,11));. N9 F" ~" H d" ?! T( M2 F8 r! m9 T
9 @! b4 U; C7 t% m+ }( l data.add(new Tuple3<>(1,2,13));
4 x; i# ]' O. O) s3 v: w4 B x( U1 l u
; K+ t" @: `- h) q7 C
, l. [6 D* w+ o
2 z: A$ v2 B( z6 y) K. [
% G' G }4 d* R. ^8 @/ b' p DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
$ P0 U, f1 r2 s4 Q Q/ y/ e$ m+ J
' W% y3 A% M: N1 D+ Z1 b% U* m5 H }. U& S" u
* _6 [' v% `2 y/ X
( E0 {& k' ~; y% a' a, n# R
; H7 U( W1 |' n* L* o SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {& R( c! X" O! a% t; A
@$ w+ g8 x' J* D* s6 b
@Override" Z$ Y. ]; L/ W* c
7 l1 M% t* k a: H( f
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {% e. o: b& A: I6 q" u6 [
! E* Q5 s, C& S
List<String> tags = new ArrayList<>();
/ g* E& K( y R4 H9 `5 C. a5 \8 G( u4 v6 h; ~: N9 u7 U
if (value.f0 == 0) {8 c f$ ^: ~- P" F
. {. `! `' y% { K; R3 ~9 W tags.add("zeroStream");- O: v3 O0 C- N7 w% k2 V& @; `
6 @& D2 N1 L! k( u- D- ]2 w/ S } else if (value.f0 == 1) {# E5 V3 s0 G. R3 j+ K4 H
# F; {* |, N x& v e% B* [' p4 d tags.add("oneStream");/ L1 [% I- b& I: m
! b7 K4 p+ l3 ~: a& q, d& h4 y' v }% G+ p+ E) z9 j c/ _
7 N6 f" H; a2 _8 F1 b( a- I( w8 S return tags;
6 [8 L' [- R; H2 Z% c. r& {, P6 F
}8 b1 J# s% T+ h8 @
3 r8 ?; E P) G, y });
9 P' b( q. e. z/ N* ]
" l# I4 I* m3 p( u6 {9 d8 `
+ B4 X3 a1 j( ^5 {/ ]
5 ?6 t4 A5 a c. M4 B& s( I splitStream.select("zeroStream").print();
4 c$ l2 z5 h- k
m2 R6 Q. b$ p8 T* i! z4 q/ Q7 z splitStream.select("oneStream").printToErr();( ?) m4 T% S+ k. Y) j7 A
2 ^ r) u" d" `5 @0 \9 i
3 S8 v p) u- I, s
" [4 n4 d4 H, B1 O3 |* y //打印结果7 `9 O/ z& |1 }" m" z( j7 E- f% a2 w
{9 M- X* b1 W' h# X& {
String jobName = "user defined streaming source";0 K* Q* |0 J* E A }
) a. g" R: n ^! w
env.execute(jobName);
9 w( @0 e0 o4 J3 D/ v5 ~; L. D5 R8 B4 F
}
; K, Y* Q; P+ X9 q }# @9 W* b' @</code></pre>$ b6 I6 b" ?3 ?' p0 m6 x9 k2 r7 r7 u
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>7 U" a3 Z* X" H! v
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>: _) P1 p% j \0 j5 ?$ O
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
6 f" b$ o& W7 {+ A0 d6 I<p>复制代码</p># ?4 v2 |0 ]8 x1 g6 @" L
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.9 j8 x$ z+ i" R$ v# e; F6 g
</code></pre> v* B/ g& V7 }3 } k) b
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
+ i# U4 U. [ c; k<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>! _2 E# Z5 l" r. m
<h4 id="sideoutput-分流">SideOutPut 分流</h4>+ v* r! b! t: l5 V* r! q. J
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
# E& h! r: r0 `0 V' {9 } o! @<ul>
( Y5 P6 L" Y: d U \<li>定义 OutputTag</li>0 k# P* L4 t* X2 P
<li>调用特定函数进行数据拆分1 C5 w% \, E: c" o0 t) v2 R) v
<ul>
0 a7 t h P' y# v; I6 F% L$ A<li>ProcessFunction</li>- ^8 ]8 C1 G: U
<li>KeyedProcessFunction</li>
) K; r/ |! b, C" z<li>CoProcessFunction</li>, F/ v8 ]% R# a# y6 B# p" W
<li>KeyedCoProcessFunction</li>$ p$ r/ S2 ]7 @7 I+ L8 ^9 G
<li>ProcessWindowFunction</li>
+ D8 x6 w. p- e- D! s<li>ProcessAllWindowFunction</li>$ }1 k3 Z2 ?% \/ e
</ul>) P5 R0 [# Z! F' Q! Q6 E
</li>
* w4 c& M8 D o</ul>
( q+ F8 Q( ~2 m<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>, Y) o4 Q1 e4 ]3 J
<p>复制代码</p>- p1 U5 U& t3 {+ W9 w) c- p! J4 X
<pre><code class="language-java">public static void main(String[] args) throws Exception {; {) T+ | t7 a
# w" z2 f) V; S, C1 \
/ c( j) s4 V" B' q6 f0 \/ H
* l# c, _% g: m% T0 O& W; F StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
0 q8 ]3 G z; j/ @ }9 ?0 x4 \
! ]; ?1 a0 _* \) q //获取数据源7 e2 l8 f0 o3 E6 x! ?0 U% i2 _# m
: h, {8 H9 i4 y, o; J! h0 w. f" A List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();% U/ L+ k, ^3 d- w% L" g' y) x
: b- q! P; A! y: c; S2 O% ^ data.add(new Tuple3<>(0,1,0));* t0 T- h6 L6 M
( `8 m6 r: d" ?1 s" O
data.add(new Tuple3<>(0,1,1));& H `8 _4 N; P1 [/ C* ~
, {% s+ M0 N) x' f6 N
data.add(new Tuple3<>(0,2,2));0 o3 m, v& H2 ]( W- S1 S+ F
W! C$ g0 \$ k2 k data.add(new Tuple3<>(0,1,3));
8 S8 s5 s7 m( {; Q/ b9 g, H6 M# C9 S1 V- S; {& @+ Y* S
data.add(new Tuple3<>(1,2,5));+ C* @4 w5 X# p% f2 ]; h
6 U) L" S' w0 H; N3 L data.add(new Tuple3<>(1,2,9));
, W; m j9 o7 U0 C" m7 N9 i% U/ s* u' N" g, Y+ h
data.add(new Tuple3<>(1,2,11));3 Z! i7 J5 q5 C
8 Y4 G& d+ @: h# s* @6 s
data.add(new Tuple3<>(1,2,13));
. G1 P0 o* p. g1 O8 k, r( @6 c9 I+ T! d& t0 X* l
5 K. G- X/ F% h, m
f( ]" o$ T% o
# J+ g3 Q# u: S ^% O3 i! Y( f
, i6 n1 P) W4 z# J3 w& L% P1 t
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data); F: N' t/ m$ q; q' T; X
9 R7 D& J) ~! ?, \& w' G
* ?: V: Q% A0 v# j" M/ l! L% f W& e% d' n
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
+ B' g7 Z* b! A' l5 P0 a8 b+ \' d3 p# ^: {! J. B& N3 n
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
) H- J& [: w+ J* C+ H9 k. R
$ j' O! r# z `5 {* | b7 w2 d3 ~5 [; c& t* l/ M
2 y0 g* ]/ {: u# U) c; ]+ e0 t* d
+ `9 u/ x7 q* ]
3 X9 Y& w- y: {8 X$ h% o/ u( x SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {: ^5 O" Q* K( A) O8 Y( @/ j
" {% J+ C' Q# l" ^
@Override
7 z2 G3 ~6 ]1 ^+ N2 T" c( d
5 c5 }' |+ b4 |+ @, ~ public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
$ M/ r6 W) a+ C0 ?8 C& B) I) g9 n3 l8 {3 l- [* C' e
) u, ?: e7 l9 K# K6 Y& q" H P" q$ f3 t+ S, d( O, Y
if (value.f0 == 0) {3 R. k, T2 L5 e. g: }
3 K7 S' A7 G' D( ], I
ctx.output(zeroStream, value);
% }2 J8 A; o* @- o/ V8 n0 K0 D8 ^, E) C+ i3 `
} else if (value.f0 == 1) {
x+ R8 O# v, Y0 |: O! H* @" l% ?0 a2 @3 B6 l# y; `
ctx.output(oneStream, value);. }- ?0 K M$ D6 U
" S# T* H- p9 ^ }
0 x5 {& t+ W5 B* W
& S$ G# X3 t! B1 j1 m* U+ E& K }& c( b% H" O- s
# o1 q. S Z7 P/ Z0 v5 n: `4 t. O });- S( \: s* h9 H5 y
! I' t4 _$ u7 Y X! ]% U! `- ~4 k0 W- N9 e) U, S T
; @5 ]/ \1 G6 f2 ^, A) }
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
$ o& T1 L+ o# U, H
$ b# g, Q- ~- l' v- N' f! @ DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
/ `2 x4 C9 ]0 J2 u- Y
" ?+ z) N) H: b M
0 O& l% A. i/ H+ C( M( d, @! m) P' Z; U6 p) H6 x
zeroSideOutput.print();
( G1 N; a9 w* e0 e
* E \* W/ F& v) l; ^ oneSideOutput.printToErr();; B% E: S s; D# z
7 T% l7 F- x. j/ v0 S2 H' ^) N. R/ p5 N
4 Z3 F4 k3 u( t# k' C! m" T5 ?8 S# H" n" M, s5 I
/ A; o& n+ e+ Z, Y8 P) c' J
//打印结果
: G. O: ^, f) K3 ^) i: }
# E1 U, b/ E/ u! ]/ R String jobName = "user defined streaming source";
( b6 x+ o& K; v8 D/ ]# n; W5 Y
( Y# ?% W& ^7 U- a0 J env.execute(jobName);
! N5 w; q- y$ |) u1 T/ k) ?5 n6 @6 k- z# i. X
}
! q4 Z9 C: y9 c# i7 S/ I</code></pre>4 m" _- s& G& G( h5 Y4 A
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>! |9 Z" S% @" \
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>* D. \8 [" t3 K! A
<h3 id="总结">总结</h3> B- i4 _3 H4 \7 _
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>7 R8 s: j2 p, t* e4 ?
<blockquote>
3 z* ^# Y4 b, y: m: f4 T8 }$ z2 L<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
3 O9 n) X* h. g; ^9 O</blockquote>
5 N& S# P$ r# ?" K2 E/ T: H+ ?$ y% z( n- k
|
|